From f1dd33aebf754e00cfb8e774fef1ec67b531b88a Mon Sep 17 00:00:00 2001
From: Xu Zeng
Date: Wed, 21 Jun 2023 16:50:06 -0700
Subject: [PATCH 1/3] OPS-5046 support spark 3.4.0 on SPOK

---
 sparkly/utils.py | 71 +++----------------------------------------------------
 1 file changed, 4 insertions(+), 67 deletions(-)

diff --git a/sparkly/utils.py b/sparkly/utils.py
index 6800640..18e506c 100644
--- a/sparkly/utils.py
+++ b/sparkly/utils.py
@@ -175,73 +175,10 @@ def parse_schema(schema):
         ...
         sparkly.exceptions.UnsupportedDataType: Cannot parse type from string: "unsupported"
     """
-    field_type, args_string = re.match('(\w+)(?:<(.*)>)?$', schema).groups()
-    args = _parse_args(args_string) if args_string else []
-
-    if field_type in ATOMIC_TYPES:
-        return ATOMIC_TYPES[field_type]()
-    elif field_type in COMPLEX_TYPES:
-        return COMPLEX_TYPES[field_type](*args)
-    else:
-        message = 'Cannot parse type from string: "{}"'.format(field_type)
-        raise UnsupportedDataType(message)
-
-
-def _parse_args(args_string):
-    args = []
-    balance = 0
-    pos = 0
-    for i, ch in enumerate(args_string):
-        if ch == '<':
-            balance += 1
-        elif ch == '>':
-            balance -= 1
-        elif ch == ',' and balance == 0:
-            args.append(args_string[pos:i])
-            pos = i + 1
-
-    args.append(args_string[pos:])
-
-    return args
-
-
-def _is_atomic_type(obj):
-    return inspect.isclass(obj) and issubclass(obj, T.AtomicType) and obj is not T.DecimalType
-
-
-ATOMIC_TYPES = {
-    _type[1]().simpleString(): _type[1]
-    for _type in inspect.getmembers(T, _is_atomic_type)
-}
-
-
-def _init_map(*args):
-    return T.MapType(
-        keyType=parse_schema(args[0]),
-        valueType=parse_schema(args[1]),
-    )
-
-
-def _init_struct(*args):
-    struct = T.StructType()
-    for item in args:
-        field_name, field_type = item.split(':', 1)
-        field_type = parse_schema(field_type)
-        struct.add(field_name, field_type)
-
-    return struct
-
-
-def _init_array(*args):
-    return T.ArrayType(parse_schema(args[0]))
-
-
-COMPLEX_TYPES = {
-    'map': _init_map,
-    'struct': _init_struct,
-    'array': _init_array,
-}
-
+    try:
+        return T._parse_datatype_string(schema)
+    except Exception as e:
+        raise UnsupportedDataType(f'Cannot parse schema: {schema}: {e}')
 
 def schema_has(t, required_fields):
     """Check whether a complex dataType has specific fields.
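
A note to accompany patch 1: parse_schema now delegates to pyspark's own schema-string parser instead of the hand-rolled one above. The sketch below is illustrative only and assumes a local SparkSession; pyspark.sql.types._parse_datatype_string is a private helper and, outside Spark Connect, hands the string to the JVM-side parser, so it needs an active session.

    from pyspark.sql import SparkSession
    from pyspark.sql import types as T

    # Assumption: a local session is enough for the JVM-backed parser used here.
    spark = SparkSession.builder.master('local[1]').getOrCreate()

    T._parse_datatype_string('map<string,int>')         # -> MapType with string keys and int values
    T._parse_datatype_string('struct<a:int,b:string>')  # -> StructType with fields a:int and b:string
    T._parse_datatype_string('decimal(10,2)')           # -> DecimalType(10, 2); the old parser rejected decimals

One visible consequence: parse failures are now reported as "Cannot parse schema: ..." wrapping Spark's own error text, rather than the old 'Cannot parse type from string: "..."' message.
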
From d186b08755de90b0b155c411d34af820eb3f6e68 Mon Sep 17 00:00:00 2001
From: Xu Zeng
Date: Wed, 21 Jun 2023 17:44:33 -0700
Subject: [PATCH 2/3] OPS-5046 fix openjdk dockerfile

---
 Dockerfile |  1 +
 tox.ini    | 10 +++++++++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/Dockerfile b/Dockerfile
index ddff009..7846fa2 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,6 +21,7 @@ LABEL maintainer="dev@tubularlabs.com"
 # Install Java 8
 RUN apt-get update && apt-get install -y software-properties-common
 RUN apt-add-repository 'deb http://security.debian.org/debian-security stretch/updates main'
+RUN apt-add-repository 'deb http://deb.debian.org/debian/ sid main'
 RUN apt-get update && apt-get install -y openjdk-8-jdk
 
 # Python env

diff --git a/tox.ini b/tox.ini
index 4cfc097..174ecfa 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,7 +15,7 @@
 #
 
 [tox]
-envlist = spark32,spark33,no_extras,docs
+envlist = spark32,spark33,spark34,no_extras,docs
 
 [testenv:spark32]
 commands = py.test --cov=sparkly --cov-report term-missing tests/integration tests/unit
@@ -33,6 +33,14 @@ deps =
     -rrequirements.txt
     -rrequirements_dev.txt
     -rrequirements_extras.txt
     pyspark==3.3.1
 
+[testenv:spark34]
+commands = py.test --cov=sparkly --cov-report term-missing tests/integration tests/unit
+deps =
+    -rrequirements.txt
+    -rrequirements_dev.txt
+    -rrequirements_extras.txt
+    pyspark==3.4.0
+
 [testenv:no_extras]
 commands = py.test tests/no_extras
 deps =

From f5a859534fcfb36dc923e2cde8d9dd1e8aa192cf Mon Sep 17 00:00:00 2001
From: Xu Zeng
Date: Thu, 22 Jun 2023 10:03:47 -0700
Subject: [PATCH 3/3] OPS-5046 Fix spark 3.4.0 tests

---
 tests/integration/base.py         | 3 ++-
 tests/integration/test_catalog.py | 6 +++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/tests/integration/base.py b/tests/integration/base.py
index 6cfc6f4..1eafaf5 100644
--- a/tests/integration/base.py
+++ b/tests/integration/base.py
@@ -18,6 +18,7 @@
 
 from pyspark.sql.types import StringType
 
+import pyspark
 from sparkly import SparklySession
 from sparkly.utils import absolute_path
 
@@ -26,7 +27,7 @@ class SparklyTestSession(SparklySession):
     packages = [
         'com.datastax.spark:spark-cassandra-connector_2.12:3.2.0',
         'org.elasticsearch:elasticsearch-spark-30_2.12:7.17.8',
-        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1',
+        'org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(pyspark.__version__),
         'mysql:mysql-connector-java:8.0.31',
         'io.confluent:kafka-avro-serializer:3.0.1',
     ]

diff --git a/tests/integration/test_catalog.py b/tests/integration/test_catalog.py
index a0a8768..d57e77c 100644
--- a/tests/integration/test_catalog.py
+++ b/tests/integration/test_catalog.py
@@ -108,11 +108,11 @@ def test_rename_table_non_default_db(self):
         self.assertTrue(self.spark.catalog_ext.has_table('test_db.test_table'))
         self.assertFalse(self.spark.catalog_ext.has_table('test_db.new_test_table'))
 
-        self.spark.catalog_ext.rename_table('test_db.test_table', 'new_test_table')
+        self.spark.catalog_ext.rename_table('test_db.test_table', 'test_db.new_test_table')
 
         self.assertFalse(self.spark.catalog_ext.has_table('test_db.test_table'))
-        self.assertTrue(self.spark.catalog_ext.has_table('default.new_test_table'))
-        self.assertEqual(self.spark.table('default.new_test_table').count(), 2)
+        self.assertTrue(self.spark.catalog_ext.has_table('test_db.new_test_table'))
+        self.assertEqual(self.spark.table('test_db.new_test_table').count(), 2)
 
     def test_get_table_properties(self):
         properties = self.spark.catalog_ext.get_table_properties('test_table')
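
A note to accompany patch 3: the Kafka connector coordinate in the test session now follows the installed pyspark version instead of being pinned to 3.3.1, so one test base serves the spark32, spark33 and spark34 tox environments; the rename_table test now passes a fully qualified target ('test_db.new_test_table') and asserts the table stays in test_db rather than landing in default. A minimal sketch of the coordinate construction (the printed version simply mirrors whatever pyspark the active tox environment installed):

    import pyspark

    # The spark-sql-kafka artifact must match the running Spark version, so derive it rather than hard-coding it.
    kafka_package = 'org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(pyspark.__version__)
    print(kafka_package)  # e.g. org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 under the spark34 env

The environment added in patch 2 runs the same way as the existing ones, e.g. tox -e spark34.
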