In [1]:
# Create (or reuse) the local Spark session for the socket word-count demo (no Iceberg catalog is configured here)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, split, struct, trim

# Build the local Spark session used by this streaming demo.
# NOTE: getOrCreate() returns an already-running session if this kernel has one;
# in that case only runtime SQL configs below take effect (Spark logs a WARN).
# NOTE(review): the app name mentions Iceberg, but nothing here configures an
# Iceberg catalog and the query below writes to the console sink.
spark = (
    SparkSession.builder
    .appName("SocketStreamingToIceberg")
    .master("local[*]")
    .config("spark.ui.port", "4041")
    .config("spark.sql.streaming.checkpointLocation", "checkpoint")
    .config("spark.sql.adaptive.enabled", "false")
    .getOrCreate()
)
spark

24/06/04 06:39:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Define the streaming source: one row per line of text received on the socket.
# load() only validates the options and builds the logical plan -- no data is
# read until a query is started -- so this try/except catches configuration
# errors, not "nothing is listening on the port".
SOCKET_HOST = "localhost"
SOCKET_PORT = 9999

try:
    df_raw = (
        spark.readStream
        .format("socket")
        .option("host", SOCKET_HOST)
        .option("port", SOCKET_PORT)
        .load()
    )
except Exception as e:
    print(f"Error reading from socket: {e}")
    # Re-raise rather than exit(1): exit() is meant for interactive shells and
    # in a Jupyter kernel it can shut the kernel down (losing the Spark session)
    # instead of simply failing this cell with a traceback.
    raise

24/06/04 06:39:25 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [3]:
# Sanity-check the source definition. Nothing has been received yet: a streaming
# DataFrame only describes the input, and data flows once a query is started.
assert df_raw.isStreaming, "expected a streaming DataFrame from the socket source"
assert df_raw.columns == ["value"], f"unexpected socket schema: {df_raw.columns}"

print("Streaming DataFrame defined for the socket source (no data is read until a query starts).")
df_raw.printSchema()


Streaming data received from socket!
root
 |-- value: string (nullable = true)



In [4]:
# Split each line into an array of words on runs of whitespace.
# Splitting on a single " " would emit an empty-string token for every doubled,
# leading or trailing space, and those would be counted as a "word". trim()
# strips surrounding spaces first; a completely blank line still yields [""].
# This is a lazy transformation, so an exception here means the input schema is
# not what we expect (e.g. no `value` column) -- surface it with a traceback.
try:
    df_words = df_raw.withColumn("words", split(trim(col("value")), r"\s+"))
except Exception as e:
    print(f"Error splitting words: {e}")
    raise

In [5]:
# Explode each line's word array into one row per word, keeping only `word`.
# Empty tokens (blank lines, or whitespace that the split left at the edges)
# are dropped so they are not counted as a word.
df_explode = (
    df_words
    .withColumn("word", explode("words"))
    .drop("value", "words")
    .filter(col("word") != "")
)


In [6]:
# Count how many times each word has been seen; the total is exposed as `cnt`.
# GroupedData.count() aggregates count(1) under the name "count", renamed here.
df_agg = (
    df_explode
    .groupBy("word")
    .count()
    .withColumnRenamed("count", "cnt")
)

In [None]:
# Start the word-count query and print the aggregate table to the console on
# every micro-batch ("complete" mode re-emits every word counted so far).
# Keep the query handle: awaitTermination() with no timeout would block this
# cell forever, and interrupting the kernel would leave the query running with
# no handle to stop it.
RUN_SECONDS = 60  # how long to let the demo run; set to None to block until stopped

query = (
    df_agg.writeStream
    .format("console")
    .outputMode("complete")
    .start()
)

try:
    query.awaitTermination(RUN_SECONDS)
finally:
    # Runs on timeout, on error, and on kernel interrupt (KeyboardInterrupt).
    query.stop()

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+---+
|word|cnt|
+----+---+
+----+---+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+---+
| word|cnt|
+-----+---+
|hello|  1|
+-----+---+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+---+
| word|cnt|
+-----+---+
|hello|  2|
|world|  1|
+-----+---+



In [None]:
# Clean up: stop any streaming query still running in this session, then stop
# the session itself so its local executors and UI port are released.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()