In [None]:
%run ./install-liberaries

In [None]:
import os

from dotenv import load_dotenv
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType


class Bronze:
    """Stream device readings from Kafka into the bronze table.

    - ingest data from kafka (topic ``topic_0``)
    - transform value from binary to string
    - transform string value to struct type
    - save the stream to the bronze table ``device_reading_bz``

    Kafka connection settings are read from environment variables, after
    loading a ``.env`` file with python-dotenv: ``bootstrap``, ``protocol``,
    ``mechanism``, ``JAAS_MODULE``, ``api_key`` and ``password``.
    """

    def __init__(self, base_data_dir=None):
        """Load ``.env`` and read the Kafka connection settings.

        Args:
            base_data_dir: root directory under which the streaming checkpoint
                is written. When omitted, the ``base_data_dir`` environment
                variable is used, falling back to ``dbfs:/FileStore``.

        Raises:
            RuntimeError: if a required connection setting is unset or empty.
        """
        from dotenv import load_dotenv

        load_dotenv()
        self._load_settings(base_data_dir)

    @staticmethod
    def _env(name):
        """Return environment variable *name*; raise if it is unset or empty."""
        value = os.getenv(name)
        if not value:
            raise RuntimeError(f"Missing required environment variable: {name!r}")
        return value

    def _load_settings(self, base_data_dir=None):
        """Populate the connection attributes and checkpoint root."""
        # Plain str values: a missing variable is reported here instead of
        # silently becoming the string "None" and failing later at connect time.
        self.BOOTSTRAP_SERVERS = self._env("bootstrap")
        self.protocol = self._env("protocol")
        self.mechanism = self._env("mechanism")
        self.JAAS_MODULE = self._env("JAAS_MODULE")
        self.CLUSTER_API_KEY = self._env("api_key")
        self.CLUSTER_API_SECRETS = self._env("password")
        if base_data_dir is None:
            base_data_dir = os.getenv("base_data_dir", "dbfs:/FileStore")
        # process() writes its checkpoint under this directory.
        self.base_data_dir = base_data_dir

    def _jaas_config(self):
        """Build the ``kafka.sasl.jaas.config`` value from the loaded credentials."""
        return (
            f"{self.JAAS_MODULE} required "
            f"username='{self.CLUSTER_API_KEY}' password='{self.CLUSTER_API_SECRETS}';"
        )

    def getSchema(self):
        """Return the schema used to parse the JSON payload of each Kafka message."""
        return StructType([
            StructField("device_id", IntegerType()),
            StructField("sensor_type", StringType()),
            StructField("reading_value", IntegerType()),
            StructField("patient_id", StringType()),
            StructField("timestamp", TimestampType()),
        ])

    def ingestFromKafka(self):
        """Return a streaming DataFrame subscribed to ``topic_0`` over SASL."""
        return (
            spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", self.BOOTSTRAP_SERVERS)
            .option("kafka.security.protocol", self.protocol)
            .option("kafka.sasl.mechanism", self.mechanism)
            .option("kafka.sasl.jaas.config", self._jaas_config())
            .option("subscribe", "topic_0")
            .load()
        )

    def getReading(self, kafka_df):
        """Cast the raw Kafka ``key`` and ``value`` columns to strings."""
        return kafka_df.select(
            f.col("key").cast("string"), f.col("value").cast("string")
        )

    def process(self):
        """Start the Kafka -> ``device_reading_bz`` stream.

        Returns:
            The started streaming query (previously discarded), so callers can
            monitor, await or stop it.
        """
        print("starting Bronze Stream...")

        rawDF = self.ingestFromKafka()
        readingDF = self.getReading(rawDF)

        # Parse the JSON string and flatten it into the bronze columns.
        processDF = readingDF.withColumn(
            "json_value", f.from_json(f.col("value"), self.getSchema())
        ).select(
            f.col("json_value.device_id").alias("device_id"),
            f.col("json_value.sensor_type").alias("sensor"),
            f.col("json_value.reading_value").alias("reading"),
            f.col("json_value.timestamp").alias("timestamp"),
            f.col("json_value.patient_id").alias("patient_id"),
        )
        sQuery = (
            processDF.writeStream.queryName("bronze-ingestion")
            .option("checkpointLocation", f"{self.base_data_dir}/chekpoint/deviceReading_bz")
            .outputMode("append")
            .toTable("device_reading_bz")
        )
        return sQuery

       

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting confluent_kafka
  Downloading confluent_kafka-2.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.0/4.0 MB 11.9 MB/s eta 0:00:00
