### Run the common notebook to reuse its shared variables and functions

In [0]:
%run "/Workspace/Users/navinmalik7@hotmail.com/DatabricksLTI/04_CommonFunctions"

In [0]:
# Text widget supplying the environment prefix; every table name below is built
# as `{env}-catalog`.<schema>.<table>.
dbutils.widgets.text(name="env",defaultValue='',label='Enter the environment in lower case')

# Normalise the value (the label asks for lower case) and fail fast when it is
# empty: the widget defaults to '', which would otherwise resolve to the
# non-existent catalog `-catalog` and fail later with a less obvious error.
env = dbutils.widgets.get("env").strip().lower()
if not env:
    raise ValueError(
        "The 'env' widget is empty; set it to the target environment "
        "(for example 'dev') before running this notebook."
    )

### Read Silver_Traffic table

In [0]:
def read_SilverTrafficTable(environment):
    """Open a streaming read on the silver traffic table.

    Args:
        environment: Environment prefix; the table is looked up in the
            catalog named ``{environment}-catalog``, schema ``silver``.

    Returns:
        The streaming DataFrame produced by ``spark.readStream.table``.
    """
    # Catalog names contain a hyphen, hence the backtick quoting.
    source_table = f"`{environment}-catalog`.`silver`.silver_traffic"

    print('Reading the Silver Traffic Table Data : ', end='')
    traffic_stream = spark.readStream.table(source_table)
    print(f'Reading {environment}-catalog.silver.silver_traffic Success!')
    print("**********************************")

    return traffic_stream

### Read silver_roads Table

In [0]:
def read_SilverRoadsTable(environment):
    """Open a streaming read on the silver roads table.

    Args:
        environment: Environment prefix; the table is looked up in the
            catalog named ``{environment}-catalog``, schema ``silver``.

    Returns:
        The streaming DataFrame produced by ``spark.readStream.table``.
    """
    # Catalog names contain a hyphen, hence the backtick quoting.
    source_table = f"`{environment}-catalog`.`silver`.silver_roads"

    print('Reading the Silver Table Silver_roads Data : ', end='')
    roads_stream = spark.readStream.table(source_table)
    print(f'Reading {environment}-catalog.silver.silver_roads Success!')
    print("**********************************")

    return roads_stream

## Creating the Vehicle_Intensity column (Motor_Vehicles_Count divided by Link_length_km)

In [0]:
def create_VehicleIntensity(df):
    """Add a ``Vehicle_Intensity`` column to the traffic DataFrame.

    The value is ``Motor_Vehicles_Count / Link_length_km``.

    Args:
        df: DataFrame containing the ``Motor_Vehicles_Count`` and
            ``Link_length_km`` columns.

    Returns:
        A new DataFrame with the additional ``Vehicle_Intensity`` column.
        The value is NULL when ``Link_length_km`` is zero or NULL.
    """
    from pyspark.sql.functions import col, when

    print('Creating Vehicle Intensity column : ',end='')

    link_length = col('Link_length_km')
    # Guard the divisor: with ANSI SQL mode enabled, dividing by zero raises
    # DIVIDE_BY_ZERO and would abort the whole stream. `when` without
    # `otherwise` yields NULL for unmatched rows, which is also what Spark
    # returns for x / 0 when ANSI mode is off, so existing results are unchanged.
    df_veh = df.withColumn(
        'Vehicle_Intensity',
        when(link_length != 0, col('Motor_Vehicles_Count') / link_length),
    )

    print("Success!!!")
    print('***************')
    return df_veh

#### Creating LoadTime column

In [0]:
def create_LoadTime(df):
    """Append a ``Load_Time`` column holding ``current_timestamp()``.

    Args:
        df: DataFrame to stamp.

    Returns:
        A new DataFrame with the additional ``Load_Time`` column.
    """
    from pyspark.sql.functions import current_timestamp

    print('Creating Load Time column : ', end='')
    stamped = df.withColumn('Load_Time', current_timestamp())
    print('Success!!')
    print('**************')

    return stamped

