### Imports


In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [2]:
import os

import folium
import geopandas as gpd
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
from tqdm.notebook import tqdm

import helper

In [3]:
# Index of the current weekly file: every entry in data/raw except one
# (presumably the `extracted` sub-directory — TODO confirm).
raw_entries = os.listdir("data/raw")
n_files = len(raw_entries) - 1

In [None]:
!wc -l 'data/raw/extracted/week_{n_files}.csv.zip'

In [None]:
# --- Run configuration -------------------------------------------------------
city = "rotterdam"
ts_col = "ts"  # timestamp column of the raw CSV (parsed as int64)
chunk_size = 10_000_000  # rows handed to pandas per chunk

# CSV of the current week, selected through `n_files`.
FILE_PATH = f"data/raw/week_{n_files}.csv"

# Progress-bar total: hard-coded row count divided into chunks, plus one for
# the trailing partial chunk.
n_chunks = 173_453_671 // chunk_size + 1

# Explicit column dtypes passed to `pd.read_csv`.
dtypes = {
    "form_factor": "object",
    "system_id": "object",
    "longitude": "float64",
    "latitude": "float64",
    "ts": "int64",
}

### Part 1 - Data Ingestion


In [None]:
# District polygons of the selected city.
city_boundaries = gpd.read_file(f"data/boundaries/{city}_.geojson")

# Netherlands outline taken from the low-resolution Natural Earth dataset.
# NOTE(review): `gpd.datasets` is deprecated in recent GeoPandas releases —
# confirm the installed version still provides it.
world_filepath = gpd.datasets.get_path("naturalearth_lowres")
world = gpd.read_file(world_filepath)
is_netherlands = world["name"] == "Netherlands"
netherlands = world.loc[is_netherlands]

In [None]:
# TODO: Read directly from the zip archive without manually extracting it to CSV first
# READ_ZIP = True

# if READ_ZIP:
#     zip_path = "data/raw/data.zip"
#     csv_name = "data/datanew.csv"

#     df = helper.read_zip(zip_path, csv_name)
# else:
#     df = pd.read_csv("data/raw/aprox1week.csv").drop_duplicates()
# df.shape

In [None]:
# Stream the weekly CSV in chunks, keep only the points that fall inside a
# city district, and collect one (district, point) row per match.
tqdm.pandas()

processed_chunks = []

with pd.read_csv(FILE_PATH, chunksize=chunk_size, dtype=dtypes) as chunks:
    for chunk in tqdm(chunks, "Processing Chunks", total=n_chunks):
        chunk = chunk.drop_duplicates()
        # Drop rows with a zero coordinate. This mask used to be evaluated and
        # thrown away, so the filter never took effect.
        chunk = chunk[(chunk["longitude"] != 0) & (chunk["latitude"] != 0)]
        # Row labels must equal row positions from here on, because the
        # spatial index below reports positions.
        chunk = chunk.reset_index(drop=True)

        geometry = gpd.points_from_xy(chunk.longitude, chunk.latitude, crs=4326)
        chunk = gpd.GeoDataFrame(chunk, geometry=geometry)

        # One row per (district position, point position) intersection.
        points = pd.DataFrame(
            data=chunk.sindex.query(city_boundaries.geometry, predicate="intersects").T,
            columns=["district_id", "point_id"],
        )

        # Attach the point attributes by position. The previous merge on
        # `point_id` was many-to-many and duplicated every point that matched
        # more than one district.
        point_pos = points["point_id"].to_numpy()
        points["timestamp"] = chunk[ts_col].to_numpy()[point_pos]
        points["latitude"] = chunk["latitude"].to_numpy()[point_pos]
        points["longitude"] = chunk["longitude"].to_numpy()[point_pos]

        # Map district_id to district names
        district_codes = dict(city_boundaries.iloc[points.district_id.unique()]["name"])
        points["district_id"] = points["district_id"].map(district_codes)
        # NOTE: `point_id` is the position inside this chunk, so values repeat
        # across chunks.
        processed_chunks.append(points)

In [None]:
# Stack the per-chunk results into a single frame ordered by point_id.
gdf = pd.concat(processed_chunks, ignore_index=True)
gdf = gdf.sort_values(by="point_id").reset_index(drop=True)
gdf.shape

