## Pandas

In [1]:
from datetime import datetime
from glob import glob

from pandas import concat
from pandas import read_parquet

start = datetime.now()

df = concat(map(read_parquet, glob("./data/yellow_tripdata_202*-*.parquet")))

print(f"DF has {len(df)} rows.")

res = (
    df.groupby(["PULocationID", "DOLocationID"])
    .agg(
        {
            "total_amount": "sum",
            "fare_amount": "sum",
            "tolls_amount": "sum",
            "tip_amount": "sum",
            "congestion_surcharge": "sum",
            "trip_distance": "mean",
        }
    )
    .sort_values(by="fare_amount", ascending=False)
    .head(10)
)
print(res)
print(f"Runtime: {datetime.now() - start}")

DF has 70560406 rows.
                           total_amount  fare_amount  tolls_amount  \
PULocationID DOLocationID                                            
132          265             8790934.51   7422390.32     374067.16   
             230             7894806.99   5770019.47     677238.74   
264          264             6979146.58   5237721.48     132240.98   
132          48              5809088.05   4275710.82     496605.18   
             132             4293873.70   3524985.16      69755.77   
             164             4510414.60   3247564.82     397416.61   
138          230             4889752.34   3229509.35     565195.92   
237          236             5733423.01   3215118.36        191.12   
236          237             5143731.00   2978173.55        255.04   
132          170             4112882.90   2963877.08     361501.95   

                           tip_amount  congestion_surcharge  trip_distance  
PULocationID DOLocationID                                   

## Polars

### Note: Restarted the kernel here.

In [1]:
import polars as pl
from datetime import datetime

start = datetime.now()

df = pl.read_parquet("./data/yellow_tripdata_202*-*.parquet")

print(f"DF has {len(df)} rows.")

res = (
    df.group_by(["PULocationID", "DOLocationID"])
    .agg(
        [
            pl.col("total_amount").sum(),
            pl.col("fare_amount").sum(),
            pl.col("tolls_amount").sum(),
            pl.col("tip_amount").sum(),
            pl.col("congestion_surcharge").sum(),
            pl.col("trip_distance").mean()
        ]
    )
    .sort(by="fare_amount")
    .reverse()
)

print(res)

print(f"Runtime: {datetime.now() - start}")

DF has 70560406 rows.
shape: (52_878, 8)
┌────────────┬────────────┬────────────┬───────────┬───────────┬───────────┬───────────┬───────────┐
│ PULocation ┆ DOLocation ┆ total_amou ┆ fare_amou ┆ tolls_amo ┆ tip_amoun ┆ congestio ┆ trip_dist │
│ ID         ┆ ID         ┆ nt         ┆ nt        ┆ unt       ┆ t         ┆ n_surchar ┆ ance      │
│ ---        ┆ ---        ┆ ---        ┆ ---       ┆ ---       ┆ ---       ┆ ge        ┆ ---       │
│ i64        ┆ i64        ┆ f64        ┆ f64       ┆ f64       ┆ f64       ┆ ---       ┆ f64       │
│            ┆            ┆            ┆           ┆           ┆           ┆ f64       ┆           │
╞════════════╪════════════╪════════════╪═══════════╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 132        ┆ 265        ┆ 8.7909e6   ┆ 7.4224e6  ┆ 374067.16 ┆ 795700.48 ┆ 8743.25   ┆ 21.956994 │
│ 132        ┆ 230        ┆ 7.8948e6   ┆ 5.7700e6  ┆ 677238.74 ┆ 856887.87 ┆ 271995.25 ┆ 18.305493 │
│ 264        ┆ 264        ┆ 6.9791e6   ┆ 5.2377e6 

## PySpark Pandas

### Note: Restarted Kernel here.

In [1]:
from datetime import datetime

from pyspark.pandas import read_parquet

start = datetime.now()

df = read_parquet("./data/yellow_tripdata_202*-*.parquet")

print(f"DF has {len(df)} rows.")

res = (
    df.groupby(["PULocationID", "DOLocationID"])
    .agg(
        {
            "total_amount": "sum",
            "fare_amount": "sum",
            "tolls_amount": "sum",
            "tip_amount": "sum",
            "congestion_surcharge": "sum",
            "trip_distance": "mean",
        }
    )
    .sort_values(by="fare_amount", ascending=False)
    .head(10)
)
print(res)
print(f"Runtime: {datetime.now() - start}")



23/12/17 20:53:34 WARN Utils: Your hostname, romulus-arm.local resolves to a loopback address: 127.0.0.1; using 192.168.4.105 instead (on interface en0)
23/12/17 20:53:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/17 20:53:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DF has 70560406 rows.




                           total_amount  fare_amount  tolls_amount  tip_amount  congestion_surcharge  trip_distance
PULocationID DOLocationID                                                                                          
132          265           8.790935e+06   7422390.32     374067.16   795700.48               8743.25      21.956994
             230           7.894807e+06   5770019.47     677238.74   856887.87             271995.25      18.305493
264          264           6.979147e+06   5237721.48     132240.98   888590.87             373987.50       3.256247
132          48            5.809088e+06   4275710.82     496605.18   605372.31             201617.50      18.533374
             132           4.293874e+06   3524985.16      69755.77   463964.71               9502.50       2.306788
             164           4.510415e+06   3247564.82     397416.61   535699.93             153245.50      17.439196
138          230           4.889752e+06   3229509.35     565195.92   658

                                                                                

## PySpark Dataframes

### Note: Restarted Kernel here.

In [1]:
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("ReadTaxi").getOrCreate()

start = datetime.now()

df = spark.read.parquet("./data/yellow_tripdata_202*-*.parquet")

print(f"DF has {df.count()} rows.")

res = (df.groupby(["PULocationID", "DOLocationID"]).agg(
    sum("total_amount"),
    sum("fare_amount"),
    sum("tolls_amount"),
    sum("tip_amount"),
    sum("congestion_surcharge"),
    mean("trip_distance")
).sort(desc(sum("fare_amount"))).show(10))

print(f"Runtime: {datetime.now() - start}")

23/12/17 20:54:57 WARN Utils: Your hostname, romulus-arm.local resolves to a loopback address: 127.0.0.1; using 192.168.4.105 instead (on interface en0)
23/12/17 20:54:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/17 20:54:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DF has 70560406 rows.




+------------+------------+------------------+------------------+------------------+------------------+-------------------------+------------------+
|PULocationID|DOLocationID| sum(total_amount)|  sum(fare_amount)| sum(tolls_amount)|   sum(tip_amount)|sum(congestion_surcharge)|avg(trip_distance)|
+------------+------------+------------------+------------------+------------------+------------------+-------------------------+------------------+
|         132|         265| 8790934.510000236| 7422390.320000001| 374067.1600000008| 795700.4800000027|                  8743.25|21.956994294661445|
|         132|         230| 7894806.989998972|        5770019.47| 677238.7400001296| 856887.8699999934|                271995.25| 18.30549332223389|
|         264|         264|6979146.5800001435| 5237721.479999993|132240.97999999748| 888590.8700000198|                 373987.5|3.2562468018918778|
|         132|          48| 5809088.049999422|        4275710.82| 496605.1800000536| 605372.3100000018|   

                                                                                