# Chapter 1: Introducing Polars

In [None]:
import polars as pl
# Report the installed Polars version (and related environment details) so it
# can be compared with the version the book was built with.
pl.show_versions()  # The book is built with Polars version 1.13.1

## What Is This Thing Called Polars?

### Features

### Key Concepts

### Advantages

## Why You Should Use Polars

### Performance

### Usability

### Popularity

### Sustainability

## Polars Compared to Other Data Processing Packages

## Why We Focus on Python Polars

## How This Book Is Organized

## An ETL Showcase

### Extract

#### Import Packages

In [None]:
# Build the polars_geo plug-in in release mode with maturin so that the
# `import polars_geo` cell further down can find it.
! cd plugins/polars_geo && maturin develop --release

In [None]:
# Restart the kernel so that the freshly built plug-in becomes importable
from IPython.display import display, Javascript


def restart_kernel():
    """Ask the notebook front end to restart the kernel of this notebook.

    The request is sent as a JavaScript snippet that is displayed as cell output.

    NOTE(review): the snippet relies on a global `Jupyter` JavaScript object,
    which is presumably only provided by the classic Notebook front end;
    confirm that it works in the front end you use (e.g. JupyterLab).
    """
    restart_request = Javascript("Jupyter.notebook.kernel.restart()")
    display(restart_request)


restart_kernel()

In [None]:
import polars as pl  
import polars_geo
from plotnine import *  

#### Download and Extract Citi Bike Trips

In [None]:
# You may need to install `unzip` first if you're on Ubuntu or macOS.
# Ubuntu:
# sudo apt update && sudo apt install unzip
# macOS:
# brew install unzip
# Windows:
# Download the ZIP file manually and extract it into data/citibike/ (sorry)

In [None]:
! curl -sO https://s3.amazonaws.com/tripdata/202403-citibike-tripdata.csv.zip
! tar -xvf 202403-citibike-tripdata.csv.zip -C data/citibike/
! rm -f 202403-citibike-tripdata.csv.zip

#### Read Citi Bike Trips into a Polars DataFrame

In [None]:
# Count the lines of the extracted CSV file and peek at its first six lines
# (presumably a header row followed by five trips).
! wc -l data/citibike/202403-citibike-tripdata.csv
! head -n 6 data/citibike/202403-citibike-tripdata.csv

In [None]:
# Read both station ID columns as strings instead of letting Polars infer
# their type.
# NOTE(review): presumably because some IDs are not purely numeric — confirm.
station_id_dtypes = {
    column: pl.String for column in ("start_station_id", "end_station_id")
}

# Read the whole CSV file eagerly into a DataFrame
trips = pl.read_csv(
    "data/citibike/202403-citibike-tripdata.csv",
    try_parse_dates=True,  # let Polars try to parse date-like columns
    schema_overrides=station_id_dtypes,
)
# Order the trips chronologically by their start time
trips = trips.sort("started_at")

trips.height

In [None]:
# Print the frame in three column groups so that every column is shown
for column_range in (slice(None, 4), slice(4, 8), slice(8, None)):
    print(trips[:, column_range])

#### Read in Neighborhoods from GeoJSON

In [None]:
# Pretty-print the GeoJSON file to inspect its nested structure
! python -m json.tool data/citibike/nyc-neighborhoods.geojson

In [None]:
# Flatten the GeoJSON document into one row per feature
geojson_features = (
    pl.read_json("data/citibike/nyc-neighborhoods.geojson")
    .select("features")
    .explode("features")  # one row per entry of the "features" list
    .unnest("features")
)

# Pull the naming properties and the geometry fields up into columns
neighborhood_shapes = (
    geojson_features.unnest("properties")
    .select("neighborhood", "borough", "geometry")
    .unnest("geometry")
)

# Rows of every borough except Staten Island are kept
outside_staten_island = pl.col("borough") != "Staten Island"

neighborhoods = (
    neighborhood_shapes
    # Only the first element of "coordinates" is used as the polygon
    .with_columns(polygon=pl.col("coordinates").list.first())
    .select("neighborhood", "borough", "polygon")
    .filter(outside_staten_island)
    .sort("neighborhood")
)

neighborhoods

### Bonus: Visualizing Neighborhoods and Stations

In [None]:
# After exploding, every row holds one point of a polygon; the first element
# of that point is used as longitude and the last one as latitude.
polygon_point = pl.col("polygon")

neighborhoods_coords = (
    # The row index identifies which polygon a point belongs to
    neighborhoods.with_row_index("id")
    .explode("polygon")
    .with_columns(
        lon=polygon_point.list.first(),
        lat=polygon_point.list.last(),
    )
    .drop("polygon")
)

