OPS-5046 support spark 3.4.0 on SPOK #98

Merged: 3 commits, Jun 26, 2023
1 change: 1 addition & 0 deletions Dockerfile
@@ -21,6 +21,7 @@ LABEL maintainer="dev@tubularlabs.com"
 # Install Java 8
 RUN apt-get update && apt-get install -y software-properties-common
 RUN apt-add-repository 'deb http://security.debian.org/debian-security stretch/updates main'
+RUN apt-add-repository 'deb http://deb.debian.org/debian/ sid main'
 RUN apt-get update && apt-get install -y openjdk-8-jdk
 
 # Python env
71 changes: 4 additions & 67 deletions sparkly/utils.py
@@ -175,73 +175,10 @@ def parse_schema(schema):
     ...
     sparkly.exceptions.UnsupportedDataType: Cannot parse type from string: "unsupported"
     """
-    field_type, args_string = re.match('(\w+)<?(.*)>?$', schema).groups()
-    args = _parse_args(args_string) if args_string else []
-
-    if field_type in ATOMIC_TYPES:
-        return ATOMIC_TYPES[field_type]()
-    elif field_type in COMPLEX_TYPES:
-        return COMPLEX_TYPES[field_type](*args)
-    else:
-        message = 'Cannot parse type from string: "{}"'.format(field_type)
-        raise UnsupportedDataType(message)
-
-
-def _parse_args(args_string):
-    args = []
-    balance = 0
-    pos = 0
-    for i, ch in enumerate(args_string):
-        if ch == '<':
-            balance += 1
-        elif ch == '>':
-            balance -= 1
-        elif ch == ',' and balance == 0:
-            args.append(args_string[pos:i])
-            pos = i + 1
-
-    args.append(args_string[pos:])
-
-    return args
-
-
-def _is_atomic_type(obj):
-    return inspect.isclass(obj) and issubclass(obj, T.AtomicType) and obj is not T.DecimalType
-
-
-ATOMIC_TYPES = {
-    _type[1]().simpleString(): _type[1]
-    for _type in inspect.getmembers(T, _is_atomic_type)
-}
-
-
-def _init_map(*args):
-    return T.MapType(
-        keyType=parse_schema(args[0]),
-        valueType=parse_schema(args[1]),
-    )
-
-
-def _init_struct(*args):
-    struct = T.StructType()
-    for item in args:
-        field_name, field_type = item.split(':', 1)
-        field_type = parse_schema(field_type)
-        struct.add(field_name, field_type)
-
-    return struct
-
-
-def _init_array(*args):
-    return T.ArrayType(parse_schema(args[0]))
-
-
-COMPLEX_TYPES = {
-    'map': _init_map,
-    'struct': _init_struct,
-    'array': _init_array,
-}
-
+    try:
+        return T._parse_datatype_string(schema)
+    except Exception as e:
+        raise UnsupportedDataType(f'Cannot parse schema: {schema}: {e}')
 
 def schema_has(t, required_fields):
     """Check whether a complex dataType has specific fields.
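
The rewritten parse_schema drops the hand-rolled regex and recursive helpers and delegates entirely to PySpark's own schema parser. A minimal sketch of what that built-in parser accepts, assuming a PySpark 3.x installation and an active session (the parsing call goes through the JVM):

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T

    # A live SparkContext is needed here since _parse_datatype_string calls into the JVM.
    spark = SparkSession.builder.master('local[1]').getOrCreate()

    # Covers the simpleString syntax the removed code handled...
    T._parse_datatype_string('map<string,bigint>')
    T._parse_datatype_string('array<array<float>>')
    T._parse_datatype_string('struct<name:string,scores:array<double>>')

    # ...as well as DDL-style column lists.
    T._parse_datatype_string('name STRING, age INT')
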
3 changes: 2 additions & 1 deletion tests/integration/base.py
@@ -18,6 +18,7 @@

 from pyspark.sql.types import StringType
 
+import pyspark
 from sparkly import SparklySession
 from sparkly.utils import absolute_path
 
@@ -26,7 +27,7 @@ class SparklyTestSession(SparklySession):
     packages = [
         'com.datastax.spark:spark-cassandra-connector_2.12:3.2.0',
         'org.elasticsearch:elasticsearch-spark-30_2.12:7.17.8',
-        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1',
+        'org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(pyspark.__version__),
         'mysql:mysql-connector-java:8.0.31',
         'io.confluent:kafka-avro-serializer:3.0.1',
     ]
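
The only functional change in this file is that the Kafka connector coordinate now tracks the installed PySpark version instead of a hard-coded 3.3.1. A quick illustration of what the interpolated package string evaluates to (assuming pyspark==3.4.0 is installed):

    import pyspark

    # With pyspark 3.4.0 on the path this prints:
    # org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0
    package = 'org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(pyspark.__version__)
    print(package)
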
6 changes: 3 additions & 3 deletions tests/integration/test_catalog.py
@@ -108,11 +108,11 @@ def test_rename_table_non_default_db(self):
         self.assertTrue(self.spark.catalog_ext.has_table('test_db.test_table'))
         self.assertFalse(self.spark.catalog_ext.has_table('test_db.new_test_table'))
 
-        self.spark.catalog_ext.rename_table('test_db.test_table', 'new_test_table')
+        self.spark.catalog_ext.rename_table('test_db.test_table', 'test_db.new_test_table')
 
         self.assertFalse(self.spark.catalog_ext.has_table('test_db.test_table'))
-        self.assertTrue(self.spark.catalog_ext.has_table('default.new_test_table'))
-        self.assertEqual(self.spark.table('default.new_test_table').count(), 2)
+        self.assertTrue(self.spark.catalog_ext.has_table('test_db.new_test_table'))
+        self.assertEqual(self.spark.table('test_db.new_test_table').count(), 2)
 
     def test_get_table_properties(self):
         properties = self.spark.catalog_ext.get_table_properties('test_table')
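
The updated assertions reflect that the destination is now passed fully qualified, so the renamed table is expected to stay in test_db rather than land in the default database. A hedged usage sketch, assuming a configured SparklySession and an existing test_db.test_table:

    from sparkly import SparklySession

    spark = SparklySession()

    # Qualify the destination with the same database as the source;
    # the old test passed a bare name and expected the table under `default`.
    spark.catalog_ext.rename_table('test_db.test_table', 'test_db.new_test_table')

    assert not spark.catalog_ext.has_table('test_db.test_table')
    assert spark.catalog_ext.has_table('test_db.new_test_table')
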
10 changes: 9 additions & 1 deletion tox.ini
@@ -15,7 +15,7 @@
 #
 
 [tox]
-envlist = spark32,spark33,no_extras,docs
+envlist = spark32,spark33,spark34,no_extras,docs
 
 [testenv:spark32]
 commands = py.test --cov=sparkly --cov-report term-missing tests/integration tests/unit
@@ -33,6 +33,14 @@ deps =
     -rrequirements_extras.txt
     pyspark==3.3.1
 
+[testenv:spark34]
+commands = py.test --cov=sparkly --cov-report term-missing tests/integration tests/unit
+deps =
+    -rrequirements.txt
+    -rrequirements_dev.txt
+    -rrequirements_extras.txt
+    pyspark==3.4.0
+
 [testenv:no_extras]
 commands = py.test tests/no_extras
 deps =
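
With this section in place, the Spark 3.4.0 suite can be exercised on its own via tox -e spark34, mirroring the existing spark32 and spark33 environments.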