# CAI BJRY Project - JSON

# Initialize Widget

In [0]:
# Uncomment and run to remove every widget from this notebook (e.g. before re-creating them with new defaults).
# dbutils.widgets.removeAll()

In [0]:
# (widget name, default value, label) for each text parameter of this notebook.
for _widget_name, _widget_default, _widget_label in (
    ("input_table", "default.cai_bjry__20210309221212", " 1. Input Table"),
    ("file_name", "cai_bjry__20210309221212", " 2. File Name"),
    ("output_schema_name", "greenbrier_projects", " 3. Output Schema Name"),
    ("output_table_name", "movement", " 4. Output Table Name"),
):
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

# Run Common Functions Notebook
The `Common Functions` notebook loads the shared libraries and helper functions used below (e.g. `remove_leading_trailing_spaces`).

In [0]:
%run "./Common Functions"

# Define Notebook variables

In [0]:
# Resolve each widget once into a module-level constant used throughout the notebook.
INPUT_TABLE, FILE_NAME, OUTPUT_SCHEMA_NAME, OUTPUT_TABLE_NAME = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("input_table", "file_name", "output_schema_name", "output_table_name")
)

print(f"INPUT_TABLE             = {INPUT_TABLE}")
print(f"FILE_NAME               = {FILE_NAME}")
print(f"OUTPUT_SCHEMA_NAME      = {OUTPUT_SCHEMA_NAME}")
print(f"OUTPUT_TABLE_NAME       = {OUTPUT_TABLE_NAME}")


## Get current timestamp

In [0]:
# Evaluate current_timestamp() once on a one-row DataFrame and pull the value back to the driver.
# current_timestamp is expected to be in scope from the Common Functions notebook.
df_current_timestamp = spark.createDataFrame([["1"]], ["id"]).withColumn(
    "currentTimestamp", current_timestamp()
)

PROCESSING_TIMESTAMP = df_current_timestamp.first()["currentTimestamp"]
print(f"PROCESSING_TIMESTAMP   = {PROCESSING_TIMESTAMP}")

# Read Raw Data

## Read raw json file
 

In [0]:
# Direct read of the raw JSON file. This requires DBFS access, which we do not have (see below).
# "dbfs_file_path" is a placeholder for the real file path.
# data_json = spark.read.json(path="dbfs_file_path")

Because we don't have direct access to DBFS, we read the raw data from a table instead.

## Read raw data as table
 

In [0]:
# The raw JSON is available as a table, so read it by name.
# Equivalent alternative: spark.sql(f"select * from {INPUT_TABLE}")
data_raw = spark.table(INPUT_TABLE)
display(data_raw)

# Transformations

## Explode `v1.0.1`
- Rename "v1.0.1" to "v1_0_1"

- Explode "v1_0_1"

In [0]:
# A column named "v1.0.1" would be parsed as nested field access when referenced by name,
# so give it a dot-free name first, then explode the array so each element is its own row.
# NOTE: explode() emits no row for a null or empty array.
explode_v1_0_1 = data_raw.withColumnRenamed("v1.0.1", "v1_0_1")
explode_v1_0_1 = explode_v1_0_1.withColumn("v1_0_1", explode("v1_0_1"))
display(explode_v1_0_1)

## Extract all necessary nested columns in `v1_0_1`

In [0]:
from pyspark.sql.functions import input_file_name, lit

# Flatten each exploded v1_0_1 element next to the carrier and event header fields.
#
# The processing time comes from PROCESSING_TIMESTAMP, which was captured once above,
# instead of current_timestamp(). current_timestamp() is fixed only within a single query,
# so it was re-evaluated by every action (each display, the INSERT and the CTAS below).
# The stored ProcessDatetime therefore differed between loads and never matched the
# printed run timestamp.
# The column keeps the name "current_timestamp" because downstream cells select it by that name.
expand_equip = explode_v1_0_1.select(
    input_file_name().alias("input_file_name"),
    lit(PROCESSING_TIMESTAMP).alias("current_timestamp"),
    "carrier", "carrier_id", "carrier_timezone",
    "event.event_datetime", "event.splc", "event.report_type",
    "v1_0_1.*",
)
display(expand_equip)

