### Big data course project
<strong>T5: External data</strong>

Jovana Videnovic & Haris Kupinic

In [1]:
# Shell out to hostnamectl to show which host this notebook is running on.
!hostnamectl

In [None]:
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
from pathlib import Path
import pandas as pd
import os
import numpy as np
from sklearn.neighbors import BallTree
import pyarrow.dataset as ds
import pyarrow as pa
import pyarrow.compute as pc
from datetime import timedelta


In [None]:
# Root of the partitioned trip data; the service name below selects the sub-directory that gets loaded.
part_data_path = Path("/d/hpc/projects/FRI/bigdata/students/jv8043/partitioned_data")
# Taxi service processed in this run; also used to look up its start date and to name the output folder.
service_type = "green"

In [None]:
# Directory holding the external inputs read further down (weather, schools, zone look-ups, events).
add_data_path = Path("/d/hpc/home/jv8043/BD/project/T5/add_data")

In [None]:
# Per-service output root for the augmented datasets; created (with parents) if it does not exist yet.
augmented_data_path = Path("/d/hpc/projects/FRI/bigdata/students/jv8043/augmented_data_new", service_type)
augmented_data_path.mkdir(exist_ok=True, parents=True)

In [6]:
# First day of the accepted time window for each service; `end_date` is the shared exclusive upper bound.
start_dates = {
    service: pd.Timestamp(first_day)
    for service, first_day in (
        ("yellow", "2012-01-01"),
        ("green", "2014-01-01"),
        ("fhv", "2015-01-01"),
        ("fhvh", "2019-02-01"),
    )
}
end_date = pd.Timestamp("2025-02-01")

In [7]:
# Open the selected service's parquet partitions and materialise them as one in-memory Arrow table.
dataset = ds.dataset(
    source=part_data_path / service_type,
    partitioning="hive",
    format="parquet",
)
table = dataset.to_table()

In [8]:
if service_type in ("yellow", "green"):
    # Columns that are not carried into the augmented output.
    columns_to_drop = [
        "vendorid",
        "store_and_fwd_flag",
        "extra",
        "mta_tax",
        "tolls_amount",
        "ehail_fee",
        "improvement_surcharge",
        "congestion_surcharge",
    ]

    # Only ask Arrow to drop the columns this table actually has.
    present_names = set(table.schema.names)
    existing_cols_to_drop = [name for name in columns_to_drop if name in present_names]
    table = table.drop(existing_cols_to_drop)

    # A trip is kept only when both endpoints and both timestamps are non-null.
    required_cols = [
        "pickup_lon",
        "pickup_lat",
        "dropoff_lon",
        "dropoff_lat",
        "pickup_datetime",
        "dropoff_datetime",
    ]
    for col in required_cols:
        table = table.filter(pc.is_valid(table[col]))

    # Bounds of the accepted window, as microsecond-resolution Arrow scalars.
    start = pa.scalar(start_dates[service_type], type=pa.timestamp("us"))
    end = pa.scalar(end_date, type=pa.timestamp("us"))

    # Pickup first, then dropoff: each must lie within [start, end).
    for col in ("pickup_datetime", "dropoff_datetime"):
        stamps = table[col]
        inside_window = pc.and_(
            pc.greater_equal(stamps, start),
            pc.less(stamps, end),
        )
        table = table.filter(inside_window)

    # Discard rows whose dropoff precedes the pickup.
    chronological = pc.less_equal(table["pickup_datetime"], table["dropoff_datetime"])
    table = table.filter(chronological)

In [9]:
# Read the four external CSV tables in one go; order of names matches order of paths.
schools_df, colleges_df, major_ba_df, weather_df = map(
    pd.read_csv,
    (
        add_data_path / "schools.csv",
        add_data_path / "colleges.csv",
        add_data_path / "major_ba.csv",
        add_data_path / "weather.csv",
    ),
)

In [10]:
# Peek at the first row of each external table to check its columns.
display(
    schools_df.head(1),
    colleges_df.head(1),
    major_ba_df.head(1),
    weather_df.head(1),
)

