In [1]:
%pip install --upgrade pip
%pip install  duckdb
%pip install   numpy
%pip install  pandas
%pip install fastavro
%pip install pyspark

[0mNote: you may need to restart the kernel to use updated packages.
[0mNote: you may need to restart the kernel to use updated packages.
Collecting numpy
  Downloading numpy-1.24.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.6 kB)
Downloading numpy-1.24.4-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.3/17.3 MB[0m [31m38.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.24.4
[0mNote: you may need to restart the kernel to use updated packages.
Collecting pandas
  Downloading pandas-2.0.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting tzdata>=2022.1 (from pandas)
  Downloading tzdata-2025.3-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pandas-2.0.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [1]:
import duckdb
con = duckdb.connect("benchmark.db")


In [2]:
import pandas as pd
import numpy as np
import random
from datetime import date

def gen_random_int_dataframe(scale_factor):
    target_bytes = int(scale_factor*1024**3)  # 1GB
    bytes_per_row = 10 * 8  # 10 int64 columns
    n_rows = target_bytes // bytes_per_row

    print(f"  - Estimated raw size: {n_rows * bytes_per_row / 1024**3:.2f} GB")

    schema = {
        'doc': '',
        'name': 'table',
        'namespace': '',
        'type': 'record',
        'fields': [{'name': f'column_{i}', 'type': 'int'} for i in range(10)]
    }
    
    # Create DataFrame with random integers
    df = pd.DataFrame({
        f'column_{i}': np.random.randint(
            low=np.iinfo(np.int32).min,
            high=np.iinfo(np.int32).max,
            size=n_rows,
            dtype=np.int64
        ) for i in range(10)
    })
    return df, schema


def gen_random_string(length):
    return ''.join([chr(random.randint(30, 256)) for i in range(length)])


def gen_random_string_dataframe(scale_factor, string_size=16):
    target_bytes = int(scale_factor*1024**3)  # 1GB
    bytes_per_row = 10 * string_size  # 10 string columns of string_size bytes
    n_rows = target_bytes // bytes_per_row

    print(f"  - Estimated raw size: {n_rows * bytes_per_row / 1024**3:.2f} GB")
    
    schema = {
        'doc': '',
        'name': 'table',
        'namespace': '',
        'type': 'record',
        'fields': [{'name': f'column_{i}', 'type': 'string'} for i in range(10)]
    }

    # Create DataFrame with random integers
    df = pd.DataFrame({
        f'column_{i}': [gen_random_string(string_size) for _ in range(n_rows)] for i in range(10)
    })
    return df, schema


def gen_dicted_string_dataframe(scale_factor, dict_size=16,string_size=16):
    target_bytes = int(scale_factor*1024**3)  # 1GB
    bytes_per_row = 10 * string_size  # 10 string columns of string_size bytes
    n_rows = target_bytes // bytes_per_row
    
    dicted_strings = [gen_random_string(string_size) for _ in range(dict_size)]

    print(f"  - Estimated raw size: {n_rows * bytes_per_row / 1024**3:.2f} GB")
    
    schema = {
        'doc': '',
        'name': 'table',
        'namespace': '',
        'type': 'record',
        'fields': [{'name': f'column_{i}', 'type': 'string'} for i in range(10)]
    }

    # Create DataFrame with random integers
    df = pd.DataFrame({
        f'column_{i}': [dicted_strings[random.randint(0, dict_size - 1) % dict_size] for _ in range(n_rows)] for i in range(10)
    })
    return df, schema


def gen_random_date(start_date=date(2010, 1, 1), end_date=date(2025, 1, 1)):
    start_ordinal = start_date.toordinal()
    end_ordinal = end_date.toordinal()
    
    random_ordinal = random.randint(start_ordinal, end_ordinal)
    
    return date.fromordinal(random_ordinal)


def gen_random_dates_dataframe(scale_factor):
    target_bytes = int(scale_factor*1024**2)  # 1GB
    bytes_per_row = 10 * 30  # 10 string columns of string_size bytes
    n_rows = target_bytes // bytes_per_row
    
    schema = {
        'doc': '',
        'name': 'table',
        'namespace': '',
        'type': 'record',
        'fields': [{'name': f'column_{i}', 'type': 'date'} for i in range(10)]
    }

    df = pd.DataFrame({
        f'column_{i}': [gen_random_date() for _ in range(n_rows)] for i in range(10)
    })
    return df, schema


In [76]:
tables_scaled = {}
for scale in [0.01, 0.1, 1, 5]: # 2, 3
    tables_scaled[scale] = {"ints": gen_random_int_dataframe(scale), "random_strings": gen_random_string_dataframe(scale), "dicted_strings": gen_dicted_string_dataframe(scale), "dates": gen_random_dates_dataframe(scale)}


  - Estimated raw size: 0.01 GB
  - Estimated raw size: 0.01 GB
  - Estimated raw size: 0.01 GB
  - Estimated raw size: 0.02 GB
  - Estimated raw size: 0.02 GB
  - Estimated raw size: 0.02 GB


In [None]:
def get_table_name(format_name, table_type, compression="default"):
    return f"{table_type}_compression_{compression}_{format_name}"
def get_file_name(format_name, table_type, compression="default"):
    return  f"{table_type}_compression_{compression}.{format_name}"
def get_s3_path(file_name):
    return "s3a://benchmark/"+file_name

AVRO = "avro"
ORC = "orc"
PARQUET = "parquet"
FILE_SCHEMAS = [AVRO, ORC,PARQUET]
compressions = {ORC: ["zstd", "snappy", "lz4", "none", "zlib"], AVRO: ["snappy", "deflate", "bzip2", "zstandard", "xz"], PARQUET: ["snappy", "gzip", "zstd", "lz4", "none"]}

In [78]:
import time

TABLE_WRITE_TIME="TABLE_WRITE_TIME_SEC"
TABLE_FULL_SCAN_TIME="TABLE_FULL_SCAN_TIME_SEC"
prev_time_start = {}

def measureTimeStart(metric):
    prev_time_start[metric] = time.perf_counter()

def measureTimeEnd(metric, data):
    data[metric] = time.perf_counter() - prev_time_start[metric]

time_metrics = {}

In [79]:
import os
FILE_SIZE_MB="FILE_SIZE_MB"
DF_DICT_TYPE="records"
size_metrics = {}

def measureFileSize(file, data):
    data[FILE_SIZE_MB] = os.path.getsize(file)/1e6 


In [80]:
for cur_schema in FILE_SCHEMAS:
    time_metrics[cur_schema] = {}
    size_metrics[cur_schema] = {}
    for scale, tables in tables_scaled.items():
        for table_name, table_schema in tables.items():
            for compression in compressions[cur_schema] + ["default"]:
                time_metrics[cur_schema][get_table_name(cur_schema, table_name,compression)] = {}
                size_metrics[cur_schema][get_table_name(cur_schema, table_name,compression)] = {}

In [81]:
# spark.stop()

In [82]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import col, count, avg, sum, min, max, desc
import sys
import time
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MinIO Write") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,org.apache.spark:spark-avro_2.12:3.5.3") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio-server:9000") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.connection.timeout", "60000") \
    .config("spark.hadoop.fs.s3a.socket.timeout", "60000") \
    .config("spark.hadoop.fs.s3.connection.timeout", "60000") \
    .config("spark.hadoop.fs.s3.socket.timeout", "60000") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "12g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memoryOverhead", "1g") \
    .getOrCreate()


In [83]:
# START TABLE CREATION

In [84]:
# %%timeit -r 1 -n 2
from fastavro import writer, parse_schema

cur_schema=AVRO
for scale, tables in tables_scaled.items():
    for base_table_name, table_schema in tables.items():
        for compression in compressions[cur_schema]:
            if base_table_name == "dates" and cur_schema == AVRO:
                continue
            table, schema = table_schema
    
            records = table.to_dict(DF_DICT_TYPE)
            measureTimeStart(TABLE_WRITE_TIME)
            parsed_schema = parse_schema(schema)
    
            file_name=get_file_name(cur_schema, base_table_name,compression)
            table_name = get_table_name(cur_schema, base_table_name,compression)
            #with open(file_name, 'wb') as file:
            #    writer(file, parsed_schema, records)
            spark_df = spark.createDataFrame(table)
            spark_df.write.format("avro").mode("overwrite").option("compression", compression).save(file_name) # get_s3_path(file_name))
            
            measureTimeEnd(TABLE_WRITE_TIME, time_metrics[cur_schema][table_name])


26/02/13 13:50:15 WARN TaskSetManager: Stage 235 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:50:17 WARN TaskSetManager: Stage 236 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:50:19 WARN TaskSetManager: Stage 237 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:50:21 WARN TaskSetManager: Stage 238 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:50:23 WARN TaskSetManager: Stage 239 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:50:44 WARN TaskSetManager: Stage 245 contains a task of very large size (1229 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:50:51 WARN TaskSetManager: Stage 246 contains a task of very large size (1229 KiB). The maximum recommended task size is 1000 KiB.

In [85]:
# %%timeit -r 1 -n 2
from fastavro import writer, parse_schema

cur_schema=PARQUET
for scale, tables in tables_scaled.items():
    for base_table_name, table_schema in tables.items():
        for compression in compressions[cur_schema]:
            if base_table_name == "dates" and (cur_schema == AVRO or cur_schema == PARQUET):
                continue
            table, schema = table_schema
    
            records = table.to_dict(DF_DICT_TYPE)
            measureTimeStart(TABLE_WRITE_TIME)
            parsed_schema = parse_schema(schema)
    
            file_name=get_file_name(cur_schema, base_table_name,compression)
            table_name = get_table_name(cur_schema, base_table_name,compression)
            #with open(file_name, 'wb') as file:
            #    writer(file, parsed_schema, records)
            spark_df = spark.createDataFrame(table)
            spark_df.write.format("parquet").mode("overwrite").option("compression", compression).save(file_name) # get_s3_path(file_name))
            
            measureTimeEnd(TABLE_WRITE_TIME, time_metrics[cur_schema][table_name])


26/02/13 13:52:24 WARN TaskSetManager: Stage 265 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:52:26 WARN TaskSetManager: Stage 266 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:52:28 WARN TaskSetManager: Stage 267 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:52:30 WARN TaskSetManager: Stage 268 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:52:33 WARN TaskSetManager: Stage 269 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:52:49 WARN TaskSetManager: Stage 275 contains a task of very large size (1229 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:52:57 WARN TaskSetManager: Stage 276 contains a task of very large size (1229 KiB). The maximum recommended task size is 1000 KiB.

In [86]:
#from pyspark.sql import SparkSession
#spark.stop()

In [87]:
cur_schema=ORC
for scale, tables in tables_scaled.items():
    for base_table_name, table_schema in tables.items():
        for compression in compressions[cur_schema]:
            if base_table_name == "dates" and (cur_schema == AVRO or cur_schema == PARQUET):
                continue
            table, schema = table_schema
            file_name=get_file_name(cur_schema, base_table_name,compression)
            table_name = get_table_name(cur_schema, base_table_name,compression)
    
            measureTimeStart(TABLE_WRITE_TIME)
            df_spark = spark.createDataFrame(table)
            df_spark.write \
                .mode("overwrite") \
                .option("header", "true") \
                .option("compression", compression) \
                .orc(file_name)
                #.orc(f"s3a://benchmark/{file_name}")

            measureTimeEnd(TABLE_WRITE_TIME, time_metrics[cur_schema][table_name])


26/02/13 13:54:19 WARN TaskSetManager: Stage 295 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:54:21 WARN TaskSetManager: Stage 296 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:54:23 WARN TaskSetManager: Stage 297 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:54:25 WARN TaskSetManager: Stage 298 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:54:26 WARN TaskSetManager: Stage 299 contains a task of very large size (1713 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:54:44 WARN TaskSetManager: Stage 310 contains a task of very large size (1229 KiB). The maximum recommended task size is 1000 KiB.
26/02/13 13:54:50 WARN TaskSetManager: Stage 311 contains a task of very large size (1229 KiB). The maximum recommended task size is 1000 KiB.

In [88]:
# END TABLE CREATION

In [89]:
time_metrics

{'avro': {'ints_compression_snappy_avro': {'TABLE_WRITE_TIME_SEC': 6.997193843000787},
  'ints_compression_deflate_avro': {'TABLE_WRITE_TIME_SEC': 6.919648304000475},
  'ints_compression_bzip2_avro': {'TABLE_WRITE_TIME_SEC': 7.061260253},
  'ints_compression_zstandard_avro': {'TABLE_WRITE_TIME_SEC': 6.961667114999727},
  'ints_compression_xz_avro': {'TABLE_WRITE_TIME_SEC': 8.709090909000224},
  'ints_compression_default_avro': {},
  'random_strings_compression_snappy_avro': {'TABLE_WRITE_TIME_SEC': 3.6337204010005735},
  'random_strings_compression_deflate_avro': {'TABLE_WRITE_TIME_SEC': 3.7653017679995173},
  'random_strings_compression_bzip2_avro': {'TABLE_WRITE_TIME_SEC': 4.207488721999653},
  'random_strings_compression_zstandard_avro': {'TABLE_WRITE_TIME_SEC': 3.660791278999568},
  'random_strings_compression_xz_avro': {'TABLE_WRITE_TIME_SEC': 7.488386822000393},
  'random_strings_compression_default_avro': {},
  'dicted_strings_compression_snappy_avro': {'TABLE_WRITE_TIME_SEC': 3

In [90]:
for cur_schema in FILE_SCHEMAS:
    for scale, tables in tables_scaled.items():
        for base_table_name, table_schema in tables.items():
            for compression in compressions[cur_schema]:
                if base_table_name == "dates" and (cur_schema == AVRO or cur_schema == PARQUET):
                    continue
                file_name=get_file_name(cur_schema, base_table_name,compression)
                table_name = get_table_name(cur_schema, base_table_name,compression)
                table, schema = table_schema
    
                measureFileSize(file_name, size_metrics[cur_schema][table_name])


In [91]:
size_metrics

{'avro': {'ints_compression_snappy_avro': {'FILE_SIZE_MB': 0.004096},
  'ints_compression_deflate_avro': {'FILE_SIZE_MB': 0.004096},
  'ints_compression_bzip2_avro': {'FILE_SIZE_MB': 0.004096},
  'ints_compression_zstandard_avro': {'FILE_SIZE_MB': 0.004096},
  'ints_compression_xz_avro': {'FILE_SIZE_MB': 0.004096},
  'ints_compression_default_avro': {},
  'random_strings_compression_snappy_avro': {'FILE_SIZE_MB': 0.004096},
  'random_strings_compression_deflate_avro': {'FILE_SIZE_MB': 0.004096},
  'random_strings_compression_bzip2_avro': {'FILE_SIZE_MB': 0.004096},
  'random_strings_compression_zstandard_avro': {'FILE_SIZE_MB': 0.004096},
  'random_strings_compression_xz_avro': {'FILE_SIZE_MB': 0.004096},
  'random_strings_compression_default_avro': {},
  'dicted_strings_compression_snappy_avro': {'FILE_SIZE_MB': 0.004096},
  'dicted_strings_compression_deflate_avro': {'FILE_SIZE_MB': 0.004096},
  'dicted_strings_compression_bzip2_avro': {'FILE_SIZE_MB': 0.004096},
  'dicted_strings_co

In [92]:
for cur_schema in FILE_SCHEMAS:
    for scale, tables in tables_scaled.items():
        for base_table_name, table_schema in tables.items():
            for compression in compressions[cur_schema]:
                if base_table_name == "dates" and (cur_schema == AVRO or cur_schema == PARQUET):
                    continue
                file_name=get_file_name(cur_schema, base_table_name,compression)
                table_name = get_table_name(cur_schema, base_table_name,compression)
                table, schema = table_schema
    
                measureTimeStart(TABLE_FULL_SCAN_TIME)
                file = file_name + "/*." + cur_schema
                #df = duckdb.execute(f'SELECT * FROM read_avro("{file}")').df()
                df = spark.read.format(cur_schema).load(file_name)
                df.createOrReplaceTempView("table")
                spark.sql("SELECT * FROM table").toPandas()
                measureTimeEnd(TABLE_FULL_SCAN_TIME, time_metrics[cur_schema][table_name])


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7f8d2be4c190>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


KeyboardInterrupt: 

In [None]:
time_metrics

In [None]:
 con.register("table", df)
        con.table("table").write_csv("table.csv", header=True)

In [None]:
import duckdb

# Connect to an in-memory database
conn = duckdb.connect(':memory:')

# Use the read_avro() function in a SQL query
# You can use local file paths, or remote URLs
avro_file_path = 'userdata1.avro' 
# Example remote file: 'https://blobs.duckdb.org/data/userdata1.avro'
conn.from_pandas(gen_random_int_dataframe(1)).write_parquet('output.parquet')

conn.write_parquet("out.parquet")


result = conn.execute(f"SELECT * FROM read_avro('{avro_file_path}')")


# Fetch results into a Pandas DataFrame, Apache Arrow table, or other formats
df = result.df() 
# or 
arrow_table = result.arrow() 

# Display the data
print(df.head())


from fastavro import writer, parse_schema
parsed_schema = parse_schema(schema)
records = df.to_dict('table')
avro_file_path = 'table.avro'
with open(avro_file_path, 'wb') as out_file:
    writer(out_file, parsed_schema, records)

con.register("table", df)
con.table("table").write_csv("table.csv", header=True)
