In [1]:
from pyspark.sql import SparkSession

import os

# Maven coordinates of the Kafka connector for Structured Streaming.
# NOTE(review): the connector's Scala/Spark versions presumably need to match the Spark
# build running on the cluster — confirm when upgrading either side.
scala_version = '2.12'
spark_version = '3.4.0'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.4.0'
]

# The standalone master URL can be overridden with the SPARK_MASTER environment
# variable so the notebook is not tied to one cluster address; the default keeps the
# original target.
spark = (
    SparkSession.builder
    .appName("weather1")
    .config("spark.jars.packages", ",".join(packages))
    .master(os.environ.get("SPARK_MASTER", "spark://10.245.211.187:7077"))
    .getOrCreate()
)

# Only emit Spark log messages at ERROR level or above.
spark.sparkContext.setLogLevel("ERROR")


In [2]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, TimestampType, FloatType

# Schema applied with from_json to each Kafka message value (one weather record).
# NOTE(review): "temp" is IntegerType while dewp/tmax/tmin/dmax/dmin are FloatType, and
# "gust"/"wdsp" are StringType — if the producer emits fractional numbers for these
# fields, confirm the types (a mismatched field may parse as null).
# NOTE(review): a "year" (IntegerType) field is currently left out of the schema.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("date", StringType()),
        ("hour", StringType()),
        ("prcp", FloatType()),
        ("stp", FloatType()),
        ("smax", FloatType()),
        ("smin", FloatType()),
        ("gbrd", FloatType()),
        ("temp", IntegerType()),
        ("dewp", FloatType()),
        ("tmax", FloatType()),
        ("tmin", FloatType()),
        ("dmax", FloatType()),
        ("dmin", FloatType()),
        ("hmax", IntegerType()),
        ("hmin", IntegerType()),
        ("hmdy", IntegerType()),
        ("wdct", IntegerType()),
        ("gust", StringType()),
        ("wdsp", StringType()),
        ("region", StringType()),
        ("state", StringType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("height", FloatType()),
    ]
])


In [3]:
# Kafka readStream
import os

# Kafka readStream over the "weather2" topic, starting from the earliest offsets.
# The broker list can be overridden with KAFKA_BOOTSTRAP_SERVERS; the default keeps the
# original value.
# NOTE(review): the session runs against a standalone cluster master, and the Kafka
# consumers run on the executors. "localhost" therefore only works if the brokers are
# reachable as localhost from every worker — confirm, or point this at an address the
# workers can reach.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers",
            os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"))
    .option("subscribe", "weather2")
    .option("startingOffsets", "earliest")
    .load()
)