#### Weather external data

In [11]:
# Lower-case every header so later cells can refer to columns by lowercase name.
weather_df.columns = weather_df.columns.str.lower()

# Share of missing values per column, expressed in percent.
nan_counts = weather_df.isna().sum()
nan_percentages = nan_counts / len(weather_df) * 100
print("NaN percentages in weather_df:")
print(nan_percentages)

In [12]:
# Measurement columns stored as float32.
float_cols = [
    "awnd",
    "prcp",
    "snow",
    "snwd",
    "tmax",
    "tmin",
]

weather_df = (
    weather_df
    # Remove tavg, name and station.
    .drop(columns=["tavg", "name", "station"])
    .assign(
        # Missing wind readings are replaced with 0.
        awnd=lambda frame: frame["awnd"].fillna(0),
        # Parse the date (nanosecond resolution), then store it at microsecond resolution.
        date=lambda frame: pd.to_datetime(frame["date"]).astype("datetime64[us]"),
    )
    .astype({name: "float32" for name in float_cols})
)

In [13]:
# Summary statistics of the cleaned weather table.
weather_df.describe()

In [14]:
# Truncate the weather timestamp to the day and name it like the trip-side join key.
weather_df = (
    weather_df
    .assign(date=weather_df["date"].dt.floor("D"))
    .rename(columns={"date": "pickup_date"})
)
# Arrow copy of the frame (pandas index not stored) for the join with the trip table.
weather_table = pa.Table.from_pandas(df=weather_df, preserve_index=False)

In [15]:
# Day-truncated pickup timestamp, appended as the last column and used as the weather join key.
pickup_date = pc.floor_temporal(table.column("pickup_datetime"), unit="day")
table = table.add_column(table.num_columns, "pickup_date", pickup_date)

In [16]:
# Attach the day's weather to each trip. "left outer" is Table.join's default, spelled out here:
# trips without a matching weather row are kept, with null weather columns.
merged_table = table.join(
    right_table=weather_table,
    keys="pickup_date",
    join_type="left outer",
)

In [17]:
# Inspect the column names and types after the weather join.
merged_table.schema

In [18]:
# The day-level join key is no longer needed once the join is done.
if "pickup_date" in merged_table.column_names:
    merged_table = merged_table.drop(["pickup_date"])

# Calendar year of the pickup, appended last; used as the partition column when writing.
pickup_year = pc.year(merged_table.column("pickup_datetime"))
merged_table = merged_table.add_column(merged_table.num_columns, "year", pickup_year)

In [19]:
# Persist the weather-augmented trips, one directory per pickup year.
# NOTE(review): no partitioning_flavor is passed here, while the cells that read this
# output use partitioning="hive" — confirm the directory naming matches what they expect.
ds.write_dataset(
    data=merged_table,
    base_dir=augmented_data_path / "weather_merged",
    partitioning=["year"],
    format="parquet",
    max_rows_per_file=6_000_000,
    existing_data_behavior="overwrite_or_ignore",
)

#### Schools, colleges, major businesses and attractions: external data

In [None]:
# Reload the weather-augmented trips from the location the weather section writes to
# (augmented_data_path / "weather_merged"). The previous hard-coded path pointed at a
# different root ("augmented_data") and a fixed "green" folder, so a fresh run could
# silently pick up another, possibly stale, dataset.
dataset = ds.dataset(
    augmented_data_path / "weather_merged",
    format="parquet",
    partitioning="hive",
)
table = dataset.to_table()

In [12]:
# Zone look-up table ("strict" variant), used below for the pickup-side join.
tlc_scbah_df = pd.read_csv(
    add_data_path.joinpath("tlc_zones_with_schools_colleges_ba_hotels_strict.csv")
)

In [13]:
# Show the first five rows of the zone look-up table.
display(tlc_scbah_df.head(5))