In [None]:
# Number of distinct timestamps among the retained points.
print(f"Data contains {gdf['timestamp'].nunique()} timestamps")

In [None]:
# Interactive sanity check: a random sample of points drawn over the district
# polygons. The map object is not called `map`, which would shadow the builtin
# for every later cell.
MAX_MARKERS = 10_000  # upper bound on the number of markers drawn
SAMPLE_SEED = 42  # fixed seed so a re-run draws the same sample

map_center = (gdf.latitude.mean(), gdf.longitude.mean())
point_map = folium.Map(location=map_center, zoom_start=10)
folium.TileLayer("openstreetmap").add_to(point_map)

# `sample` raises when asked for more rows than exist, so cap the request.
sampled_points = gdf.sample(n=min(MAX_MARKERS, len(gdf)), random_state=SAMPLE_SEED)
has_name = "name" in sampled_points.columns

# Add points and polygons as GeoJSON overlays
for _, row in sampled_points.iterrows():
    folium.CircleMarker(
        location=[row.latitude, row.longitude],
        radius=2,  # Radius of the circle marker
        color="red",  # Color of the marker border
        fill=True,
        fill_color="red",  # Color of the marker fill
        fill_opacity=0.6,  # Opacity of the marker fill
        popup=row["name"] if has_name else None,  # Optional popup text
    ).add_to(point_map)

folium.GeoJson(city_boundaries.geometry).add_to(point_map)

point_map

### Part 2 - Data Extraction


In [None]:
# Crowd size = number of points per district at each timestamp, listed from
# the most to the least crowded combination.
points_per_district = (
    gdf.groupby(["district_id", "timestamp"])
    .agg(crowd=("point_id", "count"))
    .sort_values("crowd", ascending=False)
    .reset_index()
)

points_per_district

In [None]:
# Persist this week's per-district counts.
weekly_counts_path = f"data/processed/points_per_district_week_{n_files}.parquet"
points_per_district.to_parquet(weekly_counts_path, index=False)

#### Crowd Analysis

Let's delve deeper and focus on the most crowded district to see how the crowdedness evolves over time.


In [None]:
# Name of the district with the largest crowd summed over all timestamps.
district_totals = points_per_district.groupby(["district_id"]).sum()
most_crowded_district_idx = (
    district_totals.sort_values(by="crowd", ascending=False).index[0]
)

In [None]:
# Crowd counts of that district only, in chronological order.
in_top_district = points_per_district["district_id"] == most_crowded_district_idx
most_crowded_district = (
    points_per_district[in_top_district]
    .drop(columns="district_id")
    .sort_values(by="timestamp")
    .reset_index(drop=True)
)

In [None]:
# Line chart of the crowd size over time for the most crowded district.
fig, ax = plt.subplots(figsize=(13, 6))

# Timestamps are converted with unit="s" before plotting.
crowd_times = pd.to_datetime(most_crowded_district["timestamp"], unit="s")
ax.plot(crowd_times, most_crowded_district["crowd"], linestyle="-", linewidth=1.5)

ax.set_title(
    f"Crowdedness Over Time in District {most_crowded_district_idx}", fontsize=18
)
ax.set_xlabel("Timestamp", fontsize=12)
ax.set_ylabel("No. of Points", fontsize=12)

# Cosmetics: drop the top/right frame, dashed vertical grid, compact dates.
ax.spines[["top", "right"]].set_visible(False)
ax.grid(True, axis="x", linestyle="--", alpha=0.6)
ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d--%H:%M"))
plt.xticks(rotation=45)

fig.tight_layout()
fig.savefig(f"figures/most_crowded_week_{n_files}.png", dpi=300)
plt.show()

In [None]:
# Persist the time series of the most crowded district for this week.
most_crowded_path = f"data/processed/most_crowded_week_{n_files}.parquet"
most_crowded_district.to_parquet(most_crowded_path, index=False)

### Part 3 - Data Storage


In [None]:
# Combine every weekly per-district file into one dataset.
# `os.listdir` returns names in arbitrary order, so sort them to make the row
# order of the combined file reproducible between runs.
weekly_data = [
    pd.read_parquet(f"data/processed/{week}")
    for week in sorted(os.listdir("data/processed"))
    if week.startswith("points_")
]


