Possible demand features:
- Hour of the day
- day of the week
- month
- holiday indicator

- temperature
- precipitation

- pick-up and drop-off locations


In [135]:
import pandas as pd
from h3 import h3
from dask import dataframe
import geopandas as gpd

In [136]:
# load data
ddf = dataframe.read_parquet("data/taxi_data_preprocessed.gzip")
ddf_weather = dataframe.read_parquet("data/prepared/weather_data_hourly_prepared.gzip")

#df = pd.read_parquet("data/taxi_data_preprocessed.gzip")
#df_weather = pd.read_parquet("data/prepared/weather_data_hourly_prepared.gzip")

In [137]:
#ddf = dataframe.from_pandas(df.head(10000), npartitions=20)
#ddf_weather = dataframe.from_pandas(df_weather, npartitions=20)

In [138]:
ddf = ddf.repartition(npartitions=20)
ddf_weather = ddf_weather.repartition(npartitions=20)

## Calculate different Hexagons

In [78]:
H3_HEXAGON_RESOLUTIONS = [3,6,9,12]

In [33]:
# get all unique pickup locations
df_locations = ddf.pickup_centroid_location.unique().compute().to_frame()

In [17]:
# convert to geodataframe
df_locations["pickup_centroid_location"] = gpd.GeoSeries.from_wkt(df_locations["pickup_centroid_location"])
df_geo = gpd.GeoDataFrame(df_locations, geometry='pickup_centroid_location', crs=4326)

In [18]:
# get hexagons with different resolutions
for resolution in H3_HEXAGON_RESOLUTION:
    df_geo[f"h3_{resolution}"] = df_geo.apply(lambda row: h3.geo_to_h3(row.pickup_centroid_location.y, row.pickup_centroid_location.x, resolution), axis=1)

In [24]:
# safe the hexagons as an csv-file
df_geo.to_csv("hexagons.csv")

## Add and Adjust Data Columns

In [139]:
# change data type
ddf["trip_start_timestamp"] = dataframe.to_datetime(ddf.trip_start_timestamp)
ddf["trip_end_timestamp"] = dataframe.to_datetime(ddf.trip_end_timestamp)

In [140]:
# add columns to taxi df
ddf["hour"] = ddf.trip_start_timestamp.dt.hour
ddf["date"] = ddf.trip_start_timestamp.dt.date

In [141]:
# add columns to weather df
ddf_weather["hour"] = ddf_weather.time.dt.hour
ddf_weather["date"] = ddf_weather.time.dt.date

In [114]:
# create timebins
def create_timebins(ddf, steps):
    for step in steps:
        bins = list(range(0,25,step))
        labels = range(len(bins)-1)
        ddf[f"time_bin_{step}"] = ddf["hour"].map_partitions(pd.cut, bins=bins,labels=labels, right=False, include_lowest=True)
    return ddf

In [115]:
# create timebin columns
ddf = create_timebins(ddf, [1,2,6,24])

In [116]:
# add weekday and month columns
ddf["weekday"] = ddf.trip_start_timestamp.dt.weekday
ddf["month"] = ddf.trip_start_timestamp.dt.month

In [56]:
# select columns
ddf_features = ddf[["hour", "weekday", "time_bin_1", "time_bin_2", "time_bin_6", "time_bin_24", "month", "date", "pickup_census_tract", "pickup_centroid_location"]]

In [117]:
# join dfs
ddf_features = ddf_features.merge(ddf_weather, on=["date", "hour"], how="inner")

### Data grouped by census tract

#### Group using different timebins

In [74]:
def group_by_different_timebins_to_csv(ddf, time_steps, location_dimension):
    for step in time_steps:  
        df_grouped = (ddf.groupby(by=["date", f"time_bin_{step}", location_dimension])
         .agg({
            "time": "size",
            'relativehumidity_2m (%)': "mean",
            "temperature_2m (°C)": "mean",
            "apparent_temperature (°C)": "mean",
            "precipitation (mm)": "mean",
            "cloudcover (%)": "mean",
            "windspeed_10m (km/h)": "mean",
            "weekday": "mean",
            "month": "mean"
            }
            )
         .rename(columns={"time": "ntrips"})
         .reset_index().compute()
        )
        df_grouped = df_grouped.dropna()
        df_grouped.to_csv(f"{location_dimension}_time_bin_{step}.csv")


In [15]:
# group data by different timebins and census tracts
group_by_different_timebins_to_csv(ddf_features, [1,2,6,24], "pickup_census_tract")

### Data grouped by H3

In [66]:
# load hexagons
ddf_h3 = dataframe.read_csv("hexagons.csv").drop(columns="Unnamed: 0")

In [67]:
ddf_h3

Unnamed: 0_level_0,pickup_centroid_location,h3_3,h3_6,h3_9,h3_12
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,object,object,object,object,object
,...,...,...,...,...


In [72]:
# join hexagons
ddf_features = ddf_features.merge(ddf_h3, on="pickup_centroid_location")

In [79]:
# group by different timebins and hexagon resolutions and save as csv
for resolution in H3_HEXAGON_RESOLUTIONS:    
    
    # group data by time and location dimension
    group_by_different_timebins_to_csv(ddf_features, [1,2,6,24], f"h3_{resolution}")

start


## TEST

In [142]:
df = ddf.compute()
df_weather = ddf_weather.compute()

In [144]:
df2 = df[["hour", "date", "pickup_census_tract"]]

In [149]:
df_grouped = df2.groupby(by=["date", "hour", "pickup_census_tract"]).size().reset_index().rename(columns={0:"trip_count"})

In [157]:
df_grouped["date"] = pd.to_datetime(df_grouped["date"])

In [160]:
df_grouped["month"] = df_grouped.date.dt.month

In [161]:
df_grouped["weekday"] = df_grouped.date.dt.weekday

In [162]:
df_grouped

Unnamed: 0,date,hour,pickup_census_tract,trip_count,month,weekday
0,2016-01-01,0,17031010202,1,1,4
1,2016-01-01,0,17031010502,3,1,4
2,2016-01-01,0,17031010702,2,1,4
3,2016-01-01,0,17031020301,1,1,4
4,2016-01-01,0,17031030300,17,1,4
...,...,...,...,...,...,...
541393,2016-12-31,23,17031841900,3,12,5
541394,2016-12-31,23,17031842200,18,12,5
541395,2016-12-31,23,17031842300,9,12,5
541396,2016-12-31,23,17031980000,18,12,5


In [168]:
df_weather.date = pd.to_datetime(df_weather.date)

In [169]:
df_joined = df_grouped.merge(df_weather, on=["date", "hour"], how="left")

In [177]:
df_joined.isna().any()

date                         False
hour                         False
pickup_census_tract          False
trip_count                   False
month                        False
weekday                      False
time                         False
temperature_2m (°C)          False
relativehumidity_2m (%)      False
apparent_temperature (°C)    False
precipitation (mm)           False
cloudcover (%)               False
windspeed_10m (km/h)         False
dtype: bool

In [174]:
df_joined.to_csv("df_grouped_test.csv")

In [86]:
result = ddf[["trip_start_timestamp", "time_bin_1", "hour"]].compute()

In [118]:
# Group the data by hour and census tract, and count the number of trips in each group
trip_counts = ddf.groupby([ddf["trip_start_timestamp"].dt.date ,ddf['trip_start_timestamp'].dt.hour, 'pickup_census_tract']).size().reset_index('trip_count')

# Print the resulting trip counts
print(trip_counts)

Dask Series Structure:
npartitions=1
    int64
      ...
dtype: int64
Dask Name: reset_index, 709 tasks
