### Configure the catalog widget

In [0]:
dbutils.widgets.text(name="env",defaultValue='',label='Enter the environment or catalog in lower case')
# The label asks for lower case; normalise here so a stray space or capital
# letter does not silently change the catalog name used by the writes below.
env = dbutils.widgets.get("env").strip().lower()
if not env:
    # The widget default is empty; fail fast instead of building table names
    # such as ``.`bronze`.`raw_traffic` further down.
    raise ValueError("Widget 'env' is empty: enter the target environment/catalog name (lower case).")
print(env)

### Set paths to the checkpoint and landing locations

In [0]:
def _external_location_url(location_name):
    """Return the URL of the named external location, without a trailing slash.

    The URL is read from ``DESCRIBE EXTERNAL LOCATION``. Sub-paths are appended
    to it elsewhere in this notebook with a leading '/', so a URL registered
    with a trailing slash would otherwise yield '//' in those paths.
    """
    url = spark.sql(f"DESCRIBE EXTERNAL LOCATION `{location_name}`").select("url").first()[0]
    return url.rstrip("/")


checkpoints = _external_location_url("checkpoints")
landing = _external_location_url("landing")


### Import Spark dependencies

In [0]:
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
    from pyspark.sql.functions import current_timestamp

### Define a function to read the raw traffic data

In [0]:
def read_traffic_data():
    """Open an Auto Loader (``cloudFiles``) stream over the raw traffic CSV files.

    Reads ``<landing>/raw_traffic/`` as CSV with a header row using the explicit
    schema below, and appends an ``Extract_time`` column set to
    ``current_timestamp()``. The schema location is
    ``<checkpoints>/rawTrafficLoad/schemaInfer``.

    Depends on the notebook-level ``spark``, ``checkpoints`` and ``landing``
    variables defined in earlier cells.

    Returns:
        The streaming DataFrame. No streaming query is started here.
    """
    print("Reading the Raw Traffic Data :  ", end='')

    # (column name, Spark type) pairs; keep the order in sync with the CSV columns.
    traffic_columns = [
        ("Record_ID", IntegerType()),
        ("Count_point_id", IntegerType()),
        ("Direction_of_travel", StringType()),
        ("Year", IntegerType()),
        ("Count_date", StringType()),
        ("hour", IntegerType()),
        ("Region_id", IntegerType()),
        ("Region_name", StringType()),
        ("Local_authority_name", StringType()),
        ("Road_name", StringType()),
        ("Road_Category_ID", IntegerType()),
        ("Start_junction_road_name", StringType()),
        ("End_junction_road_name", StringType()),
        ("Latitude", DoubleType()),
        ("Longitude", DoubleType()),
        ("Link_length_km", DoubleType()),
        ("Pedal_cycles", IntegerType()),
        ("Two_wheeled_motor_vehicles", IntegerType()),
        ("Cars_and_taxis", IntegerType()),
        ("Buses_and_coaches", IntegerType()),
        ("LGV_Type", IntegerType()),
        ("HGV_Type", IntegerType()),
        ("EV_Car", IntegerType()),
        ("EV_Bike", IntegerType()),
    ]
    traffic_schema = StructType([StructField(col_name, col_type) for col_name, col_type in traffic_columns])

    traffic_stream = (
        spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'csv')
        .option('cloudFiles.schemaLocation', f"{checkpoints}/rawTrafficLoad/schemaInfer")
        .option('header', 'true')
        .schema(traffic_schema)
        .load(f"{landing}/raw_traffic/")
        .withColumn('Extract_time', current_timestamp())
    )

    print("Road traffic stream read successful !!")
    return traffic_stream


### Define a function to read the raw roads data

