# Final project

## Part 1. Building an End-to-End Streaming Pipeline

### 1.0. Preparations

In [1]:
import os
import time
import datetime

from kafka.admin import KafkaAdminClient, NewTopic

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, round, to_json, from_json, struct, avg, lit

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    FloatType,
    IntegerType,
)

from configs import kafka_config, jdbc_config

In [2]:
# Kafka topics used by this pipeline; each name is prefixed with kafka_config['name'].
_topic_prefix = kafka_config["name"]
topic_names = [
    f"{_topic_prefix}_{suffix}"
    for suffix in ("de_fp_athlete_event_results", "de_fp_group_results")
]

# Positions of each topic inside `topic_names`.
TOPIC_EVENT_RESULTS_IX, TOPIC_GROUP_RESULTS_IX = 0, 1

#### Delete Kafka topics

In [4]:
# Manual reset switch: set to True to delete every Kafka topic whose name starts
# with kafka_config['name'] (the prefix used for this project's topics).
DELETE_OWN_TOPICS = False

if DELETE_OWN_TOPICS:
    admin_client = KafkaAdminClient(
        bootstrap_servers=kafka_config['bootstrap_servers'][0],
        security_protocol=kafka_config['security_protocol'],
        sasl_mechanism=kafka_config['sasl_mechanism'],
        sasl_plain_username=kafka_config['username'],
        sasl_plain_password=kafka_config['password']
    )
    try:
        # Prefix match, as intended: a substring test (`name in topic`) would also
        # delete unrelated topics that merely contain the name somewhere.
        for topic in admin_client.list_topics():
            if topic.startswith(kafka_config['name']):
                print(f'Deleting topic: {topic}')
                admin_client.delete_topics(topics=[topic])
    finally:
        # Release the broker connection even if a deletion fails.
        admin_client.close()

#### Create Kafka topics

In [6]:
admin_client = KafkaAdminClient(
    bootstrap_servers=kafka_config['bootstrap_servers'][0],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password']
)

num_partitions = 2
replication_factor = 1
new_topics = [
    NewTopic(name=n, num_partitions=num_partitions, replication_factor=replication_factor)
    for n in topic_names
]

try:
    admin_client.create_topics(new_topics=new_topics, validate_only=False)
    print("Topics are created successfully.")
    # List this project's topics (same prefix rule as the delete cell).
    for topic in admin_client.list_topics():
        if topic.startswith(kafka_config['name']):
            print(topic)
except Exception as e:
    # Best effort: report the error (e.g. topics that already exist on a re-run)
    # without aborting the notebook.
    print(f"An error occurred: {e}")
finally:
    # Always release the broker connection.
    admin_client.close()

Topics are created successfully.
maksymp_de_fp_athlete_event_results
maksymp_de_fp_group_results


#### Create Spark session

In [3]:
# Maven packages providing Spark's Kafka connectors. This must be set before the
# SparkSession (and its JVM) is created, otherwise it has no effect.
_spark_kafka_packages = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1",
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(_spark_kafka_packages)} pyspark-shell"

