In [2]:
# duckdb is used by the query cells further down (duckdb.connect); keep the
# import live so those cells do not depend on leftover kernel state.
import duckdb

In [3]:
# Explicit Arrow schema handed to pq.read_table for every monthly file, so each
# downloaded month is read with the same column names and types before it is
# appended to the Delta table.
_TAXI_SCHEMA_RAW = pa.schema(
    [
        pa.field("VendorID", pa.int64()),
        pa.field("tpep_pickup_datetime", pa.timestamp("us")),
        pa.field("tpep_dropoff_datetime", pa.timestamp("us")),
        pa.field("passenger_count", pa.float64()),
        pa.field("trip_distance", pa.float64()),
        pa.field("RatecodeID", pa.float64()),
        pa.field("store_and_fwd_flag", pa.string()),
        pa.field("PULocationID", pa.int64()),
        pa.field("DOLocationID", pa.int64()),
        pa.field("payment_type", pa.int64()),
        pa.field("fare_amount", pa.float64()),
        pa.field("extra", pa.float64()),
        pa.field("mta_tax", pa.float64()),
        pa.field("tip_amount", pa.float64()),
        pa.field("tolls_amount", pa.float64()),
        pa.field("improvement_surcharge", pa.float64()),
        pa.field("total_amount", pa.float64()),
        pa.field("congestion_surcharge", pa.float64()),
        pa.field("airport_fee", pa.float64()),
    ]
)

# URL template: "{}" is replaced by a "YYYY-MM" period string.
base_link = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{}.parquet"
data_root = Path.cwd().parent / ".data"

# Periods to ingest. Only January 2021 is loaded by this cell; to ingest a full
# year build the list instead, e.g.
#     dates = [f"2021-{month:02d}" for month in range(1, 13)]
dates = ["2021-01"]

# NOTE: mode="append" is not idempotent - every re-run of this cell appends the
# listed months to the raw table again.
for date in dates:
    table_url = base_link.format(date)
    # Timeout so a stalled connection cannot hang the kernel indefinitely.
    response = requests.get(table_url, timeout=60)
    # Fail loudly on 4xx/5xx instead of passing an error page to the parquet reader.
    response.raise_for_status()
    table = pq.read_table(pa.py_buffer(response.content), schema=_TAXI_SCHEMA_RAW)
    write_deltalake(str(data_root / "raw"), table, mode="append")

In [4]:
# List the column names of the most recently downloaded month's table.
table.column_names

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [48]:
# Open the raw Delta table under data_root and display its current version.
dt = DeltaTable(str(data_root.joinpath("raw")))
dt.version()

23

In [49]:
# Load the whole raw Delta table as a pyarrow Table and display its shape.
dt = DeltaTable(str(data_root.joinpath("raw")))
taxi_raw = dt.to_pyarrow_table()
taxi_raw.shape

(55553400, 19)

In [51]:
# Distinct values of the `pulocationid` column of the loaded table.
# NOTE(review): the recorded output is a single null, and this cell uses a
# lowercase column name while the ingest cell's schema spells it "PULocationID"
# - confirm which column names the table on disk actually has.
pc.unique(taxi_raw.column("pulocationid"))

<pyarrow.lib.Int64Array object at 0x12dfed720>
[
  null
]

In [50]:
# Re-read the Delta table (`dt` comes from an earlier cell) into Arrow and
# filter it with DuckDB. The SQL names `taxi_raw` directly - presumably DuckDB
# resolves it from the Python variable of that name; verify.
taxi_raw = dt.to_pyarrow_table()
conn = duckdb.connect(":memory:")

results = conn.execute(
    "select * from taxi_raw where pulocationid = 245"
).arrow()
results.shape

(0, 19)

In [47]:
# Same kind of DuckDB filter, but run against a pyarrow dataset obtained from
# the Delta table instead of a fully materialised Arrow table.
dt = DeltaTable(str(data_root.joinpath("raw")))
taxi_raw_dt = dt.to_pyarrow_dataset()
conn = duckdb.connect(":memory:")

results = conn.execute(
    "select * from taxi_raw_dt where pulocationid = 245 and passenger_count > 1"
).arrow()
results.shape

