# Introduction to Multi GPU Polars, powered by cuDF

Polars is a popular DataFrame library for manipulating structured data. Previously, NVIDIA and Polars [collaborated](https://developer.nvidia.com/blog/polars-gpu-engine-powered-by-rapids-cudf-now-available-in-open-beta/) to release a GPU engine powered by RAPIDS cuDF, allowing users to leverage NVIDIA GPUs for even greater performance.

With the 25.06 release of RAPIDS, cuDF now provides support for streaming execution, enabling you to process even larger datasets than before. Further, this new streaming executor allows users to scale up to multiple GPUs on a single node for maximum performance throughput.

This notebook is a short introduction to the new streaming GPU engine -- powered by cuDF -- running on multiple NVIDIA GPUs.

**NOTE:** This demo was originally performed on a 8xA100 (80GB) node, with a 96 core AMD EPYC 7642 CPU. Running this demo on a different system may lead to different results due to CPU and GPU configuration. Multi GPU Polars is still in an experimental stage, so if you run into problems please [file an issue](https://github.com/rapidsai/cudf/issues/new/choose).

## Set-up

First, let's see how many NVIDIA GPUs are available to us. Note that even on a single GPU, users can take advantage of the streaming executor for increased performance and scalability. This demo will utilize multiple GPUs.

In [1]:
!nvidia-smi

Fri Jun  6 16:11:53 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 570.133.20             Driver Version: 570.133.20     CUDA Version: 12.8     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA A100 80GB PCIe          On  |   00000000:01:00.0 Off |                    0 |
| N/A   44C    P0             74W /  300W |   14357MiB /  81920MiB |      0%      Default |
|                                         |                        |             Disabled |
+-----------------------------------------+------------------------+----------------------+
|   1  NVIDIA A100 80GB PCIe          On  |   00

Let's import the Polars library and confirm which version we're working with. We'll also import a utility to compare the outputs of the different engines we're testing.

In [2]:
import polars as pl
from polars.testing import assert_frame_equal
print(pl.__version__)

1.26.0


## Loading a Dataset

To start, we'll load a fairly large dataset which we can use to show the power of running Polars on multiple GPUs. For this demo, we'll use a Kaggle dataset containing [simulated financial transactions](https://www.kaggle.com/datasets/conorsully1/simulated-transactions). We're downloading a copy of this dataset from a GCS bucket hosted by NVIDIA to provide faster download speeds. We'll start by downloading the data. This should take about 30 seconds.






In [3]:
!wget https://storage.googleapis.com/rapidsai/polars-demo/transactions.parquet -O transactions.parquet

--2025-06-06 16:11:56--  https://storage.googleapis.com/rapidsai/polars-demo/transactions.parquet
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.191.59, 142.250.191.91, 142.251.46.187, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.191.59|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4274457161 (4.0G) [application/octet-stream]
Saving to: ‘transactions.parquet’


2025-06-06 16:12:56 (67.8 MB/s) - ‘transactions.parquet’ saved [4274457161/4274457161]



## Data License and Terms
As this dataset originates from Kaggle, it's governed by a Kaggle dataset-specific license and terms of use.

> ### CC0 1.0 Universal
> **No Copyright**

>The person who associated a work with this deed has dedicated the work to the public domain by waiving all of his or her rights to the work worldwide under copyright law, including all related and neighboring rights, to the extent allowed by law.
You can copy, modify, distribute and perform the work, even for commercial purposes, all without asking permission. See Other Information below.

> **Other Information**

>In no way are the patent or trademark rights of any person affected by CC0, nor are the rights that other persons may have in the work or in how the work is used, such as publicity or privacy rights.
Unless expressly stated otherwise, the person who associated a work with this deed makes no warranties about the work, and disclaims liability for all uses of the work, to the fullest extent permitted by applicable law.
When using or citing the work, you should not imply endorsement by the author or the affirmer.


In [4]:
transactions_df = pl.scan_parquet("transactions.parquet")

To better understand the dataset we're working with, let's take a quick look at the schema and a few sample records from our large crypto trade history dataset. Note that if I run `collect()` without anything passed in, we will just be using the Polars CPU engine.

In [5]:
transactions_df.collect_schema()

Schema([('CUST_ID', String),
        ('START_DATE', Date),
        ('END_DATE', Date),
        ('TRANS_ID', String),
        ('DATE', Date),
        ('YEAR', Int64),
        ('MONTH', Int64),
        ('DAY', Int64),
        ('EXP_TYPE', String),
        ('AMOUNT', Float64)])

In [6]:
transactions_df.head(5).collect()

CUST_ID,START_DATE,END_DATE,TRANS_ID,DATE,YEAR,MONTH,DAY,EXP_TYPE,AMOUNT
str,date,date,str,date,i64,i64,i64,str,f64
"""CI6XLYUMQK""",2015-05-01,,"""T8I9ZB5A6X90UG8""",2015-09-11,2015,9,11,"""Motor/Travel""",20.27
"""CI6XLYUMQK""",2015-05-01,,"""TZ4JSLS7SC7FO9H""",2017-02-08,2017,2,8,"""Motor/Travel""",12.85
"""CI6XLYUMQK""",2015-05-01,,"""TTUKRDDJ6B6F42H""",2015-08-01,2015,8,1,"""Housing""",383.8
"""CI6XLYUMQK""",2015-05-01,,"""TDUHFRUKGPPI6HD""",2019-03-16,2019,3,16,"""Entertainment""",5.72
"""CI6XLYUMQK""",2015-05-01,,"""T0JBZHBMSVRFMMD""",2015-05-15,2015,5,15,"""Entertainment""",11.06


## Multi-GPU Engine Setup

For this demo, we are going to be comparing Polars' [CPU streaming engine](https://docs.pola.rs/user-guide/concepts/_streaming/) to the new multi GPU streaming engine. To use the CPU streaming engine, all I have to do is pass `engine="streaming"` into my `collect` calls.

To set up our multi GPU engine, let's configure our system to take advantage of the multiple GPUs we have available. The first thing we'll do is set up a `LocalCUDACluster` using `dask_cuda`. By default, we'll be using all of the GPUs that are present on our system, but this is configurable.

In [7]:
# This will use all GPUs on the local host by default
from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

client = Client(LocalCUDACluster())
# client = Client(LocalCUDACluster(n_workers=2))  # to specify a specific number of workers


Now let's set up my `GPUEngine` using the new streaming executor from cuDF.

In [8]:
executor_options = {"scheduler": "distributed"}  # Use "synchronous" for single GPU streaming execution
executor = "streaming"

engine_multi = pl.GPUEngine(
    executor=executor,
    executor_options=executor_options,
)

With those few lines, we have our multi GPU cluster initialized and our `GPUEngine` configured to use streaming all ready to go. Now let's take a look at how this performs on some real queries, compared to the CPU streaming engine.

# Running Queries

Let's explore the dataset. We'll take a look at which customers are spending the most money across the entire dataset and grab a number of other supporting stats that may help us understand customer patterns.

In [9]:
%%time

q1_cpu = (
    transactions_df
    .group_by("CUST_ID")
    .agg(
        pl.sum("AMOUNT").alias("total_amount"),
        pl.mean("AMOUNT").round(2).alias("avg_amount"),
        pl.min("AMOUNT").alias("min_amount"),
        pl.max("AMOUNT").alias("max_amount"),
        pl.median("AMOUNT").alias("median_amount"),
        pl.std("AMOUNT").round(2).alias("stddev_amount"),
        pl.col("AMOUNT").quantile(0.25).alias("q25_amount"),
        pl.col("AMOUNT").quantile(0.75).alias("q75_amount"),
        (pl.col("AMOUNT").quantile(0.75) - pl.col("AMOUNT").quantile(0.25)).alias("iqr_amount"),

    )
    .sort("CUST_ID", descending=True)
    .collect(engine="streaming")
)
q1_cpu

CPU times: user 5min 32s, sys: 8min 48s, total: 14min 21s
Wall time: 21.9 s


CUST_ID,total_amount,avg_amount,min_amount,max_amount,median_amount,stddev_amount,q25_amount,q75_amount,iqr_amount
str,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""CZZYMEA7MJ""",18617.42,8.27,0.29,156.63,2.89,16.53,1.46,6.77,5.31
"""CZZYEH2PF6""",143326.3,53.54,2.19,439.73,8.0,78.03,3.94,85.73,81.79
"""CZZXQ5ULG7""",166066.68,30.62,2.41,680.86,9.38,78.68,4.65,26.76,22.11
"""CZZW4TZZ3H""",86072.87,106.66,18.44,1406.41,47.66,171.82,32.49,106.62,74.13
"""CZZUM94ZSD""",104694.09,17.3,1.31,407.51,5.4,44.73,2.69,14.74,12.05
…,…,…,…,…,…,…,…,…,…
"""C002LR5U74""",8796.49,11.09,1.58,59.14,4.57,12.02,2.82,16.03,13.21
"""C0026REM1Q""",35598.73,8.55,1.21,79.03,6.795,7.32,4.76,9.97,5.21
"""C001F6USSU""",415635.71,103.11,12.64,1633.94,46.3,203.56,26.21,83.85,57.64
"""C000TDGP4R""",766480.7,249.75,20.91,3409.74,88.04,439.9,44.9,288.19,243.29


That was pretty quick! We can see the power of the CPU streaming engine allowing these tasks to be performed in parallel and taking advantage of all of the available CPU cores.

Now let's run the same query using the multi GPU engine we set up.

In [10]:
%%time
q1_gpu = (
    transactions_df
    .group_by("CUST_ID")
    .agg(
        pl.sum("AMOUNT").alias("total_amount"),
        pl.mean("AMOUNT").round(2).alias("avg_amount"),
        pl.min("AMOUNT").alias("min_amount"),
        pl.max("AMOUNT").alias("max_amount"),
        pl.median("AMOUNT").alias("median_amount"),
        pl.std("AMOUNT").round(2).alias("stddev_amount"),
        pl.col("AMOUNT").quantile(0.25).alias("q25_amount"),
        pl.col("AMOUNT").quantile(0.75).alias("q75_amount"),
        (pl.col("AMOUNT").quantile(0.75) - pl.col("AMOUNT").quantile(0.25)).alias("iqr_amount"),

    )
    .sort("CUST_ID", descending=True)
    .collect(engine=engine_multi)
)
q1_gpu


CPU times: user 818 ms, sys: 3.91 s, total: 4.73 s
Wall time: 6.63 s


CUST_ID,total_amount,avg_amount,min_amount,max_amount,median_amount,stddev_amount,q25_amount,q75_amount,iqr_amount
str,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""CZZYMEA7MJ""",18617.42,8.27,0.29,156.63,2.89,16.53,1.46,6.77,5.31
"""CZZYEH2PF6""",143326.3,53.54,2.19,439.73,8.0,78.03,3.94,85.73,81.79
"""CZZXQ5ULG7""",166066.68,30.62,2.41,680.86,9.38,78.68,4.65,26.76,22.11
"""CZZW4TZZ3H""",86072.87,106.66,18.44,1406.41,47.66,171.82,32.49,106.6,74.11
"""CZZUM94ZSD""",104694.09,17.3,1.31,407.51,5.4,44.73,2.69,14.74,12.05
…,…,…,…,…,…,…,…,…,…
"""C002LR5U74""",8796.49,11.09,1.58,59.14,4.57,12.02,2.82,16.03,13.21
"""C0026REM1Q""",35598.73,8.55,1.21,79.03,6.795,7.32,4.76,9.97,5.21
"""C001F6USSU""",415635.71,103.11,12.64,1633.94,46.3,203.56,26.21,83.82,57.61
"""C000TDGP4R""",766480.7,249.75,20.91,3409.74,88.04,439.9,44.9,288.19,243.29


Even on a relatively simple query with just a single dataset, we see a nice speedup from using our multi-GPU engine. We're able to utilize the 8 GPUs on our cluster to run this query about 3.5x faster than we could on CPUs.

Let's look at a little more complex query next. We'll just look at transaction IDs that contain certain characters, and perform a few aggregations to understand the number of unique customers, the number of large and small transactions, and how many unique customers we have for various expense types.

In [11]:
%%time
q2_cpu = (
    transactions_df
    .filter(
        pl.col("TRANS_ID").str.contains("A") | pl.col("TRANS_ID").str.contains("B")
    )
    .group_by("EXP_TYPE")
    .agg([
        pl.n_unique("TRANS_ID").alias("distinct_ab_ids"),
        pl.n_unique("CUST_ID").alias("distinct_customers"),
        (pl.col("CUST_ID").n_unique() / pl.len()).alias("customer_diversity_ratio"),
        (pl.col("AMOUNT") > 1000).len().alias("num_large_txns"),
        (pl.col("AMOUNT") < 10).len().alias("num_micro_txns"),
    ])
    .collect(engine="streaming")
)
q2_cpu 

CPU times: user 4min 30s, sys: 2min 51s, total: 7min 22s
Wall time: 1min


EXP_TYPE,distinct_ab_ids,distinct_customers,customer_diversity_ratio,num_large_txns,num_micro_txns
str,u32,u32,f64,u32,u32
"""Housing""",950172,35501,0.037363,950172,950172
"""Entertainment""",81694858,71250,0.000872,81694858,81694858
"""Gambling""",2594431,14398,0.00555,2594431,2594431
"""Groceries""",24994970,75000,0.003001,24994970,24994970
"""Tax""",1622190,60748,0.037448,1622190,1622190
…,…,…,…,…,…
"""Education""",1877951,28334,0.015088,1877951,1877951
"""Health""",4276463,71237,0.016658,4276463,4276463
"""Fines""",27785,6244,0.224726,27785,27785
"""Clothing""",4275270,71219,0.016658,4275270,4275270


Again, we see that the CPU streaming engine allows us to run this pretty quickly, though with a bit more complexity added to the query we do have to wait a bit for results now. Let's see what kind of speed-ups we can get from running this on our multi GPU engine again.

In [12]:
%%time
q2_gpu = (
    transactions_df
    .filter(
        pl.col("TRANS_ID").str.contains("A") | pl.col("TRANS_ID").str.contains("B")
    )
    .group_by("EXP_TYPE")
    .agg([
        pl.n_unique("TRANS_ID").alias("distinct_ab_ids"),
        pl.n_unique("CUST_ID").alias("distinct_customers"),
        (pl.col("CUST_ID").n_unique() / pl.len()).alias("customer_diversity_ratio"),
        (pl.col("AMOUNT") > 1000).sum().alias("num_large_txns"),
        (pl.col("AMOUNT") < 10).sum().alias("num_micro_txns"),
    ])
    .collect(engine=engine_multi)
)
q2_gpu

CPU times: user 693 ms, sys: 658 ms, total: 1.35 s
Wall time: 7.87 s


EXP_TYPE,distinct_ab_ids,distinct_customers,customer_diversity_ratio,num_large_txns,num_micro_txns
str,i64,i64,f64,u32,u32
"""Bills and Utilities""",4748360,71252,0.015006,303677,816824
"""Gambling""",2594431,14398,0.00555,0,279641
"""Groceries""",24994970,75003,0.003001,12565,3909102
"""Fines""",27785,6244,0.224726,0,2921
"""Clothing""",4275270,71222,0.016659,59810,360004
…,…,…,…,…,…
"""Entertainment""",81694858,71253,0.000872,2,33232960
"""Tax""",1622190,60748,0.037448,182763,19227
"""Savings""",1196982,28079,0.023458,25526,104570
"""Education""",1877951,28336,0.015089,120081,104589


With a bit more complexity, we see that the relative gain from executing on our multi GPU engine grows, as we're now able to finish this query around 8x faster.

Let's take a look at one more example. For this one, we'll load another small dataset containg some simulated weather data that we can use to join on this dataset.

In [13]:
!wget https://storage.googleapis.com/rapidsai/polars-demo/rainfall_data_2010_2020.csv

--2025-06-06 16:15:13--  https://storage.googleapis.com/rapidsai/polars-demo/rainfall_data_2010_2020.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 142.251.32.59, 142.250.189.219, 142.250.189.251, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.251.32.59|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 131421 (128K) [text/csv]
Saving to: ‘rainfall_data_2010_2020.csv.9’


2025-06-06 16:15:13 (7.00 MB/s) - ‘rainfall_data_2010_2020.csv.9’ saved [131421/131421]



In [14]:
names = ['Location', 'Rainfall (inches)', 'DATE', 'YEAR', 'MONTH', 'DAY']

weather = pl.scan_csv("rainfall_data_2010_2020.csv", new_columns=names)

weather = (
    weather
    .with_columns(pl.col("DATE").cast(pl.Utf8).str.strptime(pl.Date(), "%Y%m%d"))
    .collect()
)
weather.head()

Location,Rainfall (inches),DATE,YEAR,MONTH,DAY
str,f64,date,i64,i64,i64
"""Tatooine""",0.33,2010-01-01,2010,1,1
"""Tatooine""",0.0,2010-01-02,2010,1,2
"""Tatooine""",0.28,2010-01-03,2010,1,3
"""Tatooine""",0.26,2010-01-04,2010,1,4
"""Tatooine""",0.39,2010-01-05,2010,1,5


We might be interested in understanding how certain expense types are impacted by the weather. For example, what if we only want to know how much customers are spending on Entertainment on rainy days vs. sunny? We can use the rainfall dataset with our transaction dataset to explore patterns like this.

Let's take a look at the number of customers and amounts spent on different expense types on _rainy_ days.

In [15]:
%%time
q3_cpu = (
    transactions_df
    .join(
        weather.lazy().select(["DATE", "Rainfall (inches)"]),
        on="DATE",
        how="inner"
    )
    .filter(pl.col("Rainfall (inches)") > 0)
    .group_by("EXP_TYPE")
    .agg([
        pl.col("TRANS_ID").n_unique().alias("unique_txn_count"),
        pl.col("CUST_ID").n_unique().alias("unique_customer_count"),
        pl.col("AMOUNT").mean().alias("avg_amount"),
        pl.col("AMOUNT").sum().alias("total_amount"),
    ])
    .sort("total_amount", descending=True)
    .collect(engine="streaming")
)
q3_cpu

CPU times: user 4min 54s, sys: 4min 28s, total: 9min 22s
Wall time: 31.3 s


EXP_TYPE,unique_txn_count,unique_customer_count,avg_amount,total_amount
str,u32,u32,f64,f64
"""Motor/Travel""",16548136,60166,131.201647,2.1711e9
"""Groceries""",25825356,75000,79.565491,2.0548e9
"""Entertainment""",84423303,71250,23.866129,2.0149e9
"""Housing""",968072,35506,1552.703778,1.5031e9
"""Bills and Utilities""",4879406,71250,206.575511,1.0080e9
…,…,…,…,…
"""Tax""",1651777,60750,413.093358,6.8234e8
"""Education""",1930972,28334,278.486389,5.3775e8
"""Gambling""",2681960,14398,104.243855,2.7958e8
"""Savings""",1232942,28079,222.827497,2.7473e8


Finally, we run the same query on our multi GPU engine for comparison.

In [16]:
%%time
q3_gpu = (
    transactions_df
    .join(
        weather.lazy().select(["DATE", "Rainfall (inches)"]),
        on="DATE",
        how="inner"
    )
    .filter(pl.col("Rainfall (inches)") > 0)
    .group_by("EXP_TYPE")
    .agg([
        pl.col("TRANS_ID").n_unique().alias("unique_txn_count"),
        pl.col("CUST_ID").n_unique().alias("unique_customer_count"),
        pl.col("AMOUNT").mean().alias("avg_amount"),
        pl.col("AMOUNT").sum().alias("total_amount"),
    ])
    .sort("total_amount", descending=True)
    .collect(engine=engine_multi)
)
q3_gpu

CPU times: user 560 ms, sys: 423 ms, total: 983 ms
Wall time: 5.82 s


EXP_TYPE,unique_txn_count,unique_customer_count,avg_amount,total_amount
str,i64,i64,f64,f64
"""Motor/Travel""",16548136,60168,131.201647,2.1711e9
"""Groceries""",25825356,75003,79.565491,2.0548e9
"""Entertainment""",84423303,71253,23.866129,2.0149e9
"""Housing""",968072,35506,1552.703778,1.5031e9
"""Bills and Utilities""",4879406,71252,206.575511,1.0080e9
…,…,…,…,…
"""Tax""",1651777,60751,413.093358,6.8234e8
"""Education""",1930972,28336,278.486389,5.3775e8
"""Gambling""",2681960,14398,104.243855,2.7958e8
"""Savings""",1232942,28081,222.827497,2.7473e8


These simple queries show just a taste of the power of using the GPU streaming engine to run queries across multiple GPUs. As dataset sizes grow and queries become more complex, the multi GPU Polars engine shines in its ability to maximize performance. 

To learn more about the multi GPU polars engine, check out our [docs](https://docs.rapids.ai/api/cudf/stable/cudf_polars/engine_options/).