In [None]:
# Dependencies
!pip install pyarrow, pandas, dask, polars, graphviz

# Gestione Dati su Larga Scala

Lavorando coi dati in Python si usano tipicamente librerie come NumPy e Pandas

NumPy estende Python con gli array e la vettorizzazione delle operazioni.

La funzionalità principale fornita da Pandas è la struttura dati DataFrame, simile a una tabella bidimensionale per rappresentare i dati

## PANDAS È LENTO!

## Alternative a Pandas

**Polars**: libreria per DataFrame scritta in Rust e basata sul formato di array colonna di Apache Arrow

**Dask**: libreria che estende NumPy e Pandas per fare computazioni ottimizzate e suddividere il lavoro fra diversi nodi in un cluster

# Voglio usare Pandas, come posso ottimizzarlo?

Per questa dimostrazione useremo [*Steam Games Dataset*](https://www.kaggle.com/datasets/fronkongames/steam-games-dataset?resource=download), un dataset distribuito su Kaggle di informazioni su vari giochi pubblicati sulla piattaforma Steam

Il dataset ha 85mila righe e 39 colonne

In [None]:
import pandas as pd
pd.set_option('display.max_rows', 6)
pd.set_option("max_colwidth", 15)

In [None]:
%%time

df = pd.read_csv('games.csv')

La lettura è stata abbastanza rapida, ma il dataset è abbastanza piccolo, sebbene abbia molte colonne

Proviamo a ingrandire il dataset

In [None]:
%%time

df_grande = pd.concat([df] * 10, ignore_index=True)

In [None]:
%%time

df_grande.to_csv('big_games.csv', index=False)

In [None]:
!dir

Adesso proviamo a ricaricare il dataset in memoria e comparare i tempi

In [None]:
%%time

df = pd.read_csv('games.csv')

In [None]:
%%time

df_grande = pd.read_csv('big_games.csv')

## Caricare solo i dati utili

Ci servono davvero tutte queste colonne e relative informazioni?

In [None]:
df

Per esempio, facciamo un'analisi sui generi dei giochi e i prezzi medi di ciascuno.
Abbiamo quindi solo bisogni di caricare queste informazioni:
- AppID: id del gioco
- Name: nome del gioco
- Price: prezzo
- Genres: generi del gioco

Versione lenta

Versione rapida

In [None]:
%%time

colonne = ['AppID', 'Name', 'Price', 'Genres']
df = pd.read_csv('big_games.csv')
df[colonne]

In [None]:
%%time

colonne = ['AppID', 'Name', 'Price', 'Genres']
df = pd.read_csv('big_games.csv', usecols=colonne)
df

## Integrare PyArrow

In [None]:
%%time

df = pd.read_csv('big_games.csv', engine='pyarrow')

PyArrow può essere anche utilizzato come backend per la gestione e tipizzazione dei dati

In [None]:
%%time

df = pd.read_csv('big_games.csv', engine='pyarrow', dtype_backend="pyarrow")
df.dtypes

## Usare il chunking (spezzettamento)

Possiamo iterare sul dataset a pezzi (chunk) e performare delle operazioni senza dover caricare l'intero dataset in memoria

In [None]:
prezzi_per_genere = None
with pd.read_csv('big_games.csv', chunksize=85_000) as reader:
    for i, chunk in enumerate(reader):
        chunk['Genres'] = chunk['Genres'].str.split(',')  # trasformiamo i generi in una lista
        chunk_genere_esploso = chunk.explode('Genres')  # ogni riga viene duplicata per ciascun genere nella lista
        
        chunk_prezzo_per_genere = chunk_genere_esploso.groupby('Genres')['Price'].sum()
        
        if prezzi_per_genere is None:
            prezzi_per_genere = chunk_prezzo_per_genere
        else:
            prezzi_per_genere.add(chunk_prezzo_per_genere, fill_value=0)
        
        print(f"Operazioni sul chunk #{i + 1} (lunghezza: {len(chunk)} righe) terminate")

In [None]:
prezzi_per_genere

## Estensioni di Pandas (cuDF e Dask)

In [None]:
%load_ext cudf.pandas
import pandas as pd

# ! La libreria cuDF è installabile solo su UNIX con GPU

In [None]:
%%time

import dask.dataframe as dd

dask_df = dd.read_csv(
    'big_games.csv',
    dtype={
        'Metacritic url': 'object',
        'Reviews': 'object'
    }
)

In [None]:
dask_df

In [None]:
result = dask_df['Price'] ** 2
result = result.mean()

result.dask

In [None]:
%%time

result.compute()

# Salto di qualità: Polars

Polars è un'alternativa a Pandas implementata con Rust, un linguaggio di programmazione all'avanguardia

Polars supporta lazy evaluation, ottimizzazione delle query e tutte le operazioni vengono eseguite tramite Rust

È quindi consigliato utilizzare il più possibile le funzionalità di Polars per non incorrere nel problema del Global Interpreter Lock (GIL) in Python e permettere a Polars di fare computazioni in multiprocessing con Rust

In [None]:
import polars as pl

pl.Config().set_tbl_rows(6)

## I/O in Polars

In [None]:
%%time

# Polars
polars_df = pl.read_csv('big_games.csv', separator=',')

In [None]:
%%time 

# Pandas
pandas_df = pd.read_csv('big_games.csv', sep=',')

In [None]:
%%time

#Polars
polars_df.write_csv('big_games.csv')

In [None]:
%%time

#Pandas
pandas_df.to_csv('big_games.csv', index=False)

### Qual è la principale differenza fra questi dataframe?

In [None]:
polars_df

In [None]:
pandas_df

Polars non usa indice sul dataframe, portando a un risparmio di memoria e complessità causate dall'uso dell'indice su Pandas

Polars propone strumenti diversi da Pandas per interagire con i dati nel DataFrame che portano alla sua estrema performance

Gli strumenti principali sono i contesti (*context*) e le esperessioni (*expressions*)

## Polars Contexts

Un *context* in Polars e il contesto in cui un'espressione viene valutata

Vuol dire che una certa espressione avrà un effetto diverso a seconda del contesto in cui si trova

Polars offre 3 tipi di contesto:
- Selection: `df.select(...)` e `df.with_columns(...)`
- Filtering: `df.filter(...)`
- Aggregation: `df.group_by(...).agg(...)`

### Selection (selezione)

In [None]:
polars_df.select(
    pl.max("Required age"),
    pl.col("About the game").str.len_chars().alias('Text Length "About the game"'),
    (pl.mean("Price") / 100).alias('Price_100')
)

In [None]:
polars_df.select(
    pl.max("Required age")
)

In [None]:
polars_df.select(
    (pl.mean("Price") / 100).alias('Price_100')
)

In [None]:
polars_df.select(
    pl.col("About the game").str.len_chars().alias('Text Length "About the game"')
)

### with_columns (selezione con aggiunta o sovrascrittura)

In Pandas aggiungere delle colonne e sovrascriverle è più macchinoso e necessità più boilerplate code

In [None]:
polars_df.with_columns(
    (pl.col("DLC count") > 0).alias("Has DLC"),
    pl.col("Price") * 100,
    Many_achievements=pl.col("Achievements") > 50
)

### Filtering

Il filtering su Polars può essere visto come l'indicizzazione booleana su Pandas, es. `pandas_df[pandas_df['Price'] > 50]`

In [None]:
polars_df.filter(
    pl.col("Achievements") > 50,
    (pl.col("Recommendations") > 200) & (pl.col("Positive") > pl.col("Negative"))
)

## Polars Expressions

Le espressioni in Polars sono strumenti potenti per effettuare operazioni in maniera parallela in un determinato context

Molti dei comandi che abbiamo visto in precedenza hanno fatto uso di expressions. Per esempio:

In [None]:
pl.col("Price").sort().pow(3).log(10)

Ogni espressione ne genera un'altra, quindi possono essere concatenate per fare operazioni più complesse

### Espressioni numeriche e logiche

In [None]:
polars_df.select(
    pl.col("Price"),
    (pl.col("Price") * 2).alias("Double Price"),
    (pl.col("Price") * pl.col("DLC count") / pl.col("Positive")).alias("Price-DLC-Positive"),
    ((pl.col("Price") > 20) & (pl.col("DLC count") > 1)).alias("Pricey Games with DLC")
)

### Selezione avanzata

Selezione multipla

In [None]:
polars_df.select(
    pl.col(["Price", "DLC count", "Achievements"])
).head(3)

---

Selezione con esclusione

In [None]:
polars_df.select(
    pl.all().exclude("Name", "Release Date")
).head(3)

Una volta selezionate delle colonne possiamo applicare la stessa operazione ad esse

In [None]:
polars_df.select(
    pl.col(["Price", "DLC count", "Achievements"]).exp().cast(pl.String)
).head(3)

Oppure possiamo sfruttare i `selectors` per selezionare con modalità più avanzate

In [None]:
import polars.selectors as cs

polars_df.select(
    cs.numeric().exp().cast(pl.String)
).head(3)

### Funzioni condizionali

Polars supporta le condizioni if-else in maniera simile a Rust con il costrutto when-then-otherwise

In [None]:
polars_df.select(
    pl.when(pl.col("Price") > 15)
    .then(pl.lit("Caro"))
    .otherwise(pl.lit("Economico"))
    .alias("Merita l'acquisto?")
)

### Funzioni per le stringhe

Polars supporta funzioni speciali per le stringhe, a cui è possibile accedere tramite l'accessor `.str` come Pandas

In [None]:
polars_df.select(
    pl.col("Name"),
    pl.col("Name").str.contains("Project").alias("Project Game"),
    pl.col("Name").str.contains(".* (B|W).*").alias("Parole dopo la prima iniziano per B o W"),
    pl.col("Name").str.replace("Train", "Trenitalia").alias("Giochi su Trenitalia"),
    pl.col("Name").str.split(' ').alias("Lista Name")
)

### Dati mancani (null)

Pandas ha vari modi per definire i tipi nulli (`pd.NA`, `None`, `nan`). Polars usa solo `null`, mentre `NaN` (equivalente a `nan`) è un valore float specifico, non un valore nullo.

In [None]:
null_df = polars_df.select(
    pl.col("Price"),
    pl.col("Tags"),
    pl.col("Tags").is_null().alias("null")
)
null_df

In [None]:
null_df.select(
    pl.col("Price"),
    (0 / pl.col("Price")).alias("10_diviso_Price"),
    (0 / pl.col("Price")).is_null().alias("null")
)

### Aggregazioni colonnari (window functions)

Prima di vedere le aggregazioni vere e proprie, vediamo le *window functions*, un tipo di aggregazioni che Polars fornisce per aggregare valori sulla base dei valori di altre colonne.

In [None]:
polars_df.sort("Required age").select(
    pl.col("Required age"),
    pl.col("Price").mean().over("Required age")
)

Le operazioni window non sono semplici da utilizzare, ma sono molto potenti quando necessarie

### Operazioni orizzontali e Folds

Polars permette anche di aggregare le colonne in maniera orizzontale.

Questo può essere fatto tramite delle funzioni fornite da Polars oppure tramite i *folds* per eseguire operazioni di riduzione scritte dall'utente.

In [None]:
polars_df.sort("DLC count", descending=False).select(
    pl.col(["Price", "DLC count"]),
    pl.sum_horizontal(
        "Price", "DLC count"
    ).alias("price_dlc_sum")
).tail()

In [None]:
polars_df.sort("DLC count", descending=False).select(
    pl.col(["Price", "DLC count"]),
    pl.fold(
        acc=pl.lit(1),
        function=lambda acc, x: acc * x,
        exprs=pl.col(["Price", "DLC count"])
    ).alias("price_dlc_prod")
).tail()

### Liste e Array

A differenza di Pandas, Polars supporta in maniera nativa liste e array, senza doverle rappresentare come semplici `object`

In [None]:
list_df = polars_df.select(
    pl.col("Name"),
    pl.col("Tags").str.split(',')
)
list_df.head()

In [None]:
list_df.with_columns(
    pl.col("Tags").list.contains("Indie").alias("Indie")
).slice(5, 5)

Si possono fare diverse operazioni sulle liste. Per esempio rimuovere da tutti i termini *Action* e *Adventure*.

In [None]:
list_df.with_columns(
    pl.col("Tags").list.set_difference(["Action", "Adventure"]).alias("no Action or Adventure")
).slice(5, 5)

A volte può essere utile ripetere lo stesso dato e associare a ciascuno valori diversi. Questo si fa tramite l'*esplosione* della lista, un concetto comune anche a Pandas

In [None]:
list_df.explode('Tags')

## Aggregazioni (group_by)

In [None]:
polars_df.group_by("Required age").agg(
    pl.col("Price").mean().round(2).alias("Avg Price"),
    pl.col("Tags").str.split(',').len().alias("# Tags")
).sort("Required age").head()

In [None]:
polars_df.group_by("Required age").agg(
    (pl.col("Price") > 30).sum().alias("# Cari"),
    (pl.col("Price") < 10).sum().alias("# Economici")
).sort("Required age").head()

Queste operazioni sono ottimizzate perché vengono parallelizzate in Rust. Se applicate una funzione Python a ogni gruppo ciclato da `group_by` allora il codice non verrà parallelizzato (a meno che la funzione non utilizzi solo Polars expressions).

## Join

Estraiamo dal dataset tutti i giochi per cui il gruppo di età richiesta ha un prezzo medio maggiore di 10. Questo può essere fatto tramite una group_by e poi una join col nuovo dataframe.

In [None]:
avg_price_per_age = (
    polars_df
    .group_by("Required age")
    .agg(
        pl.col("AppID"),
        pl.col("Price").mean().round(2).alias("Avg Price")
    )
    .filter(pl.col("Avg Price") > 10)
    .explode("AppID")
)
avg_price_per_age

In [None]:
polars_df.join(
    avg_price_per_age,
    on="AppID",
    how="semi"
)

## Lazy API

L'API Lazy (pigra) di Polars esegue le operazioni solo quando richiesto, permettendo a Polars di ottimizzare il processo di computazione e fornendo pure strumenti per lavorare con dataset di grandi dimensioni. Per sfruttare la Lazy API su un file bisogna *scansionare* (`scan_csv`) il file.

In [None]:
polars_df.lazy()

In [None]:
pl.scan_csv('big_games.csv')

L'operazione genera un grafo computazionale (simile a quello usato dalle librerie di ML come TensorFlow e PyTorch)

In [None]:
result = (
    polars_df.lazy()
    .group_by("Required age")
    .agg(
        pl.col("AppID"),
        pl.col("Price").mean().round(2).alias("Avg Price")
    )
    .filter(pl.col("Avg Price") > 10)
    .explode("AppID")
)
result.show_graph()

Una volta generato il grafo, si può chiamare il metodo `collect()` per ottenere il risultato

In [None]:
result.collect()

A quanto ammontano l'ottimizzazione fornita dalla Lazy API di Polars?

In [None]:
%%time

lazy_df = pl.scan_csv('big_games.csv')

result = (
    lazy_df
    .group_by("Required age")
    .agg(
        pl.col("AppID"),
        pl.col("Price").mean().round(2).alias("Avg Price")
    )
    .filter(pl.col("Avg Price") > 10)
    .explode("AppID")
)
result.collect()

In [None]:
%%time

eager_df = pl.read_csv('big_games.csv')
(
    eager_df.group_by("Required age")
    .agg(
        pl.col("AppID"),
        pl.col("Price").mean().round(2).alias("Avg Price")
    )
    .filter(pl.col("Avg Price") > 10)
    .explode("AppID")
)

# Comparazione operazioni e velocità con Pandas

Iniziamo con l'esempio precedente sfruttando la Lazy API di Polars

In [None]:
%%time

lazy_df = pl.scan_csv('big_games.csv')

result = (
    lazy_df
    .group_by("Required age")
    .agg(
        pl.col("AppID"),
        pl.col("Price").mean().round(2).alias("Avg Price")
    )
    .filter(pl.col("Avg Price") > 10)
    .explode("AppID")
)
result.collect()

In [None]:
%%time

pandas_df = pd.read_csv('big_games.csv')

agg_params = {
    "AppID": pd.NamedAgg(
        column="AppID", aggfunc=lambda x: x.tolist()
    ),
    "Avg Price": pd.NamedAgg(
        column="Price", aggfunc=lambda x: x.mean().round(2)
    )
}
pandas_df.groupby("Required age").agg(
    **agg_params
).query(
    "`Avg Price` > 10"
).explode("AppID") 

Per operazioni semplici, la Lazy API può però portare all'esecuzione di codice superfluo e oneroso

In [None]:
%%time

lazy_df.with_columns(
    (pl.col("Achievements") * 1000).pow(2) / 7
).collect()

In [None]:
%%time

pandas_df["Achievements"] = (
    (pandas_df["Achievements"] * 1000).pow(2) / 7
)
pandas_df

Lo stesso esempio senza utilizzare la Lazy API è molto più performante e mediamente migliore di Pandas

In [None]:
%%time

polars_df.with_columns(
    (pl.col("Achievements") * 1000).pow(2) / 7
)

In [None]:
%%time

pandas_df["Achievements"] = (
    (pandas_df["Achievements"] * 1000).pow(2) / 7
)
pandas_df

## Aggregazioni

In [None]:
%%time

(
    polars_df
    .group_by("Required age")
    .agg(
        pl.col("AppID"),
        pl.col("Price").mean().round(2).alias("Avg Price")
    )
    .filter(pl.col("Avg Price") > 10)
    .explode("AppID")
)

In [None]:
%%time

agg_params = {
    "AppID": pd.NamedAgg(
        column="AppID",
        aggfunc=lambda x: x.tolist()
    ),
    "Avg Price": pd.NamedAgg(
        column="Price",
        aggfunc=lambda x: x.mean().round(2)
    )
}
pandas_df.groupby("Required age").agg(
    **agg_params
).query(
    "`Avg Price` > 10"
).explode("AppID")

## Join

Sfruttiamo l'esempio del join con polars.

Creiamo quindi sia per Polars che per Pandas il secondo dataframe da joinare

!! Non usate il metodo `join` di Pandas perché dà risultati errati. Usate il metodo `merge`

In [None]:
polars_avg_price_per_age = (
    polars_df
    .group_by("Required age")
    .agg(
        pl.col("AppID"),
        pl.col("Price").mean().round(2).alias("Avg Price")
    )
    .filter(pl.col("Avg Price") > 10)
    .explode("AppID")
).unique()

In [None]:
agg_params = {
    "AppID": pd.NamedAgg(
        column="AppID",
        aggfunc=lambda x: x.tolist()
    ),
    "Avg Price": pd.NamedAgg(
        column="Price",
        aggfunc=lambda x: x.mean().round(2)
    )
}

pandas_avg_price_per_age = (
    pandas_df.groupby("Required age").agg(
        **agg_params
    )
    .query(
        "`Avg Price` > 10"
    )
    .explode("AppID")
).drop_duplicates()

Ora possiamo calcolare i tempi della sola operazione join (per Pandas usiamo un escamotage dato che non supporta il *semi* join)

In [None]:
%%time

polars_df.join(
    polars_avg_price_per_age,
    on="AppID",
    how="semi"
)

In [None]:
%%time

(
    pandas_df.merge(
        pandas_avg_price_per_age, 
        on="AppID", 
        how="outer", 
        indicator=True
    )
    .query('_merge == "both"')
    .drop(columns="_merge")
)

E se usassimo una modalità di join supportata da entrambe le librerie?

In [None]:
%%time

polars_df.join(
    polars_avg_price_per_age,
    on="AppID",
    how="inner"
)

In [None]:
%%time

pandas_df.merge(
    pandas_avg_price_per_age, 
    on="AppID",
    how="inner"
)