# Real-time Data Processing with Azure Databricks (and Event Hubs)

This notebook demonstrates the architecture below for building real-time data pipelines.
![Solution Architecture](https://raw.githubusercontent.com/malvik01/Real-Time-Streaming-with-Azure-Databricks/main/Azure%20Solution%20Architecture.png)


- Data Sources: Streaming data from IoT devices or social media feeds (simulated in Event Hubs).
- Ingestion: Azure Event Hubs for capturing real-time data.
- Processing: Azure Databricks for stream processing using Structured Streaming.
- Storage: Processed data stored in Azure Data Lake (Delta format).
- Visualisation: Data visualised using Power BI.


### Azure Services Required
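- Databricks Workspace (Unity Catalog enabled)
- Azure Data Lake Storage (Premium)
- Azure Event Hubs (Basic tier)
- Azure Cosmos DB (NoSQL API), used as the sink for model predictions
- Azure Key Vault, backing the `dev_env` Databricks secret scope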

### Azure Databricks Configuration Required
- Single Node Compute Cluster: `12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)`
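- Maven Library installed on Compute Cluster: `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22`
- Maven Library for the Cosmos DB sink: the Azure Cosmos DB Spark 3 OLTP connector matching the cluster's Spark version (it provides the `cosmos.oltp` format)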

Importing the libraries.

In [None]:
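# Import only what is used: a wildcard import of pyspark.sql.functions shadows Python built-ins such as sum
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType, StringType
import pickle
import pandas as pd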

The code block below creates the catalog and schemas for our solution. 

The approach uses a multi-hop (medallion) storage architecture, consisting of bronze, silver, and gold schemas within a `v2_streaming` catalog.
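The bronze and silver cells below follow one convention: a Delta table named `v2_streaming.<layer>.<table>` and a checkpoint directory `/mnt/v2_streaming/<layer>/<table>`. A minimal sketch that centralises this convention, assuming you want to reuse it across cells; `table_name` and `checkpoint_path` are hypothetical helpers, not part of the original notebook. Every streaming query needs its own checkpoint, so deriving both names from the same (layer, table) pair helps avoid two queries sharing one directory.

```python
# Hypothetical helpers (not part of the original notebook) that derive the Unity Catalog
# table name and a per-query checkpoint path for each medallion layer
CATALOG = "v2_streaming"
LAYERS = ("bronze", "silver", "gold")


def table_name(layer: str, table: str) -> str:
    """Three-level Unity Catalog name, e.g. v2_streaming.bronze.transactions."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return f"{CATALOG}.{layer}.{table}"


def checkpoint_path(layer: str, table: str) -> str:
    """One checkpoint directory per streaming query, mirroring the table name."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return f"/mnt/{CATALOG}/{layer}/{table}"
```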

In [None]:
# Create the catalog and the medallion schemas; IF NOT EXISTS makes the cell safe to re-run
spark.sql("CREATE CATALOG IF NOT EXISTS v2_streaming")

for schema in ["bronze", "silver", "gold"]:
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS v2_streaming.{schema}")


#### Bronze Layer

Set up the Azure Event Hubs connection string.

In [None]:
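# Config
# Replace with your Event Hubs connection string and Event Hub name (ideally read both from the secret scope)
connectionString = "connection_string"
eventHubName = "event_hub_name"

# The connector expects the connection string to be encrypted with EventHubsUtils.encrypt
ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
  'eventhubs.eventHubName': eventHubName
}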
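The connector's PySpark examples use a connection string that ends with `EntityPath=<event hub name>`, and the connector also accepts `eventhubs.startingPosition`, a JSON-encoded event position that controls where a new query starts reading. A minimal sketch, assuming your connection string was copied at the namespace level (without `EntityPath`); `with_entity_path` and `starting_position_from_beginning` are hypothetical helpers. The starting position only applies when the query has no checkpoint yet; a checkpointed query resumes from its saved offsets.

```python
import json


def with_entity_path(conn_str: str, event_hub_name: str) -> str:
    """Append EntityPath=<name> to a connection string unless one is already present."""
    parts = [p for p in conn_str.strip().split(";") if p]
    if any(p.startswith("EntityPath=") for p in parts):
        return ";".join(parts)
    return ";".join(parts + [f"EntityPath={event_hub_name}"])


def starting_position_from_beginning() -> str:
    """JSON event position that starts reading at the beginning of the stream (offset "-1")."""
    return json.dumps({"offset": "-1", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})


# Usage in the notebook (needs the Spark context `sc`, so it is shown as a comment):
# ehConf = {
#     "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
#         with_entity_path(connectionString, eventHubName)),
#     "eventhubs.startingPosition": starting_position_from_beginning(),
# }
```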

In [None]:
# Fetch the service principal secret from Azure Key Vault via the Databricks secret scope
service_credential = dbutils.secrets.get(scope="dev_env", key="adls-secret")  # scope name in Databricks, secret name in Key Vault

# Azure AD service principal (application) ID and tenant (directory) ID: replace the placeholders
application_id = 'app_id'
directory_id = 'directory_id'

# OAuth client-credentials configuration for the ABFS driver
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": application_id,
    "fs.azure.account.oauth2.client.secret": service_credential,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
}
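Mounting is one way to use these credentials. A hedged alternative, assuming you would rather configure access per Spark session, is to set the same keys scoped to a single storage account, following the ABFS pattern `<key>.<account>.dfs.core.windows.net`, and then read `abfss://` paths directly. `account_scoped` is a hypothetical helper; `saibdpadls` is the storage account used elsewhere in this notebook.

```python
def account_scoped(configs: dict, account: str) -> dict:
    """Return a copy of the ABFS OAuth configs with each key scoped to one storage account."""
    suffix = f"{account}.dfs.core.windows.net"
    return {f"{key}.{suffix}": value for key, value in configs.items()}


# Usage in the notebook (needs `spark` and the `configs` dict defined above):
# for key, value in account_scoped(configs, "saibdpadls").items():
#     spark.conf.set(key, value)
# spark.read.format("delta").load("abfss://gold-layer@saibdpadls.dfs.core.windows.net/<path>")
```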

In [None]:
source = "abfss://gold-layer@saibdpadls.dfs.core.windows.net/"
mount_point = "/mnt/saibdpadls/gold-layer"

# Re-mount if the mount point already exists so that updated credentials take effect
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)
    print("mount point already exists")
    dbutils.fs.mount(source=source, mount_point=mount_point, extra_configs=configs)
    print("mount point re-mounted")
else:
    dbutils.fs.mount(source=source, mount_point=mount_point, extra_configs=configs)

/mnt/saibdpadls/gold-layer has been unmounted.
mount point already exists
mount point re-mounted


In [None]:
# Fetch the Cosmos DB account endpoint and primary key from the secret scope
cosmosdb_endpoint = dbutils.secrets.get(scope="dev_env", key="cosmosdb-secret")
cosmosdb_key = dbutils.secrets.get(scope="dev_env", key="cosmosdb-primary-key")


In [None]:
spark.version

'3.5.0'

In [None]:
cosmos_config = {
    "spark.cosmos.accountEndpoint": cosmosdb_endpoint,
    "spark.cosmos.accountKey": cosmosdb_key,
    "spark.cosmos.database": "incremental",
    "spark.cosmos.container": "Container2",
}

# Batch read to verify connectivity to the container (this DataFrame is not used downstream)
cosmos_df = spark.read.format("cosmos.oltp") \
    .options(**cosmos_config) \
    .option("spark.cosmos.read.inferSchema.enabled", "true") \
    .load()
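Every Cosmos DB item needs a string `id`, and the connector's default write strategy (`spark.cosmos.write.strategy` set to `ItemOverwrite`) upserts by `id` within a partition key value, so ids that restart in every micro-batch can overwrite earlier predictions. A minimal pandas sketch, assuming `TransactionID` uniquely identifies a transaction; `with_cosmos_ids` is a hypothetical helper that falls back to an epoch-qualified row number when the key column is absent.

```python
import pandas as pd


def with_cosmos_ids(pdf: pd.DataFrame, epoch_id: int, key_col: str = "TransactionID") -> pd.DataFrame:
    """Return a copy of pdf with a string 'id' column suitable for Cosmos DB items."""
    out = pdf.copy()
    if key_col in out.columns:
        # Deterministic id: replaying the same micro-batch rewrites the same items
        out["id"] = out[key_col].astype(str)
    else:
        # Fallback: prefix the row position with the epoch so ids differ across batches
        out["id"] = [f"{epoch_id}-{i}" for i in range(len(out))]
    return out
```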

Reading the stream from Event Hubs and writing it to the bronze layer.

In [None]:

# Reading stream: load events from Azure Event Hubs into DataFrame 'df' using the ehConf settings above
df = spark.readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load()

# Displaying stream: show the incoming events for debugging (display() starts its own streaming query)
df.display()

# Writing stream: persist the raw events to the Delta table 'v2_streaming.bronze.transactions' in append mode, with checkpointing
df.writeStream \
    .option("checkpointLocation", "/mnt/v2_streaming/bronze/transactions") \
    .outputMode("append") \
    .format("delta") \
    .toTable("v2_streaming.bronze.transactions")

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff4940f4f50>

#### Silver Layer

Defining the schema of the JSON payload carried in the Event Hubs `body` column.

In [None]:
# Defining the schema for the JSON object

json_schema = StructType([
    StructField("TransactionID", IntegerType(), False),
    StructField("Timestamp", TimestampType(), False),
    StructField("CustomerID", IntegerType(), False),
    StructField("Amount", DoubleType(), False),
    StructField("Location", StringType(), False),
    StructField("CardType", StringType(), False),
    StructField("TransactionType", StringType(), False),
    StructField("ProductCategory", StringType(), False),
    StructField("Duration", StringType(), False),  # Can be converted to seconds if needed
    StructField("Label", StringType(), False)
])
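For a quick local check of a payload against this schema, the sketch below decodes a raw `body` the way the silver cell does (bytes to string, then JSON) and verifies that the ten expected fields are present. The sample record is hypothetical, and ISO-8601 timestamps are assumed. Unlike this strict check, `from_json` does not raise on a malformed record (it typically yields nulls), so a check like this can help when testing the event producer.

```python
import json

# Field names taken from json_schema in the cell above
EXPECTED_FIELDS = [
    "TransactionID", "Timestamp", "CustomerID", "Amount", "Location",
    "CardType", "TransactionType", "ProductCategory", "Duration", "Label",
]


def parse_body(body: bytes) -> dict:
    """Decode an Event Hubs body (UTF-8 JSON) and ensure every expected field is present."""
    record = json.loads(body.decode("utf-8"))
    missing = [name for name in EXPECTED_FIELDS if name not in record]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return record


# Hypothetical sample event
sample = json.dumps({
    "TransactionID": 1, "Timestamp": "2023-09-26T12:36:00", "CustomerID": 1001,
    "Amount": 42.5, "Location": "Los Angeles", "CardType": "Visa",
    "TransactionType": "Online", "ProductCategory": "Electronics",
    "Duration": "0:16:01", "Label": "Benign",
}).encode("utf-8")
```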

Reading, transforming and writing the stream from the bronze to the silver layer.

In [None]:
# To stop all active streams and remove the silver checkpoint, uncomment the lines below
# for stream in spark.streams.active:
#     stream.stop()

# dbutils.fs.rm("/mnt/v2_streaming/silver/transactions", recurse=True)

In [None]:

# Reading and transforming: load streaming data from the 'v2_streaming.bronze.transactions' Delta table,
# cast the binary 'body' to string, parse the JSON, and flatten the fields
df = spark.readStream \
    .format("delta") \
    .table("v2_streaming.bronze.transactions") \
    .withColumn("body", col("body").cast("string")) \
    .withColumn("body", from_json(col("body"), json_schema)) \
    .select("body.*")

# Displaying stream: visualise the transformed data for verification
df.display()

# Writing stream: save the parsed records to the 'v2_streaming.silver.transactions' Delta table in append mode, with checkpointing
df.writeStream \
    .option("checkpointLocation", "/mnt/v2_streaming/silver/transactions") \
    .outputMode("append") \
    .format("delta") \
    .toTable("v2_streaming.silver.transactions")

<pyspark.sql.streaming.query.StreamingQuery at 0x7ff4940e4910>
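With the bronze and silver queries running, `spark.streams.active` lists the active `StreamingQuery` objects. In PySpark 3.5 each query's `lastProgress` is a dictionary (or `None` before the first batch completes) with fields such as `batchId`, `numInputRows`, and `processedRowsPerSecond`. A small formatting helper, hypothetical and not part of the original notebook, keeps that check readable:

```python
from typing import Optional


def summarize_progress(progress: Optional[dict]) -> str:
    """Format a StreamingQuery.lastProgress dict as a one-line status string."""
    if not progress:
        return "no completed batch yet"
    return (
        f"batch {progress.get('batchId')}: "
        f"{progress.get('numInputRows', 0)} rows, "
        f"{progress.get('processedRowsPerSecond', 0.0):.1f} rows/s processed"
    )


# Usage in the notebook:
# for q in spark.streams.active:
#     print(q.name or q.id, summarize_progress(q.lastProgress))
```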

#### Gold Layer

Reading the stream from the silver layer, scoring each micro-batch with the pre-trained fraud model, and writing the predictions to Cosmos DB.

In [None]:
# Step 1: Load the pre-trained model
# Only unpickle trusted files; scikit-learn warns if the pickle was created with a different version
with open("/dbfs/mnt/saibdpadls/ml-model/best_fraud_model.pkl", "rb") as f:
    loaded_model = pickle.load(f)

# Step 2: Define a prediction function with the iterator signature expected by mapInPandas
def predict_batch(batches):
    """
    Takes an iterator of Pandas DataFrames (chunks of a micro-batch), makes predictions, and yields them.
    Note: this prototype passes raw columns; the model was trained on encoded, scaled features (see the next cell).
    """
    for pdf in batches:
        # Drop the target column if present before predicting
        features = pdf.drop(columns=["Label"], errors="ignore")
        # Add predictions as a new column
        pdf["prediction"] = loaded_model.predict(features)
        yield pdf

# Step 3: Read the streaming data
df = spark.readStream \
    .format("delta") \
    .table("v2_streaming.silver.transactions")

# Step 4: Apply the prediction function to the stream (output_schema must be defined first)
# predicted_stream = df.mapInPandas(predict_batch, schema=output_schema)

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
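The next cell turns each micro-batch into model features: `Duration` strings such as `0:16:01` become seconds, and one-hot encoded categories are aligned with the column list saved at training time. A pure-pandas sketch of those two steps, assuming `training_columns` is a list of feature names like the one the notebook unpickles; `duration_to_seconds` and `align_features` are hypothetical helper names.

```python
import pandas as pd


def duration_to_seconds(value: str) -> int:
    """Convert an 'H:MM:SS' duration string to seconds."""
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def align_features(pdf: pd.DataFrame, categorical_cols: list, training_columns: list) -> pd.DataFrame:
    """One-hot encode every category as int, then match the training column order.

    Categories unseen in training are dropped; training columns missing
    from this batch are added and filled with 0.
    """
    encoded = pd.get_dummies(pdf, columns=categorical_cols, dtype=int)
    return encoded.reindex(columns=training_columns, fill_value=0)


batch = pd.DataFrame({"Amount": [10.0, 20.0], "Location": ["Los Angeles", "Miami"]})
aligned = align_features(batch, ["Location"], ["Amount", "Location_Los Angeles", "Location_New York"])
```

Encoding every category and letting `reindex` select the training columns avoids using `drop_first=True` at scoring time, which drops whichever category sorts first within the batch (a batch containing a single location would lose that column entirely). Integer dummies also sidestep the `uint8` Arrow conversion warning shown in the output further down.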


In [None]:
import builtins
import pickle
import traceback

import pandas as pd

# Load the fitted scaler, the training column order, and the trained model
# (only unpickle files from a trusted source; see the scikit-learn model persistence notes)
with open("/dbfs/mnt/saibdpadls/ml-model/scaler.pkl", "rb") as f:
    scaler = pickle.load(f)

with open("/dbfs/mnt/saibdpadls/ml-model/training_columns.pkl", "rb") as f:
    training_columns = pickle.load(f)

with open("/dbfs/mnt/saibdpadls/ml-model/best_fraud_model.pkl", "rb") as f:
    loaded_model = pickle.load(f)

# Define categorical columns used in training
categorical_cols = ["Location", "CardType", "TransactionType", "ProductCategory"]

def process_batch(df, epoch_id):
    try:
        # DataFrame.isEmpty() avoids the RDD API, which is not available in every cluster access mode
        if df.isEmpty():
            print(f"Epoch {epoch_id}: Empty batch, skipping...")
            return

        print(f"Epoch {epoch_id}: Processing batch...")

        # Collect the micro-batch to the driver as a Pandas DataFrame
        pdf = df.toPandas()

        print(f"Epoch {epoch_id}: Checking DataFrame structure...")
        print(pdf.dtypes)
        print(pdf.head())

        # Keep TransactionID to use as the Cosmos DB item id (assumes it is unique per transaction)
        transaction_ids = pdf["TransactionID"].astype(str)

        # Extract time-based features
        pdf["Timestamp"] = pd.to_datetime(pdf["Timestamp"])
        pdf["Hour"] = pdf["Timestamp"].dt.hour
        pdf["Day"] = pdf["Timestamp"].dt.day
        pdf["Month"] = pdf["Timestamp"].dt.month

        # Convert Duration ("H:MM:SS") to seconds; builtins.sum avoids shadowing by a wildcard pyspark import
        pdf["Duration"] = pdf["Duration"].apply(
            lambda x: builtins.sum(int(t) * sec for t, sec in zip(x.split(":"), [3600, 60, 1]))
        )

        # Drop columns that are not model features (Label must not reach the scaler)
        pdf.drop(columns=["Timestamp", "TransactionID", "Label"], errors="ignore", inplace=True)

        # One-hot encode all categories as integers (uint8 dummies are not supported by the Arrow conversion),
        # then align with the training columns: unseen categories are dropped, missing ones filled with 0
        pdf = pd.get_dummies(pdf, columns=categorical_cols, dtype=int)
        pdf = pdf.reindex(columns=training_columns, fill_value=0)

        # Standardize numerical features
        features = scaler.transform(pdf)

        print(f"Epoch {epoch_id}: Running model prediction...")
        pdf["prediction"] = loaded_model.predict(features)
        pdf["id"] = transaction_ids.values

        print(f"Epoch {epoch_id}: Successfully generated predictions!")

        # Convert back to a Spark DataFrame using the micro-batch's own session
        predicted_batch = df.sparkSession.createDataFrame(pdf)

        print(f"Epoch {epoch_id}: Writing batch to Cosmos DB...")

        predicted_batch.write.format("cosmos.oltp") \
            .options(**cosmos_config) \
            .mode("append") \
            .save()

        print(f"Epoch {epoch_id}: Batch successfully written to Cosmos DB.")

    except Exception:
        print(f"Epoch {epoch_id}: Error encountered.")
        traceback.print_exc()
        # Re-raise so the query fails instead of silently skipping this batch
        raise

# Start the streaming process on the silver stream `df`, with its own checkpoint
# (hypothetical path; adjust to your storage layout)
query = df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/mnt/v2_streaming/gold/transactions_predictions") \
    .start()
query.awaitTermination()


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations

Epoch 0: Processing batch...
Epoch 0: Checking DataFrame structure...
TransactionID               int32
Timestamp          datetime64[ns]
CustomerID                  int32
Amount                    float64
Location                   object
CardType                   object
TransactionType            object
ProductCategory            object
Duration                   object
Label                      object
dtype: object
   TransactionID           Timestamp  ...  Duration   Label
0              1 2023-09-26 12:36:00  ...   0:16:01  Benign
1              6 2023-06-04 02:38:00  ...   0:48:38  Benign

[2 rows x 10 columns]
Epoch 0: Running model prediction...
Epoch 0: Successfully generated predictions!


  Unable to convert the field Location_Los Angeles. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: [UNSUPPORTED_DATA_TYPE_FOR_ARROW_CONVERSION] uint8 is not supported in conversion to Arrow.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Epoch 0: Writing batch to Cosmos DB...
Epoch 0: Batch successfully written to Cosmos DB.

