In [0]:
%run "/Workspace/Users/schaudhuri1495@gmail.com/04. Common"

In [0]:
# Notebook parameter: the target environment (e.g. "dev"). The value is interpolated
# into the catalog name `databricks_{env}_ws` by the write and query cells below.
dbutils.widgets.text(name = "env", defaultValue='', label = "ENTER THE ENVIRONMENT IN LOWER CASE")
env = dbutils.widgets.get("env").strip()

# Fail fast: an empty value would otherwise resolve to the catalog `databricks__ws`
# and only surface as an error once the stream is written.
if not env:
    raise ValueError("The 'env' widget is empty - enter the environment name (e.g. 'dev') and re-run.")


### CREATING A read_traffic_data FUNCTION

In [0]:
def read_traffic_data():
    """Build the streaming reader for the raw traffic CSV files.

    Uses the ``cloudFiles`` source in CSV mode with a header row and an
    explicit schema, loads from ``landing + '/raw_traffic/'`` and appends an
    ``Extract_Time`` column set to ``current_timestamp()``.

    Depends on ``spark``, ``checkpoints`` and ``landing`` existing in the
    notebook scope (presumably defined by the Common notebook - verify).

    Returns:
        The streaming DataFrame for the raw traffic feed.
    """
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
    from pyspark.sql.functions import current_timestamp

    print("Reading the Raw Traffic Data :  ", end='')

    # (column name, Spark type) pairs, in schema order.
    column_spec = [
        ("Record_ID", IntegerType),
        ("Count_point_id", IntegerType),
        ("Direction_of_travel", StringType),
        ("Year", IntegerType),
        ("Count_date", StringType),
        ("hour", IntegerType),
        ("Region_id", IntegerType),
        ("Region_name", StringType),
        ("Local_authority_name", StringType),
        ("Road_name", StringType),
        ("Road_Category_ID", IntegerType),
        ("Start_junction_road_name", StringType),
        ("End_junction_road_name", StringType),
        ("Latitude", DoubleType),
        ("Longitude", DoubleType),
        ("Link_length_km", DoubleType),
        ("Pedal_cycles", IntegerType),
        ("Two_wheeled_motor_vehicles", IntegerType),
        ("Cars_and_taxis", IntegerType),
        ("Buses_and_coaches", IntegerType),
        ("LGV_Type", IntegerType),
        ("HGV_Type", IntegerType),
        ("EV_Car", IntegerType),
        ("EV_Bike", IntegerType),
    ]
    traffic_schema = StructType(
        [StructField(col_name, col_type()) for col_name, col_type in column_spec]
    )

    # Reader options, applied one by one in this order.
    source_options = {
        "cloudFiles.format": "csv",
        'cloudFiles.schemaLocation': f'{checkpoints}/rawTrafficLoad/schemaInfer',
        "header": 'true',
    }
    stream_reader = spark.readStream.format("cloudFiles")
    for option_key, option_value in source_options.items():
        stream_reader = stream_reader.option(option_key, option_value)

    landed_files = stream_reader.schema(traffic_schema).load(landing+'/raw_traffic/')
    # Stamp every row with the time it was picked up by this load.
    traffic_stream = landed_files.withColumn("Extract_Time", current_timestamp())
    print('READING SUCCESS!!!!!!!!!!!!!!!!')

    return traffic_stream


### CREATING A write_traffic_data() FUNCTION

In [0]:
def write_traffic_data(StreamingDF, env):
    """Append the traffic stream to the bronze ``raw_traffic`` Delta table.

    Args:
        StreamingDF: Streaming DataFrame to persist.
        env: Environment name; selects the catalog ``databricks_{env}_ws``.

    Starts the query with an ``availableNow`` trigger and blocks until it
    terminates. Relies on ``checkpoints`` existing in the notebook scope.
    """
    target_table = f"`databricks_{env}_ws`.`bronze`.`raw_traffic`"
    checkpoint_dir = checkpoints+'/rawTrafficLoad/Checkpt'

    writer = StreamingDF.writeStream.format('delta')
    writer = writer.option("checkpointLocation", checkpoint_dir)
    writer = writer.outputMode('append')
    writer = writer.queryName('rawTrafficWriteStream')
    writer = writer.trigger(availableNow=True)

    # toTable starts the streaming query; wait for it to finish before reporting.
    traffic_query = writer.toTable(target_table)
    traffic_query.awaitTermination()
    print('WRITE SUCCESS!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')
        


