# HU Extension, Assignment 06, E63 Big Data Analytics
## Handed out: 03/03/2023, due by 11:59 AM EST, 03/11/2023
## Student: Emmanuel Aboah

### Problem 1.
Use the NetCat utility on your Ubuntu VM or your operating system to generate a steady stream of short, variable sentences (one to three English words long). Use a continuously increasing integer as the key for those messages. You can use any other utility or a small Python program to send that continuous stream of sentences to port 9999. Change the port if you have a good reason. Create a tool using the Spark Structured Streaming API that listens to that port and pushes the count of different words used so far to the console. Choose the length of the period based on your convenience. If you are generating those sentences programmatically, the period should be short. If you are typing sentence by sentence, use a longer period. Your sentence generation utility and/or Spark could reside either on the Ubuntu VM or on your host operating system.
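
The "small Python program" option mentioned above could look roughly like the following sketch. It is not the utility used for the runs recorded below (those were typed into NetCat); the vocabulary `WORDS`, the helper names `make_sentence` and `serve`, and the `key: words` line format are assumptions chosen for illustration. Spark's socket source connects as a client, so the generator has to listen on the port, just as `nc -lk 9999` does.

```python
import itertools
import random
import socket
import time

# Hypothetical vocabulary; any list of short English words would do.
WORDS = ["data", "more", "food", "put", "anything", "save", "test", "money"]


def make_sentence(key, rng):
    """Return 'key: w1 [w2 [w3]]' with one to three randomly chosen words."""
    n_words = rng.randint(1, 3)
    return f"{key}: " + " ".join(rng.choice(WORDS) for _ in range(n_words))


def serve(host="localhost", port=9999, delay=1.0, seed=None):
    """Accept one client (e.g. Spark's socket source) and stream sentences to it."""
    rng = random.Random(seed)
    with socket.create_server((host, port)) as server:
        conn, _addr = server.accept()  # blocks until the reader connects
        with conn:
            # The key is a continuously increasing integer: 1, 2, 3, ...
            for key in itertools.count(1):
                line = make_sentence(key, rng) + "\n"
                conn.sendall(line.encode("utf-8"))
                time.sleep(delay)
```

Calling `serve()` in a separate terminal before starting the streaming query would replace the NetCat session. The loop runs until the reader disconnects, at which point `sendall` raises a connection error.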

In [1]:
# PySpark stream API consuming text messages on port 9999.

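# Imports
# Run findspark.init() before importing pyspark: it adds the Spark installation to sys.path.
import findspark

findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()

# Log errors only, so the console sink output stays readable.
sc.setLogLevel("ERROR")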

In [2]:
# Read the lines of text published to port 9999 as a streaming DataFrame.

spark = SparkSession(sc)

lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

In [3]:
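# Imports
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, split

# Split each line on whitespace. split() returns an array column, so "word"
# holds the whole token list of a line and the counts below are per distinct line.
words = lines.select(split(col("value"), "\\s").alias("word"))
counts: DataFrame = words.groupBy("word").count()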

# File system checkpoint
checkpoint_dir = "./checkpoint"

In [None]:
# streaming query
streaming_query = (
    counts.writeStream
    .format("console")
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .option("checkpointLocation", checkpoint_dir)
    .start()
)

streaming_query.awaitTermination()

```bash
# Spark Stream sink to console output:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+

                                                                                
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
|  []|    1|
+----+-----+

                                                                                
-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-----+
|      word|count|
+----------+-----+
|[1:, more]|    1|
|        []|    1|
+----------+-----+

                                                                                
-------------------------------------------
Batch: 3
-------------------------------------------
+----------+-----+
|      word|count|
+----------+-----+
|[2:, more]|    1|
|[1:, more]|    1|
|        []|    1|
+----------+-----+

                                                                                
-------------------------------------------
Batch: 4
-------------------------------------------
+----------+-----+
|      word|count|
+----------+-----+
|[2:, more]|    1|
|[1:, more]|    1|
|        []|    1|
|[3:, test]|    1|
```
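
The console output above groups by the whole token array of each line (for example `[1:, more]`), so every distinct line is its own group. In Spark, wrapping the `split(...)` call in `explode(...)` from `pyspark.sql.functions` yields one row per token instead. The plain-Python sketch below only illustrates the difference between the two groupings, using the three lines visible in the output; the helper `words_of` and the choice to drop the leading `N:` key before counting are assumptions, not part of the submitted query.

```python
import re
from collections import Counter

# The three non-empty lines visible in the console output above.
lines = ["1: more", "2: more", "3: test"]

# Grouping by the full token list, as the streaming query does:
# every distinct line forms its own group.
per_line = Counter(tuple(re.split(r"\s", line)) for line in lines)


def words_of(line):
    """Drop the leading 'N:' key (if present) and return the remaining words."""
    _key, sep, text = line.partition(":")
    if not sep:
        text = line  # no key found, treat the whole line as words
    return text.split()


# Grouping by individual word, which is what explode() would enable.
per_word = Counter(word for line in lines for word in words_of(line))

print(per_line)
print(per_word)
```

With per-word grouping, `more` is counted twice and `test` once, whereas the per-line grouping reports three groups with a count of one each.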

### Problem 2.
Intercept messages produced by your “sentence generating utility” by Kafka and continuously write them to a Kafka topic. Write a Spark Structured Streaming application that would read that topic. Query the resulting Unbounded Table for the number of words read every minute or 5 minutes. Write results of such query to a database table in a MySQL database.

```bash
# Create kafka topic for message write counts

manny@LAPTOP-85L1BUVJ:~/dev/cscie-63/hw05/docker$ docker exec -it broker1 /bin/kafka-topics --bootstrap-server broker1:29092 --create --topic wordcounts --partitions 1

Created topic wordcounts.
```

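Read the text stream from the socket and publish the running counts to a Kafka topic. A second application reads that topic and appends the counts to a MySQL table on a 5-second processing-time trigger.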

```bash
# Netcat utility to produce messages
(base) manny@LAPTOP-85L1BUVJ:~$ nc -lk 9999
data
data
data
data
data
more
more
data
more
food
food
put
put
anything
anything
data
data
more
anything
food
save
put
```

```python
'''
    🐍 App that reads a text stream from TCP port 9999
    and publishes the running counts to the wordCounts Kafka topic.
'''
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import split, col

'''
    Versions of packages
'''
scala_version = '2.12'
spark_version = '3.3.1'

'''
    Import jars from maven central.
    Follows Format -> groupId:artifactId:version
'''
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.3.1'
]

# Session Builder
spark = (
    SparkSession.builder
    .appName("StructuredWordCount")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999).load()
)

# Split each line into an array of tokens (one array per line, not one row per word)
words = lines.select(split(col("value"), "\\s").alias("word"))

# Generate a running count per distinct token array
counts: DataFrame = words.groupBy("word").count()
checkpointDir = "./checkpoint2"

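# Sink the count stream to a Kafka topic. Topic names are case-sensitive:
# "wordCounts" here is distinct from the "wordcounts" topic created earlier.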
streamingQuery = (
    counts.selectExpr("cast(word as string) as key", "cast(count as string) as value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "wordCounts")
    .outputMode("update")
    .option("checkpointLocation", checkpointDir)
    .start()
)

streamingQuery.awaitTermination()
```

```py
'''
    🐍 Structured Stream Kafka Consumer Mysql Sink App
'''
from pyspark.sql import SparkSession, DataFrame

'''
    Versions of packages
'''
scala_version = '2.12'
spark_version = '3.3.1'
kafka_version = '3.3.1'
mysql_version = '8.0.31'

'''
    Import jars from maven central.
    Follows Format -> groupId:artifactId:version
'''
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    f'org.apache.kafka:kafka-clients:{kafka_version}',
    f'mysql:mysql-connector-java:{mysql_version}'
]

# Session Builder
spark = (
    SparkSession.builder
    .appName("StructuredWordCount")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

'''
    User defined function to write streams to MySql in foreachBatch
'''
def my_sql_sink(df: DataFrame, batch_id: int):
    url = "jdbc:mysql://127.0.0.1:3306"
    (
        df.write
        .format("jdbc")
        .option("url", url)
        .option("dbtable", "db.wordCounts")
        .option("user", "root")
        .option("password", "password")
        .mode("append")
        .save()
    )

# Subscribe to the wordCounts topic and cast the key/value bytes to strings
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "wordCounts")
    .load()
    .selectExpr("CAST(key AS STRING) as word", "CAST(value AS STRING) as count")
)

checkpointDir = "./checkpoint_mysql"

stream_query = (
    stream_df
    .writeStream
    .outputMode("update")
    .foreachBatch(my_sql_sink)
    .option("checkpointLocation", checkpointDir)
    .trigger(processingTime="5 seconds")
    .start()
)

stream_query.awaitTermination()
```

MySQL Output

```bash
mysql> select * from wordCounts;
+------------+-------+
| word       | count |
+------------+-------+
| [anything] | 2     |
| [data]     | 6     |
| [more]     | 4     |
| [anything] | 3     |
| [food]     | 3     |
| [put]      | 3     |
| [save]     | 1     |
+------------+-------+
```
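
Problem 2 asks for the number of words read every minute or every 5 minutes, while the table above holds running totals. In Spark this kind of query is usually written by grouping on `window(col("timestamp"), "5 minutes")` from `pyspark.sql.functions`, using the `timestamp` column that the Kafka source provides. The plain-Python sketch below only illustrates the bucketing idea; the function names and the sample messages are hypothetical, and it assumes the window length in minutes divides 60.

```python
from collections import Counter
from datetime import datetime, timedelta


def window_start(ts, minutes=5):
    """Floor a datetime to the start of its tumbling window."""
    return ts - timedelta(
        minutes=ts.minute % minutes,
        seconds=ts.second,
        microseconds=ts.microsecond,
    )


def words_per_window(messages, minutes=5):
    """Sum the number of words per window for (timestamp, sentence) pairs."""
    counts = Counter()
    for ts, sentence in messages:
        counts[window_start(ts, minutes)] += len(sentence.split())
    return dict(counts)


# Hypothetical messages: two fall into 16:40-16:45, one into 16:45-16:50.
messages = [
    (datetime(2023, 3, 13, 16, 42, 25), "data more"),
    (datetime(2023, 3, 13, 16, 44, 59), "food"),
    (datetime(2023, 3, 13, 16, 45, 0), "put anything save"),
]
result = words_per_window(messages)
```

Each key of `result` is the start of a 5-minute window and each value is the number of words that arrived in it, which is the shape of table the assignment describes.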

### Problem 3.
Modify your Spark Streaming application so it reads a fixed number of messages, for example 20. Write an “once trigger” that would start your Spark Structured Streaming application. Inside your Spark Structured Streaming application organize messages and the time stamps as short JSON objects with keys and values. Dump those messages in a memory table. Demonstrate that you can query that table and transfer its content into a regular Spark DataFrame.

```py
'''
🐍 App that collects messages from a Kafka topic and sinks them to an in-memory table.
'''
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct

'''
    Versions of packages
'''
scala_version = "2.12"
spark_version = "3.3.1"
kafka_version = "3.3.1"
mysql_version = "8.0.31"

'''
    Import jars from maven central.
    Follows Format -> groupId:artifactId:version
'''
packages = [
    f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}",
    f"org.apache.kafka:kafka-clients:{kafka_version}",
    f"mysql:mysql-connector-java:{mysql_version}"
]

# Session Builder
spark = (
    SparkSession.builder
    .appName("StructuredWordCount")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

# Subscribe to the wordCounts topic, starting from the earliest offset
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "wordCounts")
    .option("startingOffsets", "earliest")
    .load()
)

# Cast the Kafka key/value bytes to strings, keep the required columns,
# and keep at most 20 messages at or after the cutoff timestamp.
stream_df = (
    stream_df
    .selectExpr("CAST(key AS STRING) AS word", "CAST(value AS STRING) AS count", "timestamp")
    .where("timestamp >= '2023-03-12 19:00:00'")
    .limit(20)
)


# Convert to json
stream_df_json = (
    stream_df
    .withColumn("words_json", to_json(struct("word", "count", "timestamp")))
    .select("words_json")
)


# In memory sink
stream_query = (
    stream_df_json
    .writeStream
    .format("memory")
    .queryName("word_counts")
    .outputMode("append")
    .trigger(once=True)
    .start()
)

stream_query.awaitTermination()

spark.sql("select * from word_counts").show()
```

Output from the DataFrame returned by `spark.sql`:

```bash
+--------------------+                                                          
|          words_json|
+--------------------+
|{"word":"[count]"...|
|{"word":"[test]",...|
|{"word":"[test]",...|
|{"word":"[money]"...|
|{"word":"[money]"...|
|{"word":"[test]",...|
|{"word":"[county]...|
|{"word":"[country...|
|{"word":"[count]"...|
|{"word":"[county]...|
|{"word":"[country...|
|{"word":"[count]"...|
|{"word":"[county]...|
|{"word":"[money]"...|
|{"word":"[test]",...|
|{"word":"[money]"...|
|{"word":"[county]...|
|{"word":"[test]",...|
|{"word":"[money]"...|
|{"word":"[money]"...|
+--------------------+
```
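
The table above truncates every JSON string, so the keys and values are not fully visible (calling `show(truncate=False)` would print them in full). As a rough illustration of what `to_json(struct("word", "count", "timestamp"))` places in each row, the sketch below builds one such object in plain Python and parses it back. The helper names are hypothetical, the sample values are taken from the MySQL output in Problem 4, and stripping the brackets and converting the count to an integer are assumptions about how a consumer might want to clean the record.

```python
import json


def to_message_json(word, count, timestamp):
    """Serialize one message as a short JSON object with keys and values."""
    return json.dumps({"word": word, "count": count, "timestamp": timestamp})


def from_message_json(text):
    """Parse a message and tidy it: strip the array brackets, make count an int."""
    record = json.loads(text)
    record["word"] = record["word"].strip("[]")
    record["count"] = int(record["count"])
    return record


row = to_message_json("[test]", "1", "2023-03-13T16:42:38.185-04:00")
record = from_message_json(row)
print(row)
print(record)
```

The brackets around each word come from casting the token array to a string upstream, which is why the parser removes them.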

### Problem 4.
Transform your trigger into a processing time trigger that fires every 2 minutes and appends the messages it reads to a MySQL table. Prove that the number of messages in the table grows continuously.

```py
'''
🐍 App that reads pre-aggregated word counts from a Kafka
topic and sinks them to MySQL as a JSON field.
'''
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import to_json, struct

'''
    Versions of packages
'''
scala_version = '2.12'
spark_version = '3.3.1'
kafka_version = '3.3.1'
mysql_version = '8.0.31'

'''
    Import jars from maven central.
    Follows Format -> groupId:artifactId:version
'''
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    f'org.apache.kafka:kafka-clients:{kafka_version}',
    f'mysql:mysql-connector-java:{mysql_version}'
]

# Build Session
spark = (
    SparkSession.builder
    .appName("StructuredWordCount")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)


# User defined function to write streams to MySql in foreachBatch
def my_sql_sink(df: DataFrame, batch_id: int):
    url = "jdbc:mysql://127.0.0.1:3306"
    (
        df.write
        .format("jdbc")
        .option("url", url)
        .option("dbtable", "db.wordCountsJson")
        .option("user", "root")
        .option("password", "password")
        .mode("append")
        .save()
    )

# Subscribe to wordCounts topic with offset at latest.
stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "wordCounts")
    .option("startingOffsets", "latest")
    .load()
)

# Convert topic values and select required columns.
stream_df = (
    stream_df
    .withColumn("word", stream_df["key"].cast("string").alias("word"))
    .drop("key")
    .withColumn("count", stream_df["value"].cast("string").alias("count"))
    .drop("value")
)

# Convert to json
stream_df_json = (
    stream_df
    .withColumn("words_json", to_json(struct("word", "count", "timestamp")))
    .select("words_json")
)

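# Checkpoint directory for this job. A checkpoint location should be unique per
# query, so it differs from the "./checkpoint_mysql" directory used in Problem 2.
checkpointDir = "./checkpoint_mysql_json"

# MySQL sink: append new rows on a 2-minute trigger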
stream_query = (
    stream_df_json
    .writeStream
    .outputMode("update")
    .foreachBatch(my_sql_sink)
    .option("checkpointLocation", checkpointDir)
    .trigger(processingTime="2 minutes")
    .start()
)

stream_query.awaitTermination()
```

Output in the MySQL `wordCountsJson` table:

```bash
mysql> select * from wordCountsJson;
+---------------------------------------------------------------------------------+
| words_json                                                                      |
+---------------------------------------------------------------------------------+
| {"word": "[bate]", "count": "2", "timestamp": "2023-03-13T16:42:25.915-04:00"}  |
| {"word": "[money]", "count": "2", "timestamp": "2023-03-13T16:42:31.022-04:00"} |
| {"word": "[put]", "count": "1", "timestamp": "2023-03-13T16:42:35.570-04:00"}   |
| {"word": "[test]", "count": "1", "timestamp": "2023-03-13T16:42:38.185-04:00"}  |
| {"word": "[put]", "count": "2", "timestamp": "2023-03-13T16:42:40.466-04:00"}   |
| {"word": "[money]", "count": "3", "timestamp": "2023-03-13T16:42:40.838-04:00"} |
| {"word": "[test]", "count": "2", "timestamp": "2023-03-13T16:42:43.369-04:00"}  |
| {"word": "[put]", "count": "3", "timestamp": "2023-03-13T16:42:45.597-04:00"}   |
| {"word": "[bate]", "count": "3", "timestamp": "2023-03-13T16:42:50.828-04:00"}  |
| {"word": "[test]", "count": "3", "timestamp": "2023-03-13T16:42:58.595-04:00"}  |
| {"word": "[many]", "count": "1", "timestamp": "2023-03-13T16:43:28.454-04:00"}  |
| {"word": "[more]", "count": "1", "timestamp": "2023-03-13T16:43:40.598-04:00"}  |
| {"word": "[many]", "count": "2", "timestamp": "2023-03-13T16:43:42.867-04:00"}  |
| {"word": "[more]", "count": "2", "timestamp": "2023-03-13T16:43:45.567-04:00"}  |
+---------------------------------------------------------------------------------+
14 rows in set (0.00 sec)
```
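
A single `select` shows the table contents at one moment only. One way to show growth, sketched below, is to record `SELECT COUNT(*)` after each trigger and check that the sequence increases. To keep the sketch runnable without a database server, Python's built-in `sqlite3` stands in for MySQL here; the grouping of rows into batches is hypothetical, and the rows themselves are the first six from the output above.

```python
import sqlite3


def row_count(conn):
    """Return the current number of rows in the wordCountsJson table."""
    return conn.execute("SELECT COUNT(*) FROM wordCountsJson").fetchone()[0]


# In-memory SQLite database standing in for the MySQL instance.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wordCountsJson (words_json TEXT)")

# Hypothetical micro-batches, each simulating one firing of the trigger.
batches = [
    ['{"word": "[bate]", "count": "2", "timestamp": "2023-03-13T16:42:25.915-04:00"}',
     '{"word": "[money]", "count": "2", "timestamp": "2023-03-13T16:42:31.022-04:00"}'],
    ['{"word": "[put]", "count": "1", "timestamp": "2023-03-13T16:42:35.570-04:00"}',
     '{"word": "[test]", "count": "1", "timestamp": "2023-03-13T16:42:38.185-04:00"}'],
    ['{"word": "[put]", "count": "2", "timestamp": "2023-03-13T16:42:40.466-04:00"}',
     '{"word": "[money]", "count": "3", "timestamp": "2023-03-13T16:42:40.838-04:00"}'],
]

observed = []
for batch in batches:
    conn.executemany("INSERT INTO wordCountsJson VALUES (?)", [(r,) for r in batch])
    conn.commit()
    observed.append(row_count(conn))  # count after each append

# The table grows if every count is larger than the one before it.
growing = all(a < b for a, b in zip(observed, observed[1:]))
print(observed, growing)
```

Against the real table, the same check amounts to running `select count(*) from wordCountsJson;` in the MySQL client after successive triggers and comparing the results.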