In [14]:
# Name the coordinate columns like the trip table's pickup columns so they can act as join keys.
pickup_key_names = {"lat": "pickup_lat", "lon": "pickup_lon"}
tlc_scbah_df = tlc_scbah_df.rename(columns=pickup_key_names)
tlc_scbah_table = pa.Table.from_pandas(df=tlc_scbah_df, preserve_index=False)

In [15]:
# Attach the zone features to each trip by exact equality of the pickup coordinates.
# "left outer" is Table.join's default: trips with no matching coordinates keep null features.
merged_table1 = table.join(
    right_table=tlc_scbah_table,
    keys=["pickup_lat", "pickup_lon"],
    join_type="left outer",
)

In [16]:
# Suffix the three zone-feature columns (school/college, business/attraction, hotel)
# with _pickup so they stay distinct from the dropoff-side columns added later.
pickup_feature_names = {
    "closest_school_college": "closest_school_college_pickup",
    "closest_ba": "closest_ba_pickup",
    "closest_hotel": "closest_hotel_pickup",
}
merged_table1 = merged_table1.rename_columns(pickup_feature_names)

In [17]:
# Same enrichment for the dropoff end of each trip.
# NOTE(review): this reads the non-"_strict" CSV whereas the pickup side reads the
# "_strict" one — confirm the two sides are meant to use different look-up files.
tlc_scbah_df = pd.read_csv(
    add_data_path / "tlc_zones_with_schools_colleges_ba_hotels.csv"
).rename(columns={"lat": "dropoff_lat", "lon": "dropoff_lon"})
tlc_scbah_table = pa.Table.from_pandas(df=tlc_scbah_df, preserve_index=False)

# Exact coordinate match; "left outer" (the default) keeps trips without a match.
merged_table = merged_table1.join(
    right_table=tlc_scbah_table,
    keys=["dropoff_lat", "dropoff_lon"],
    join_type="left outer",
)

# Suffix the freshly joined feature columns with _dropoff.
dropoff_feature_names = {
    "closest_school_college": "closest_school_college_dropoff",
    "closest_ba": "closest_ba_dropoff",
    "closest_hotel": "closest_hotel_dropoff",
}
merged_table = merged_table.rename_columns(dropoff_feature_names)

In [18]:
# Inspect the column names and types after both zone joins.
merged_table.schema

In [19]:
# Remove the day-level key if the loaded table still carries it.
if "pickup_date" in merged_table.column_names:
    merged_table = merged_table.drop(["pickup_date"])

# Calendar year of the pickup, appended last; used as the partition column when writing.
pickup_year = pc.year(merged_table.column("pickup_datetime"))
merged_table = merged_table.add_column(merged_table.num_columns, "year", pickup_year)

In [20]:
# Persist the trips enriched with weather and zone features, one directory per pickup year.
ds.write_dataset(
    data=merged_table,
    base_dir=augmented_data_path / "weather_scbah",
    partitioning=["year"],
    format="parquet",
    max_rows_per_file=6_000_000,
    existing_data_behavior="overwrite_or_ignore",
)

### Events

In [33]:
# Load the events table and store both event timestamps at microsecond resolution.
events_df = pd.read_parquet(add_data_path / "nyc_events_augmented.parquet")
for stamp_col in ("start_date_time", "end_date_time"):
    events_df[stamp_col] = pd.to_datetime(events_df[stamp_col]).astype("datetime64[us]")
display(events_df.head(1))

In [34]:
# Arrow copy of the events frame (pandas index not stored) for the Arrow-side matching below.
events_table = pa.Table.from_pandas(events_df, preserve_index=False)

In [None]:
# Reload the weather + zone-feature dataset from augmented_data_path instead of a path
# with "green" baked in; for service_type == "green" this is the same directory, and for
# any other service it now follows the configured service.
dataset = ds.dataset(
    augmented_data_path / "weather_scbah",
    format="parquet",
    partitioning="hive",
)
table = dataset.to_table()

In [None]:
# Inspect the column names and types of the reloaded table.
table.schema

