# Test parquet partition and writing behaviour

In [1]:
import pyarrow as pa
import pyarrow.parquet
import dask.dataframe as dd
import numpy as np
import rstr
from faker import Faker
f = Faker()

In [2]:
from dask.distributed import Client
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 62.52 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:40705,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 62.52 GiB

0,1
Comm: tcp://127.0.0.1:34377,Total threads: 2
Dashboard: http://127.0.0.1:44101/status,Memory: 15.63 GiB
Nanny: tcp://127.0.0.1:40195,
Local directory: /tmp/dask-scratch-space/worker-yxrslf8u,Local directory: /tmp/dask-scratch-space/worker-yxrslf8u

0,1
Comm: tcp://127.0.0.1:45475,Total threads: 2
Dashboard: http://127.0.0.1:41631/status,Memory: 15.63 GiB
Nanny: tcp://127.0.0.1:45233,
Local directory: /tmp/dask-scratch-space/worker-rrim4s6t,Local directory: /tmp/dask-scratch-space/worker-rrim4s6t

0,1
Comm: tcp://127.0.0.1:36865,Total threads: 2
Dashboard: http://127.0.0.1:38595/status,Memory: 15.63 GiB
Nanny: tcp://127.0.0.1:41917,
Local directory: /tmp/dask-scratch-space/worker-11ha3qf9,Local directory: /tmp/dask-scratch-space/worker-11ha3qf9

0,1
Comm: tcp://127.0.0.1:33201,Total threads: 2
Dashboard: http://127.0.0.1:33345/status,Memory: 15.63 GiB
Nanny: tcp://127.0.0.1:44965,
Local directory: /tmp/dask-scratch-space/worker-68ztf6l_,Local directory: /tmp/dask-scratch-space/worker-68ztf6l_


In [3]:
import dask.bag as db

n=int(1e6)
b=db.from_sequence(list(np.arange(n)), npartitions=1024)
col_a = db.map(lambda x: x, b).compute()
col_b = db.map(lambda x: rstr.xeger(r'([A-Z]\d{20})'), b).compute()
col_c = db.map(lambda x: rstr.xeger(r'([A-Z]\d{10})'), b).compute()
col_date = db.map(lambda x: f.date_between(start_date='-1y',end_date='-0d'), b).compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


In [4]:
d={
    'a': col_a,
    'b': col_b,
    'c': col_c,
    'date': col_date
}
table=pa.table(d)
table.schema

a: int64
b: string
c: string
date: date32[day]

In [5]:
table.num_rows

1000000

In [6]:
import pyarrow.parquet as pq
pq.write_table(table, '1m.parquet')

In [7]:
Table= pq.ParquetFile('1m.parquet')

# create a new repo

In [8]:
import os
import shutil
def reset_dir(path='datasets'):
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)

reset_dir()

In [9]:
!tree datasets -la

