In [1]:
import calendar
import datetime
import json

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [4]:
# Load the raw tab-separated flight records. The date and HHMM time columns
# are read as plain strings and parsed explicitly further down.
data = pd.read_csv(
    'flights_2006_2010.csv',
    sep='\t',
    encoding='utf-8',
    dtype=dict(FlightDate='str', ArrTime='str', DepTime='str'),
)

In [10]:
# Output switches: `with_json` additionally dumps column-oriented JSON files;
# `export_parquet` chooses .parquet output (True) over Arrow IPC .arrow files (False).
with_json, export_parquet = False, True

In [5]:
# Rename the raw columns to the upper-case names used in the exported files.
# Only the columns are renamed: passing `index=str` here would convert every
# row label to a Python string (slow and memory-hungry on tens of millions of
# rows) for no benefit, since the index is not written on export.
renamed = data.rename(columns={"FlightDate": "FL_DATE", "DepTime": "DEP_TIME", "ArrTime": "ARR_TIME", "Distance": "DISTANCE", "AirTime": "AIR_TIME", "DepDelay": "DEP_DELAY", "ArrDelay": "ARR_DELAY"})

# Parse ISO dates and keep only the calendar date (datetime.date objects).
renamed['FL_DATE'] = pd.to_datetime(renamed.FL_DATE, format='%Y-%m-%d').dt.date

# An HHMM value of '2400' (24:00) is folded onto '0000' so the hour part of the
# converted time stays in 0-23. This is an exact string match on whole values.
renamed['DEP_TIME'] = renamed.DEP_TIME.replace('2400', '0000')
renamed['ARR_TIME'] = renamed.ARR_TIME.replace('2400', '0000')

def toTime(col):
    """Convert HHMM clock values to fractional hours.

    ``col`` is a Series of HHMM values (numeric strings or numbers, e.g.
    '0930'). The result is a float Series in which 0930 becomes 9.5 and
    2359 becomes 23 + 59/60. Missing values stay missing. Text that is not
    numeric makes ``pd.to_numeric`` raise.
    """
    hhmm = pd.to_numeric(col)
    # Hundreds digits are the hour; the last two digits are minutes.
    whole_hours = np.floor(hhmm / 100)
    minute_fraction = hhmm.mod(100) / 60.
    return whole_hours + minute_fraction

renamed['DEP_TIME'] = toTime(renamed['DEP_TIME'])
renamed['ARR_TIME'] = toTime(renamed['ARR_TIME'])

# Compact storage dtypes for the exported files. Times use float16 only for the
# Arrow export (parquet does not support halfloats right now). Note that float16
# has a step of 1/64 h (~0.94 min) between 16 and 24 h, so minute resolution is
# only barely kept.
types = {
    'DEP_DELAY': 'int16',
    'ARR_DELAY': 'int16',
    'AIR_TIME': 'int16',
    'DISTANCE': 'int16',
    'DEP_TIME': 'float32' if export_parquet else 'float16',  # parquet does not support halfloats right now
    'ARR_TIME': 'float32' if export_parquet else 'float16',
}
columns = ['FL_DATE'] + list(types.keys())
renamed = renamed[columns]
# The integer casts below cannot represent missing values, so drop incomplete rows.
renamed = renamed.dropna()

# astype('int16') does not range-check, so values outside the int16 range would
# be silently corrupted. Fail loudly instead.
int16_limits = np.iinfo(np.int16)
int16_columns = [col for col, dtype in types.items() if dtype == 'int16']
int16_values = renamed[int16_columns]
if not (int16_values.ge(int16_limits.min).all().all() and int16_values.le(int16_limits.max).all().all()):
    raise ValueError(f'values outside the int16 range in {int16_columns}; refusing to narrow with astype')

right_types = renamed.astype(types)

In [15]:
# Preview the first five rows of the typed frame.
right_types.iloc[:5]

Unnamed: 0,FL_DATE,DEP_DELAY,ARR_DELAY,AIR_TIME,DISTANCE,DEP_TIME,ARR_TIME
0,2006-01-01,5,19,350,2475,9.083333,12.483334
1,2006-01-02,167,216,343,2475,11.783334,15.766666
2,2006-01-03,-7,-2,344,2475,8.883333,12.133333
3,2006-01-04,-5,-13,331,2475,8.916667,11.95
4,2006-01-05,-3,-17,321,2475,8.95,11.883333