Installing collected packages: confluent_kafka
Successfully installed confluent_kafka-2.3.0
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [None]:
class silver():
    """Aggregate bronze device readings and enrich them with patient details.

    - read the streaming bronze table (``device_reading_bz``)
    - read the patient_details JSON file into a static DataFrame
    - compute the max reading per patient/device/sensor over 10-minute
      windows that slide every 2 minutes
    - join the aggregates with the patient details
    - sink the result to the silver table ``CareSl``

    TODO: the planned ``condition`` column (derived from the max reading)
    is not computed yet.
    """

    def __init__(self, base_data_dir=None):
        """Configure source locations and the checkpoint root.

        Args:
            base_data_dir: root directory under which the streaming checkpoint
                is written. When omitted, the ``base_data_dir`` environment
                variable is used, falling back to ``dbfs:/FileStore``.
        """
        self.jPATH = "dbfs:/FileStore/patient.json"
        self.bzPATH = "device_reading_bz"
        if base_data_dir is None:
            base_data_dir = os.getenv("base_data_dir", "dbfs:/FileStore")
        # process() writes its checkpoint under this directory.
        self.base_data_dir = base_data_dir

    def getPatient(self):
        """Read the patient-details JSON file as a static (batch) DataFrame.

        Expects records with a ``key`` (exposed as ``pid``) and a nested
        ``value`` struct holding name/age/gender, ``demographics`` and
        ``insurance_info`` fields.
        """
        from pyspark.sql import functions as f

        return (
            spark.read.format("json")
            .option("multiline", "True")
            .load(self.jPATH)
            .select(
                f.col('key').alias('pid'),
                f.col('value.name'),
                f.col('value.age'),
                f.col('value.gender'),
                f.col('value.demographics.marital_status'),
                f.col('value.demographics.ethencity'),
                f.col('value.demographics.address'),
                f.col('value.demographics.contact_number'),
                f.col('value.insurance_info.insurance_provider'),
                f.col('value.insurance_info.validity'),
                f.col('value.insurance_info.policy_number'),
            )
        )

    def readBronze(self):
        """Return the bronze table as a streaming DataFrame."""
        return spark.readStream.table(self.bzPATH)

    def getAggregate(self, sensorDF):
        """Compute windowed max readings and join them with patient details.

        Args:
            sensorDF: streaming bronze DataFrame with ``patient_id``,
                ``device_id``, ``sensor``, ``reading`` and ``timestamp``.

        Returns:
            A streaming DataFrame with patient_id, name, age, sensor,
            device_id, start, end and maxReading.
        """
        print("starting silver layer processing .....")
        # Use the functions module instead of importing ``max`` directly,
        # which would shadow the builtin.
        from pyspark.sql import functions as f

        patientDF = self.getPatient()
        # The 30-minute watermark bounds how long late rows can still update a window.
        windowedDF = (
            sensorDF.withWatermark("timestamp", "30 Minutes")
            .groupby(
                sensorDF.patient_id, sensorDF.device_id, sensorDF.sensor,
                f.window("timestamp", "10 Minutes", "2 Minutes").alias("window"),
            )
            .agg(f.max(sensorDF.reading).alias("maxReading"))
        )
        return (
            windowedDF.join(
                patientDF,
                # Patient ids in the JSON carry a prefix; compare from character 4 on.
                f.substring(patientDF.pid, 4, 12) == sensorDF.patient_id,
                "inner",
            )
            .select(
                sensorDF.patient_id, patientDF.name, patientDF.age, sensorDF.sensor,
                sensorDF.device_id, "window.start", "window.end", "maxReading",
            )
        )

    def process(self):
        """Start the bronze -> ``CareSl`` stream.

        The query name and checkpoint directory are distinct from the bronze
        query's. Spark rejects two active queries with the same name, and two
        different queries must not share a checkpoint.

        Returns:
            The started streaming query.
        """
        sensorDF = self.readBronze()
        sQuery = (
            self.getAggregate(sensorDF)
            .writeStream.queryName("silver-processing")
            .option("checkpointLocation", f"{self.base_data_dir}/chekpoint/deviceReading_sl")
            .outputMode("append")
            .toTable("CareSl")
        )
        return sQuery
        
        


In [None]:
# Build both medallion layers, then start their streams in dependency order:
# the bronze stream writes device_reading_bz, which the silver stream reads.
bz, sl = Bronze(), silver()
for layer in (bz, sl):
    layer.process()


starting Bronze Stream...
