# stream_kafka_read.py — Kafka → Spark Structured Streaming console reader.
import logging
import time
import pyspark
from dynaconf import settings
from pyspark.sql.functions import from_json, col
from schema import LCR_RESULT_SCHEMA
# Directory for inbound files, from dynaconf settings. Not used in this
# module's visible code; kept because other modules may import it from here.
INBOUND_DIR = settings.INBOUND_DIR

# Verbosity for this module's logger (handlers attached by basicConfig
# default to NOTSET, so DEBUG records are emitted).
LOG_LEVEL = logging.DEBUG

# NOTE: the original datefmt '%T' is a glibc strftime extension and fails on
# Windows; '%H:%M:%S' is the portable spelling with identical output.
logging.basicConfig(datefmt='%H:%M:%S',
                    format='%(asctime)s %(name)s %(levelname)s %(message)s'
                    )
logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)
def main(spark: pyspark.sql.SparkSession,
         bootstrap_servers: str = 'localhost:9092',
         topic: str = 'lcr-events') -> None:
    """Stream LCR result events from Kafka to the console sink.

    Subscribes to *topic* from the earliest offset, parses each message
    value as JSON against ``LCR_RESULT_SCHEMA``, flattens the parsed struct
    into top-level columns, and prints every micro-batch to the console.
    Blocks until any streaming query terminates; the Spark session is
    always stopped on the way out.

    Args:
        spark: Active Spark session used to build the streaming query.
        bootstrap_servers: Kafka bootstrap server list (``host:port[,...]``).
        topic: Kafka topic to subscribe to.
    """
    try:
        sdf = (spark.readStream.format('kafka')
               .option('kafka.bootstrap.servers', bootstrap_servers)
               .option('subscribe', topic)
               # Replay the topic from the beginning, not just new messages.
               .option('startingOffsets', 'earliest')
               .load())
        # Kafka delivers the payload as binary; cast to string before the
        # JSON parse, then promote the struct fields to top-level columns.
        (sdf.select(from_json(col('value').cast('string'),
                              schema=LCR_RESULT_SCHEMA).alias('data'))
            .select('data.*')
            .writeStream
            .format('console')
            .start())
        # Block the driver until a streaming query stops or fails.
        spark.streams.awaitAnyTermination()
    finally:
        spark.stop()
if __name__ == '__main__':
    logger.info('Creating Spark session..')
    # perf_counter is the monotonic clock recommended for measuring elapsed
    # time; time.time() can jump if the wall clock is adjusted mid-run.
    t_start = time.perf_counter()
    spark_session = pyspark.sql.SparkSession.builder \
        .appName(settings.APPNAME_KAFKA) \
        .master(settings.MASTER) \
        .getOrCreate()
    t_session = time.perf_counter()
    logger.info(f'Spark session created in {t_session - t_start:.0f} seconds')
    # main() blocks until the streaming query terminates, then stops Spark.
    main(spark_session)
    t_done = time.perf_counter()
    logger.info(f'Program completed in {t_done - t_session:.0f} seconds')