#### CALLING READ AND WRITE FUNCTIONS FOR TRAFFIC

In [0]:
# Build the raw traffic stream, then write it to the bronze table of the
# environment selected in the `env` widget.
read_df = read_traffic_data()
write_traffic_data(read_df, env)

Reading the Raw Traffic Data :  READING SUCCESS!!!!!!!!!!!!!!!!
WRITE SUCCESS!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!



### CREATING A read_Road_Data() FUNCTION

In [0]:
def read_Road_Data():
    """Build the streaming reader for the raw roads CSV files.

    Uses the ``cloudFiles`` source in CSV mode with a header row and an
    explicit schema, loading from ``landing + '/raw_roads/'``.

    Depends on ``spark``, ``checkpoints`` and ``landing`` existing in the
    notebook scope (presumably defined by the Common notebook - verify).

    Returns:
        The streaming DataFrame for the raw roads feed.
    """
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
    from pyspark.sql.functions import current_timestamp

    print("Reading the Raw Roads Data :  ", end='')

    # (column name, Spark type) pairs, in schema order.
    column_spec = [
        ('Road_ID', IntegerType),
        ('Road_Category_Id', IntegerType),
        ('Road_Category', StringType),
        ('Region_ID', IntegerType),
        ('Region_Name', StringType),
        ('Total_Link_Length_Km', DoubleType),
        ('Total_Link_Length_Miles', DoubleType),
        ('All_Motor_Vehicles', DoubleType),
    ]
    roads_schema = StructType(
        [StructField(col_name, col_type()) for col_name, col_type in column_spec]
    )

    # Reader options, applied one by one in this order.
    source_options = {
        "cloudFiles.format": "csv",
        'cloudFiles.schemaLocation': f'{checkpoints}/rawRoadsLoad/schemaInfer',
        "header": 'true',
    }
    stream_reader = spark.readStream.format("cloudFiles")
    for option_key, option_value in source_options.items():
        stream_reader = stream_reader.option(option_key, option_value)

    roads_stream = stream_reader.schema(roads_schema).load(landing+'/raw_roads/')
    print('READING SUCCESS!!!!!!!!!!!!!!!!')

    return roads_stream


### CREATING A write_road_data() FUNCTION

In [0]:
def write_road_data(StreamingDF, env):
    """Append the roads stream to the bronze ``raw_roads`` Delta table.

    Args:
        StreamingDF: Streaming DataFrame to persist.
        env: Environment name; selects the catalog ``databricks_{env}_ws``.

    Starts the query with an ``availableNow`` trigger and blocks until it
    terminates. Relies on ``checkpoints`` existing in the notebook scope.
    """
    write_road = (StreamingDF.writeStream
                    .format('delta')
                    .option("checkpointLocation",checkpoints+'/rawRoadsLoad/Checkpt')
                    .outputMode('append')
                    # Own query name (the traffic writer uses 'rawTrafficWriteStream'), so the
                    # two streams are distinguishable and never clash if both are active.
                    .queryName('rawRoadsWriteStream')
                    .trigger(availableNow=True)
                    .toTable(f"`databricks_{env}_ws`.`bronze`.`raw_roads`"))

    write_road.awaitTermination()
    print('WRITE SUCCESS!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!')


#### CALLING READ AND WRITE FUNCTIONS FOR ROADS

In [0]:
# Build the raw roads stream, then write it to the bronze table of the
# environment selected in the `env` widget.
read_roads = read_Road_Data()
write_road_data(read_roads, env)

Reading the Raw Roads Data :  READING SUCCESS!!!!!!!!!!!!!!!!
WRITE SUCCESS!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!


In [0]:
# Show the contents of the bronze raw_roads table for the selected environment.
display(spark.sql(f"SELECT * FROM `databricks_{env}_ws`.`bronze`.`raw_roads`"))

Road_ID,Road_Category_Id,Road_Category,Region_ID,Region_Name,Total_Link_Length_Km,Total_Link_Length_Miles,All_Motor_Vehicles
1,1,TM,1,South West,301.339,187.24,3465840186.0
2,3,TA,1,South West,993.586,617.39,3484710210.0
3,4,PA,1,South West,3874.924,2407.77,7794003682.0
4,5,M,1,South West,43581.7,27080.41,9112005432.0
5,1,TM,2,East Midlands,178.609,110.98,2736661520.0
6,3,TA,2,East Midlands,1219.231,757.6,4936702686.0
7,4,PA,2,East Midlands,2571.21,1597.68,5535722338.0
8,5,M,2,East Midlands,26712.7,16598.5,7083362916.0
9,1,TM,3,Scotland,335.263,208.32,2485468450.0
10,3,TA,3,Scotland,2820.302,1752.45,5052925570.0


