In [None]:
from pyspark.sql.functions import lit, from_json, explode, split, concat, col, lit, countDistinct, dense_rank, desc, row_number
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
from time import sleep
from pyspark.sql import Window

# Cluster / application settings: standalone master, application name and
# small, fixed driver/executor resources.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Assignment 2 Streaming")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Entry point to the Spark SQL engine; getOrCreate() reuses an active session if present.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector so Hadoop resolves gs:// paths.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for _gcs_key, _gcs_impl in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(_gcs_key, _gcs_impl)

# Temporary GCS location for the BigQuery connector ("bigquery" format writes).
# NOTE(review): the connector documents this option as a bucket name; confirm
# that a gs:// URI with a sub-path is accepted by the connector version in use.
bucket = "gs://de_2024_574440/temp_de2024"
spark.conf.set("temporaryGcsBucket", bucket)

# Seed dataset in Google Cloud Storage: the top-5 titles per genre in 2024.
gsc_file_path = 'gs://de_2024_574440/data/data_genre24_top5.csv'

# Read the seed CSV (header row; without schema inference every column is a
# string), then give the numeric columns real types.
df_24 = spark.read.option("header", "true").csv(gsc_file_path)
for _column_name, _target_type in (
    ("averageRating", "float"),
    ("releaseYear", "integer"),
    ("numVotes", "integer"),
):
    df_24 = df_24.withColumn(_column_name, col(_column_name).cast(_target_type))

# The stored per-genre rank is not kept; ranks are recomputed per micro-batch.
df_24 = df_24.drop("year_genre_rank")

# Schema of the JSON records arriving on the Kafka topic. Every field is a
# nullable string; numeric fields are cast after parsing.
dataSchema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "id",
        "title",
        "type",
        "genres",
        "averageRating",
        "numVotes",
        "releaseYear",
    )
])

# define the foreachbatch function

def write_to_bigquery(batch_df, batch_id):
    """Merge one Kafka micro-batch into the running top-5 and publish it.

    Used as the ``foreachBatch`` callback of the streaming query. The batch
    is parsed and cleaned, then merged with the current top-5 titles held in
    the module-level ``df_24``. The merged data is re-ranked and the top 5
    per (release year, main genre) are written to BigQuery and to
    ``gsc_file_path``, overwriting both.

    Args:
        batch_df: Micro-batch from the Kafka source; its ``value`` column
            holds JSON records matching ``dataSchema``.
        batch_id: Micro-batch id supplied by Spark (unused).

    Side effects:
        Overwrites the BigQuery table and the CSV output at
        ``gsc_file_path``, and rebinds the module-level ``df_24`` to the new
        top-5 so the next micro-batch builds on it.
    """
    global df_24

    # Decode the Kafka payload and flatten the JSON struct into columns.
    df = (
        batch_df.selectExpr("CAST(value AS STRING)")
        .withColumn("data", from_json(col("value"), dataSchema))
        .select("data.*")
    )

    # Keep movies and TV series only.
    df = df.where(col("type").isin("movie", "tvSeries"))

    # Drop rows missing a key column. Missing values appear to arrive as the
    # literal string 'NULL'; a real null also fails the != comparison.
    df = df.filter(
        (col("averageRating") != "NULL")
        & (col("genres") != "NULL")
        & (col("releaseYear") != "NULL")
    )

    # Cast the numeric columns. A value that is not numeric casts to null;
    # such rows cannot be ranked, so drop them.
    df = (
        df.withColumn("averageRating", col("averageRating").cast("float"))
        .withColumn("releaseYear", col("releaseYear").cast("integer"))
        .withColumn("numVotes", col("numVotes").cast("integer"))
        .dropna(subset=["averageRating", "releaseYear"])
    )

    # Keep only the first (main) genre of the comma-separated list.
    df = df.withColumn("one_genre", split(col("genres"), ", ").getItem(0)).drop("genres")

    # Merge with the current top-5 by column name. Selecting df's columns
    # first tolerates a different column order and extra columns, such as
    # year_rank, which the CSV carries once this function has rewritten it.
    df_merged = df_24.select(*df.columns).unionByName(df)

    # Rank within each release year: per main genre (ties on rating broken by
    # vote count so row_number is deterministic) and across all genres.
    genre_window = Window.partitionBy("releaseYear", "one_genre").orderBy(
        desc("averageRating"), desc("numVotes")
    )
    year_window = Window.partitionBy("releaseYear").orderBy(desc("averageRating"))

    # Keep the top 5 titles per genre within each year.
    df_filtered = (
        df_merged.withColumn("year_genre_rank", row_number().over(genre_window))
        .withColumn("year_rank", dense_rank().over(year_window))
        .filter(col("year_genre_rank") <= 5)
    )

    # Materialise the result once. Both sinks reuse it, and its lineage no
    # longer reads gsc_file_path. Spark refuses to overwrite a path that the
    # plan being written still reads from.
    df_filtered = df_filtered.localCheckpoint(eager=True)

    # Publish to BigQuery.
    (
        df_filtered.write
        .format("bigquery")
        .option("table", "still-entity-435508-a1.Movies.genre24_top5")
        .mode("overwrite")
        .save()
    )

    # Publish to Google Cloud Storage.
    df_filtered.write.mode("overwrite").option("header", "true").csv(gsc_file_path)

    # Carry the new top-5 forward in memory for the next micro-batch. The
    # df_24 read at start-up points at a file this write has just replaced.
    df_24 = df_filtered.select(*df.columns)


# Kafka source: topic IMDB_C, read from the earliest available offset. The
# query does not fail when data may have been lost (failOnDataLoss=false).
_kafka_source_options = {
    "kafka.bootstrap.servers": "kafka1:9093",
    "failOnDataLoss": "false",
    "subscribe": "IMDB_C",
    "startingOffsets": "earliest",
}
streaming_df = (
    spark.readStream
    .format("kafka")
    .options(**_kafka_source_options)
    .load()
)

# Hand every micro-batch to write_to_bigquery.
# NOTE(review): no checkpointLocation is set here, so unless one is configured
# elsewhere a restart re-reads the topic from "earliest"; confirm this is intended.
query = (
    streaming_df.writeStream
    .foreachBatch(write_to_bigquery)
    .outputMode("append")
    .start()
)

# Block this cell until the streaming query stops or fails.
query.awaitTermination()


In [10]:
# Stop the Spark session and its underlying SparkContext. In the notebook this
# cell only gets to run once the awaitTermination() call in the streaming cell
# has returned (query stopped, failed, or the kernel was interrupted).
spark.stop()