In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, FloatType, DoubleType
from datetime import datetime

In [2]:
# Kafka configuration
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "data-ingestion-topic"

storage_account_name = 'vipasha'
storage_account_key = 'oTRZNpvHO2pKAUlRctA2cgtq1eJAX3lDTjJD4pNdzkEy+3Ib2x9DsCrWEJ/+HG3jmax7jz/cuqmW+AStV6AU0A=='
storage_container_name = 'etl-storage-container'

# Define output path and filename
output_path = f"wasbs://{storage_container_name}@{storage_account_name}.blob.core.windows.net/bronze/"
current_date = datetime.now().strftime("%d_%m_%Y")
output_file = f"{current_date}_processed.parquet"

In [4]:
spark = SparkSession.builder \
    .appName("KafkaSparkStructuredStreaming") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.hadoop:hadoop-azure:3.2.0") \
    .getOrCreate()
    
# Set Azure Storage account key
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

In [5]:
# Define the schema of the Kafka messages (example schema)
schema = StructType([
        StructField("number_of_elements", FloatType(), True),
        StructField("mean_atomic_mass", DoubleType(), True),
        StructField("wtd_mean_atomic_mass", DoubleType(), True),
        StructField("gmean_atomic_mass", DoubleType(), True),
        StructField("wtd_gmean_atomic_mass", DoubleType(), True),
        StructField("entropy_atomic_mass", DoubleType(), True),
        StructField("wtd_entropy_atomic_mass", DoubleType(), True),
        StructField("range_atomic_mass", DoubleType(), True),
        StructField("wtd_range_atomic_mass", DoubleType(), True),
        StructField("std_atomic_mass", DoubleType(), True),
        StructField("wtd_std_atomic_mass", DoubleType(), True),
        StructField("mean_fie", DoubleType(), True),
        StructField("wtd_mean_fie", DoubleType(), True),
        StructField("gmean_fie", DoubleType(), True),
        StructField("wtd_gmean_fie", DoubleType(), True),
        StructField("entropy_fie", DoubleType(), True),
        StructField("wtd_entropy_fie", DoubleType(), True),
        StructField("range_fie", DoubleType(), True),
        StructField("wtd_range_fie", DoubleType(), True),
        StructField("std_fie", DoubleType(), True),
        StructField("wtd_std_fie", DoubleType(), True),
        StructField("mean_atomic_radius", DoubleType(), True),
        StructField("wtd_mean_atomic_radius", DoubleType(), True),
        StructField("gmean_atomic_radius", DoubleType(), True),
        StructField("wtd_gmean_atomic_radius", DoubleType(), True),
        StructField("entropy_atomic_radius", DoubleType(), True),
        StructField("wtd_entropy_atomic_radius", DoubleType(), True),
        StructField("range_atomic_radius", DoubleType(), True),
        StructField("wtd_range_atomic_radius", DoubleType(), True),
        StructField("std_atomic_radius", DoubleType(), True),
        StructField("wtd_std_atomic_radius", DoubleType(), True),
        StructField("mean_Density", DoubleType(), True),
        StructField("wtd_mean_Density", DoubleType(), True),
        StructField("gmean_Density", DoubleType(), True),
        StructField("wtd_gmean_Density", DoubleType(), True),
        StructField("entropy_Density", DoubleType(), True),
        StructField("wtd_entropy_Density", DoubleType(), True),
        StructField("range_Density", DoubleType(), True),
        StructField("wtd_range_Density", DoubleType(), True),
        StructField("std_Density", DoubleType(), True),
        StructField("wtd_std_Density", DoubleType(), True),
        StructField("mean_ElectronAffinity", DoubleType(), True),
        StructField("wtd_mean_ElectronAffinity", DoubleType(), True),
        StructField("gmean_ElectronAffinity", DoubleType(), True),
        StructField("wtd_gmean_ElectronAffinity", DoubleType(), True),
        StructField("entropy_ElectronAffinity", DoubleType(), True),
        StructField("wtd_entropy_ElectronAffinity", DoubleType(), True),
        StructField("range_ElectronAffinity", DoubleType(), True),
        StructField("wtd_range_ElectronAffinity", DoubleType(), True),
        StructField("std_ElectronAffinity", DoubleType(), True),
        StructField("wtd_std_ElectronAffinity", DoubleType(), True),
        StructField("mean_FusionHeat", DoubleType(), True),
        StructField("wtd_mean_FusionHeat", DoubleType(), True),
        StructField("gmean_FusionHeat", DoubleType(), True),
        StructField("wtd_gmean_FusionHeat", DoubleType(), True),
        StructField("entropy_FusionHeat", DoubleType(), True),
        StructField("wtd_entropy_FusionHeat", DoubleType(), True),
        StructField("range_FusionHeat", DoubleType(), True),
        StructField("wtd_range_FusionHeat", DoubleType(), True),
        StructField("std_FusionHeat", DoubleType(), True),
        StructField("wtd_std_FusionHeat", DoubleType(), True),
        StructField("mean_ThermalConductivity", DoubleType(), True),
        StructField("wtd_mean_ThermalConductivity", DoubleType(), True),
        StructField("gmean_ThermalConductivity", DoubleType(), True),
        StructField("wtd_gmean_ThermalConductivity", DoubleType(), True),
        StructField("entropy_ThermalConductivity", DoubleType(), True),
        StructField("wtd_entropy_ThermalConductivity", DoubleType(), True),
        StructField("range_ThermalConductivity", DoubleType(), True),
        StructField("wtd_range_ThermalConductivity", DoubleType(), True),
        StructField("std_ThermalConductivity", DoubleType(), True),
        StructField("wtd_std_ThermalConductivity", DoubleType(), True),
        StructField("mean_Valence", DoubleType(), True),
        StructField("wtd_mean_Valence", DoubleType(), True),
        StructField("gmean_Valence", DoubleType(), True),
        StructField("wtd_gmean_Valence", DoubleType(), True),
        StructField("entropy_Valence", DoubleType(), True),
        StructField("wtd_entropy_Valence", DoubleType(), True),
        StructField("range_Valence", DoubleType(), True),
        StructField("wtd_range_Valence", DoubleType(), True),
        StructField("std_Valence", DoubleType(), True),
        StructField("wtd_std_Valence", DoubleType(), True),
        StructField("critical_temp", DoubleType(), True)
])

