In [0]:
%run ./01-config

In [0]:
class Upserter():
    """foreachBatch sink that runs a MERGE statement for every micro-batch.

    Each micro-batch is registered as the temp view ``temp_view_name`` and then
    ``merge_query`` (which is expected to read from that view) is executed.
    """
    def __init__(self,temp_view_name,merge_query):
        # temp_view_name: name under which each micro-batch is exposed to merge_query
        # merge_query: SQL statement executed once per micro-batch
        self.temp_view_name = temp_view_name
        self.merge_query = merge_query

    def upsertdata(self,batch_data,batch_id):
        """Expose ``batch_data`` as the temp view and execute the merge query.

        Args:
            batch_data: the micro-batch DataFrame handed over by ``foreachBatch``.
            batch_id: the micro-batch id supplied by ``foreachBatch`` (unused).
        """
        batch_data.createOrReplaceTempView(self.temp_view_name)
        # Run the SQL on the micro-batch's own session, where the temp view was
        # just registered. DataFrame.sparkSession is public API (PySpark 3.3+)
        # and also works under Spark Connect, unlike the private ``_jdf`` handle.
        batch_data.sparkSession.sql(self.merge_query)


In [0]:
class CDCUpserter():
    """foreachBatch sink that applies CDC records, keeping only the latest change per key.

    Rows whose ``update_type`` is 'new' or 'update' are kept; of those, exactly one
    row per ``column_id`` (the one with the greatest ``sort_id``) is registered as
    the temp view ``temp_view_name`` and ``merge_query`` is executed against it.
    """
    def __init__(self,temp_view_name,merge_query,column_id,sort_id):
        # temp_view_name: name under which the reduced micro-batch is exposed to merge_query
        # merge_query: SQL statement executed once per micro-batch
        # column_id: key column; at most one row per value reaches the MERGE
        # sort_id: ordering column; the row with its highest value wins
        self.temp_view_name = temp_view_name
        self.merge_query = merge_query
        self.column_id = column_id
        self.sort_id = sort_id

    def upsertdata(self,batch_data,batch_id):
        """Reduce ``batch_data`` to the latest change per key and execute the merge query.

        Args:
            batch_data: the micro-batch DataFrame handed over by ``foreachBatch``;
                must contain ``update_type``, ``column_id`` and ``sort_id`` columns.
            batch_id: the micro-batch id supplied by ``foreachBatch`` (unused).
        """
        import pyspark.sql.functions as f
        from pyspark.sql.window import Window

        latest_first = Window.partitionBy(f.col(self.column_id)).orderBy(f.col(self.sort_id).desc())
        # row_number (not rank) so ties on sort_id still yield exactly one row per
        # key; several source rows per key would make MERGE fail or double-insert.
        # The helper column name is chosen to avoid clashing with real columns.
        latest = (batch_data
                  .filter("update_type in ('new','update')")
                  .withColumn("_cdc_row_num", f.row_number().over(latest_first))
                  .filter(f.col("_cdc_row_num") == 1)
                  .drop("_cdc_row_num"))
        latest.createOrReplaceTempView(self.temp_view_name)
        # Public session handle of the micro-batch (PySpark 3.3+), replaces private _jdf access.
        latest.sparkSession.sql(self.merge_query)