neighborhoods_coords

In [None]:
# One row per start station, positioned at the median of the start
# coordinates recorded for its trips.
median_lon = pl.col("start_lng").median()
median_lat = pl.col("start_lat").median()

stations = (
    trips.group_by(station=pl.col("start_station_name"))
    .agg(lon=median_lon, lat=median_lat)
    .sort("station")
    .drop_nulls()  # discard rows with a missing station name or coordinate
)
stations

In [None]:
# Map of the neighborhood polygons with the Citi Bike stations drawn on top.
# NOTE(review): the station and neighborhood counts in the subtitle are
# hard-coded — verify them against `stations` and `neighborhoods`.
(
    # group="id" keeps the points that belong to the same polygon row together
    ggplot(neighborhoods_coords, aes(x="lon", y="lat", group="id"))
    # Fill color encodes the borough; transparency varies per neighborhood
    + geom_polygon(aes(alpha="neighborhood", fill="borough"), color="white")
    # The stations come from their own DataFrame and are drawn as small points
    + geom_point(stations, size=0.1)
    + scale_x_continuous(expand=(0, 0))
    + scale_y_continuous(expand=(0, 0, 0, 0.01))
    + scale_alpha_ordinal(range=(0.3, 1))
    + scale_fill_brewer(type="qual", palette=2)
    # No legend for the alpha aesthetic; only the borough legend remains
    + guides(alpha=False)
    + labs(
        title="New York City Neighborhoods and Citi Bike Stations",
        subtitle="2143 stations across 106 neighborhoods",
        caption="Source: https://citibikenyc.com/system-data",
        fill="Borough",
    )
    + theme_void(base_size=14)
    # Applied after theme_void so that these settings adjust that theme
    + theme(
        dpi=200,
        figure_size=(7, 9),
        plot_background=element_rect(fill="white", color="white"),
        plot_caption=element_text(style="italic"),
        plot_title=element_text(ha="left"),
    )
)

### Transform

#### Clean Up Columns

In [None]:
# The bike type is the part of rideable_type in front of the first "_"
bike_type = pl.col("rideable_type").str.split("_").list.get(0)

# New column name -> original column name, in the order of the output
renamed_columns = {
    "datetime_start": "started_at",
    "datetime_end": "ended_at",
    "station_start": "start_station_name",
    "station_end": "end_station_name",
    "lon_start": "start_lng",
    "lat_start": "start_lat",
    "lon_end": "end_lng",
    "lat_end": "end_lat",
}

trips = trips.select(
    # Both type columns are stored as Categorical
    bike_type=bike_type.cast(pl.Categorical),
    rider_type=pl.col("member_casual").cast(pl.Categorical),
    **{new: pl.col(old) for new, old in renamed_columns.items()},
)
# The trip duration is the difference between end and start time
trips = trips.with_columns(
    duration=pl.col("datetime_end") - pl.col("datetime_start")
)

trips.columns

#### Clean Up Rows

In [None]:
from datetime import date

# Trips have to start on or after March 1 and end before April 1, 2024
within_march = (pl.col("datetime_start") >= date(2024, 3, 1)) & (
    pl.col("datetime_end") < date(2024, 4, 1)
)

# Trips that end at the station where they started in under five minutes
short_round_trip = (pl.col("station_start") == pl.col("station_end")) & (
    pl.col("duration").dt.total_seconds() < 5 * 60
)

# Drop rows with missing values first, then apply both row filters
trips = trips.drop_nulls().filter(within_march).filter(~short_round_trip)

trips.height

#### Add Trip Distance

In [None]:
# Start and end of each trip as [lon, lat] lists, the input of the geo plug-in
start_point = pl.concat_list("lon_start", "lat_start")
end_point = pl.concat_list("lon_end", "lat_end")

trips = trips.with_columns(
    # NOTE(review): the division by 1000 presumably converts meters to
    # kilometers — confirm the unit returned by polars_geo.
    distance=start_point.geo.haversine_distance(end_point) / 1000
)

# Show the coordinates next to the derived distance and the duration
trips.select(
    "lon_start", "lon_end", "lat_start", "lat_end", "distance", "duration"
)

#### Add Borough and Neighborhood

In [None]:
# Pair every station with every neighborhood polygon and keep the pairs in
# which the station's point lies inside the polygon.
station_point = pl.concat_list("lon", "lat")
inside_polygon = pl.col("point").geo.point_in_polygon(pl.col("polygon"))