In [6]:
# Read data from Kafka
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "earliest") \
    .load()

In [7]:
# Select the value column and cast it to string
value_df = kafka_df.selectExpr("CAST(value AS STRING) as json_value")

# Parse JSON data
parsed_df = value_df.select(from_json(col("json_value"), schema).alias("data")).select("data.*")

In [8]:
parsed_df

DataFrame[number_of_elements: float, mean_atomic_mass: double, wtd_mean_atomic_mass: double, gmean_atomic_mass: double, wtd_gmean_atomic_mass: double, entropy_atomic_mass: double, wtd_entropy_atomic_mass: double, range_atomic_mass: double, wtd_range_atomic_mass: double, std_atomic_mass: double, wtd_std_atomic_mass: double, mean_fie: double, wtd_mean_fie: double, gmean_fie: double, wtd_gmean_fie: double, entropy_fie: double, wtd_entropy_fie: double, range_fie: double, wtd_range_fie: double, std_fie: double, wtd_std_fie: double, mean_atomic_radius: double, wtd_mean_atomic_radius: double, gmean_atomic_radius: double, wtd_gmean_atomic_radius: double, entropy_atomic_radius: double, wtd_entropy_atomic_radius: double, range_atomic_radius: double, wtd_range_atomic_radius: double, std_atomic_radius: double, wtd_std_atomic_radius: double, mean_Density: double, wtd_mean_Density: double, gmean_Density: double, wtd_gmean_Density: double, entropy_Density: double, wtd_entropy_Density: double, range_D

In [9]:
# Write the streaming DataFrame to Azure Blob Storage in Parquet format
query = parsed_df \
    .writeStream \
    .format("parquet") \
    .option("path", output_path + output_file) \
    .option("checkpointLocation", "/tmp/checkpoints/kafka_to_blob/") \
    .start()
    
# Await termination
try:
    query.awaitTermination()
except KeyboardInterrupt:
    print("Stopping the query...")
    query.stop()
    print("Query stopped.")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 