In [0]:
pip install pyspark jproperties argparse json5

In [0]:
%fs rm -r /tmp/stream/pykafka_ss/

In [0]:
dbutils.fs.mkdirs("/tmp/stream/pykafka_ss/")

In [0]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Aug  9 11:21:19 2020

@author: sunilmiriyala
"""
# https://spark.apache.org/docs/latest/api/python/index.html

import sys
import pyspark
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from jproperties import Properties
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
import json5 as json
from pyspark.sql.functions import col

# https://spark.apache.org/docs/latest/api/python/index.html

class MyPySparkApp:
    """Kafka-to-Kafka Structured Streaming job with a CSV lookup enrichment.

    The job reads JSON records from the source topic, joins them with a
    lookup table loaded from a CSV file (registered as the temp view
    ``BROADCAST_CUST``) and writes the enriched rows, serialized as JSON,
    to the sink topic.

    Configuration is passed as keyword arguments whose names contain dots,
    so call it as ``MyPySparkApp(**config_dict)``. Recognized keys:

    * ``spark.name``, ``spark.master``, ``spark.stream.batch.duration.secs``
    * ``data.source.type``, ``data.source.format``
    * ``data.source.kafka.topic``, ``data.sink.kafka.topic``,
      ``data.source.kafka.brokers``
    * ``data.source.kafka.startingposition`` (used as ``startingOffsets``)
    * ``data.source.kafka.checkpointinterval``,
      ``data.source.kafka.checkpointdir`` (RDD checkpoint directory)
    * ``checkpointdir`` (Structured Streaming checkpoint location)
    * ``enable.auto.commit``, ``security.protocol``, ``sasl.mechanisms``,
      ``sasl.username``, ``sasl.password``
    * ``sasl.jaas.config`` (optional, overrides the generated JAAS entry)
    * ``data.lookup.location``, ``data.lookup.files``
    """

    def __init__(self, **kwargs):
        """Store the configuration and create the Spark entry points."""
        # Never echo the SASL password into notebook output or driver logs.
        printable = {
            key: ("********" if "password" in key else value)
            for key, value in kwargs.items()
        }
        print("__init__::kwargs:%s" % printable)
        self.appname = kwargs.get("spark.name", "MyPySparkKafka")
        self.master = kwargs.get("spark.master", "local")
        self.batch_dur_sec = kwargs.get("spark.stream.batch.duration.secs", 5)

        self.src_type = kwargs.get("data.source.type", "kafka")
        self.src_format = kwargs.get("data.source.format", "json")

        self.kfk_topic = kwargs.get("data.source.kafka.topic")
        self.kfk_topic_out = kwargs.get("data.sink.kafka.topic")
        self.kfk_brokers = kwargs.get("data.source.kafka.brokers")

        self.kfk_start = kwargs.get("data.source.kafka.startingposition", "earliest")
        self.kfk_chkpoint_int = kwargs.get("data.source.kafka.checkpointinterval", 10)
        self.kfk_chkpoint_dir = kwargs.get("data.source.kafka.checkpointdir", "/tmp/")
        # Checkpoint location of the streaming query. The default is the
        # path that used to be hard-coded in readData().
        self.stream_chkpoint_dir = kwargs.get("checkpointdir", "/tmp/stream/pykafka_ss/")

        self.kfk_auto_commit = kwargs.get("enable.auto.commit", True)
        self.sasl_protocol = kwargs.get("security.protocol", "SASL_SSL")
        self.sasl_mech = kwargs.get("sasl.mechanisms", "SCRAM-SHA-256")
        self.sasl_username = kwargs.get("sasl.username")
        self.sasl_password = kwargs.get("sasl.password")
        self.sasl_jaas_config = kwargs.get("sasl.jaas.config")
        self.file_location = kwargs.get("data.lookup.location")
        self.file_names = kwargs.get("data.lookup.files")

        self.conf = SparkConf().setAppName(self.appname)
        print("__init__::self.conf:%s" % self.conf)

        self.spark = SparkSession.builder.config(conf=self.conf).getOrCreate()
        print("__init__::self.spark:%s" % self.spark)

        # Take the context from the session instead of relying on the
        # notebook-provided global `sc`, so the class also works when it is
        # run outside a notebook.
        self.sc = self.spark.sparkContext
        self.sc.setCheckpointDir(self.kfk_chkpoint_dir)

        # The structured streaming query below does not use this context;
        # it is kept because the attribute has always been part of the object.
        self.ssc = StreamingContext(self.sc, int(self.batch_dur_sec))
        print("__init__::self.ssc:%s" % (self.ssc))

    def _jaas_config(self):
        """Return the JAAS login entry for the Kafka client, or None.

        An explicit ``sasl.jaas.config`` setting wins. Otherwise the entry is
        generated from the username/password for SCRAM-* and PLAIN
        mechanisms; None is returned when credentials are missing or the
        mechanism is not one of those.
        """
        if self.sasl_jaas_config:
            return self.sasl_jaas_config
        if not (self.sasl_username and self.sasl_password):
            return None

        mechanism = (self.sasl_mech or "").upper()
        if mechanism.startswith("SCRAM"):
            module = "org.apache.kafka.common.security.scram.ScramLoginModule"
        elif mechanism == "PLAIN":
            module = "org.apache.kafka.common.security.plain.PlainLoginModule"
        else:
            return None

        import os

        # NOTE(review): Databricks runtimes are documented to bundle the
        # Kafka client under a shaded `kafkashaded.` package; confirm for the
        # runtime in use, or set "sasl.jaas.config" explicitly.
        if "DATABRICKS_RUNTIME_VERSION" in os.environ:
            module = "kafkashaded." + module

        def quoted(text):
            # Backslashes and double quotes must be escaped inside a JAAS string.
            return '"%s"' % str(text).replace("\\", "\\\\").replace('"', '\\"')

        return "%s required username=%s password=%s;" % (
            module,
            quoted(self.sasl_username),
            quoted(self.sasl_password),
        )

    def _kafka_options(self):
        """Return the Kafka client options shared by the source and the sink.

        Only ``kafka.``-prefixed options are emitted: the Spark Kafka
        connector forwards those to the Kafka client and does not act on
        un-prefixed ``security.protocol`` / ``sasl.mechanisms``.

        Raises:
            ValueError: if no broker list was configured.
        """
        if not self.kfk_brokers:
            raise ValueError("data.source.kafka.brokers is required")
        options = {
            "kafka.bootstrap.servers": self.kfk_brokers,
            "kafka.security.protocol": self.sasl_protocol,
            "kafka.sasl.mechanism": self.sasl_mech,
        }
        jaas_config = self._jaas_config()
        if jaas_config:
            options["kafka.sasl.jaas.config"] = jaas_config
        return options

    def loadLookupTable(self):
        """Load the first lookup CSV and register it as view BROADCAST_CUST.

        Does nothing (apart from logging) when no lookup file is configured.
        """
        if not self.file_names:
            print("loadLookupTable::file_name is none")
            return None
        files = [name.strip() for name in self.file_names.split(",") if name.strip()]
        if not files:
            print("loadLookupTable::file_name is none")
            return None
        lookupTableDF = self.readFile(filePath=(self.file_location or "") + files[0])
        # createTempView raises when the view already exists (e.g. the cell is
        # re-run in the same session), which used to be swallowed and left a
        # stale view in place. Replace the view instead.
        lookupTableDF.createOrReplaceTempView("BROADCAST_CUST")
        return None

    def readFile(self, filePath):
        """Read a CSV file with a header row and inferred schema; return it cached."""
        print("readFile::**********************")
        print("readFile::filePath:%s" % filePath)
        df = (
            self.spark.read.format('csv')
            .options(header='true', inferSchema='true')
            .load(filePath)
            .cache()
        )
        print("readFile::df:%s" % (df))
        df.printSchema()
        df.show(truncate=True)
        return df

    def enrichCustId(self, input_df):
        """Inner-join *input_df* with the BROADCAST_CUST view on the customer id.

        Returns a DataFrame with the lookup ``name`` plus the input's ``id``,
        ``app_id`` and ``active`` columns.
        """
        print("enrichCustId::input_df:", input_df)
        brd_cust_df = self.spark.sql("select * from BROADCAST_CUST")
        print("enrichCustId::brd_cust_df:", brd_cust_df)
        enrich_df = input_df.join(
            brd_cust_df, input_df.cust_id == brd_cust_df.id, 'inner'
        ).select(brd_cust_df.name, input_df.id, input_df.app_id, input_df.active)
        print("enrichCustId::enrich_df:", enrich_df)
        return enrich_df

    def readData(self):
        """Run the streaming query; blocks until the query terminates.

        Raises:
            ValueError: if the brokers, the source topic or the sink topic
                are not configured.
        """
        print("readData::START2")
        required = (
            ("data.source.kafka.topic", self.kfk_topic),
            ("data.sink.kafka.topic", self.kfk_topic_out),
        )
        for key, value in required:
            if not value:
                raise ValueError("%s is required" % key)
        kafka_options = self._kafka_options()

        self.loadLookupTable()

        cust_app_schema = StructType(
            [
                StructField("id", IntegerType()),
                StructField("cust_id", IntegerType()),
                StructField("app_id", IntegerType()),
                StructField("active", IntegerType()),
            ]
        )
        print("readData::cust_app_schema::", cust_app_schema)

        # https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
        reader = (
            self.spark.readStream.format("kafka")
            .options(**kafka_options)
            .option("subscribe", self.kfk_topic)
        )
        if self.kfk_start:
            # Only honoured when the query starts without an existing checkpoint.
            reader = reader.option("startingOffsets", self.kfk_start)
        kafka_data_df = reader.load()
        print("readData::kafka_data_df(original)::", kafka_data_df)

        # Kafka key/value arrive as binary; cast before parsing.
        kafka_data_df = kafka_data_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        print("readData::kafka_data_df(as-string)::", kafka_data_df)
        cust_apps_df = kafka_data_df.select(
            from_json(col("value"), cust_app_schema).alias("data")
        ).select("data.*")
        print("readData::cust_apps_df::", cust_apps_df)
        cust_apps_enrich_df = cust_apps_df.transform(self.enrichCustId)
        print("readData::cust_apps_enrich_df::", cust_apps_enrich_df)

        # The Kafka sink requires a `value` column; a missing `key` column is
        # written as null.
        cust_apps_enrich_df = cust_apps_enrich_df.selectExpr("to_json(struct(*)) AS value")
        print("readData::final::cust_apps_enrich_df(k,v)::", cust_apps_enrich_df)
        query = (
            cust_apps_enrich_df.writeStream.format("kafka")
            .options(**kafka_options)
            .option("checkpointLocation", self.stream_chkpoint_dir)
            .option("topic", self.kfk_topic_out)
            .start()
        )
        query.awaitTermination()

    def start(self):
        """Entry point: run the streaming job."""
        self.readData()


#("kafka.bootstrap.servers", "omnibus-01.srvs.cloudkafka.com:9094,omnibus-02.srvs.cloudkafka.com:9094,omnibus-03.srvs.cloudkafka.com:9094") \
#("kafka.security.protocol", "SASL_SSL") \
#("kafka.sasl.mechanism", "SCRAM-SHA-256") \
#("security.protocol", "SASL_SSL") \
#("sasl.mechanism", "SCRAM-SHA-256") \
#("subscribe", "yrmfqh3q-test") \
if __name__ == '__main__':
    import os

    # SECURITY: the SASL credentials below are committed in clear text.
    # Rotate them and supply KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD through
    # the environment (or a secret store); the literal fallbacks are kept only
    # so existing runs keep working and should be removed after rotation.
    kw = {
        "spark.master": "local",
        "spark.name": "MyPySpark-Kafka",
        "spark.stream.batch.duration.secs": "10",
        "data.source.type": "kafka",
        "data.source.format": "json",
        "data.source.kafka.topic": "yrmfqh3q-test",
        "data.sink.kafka.topic": "yrmfqh3q-default",
        "data.source.kafka.group": "pyspark-kafka",
        "data.source.kafka.brokers": "omnibus-01.srvs.cloudkafka.com:9094,omnibus-03.srvs.cloudkafka.com:9094,omnibus-02.srvs.cloudkafka.com:9094",
        "data.source.kafka.startingposition": "earliest",
        "data.source.kafka.checkpointinterval": "5",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-256",
        "sasl.username": os.environ.get("KAFKA_SASL_USERNAME", "yrmfqh3q"),
        "sasl.password": os.environ.get("KAFKA_SASL_PASSWORD", "WfKJEYL_hwNUHVU8laSbj6gXkPIw_xuc"),
        "checkpointdir": "/tmp/stream/pykafka_ss/",
        "data.lookup.location": "dbfs:/FileStore/tables/",
        "data.lookup.files": "customers.csv",
    }
    # Print the configuration without exposing the password in cell output.
    redacted_kw = {
        key: ("********" if "password" in key else value)
        for key, value in kw.items()
    }
    print("kw::%s" % redacted_kw)
    app = MyPySparkApp(**kw)
    app.start()


In [0]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Aug  9 11:21:19 2020

@author: sunilmiriyala
"""
# https://spark.apache.org/docs/latest/api/python/index.html

