In [1]:
import os

import pandas as pd
import polars as pl

In [2]:
CWD = os.getcwd()
PATH_INPUT_FOLDER = os.path.join(CWD, "input")

In [3]:
fs = sorted([os.path.join(PATH_INPUT_FOLDER, f) for f in os.listdir(PATH_INPUT_FOLDER)])

In [4]:
pddf = pd.concat([pd.read_parquet(path = path) for path in fs[:5]])

In [5]:
pddf.shape

(9344926, 19)

In [6]:
pddf.info()

<class 'pandas.core.frame.DataFrame'>
Index: 9344926 entries, 0 to 2507108
Data columns (total 19 columns):
 #   Column                Dtype         
---  ------                -----         
 0   VendorID              int64         
 1   TpepPickupDatetime    datetime64[ns]
 2   TpepDropoffDatetime   datetime64[ns]
 3   PassengerCount        float64       
 4   TripDistance          float64       
 5   RatecodeID            float64       
 6   StoreAndFwdFlag       object        
 7   PULocationID          int64         
 8   DOLocationID          int64         
 9   PaymentType           int64         
 10  FareAmount            float64       
 11  Extra                 float64       
 12  MtaTax                float64       
 13  TipAmount             float64       
 14  TollsAmount           float64       
 15  ImprovementSurcharge  float64       
 16  TotalAmount           float64       
 17  CongestionSurcharge   float64       
 18  AirportFee            float64       
dtypes: da

In [7]:
pddf.head()

Unnamed: 0,VendorID,TpepPickupDatetime,TpepDropoffDatetime,PassengerCount,TripDistance,RatecodeID,StoreAndFwdFlag,PULocationID,DOLocationID,PaymentType,FareAmount,Extra,MtaTax,TipAmount,TollsAmount,ImprovementSurcharge,TotalAmount,CongestionSurcharge,AirportFee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [8]:
pddf.head()["TpepPickupDatetime"].dt.day

0    1
1    1
2    1
3    1
4    1
Name: TpepPickupDatetime, dtype: int32

In [9]:
# %%timeit

# (
#     df
#     [["TpepPickupDatetime", "TripDistance", "TotalAmount", "PaymentType"]]
#     .assign(
#         DateOfWeek = df["TpepPickupDatetime"].dt.day
#     )
#     .set_index("TpepPickupDatetime")
#     .groupby(["PaymentType"])
#     .resample("D")
#     ["TripDistance"]
#     .mean()
# )

# ### 4.16 s ± 50.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [10]:
(
    pddf
    [["TpepPickupDatetime", "TripDistance", "TotalAmount", "PaymentType"]]
    .assign(
        DateOfWeek = pddf["TpepPickupDatetime"].dt.day
    )
    .set_index("TpepPickupDatetime")
    .groupby(["PaymentType"])
    .resample("D")
    ["TripDistance"]
    .mean()
)

PaymentType  TpepPickupDatetime
0            2021-01-01             7.251882
             2021-01-02             6.544945
             2021-01-03            91.774073
             2021-01-04            14.980734
             2021-01-05            19.919565
                                     ...    
4            2021-05-27             2.680473
             2021-05-28             2.632170
             2021-05-29             2.373442
             2021-05-30             2.950102
             2021-05-31             3.019135
Name: TripDistance, Length: 14024, dtype: float64

In [11]:
# lazy dataframe
plldf = pl.scan_parquet(source = os.path.join(PATH_INPUT_FOLDER, f"yellow*.parquet"))

In [12]:
# eager mode
pldf = pl.read_parquet(source = fs[:5]) 

In [13]:
# pandas: 9_344_926   --> 4.16 s --> Todo en Memoria
# polars: 114_842_782 --> 7.99 s --> Leyendo del disco -> procesando en dataset

# %%timeit

# (
#     plldf
#     .select(pl.col(["TpepPickupDatetime", "TripDistance", "TotalAmount", "PaymentType"]))
#     .with_columns(
#         DateOfWeek = pl.col("TpepPickupDatetime").dt.weekday()
#     )
#     .sort("TpepPickupDatetime")
#     .group_by_dynamic(
#         index_column = "TpepPickupDatetime",
#         every = "1h",
#         group_by = "PaymentType"
#     )
#     .agg(
#         MeanTripDistance = pl.col("TripDistance").mean()
#     )
#     .collect()
# )

# ### 7.81 s ± 1.44 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [14]:
plldf.head().collect()

VendorID,TpepPickupDatetime,TpepDropoffDatetime,PassengerCount,TripDistance,RatecodeID,StoreAndFwdFlag,PULocationID,DOLocationID,PaymentType,FareAmount,Extra,MtaTax,TipAmount,TollsAmount,ImprovementSurcharge,TotalAmount,CongestionSurcharge,AirportFee
i64,datetime[ns],datetime[ns],f64,f64,f64,str,i64,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64
1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,"""N""",142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,"""N""",238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,"""N""",132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,"""N""",138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,"""N""",68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [15]:
plldf.select(pl.col("PassengerCount").is_null()).collect().sum()

