In [2]:
import os
from multiprocessing import get_context
from functools import partial
from dask.distributed import LocalCluster


In [1]:
from read_parquet import duckdb_read_parquet

duckdb_read_parquet(1)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

85.33304012499866

In [3]:
from read_parquet import (
    pl_read_parquet,
    pd_read_parquet,
    spark_read_parquet,
    dask_read_parquet,
    modin_read_parquet,
    duckdb_read_parquet,
)

class ProcessTimer:
    def time_in_process(func, repeat: int, *args):
        results = []
        for _ in range(repeat):
            ctx = get_context("spawn")
            with ctx.Pool(processes=1) as pool:
                results.append(pool.apply(func, args))

        return sum(results)

    def time_in_process_single_cpu(func, repeat: int):
        return time_in_process(func, repeat, cpu_count=1)

    def time_in_process_all_cpu(func, repeat: int):
        return time_in_process(func, repeat, cpu_count=os.cpu_count())

    def timer(func, repeat: int, **kwargs):
        results = []
        for _ in range(repeat):
            results.append(func(**kwargs))
        return sum(results)

    def print_stat(description: str, duration: float, repeats: int):
        print(
            f"{description}: {duration}s | Mean: {duration / repeats}s | Reading rate: {repeats / duration} op/s"
        )


repeat_time = 3
pl_all_cpu_duration = time_in_process_all_cpu(pl_read_parquet, repeat_time)
pl_one_cpu_duration = time_in_process_single_cpu(pl_read_parquet, repeat_time)

pd_all_cpu_duration = time_in_process_all_cpu(pd_read_parquet, repeat_time)
pd_one_thread_duration = time_in_process_single_cpu(pd_read_parquet, repeat_time)

pyspark_all_thread_duration = time_in_process_all_cpu(spark_read_parquet, repeat_time)
pyspark_one_thread_duration = time_in_process_single_cpu(spark_read_parquet, repeat_time)

with LocalCluster(memory_limit="16GiB",threads_per_worker=1, n_workers=1) as cluster:
    dask_one_thread_duration = time_in_process(dask_read_parquet, repeat_time)
with LocalCluster(memory_limit="16GiB",threads_per_worker=os.cpu_count(), n_workers=1) as cluster:
    dask_all_thread_duration = time_in_process(dask_read_parquet, repeat_time)

modin_max_threads_duration = time_in_process_all_cpu(modin_read_parquet, repeat_time)
modin_one_thread_duration = time_in_process_single_cpu(modin_read_parquet, repeat_time)

duck_db_max_threads_duration = time_in_process_all_cpu(duckdb_read_parquet, repeat_time)
duck_db_one_thread_duration = time_in_process_single_cpu(duckdb_read_parquet, repeat_time)


print_stat("Polars with all threads", pl_max_threads_duration)
print_stat("Polars with one thread", pl_one_thread_duration)
print_stat("Pandas with all threads", pd_duration)
print_stat("Pandas with one thread", pd_one_thread_duration)
print_stat("Pyspark with all threads", pyspark_all_thread_duration)
print_stat("Pyspark with one thread", pyspark_one_thread_duration)
print_stat("Dask with all threads", dask_all_thread_duration)
print_stat("Dask with one thread", dask_one_thread_duration)
print_stat("Modin with all threads", modin_max_threads_duration, repeat_time)
print_stat("Modin with one thread", modin_one_thread_duration, repeat_time)
print_stat("Modin with all threads", duck_db_max_threads_duration, repeat_time)
print_stat("Modin with one thread", duck_db_one_thread_duration, repeat_time)


