In [None]:
# Instalar bibliotecas e baixar os dados que serão usados neste demo

!pip install pyspark

!pip uninstall duckdb -y

!pip install duckdb

!wget "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2016-01.parquet" -O "tripdata.parquet"
!wget "https://raw.githubusercontent.com/duckdb/duckdb/main/data/csv/lineitem-carriage.csv" -O "lineitem.csv"
!wget "https://github.com/duckdb/duckdb-data/releases/download/v1.0/example_rn.ndjson"

csv_path = 'lineitem.csv'
parquet_path = 'tripdata.parquet'
json_path = 'example_rn.ndjson'

# Executar SQL Simples

In [None]:
import duckdb
# Run a trivial query and print the resulting table.
answer_rel = duckdb.sql("SELECT 42")
answer_rel.show()

# Ler e gravar arquivos de dados (CSV, Parquet, JSON)

In [None]:
# Read CSV: use the local copy downloaded in the setup cell instead of fetching the
# same file from GitHub a second time over the network.
duckdb.from_csv_auto(csv_path).show()

In [None]:
# Read Parquet: build a relation over the downloaded trips file and print it.
tripdata_rel = duckdb.from_parquet(parquet_path)
tripdata_rel.show()

In [None]:
# Read JSON: build a relation over the downloaded .ndjson file and print it.
ndjson_rel = duckdb.read_json(json_path)
ndjson_rel.show()

In [None]:
# Create (or open) a persistent DuckDB database file and load one table per input file.
# The `with` block closes the connection even if one of the statements fails.
with duckdb.connect("our_db.db") as con:
  # File paths must be SQL string literals: written unquoted, `lineitem.csv` is parsed
  # as a qualified table name (schema `lineitem`, table `csv`) and the query fails.
  # `CREATE OR REPLACE` keeps the cell re-runnable against an existing our_db.db.
  con.execute(f"CREATE OR REPLACE TABLE csv AS SELECT * FROM read_csv_auto('{csv_path}')")
  con.execute(f"CREATE OR REPLACE TABLE json AS SELECT * FROM read_json_auto('{json_path}')")
  con.execute(f"CREATE OR REPLACE TABLE parquet AS SELECT * FROM read_parquet('{parquet_path}')")

In [None]:
# Reopen the database and export two of its tables back to files (CSV and Parquet).
# The `with` block closes the connection afterwards instead of leaving our_db.db open.
with duckdb.connect("our_db.db") as con:
  csv_rel = con.table("csv")
  csv_rel.to_csv("from_duck.csv")

  parquet_rel = con.table("parquet")
  parquet_rel.to_parquet("from_duck.parquet")

# Comparar Performance (Pandas, DuckDB, PySpark)

In [None]:
import duckdb
import pandas
import pyspark as spark
import time

In [None]:
# Open an in-memory connection and inspect the trips file through a relation.
con = duckdb.connect()
duck_rel = con.from_parquet(parquet_path)
print(duck_rel)

# Total number of rows in the file.
print(duck_rel.count("*"))