## Select only needed columns and extract all columns in `interchange` and `waybill`

In [0]:
# Keep the per-event identifiers and expand every field of the `interchange` and
# `waybill` structs into top-level columns (struct.* selects all nested fields).
expand_inter_waybill = expand_equip.select(
    "input_file_name", "current_timestamp",
    "carrier", "carrier_id", "carrier_timezone",
    "event_datetime", "splc", "report_type",
    "equipment_initial", "equipment_number",
    "interchange.*",
    "load_status", "status",
    "waybill.*",
)
display(expand_inter_waybill)

## Explode `route` and `waybill_parties` (the exploded columns keep their original names)

In [0]:
# Explode both arrays in place, so the column names stay `route` and `waybill_parties`.
# Exploding two arrays on one row yields every (route, waybill_party) combination.
# NOTE(review): explode() drops rows whose array is null or empty; explode_outer would keep
# them. Confirm that dropping such events from the movement table is intended.
explode_routes_waybill = (
    expand_inter_waybill
    .withColumn("route", explode("route"))
    .withColumn("waybill_parties", explode("waybill_parties"))
)
display(explode_routes_waybill)

# Select columns needed for Movement table

In [0]:
# Project down to the fields the Movement table needs and drop duplicate rows
# (the explodes above can repeat otherwise-identical rows).
df_movement = (
    explode_routes_waybill
    .select(
        "input_file_name", "current_timestamp",
        "carrier", "carrier_timezone",
        "equipment_initial", "equipment_number",
        "report_type", "load_status", "train_id",
        "event_datetime", "splc",
        "from_carrier_code", "interchange_timestamp", "to_carrier_code",
        "shipment_qualifier", "waybill_number",
    )
    .distinct()
)

## Remove leading and trailing spaces from all columns except `train_id`

Uses the function `remove_leading_trailing_spaces` loaded from the `Common Functions` notebook.

In [0]:
# Trim surrounding whitespace using the Common Functions helper.
# NOTE(review): per the section heading, 'train_id' is the column excluded from trimming; confirm the helper's signature.
df_trim = remove_leading_trailing_spaces(df_movement, 'train_id')
last_ten_rows = df_trim.tail(10)
display(last_ten_rows)

## Add source file name, select and rename required columns
1. Extract only the file name from column `input_file_name` and rename it to `FileName`
2. Adjust `event_datetime` to the carrier's local time zone using `carrier_timezone`
3. Build the `WaybillId` column from `carrier`, `waybill_number` and `interchange_timestamp`
4. Select the required columns, cast them, and rename them

In [0]:
from pyspark.sql.functions import col, concat, from_utc_timestamp, substring_index
from pyspark.sql.types import DateType, IntegerType, TimestampType

# Steps:
# 1. FileName: keep only the last "/"-separated segment of input_file_name.
#    The earlier substring(..., 24, 27) assumed one specific directory-prefix length
#    and truncated names to 27 characters.
#    (Alternative if the path is not meaningful: lit(FILE_NAME) from the widget.)
# 2. event_datetime_adjusted: event_datetime shifted from UTC into carrier_timezone.
# 3. WaybillId: carrier + waybill_number + interchange_timestamp. Note that concat()
#    returns NULL if any of the three parts is NULL.
# 4. Cast, then rename, to the Movement schema. The cast comes before the alias so each
#    output column's name is explicit.
transform_columns = (
    df_trim
    .withColumn("input_file_name", substring_index(col("input_file_name"), "/", -1))
    .withColumn("event_datetime_adjusted",
                from_utc_timestamp(col("event_datetime"), col("carrier_timezone")))
    .withColumn("WaybillId",
                concat(col("carrier"), col("waybill_number"), col("interchange_timestamp")))
    .select(
        col("input_file_name").alias("FileName"),
        col("current_timestamp").cast(TimestampType()).alias("ProcessDatetime"),
        col("equipment_initial").alias("EquipmentInitial"),
        col("equipment_number").cast(IntegerType()).alias("EquipmentNumber"),
        col("report_type").alias("ReportType"),
        col("carrier").alias("ReportingRoad"),
        col("load_status").alias("LoadStatus"),
        col("train_id").alias("TrainID"),
        col("event_datetime_adjusted").cast(DateType()).alias("EventDate"),
        col("event_datetime_adjusted").cast(TimestampType()).alias("EventDatetime"),
        col("splc").cast(IntegerType()).alias("SPLC"),
        col("from_carrier_code").alias("FromRoad"),
        col("to_carrier_code").alias("ToRoad"),
        col("WaybillId"),
        col("shipment_qualifier").alias("JointServiceCode"),
    )
)
display(transform_columns)