stations = (
    stations.with_columns(point=station_point)
    .join(neighborhoods, how="cross")
    .with_columns(in_neighborhood=inside_polygon)
    .filter(pl.col("in_neighborhood"))
    # NOTE(review): unique() is called without `keep`/`maintain_order`, so for
    # a station matching several polygons the surviving row, and the order of
    # the result, are presumably not deterministic — confirm.
    .unique("station")
    .select("station", "borough", "neighborhood")
)

stations

In [None]:
# Two views of the station lookup: one with every column suffixed "_start",
# one with every column suffixed "_end".
stations_start = stations.select(pl.all().name.suffix("_start"))
stations_end = stations.select(pl.all().name.suffix("_end"))

# Final column order of the trips frame
trip_columns = [
    "bike_type",
    "rider_type",
    "datetime_start",
    "datetime_end",
    "duration",
    "station_start",
    "station_end",
    "neighborhood_start",
    "neighborhood_end",
    "borough_start",
    "borough_end",
    "lat_start",
    "lon_start",
    "lat_end",
    "lon_end",
    "distance",
]

# Attach borough and neighborhood of the start and of the end station
trips = (
    trips.join(stations_start, on="station_start")
    .join(stations_end, on="station_end")
    .select(trip_columns)
)

In [None]:
# Print the frame in four column groups so that every column is shown
for column_range in (
    slice(None, 4),
    slice(4, 7),
    slice(7, 11),
    slice(11, None),
):
    print(trips[:, column_range])

### Bonus: Visualizing Daily Trips per Borough

In [None]:
# Count the trips per start borough in windows of one day.
# Despite its name, `trips_per_hour` holds daily counts (every="1d"); the name
# is kept because the plotting cell that follows refers to it.
#
# group_by_dynamic requires the index column to be sorted within each group.
# `trips` is the result of two joins, which give no documented guarantee about
# row order, so sort explicitly (as the "Write Partitions" step does as well).
trips_per_hour = (
    trips.sort("datetime_start")
    .group_by_dynamic("datetime_start", group_by="borough_start", every="1d")
    .agg(num_trips=pl.len())
)

trips_per_hour

In [None]:
# Stacked area chart of the number of trips per day, one area per borough.
# `trips_per_hour` holds one row per borough and day (see the cell above).
(
    ggplot(
        trips_per_hour,
        aes(x="datetime_start", y="num_trips", fill="borough_start"),
    )
    + geom_area()
    + scale_fill_brewer(type="qual", palette=2)
    # One axis label per day, formatted with "%-d".
    # NOTE(review): "%-d" is presumably the day of the month without a leading
    # zero and may not be supported by strftime on every platform — confirm.
    + scale_x_datetime(date_labels="%-d", date_breaks="1 day", expand=(0, 0))
    + scale_y_continuous(expand=(0, 0))
    + labs(
        x="March 2024",
        fill="Borough",
        y="Trips per day",
        title="Citi Bike Trips Per Day In March 2024",
        subtitle="On March 23, nearly 10cm of rain fell in NYC",
    )
    + theme_tufte(base_size=14)
    # Applied after theme_tufte so that these settings adjust that theme
    + theme(
        # Major axis ticks are drawn in white, the color of the background
        axis_ticks_major=element_line(color="white"),
        figure_size=(8, 5),
        legend_position="top",
        plot_background=element_rect(fill="white", color="white"),
        plot_caption=element_text(style="italic"),
        plot_title=element_text(ha="left"),
    )
)

### Load

#### Write Partitions

In [None]:
# Calendar date on which a trip started, as a string used in the file names
trip_date = pl.col("datetime_start").dt.date().cast(pl.String)

# Split the trips into one DataFrame per start date; the helper column itself
# is left out of the partitions (include_key=False).
trips_parts = (
    trips.sort("datetime_start")
    .with_columns(date=trip_date)
    .partition_by(["date"], as_dict=True, include_key=False)
)

# Each dictionary key is a one-element tuple that holds the date string
for (day,), day_trips in trips_parts.items():
    day_trips.write_parquet(f"data/citibike/trips-{day}.parquet")

#### Verify

