In [1]:
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import pyarrow.parquet as pq

# Show tensor values with 6 digits after the decimal point.
TENSOR_PRINT_PRECISION = 6
torch.set_printoptions(precision=TENSOR_PRINT_PRECISION)

In [2]:
# Load only the pickup timestamps of the January 2024 file and make sure the
# column is a datetime (values that cannot be parsed become NaT).
df_jan = (
    pq.read_table('yellow_tripdata_2024-01.parquet', columns=["tpep_pickup_datetime"])
    .to_pandas()
    .assign(
        tpep_pickup_datetime=lambda frame: pd.to_datetime(
            frame["tpep_pickup_datetime"], errors="coerce"
        )
    )
)

In [3]:
# Keep only pickups inside January 2024, i.e. the half-open range
# [2024-01-01, 2024-02-01).
# - An upper bound is needed: any later timestamp in the file would otherwise
#   add hours after January to the hourly index (hours in between with no
#   pickups get a count of 0 from resample).
# - The lower bound is inclusive at midnight: the previous `> 23:59:59` test
#   also admitted fractional-second Dec-31 pickups, which would create a
#   2023-12-31 23:00 bucket.
JAN_2024_START = pd.Timestamp("2024-01-01")
FEB_2024_START = pd.Timestamp("2024-02-01")

pickup_times = df_jan["tpep_pickup_datetime"]
# NaT values (unparseable timestamps) fail both comparisons and are dropped.
in_january = (pickup_times >= JAN_2024_START) & (pickup_times < FEB_2024_START)
df_agg = df_jan.loc[in_january, ["tpep_pickup_datetime"]].reset_index(drop=True)

# Count pickups per hour
df_hourly = df_agg.resample('h', on='tpep_pickup_datetime').size().reset_index(name="hourly_pickups")

# Create time-based features
pickup_hour = df_hourly["tpep_pickup_datetime"].dt
df_hourly = df_hourly.assign(
    hour=pickup_hour.hour.astype(int),
    dayofweek=pickup_hour.dayofweek.astype(int),
    dayofmonth=pickup_hour.day.astype(int),
    month=pickup_hour.month.astype(int),
)

# see df_hourly
df_hourly.head(5)

Unnamed: 0,tpep_pickup_datetime,hourly_pickups,hour,dayofweek,dayofmonth,month
0,2024-01-01 00:00:00,6596,0,0,1,1
1,2024-01-01 01:00:00,7355,1,0,1,1
2,2024-01-01 02:00:00,6220,2,0,1,1
3,2024-01-01 03:00:00,4936,3,0,1,1
4,2024-01-01 04:00:00,3188,4,0,1,1


In [4]:
# Sequence-to-sequence dataset over hourly taxi pickup counts.
# Each sample pairs `input_window` consecutive hours of features with the
# pickup counts of the following `output_window` hours, e.g. for windows 5/5:
#   x = [x1, x2, x3, x4, x5]   each x_t in R^5:
#       (hourly_pickups, hour, dayofweek, dayofmonth, month)
#   y = [y6, y7, y8, y9, y10]  (hourly_pickups only)

