In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_json, to_json, struct, split
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType, DoubleType
from pyspark.sql.functions import avg, col, concat, lit, to_json, struct


# Cluster settings: standalone master URL, application name, and the
# driver/executor resource limits, built as one fluent chain.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Stream_Pipeline")
    .setAll([
        ("spark.driver.memory", "2g"),
        ("spark.executor.cores", "1"),
        ("spark.driver.cores", "1"),
    ])
)

# Obtain the session for this notebook, configured from sparkConf.
spark = (
    SparkSession.builder
    .config(conf=sparkConf)
    .getOrCreate()
)

# Column layouts for the two record types handled by this notebook.
# Every field is declared nullable; field order matches the CSV column order
# used when the Kafka payload is parsed further down.
playersSchema = (
    StructType()
    .add("player_id", StringType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("name", StringType(), True)
    .add("last_season", IntegerType(), True)
    .add("current_club_id", StringType(), True)
    .add("player_code", StringType(), True)
    .add("country_of_birth", StringType(), True)
    .add("city_of_birth", StringType(), True)
    .add("country_of_citizenship", StringType(), True)
    .add("date_of_birth", StringType(), True)
    .add("sub_position", StringType(), True)
    .add("position", StringType(), True)
    .add("foot", StringType(), True)
    .add("height_in_cm", IntegerType(), True)
    .add("contract_expiration_date", StringType(), True)
    .add("agent_name", StringType(), True)
    .add("image_url", StringType(), True)
    .add("url", StringType(), True)
    .add("current_club_domestic_competition_id", StringType(), True)
    .add("current_club_name", StringType(), True)
    .add("market_value_in_eur", DoubleType(), True)
    .add("highest_market_value_in_eur", DoubleType(), True)
)

transfersSchema = (
    StructType()
    .add("player_id", StringType(), True)
    .add("transfer_date", StringType(), True)
    .add("transfer_season", StringType(), True)
    .add("from_club_id", StringType(), True)
    .add("to_club_id", StringType(), True)
    .add("from_club_name", StringType(), True)
    .add("to_club_name", StringType(), True)
    .add("transfer_fee", DoubleType(), True)
    .add("market_value_in_eur", DoubleType(), True)
    .add("player_name", StringType(), True)
)

# Streaming source: subscribe to the "club" topic on the kafka1 broker,
# starting from the latest offsets and not failing the query on data loss.
kafkaStream = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka1:9093",
        "failOnDataLoss": "false",
        "subscribe": "club",
        "startingOffsets": "latest",
    })
    .load()
)

def _parse_csv_stream(stream, schema):
    """Turn a Kafka stream of comma-separated payloads into typed columns.

    The Kafka ``value`` is cast to a string and split on every comma. The
    i-th piece is then cast to the type of the i-th field of ``schema`` and
    given that field's name, so the declared schemas drive both the column
    names and the column types (previously every column stayed a string).

    Args:
        stream: streaming DataFrame as returned by the Kafka source
            (must expose a ``value`` column).
        schema: ``StructType`` whose field order matches the CSV column order.

    Returns:
        A streaming DataFrame with one column per schema field.

    Note:
        The split is a plain ``","`` split: quoted values that themselves
        contain commas are not handled and would shift later columns.
    """
    pieces = split(col("value").cast("string"), ",")
    typed_columns = [
        pieces.getItem(position).cast(field.dataType).alias(field.name)
        for position, field in enumerate(schema.fields)
    ]
    return stream.select(*typed_columns)


# NOTE(review): both frames below are derived from the same Kafka subscription
# (kafkaStream), so every message is interpreted once as a player row and once
# as a transfer row. Presumably players and transfers should come from
# separate topics -- confirm with the producer side.

# Parse the players data from Kafka stream
df_players = _parse_csv_stream(kafkaStream, playersSchema)

# Parse the transfers data from Kafka stream
df_transfers = _parse_csv_stream(kafkaStream, transfersSchema)
# Now df_players and df_transfers are streaming DataFrames

# Join players and transfers DataFrames
# Both inputs carry a market_value_in_eur column; rename the player-side copy
# so the merged frame (and the JSON built from it) has unique column names.
players_for_join = df_players.withColumnRenamed(
    "market_value_in_eur", "player_market_value_in_eur"
)
joinExpression = players_for_join["player_id"] == df_transfers["player_id"]
# Keep every transfer row (left join) and drop the player-side key so that
# "player_id" is unambiguous downstream. The previous .drop("rid") removed
# nothing because no such column exists.
players_merged = (
    df_transfers
    .join(players_for_join, joinExpression, "left")
    .drop(players_for_join["player_id"])
)
# NOTE(review): this is a stream-stream left outer join. Structured Streaming
# appears to require watermarks and an event-time range condition for outer
# joins between two streams; neither input has an event-time column yet, so
# the query will likely be rejected at start() until one is added -- confirm
# against the Structured Streaming guide and decide on the time bound.

# Write the merged DataFrame to a new Kafka topic
# players_merged is a streaming DataFrame, so it has to be written with
# writeStream/start(); the batch .write/.save() API raises an AnalysisException
# on streaming inputs. The Kafka sink also needs its own checkpoint directory.
merged_query = (
    players_merged
    .select(
        # Take the key from the transfers side explicitly: the bare name
        # "player_id" can be ambiguous after the join.
        df_transfers["player_id"].cast("string").alias("key"),
        to_json(struct("*")).alias("value"),
    )
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("topic", "players_merged_topic")
    # Placeholder path in the same bucket layout as the other query; it must
    # differ from that query's checkpoint directory. TODO set the real bucket.
    .option("checkpointLocation", "gs://your-bucket-name/checkpoints/players_merged_topic")
    .start()
)

# Mean transfer fee for each value of the player's current club name.
avg_transfer_fee_df = (
    players_merged
    .groupBy(col("current_club_name"))
    .agg(avg(col("transfer_fee")).alias("avg_transfer_fee"))
)

# Shape the aggregate into the key/value pair expected by the Kafka sink:
# club name as the key, the average rendered as a string as the value.
resultdf = avg_transfer_fee_df.selectExpr(
    "current_club_name AS key",
    "CAST(avg_transfer_fee AS STRING) AS value",
)

# Show the resulting two-column schema in the cell output.
resultdf.printSchema()

# Launch the streaming query that publishes the per-club averages to Kafka.
# NOTE(review): "complete" output mode on an aggregation that sits downstream
# of a stream-stream join is probably rejected by Spark (joins between two
# streams look to be append-only) -- confirm against the Structured Streaming
# guide before relying on this query.
query = (
    resultdf.writeStream
    .outputMode("complete")
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9093")
    .option("topic", "avg_transfer_fee_topic")
    .option("checkpointLocation", "gs://your-bucket-name/checkpoints/your-checkpoint-directory")
    .start()
)

# Block the cell until the query ends; interrupting the kernel stops the query.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()

root
 |-- from_json(value): struct (nullable = true)
 |    |-- username: string (nullable = true)
 |    |-- teamname: string (nullable = true)
 |    |-- score: integer (nullable = true)
 |    |-- timestamp_in_ms: long (nullable = true)
 |    |-- readable_time: string (nullable = true)

root
 |-- username: string (nullable = true)
 |-- teamname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)

root
 |-- username: string (nullable = true)
 |-- teamname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


Stoped the streaming query and the spark context


In [None]:
# Stop every streaming query that is still active on this session first, so
# the message below is accurate, then shut down the Spark session itself.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()
print("Stopped the streaming query and the spark context")