# Working with Parquet files
Since we will be working with close to 10,000 Parquet files, we need to make sure that we can read them fast.

With the default settings, it is not possible to read only part of the rows of a Parquet file. This is bad, because most of the time we only need a small subset of the data.

Parquet stores data in row groups. By default, fastparquet only starts a new row group every 50 million rows, so for our files the entire file is a single row group and a partial read is not possible. With multiple row groups, a partial read may be faster. We can pass <code>row_group_offsets</code> to the <code>to_parquet</code> method (this is the fastparquet name; the pyarrow engine calls it <code>row_group_size</code>) to use a smaller value than the default of 50 million rows. I will test whether a partial read is actually faster. I will set the row group size to 25000 rows, which for most stocks on the m1 (1-minute) timeframe is about one quarter of data.
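
To see why the default gives a single row group, here is a simplified model of how a row group size cuts a file into groups. The helper `row_group_bounds` and the 1,000,000-row file are hypothetical; fastparquet's own splitting may distribute rows slightly differently, so treat this as an approximation of the idea, not of its exact boundaries.

```python
def row_group_bounds(n_rows, group_size):
    """Return one [start, stop) row range per row group.

    Simplified model: a new group starts every `group_size` rows and the last
    group holds the remainder. The real writer may split a little differently.
    """
    if n_rows <= 0:
        return []
    return [(start, min(start + group_size, n_rows))
            for start in range(0, n_rows, group_size)]


# Hypothetical file with 1,000,000 one-minute bars
n_rows = 1_000_000
default_groups = row_group_bounds(n_rows, 50_000_000)  # fastparquet default
small_groups = row_group_bounds(n_rows, 25_000)        # size used in this notebook

print(len(default_groups))  # 1
print(len(small_groups))    # 40
print(small_groups[:2])     # [(0, 25000), (25000, 50000)]
```

Any file shorter than 50 million rows ends up as one group under the default, which is why every partial read of such a file has to touch all of its data.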

fastparquet can also append to an existing file, which is not possible with pyarrow. So I will use fastparquet.

In [2]:
from utils import get_tickers
from datetime import date
import pandas as pd
import pyarrow.parquet as pq
DATA_PATH = "../data/polygon/"

In [3]:
tickers = get_tickers(3)

# Read speeds
We will compare read speeds for large and small row groups, using files written by fastparquet with snappy compression and read with pyarrow. We will also compare partial reads with full reads. At the end we test the read speed of the CSV format.
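
The notebook does not show how the timings below were measured (in Jupyter the `%%time` cell magic is a common choice). If you want to record them from code instead, one option is a small context manager around `time.perf_counter`. The helper name `timed` is hypothetical, and the `sum(...)` call only stands in for one of the read loops.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the wall-clock duration of the with-block in results[label] (seconds)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed("large groups: read all", timings):
    total = sum(range(1_000_000))  # stand-in for one of the read loops below

for label, seconds in timings.items():
    print(f"{label}: {seconds:.2f}s")
```

Wall-clock timings of disk reads also depend on the operating system's file cache, so repeating each measurement a few times gives a fairer comparison.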

Large groups: read all (0.7s)

In [4]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet snappy no limit)/{ticker_id}.parquet")
    df = dataset.read().to_pandas()
    # Benchmark only a sample of tickers (stop after index 25)
    if index > 24:
        break

Large groups: read part (0.5s)

In [5]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    # Partial read: only rows between 1 August and 1 September 2023
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet snappy no limit)/{ticker_id}.parquet",
        filters=[("datetime", ">=", date(2023, 8, 1)), ("datetime", "<=", date(2023, 9, 1))])
    df = dataset.read().to_pandas()
    if index > 24:
        break

Small groups: read all (0.7s)

In [7]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet snappy 25000)/{ticker_id}.parquet")
    df = dataset.read().to_pandas()
    if index > 24:
        break

Small groups: read part (0.1s)

In [8]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    # Partial read: only row groups that overlap August 2023 need to be read
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet snappy 25000)/{ticker_id}.parquet",
        filters=[("datetime", ">=", date(2023, 8, 1)), ("datetime", "<=", date(2023, 9, 1))])
    df = dataset.read().to_pandas()
    if index > 24:
        break

CSV: read all (7.1s)

In [11]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    df = pd.read_csv(DATA_PATH + f"raw/m1 (csv)/{ticker_id}.csv")
    if index > 24:
        break

Indeed, splitting the file into small row groups drastically speeds up partial reads. Under the hood it works like this: every row group stores statistics for each column, such as the minimum and maximum value. When we select a subset with a filter, the reader compares the filter bounds with the statistics of every row group and skips the groups that cannot contain matching rows. If there is only one row group, its range covers the whole file, so the entire file has to be read and decompressed. If there are multiple row groups, the reader can quickly determine which row groups are relevant, and only those (25000 rows each) are read.
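
To make the statistics-based skipping concrete, here is a toy model in plain Python. It is not how pyarrow is implemented; it only mimics the min/max check on the `datetime` column. The class `RowGroupStats`, the helper `groups_to_read` and the quarterly boundaries are hypothetical. For real files, pyarrow exposes the per-row-group statistics through `pq.ParquetFile(path).metadata.row_group(i).column(j).statistics`.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class RowGroupStats:
    """Min/max of the filter column for one row group (kept in the file footer)."""
    index: int
    min_value: datetime
    max_value: datetime


def groups_to_read(stats, lower, upper):
    """Return indices of row groups whose [min, max] range overlaps [lower, upper].

    Groups entirely outside the range are skipped without being decompressed.
    """
    return [s.index for s in stats
            if s.max_value >= lower and s.min_value <= upper]


# Hypothetical file for 2023: four row groups of one quarter each ...
quarters = [
    RowGroupStats(0, datetime(2023, 1, 3), datetime(2023, 3, 31)),
    RowGroupStats(1, datetime(2023, 4, 3), datetime(2023, 6, 30)),
    RowGroupStats(2, datetime(2023, 7, 3), datetime(2023, 9, 29)),
    RowGroupStats(3, datetime(2023, 10, 2), datetime(2023, 12, 29)),
]
# ... versus the same data stored as a single row group
one_big_group = [RowGroupStats(0, datetime(2023, 1, 3), datetime(2023, 12, 29))]

lower, upper = datetime(2023, 8, 1), datetime(2023, 9, 1)
print(groups_to_read(quarters, lower, upper))       # [2]
print(groups_to_read(one_big_group, lower, upper))  # [0]
```

With quarterly groups, only the third-quarter group overlaps the August filter, so three quarters of the data are never decompressed; with a single group, the whole file is read. Rows inside a group that does get read are still filtered individually afterwards.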

Of course, a small row group size (<code>row_group_offsets</code> in fastparquet) means more statistics to store and less efficient compression. However, partial reads are now 5x faster (0.1s instead of 0.5s), while the file size is roughly the same.

I also tested the write speed for small and large row group sizes. Smaller row groups take more time to compress, and if the groups get too small, partial reads actually slow down again. I eventually settled on a row group size of 25000 rows, which is approximately one quarter of 1-minute data.
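
A quick check of the "about one quarter" figure, under assumptions that are not taken from the data: roughly 252 trading days per year and a 390-minute regular session (09:30 to 16:00 ET). If the data also contains pre- and post-market bars, or if illiquid stocks have minutes without any bar, the real number of rows per quarter will differ from stock to stock.

```python
# Assumptions (not from the data): about 252 trading days per year and a
# 390-minute regular session (09:30-16:00 ET); extended-hours bars ignored.
TRADING_DAYS_PER_YEAR = 252
MINUTES_PER_SESSION = 390
ROW_GROUP_SIZE = 25_000

trading_days_per_quarter = TRADING_DAYS_PER_YEAR // 4  # 63
bars_per_quarter = trading_days_per_quarter * MINUTES_PER_SESSION

print(bars_per_quarter)                                 # 24570
print(f"{bars_per_quarter / ROW_GROUP_SIZE:.0%}")       # 98%
```

So a 25000-row group holds roughly one quarter of regular-session bars.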

# Write speeds
We will compare the write speed and file size of snappy (the default), gzip, brotli and CSV.



Snappy (default row groups of 50 million): 2.5s

In [11]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    # Read the source data from the gzip copy (the wide date filter keeps effectively all rows)
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet gzip)/{ticker_id}.parquet",
        filters=[("datetime", ">=", date(2000, 1, 1)), ("datetime", "<=", date(2100, 1, 1))])
    df = dataset.read().to_pandas()

    # No row_group_offsets: fastparquet's default of 50 million rows per group applies
    df.to_parquet(DATA_PATH + f"raw/m1 (fastparquet snappy no limit)/{ticker_id}.parquet",
                  engine="fastparquet", compression='snappy')

    if index > 24:
        break

Snappy (row groups of 25000 rows): 3.1s (124 MB)

In [12]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet gzip)/{ticker_id}.parquet",
        filters=[("datetime", ">=", date(2000, 1, 1)), ("datetime", "<=", date(2100, 1, 1))])
    df = dataset.read().to_pandas()

    # fastparquet's name for the row group size is row_group_offsets
    df.to_parquet(DATA_PATH + f"raw/m1 (fastparquet snappy 25000)/{ticker_id}.parquet",
                  engine="fastparquet", compression='snappy', row_group_offsets=25000)

    if index > 24:
        break

Gzip: 29s (80 MB)

In [13]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet gzip)/{ticker_id}.parquet",
        filters=[("datetime", ">=", date(2000, 1, 1)), ("datetime", "<=", date(2100, 1, 1))])
    df = dataset.read().to_pandas()

    # Overwrites the source file; df already holds all of its data in memory
    df.to_parquet(DATA_PATH + f"raw/m1 (fastparquet gzip)/{ticker_id}.parquet",
                  engine="fastparquet", compression='gzip', row_group_offsets=25000)

    if index > 24:
        break

Brotli: almost 10 minutes (63 MB)

In [49]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet gzip)/{ticker_id}.parquet",
        filters=[("datetime", ">=", date(2000, 1, 1)), ("datetime", "<=", date(2100, 1, 1))])
    df = dataset.read().to_pandas()

    df.to_parquet(DATA_PATH + f"raw/m1 (fastparquet brotli)/{ticker_id}.parquet",
                  engine="fastparquet", compression='brotli', row_group_offsets=25000)

    if index > 24:
        break

CSV: 38s (278 MB)

In [14]:
for index, row in tickers.iterrows():
    ticker_id = row['ID']
    dataset = pq.ParquetDataset(
        DATA_PATH + f"raw/m1 (fastparquet gzip)/{ticker_id}.parquet",
        filters=[("datetime", ">=", date(2000, 1, 1)), ("datetime", "<=", date(2100, 1, 1))])
    df = dataset.read().to_pandas()

    df.to_csv(DATA_PATH + f"raw/m1 (csv)/{ticker_id}.csv")

    if index > 24:
        break


I will use the default snappy compression with row groups of 25000 rows. Even though it takes more disk space than gzip or brotli, it is much faster to write. We also already save 55% in disk space compared to the standard CSV format (124 MB versus 278 MB). Besides, I just bought a 4 TB SSD, so I am not going to let that space go to waste.
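
The 55% figure follows from the file sizes measured above, and the same arithmetic shows what the faster snappy writes cost in disk space compared with gzip and brotli. The numbers are copied from the benchmark headings; brotli's "almost 10 minutes" is approximated as 600 seconds.

```python
# (file size in MB, write time in seconds), copied from the write benchmarks above;
# brotli's "almost 10 minutes" is approximated as 600 s.
results = {
    "snappy": (124, 3.1),
    "gzip": (80, 29.0),
    "brotli": (63, 600.0),
    "csv": (278, 38.0),
}
csv_size = results["csv"][0]
# Fraction of disk space saved relative to the CSV files
savings = {fmt: 1 - size / csv_size for fmt, (size, _) in results.items()}

for fmt, (size, seconds) in results.items():
    print(f"{fmt:>6}: {size:>3} MB, {savings[fmt]:4.0%} smaller than CSV, {seconds:g} s to write")
```

Gzip and brotli save roughly 71% and 77% respectively, but they take about 9x and almost 200x as long to write as snappy.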

# Appending data to Parquet files

We compare appending data with reading + concatenating + writing. We will add the data of A-2019-01-01 to AAPL-2019-01-01 (the append test writes into a copy, AAPL-copy.parquet).

In [15]:
update = pq.ParquetDataset(
    DATA_PATH + "raw/m1 (fastparquet snappy 25000)/A-2019-01-01.parquet").read().to_pandas()

Appending: 0.1s

In [16]:
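from fastparquet import write

# Append the new rows as extra row groups. Pass the same codec and row group size as
# the existing file: fastparquet.write defaults to no compression and 50-million-row groups
write(DATA_PATH + "raw/m1 (fastparquet snappy 25000)/AAPL-copy.parquet", update,
      append=True, compression='snappy', row_group_offsets=25000)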

Reading + concatenating + writing: 3.6s

In [17]:
# Read AAPL
original = pq.ParquetDataset(
    DATA_PATH + "raw/m1 (fastparquet snappy 25000)/AAPL-2019-01-01.parquet").read().to_pandas()

# Concatenate and rewrite the whole file with the same settings as the other files
pd.concat([original, update]).to_parquet(
    DATA_PATH + "raw/m1 (fastparquet snappy 25000)/A-AAPL.parquet",
    engine="fastparquet", compression='snappy', row_group_offsets=25000)


Appending is more than 30x faster (0.1s versus 3.6s) than reading everything, concatenating and writing. Since we will append new data for every ticker whenever we update the dataset, this speedup is very meaningful.
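
One thing to watch when updating by appending: an append only adds row groups and never deduplicates, so rows that are already in the file must be dropped before writing. A minimal sketch of that filtering step, assuming the file is sorted by time; the helper `rows_to_append` and the sample rows are hypothetical. The last stored timestamp could, for example, be taken from the statistics of the file's last row group via `pq.ParquetFile(path).metadata`, so the existing data does not need to be read.

```python
from datetime import datetime


def rows_to_append(last_stored, new_rows):
    """Return the rows that are strictly newer than the last stored timestamp.

    `new_rows` is a list of (timestamp, value) tuples. Appending only adds row
    groups, so anything not newer than `last_stored` would end up duplicated.
    """
    return [row for row in new_rows if row[0] > last_stored]


# Hypothetical example: the file ends at 15:59 and the fresh download overlaps it.
last_stored = datetime(2023, 9, 1, 15, 59)
downloaded = [
    (datetime(2023, 9, 1, 15, 58), 1.0),
    (datetime(2023, 9, 1, 15, 59), 2.0),
    (datetime(2023, 9, 5, 9, 30), 3.0),
]
print(rows_to_append(last_stored, downloaded))
# [(datetime.datetime(2023, 9, 5, 9, 30), 3.0)]
```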