pd.concat(weekly_data, ignore_index=True).to_parquet(
    "data/final/points_per_district_full.parquet.gzip", compression="gzip", index=False
)

#### Summary


In [10]:
# Load the combined dataset explicitly: no other cell defines `df`, so the
# summary only worked with leftover kernel state.
# NOTE(review): assumes the summary is meant to cover the merged file written
# in Part 3 — confirm.
df = pd.read_parquet("data/final/points_per_district_full.parquet.gzip")

# First and last timestamp, formatted for display.
time_bounds = pd.to_datetime(df["timestamp"].agg(["min", "max"]), unit="s")
min_t, max_t = time_bounds.dt.strftime("%d-%m-%Y--%H:%M")

In [11]:
# Report the observation window and the entry interval exposed by `helper`.
print(f"Data is gathered between {min_t} and {max_t}")
print(f"Time between entries: {helper.REFRESH_INTERVAL} seconds")

Data is gathered between 18-06-2024--06:21 and 04-07-2024--08:30


NameError: name 'helper' is not defined

In [1]:
import os
import pandas as pd
import zipfile
from tqdm import tqdm
import subprocess
import tempfile
import shutil
import geopandas as gpd

In [2]:
# Pipeline settings: target city, raw timestamp column and chunk size.
city, ts_col = "rotterdam", "ts"
chunk_size = 10_000_000  # rows handed to pandas per chunk

# Explicit column dtypes passed to `pd.read_csv`.
dtypes = {
    "form_factor": "object",
    "system_id": "object",
    "longitude": "float64",
    "latitude": "float64",
    "ts": "int64",
}

In [3]:
# District polygons of the selected city.
city_boundaries = gpd.read_file(f"data/boundaries/{city}_.geojson")

# Netherlands outline taken from the low-resolution Natural Earth dataset.
# NOTE(review): `gpd.datasets` is deprecated in recent GeoPandas releases —
# confirm the installed version still provides it.
world_filepath = gpd.datasets.get_path("naturalearth_lowres")
world = gpd.read_file(world_filepath)
is_netherlands = world["name"] == "Netherlands"
netherlands = world.loc[is_netherlands]

  world_filepath = gpd.datasets.get_path("naturalearth_lowres")


In [4]:
def unzip_files(zip_path, extract_to):
    """Extract every member of the archive at `zip_path` into `extract_to`."""
    archive = zipfile.ZipFile(zip_path, mode="r")
    try:
        archive.extractall(path=extract_to)
    finally:
        archive.close()


def count_rows_in_csv(file_path):
    """Return the number of data lines in a CSV file, excluding the header.

    Lines are counted in pure Python instead of shelling out to `wc -l`, so
    the function works without external tools and a missing file raises the
    usual `OSError` from `open` instead of an `IndexError` on empty command
    output.

    Args:
        file_path: Path of the CSV file.

    Returns:
        Line count minus one for the header row, never below 0. A final line
        without a trailing newline is counted as well. Newlines embedded in
        quoted fields are counted as line breaks, as `wc -l` would.
    """
    newline_count = 0
    last_byte = b""
    with open(file_path, "rb") as handle:
        # Read fixed-size blocks so large files are never loaded in full.
        for block in iter(lambda: handle.read(1 << 20), b""):
            newline_count += block.count(b"\n")
            last_byte = block[-1:]

    # A last line that is not newline-terminated is still a line.
    if last_byte and last_byte != b"\n":
        newline_count += 1

    return max(newline_count - 1, 0)  # Subtract 1 for header row


def _process_csv(
    file, chunk_size=chunk_size, dtypes=None, filter_func=None, n_chunks=None
):
    """Read `file` in chunks and return the (optionally transformed) chunks.

    Args:
        file: CSV path handed to `pd.read_csv`.
        chunk_size: Rows per chunk.
        dtypes: Optional column-to-dtype mapping for `pd.read_csv`.
        filter_func: Optional callable applied to each chunk; its return value
            replaces the chunk in the result.
        n_chunks: Optional total used for the progress bar.

    Returns:
        List with one entry per chunk, in reading order.
    """
    tqdm.pandas()
    # Fall back to a pass-through when no (truthy) transform is supplied.
    transform = filter_func if filter_func else (lambda frame: frame)
    with pd.read_csv(file, chunksize=chunk_size, dtype=dtypes) as reader:
        progress = tqdm(reader, desc="    Processing Chunks...", total=n_chunks)
        return [transform(piece) for piece in progress]


