In [1]:
# Create the Spark session used by every later cell.
import pyspark  # type: ignore
from pyspark.sql import SparkSession # type: ignore

# The spark-sql-kafka connector must be the same release as the running Spark,
# so derive it from the installed PySpark instead of hard-coding 3.3.0.
# NOTE(review): "_2.12" assumes a Scala 2.12 build (the default for Spark 3.x
# wheels); Spark 4.x builds use 2.13 — confirm for your install.
# NOTE(review): a connector/Spark version mismatch is one plausible cause of the
# .show() failures recorded further down — confirm once the full trace is visible.
KAFKA_CONNECTOR_PACKAGE = (
    f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"
)

spark = (
    SparkSession
    .builder
    .appName("Streaming dasdadsaas Kafka")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    # Only honoured when getOrCreate() actually starts a new session;
    # restart the kernel after changing it.
    .config("spark.jars.packages", KAFKA_CONNECTOR_PACKAGE)
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)

spark

In [2]:

import os

# Connection settings. The defaults are the original hard-coded values; set the
# environment variables to point elsewhere without editing the notebook.
# NOTE(review): "ed-kafka" is presumably a Docker service name that may not
# resolve from the host running this kernel — TODO confirm.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "ed-kafka:29092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "climate_data.temperature_data")

# Batch read (spark.read, not readStream) of the whole topic from the earliest
# offset, so intermediate DataFrames below can be previewed with .show().
kafka_df = (
    spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
# Inspect the Kafka source schema: `key` and `value` arrive as binary and
# must be cast before they can be parsed.
kafka_df.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
from pyspark.sql.functions import col, expr, from_json, regexp_replace
from pyspark.sql.types import StringType

# Decode the binary Kafka payload to text, replacing the `value` column in place
# so downstream cells keep referring to "value".
decoded_value = col("value").cast("string")
kafka_json_df = kafka_df.withColumn("value", decoded_value)

# NOTE(review): the recorded run of this cell raised inside show(); the captured
# traceback is truncated, so the root cause is not visible here.
kafka_json_df.show(truncate=False)

Traceback (most recent call last):
  File "c:\Users\SD-LORENZO-PC\.vscode\extensions\ms-python.python-2024.22.0-win32-x64\python_files\python_server.py", line 133, in exec_user_input
    retval = callable_(user_input, user_globals)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 6, in <module>
  File "C:\Users\SD-LORENZO-PC\pyproject\rndPy\my_env\Lib\site-packages\pyspark\sql\dataframe.py", line 947, in show
    print(self._show_string(n, truncate, vertical))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\SD-LORENZO-PC\pyproject\rndPy\my_env\Lib\site-packages\pyspark\sql\dataframe.py", line 978, in _show_string
    return self._jdf.showString(n, int_truncate, vertical)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\SD-LORENZO-PC\pyproject\rndPy\my_env\Lib\site-packages\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "C:\Users\SD-LOREN

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Schema for the JSON payload on the topic. The `$oid` / `$timestamp` / `$date`
# keys and the operationType / ns / documentKey / fullDocument fields look like a
# MongoDB change-stream event in Extended JSON — TODO confirm against the producer.
# Every field is declared nullable.


def _field(name, data_type):
    """Return a nullable StructField called `name` holding `data_type`."""
    return StructField(name, data_type, True)


def _oid():
    """Return a fresh `{"$oid": string}` struct, the shape used for every `_id`."""
    return StructType([_field("$oid", StringType())])


schema = StructType([
    _field("_id", _oid()),
    _field("operationType", StringType()),
    _field("clusterTime", StructType([
        _field("$timestamp", StructType([
            # NOTE(review): t/i are presumably integers; DoubleType parses them,
            # but LongType would be more precise — confirm before changing.
            _field("t", DoubleType()),
            _field("i", DoubleType()),
        ])),
    ])),
    _field("wallTime", StructType([
        _field("$date", DoubleType()),
    ])),
    _field("fullDocument", StructType([
        _field("_id", _oid()),
        *(_field(name, StringType()) for name in ("lat", "lon", "z", "time", "anom")),
    ])),
    _field("ns", StructType([
        _field("db", StringType()),
        _field("coll", StringType()),
    ])),
    _field("documentKey", StructType([
        _field("_id", _oid()),
    ])),
])

In [6]:
from pyspark.sql.functions import regexp_replace, col

# Matches every character that is NOT printable ASCII (space 0x20 .. tilde 0x7E).
# Caveat: this also removes tabs/newlines and ALL non-ASCII characters, so any
# legitimate Unicode inside JSON string values is silently dropped as well.
NON_PRINTABLE_ASCII = r"[^ -~]"

# Strip those characters from the decoded payload before JSON parsing.
# NOTE(review): presumably this removes stray bytes around the JSON document
# (e.g. a serializer prefix) — TODO confirm what the producer actually emits.
cleaned_kafka_df = kafka_json_df.withColumn(
    "clean_value",
    regexp_replace(col("value"), NON_PRINTABLE_ASCII, ""),
)

# Preview the cleaned data
# cleaned_kafka_df.select("clean_value").show(truncate=False)

In [7]:
from pyspark.sql.functions import col, from_json, regexp_replace

# Parse the cleaned JSON text with `schema`, then promote its top-level fields
# to columns. The name `streaming_df` is kept for later cells even though the
# source is currently a batch read.
parsed_values = from_json(col("clean_value"), schema).alias("values_json")
streaming_df = cleaned_kafka_df.select(parsed_values).select("values_json.*")

# Preview the parsed DataFrame
# streaming_df.show(truncate=False)

In [8]:
# Confirm the parsed columns mirror the top-level fields of `schema`.
streaming_df.printSchema()


root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- operationType: string (nullable = true)
 |-- clusterTime: struct (nullable = true)
 |    |-- $timestamp: struct (nullable = true)
 |    |    |-- t: double (nullable = true)
 |    |    |-- i: double (nullable = true)
 |-- wallTime: struct (nullable = true)
 |    |-- $date: double (nullable = true)
 |-- fullDocument: struct (nullable = true)
 |    |-- _id: struct (nullable = true)
 |    |    |-- $oid: string (nullable = true)
 |    |-- lat: string (nullable = true)
 |    |-- lon: string (nullable = true)
 |    |-- z: string (nullable = true)
 |    |-- time: string (nullable = true)
 |    |-- anom: string (nullable = true)
 |-- ns: struct (nullable = true)
 |    |-- db: string (nullable = true)
 |    |-- coll: string (nullable = true)
 |-- documentKey: struct (nullable = true)
 |    |-- _id: struct (nullable = true)
 |    |    |-- $oid: string (nullable = true)



In [9]:
from pyspark.sql.functions import explode

# Keep only the `fullDocument` struct column. Despite the variable name nothing
# is exploded: `fullDocument` is a struct (not an array) and `explode` is unused.
exploded_df = streaming_df.select("fullDocument")

In [10]:
# Preview the selected `fullDocument` struct.
# NOTE(review): the recorded run raised inside show(); the captured traceback
# is truncated, so the root cause is not visible here.
exploded_df.printSchema()
exploded_df.show(truncate=False)

root
 |-- fullDocument: struct (nullable = true)
 |    |-- _id: struct (nullable = true)
 |    |    |-- $oid: string (nullable = true)
 |    |-- lat: string (nullable = true)
 |    |-- lon: string (nullable = true)
 |    |-- z: string (nullable = true)
 |    |-- time: string (nullable = true)
 |    |-- anom: string (nullable = true)

Traceback (most recent call last):
  File "c:\Users\SD-LORENZO-PC\.vscode\extensions\ms-python.python-2024.22.0-win32-x64\python_files\python_server.py", line 133, in exec_user_input
    retval = callable_(user_input, user_globals)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in <module>
  File "C:\Users\SD-LORENZO-PC\pyproject\rndPy\my_env\Lib\site-packages\pyspark\sql\dataframe.py", line 947, in show
    print(self._show_string(n, truncate, vertical))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\SD-LORENZO-PC\pyproject\rndPy\my_env\Lib\site-packages\pyspark\sql\dataframe.py", line 978, in _show_string

In [11]:
from pyspark.sql.functions import col

# Path inside `fullDocument` -> output column name, listed in output order.
# NOTE(review): `z` is labelled "altitude" as in the original; confirm it is not
# a depth/level coordinate.
FULL_DOCUMENT_COLUMNS = {
    "_id.$oid": "document_id",
    "lat": "latitude",
    "lon": "longitude",
    "z": "altitude",
    "time": "timestamp",
    "anom": "anomaly",
}

# Flatten the `fullDocument` struct into top-level, renamed columns.
flattened_fullDocument_df = streaming_df.select(
    *(
        col(f"fullDocument.{source_path}").alias(output_name)
        for source_path, output_name in FULL_DOCUMENT_COLUMNS.items()
    )
)

In [12]:
# Inspect the flattened schema and preview the first 10 rows.
# NOTE(review): .show() needs a batch DataFrame, which is why kafka_df is built
# with spark.read rather than readStream. The original note also suggested
# substituting a sample JSON file as input — TODO clarify that plan.
flattened_fullDocument_df.printSchema()
flattened_fullDocument_df.show(10, truncate=False)


root
 |-- document_id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- altitude: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- anomaly: string (nullable = true)

Traceback (most recent call last):
  File "c:\Users\SD-LORENZO-PC\.vscode\extensions\ms-python.python-2024.22.0-win32-x64\python_files\python_server.py", line 133, in exec_user_input
    retval = callable_(user_input, user_globals)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 3, in <module>
  File "C:\Users\SD-LORENZO-PC\pyproject\rndPy\my_env\Lib\site-packages\pyspark\sql\dataframe.py", line 947, in show
    print(self._show_string(n, truncate, vertical))
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\SD-LORENZO-PC\pyproject\rndPy\my_env\Lib\site-packages\pyspark\sql\dataframe.py", line 978, in _show_string
    return self._jdf.showString(n, int_truncate, vertical)
           ^^^^^^^^^^^^^^^