#### Loading historical data

In [0]:
#%run /Workspace/Users/mbothe7@hotmail.com/databricks-end-to-end-project/01-initial-config


In [0]:
class BronzeLoader():
    """Ingest raw landing-zone CSV files into the bronze schema of a catalog.

    Each ``read_*`` method builds an Auto Loader (``cloudFiles``) streaming
    DataFrame over a sub-folder of the landing path. Each ``write_*`` method
    drains such a stream into a Delta table using an ``availableNow`` trigger
    and blocks until the query finishes.

    Relies on two names not defined in this cell: ``InitialConfig``
    (presumably from the ``01-initial-config`` notebook — confirm) and the
    notebook-global ``spark`` session.
    """

    def __init__(self, env):
        """Resolve landing and checkpoint locations for a catalog.

        Args:
            env: Name of the catalog to write into. It is also appended to the
                configured checkpoint path, so each catalog gets its own
                checkpoint directory.
        """
        self.catalog = env
        conf = InitialConfig()
        self.landing_path = conf.landing_path
        # Separate checkpoint directory per catalog so streams targeting
        # different catalogs never share checkpoint state.
        self.checkpoint_path = conf.checkpoint_path + f'/{self.catalog}'
        self.schema = 'bronze'
        print(f'Checkpoint path is {self.checkpoint_path}')
        print(f'Landing path is {self.landing_path}')
        print(f'current catalog is {self.catalog}')

    def _table_name(self, table_name):
        """Return the back-quoted ``catalog.schema.table`` name for ``table_name``."""
        return f"`{self.catalog}`.`{self.schema}`.`{table_name}`"

    def read_raw_traffic(self):
        """Return a streaming DataFrame over ``<landing_path>/raw_traffic``.

        The CSV files are read with the explicit schema below. A ``create_at``
        column populated from ``current_timestamp()`` is appended.
        """
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
        from pyspark.sql.functions import current_timestamp
        print("Loading traffic data...")
        schema = StructType([
            StructField("Record_ID", IntegerType()),
            StructField("Count_point_id", IntegerType()),
            StructField("Direction_of_travel", StringType()),
            StructField("Year", IntegerType()),
            StructField("Count_date", StringType()),
            StructField("hour", IntegerType()),
            StructField("Region_id", IntegerType()),
            StructField("Region_name", StringType()),
            StructField("Local_authority_name", StringType()),
            StructField("Road_name", StringType()),
            StructField("Road_Category_ID", IntegerType()),
            StructField("Start_junction_road_name", StringType()),
            StructField("End_junction_road_name", StringType()),
            StructField("Latitude", DoubleType()),
            StructField("Longitude", DoubleType()),
            StructField("Link_length_km", DoubleType()),
            StructField("Pedal_cycles", IntegerType()),
            StructField("Two_wheeled_motor_vehicles", IntegerType()),
            StructField("Cars_and_taxis", IntegerType()),
            StructField("Buses_and_coaches", IntegerType()),
            StructField("LGV_Type", IntegerType()),
            StructField("HGV_Type", IntegerType()),
            StructField("EV_Car", IntegerType()),
            StructField("EV_Bike", IntegerType()),
        ])

        # NOTE(review): no ``header`` option is set, so Spark's CSV default
        # (header=false) applies. If the landing files start with a header
        # row, that row will be ingested as data. Confirm against the files.
        df = (spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "csv")
              .option("cloudFiles.schemaLocation", f'{self.checkpoint_path}/raw_traffic/schemaInfer')
              .schema(schema)
              .load(f'{self.landing_path}/raw_traffic')
              .withColumn('create_at', current_timestamp())
              )
        return df

    def read_raw_roads(self):
        """Return a streaming DataFrame over ``<landing_path>/raw_roads``.

        The CSV files are read with the explicit schema below. Unlike
        ``read_raw_traffic``, no ``create_at`` column is added.
        """
        from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
        schema = StructType([
            StructField('Road_ID', IntegerType()),
            StructField('Road_Category_Id', IntegerType()),
            StructField('Road_Category', StringType()),
            StructField('Region_ID', IntegerType()),
            StructField('Region_Name', StringType()),
            StructField('Total_Link_Length_Km', DoubleType()),
            StructField('Total_Link_Length_Miles', DoubleType()),
            StructField('All_Motor_Vehicles', DoubleType()),
        ])

        # NOTE(review): as with raw_traffic, no ``header`` option is set, so
        # Spark's CSV default (header=false) applies. Confirm.
        df = (spark.readStream
              .format("cloudFiles")
              .option("cloudFiles.format", "csv")
              .option("cloudFiles.schemaLocation", f'{self.checkpoint_path}/raw_roads/schemaInfer')
              .schema(schema)
              .load(f'{self.landing_path}/raw_roads')
              )
        return df

    def _write_stream(self, streaming_df, table_name, query_name):
        """Append ``streaming_df`` to the bronze Delta table ``table_name``.

        Uses an ``availableNow`` trigger and blocks on ``awaitTermination()``.
        The checkpoint is stored at ``<checkpoint_path>/<table_name>/Checkpt``.
        That path must stay stable, otherwise the stream restarts from scratch.
        """
        print(f'Writing data to {self.catalog} catalog {table_name} table')
        query = (streaming_df.writeStream
                 .format('delta')
                 .option("checkpointLocation", f'{self.checkpoint_path}/{table_name}/Checkpt')
                 .outputMode('append')
                 .queryName(query_name)
                 .trigger(availableNow=True)
                 .toTable(self._table_name(table_name)))
        query.awaitTermination()
        print(f'Writing {table_name} in {self.catalog} completed')

    def write_raw_traffic(self, StreamingDF):
        """Write the traffic stream into the ``raw_traffic`` bronze table (blocking)."""
        self._write_stream(StreamingDF, 'raw_traffic', 'rawTrafficWriteStream')

    def write_raw_roads(self, StreamingDF):
        """Write the roads stream into the ``raw_roads`` bronze table (blocking)."""
        self._write_stream(StreamingDF, 'raw_roads', 'rawRoadsWriteStream')

    def load_bronze_tables(self):
        """Read both landing feeds and write them to their bronze tables, traffic first."""
        print('Loading bronze tables...')
        df_traffic = self.read_raw_traffic()
        df_roads = self.read_raw_roads()
        self.write_raw_traffic(df_traffic)
        self.write_raw_roads(df_roads)
        print('Loading bronze tables completed')

    def assert_count(self, table_name, expected_count):
        """Check that a bronze table holds exactly ``expected_count`` rows.

        Raises:
            AssertionError: if the row count differs. It is raised explicitly
                rather than via ``assert`` so the check still runs under ``python -O``.
        """
        print(f'Asserting {table_name} count')
        actual_count = spark.read.table(self._table_name(table_name)).count()
        if actual_count != expected_count:
            raise AssertionError(
                f'Expected {expected_count} records in {table_name}, found {actual_count} in {table_name}'
            )
        print(f'Asserting {table_name} count completed')

    def validate(self, expected_traffic_count=100000, expected_roads_count=100000):
        """Check the row counts of both bronze tables.

        The defaults reproduce the original hard-coded expectations.
        NOTE(review): the same count is expected for traffic and roads.
        Confirm this matches the landing data.
        """
        self.assert_count('raw_traffic', expected_traffic_count)
        self.assert_count('raw_roads', expected_roads_count)




In [0]:
# Build the bronze loader for the target catalog, then ingest both feeds.
GL = BronzeLoader(env=env)
GL.load_bronze_tables()