In [None]:
import os
import json

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import SparkFiles
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [None]:
# Initialize the Spark context and the streaming context

# Length of each streaming micro-batch, in seconds (StreamingContext batchDuration).
BATCH_INTERVAL_SECONDS = 2

sconf = SparkConf()
sconf.setMaster(os.environ["SPARK_MASTER"])
app_name = 'default_stream_consumer_app'
sconf.setAppName(app_name)
# getOrCreate reuses an already-running SparkContext, so re-running this cell in a
# live kernel does not fail with "Cannot run multiple SparkContexts at once".
# NOTE(review): when a context already exists, its original conf is kept and
# `sconf` is ignored.
sc = SparkContext.getOrCreate(conf=sconf)
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

In [None]:
# Create a Kafka direct stream subscribed to the default topic.
# NOTE(review): pyspark.streaming.kafka (the Kafka 0.8 integration) is believed to be
# deprecated in Spark 2.3 and removed in Spark 3.0 — confirm the cluster's Spark
# version before upgrading.
kafka_topics = [os.environ["KAFKA_DEFAULT_TOPIC"]]
kafka_params = {"metadata.broker.list": os.environ["KAFKA_DEFAULT_BROKER"]}
kvs = KafkaUtils.createDirectStream(ssc, kafka_topics, kafka_params)

In [None]:
# Define the expected structure of the message
# Save streaming messages to a CSV file

from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import SparkSession

# Column names of an incoming message, in output order; every column is a
# nullable string.
fields = [
    'timestamp', 'location',
    'person_no', 'car_no', 'motorcycle_no', 'bus_no', 'truck_no',
    'temp', 'temp_feels_like', 'temp_min', 'temp_max',
    'pressure', 'humidity', 'visibility', 'wind_speed', 'wind_deg',
]
sschema = StructType(
    [StructField(field, StringType(), nullable=True) for field in fields]
)

def store_result(rdd_raw):
    """Append one micro-batch of parsed messages to the CSV output directory.

    Intended to be passed to ``DStream.foreachRDD``. Empty batches are skipped,
    so nothing is written for them.

    Args:
        rdd_raw: RDD of parsed messages that is converted to a DataFrame with
            ``sschema``. Presumably each element is a dict keyed by the names in
            ``fields`` — TODO confirm against the producer.

    Side effects:
        Prints a banner with the local save time. Writes the batch, coalesced to
        a single partition, as CSV with a header row in append mode under
        ``$CSV_FILES_LOCATION``.
    """
    if rdd_raw.isEmpty():
        return

    # Reuse the active SparkSession (created lazily on the first batch) instead
    # of constructing a new SparkSession object for every micro-batch.
    ss = SparkSession.builder.config(conf=rdd_raw.context.getConf()).getOrCreate()
    df = ss.createDataFrame(rdd_raw, schema=sschema)

    print('-----------------------Message received-----------------------------------')
    current_time = datetime.now().strftime("%H:%M:%S")
    print('Saving message at: %s' % current_time)
    # coalesce(1): one output partition per batch; mode='append' keeps the
    # output of earlier batches.
    df.coalesce(1).write.csv(os.environ["CSV_FILES_LOCATION"], mode='append', header=True)
    print('--------------------------------------------------------------------------')



In [None]:
# Parse stream result

def parse_message(record):
    """Decode the JSON payload of one stream record.

    Args:
        record: an element of ``kvs``. Index 1 is decoded, which is presumably
            the Kafka message value of a (key, value) pair — TODO confirm.

    Returns:
        A one-element list containing the decoded object. Returns an empty list
        when the payload is missing (None) or is not valid JSON, so ``flatMap``
        drops the record instead of failing the whole batch.
    """
    try:
        return [json.loads(record[1])]
    except (TypeError, ValueError) as err:
        # json.JSONDecodeError subclasses ValueError; TypeError covers a None payload.
        print('Skipping malformed message: %s' % err)
        return []


parsed = kvs.flatMap(parse_message)
parsed.foreachRDD(store_result)

In [None]:
# Start listening

ssc.start()
try:
    # Blocks until the streaming context stops or fails.
    ssc.awaitTermination()
finally:
    # Interrupting this cell raises KeyboardInterrupt out of awaitTermination()
    # while the started context would keep running. Stop it (letting in-flight
    # batches finish) so nothing keeps writing in the background and the
    # notebook can be re-run from the top.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)