kafka_df.printSchema()
   

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Decode the Kafka value bytes as a string, parse it as JSON with `schema`, and
# flatten the resulting struct so every weather field is a top-level column.
weatherdf = (
    kafka_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

weatherdf.printSchema()

root
 |-- date: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- prcp: float (nullable = true)
 |-- stp: float (nullable = true)
 |-- smax: float (nullable = true)
 |-- smin: float (nullable = true)
 |-- gbrd: float (nullable = true)
 |-- temp: integer (nullable = true)
 |-- dewp: float (nullable = true)
 |-- tmax: float (nullable = true)
 |-- tmin: float (nullable = true)
 |-- dmax: float (nullable = true)
 |-- dmin: float (nullable = true)
 |-- hmax: integer (nullable = true)
 |-- hmin: integer (nullable = true)
 |-- hmdy: integer (nullable = true)
 |-- wdct: integer (nullable = true)
 |-- gust: string (nullable = true)
 |-- wdsp: string (nullable = true)
 |-- region: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- height: float (nullable = true)



In [5]:

# Mirror the raw Kafka stream into an in-memory table named "qdf" for ad-hoc queries.
# Stop any still-active query using this name first so re-running the cell works
# (Spark refuses to start two active queries with the same name).
for active_query in spark.streams.active:
    if active_query.name == 'qdf':
        active_query.stop()

df_query = kafka_df.writeStream \
                .queryName('qdf') \
                .format('memory') \
                .start()

# Let the data currently available in Kafka reach the sink before reading it; a query
# issued right after start() can see an empty table.
df_query.processAllAvailable()

raw = spark.sql("select * from qdf")
raw.show()

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



In [5]:
# Mirror the parsed weather stream into an in-memory table named "qdf1".
# Stop any still-active query using this name first so re-running the cell works
# (Spark refuses to start two active queries with the same name).
for active_query in spark.streams.active:
    if active_query.name == 'qdf1':
        active_query.stop()

df_query1 = weatherdf.writeStream \
                .queryName('qdf1') \
                .format('memory') \
                .start()

# Let the data currently available in Kafka reach the sink before reading it; a query
# issued right after start() can see an empty table.
df_query1.processAllAvailable()

raw1 = spark.sql("select * from qdf1")
raw1.show()

+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+-----+--------+---------+------+
|date|hour|prcp|stp|smax|smin|gbrd|temp|dewp|tmax|tmin|dmax|dmin|hmax|hmin|hmdy|wdct|gust|wdsp|region|state|latitude|longitude|height|
+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+-----+--------+---------+------+
+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+-----+--------+---------+------+



In [16]:
# JSON-encode each parsed weather record into a single string column named "value".
# Built from kafka_df rather than from weatherdf itself so that re-running this cell
# does not wrap already-encoded JSON a second time.
weatherdf = (
    kafka_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .selectExpr("to_json(struct(*)) AS value")
)

In [18]:
# Print the JSON-encoded records with the console sink.
# awaitTermination() without a timeout blocks the kernel indefinitely (the stream does
# not end on its own), which stops Run All from reaching later cells. Wait a bounded
# number of seconds instead, then stop the query.
write_stream = weatherdf.writeStream \
      .outputMode("append")\
      .format("console")\
      .start()

write_stream.awaitTermination(60)
write_stream.stop()

In [8]:
from pyspark.sql.functions import approx_count_distinct, col, from_json, struct, to_json

# Approximate number of distinct dates seen per region.
# Group the *parsed* weather stream: the raw kafka_df only carries Kafka columns
# (key, value, topic, partition, offset, timestamp, timestampType), so grouping it by
# "region" fails on a fresh kernel.
query1 = (
    kafka_df
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .groupBy("region")
    .agg(approx_count_distinct("date").alias("count"))
)
query1


DataFrame[region: string, count: bigint]


In [9]:
# Serialize each aggregated row (region, count) into one JSON string column "value".
# selectExpr() with no arguments would project away every column before the struct is
# built, so select the JSON expression over all columns directly; to_json already
# returns a string, so no extra cast is needed.
res = query1.select(to_json(struct("*")).alias("value"))


In [None]:
# Stream the per-region JSON rows to the console.
# res comes from a streaming aggregation with no watermark, which does not support
# "append" output mode; "complete" re-emits the whole aggregate table each trigger.
res1 = res.select("value") \
        .writeStream \
        .outputMode("complete")  \
        .format("console")  \
        .option("truncate", "False")  \
        .start()

# Bounded wait (seconds) so the cell does not block the kernel indefinitely.
res1.awaitTermination(60)
res1.stop()

In [7]:
# Snapshot of the in-memory "qdf" table registered by the memory sink.
# NOTE(review): qdf is registered from the raw kafka_df, yet the saved output below
# shows the parsed weather columns — either "qdf1" was intended or the cells ran out
# of order; confirm.
res = spark.table("qdf")
res.show()

+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+-----+--------+---------+------+
|date|hour|prcp|stp|smax|smin|gbrd|temp|dewp|tmax|tmin|dmax|dmin|hmax|hmin|hmdy|wdct|gust|wdsp|region|state|latitude|longitude|height|
+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+-----+--------+---------+------+
+----+----+----+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+------+-----+--------+---------+------+



In [None]:
import pyspark.sql.functions as F

# Decode the Kafka value bytes as a string, parse it as JSON with `schema`, and flatten
# the parsed struct into top-level columns. The sink for this frame is started
# separately in its own cell.
ds_kafka = (
    kafka_df.selectExpr("CAST(value AS STRING)")
    .select(F.from_json("value", schema=schema).alias("data"))
    .select("data.*")
)





In [None]:
# Print parsed records with the console sink. Keep the StreamingQuery handle so the
# stream can be stopped later (ds_kafka_query.stop()); the original discarded it.
ds_kafka_query = ds_kafka.writeStream \
      .outputMode("append")  \
      .format("console")  \
      .option("truncate", "False")  \
      .start()
ds_kafka_query


In [None]:
# Kafka value bytes decoded to a string column (no JSON parsing); print the class of
# the source frame and of the derived frame.
ds = kafka_df.selectExpr("CAST(value AS STRING)")
for frame in (kafka_df, ds):
    print(type(frame))

In [None]:
# (Re)register the raw Kafka stream as the in-memory table "qdf".
# A query named 'qdf' is already started earlier in the notebook; Spark refuses to start
# a second active query with the same name, so stop the existing one first.
for active_query in spark.streams.active:
    if active_query.name == 'qdf':
        active_query.stop()

df_query = kafka_df.writeStream \
                .queryName('qdf') \
                .format('memory') \
                .start()

In [None]:
# Register the decoded value strings as the in-memory table "qalerts".
# Stop a still-active query with this name first so re-running the cell works
# (Spark refuses to start two active queries with the same name).
for active_query in spark.streams.active:
    if active_query.name == 'qalerts':
        active_query.stop()

df_alert = ds.writeStream \
            .queryName('qalerts') \
            .format('memory') \
            .start()

In [None]:
# Let the data currently available in Kafka reach the "qalerts" sink before reading
# it; querying right after the query starts can return an empty table.
df_alert.processAllAvailable()

alerts = spark.sql("select * from qalerts")
alerts.show()

In [None]:
# literal_eval was used without being imported anywhere in the notebook (NameError).
from ast import literal_eval

# Turn each row's "value" string into a Python object on the executors.
# NOTE(review): literal_eval accepts Python literal syntax only; if the producer sends
# strict JSON (true/false/null), json.loads is needed instead — confirm the payload.
rddAleartsRdd = alerts.rdd.map(lambda alert: literal_eval(alert['value']))

In [None]:
# Show the RDD's repr; map() is lazy, so nothing is computed by this cell.
rddAleartsRdd

In [None]:
# Run the map and pull every parsed record back to the driver as a list.
# NOTE(review): collect() materializes the whole table on the driver — fine for a small
# snapshot, but consider take(n) if the topic grows.
rddAlerts = rddAleartsRdd.collect()

In [None]:
# Class of the collected result (collect() returns a list).
rddAlerts.__class__

In [None]:
# First collected record; raises IndexError if the memory table was still empty when
# collected.
rddAlerts[0]