### Athena + Polars

- query distribuída via Athena

- análise local via Polars

In [3]:
import pandas as pd
import os, sys

sys.path.append(os.path.abspath(".."))

from src.utils import get_athena_connection, read_sql_df
from src.config import DB_ATHENA


# Athena connection: opened once here and kept in the notebook-level name
# `conn`, which run_sql and run_sql_timed pass on to read_sql_df.
conn = get_athena_connection()
print(f"Connected to Athena database:'{DB_ATHENA}'")

def run_sql(sql: str) -> pd.DataFrame:
    """Run a SQL statement on Athena (MovieLens 32M) and hand back the result.

    The statement is forwarded to ``read_sql_df`` together with the
    notebook-level connection ``conn``.

    Args:
        sql: The SQL text to execute.

    Returns:
        The query result as a Pandas DataFrame.
    """
    result = read_sql_df(sql, conn=conn)
    return result

Connected to Athena database:'movielens32m'


In [5]:
# Funções de apoio para medir tempo (sem magias Jupyter)

import time

def run_sql_timed(sql: str):
    """Run ``sql`` through ``read_sql_df`` and measure how long the call takes.

    Timing uses ``time.perf_counter()`` taken immediately before and
    immediately after the ``read_sql_df`` call on the notebook-level ``conn``.

    Args:
        sql: The SQL text to execute.

    Returns:
        A ``(result, seconds)`` tuple: whatever ``read_sql_df`` returned and
        the elapsed time of that call in seconds.
    """
    t0 = time.perf_counter()
    result = read_sql_df(sql, conn=conn)
    # The second perf_counter() reading is taken only after the query returned.
    return result, time.perf_counter() - t0



In [7]:
# Benchmark 1 – COUNT(*) on Athena.
# `t` (elapsed seconds) is reused by the comparison cell further down.

sql = f"SELECT COUNT(*) FROM {DB_ATHENA}.ratings_parquet;"
df, t = run_sql_timed(sql)
df, t


  df = pd.read_sql(sql, conn)


(      _col0
 0  32000204,
 1.8236644999997225)

In [8]:
# Benchmark 1 – COUNT(*) on Athena again, with the result column aliased as `n`.
# NOTE(review): this cell goes through run_sql_timed, so it queries Athena, not
# Polars; the Polars measurement is taken by run_polars_count.
# df_count / t_count are not referenced by any later cell visible in this notebook.

sql = f"SELECT COUNT(*) AS n FROM {DB_ATHENA}.ratings_parquet;"
df_count, t_count = run_sql_timed(sql)



  df = pd.read_sql(sql, conn)


In [None]:
# Função para medir o COUNT(*) em Polars

import polars as pl
import time

# S3 path to the Parquet files of the ratings table
RATINGS_S3_PATH = "s3://bdf25-20-movielens/processed/ratings/"

# Lazy scan (the data is not read until collect() is called)
scan_ratings = pl.scan_parquet(RATINGS_S3_PATH)

def run_polars_count():
    """Count the rows of the lazily scanned ratings dataset and time the query.

    The count is evaluated on the module-level ``scan_ratings`` LazyFrame; the
    timer wraps both building the query and ``collect()``.

    Returns:
        A ``(df, elapsed)`` tuple: a one-row Polars DataFrame whose single
        column ``count`` holds the row count, and the elapsed wall-clock time
        in seconds as measured by ``time.perf_counter()``.
    """
    start = time.perf_counter()
    # pl.count() is deprecated since Polars 0.20.5 in favour of pl.len().
    # The alias keeps the output column named "count", as pl.count() produced.
    df = scan_ratings.select(pl.len().alias("count")).collect()
    elapsed = time.perf_counter() - start
    return df, elapsed

# Run the Polars COUNT once; `t_polars` (elapsed seconds) is reused by the
# comparison cell further down.
df_polars, t_polars = run_polars_count()

df_polars, t_polars


(Deprecated in version 0.20.5)
  df = scan_ratings.select(pl.count()).collect()


(shape: (1, 1)
 ┌──────────┐
 │ count    │
 │ ---      │
 │ u32      │
 ╞══════════╡
 │ 32000204 │
 └──────────┘,
 1.8999501000216696)

In [16]:
#comparação

import pandas as pd

# Side-by-side summary of the two COUNT(*) timings, one row per engine.
# `t` comes from the Athena benchmark cell, `t_polars` from the Polars one.
resumo = pd.DataFrame(
    {
        "query": ["COUNT(*)", "COUNT(*)"],
        "engine": ["Athena", "Polars"],
        "time_seconds": [float(t), float(t_polars)],
    }
)

resumo


Unnamed: 0,query,engine,time_seconds
0,COUNT(*),Athena,1.823664
1,COUNT(*),Polars,1.89995