In [0]:
class Silver():
    """Silver-layer streaming upserts and validation.

    Each ``upsert_*`` method starts one Structured Streaming query that reads a
    bronze (or upstream silver) Delta table, shapes/de-duplicates the rows and
    MERGEs every micro-batch into a silver table via ``Upserter``/``CDCUpserter``,
    then returns the started ``StreamingQuery``.

    Common ``upsert_*`` arguments:
        once: if True use an availableNow trigger, otherwise a processing-time trigger.
        process_time: processing-time trigger interval used when ``once`` is False.
        startingVersion: Delta table version the source stream(s) start reading from.

    Relies on the notebook globals ``spark`` and ``Config`` (presumably provided
    by ``%run ./01-config`` — not visible here).
    """

    def __init__(self,env):
        """Load names/paths from ``Config`` and switch the session to the silver schema.

        Args:
            env: catalog name, used as the first part of every table name.
        """
        conf = Config()
        self.checkpoint_base = conf.base_dir_checkpoint + "/checkpoints"
        self.catalog = env
        self.bronze_db = conf.bronze_db_name
        self.silver_db = conf.silver_db_name
        spark.sql(f"USE {self.catalog}.{self.silver_db}")

    @staticmethod
    def _start_stream(write_stream, once, process_time):
        """Start ``write_stream`` with the trigger selected by ``once`` and return the StreamingQuery."""
        if once:
            return write_stream.trigger(availableNow=True).start()
        return write_stream.trigger(processingTime=process_time).start()

    def upsert_user_details(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Insert new users from ``registered_users_bz`` into silver ``users``; returns the StreamingQuery."""
        merge_query = f"""MERGE INTO {self.catalog}.{self.silver_db}.users a 
        USING users_delta b ON a.user_id = b.user_id
        WHEN NOT MATCHED THEN INSERT *"""

        upsert = Upserter("users_delta",merge_query)

        # NOTE(review): dropDuplicates without a watermark keeps its de-dup state unbounded.
        read_stream = (spark.readStream
                       .option("startingVersion",startingVersion)
                       .option("ignoreDeletes",True)
                       .table(f"{self.catalog}.{self.bronze_db}.registered_users_bz")
                       .selectExpr("user_id","device_id","mac_address","cast(registration_timestamp as timestamp)")
                       .dropDuplicates(["user_id","device_id"])
                       )

        write_stream = (read_stream.writeStream
                        .foreachBatch(upsert.upsertdata)
                        .outputMode("update")
                        .option("queryName","upsert_user_details_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/user_details")
                        )

        return self._start_stream(write_stream, once, process_time)

    def upsert_gym_logs(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Upsert gym sessions from ``gym_logins_bz`` into silver ``gym_logs``; returns the StreamingQuery.

        A session is keyed by (mac_address, gym, login); an existing session only
        gets its ``logout`` moved forward to a later value.
        """
        # NOTE(review): if a stored row has a NULL logout, `b.logout>a.logout` is NULL and
        # that row is never updated — confirm logout is always populated in bronze.
        merge_query = f"""MERGE INTO {self.catalog}.{self.silver_db}.gym_logs a 
        USING gym_logs_delta b ON a.mac_address = b.mac_address AND a.gym = b.gym AND a.login = b.login 
        WHEN MATCHED AND b.logout>a.login AND b.logout>a.logout THEN UPDATE SET logout = b.logout 
        WHEN NOT MATCHED THEN INSERT *"""

        upsert = Upserter("gym_logs_delta",merge_query)

        read_stream = (spark.readStream
                       .option("startingVersion",startingVersion)
                       .option("ignoreDeletes",True)
                       .table(f"{self.catalog}.{self.bronze_db}.gym_logins_bz")
                       .selectExpr("mac_address","gym","cast(login as timestamp)","cast(logout as timestamp)")
                       .dropDuplicates(["mac_address","gym","login"])
                       )

        write_stream = (read_stream.writeStream
                        .foreachBatch(upsert.upsertdata)
                        .outputMode("update")
                        .option("queryName","upsert_gym_logs_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/gym_logs")
                        )

        return self._start_stream(write_stream, once, process_time)

    def upsert_user_profile(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Apply 'user_info' CDC events from ``kafka_multiplex_bz`` to silver ``user_profile``.

        Idempotent: inserts unseen users and deletes are ignored. An existing user
        is updated only when update_type is in ('new', 'update') and the incoming
        record is newer (``updated``) than the stored one. Returns the StreamingQuery.
        """
        import pyspark.sql.functions as f

        schema = """
            user_id bigint, update_type STRING, timestamp FLOAT, 
            dob STRING, sex STRING, gender STRING, first_name STRING, last_name STRING, 
            address STRUCT<street_address: STRING, city: STRING, state: STRING, zip: INT>"""

        merge_query = f"""MERGE INTO {self.catalog}.{self.silver_db}.user_profile a USING user_profile_delta b ON a.user_id = b.user_id 
        WHEN MATCHED AND a.updated < b.updated THEN UPDATE SET *
            WHEN NOT MATCHED
              THEN INSERT *"""

        # CDCUpserter keeps update_type in ('new','update') and the latest row per user_id.
        cdcupsert = CDCUpserter("user_profile_delta",merge_query,"user_id","updated")

        read_stream = (spark.readStream
                       .option("startingVersion",startingVersion)
                       .option("ignoreDeletes",True)
                       .table(f"{self.catalog}.{self.bronze_db}.kafka_multiplex_bz")
                       .filter("topic='user_info'")
                       .select(f.from_json(f.col("value").cast("string"),schema).alias("v"))
                       .select("v.*")
                       .select("user_id",f.to_date("dob","MM/dd/yyyy").alias("dob"),"sex","gender","first_name","last_name","address.street_address","address.city","address.state","address.zip",f.to_timestamp("timestamp").alias("updated"),"update_type")
                       .withWatermark("updated", "30 seconds")
                       .dropDuplicates(["user_id","updated"])
                       )

        write_stream = (read_stream.writeStream
                        .foreachBatch(cdcupsert.upsertdata)
                        .outputMode("update")
                        .option("queryName","upsert_user_profile_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/user_profile")
                        )

        return self._start_stream(write_stream, once, process_time)

    def upsert_workouts(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Insert 'workout' events from ``kafka_multiplex_bz`` into silver ``workouts``.

        Idempotent: a user cannot have two workout events at the same time, so rows
        are de-duplicated on (user_id, time) and only unseen ones are inserted.
        Returns the StreamingQuery.
        """
        import pyspark.sql.functions as f
        schema = "user_id INT, workout_id INT,timestamp FLOAT, action STRING, session_id INT"

        merge_query = f"""MERGE INTO {self.catalog}.{self.silver_db}.workouts a 
                            USING workouts_delta b ON a.user_id = b.user_id AND 
                            a.time=b.time WHEN NOT MATCHED THEN INSERT *
                            """

        upsert = Upserter("workouts_delta",merge_query)

        read_stream = (spark.readStream
                       .option("startingVersion",startingVersion)
                       .option("ignoreDeletes",True)
                       .table(f"{self.catalog}.{self.bronze_db}.kafka_multiplex_bz")
                       .filter("topic='workout'")
                       .select(f.from_json(f.col("value").cast("string"),schema).alias("v"))
                       .select("v.*")
                       .select("user_id","workout_id",f.col("timestamp").cast("timestamp").alias("time"),"action","session_id")
                       .withWatermark("time", "30 seconds")
                       .dropDuplicates(["user_id", "time"])
                       )

        write_stream = (read_stream.writeStream
                        .foreachBatch(upsert.upsertdata)
                        .outputMode("update")
                        .option("queryName","upsert_workouts_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/workouts")
                        )

        return self._start_stream(write_stream, once, process_time)

    def upsert_heart_rate(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Insert 'bpm' events from ``kafka_multiplex_bz`` into silver ``heart_rate``.

        Idempotent: only one BPM signal per (device_id, time) is kept. Adds a
        ``valid`` flag that is False when heartrate <= 0. Returns the StreamingQuery.
        """
        import pyspark.sql.functions as f
        schema = "device_id BIGINT, time TIMESTAMP, heartrate DOUBLE"

        merge_query = f"""MERGE INTO {self.catalog}.{self.silver_db}.heart_rate a 
        USING heart_rate_delta b ON a.device_id = b.device_id AND a.time=b.time 
        WHEN NOT MATCHED THEN INSERT *"""

        upsert = Upserter("heart_rate_delta",merge_query)

        # NOTE(review): a NULL heartrate makes `heartrate<=0` NULL, so it falls through
        # to otherwise(True) and is flagged valid — confirm that is intended.
        read_stream = (spark.readStream
                       .option("startingVersion",startingVersion)
                       .option("ignoreDeletes",True)
                       .table(f"{self.catalog}.{self.bronze_db}.kafka_multiplex_bz")
                       .filter("topic='bpm'")
                       .select(f.from_json(f.col("value").cast("string"),schema).alias("v"))
                       .select("v.*",f.when(f.col("v.heartrate")<=0,False).otherwise(True).alias("valid"))
                       .withWatermark("time", "30 seconds")
                       .dropDuplicates(["device_id", "time"])
                       )

        write_stream = (read_stream.writeStream
                        .foreachBatch(upsert.upsertdata)
                        .outputMode("update")
                        .option("queryName","upsert_heart_rate_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/heart_rate")
                        )

        return self._start_stream(write_stream, once, process_time)

    def get_user_age(self,dob_col):
        """Return a Column bucketing whole years since ``dob_col`` into labelled bins, aliased ``age``.

        Labels: "Under 18", "18-25", "25-35", ..., "85-95" (lower bound inclusive,
        upper bound exclusive) and "95+" for anything else — including a NULL age,
        which satisfies none of the ``when`` conditions.
        """
        import pyspark.sql.functions as f
        # No alias on the intermediate expression: it is only used inside comparisons.
        age_years = f.floor(f.months_between(f.current_date(),dob_col)/12)
        bucket = f.when(age_years<18,"Under 18")
        for low, high in [(18,25),(25,35),(35,45),(45,55),(55,65),(65,75),(75,85),(85,95)]:
            bucket = bucket.when((age_years>=low)&(age_years<high),f"{low}-{high}")
        return bucket.otherwise("95+").alias("age")

    def upsert_user_bins(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Upsert age/gender/location bins per user from silver ``user_profile`` into ``user_bins``.

        Returns the StreamingQuery.
        """
        import pyspark.sql.functions as f

        merge_query = f"""
                        MERGE INTO {self.catalog}.{self.silver_db}.user_bins a USING user_bins_delta b 
                        ON a.user_id = b.user_id WHEN MATCHED THEN UPDATE SET *
                        WHEN NOT MATCHED THEN INSERT *
                        """

        upserter = Upserter("user_bins_delta",merge_query)

        users_tbl = spark.read.table(f"{self.catalog}.{self.silver_db}.users")

        # ignoreChanges: user_profile is rewritten by MERGE UPDATE, so rewritten rows are re-emitted.
        read_stream = (spark.readStream
                       .option("ignoreChanges",True)
                       .option("startingVersion",startingVersion)
                       .table(f"{self.catalog}.{self.silver_db}.user_profile")
                       )
        read_user_details = read_stream.join(users_tbl,["user_id"],"left").select("user_id",self.get_user_age(f.col("dob")),"gender","city","state")

        write_stream = (read_user_details.writeStream
                        .foreachBatch(upserter.upsertdata)
                        .outputMode("update")
                        .option("queryName","upsert_user_bins_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/user_bins"))

        return self._start_stream(write_stream, once, process_time)

    def upsert_completed_workouts(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Pair workout 'start' and 'stop' events into silver ``completed_workouts``.

        Stream-stream left join of start events with stop events of the same
        (user_id, workout_id, session_id) whose end_time is within 3 hours of
        start_time; unmatched starts are emitted with a NULL end_time.
        Returns the StreamingQuery.
        """
        import pyspark.sql.functions as f

        merge_query = f"""
                    MERGE INTO {self.catalog}.{self.silver_db}.completed_workouts a USING completed_workouts_delta b
                    ON a.user_id=b.user_id AND a.workout_id=b.workout_id AND a.session_id=b.session_id 
                    WHEN NOT MATCHED THEN INSERT *
                    """

        upserter = Upserter("completed_workouts_delta",merge_query)

        # Option name was previously misspelled "staringVersion", which Spark silently ignored.
        df_stop = (spark.readStream
                   .option("startingVersion",startingVersion)
                   .option("ignoreDeletes",True)
                   .table(f"{self.catalog}.{self.silver_db}.workouts")
                   .withWatermark("time","30 seconds")
                   .filter("action = 'stop'")
                   .selectExpr("user_id","workout_id","session_id","action","time as end_time")
                   )
        df_start = (spark.readStream
                    .option("startingVersion",startingVersion)
                    .option("ignoreDeletes",True)
                    .table(f"{self.catalog}.{self.silver_db}.workouts")
                    .withWatermark("time","30 seconds")
                    .filter("action = 'start'")
                    .selectExpr("user_id","workout_id","session_id","action","time as start_time")
                    )

        # The time-range predicate bounds join state so unmatched starts can be emitted.
        join_condition = [df_start.user_id==df_stop.user_id,
                          df_start.workout_id==df_stop.workout_id,
                          df_start.session_id==df_stop.session_id,
                          df_stop.end_time < df_start.start_time + f.expr("INTERVAL 3 HOUR")]

        df_final = (df_start.join(df_stop,join_condition,"left")
                    .select(df_start.user_id,df_start.workout_id,df_start.session_id,df_start.start_time,df_stop.end_time))

        write_stream = (df_final.writeStream
                        .foreachBatch(upserter.upsertdata)
                        .outputMode("append")
                        .option("queryName","upsert_completed_workouts_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/completed_workouts")
                        )

        return self._start_stream(write_stream, once, process_time)

    def upsert_workout_bpm(self,once=False,process_time = "5 seconds",startingVersion=0):
        """Attach heart-rate readings to completed workouts into silver ``workout_bpm``.

        Completed workouts are joined to ``users`` for the device_id, then
        stream-stream joined to heart_rate readings with start_time < time <= end_time.
        Returns the StreamingQuery.
        """
        import pyspark.sql.functions as f

        merge_query = f"""
                    MERGE INTO {self.catalog}.{self.silver_db}.workout_bpm a USING workout_bpm_delta b
                    ON a.user_id=b.user_id AND a.workout_id=b.workout_id AND a.session_id=b.session_id AND a.time=b.time
                    WHEN NOT MATCHED THEN INSERT *
                    """

        upserter = Upserter("workout_bpm_delta",merge_query)

        users_df = spark.read.table(f"{self.catalog}.{self.silver_db}.users")

        # Option name was previously misspelled "staringVersion", which Spark silently ignored.
        df_complete_workouts = (spark.readStream
                                .option("startingVersion",startingVersion)
                                .option("ignoreDeletes",True)
                                .table(f"{self.catalog}.{self.silver_db}.completed_workouts")
                                .join(users_df,["user_id"])
                                .selectExpr("user_id","device_id","workout_id","session_id","start_time","end_time")
                                .withWatermark("start_time","30 seconds")
                                )
        df_heart_rate = (spark.readStream
                         .option("startingVersion",startingVersion)
                         .option("ignoreDeletes",True)
                         .table(f"{self.catalog}.{self.silver_db}.heart_rate")
                         .selectExpr("device_id","time","heartrate")
                         .withWatermark("time","30 seconds")
                         )

        join_condition = [df_complete_workouts.device_id==df_heart_rate.device_id,
                          df_heart_rate.time>df_complete_workouts.start_time,
                          df_heart_rate.time<=df_complete_workouts.end_time,
                          df_complete_workouts.end_time < df_heart_rate.time + f.expr('interval 3 hour')]

        df_final = (df_heart_rate.join(df_complete_workouts,join_condition)
                    .select("user_id","workout_id","session_id","start_time","end_time","time","heartrate"))

        write_stream = (df_final.writeStream
                        .foreachBatch(upserter.upsertdata)
                        .outputMode("append")
                        .option("queryName","upsert_workout_bpm_stream")
                        .option("checkpointLocation",f"{self.checkpoint_base}/workout_bpm")
                        )

        return self._start_stream(write_stream, once, process_time)

    def _await_queries(self, once):
        """When ``once`` is True, block until every active streaming query in the session terminates."""
        if once:
            for stream in spark.streams.active:
                stream.awaitTermination()

    def upsert(self, once=True, processing_time="5 seconds"):
        """Run all silver upserts in three dependency stages, printing elapsed time per stage.

        With ``once=True`` each stage is awaited before the next starts, because
        later stages read silver tables produced by earlier ones.
        """
        import time
        start = int(time.time())
        print(f"\nExecuting silver layer upsert ...")
        self.upsert_user_details(once, processing_time)
        self.upsert_gym_logs(once, processing_time)
        self.upsert_user_profile(once, processing_time)
        self.upsert_workouts(once, processing_time)
        self.upsert_heart_rate(once, processing_time)
        self._await_queries(once)
        print(f"Completed silver layer 1 upsert {int(time.time()) - start} seconds")
        self.upsert_user_bins(once, processing_time)
        self.upsert_completed_workouts(once, processing_time)
        self._await_queries(once)
        print(f"Completed silver layer 2 upsert {int(time.time()) - start} seconds")
        self.upsert_workout_bpm(once, processing_time)
        self._await_queries(once)
        print(f"Completed silver layer 3 upsert {int(time.time()) - start} seconds")

    def assert_count(self, table_name, expected_count, filter="true"):
        """Assert that silver ``table_name`` has ``expected_count`` rows matching SQL ``filter``."""
        print(f"Validating record counts in {table_name}...", end='')
        actual_count = spark.read.table(f"{self.catalog}.{self.silver_db}.{table_name}").where(filter).count()
        assert actual_count == expected_count, f"Expected {expected_count:,} records, found {actual_count:,} in {table_name} where {filter}" 
        print(f"Found {actual_count:,} / Expected {expected_count:,} records where {filter}: Success")        

    def validate(self, sets):
        """Check every silver table's row count against the expected totals for 1 or 2 data sets."""
        import time
        start = int(time.time())
        print(f"\nValidating silver layer records...")
        self.assert_count("users", 5 if sets == 1 else 10)
        self.assert_count("gym_logs", 8 if sets == 1 else 16)
        self.assert_count("user_profile", 5 if sets == 1 else 10)
        self.assert_count("workouts", 16 if sets == 1 else 32)
        self.assert_count("heart_rate", sets * 253801)
        self.assert_count("user_bins", 5 if sets == 1 else 10)
        self.assert_count("completed_workouts", 8 if sets == 1 else 16)
        self.assert_count("workout_bpm", 3968 if sets == 1 else 8192)
        print(f"Silver layer validation completed in {int(time.time()) - start} seconds")

    

        
        
        
        