### Writing Data to Gold Traffic

In [0]:
def write_Traffic_GoldTable(StreamingDF,environment):
    """Append a streaming DataFrame to the gold traffic Delta table.

    Starts a streaming query with an ``availableNow`` trigger and blocks
    until that query terminates.

    Args:
        StreamingDF: Streaming DataFrame to persist.
        environment: Environment prefix; the target table lives in the
            catalog named ``{environment}-catalog``, schema ``gold``.
    """
    print('Writing the gold_traffic Data : ', end='')

    # NOTE(review): `checkpoint` is not defined in this notebook; presumably it
    # comes from the common notebook pulled in via %run - confirm.
    checkpoint_dir = checkpoint+ "GoldTrafficLoad/Checkpt/"
    target_table = f"`{environment}-catalog`.`gold`.`gold_traffic`"

    stream_writer = StreamingDF.writeStream.format('delta')
    stream_writer = stream_writer.option('checkpointLocation', checkpoint_dir)
    stream_writer = stream_writer.outputMode('append')
    stream_writer = stream_writer.queryName("GoldTrafficWriteStream")
    stream_writer = stream_writer.trigger(availableNow=True)

    traffic_query = stream_writer.toTable(target_table)
    traffic_query.awaitTermination()

    print(f'Writing `{environment}-catalog`.`gold`.`gold_traffic` Success!')

### Writing Data to Gold Roads

In [0]:
def write_Roads_GoldTable(StreamingDF,environment):
    """Append a streaming DataFrame to the gold roads Delta table.

    Starts a streaming query with an ``availableNow`` trigger and blocks
    until that query terminates.

    Args:
        StreamingDF: Streaming DataFrame to persist.
        environment: Environment prefix; the target table lives in the
            catalog named ``{environment}-catalog``, schema ``gold``.
    """
    print('Writing the gold_roads Data : ', end='')

    # NOTE(review): `checkpoint` is not defined in this notebook; presumably it
    # comes from the common notebook pulled in via %run - confirm.
    checkpoint_dir = checkpoint+ "GoldRoadsLoad/Checkpt/"
    target_table = f"`{environment}-catalog`.`gold`.`gold_roads`"

    stream_writer = StreamingDF.writeStream.format('delta')
    stream_writer = stream_writer.option('checkpointLocation', checkpoint_dir)
    stream_writer = stream_writer.outputMode('append')
    stream_writer = stream_writer.queryName("GoldRoadsWriteStream")
    stream_writer = stream_writer.trigger(availableNow=True)

    roads_query = stream_writer.toTable(target_table)
    roads_query.awaitTermination()

    print(f'Writing `{environment}-catalog`.`gold`.`gold_roads` Success!')

In [0]:
# Driver cell: silver -> gold load for the environment selected in the `env` widget.

## Reading from Silver tables
# Both reads are streaming reads (spark.readStream), one per silver table.
df_SilverTraffic = read_SilverTrafficTable(env)
df_SilverRoads = read_SilverRoadsTable(env)

## Transformations
# Traffic gets the derived Vehicle_Intensity column and then the Load_Time
# stamp; roads only gets the Load_Time stamp.
df_vehicle = create_VehicleIntensity(df_SilverTraffic)
df_FinalTraffic = create_LoadTime(df_vehicle)
df_FinalRoads = create_LoadTime(df_SilverRoads)


## Writing to gold tables
# Each writer calls awaitTermination(), so the traffic write finishes before
# the roads write starts.
write_Traffic_GoldTable(df_FinalTraffic,env)
write_Roads_GoldTable(df_FinalRoads,env)

In [0]:
# Preview the first rows of the gold_roads table. The catalog is derived from
# the `env` widget value, like every other table reference in this notebook,
# rather than being hard-coded to `dev-catalog`.
display(spark.table(f"`{env}-catalog`.`gold`.`gold_roads`").limit(10))