In [0]:
def read_road_data():
    """Open an Auto Loader (``cloudFiles``) stream over the raw roads CSV files.

    Reads ``<landing>/raw_roads/`` as CSV with a header row using the explicit
    schema below. The schema location is
    ``<checkpoints>/rawRoadsLoad/schemaInfer``. Unlike ``read_traffic_data``,
    no ``Extract_time`` column is added.

    Depends on the notebook-level ``spark``, ``checkpoints`` and ``landing``
    variables defined in earlier cells.

    Returns:
        The streaming DataFrame. No streaming query is started here.
    """
    print("Reading the Raw Roads Data :  ", end='')

    # (column name, Spark type) pairs; keep the order in sync with the CSV columns.
    road_columns = [
        ('Road_ID', IntegerType()),
        ('Road_Category_Id', IntegerType()),
        ('Road_Category', StringType()),
        ('Region_ID', IntegerType()),
        ('Region_Name', StringType()),
        ('Total_Link_Length_Km', DoubleType()),
        ('Total_Link_Length_Miles', DoubleType()),
        ('All_Motor_Vehicles', DoubleType()),
    ]
    road_schema = StructType([StructField(col_name, col_type) for col_name, col_type in road_columns])

    roads_stream = (
        spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'csv')
        .option('cloudFiles.schemaLocation', f"{checkpoints}/rawRoadsLoad/schemaInfer")
        .option('header', 'true')
        .schema(road_schema)
        .load(f"{landing}/raw_roads/")
    )

    print("Raw Road stream read successful !!")
    return roads_stream
    


### Define a function to write the traffic data to the bronze layer

In [0]:
def write_Traffic_data(streaming_df,catalog):
    """Append a traffic stream to `<catalog>`.`bronze`.`raw_traffic` and wait for it to finish.

    The query uses a Delta sink, append mode and an ``availableNow`` trigger, so
    it processes the input available at start and then stops.
    ``awaitTermination()`` blocks until that happens.

    Args:
        streaming_df: Streaming DataFrame to write, e.g. the result of
            ``read_traffic_data()``.
        catalog: Name of the target catalog. It must be non-empty.

    Raises:
        ValueError: If ``catalog`` is empty.
    """
    if not catalog:
        raise ValueError("catalog must be a non-empty catalog name")

    target_table = f"`{catalog}`.`bronze`.`raw_traffic`"
    # Log the exact table being written. End with a separator because
    # 'Write Success' is printed on the same line.
    print(f'Writing data to {target_table} table :  ', end='')

    write_Stream = (streaming_df.writeStream
                    .format('delta')
                    .option('checkpointLocation', f"{checkpoints}/rawTrafficLoad/checkpt")
                    .outputMode('append')
                    .queryName('rawTrafficWriteStream')
                    .trigger(availableNow=True)
                    .toTable(target_table))

    write_Stream.awaitTermination()
    print('Write Success')
    print("****************************")


### Define a function to write the raw roads data to the bronze layer

In [0]:
def write_Road_Data(streaming_df,catalog):
    """Append a roads stream to `<catalog>`.`bronze`.`raw_roads` and wait for it to finish.

    The query uses a Delta sink, append mode and an ``availableNow`` trigger, so
    it processes the input available at start and then stops.
    ``awaitTermination()`` blocks until that happens.

    Args:
        streaming_df: Streaming DataFrame to write, e.g. the result of
            ``read_road_data()``.
        catalog: Name of the target catalog. It must be non-empty.

    Raises:
        ValueError: If ``catalog`` is empty.
    """
    if not catalog:
        raise ValueError("catalog must be a non-empty catalog name")

    target_table = f"`{catalog}`.`bronze`.`raw_roads`"
    # Log the exact table being written. End with a separator because
    # 'Write Success' is printed on the same line.
    print(f'Writing data to {target_table} table :  ', end='')

    write_Data = (streaming_df.writeStream
                  .format('delta')
                  .option("checkpointLocation", f"{checkpoints}/rawRoadsLoad/checkpt")
                  .outputMode('append')
                  .queryName('rawRoadsWriteStream')
                  .trigger(availableNow=True)
                  .toTable(target_table))

    write_Data.awaitTermination()
    print('Write Success')
    print("****************************")

### Call the read and write functions to load data from the landing area into the bronze tables

In [0]:
# Traffic: open the landing stream, then write it into the bronze raw_traffic
# table of the catalog chosen in the `env` widget.
read_df = read_traffic_data()
write_Traffic_data(streaming_df=read_df, catalog=env)

# Roads: the same read-then-write, into the bronze raw_roads table. It runs only
# after the traffic write call above has returned.
read_road = read_road_data()
write_Road_Data(streaming_df=read_road, catalog=env)