import sys
import pyspark
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from jproperties import Properties
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
import json5 as json
from pyspark.sql.functions import col

# https://spark.apache.org/docs/latest/api/python/index.html

class MyPySparkApp:
    """Kafka-to-Kafka Structured Streaming pass-through with a broadcast lookup.

    The job loads a CSV lookup file into a Spark broadcast variable, then
    copies the key/value of every record (cast to strings) from the source
    topic to the sink topic. ``enrichCustId`` is currently a pass-through
    hook and is not wired into the query.

    Configuration is passed as keyword arguments whose names contain dots,
    so call it as ``MyPySparkApp(**config_dict)``. Recognized keys:

    * ``spark.name``, ``spark.master``, ``spark.stream.batch.duration.secs``
    * ``data.source.type``, ``data.source.format``
    * ``data.source.kafka.topic``, ``data.sink.kafka.topic``,
      ``data.source.kafka.brokers``
    * ``data.source.kafka.startingposition`` (used as ``startingOffsets``)
    * ``data.source.kafka.checkpointinterval``,
      ``data.source.kafka.checkpointdir`` (RDD checkpoint directory)
    * ``checkpointdir`` (Structured Streaming checkpoint location)
    * ``enable.auto.commit``, ``security.protocol``, ``sasl.mechanisms``,
      ``sasl.username``, ``sasl.password``
    * ``sasl.jaas.config`` (optional, overrides the generated JAAS entry)
    * ``data.lookup.location``, ``data.lookup.files``
    """

    def __init__(self, **kwargs):
        """Store the configuration and create the Spark entry points."""
        # Never echo the SASL password into notebook output or driver logs.
        printable = {
            key: ("********" if "password" in key else value)
            for key, value in kwargs.items()
        }
        print("__init__::kwargs:%s" % printable)
        self.appname = kwargs.get("spark.name", "MyPySparkKafka")
        self.master = kwargs.get("spark.master", "local")
        self.batch_dur_sec = kwargs.get("spark.stream.batch.duration.secs", 5)

        self.src_type = kwargs.get("data.source.type", "kafka")
        self.src_format = kwargs.get("data.source.format", "json")

        self.kfk_topic = kwargs.get("data.source.kafka.topic")
        self.kfk_topic_out = kwargs.get("data.sink.kafka.topic")
        self.kfk_brokers = kwargs.get("data.source.kafka.brokers")

        self.kfk_start = kwargs.get("data.source.kafka.startingposition", "earliest")
        self.kfk_chkpoint_int = kwargs.get("data.source.kafka.checkpointinterval", 10)
        self.kfk_chkpoint_dir = kwargs.get("data.source.kafka.checkpointdir", "/tmp/")
        # Checkpoint location of the streaming query. The default is the
        # path that used to be hard-coded in readData().
        self.stream_chkpoint_dir = kwargs.get("checkpointdir", "/tmp/stream/pykafka_ss/")

        self.kfk_auto_commit = kwargs.get("enable.auto.commit", True)
        self.sasl_protocol = kwargs.get("security.protocol", "SASL_SSL")
        self.sasl_mech = kwargs.get("sasl.mechanisms", "SCRAM-SHA-256")
        self.sasl_username = kwargs.get("sasl.username")
        self.sasl_password = kwargs.get("sasl.password")
        self.sasl_jaas_config = kwargs.get("sasl.jaas.config")
        self.file_location = kwargs.get("data.lookup.location")
        self.file_names = kwargs.get("data.lookup.files")
        self.broadcast_var = None

        self.conf = SparkConf().setAppName(self.appname)
        print("__init__::self.conf:%s" % self.conf)

        self.spark = SparkSession.builder.config(conf=self.conf).getOrCreate()
        print("__init__::self.spark:%s" % self.spark)

        # Take the context from the session instead of relying on the
        # notebook-provided global `sc`, so the class also works when it is
        # run outside a notebook.
        self.sc = self.spark.sparkContext
        self.sc.setCheckpointDir(self.kfk_chkpoint_dir)

        # The structured streaming query below does not use this context;
        # it is kept because the attribute has always been part of the object.
        self.ssc = StreamingContext(self.sc, int(self.batch_dur_sec))
        print("__init__::self.ssc:%s" % (self.ssc))

    def _jaas_config(self):
        """Return the JAAS login entry for the Kafka client, or None.

        An explicit ``sasl.jaas.config`` setting wins. Otherwise the entry is
        generated from the username/password for SCRAM-* and PLAIN
        mechanisms; None is returned when credentials are missing or the
        mechanism is not one of those.
        """
        if self.sasl_jaas_config:
            return self.sasl_jaas_config
        if not (self.sasl_username and self.sasl_password):
            return None

        mechanism = (self.sasl_mech or "").upper()
        if mechanism.startswith("SCRAM"):
            module = "org.apache.kafka.common.security.scram.ScramLoginModule"
        elif mechanism == "PLAIN":
            module = "org.apache.kafka.common.security.plain.PlainLoginModule"
        else:
            return None

        import os

        # NOTE(review): Databricks runtimes are documented to bundle the
        # Kafka client under a shaded `kafkashaded.` package; confirm for the
        # runtime in use, or set "sasl.jaas.config" explicitly.
        if "DATABRICKS_RUNTIME_VERSION" in os.environ:
            module = "kafkashaded." + module

        def quoted(text):
            # Backslashes and double quotes must be escaped inside a JAAS string.
            return '"%s"' % str(text).replace("\\", "\\\\").replace('"', '\\"')

        return "%s required username=%s password=%s;" % (
            module,
            quoted(self.sasl_username),
            quoted(self.sasl_password),
        )

    def _kafka_options(self):
        """Return the Kafka client options shared by the source and the sink.

        Only ``kafka.``-prefixed options are emitted: the Spark Kafka
        connector forwards those to the Kafka client and does not act on
        un-prefixed ``security.protocol`` / ``sasl.mechanisms``.

        Raises:
            ValueError: if no broker list was configured.
        """
        if not self.kfk_brokers:
            raise ValueError("data.source.kafka.brokers is required")
        options = {
            "kafka.bootstrap.servers": self.kfk_brokers,
            "kafka.security.protocol": self.sasl_protocol,
            "kafka.sasl.mechanism": self.sasl_mech,
        }
        jaas_config = self._jaas_config()
        if jaas_config:
            options["kafka.sasl.jaas.config"] = jaas_config
        return options

    def loadLookupTable(self):
        """Load the first lookup CSV and broadcast its rows.

        Returns:
            A broadcast variable holding one parsed object per CSV row, or
            None when no lookup file is configured.
        """
        if not self.file_names:
            print("loadLookupTable::file_name is none")
            return None
        files = [name.strip() for name in self.file_names.split(",") if name.strip()]
        if not files:
            print("loadLookupTable::file_name is none")
            return None
        lookupTableDF = self.readFile(filePath=(self.file_location or "") + files[0])
        pdf_j = lookupTableDF.toJSON().collect()
        print("readData::pdf_j:%s" % pdf_j)
        # collect() yields a list with one JSON document per row; each one has
        # to be parsed on its own (passing the list to json.loads raises).
        lookup_rows = [json.loads(row_json) for row_json in pdf_j]
        broadcast_var = self.sc.broadcast(lookup_rows)
        print("readData::broadcast_var:", broadcast_var, broadcast_var.value)
        return broadcast_var

    def readFile(self, filePath):
        """Read a CSV file with a header row and inferred schema; return it cached."""
        print("readFile::**********************")
        print("readFile::filePath:%s" % filePath)
        df = (
            self.spark.read.format('csv')
            .options(header='true', inferSchema='true')
            .load(filePath)
            .cache()
        )
        print("readFile::df:%s" % (df))
        df.printSchema()
        df.show(truncate=True)
        return df

    def enrichCustId(self, input_df):
        """Enrichment hook; currently logs the input and returns it unchanged."""
        print("enrichCustId:", type(input_df), input_df)
        return input_df

    def readData(self):
        """Run the streaming query; blocks until the query terminates.

        Raises:
            ValueError: if the brokers, the source topic or the sink topic
                are not configured.
        """
        print("readData")
        required = (
            ("data.source.kafka.topic", self.kfk_topic),
            ("data.sink.kafka.topic", self.kfk_topic_out),
        )
        for key, value in required:
            if not value:
                raise ValueError("%s is required" % key)
        kafka_options = self._kafka_options()

        # Kept on the instance so a later enrichment step can reach it.
        self.broadcast_var = self.loadLookupTable()

        # https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
        reader = (
            self.spark.readStream.format("kafka")
            .options(**kafka_options)
            .option("subscribe", self.kfk_topic)
        )
        if self.kfk_start:
            # Only honoured when the query starts without an existing checkpoint.
            reader = reader.option("startingOffsets", self.kfk_start)
        kafka_data_df = reader.load()
        print("readData::kafka_data_df(1)::", kafka_data_df)

        # Kafka key/value arrive as binary; cast before writing them back out.
        kafka_data_df = kafka_data_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        print("readData::kafka_data_df(2)::", type(kafka_data_df), kafka_data_df)

        query = (
            kafka_data_df.writeStream.format("kafka")
            .options(**kafka_options)
            .option("checkpointLocation", self.stream_chkpoint_dir)
            .option("topic", self.kfk_topic_out)
            .start()
        )
        query.awaitTermination()

    def start(self):
        """Entry point: run the streaming job."""
        self.readData()


