In [0]:
%run ./1-config

In [0]:
class Bronze():
    def __init__(self):        
        self.Conf = Config()
        self.landing_zone = self.Conf.base_dir_data + "/raw" 
        self.checkpoint_base = self.Conf.base_dir_checkpoint + "/checkpoints"
        self.db_name = self.Conf.db_name

    def list_tables(self):
        spark.sql(f"USE {self.db_name}")
        tables = spark.sql("show tables").select("tableName").rdd.flatMap(lambda x: x).collect()
        return tables
    
    def list_kafka_topics(self):
        return ["bpm_stream_topic","gym_logins_topic","registered_user_topic","user_info_topic","work_out_session_topic"]

    def consume_user_registration(self,table_name,topic,once=True, processing_time="5 seconds"):

            df_stream = (spark.readStream
                            .format("kafka")
                            .option("kafka.bootstrap.servers", self.Conf.kafka_server) 
                            .option("kafka.security.protocol", "SASL_SSL") 
                            .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}'  \
                            password='{}';".format(self.Conf.kafka_api_key, self.Conf.kafka_api_secret)) 
                            .option("kafka.ssl.endpoint.identification.algorithm", "https") 
                            .option("kafka.sasl.mechanism", "PLAIN") 
                            .option("subscribe",topic) 
                            .load()
                        )
            print(df_stream.printSchema())     
            # Use append mode because bronze layer is expected to insert only from source
            stream_writer = df_stream.writeStream \
                                    .format("delta") \
                                    .option("checkpointLocation", self.checkpoint_base + f"/{table_name}_bz") \
                                    .outputMode("append") \
                                    .queryName(f"{table_name}_bz_ingestion_stream")
                        
            if once == True:
                return stream_writer.trigger(availableNow=True).toTable(f"{self.db_name}.{table_name}")
            else:
                return stream_writer.trigger(processingTime=processing_time).toTable(f"{self.db_name}.{table_name}")
            

    def consume(self):
        tb = self.list_tables()
        print(tb)
        tp = self.list_kafka_topics()
        print(tp)
        for i in range(len(tb)):
            self.consume_user_registration(tb[i],tp[i],once=True, processing_time="5 seconds")


In [0]:
bz = Bronze()
bz.consume()

['bpm_stream_table', 'gym_logins_table', 'registered_users_tables', 'user_info_table', 'workout_session_table']
['bpm_stream_topic', 'gym_logins_topic', 'registered_user_topic', 'user_info_topic', 'work_out_session_topic']
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

None
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

None
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- time

In [0]:
df = spark.sql("select * from sbit_db.workout_session_table")

In [0]:
df.printSchema()

root

