diff --git a/sparkly/testing.py b/sparkly/testing.py
index 5be90df..e29bb08 100644
--- a/sparkly/testing.py
+++ b/sparkly/testing.py
@@ -36,7 +36,7 @@
 from sparkly import SparklySession
 from sparkly.exceptions import FixtureError
-from sparkly.utils import kafka_get_topics_offsets
+from sparkly.utils import kafka_create_topic, kafka_get_topics_offsets
 
 if sys.version_info.major == 3:
     from http.client import HTTPConnection
@@ -58,8 +58,7 @@
 MYSQL_FIXTURES_SUPPORT = False
 
 try:
-    import kafka.admin
-    from kafka import KafkaConsumer, KafkaProducer, KafkaAdminClient
+    from kafka import KafkaConsumer, KafkaProducer
     KAFKA_FIXTURES_SUPPORT = True
 except ImportError:
     KAFKA_FIXTURES_SUPPORT = False
@@ -862,8 +861,6 @@ def __init__(
         host,
         topic,
         port=9092,
-        num_partitions=2,
-        replication_factor=1,
     ):
         """Initialize context manager
 
@@ -887,14 +884,7 @@ def __init__(
         self._df = None
         self.count = 0
 
-        kafka_admin = KafkaAdminClient(bootstrap_servers=host)
-        kafka_admin.create_topics([
-            kafka.admin.NewTopic(
-                name=topic,
-                num_partitions=num_partitions,
-                replication_factor=replication_factor,
-            ),
-        ])
+        kafka_create_topic(host, topic)
 
     def __enter__(self):
         self._df = None
diff --git a/sparkly/utils.py b/sparkly/utils.py
index b40df10..6800640 100644
--- a/sparkly/utils.py
+++ b/sparkly/utils.py
@@ -21,7 +21,8 @@
 import re
 
 try:
-    from kafka import KafkaConsumer, TopicPartition
+    from kafka import KafkaAdminClient, KafkaConsumer, TopicPartition
+    import kafka.admin
 except ImportError:
     pass
 
@@ -77,18 +78,40 @@
     """
     brokers = ['{}:{}'.format(host, port)]
     consumer = KafkaConsumer(bootstrap_servers=brokers)
-    topic_partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
-    start_offsets_raw = consumer.beginning_offsets(topic_partitions)
-    end_offsets_raw = consumer.end_offsets(topic_partitions)
-    start_offsets = {tp.partition: offset for tp, offset in start_offsets_raw.items()}
-    end_offsets = {tp.partition: offset for tp, offset in end_offsets_raw.items()}
-    offsets = [
-        (partition, start_offsets[partition], end_offsets[partition])
-        for partition in start_offsets
-    ]
+    partitions = consumer.partitions_for_topic(topic)
+    offsets = []
+    if partitions:
+        topic_partitions = [TopicPartition(topic, p) for p in partitions]
+        start_offsets_raw = consumer.beginning_offsets(topic_partitions)
+        end_offsets_raw = consumer.end_offsets(topic_partitions)
+        start_offsets = {tp.partition: offset for tp, offset in start_offsets_raw.items()}
+        end_offsets = {tp.partition: offset for tp, offset in end_offsets_raw.items()}
+        offsets = [
+            (partition, start_offsets[partition], end_offsets[partition])
+            for partition in start_offsets
+        ]
     return offsets
 
 
+def kafka_create_topic(host, topic, port=9092, num_partitions=2, replication_factor=1):
+    """Creates Kafka topic.
+
+    Args:
+        host (str): Kafka host.
+        topic (str): Kafka topic.
+        port (int): Kafka port.
+        num_partitions (int): Number of topic's partitions.
+        replication_factor (int): Number of partition's replicas.
+    """
+    kafka_admin = KafkaAdminClient(bootstrap_servers=f'{host}:{port}')
+    kafka_admin.create_topics([
+        kafka.admin.NewTopic(
+            name=topic,
+            num_partitions=num_partitions,
+            replication_factor=replication_factor,
+        ),
+    ])
+
+
 class lru_cache(object):
     """LRU cache that supports DataFrames.
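
For reference, a minimal usage sketch of the two helpers this diff touches. The broker address `kafka.docker:9092` and the topic name are illustrative placeholders, not part of the change:

```python
from sparkly.utils import kafka_create_topic, kafka_get_topics_offsets

# Create the topic up front; the defaults (2 partitions, replication
# factor 1) match the values removed from the context manager in testing.py.
kafka_create_topic('kafka.docker', 'my.test.topic')

# With the new guard, a topic with no partition metadata yields [] instead
# of the old behavior of raising TypeError when iterating over the None
# returned by consumer.partitions_for_topic().
offsets = kafka_get_topics_offsets('kafka.docker', 'my.test.topic')
for partition, start, end in offsets:
    # e.g. (0, 0, 0) and (1, 0, 0) for a freshly created, empty topic
    print(partition, start, end)
```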