<a href="https://colab.research.google.com/github/zia207/High_Performance_Computing_Python/blob/main/Notebook/04_01_04_hpc_bigdata_duckbd_python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![alt text](http://drive.google.com/uc?export=view&id=1IFEWet-Aw4DhkkVe1xv_2YYqlvRe9m5_)

# A Guide to Using DuckDB in Python

[DuckDB](https://duckdb.org/) is an open-source, high-performance, in-process analytical database management system designed for data analytics. It’s optimized for columnar data storage and query execution, making it ideal for analytical workloads like those in data science, machine learning, and business intelligence. Unlike traditional databases that run as separate server processes, DuckDB operates within the same process as the application, eliminating the need for a dedicated server and enabling fast, lightweight data processing. It supports SQL queries and integrates seamlessly with various programming languages, including Python, R, and Julia.

DuckDB integrates with Python through the `duckdb` package, which provides an interface to connect to a DuckDB database, execute SQL queries, and work with data in Python. This tutorial introduces DuckDB in Python for handling large datasets that exceed RAM, using the NYC Yellow Taxi trip data for January 2023 (`yellow_tripdata_2023-01.parquet`) as an example. DuckDB stores data efficiently and can query files directly without loading them fully into memory.

## Key Features of DuckDB

- **In-process**: Runs embedded within the host application, reducing overhead.
- **Columnar storage**: Optimized for analytical queries with efficient columnar data processing.
- **SQL support**: Uses standard SQL for querying, with extensions for advanced analytics.
- **High performance**: Leverages modern hardware (e.g., multi-core CPUs, vectorized query execution).
- **Lightweight**: No external dependencies, easy to install and use.
- **Cross-platform**: Works on Windows, macOS, Linux, and more.
- **Integration**: Supports reading/writing from various formats like CSV, Parquet, and JSON.

Key Features in Python:

* `Integration with Pandas`: DuckDB seamlessly converts between DataFrames and SQL tables.
* `Direct File Queries`: Query Parquet, CSV, and JSON without loading into memory.
* `Performance`: DuckDB’s vectorized query engine ensures fast execution, even for large datasets.
* `Memory Management`: For large datasets, query files directly or use persistent storage.

## Mount Google Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Installation

In [None]:
!pip install duckdb



In [42]:
import duckdb

# Create a persistent database (stored in a file)
con = duckdb.connect('my_duckdb.db')

## Working with Data

DuckDB in Python allows you to:

* Execute SQL queries directly.
* Register Pandas DataFrames as virtual tables.
* Ingest data from a wide variety of formats, both on-disk and in-memory. See the data ingestion page for more information.

```
import duckdb

duckdb.read_csv("example.csv")                # read a CSV file into a Relation
duckdb.read_parquet("example.parquet")        # read a Parquet file into a Relation
duckdb.read_json("example.json")              # read a JSON file into a Relation

duckdb.sql("SELECT * FROM 'example.csv'")     # directly query a CSV file
duckdb.sql("SELECT * FROM 'example.parquet'") # directly query a Parquet file
duckdb.sql("SELECT * FROM 'example.json'")    # directly query a JSON file
```


## DataFrames

DuckDB can directly query Pandas DataFrames, Polars DataFrames and Arrow tables. Note that these are read-only, i.e., editing these tables via INSERT or UPDATE statements is not possible.

### Pandas

In [48]:
import duckdb
import pandas as pd

df = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['Alice', 'John', 'Charlie']
})
duckdb.sql("SELECT * FROM df WHERE id > 1")

┌───────┬─────────┐
│  id   │  name   │
│ int64 │ varchar │
├───────┼─────────┤
│     2 │ John    │
│     3 │ Charlie │
└───────┴─────────┘

### Polars

To directly query a Polars DataFrame, run:

In [25]:
import duckdb
import polars as pl

polars_df = pl.DataFrame({"a": [42]})
duckdb.sql("SELECT * FROM polars_df")

┌───────┐
│   a   │
│ int64 │
├───────┤
│    42 │
└───────┘

### PyArrow
To directly query a PyArrow table, run:

In [26]:
import duckdb
import pyarrow as pa

arrow_table = pa.Table.from_pydict({"a": [42]})
duckdb.sql("SELECT * FROM arrow_table")

┌───────┐
│   a   │
│ int64 │
├───────┤
│    42 │
└───────┘

## Result Conversion
DuckDB supports converting query results efficiently to a variety of formats. See the result conversion page for more information.

In [27]:
import duckdb

duckdb.sql("SELECT 42").fetchall()   # Python objects
duckdb.sql("SELECT 42").df()         # Pandas DataFrame
duckdb.sql("SELECT 42").pl()         # Polars DataFrame
duckdb.sql("SELECT 42").arrow()      # Arrow Table
duckdb.sql("SELECT 42").fetchnumpy() # NumPy Arrays

{'42': array([42], dtype=int32)}

## Writing Data to Disk

DuckDB supports writing Relation objects directly to disk in a variety of formats. The COPY statement can be used to write data to disk using SQL as an alternative.

In [28]:
import duckdb

duckdb.sql("SELECT 42").write_parquet("out.parquet") # Write to a Parquet file
duckdb.sql("SELECT 42").write_csv("out.csv")         # Write to a CSV file
duckdb.sql("COPY (SELECT 42) TO 'out.parquet'")      # Copy to a Parquet file

## Reading/Writing External Files
DuckDB supports direct reading and writing of files like `CSV` and `Parquet`. You can query these files as if they were tables.

In [5]:
import duckdb

# Query a CSV file directly
queens = duckdb.sql("SELECT * FROM '/content/drive/MyDrive/Data/CSV_files/taxi_zone_lookup.csv' WHERE Borough = 'Queens'").fetchdf()
print(queens)

    LocationID Borough           Zone service_zone
0            2  Queens    Jamaica Bay    Boro Zone
1            7  Queens        Astoria    Boro Zone
2            8  Queens   Astoria Park    Boro Zone
3            9  Queens     Auburndale    Boro Zone
4           10  Queens   Baisley Park    Boro Zone
..         ...     ...            ...          ...
64         226  Queens      Sunnyside    Boro Zone
65         252  Queens     Whitestone    Boro Zone
66         253  Queens  Willets Point    Boro Zone
67         258  Queens      Woodhaven    Boro Zone
68         260  Queens       Woodside    Boro Zone

[69 rows x 4 columns]


## Connecting to DuckDB

DuckDB can operate in two modes in Python:

* `In-memory`: Data is stored in memory (default).
* `Persistent`: Data is stored on disk for persistence across sessions.

### Using an In-Memory Database

When using DuckDB through `duckdb.sql()`, it operates on an in-memory database, i.e., no tables are persisted on disk. Invoking the duckdb.connect() method without arguments returns a connection, which also uses an in-memory database:

In [29]:
import duckdb

# In-memory database
con = duckdb.connect()

# Create and query a table
con.execute("CREATE TABLE example (id INTEGER, name VARCHAR);")
con.execute("INSERT INTO example VALUES (1, 'Alice'), (2, 'Bob');")

result = con.sql("SELECT * FROM example").fetchall()
print(result)

con.close()

[(1, 'Alice'), (2, 'Bob')]


### Persistent Storage
Calling `duckdb.connect(dbname)` creates a connection to a persistent database. Any data written through that connection is persisted and can be reloaded by reconnecting to the same file, both from Python and from other DuckDB clients.

In [31]:
import duckdb

# create a connection to a file called 'file.db'
con = duckdb.connect("file.db")
# create a table and load data into it
con.sql("CREATE TABLE test (i INTEGER)")
con.sql("INSERT INTO test VALUES (42)")
# query the table
con.table("test").show()
# explicitly close the connection
con.close()

┌───────┐
│   i   │
│ int32 │
├───────┤
│    42 │
└───────┘



## Executing SQL Queries


SQL queries can be executed using the duckdb.sql function.

In [33]:
import duckdb

duckdb.sql("SELECT 42").show()

┌───────┐
│  42   │
│ int32 │
├───────┤
│    42 │
└───────┘



The result can be converted to various formats using the result conversion functions. For example, the `fetchall` method can be used to convert the result to Python objects.

In [34]:
results = duckdb.sql("SELECT 42").fetchall()
print(results)

[(42,)]


You can use the `df` method to convert the result to a Pandas DataFrame.

In [35]:
results = duckdb.sql("SELECT 42").df()
print(results)

   42
0  42


By default, a global in-memory connection is used. Any data stored in this in-memory database is lost when the program shuts down. A connection to a persistent database can be created using the `connect` function.

After connecting, SQL queries can be executed using the sql command.

In [36]:
import duckdb

con = duckdb.connect()

# Example: Create a table and insert data
con.execute("CREATE TABLE example (id INTEGER, name VARCHAR);")
con.execute("INSERT INTO example VALUES (1, 'Alice'), (2, 'Bob');")

# Query the table
result = con.execute("SELECT * FROM example").fetchdf()
print(result)
#    id   name
# 0   1  Alice
# 1   2    Bob

   id   name
0   1  Alice
1   2    Bob


In [6]:
# Export query results to a Parquet file
duckdb.sql("COPY (SELECT * FROM queens) TO 'queens.parquet' (FORMAT PARQUET);")

### SQL on Pandas
Pandas DataFrames stored in local variables can be queried as if they are regular tables within DuckDB.

In [51]:
import duckdb
import pandas

# Create a Pandas dataframe
my_df = pandas.DataFrame.from_dict({'a': [42]})

# query the Pandas DataFrame "my_df"
# Note: duckdb.sql connects to the default in-memory database connection
results = duckdb.sql("SELECT * FROM my_df").df()
print(results)

    a
0  42


### SQL on Apache Arrow

In [53]:
# Apache Arrow Tables
import duckdb
import pyarrow as pa

# connect to an in-memory database
con = duckdb.connect()

my_arrow_table = pa.Table.from_pydict({'i': [1, 2, 3, 4],
                                       'j': ["one", "two", "three", "four"]})

# query the Apache Arrow Table "my_arrow_table" and return as an Arrow Table
results = con.execute("SELECT * FROM my_arrow_table WHERE i = 2").arrow()
print(results)

pyarrow.Table
i: int64
j: string
----
i: [[2]]
j: [["two"]]


## Integration with Ibis

[Ibis](https://ibis-project.org/) is a Python dataframe library that supports 20+ backends, with DuckDB as the default. Ibis with DuckDB provides a Pythonic interface for SQL with great performance.





### Installation
You can pip install Ibis with the DuckDB backend:

In [None]:
!pip install 'ibis-framework[duckdb]'
!pip install pins

### Create a Database File

Ibis can work with several file types, but at its core, it connects to existing databases and interacts with the data there. You can get started with your own DuckDB databases or create a new one with example data.

In [4]:
import ibis

con = ibis.connect("duckdb://penguins.ddb")
con.create_table(
    "penguins", ibis.examples.penguins.fetch().to_pyarrow(), overwrite = True
)

In [5]:
# reconnect to the persisted database (dropping temp tables)
con = ibis.connect("duckdb://penguins.ddb")
con.list_tables()

['penguins']

There's one table, called penguins. We can ask Ibis to give us an object that we can interact with.

In [6]:
penguins = con.table("penguins")
penguins

We can call `head` and then `to_pandas` to get the first few rows of the table as a pandas DataFrame.

In [7]:
penguins.head().to_pandas()

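| | species | island | bill_length_mm | bill_depth_mm | flipper_length_mm | body_mass_g | sex | year |
|---|---------|--------|----------------|---------------|-------------------|-------------|-----|------|
| 0 | Adelie | Torgersen | 39.1 | 18.7 | 181.0 | 3750.0 | male | 2007 |
| 1 | Adelie | Torgersen | 39.5 | 17.4 | 186.0 | 3800.0 | female | 2007 |
| 2 | Adelie | Torgersen | 40.3 | 18.0 | 195.0 | 3250.0 | female | 2007 |
| 3 | Adelie | Torgersen | | | | | | 2007 |
| 4 | Adelie | Torgersen | 36.7 | 19.3 | 193.0 | 3450.0 | female | 2007 |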


## Integration with Polars

[Polars](https://github.com/pola-rs/polars) is a DataFrames library built in Rust with bindings for Python and Node.js. It uses Apache Arrow's columnar format as its memory model. DuckDB can read Polars DataFrames and convert query results to Polars DataFrames. It does this internally using the efficient Apache Arrow integration. Note that the pyarrow library must be installed for the integration to work.

### Installation

In [None]:
!pip install -U duckdb 'polars[pyarrow]'

### Polars to DuckDB

DuckDB can natively query Polars DataFrames by referring to the name of Polars DataFrames as they exist in the current scope.

In [1]:
import duckdb
import polars as pl

df = pl.DataFrame(
    {
        "A": [1, 2, 3, 4, 5],
        "fruits": ["banana", "banana", "apple", "apple", "banana"],
        "B": [5, 4, 3, 2, 1],
        "cars": ["beetle", "audi", "beetle", "beetle", "beetle"],
    }
)
duckdb.sql("SELECT * FROM df").show()

┌───────┬─────────┬───────┬─────────┐
│   A   │ fruits  │   B   │  cars   │
│ int64 │ varchar │ int64 │ varchar │
├───────┼─────────┼───────┼─────────┤
│     1 │ banana  │     5 │ beetle  │
│     2 │ banana  │     4 │ audi    │
│     3 │ apple   │     3 │ beetle  │
│     4 │ apple   │     2 │ beetle  │
│     5 │ banana  │     1 │ beetle  │
└───────┴─────────┴───────┴─────────┘



### DuckDB to Polars
DuckDB can output results as Polars DataFrames using the .pl() result-conversion method.

In [2]:
df = duckdb.sql("""
    SELECT 1 AS id, 'banana' AS fruit
    UNION ALL
    SELECT 2, 'apple'
    UNION ALL
    SELECT 3, 'mango'"""
).pl()
print(df)

shape: (3, 2)
┌─────┬────────┐
│ id  ┆ fruit  │
│ --- ┆ ---    │
│ i32 ┆ str    │
╞═════╪════════╡
│ 1   ┆ banana │
│ 2   ┆ apple  │
│ 3   ┆ mango  │
└─────┴────────┘


## Large Data Processing with DuckDB

This section of the tutorial uses the NYC Yellow Taxi Trip Data for January 2023 (~3 million rows, 47 MB Parquet file). DuckDB can query this file directly without loading it into memory.



### Data

The dataset is in Parquet format, which DuckDB can query directly without loading it into memory. For reproducibility:
- Download the January 2023 data from:
[https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet](https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet) (about 47 MB, ~3 million rows).
- We saved it to a local folder, e.g., `/home/zia207/Dropbox/WebSites/R_Website/Quarto_Projects/R_Beginner/Data/yellow_tripdata_2023-01.parquet`. The cells below read it from the Google Drive folder set in `DATA_FOLDER`; adjust the path for your environment.
- We'll also use the Taxi Zone Lookup CSV for joins: Download from [https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv](https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv).

The dataset includes columns like `VendorID`, `tpep_pickup_datetime`, `tpep_dropoff_datetime`, `passenger_count`, `trip_distance`, `PULocationID`, `DOLocationID`, `fare_amount`, `total_amount`, etc.

In [8]:
import duckdb
import pandas as pd

# Set data folder
DATA_FOLDER = "/content/drive/MyDrive/Data/CSV_files/"
PARQUET_FILE = DATA_FOLDER + "yellow_tripdata_2023-01.parquet"
ZONE_FILE = DATA_FOLDER + "taxi_zone_lookup.csv"



### Connect and Query Parquet Directly

In [9]:
# Query Parquet file directly (no connection needed!)
preview = duckdb.sql(f"SELECT * FROM '{PARQUET_FILE}' LIMIT 10").fetchdf()
print(preview)

# Get schema
schema = duckdb.sql(f"DESCRIBE '{PARQUET_FILE}'").fetchdf()
print(schema)

# Basic filtering
filtered = duckdb.sql(f"""
    SELECT * FROM '{PARQUET_FILE}'
    WHERE passenger_count > 1
    LIMIT 10
""").fetchdf()
print(filtered)

   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2023-01-01 00:32:10   2023-01-01 00:40:36              1.0   
1         2  2023-01-01 00:55:08   2023-01-01 01:01:27              1.0   
2         2  2023-01-01 00:25:04   2023-01-01 00:37:49              1.0   
3         1  2023-01-01 00:03:48   2023-01-01 00:13:25              0.0   
4         2  2023-01-01 00:10:29   2023-01-01 00:21:19              1.0   
5         2  2023-01-01 00:50:34   2023-01-01 01:02:52              1.0   
6         2  2023-01-01 00:09:22   2023-01-01 00:19:49              1.0   
7         2  2023-01-01 00:27:12   2023-01-01 00:49:56              1.0   
8         2  2023-01-01 00:21:44   2023-01-01 00:36:40              1.0   
9         2  2023-01-01 00:39:42   2023-01-01 00:50:36              1.0   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           0.97         1.0                  N           161           141   
1           1.10

### Aggregation

In [10]:
# Average trip distance and total trips
agg = duckdb.sql(f"""
    SELECT
        AVG(trip_distance) AS avg_distance,
        COUNT(*) AS total_trips
    FROM '{PARQUET_FILE}'
""").fetchdf()
print(agg)

   avg_distance  total_trips
0      3.847342      3066766


### Group By

In [11]:
group_result = duckdb.sql(f"""
    SELECT
        payment_type,
        SUM(total_amount) AS total_revenue,
        AVG(tip_amount) AS avg_tip
    FROM '{PARQUET_FILE}'
    GROUP BY payment_type
    ORDER BY total_revenue DESC
""").fetchdf()
print(group_result)

   payment_type  total_revenue   avg_tip
0             1   6.824062e+07  4.170799
1             2   1.226006e+07  0.001675
2             0   2.090131e+06  3.733109
3             3   1.893647e+05  0.029469
4             4   8.502335e+04  0.051490


### Register Files as Tables (for reuse)

In [12]:
con = duckdb.connect()

# Register Parquet and CSV as views
con.execute(f"CREATE VIEW taxi_data AS SELECT * FROM '{PARQUET_FILE}'")
con.execute(f"CREATE VIEW zones AS SELECT * FROM '{ZONE_FILE}'")

<duckdb.duckdb.DuckDBPyConnection at 0x7af0c8315e30>

### Join

In [13]:
# Join with zone lookup
joined = con.execute("""
    SELECT
        t.VendorID,
        t.trip_distance,
        t.total_amount,
        z.Borough AS pickup_borough
    FROM taxi_data t
    LEFT JOIN zones z ON t.PULocationID = z.LocationID
    WHERE t.trip_distance > 10
    LIMIT 10
""").fetchdf()
print(joined)

   VendorID  trip_distance  total_amount pickup_borough
0         2          11.70         64.44      Manhattan
1         2          11.43         66.31         Queens
2         1          17.80         95.15         Queens
3         2          11.11         61.86         Queens
4         2          16.02         64.85         Queens
5         1          11.30         64.20      Manhattan
6         2          11.19         72.90      Manhattan
7         2          11.03         68.50         Queens
8         2          13.54         78.41         Queens
9         1          19.20         91.25         Queens


### Window Function

In [14]:
window = con.execute("""
    SELECT
        payment_type,
        trip_distance,
        RANK() OVER (PARTITION BY payment_type ORDER BY trip_distance DESC) AS distance_rank
    FROM taxi_data
    LIMIT 20
""").fetchdf()
print(window)

    payment_type  trip_distance  distance_rank
0              2       62359.52              1
1              2        9674.67              2
2              2        6508.23              3
3              2        5056.82              4
4              2        2034.31              5
5              2         605.28              6
6              2         204.10              7
7              2         177.88              8
8              2         159.85              9
9              2         147.83             10
10             2         127.98             11
11             2         127.98             11
12             2         111.60             13
13             2         103.80             14
14             2         100.46             15
15             2          93.05             16
16             2          90.63             17
17             2          88.83             18
18             2          87.53             19
19             2          85.94             20


### Query Multiple Remote Files

In [21]:
# Enable HTTPFS for remote files
# (a separate connection keeps `con` and its views available for later cells)
remote_con = duckdb.connect()
remote_con.execute("INSTALL httpfs; LOAD httpfs;")

# Query two months of taxi data over HTTPS
query = """
SELECT
    EXTRACT(MONTH FROM tpep_pickup_datetime) AS month,
    COUNT(*) AS trips
FROM read_parquet([
    'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet',
    'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet'
])
GROUP BY month
ORDER BY month;
"""

result = remote_con.execute(query).fetchdf()
print(result)

remote_con.close()

   month    trips
0      1  3066727
1      2  2913910
2      3       42
3     10       11
4     12       31


### Export Results

In [16]:
con.execute(f"""
    COPY (
        SELECT * FROM taxi_data WHERE trip_distance > 5 LIMIT 1000
    ) TO '{DATA_FOLDER}output.csv' (FORMAT CSV, HEADER)
""")

<duckdb.duckdb.DuckDBPyConnection at 0x7af0c8315e30>

### Close connection

In [17]:
con.close()

## Relational API on Pandas
DuckDB offers a relational API that can be used to chain together query operations. These are lazily evaluated so that DuckDB can optimize their execution. These operators can act on Pandas DataFrames, DuckDB tables or views (which can point to any underlying storage format that DuckDB can read, such as CSV or Parquet files, etc.). Here we show a simple example of reading from a Pandas DataFrame and returning a DataFrame.

In [3]:
# Set data folder (adjust as needed for your environment)
DATA_FOLDER = "/content/drive/MyDrive/Data/CSV_files/"
PARQUET_FILE = DATA_FOLDER + "yellow_tripdata_2023-01.parquet"
ZONE_FILE = DATA_FOLDER + "taxi_zone_lookup.csv"

### Import and Connect

In [4]:
import duckdb
import pandas as pd

# Create a persistent or in-memory connection
con = duckdb.connect()  # in-memory

### Basic Filtering & Aggregation on Parquet


In [5]:
# Lazily read Parquet file and compute average trip distance
avg_trip = (
    con.read_parquet(PARQUET_FILE)
    .filter("trip_distance > 0")
    .aggregate("AVG(trip_distance) AS avg_distance, COUNT(*) AS total_trips")
    .fetchdf()
)

print(avg_trip)

   avg_distance  total_trips
0      3.905751      3020904


### Group By Payment Type

In [8]:
payment_summary = (
    con.read_parquet(PARQUET_FILE)
    .aggregate("""
        payment_type,
        SUM(total_amount) AS total_revenue,
        AVG(tip_amount) AS avg_tip
    """, "payment_type")
    .order("total_revenue DESC")
    .fetchdf()
)

print(payment_summary.head())

   payment_type  total_revenue   avg_tip
0             1   6.824062e+07  4.170799
1             2   1.226006e+07  0.001675
2             0   2.090131e+06  3.733109
3             3   1.893647e+05  0.029469
4             4   8.502335e+04  0.051490


### Extract Hour and Count Trips

In [10]:
hourly_trips = (
    con.read_parquet(PARQUET_FILE)
    .project("EXTRACT(HOUR FROM tpep_pickup_datetime) AS pickup_hour")
    .aggregate("pickup_hour, COUNT(*) AS trip_count", "pickup_hour")
    .order("pickup_hour")
    .fetchdf()
)

print(hourly_trips.head())

   pickup_hour  trip_count
0            0       84969
1            1       59799
2            2       42040
3            3       27438
4            4       17835


### Join with Taxi Zone Lookup (CSV)

In [11]:
# Read both files as relations
taxi_rel = con.read_parquet(PARQUET_FILE)
zone_rel = con.read_csv(ZONE_FILE)

# Perform join to get borough names
joined = (
    taxi_rel
    .filter("trip_distance > 10")
    .join(
        zone_rel,
        condition="PULocationID = LocationID",
        how="left"
    )
    .project("VendorID, trip_distance, total_amount, Borough")
    .limit(10)
    .fetchdf()
)

print(joined)

   VendorID  trip_distance  total_amount    Borough
0         2          11.70         64.44  Manhattan
1         2          11.43         66.31     Queens
2         1          17.80         95.15     Queens
3         2          11.11         61.86     Queens
4         2          16.02         64.85     Queens
5         1          11.30         64.20  Manhattan
6         2          11.19         72.90  Manhattan
7         2          11.03         68.50     Queens
8         2          13.54         78.41     Queens
9         1          19.20         91.25     Queens


### Borough-Level Aggregation (After Join)

In [13]:
borough_revenue = (
    taxi_rel
    .join(zone_rel, condition="PULocationID = LocationID", how="left")
    .aggregate("""
        Borough AS pickup_borough,
        AVG(trip_distance) AS avg_distance,
        SUM(total_amount) AS total_revenue
    """, "Borough")
    .order("total_revenue DESC")
    .fetchdf()
)

print(borough_revenue)

  pickup_borough  avg_distance  total_revenue
0      Manhattan      2.883807   6.107196e+07
1         Queens     12.320490   1.928394e+07
2        Unknown      7.362869   1.704629e+06
3       Brooklyn      5.677405   5.968103e+05
4          Bronx      5.296593   1.437350e+05
5            EWR      1.594780   4.279404e+04
6  Staten Island     11.356774   2.132153e+04


### Window Function (Top 3 Longest Trips per Payment Type)

In [14]:
top_trips = (
    con.read_parquet(PARQUET_FILE)
    .project("payment_type, trip_distance")
    .filter("trip_distance IS NOT NULL AND trip_distance > 0")
    .project("""
        payment_type,
        trip_distance,
        ROW_NUMBER() OVER (
            PARTITION BY payment_type
            ORDER BY trip_distance DESC
        ) AS rank
    """)
    .filter("rank <= 3")
    .order("payment_type, rank")
    .fetchdf()
)

print(top_trips)

    payment_type  trip_distance  rank
0              0      258928.15     1
1              0      225987.37     2
2              0      187872.33     3
3              1       14098.55     1
4              1        9947.03     2
5              1        9684.00     3
6              2       62359.52     1
7              2        9674.67     2
8              2        6508.23     3
9              3          74.40     1
10             3          70.20     2
11             3          63.60     3
12             4          78.78     1
13             4          78.78     2
14             4          71.63     3


### Export Result to CSV (via Relation)

In [15]:
# Define a filtered relation
long_trips = (
    con.read_parquet(PARQUET_FILE)
    .filter("trip_distance > 20")
    .project("VendorID, tpep_pickup_datetime, trip_distance, total_amount")
    .limit(1000)
)

# Write to CSV
long_trips.write_csv(DATA_FOLDER + "long_trips_output.csv")
print("✅ Exported 1000 long trips to CSV")

✅ Exported 1000 long trips to CSV


### Mix Pandas DataFrame + DuckDB Relation

In [17]:
# Suppose you have a small Pandas filter list
high_tip_zones = pd.DataFrame({
    'LocationID': [1, 2, 132, 234]
})

# Register it in DuckDB
con.register('high_tip_zones', high_tip_zones)

# Join Parquet data with Pandas DataFrame
result = (
    con.read_parquet(PARQUET_FILE)
    .join(con.table('high_tip_zones'), 'PULocationID = LocationID')
    .aggregate("AVG(tip_amount) AS avg_tip_in_selected_zones")
    .fetchdf()
)

print(result)

   avg_tip_in_selected_zones
0                   6.467511


In [None]:
con.close()

## Machine Learning Preprocessing with DuckDB

DuckDB is not a machine learning library, but it plays a powerful supporting role in ML workflows—especially in the data preprocessing, feature engineering, and scalable inference stages. By performing these steps inside the database, you avoid costly data movement, reduce memory pressure, and often achieve significant performance gains over traditional Python-based pipelines (e.g., scikit-learn + Pandas).




### Why Use DuckDB for ML?

| Benefit | Explanation |
|--------|-------------|
| **Speed** | Vectorized, parallel SQL execution often outperforms Pandas/scikit-learn on large data |
| **Memory Efficiency** | Processes data in chunks; no need to load full dataset into RAM |
| **Reproducibility** | Preprocessing logic lives in SQL—easy to version, audit, and reuse |
| **Data Locality** | Transform data where it lives (Parquet, CSV, S3, etc.) |
| **Consistency** | Same SQL logic can be used at training **and** inference time |





Below is a practical guide to using **DuckDB for ML preprocessing and inference**, based on real-world patterns from the [DuckDB ML preprocessing blog](https://duckdb.org/2025/08/15/ml-data-preprocessing.html).

### Load Data
We’ll use the synthetic financial fraud dataset:

In [18]:
import duckdb

# Load data directly from URL into a table
duckdb.sql("""
    CREATE TABLE financial_trx AS
    FROM read_csv('https://blobs.duckdb.org/data/financial_fraud_detection_dataset.csv')
""")

### Explore & Summarize

In [19]:
# Get column stats (null %, min, type, etc.)
summary = duckdb.sql("""
    FROM (SUMMARIZE financial_trx)
    SELECT column_name, column_type, null_percentage, min
""").fetchdf()
print(summary)

                    column_name column_type  null_percentage  \
0                transaction_id     VARCHAR             0.00   
1                     timestamp   TIMESTAMP             0.00   
2                sender_account     VARCHAR             0.00   
3              receiver_account     VARCHAR             0.00   
4                        amount      DOUBLE             0.00   
5              transaction_type     VARCHAR             0.00   
6             merchant_category     VARCHAR             0.00   
7                      location     VARCHAR             0.00   
8                   device_used     VARCHAR             0.00   
9                      is_fraud     BOOLEAN             0.00   
10                   fraud_type     VARCHAR            96.41   
11  time_since_last_transaction      DOUBLE            17.93   
12     spending_deviation_score      DOUBLE             0.00   
13               velocity_score      BIGINT             0.00   
14            geo_anomaly_score      DOU

### Feature Encoding

In [20]:
one_hot = duckdb.sql("""
    SELECT DISTINCT
        transaction_type,
        (transaction_type = 'deposit')::INT AS deposit_onehot,
        (transaction_type = 'payment')::INT AS payment_onehot,
        (transaction_type = 'transfer')::INT AS transfer_onehot,
        (transaction_type = 'withdrawal')::INT AS withdrawal_onehot
    FROM financial_trx
    ORDER BY transaction_type
""").fetchdf()

### Ordinal Encoding

In [21]:
ordinal = duckdb.sql("""
    WITH trx_encoded AS (
        SELECT
            transaction_type,
            ROW_NUMBER() OVER (ORDER BY transaction_type) - 1 AS trx_oe
        FROM (SELECT DISTINCT transaction_type FROM financial_trx)
    )
    SELECT f.transaction_type, e.trx_oe, COUNT(*) AS count
    FROM financial_trx f
    JOIN trx_encoded e ON f.transaction_type = e.transaction_type
    GROUP BY ALL
    ORDER BY trx_oe
""").fetchdf()

### Train/Test Split (Avoid Data Leakage)

In [22]:
# Reproducible 80/20 split
duckdb.sql("SET threads = 1")  # for reproducibility

duckdb.sql("""
    CREATE TABLE financial_trx_training AS
    FROM financial_trx
    USING SAMPLE 80 PERCENT (reservoir, 256)
""")

duckdb.sql("""
    CREATE TABLE financial_trx_testing AS
    FROM financial_trx
    ANTI JOIN financial_trx_training USING (transaction_id)
""")

### Feature Scaling (Using Macros)

In [23]:
# Standard scaler: (x - mean) / std
duckdb.sql("""
    CREATE OR REPLACE MACRO standard_scaler(x, mean_x, std_x) AS
        (x - mean_x) / NULLIF(std_x, 0)
""")

# Min-Max scaler: (x - min) / (max - min)
duckdb.sql("""
    CREATE OR REPLACE MACRO min_max_scaler(x, min_x, max_x) AS
        (x - min_x) / NULLIF(max_x - min_x, 0)
""")

# Robust scaler: (x - median) / IQR
duckdb.sql("""
    CREATE OR REPLACE MACRO robust_scaler(x, q25, q50, q75) AS
        (x - q50) / NULLIF(q75 - q25, 0)
""")

### Compute Scaling Parameters from Training Set

In [24]:
params = duckdb.sql("""
    FROM financial_trx_training
    SELECT
        AVG(velocity_score) AS avg_v,
        STDDEV_POP(velocity_score) AS std_v,
        MIN(velocity_score) AS min_v,
        MAX(velocity_score) AS max_v,
        QUANTILE_CONT(velocity_score, 0.25) AS q25_v,
        MEDIAN(velocity_score) AS med_v,
        QUANTILE_CONT(velocity_score, 0.75) AS q75_v
""").fetchdf().iloc[0].to_dict()

### Apply Scaling on Test Set

In [25]:
scaled_test = duckdb.sql(f"""
    SELECT
        standard_scaler(velocity_score, {params['avg_v']}, {params['std_v']}) AS ss_v,
        min_max_scaler(velocity_score, {params['min_v']}, {params['max_v']}) AS mm_v,
        robust_scaler(velocity_score, {params['q25_v']}, {params['med_v']}, {params['q75_v']}) AS rs_v
    FROM financial_trx_testing
    LIMIT 5
""").fetchdf()
print(scaled_test)

       ss_v      mm_v  rs_v
0 -1.300957  0.105263  -0.8
1  0.433105  0.631579   0.2
2 -0.607332  0.315789  -0.4
3 -0.954145  0.210526  -0.6
4  0.086293  0.526316   0.0


In [22]:
# Load financial fraud dataset
con = duckdb.connect()
con.execute("""
CREATE TABLE financial_trx AS
FROM read_csv('https://blobs.duckdb.org/data/financial_fraud_detection_dataset.csv')
""")

# Summarize
summary = con.execute("""
FROM (SUMMARIZE financial_trx)
SELECT column_name, column_type, count, null_percentage, min
""").fetchdf()
print(summary.head())

# One-hot encoding example
one_hot = con.execute("""
SELECT DISTINCT
    transaction_type,
    (transaction_type = 'deposit')::INT AS deposit_onehot,
    (transaction_type = 'payment')::INT AS payment_onehot
FROM financial_trx
ORDER BY transaction_type
""").fetchdf()
print(one_hot)

con.close()


        column_name column_type    count  null_percentage  \
0    transaction_id     VARCHAR  5000000              0.0   
1         timestamp   TIMESTAMP  5000000              0.0   
2    sender_account     VARCHAR  5000000              0.0   
3  receiver_account     VARCHAR  5000000              0.0   
4            amount      DOUBLE  5000000              0.0   

                          min  
0                     T100000  
1  2023-01-01 00:09:26.241974  
2                   ACC100000  
3                   ACC100000  
4                        0.01  
  transaction_type  deposit_onehot  payment_onehot
0          deposit               1               0
1          payment               0               1
2         transfer               0               0
3       withdrawal               0               0


### Handle Missing Values

In [26]:
# Get mean from training set
mean_time = duckdb.sql("""
    SELECT AVG(time_since_last_transaction)
    FROM financial_trx_training
""").fetchone()[0]

# Impute in test set
imputed = duckdb.sql(f"""
    SELECT
        COALESCE(time_since_last_transaction, 0) AS fill_0,
        COALESCE(time_since_last_transaction, {mean_time}) AS fill_mean
    FROM financial_trx_testing
    WHERE time_since_last_transaction IS NULL
    LIMIT 3
""").fetchdf()
print(imputed)

   fill_0  fill_mean
0     0.0    1.90919
1     0.0    1.90919
2     0.0    1.90919


### Export Features for Model Training

In [27]:
# Final preprocessed training set
duckdb.sql("""
    COPY (
        SELECT
            transaction_id,
            -- Encoded features
            (transaction_type = 'deposit')::INT AS is_deposit,
            -- Scaled features
            (velocity_score - (SELECT AVG(velocity_score) FROM financial_trx_training)) /
            (SELECT STDDEV_POP(velocity_score) FROM financial_trx_training) AS ss_velocity,
            -- Imputed features
            COALESCE(time_since_last_transaction,
                     (SELECT AVG(time_since_last_transaction) FROM financial_trx_training)) AS time_filled,
            -- Target
            is_fraud
        FROM financial_trx_training
    ) TO 'train_features.parquet' (FORMAT PARQUET)
""")

print("Training features saved to train_features.parquet")

Training features saved to train_features.parquet


### Consistent Inference

At inference time, reuse the same SQL logic with precomputed parameters:

In [29]:
# Example: new transaction
# Create a temporary table with sample new transaction data for demonstration
con = duckdb.connect() # Use a new in-memory connection for this example
con.execute("""
    CREATE TEMP TABLE new_transactions (
        transaction_id VARCHAR,
        velocity_score DOUBLE,
        time_since_last_transaction DOUBLE
    );
    INSERT INTO new_transactions VALUES
    ('T_new_1', 15.0, 10.5),
    ('T_new_2', 3.0, NULL), -- Simulate missing value
    ('T_new_3', 18.0, 5.2);
""")


new_data = con.sql(f"""
    SELECT
        transaction_id,
        (velocity_score - {params['avg_v']}) / {params['std_v']} AS ss_velocity,  -- precomputed mean/std from training
        COALESCE(time_since_last_transaction, {mean_time}) AS time_filled  -- precomputed mean from training
    FROM new_transactions
""").fetchdf()

print(new_data)

con.close() # Close the temporary connection

  transaction_id  ss_velocity  time_filled
0        T_new_1     0.779918     10.50000
1        T_new_2    -1.300957      1.90919
2        T_new_3     1.300136      5.20000


## Benchmark Test: DuckDB vs scikit-learn

According to DuckDB’s official benchmark:

* **DuckDB is 2–10x faster** than scikit-learn pipelines for preprocessing
* **Lower memory usage** (streaming vs. full materialization)
* **Same numerical results** (validated via reconciliation scripts)


### Load Data

In [2]:
import duckdb
import pandas as pd
import time

# Load data into Pandas (for scikit-learn)
print("Loading data into Pandas...")
start = time.time()
df = pd.read_csv('https://blobs.duckdb.org/data/financial_fraud_detection_dataset.csv')
load_time = time.time() - start
print(f"Loaded {len(df):,} rows in {load_time:.2f}s")

Loading data into Pandas...
Loaded 5,000,000 rows in 47.65s


### scikit-learn Pipeline




In [3]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, MinMaxScaler, RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

def sklearn_preprocess(df):
    # Train/test split
    train, test = train_test_split(df, test_size=0.2, random_state=42)

    # Categorical & numerical columns
    cat_cols = ['transaction_type', 'payment_channel']
    num_cols = ['velocity_score', 'spending_deviation_score']
    miss_col = ['time_since_last_transaction']

    # Preprocessor
    preprocessor = ColumnTransformer([
        ('onehot', OneHotEncoder(drop='first', sparse_output=False), cat_cols),
        ('standard', StandardScaler(), ['velocity_score']),
        ('minmax', MinMaxScaler(), ['spending_deviation_score']),
        ('impute_mean', SimpleImputer(strategy='mean'), miss_col),
        ('robust', RobustScaler(), ['amount'])  # extra robust scaling
    ], remainder='drop')

    # Fit on train, transform test
    preprocessor.fit(train)
    X_test_transformed = preprocessor.transform(test)
    return X_test_transformed

### DuckDB Pipeline

In [15]:
def duckdb_preprocess():
    # Use DuckDB to do everything in SQL (no Pandas load needed!)
    con = duckdb.connect()

    # Create train/test split
    con.execute("SET threads = 1")  # reproducibility
    con.execute("""
        CREATE TABLE train AS
        FROM read_csv('https://blobs.duckdb.org/data/financial_fraud_detection_dataset.csv')
        USING SAMPLE 80 PERCENT (reservoir, 256)
    """)
    con.execute("""
        CREATE TABLE test AS
        FROM read_csv('https://blobs.duckdb.org/data/financial_fraud_detection_dataset.csv') t
        ANTI JOIN train USING (transaction_id)
    """)
    con.execute("RESET threads")  # restore the default thread count for the remaining queries

    # Compute scaling/imputation params from TRAIN
    params = con.execute("""
        SELECT
            AVG(velocity_score) AS avg_v,
            STDDEV_POP(velocity_score) AS std_v,
            MIN(spending_deviation_score) AS min_s,
            MAX(spending_deviation_score) AS max_s,
            AVG(time_since_last_transaction) AS mean_t,
            MEDIAN(amount) AS med_a,
            QUANTILE_CONT(amount, 0.25) AS q25_a,
            QUANTILE_CONT(amount, 0.75) AS q75_a
        FROM train
    """).fetchdf().iloc[0].to_dict()

    # Apply full preprocessing on TEST
    result = con.execute(f"""
        SELECT
            -- One-hot (drop first: deposit = baseline for transaction_type, ACH for payment_channel)
            (transaction_type = 'payment')::DOUBLE AS transaction_type_payment,
            (transaction_type = 'transfer')::DOUBLE AS transaction_type_transfer,
            (transaction_type = 'withdrawal')::DOUBLE AS transaction_type_withdrawal,
            (payment_channel = 'card')::DOUBLE AS payment_channel_card,
            (payment_channel = 'credit_card')::DOUBLE AS payment_channel_credit_card,
            (payment_channel = 'debit_card')::DOUBLE AS payment_channel_debit_card,
            (payment_channel = 'wire_transfer')::DOUBLE AS payment_channel_wire_transfer,
            -- Scaling
            (velocity_score - {params['avg_v']}) / {params['std_v']} AS velocity_score_standard,
            (spending_deviation_score - {params['min_s']}) / NULLIF({params['max_s']} - {params['min_s']}, 0) AS spending_deviation_score_minmax,
            COALESCE(time_since_last_transaction, {params['mean_t']}) AS time_since_last_transaction_impute_mean,
            -- Robust scaling for amount
            (amount - {params['med_a']}) / NULLIF({params['q75_a']} - {params['q25_a']}, 0) AS amount_robust
        FROM test
    """).fetchdf()

    con.close()
    return result

### Run Benchmark

In [16]:
# --- scikit-learn ---
print("\n Running scikit-learn pipeline...")
start = time.time()
X_sklearn = sklearn_preprocess(df)
sklearn_time = time.time() - start
print(f" scikit-learn finished in {sklearn_time:.2f}s")
print(f"   Output shape: {X_sklearn.shape}")

# --- DuckDB ---
print("\n Running DuckDB pipeline...")
start = time.time()
df_duckdb = duckdb_preprocess()
duckdb_time = time.time() - start
print(f" DuckDB finished in {duckdb_time:.2f}s")
print(f"   Output shape: {df_duckdb.shape}")


 Running scikit-learn pipeline...
 scikit-learn finished in 23.51s
   Output shape: (1000000, 10)

 Running DuckDB pipeline...



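 DuckDB finished in 45.67s
   Output shape: (1000000, 11)

These two timings are not directly comparable. The scikit-learn figure (23.51s) excludes the 47.65s spent loading the CSV into Pandas, while the DuckDB figure (45.67s) includes reading the remote CSV twice and building the split with `threads = 1`. The output widths also differ (10 vs. 11 columns): `OneHotEncoder(drop='first')` produced three `payment_channel` columns, whereas the SQL hard-codes four, so the category list in the query should be checked against the distinct values in the data before the results are compared.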
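
Single `time.time()` measurements are also sensitive to caching and network conditions. A slightly more robust approach, sketched below with a hypothetical helper `time_runs`, is to repeat each pipeline several times with `time.perf_counter()` and report the best run. The workload here is a stand-in; in the benchmark it would be `lambda: sklearn_preprocess(df)` or `duckdb_preprocess`.

```python
import time
from typing import Callable, List


def time_runs(fn: Callable[[], object], repeats: int = 3) -> List[float]:
    """Call `fn` `repeats` times and return each wall-clock duration in seconds.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return durations


# Stand-in workload so the example runs anywhere without the dataset.
durations = time_runs(lambda: sum(range(100_000)), repeats=3)
print(f"best of {len(durations)} runs: {min(durations):.4f}s")
```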


## Summary and Conclusion

This tutorial demonstrated the power and flexibility of using DuckDB in Python for efficient data analysis and preprocessing, especially with large datasets. We covered:

- **Core Concepts**: DuckDB's in-process nature, columnar storage, and SQL capabilities.
- **Integration**: Seamless integration with popular Python libraries like Pandas, Polars, and Ibis.
- **Data Handling**: Directly querying various file formats (CSV, Parquet) without loading them into memory, handling large datasets efficiently.
- **SQL Execution**: Executing SQL queries directly or through connections, and converting results to different formats.
- **Relational API**: Using the relational API for lazy evaluation and optimized query execution.
- **Machine Learning Preprocessing**: Applying DuckDB for feature encoding, scaling, imputation, and train/test splitting directly within the database, and benchmarking the result against an equivalent scikit-learn pipeline. In the run shown here DuckDB was not faster once its CSV loading was included, so the comparison depends on how loading time is counted.

In conclusion, DuckDB is a valuable tool for data professionals working with large datasets in Python, offering significant advantages in terms of speed, memory efficiency, and ease of use through its SQL interface and integrations. Its ability to process data directly from files and perform complex transformations within the database makes it particularly well-suited for modern data workflows, including those involving machine learning preprocessing.

## Resources

1.  [DuckDB Documentation](https://duckdb.org/docs/)
2.  [DuckDB Python API Reference](https://duckdb.org/docs/api/python/overview)
3.  [DuckDB Blog](https://duckdb.org/blog/)
4.  [DuckDB GitHub Repository](https://github.com/duckdb/duckdb)
5.  [Ibis Project Documentation (for Ibis + DuckDB)](https://ibis-project.org/backends/duckdb)