In [1]:
from pyspark.sql.session import SparkSession
import os
# Parent of the notebook's working directory; the jar paths below are built from it.
path = os.getcwd() + "/../"

In [2]:
# Maven coordinates Spark resolves and downloads at session start (spark.jars.packages).
# NOTE(review): kafka-clients is pinned to 3.2.1 here, while the local jar list below
# ships kafka-clients-3.5.1, and spark-sql-kafka is supplied both ways. Two versions of
# the same library on the classpath can cause class conflicts — consider keeping a
# single source and version. TODO confirm which one the cluster actually needs.
packages = [
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
    "org.apache.kafka:kafka-clients:3.2.1",
]

# Local jar files passed via spark.jars, all under `<path>/jars/`.
jars = [
    f"{path}/jars/{jar_name}"
    for jar_name in (
        "spark-sql-kafka-0-10_2.12-3.4.1.jar",
        "kafka-clients-3.5.1.jar",
        "mysql-connector-j-8.0.31.jar",
        "commons-pool2-2.11.1.jar",
        "spark-token-provider-kafka-0-10_2.12-3.4.1.jar",
    )
]

# Connect to the standalone cluster master and register both dependency lists.
spark = (
    SparkSession.Builder()
    .master("spark://spark:7077")
    .appName(name="test_kafka")
    .config("spark.jars", ",".join(jars))
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

In [3]:
# Display the session object as a quick check that the builder above created it.
spark

In [4]:
# Sanity check of the parallelism the cluster reports for this session.
spark.sparkContext.defaultParallelism

2

In [35]:
# Call spark.stop() manually when finished to release this session's cluster resources.

In [5]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, TimestampType

# Expected JSON layout of each Kafka message value: a "bme280" object whose
# readings are declared as strings (the unit text is stripped further down), and
# a "picow" object with the board's IP and a numeric temperature.
schema = StructType(
    [
        StructField(
            "bme280",
            StructType()
            .add("pressure", StringType())
            .add("temperature", StringType())
            .add("humidity", StringType())
            .add("read_datetime", StringType()),
        ),
        StructField(
            "picow",
            StructType()
            .add("local_ip", StringType())
            .add("temperature", DoubleType()),
        ),
    ]
)

In [65]:
# Batch (non-streaming) read of the `iot_source` topic. For batch queries the Kafka
# source already defaults to startingOffsets="earliest" / endingOffsets="latest",
# so the whole topic is read without setting them explicitly.
picow_df = (
    spark.read
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "iot_source",
    })
    .load()
)

# Drop the first offsets and cast the raw Kafka columns to readable types.
# NOTE(review): Kafka offsets are per partition, so if the topic has several
# partitions "offset > 2" skips offsets 0-2 in each of them. Offsets are also
# 64-bit, so casting to INTEGER may overflow on a long-lived topic (BIGINT is
# safe). Confirm both are intended.
df = (
    picow_df
    .filter("offset > 2")
    .selectExpr(
        "CAST(offset AS INTEGER)",
        "CAST(timestamp AS TIMESTAMP)",
        "CAST(key AS STRING)",
        "CAST(value AS STRING)",
        "CAST(topic AS STRING)",
    )
)

In [87]:
from pyspark.sql.functions import lit, col, from_json, regexp_replace, to_timestamp, current_timestamp, to_utc_timestamp


# Parse the JSON in `value` with `schema` and flatten it into one column per
# reading. Kafka metadata columns get a `kafka_` prefix. `key` and `value` are
# simply not selected, so they do not appear in the result.
stg_df = (
    df
    .withColumn("data", from_json(col("value"), schema))
    .select(
        col("offset").alias("kafka_offset"),
        col("timestamp").alias("kafka_datetime"),
        col("topic").alias("kafka_topic"),
        col("data.bme280.pressure").alias("bme280_pressure"),
        col("data.bme280.temperature").alias("bme280_temperature"),
        col("data.bme280.humidity").alias("bme280_humidity"),
        col("data.bme280.read_datetime").alias("read_datetime"),
        col("data.picow.local_ip").alias("picow_local_ip"),
        col("data.picow.temperature").alias("picow_temperature"),
    )
)

# Type and enrich the staged readings:
#  * strip the unit text ("hPa", "C", "%") from the BME280 strings and cast to double;
#  * parse the device-side read_datetime string into a timestamp;
#  * add the time Spark processed the row;
#  * order newest device reading first.
final_df = (
    stg_df
    # NOTE(review): converting *from* "UTC" to UTC is an identity. Put the real
    # source time zone here if kafka_datetime is not already in UTC.
    .withColumn("kafka_datetime", to_utc_timestamp(col("kafka_datetime"), "UTC"))
    .withColumn("spark_process_datetime", current_timestamp())
    .withColumn("bme280_pressure", regexp_replace(col("bme280_pressure"), "hPa", "").cast(DoubleType()))
    .withColumn("bme280_temperature", regexp_replace(col("bme280_temperature"), "C", "").cast(DoubleType()))
    .withColumn("bme280_humidity", regexp_replace(col("bme280_humidity"), "%", "").cast(DoubleType()))
    # Unparseable strings become null and sort last under desc().
    .withColumn("read_datetime", to_timestamp(col("read_datetime"), "yyyy-M-d HH:mm:s"))
    # Normalise the *parsed device timestamp* to UTC. It must read read_datetime,
    # not kafka_datetime; otherwise the device time is thrown away and the column
    # becomes a copy of the Kafka ingestion time.
    # NOTE(review): "UTC" assumes the device clock is UTC — TODO confirm.
    .withColumn("read_datetime", to_utc_timestamp(col("read_datetime"), "UTC"))
    .orderBy(col("read_datetime").desc())
)

# Inspect the resulting schema, then the first 10 rows with values shown in full.
final_df.printSchema()
final_df.show(n=10, truncate=False)


root
 |-- kafka_offset: integer (nullable = true)
 |-- kafka_datetime: timestamp (nullable = true)
 |-- kafka_topic: string (nullable = true)
 |-- bme280_pressure: double (nullable = true)
 |-- bme280_temperature: double (nullable = true)
 |-- bme280_humidity: double (nullable = true)
 |-- read_datetime: timestamp (nullable = true)
 |-- picow_local_ip: string (nullable = true)
 |-- picow_temperature: double (nullable = true)
 |-- spark_process_datetime: timestamp (nullable = false)

+------------+-----------------------+-----------+---------------+------------------+---------------+-----------------------+--------------+-----------------+--------------------------+
|kafka_offset|kafka_datetime         |kafka_topic|bme280_pressure|bme280_temperature|bme280_humidity|read_datetime          |picow_local_ip|picow_temperature|spark_process_datetime    |
+------------+-----------------------+-----------+---------------+------------------+---------------+-----------------------+--------------+

In [88]:
# Number of rows in final_df. final_df is not cached, so this re-executes the
# whole plan, including the Kafka read.
final_df.count()

821