class TaxiSeq2Seq(Dataset):
    """Sliding-window dataset of hourly yellow-taxi pickup counts.

    Parameters
    ----------
    parquet_files : str or iterable of str
        Parquet file(s) with a ``tpep_pickup_datetime`` column. A single path
        string is accepted as well as a list of paths.
    transform : callable, optional
        Applied to the feature tensor of every sample before it is returned.
    input_window : int
        Number of consecutive hours used as model input (must be >= 1).
    output_window : int
        Number of following hours whose pickup counts are the labels (>= 1).
    after : str or datetime-like or None
        Exclusive lower bound: pickups at or before this instant are dropped.
        ``None`` disables the lower bound.
    before : str or datetime-like or None
        Exclusive upper bound: pickups at or after this instant are dropped.
        ``None`` (default) disables it. Useful when a file contains timestamps
        past the period of interest, which would otherwise extend the hourly
        index (with zero counts for hours that have no pickups).

    Raises
    ------
    ValueError
        If a window size is < 1 or ``parquet_files`` is empty.
    """

    # Columns (in order) that make up one input time step.
    FEATURE_COLUMNS = ["hourly_pickups", "hour", "dayofweek", "dayofmonth", "month"]
    # Columns that make up one output (label) time step.
    TARGET_COLUMNS = ["hourly_pickups"]

    def __init__(self, parquet_files, transform=None, input_window=1, output_window=1,
                 after="2023-12-31 23:59:59", before=None):
        # Validate before doing any (potentially slow) file I/O.
        if input_window < 1 or output_window < 1:
            raise ValueError(
                "input_window and output_window must be >= 1, "
                f"got {input_window} and {output_window}"
            )
        self.input_window = input_window
        self.output_window = output_window
        self.transform = transform

        raw = self._read_pickups(parquet_files)
        self.data = self._hourly_features(raw, after=after, before=before)

    @staticmethod
    def _read_pickups(parquet_files):
        """Read and concatenate the pickup-time column of every parquet file."""
        if isinstance(parquet_files, str):
            parquet_files = [parquet_files]
        frames = []
        for filename in parquet_files:
            print(f"reading {filename}")
            frames.append(
                pq.read_table(filename, columns=["tpep_pickup_datetime"]).to_pandas()
            )
        if not frames:
            raise ValueError("parquet_files must name at least one parquet file")
        # ignore_index avoids duplicate row labels when several files are read.
        return pd.concat(frames, ignore_index=True)

    @staticmethod
    def _hourly_features(raw, after="2023-12-31 23:59:59", before=None):
        """Bucket pickups into hourly counts and add calendar features.

        Returns a new frame with columns tpep_pickup_datetime, hourly_pickups,
        hour, dayofweek, dayofmonth, month and a fresh RangeIndex. Hours inside
        the kept span that have no pickups appear with a count of 0. ``raw`` is
        not modified.
        """
        # Unparseable timestamps become NaT; notna() and the comparisons drop them.
        pickups = pd.to_datetime(raw["tpep_pickup_datetime"], errors="coerce")
        keep = pickups.notna()
        if after is not None:
            keep &= pickups > after
        if before is not None:
            keep &= pickups < before
        kept = pd.DataFrame({"tpep_pickup_datetime": pickups[keep]})

        hourly = (
            kept.sort_values("tpep_pickup_datetime")
            .resample("h", on="tpep_pickup_datetime")
            .size()
            .reset_index(name="hourly_pickups")
        )
        stamps = hourly["tpep_pickup_datetime"].dt
        return hourly.assign(
            hour=stamps.hour.astype(int),
            dayofweek=stamps.dayofweek.astype(int),
            dayofmonth=stamps.day.astype(int),
            month=stamps.month.astype(int),
        )

    def __len__(self):
        """Number of complete (input, output) windows.

        A sample starting at row i reads rows [i, i + input_window) as input and
        rows [i + input_window, i + input_window + output_window) as output.
        With N rows the last valid start is N - input_window - output_window,
        giving N - input_window - output_window + 1 samples. Example: N=10,
        input_window=3, output_window=2 -> 6 samples (starts 0..5; start 5 reads
        rows [5, 6, 7] and predicts rows [8, 9]). Returns 0, never a negative
        number, when the data is shorter than one full window.
        """
        return max(0, len(self.data) - self.input_window - self.output_window + 1)

    def __getitem__(self, idx):
        """Return ``(features, labels)`` for the window starting at row ``idx``.

        features : float64 tensor of shape (input_window, len(FEATURE_COLUMNS)),
            passed through ``transform`` if one was given.
        labels : int64 tensor of shape (output_window, 1) holding the pickup
            counts of the hours that follow the input window.

        Negative indices count from the end. Out-of-range indices raise
        IndexError (``iloc`` slicing alone would silently return empty arrays),
        which also lets plain ``for sample in dataset`` iteration terminate.
        """
        n_samples = len(self)
        start = idx + n_samples if idx < 0 else idx
        if not 0 <= start < n_samples:
            raise IndexError(
                f"index {idx} out of range for dataset of length {n_samples}"
            )

        input_end = start + self.input_window
        output_end = input_end + self.output_window

        input_seq = self.data.iloc[start:input_end][self.FEATURE_COLUMNS].to_numpy(dtype=float)
        output_seq = self.data.iloc[input_end:output_end][self.TARGET_COLUMNS].to_numpy()

        features = torch.tensor(input_seq, dtype=torch.float64)
        labels = torch.tensor(output_seq, dtype=torch.long)
        if self.transform is not None:
            features = self.transform(features)
        return features, labels