PassengerCount
u32
4482126


# ¿Dónde están los nulos?

In [16]:
# eager mode df
(
    pldf
    .select(pl.all().is_null())
    .sum()
    .melt()
    .rename({"variable":"column", "value":"nulls"})
    .filter(pl.col("nulls") > 0)
)

column,nulls
str,u32
"""PassengerCount""",579834
"""RatecodeID""",579834
"""StoreAndFwdFlag""",579834
"""CongestionSurcharge""",579834
"""AirportFee""",4742536


In [17]:
# lazy df
(
    plldf
    .select(pl.all().is_null())
    .sum()
    .melt()
    .rename({"variable":"column", "value":"nulls"})
    .filter(pl.col("nulls") > 0)
    .collect()
)

column,nulls
str,u32
"""PassengerCount""",4482126
"""RatecodeID""",4482126
"""StoreAndFwdFlag""",4482126
"""CongestionSurcharge""",4482126
"""AirportFee""",8644849


# ¿Castear las columnas a un tipo más eficiente?

In [27]:
# https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf

MILE_TO_KM_CONVERTER = 1.60934

# RatecodeID
# 1= Standard rate
# 2=JFK
# 3=Newark
# 4=Nassau or Westchester
# 5=Negotiated fare
# 6=Group ride


# StoreAndFwdFlag
# Yes -> 1, No -> 0

# PaymentType
# 1= Credit card
# 2= Cash
# 3= No charge
# 4= Dispute
# 5= Unknown
# 6= Voided trip

english_to_spanish_dictionary = {
    'VendorID':'La empresa que nos ha suministrado información del viaje',
    'TpepPickupDatetime':'Fecha y hora cuando se ha activado el contador',
    'TpepDropoffDatetime':'Fecha y hora cuando se ha desactivado el contador',
    'PassengerCount':'Nr de viajers (introducido por el conductor)',
    'TripDistance':'La distancia del viaje en Milas según el taximétro',
    'RatecodeID':'El código del tipo de trayecto que se ha realizado',
    'StoreAndFwdFlag':'El viaje no se ha enviado enseguida al server porque no había conexión',
    'PULocationID':'La zona donde se había activado el contador',
    'DOLocationID':'La zona donde se había desactivado el contador',
    'PaymentType':'Tipo de pago',
    'FareAmount':'Importe del viaje según el recorrido',
    'Extra':'Puede ser 0.50$ o bien 1$ por hora punta o bien guardia de noche',
    'MtaTax':'0.50$ de impuesto',
    'TipAmount':'Propina pagada con tarjeta',
    'TollsAmount':'Peajes',
    'ImprovementSurcharge':'Bajada de bandera',
    'TotalAmount':'Importe total cargado',
    'CongestionSurcharge':'Cobro por congestión',
    'AirportFee':'1.25$ cobrados en LaGuardia o aeropuerto de JFK'
}

In [19]:
pldf.columns

['VendorID',
 'TpepPickupDatetime',
 'TpepDropoffDatetime',
 'PassengerCount',
 'TripDistance',
 'RatecodeID',
 'StoreAndFwdFlag',
 'PULocationID',
 'DOLocationID',
 'PaymentType',
 'FareAmount',
 'Extra',
 'MtaTax',
 'TipAmount',
 'TollsAmount',
 'ImprovementSurcharge',
 'TotalAmount',
 'CongestionSurcharge',
 'AirportFee']

In [20]:
pldf.head()

VendorID,TpepPickupDatetime,TpepDropoffDatetime,PassengerCount,TripDistance,RatecodeID,StoreAndFwdFlag,PULocationID,DOLocationID,PaymentType,FareAmount,Extra,MtaTax,TipAmount,TollsAmount,ImprovementSurcharge,TotalAmount,CongestionSurcharge,AirportFee
i64,datetime[ns],datetime[ns],f64,f64,f64,str,i64,i64,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64
1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,"""N""",142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,"""N""",238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,"""N""",132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,"""N""",138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,"""N""",68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [21]:
(
    pldf
    .select(pl.selectors.numeric())
    .describe()
)