In [17]:
# Row counts per stage: raw input, after dropping incomplete rows, after the dtype cast.
print(len(data), len(renamed), len(right_types), sep='\n')

34507508
33803130
33803130


In [12]:
def json_serial(obj):
    """``default=`` hook for ``json.dump``: encode dates as Unix epoch seconds.

    ``datetime.date`` values map to midnight UTC of that day. Naive
    ``datetime.datetime`` values (including ``pd.Timestamp``) are treated as
    UTC. Aware ones are converted to UTC first. The result is always an int.

    Uses ``calendar.timegm`` instead of ``strftime("%s")``. ``%s`` is not a
    documented Python directive, fails on some platforms (e.g. Windows), and
    interprets the value in the machine's local timezone, which makes the
    output depend on where the notebook runs.

    Raises:
        TypeError: for any other type, as the ``json`` module expects from a
            ``default`` hook.
    """
    # Check datetime first: it is a subclass of date.
    if isinstance(obj, datetime.datetime):
        return calendar.timegm(obj.utctimetuple())
    if isinstance(obj, datetime.date):
        return calendar.timegm(obj.timetuple())
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

# Export progressively larger row prefixes of the cleaned frame. A size larger
# than the frame simply yields every row, so 'flights-100m' holds all rows
# (33,803,130 in the run shown below), not 100 million.
for size, name in [
    (10000, 'flights-10k'),
    (200000, 'flights-200k'),
    (500000, 'flights-500k'),
    (1000000, 'flights-1m'),
    (3000000, 'flights-3m'),
    (10000000, 'flights-10m'),
    (100000000, 'flights-100m'),
]:
    # Explicitly positional: the first `size` rows regardless of index labels.
    smaller = right_types.iloc[:size]

    print(name, len(smaller))

    table = pa.Table.from_pandas(smaller, preserve_index=False)

    if with_json:
        # Column-oriented JSON {column: [values, ...]}; FL_DATE holds date
        # objects, which json.dump hands to json_serial.
        d = {column: list(smaller[column]) for column in smaller.columns}

        with open(f'{name}.json', 'w') as f:
            json.dump(d, f, default=json_serial, separators=(',', ':'))

    if export_parquet:
        pq.write_table(table, f'{name}.parquet')
    else:
        writer = pa.RecordBatchFileWriter(f'{name}.arrow', table.schema)
        try:
            writer.write(table)
        finally:
            # Close even if the write fails, so the file handle is released.
            writer.close()

flights-10k 10000
flights-200k 200000
flights-500k 500000
flights-1m 1000000
flights-3m 3000000
flights-10m 10000000
flights-100m 33803130


In [8]:
!ls -lah

total 11194720
drwxr-xr-x  28 dominik  staff   896B Jan 21 09:39 .
drwxr-xr-x  31 dominik  staff   992B Jan 21 09:15 ..
drwxr-xr-x   2 dominik  staff    64B Jan 21 09:36 .ipynb_checkpoints
-rw-r--r--   1 dominik  staff    28M Aug 14 09:40 564230852_T_ONTIME.csv
-rw-r--r--   1 dominik  staff   4.0K Jan 21 09:36 convert_flights.ipynb
-rw-r--r--   1 dominik  staff    17K Aug 14 09:40 convert_movies.ipynb
-rw-r--r--   1 dominik  staff    15K Aug 14 09:40 convert_weather.ipynb
-rw-r--r--   1 dominik  staff    20M Aug 14 09:40 error analysis-full.ipynb
-rw-r--r--   1 dominik  staff    11M Aug 14 09:40 error analysis.ipynb
-rw-r--r--   1 dominik  staff   160K Aug 14 09:40 flights-10k.arrow
-rw-r--r--   1 dominik  staff   400K Aug 14 09:40 flights-10k.csv
-rw-r--r--   1 dominik  staff   409K Aug 14 09:40 flights-10k.json
-rw-r--r--   1 dominik  staff    80K Jan 21 09:39 flights-10k.parquet
-rw-r--r--   1 dominik  staff    72M Jan 21 09:39 flights-10m.parquet
-r