In [1]:
from pyspark import SparkContext                                                                                        
from pyspark.sql import SparkSession                                                                                    
from pyspark.streaming import StreamingContext                                                                          
from pyspark.streaming.kafka import KafkaUtils    
import pandas as pd
import json
import yaml

In [2]:
# Cluster / storage endpoints used by this notebook.
SPARK_MASTER_URL = "spark://streaming-spark-master:7077"
# Kafka 0.8 connector assembly shipped to the cluster; it provides the JVM side
# of pyspark.streaming.kafka.KafkaUtils used further down.
KAFKA_ASSEMBLY_JAR = "./spark-streaming-kafka-0-8-assembly_2.11-2.4.1.jar"
HDFS_WAREHOUSE_DIR = "hdfs://namenode:9000/"

# Shared Spark session: connects to the standalone master and registers the
# Kafka jar and the HDFS warehouse location.
ss = (
    SparkSession.builder
    .appName("SparkStreamingKafka")
    .master(SPARK_MASTER_URL)
    .config("spark.jars", KAFKA_ASSEMBLY_JAR)
    .config("spark.sql.warehouse.dir", HDFS_WAREHOUSE_DIR)
    .getOrCreate()
)

In [3]:
# Micro-batch interval, in seconds, for the streaming context.
BATCH_INTERVAL_SECONDS = 5

sc = ss.sparkContext
ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
# Keep the notebook output readable: only warnings and above from Spark.
sc.setLogLevel('WARN')

In [4]:
# Column names for both parquet sinks. json_to_list and json_to_processed_data
# each emit 13 fields in this order: ID, eleven trip fields, raw JSON record.
# Kept as a tuple so no caller can mutate the shared definition.
TRIP_COLUMNS = (
    'ID',
    'ArrivalTime',
    'BusinessLeisure',
    'CabinCategory',
    'CreationDate',
    'CurrencyCode',
    'DepartureTime',
    'Destination',
    'OfficeIdCountry',
    'Origin',
    'TotalAmount',
    'nPAX',
    'Record',
)

TRIPS_PARQUET_PATH = 'hdfs://namenode:9000/trips/trips.parquet'
PROCESSED_TRIPS_PARQUET_PATH = 'hdfs://namenode:9000/trips/processed_trips.parquet'


def write_trip_rdd(rdd, path, label, spark=None):
    """Append one non-empty micro-batch of trip rows to a parquet dataset.

    Parameters
    ----------
    rdd : pyspark.RDD
        Batch whose rows are sequences positionally aligned with TRIP_COLUMNS.
        Empty batches are skipped without touching Spark SQL.
    path : str
        Parquet destination; rows are written with ``mode='append'``.
    label : str
        Tag included in the progress message (e.g. ``'transform 1'``).
    spark : SparkSession, optional
        Session used to build the DataFrame; defaults to the notebook's ``ss``.
    """
    if rdd.isEmpty():
        return
    if spark is None:
        spark = ss
    # count() aggregates on the executors; len(rdd.collect()) would pull every
    # record of the batch onto the driver just to measure it.
    print(f"Received {rdd.count()} records - {label}")
    # Pass a fresh list: when schema is a list of names, PySpark's inference
    # may extend that list in place for rows longer than the name list.
    # NOTE(review): column types are inferred from the data; consider an
    # explicit StructType if a batch can contain all-null columns.
    df = spark.createDataFrame(rdd, schema=list(TRIP_COLUMNS))
    df.write.parquet(path=path, mode='append')


# The two sinks below must keep exactly one parameter: DStream.foreachRDD
# inspects the function's argument count and calls two-argument functions
# as func(time, rdd).
def handle_rdd1(rdd):
    """foreachRDD sink for transform 1: raw trip fields -> trips.parquet."""
    write_trip_rdd(rdd, TRIPS_PARQUET_PATH, 'transform 1')


def handle_rdd2(rdd):
    """foreachRDD sink for transform 2: normalised features -> processed_trips.parquet."""
    write_trip_rdd(rdd, PROCESSED_TRIPS_PARQUET_PATH, 'transform 2')

In [5]:
def read_yaml(filename: str):
    """Load and return the contents of a YAML file.

    Parameters
    ----------
    filename : str
        Path to the YAML document, read as UTF-8.

    Returns
    -------
    object
        Whatever ``yaml.safe_load`` produces for the document; ``None`` for an
        empty file.

    Raises
    ------
    OSError
        If the file cannot be opened.
    yaml.YAMLError
        If the file is not valid YAML. The error is deliberately not swallowed:
        returning ``None`` here only postponed the failure to an opaque
        ``TypeError`` in the Spark tasks that index the config.
    """
    with open(filename, 'r', encoding='utf-8') as stream:
        return yaml.safe_load(stream)

In [6]:
# Per-feature normalisation config consumed by json_to_processed_data
# (through get_continous / get_categorical).
MAPPING_AND_STATISTIC_PATH = 'mapping_and_statistic.yml'

# Feature entries json_to_processed_data looks up in the config.
REQUIRED_FEATURES = (
    'ArrivalTime', 'BusinessLeisure', 'CabinCategory', 'CreationDate',
    'CurrencyCode', 'DepartureTime', 'Destination', 'OfficeIdCountry',
    'Origin', 'TotalAmount', 'nPAX',
)

mapping_and_statistic = read_yaml(MAPPING_AND_STATISTIC_PATH)

# Fail fast on the driver. An empty or unparsed file (None), or a config that
# lacks a feature, would otherwise only surface as TypeError/KeyError inside
# executor tasks after the stream has started.
if not isinstance(mapping_and_statistic, dict):
    raise ValueError(
        f"{MAPPING_AND_STATISTIC_PATH} must contain a YAML mapping, "
        f"got {type(mapping_and_statistic).__name__}"
    )