def my_filter_func(chunk):
    """Keep the points of `chunk` that intersect a city district.

    Args:
        chunk: Frame with `longitude`, `latitude` and the `ts_col` column.

    Returns:
        DataFrame with one row per (district, point) intersection and the
        columns `district_id` (district name), `point_id` (row position inside
        the cleaned chunk), `timestamp`, `latitude` and `longitude`.
    """
    chunk = chunk.drop_duplicates()
    # Drop rows with a zero coordinate. This mask used to be evaluated and
    # thrown away, so the filter never took effect.
    chunk = chunk[(chunk["longitude"] != 0) & (chunk["latitude"] != 0)]
    # Row labels must equal row positions from here on, because the spatial
    # index below reports positions.
    chunk = chunk.reset_index(drop=True)

    geometry = gpd.points_from_xy(chunk.longitude, chunk.latitude, crs=4326)
    chunk = gpd.GeoDataFrame(chunk, geometry=geometry)

    # One row per (district position, point position) intersection.
    merged = pd.DataFrame(
        data=chunk.sindex.query(city_boundaries.geometry, predicate="intersects").T,
        columns=["district_id", "point_id"],
    )

    # Attach the point attributes by position. The previous merge on
    # `point_id` was many-to-many and duplicated every point that matched
    # more than one district.
    point_pos = merged["point_id"].to_numpy()
    merged["timestamp"] = chunk[ts_col].to_numpy()[point_pos]
    merged["latitude"] = chunk["latitude"].to_numpy()[point_pos]
    merged["longitude"] = chunk["longitude"].to_numpy()[point_pos]

    # Map district_id to district names
    district_codes = dict(city_boundaries.iloc[merged.district_id.unique()]["name"])
    merged["district_id"] = merged["district_id"].map(district_codes)
    return merged


def save_to_parquet(df, output_path, compression="gzip"):
    """Announce the target path, then write `df` to it as parquet.

    Args:
        df: Object exposing `to_parquet` (a DataFrame).
        output_path: Destination file path.
        compression: Codec forwarded to `to_parquet`; "gzip" by default.
    """
    message = f"    Saving to {output_path}..."
    print(message)
    df.to_parquet(output_path, compression=compression)


