
### Get environment name from widget and assign to variable

In [0]:
# Text widget for the target environment; its value prefixes every catalog name ("{env}_catalog").
dbutils.widgets.text(name="env", defaultValue='', label='Enter environment in lower case')
env = dbutils.widgets.get("env").strip()

# The widget defaults to '' -- without this guard every table would resolve to `_catalog`
# and the failure would only surface later, far from its cause.
if not env:
    raise ValueError("Widget 'env' is empty; enter an environment name in lower case (e.g. 'dev').")


<br>

### Call common notebook to access shared variables and methods


In [0]:
%run "/Workspace/Users/yen#ext#@yenyahoo.onmicrosoft.com/3_Common"


## Creating a read_Traffic_Data() Function

In [0]:
def read_Traffic_Data():
    """Define an Auto Loader stream over the raw traffic CSV files in the landing zone.

    Reads header-bearing CSV files from ``{landing}/raw_traffic/`` with an explicit
    schema and appends an ``Extract_Time`` column set from ``current_timestamp()``.

    Relies on the notebook globals ``spark``, ``checkpoint`` and ``landing``
    (``checkpoint``/``landing`` presumably come from the ``%run`` of the common notebook).

    Returns:
        A streaming DataFrame. Streaming reads are lazy: nothing is read until a write
        stream is started on it, so the success message below only confirms the stream
        was defined.
    """
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
    from pyspark.sql.functions import current_timestamp

    print("Reading the Raw Traffic Data :  ", end='')

    # Explicit schema for the raw traffic CSV (column order matches the file header).
    schema = StructType([
        StructField("Record_ID", IntegerType()),
        StructField("Count_point_id", IntegerType()),
        StructField("Direction_of_travel", StringType()),
        StructField("Year", IntegerType()),
        StructField("Count_date", StringType()),
        StructField("hour", IntegerType()),
        StructField("Region_id", IntegerType()),
        StructField("Region_name", StringType()),
        StructField("Local_authority_name", StringType()),
        StructField("Road_name", StringType()),
        StructField("Road_Category_ID", IntegerType()),
        StructField("Start_junction_road_name", StringType()),
        StructField("End_junction_road_name", StringType()),
        StructField("Latitude", DoubleType()),
        StructField("Longitude", DoubleType()),
        StructField("Link_length_km", DoubleType()),
        StructField("Pedal_cycles", IntegerType()),
        StructField("Two_wheeled_motor_vehicles", IntegerType()),
        StructField("Cars_and_taxis", IntegerType()),
        StructField("Buses_and_coaches", IntegerType()),
        StructField("LGV_Type", IntegerType()),
        StructField("HGV_Type", IntegerType()),
        StructField("EV_Car", IntegerType()),
        StructField("EV_Bike", IntegerType()),
    ])

    rawTraffic_stream = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # Auto Loader keeps its schema-tracking metadata under this path.
        .option('cloudFiles.schemaLocation', f'{checkpoint}/rawTrafficLoad/schemaInfer')
        .option('header', 'true')
        .schema(schema)
        .load(landing + '/raw_traffic/')
        # Audit column recording when the rows were loaded.
        .withColumn("Extract_Time", current_timestamp()))

    print('Reading Success !!')
    print('*******************')

    return rawTraffic_stream


## Creating read_Road_Data() Function