In [None]:
# Tag every row with its position so matches can be mapped back to the source rows later.
# np.arange builds the int64 index directly instead of converting a Python range element by element.
taxi_with_idx = table.add_column(
    0, "taxi_idx", pa.array(np.arange(len(table), dtype=np.int64))
)
events_with_idx = events_table.add_column(
    0, "event_idx", pa.array(np.arange(len(events_table), dtype=np.int64))
)

# One hour, kept as a microsecond count and as a typed Arrow duration.
# Timestamp arithmetic is done with the duration scalar: Arrow defines timestamp +/- duration,
# whereas adding a bare int64 to a timestamp column is not a documented kernel.
hour_in_microseconds = 3600 * 1000000
one_hour = pa.scalar(
    timedelta(microseconds=hour_in_microseconds), type=pa.duration("us")
)

event_start = events_with_idx["start_date_time"]
event_end = events_with_idx["end_date_time"]

# Matching windows per event:
#   pickup  : [start - 1h, end]
#   dropoff : [start, end + 1h]
events_expanded = (
    events_with_idx
    .append_column("pickup_window_start", pc.subtract(event_start, one_hour))
    .append_column("pickup_window_end", event_end)
    .append_column("dropoff_window_start", event_start)
    .append_column("dropoff_window_end", pc.add(event_end, one_hour))
)

# Function to perform conditional join with time windows
def conditional_join_with_time_window(taxi_table, events_table, 
                                    taxi_time_col, taxi_lat_col, taxi_lon_col,
                                    window_start_col, window_end_col):
    """
    Pair taxi rows with events at the same coordinates whose time window contains the ride.

    The location test is an exact equality, so it is evaluated as a hash join on
    (lat, lon) and the time window is applied to the (much smaller) join result.
    This avoids materialising every taxi-row x event combination, which a
    dummy-key cross join would do before any filtering.

    Parameters
    ----------
    taxi_table : pyarrow.Table
        Ride rows; must contain ``taxi_time_col``, ``taxi_lat_col`` and ``taxi_lon_col``.
    events_table : pyarrow.Table
        Event rows; must contain ``latitude``, ``longitude``, ``window_start_col``
        and ``window_end_col``.
    taxi_time_col, taxi_lat_col, taxi_lon_col : str
        Names of the ride timestamp and coordinate columns.
    window_start_col, window_end_col : str
        Names of the event columns bounding the inclusive time window.

    Returns
    -------
    pyarrow.Table
        One row per (ride, event) pair with equal coordinates and
        ``window_start <= ride time <= window_end``. Holds all columns of both
        inputs; the dummy ``key`` column of the former cross join is not present.
        Row order is unspecified.
    """
    # Temporary join keys, cast to float64 on both sides so that differing coordinate
    # types (e.g. float32 vs float64) compare the way pc.equal would compare them.
    lat_key, lon_key = "__join_lat", "__join_lon"
    key_type = pa.float64()

    taxi_keyed = (
        taxi_table
        .append_column(lat_key, pc.cast(taxi_table[taxi_lat_col], key_type))
        .append_column(lon_key, pc.cast(taxi_table[taxi_lon_col], key_type))
    )
    events_keyed = (
        events_table
        .append_column(lat_key, pc.cast(events_table["latitude"], key_type))
        .append_column(lon_key, pc.cast(events_table["longitude"], key_type))
    )

    # Location condition: exact match on lat/lon, evaluated by the join itself.
    joined = taxi_keyed.join(events_keyed, keys=[lat_key, lon_key], join_type="inner")
    joined = joined.drop([lat_key, lon_key])

    # Time condition: taxi_time >= window_start AND taxi_time <= window_end
    time_condition = pc.and_(
        pc.greater_equal(joined[taxi_time_col], joined[window_start_col]),
        pc.less_equal(joined[taxi_time_col], joined[window_end_col])
    )

    return joined.filter(time_condition)

# Rides whose pickup falls inside an event's pickup window at the same coordinates.
print("Processing pickup events...")
pickup_matches = conditional_join_with_time_window(
    taxi_table=taxi_with_idx,
    events_table=events_expanded,
    taxi_time_col="pickup_datetime",
    taxi_lat_col="pickup_lat",
    taxi_lon_col="pickup_lon",
    window_start_col="pickup_window_start",
    window_end_col="pickup_window_end",
)