In [None]:
# List the Parquet files in data/citibike/, one per line
! ls -1 data/citibike/*.parquet

In [None]:
# Read the partitions back through a glob pattern and count the rows.
# NOTE(review): the pattern matches every Parquet file in data/citibike/, not
# only the trips-*.parquet files written above — confirm no others are there.
trips_reloaded = pl.read_parquet("data/citibike/*.parquet")
trips_reloaded.height

### Bonus: Becoming Faster by Being Lazy

In [None]:
# Imported here as well so that this cell only depends on the import cell
from datetime import date

# Lazy version of the extract and transform steps for the trips: the query is
# only built here and executed by the collect() call at the end.
trips = (
    pl.scan_csv(
        "data/citibike/202403-citibike-tripdata.csv",
        try_parse_dates=True,
        schema_overrides={
            "start_station_id": pl.String,
            "end_station_id": pl.String,
        },
    )
    .select(
        # Unlike in the eager version, bike_type and rider_type are not cast
        # to Categorical here.
        bike_type=pl.col("rideable_type").str.split("_").list.get(0),
        rider_type=pl.col("member_casual"),
        datetime_start=pl.col("started_at"),
        datetime_end=pl.col("ended_at"),
        station_start=pl.col("start_station_name"),
        station_end=pl.col("end_station_name"),
        lon_start=pl.col("start_lng"),
        lat_start=pl.col("start_lat"),
        lon_end=pl.col("end_lng"),
        lat_end=pl.col("end_lat"),
    )
    .with_columns(duration=(pl.col("datetime_end") - pl.col("datetime_start")))
    .drop_nulls()
    # Keep only trips that start on or after March 1 and end before April 1,
    # 2024, exactly as the eager "Clean Up Rows" step does. Without this
    # filter the lazy pipeline keeps rows that the eager pipeline removes.
    .filter(
        (pl.col("datetime_start") >= date(2024, 3, 1))
        & (pl.col("datetime_end") < date(2024, 4, 1))
    )
    # Remove trips that end at their start station in under five minutes
    .filter(
        ~(
            (pl.col("station_start") == pl.col("station_end"))
            & (pl.col("duration").dt.total_seconds() < 5 * 60)
        )
    )
    .with_columns(
        distance=pl.concat_list(
            "lon_start", "lat_start"
        ).geo.haversine_distance(pl.concat_list("lon_end", "lat_end"))
        / 1000
    )
).collect()  # execute the query and materialize the result as a DataFrame

# The GeoJSON file is read eagerly with read_json and then turned into a
# LazyFrame so that the remaining steps are part of a lazy query.
geojson_lazy = pl.read_json("data/citibike/nyc-neighborhoods.geojson").lazy()

# One row per feature, with the fields of each feature as columns
features_lazy = (
    geojson_lazy.select("features").explode("features").unnest("features")
)

# `neighborhoods` stays a LazyFrame; it is not collected in this cell
neighborhoods = (
    features_lazy.unnest("properties")
    .select("neighborhood", "borough", "geometry")
    .unnest("geometry")
    # Only the first element of "coordinates" is used as the polygon
    .with_columns(polygon=pl.col("coordinates").list.first())
    .select("neighborhood", "borough", "polygon")
    .sort("neighborhood")
    .filter(pl.col("borough") != "Staten Island")
)

# Median position per start station and the point-in-polygon test, defined up
# front so that the query below reads as a sequence of steps.
median_lat_start = pl.col("lat_start").median()
median_lon_start = pl.col("lon_start").median()
point_inside_polygon = pl.col("point").geo.point_in_polygon(pl.col("polygon"))

stations_query = (
    trips.lazy()
    .group_by(station=pl.col("station_start"))
    .agg(lat=median_lat_start, lon=median_lon_start)
    .with_columns(point=pl.concat_list("lon", "lat"))
    .drop_nulls()
    # Pair every station with every neighborhood polygon ...
    .join(neighborhoods, how="cross")
    # ... and keep the pairs in which the station lies inside the polygon
    .with_columns(in_neighborhood=point_inside_polygon)
    .filter(pl.col("in_neighborhood"))
    .unique("station")
    .select("station", "borough", "neighborhood")
)

# Execute the query; `stations` is a DataFrame from here on
stations = stations_query.collect()

# Attach borough and neighborhood twice: first for the start station, then for
# the end station. Each pass suffixes all station columns and joins on the
# matching station column of the trips.
for suffix in ("_start", "_end"):
    trips = trips.join(
        stations.select(pl.all().name.suffix(suffix)), on=f"station{suffix}"
    )

# Bring the columns into their final order
trips = trips.select(
    "bike_type",
    "rider_type",
    "datetime_start",
    "datetime_end",
    "duration",
    "station_start",
    "station_end",
    "neighborhood_start",
    "neighborhood_end",
    "borough_start",
    "borough_end",
    "lat_start",
    "lon_start",
    "lat_end",
    "lon_end",
    "distance",
)

trips.height

## Takeaways