### PySpark code to read from the 'order_items' stream, preprocess, and write to BigQuery

In [7]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, concat, col, lit
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, DateType, IntegerType, FloatType
from time import sleep

# Spark configuration for this notebook: master URL, application name, and the
# driver/executor resource settings.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("assignment2_stream2")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine; build it from
# the configuration above (or reuse a session that already exists).
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

In [8]:
from pyspark.sql import functions as f
from pyspark.sql.functions import from_csv

# Schema of one 'order_items' record, listed as (column name, Spark type) in
# declaration order. Every column is declared nullable.
order_items_dataSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("order_id", StringType()),
        ("order_item_id", IntegerType()),
        ("product_id", StringType()),
        ("seller_id", StringType()),
        ("shipping_limit_date", StringType()),
        ("price", FloatType()),
        ("freight_value", FloatType()),
    )
])

# Open a streaming source (readStream, not a one-off batch read) on the
# 'order_items' Kafka topic, starting from the earliest available offsets.
kafkaStream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("subscribe", "order_items")
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the Kafka record payload into a string column named 'value'
# (each record is assumed to carry one CSV line - TODO confirm with the producer).
df = kafkaStream.selectExpr("CAST(value AS STRING)")

# Parse the CSV line into a struct and give that struct an explicit alias.
# Selecting by Spark's auto-generated column name ("from_csv(value)") is
# brittle: the generated name is an implementation detail of the expression.
df1 = df.select(
    from_csv(df.value, order_items_dataSchema.simpleString()).alias("order_item")
)

# Flatten the struct so that every schema field becomes a top-level column.
order_items = df1.select("order_item.*")
order_items.printSchema()

# Preprocessing:
#   * shipping_limit_date: string -> date
#   * total_price: order_item_id * (price + freight_value)
# NOTE(review): total_price multiplies by order_item_id - confirm that this
# column holds a quantity and not a per-order sequence number.
order_items = (
    order_items
    .withColumn("shipping_limit_date", col("shipping_limit_date").cast(DateType()))
    .withColumn(
        "total_price",
        col("order_item_id") * (col("price") + col("freight_value")),
    )
)

order_items.printSchema()


root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: float (nullable = true)
 |-- freight_value: float (nullable = true)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: date (nullable = true)
 |-- price: float (nullable = true)
 |-- freight_value: float (nullable = true)
 |-- total_price: float (nullable = true)



### Saving each micro-batch to BigQuery with the batch writer

In [11]:
# This configuration is needed whenever GCS is used: register the Hadoop
# filesystem implementations that handle the gs:// scheme.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for fs_key, fs_impl in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(fs_key, fs_impl)

# Cloud Storage bucket the BigQuery connector uses for its temporary export data.
bucket = "de_jads_temp_2093373"
spark.conf.set('temporaryGcsBucket', bucket)

In [12]:
def my_foreach_batch_function(df, batch_id):
    """Append one streaming micro-batch to the BigQuery ``order_items`` table.

    Intended to be passed to ``writeStream.foreachBatch``, which hands each
    micro-batch over as a regular DataFrame so the batch writer can be used.

    Args:
        df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch supplied by ``foreachBatch``;
            not used here.

    The write mode is "append". With "overwrite", every micro-batch replaced
    the whole table, so only the rows of the most recent batch survived.

    NOTE(review): with "append", a stream that is restarted from the earliest
    offsets without a checkpoint location will write the same rows again.
    """
    (
        df.write.format('bigquery')
        .option('table', 'de2022-362707.assignment2.order_items')
        .mode("append")
        .save()
    )

# Write to a sink - here, the output is written to a BigQuery table.
# ProcessingTime trigger with a 60-second micro-batch interval, as the dataset
# is large and is not updated within the 60-second timeframe.
# Output mode "append": only new rows are handed to the sink, and no
# aggregation over previous data is done.
order_itemsQuery = (
    order_items.writeStream
    .outputMode("append")
    .trigger(processingTime='60 seconds')
    .foreachBatch(my_foreach_batch_function)
    .start()
)
try:
    # Block this cell until the query stops, fails, or is interrupted.
    order_itemsQuery.awaitTermination()
except KeyboardInterrupt:
    # Manual interrupt from the notebook: not an error, clean-up happens below.
    pass
finally:
    # Always release the streaming query and the Spark context, including when
    # the query fails with an exception (re-raised after this clean-up), so
    # that no cluster resources stay allocated.
    order_itemsQuery.stop()
    spark.stop()
    print("Stoped the streaming query and the spark context")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Stoped the streaming query and the spark context