def process_csv_zip(
    zip_path,
    filter_func=None,
    output_path="output.parquet",
    chunk_size=100000,
    dtypes=None,
):
    """Extract a zip archive, process its CSV files and save one parquet file.

    Args:
        zip_path: Archive to process.
        filter_func: Optional per-chunk transform forwarded to `_process_csv`.
        output_path: Destination of the parquet file.
        chunk_size: Rows per chunk.
        dtypes: Optional column-to-dtype mapping for reading the CSV files.

    Raises:
        ValueError: If the archive has no top-level `.csv` file or the CSV
            files yield no chunks.

    The archive is moved to `data/raw/extracted` only after the parquet file
    was written, so a failed run leaves it in place for a retry. The temporary
    extraction directory is removed in every case.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        print(f"Unzipping {zip_path} to {temp_dir}")
        unzip_files(zip_path, temp_dir)

        # Collect the chunks of every CSV. Previously each file overwrote the
        # chunks of the one before, so only the last CSV was saved.
        chunks = []
        for csv_file in sorted(os.listdir(temp_dir)):
            if not csv_file.endswith(".csv"):
                continue
            file_path = os.path.join(temp_dir, csv_file)
            print(f"Processing {csv_file}")
            n_rows = count_rows_in_csv(file_path)
            n_chunks = (n_rows // chunk_size) + 1
            chunks.extend(
                _process_csv(file_path, chunk_size, dtypes, filter_func, n_chunks)
            )

        if not chunks:
            raise ValueError(f"No CSV data found in {zip_path}")

        processed_df = pd.concat(chunks)
        save_to_parquet(processed_df, output_path)
    finally:
        shutil.rmtree(temp_dir)

    # Reached only on success. Create the target directory first so the
    # archive is moved into it rather than renamed to a file called `extracted`.
    os.makedirs("data/raw/extracted", exist_ok=True)
    print(f"    Moving {zip_path} to data/raw/extracted")
    shutil.move(zip_path, "data/raw/extracted")


def etl_pipeline():
    """Process every weekly archive in `data/raw` and build the final dataset.

    Steps:
        1. Each `*.csv.zip` in `data/raw` (sorted by path) is processed into
           `data/processed/points_per_district_week_<n>.parquet`, with <n>
           counting from 1 in that order.
        2. Every `points_*` file in `data/processed` is loaded, concatenated
           and aggregated to the number of points per district and timestamp.
        3. The result is written gzip-compressed to
           `data/final/points_per_district_full.parquet.gzip`.

    NOTE(review): week numbers restart at 1 on every call, so a later run with
    new archives overwrites earlier weekly files — confirm this is intended.
    """
    # Both output directories are written to below, so make sure they exist.
    os.makedirs("data/processed", exist_ok=True)
    os.makedirs("data/final", exist_ok=True)

    # Load all csv files
    zip_paths = sorted(
        os.path.join("data/raw", file)
        for file in os.listdir("data/raw")
        if file.endswith(".csv.zip")
    )

    # Process each csv file sequentially
    for n_week, zip_path in enumerate(zip_paths, start=1):
        output = f"data/processed/points_per_district_week_{n_week}.parquet"

        process_csv_zip(
            zip_path,
            filter_func=my_filter_func,
            output_path=output,
            chunk_size=chunk_size,
            dtypes=dtypes,
        )

    # Merge all processed parquet files
    weekly_data = [
        pd.read_parquet(f"data/processed/{week}")
        for week in sorted(os.listdir("data/processed"))
        if week.startswith("points_")
    ]

    # Concat each dataframe and save to `final` folder
    print("Concatenating dataframes...")
    final_df = (
        pd.concat(weekly_data, ignore_index=True)
        .groupby(by=["district_id", "timestamp"])
        .agg({"point_id": "count"})
        .rename({"point_id": "crowd"}, axis=1)
        .sort_values(by="crowd", ascending=False)
        .reset_index()
    )

    # The file name promises gzip, so request it explicitly instead of
    # relying on the default codec.
    final_df.to_parquet(
        "data/final/points_per_district_full.parquet.gzip",
        compression="gzip",
        index=False,
    )

In [5]:
# Run the full pipeline: process each raw archive, then build the final file.
etl_pipeline()

Unzipping data/raw/week_1.csv.zip to /var/folders/5_/7z8lzq913cl3q8m0z5pnmpjr0000gn/T/tmp_eftr_ji
Processing week_1.csv


    Processing Chunks...: 100%|██████████| 21/21 [04:00<00:00, 11.45s/it]


Saving week_1.csv to data/processed/points_per_district_week_1.parquet
Saving to data/processed/points_per_district_week_1.parquet...
Moving data/raw/week_1.csv.zip to data/raw/extracted
Unzipping data/raw/week_2.csv.zip to /var/folders/5_/7z8lzq913cl3q8m0z5pnmpjr0000gn/T/tmpgem4j7fv
Processing week_2.csv


    Processing Chunks...: 100%|██████████| 18/18 [03:24<00:00, 11.37s/it]


Saving week_2.csv to data/processed/points_per_district_week_2.parquet
Saving to data/processed/points_per_district_week_2.parquet...
Moving data/raw/week_2.csv.zip to data/raw/extracted
Unzipping data/raw/week_3or4days.csv.zip to /var/folders/5_/7z8lzq913cl3q8m0z5pnmpjr0000gn/T/tmp7koon78_
Processing 3or4days.csv


    Processing Chunks...: 100%|██████████| 9/9 [01:37<00:00, 10.89s/it]


Saving __MACOSX to data/processed/points_per_district_week_3.parquet
Saving to data/processed/points_per_district_week_3.parquet...
Moving data/raw/week_3or4days.csv.zip to data/raw/extracted
Concatenating dataframes...
