In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_csv, to_timestamp, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from time import sleep


# Spark configuration: standalone master URL, application name, and the
# driver/executor resource settings used by this notebook.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("stream_assignment_57")
    .set("spark.driver.memory", "4g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Point the Hadoop filesystem layer at the Google Cloud Storage connector
# classes for the gs:// URI scheme.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")

# Google Cloud Storage location of the static resources table.
# Resources.csv has to be uploaded to this bucket before running the cell.
gsc_file_path = 'gs://in_assignment_2/Resources.csv'

# Explicit schema for the resources file; every column is nullable.
# NOTE(review): with a user-supplied schema Spark's CSV reader applies the
# columns by position by default, not by header name -- confirm the file's
# column order matches this list.
dataSchema_donor = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Project_ID", StringType()),
        ("Resource_Quantity", IntegerType()),
        ("Resource_Unit_Price", FloatType()),
        ("Resource_Vendor_Name", StringType()),
    )
])

# Batch (non-streaming) read of the resources table; the first line of the
# file is treated as a header row.
resources = (
    spark.read
    .schema(dataSchema_donor)
    .option("header", "true")
    .csv(gsc_file_path)
)
resources.printSchema()


# Schema of one donation record as it arrives (CSV-encoded) on the Kafka
# stream; every field is nullable. Donation_Received_Date is kept as a string
# here and converted to a timestamp later in the pipeline.
dataSchema_donation = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("Project_ID", StringType()),
        ("Donation_ID", StringType()),
        ("Donor_ID", StringType()),
        ("Donation_Included_Optional_Donation", StringType()),
        ("Donation_Amount", FloatType()),
        ("Donor_Cart_Sequence", IntegerType()),
        ("Donation_Received_Date", StringType()),
    )
])

def read_kafka(topic, dataSchema):
    """Open a streaming DataFrame over a Kafka topic carrying CSV-encoded rows.

    Args:
        topic: Name of the Kafka topic to subscribe to.
        dataSchema: ``StructType`` describing the CSV fields of each message
            value; its ``simpleString()`` form is handed to ``from_csv``.

    Returns:
        A streaming DataFrame with one top-level column per field of
        ``dataSchema``.
    """
    kafkaStream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka1:9093")
        .option("failOnDataLoss", "false")
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .load()
    )

    # The Kafka source exposes the payload as binary; decode it to text and
    # name the column explicitly instead of relying on Spark's generated name.
    lines = kafkaStream.selectExpr("CAST(value AS STRING) AS value")

    # Alias the parsed struct so the flattening step below does not depend on
    # the auto-generated column name ("from_csv(value)"), which is an artefact
    # of how Spark pretty-prints the expression.
    parsed = lines.select(
        from_csv(col("value"), dataSchema.simpleString()).alias("parsed")
    )

    # Expand the struct into one top-level column per schema field.
    return parsed.select("parsed.*")


# Streaming donations parsed from the "data" Kafka topic.
donations = read_kafka("data", dataSchema_donation)

# Inner join of the static resources table with the donation stream on
# Project_ID.
donation_resource = resources.join(donations, "Project_ID")

# Derive the event-time column from the donation date string and the cost of
# each resource row (quantity * unit price).
withEventTimedf = (
    donation_resource
    .withColumn("event_time", to_timestamp("Donation_Received_Date"))
    .withColumn("cost", col('Resource_Quantity') * col('Resource_Unit_Price'))
)

withEventTimedf.printSchema()

# Average cost and average donation amount per project within 10-second
# event-time windows.
avgscoredf = (
    withEventTimedf
    .groupBy(window(col("event_time"), "10 seconds"), "Project_ID")
    .agg(avg('cost').alias("avg_cost"), avg("Donation_Amount").alias("avg_amount"))
)

# Shape the result for the Kafka sink: key = project id,
# value = "<avg_cost> <avg_amount>".
# NOTE(review): the window column is dropped here, so rows for different
# windows of the same project share a key, and "complete" output mode below
# re-emits the whole result table on every trigger -- confirm this is what
# downstream consumers expect.
resultdf = avgscoredf.select(
    col("Project_ID").alias("key"),
    concat(
        col("avg_cost").cast("string"),
        lit(" "),
        col("avg_amount").cast("string"),
    ).alias("value"),
)

resultdf.printSchema()

query = (
    resultdf.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("checkpointLocation", "/home/jovyan/checkpoint")
    .option("topic", "resource_result")
    .outputMode("complete")
    .start()
)

# lastProgress is None until the first micro-batch has reported progress, so
# this normally prints None right after start().
print(query.lastProgress)

try:
    query.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the kernel is the expected way to end the stream.
    pass
finally:
    # Always release the query and the Spark session, including when the
    # query terminates with an error; otherwise the application keeps its
    # executors until spark.stop() is run by hand.
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- Project_ID: string (nullable = true)
 |-- Resource_Quantity: integer (nullable = true)
 |-- Resource_Unit_Price: float (nullable = true)
 |-- Resource_Vendor_Name: string (nullable = true)

root
 |-- Project_ID: string (nullable = true)
 |-- Resource_Quantity: integer (nullable = true)
 |-- Resource_Unit_Price: float (nullable = true)
 |-- Resource_Vendor_Name: string (nullable = true)
 |-- Donation_ID: string (nullable = true)
 |-- Donor_ID: string (nullable = true)
 |-- Donation_Included_Optional_Donation: string (nullable = true)
 |-- Donation_Amount: float (nullable = true)
 |-- Donor_Cart_Sequence: integer (nullable = true)
 |-- Donation_Received_Date: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- cost: float (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)

None
Stoped the streaming query and the spark context


In [2]:
# Manual cleanup: stop the Spark session in case the streaming cell above
# exited without doing so.
spark.stop()
