# Bronze Layer to Silver Layer

Note: This notebook uses Azure Synapse Analytics with PySpark

Here are the general steps to go from Bronze Layer to Silver Layer

1. Load unioned bronze data
2. Add trip duration in minutes to the data
3. Filter the data
4. Join in Zone ID data
5. Transform the data
6. Save the data

In [1]:
import pyspark.sql.functions as F
from pyspark.ml import Transformer, Pipeline

StatementMeta(ExecSmall, 39, 2, Finished, Available, Finished)

The raw bronze data was contactenated into one dataframe called union.
Please review Concat Bronze notebook to see how this was done.

In [None]:
%%pyspark
df = spark.read.load("<YOUR BASE PATH GOES HERE>/bronze/union", format='parquet')

StatementMeta(ExecSmall, 39, 3, Finished, Available, Finished)

### Definitions for Filters and Transformations

#### Filters

In [11]:
class FilterVendorID(Transformer):
  """
  Filters Vendor ID.
  Values must be 1 or 2
  """
  def __init__(self, input_col="VendorID"):
    self.input_col = input_col

  def _transform(self, df):
    return df.filter((F.col(self.input_col).isin([1,2])) & (F.col(self.input_col).isNotNull()))


class FilterTimestamps(Transformer):
  """
  Filters PU and DO timestamps
  Timestamps must be bestween 2021-01-01 00:00:00 and 2025-01-01 00:00:00
  Removes null values
  """
  def __init__(self, input_cols=["tpep_pickup_datetime", "tpep_dropoff_datetime"]):
    self.input_cols = input_cols

  def _transform(self, df):
    for input_col in self.input_cols:
      df = df.filter( 
        (F.col(input_col) >= "2021-01-01 00:00:00")
        & (F.col(input_col) < "2025-01-01 00:00:00")
        & (F.col(input_col).isNotNull()))
    return df

class FilterPassengerCount(Transformer):
  """
  Filters passenger count
  Passenger Count must be between 1 and 6
  Value cannot be null
  """
  def __init__(self, input_col="passenger_count", filter_value=list(range(1,264))):
    self.input_col = input_col
    self.filter_value = filter_value

  def _transform(self, df):
    return df.filter(
        (F.col(self.input_col).isin(self.filter_value)) 
        & (F.col(self.input_col).isNotNull()))


class FilterDistance(Transformer):
  """
  Filters trip distance data.
  Distance must be > 0 and <= 30 miles
  Value cannot be null
  """
  def __init__(self, input_col="trip_distance", low_lim=0, up_lim=30):
    self.input_col = input_col
    self.low_lim = low_lim
    self.up_lim = up_lim

  def _transform(self, df):
    return df.filter(
        (F.col(self.input_col) > self.low_lim) 
        & (F.col(self.input_col) <= self.up_lim) 
        & (F.col(self.input_col).isNotNull()))
  

class FilterLocationID(Transformer):
  """
  Filters Location IDs for PU and DO locations
  Must be between [1,263]
  Value cannot be null
  """
  def __init__(self, input_cols=["PULocationID", "DOLocationID"], filter_value=list(range(1,264))):
    self.input_cols = input_cols
    self.filter_value = filter_value

  def _transform(self, df):
    for input_col in self.input_cols:
      df = df.filter((F.col(input_col).isin(self.filter_value)) & (F.col(input_col).isNotNull()))
    return df


class FilterRateCodeId(Transformer):
  """
  Filters Rate Code ID
  Must be between [1,6]
  Value cannot be null
  """
  def __init__(self, input_col="payment_type", filter_value=list(range(1,7))):
    self.input_col = input_col
    self.filter_value = filter_value

  def _transform(self, df):
    return df.filter((F.col(self.input_col).isin(self.filter_value)) & (F.col(self.input_col).isNotNull()))


class FilterPayementType(Transformer):
  """
  Filters Payment Type
  Must be between 1 or 2
  Value cannot be null
  """
  def __init__(self, input_col="RatecodeID", filter_value=list(range(1,3))):
    self.input_col = input_col
    self.filter_value = filter_value

  def _transform(self, df):
    return df.filter((F.col(self.input_col).isin(self.filter_value)) & (F.col(self.input_col).isNotNull()))


class FilterFareAmount(Transformer):
  """
  Filters fare amount
  Price > $0 and <= $70
  Value cannot be null
  """
  def __init__(self, input_col="fare_amount", low_lim=0, up_lim=70):
    self.input_col = input_col
    self.low_lim = low_lim
    self.up_lim = up_lim

  def _transform(self, df):
    return df.filter(
        (F.col(self.input_col) > self.low_lim)
        & (F.col(self.input_col) <= self.up_lim) 
        & (F.col(self.input_col).isNotNull())
        )
        
    
