In [1]:
from multiprocessing import Pool
import os
import json

import numpy as np
import pandas as pd

In [2]:
def _get_timestamp(json_contents):
    return pd.to_datetime(json_contents['features'][0]['properties']['timestamp'])

In [3]:
def _get_coordinates(json_contents):
    return np.array(json_contents['features'][0]['geometry']['coordinates'])

In [4]:
def _process_one_json(path):
    """Load one JSON file and turn its coordinates into a DataFrame.

    Parameters
    ----------
    path : str
        Path of the JSON file to read.

    Returns
    -------
    tuple
        ``(timestamp, frame)``. ``frame`` is indexed by ``'ts'`` and has the
        columns ``'lon'`` and ``'lat'``. When a ``KeyError`` occurs while
        extracting the data, ``timestamp`` is ``None`` and ``frame`` is empty.

    Raises
    ------
    Exception
        Any error other than ``KeyError`` (e.g. a missing file or invalid
        JSON) is re-raised after the offending path has been printed.
    """
    def _extract_timestamp_and_coordinates():
        try:
            # Context manager so the file handle is closed deterministically
            # instead of being left to the garbage collector.
            with open(path) as json_file:
                json_contents = json.load(json_file)
            return _get_timestamp(json_contents), _get_coordinates(json_contents)
        except KeyError:
            # Document lacks the expected keys: report "no data" for this file.
            return None, []
        except Exception:
            print('Error for path:', path)
            raise

    timestamp, coordinates = _extract_timestamp_and_coordinates()

    if len(coordinates) == 0:
        # Empty frame with the same layout as a populated one.
        frame = pd.DataFrame(columns=['lon', 'lat', 'ts']).set_index('ts')
    else:
        # assumes each coordinate entry is a (lon, lat) pair -- TODO confirm
        frame = pd.DataFrame(data=coordinates, columns=['lon', 'lat'])\
            .assign(ts=timestamp)\
            .set_index('ts')
    return timestamp, frame

In [5]:
def _persist_parquet_path(dirpath_persist_parquet, date):
    return os.path.join(dirpath_persist_parquet, date.strftime('sg_taxi_%Y%m%d.parquet'))

In [6]:
def _process_one_date(date, dirpath_json, dirpath_persist_parquet):
    """Combine the per-minute JSON files of one day into a single parquet file.

    Parameters
    ----------
    date
        Start of the day to process; passed to ``pd.date_range`` as ``start``
        and to ``_persist_parquet_path`` to name the output file.
    dirpath_json : str
        Directory containing the ``sg_taxi_<YYYYMMDDTHHMM>.json`` input files.
    dirpath_persist_parquet : str
        Directory the parquet file is written to.

    Notes
    -----
    Results are keyed by the timestamp returned from ``_process_one_json``, so
    files reporting the same timestamp collapse to the last one read. If no
    file yields any rows, nothing is written and a message is printed instead
    of letting ``pd.concat`` fail on an empty sequence.
    """
    # One file-name stamp per minute of the day (24 * 60 stamps).
    # '1min' replaces the deprecated 'T' offset alias.
    minute_stamps = pd.date_range(
        start=date, periods=(24 * 60), freq='1min').strftime('%Y%m%dT%H%M')
    json_paths = (
        os.path.join(dirpath_json, 'sg_taxi_%s.json' % stamp)
        for stamp in minute_stamps)

    frames_by_timestamp = dict(map(_process_one_json, json_paths))
    non_empty_frames = [
        frame for frame in frames_by_timestamp.values() if len(frame) > 0]

    if not non_empty_frames:
        # pd.concat raises ValueError when given nothing to concatenate.
        print('No data found for date, skipping:', date)
        return

    pd.concat(non_empty_frames)\
        .sort_index()\
        .to_parquet(
            _persist_parquet_path(dirpath_persist_parquet, date),
            allow_truncated_timestamps=True,
            compression='gzip')

In [7]:
def process_multiple_dates(dates, dirpath_json, dirpath_persist_parquet):
    """Process every date that has no parquet file yet, using a process pool.

    Parameters
    ----------
    dates : iterable
        Dates to process; each one is passed to ``_process_one_date``.
    dirpath_json : str
        Directory containing the JSON input files.
    dirpath_persist_parquet : str
        Output directory for the parquet files; created if it does not exist.

    Notes
    -----
    Dates whose output file (as given by ``_persist_parquet_path``) already
    exists are skipped, so the function can be re-run to resume an
    interrupted batch.
    """
    # exist_ok avoids the race between checking for the directory and creating it.
    os.makedirs(dirpath_persist_parquet, exist_ok=True)

    filtered_dates = [
        date for date in dates
        if not os.path.exists(_persist_parquet_path(dirpath_persist_parquet, date))]
    print('About to process %d dates: %s' % (len(filtered_dates), filtered_dates))

    if not filtered_dates:
        # Nothing to do: avoid starting worker processes for an empty batch.
        return

    # maxtasksperchild=1 makes the pool replace a worker after it has
    # completed one task (presumably to release memory between dates).
    with Pool(maxtasksperchild=1) as pool:
        pool.starmap(
            _process_one_date,
            [(date, dirpath_json, dirpath_persist_parquet)
             for date in filtered_dates])

In [8]:
# process_multiple_dates(
#     dirpath_persist_parquet='./parquet/2019/', 
#     dirpath_json='./2019/', 
#     dates=pd.date_range(start='20190101', end='20191231', freq='1D'))

In [9]:
# process_multiple_dates(
#     dirpath_persist_parquet='./parquet/2018/', 
#     dirpath_json='./2018/', 
#     dates=pd.date_range(start='20180101', end='20181231', freq='1D'))

In [10]:
# process_multiple_dates(
#     dirpath_persist_parquet='./parquet/2017/', 
#     dirpath_json='./2017/', 
#     dates=pd.date_range(start='20170101', end='20171231', freq='1D'))

In [11]:
# process_multiple_dates(
#     dirpath_persist_parquet='./parquet/2020/', 
#     dirpath_json='./2020/', 
#     dates=pd.date_range(start='20200101', end='20201023', freq='1D'))

In [12]:
# process_multiple_dates(
#     dirpath_persist_parquet='./parquet/2016/', 
#     dirpath_json='./2016/', 
#     dates=pd.date_range(start='20160113', end='20161231', freq='1D'))