## NYC Taxi Uber/Lyft Data Transformation

Let's make this a bit cleaner, shall we?

https://www.nyc.gov/site/tlc/businesses/high-volume-for-hire-services.page

## Get Dask cluster close to the data

In [None]:
import coiled
# Launch a Coiled-managed Dask cluster: 10 workers with 64 GiB of memory each,
# placed in region us-east-1 so the workers sit close to the data.
# NOTE(review): assumes the nyc-tlc bucket is hosted in us-east-1 — confirm.
cluster = coiled.Cluster(
    n_workers=10,
    name="nyc-taxi-uber-lyft",
    # presumably mirrors the local Python environment onto the workers — confirm in Coiled docs
    package_sync=True,
    backend_options={"region": "us-east-1"},
    worker_memory="64 GiB",
)

In [None]:
from dask.distributed import Client
# Connect this session to the cluster so that subsequent Dask work is sent to it.
client = Client(cluster)

## Inspect data

In [None]:
import s3fs

# S3 filesystem handle; reused further down for mkdir/ls on the output bucket.
s3 = s3fs.S3FileSystem()
# List the fhvhv trip-data Parquet files available in the nyc-tlc bucket.
s3.glob("nyc-tlc/trip data/fhvhv_tripdata_*.parquet")

In [None]:
import dask.dataframe as dd

# Open every fhvhv trip-data Parquet file matched by the glob as one Dask DataFrame.
df = dd.read_parquet(
    "s3://nyc-tlc/trip data/fhvhv_tripdata_*.parquet",
)
# Display the frame's structure (columns, dtypes, partition count).
df

In [None]:
# Per-column memory footprint of the first partition only; deep=True also
# counts the contents of object columns rather than just the pointers.
df.partitions[0].memory_usage(deep=True).compute()

In [None]:
# Column dtypes as they come out of the raw Parquet files.
df.dtypes

## Two problems!

1.  Massive partitions!
2.  Inefficient data types!
    -   Strings
    -   Categories

## Convert dtypes!

In [None]:
import pandas as pd
import numpy as np

# Categorical dtype with exactly the two categories "Y" and "N"; used for the
# columns whose name contains "flag".
yes_no = pd.CategoricalDtype(categories=["Y", "N"])

In [None]:
# Source dtype -> more compact replacement dtype.
downcasts = (
    ("object", "string[pyarrow]"),
    ("float64", "float32"),
    ("int64", "int32"),
)

# Build the per-column mapping handed to ``astype``.
conversions = {}
for name, kind in df.dtypes.items():
    for source, target in downcasts:
        if kind == source:
            conversions[name] = target
    # Flag columns become the Y/N categorical, overriding any dtype-based
    # choice made just above.
    if "flag" in name:
        conversions[name] = yes_no

conversions

In [None]:
# Apply the per-column dtype conversions collected in `conversions`.
df = df.astype(conversions)

## Repartition to smaller chunks!

In [None]:
# Load the converted data into cluster memory first.
df = df.persist()

# Split into partitions of roughly 128MB each, then keep the result in memory.
# NOTE(review): repartitioning by partition_size presumably has to measure the
# existing partitions, which is why the frame is persisted beforehand — confirm.
df = df.repartition(partition_size="128MB")
df = df.persist()

In [None]:
import dask

# Total in-memory size of the frame, rendered human-readable.
# memory_usage() yields one value per column, so sum them before formatting;
# format_bytes lives in dask.utils and expects a single number of bytes.
dask.utils.format_bytes(
    df.memory_usage(deep=True).sum().compute()
)

## Sort 

In [None]:
# Sort the data by request time by making request_datetime the index, and keep
# the sorted result in cluster memory.
df = df.set_index("request_datetime").persist()

## Pretty close to one-day partitioning

In [None]:
# Index values at which one partition ends and the next begins.
df.divisions

In [None]:
# Re-split along the datetime index so that each partition spans one day.
df = df.repartition(freq="1d")

In [None]:
# Create the scratch location that the transformed dataset is written under.
s3.mkdir("/oss-shared-scratch/mrocklin")

In [None]:
# Snapshot of the partition boundaries; name_function indexes into this.
divisions = df.divisions


def name_function(index: int) -> str:
    """Return the Parquet file name for the partition at position ``index``.

    The name is the calendar date of that partition's lower division
    boundary followed by ``.parquet`` (i.e. ``YYYY-MM-DD.parquet``).
    """
    return f"{divisions[index].date()}.parquet"


# Sanity-check the name of the first partition. The argument is a partition
# index, not a division value.
name_function(0)

## Save!

In [None]:
# Write the frame to S3 as Parquet; name_function supplies the file name for
# each partition index.
df.to_parquet(
    "s3://oss-shared-scratch/mrocklin/nyc-taxi-fhv/", 
    name_function=name_function,
)

In [None]:
# Peek at the first 20 entries in the output directory.
s3.ls("/oss-shared-scratch/mrocklin/nyc-taxi-fhv")[:20]

## How is it to read this data?

In [None]:
# Read the freshly written dataset back and keep it in cluster memory.
# NOTE(review): `use_nullable_dtypes` appears to be deprecated in recent
# pandas/Dask releases in favour of `dtype_backend` — confirm against the
# installed versions.
df = dd.read_parquet(
    "s3://oss-shared-scratch/mrocklin/nyc-taxi-fhv", 
    use_nullable_dtypes=True
).persist()

# Check which dtypes come back after the round trip.
df.dtypes

In [None]:
import dask

# Total in-memory size of the reloaded frame: sum the per-column usage into a
# single byte count, then render it human-readable.
total_bytes = df.memory_usage(deep=True).sum().compute()
dask.utils.format_bytes(total_bytes)

In [None]:
# In-memory size of each partition, formatted as human-readable byte strings.
df.memory_usage_per_partition(deep=True).compute().apply(dask.utils.format_bytes)