class FilterExtra(Transformer):
  """
  Filters extra
  Price >= $0 and <= $15
  Null values will be imputed as 0
  """
  def __init__(self, input_col="extra", low_lim=0, up_lim=15):
    self.input_col = input_col
    self.low_lim = low_lim
    self.up_lim = up_lim

  def _transform(self, df):
    return df.filter(
        (F.col(self.input_col) >= self.low_lim)
        & (F.col(self.input_col) <= self.up_lim)
        ).fillna(0, subset=[self.input_col])


class FilterMTATax(Transformer):
  """
  Filters MTA Tax
  Value must be either 0 or 0.50
  Null values will be imputed as 0
  """
  def __init__(self, input_col="mta_tax", filter_values=[0,0.5]):
    self.input_col = input_col
    self.filter_values = filter_values

  def _transform(self, df):
    return df.filter(F.col(self.input_col).isin(self.filter_values)
        ).fillna(0, subset=[self.input_col])


class FilterImprovementSurcharge(Transformer):
  """
  Filters Improvement Surcharge
  Value must be either 0, 0.30, or 1.00
  Null values will be imputed as 0
  """
  def __init__(self, input_col="improvement_surcharge", filter_values=[0,0.30,1.00]):
    self.input_col = input_col
    self.filter_values = filter_values

  def _transform(self, df):
    return df.filter(F.col(self.input_col).isin(self.filter_values)
        ).fillna(0, subset=[self.input_col])


class FilterTipAmount(Transformer):
  """
  Filters tip amount
  Price >= $0
  Null values will be imputed as 0
  """
  def __init__(self, input_col="tip_amount", low_lim=0):
    self.input_col = input_col
    self.low_lim = low_lim

  def _transform(self, df):
    return df.filter(F.col(self.input_col) >= self.low_lim).fillna(0, subset=[self.input_col])


class FilterTollsAmount(Transformer):
  """
  Filters tolls amount
  Price >= $0 and <= $30
  Null values will be imputed as 0
  """
  def __init__(self, input_col="tolls_amount", low_lim=0, up_lim=30):
    self.input_col = input_col
    self.low_lim = low_lim
    self.up_lim = up_lim

  def _transform(self, df):
    return df.filter(
        (F.col(self.input_col) >= self.low_lim)
        & (F.col(self.input_col) <= self.up_lim)
        ).fillna(0, subset=[self.input_col])


class FilterCongestionSurcharge(Transformer):
  """
  Filters Congestion Surcharge
  Value must be either 0, 2.50
  Null values will be imputed as 0
  """
  def __init__(self, input_col="congestion_surcharge", filter_values=[0,2.50]):
    self.input_col = input_col
    self.filter_values = filter_values

  def _transform(self, df):
    return df.filter(F.col(self.input_col).isin(self.filter_values)).fillna(0, subset=[self.input_col])


class FilterAirportFee(Transformer):
  """
  Filters Airport Fee
  Value must be $0.00, $1.25, or $1.75
  Null values will be imputed as 0
  """
  def __init__(self, input_col="airport_fee", filter_values=[0,1.25,1.75]):
    self.input_col = input_col
    self.filter_values = filter_values

  def _transform(self, df):
    return df.filter(F.col(self.input_col).isin(self.filter_values)).fillna(0, subset=[self.input_col])


class FilterDuration(Transformer):
  """
  Filters trip duration data.
  Duration > 1 minute and capped at 90 minutes
  """
  def __init__(self, input_col="trip_duration_min", filter_value = 1):
    self.input_col = input_col
    self.filter_value = filter_value

  def _transform(self, df):
    return df.filter((F.col(self.input_col) > 1.0) & (F.col(self.input_col) <= 90.0) & (F.col(self.input_col).isNotNull()))

StatementMeta(ExecSmall, 39, 12, Finished, Available, Finished)

#### Transformers

In [4]:
class ExtractDoW(Transformer):
  """
  Extracts the Day-of-Week from the timestamp
  returns STRING
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_dow"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.date_format(self.input_col, "E"))


class ExtractDoY(Transformer):
  """
  Extracts the Day-of-Year from the timestamp
  returns INT
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_doy"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.date_format(self.input_col, "D").cast("int"))


class ExtractDoM(Transformer):
  """
  Extracts the Day-of-Month from the timestamp
  returns INT
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_dom"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.date_format(self.input_col, "d").cast("int"))


class ExtractYear(Transformer):
  """
  Extracts the Year from the timestamp
  returns INT
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_year"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.date_format(self.input_col, "y").cast("int"))