missing_features = [f for f in REQUIRED_FEATURES if f not in mapping_and_statistic]
if missing_features:
    raise ValueError(
        f"{MAPPING_AND_STATISTIC_PATH} has no entry for: {', '.join(missing_features)}"
    )

In [7]:
def json_to_list(s):
    """Flatten a JSON object string into a row: its values, then ``s`` itself.

    Values appear in the order their keys occur in the JSON text. The raw
    string is appended as the last element.

    NOTE(review): handle_rdd1 names these fields positionally. The resulting
    columns are therefore only correct if the producer always sends the same
    keys in the same order.
    """
    return [*json.loads(s).values(), s]

def _is_missing(x):
    """Return True when a raw JSON value should be treated as missing.

    Matches JSON ``null`` (``None``) and anything whose ``str()`` is ``'nan'``,
    such as ``float('nan')``, which ``json.loads`` yields for a bare ``NaN``.
    """
    return x is None or str(x) == 'nan'


def get_continous(x, m):
    """Standardise a continuous feature value as a z-score.

    Parameters
    ----------
    x : number, numeric string, None or NaN
        Raw value taken from the trip JSON.
    m : dict
        Feature config; ``m['statistic']['mean']`` and ``m['statistic']['std']``
        are read.

    Returns
    -------
    float
        ``(float(x) - mean) / std``, or ``0.0`` (the mean after scaling) when
        ``x`` is missing. JSON ``null`` now counts as missing; previously
        ``float(None)`` raised TypeError.
    """
    if _is_missing(x):
        return 0.0
    value = float(x)
    stats = m['statistic']
    return (value - stats['mean']) / stats['std']


def get_categorical(x, m):
    """Encode a categorical value via its mapping, then standardise it.

    Parameters
    ----------
    x : any
        Raw category from the trip JSON; looked up as ``str(x)``.
    m : dict
        Feature config with ``m['mapping']`` (category string -> numeric code)
        and ``m['statistic']['mean']`` / ``['std']``.

    Returns
    -------
    float
        ``(code - mean) / std``, or ``0.0`` when ``x`` is missing (``None`` or
        NaN), consistent with get_continous.

    Raises
    ------
    KeyError
        If ``str(x)`` is not in the mapping. Inside a Spark task this fails the
        batch.

    NOTE(review): ``yaml.safe_load`` resolves YAML 1.1 booleans. Unquoted
    mapping keys such as ``NO``/``YES``/``ON`` therefore load as ``bool``, not
    ``str``, and will never match here. Quote such keys in the YAML if they
    occur (e.g. a country code).
    """
    if _is_missing(x):
        return 0.0
    code = m['mapping'][str(x)]
    stats = m['statistic']
    return (code - stats['mean']) / stats['std']
    
def json_to_processed_data(s):
    """Turn one trip JSON string into a row of normalised features.

    Row layout: the raw ``ID``, then eleven standardised features in the order
    of ``feature_spec`` below, then the original JSON string ``s``. This is the
    13-field order that handle_rdd2 names positionally.

    Each feature is normalised by get_continous or get_categorical, using that
    feature's entry in the module-level ``mapping_and_statistic`` config.

    Raises
    ------
    KeyError
        If the record or the config lacks an expected key, or a categorical
        value is absent from its mapping.
    """
    record = json.loads(s)
    trip_id = record['ID']
    # (JSON key / config key, normaliser); order defines the output columns.
    feature_spec = (
        ('ArrivalTime', get_continous),
        ('BusinessLeisure', get_categorical),
        ('CabinCategory', get_categorical),
        ('CreationDate', get_continous),
        ('CurrencyCode', get_categorical),
        ('DepartureTime', get_continous),
        ('Destination', get_categorical),
        ('OfficeIdCountry', get_categorical),
        ('Origin', get_categorical),
        ('TotalAmount', get_continous),
        ('nPAX', get_continous),
    )
    normalised = [
        normalise(record[name], mapping_and_statistic[name])
        for name, normalise in feature_spec
    ]
    return [trip_id, *normalised, s]

In [11]:
# Kafka source settings for the direct (receiver-less) 0.8 stream.
# NOTE(review): the error captured below this cell ("Couldn't find leaders for
# Set([trips,1])") comes from createDirectStream while it resolves starting
# offsets (KafkaUtils.getFromOffsets in the trace), before any batch runs. It
# presumably means partition 1 of 'trips' had no leader reachable through these
# broker addresses. Check the brokers' advertised listeners/ports and the topic's
# partition state rather than the transforms.
KAFKA_TOPICS = ['trips']
KAFKA_PARAMS = {'metadata.broker.list': 'kafka-broker-1:9093,kafka-broker-2:9093'}

ks = KafkaUtils.createDirectStream(ssc, KAFKA_TOPICS, KAFKA_PARAMS)
# Records arrive as (key, value) pairs; only the JSON value is used. Cached
# because two output operations below consume every batch. Without it, each
# batch would be fetched from Kafka and decoded twice.
lines = ks.map(lambda record: record[1]).cache()

# Sink 1: raw trip fields plus the original JSON.
transform1 = lines.map(json_to_list)
transform1.foreachRDD(handle_rdd1)

# Sink 2: normalised features plus the original JSON.
transform2 = lines.map(json_to_processed_data)
transform2.foreachRDD(handle_rdd2)

ssc.start()
ssc.awaitTermination()

Py4JJavaError: An error occurred while calling o9377.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leaders for Set([trips,1])
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:387)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:387)
	at scala.util.Either.fold(Either.scala:98)
	at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:386)
	at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:223)
	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
	at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