statistic,VendorID,PassengerCount,TripDistance,RatecodeID,PULocationID,DOLocationID,PaymentType,FareAmount,Extra,MtaTax,TipAmount,TollsAmount,ImprovementSurcharge,TotalAmount,CongestionSurcharge,AirportFee
str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""count""",9344926.0,8765092.0,9344926.0,8765092.0,9344926.0,9344926.0,9344926.0,9344926.0,9344926.0,9344926.0,9344926.0,9344926.0,9344926.0,9344926.0,8765092.0,4602390.0
"""null_count""",0.0,579834.0,0.0,579834.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,579834.0,4742536.0
"""mean""",1.718218,1.41081,6.186446,1.036471,166.003286,162.576939,1.182149,12.553366,1.049816,0.493277,2.031992,0.275359,0.297105,18.217332,2.267508,0.044023
"""std""",0.576921,1.034232,642.342603,0.617817,67.48774,71.522646,0.555135,184.14719,1.255063,0.074794,2.480929,1.52281,0.0411,184.272592,0.760103,0.231375
"""min""",1.0,0.0,0.0,1.0,1.0,1.0,0.0,-643.5,-5.5,-0.5,-333.32,-38.02,-0.3,-647.8,-2.5,-1.25
"""25%""",1.0,1.0,1.03,1.0,125.0,107.0,1.0,6.5,0.0,0.5,0.0,0.0,0.3,11.16,2.5,0.0
"""50%""",2.0,1.0,1.73,1.0,162.0,162.0,1.0,9.0,0.5,0.5,1.95,0.0,0.3,14.3,2.5,0.0
"""75%""",2.0,1.0,3.1,1.0,236.0,236.0,1.0,14.0,2.5,0.5,2.8,0.0,0.3,19.8,2.5,0.0
"""max""",6.0,9.0,332541.19,99.0,265.0,265.0,4.0,398466.38,90.06,3.85,1140.44,823.4,0.3,398469.2,3.0,1.25


In [22]:
pldf.select(pl.col("VendorID").unique())

VendorID
i64
1
2
5
6


In [23]:
# eager
(
    pldf
    .select(
        pl.col(["PULocationID"]).unique().len().alias("NrUniqueValuesPULocationID"),
        NrUniqueValuesDOLocationID = pl.col(["DOLocationID"]).unique().len() # named expressions
    )
)

NrUniqueValuesPULocationID,NrUniqueValuesDOLocationID
u32,u32
263,260


In [24]:
# lazy df
(
    plldf
    .select(
        pl.col(["PULocationID"]).unique().len().alias("NrUniqueValuesPULocationID"),
        NrUniqueValuesDOLocationID = pl.col(["DOLocationID"]).unique().len() # named expressions
    )
    .collect()
)

NrUniqueValuesPULocationID,NrUniqueValuesDOLocationID
u32,u32
263,262


In [25]:
def series_to_list(series):
    return ",".join(list(map(str, list(series))))

In [26]:
(
    pldf
    .filter(
        (~pl.col("PULocationID").is_in(pl.col("DOLocationID").unique())) |
        (~pl.col("DOLocationID").is_in(pl.col("PULocationID").unique()))
    )
    .select(
        UniquePULocationIDNotInDOLocationID = pl.col("PULocationID").unique().map_batches(series_to_list),
        UniqueDOLocationIDNotInPULocationID = pl.col("DOLocationID").unique().map_batches(series_to_list)
    )
)

UniquePULocationIDNotInDOLocationID,UniqueDOLocationIDNotInPULocationID
str,str
"""105,110,199""","""107,178,228,264"""


In [29]:
# %%timeit

# (
#     plldf
#     .filter(
#         (~pl.col("PULocationID").is_in(pl.col("DOLocationID").unique())) |
#         (~pl.col("DOLocationID").is_in(pl.col("PULocationID").unique()))
#     )
#     .select(
#         UniquePULocationIDNotInDOLocationID = pl.col("PULocationID").unique().map_batches(series_to_list),
#         UniqueDOLocationIDNotInPULocationID = pl.col("DOLocationID").unique().map_batches(series_to_list)
#     )
#     .collect()
# )

# 1.49 s ± 60.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


1.49 s ± 60.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [30]:
(
    plldf
    .filter(
        (~pl.col("PULocationID").is_in(pl.col("DOLocationID").unique())) |
        (~pl.col("DOLocationID").is_in(pl.col("PULocationID").unique()))
    )
    .select(
        UniquePULocationIDNotInDOLocationID = pl.col("PULocationID").unique().map_batches(series_to_list),
        UniqueDOLocationIDNotInPULocationID = pl.col("DOLocationID").unique().map_batches(series_to_list)
    )
    .collect()
)

UniquePULocationIDNotInDOLocationID,UniqueDOLocationIDNotInPULocationID
str,str
"""199""","""7,17,41,42,43,48,50,70,79,107,…"


In [None]:
plan = (
    plldf
    .with_columns(
        DayOfWeek = pl.col("TpepPickupDatetime").dt.weekday()
    )
    .filter(
        ~pl.col("PassengerCount").is_null()        
    )
    .select(pl.col(["TpepPickupDatetime", "TripDistance", "TotalAmount", "PaymentType"]))
    .sort("TpepPickupDatetime")
    .group_by_dynamic(
        index_column = "TpepPickupDatetime",
        every = "1h",
        group_by = "PaymentType"
    )
    .agg(
        MeanTripDistance = pl.col("TripDistance").mean()
    )
)

In [None]:
# Polars: Query Engine con API DataFrame

In [None]:
(
    plan
    .collect()
    .head()
)

In [None]:
plan

In [None]:
plan.show_graph(optimized = True)