## FromRoad and ToRoad
Keep `FromRoad` and `ToRoad` only when `report_type` (`ReportType`) is 'ICHD' or 'ICHR'; for all other report types they are set to null.

In [0]:
from pyspark.sql.functions import col, when

# FromRoad/ToRoad are kept only for interchange report types; every other row gets NULL.
# when() without otherwise() yields NULL, which matches the previous .otherwise(None).
# A NULL ReportType makes isin() NULL, which is treated as false and therefore also yields NULL.
# The name transform_columns is reused because later cells read it. Re-running this cell
# is harmless: rows that already hold NULL stay NULL.
INTERCHANGE_REPORT_TYPES = ["ICHD", "ICHR"]
is_interchange = col("ReportType").isin(INTERCHANGE_REPORT_TYPES)

transform_columns = (
    transform_columns
    .withColumn("FromRoad", when(is_interchange, col("FromRoad")))
    .withColumn("ToRoad", when(is_interchange, col("ToRoad")))
)

In [0]:
# Show the frame after the FromRoad/ToRoad adjustment.
display(transform_columns)

##### Note: The mappings for `City`, `State`, `WaybillVersion` and `ClientEventCode` are missing from the JSON


To get `City` and `State`, I would join the SPLC reference table to the movement table on `SPLC`, bringing in only `City` and `State`.
 

## DQ Check - Schema and data types
Run some checks on our column names and data types to make sure they meet the requirements.

In [0]:
# Expected Movement schema as (column name, Spark dtype string) pairs, in output order.
# A single list keeps the expected names and types from drifting apart.
EXPECTED_MOVEMENT_DTYPES = [
    ("FileName", "string"),
    ("ProcessDatetime", "timestamp"),
    ("EquipmentInitial", "string"),
    ("EquipmentNumber", "int"),
    ("ReportType", "string"),
    ("ReportingRoad", "string"),
    ("LoadStatus", "string"),
    ("TrainID", "string"),
    ("EventDate", "date"),
    ("EventDatetime", "timestamp"),
    ("SPLC", "int"),
    ("FromRoad", "string"),
    ("ToRoad", "string"),
    ("WaybillId", "string"),
    ("JointServiceCode", "string"),
]
expected_columns = [name for name, _ in EXPECTED_MOVEMENT_DTYPES]

# Names and order are checked first, so a rename or reordering gets a focused message.
# Column order matters because the load below uses a positional INSERT INTO ... SELECT *.
assert transform_columns.columns == expected_columns, (
    f"Column mismatch: expected {expected_columns}, got {transform_columns.columns}"
)
assert transform_columns.dtypes == EXPECTED_MOVEMENT_DTYPES, (
    f"Dtype mismatch: expected {EXPECTED_MOVEMENT_DTYPES}, got {transform_columns.dtypes}"
)

# Write to Table

## Create Schema `greenbrier_projects`

In [0]:
%sql
-- Create the target schema (named by the output_schema_name widget) if it does not exist yet.
CREATE SCHEMA IF NOT EXISTS ${output_schema_name};