#("kafka.bootstrap.servers", "omnibus-01.srvs.cloudkafka.com:9094,omnibus-02.srvs.cloudkafka.com:9094,omnibus-03.srvs.cloudkafka.com:9094") \
#("kafka.security.protocol", "SASL_SSL") \
#("kafka.sasl.mechanism", "SCRAM-SHA-256") \
#("security.protocol", "SASL_SSL") \
#("sasl.mechanism", "SCRAM-SHA-256") \
#("subscribe", "yrmfqh3q-test") \
if __name__ == '__main__':
    import os

    # SECURITY: the SASL credentials below are committed in clear text.
    # Rotate them and supply KAFKA_SASL_USERNAME / KAFKA_SASL_PASSWORD through
    # the environment (or a secret store); the literal fallbacks are kept only
    # so existing runs keep working and should be removed after rotation.
    kw = {
        "spark.master": "local",
        "spark.name": "MyPySpark-Kafka",
        "spark.stream.batch.duration.secs": "10",
        "data.source.type": "kafka",
        "data.source.format": "json",
        "data.source.kafka.topic": "yrmfqh3q-test",
        "data.sink.kafka.topic": "yrmfqh3q-default",
        "data.source.kafka.group": "pyspark-kafka",
        "data.source.kafka.brokers": "omnibus-01.srvs.cloudkafka.com:9094,omnibus-03.srvs.cloudkafka.com:9094,omnibus-02.srvs.cloudkafka.com:9094",
        "data.source.kafka.startingposition": "earliest",
        "data.source.kafka.checkpointinterval": "5",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-256",
        "sasl.username": os.environ.get("KAFKA_SASL_USERNAME", "yrmfqh3q"),
        "sasl.password": os.environ.get("KAFKA_SASL_PASSWORD", "WfKJEYL_hwNUHVU8laSbj6gXkPIw_xuc"),
        "checkpointdir": "/tmp/stream/pykafka_ss/",
        "data.lookup.location": "dbfs:/FileStore/tables/",
        "data.lookup.files": "customers.csv",
    }
    # Print the configuration without exposing the password in cell output.
    redacted_kw = {
        key: ("********" if "password" in key else value)
        for key, value in kw.items()
    }
    print("kw::%s" % redacted_kw)
    app = MyPySparkApp(**kw)
    app.start()
