# Merging Data with Parquet & Dask
We can use `dask` (rather than `pandas`) dataframes when we are merging datasets that won't fit in memory. Here, the 12 monthly CSVs for 2019 are quite big. We could stack them with `pd.concat`, but that would load every month into RAM at once, causing memory issues and making the process **extremely** slow. pandas has no out-of-core mode of its own: once RAM runs out, the operating system starts paging to disk, and there will be a lot of [memory swapping](https://en.wikipedia.org/wiki/Memory_paging).
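
Stacking twelve months of trips is a row-wise concatenation, not a key-based join, so `concat` (not `merge`) is the operation that matches this task. A small pandas sketch of the difference, on two made-up "monthly" frames:

```python
import pandas as pd

# two toy "monthly" frames with the same schema (values are made up)
jan = pd.DataFrame({"bikeid": [1, 2], "tripduration": [300, 400]})
feb = pd.DataFrame({"bikeid": [2, 3], "tripduration": [500, 600]})

# concat stacks rows: every trip from both months is kept
stacked = pd.concat([jan, feb], ignore_index=True)
print(len(stacked))  # 4

# merge joins on the shared columns (bikeid and tripduration); no row matches on both
joined = pd.merge(jan, feb)
print(len(joined))  # 0
```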

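We'd like to use `dask.dataframe` to concatenate the 12 files *on disk*: each file is read lazily as one or more partitions, and dask works through the partitions a few at a time, so the whole year never has to sit in memory at once. This turns out to be surprisingly quick, as we will see.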

In addition, we use `parquet` instead of CSV. Parquet is a columnar storage format, so a groupby or aggregation on one column only has to read that column from disk; with a CSV, every column of every row has to be read and parsed.

### CSV → Parquet
1. Let's convert our monthly CSV files to parquet
2. Then merge the 12 months into a year

In [None]:
import os, logging
import pandas as pd
import dask.dataframe as dd
import pyarrow as pa
import matplotlib.pyplot as plt
import seaborn as sns

PARQUET_EXTENSION = ".parquet"

In [None]:
def convert_to_parquet(directory_from: str, directory_to: str) -> None:
    """Convert every CSV in `directory_from` to a parquet file in `directory_to`."""
    os.makedirs(directory_to, exist_ok=True)
    csv_files = sorted(
        os.path.join(directory_from, f)
        for f in os.listdir(directory_from)
        if f.endswith(".csv")
    )
    for i, csv_file in enumerate(csv_files, start=1):
        # same base name, parquet extension
        filename = os.path.splitext(os.path.basename(csv_file))[0]
        newfilepath = os.path.join(directory_to, filename + PARQUET_EXTENSION)
        logging.debug(f"parquetifying file {i} of {len(csv_files)}...")

        csv_df = pd.read_csv(csv_file)
        # normalise column names, e.g. "birth year" -> "birthyear"
        csv_df.columns = [col.lower().replace(" ", "") for col in csv_df.columns]
        csv_df.dropna(inplace=True)  # remove NAs for trips to stations in the Bronx
        # missing birth years appear as the literal string \N; replace them with the string '0'
        csv_df["birthyear"] = csv_df["birthyear"].replace(r"\\N", "0", regex=True)
        csv_df.to_parquet(newfilepath)
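
The `\N` replacement above is easy to misread: `r"\\N"` is a regular expression for a literal backslash followed by `N`, which is how the CSVs mark a missing birth year. A small sketch on a toy series (values made up) showing what the call does:

```python
import pandas as pd

# toy birth-year column as read from a CSV: the missing value is the literal text \N
birthyear = pd.Series(["1985", "\\N", "1992"])

# r"\\N" is a regex that matches a backslash followed by N
cleaned = birthyear.replace(r"\\N", "0", regex=True)
print(cleaned.tolist())  # ['1985', '0', '1992']

# after cleaning, the column can be cast to integers
print(cleaned.astype(int).tolist())  # [1985, 0, 1992]
```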

In [None]:
%%time
convert_to_parquet("data/2019_csv/", "data/2019_parquet/")

In [None]:
def merge_monthly_trips(year, directory: str) -> None:
    """
    Create a merged parquet dataset from the parquet files in a directory
    :param year: the year (int) to merge monthly data for; if None, merge every parquet file in the directory
    :param directory: a directory containing parquet files with identical schema (column names) across files
    :return: None
    """
    if year is not None:
        # keeps files whose names start with "<year>-01" through "<year>-12"
        range_start = str(year) + "-01"
        range_end = str(year) + "-13"
        month_files = sorted(
            [
                directory + f
                for f in os.listdir(directory)
                if range_start <= f <= range_end
            ]
        )
    else:
        month_files = sorted(
            [
                directory + f
                for f in os.listdir(directory)
                if f.endswith(PARQUET_EXTENSION)
            ]
        )

    parquet_ddfs: list[dd.DataFrame] = []
    for month_file in month_files:
        ddf = dd.read_parquet(month_file)
        # birthyear can be an integer column in some months and a string column in
        # others (months that contained \N), so cast it to str for one shared schema
        ddf["birthyear"] = ddf["birthyear"].astype("str")
        parquet_ddfs.append(ddf)

    all_trips = dd.concat(parquet_ddfs)  # lazy: no rows are read until to_parquet runs
    filename = str(year) if year is not None else "alltrips"
    all_trips.to_parquet(
        directory + filename + PARQUET_EXTENSION,
        schema={"birthyear": pa.string()},
        engine="pyarrow",
    )
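
The year filter in `merge_monthly_trips` relies on plain string comparison, so it only picks up files whose names begin with a `YYYY-MM` prefix. A sketch with a hypothetical helper `month_files_for` that mirrors that filter, applied to made-up file names:

```python
def month_files_for(year: int, filenames: list[str]) -> list[str]:
    """Hypothetical helper mirroring the filename filter in merge_monthly_trips."""
    range_start = f"{year}-01"
    range_end = f"{year}-13"  # sorts after every "<year>-12..." name
    return sorted(f for f in filenames if range_start <= f <= range_end)


# made-up file names
names = [
    "2018-12.parquet",
    "2019-01.parquet",
    "2019-12.parquet",
    "2019.parquet",
    "201901-tripdata.parquet",
]
print(month_files_for(2019, names))  # ['2019-01.parquet', '2019-12.parquet']
```

Because `"."` and `"0"` both sort after `"-"`, the merged output `2019.parquet` is excluded on a rerun, but so is any monthly file named without the hyphen (such as `201901-...`), which would be skipped silently.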

In [None]:
%%time
merge_monthly_trips(2019, "data/2019_parquet/")

### Data Analysis
1. Read in the parquet file for 2019 we created
2. Analyze the effect of weather on the number of daily trips

In [None]:
%%time
# Now read the 2019 parquet file to do data analysis
TRIPS_COLUMNS = [
    "tripduration",
    "starttime",
    "stoptime",
    "startstationid",
    "endstationid",
    "bikeid",
    "usertype",
    "birthyear",
    "gender",
]

trips = pd.read_parquet(
    "data/2019_parquet/2019.parquet",
    columns=TRIPS_COLUMNS,
    engine="pyarrow",
).reset_index(drop=True)  # discard the index written by dask
# pandas 2.x rejects a unit-less astype("datetime64"); pd.to_datetime works across versions
trips["starttime"] = pd.to_datetime(trips["starttime"])
trips["stoptime"] = pd.to_datetime(trips["stoptime"])

trips.head(10)

In [None]:
# Examine relationship of trips with weather
trips_per_day = (
    trips.groupby(trips["starttime"].dt.dayofyear)["tripduration"]
    .count()
    .reset_index()
    .rename(columns={"starttime": "dayofyear", "tripduration": "counttrips"})
)

# read weather data
weather = pd.read_csv("data/GHCN-Daily-Cleaned.csv", index_col=0)
weather["DATE"] = pd.to_datetime(weather["DATE"])

# get 2019 weather
start_2019 = pd.to_datetime("2019-01-01")
end_2019 = pd.to_datetime("2019-12-31")
weather_2019 = weather.loc[
    (weather["DATE"] >= start_2019) & (weather["DATE"] <= end_2019)
].copy()
weather_2019["dayofyear"] = weather_2019["DATE"].dt.dayofyear

# merged weather + trips dataset (day of year is only unique within a single year)
w_trips = pd.merge(weather_2019, trips_per_day, on="dayofyear")
w_trips["Snowed on Day"] = w_trips["SNWD"] > 0
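
The join above works because day of year is unique once the weather has been restricted to 2019; with several years, day 1 would match every 1 January. A toy sketch of the same count-then-merge steps (all values made up; the `toy_` names avoid overwriting the notebook's `trips` and `weather`):

```python
import pandas as pd

# toy trips: three on Jan 1, one on Jan 2 (made-up values)
toy_trips = pd.DataFrame({
    "starttime": pd.to_datetime(
        ["2019-01-01 08:00", "2019-01-01 09:30", "2019-01-01 17:45", "2019-01-02 07:15"]
    ),
    "tripduration": [300, 420, 600, 380],
})
# toy weather: one row per date with snow depth SNWD
toy_weather = pd.DataFrame({
    "DATE": pd.to_datetime(["2019-01-01", "2019-01-02"]),
    "SNWD": [0.0, 2.0],
})

# count trips per day of year, as in the cell above
toy_per_day = (
    toy_trips.groupby(toy_trips["starttime"].dt.dayofyear)["tripduration"]
    .count()
    .reset_index()
    .rename(columns={"starttime": "dayofyear", "tripduration": "counttrips"})
)
toy_weather = toy_weather.assign(dayofyear=toy_weather["DATE"].dt.dayofyear)
toy_w_trips = pd.merge(toy_weather, toy_per_day, on="dayofyear")
toy_w_trips["Snowed on Day"] = toy_w_trips["SNWD"] > 0
print(toy_w_trips[["dayofyear", "counttrips", "Snowed on Day"]])
```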

In [None]:
# Plot linear regression of weather conditions versus trips
g = sns.lmplot(
    data=w_trips,
    x="TAVG_F",
    y="counttrips",
    hue="Snowed on Day",
    markers=["o", "*"],
    height=8,
    aspect=1.5,
    facet_kws={"legend_out": False},
)
plt.title("Daily Citibike Trips Increase with Temperature (2019)")
plt.xlabel("Average Temperature (°F)")
plt.ylabel("Number of Trips")
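# Labels are applied to artists in draw order: for each hue level (no snow, then snow),
# seaborn draws the scatter points, then the fit line, then the 95% CI band
plt.legend(
    labels=[
        "No Snow",
        "Best fit (no snow)",
        "95% CI",
        "Snow",
        "Best fit (snow)",
        "95% CI",
    ]
)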
plt.show()
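
The best-fit lines in this plot are ordinary least-squares fits. To read off the numbers behind them, one option is to fit the same line for each snow group with `np.polyfit`. A minimal sketch, assuming the `w_trips` column names above; the helper `fit_trips_vs_temp` is hypothetical and the toy frame (points placed exactly on two lines) stands in for real data:

```python
import numpy as np
import pandas as pd


def fit_trips_vs_temp(df: pd.DataFrame) -> dict:
    """Hypothetical helper: OLS slope/intercept of counttrips ~ TAVG_F per snow group."""
    fits = {}
    for snowed, group in df.groupby("Snowed on Day"):
        slope, intercept = np.polyfit(group["TAVG_F"], group["counttrips"], deg=1)
        fits[bool(snowed)] = (slope, intercept)
    return fits


# toy data lying exactly on two lines (not real measurements)
toy = pd.DataFrame({
    "TAVG_F": [30.0, 40.0, 50.0, 25.0, 35.0],
    "counttrips": [1000.0, 2000.0, 3000.0, 500.0, 900.0],
    "Snowed on Day": [False, False, False, True, True],
})
fits = fit_trips_vs_temp(toy)
print(fits)
```

On the notebook's data one would call `fit_trips_vs_temp(w_trips)`; each slope is then the estimated change in daily trips per °F for that group.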