In [None]:
# Locate the Spark installation (via findspark) so `import pyspark` below resolves.
import findspark
findspark.init()

import pyspark
# Wildcard import kept for compatibility with other cells; no name from it is
# used in the visible part of this notebook.
from delta import *
import pandas as pd
from pyspark.sql.functions import col

# Render toPandas() results without truncating cell text, rows, or columns.
# NOTE(review): max_rows=None prints *every* row, so keep .limit(...) on previews.
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

In [None]:
# Attach to the standalone cluster master; metricsEnabled turns on metrics
# reporting for active streaming queries.
spark = (
    pyspark.sql.SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("spark_kafka")
    .config("spark.sql.streaming.metricsEnabled", "true")
    .getOrCreate()
)

# Only surface errors from Spark so notebook output stays readable.
sc = spark.sparkContext
sc.setLogLevel('ERROR')

In [None]:
# Sanity check: which databases does the session catalog see?
databases_pdf = spark.sql("show databases").toPandas()
databases_pdf

In [None]:
# List the tables currently registered in test_db.
test_db_tables_pdf = spark.sql("show tables in test_db").toPandas()
test_db_tables_pdf

In [None]:
# DESTRUCTIVE: drops test_db.spark_kafka. Uncomment only to reset that table.
# spark.sql("drop table test_db.spark_kafka").toPandas()

In [None]:
# Bronze layer: holds raw Kafka records with the value decoded to a string payload.
bronze_db_result = spark.sql("create database if not exists bronze")
bronze_db_result

In [None]:
# Kafka topic to ingest ("test-topic-1" was used during earlier testing).
topic = "weather_test_v1"
# Rate limit: maximum number of offsets consumed per micro-batch.
maxOffsetsPerTrigger = 100

# startingOffsets only applies when the query starts without a checkpoint;
# a query resuming from its checkpoint continues where it left off.
kafka_options = {
    "kafka.bootstrap.servers": "kafka-broker:9092",
    "subscribe": topic,
    "maxOffsetsPerTrigger": maxOffsetsPerTrigger,
    "startingOffsets": "earliest",
}

# Streaming DataFrame over the topic (lazy: nothing is read until a sink starts).
df = spark.readStream.format("kafka").options(**kafka_options).load()

In [None]:
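# Preview a couple of raw records. NOTE: `df` is a *streaming* DataFrame, so
# calling .toPandas() on it raises an AnalysisException; for an ad-hoc preview
# use a batch read (spark.read.format("kafka")...) instead.
# df.selectExpr("cast(value as string) as payload", "*").limit(2).toPandas()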

In [None]:
trigger_interval = "60 seconds"
queryName = "weather_test_v1"
tableName = "weather_test_v1"

# Spark refuses to start a second query with the same name while the first is
# still active, so re-running this cell reuses the running query instead of failing.
bronze_query = next(
    (active_query for active_query in spark.streams.active if active_query.name == queryName),
    None,
)
if bronze_query is None:
    # Append each raw Kafka record (value decoded to a string payload) plus its
    # Kafka metadata into the bronze Delta table, one micro-batch per trigger.
    bronze_query = (
        df
        .selectExpr("key", "topic", "partition", "offset", "timestamp", "timestampType", "cast(value as string) as payload")
        .writeStream
        .trigger(processingTime=trigger_interval)
        .queryName(queryName)
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"/opt/spark/work-dir/odp_intra_storage/spark/datalake/bronze.db/{tableName}/_ckpt")
        .toTable(f"bronze.{tableName}")
    )

# Keep the handle so the query can be inspected (.status, .lastProgress) or stopped (.stop()).
bronze_query

In [None]:
# Peek at two raw payloads to confirm the bronze stream is landing data
# (no ORDER BY, so which two rows come back is arbitrary).
bronze_sample_sql = "select payload from bronze.weather_test_v1 limit 2"
spark.sql(bronze_sample_sql).toPandas()

In [None]:
# Scratch check of Spark SQL interval arithmetic: the timestamp ten minutes ago.
ten_minutes_ago_pdf = spark.sql("select current_timestamp - INTERVAL 10 minutes").toPandas()
ten_minutes_ago_pdf

In [None]:
# Delta OPTIMIZE: compact the table's small files.
# NOTE(review): this targets test_db.spark_kafka, not the bronze/silver tables
# written by the streams in this notebook; confirm that is intended.
optimize_result = spark.sql("OPTIMIZE test_db.spark_kafka")
optimize_result.toPandas()

In [None]:
# WARNING: RETAIN 0 HOURS deletes every data file not referenced by the latest
# table version. Time travel is lost, and concurrent readers/writers of the table
# can fail. Delta blocks such short retention unless the duration check is off.
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"
previous_retention_check = spark.conf.get(retention_check_key, "true")

spark.sql("set spark.databricks.delta.retentionDurationCheck.enabled = false")
spark.sql("SET spark.databricks.delta.vacuum.parallelDelete.enabled = true")
try:
    vacuum_result = spark.sql("VACUUM test_db.spark_kafka RETAIN 0 HOURS").toPandas()
finally:
    # Restore the safety check so later VACUUMs in this session (e.g. on the
    # streaming tables) are protected again, even if this VACUUM fails.
    spark.conf.set(retention_check_key, previous_retention_check)

vacuum_result

In [None]:
# Spark SQL expressions that project a bronze record into the silver schema:
# weather fields pulled out of the JSON payload (get_json_object yields strings),
# plus Kafka metadata and the load time.
select_expr = [
    "get_json_object(payload, '$.data.location') as location",
    "get_json_object(payload, '$.data.time') as time",
    "get_json_object(payload, '$.data.info') as info",
    "get_json_object(payload, '$.data.temperature') as temperature",
    "get_json_object(payload, '$.data.precipitation') as precipitation",
    # Try the correctly spelled key first, then fall back to the misspelled
    # '$.data.humidiy' so payloads using either key still parse.
    # NOTE(review): confirm which key the producer actually emits.
    "coalesce(get_json_object(payload, '$.data.humidity'), get_json_object(payload, '$.data.humidiy')) as humidity",
    "get_json_object(payload, '$.data.wind') as wind",
    "topic as kafka_topic",
    "partition as kafka_partition",
    "offset as kafka_offset",
    "cast(timestamp as timestamp) as kafka_timestamp",
    "cast(timestamp as date) as kafka_date",
    "current_timestamp as load_time",
]

In [None]:
# Preview the silver projection over the bronze table, highest Kafka offsets first.
# NOTE(review): offsets are per partition, so on a multi-partition topic these
# are not necessarily the latest records overall.
bronze_parsed_preview = (
    spark.table("bronze.weather_test_v1")
    .selectExpr(select_expr)
    .orderBy(col("kafka_offset").desc())
    .limit(2)
)
bronze_parsed_preview.toPandas()

In [None]:
# Silver layer: JSON payload fields extracted into named columns.
silver_db_result = spark.sql("create database if not exists silver")
silver_db_result

In [None]:
# Reset the silver table so it can be recreated with an explicit schema.
# NOTE(review): the silver stream's checkpoint lives under
# .../silver.db/weather_test/_ckpt. If dropping the table does not remove that
# directory, a restarted stream resumes from the old bronze offsets and the new
# table misses earlier rows. Confirm this, and clear the checkpoint if needed.
silver_drop_result = spark.sql("drop table if exists silver.weather_test")
silver_drop_result

In [None]:
# Explicit silver schema. Column names and types line up with the aliases
# produced by select_expr (JSON fields as strings, Kafka metadata typed).
silver_ddl = """
create table silver.weather_test 
(
location string,
time string,
info string,
temperature string,
precipitation string,
humidity string,
wind string,
kafka_topic string,
kafka_partition int,
kafka_offset bigint,
kafka_timestamp timestamp,
kafka_date date,
load_time timestamp
)
using delta
;
"""
spark.sql(silver_ddl).toPandas()

In [None]:
trigger_interval = "60 seconds"
queryName = "silver_weather_test"
tableName = "weather_test"

# Spark refuses to start a second query with the same name while the first is
# still active, so re-running this cell reuses the running query instead of failing.
silver_query = next(
    (active_query for active_query in spark.streams.active if active_query.name == queryName),
    None,
)
if silver_query is None:
    # Incrementally read new bronze rows, parse them with select_expr and append
    # to the pre-created silver table.
    silver_query = (
        spark.readStream
        .format("delta")
        .table("bronze.weather_test_v1")
        .selectExpr(select_expr)
        .writeStream
        .trigger(processingTime=trigger_interval)
        .queryName(queryName)
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"/opt/spark/work-dir/odp_intra_storage/spark/datalake/silver.db/{tableName}/_ckpt")
        .toTable(f"silver.{tableName}")
    )

# Keep the handle so the query can be inspected (.status, .lastProgress) or stopped (.stop()).
silver_query

In [None]:
# Most recent Naperville readings, with the text fields converted to numbers.
# temperature is stored as a string; max() over the raw string compares
# lexicographically (e.g. '9' > '85'), so the number is extracted first. nullif
# turns "no number found" ('') into NULL instead of casting an empty string.
# ORDER BY makes .limit(2) return the latest readings rather than arbitrary rows.
spark.sql("""
select 
kafka_timestamp
, max(cast(nullif(regexp_extract(temperature, '-?[0-9]+([.][0-9]+)?', 0), '') as double)) as temperature
, max(cast(replace(precipitation, '%', '') as int)) as precipitation
, max(cast(replace(humidity, '%', '') as int)) as humidity
, max(cast(replace(wind, ' mph', '') as int)) as wind
from silver.weather_test 
where 1=1
and location = 'Naperville'
group by 1
order by 1 desc
;
""").limit(2).toPandas()