In [0]:
def read_Road_Data():
    """Define an Auto Loader stream over the raw roads CSV files in the landing zone.

    Reads header-bearing CSV files from ``{landing}/raw_roads/`` with an explicit
    schema. Unlike ``read_Traffic_Data``, no ``Extract_Time`` column is added, so
    the output columns are exactly those of the schema below.

    Relies on the notebook globals ``spark``, ``checkpoint`` and ``landing``
    (``checkpoint``/``landing`` presumably come from the ``%run`` of the common notebook).

    Returns:
        A streaming DataFrame. Streaming reads are lazy: nothing is read until a write
        stream is started on it, so the success message below only confirms the stream
        was defined.
    """
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
    from pyspark.sql.functions import current_timestamp  # NOTE(review): unused here; kept to match read_Traffic_Data

    print("Reading the Raw Roads Data :  ", end='')

    # Explicit schema for the raw roads CSV (column order matches the file header).
    schema = StructType([
        StructField('Road_ID', IntegerType()),
        StructField('Road_Category_Id', IntegerType()),
        StructField('Road_Category', StringType()),
        StructField('Region_ID', IntegerType()),
        StructField('Region_Name', StringType()),
        StructField('Total_Link_Length_Km', DoubleType()),
        StructField('Total_Link_Length_Miles', DoubleType()),
        StructField('All_Motor_Vehicles', DoubleType()),
    ])

    rawRoads_stream = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        # Auto Loader keeps its schema-tracking metadata under this path.
        .option('cloudFiles.schemaLocation', f'{checkpoint}/rawRoadsLoad/schemaInfer')
        .option('header', 'true')
        .schema(schema)
        .load(landing + '/raw_roads/'))

    print('Reading Success !!')
    print('*******************')

    return rawRoads_stream


## Creating write_Traffic_Data(StreamingDF,environment) Function

In [0]:
def write_Traffic_Data(StreamingDF, environment):
    """Drain ``StreamingDF`` into the Delta table ``{environment}_catalog.bronze.raw_traffic``.

    The query runs with an ``availableNow`` trigger in append mode, and this call
    blocks on ``awaitTermination()`` until the query stops. Streaming progress is
    checkpointed under ``{checkpoint}/rawTrafficLoad/Checkpt``; ``checkpoint`` is a
    notebook global, presumably set by the common notebook.
    """
    print(f'Writing data to {environment}_catalog raw_traffic table :  ', end='' )

    checkpoint_dir = checkpoint + '/rawTrafficLoad/Checkpt'
    target_table = f"`{environment}_catalog`.`bronze`.`raw_traffic`"

    # Build the writer one configuration step at a time.
    writer = StreamingDF.writeStream.format('delta')
    writer = writer.option("checkpointLocation", checkpoint_dir)
    writer = writer.outputMode('append').queryName('rawTrafficWriteStream')
    writer = writer.trigger(availableNow=True)

    running_query = writer.toTable(target_table)
    # Wait for the availableNow run to finish before reporting success.
    running_query.awaitTermination()

    print('Write Success')
    print("****************************")


## Creating write_Road_Data(StreamingDF,environment) Function

In [0]:
def write_Road_Data(StreamingDF,environment):
    """Drain ``StreamingDF`` into the Delta table ``{environment}_catalog.bronze.raw_roads``.

    The query runs with an ``availableNow`` trigger in append mode, and this call
    blocks on ``awaitTermination()`` until the query stops. Streaming progress is
    checkpointed under ``{checkpoint}/rawRoadsLoad/Checkpt``; ``checkpoint`` is a
    notebook global, presumably set by the common notebook.
    """
    print(f'Writing data to {environment}_catalog raw_roads table :  ', end='' )

    checkpoint_dir = checkpoint + '/rawRoadsLoad/Checkpt'
    target_table = f"`{environment}_catalog`.`bronze`.`raw_roads`"

    # Build the writer one configuration step at a time.
    writer = StreamingDF.writeStream.format('delta')
    writer = writer.option("checkpointLocation", checkpoint_dir)
    writer = writer.outputMode('append').queryName('rawRoadsWriteStream')
    writer = writer.trigger(availableNow=True)

    running_query = writer.toTable(target_table)
    # Wait for the availableNow run to finish before reporting success.
    running_query.awaitTermination()

    print('Write Success')
    print("****************************")


## Calling read and Write Functions

In [0]:
## Define both landing -> Bronze Auto Loader streams (traffic first, then roads)
read_Df, read_roads = read_Traffic_Data(), read_Road_Data()

## Drain each stream into its Bronze table, in the same order: traffic, then roads
for source_stream, write_to_bronze in ((read_Df, write_Traffic_Data), (read_roads, write_Road_Data)):
    write_to_bronze(source_stream, env)