[01;34mdatasets[00m

0 directories, 0 files


# test splitting batch and writting to a parquet

In [10]:
%%time
# reader = table.to_batches(max_chunksize=2)
reader = table.to_batches()
with open("datasets/test.parquet","wb") as sink:
    with pa.parquet.ParquetWriter(sink, table.schema) as writer:
        for chunk in reader:
            writer.write_table(pa.Table.from_batches([chunk]))

CPU times: user 328 ms, sys: 50.7 ms, total: 379 ms
Wall time: 351 ms


In [11]:
!tree datasets -la

[01;34mdatasets[00m
└── test.parquet

0 directories, 1 file


# Write df a dataset contains many parquets

In [12]:
import pyarrow.dataset as ds
import datetime

In [13]:
%%time
date_schema=schema=table.schema.remove(0).remove(0).remove(0)
part = ds.partitioning(date_schema, flavor="hive",)

repo='datasets'
table_name='df'

ds.write_dataset(
    data=table,
    base_dir=os.path.join(repo,table_name),
    format="parquet",
    partitioning=part,
#     max_rows_per_file=3,
#     max_rows_per_group=2,
    basename_template = "part-{:%y%m%d-%H%M%S}-{{i}}.parquet".format(datetime.datetime.now()),
#     existing_data_behavior='delete_matching',
    existing_data_behavior='overwrite_or_ignore',
)

CPU times: user 3.3 s, sys: 722 ms, total: 4.02 s
Wall time: 740 ms


In [14]:
!tree datasets

[01;34mdatasets[00m
├── [01;34mdf[00m
│   ├── [01;34mdate=2022-09-09[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-10[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-11[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-12[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-13[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-14[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-15[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-16[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-17[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-18[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-19[00m
│   │   └── part-230910-225409-0.parquet
│   ├── [01;34mdate=2022-09-20[00m
│   │   └── part-230910-225409-0.parqu

In [15]:
%%time
import duckdb
duckdb.sql("SELECT * FROM 'datasets/df/*/*.parquet';").show()

┌────────┬───────────────────────┬─────────────┬────────────┐
│   a    │           b           │      c      │    date    │
│ int64  │        varchar        │   varchar   │  varchar   │
├────────┼───────────────────────┼─────────────┼────────────┤
│    135 │ Y65766041509910946019 │ G8153270150 │ 2023-02-14 │
│    272 │ U13433912731669179585 │ V1742129600 │ 2023-02-14 │
│   1112 │ B65972293997454843932 │ C2592386539 │ 2023-02-14 │
│   1249 │ B04719805656697042693 │ G9415649918 │ 2023-02-14 │
│   2089 │ Q48787267289345183370 │ T1416110099 │ 2023-02-14 │
│   2226 │ K97670015791463239255 │ K5320205135 │ 2023-02-14 │
│   3066 │ N10325666658239232250 │ T8336212442 │ 2023-02-14 │
│   3203 │ J71043130138822875248 │ T5239617985 │ 2023-02-14 │
│   4043 │ J32927436611184358007 │ H7458793552 │ 2023-02-14 │
│   4180 │ W89340413374358765249 │ X0233288531 │ 2023-02-14 │
│     ·  │           ·           │      ·      │     ·      │
│     ·  │           ·           │      ·      │     ·      │
│     · 

In [16]:
n

1000000

# Second write append rows

In [17]:
%%time
import pyarrow.dataset as ds
import datetime
import time

time.sleep(1)
repo='datasets'
table_name='df'


# df['last_updated']=str(datetime.datetime.now() )
# table = pa.Table.from_pandas(df)
t2 = table.append_column('last_updated',[[str(datetime.datetime.now())]*n])
date_schema=schema=t2.schema.remove(0).remove(0).remove(0).remove(1)
part = ds.partitioning(date_schema, flavor="hive",)

ds.write_dataset(
    data=t2,
    base_dir=os.path.join(repo,table_name),
    format="parquet",
    partitioning=part,
#     max_rows_per_file=3,
#     max_rows_per_group=2,
    basename_template = "part-{:%y%m%d-%H%M%S}-{{i}}.parquet".format(datetime.datetime.now()),
#     existing_data_behavior='delete_matching',
    existing_data_behavior='overwrite_or_ignore',
)

CPU times: user 4.28 s, sys: 935 ms, total: 5.22 s
Wall time: 1.87 s


In [18]:
import duckdb

duckdb.sql("""
SELECT count(a) FROM read_parquet('datasets/df/*/*.parquet',union_by_name=True);
""").show()

duckdb.sql("""
SELECT * FROM read_parquet('datasets/df/*/*.parquet',union_by_name=True);
""").show()

┌──────────┐
│ count(a) │
│  int64   │
├──────────┤
│  2000000 │
└──────────┘

┌────────┬───────────────────────┬─────────────┬────────────────────────────┬────────────┐
│   a    │           b           │      c      │        last_updated        │    date    │
│ int64  │        varchar        │   varchar   │          varchar           │  varchar   │
├────────┼───────────────────────┼─────────────┼────────────────────────────┼────────────┤
│    135 │ Y65766041509910946019 │ G8153270150 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│    272 │ U13433912731669179585 │ V1742129600 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   1112 │ B65972293997454843932 │ C2592386539 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   1249 │ B04719805656697042693 │ G9415649918 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   2089 │ Q48787267289345183370 │ T1416110099 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   2226 │ K97670015791463239255 │ K5320205135 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   3066 │ 

# Third write append rows

In [19]:
%%timeit -n1 -r10
import pyarrow.dataset as ds
import datetime
import time

time.sleep(1)
repo='datasets'
table_name='df'


t2 = table.append_column('last_updated',[[str(datetime.datetime.now())]*n])
date_schema=schema=t2.schema.remove(0).remove(0).remove(0).remove(1)
part = ds.partitioning(date_schema, flavor="hive",)

ds.write_dataset(
    data=t2,
    base_dir=os.path.join(repo,table_name),
    format="parquet",
    partitioning=part,
#     max_rows_per_file=3,
#     max_rows_per_group=2,
    basename_template = "part-{:%y%m%d-%H%M%S}-{{i}}.parquet".format(datetime.datetime.now()),
#     existing_data_behavior='delete_matching',
    existing_data_behavior='overwrite_or_ignore',
)

1.87 s ± 34.4 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)


In [20]:
%%time
duckdb.sql("""
SELECT count(a) FROM read_parquet('datasets/df/*/*.parquet',union_by_name=True);
""").show()

duckdb.sql("""
SELECT * FROM read_parquet('datasets/df/*/*.parquet',union_by_name=True);
""").show()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌──────────┐
│ count(a) │
│  int64   │
├──────────┤
│ 12000000 │
└──────────┘

┌────────┬───────────────────────┬─────────────┬────────────────────────────┬────────────┐
│   a    │           b           │      c      │        last_updated        │    date    │
│ int64  │        varchar        │   varchar   │          varchar           │  varchar   │
├────────┼───────────────────────┼─────────────┼────────────────────────────┼────────────┤
│    135 │ Y65766041509910946019 │ G8153270150 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│    272 │ U13433912731669179585 │ V1742129600 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   1112 │ B65972293997454843932 │ C2592386539 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   1249 │ B04719805656697042693 │ G9415649918 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   2089 │ Q48787267289345183370 │ T1416110099 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   2226 │ K97670015791463239255 │ K5320205135 │ 2023-09-10 22:54:12.404843 │ 2023-02-14 │
│   3066 │ 

## to do >> stream to flight server

```python
import pyarrow as pa
import pyarrow.flight

# client = pa.flight.connect("grpc://0.0.0.0:8815")
client = pa.flight.connect(location='grpc://192.168.24.90:8816')

# Upload a new dataset
NUM_BATCHES = 1024
ROWS_PER_BATCH = 4096
upload_descriptor = pa.flight.FlightDescriptor.for_path("streamed.parquet")
writer, _ = client.do_put(upload_descriptor, table.schema)
with writer:
    for batch in table.to_batches(max_chunksize=2):
        writer.write_batch(batch)
```