(0, 19)

In [40]:
# Open the Delta table stored under <data_root>/repo/raw and display its version.
dt = DeltaTable(str(data_root.joinpath("repo", "raw")))
dt.version()

9

In [43]:
# Z-order the repo/raw Delta table on the two columns used by the filter
# queries; the call returns the optimize metrics shown in the cell output.
dt = DeltaTable(str(data_root.joinpath("repo", "raw")))
dt.optimize.z_order(["pulocationid", "passenger_count"])

{'numFilesAdded': 1,
 'numFilesRemoved': 4,
 'filesAdded': {'min': 95610069,
  'max': 95610069,
  'avg': 95610069.0,
  'totalFiles': 1,
  'totalSize': 95610069},
 'filesRemoved': {'min': 26622159,
  'max': 40807203,
  'avg': 32687111.5,
  'totalFiles': 4,
  'totalSize': 130748446},
 'partitionsOptimized': 0,
 'numBatches': 835,
 'totalConsideredFiles': 4,
 'totalFilesSkipped': 0,
 'preserveInsertionOrder': True}

In [6]:
# NOTE(review): imported inside an analysis cell rather than in a single
# import cell at the top of the notebook.
from object_store import ObjectStore

# NOTE(review): absolute, user-specific file:// URL - this cell only runs on a
# machine that has this directory; consider deriving it from a configurable
# data directory.
store = ObjectStore("file:///Users/packre/code/delta-lakehouse/data")
# Look up the metadata of one stored object (the recorded output shows its
# location, last_modified timestamp and size).
store.head("meta/zone_to_borough.json")

ObjectMeta { location: Path { raw: "meta/zone_to_borough.json" }, last_modified: 2023-01-19T20:36:34.647420313Z, size: 8687 }

In [10]:
import collections
import json
import os

import numpy as np
from object_store import ObjectStore


def _exterior_ring(coords):
    """Convert one entry of a GeoJSON ``coordinates`` list to a ``(2, n)`` array.

    ``coords`` is either a single ring (a list of ``[x, y]`` positions) or a
    polygon (a list of rings); for a polygon only its first ring is kept.
    Raises ``AssertionError`` if the result is not a 2-row, 2-D array.
    """
    # Decide on the nesting of the plain lists instead of np.array(coords).ndim:
    # a polygon whose rings differ in length is ragged and cannot be turned
    # into a rectangular array in the first place.
    first = coords[0] if coords else None
    if isinstance(first, (list, tuple)) and first and isinstance(first[0], (list, tuple)):
        coords = first
    ring = np.array(coords).T
    assert ring.shape[0] == 2
    assert ring.ndim == 2
    return ring


# NOTE(review): absolute, user-specific file:// URL - only valid on a machine
# that has this directory.
store = ObjectStore("file:///Users/packre/code/delta-lakehouse/data")
geo_json = json.loads(store.get("meta/taxi_zones-tiny.json"))

features = geo_json["features"]
borough_polygons = collections.defaultdict(list)  # borough name -> list of (2, n) rings
zone_polygons = collections.defaultdict(list)  # zone name -> list of (2, n) rings
zbmapper = {}  # zone name -> borough name
for feature in features:
    properties = feature["properties"]
    polygons = [_exterior_ring(part) for part in feature["geometry"]["coordinates"]]

    borough_polygons[properties["borough"]].extend(polygons)
    zone_polygons[properties["zone"]].extend(polygons)
    zbmapper[properties["zone"]] = properties["borough"]

# Integer index -> name, in first-seen order (json.dump writes the keys as strings).
bmapper = dict(enumerate(borough_polygons))
zmapper = dict(enumerate(zone_polygons))

# Make sure the output directory exists before writing the three mapping files.
os.makedirs("./aux_data", exist_ok=True)
with open("./aux_data/zone.json", "w") as f:
    json.dump(zmapper, f)
with open("./aux_data/borough.json", "w") as f:
    json.dump(bmapper, f)
with open("./aux_data/zone_to_borough.json", "w") as f:
    json.dump(zbmapper, f)