# CSV vs Parquet vs Delta Lake

In [1]:
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.csv
import pyarrow.parquet as pq
from deltalake import DeltaTable

## CSV

In [2]:
# this computation doesn't run on my machine
# it errors out because there isn't enough memory
%%time
(
    pd.read_csv(f"{Path.home()}/data/G1_1e9_1e2_0_0.csv")
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

UsageError: Line magic function `%%time` not found.


In [None]:
# Keep rows with id1 == 'id016' and sum v1 per id2.
# NOTE(review): `df` is not assigned in any cell visible above this one, and the
# cell has no execution count — confirm where `df` is meant to come from.
(
    df.query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

## CSV with usecols

In [12]:
%%time
# Read only the three columns the query touches (`usecols`), keep rows with
# id1 == 'id016', then sum v1 per id2.
(
    pd.read_csv(f"{Path.home()}/data/G1_1e9_1e2_0_0.csv", usecols=["id1", "id2", "v1"])
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 3min 9s, sys: 35.7 s, total: 3min 45s
Wall time: 3min 54s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


## CSV => Parquet

### groupby

In [6]:
# Source CSV and destination Parquet path for the G1_1e8 file, both under ~/data.
in_path = f"{Path.home()}/data/G1_1e8_1e2_0_0.csv"
out_path = f"{Path.home()}/data/G1_1e8_1e2_0_0.parquet"

In [7]:
# Stream the CSV at `in_path` into one Parquet file at `out_path`, converting
# one record batch at a time instead of loading the whole file.
#
# The writer is created up front from the reader's schema and managed by
# `with`, so it is closed even if a batch fails part-way, and a CSV that
# yields no batches no longer ends in `None.close()`.
with pyarrow.csv.open_csv(in_path) as reader:
    with pq.ParquetWriter(out_path, reader.schema) as writer:
        for next_chunk in reader:
            # Each record batch is wrapped in a one-batch Table and appended.
            writer.write_table(pa.Table.from_batches([next_chunk]))

In [4]:
# Source CSV and destination Parquet path for the G1_1e9 file, both under ~/data.
in_path = f"{Path.home()}/data/G1_1e9_1e2_0_0.csv"
out_path = f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet"

In [7]:
# Stream the CSV at `in_path` into one Parquet file at `out_path`, converting
# one record batch at a time instead of loading the whole file.
#
# The writer is created up front from the reader's schema and managed by
# `with`, so it is closed even if a batch fails part-way, and a CSV that
# yields no batches no longer ends in `None.close()`.
with pyarrow.csv.open_csv(in_path) as reader:
    with pq.ParquetWriter(out_path, reader.schema) as writer:
        for next_chunk in reader:
            # Each record batch is wrapped in a one-batch Table and appended.
            writer.write_table(pa.Table.from_batches([next_chunk]))

### join

In [2]:
def csv_to_parquet(csv_path, parquet_path):
    """Stream a CSV file into a single Parquet file, one record batch at a time.

    Args:
        csv_path: Path of the CSV file to read.
        parquet_path: Path of the Parquet file to create.

    The function now reads and writes the paths it is given. Previously it
    ignored both parameters and used the notebook globals ``in_path`` and
    ``out_path``, so it only worked when the caller happened to use those
    exact variable names.

    The Parquet writer is opened from the reader's schema and managed by
    ``with``, so it is closed even if a batch fails to convert, and an input
    that yields no batches no longer crashes on ``None.close()``.
    """
    with pyarrow.csv.open_csv(csv_path) as reader:
        with pq.ParquetWriter(parquet_path, reader.schema) as writer:
            for next_chunk in reader:
                # Each record batch is wrapped in a one-batch Table and appended.
                writer.write_table(pa.Table.from_batches([next_chunk]))

In [3]:
# Convert the J1_1e9_1e3 CSV to a Parquet file next to it in ~/data.
in_path = f"{Path.home()}/data/J1_1e9_1e3_0_0.csv"
out_path = f"{Path.home()}/data/J1_1e9_1e3_0_0.parquet"
csv_to_parquet(in_path, out_path)

In [4]:
# Convert the J1_1e9_1e6 CSV to a Parquet file next to it in ~/data.
in_path = f"{Path.home()}/data/J1_1e9_1e6_0_0.csv"
out_path = f"{Path.home()}/data/J1_1e9_1e6_0_0.parquet"
csv_to_parquet(in_path, out_path)

In [5]:
# Convert the J1_1e9_1e9 CSV to a Parquet file next to it in ~/data.
in_path = f"{Path.home()}/data/J1_1e9_1e9_0_0.csv"
out_path = f"{Path.home()}/data/J1_1e9_1e9_0_0.parquet"
csv_to_parquet(in_path, out_path)

In [6]:
# Convert the J1_1e9_NA CSV to a Parquet file next to it in ~/data.
# NOTE(review): the recorded run of this cell ended in KeyboardInterrupt, so the
# Parquet file it left behind may be incomplete — re-run or verify before use.
in_path = f"{Path.home()}/data/J1_1e9_NA_0_0.csv"
out_path = f"{Path.home()}/data/J1_1e9_NA_0_0.parquet"
csv_to_parquet(in_path, out_path)

KeyboardInterrupt: 

## Parquet

In [12]:
%%time
# Read three columns from the Parquet file, keep rows with id1 == 'id016',
# then sum v1 per id2.
(
    pd.read_parquet(
        f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet", columns=["id1", "id2", "v1"]
    )
    # v1 is already int64 in this file per the dtypes check later in the
    # notebook; the cast presumably keeps this chain identical to the Delta cells.
    .astype({"v1": "int64"})
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 1min 23s, sys: 46.2 s, total: 2min 9s
Wall time: 1min 58s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


In [3]:
# Open the Parquet file with pyarrow; the next cell displays its metadata.
parquet_file = pq.ParquetFile(f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet")

In [4]:
# File-level metadata: column count, row count, row-group count, format version.
parquet_file.metadata

<pyarrow._parquet.FileMetaData object at 0x10771b540>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 9
  num_rows: 1000000000
  num_row_groups: 50467
  format_version: 2.6
  serialized_size: 47438984

## Use PySpark to compact and Z Order the Delta table

In [9]:
from pathlib import Path

import delta
import pyspark
from delta import configure_spark_with_delta_pip

In [5]:
# Configure a Spark session for Delta Lake, one setting per step.
builder = pyspark.sql.SparkSession.builder.appName("MyApp")

# Delta's SQL extension and catalog implementation.
builder = builder.config(
    "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# Memory settings used for the optimize runs in this notebook.
builder = builder.config("spark.executor.memory", "10G")
builder = builder.config("spark.driver.memory", "25G")

# Let the delta helper finish configuring the builder, then start
# (or reuse) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/matthew.powers/opt/miniconda3/envs/pyspark-340-delta-240/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/matthew.powers/.ivy2/cache
The jars for the packages stored in: /Users/matthew.powers/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-eb5cf25a-4ac4-4b1d-a3a2-6a9c52822e2a;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 105ms :: artifacts dl 4ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default    

In [10]:
# Load the existing Delta table at this path through the Spark session.
delta_table = delta.DeltaTable.forPath(
    spark, f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0"
)

In [11]:
# Run OPTIMIZE compaction on the table; the returned DataFrame carries the
# operation's path and metrics.
delta_table.optimize().executeCompaction()

23/06/14 12:25:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

CPU times: user 2.76 ms, sys: 2.53 ms, total: 5.29 ms
Wall time: 2.85 s


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>,de

In [12]:
# Run OPTIMIZE with Z-ordering on id1, the column the benchmark queries filter on.
delta_table.optimize().executeZOrderBy("id1")

                                                                                

CPU times: user 455 ms, sys: 155 ms, total: 611 ms
Wall time: 10min 48s


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>,de

In [14]:
# Print the table's commit history (one row per version, newest first).
delta_table.history().show()

+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      2|2023-06-14 12:36:...|  null|    null| OPTIMIZE|{predicate -> [],...|null|    null|     null|          1|SnapshotIsolation|        false|{numRemovedFiles ...|        null|Apache-Spark/3.4....|
|      1|2023-05-29 09:10:...|  null|    null| OPTIMIZE|{predicate -> [],...|null|    null|     null|          0|SnapshotIsolation|        false|{numRemovedFiles ...|        null|Apache-Spark/3.4.

In [13]:
from pathlib import Path

import pandas as pd
from deltalake import DeltaTable

## Delta Lake

In [14]:
%%time
# Benchmark query against version 0 of the Delta table: keep rows with
# id1 == 'id016', then sum v1 per id2.
(
    DeltaTable(f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=0)
    .to_pandas(filters=[("id1", "==", "id016")], columns=["id1", "id2", "v1"])
    # v1 comes back as object dtype from this table (see the dtypes check later
    # in the notebook), so cast it before summing.
    .astype({"v1": "int64"})
    # NOTE(review): repeats the predicate already passed via `filters`;
    # presumably a safety net — confirm whether it is still needed.
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 48.5 s, sys: 4.6 s, total: 53.1 s
Wall time: 7.68 s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


In [17]:
# List the data files (add actions) of version 0 with their flattened per-file
# stats: size, record count, and null count / min / max per column.
DeltaTable(
    f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=0
).get_add_actions(flatten=True).to_pandas()

Unnamed: 0,path,size_bytes,modification_time,data_change,num_records,null_count.id1,min.id1,max.id1,null_count.id2,min.id2,...,max.id6,null_count.v1,min.v1,max.v1,null_count.v2,min.v2,max.v2,null_count.v3,min.v3,max.v3
0,part-00000-90754a7a-62a7-499b-8160-56d8a701c8d...,70209358,2023-05-29 12:50:57.148,True,2536351,0,id001,id100,0,id001,...,9999992,0,1,5,0,1,9,0,0.00021,99.999998
1,part-00001-016e0c6c-8a1a-41ad-9d2c-a36f582d738...,70209635,2023-05-29 12:50:57.204,True,2536356,0,id001,id100,0,id001,...,9999979,0,1,5,0,1,9,0,0.000124,99.999984
2,part-00002-2402f3d1-6e39-4671-8ad0-f9c3d5d4da6...,70201699,2023-05-29 12:50:57.251,True,2536330,0,id001,id100,0,id001,...,9999998,0,1,5,0,1,9,0,0.000122,99.999969
3,part-00003-b611c37c-f2fe-4d82-9a13-22d2b210f85...,70205391,2023-05-29 12:50:57.155,True,2536343,0,id001,id100,0,id001,...,999999,0,1,5,0,1,9,0,0.000142,99.999925
4,part-00004-5a2c5bce-87fb-4446-be71-3107d537936...,70203769,2023-05-29 12:50:57.453,True,2536353,0,id001,id100,0,id001,...,9999993,0,1,5,0,1,9,0,0.000126,99.999976
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
390,part-00390-cbfc10e0-d714-4857-ac4c-1b2452b1dae...,70206609,2023-05-29 12:56:47.502,True,2536384,0,id001,id100,0,id001,...,9999994,0,1,5,0,1,9,0,0.000106,99.999975
391,part-00391-5a4ad5c6-629d-4cbd-934a-e4197eae1d2...,70204788,2023-05-29 12:56:48.175,True,2536313,0,id001,id100,0,id001,...,9999993,0,1,5,0,1,9,0,0.00012,99.99991
392,part-00392-e86451bb-9974-430e-b2fd-5300cb819ba...,70209102,2023-05-29 12:56:49.254,True,2536371,0,id001,id100,0,id001,...,9999982,0,1,5,0,1,9,0,0.000128,99.999929
393,part-00393-3cc47bfb-8b49-4ba6-b332-7fab3fea4c5...,70206559,2023-05-29 12:56:49.799,True,2536335,0,id001,id100,0,id001,...,9999988,0,1,5,0,1,9,0,0.000151,99.999994


In [15]:
%%time
# Same benchmark query against version 1 of the table (the first OPTIMIZE
# commit in the history output).
(
    DeltaTable(f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=1)
    .to_pandas(filters=[("id1", "==", "id016")], columns=["id1", "id2", "v1"])
    # Cast v1 to int64 before summing, matching the version 0 cell.
    .astype({"v1": "int64"})
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 3.74 s, sys: 283 ms, total: 4.02 s
Wall time: 2.42 s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


In [18]:
# List the data files (add actions) of version 1 with their flattened per-file
# stats; compare the min.id1 / max.id1 ranges with version 0.
DeltaTable(
    f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=1
).get_add_actions(flatten=True).to_pandas()

Unnamed: 0,path,size_bytes,modification_time,data_change,num_records,null_count.id1,min.id1,max.id1,null_count.id2,min.id2,...,max.id6,null_count.v1,min.v1,max.v1,null_count.v2,min.v2,max.v2,null_count.v3,min.v3,max.v3
0,part-00000-fd58de65-3be1-4081-88b2-aef05bb75c8...,1163029969,2023-05-29 13:08:01.697,False,42790653,0,id001,id005,0,id001,...,9999999,0,1,5,0,1,9,0,0.000103,99.999998
1,part-00001-c9e78196-483a-4c23-bfec-d4b310c5799...,1128786193,2023-05-29 13:07:58.379,False,41530608,0,id005,id009,0,id001,...,9999999,0,1,5,0,1,9,0,0.000105,9e-06
2,part-00002-72d6c763-b61b-4e49-9d00-4b5ab8a50f1...,1056320838,2023-05-29 13:07:47.876,False,38864966,0,id009,id013,0,id001,...,9999999,0,1,5,0,1,9,0,0.000101,99.999996
3,part-00003-ca8a4c82-6d44-4b70-bb71-d331b9074d6...,1095571507,2023-05-29 13:07:59.275,False,40308912,0,id013,id017,0,id001,...,9999999,0,1,5,0,1,9,0,0.000101,9e-05
4,part-00004-8899b9b3-127f-4be4-b9db-b5cef37d177...,1087116885,2023-05-29 13:07:53.880,False,39998190,0,id017,id021,0,id001,...,9999999,0,1,5,0,1,9,0,0.0,99.999999
5,part-00005-1394e6b5-0224-49bf-bbf6-31cff05a21d...,1091545704,2023-05-29 13:07:50.409,False,40161100,0,id021,id025,0,id001,...,9999999,0,1,5,0,1,9,0,0.000101,9e-06
6,part-00006-14b9b5dd-b7c6-4f41-b441-13e3b784f6a...,1044446596,2023-05-29 13:07:47.480,False,38428290,0,id025,id029,0,id001,...,9999999,0,1,5,0,1,9,0,0.000103,9e-06
7,part-00007-e5e3c262-80b3-43cb-82c0-f5d58c0c0db...,1003563065,2023-05-29 13:07:43.623,False,37094398,0,id029,id032,0,id001,...,9999999,0,1,5,0,1,9,0,0.000102,9e-06
8,part-00008-c613bb2c-44e1-4a9d-a859-53a812a60b9...,1200098986,2023-05-29 13:08:00.827,False,44155534,0,id032,id037,0,id001,...,9999999,0,1,5,0,1,9,0,0.000103,9e-05
9,part-00009-4dbd41f1-73f4-48f3-b99c-28594f74ebe...,1127516753,2023-05-29 13:07:56.888,False,41484321,0,id037,id041,0,id001,...,9999999,0,1,5,0,1,9,0,0.000102,99.999999


## Z Ordered Delta table

In [16]:
%%time
# Same benchmark query against version 2 of the table (the second OPTIMIZE
# commit in the history output).
(
    DeltaTable(f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=2)
    .to_pandas(filters=[("id1", "==", "id016")], columns=["id1", "id2", "v1"])
    # Cast v1 to int64 before summing, matching the other Delta cells.
    .astype({"v1": "int64"})
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 3.74 s, sys: 264 ms, total: 4 s
Wall time: 2.39 s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


In [19]:
# List the data files (add actions) of version 2 with their flattened per-file stats.
DeltaTable(
    f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=2
).get_add_actions(flatten=True).to_pandas()

Unnamed: 0,path,size_bytes,modification_time,data_change,num_records,null_count.id1,min.id1,max.id1,null_count.id2,min.id2,...,max.id6,null_count.v1,min.v1,max.v1,null_count.v2,min.v2,max.v2,null_count.v3,min.v3,max.v3
0,part-00000-3d3ee4e2-9787-4c21-8547-0fe07745cd7...,1059055853,2023-06-14 16:33:57.835,False,39145223,0,id001,id004,0,id001,...,9999999,0,1,5,0,1,9,0,0.000103,99.999998
1,part-00001-ee8c1014-0978-4023-bea6-e7cc462cf9e...,1075576939,2023-06-14 16:34:12.676,False,39761003,0,id004,id008,0,id001,...,9999999,0,1,5,0,1,9,0,0.000105,9e-06
2,part-00002-b32b4555-132f-466e-b545-4e80c255b9c...,1105831308,2023-06-14 16:34:12.757,False,40867265,0,id008,id012,0,id001,...,9999999,0,1,5,0,1,9,0,0.000101,99.999996
3,part-00003-678beece-9292-4a0c-b7a0-b1f1a2e1704...,1097895679,2023-06-14 16:34:12.367,False,40429216,0,id012,id017,0,id001,...,9999999,0,1,5,0,1,9,0,0.000101,9e-05
4,part-00004-f1743822-22a1-42ab-a1b8-136fad3dd24...,1087919315,2023-06-14 16:33:58.828,False,40077497,0,id017,id021,0,id001,...,9999999,0,1,5,0,1,9,0,0.0,99.999999
5,part-00005-6333583e-9d30-48d4-bb80-dde5a1eba84...,1098411156,2023-06-14 16:34:07.786,False,40466981,0,id021,id025,0,id001,...,9999999,0,1,5,0,1,9,0,0.000101,9e-06
6,part-00006-79c89e59-c6ce-455c-a3b2-b69e395094b...,1077354947,2023-06-14 16:33:57.520,False,39690534,0,id025,id029,0,id001,...,9999999,0,1,5,0,1,9,0,0.000103,9e-06
7,part-00007-ec315568-d2dd-42a8-933c-fa5e5bd07b6...,1077499446,2023-06-14 16:33:58.500,False,39836509,0,id029,id033,0,id001,...,9999999,0,1,5,0,1,9,0,0.000102,9e-06
8,part-00008-f40ef00f-387a-4064-83c7-07492eea0f5...,1068411157,2023-06-14 16:34:11.870,False,39490857,0,id033,id036,0,id001,...,9999999,0,1,5,0,1,9,0,0.000103,9e-05
9,part-00009-c1b3c211-c792-4f12-91ee-56ab4160f94...,1103138622,2023-06-14 16:34:08.026,False,40621729,0,id036,id041,0,id001,...,9999999,0,1,5,0,1,9,0,0.000102,99.999999


## Compare with the older approach to file skipping (PyArrow dataset + filter expression)

In [10]:
from pathlib import Path

import pyarrow.dataset as ds
from deltalake import DeltaTable

In [11]:
%%time

# Same query on version 2 of the table, but going through a PyArrow dataset
# with an explicit filter expression instead of DeltaTable.to_pandas(filters=...).
dt = DeltaTable(f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=2)
dataset = dt.to_pyarrow_dataset()
condition = ds.field("id1") == "id016"
(
    dataset.to_table(filter=condition, columns=["id1", "id2", "v1"])
    .to_pandas()
    .astype({"v1": "int64"})
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 3.77 s, sys: 268 ms, total: 4.04 s
Wall time: 2.36 s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


## Importance of dtypes

In [21]:
# Show the pandas dtypes of the three columns as read from the Parquet file.
pd.read_parquet(
    f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet", columns=["id1", "id2", "v1"]
).dtypes

id1    object
id2    object
v1      int64
dtype: object

In [20]:
%%time
# Baseline for the dtype comparison: run the query with v1 as int64.
(
    pd.read_parquet(
        f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet", columns=["id1", "id2", "v1"]
    )
    .astype({"v1": "int64"})
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 1min 25s, sys: 1min 5s, total: 2min 30s
Wall time: 2min 18s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


In [22]:
%%time
# Same query with v1 cast to object dtype, to compare its timing against the
# int64 run.
(
    pd.read_parquet(
        f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet", columns=["id1", "id2", "v1"]
    )
    .astype({"v1": "object"})
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 1min 35s, sys: 1min 8s, total: 2min 44s
Wall time: 2min 37s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842


In [23]:
# Parquet side of the dtype comparison: dtypes of the three columns as read by pandas.
pd.read_parquet(
    f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet", columns=["id1", "id2", "v1"]
).dtypes

id1    object
id2    object
v1      int64
dtype: object

In [24]:
# Delta side of the dtype comparison: dtypes of the same columns when read from
# version 0 of the Delta table with the id1 filter applied.
DeltaTable(
    f"{Path.home()}/data/deltalake_baseline_G1_1e9_1e2_0_0", version=0
).to_pandas(filters=[("id1", "==", "id016")], columns=["id1", "id2", "v1"]).dtypes

id1    object
id2    object
v1     object
dtype: object

## Parquet with row-group filtering

In [25]:
%%time
# Same query on the plain Parquet file, but passing the id1 predicate to
# read_parquet via `filters` so it is applied during the read.
(
    pd.read_parquet(
        f"{Path.home()}/data/G1_1e9_1e2_0_0.parquet",
        columns=["id1", "id2", "v1"],
        filters=[("id1", "==", "id016")],
    )
    # NOTE(review): repeats the predicate already passed via `filters`;
    # presumably a safety net — confirm whether it is still needed.
    .query("id1 == 'id016'")
    .groupby("id2")
    .agg({"v1": "sum"})
)

CPU times: user 48.8 s, sys: 4.5 s, total: 53.3 s
Wall time: 18.7 s


Unnamed: 0_level_0,v1
id2,Unnamed: 1_level_1
id001,301302
id002,299602
id003,300751
id004,300182
id005,298422
...,...
id096,299284
id097,300429
id098,301122
id099,298842
