In [0]:
spark.conf.set(
    "fs.azure.account.key.mystore.dfs.core.windows.net",
    "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
)

In [0]:
# Event Hub config 
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType

eh_namespace = "databevent"
eh_name = "datab"

eh_connection_string = (
    "Endpoint=sb://databevent.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=XXXXXXXXXXXXXXXXXXXXXXX")

eh_sasl = (
    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="$ConnectionString" '
    f'password="{eh_connection_string}";'
)

In [0]:
# Schema 
schema = StructType() \
    .add("Temperature", IntegerType()) \
    .add("Humidity", IntegerType()) \
    .add("WindSpeed", DoubleType()) \
    .add("Precipitation", IntegerType()) \
    .add("AtmosphericPressure", DoubleType()) \
    .add("UVIndex", IntegerType()) \
    .add("Season", StringType()) \
    .add("Visibility", DoubleType()) \
    .add("Location", StringType())

In [0]:
# Read from Kafka
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{eh_namespace}.servicebus.windows.net:9093")
    .option("subscribe", eh_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", eh_sasl)
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
)

In [0]:
# Parse JSON
df_parsed = (
    df.selectExpr("CAST(value AS STRING) as json_str")
      .select(from_json(col("json_str"), schema).alias("data"))
      .select("data.*")
)

In [0]:
# Write to MANAGED DELTA TABLE
(
    df_parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/delta/checkpoints/weather_stream")
    .table("default.weather_stream_table")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff87042cd70>

In [0]:
# Load Table as dataframe
x_new_df= spark.table('weather_stream_table').toPandas()
x_new_df

Unnamed: 0,Temperature,Humidity,WindSpeed,Precipitation,AtmosphericPressure,UVIndex,Season,Visibility,Location
0,27.0,74.0,17.0,66.0,990.67,1.0,Autumn,2.5,mountain
1,29.0,45.0,8.5,71.0,990.0,7.0,Spring,10.0,coastal
2,20.0,73.0,9.5,87.0,1010.82,2.0,Winter,3.5,inland
3,32.0,55.0,3.5,26.0,1010.03,2.0,Summer,5.0,inland
4,,,,,,,,,


In [0]:
# Load locally developed Model
import tensorflow as tf
import numpy as np
import pickle
dbutils.fs.cp(
  "abfss://rawdata@mystore.dfs.core.windows.net/model/",
  "dbfs:/FileStore/modelh5/",
  recurse=True
)
model = tf.keras.models.load_model("/dbfs/FileStore/modelh5/weather_classifier.h5")
sc = pickle.load(open("/dbfs/FileStore/modelh5/sc.pkl", "rb"))
le_season = pickle.load(open("/dbfs/FileStore/modelh5/le_season.pkl", "rb"))
le_location = pickle.load(open("/dbfs/FileStore/modelh5/le_location.pkl", "rb"))
le_wt = pickle.load(open("/dbfs/FileStore/modelh5/le_wt.pkl", "rb"))

2026-01-07 17:10:39.978472: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2026-01-07 17:10:40.008199: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2026-01-07 17:10:40.119783: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2026-01-07 17:10:40.244770: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1767805840.360116    1382 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1767805840.38

In [0]:
# Prediction
x_new_df= x_new_df.loc[0:3]
x_new_numerical = x_new_df[['Temperature','Humidity','WindSpeed','Precipitation','AtmosphericPressure','UVIndex','Visibility']]
x_new_numerical_scaled = sc.transform(x_new_numerical)
x_new_df['Season'] = le_season.transform(x_new_df['Season'])   
x_new_df['Location'] = le_location.transform(x_new_df['Location'])
x_new_season = x_new_df['Season'].values
x_new_location = x_new_df['Location'].values
val_preds = model.predict([x_new_numerical_scaled,x_new_season,x_new_location])
predicted_indices = np.argmax(val_preds, axis=1)
predicted_labels = le_wt.inverse_transform(predicted_indices)
x_new_df = x_new_df.copy()  
x_new_df["predicted_value"] = predicted_labels

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 177ms/step[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 178ms/step


In [0]:
x_new_df['Season'] = le_season.inverse_transform(x_new_df['Season'])   
x_new_df['Location'] = le_location.inverse_transform(x_new_df['Location'])

In [0]:
x_new_df

Unnamed: 0,Temperature,Humidity,WindSpeed,Precipitation,AtmosphericPressure,UVIndex,Season,Visibility,Location,predicted_value
0,27.0,74.0,17.0,66.0,990.67,1.0,Autumn,2.5,mountain,Windy
1,29.0,45.0,8.5,71.0,990.0,7.0,Spring,10.0,coastal,Cloudy
2,20.0,73.0,9.5,87.0,1010.82,2.0,Winter,3.5,inland,Windy
3,32.0,55.0,3.5,26.0,1010.03,2.0,Summer,5.0,inland,Cloudy
