## Streaming Pipeline- Medals Table

Here we imagine that the games are taking place, the results are being streamed in so they can be displayed on our dashboard.

In [1]:
# Setting up Spark configuration and necessary existing tables
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import window, col, avg, concat, lit, from_csv

# Setting Spark configuration
sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("SparkStreamAssignment2")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
#Have to rename bucket
bucket = "dejads_temp_assignment2_team1"
spark.conf.set('temporaryGcsBucket', bucket)

# Importing country tables from bucket so they can be used in a join with our stream data
country_file_path = 'gs://dejads_output_assignment2_team1/country.csv/*.csv' 

country_df = spark.read.format("csv").option("header", "true") \
       .load(country_file_path)
country_df.printSchema()

root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)



In [6]:
# Setting up streaming settings
from pyspark.sql.functions import count, max, current_timestamp, col, desc

# Setting schema of data prior to reading stream
dataSchema = StructType(
    [StructField("medal_type", StringType(), True),
     StructField("medal_code", IntegerType(), True),
     StructField("medal_date", StringType(), True),
     StructField("athlete_short_name", StringType(), True),
     StructField("athlete_name", StringType(), True),
     StructField("athlete_sex", StringType(), True),
     StructField("athlete_link", StringType(), True),
     StructField("country_code", StringType(), True),
     StructField("discipline_code", StringType(), True),
     StructField("event", StringType(), True)
     ])

# Reading from Kafka stream of medal results
# Don't want stream to fail in case of potential data loss
# Subscribing to medal topic
# Want to pick up all medal data available on topic so using startingOffsets = earliest
kafkaStream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka1:9092") \
  .option("failOnDataLoss", "false") \
  .option("subscribe", "medals") \
  .option("startingOffsets", "earliest") \
  .load()

# Cast Kafka received values to the correct column structure
df = kafkaStream.selectExpr("CAST(value AS STRING)")
df1 = df.select(from_csv(df.value, dataSchema.simpleString()))
df1.printSchema()
sdf = df1.select(col("from_csv(value).*"))
sdf.printSchema()

root
 |-- from_csv(value): struct (nullable = true)
 |    |-- medal_type: string (nullable = true)
 |    |-- medal_code: integer (nullable = true)
 |    |-- medal_date: string (nullable = true)
 |    |-- athlete_short_name: string (nullable = true)
 |    |-- athlete_name: string (nullable = true)
 |    |-- athlete_sex: string (nullable = true)
 |    |-- athlete_link: string (nullable = true)
 |    |-- country_code: string (nullable = true)
 |    |-- discipline_code: string (nullable = true)
 |    |-- event: string (nullable = true)

root
 |-- medal_type: string (nullable = true)
 |-- medal_code: integer (nullable = true)
 |-- medal_date: string (nullable = true)
 |-- athlete_short_name: string (nullable = true)
 |-- athlete_name: string (nullable = true)
 |-- athlete_sex: string (nullable = true)
 |-- athlete_link: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- discipline_code: string (nullable = true)
 |-- event: string (nullable = true)



In [7]:
# Query for producing medals table
from pyspark.sql.functions import countDistinct, when, col, count, desc

# Selecting all distinct rows for country, medal type, discipline and event (because medal won by a team will appear multiple times in dataset)
df = sdf.select('medal_type',  "medal_code", "event", "country_code", "discipline_code", "event").distinct()
# Joining to country table to get country column and dropping country code
df = df.join(country_df, "country_code", "left_outer").drop('country_code')
# Counting the number of gold, silver and bronze medals
df = df.groupBy("country").agg(
    count(when(col("medal_code") == 1, 1)),
    count(when(col("medal_code") == 2, 1)),
    count(when(col("medal_code") == 3, 1)),
).withColumnRenamed(
    'count(CASE WHEN (medal_code = 1) THEN 1 END)', 'gold_medal_count'
).withColumnRenamed(
    'count(CASE WHEN (medal_code = 2) THEN 1 END)', 'silver_medal_count'
).withColumnRenamed(
    'count(CASE WHEN (medal_code = 3) THEN 1 END)', 'bronze_medal_count'
)

In [8]:
import time
# Writing the data to BigQuery in batches
def foreach_batch_function_totalmedals(df, batch_id):
    # Saving the data to BigQuery as batch processing sink - see, use write(), save(), etc.
    df.write.format('bigquery') \
      .option('table', 'de2021-assignment2.assignment2.medals_table') \
      .mode("overwrite") \
      .save()

query = df.writeStream.outputMode("complete") \
                   .trigger(processingTime = '2 seconds').foreachBatch(foreach_batch_function_totalmedals).start()

In [2]:
spark.stop()