### Based on https://github.com/FalkonML/falkon/blob/master/notebooks/NycTaxiDataset.ipynb

In [1]:
import psycopg2
import sys, os, time
import subprocess

In [2]:
csv_size = 3_000_000
folder = os.path.abspath("./taxi_data_preprocessed")
try:
    os.makedirs(folder)
except FileExistsError:
    pass

### Preprocess data into compressed CSVs using SQL

In [3]:
def make_copy_sql(from_id, num_rows, out_file):
    query = """
    COPY (
        SELECT
            id,
            EXTRACT(EPOCH FROM CAST(pickup_datetime AS time)) as time,
            EXTRACT(ISODOW FROM pickup_datetime) as dow,
            EXTRACT(DAY FROM pickup_datetime) as dom,
            EXTRACT(MONTH FROM pickup_datetime) as month,
            round(pickup_latitude, 6) as pickup_lat,
            round(pickup_longitude, 6) as pickup_lon,
            round(dropoff_latitude, 6) as dropoff_lat,
            round(dropoff_longitude, 6) as dropoff_lon,
            round(trip_distance, 3) as distance,
            EXTRACT(EPOCH FROM dropoff_datetime - pickup_datetime) as duration
        FROM trips 
        WHERE 
            (
                (EXTRACT(YEAR FROM pickup_datetime) < 2011 AND (
                    pickup_nyct2010_gid IS NOT NULL AND 
                    dropoff_nyct2010_gid IS NOT NULL
                )) OR
                (EXTRACT(YEAR FROM pickup_datetime) >= 2011 AND (
                    pickup_location_id < 264 AND 
                    dropoff_location_id < 264
                ))
            ) AND
            (EXTRACT(EPOCH FROM dropoff_datetime - pickup_datetime) BETWEEN 0 AND 18000) AND
            (id > %d)
        ORDER BY id ASC
        LIMIT %d
    ) TO '%s'
    WITH (FORMAT csv, HEADER true);
    """ % (from_id, num_rows, out_file)
    return query


In [4]:
conn = psycopg2.connect(database="nyc-taxi-data")

In [5]:
index = 0
i = 0
with conn.cursor() as cursor:
    while True:
        t_s = time.time()
        fn = os.path.join(folder, "%d.csv" % (index))
        # Postgres server needs to have permission to create the file
        os.umask(0)
        with open(os.open(fn, os.O_CREAT | os.O_WRONLY, 0o777), 'w') as fh:
            cursor.copy_expert(make_copy_sql(index, csv_size, fn), fh)
        # Read last line of written file to check the new index
        last_line = os.popen('tail -n 1 %s' % (fn)).read()
        # if last_line.startswith("id"):
        #     break

        fh.close()

        try:
            index = int(last_line.split(",")[0])
        except ValueError:
            print("Error: %s" % (last_line))
            break
        # Compress the file
        # os.popen('gzip -f %s' % (fn))
        subprocess.run(['gzip', '-f', fn], check=True)

        i += 1
        print("%d - %.2fs - Retrieved new start ID %d" % (i, time.time() - t_s, index))

1 - 43.62s - Retrieved new start ID 3378753
2 - 33.46s - Retrieved new start ID 6378763
3 - 28.67s - Retrieved new start ID 9378765
4 - 23.96s - Retrieved new start ID 12378772
5 - 10.31s - Retrieved new start ID 13464997
Error: id,time,dow,dom,month,pickup_lat,pickup_lon,dropoff_lat,dropoff_lon,distance,duration



### Turn zipped csv files into a compressed h5py dataset

In [6]:
import pandas as pd
import h5py
import time
import os
import numpy as np

In [7]:
data_folder = "./taxi_data_preprocessed"
def list_files(folder):
    for r, d, f in os.walk(folder):
        for file in f:
            if file.endswith('.csv.gz'):
                yield os.path.join(r, file)

In [8]:
all_x = []
all_y = []
for f in list_files(data_folder):
    t_s = time.time()
    print(f)
    df = pd.read_csv(f, compression='gzip', header=0, index_col=False)
    Y = df['duration'].to_numpy(np.int32, copy=True)
    X = df[['time', 'dow', 'dom', 'month', 'pickup_lat',
            'pickup_lon', 'dropoff_lat', 'dropoff_lon',
            'distance']].to_numpy(np.float64, copy=True)
    all_x.append(X)
    all_y.append(Y)
    del df
    print("file %s read in %.2fs" % (f, time.time() - t_s))

./taxi_data_preprocessed/9378765.csv.gz
file ./taxi_data_preprocessed/9378765.csv.gz read in 5.42s
./taxi_data_preprocessed/3378753.csv.gz
file ./taxi_data_preprocessed/3378753.csv.gz read in 4.55s
./taxi_data_preprocessed/12378772.csv.gz
file ./taxi_data_preprocessed/12378772.csv.gz read in 1.58s
./taxi_data_preprocessed/6378763.csv.gz
file ./taxi_data_preprocessed/6378763.csv.gz read in 4.50s
./taxi_data_preprocessed/0.csv.gz
file ./taxi_data_preprocessed/0.csv.gz read in 4.75s


In [9]:
num_samples = sum([arr.shape[0] for arr in all_x])
dim = all_x[0].shape[1]
print(num_samples, ",", dim)

13086214 , 9


In [10]:
max_chunk_size = 2 * 2**20  # 2MB
chunk_x = int(max_chunk_size / dim / 8)
chunk_y = chunk_x
print("Chunk size:", chunk_x)

Chunk size: 29127


In [11]:
with h5py.File(os.path.join(data_folder, 'full.h5py'), 'w', libver='latest') as f:
    Xdset = f.create_dataset("X", (num_samples, dim), dtype='float64', 
                             compression="gzip", chunks=(chunk_x, dim))
    Ydset = f.create_dataset("Y", (num_samples, 1), dtype='int32')
    current_i = 0
    for X, Y in zip(all_x, all_y):
        t_s = time.time()
        X = np.ascontiguousarray(X)
        Y = Y.reshape((-1, 1))
        Xdset.write_direct(X, dest_sel=np.s_[current_i:current_i+X.shape[0], :])
        Ydset.write_direct(Y, dest_sel=np.s_[current_i:current_i+Y.shape[0], :])
        current_i += X.shape[0]
        print("i: %d/%d in %.2fs" % (current_i, num_samples, time.time() - t_s))

i: 3000000/13086214 in 5.78s
i: 6000000/13086214 in 5.80s
i: 7086214/13086214 in 4.64s
i: 10086214/13086214 in 9.46s
i: 13086214/13086214 in 6.63s