class ExtractMonth(Transformer):
  """
  Extracts the Month from the timestamp
  returns INT
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_month"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.date_format(self.input_col, "M").cast("int"))


class ExtractYearMonth(Transformer):
  """
  Extracts the Month from the timestamp
  returns String
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_year_month"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.concat(F.date_format(self.input_col, "y").cast("string"),F.lit("-"),F.date_format(self.input_col, "M").cast("string")))


class ExtractWeek(Transformer):
  """
  Extracts the Week number from the timestamp
  returns INT
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_week_number"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.weekofyear(self.input_col).cast("int"))


class ExtractDate(Transformer):
  """
  Extracts the date from the timestamp
  returns string
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_date"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.to_date(self.input_col, "yyyy-MM-dd"))


class ExtractHour(Transformer):
  """
  Extracts the Hour (0-23) from the timestamp
  returns INT
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_hour"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.date_format(self.input_col, "H").cast("int"))


class ExtractDurationSec(Transformer):
  """
  Extracts trip duration in seconds
  returns Numeric
  """
  def __init__(self, pu_col="tpep_pickup_datetime", do_col="tpep_dropoff_datetime", output_col="trip_duration_sec"):
    self.pu_col = pu_col
    self.do_col = do_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.to_timestamp(F.col(self.do_col)).cast("long") - F.to_timestamp(F.col(self.pu_col)).cast("long"))


class ExtractDurationMin(Transformer):
  """
  Extracts trip duration in minutes
  returns Numeric
  """
  def __init__(self, pu_col="tpep_pickup_datetime", do_col="tpep_dropoff_datetime", output_col="trip_duration_min"):
    self.pu_col = pu_col
    self.do_col = do_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, (F.to_timestamp(F.col(self.do_col)).cast("long") - F.to_timestamp(F.col(self.pu_col)).cast("long"))/60.0)


class ExtractTimeOfDay(Transformer):
  """
  Extracts the Time of Day from the PU timestamp
  returns STRING
  """
  def __init__(self, input_col="pu_hour", output_col="pu_tod"):
    self.input_col = input_col
    self.output_col = output_col


  def time_of_day(self, col_val):
    if col_val >= 0 and col_val < 3:
      tod = "0-3"
    elif col_val >= 3 and col_val < 6:
      tod = "3-6"
    elif col_val >= 6 and col_val < 9:
      tod = "6-9"
    elif col_val >= 9 and col_val < 12:
      tod = "9-12"
    elif col_val >= 12 and col_val < 15:
      tod = "12-15"
    elif col_val >= 15 and col_val < 18:
      tod = "15-18"
    elif col_val >= 18 and col_val < 21:
      tod = "18-21"
    elif col_val >= 21 and col_val < 24:
      tod = "21-24"
    return tod

  def _transform(self, df):
    tod_udf = F.udf(self.time_of_day, StringType())
    return df.withColumn(self.output_col, tod_udf(F.col(self.input_col)))


class ExtractWithinBorough(Transformer):
  """
  Extracts if day is Weekday or Weekend from the timestamp
  returns INT (1 = weekday, 0 = weekend)
  """
  def __init__(self, pu_borough="pu_borough", do_borough="do_borough", output_col="trip_within_borough"):
    self.pu_borough = pu_borough
    self.do_borough = do_borough
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col,
                         F.when((F.col(self.pu_borough) == F.col(self.do_borough)), 1)
                         .when((F.col(self.pu_borough) != F.col(self.do_borough)), 0)
                         .otherwise(F.lit(None)))


class ExtractPuDoBoroughs(Transformer):
  """
  Extracts PU Borough and DO Borough into 1 column
  returns STR
  """
  def __init__(self, pu_borough="pu_borough", do_borough="do_borough", output_col="pu_do_borough"):
    self.pu_borough = pu_borough
    self.do_borough = do_borough
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.concat(F.col(self.pu_borough),F.lit('-'),F.col(self.do_borough)))


class ExtractWithinZone(Transformer):
  """
  Extracts if trip is within same zone
  returns INT (1 = yes, 0 = no)
  """
  def __init__(self, pu_zone="pu_zone", do_zone="do_zone", output_col="trip_within_zone"):
    self.pu_zone = pu_zone
    self.do_zone = do_zone
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col,
                         F.when((F.col(self.pu_zone) == F.col(self.do_zone)), 1)
                         .when((F.col(self.pu_zone) != F.col(self.do_zone)), 0)
                         .otherwise(F.lit(None)))
  
class ExtractRouteId(Transformer):
  """
  Concats the PU Location ID and the DO Location ID
  returns STRING
  """
  def __init__(self, pu_col="PULocationID", do_col="DOLocationID", output_col="route_id"):
    self.pu_col = pu_col
    self.do_col = do_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col, F.concat(self.pu_col, F.lit('-'), self.do_col))
  
class ExtractWeekday(Transformer):
  """
  Extracts if day is Weekday or Weekend from the timestamp
  returns INT (1 = weekday, 0 = weekend)
  """
  def __init__(self, input_col="tpep_pickup_datetime", output_col="pu_weekday"):
    self.input_col = input_col
    self.output_col = output_col

  def _transform(self, df):
    return df.withColumn(self.output_col,
                         F.when((F.date_format(self.input_col, "E").isin(["Mon", "Tue", "Wed", "Thu", "Fri"])), 1)
                         .when((F.date_format(self.input_col, "E").isin(["Sat", "Sun"])), 0)
                         .otherwise(F.lit(None)))

StatementMeta(ExecSmall, 39, 5, Finished, Available, Finished)

### 

### Begin the Process

Add trip duration in minutes

In [5]:
df = df.withColumn("trip_duration_min", (F.to_timestamp(F.col("tpep_dropoff_datetime")).cast("long") - F.to_timestamp(F.col("tpep_pickup_datetime")).cast("long"))/60.0)

StatementMeta(ExecSmall, 39, 6, Finished, Available, Finished)

#### Filter the Data

In [12]:
# Define the filters
filter_stages = [
    FilterVendorID(),
    FilterTimestamps(),
    FilterPassengerCount(),
    FilterDistance(),
    FilterLocationID(),
    FilterRateCodeId(),
    FilterPayementType(),
    FilterFareAmount(),
    FilterExtra(),
    FilterMTATax(),
    FilterImprovementSurcharge(),
    FilterTipAmount(),
    FilterTollsAmount(),
    FilterCongestionSurcharge(),
    FilterAirportFee(),
    FilterDuration()
  ]

StatementMeta(ExecSmall, 39, 13, Finished, Available, Finished)

In [13]:
# Filter the data
filter_pipe = Pipeline(stages=filter_stages)
filter_pipe_model = filter_pipe.fit(df)
df = filter_pipe_model.transform(df)

StatementMeta(ExecSmall, 39, 14, Finished, Available, Finished)

In [14]:
# Sanity check to make sure the filters didn't eliminate all the data
df.count()

StatementMeta(ExecSmall, 39, 15, Finished, Available, Finished)

128026248

## Join Zone data

In [None]:
# Load the zone map.
# This is done twice to keep things simple for doing two different joins.
# Note that the data is only 265 rows, so it's relatively easy to get away this with practice, but not optimal in general
zone_map = spark.read.load('<YOUR BASE PATH GOES HERE>/taxi_zone_lookup.csv', format='csv', header=True)
zone_map2 = spark.read.load('<YOUR BASE PATH GOES HERE>/taxi_zone_lookup.csv', format='csv', header=True)

StatementMeta(ExecSmall, 39, 16, Finished, Available, Finished)

In [16]:
# Join Zone Data onto Data
df = df.join(F.broadcast(zone_map[["Zone", "Borough", "LocationID"]]), df.PULocationID == zone_map.LocationID, "inner").select(df["*"], zone_map["Zone"].alias("pu_zone"), zone_map["Borough"].alias("pu_borough"))
df = df.join(F.broadcast(zone_map2[["Zone", "Borough", "LocationID"]]), df.DOLocationID == zone_map2.LocationID, "inner").select(df["*"], zone_map2["Zone"].alias("do_zone"), zone_map2["Borough"].alias("do_borough"))

StatementMeta(ExecSmall, 39, 17, Finished, Available, Finished)

#### Transform the Data

In [17]:
# Define the transformations
xform_stages = [
  ExtractDoW(),
  ExtractDoY(),
  ExtractDoM(),
  ExtractYear(),
  ExtractMonth(),
  ExtractYearMonth(),
  ExtractWeek(),
  ExtractDate(),
  ExtractHour(),
  ExtractDurationSec(),
  ExtractWithinBorough(),
  ExtractPuDoBoroughs(),
  ExtractWithinZone(),
  ExtractRouteId(),
  ExtractWeekday(),
]

StatementMeta(ExecSmall, 39, 18, Finished, Available, Finished)

In [18]:
# Transform the data
xform_pipe = Pipeline(stages=xform_stages)
xform_pipe_model = xform_pipe.fit(df)
df = xform_pipe_model.transform(df)

StatementMeta(ExecSmall, 39, 19, Finished, Available, Finished)

In [20]:
# Sanity check to make sure the transformations didn't eliminate all the data
df.count()

StatementMeta(ExecSmall, 39, 21, Finished, Available, Finished)

128026248

### Save the Data

In [None]:
# Save the unioned data to avoid re-processing raw files again
df.write.partitionBy("pu_year_month").parquet("<YOUR BASE PATH GOES HERE>/silver/union", mode='overwrite')

StatementMeta(ExecSmall, 39, 20, Finished, Available, Finished)