# Rides whose dropoff falls inside an event's dropoff window at the same coordinates.
print("Processing dropoff events...")
dropoff_matches = conditional_join_with_time_window(
    taxi_table=taxi_with_idx,
    events_table=events_expanded,
    taxi_time_col="dropoff_datetime",
    taxi_lat_col="dropoff_lat",
    taxi_lon_col="dropoff_lon",
    window_start_col="dropoff_window_start",
    window_end_col="dropoff_window_end",
)

# Handle multiple matches - take first match for each taxi ride
def get_first_match_per_taxi(matches_table):
    """
    Keep one event per ride: the match with the smallest ``event_idx``.

    Parameters
    ----------
    matches_table : pyarrow.Table
        (ride, event) pairs with at least ``taxi_idx``, ``event_idx`` and
        ``event_type`` columns. ``taxi_idx`` is expected to contain no nulls.

    Returns
    -------
    pyarrow.Table
        Columns ``taxi_idx`` and ``event_type``, one row per distinct ``taxi_idx``,
        ordered by ``taxi_idx``. Empty (int64 / string) when there are no matches.
    """
    if len(matches_table) == 0:
        return pa.table({
            "taxi_idx": pa.array([], type=pa.int64()),
            "event_type": pa.array([], type=pa.string())
        })

    # Sort by taxi_idx, then by event_idx, so the first row of every ride's run
    # is the match with the lowest event index.
    sorted_matches = matches_table.sort_by([
        ("taxi_idx", "ascending"),
        ("event_idx", "ascending")
    ])

    # A row opens a new run when its taxi_idx differs from the row above it.
    # Computed on the NumPy view instead of looping over Python lists.
    taxi_ids = sorted_matches["taxi_idx"].to_numpy()
    starts_run = np.empty(len(taxi_ids), dtype=bool)
    starts_run[0] = True
    np.not_equal(taxi_ids[1:], taxi_ids[:-1], out=starts_run[1:])

    return (
        sorted_matches
        .select(["taxi_idx", "event_type"])
        .filter(pa.array(starts_run))
    )

def _scatter_event_types(first_matches, n_rows):
    """
    Expand (taxi_idx, event_type) pairs into a column of length ``n_rows``.

    Position ``i`` holds the event type matched to ride ``i`` and is null when the
    ride has no match. The column is assembled with a NumPy slot array and a single
    Arrow ``take`` rather than a Python list with one element per ride.
    """
    if len(first_matches) == 0:
        return pa.nulls(n_rows, type=pa.string())

    # slot[i] = row of first_matches describing ride i, or -1 when ride i is unmatched.
    slot = np.full(n_rows, -1, dtype=np.int64)
    slot[first_matches["taxi_idx"].to_numpy()] = np.arange(len(first_matches))

    # Masked entries become null indices, for which take() yields null.
    return first_matches["event_type"].take(pa.array(slot, mask=slot < 0))


# Get first matches for pickup and dropoff
pickup_first = get_first_match_per_taxi(pickup_matches)
dropoff_first = get_first_match_per_taxi(dropoff_matches)

# One event type (or null) per row of the original table, for each trip end.
pickup_lookup = _scatter_event_types(pickup_first, len(table))
dropoff_lookup = _scatter_event_types(dropoff_first, len(table))

# Add the new columns to the original table
final_table = table.add_column(len(table.column_names), "pickup_event_type", pickup_lookup)
final_table = final_table.add_column(len(final_table.column_names), "dropoff_event_type", dropoff_lookup)

print(f"Total taxi rides: {len(final_table)}")
print(f"Rides with pickup events: {pc.sum(pc.is_valid(final_table['pickup_event_type'])).as_py()}")
print(f"Rides with dropoff events: {pc.sum(pc.is_valid(final_table['dropoff_event_type'])).as_py()}")