# Manipulação de dados tabulares - Pandas e alternativas
> Pandas, SQL e comparações (Dask, Polars, Datatable, PandaSQL e DuckDB)

- toc: true 
- badges: true
- comments: true
- categories: [demo, sql, pandas, duckdb, dask, datatable, polars, tabelas, benchmarks]
- image: https://images.unsplash.com/photo-1526716121440-dc3b4f254a0a?ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&ixlib=rb-1.2.1&auto=format&fit=crop&w=700&q=80

# Introdução

Em machine learning, a gente sempre comenta como nós precisamos de dados. E precisamos de dados bem curados. Talvez você até já tenha ouvido a frase: "garbage in, garbage out", né?

Na vida real, isso é bem verdade. E gastar mais tempo criando novas features, ou limpando os dados acaba muitas vezes impactando os resultados dos modelos mais que qualquer outra coisa. Não é à toa que grande parte dos ganhadores das competições do [Kaggle](https://www.kaggle.com/) usam os mesmo modelos - [ensembles](https://towardsdatascience.com/xgboost-lightgbm-and-other-kaggle-competition-favorites-6212e8b0e835). E, na maior parte, quem ganha são os que tem os melhores dados.

Então, antes de mexermos em modelos, temos que manipular os dados. Mas então como manipular esses dados?

## Pandas

A resposta mais comum é [pandas](https://pandas.pydata.org/)! Se você já mexeu com dados, provavelmente já ouviu falar sobre essa biblioteca. Ela é, de longe, a mais popular pra manipulação de dados tabulares. Ela foi construída em cima de [NumPy](https://numpy.org/), que usa C++ por trás dos panos.

> Em Python temos o GIL que limita o uso de threads pra evitar que mais de um thread tente escrever/apagar as mesmas partes da memória ao mesmo tempo ([condição de corrida](https://pt.wikipedia.org/wiki/Condi%C3%A7%C3%A3o_de_corrida)). Quando usamos Numpy e chegamos no nível de C, o programa não está sujeito ao GIL e então podemos parallelizar as coisas.

Mas se você já ouviu falar de pandas, é capaz que não tenha ouvido coisas boas. Especialmente quem vem da linguagem de programação em R usando o [dplyr](https://dplyr.tidyverse.org/) tem o que falar 😅. O número de perguntas no [StackOverflow](https://stackoverflow.com/) relacionadas à pandas cresceu bastante durante os anos. Parte pelo aumento em popularidade da biblioteca, mas parte pela dificuldade da API do pandas.

![](https://raw.githubusercontent.com/murilo-cunha/inteligencia-superficial/master/images/copied_from_nb/2021-06-13-pandas_and_alternatives/stackoverflow_questions.png "Fonte: https://insights.stackoverflow.com/trends?tags=pandas%2Cnumpy%2Cdask%2Cspacy%2Ctensorflow%2Cpytorch%2Cscikit-learn%2Cpyspark%2Cpostgresql")

Além disso, existem vários pontos em que podemos otimizar a performance de pandas. Isso pelo fato da biblioteca ter sido criada em cima de uma outra. Vamos dar uma olhada na performance de pandas.

### Dados

Vamos usar o TPC-H dataset, focando no `lineitem` e `orders`, que são as maiores tabelas que somam 1GB. Para medir o tempo de execução, vamos unir as duas tabelas, filtram as linhas, agrupam os dados e computamos alguns dados agregados (máximo, mínimo, etc.). O que estamos fazendo em si não é muito importante, mas é um tipo de transformação que encontraríamos na vida real.

Vamos tentar otimizar tudo ao máximo, e vamos focar na transformação dos dados

- Vamos medir o tempo da transformação (excluindo o tempo de ler os dados)
- Vamos filtrar o quanto antes
- Vamos encadear operações

In [1]:
# hide
!pip install duckdb pandas 'dask[dataframe]' datatable polars pandasql > /dev/null
!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet
!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/orders.parquet

In [2]:
# hide
import dask.dataframe as dd
import datatable as dt
import duckdb
import pandas as pd
import polars as pl
from datatable import by, f, g, join

In [3]:
# hide
lineitem = pd.read_parquet("lineitemsf1.snappy.parquet")
orders = pd.read_parquet("orders.parquet")

In [4]:
%%time
lineitem.merge(orders, left_on="l_orderkey", right_on="o_orderkey").pipe(
    lambda df: df.copy()[
        (df["l_shipdate"] < "1998-09-02")
        & (df["o_orderpriority"].isin(("1-URGENT", "2-HIGH")))
    ]
).groupby(["l_returnflag", "l_linestatus"]).agg(
    {
        "l_extendedprice": ["sum", "min", "max", "mean"],
        "l_quantity": ["sum", "min", "max", "mean"],
    }
)

CPU times: user 5.63 s, sys: 2.34 s, total: 7.97 s
Wall time: 8.09 s


Unnamed: 0_level_0,Unnamed: 1_level_0,l_extendedprice,l_extendedprice,l_extendedprice,l_extendedprice,l_quantity,l_quantity,l_quantity,l_quantity
Unnamed: 0_level_1,Unnamed: 1_level_1,sum,min,max,mean,sum,min,max,mean
l_returnflag,l_linestatus,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2
A,F,22677200000.0,904.0,104949.5,38303.426664,15123892,1,50,25.545346
N,F,596530200.0,920.0,104049.0,38461.006847,397729,1,50,25.643391
N,O,44734060000.0,901.0,104749.5,38267.695102,29826993,1,50,25.515466
R,F,22683100000.0,906.0,104899.5,38240.140532,15126938,1,50,25.501645


Executando todas essas transformações demorou 10.3 segundos. Mas podemos ser mais eficientes. Por exemplo, poderíamos esquecer várias colunas já que no fim só estamos interessados em `l_returnflag`, `l_linestatus`, `l_extendedprice` e `l_quantity` (além das colunas que usamos para filtrar os dados). Também somos ineficientes quando temos de filtrar e agregar os valores (escaneamos a tabela duas vezes, quando poderíamos fazer isso de uma vez só). Além disso, poderíamos melhorar na implementação de paralelismo e do algoritmo.

Vamos então deixar pandas de lado e ver como podemos melhorar isso.

![](https://media.giphy.com/media/EPcvhM28ER9XW/giphy.gif)

## Dask

[Dask](https://docs.dask.org/en/latest/dataframe.html) provavelmente é a primeira solução que vem à cabeça. Dask basicamente cria várias tabelas em pandas e vai paralelizando as operações em cores diferentes. Também é mais eficiente por usar [avaliação preguiçosa](https://pt.wikipedia.org/wiki/Avalia%C3%A7%C3%A3o_pregui%C3%A7osa) (lazy evaluation) - primeiro traçamos o que queremos fazer e depois mandamos o computador executar as transformações. Isso nos deixa otimizar a computação das transformações (por exemplo, podemos filtrar linhas antes de fazer qualquer outra coisa, mesmo que o código não represente isso).

![](https://media.giphy.com/media/26ufnwz3wDUli7GU0/giphy.gif)

In [5]:
# collapse
# dask
dd_lineitem = dd.from_pandas(lineitem, npartitions=1)
dd_orders = dd.from_pandas(orders, npartitions=1)

In [6]:
%%timeit
dd_lineitem.merge(dd_orders, left_on="l_orderkey", right_on="o_orderkey").pipe(
    lambda df: df.copy()[
        (df["l_shipdate"] < "1998-09-02")
        & (df["o_orderpriority"].isin(("1-URGENT", "2-HIGH")))
    ]
).groupby(["l_returnflag", "l_linestatus"]).agg(
    {
        "l_extendedprice": ["sum", "min", "max", "mean"],
        "l_quantity": ["sum", "min", "max", "mean"],
    }
).compute()

6.14 s ± 20.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Usando Dask e com essas melhorias de parallelismo a gente ganha alguns segundos. O que faz Dask muito popular é o fato de ele imitar a API do Pandas. Aí fica muito mais fácil de pegar e começar a codar. Mas ainda assim, estamos usando Pandas. Temos espaços para melhorias.

## Polars

[Polars](https://pola-rs.github.io/polars-book/) é uma biblioteca implementada em [Rust](https://www.rust-lang.org/), que é muito eficiente em termos de velocidade e uso de memória. O lugar de Polars (de acordo com a documentação) é para quando os dados são muito grandes para Pandas mas muito pequenos para realizar as transformações num cluster de máquinas (Spark). Ou seja, se os dados são muito grandes pra sua máquina, mesmo depois de filtrar algumas entradas, precisamos procurar outras soluções (o que é verdade pra Dask e todas as outras bibliotecas que vamos ver aqui). E assim como Dask, Polars vai usar todas as threads da sua máquina.

Uma outra idéia de Polars é permitir o use da [avaliação ansiosa](https://pt.wikipedia.org/wiki/Avalia%C3%A7%C3%A3o_ansiosa) (eager evaluation), como em Pandas e da [avaliação preguiçosa](https://pt.wikipedia.org/wiki/Avalia%C3%A7%C3%A3o_pregui%C3%A7osa) (lazy evaluation), como em Dask. A availação ansiosa é como em Pandas, e a API é bem parecida. A API da avaliação preguiçosa é um pouco diferente, e é otimizada pelos mesmo motivos que citamos quando falamos de Dask (paralelização e otimização das transformações).

![](https://media.giphy.com/media/2FmmbTZMl6lQQ/giphy.gif)

In [7]:
# collapse
# polars
pl_lineitem = pl.DataFrame(lineitem)
pl_orders = pl.DataFrame(orders)

In [8]:
%%timeit
pl_lineitem.join(pl_orders, left_on="l_orderkey", right_on="o_orderkey").filter(
    (pl.col("l_shipdate") < "1998-09-02")
    & (
        (pl.col("o_orderpriority") == "1-URGENT")
        | (pl.col("o_orderpriority") == "2-HIGH")
    )
).groupby(["l_returnflag", "l_linestatus"]).agg(
    {
        "l_extendedprice": ["sum", "min", "max", "mean"],
        "l_quantity": ["sum", "min", "max", "mean"],
    }
)

1.87 s ± 1.03 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
# hide
# using lazy eval
import time

pl_lineitem = pl_lineitem.lazy()
pl_orders = pl_orders.lazy()

In [10]:
# hide
start = time.monotonic()

# lazy dataframe
_df = (
    pl_lineitem.join(pl_orders, left_on="l_orderkey", right_on="o_orderkey").filter(
        (pl.col("l_shipdate") < "1998-09-02")
        & (
            (pl.col("o_orderpriority") == "1-URGENT")
            | (pl.col("o_orderpriority") == "2-HIGH")
        )
    )
    # could not delay collection - error (not finding `o_orderkey` column)
    .collect()
)
# eager dataframe
_df = _df.groupby(["l_returnflag", "l_linestatus"]).agg(
    [
        pl.sum("l_extendedprice"),
        pl.min("l_extendedprice"),
        pl.max("l_extendedprice"),
        pl.mean("l_extendedprice"),
        pl.sum("l_quantity"),
        pl.min("l_quantity"),
        pl.max("l_quantity"),
        pl.mean("l_quantity"),
    ],
)

print(f"Wall time: {time.monotonic() - start:.2f}s")
_df

Wall time: 1.49s


l_returnflag,l_linestatus,l_extendedprice_sum,l_extendedprice_min,l_extendedprice_max,l_extendedprice_mean,l_quantity_sum,l_quantity_min,l_quantity_max,l_quantity_mean
str,str,f64,f64,f64,f64,i64,i64,i64,f64
"""N""","""O""",44734055416.80963,901,104749.5,38267.695101622725,29826993,1,50,25.515466087014545
"""R""","""F""",22683095360.31932,906,104899.5,38240.14053242183,15126938,1,50,25.501644539975555
"""A""","""F""",22677199025.699192,904,104949.5,38303.42666419926,15123892,1,50,25.54534567707304
"""N""","""F""",596530216.2000039,920,104049.0,38461.0068471956,397729,1,50,25.64339136041264


Com isso conseguimos reduzir o tempo em **<30%** (comparando com Dask)! Então, o que ainda podemos melhorar? Quando usamos a avaliação ansiosa, ainda podemos melhorar a execução das queries. Além disso, a biblioteca não tem tantas funções quanto Pandas (e consequentemente Dask). E falando em Pandas, se a API é igual, e você não está satisfeito com ela, provavelmente não vai estar muito contente com a API do Polars também.

## Datatable

Se você gosta de R e está fazendo a transição para python, talvez essa seja uma boa opção. Datatable é inspirada pela biblioteca em R [data.table](https://rdatatable.gitlab.io/data.table/). A API é parecida, e tem várias otimizações para tratar com dados grandes. Que cabem **ou não** em memória (dados que não caberiam em memória usando Pandas cabem quando usamos Datatable). Essa biblioteca também usa algoritmos que supportam o paralelismo em diferentes [threads](https://pt.wikipedia.org/wiki/Thread_(computa%C3%A7%C3%A3o)), aumentando de novo a velocidade de execução.

![](https://media.giphy.com/media/rGlAZysKBcjRCkAX7S/giphy.gif)

In [11]:
# collapse
# datatable
dt_lineitem = dt.Frame(lineitem)
dt_orders = dt.Frame(orders)

# preparação
dt_lineitem.names = {"l_orderkey": "orderkey"}
dt_orders.names = {"o_orderkey": "orderkey"}

In [12]:
%%timeit
# dt_lineitem.key = "orderkey"  # não podemos usar como `key` existem valores repetidos
dt_orders.key = "orderkey"

# executando a query
dt_lineitem[
    (g.o_orderpriority == "1-URGENT") | (g.o_orderpriority == "2-HIGH"),
    :,
    join(dt_orders),
][
    :,
    {
        "sum(l_extendedprice)": dt.sum(f.l_extendedprice),
        "min(l_extendedprice)": dt.min(f.l_extendedprice),
        "max(l_extendedprice)": dt.max(f.l_extendedprice),
        "mean(l_extendedprice)": dt.mean(f.l_extendedprice),
        "sum(l_quantity)": dt.sum(f.l_quantity),
        "min(l_quantity)": dt.min(f.l_quantity),
        "max(l_quantity)": dt.max(f.l_quantity),
        "mean(l_quantity)": dt.mean(f.l_quantity),
    },
    by(f.l_returnflag, f.l_linestatus),
]

1.32 s ± 106 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


**~10%** do tempo (comparando com Pandas)! Você pode reparar também como a API muda bastante. Pessoalmente, foi um pouco mais trabalhoso pra escrever o código equivalente, mas na documentação eles também oferecem uma comparação da API do Datatable com a API do Pandas, o que ajuda bastante.

Além disso, eles também comparam a API do Datatable com SQL. Afinal de contas, SQL é muito usado até hoje, e mais familiar pra muitas pessoas. Além disso, SQL tem sido usado bastante através dos anos e oferece uma maneira eficiente de manipular dados tabulares. Quando os dados aumentam ainda mais e nos tornamos para um método distribuído como [Spark](https://spark.apache.org/) ainda podemos usar [SparkSQL](https://spark.apache.org/docs/latest/sql-programming-guide.html). Spark e SQL precisam de um [cluster](https://pt.wikipedia.org/wiki/Cluster) e de um [servidor](https://pt.wikipedia.org/wiki/Servidor_(computa%C3%A7%C3%A3o)) próprio pra executar as transformações. Mas existem algumas bibliotecas que nos permitem usar SQL para manipular dados em dataframes.

## PandaSQL

PandasSQL é uma solução mais antiga. A biblioteca permite usar SQL (SQLite) em tabelas Pandas. Você só tem que passar a query em SQL e as variáveis locais. Por trás dos panos, eles criam um [SQL engine](https://pt.wikipedia.org/wiki/Mecanismo_de_armazenamento) usando SQLAlchemy e os métodos de `to_sql` e `read_sql` de Pandas. O que faz possível executar queries em Pandas, mas é um truque, que acaba sendo bem ineficiente em relação a tempo de execução e uso de memória #gambiarra

![](https://media.giphy.com/media/L2HCO7avkR5H9MvS9Y/giphy.gif)

In [13]:
# hide_output
from pandasql import sqldf

start = time.monotonic()

_df = sqldf(
    """
SELECT l_returnflag,
       l_linestatus,
       SUM(l_extendedprice),
       MIN(l_extendedprice),
       MAX(l_extendedprice),
       AVG(l_extendedprice),
       SUM(l_quantity),
       MIN(l_quantity),
       MAX(l_quantity),
       AVG(l_quantity)
FROM lineitem AS lineitem
JOIN orders AS orders ON (l_orderkey=o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderpriority IN ('1-URGENT', '2-HIGH')
GROUP BY l_returnflag,
         l_linestatus
""",
    globals(),
)

# o código retorna um erro interno (`OperationalError`) - visite o notebook para mais detalhes
print(f"Wall time: {time.monotonic() - start:.2f}s")
_df

PandaSQLException: (sqlite3.OperationalError) near "'1998-09-02'": syntax error
[SQL: 
SELECT l_returnflag,
       l_linestatus,
       SUM(l_extendedprice),
       MIN(l_extendedprice),
       MAX(l_extendedprice),
       AVG(l_extendedprice),
       SUM(l_quantity),
       MIN(l_quantity),
       MAX(l_quantity),
       AVG(l_quantity)
FROM lineitem AS lineitem
JOIN orders AS orders ON (l_orderkey=o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderpriority IN ('1-URGENT', '2-HIGH')
GROUP BY l_returnflag,
         l_linestatus
]
(Background on this error at: http://sqlalche.me/e/14/e3q8)

In [14]:
# hide
!pip uninstall pandasql -y

Found existing installation: pandasql 0.7.3
Uninstalling pandasql-0.7.3:
  Successfully uninstalled pandasql-0.7.3


Não muito bom... Mas **deve** existir outro jeito de usar SQL em Pandas, e ainda aproveitando a eficiência de SQL, não? Na verdade sim. Acontece que tem gente que basicamente reconstruiu um servidor de SQL pra ser executado em memória, com uma API em python!

## DuckDB

DuckDB permite aos desenvolvedores à executar queries nas suas tabelas em Pandas ou direto de arquivos CSV, parquet, etc. A biblioteca foi desenvolvida com C++ por trás dos panos, e também permite paralelização e otimização da query. Ou seja, eles evitam problemas que encontramos em outras bibliotecas - antes nós tinhamos de escanear a tabela duas vezes quando executamos nossa query: uma para filtrar as linhas e outra vez pra agroupar os dados. Isso não acontece com DuckDB. Eles também são inteligente no uso de memória.

Também pensando no ambiente python, onde Pandas ainda é prevalente, eles oferecem métodos para transformar os dados numa tabela Pandas.

![](https://media.giphy.com/media/vIJaz7nMJhTUc/giphy.gif)

In [15]:
# ler arquivos em parquet e colocar em uma dataframe
lineitem = duckdb.query("SELECT * FROM 'lineitemsf1.snappy.parquet'").to_df()
orders = duckdb.query("SELECT * FROM 'orders.parquet'").to_df()

lineitem.head()

Unnamed: 0,l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
0,1,155190,7706,1,17,21168.23,0.04,0.02,N,O,1996-03-13,1996-02-12,1996-03-22,DELIVER IN PERSON,TRUCK,egular courts above the
1,1,67310,7311,2,36,45983.16,0.09,0.06,N,O,1996-04-12,1996-02-28,1996-04-20,TAKE BACK RETURN,MAIL,ly final dependencies: slyly bold
2,1,63700,3701,3,8,13309.6,0.1,0.02,N,O,1996-01-29,1996-03-05,1996-01-31,TAKE BACK RETURN,REG AIR,"riously. regular, express dep"
3,1,2132,4633,4,28,28955.64,0.09,0.06,N,O,1996-04-21,1996-03-30,1996-05-16,NONE,AIR,lites. fluffily even de
4,1,24027,1534,5,24,22824.48,0.1,0.04,N,O,1996-03-30,1996-03-14,1996-04-01,NONE,FOB,pending foxes. slyly re


In [16]:
%%timeit
# executar uma query na nossa tabela Pandas
duckdb.query(
    """
SELECT l_returnflag,
       l_linestatus,
       SUM(l_extendedprice),
       MIN(l_extendedprice),
       MAX(l_extendedprice),
       AVG(l_extendedprice),
       SUM(l_quantity),
       MIN(l_quantity),
       MAX(l_quantity),
       AVG(l_quantity)
FROM lineitem AS lineitem
JOIN orders AS orders ON (l_orderkey=o_orderkey)
WHERE l_shipdate <= DATE '1998-09-02'
  AND o_orderpriority IN ('1-URGENT', '2-HIGH')
GROUP BY l_returnflag,
         l_linestatus
"""
).to_df()

648 ms ± 5.47 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Além de ser a mais rápida, repare que DuckDB já pega as variáveis locais quando tentamos executar as queries. A biblioteca também suporta (código testado) vários tipos de SQL, incluindo PostgreSQL, MySQL e SQLite.

# Mais informações

O DuckDB foi a solução mais rápida. Mas como você deve ter imaginado, esse não é sempre o caso. Tudo depende do volume de dados que temos e das transformações que vamos fazer. Os [criadores do Datatable](https://www.h2o.ai/blog/speed-up-your-data-analysis-with-pythons-datatable-package/#:~:text=H2O%20AI%20Hybrid%20Cloud&text=Deploy%20models%20in%20any%20environment,%2C%20and%20real%2Dtime%20monitoring.&text=H2O%20Wave%20enables%20fast%20development,light%2Dweight%20Python%20development%20framework.) também comparam o tempo de execução das queries para diferentes cenários. As comparações são atualizadas com o passar do tempo e você pode encontrar os últimos resultados [aqui](https://h2oai.github.io/db-benchmark/).

Para os interessados, também tem um artigo muito interessante no blog do DuckDB, onde eles explicam um pouco os motivos quais DuckDB tem uma performance elevada à Pandas, e como que podemos otimizar nossas queries (mesmo que só em Pandas). Esse post também foi inspirado no blog post deles (as queries e os dados). Vale à pena dar uma olhada.

Quando falamos de Big Data também não podemos deixar de falar de outras tecnologias, como Spark, Apache Beam, etc. Quando os dados são muito grandes pra uma máquina só, o que nos resta é utilizar um cluster de computadores e paralelizar as computações e dividir o uso de memória - [escalabilidade horizontal](https://pt.wikipedia.org/wiki/Escalabilidade). Mas com esses clusters precisamos de uma parte administrativa maior, e não seria algo que qualquer um poderia testar rapidamente. Nesse post incluímos apenas os casos de quando os dados caberiam em uma máquina, e estamos avaliando a eficiência de bibliotecas em python (existem outras bibliotecas em outras linguagens que também são bem eficientes).

Em outras palavras, nós comparamos algumas ferramentas pra um específico caso, mas pra determinar a **melhor** ferramenta, precisamos ir de caso-a-caso. Qual o volume de dados? Precisamos um cluster? Qual o tempo de execução aceitável? Qual o nível técnico da equipe?

In [None]:
# hide
# deletar os arquivos baixados
!rm lineitemsf1.snappy.parquet
!rm orders.parquet