In [None]:
def time_function(function, runs=5):
  """Call `function` `runs` times and report the median wall-clock duration.

  The median (rather than the mean or a single run) keeps one slow warm-up run or
  outlier from dominating the reported number.

  Args:
    function: zero-argument callable to benchmark; its return value is discarded.
    runs: number of timed calls (default 5).

  Returns:
    The median duration in seconds, which is also printed as "Time: <seconds>".
    For an even `runs` the upper of the two middle samples is used.

  Raises:
    ValueError: if `runs` is less than 1.
  """
  if runs < 1:
    raise ValueError("runs must be at least 1")
  durations = []
  for _ in range(runs):
    # perf_counter is the clock Python documents for measuring short durations.
    start = time.perf_counter()
    function()
    durations.append(time.perf_counter() - start)
  durations.sort()
  # With the default runs=5 this is index 2, i.e. the same sample as before.
  median = durations[len(durations) // 2]
  print("Time: " + str(median))
  return median

In [None]:
import duckdb

def duck_avg(path=None):
  """Average tip per passenger count for trips with trip_distance < 5, using DuckDB.

  Args:
    path: Parquet file to query; defaults to the notebook's `parquet_path`.

  Returns:
    pandas DataFrame with columns passenger_count and tip_amount, ordered by
    passenger_count.
  """
  path = parquet_path if path is None else path
  # The path must be a SQL string literal: unquoted, `tripdata.parquet` is parsed as
  # a qualified table name (schema `tripdata`, table `parquet`) and the query fails.
  # Embedded single quotes are doubled, following SQL string-literal escaping.
  source = "'" + path.replace("'", "''") + "'"
  sql = f""" select passenger_count, avg(tip_amount) as tip_amount from {source} where trip_distance < 5 group by passenger_count order by passenger_count"""
  # A fresh in-memory connection per call; `with` closes it once the result is
  # materialized as a DataFrame.
  with duckdb.connect() as con:
    return con.execute(sql).df()

time_function(duck_avg)

result_df = duck_avg()
# Title the chart so it can be told apart from the pandas/PySpark ones; the trailing
# `;` suppresses the Axes repr that would otherwise be printed under the figure.
result_df.plot.bar(x="passenger_count", y="tip_amount", title="DuckDB: gorjeta média por número de passageiros");

In [None]:
import pandas

def pandas_avg(path=None):
  """Average tip per passenger count for trips with trip_distance < 5, using pandas.

  Args:
    path: Parquet file to read; defaults to the notebook's `parquet_path`.

  Returns:
    DataFrame with columns passenger_count and tip_amount, sorted by
    passenger_count. This is the same shape as the DuckDB and PySpark versions, so
    the result can be plotted with x="passenger_count", y="tip_amount".
  """
  path = parquet_path if path is None else path
  df = pandas.read_parquet(path)
  # `as_index=False` keeps passenger_count as a regular column. The previous
  # `.unstack(level=0)` turned the result into a MultiIndex Series with neither
  # column, which made the bar plot fail.
  # NOTE: groupby drops rows with a missing passenger_count (pandas dropna=True
  # default), whereas SQL GROUP BY keeps a NULL group.
  return (
      df[df['trip_distance'] < 5]
      .groupby('passenger_count', as_index=False)
      .agg(tip_amount=('tip_amount', 'mean'))
  )

time_function(pandas_avg)

result_df = pandas_avg()
# Title the chart so it can be told apart from the DuckDB/PySpark ones; the trailing
# `;` suppresses the Axes repr that would otherwise be printed under the figure.
result_df.plot.bar(x="passenger_count", y="tip_amount", title="pandas: gorjeta média por número de passageiros");

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

def spark_avg_from_parquet_sql():
  """Average tip per passenger count for trips with trip_distance < 5, in Spark SQL.

  Registers the Parquet file as the temporary view `yellow_tripdata_2016`, runs the
  aggregation as a SQL query, and returns the result as a pandas DataFrame ordered
  by passenger_count.
  """
  session = SparkSession.builder.appName("PySpark Read Parquet").getOrCreate()
  trips = session.read.parquet(parquet_path)
  trips.createOrReplaceTempView("yellow_tripdata_2016")
  query = """ select passenger_count, avg(tip_amount) as tip_amount from yellow_tripdata_2016
  where trip_distance < 5 group by passenger_count order by passenger_count"""
  return session.sql(query).toPandas()

def spark_avg_from_parquet():
  """Average tip per passenger count for trips with trip_distance < 5, via the PySpark DataFrame API.

  Returns:
    pandas DataFrame with columns passenger_count and tip_amount, ordered by
    passenger_count.
  """
  # Bind PySpark's `avg` locally. The DuckDB-PySpark import cell later in the notebook
  # rebinds the global name `avg` to DuckDB's implementation, which would break this
  # function if its cell were re-run after that one.
  from pyspark.sql.functions import avg

  spark = SparkSession.builder.appName("PySpark Read Parquet").getOrCreate()
  s_df = spark.read.parquet(parquet_path)
  s_df = s_df.filter(s_df.trip_distance < 5) \
    .groupBy("passenger_count") \
    .agg(avg("tip_amount").alias("tip_amount")) \
    .orderBy("passenger_count")
  return s_df.toPandas()

time_function(spark_avg_from_parquet)

result_df = spark_avg_from_parquet()
# Title the chart so it can be told apart from the DuckDB/pandas ones; the trailing
# `;` suppresses the Axes repr that would otherwise be printed under the figure.
result_df.plot.bar(x="passenger_count", y="tip_amount", title="PySpark: gorjeta média por número de passageiros");

# DuckDB com a API do PySpark (experimental)

In [None]:
from duckdb.experimental.spark.sql import SparkSession as session
from duckdb.experimental.spark.sql.functions import lit,col, avg

In [None]:
def duck_spark():
  """Run the PySpark-style aggregation through DuckDB's experimental Spark API.

  Keeps rows with trip_distance < 5, averages tip_amount per passenger_count, and
  returns a pandas DataFrame with columns passenger_count and tip_amount, ordered by
  passenger_count.
  """
  # Bind DuckDB's `avg` locally. The global `avg` is also imported from PySpark
  # earlier in the notebook, so otherwise whichever import cell ran last would win.
  from duckdb.experimental.spark.sql.functions import avg

  spark = session.builder.getOrCreate()
  s_df = spark.read.parquet(parquet_path)
  s_df = s_df.filter(s_df.trip_distance < 5) \
    .groupBy("passenger_count") \
    .agg(avg("tip_amount").alias("tip_amount")) \
    .orderBy("passenger_count")
  return s_df.toPandas()


time_function(duck_spark)

result_df = duck_spark()
# Title the chart so it can be told apart from the other engines; the trailing `;`
# suppresses the Axes repr that would otherwise be printed under the figure.
result_df.plot.bar(x="passenger_count", y="tip_amount", title="DuckDB (API PySpark): gorjeta média por número de passageiros");