# MAST30034 Applied Data Science Project 1

## Part 1: Preprocessing (I)

### Import Libraries and Create Spark Session

In [None]:
import pandas as pd
from pyspark.sql import SparkSession, functions as F
import os

In [None]:
# Session settings, applied to the builder in the order listed
SPARK_SETTINGS = [
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.parquet.cacheMetadata", "true"),
    ("spark.executor.memory", "10g"),
    ("spark.driver.memory", "10g"),
    ("spark.sql.session.timeZone",  "Etc/UTC"),
]

session_builder = SparkSession.builder.appName("MAST30034 Project 1-1")
for setting_name, setting_value in SPARK_SETTINGS:
    session_builder = session_builder.config(setting_name, setting_value)

# Reuses an already-running session if one exists, otherwise starts a new one
spark = session_builder.getOrCreate()

### Read In Data

In [None]:
# Locations of the raw inputs
RENT_CSV = "../data/raw/rental_data/medianAskingRent_All.csv"
ZONE_LOOKUP_CSV = "../data/taxi_zones/taxi+_zone_lookup.csv"
TRIP_PARQUET_DIR = '../data/raw/tlc_data/'
TRIP_TEST_PARQUET_DIR = '../data/raw/tlc_test_data/'

# Rent and taxi zone tables are read with pandas
rent_df = pd.read_csv(RENT_CSV)
zones_df = pd.read_csv(ZONE_LOOKUP_CSV)

# Trip records (main and test sets) are read with Spark
sdf = spark.read.parquet(TRIP_PARQUET_DIR)
sdf_test = spark.read.parquet(TRIP_TEST_PARQUET_DIR)

In [None]:
# Folder that receives the curated outputs
CURATED_PATH = "../data/curated/"

# exist_ok=True creates the folder (and any missing parents) only when it is
# absent, without a separate existence check that could race with another
# process; it still raises if the path exists but is not a directory
os.makedirs(CURATED_PATH, exist_ok=True)

### Data Linkage: Link Rental Zones with Taxi Zones

In [None]:
# Rental data: keep only neighbourhood-level rows located in Manhattan
is_neighbourhood = rent_df["areaType"] == "neighborhood"
is_manhattan_rent = rent_df["Borough"] == "Manhattan"
rent_df = rent_df.loc[is_neighbourhood & is_manhattan_rent]

# Taxi zones: keep Manhattan zones, excluding the outlier zones listed here
OUTLIER_ZONES = [
    "Randalls Island",
    "Governor's Island/Ellis Island/Liberty Island",
    "Central Park",
    "Marble Hill",
]
is_manhattan_zone = zones_df["Borough"] == "Manhattan"
is_outlier_zone = zones_df["Zone"].isin(OUTLIER_ZONES)
zones_df = zones_df.loc[is_manhattan_zone & ~is_outlier_zone]

# Manually link 50 neighbourhoods (dict in form - taxi zone: neighbourhood).
# Taxi zones that do not appear as a key keep their own name as the rental zone.
zone_dict = {
    "Alphabet City": "East Village",
    "Battery Park": "Battery Park City",
    "Bloomingdale": "Upper West Side",
    "Central Harlem North": "Central Harlem",
    "Clinton East": "Midtown West",
    "Clinton West": "Midtown West",
    "East Chelsea": "Chelsea",
    "East Harlem North": "East Harlem",
    "East Harlem South": "East Harlem",
    "Financial District North": "Financial District",
    "Financial District South": "Financial District",
    "Garment District": "Midtown",
    "Gramercy": "Gramercy Park",
    "Greenwich Village North": "Greenwich Village",
    "Greenwich Village South": "Greenwich Village",
    "Highbridge Park": "Washington Heights",
    "Hudson Sq": "Soho",
    "Inwood Hill Park": "Inwood",
    "Kips Bay": "Midtown East",
    "Lenox Hill East": "Upper East Side",
    "Lenox Hill West": "Upper East Side",
    "Lincoln Square East": "Upper West Side",
    "Lincoln Square West": "Upper West Side",
    "Little Italy/NoLiTa": "Little Italy",
    "Manhattan Valley": "Upper West Side",
    "Manhattanville": "West Harlem",
    "Meatpacking/West Village West": "West Village",
    "Midtown Center": "Midtown",
    "Midtown North": "Central Park South",
    "Murray Hill": "Midtown East",
    "Penn Station/Madison Sq West": "Chelsea",
    "Seaport": "Financial District",
    "SoHo": "Soho",
    "Stuy Town/Peter Cooper Village": "Stuyvesant Town/PCV",
    "Sutton Place/Turtle Bay North": "Midtown East",
    "Times Sq/Theatre District": "Midtown",
    "TriBeCa/Civic Center": "Tribeca",
    "Two Bridges/Seward Park": "Lower East Side",
    "UN/Turtle Bay South": "Midtown East",
    "Union Sq": "Flatiron",
    "Upper East Side North": "Upper East Side",
    "Upper East Side South": "Upper East Side",
    "Upper West Side North": "Upper West Side",
    "Upper West Side South": "Upper West Side",
    "Washington Heights North": "Washington Heights",
    "Washington Heights South": "Washington Heights",
    "West Chelsea/Hudson Yards": "Chelsea",
    "World Trade Center": "Financial District",
    "Yorkville East": "Upper East Side",
    "Yorkville West": "Upper East Side"
}

