From 6cb586e931d29cb74e0a448d41bc1ae9e1d96a78 Mon Sep 17 00:00:00 2001 From: Scott Strickland Date: Mon, 22 Nov 2021 13:07:05 -0800 Subject: [PATCH] IMP use official kafka io Rather than depending on python libs & low-level rdd manipulation. This has two benefits: 1. Reading kafka uses constructs which are not deprecated & removed in spark 3.x 2. Use officially supported constructs for both reading and writing; avoids overhead associated with python client libs APIs remain unbroken / compatible, but there is some additional functionality: * Reading does not require explicit deserialization, and as such does not need to specify schema in some scenarios. This is actually the recommendation from spark; read data as binary and handle it externally. However, we maintain backwards compatibility. * Reading can now return additional metadata columns specific to kafka, like offset, partition, timestamp, etc. for each record --- docker-compose.yml | 1 + docs/source/reader_and_writer.rst | 23 +++---- requirements_dev.txt | 1 - setup.py | 1 - sparkly/reader.py | 105 +++++++++++++++++++++--------- sparkly/writer.py | 57 ++++++++-------- tests/integration/base.py | 2 +- tests/integration/test_reader.py | 34 +++++++++- tests/integration/test_writer.py | 4 +- tests/no_extras/test_writer.py | 46 ------------- tox.ini | 2 + 11 files changed, 147 insertions(+), 129 deletions(-) delete mode 100644 tests/no_extras/test_writer.py diff --git a/docker-compose.yml b/docker-compose.yml index 8fac34b..d7d9517 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -89,6 +89,7 @@ services: KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_ZOOKEEPER_CONNECT: zookeeper.docker:2181 KAFKA_NUM_PARTITIONS: 3 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 healthcheck: test: ps ax | grep kafka diff --git a/docs/source/reader_and_writer.rst b/docs/source/reader_and_writer.rst index 4840768..ea09200 100644 --- a/docs/source/reader_and_writer.rst +++ b/docs/source/reader_and_writer.rst @@ -74,24 +74,14 @@ Sparkly relies on the official elastic spark connector and was successfully test Kafka ----- -Sparkly's reader and writer for Kafka are built on top of the official spark package -for Kafka and python library `kafka-python `_ . -The first one allows us to read data efficiently, -the second covers a lack of writing functionality in the official distribution. +Sparkly's reader and writer for Kafka are built on top of the official spark package for Kafka-SQL. +---------------+------------------------------------------------------------------------------------------+ -| Package | https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.4.0 | +| Package | https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.4.0 | +---------------+------------------------------------------------------------------------------------------+ -| Configuration | http://spark.apache.org/docs/2.4.0/streaming-kafka-0-8-integration.html | +| Configuration | https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html | +---------------+------------------------------------------------------------------------------------------+ -.. note:: - - To interact with Kafka, ``sparkly`` needs the ``kafka-python`` library. You can get it via: - ``` - pip install sparkly[kafka] - ``` - - Sparkly was tested in production using Apache Kafka **0.10.x**. - .. code-block:: python import json @@ -101,7 +91,7 @@ the second covers a lack of writing functionality in the official distribution. 
class MySession(SparklySession): packages = [ - 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0', + 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0', ] spark = MySession() @@ -123,8 +113,11 @@ the second covers a lack of writing functionality in the official distribution. df = hc.read_ext.kafka( 'kafka.host', topic='my.topic', + # key & value deserialization is optional; if not provided, + # then the user will have to deal with decoding the binary directly. key_deserializer=lambda item: json.loads(item.decode('utf-8')), value_deserializer=lambda item: json.loads(item.decode('utf-8')), + # if deserializers are used, the schema must be provided: schema=df_schema, ) @@ -132,6 +125,8 @@ the second covers a lack of writing functionality in the official distribution. df.write_ext.kafka( 'kafka.host', topic='my.topic', + # key & value serialization is optional; if not provided, + # the `key` and `value` columns MUST already be StringType or BinaryType key_serializer=lambda item: json.dumps(item).encode('utf-8'), value_serializer=lambda item: json.dumps(item).encode('utf-8'), ) diff --git a/requirements_dev.txt b/requirements_dev.txt index d863026..f30c4da 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -15,7 +15,6 @@ # mock==1.3.0 -pyspark==2.4.0 pytest==6.2.5 pytest-cov==3.0.0 Sphinx==4.2.0 diff --git a/setup.py b/setup.py index 916ede6..cbee86e 100644 --- a/setup.py +++ b/setup.py @@ -89,7 +89,6 @@ # https://packaging.python.org/en/latest/requirements.html install_requires=requirements, extras_require={ - 'kafka': ['kafka-python>=2.0.2,<2.1'], 'redis': ['redis>=2.10,<3', 'ujson>=1.33,<2'], 'test': ['cassandra-driver>=3.7,<3.8', 'PyMySQL>=0.7,<0.10', 'kafka-python>=2.0.2,<2.1', 'redis>=2.10,<3', 'ujson>=1.33,<2'], }, diff --git a/sparkly/reader.py b/sparkly/reader.py index 0f95205..16bfe2e 100644 --- a/sparkly/reader.py +++ b/sparkly/reader.py @@ -14,6 +14,10 @@ # limitations under the License. # +import json + +from pyspark.sql import functions as F + from sparkly.exceptions import InvalidArgumentError from sparkly.utils import kafka_get_topics_offsets @@ -213,7 +217,8 @@ def kafka(self, schema=None, port=9092, parallelism=None, - options=None): + options=None, + include_meta_cols=None): """Creates dataframe from specified set of messages from Kafka topic. Defining ranges: @@ -221,23 +226,23 @@ def kafka(self, - If `offset_ranges` is omitted it will auto-discover it's partitions. The `schema` parameter, if specified, should contain two top level fields: - `key` and `value`. + `key` and `value`. It is only required if deserializers are used. Parameters `key_deserializer` and `value_deserializer` are callables which get bytes as input and should return python structures as output. Args: host (str): Kafka host. - topic (str|None): Kafka topic to read from. + topic (str|List[str]|None): Kafka topic(s) to read from. offset_ranges (list[(int, int, int)]|None): List of partition ranges [(partition, start_offset, end_offset)]. key_deserializer (function): Function used to deserialize the key. value_deserializer (function): Function used to deserialize the value. schema (pyspark.sql.types.StructType): Schema to apply to create a Dataframe. port (int): Kafka port. - parallelism (int|None): The max number of parallel tasks that could be executed - during the read stage (see :ref:`controlling-the-load`). options (dict|None): Additional kafka parameters, see KafkaUtils.createRDD docs. + include_meta_cols (bool|None): If true, also return "metadata" columns + like offset, topic, etc. 
Returns: pyspark.sql.DataFrame @@ -245,39 +250,77 @@ def kafka(self, Raises: InvalidArgumentError """ - try: - from pyspark.streaming.kafka import KafkaUtils, OffsetRange - except ImportError: - raise NotImplementedError('Reading from kafka not supported') - - if not key_deserializer or not value_deserializer or not schema: - raise InvalidArgumentError('You should specify all of parameters:' - '`key_deserializer`, `value_deserializer` and `schema`') - - kafka_params = { - 'metadata.broker.list': '{}:{}'.format(host, port), - } + if isinstance(topic, str): + topic = [topic] - if options: - kafka_params.update(options) + reader = ( + self._spark.read.format('kafka') + .option('kafka.bootstrap.servers', f'{host}:{port}') + .option('subscribe', ','.join(topic)) + ) - if not offset_ranges: - offset_ranges = kafka_get_topics_offsets(host, topic, port) + def get_offsets(offsets, which): + return { + offset[0]: offset[which] + for offset in offsets + } + + if offset_ranges: + if len(topic) > 1: + raise InvalidArgumentError( + 'Specifying offset_ranges for multiple topics is not currently supported; ' + 'please specify options "startingOffsets" and "endingOffsets" manually' + ) + starting_offsets = json.dumps({t: get_offsets(offset_ranges, 1) for t in topic}) + ending_offsets = json.dumps({t: get_offsets(offset_ranges, 2) for t in topic}) + reader = ( + reader + .option('startingOffsets', starting_offsets) + .option('endingOffsets', ending_offsets) + ) - offset_ranges = [OffsetRange(topic, partition, start_offset, end_offset) - for partition, start_offset, end_offset in offset_ranges] + for key, value in (options or {}).items(): + reader = reader.option(key, value) + + df = reader.load() + + def get_schema(field): + if schema is None: + raise InvalidArgumentError( + 'Cannot use a deserializer without specifying schema' + ) + candidates = [x for x in schema.fields if x.name == field] + if not candidates: + raise InvalidArgumentError( + f'Cannot find field: {field} in schema: {schema.simpleString()}' + ) + result = candidates[0].dataType + return result + + if key_deserializer is not None: + df = df.withColumn( + 'key', + F.udf( + key_deserializer, + returnType=get_schema('key'), + )(F.col('key')), + ) + if value_deserializer is not None: + df = df.withColumn( + 'value', + F.udf( + value_deserializer, + returnType=get_schema('value'), + )(F.col('value')), + ) - rdd = KafkaUtils.createRDD(self._spark.sparkContext, - kafkaParams=kafka_params, - offsetRanges=offset_ranges or [], - keyDecoder=key_deserializer, - valueDecoder=value_deserializer, - ) + if not include_meta_cols: + df = df.select('key', 'value') if parallelism: - rdd = rdd.coalesce(parallelism) + df = df.coalesce(parallelism) - return self._spark.createDataFrame(rdd, schema=schema) + return df def _basic_read(self, reader_options, additional_options, parallelism): reader_options.update(additional_options or {}) diff --git a/sparkly/writer.py b/sparkly/writer.py index 2f01718..89c1346 100644 --- a/sparkly/writer.py +++ b/sparkly/writer.py @@ -19,13 +19,6 @@ except ImportError: from urlparse import urlparse, parse_qsl -try: - from kafka import KafkaProducer -except ImportError: - KAFKA_WRITER_SUPPORT = False -else: - KAFKA_WRITER_SUPPORT = True - try: import redis import ujson @@ -239,35 +232,39 @@ def kafka(self, during the write stage (see :ref:`controlling-the-load`). options (dict|None): Additional options. """ - if not KAFKA_WRITER_SUPPORT: - raise NotImplementedError('kafka-python package isn\'t available. 
' - 'Use pip install sparkly[kafka] to fix it.') - - def write_partition_to_kafka(messages): - producer = KafkaProducer( - bootstrap_servers=['{}:{}'.format(host, port)], - key_serializer=key_serializer, - value_serializer=value_serializer, - ) - for message in messages: - as_dict = message.asDict(recursive=True) - try: - result = producer.send(topic, key=as_dict['key'], value=as_dict['value']) - except Exception as exc: - raise WriteError('Error publishing to kafka: {}'.format(exc)) - if result.failed(): - raise WriteError('Error publishing to kafka: {}'.format(result.exception)) - producer.flush() - producer.close() + def get_serializer_udf(serializer_func): - rdd = self._df.rdd + @F.udf(returnType=T.BinaryType()) + def serializer(value): + if isinstance(value, T.Row): + value = value.asDict(recursive=True) + return serializer_func(value) + + return serializer + + df = self._df + if key_serializer: + key_serializer = get_serializer_udf(key_serializer) + df = df.withColumn('key', key_serializer(F.col('key'))) + if value_serializer: + value_serializer = get_serializer_udf(value_serializer) + df = df.withColumn('value', value_serializer(F.col('value'))) if parallelism: - rdd = rdd.coalesce(parallelism) + df = df.coalesce(parallelism) + + writer = ( + df.write.format('kafka') + .option('kafka.bootstrap.servers', f'{host}:{port}') + .option('topic', topic) + ) + options = options or {} + for key, value in options.items(): + writer = writer.option(key, value) - rdd.foreachPartition(write_partition_to_kafka) + return writer.save() def redis(self, key_by, diff --git a/tests/integration/base.py b/tests/integration/base.py index 474d45a..9885ec0 100644 --- a/tests/integration/base.py +++ b/tests/integration/base.py @@ -26,7 +26,7 @@ class SparklyTestSession(SparklySession): packages = [ 'com.datastax.spark:spark-cassandra-connector_2.11:2.4.0', 'org.elasticsearch:elasticsearch-spark-20_2.11:7.3.0', - 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0', + 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0', 'mysql:mysql-connector-java:6.0.6', 'io.confluent:kafka-avro-serializer:3.0.1', ] diff --git a/tests/integration/test_reader.py b/tests/integration/test_reader.py index d1213ab..cf1e7bf 100644 --- a/tests/integration/test_reader.py +++ b/tests/integration/test_reader.py @@ -212,6 +212,28 @@ def test_read_by_offsets(self): self.assertDataFrameEqual(df, self.expected_data * 2) + df = self.spark.read_ext.kafka( + 'kafka.docker', + topic=self.topic, + offset_ranges=offsets, + key_deserializer=self.json_decoder, + value_deserializer=self.json_decoder, + schema=self.expected_data_df.schema, + include_meta_cols=True, + ) + expected = [ + # normal fields: + 'key', + 'value', + # meta fields: + 'topic', + 'partition', + 'offset', + 'timestamp', + 'timestampType', + ] + self.assertListEqual(sorted(expected), sorted(df.schema.fieldNames())) + def test_argument_errors(self): with self.assertRaises(InvalidArgumentError): self.spark.read_ext.kafka( @@ -219,11 +241,17 @@ def test_argument_errors(self): topic=self.topic, key_deserializer=self.json_decoder, value_deserializer=self.json_decoder, + # no schema! + ) + self.spark.read_ext.kafka( + 'kafka.docker', + topic=self.topic, + key_deserializer=self.json_decoder, + # no schema! ) - - with self.assertRaises(InvalidArgumentError): self.spark.read_ext.kafka( 'kafka.docker', topic=self.topic, - schema=self.expected_data_df.schema, + value_deserializer=self.json_decoder, + # no schema! 
) diff --git a/tests/integration/test_writer.py b/tests/integration/test_writer.py index 80e1366..48a97c9 100644 --- a/tests/integration/test_writer.py +++ b/tests/integration/test_writer.py @@ -220,7 +220,7 @@ def test_write_kafka_dataframe(self): def test_write_kafka_dataframe_error(self): def _errored_serializer(data): - raise ValueError + raise ValueError('this is a testable error') try: self.expected_data.write_ext.kafka( @@ -230,7 +230,7 @@ def _errored_serializer(data): value_serializer=_errored_serializer, ) except Py4JJavaError as ex: - self.assertIn('WriteError(\'Error publishing to kafka', str(ex)) + self.assertIn('this is a testable error', str(ex)) else: raise AssertionError('WriteError exception not raised') diff --git a/tests/no_extras/test_writer.py b/tests/no_extras/test_writer.py deleted file mode 100644 index 112adf8..0000000 --- a/tests/no_extras/test_writer.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Copyright 2017 Tubular Labs, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import uuid - -from sparkly.utils import absolute_path -from sparkly.testing import ( - SparklyGlobalSessionTest, -) -from tests.integration.base import SparklyTestSession - - -class TestWriteKafka(SparklyGlobalSessionTest): - session = SparklyTestSession - - def setUp(self): - self.json_decoder = lambda item: json.loads(item.decode('utf-8')) - self.json_encoder = lambda item: json.dumps(item).encode('utf-8') - self.topic = 'test.topic.write.kafka.{}'.format(uuid.uuid4().hex[:10]) - self.fixture_path = absolute_path(__file__, '..', 'integration', 'resources', - 'test_write', 'kafka_setup.json', - ) - self.expected_data = self.spark.read.json(self.fixture_path) - - def test_write_kafka_dataframe(self): - with self.assertRaises(NotImplementedError): - self.expected_data.write_ext.kafka( - 'kafka.docker', - self.topic, - key_serializer=self.json_encoder, - value_serializer=self.json_encoder, - ) diff --git a/tox.ini b/tox.ini index 3908cc9..387ab5b 100644 --- a/tox.ini +++ b/tox.ini @@ -30,6 +30,7 @@ commands = py.test tests/no_extras deps = -rrequirements.txt -rrequirements_dev.txt + pyspark==2.4.0 [testenv:docs] commands = sphinx-build -b html docs/source docs/build @@ -37,3 +38,4 @@ deps = -rrequirements_dev.txt -rrequirements_extras.txt -rrequirements.txt + pyspark==2.4.0
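
For an end-to-end picture of the new read path described above, here is a minimal usage sketch. The broker host, topic name, and schema are placeholders, and it assumes the usual ``from sparkly import SparklySession`` import shown in the project docs:

.. code-block:: python

    import json

    from pyspark.sql import types as T

    from sparkly import SparklySession


    class MySession(SparklySession):
        packages = [
            'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',
        ]


    spark = MySession()

    # Raw read: no deserializers, so no schema is needed; `key`/`value`
    # stay binary, and with include_meta_cols=True the Kafka metadata
    # columns (topic, partition, offset, timestamp, timestampType) are kept.
    raw_df = spark.read_ext.kafka(
        'kafka.host',          # placeholder broker host
        topic='my.topic',      # placeholder topic
        include_meta_cols=True,
    )

    # Decoded read: deserializers plus the schema describing their output.
    # The schema below is a placeholder for simple JSON string-to-string maps.
    df_schema = T.StructType([
        T.StructField('key', T.MapType(T.StringType(), T.StringType())),
        T.StructField('value', T.MapType(T.StringType(), T.StringType())),
    ])
    decoded_df = spark.read_ext.kafka(
        'kafka.host',
        topic='my.topic',
        key_deserializer=lambda item: json.loads(item.decode('utf-8')),
        value_deserializer=lambda item: json.loads(item.decode('utf-8')),
        schema=df_schema,
    )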
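
The ``offset_ranges`` argument keeps its old ``[(partition, start_offset, end_offset)]`` shape; internally the reader now reshapes it into the ``startingOffsets`` / ``endingOffsets`` JSON understood by the Kafka source. A standalone sketch of that reshaping, with made-up partitions and offsets:

.. code-block:: python

    import json

    # Hypothetical single topic and offset ranges, in the reader's
    # (partition, start_offset, end_offset) shape.
    topic = 'my.topic'
    offset_ranges = [
        (0, 0, 100),
        (1, 5, 50),
    ]

    starting_offsets = json.dumps({topic: {p: start for p, start, _ in offset_ranges}})
    ending_offsets = json.dumps({topic: {p: end for p, _, end in offset_ranges}})

    # json.dumps turns the int partition keys into strings, which is the
    # form the Kafka source expects:
    #   startingOffsets -> {"my.topic": {"0": 0, "1": 5}}
    #   endingOffsets   -> {"my.topic": {"0": 100, "1": 50}}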
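
On the write side, serializers stay optional; when they are omitted, the ``key`` and ``value`` columns must already be ``StringType`` or ``BinaryType``, as the updated docs note. Continuing the sketch above (reusing ``json``, ``raw_df`` and ``decoded_df``):

.. code-block:: python

    # With serializers: the writer wraps them in UDFs that must return
    # bytes for the `key`/`value` columns.
    decoded_df.write_ext.kafka(
        'kafka.host',
        topic='my.topic',
        key_serializer=lambda item: json.dumps(item).encode('utf-8'),
        value_serializer=lambda item: json.dumps(item).encode('utf-8'),
    )

    # Without serializers: `key` and `value` are written as-is, so they
    # must already be string or binary, e.g. after an undecoded read.
    raw_df.select('key', 'value').write_ext.kafka(
        'kafka.host',
        topic='my.topic',
    )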