In [0]:
# Row count of the bronze raw_traffic table. The catalog is derived from the `env`
# widget so the check targets the same environment the load above wrote to.
display(spark.sql(f"SELECT count(*) FROM `databricks_{env}_ws`.`bronze`.`raw_traffic`"))

count(1)
74184



### THE CODE BELOW IS ADDED TO CHECK THE INCREMENTAL LOADING. WITHIN ONE LOAD (E.G. UP TO Record_ID 18546) THE "Extract_Time" IS THE SAME, AND FOR ROWS LOADED AFTERWARDS IT IS DIFFERENT. THE QUERY BELOW SHOWS THIS AROUND Record_ID 37092.

In [0]:
# Incremental-load check: rows on either side of a load boundary should carry
# different Extract_Time values. The catalog follows the `env` widget, and the
# bounds are integer literals to match the IntegerType Record_ID column.
display(spark.sql(f"""
    SELECT *
    FROM `databricks_{env}_ws`.`bronze`.`raw_traffic`
    WHERE Record_ID BETWEEN 37090 AND 37100
    ORDER BY Record_ID
"""))

Record_ID,Count_point_id,Direction_of_travel,Year,Count_date,hour,Region_id,Region_name,Local_authority_name,Road_name,Road_Category_ID,Start_junction_road_name,End_junction_road_name,Latitude,Longitude,Link_length_km,Pedal_cycles,Two_wheeled_motor_vehicles,Cars_and_taxis,Buses_and_coaches,LGV_Type,HGV_Type,EV_Car,EV_Bike,Extract_Time
37090,58073,E,2014,5/8/2014 0:00,13,7,East of England,Cambridgeshire,A14,3,A1303,A1303,52.21335948,0.267274407,6.8,0,3,541,1,163,50,2,15,2024-07-15T18:09:56.809Z
37091,58073,E,2014,5/8/2014 0:00,14,7,East of England,Cambridgeshire,A14,3,A1303,A1303,52.21335948,0.267274407,6.8,0,0,673,0,204,73,9,6,2024-07-15T18:09:56.809Z
37092,58073,E,2014,5/8/2014 0:00,15,7,East of England,Cambridgeshire,A14,3,A1303,A1303,52.21335948,0.267274407,6.8,0,49,1020,3,276,61,4,13,2024-07-15T18:09:56.809Z
37093,58073,E,2014,5/8/2014 0:00,16,7,East of England,Cambridgeshire,A14,3,A1303,A1303,52.21335948,0.267274407,6.8,0,10,1425,1,390,42,7,6,2024-07-15T20:51:30.376Z
37094,58073,E,2014,5/8/2014 0:00,17,7,East of England,Cambridgeshire,A14,3,A1303,A1303,52.21335948,0.267274407,6.8,0,4,1079,3,164,25,2,4,2024-07-15T20:51:30.376Z
37095,58073,E,2014,5/8/2014 0:00,18,7,East of England,Cambridgeshire,A14,3,A1303,A1303,52.21335948,0.267274407,6.8,0,9,870,2,82,11,0,1,2024-07-15T20:51:30.376Z
37096,57787,E,2014,9/23/2014 0:00,7,9,South East,Surrey,A31,4,A331,A3,51.22700479,-0.640882551,8.8,1,35,1955,10,413,32,4,7,2024-07-15T20:51:30.376Z
37097,57787,E,2014,9/23/2014 0:00,8,9,South East,Surrey,A31,4,A331,A3,51.22700479,-0.640882551,8.8,0,41,1951,2,299,33,4,10,2024-07-15T20:51:30.376Z
37098,57787,E,2014,9/23/2014 0:00,9,9,South East,Surrey,A31,4,A331,A3,51.22700479,-0.640882551,8.8,0,14,1462,4,265,34,6,10,2024-07-15T20:51:30.376Z
37099,57787,E,2014,9/23/2014 0:00,10,9,South East,Surrey,A31,4,A331,A3,51.22700479,-0.640882551,8.8,0,10,1038,2,174,43,3,9,2024-07-15T20:51:30.376Z
