# Comparativa de uso de Pandas, Duckdb, Polars, PySpark

Vamos a hacer uso de las diferentes herramientas para tener un punto común dónde poder revisar cómo hacer las operaciones más comunes y que sirva como guia.

Nos basaremos en lo que Polars llama [Contextos](https://pola-rs.github.io/polars/user-guide/concepts/contexts/):

1. Ingesta de datos
1. Manejo básico de atributos 
1. Selección
1. Filtrado
1. Nuevas columnas
1. Agrupación / Agregación
1. Gestión de valores nulos
1. Borrado de columnas


---
## APIs

- [pandas](https://pandas.pydata.org/docs/reference/index.html)
- [polars API lazy](https://pola-rs.github.io/polars/user-guide/concepts/lazy-vs-eager/#when-to-use-which)
- [polars API Streaming](https://pola-rs.github.io/polars/user-guide/concepts/lazy-vs-eager/#when-to-use-which)
- [duckdb API](https://duckdb.org/docs/api/python/reference/)
- [pyspark](https://spark.apache.org/docs/latest/api/python/reference/index.html)


## Tipos de datos

- [Tipo de datos de Pandas](https://pandas.pydata.org/docs/user_guide/basics.html#basics-dtypes)
- [Tipo de datos de Polars](https://pola-rs.github.io/polars/user-guide/concepts/data-types/)
- [Tipo de datos de DuckDB](https://duckdb.org/docs/sql/data_types/overview)
- [Tipo de datos de PySpark](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html)

In [None]:
import pandas as pd
import polars as pl
import duckdb

### PySpark conlleva la instalación previa de los siguientes elementos:

```shell
# Instalar java
brew install java
brew info java
sudo ln -sfn /opt/homebrew/opt/openjdk/libexec/openjdk.jdk /Library/Java/JavaVirtualMachines/openjdk.jdk
echo 'export PATH="/opt/homebrew/opt/openjdk/bin:$PATH"' >> ~/.zshrc
java --version

# Instalar apache spark
brew install apache-spark

# Dentro de .zshrc poner:
# Variables para correr pyspark en jupyter notebook
# visto en https://www.bmc.com/blogs/jupyter-notebooks-apache-spark/
export PYSPARK_DRIVER_PYTHON='jupyter'
export PYSPARK_DRIVER_PYTHON_OPTS='lab'
export PYSPARK_PYTHON='python3'
```


In [None]:
# https://medium.com/@sujathamudadla1213/sparkcontext-vssparksession-c7c991af95
# https://www.guru99.com/pyspark-tutorial.html
# https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
from pyspark.sql import SparkSession
#import pyspark.sql.functions as func

# Create a SparkSession
spark = SparkSession.builder \
    .appName("SparkSessionExample") \
    .master("local[*]") \
    .getOrCreate()

---
## Ingesta de datos

In [None]:
parquet_file = '../data/yellos_tripdata_2022.parquet'
csv_file = '../data/yellow_tripdata_2022.csv'

In [None]:
#dfpl.sample(n=1000, seed=42).write_parquet('../data/mini_yellows.parquet')
parquet_file = '../data/mini_yellows.parquet'

In [None]:
# pandas
dfpd= pd.read_parquet(parquet_file)
dfpd.shape

In [None]:
# polars
dfpl = pl.read_parquet(parquet_file)
dfpl.shape

**duckdb**

Existen varias formas para cargar cargar ficheros en memoria, asignándolos a una variable o creando una BD en memoria:

```python
db = duckdb.read_parquet(parquet_file)
db.shape
```

*En este notebook cargaremos la tabla en memoria. [DB API](https://duckdb.org/docs/api/python/dbapi)*

**Referencias interesantes:**

[Cuál es la diferencia entre duckdb.sql y duckdb.execute](https://stackoverflow.com/a/77067172)

[Diferentes opciones de ingesta](https://duckdb.org/docs/api/python/data_ingestion)

[Referencia SQL](https://duckdb.org/docs/sql/introduction)

[Eficiente SQL con Pandas sobre Duckdb](https://duckdb.org/2021/05/14/sql-on-pandas.html)

*ToDo*

Revisar: 
- https://www.analyticsvidhya.com/blog/2021/12/the-guide-to-data-analysis-with-duckdb/
- https://learnsql.com/blog/sql-basics-cheat-sheet/


In [None]:
# duckdb
con = duckdb.connect(database=':memory:')
# Creamos una variable con el nombre de la BD.
table_name = 'db'

sql = f'''
create table {table_name} as select * from '{parquet_file}';
'''
con.sql(sql)

shape = f'''
-- En DuckDB, combinar la consulta para obtener filas y columnas en una tabla
SELECT
    (SELECT COUNT(*) FROM {table_name}) AS num_filas,
    (SELECT COUNT(*) FROM information_schema.columns WHERE table_name = '{table_name}') AS num_columnas;
'''
print(con.sql(shape))
print(f"Tipo de dato usando `.sql` {type(con.sql(shape))}")
print(f"Tipo de dato usando `.execute` {type(con.execute(shape))}")
# También se puede hacer directamente
print(f"\nDe forma directa:", con.sql(f'from {table_name};').shape)

In [None]:
spk = spark.read.parquet(parquet_file)
print(f"{spk.count()}, {len(spk.columns)}")

Nota:
> `Duckdb` tiene funciones para poder convertir la tablas leidas en memoria a dataframes, tanto de `Polars` como de `Pandas`:
```python
# Dataframe de Polars
dfpl = duckdb.read_parquet('../data/yellos_tripdata_2022.parquet').pl() 
# Dataframe de Pandas
dfpd = duckdb.read_parquet('../data/yellos_tripdata_2022.parquet').df()
```

---
## Manejo básico de atributos

In [None]:
# pandas
dfpd.describe()

In [None]:
# polars
dfpl.describe()

In [None]:
# duckdb
# Estadísticas descriptivas para una tabla en DuckDB, en caso de tener los valores en una variable
# db.describe()
# En este caso la salida del describe de la BD es otro.
print(con.sql('describe db;'))

# Para conseguir la salida de describe de los dataframes con un reporte matemático de valores podemos utizar:
sql = f'''
FROM {table_name}
'''
con.sql(sql).describe()

In [None]:
# pyspark
spk.describe().show(truncate=False)
spk.summary().show(truncate=False)
# La salida está truncada y no hay forma de cambiarlo. Si queremos verlo mejor, simplemente lo convertimos a un DataFrame de Pandas. 
spk.summary().toPandas()

In [None]:
# pandas
print(f"{dfpd.info()}\n")
print(f"Columnas: \n{dfpd.columns}\n")
print(f"Tipos de datos: \n{dfpd.dtypes}\n")
print(f"Uso de memoria: \n{dfpd.memory_usage()}\n")

In [None]:
# polars
print(f"Columnas: \n{dfpl.columns}\n")
print(f"Tipos de datos: \n{dfpl.dtypes}\n")
print(f"Tamaño estimado en memoria: \n{dfpl.estimated_size('mb')}\n")

In [None]:
# duckdb

print(con.sql(f"select * from information_schema.tables WHERE table_name = '{table_name}'"))
print(f"Columnas: \n{con.sql('from db').columns}\n")
print(f"Tipos de datos: \n{con.sql('from db').dtypes}\n")

In [None]:
# pyspark
print(f"{spk.printSchema()}\n")
print(f"Columnas: \n{spk.columns}\n")
print(f"Tipos de datos: \n{spk.dtypes}\n")

---
## Selección

In [None]:
# pandas
# dfpd.loc[:,['VendorID', 'trip_distance']].head()
dfpd[['VendorID', 'trip_distance']].head()

In [None]:
# polars
dfpl.select(
    pl.col('VendorID'),
    pl.col('trip_distance'),
).head()

In [None]:
# duckdb
sql = '''
SELECT VendorID, trip_distance
FROM db
LIMIT 5;
'''
con.sql(sql).show()


In [None]:
# pyspark
spk.select('VendorID', 'trip_distance').limit(5).show()

---
## Filtrado

In [None]:
# pandas
dfpd[dfpd.trip_distance > 25]

In [None]:
# polars
dfpl.filter(pl.col('trip_distance') > 25)

In [None]:
# duckdb
sql = '''
from db
where trip_distance > 25;
'''
con.sql(sql).show()

In [None]:
# pyspark
spk.filter(spk.trip_distance > 25).show(truncate=False)

---
## Nuevas columnas

In [None]:
# pandas
dfpd[['VendorID', 'trip_distance']][dfpd.trip_distance > 25].assign(new_col=0)

In [None]:
# polars
dfpl.select(pl.col('VendorID'),pl.col('trip_distance'),).filter(pl.col('trip_distance') > 25).with_columns(new_col= pl.lit(0))

In [None]:
# duckdb
sql = '''
SELECT VendorID, trip_distance, 0 as new_col
FROM db
WHERE trip_distance > 25;
'''
con.sql(sql).show()

In [None]:
# Si se quiere añadir una columna de forma fija utilizar el ALTER TABLE
sql = f'''ALTER TABLE {table_name} ADD COLUMN new_col INTEGER DEFAULT 0;'''
con.sql(sql)
print(con.query(f'from {table_name} limit 5;'))
# Dejamos la BD como estaba inicialmente
con.query(f'ALTER TABLE {table_name} drop COLUMN new_col;')
print(con.query(f'select * from {table_name} limit 5;'))

In [None]:
# pyspark
from pyspark.sql.functions import lit
spk.select('VendorID', 'trip_distance').filter(spk.trip_distance > 25).withColumn('new_col', lit(0)).show()

---
## Agrupación / Agregación

In [None]:
# pandas
dfpd.groupby('VendorID').agg({'trip_distance': 'sum'}).sort_values('trip_distance', ascending=False)

In [None]:
# polars
dfpl.group_by("VendorID").agg(pl.col("trip_distance").sum()).sort("trip_distance", descending=True)

In [None]:
# duckdb
sql = f'''
select VendorID, sum(trip_distance) as Distance
from {table_name}
group by VendorID
order by Distance desc;
'''
con.sql(sql)

In [None]:
# pyspark
from pyspark.sql.functions import sum, desc
spk.groupBy('VendorID').agg(sum('trip_distance').alias('trip_distance')).sort(desc("trip_distance")).show()

---
## Borrado de columas

In [None]:
# Nos ayudamos para copiar los campos
# dfpd.columns.tolist()

In [None]:
# pandas
print(f"Columnas totales: {len(dfpd.columns.tolist())}")
print(f"Columnas totales despues del borrado: {len(dfpd.drop(['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count'], axis=1).columns.tolist())}")
# Este borrado no se hace efectivo ya que no machacamos el DataFrame original
print(f"Columnas totales: {len(dfpd.columns.tolist())}")

In [None]:
# polars
print(f"Columnas totales: {len(dfpl.columns)}")
print(f"Columnas totales despues del borrado: {len(dfpl.drop(['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count']).columns)}")

In [None]:
# duckdb
sql = f'''
select * EXCLUDE (tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count)
from {table_name};
'''
print(f"Columnas totales: {len(con.sql(f'from {table_name};').columns)}")
print(f"Columnas totales despues del borrado: {len(con.sql(sql).columns)}")

In [None]:
# pyspark
print(f"Columnas totales: {len(spk.columns)}")
print(f"Columnas totales despues del borrado: {len(spk.drop('tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count').columns)}")


---
## Gestión de valores nulos

In [None]:
# pandas
# https://pandas.pydata.org/docs/user_guide/missing_data.html

In [None]:
# polars
# https://pola-rs.github.io/polars/user-guide/expressions/null/

In [None]:
# duckdb
# https://duckdb.org/docs/sql/data_types/nulls.html

In [None]:
# pyspark
# https://pub.towardsai.net/handle-missing-data-in-pyspark-3b5693fb04a4

---

In [None]:
# pandas

In [None]:
# polars

In [None]:
# duckdb

In [None]:
# pyspark

---

In [None]:
# pandas

In [None]:
# polars

In [None]:
# duckdb

In [None]:
# pyspark

In [None]:
# pandas

In [None]:
# polars

In [None]:
# duckdb

In [None]:
# pyspark