In [0]:

# Bronze layer: ingest the raw trips CSV as-is and persist it as a Delta table.
file_location = "/FileStore/tables/trips.csv"
file_type = "csv"

# The file is read without a header row and without schema inference, so Spark
# assigns positional column names (_c0, _c1, ...) and every column is a string.
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Overwrite makes this cell re-runnable: without an explicit mode the writer
# uses the default "error if exists" behaviour and fails on the second run.
(
    df.write.format("delta")
    .mode("overwrite")
    .save("/delta/bronze_trips")
)

In [0]:

# Silver layer: copy the bronze trips Delta data into the silver_trips table.
# NOTE(review): once the later cells rename _c0.._c6 on silver_trips, re-running
# this overwrite with the original column names may fail on a schema mismatch -
# confirm whether a schema overwrite is wanted here.
df = spark.read.format("delta").load("/delta/bronze_trips")

(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_trips")
)

In [0]:
%sql
-- Turn on name-based column mapping (together with the reader/writer protocol
-- versions set below) so that columns of silver_trips can be renamed.
ALTER TABLE silver_trips
SET TBLPROPERTIES (
  'delta.columnMapping.mode' = 'name',
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5'
)

In [0]:
%sql
-- Replace the positional default column names (_c0 .. _c6) with descriptive ones.
ALTER TABLE silver_trips RENAME COLUMN _c0 TO trip_id;
ALTER TABLE silver_trips RENAME COLUMN _c1 TO rideable_type;
ALTER TABLE silver_trips RENAME COLUMN _c2 TO start_at;
ALTER TABLE silver_trips RENAME COLUMN _c3 TO ended_at;
ALTER TABLE silver_trips RENAME COLUMN _c4 TO start_station_id;
ALTER TABLE silver_trips RENAME COLUMN _c5 TO end_station_id;
ALTER TABLE silver_trips RENAME COLUMN _c6 TO rider_id;


In [None]:

# Gold layer: build fact_trips by joining payments, riders, trips and stations.
payTbl = spark.table("silver_payments")
tripsTbl = spark.table("silver_trips")
ridersTbl = spark.table("silver_riders")

# silver_stations is joined twice (once for the start station, once for the end
# station). Each side gets its own alias so the two station_id columns can be
# addressed unambiguously in the self-join.
stnTbl = spark.table("silver_stations").alias("start_stn")
stnTbl2 = spark.table("silver_stations").alias("end_stn")

# Wildcard import kept as-is because other cells may rely on the names it brings
# into scope. NOTE(review): prefer `from pyspark.sql import functions as F`.
from pyspark.sql.functions import *

# NOTE(review): payments and trips are both joined to riders on rider_id only,
# so every payment of a rider is paired with every trip of that rider - confirm
# this fan-out is intended; dropDuplicates() below does not collapse it.
fact_trips_df = (
    payTbl
    .join(ridersTbl, payTbl.rider_id == ridersTbl.rider_id, "inner")
    .join(tripsTbl, ridersTbl.rider_id == tripsTbl.rider_id, "inner")
    .join(stnTbl, tripsTbl.start_station_id == col("start_stn.station_id"), "inner")
    .join(stnTbl2, tripsTbl.end_station_id == col("end_stn.station_id"), "inner")
    # Rider age in (fractional) years at trip start; floored in the select below.
    .withColumn("age_at_trip", months_between(col("start_at"), col("birth_date")) / lit(12))
    # Timestamp difference cast to bigint and divided by 60
    # (presumably seconds -> minutes; confirm on the target runtime).
    .withColumn(
        "minutes_between",
        (to_timestamp(col("ended_at")) - to_timestamp(col("start_at"))).cast("bigInt") / 60,
    )
    .select(
        tripsTbl["trip_id"].alias("trip_key"),
        payTbl["payment_id"].alias("payment_key"),
        payTbl["rider_id"].alias("rider_key"),
        # Only the start station is carried into the fact table.
        col("start_stn.station_id").alias("station_key"),
        payTbl["date"].alias("date_key"),
        floor("age_at_trip").alias("age_at_trip"),
        # NOTE(review): format_number yields a formatted string, not a numeric
        # column - confirm downstream consumers expect that.
        format_number("minutes_between", 2).alias("trip_duration_min_sec"),
    )
    .dropDuplicates()
    # Sort on the output column: trip_id was renamed to trip_key by the select.
    .sort("trip_key")
)

fact_trips_df.show()

(
    fact_trips_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("fact_trips")
)