2024-05-29 11:08:01,673	INFO worker.py:1749 -- Started a local Ray instance.
[36m(raylet)[0m Spilled 3053 MiB, 56 objects, write throughput 1345 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[36m(raylet)[0m Spilled 5169 MiB, 74 objects, write throughput 1192 MiB/s.
[36m(raylet)[0m Spilled 8522 MiB, 89 objects, write throughput 1134 MiB/s.
2024-05-29 11:19:30,222	INFO worker.py:1749 -- Started a local Ray instance.
[36m(raylet)[0m Spilled 2246 MiB, 68 objects, write throughput 2182 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[36m(raylet)[0m Spilled 4933 MiB, 77 objects, write throughput 1595 MiB/s.
[36m(raylet)[0m Spilled 8226 MiB, 102 objects, write throughput 1419 MiB/s.
2024-05-29 11:20:17,033	INFO worker.py:1749 -- Started a local Ray instance.
[36m(raylet)[0m Spilled 2587 MiB, 71 objects, write throughput 1552 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
[36m(raylet)[0m Spilled 5029 MiB, 85 objects, write throughput 16

Modin with all threads: 62.46101733400428s | Mean: 20.82033911133476s | Reading rate: 0.0480299573725767 op/s
Modin with one thread: 37.067314541011s | Mean: 12.355771513670334s | Reading rate: 0.08093383718641992 op/s


In [4]:
from random import randint
import polars as pl
from faker import Faker

sample_data = pl.read_parquet("../datasets/train.parquet")

faker = Faker()
# users = pl.DataFrame(
#     [
#         {
#             "id": i,
#             "name": faker.name(),
#             "address": faker.address(),
#             "balance": float(randint(0, 1000000)),
#             "is_deleted": randint(0, 10) == 0,
#             "cards": [{"number": faker.credit_card_number(), "provider": faker.credit_card_provider(), "expire": faker.credit_card_expire()} for _ in range(randint(0,5))]
#         }
#         for i in range(100000)
#     ]
# )
# users.write_parquet("../datasets/users.parquet")

devices = ["mobile", "pc", "domofon"]
users_session = pl.DataFrame(
    [
        {
            "user_id": randint(0, 105000),
            "session_id": i,
            "location": faker.address(),
            "device": devices[randint(0, 2)],
            
        }
        for i in range(sample_data.unique("session").shape[0] + 10000)
    ]
)
users_session.write_parquet("../datasets/users_session.parquet")

In [2]:
import polars as pl
pl.read_parquet("../datasets/train.parquet").with_columns(session=pl.col("session").cast(pl.Int64()), aid=pl.col("aid").cast(pl.Int64()), ts=pl.col("ts").cast(pl.Int64())).limit(5000000).write_parquet("../datasets/train.parquet")

In [2]:
from join_data import duckdb_join

duckdb_join(10)

(49.954491624986986, 11483332608)

In [18]:
import polars as pl
pl.read_parquet("../datasets/train.parquet").describe()
# pl.scan_parquet("../datasets/train.parquet").group_by("session").agg(
#             [
#                 pl.col("ts").first().alias("first_ts"),
#                 pl.col("aid").mean().alias("mean_aid"),
#                 pl.col("aid").sum().alias("sum_aid"),
#                 pl.col("aid").count().alias("count_aid"),
#                 pl.col("aid").median().alias("median_aid"),
#                 pl.col("aid").min().alias("min_aid"),
#                 pl.col("aid").max().alias("max_aid"),
#             ]
#         ).collect()

statistic,session,aid,ts,type
str,f64,f64,f64,f64
"""count""",5000000.0,5000000.0,5000000.0,5000000.0
"""null_count""",0.0,0.0,0.0,0.0
"""mean""",47519.890716,929009.162789,1660300000.0,0.1045832
"""std""",28509.12232,536183.400514,757636.8727,0.358765
"""min""",0.0,1.0,1659300000.0,0.0
"""25%""",22437.0,469409.0,1659600000.0,0.0
"""50%""",47662.0,927765.0,1660200000.0,0.0
"""75%""",72632.0,1393975.0,1660900000.0,0.0
"""max""",95893.0,1855601.0,1661700000.0,2.0


In [20]:
import polars as pl
pl.read_parquet("../datasets/users.parquet")

id,name,address,balance,is_deleted,cards
i64,str,str,f64,bool,list[struct[3]]
0,"""Tiffany Lane""","""638 Dylan Vista West Jonathanh…",983528.0,false,"[{""4107518457527"",""Maestro"",""01/29""}]"
1,"""Kelsey Haynes""","""852 Samuel Landing Port Wayne,…",1510.0,true,"[{""213196942001800"",""JCB 15 digit"",""08/27""}]"
2,"""Arthur Franklin""","""9270 Dawn Shores Suite 162 Ran…",263717.0,false,"[{""6581630471664274"",""VISA 16 digit"",""05/33""}, {""3592784318012041"",""American Express"",""09/29""}, {""4927738600686684"",""Diners Club / Carte Blanche"",""10/32""}]"
3,"""Lisa Garcia""","""7056 David Loaf Apt. 550 New H…",385942.0,false,"[{""2621076554998992"",""American Express"",""03/28""}, {""3536573740732725"",""VISA 19 digit"",""10/25""}, … {""4340086749898912"",""VISA 13 digit"",""01/26""}]"
4,"""Betty Scott""","""83548 Angela Centers Apt. 780 …",62807.0,false,"[{""4001765127838885"",""VISA 13 digit"",""10/27""}, {""180065515930894"",""Mastercard"",""11/32""}]"
…,…,…,…,…,…
99995,"""Diane Green""","""Unit 9989 Box 5423 DPO AA 2290…",514508.0,false,"[{""4804043829704899"",""American Express"",""09/30""}]"
99996,"""Hector Gonzalez""","""1369 Abigail Bypass Suite 876 …",876677.0,false,"[{""4830661757645808701"",""JCB 16 digit"",""06/30""}, {""4904882154483933799"",""Mastercard"",""02/33""}, … {""213121765239501"",""VISA 19 digit"",""11/31""}]"
99997,"""Jasmine Williams""","""6201 John Views Apt. 852 New D…",502120.0,false,"[{""3515291452549832"",""JCB 15 digit"",""01/31""}, {""30468516391102"",""VISA 16 digit"",""06/29""}]"
99998,"""Brittany Wright""","""673 Ashley Inlet Lake Travisst…",14968.0,false,"[{""3532065714991233"",""Maestro"",""04/30""}, {""6553402899707042"",""VISA 16 digit"",""06/30""}, … {""3516546005688760"",""JCB 16 digit"",""07/30""}]"