Reading the Raw Traffic Data :  Reading Succcess !!
*******************
Reading the Raw Roads Data :  Reading Succcess !!
*******************
Writing data to dev_catalog raw_traffic table :  Write Success
****************************
Writing data to dev_catalog raw_roads table :  Write Success
****************************


In [0]:
# Spot-check a few ingested traffic rows.
traffic_preview_query = f"SELECT * FROM `{env}_catalog`.`bronze`.`raw_traffic` Limit 5"
display(spark.sql(traffic_preview_query))

Record_ID,Count_point_id,Direction_of_travel,Year,Count_date,hour,Region_id,Region_name,Local_authority_name,Road_name,Road_Category_ID,Start_junction_road_name,End_junction_road_name,Latitude,Longitude,Link_length_km,Pedal_cycles,Two_wheeled_motor_vehicles,Cars_and_taxis,Buses_and_coaches,LGV_Type,HGV_Type,EV_Car,EV_Bike,Extract_Time
1,749,E,2014,6/25/2014 0:00,7,3,Scotland,East Ayrshire,A77,3,LA boundary,A76,55.59163646,-4.478605862,4.6,0,2,845,5,31,15,4,4,2024-09-27T11:13:05.81Z
2,749,E,2014,6/25/2014 0:00,8,3,Scotland,East Ayrshire,A77,3,LA boundary,A76,55.59163646,-4.478605862,4.6,0,5,908,7,103,29,3,2,2024-09-27T11:13:05.81Z
3,749,E,2014,6/25/2014 0:00,9,3,Scotland,East Ayrshire,A77,3,LA boundary,A76,55.59163646,-4.478605862,4.6,0,2,595,14,88,21,3,5,2024-09-27T11:13:05.81Z
4,749,E,2014,6/25/2014 0:00,10,3,Scotland,East Ayrshire,A77,3,LA boundary,A76,55.59163646,-4.478605862,4.6,0,1,590,8,90,31,6,10,2024-09-27T11:13:05.81Z
5,749,E,2014,6/25/2014 0:00,11,3,Scotland,East Ayrshire,A77,3,LA boundary,A76,55.59163646,-4.478605862,4.6,0,2,695,18,75,38,2,2,2024-09-27T11:13:05.81Z


In [0]:
# Spot-check a few ingested road rows.
roads_preview_query = f"SELECT * FROM `{env}_catalog`.`bronze`.`raw_roads` Limit 5"
display(spark.sql(roads_preview_query))

Road_ID,Road_Category_Id,Road_Category,Region_ID,Region_Name,Total_Link_Length_Km,Total_Link_Length_Miles,All_Motor_Vehicles
1,1,TM,1,South West,301.339,187.24,3465840186.0
2,3,TA,1,South West,993.586,617.39,3484710210.0
3,4,PA,1,South West,3874.924,2407.77,7794003682.0
4,5,M,1,South West,43581.7,27080.41,9112005432.0
5,1,TM,2,East Midlands,178.609,110.98,2736661520.0


In [0]:
# Highest Record_ID currently in bronze; the baseline for the incremental-load check below.
max_record_query = f"Select max(Record_ID) from `{env}_catalog`.`bronze`.`raw_traffic`"
display(spark.sql(max_record_query))

max(Record_ID)
55638


<br>

To demonstrate incremental loading with Auto Loader, I will upload 'raw_traffic2' to the 'landing/raw_traffic' folder and rerun the read & write cell. I will then run the cell below, which displays the maximum Record_ID in the raw_traffic table, and check the new Record_ID values and their Extract_Time.

**Note**: _Auto Loader handles incremental loading automatically; no special code is needed for it._

In [0]:
# Re-check the highest Record_ID after rerunning the read & write cell.
max_record_query = f"Select max(Record_ID) from `{env}_catalog`.`bronze`.`raw_traffic`"
display(spark.sql(max_record_query))

max(Record_ID)
55638