In [0]:
%sql
-- Teardown helpers; uncomment to reset the output.
-- Drop the table before the schema: DROP SCHEMA defaults to RESTRICT and fails on a non-empty schema.
--drop table ${output_schema_name}.${output_table_name};
--drop schema ${output_schema_name};

## Create Temp view `movement_vw` from the dataframe

In [0]:
# Register the transformed frame as the session-scoped temp view `movement_vw` for the SQL cells below.
# createOrReplaceTempView returns None, so its result is not assigned to anything.
transform_columns.createOrReplaceTempView("movement_vw")

## Use *INSERT INTO* to add records to the `movement` Delta table

### Create table and insert data using SQL

In [0]:
%sql
-- Option A (SQL): declare the table schema explicitly, then load it from the temp view.
CREATE OR REPLACE TABLE ${output_schema_name}.${output_table_name} (
  FileName          STRING,
  ProcessDatetime   TIMESTAMP,
  EquipmentInitial  STRING,
  EquipmentNumber   INT NOT NULL,
  ReportType        STRING,
  ReportingRoad     STRING,
  LoadStatus        STRING,
  TrainID           STRING,
  EventDate         DATE,
  EventDatetime     TIMESTAMP,
  SPLC              INT,
  FromRoad          STRING,
  ToRoad            STRING,
  WaybillId         STRING,
  JointServiceCode  STRING
);

-- INSERT ... SELECT * maps columns by position, so movement_vw must keep the table's column order.
INSERT INTO ${output_schema_name}.${output_table_name}
SELECT * FROM movement_vw;

-- Show the loaded rows.
SELECT * FROM ${output_schema_name}.${output_table_name};

### Create table and insert data using pyspark

In [0]:
# Option B (PySpark): issue the same explicit DDL through spark.sql, with the schema and
# table names interpolated from the widget-backed constants.
create_table_ddl = f"""create or replace table {OUTPUT_SCHEMA_NAME}.{OUTPUT_TABLE_NAME} (FileName		          string,
                                                                                  ProcessDatetime	      timestamp,
                                                                                  EquipmentInitial	      string,
                                                                                  EquipmentNumber		  int not null,
                                                                                  ReportType              string,
                                                                                  ReportingRoad		      string,
                                                                                  LoadStatus		      string,
                                                                                  TrainID			      string,
                                                                                  EventDate		          date,
                                                                                  EventDatetime		      timestamp,
                                                                                  SPLC                    int,
                                                                                  FromRoad                string,
                                                                                  ToRoad                  string,
                                                                                  WaybillId               string,
                                                                                  JointServiceCode        string);
          """
spark.sql(create_table_ddl)

In [0]:
# Load the table created above from the temp view; SELECT * maps columns by position.
insert_statement = f"""
          insert into {OUTPUT_SCHEMA_NAME}.{OUTPUT_TABLE_NAME}
          select * from movement_vw   -- select all from the temporary view movement_vw
          """
spark.sql(insert_statement)

## Use CTAS (CREATE TABLE AS SELECT) to create the `movement` Delta table

In [0]:
# %sql equivalent of the CTAS cell below. To use it, paste it into its own cell and drop the leading "# ".
# It uses the widget values instead of the hard-coded greenbrier_projects / movement names.
# %sql
# create or replace table ${output_schema_name}.${output_table_name}
# as
# select * from movement_vw

In [0]:
# Option C (CTAS): create or replace the table directly from the view's result.
# No column definitions are given here, so the EquipmentNumber NOT NULL constraint from the
# explicit DDL cells above is not declared on the resulting table.
ctas_statement = f""" 
          create or replace table {OUTPUT_SCHEMA_NAME}.{OUTPUT_TABLE_NAME}
          as
          select * from movement_vw -- select all from the temporary view movement_vw
          """
spark.sql(ctas_statement)

In [0]:
# Read the target table back to check what was written.
target_table = f"{OUTPUT_SCHEMA_NAME}.{OUTPUT_TABLE_NAME}"
df = spark.sql(f"select * from {target_table}")
display(df)