In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_csv, round, count, corr
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType, ByteType, BooleanType, FloatType
from time import sleep

# Connection and resource settings for the "Spotify_Stream" application,
# built with SparkConf's chainable setters.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Spotify_Stream")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine; getOrCreate()
# returns an already-running session if there is one.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Cloud Storage bucket the BigQuery connector uses for its temporary export data.
bucket = "temp_rgather"
spark.conf.set('temporaryGcsBucket', bucket)

# Required whenever GCS is used: register the Hadoop filesystem
# implementations that handle the gs:// scheme.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")

# Schema used to parse the CSV payload of each Kafka message with from_csv.
# All fields are nullable; the order below is the declared column order.
# NOTE(review): "intrumentalness" looks like a misspelling of
# "instrumentalness", but the name propagates to downstream column names,
# so it is kept as-is.
dataSchema = (
    StructType()
    .add("index", IntegerType(), True)
    .add("track_name", StringType(), True)
    .add("track_genre", StringType(), True)
    .add("duration_ms", IntegerType(), True)
    .add("popularity", IntegerType(), True)
    .add("danceability", FloatType(), True)
    .add("energy", FloatType(), True)
    .add("loudness", FloatType(), True)
    .add("speechiness", FloatType(), True)
    .add("acousticness", FloatType(), True)
    .add("intrumentalness", FloatType(), True)
    .add("liveness", FloatType(), True)
    .add("valence", FloatType(), True)
    .add("explicit", BooleanType(), True)
    .add("tempo", FloatType(), True)
)

# Streaming source: subscribe to the "songs" Kafka topic on kafka1:9093,
# reading only messages that arrive after the query starts ("latest").
kafkaStream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("subscribe", "songs")
    .option("startingOffsets", "latest")
    .load()
)

# Keep only the message value, cast to a string.
df = kafkaStream.selectExpr("CAST(value AS STRING)")

# Parse each CSV line into a single struct column following dataSchema.
df1 = df.select(from_csv(df["value"], dataSchema.simpleString()))
df1.printSchema()

# Flatten the struct so that every CSV field becomes a top-level column.
# "from_csv(value)" is the auto-generated name of the struct column above.
sdf = df1.select("from_csv(value).*")
sdf.printSchema()

# Number of decimals kept for the rounded audio-feature averages.
DEC = 3

# Audio features averaged per genre and rounded to DEC decimals,
# mapped to the name of the resulting output column.
_rounded_feature_aliases = {
    "danceability": "danceability_s",
    "energy": "energy_s",
    "loudness": "loudness_s",
    "speechiness": "speechiness_s",
    "acousticness": "acousticness_s",
    "intrumentalness": "intrumentalness_s",
    "liveness": "liveness_s",
    "valence": "positivity_s",
}

# Aggregates are listed in output-column order.
_genre_aggregates = [
    # Popularity average, rounded to a whole number.
    round(avg("popularity")).alias("popularity_s"),
    *[
        round(avg(feature), DEC).alias(alias)
        for feature, alias in _rounded_feature_aliases.items()
    ],
    round(avg("tempo")).alias("tempo_s"),
    avg("duration_ms").cast(IntegerType()).alias("duration_ms_s"),
    # Mean of the explicit flag cast to float (true -> 1.0, false -> 0.0).
    avg(col("explicit").cast("float")).alias("explicit_perc_s"),
    # Correlation between the explicit flag and popularity within the genre.
    round(corr(col("explicit").cast("float"), "popularity"), 3).alias("explicit_pop_corr_s"),
    # Number of rows aggregated for the genre.
    count(lit(1)).alias("amount_s"),
]

# One output row per track_genre.
df_avg = sdf.groupby("track_genre").agg(*_genre_aggregates)

def my_foreach_batch_function(df, batch_id):
    """Save one micro-batch to BigQuery using the batch writer API.

    Intended as a ``foreachBatch`` sink: prints a marker to stdout, then
    overwrites the table
    ``de2022-362710.musicdataset.genre-characteristics-stream`` with the
    contents of ``df``.

    Args:
        df: The DataFrame holding the current micro-batch.
        batch_id: Identifier of the micro-batch (not used).
    """
    print(":)")

    # Configure the batch writer step by step, then trigger the write.
    bigquery_writer = df.write.format('bigquery')
    bigquery_writer = bigquery_writer.option(
        'table', 'de2022-362710.musicdataset.genre-characteristics-stream'
    )
    bigquery_writer = bigquery_writer.mode("overwrite")
    bigquery_writer.save()

# Start the streaming query: every 2 seconds the complete aggregated result
# is handed to my_foreach_batch_function.
query = (
    df_avg.writeStream
    .outputMode("complete")
    .queryName("spotify_stream")
    .trigger(processingTime='2 seconds')
    .foreachBatch(my_foreach_batch_function)
    .start()
)

# Block until the query terminates. On a keyboard interrupt, stop the query
# and then the Spark session.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- from_csv(value): struct (nullable = true)
 |    |-- index: integer (nullable = true)
 |    |-- track_name: string (nullable = true)
 |    |-- track_genre: string (nullable = true)
 |    |-- duration_ms: integer (nullable = true)
 |    |-- popularity: integer (nullable = true)
 |    |-- danceability: float (nullable = true)
 |    |-- energy: float (nullable = true)
 |    |-- loudness: float (nullable = true)
 |    |-- speechiness: float (nullable = true)
 |    |-- acousticness: float (nullable = true)
 |    |-- intrumentalness: float (nullable = true)
 |    |-- liveness: float (nullable = true)
 |    |-- valence: float (nullable = true)
 |    |-- explicit: boolean (nullable = true)
 |    |-- tempo: float (nullable = true)

root
 |-- index: integer (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_genre: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- danceability: float (nullable = true)


In [15]:
# Stop the Spark session held in `spark`.
# NOTE(review): the error recorded in this cell's output comes from the
# foreachBatch BigQuery write (my_foreach_batch_function -> save()).
# Presumably a micro-batch was still being written when the session was
# stopped -- confirm, and consider stopping the streaming query first.
spark.stop()

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "/tmp/ipykernel_780/3462003301.py", line 86, in my_foreach_batch_function
    .save()
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 966, in save
    self._jwrite.save()
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-sr