In [5]:
# Build the dataset from the January 2024 file: one hour of history is used
# to predict the pickup count of the next hour.
list_of_files = ['yellow_tripdata_2024-01.parquet']
input_window, output_window = 1, 1  # read the last hour / predict the next hour

dataset = TaxiSeq2Seq(
    parquet_files=list_of_files,
    input_window=input_window,
    output_window=output_window,
)

reading yellow_tripdata_2024-01.parquet


In [6]:
# Inspect the dataset object and the hourly frame it holds, with a
# "-----" separator line between each item.
overview = (
    type(dataset),
    dataset.__dict__.keys(),
    dataset.data.head(5),
    type(dataset.data),
    dataset.data.dtypes,
)
print(*overview, sep="\n-----\n")

<class '__main__.TaxiSeq2Seq'>
-----
dict_keys(['input_window', 'output_window', 'transform', 'data'])
-----
  tpep_pickup_datetime  hourly_pickups  hour  dayofweek  dayofmonth  month
0  2024-01-01 00:00:00            6596     0          0           1      1
1  2024-01-01 01:00:00            7355     1          0           1      1
2  2024-01-01 02:00:00            6220     2          0           1      1
3  2024-01-01 03:00:00            4936     3          0           1      1
4  2024-01-01 04:00:00            3188     4          0           1      1
-----
<class 'pandas.core.frame.DataFrame'>
-----
tpep_pickup_datetime    datetime64[us]
hourly_pickups                   int64
hour                             int64
dayofweek                        int64
dayofmonth                       int64
month                            int64
dtype: object


In [9]:
dataset.data.iloc[:5]  # first five hourly rows, shown with rich display

Unnamed: 0,tpep_pickup_datetime,hourly_pickups,hour,dayofweek,dayofmonth,month
0,2024-01-01 00:00:00,6596,0,0,1,1
1,2024-01-01 01:00:00,7355,1,0,1,1
2,2024-01-01 02:00:00,6220,2,0,1,1
3,2024-01-01 03:00:00,4936,3,0,1,1
4,2024-01-01 04:00:00,3188,4,0,1,1


In [10]:
# Wrap the dataset in a DataLoader; batch size 1 means every batch holds a
# single (features, labels) window.
dataloader = DataLoader(dataset=dataset, batch_size=1)

In [11]:
# Print the first two (features, labels) batches, then stop iterating.
ex_num = 0
for features, labels in dataloader:
    print(
        f"------Sequence#: {ex_num}  ------",
        f"features: {features}",
        f"labels: {labels}",
        '\n',
        sep="\n",
    )
    if ex_num == 1:
        break
    ex_num += 1

------Sequence#: 0  ------
features: tensor([[[6.596000e+03, 0.000000e+00, 0.000000e+00, 1.000000e+00, 1.000000e+00]]],
       dtype=torch.float64)
labels: tensor([[[7355]]])


------Sequence#: 1  ------
features: tensor([[[7.355000e+03, 1.000000e+00, 0.000000e+00, 1.000000e+00, 1.000000e+00]]],
       dtype=torch.float64)
labels: tensor([[[6220]]])


