# Structured Streaming - Jesmine Tey Khai Jing

In [1]:
from time import sleep

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, lower, regexp_extract, substring, trim
from pyspark.sql.types import StringType, StructField, StructType

# Build (or reuse an already-running) Spark session for this streaming demo.
spark = (
    SparkSession.builder
    .appName("Lexicon Data Cleaning with Structured Streaming")
    .getOrCreate()
)

24/12/21 20:38:56 WARN Utils: Your hostname, MSI. resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/12/21 20:38:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/21 20:38:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/21 20:38:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/21 20:38:57 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# Lexicon schema: a single nullable string field, "Kata" (the word).
# NOTE(review): the logged CSVHeaderChecker warning for this source reports
# "Header length: 4, schema size: 1", i.e. the CSV files carry more columns
# than declared here. The warning (rather than an error) suggests the schema is
# applied by position, so "Kata" presumably receives the first CSV column —
# confirm that the word really is the first column of lexicon_streaming.csv.
schema = StructType().add("Kata", StringType(), True)

# Simulate a stream by watching the "data" directory; at most one new file is
# admitted per micro-batch so records arrive incrementally.
streaming_df = (
    spark.readStream
    .schema(schema)
    .options(header="true", maxFilesPerTrigger=1)
    .csv("data")
)

In [3]:
# Clean the incoming words: drop rows with a null word, normalise case and
# surrounding whitespace, derive the first letter, and keep only words that
# start with an ASCII letter.
cleaned_df = (
    streaming_df
    .na.drop()
    # trim() before taking the first character: otherwise a stray leading space
    # becomes the "letter" and the word is silently discarded by the filter.
    .withColumn("Kata", lower(trim(col("Kata"))))
    # First character of each word.
    .withColumn("Alphabet", substring(col("Kata"), 1, 1))
    # Kata is already lower-cased, so [a-z] is sufficient; rlike() yields the
    # boolean predicate directly (null Alphabet -> null -> row filtered out).
    .filter(col("Alphabet").rlike("^[a-z]$"))
    # Per-row processing time. NOTE: this column does not survive the groupBy
    # below; the aggregate is stamped with its own Timestamp column.
    .withColumn("Timestamp", current_timestamp())
)

# Count words per starting letter, stamp each result refresh with the current
# time, and sort alphabetically. Sorting a streaming DataFrame is only allowed
# after an aggregation in "complete" output mode, which the query cell uses.
wordCounts = (
    cleaned_df
    .groupBy("Alphabet")
    .count()
    .withColumn("Timestamp", current_timestamp())
    .orderBy("Alphabet")
)

In [4]:
# Write the running aggregate into an in-memory table named "word_counts".
# "complete" mode rewrites the whole result table on every 1-second trigger.
query = (
    wordCounts.writeStream
    .queryName("word_counts")
    .format("memory")
    .outputMode("complete")
    .trigger(processingTime="1 second")
    .start()
)

# Give the first micro-batch a head start before polling. In the recorded run
# the first batch took ~8.8 s, so the first poll still showed an empty table.
sleep(5)

for _ in range(5):
    if not query.isActive:
        # A stopped/failed query leaves the memory table stale; surface the
        # cause instead of repeatedly printing outdated (or empty) results.
        raise RuntimeError(f"Streaming query stopped early: {query.exception()}")
    # A plain SELECT does not guarantee row order, so sort explicitly.
    # 26 rows covers every letter a-z kept by the cleaning filter, and
    # truncate=False keeps the full Timestamp value visible.
    spark.sql("SELECT * FROM word_counts ORDER BY Alphabet").show(26, truncate=False)
    sleep(3)

24/12/21 20:39:01 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4605e77f-6730-4794-972f-8af852324b29. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/12/21 20:39:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/12/21 20:39:05 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 4, schema size: 1
CSV file: hdfs://localhost:9000/user/student/data/lexicon_streaming.csv

+--------+-----+---------+
|Alphabet|count|Timestamp|
+--------+-----+---------+
+--------+-----+---------+



24/12/21 20:39:10 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8817 milliseconds


+--------+-----+--------------------+
|Alphabet|count|           Timestamp|
+--------+-----+--------------------+
|       a|  218|2024-12-21 20:39:...|
|       b|  228|2024-12-21 20:39:...|
|       c|   71|2024-12-21 20:39:...|
|       d|  169|2024-12-21 20:39:...|
|       e|   46|2024-12-21 20:39:...|
|       f|   35|2024-12-21 20:39:...|
|       g|   73|2024-12-21 20:39:...|
|       h|   86|2024-12-21 20:39:...|
|       i|   74|2024-12-21 20:39:...|
|       j|   63|2024-12-21 20:39:...|
|       k|  347|2024-12-21 20:39:...|
|       l|  114|2024-12-21 20:39:...|
|       m|  228|2024-12-21 20:39:...|
|       n|   45|2024-12-21 20:39:...|
|       o|   32|2024-12-21 20:39:...|
|       p|  377|2024-12-21 20:39:...|
|       q|    1|2024-12-21 20:39:...|
|       r|   83|2024-12-21 20:39:...|
|       s|  407|2024-12-21 20:39:...|
|       t|  193|2024-12-21 20:39:...|
|       u|   41|2024-12-21 20:39:...|
|       v|   12|2024-12-21 20:39:...|
|       w|   45|2024-12-21 20:39:...|
|       x|  

In [5]:
# Stop the "word_counts" streaming query so no further micro-batches are triggered.
query.stop()

In [6]:
# Shut down the Spark session once the streaming query has been stopped.
spark.stop()