In [4]:
# Spark session for the whole notebook. The MySQL Connector/J jar is added so the
# `com.mysql.cj.jdbc.Driver` used by the JDBC reads/writes can be loaded
# (path is relative to the working directory — presumably the notebook's folder).
spark = (
    SparkSession.builder
    .appName("DE_FP")
    .config("spark.jars", "mysql-connector-j-8.0.32.jar")
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
25/01/31 00:25:00 WARN Utils: Your hostname, AMD resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/01/31 00:25:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/max/.ivy2/cache
The jars for the packages stored in: /home/max/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e17c35b9-a7df-4c41-a9c1-a47be17762c5;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12

In [5]:
# Manual toggle: set to True to stop the Spark session.
_stop_spark = False
if _stop_spark:
    spark.stop()

25/01/31 00:25:13 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


#### JDBC

In [6]:
def get_jdbc_df(jdbc_table, limit=None, database='olympic_dataset'):
    """Load a MySQL table into a Spark DataFrame over JDBC.

    Args:
        jdbc_table: Table to read (passed as the JDBC ``dbtable`` option).
        limit: Optional maximum number of rows. ``None`` means no limit;
            ``0`` is honoured and yields an empty DataFrame.
        database: Schema appended to ``jdbc_config['url']``. Defaults to
            ``olympic_dataset``, the source schema used in this notebook.

    Returns:
        A Spark DataFrame backed by the JDBC source.
    """
    df = (
        spark.read.format('jdbc')
        .options(
            url=f"{jdbc_config['url']}/{database}",
            driver='com.mysql.cj.jdbc.Driver',
            dbtable=jdbc_table,
            user=jdbc_config['user'],
            password=jdbc_config['password'],
        )
        .load()
    )
    # Compare against None explicitly: `if limit:` would silently ignore limit=0.
    if limit is not None:
        df = df.limit(limit)
    return df

### 1.1. Load `athlete_bio`

In [7]:
athlete_bio_raw_df = get_jdbc_df(jdbc_table="athlete_bio")  # raw bio table, cleaned in 1.2

In [8]:
athlete_bio_raw_df.show(n=10)  # note the blank / 'nan' height and weight values

+----------+-----------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|athlete_id|             name|   sex|          born|height|weight|          country|country_noc|         description|       special_notes|
+----------+-----------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|     65649|     IvankaBonova|Female|    4April1949| 166.0|    55|         Bulgaria|        BUL|PersonalBest40053...|                 nan|
|    112510| NataliyaUryadova|Female|   15March1977| 184.0|    70|RussianFederation|        RUS|                 nan|ListedinOlympians...|
|    114973| EssaIsmailRashed|  Male|14December1986| 165.0|    55|            Qatar|        QAT|PersonalBest10000...|ListedinOlympians...|
|     30359|        PterBoros|  Male| 12January1908|      |   nan|          Hungary|        HUN|Between1927and193...|                 nan|
|     50557|     RudolfPiow

### 1.2. Filter `athlete_bio`

In [9]:
# Cast height/weight to floats, turn missing (null/NaN) values into 0 and keep only
# athletes for whom both measurements are present (non-zero).
_body_measure_cols = ["height", "weight"]
athlete_bio_df = (
    athlete_bio_raw_df
    .withColumns({c: col(c).cast(FloatType()) for c in _body_measure_cols})
    .na.fill(0, subset=_body_measure_cols)
    .where((col("height") != 0) & (col("weight") != 0))
)

athlete_bio_df.show(10)

+----------+-------------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|athlete_id|               name|   sex|          born|height|weight|          country|country_noc|         description|       special_notes|
+----------+-------------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|     65649|       IvankaBonova|Female|    4April1949| 166.0|  55.0|         Bulgaria|        BUL|PersonalBest40053...|                 nan|
|    112510|   NataliyaUryadova|Female|   15March1977| 184.0|  70.0|RussianFederation|        RUS|                 nan|ListedinOlympians...|
|    114973|   EssaIsmailRashed|  Male|14December1986| 165.0|  55.0|            Qatar|        QAT|PersonalBest10000...|ListedinOlympians...|
|    133041|    VincentRiendeau|  Male|13December1996| 178.0|  68.0|           Canada|        CAN|                 nan|ListedinOlympians...|
|    110425| 

In [10]:
# Keep only what the join and the aggregation need.
athlete_bio_short_df = athlete_bio_df.select('athlete_id', 'sex', 'height', 'weight')
athlete_bio_short_df.show(5)

+----------+------+------+------+
|athlete_id|   sex|height|weight|
+----------+------+------+------+
|     65649|Female| 166.0|  55.0|
|    112510|Female| 184.0|  70.0|
|    114973|  Male| 165.0|  55.0|
|    133041|  Male| 178.0|  68.0|
|    110425|Female| 164.0|  58.0|
+----------+------+------+------+
only showing top 5 rows



### 1.3. Load `athlete_event_results`

#### Load from MySQL

In [15]:
athlete_event_results_raw_df = get_jdbc_df(jdbc_table="athlete_event_results")

In [16]:
athlete_event_results_raw_df.show(n=5)

[Stage 3:>                                                          (0 + 1) / 1]

+------------------+----------+-----------+---------+-------------+---------+--------------+----------+------+-----+-----------+
|           edition|edition_id|country_noc|    sport|        event|result_id|       athlete|athlete_id|   pos|medal|isTeamSport|
+------------------+----------+-----------+---------+-------------+---------+--------------+----------+------+-----+-----------+
|1908SummerOlympics|         5|        ANZ|Athletics|100metres,Men|    56265|ErnestHutcheon|     64710|   DNS|  nan|      False|
|1908SummerOlympics|         5|        ANZ|Athletics|400metres,Men|    56313|   HenryMurray|     64756|   DNS|  nan|      False|
|1908SummerOlympics|         5|        ANZ|Athletics|800metres,Men|    56338|  HarveySutton|     64808|3h8r12|  nan|      False|
|1908SummerOlympics|         5|        ANZ|Athletics|800metres,Men|    56338|    GuyHaskins|    922519|   DNS|  nan|      False|
|1908SummerOlympics|         5|        ANZ|Athletics|800metres,Men|    56338|   JosephLynch|     

                                                                                

In [17]:
# Number of event-result rows loaded from MySQL.
n_event_results = athlete_event_results_raw_df.count()
n_event_results

316834

#### Convert to JSON

In [18]:
# Pack every row into a JSON `value`, keyed by a random UUID: the key/value
# columns that the Kafka write below publishes.
columns = athlete_event_results_raw_df.columns
athlete_event_results_df = athlete_event_results_raw_df.withColumn("key", expr("uuid()"))
athlete_event_results_json_df = athlete_event_results_df.select(
    col('key'),
    to_json(struct(*columns)).alias('value'),
)
athlete_event_results_json_df.show(5)

+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|d4fd9d75-b0da-4a3...|{"edition":"1908S...|
|9d55030c-d7d6-420...|{"edition":"1908S...|
|b66f244f-a926-4de...|{"edition":"1908S...|
|fb235cae-bfe0-485...|{"edition":"1908S...|
|55aa1d47-d304-48d...|{"edition":"1908S...|
+--------------------+--------------------+
only showing top 5 rows



#### Write to Kafka

In [19]:
# One-off batch write of the JSON rows to the event-results topic (SASL/PLAIN auth).
_kafka_jaas = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    f'username="{kafka_config["username"]}" password="{kafka_config["password"]}";'
)
(
    athlete_event_results_json_df.write
    .format('kafka')
    .options(**{
        'kafka.bootstrap.servers': kafka_config['bootstrap_servers'][0],
        'topic': topic_names[TOPIC_EVENT_RESULTS_IX],
        'kafka.security.protocol': 'SASL_PLAINTEXT',
        'kafka.sasl.mechanism': 'PLAIN',
        'kafka.sasl.jaas.config': _kafka_jaas,
    })
    .save()
)

                                                                                

![write_to_kafka1](screenshots/write_to_kafka1.png)

#### Read from Kafka

In [11]:
# Schema of the JSON messages in the event-results topic. It mirrors the columns
# of the `athlete_event_results` table; result_id and isTeamSport are read as strings.
# Every field is nullable.
json_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('edition', StringType()),
        ('edition_id', IntegerType()),
        ('country_noc', StringType()),
        ('sport', StringType()),
        ('event', StringType()),
        ('result_id', StringType()),
        ('athlete', StringType()),
        ('athlete_id', IntegerType()),
        ('pos', StringType()),
        ('medal', StringType()),
        ('isTeamSport', StringType()),
    ]
])

# Streaming source: read the event-results topic from the earliest offset,
# capped at 300 offsets per micro-batch.
_kafka_source_options = {
    'kafka.bootstrap.servers': kafka_config['bootstrap_servers'][0],
    'kafka.security.protocol': 'SASL_PLAINTEXT',
    'kafka.sasl.mechanism': 'PLAIN',
    'kafka.sasl.jaas.config': (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{kafka_config["username"]}" password="{kafka_config["password"]}";'
    ),
    'subscribe': topic_names[TOPIC_EVENT_RESULTS_IX],
    'startingOffsets': 'earliest',
    'maxOffsetsPerTrigger': '300',
}
df = spark.readStream.format('kafka').options(**_kafka_source_options).load()

# Decode the Kafka `value` bytes as a JSON string, parse it with `json_schema`
# and flatten the struct into one column per schema field (in schema order).
clean_df = (
    df
    .select(from_json(col('value').cast('string'), json_schema).alias('value_json'))
    .select('value_json.*')
)

### 1.4. Join `athlete_event_results` with `athlete_bio`

In [12]:
# Stream-static inner join: attach sex/height/weight to each streamed result;
# results of athletes missing from the cleaned bio table are dropped.
joined_df = clean_df.join(athlete_bio_short_df, on='athlete_id', how='inner')

### 1.5. Calculate average height and weight

In [13]:
# Running average height/weight per (sport, medal, sex, country_noc).
# `round` here is pyspark.sql.functions.round (it shadows the builtin).
# The timestamp is computed by Spark for each micro-batch; the previous
# `lit(str(datetime.datetime.now()))` was evaluated once when this cell ran,
# so every batch carried the same frozen time. It is cast to string to keep
# the column type unchanged for the Kafka/MySQL sinks.
grouped_df = (
    joined_df
    .groupBy('sport', 'medal', 'sex', 'country_noc')
    .agg(
        round(avg('height'), 4).alias('avg_height'),
        round(avg('weight'), 4).alias('avg_weight'),
    )
    .withColumn('timestamp', expr('CAST(current_timestamp() AS STRING)'))
)

### 1.6. Write streaming data to Kafka and MySQL

In [14]:
def foreach_batch_function(batch_df, batch_id):
    """Write one micro-batch of aggregated results to Kafka and MySQL.

    Intended for ``writeStream.foreachBatch``. Each row is serialized to JSON and
    published to the group-results topic under a random UUID key; the same rows
    are appended to the ``athlete_enriched_agg`` MySQL table.

    Args:
        batch_df: Static DataFrame with the rows emitted for this micro-batch.
        batch_id: Micro-batch id supplied by Spark (not used here).
    """
    # The batch feeds two sinks; cache it so the streaming plan is not
    # recomputed for the second write (Structured Streaming guide recommendation).
    batch_df.persist()
    try:
        # Convert to Kafka key/value shape: random UUID key, JSON-encoded row value.
        batch_json_df = batch_df.withColumn("key", expr("uuid()")).select(
            'key',
            to_json(struct(batch_df.columns)).alias('value'),
        )

        # Send to Kafka.
        batch_json_df.write \
            .format("kafka") \
            .option('kafka.bootstrap.servers', kafka_config['bootstrap_servers'][0]) \
            .option('topic', topic_names[TOPIC_GROUP_RESULTS_IX]) \
            .option('kafka.security.protocol', 'SASL_PLAINTEXT') \
            .option('kafka.sasl.mechanism', 'PLAIN') \
            .option('kafka.sasl.jaas.config',
                'org.apache.kafka.common.security.plain.PlainLoginModule required '
                f'username="{kafka_config["username"]}" password="{kafka_config["password"]}";') \
            .save()

        # Append to MySQL; the target database is named after kafka_config['name']
        # (presumably a per-user schema — confirm).
        batch_df.write \
            .format("jdbc") \
            .option("url", f"{jdbc_config['url']}/{kafka_config['name']}") \
            .option("driver", "com.mysql.cj.jdbc.Driver") \
            .option("dbtable", 'athlete_enriched_agg') \
            .option("user", jdbc_config['user']) \
            .option("password", jdbc_config['password']) \
            .mode("append") \
            .save()
    finally:
        batch_df.unpersist()


# Start the streaming query: every 10 seconds the aggregate rows that changed
# ("update" mode) are passed to foreach_batch_function.
_group_results_checkpoint = f'/tmp/checkpoints-{topic_names[TOPIC_GROUP_RESULTS_IX]}-display1'
stream = (
    grouped_df.writeStream
    .outputMode("update")
    .foreachBatch(foreach_batch_function)
    .trigger(processingTime="10 seconds")
    .option('checkpointLocation', _group_results_checkpoint)
    .start()
)

25/01/31 00:25:45 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


25/01/31 00:25:49 WARN HDFSBackedStateStoreProvider: The state for version 92 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 00:25:49 WARN HDFSBackedStateStoreProvider: The state for version 92 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 00:25:49 WARN HDFSBackedStateStoreProvider: The state for version 92 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 00:25:49 WARN HDFSBackedStateStoreProvider: The state for version 92 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/01/31 00:25:49 WARN HDFSBackedStateStoreProvider: The state for version 92 doesn't exist in loadedMaps. Reading s

In [15]:
# Let the query run for up to 30 seconds, then stop it. Unlike a blind
# time.sleep(30), awaitTermination(timeout) returns immediately (raising the
# query's exception) if the stream fails; stop() runs in every case.
try:
    stream.awaitTermination(30)
finally:
    stream.stop()

25/01/31 00:25:54 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/01/31 00:26:21 WARN Shell: Interrupted while joining on: Thread[#16518,Thread-16236,5,main]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait0(Native Method)
	at java.base/java.lang.Object.wait(Object.java:366)
	at java.base/java.lang.Thread.join(Thread.java:2079)
	at java.base/java.lang.Thread.join(Thread.java:2155)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212)
	at org.apache.hadoop.fs

##### Kafka

![write_to_kafka2](./screenshots/write_to_kafka2.png)

##### MySQL

![write_to_mysql](./screenshots/write_to_mysql.png)