# zones_df is a filtered selection of the original lookup table, so adding a
# column to it in place triggers pandas' SettingWithCopyWarning. `.assign`
# returns a new frame with the extra column instead.
zones_df = zones_df.assign(rental_zone=zones_df["Zone"].replace(zone_dict))

In [None]:
# Rental columns to keep: the area name, every month of 2021, and April 2022
# (April 2022 is for the test data)
months_2021 = [f"2021-{month:02}" for month in range(1, 13)]
RENT_COLS = ["areaName", *months_2021, "2022-04"]

# Inner join each taxi zone to its rental neighbourhood, then drop the
# right-hand join key, which duplicates rental_zone after the join
rent_subset = rent_df[RENT_COLS]
merged_df = zones_df.merge(
    rent_subset,
    how="inner",
    left_on="rental_zone",
    right_on="areaName",
).drop(columns=["areaName"])

# Output as a lookup file
merged_df.to_csv("../data/curated/rent_taxi_zone_lookup.csv")

### Filtering and Outlier Removal

In [None]:
# Location IDs of the retained Manhattan taxi zones (duplicates removed)
manhattan_zone = list(set(zones_df["LocationID"]))

# A trip is kept only when both its pickup and drop-off zones are retained;
# the same condition is applied to the main and the test trip data
pickup_in_manhattan = F.col('PULocationID').isin(manhattan_zone)
dropoff_in_manhattan = F.col('DOLocationID').isin(manhattan_zone)
within_manhattan = pickup_in_manhattan & dropoff_in_manhattan

sdf_manhattan = sdf.filter(within_manhattan)
sdf_manhattan_test = sdf_test.filter(within_manhattan)

# Record counts: all trips, Manhattan trips, Manhattan test trips
sdf.count(), sdf_manhattan.count(), sdf_manhattan_test.count()

In [None]:
# Outlier rules, named individually and combined in the order mentioned in
# the report; a row is kept only when every rule holds
MAX_SPEED_MPH = 120

pickup_before_dropoff = F.col('pickup_datetime') < F.col('dropoff_datetime')
pickup_in_2021 = F.year('pickup_datetime') == 2021
dropoff_in_2021 = F.year('dropoff_datetime') == 2021
no_airport_fee = F.col('airport_fee') == 0
positive_fare = F.col("base_passenger_fare") > 0
positive_driver_pay = F.col("driver_pay") > 0

# trip_time / 3600 converts to hours assuming trip_time is in seconds
# (TODO confirm against the data dictionary); trips at or above the speed
# limit are dropped
trip_hours = F.col("trip_time") / 3600
plausible_speed = (F.col("trip_miles") / trip_hours) < MAX_SPEED_MPH

cleaned_sdf = sdf_manhattan.where(
    pickup_before_dropoff
    & pickup_in_2021
    & dropoff_in_2021
    & no_airport_fee
    & positive_fare
    & positive_driver_pay
    & plausible_speed
)

# Amount of records after outlier removal
cleaned_sdf.count()

### Output Data

In [None]:
# Full data for analysis and modelling: each frame is written as parquet to
# its destination, replacing any previous output at that path
full_outputs = [
    (cleaned_sdf, '../data/curated/full_data/manhattan_data_cleaned'),
    (sdf_manhattan_test, '../data/curated/full_data/manhattan_data_test'),
]
for output_sdf, output_path in full_outputs:
    output_sdf.write.mode('overwrite').parquet(output_path)

A random 5% sample of the records is taken to reduce the computation time of the geospatial plots; the full dataset is still used for the analysis.

In [None]:
# Sampled data for visualisation: keep only the licence and location columns,
# then draw a seeded 5% sample so the output is reproducible
plot_columns = ['hvfhs_license_num', 'PULocationID', 'DOLocationID']
sampled_sdf = cleaned_sdf.select(plot_columns).sample(0.05, seed=0)

sampled_sdf.write.mode('overwrite').parquet('../data/curated/sampled_data')