In [1]:
from pathlib import Path

# Directory holding the per-chunk n-gram counter Parquet files.
counter_dir = Path('./counters')
# Path.glob yields entries in arbitrary, filesystem-dependent order; sort (lexicographically)
# so that the file order, and thus which files a later `parquet_files[:limit]` slice
# picks, is reproducible across runs and machines.
parquet_files = sorted(counter_dir.glob('*.parquet'))
parquet_files[:2]

[WindowsPath('counters/ngram_counter_0-199999.parquet'),
 WindowsPath('counters/ngram_counter_1000000-1199999.parquet')]

In [2]:
import duckdb

# Open the on-disk DuckDB database file used by the rest of this notebook.
db_connection = duckdb.connect(database='example_db.duckdb')

In [3]:
# Possible functions to use:
# Connection object
# get_table_names
# commit
# close
# execute
# executemany
# Relation object
# aggregate
# apply
# execute (lazy evaluation)
# intersect
# union
# map
# order
# project
# to_*
# write_parquet

In [4]:
# create two example arrow tables, and then run the query on them to check the resuts
import pyarrow as pa
table2 = pa.Table.from_pydict({'ngram': ['1', '3', '4'], 'count': [1, 5, 2]})

In [5]:
table2  # show the example table's schema and values

pyarrow.Table
ngram: string
count: int64
----
ngram: [["1","3","4"]]
count: [[1,5,2]]

In [6]:
# List the tables currently in the database (expected to be empty before the load below).
db_connection.execute("SHOW TABLES").arrow()

pyarrow.Table
name: string
----
name: []

In [7]:
# Flow: create the target table once; then, for each Parquet file, load it as a DuckDB
# relation and upsert its rows into the table (counts of already-seen n-grams are summed).
# `count` is BIGINT rather than INTEGER: it accumulates across many files and could
# otherwise overflow a 32-bit integer.
db_connection.execute('CREATE OR REPLACE TABLE ngram_counter(ngram VARCHAR PRIMARY KEY, count BIGINT)')
insert_query = """
INSERT INTO ngram_counter 
    SELECT ngram, count FROM table_to_insert
    ON CONFLICT (ngram) DO UPDATE SET count = count + excluded.count;
"""
limit = 5  # set to None to process every file
files_to_process = parquet_files[:limit]
# Report the real total: there may be fewer files than `limit`, and `limit` may be None.
n_files = len(files_to_process)
for i, parquet_file in enumerate(files_to_process, 1):
    print(f'processing file {i} out of {n_files}.')
    # The SQL above refers to `table_to_insert` by name; DuckDB resolves it to this Python
    # variable (replacement scan), so the variable name must match the query.
    table_to_insert = db_connection.read_parquet(str(parquet_file))
    db_connection.execute(insert_query)
    db_connection.commit()

processing file 1 out of 5.


FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

processing file 2 out of 5.


FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

processing file 3 out of 5.


FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

processing file 4 out of 5.


FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

processing file 5 out of 5.


FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

In [8]:
# Confirm the load created the `ngram_counter` table.
db_connection.execute("SHOW TABLES").arrow()

pyarrow.Table
name: string
----
name: [["ngram_counter"]]

In [9]:
# TODO: pay attention to thread safety when using multiple threads: https://duckdb.org/docs/guides/python/multiple_threads
# All we need to do is give each thread its own cursor:
# local_connection = db_connection.cursor()

In [10]:
# Don't forget to close the connection once you are done with the database.
db_connection.close()