## Parquet and DuckDB - Analyzing Large Datasets

In this Jupyter notebook, we explore the capabilities of DuckDB and the Parquet file format. We use the Bitcoin dataset from Kaggle, which has about 220 million rows. We first compute some technical indicators using Parquet files and DuckDB, and then compare this with the same computation on DuckDB's native table format. The accompanying blog post is available here: https://learn2infiniti.com/parquet-and-duckdb-analyzing-large-datasets/

### Load libraries

In [2]:
import duckdb
import time

### Loading the Parquet files of the Bitcoin dataset
The dataset is available as two large CSV files containing a total of 220 million rows. The CSV files were converted to Parquet files outside this notebook. Here, we first read both Parquet files using the DuckDB Python API. DuckDB can read multiple Parquet files at once when given a list of files.

1. The Parquet file paths are passed as a list.
2. The load time is only a few milliseconds because DuckDB does not read the Parquet files into memory at this point: `duckdb.sql` returns a relation that is only evaluated when its result is needed.
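To make the second point concrete, here is a toy illustration of deferred evaluation. It is not DuckDB's implementation; `LazyRelation` and `scan_two_files` are hypothetical names made up for this sketch. Defining the relation scans nothing, and the rows are only read when the result is consumed.

```python
# Toy illustration only, not DuckDB internals: defining a query does no work;
# rows are scanned only when the result is consumed.


class LazyRelation:
    """Hypothetical stand-in for a deferred query result."""

    def __init__(self, source):
        self._source = source  # zero-argument callable that yields rows
        self.rows_read = 0     # rows actually scanned so far

    def fetchall(self):
        # All the work happens here, when the result is requested.
        rows = []
        for row in self._source():
            self.rows_read += 1
            rows.append(row)
        return rows


def scan_two_files():
    # Stand-in for scanning two Parquet files: yields (file_id, row_id) pairs.
    for file_id in (1, 2):
        for row_id in range(3):
            yield (file_id, row_id)


rel = LazyRelation(scan_two_files)
print(rel.rows_read)    # 0: defining the relation scanned nothing
rows = rel.fetchall()
print(rel.rows_read)    # 6: the scan happened on fetchall()
```

With DuckDB itself, the analogous step is calling `.show()`, `.fetchall()` or `.df()` on the relation returned by `duckdb.sql`, which is when the actual Parquet scan takes place.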

In [3]:
%%time
# Load both the parquet files
bc_full_dataset = duckdb.sql('''SELECT * 
                                FROM read_parquet(['data/bc_dataset_snappy_1.parquet','data/bc_dataset_snappy_2.parquet'])''')

CPU times: total: 31.2 ms
Wall time: 66.4 ms


### Writing the two separate datasets as a single parquet file.
In this step, we write the combined dataset into a new Parquet file, adding `year` and `month` columns derived from "Open Time". Writing a single file avoids any inefficiencies that can come from reading multiple files.

In [43]:
%%time
# We are writing the above combined variable to a parquet file. By default DuckDB uses Snappy compression. 
duckdb.execute('''COPY
    (SELECT *, YEAR("Open Time") as year, MONTH("Open Time") as month FROM bc_full_dataset)
    TO 'data/bc_full_dataset_snappy.parquet'
    (FORMAT 'parquet', CODEC 'snappy');''')

CPU times: total: 3min 21s
Wall time: 4min 21s


<duckdb.duckdb.DuckDBPyConnection at 0x2d83e42f830>

In [85]:
%%time
bc_full_dataset = duckdb.sql('''SELECT * 
                                FROM read_parquet('data/bc_full_dataset_snappy.parquet')''')

CPU times: total: 46.9 ms
Wall time: 99.5 ms


### Calculating technical indicators
We start with simple technical indicators: the 50-second and 200-second moving averages. This allows a comparison with the moving averages computed on the single dataset in the previous notebook. Notice that it took 3min 47sec to compute the moving averages on the 220 million row dataset, whereas in the previous notebook the same calculation took under a minute on 110 million rows. Doubling the row count therefore more than tripled the run time, so the run time grows faster than linearly with the data size.
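The SQL below uses a `RANGE` frame, so each window covers a span of time rather than a fixed number of rows. As a minimal sketch of those semantics in plain Python (the function name `range_moving_average` is hypothetical, and it assumes the timestamps are sorted with no duplicates, so `RANGE` peer rows never arise), each row averages every close whose timestamp falls in `[t - window, t]`:

```python
from collections import deque
from datetime import datetime, timedelta


def range_moving_average(times, closes, window_seconds):
    """Mean of closes with timestamp in [t - window, t] for each row.

    Assumes `times` is sorted ascending and has no duplicate timestamps.
    """
    window = timedelta(seconds=window_seconds)
    frame = deque()  # (time, close) pairs currently inside the frame
    running_sum = 0.0
    result = []
    for t, c in zip(times, closes):
        frame.append((t, c))
        running_sum += c
        # Evict rows older than t - window; a row exactly at t - window stays,
        # matching "INTERVAL n SECONDS PRECEDING".
        while frame[0][0] < t - window:
            _, old_close = frame.popleft()
            running_sum -= old_close
        result.append(running_sum / len(frame))
    return result


start = datetime(2017, 8, 17, 4, 0, 0)
times = [start + timedelta(seconds=s) for s in (0, 1, 2, 10)]
closes = [10.0, 20.0, 30.0, 40.0]
print(range_moving_average(times, closes, 2))  # [10.0, 15.0, 20.0, 40.0]
```

Because the frame is defined in time, a gap in the data (here between 2 s and 10 s) shrinks the number of rows averaged. A `ROWS BETWEEN 50 PRECEDING AND CURRENT ROW` frame would instead always average up to 51 rows, regardless of how far apart they are in time.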

In [4]:
%%time
# Calculate the 50 seconds moving average and 200 seconds moving average and show the output.
start_time = time.time()
duckdb.sql('''
                SELECT "Open Time",
                        Close,
                        AVG("Close") OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 50 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma50,
                        AVG("Close") OVER ( ORDER BY "Open Time" 
                                            RANGE BETWEEN INTERVAL 200 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma200
                FROM bc_full_dataset
''').show()
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

┌─────────────────────┬─────────┬───────────────────┬───────────────────┐
│      Open Time      │  Close  │       ma50        │       ma200       │
│      timestamp      │ double  │      double       │      double       │
├─────────────────────┼─────────┼───────────────────┼───────────────────┤
│ 2017-08-17 04:00:28 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:29 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:30 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:31 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:32 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:33 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:34 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:35 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:36 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:37 │ 4261.48 │ 4261

### Adding more technical indicators
In the next step, we add more technical indicators to our arsenal. We calculate the following:
1. 14-second moving average
2. 28-second moving average
3. 14-second standard deviation, used to calculate the Bollinger Bands
4. Average gain and average loss over the last 14 seconds, used to calculate the Relative Strength Index (RSI)

```Note - I am no expert in technical analysis; I have taken the definitions from the internet and tried to implement them here. This may not be the most efficient or accurate implementation of the indicators.```
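For reference, these are the formulas that the SQL cells below implement, written out explicitly. Here W_t is the set of rows whose "Open Time" lies in the 14 seconds up to and including t, and C_i and O_i are the close and open prices. This reflects the notebook's own definitions (per-row gain and loss measured as Close minus Open, simple averages, population standard deviation via `STDDEV_POP`). Textbook RSI is usually based on close-to-close changes with Wilder smoothing, so treat this as a description of the code rather than a canonical definition. `ma28` is the same as `ma14` with a 28-second window.

```latex
\begin{aligned}
\mathrm{ma14}_t &= \frac{1}{|W_t|}\sum_{i \in W_t} C_i,
\qquad
\mathrm{stddev14}_t = \sqrt{\frac{1}{|W_t|}\sum_{i \in W_t}\left(C_i - \mathrm{ma14}_t\right)^2} \\
\text{upper}_t &= \mathrm{ma14}_t + 2\,\mathrm{stddev14}_t,
\qquad
\text{middle}_t = \mathrm{ma14}_t,
\qquad
\text{lower}_t = \mathrm{ma14}_t - 2\,\mathrm{stddev14}_t \\
\text{avg\_gain}_t &= \frac{1}{|W_t|}\sum_{i \in W_t}\max(C_i - O_i,\,0),
\qquad
\text{avg\_loss}_t = \frac{1}{|W_t|}\sum_{i \in W_t}\max(O_i - C_i,\,0) \\
\mathrm{RSI}_t &= 100 - \frac{100}{1 + \text{avg\_gain}_t / \text{avg\_loss}_t}
\end{aligned}
```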

In [86]:
%%time
# Calculate the window functions required for the technical indicators
start_time = time.time()
tech_indicators = duckdb.sql('''
                SELECT "Open Time",
                        Close,
                        Volume,
                        AVG("Close") OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma14,
                        
                        AVG("Close") OVER ( ORDER BY "Open Time" 
                                            RANGE BETWEEN INTERVAL 28 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma28,
                                              
                        STDDEV_POP("Close") OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as stddev14,
                        
                        AVG(CASE WHEN Close - Open > 0 THEN Close - Open ELSE 0 END) OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) AS avg_gain,
                                              
                        AVG(CASE WHEN Open - Close > 0 THEN Open - Close ELSE 0 END) OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) AS avg_loss
                FROM bc_full_dataset
''')
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

Total time taken: 0.0752406120300293
CPU times: total: 31.2 ms
Wall time: 75.2 ms


In the previous step, DuckDB did not process any data yet: `duckdb.sql` only defined the query, which is why it returned in about 75 ms. In the next step, we use the columns from that query to calculate the Bollinger Bands and RSI and show the result. DuckDB computed the moving averages, Bollinger Bands and RSI on the 220 million row dataset in about 11min 28sec. Again, note that all the data is processed on a local machine with 16GB of RAM and 4 cores.
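The derived columns in the next cell are simple arithmetic on the window outputs. The plain-Python sketch below uses hypothetical helpers `bollinger` and `rsi` that are not part of the notebook, and it computes them for a single row. One assumption to call out: it treats `avg_loss = 0` explicitly, returning `None` (like SQL `NULL`) when both averages are zero and 100 when there were gains but no losses, which is a common convention. The SQL expression in the next cell does not special-case `avg_loss = 0`, so its result there depends on how the DuckDB version in use handles division by zero. The output shows `NULL` when both averages are 0.

```python
def bollinger(ma14, stddev14, k=2.0):
    """Return (upper, middle, lower) bands: ma14 +/- k * stddev14."""
    return ma14 + k * stddev14, ma14, ma14 - k * stddev14


def rsi(avg_gain, avg_loss):
    """RSI = 100 - 100 / (1 + avg_gain / avg_loss), with zero losses guarded.

    Assumed convention: None when both averages are zero (no movement),
    100.0 when there were gains but no losses.
    """
    if avg_loss == 0:
        return None if avg_gain == 0 else 100.0
    return 100 - 100 / (1 + avg_gain / avg_loss)


print(bollinger(10.0, 1.0))  # (12.0, 10.0, 8.0)
print(rsi(0.0, 0.0))         # None, like the NULL rows in the output
print(rsi(3.0, 1.0))         # 75.0
print(rsi(2.0, 0.0))         # 100.0
```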

In [87]:
%%time
# Calculate the technical indicators and show the output
start_time = time.time()
duckdb.sql('''
                SELECT "Open Time",
                        Close,
                        Volume,
                        ma14,
                        ma28,
                        stddev14,
                        ma14 + 2*stddev14 AS upper_bollinger,
                        ma14 - 2*stddev14 AS lower_bollinger,
                        ma14              AS middle_bollinger,
                        avg_gain,
                        avg_loss,
                        100 - (100/(1+(avg_gain/avg_loss))) AS rsi
                FROM tech_indicators
''').show()
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

┌─────────────────────┬─────────┬──────────┬───────────────────┬───┬───────────────────┬──────────┬──────────┬────────┐
│      Open Time      │  Close  │  Volume  │       ma14        │ … │ middle_bollinger  │ avg_gain │ avg_loss │  rsi   │
│      timestamp      │ double  │  double  │      double       │   │      double       │  double  │  double  │ double │
├─────────────────────┼─────────┼──────────┼───────────────────┼───┼───────────────────┼──────────┼──────────┼────────┤
│ 2017-08-17 04:00:28 │ 4261.48 │      0.1 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:29 │ 4261.48 │      0.0 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:30 │ 4261.48 │      0.0 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:31 │ 4261.48 │      0.0 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:32 │ 4261.48 │ 1.6751

### Saving the technical indicators to a Parquet file

In [20]:
%%time
# Define the final indicator columns plus year/month/day for partitioning (lazy: nothing is computed yet)
start_time = time.time()
tech_indicators_out = duckdb.sql('''
                SELECT "Open Time",
                        Close,
                        YEAR("Open Time") as year,
                        MONTH("Open Time") as month,
                        DAY("Open Time") as day,
                        Volume,
                        ma14,
                        ma28,
                        stddev14,
                        ma14 + 2*stddev14 AS upper_bollinger,
                        ma14 - 2*stddev14 AS lower_bollinger,
                        ma14              AS middle_bollinger,
                        avg_gain,
                        avg_loss,
                        100 - (100/(1+(avg_gain/avg_loss))) AS rsi
                FROM tech_indicators
''')
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

Total time taken: 0.04151630401611328
CPU times: total: 0 ns
Wall time: 41.5 ms


In the step below, we write the technical indicators to Parquet, partitioned by the `year` and `month` columns in the Hive partitioning format (one `year=<value>/month=<value>` directory per partition). Notice that we use zstd compression here. The whole process took about 20min, and the output folder is 10 GB in size.
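The Hive layout written by `PARTITION_BY (year, month)` can be sketched as follows. This is a minimal illustration using a hypothetical helper, `hive_partition_dir`, which computes the directory a given row ends up in. It assumes DuckDB renders the integer partition values without zero padding (`month=8`). The file names inside each directory are chosen by DuckDB and are not modeled here.

```python
from datetime import datetime
from pathlib import PurePosixPath


def hive_partition_dir(root, open_time):
    """Hypothetical helper: directory a row is written to with PARTITION_BY (year, month)."""
    # Hive-style layout: one key=value directory level per partition column, in order.
    return PurePosixPath(root) / f"year={open_time.year}" / f"month={open_time.month}"


row_time = datetime(2017, 8, 17, 4, 0, 28)
print(hive_partition_dir("data/bc_full_dataset_tech_indicators.parquet", row_time))
# data/bc_full_dataset_tech_indicators.parquet/year=2017/month=8
```

Because the partition values live in the directory names, a later query that filters on `year` or `month` can skip whole directories instead of scanning every file.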

In [22]:
%%time
# We are writing the technical indicators variable to a parquet file. We use zstd compression. 
# The file is also partitioned by year and month
duckdb.execute('''COPY
    (SELECT * FROM tech_indicators_out)
    TO 'data/bc_full_dataset_tech_indicators.parquet'
    (FORMAT 'parquet', CODEC 'zstd', PARTITION_BY (year, month));''')

CPU times: total: 1h 15min 47s
Wall time: 20min 46s


<duckdb.duckdb.DuckDBPyConnection at 0x2d83e42f830>

### Comparing Parquet files with the DuckDB native format.

So far, we have tested DuckDB using the Parquet file format. Next, let us experiment with the DuckDB native format. For this, we first create a persistent DuckDB storage connection below.

In [65]:
%%time
con = duckdb.connect('data/my_duck_db.duckdb')

CPU times: total: 0 ns
Wall time: 138 ms


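We set the maximum amount of memory that DuckDB can use. By default, DuckDB limits itself to 80% of the system RAM, which was 12.5GB on this machine. Because we are dealing with large datasets, we increase this limit to 20GB. Note that this is more than the 16GB of physical RAM in the machine.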

In [70]:
%%time
start_time = time.time()
con.sql('''
               SET memory_limit = '20GB';
''')
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

Total time taken: 0.0118560791015625
CPU times: total: 0 ns
Wall time: 11.9 ms


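Next, we create a DuckDB table from the Hive-partitioned Parquet files in `data/bc_full_dataset_partitions`. With `hive_partitioning=true`, DuckDB turns the `key=value` directory names into columns.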
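The glob `*/*/*.parquet` in the next cell matches files laid out as `year=<value>/month=<value>/<file>.parquet`. As a rough sketch of what `hive_partitioning=true` does with such a path, the hypothetical helper `parse_hive_partitions` below pulls the `key=value` pairs out of the directory names. Values stay strings here; DuckDB can additionally infer column types for them. The file name `data_0.parquet` in the example is also just an illustration.

```python
from pathlib import PurePosixPath


def parse_hive_partitions(path):
    """Hypothetical helper: extract key=value directory pairs from a file path."""
    partitions = {}
    for segment in PurePosixPath(path).parent.parts:
        if "=" in segment:
            key, value = segment.split("=", 1)
            partitions[key] = value
    return partitions


print(parse_hive_partitions("data/bc_full_dataset_partitions/year=2017/month=8/data_0.parquet"))
# {'year': '2017', 'month': '8'}
```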

In [66]:
%%time
con.sql('''CREATE TABLE bc_dataset_duckdb
            AS
            SELECT * 
            FROM read_parquet('data/bc_full_dataset_partitions/*/*/*.parquet',hive_partitioning=true)''')

CPU times: total: 1min 58s
Wall time: 8min 9s


In the next step, we run the same 50-second and 200-second moving average calculation on the whole dataset, but this time on the native DuckDB table. Interestingly, it took longer (4min 49s) than the same analysis run directly on the Parquet file (3min 47s). Another difference is that the CPU time is lower than with Parquet, even though the overall (wall) time is higher.
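`%%time` reports two different clocks. "CPU times: total" is CPU time summed over all threads, which is why the partitioned write in `In [22]` shows 1h 15min of CPU in 20min 46s of wall time. "Wall time" is elapsed real time. A lower CPU time combined with a higher wall time usually means the process spent part of the run waiting rather than computing, for example on disk reads. That is a plausible but unverified explanation for the native-table result. The sketch below shows the distinction with the standard library; `timed` is a hypothetical helper, and `time.sleep` stands in for waiting.

```python
import time


def timed(fn):
    """Return (cpu_seconds, wall_seconds) for one call of fn()."""
    cpu_start, wall_start = time.process_time(), time.perf_counter()
    fn()
    return time.process_time() - cpu_start, time.perf_counter() - wall_start


# Waiting (sleeping here; disk reads in practice) adds wall time
# but almost no CPU time.
sleep_cpu, sleep_wall = timed(lambda: time.sleep(0.2))
print(f"sleep:   cpu={sleep_cpu:.3f}s wall={sleep_wall:.3f}s")

# Pure computation keeps the CPU busy for most of the wall time.
work_cpu, work_wall = timed(lambda: sum(i * i for i in range(2_000_000)))
print(f"compute: cpu={work_cpu:.3f}s wall={work_wall:.3f}s")
```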

In [67]:
%%time
# Calculate the 50 seconds moving average and 200 seconds moving average and show the output
start_time = time.time()
con.sql('''
                SELECT "Open Time",
                        Close,
                        AVG("Close") OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 50 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma50,
                        AVG("Close") OVER ( ORDER BY "Open Time" 
                                            RANGE BETWEEN INTERVAL 200 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma200
                FROM bc_dataset_duckdb
''').show()
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

┌─────────────────────┬─────────┬───────────────────┬───────────────────┐
│      Open Time      │  Close  │       ma50        │       ma200       │
│      timestamp      │ double  │      double       │      double       │
├─────────────────────┼─────────┼───────────────────┼───────────────────┤
│ 2017-08-17 04:00:28 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:29 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:30 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:31 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:32 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:33 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:34 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:35 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:36 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:37 │ 4261.48 │ 4261

I also tried adding indexes on the "Open Time" and "Close" columns of the DuckDB table to see whether they made any difference. Building the two indexes took 6min 3s and 2min 21s respectively, as shown below.

In [72]:
%%time
# Add indexes on the `Open Time` column
start_time = time.time()
con.sql('''
               CREATE INDEX open_time ON bc_dataset_duckdb("Open Time");
''')
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

Total time taken: 363.7751486301422
CPU times: total: 4min 6s
Wall time: 6min 3s


In [73]:
%%time
# Add indexes on the `Close` column
start_time = time.time()
con.sql('''
               CREATE INDEX close_value ON bc_dataset_duckdb("Close");
''')
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

Total time taken: 141.13373160362244
CPU times: total: 2min 53s
Wall time: 2min 21s


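After adding the indexes, the overall time dropped from 4min 49s to 3min 6s, which is also faster than the 3min 47s measured on the Parquet file. The CPU time, however, stayed the same. Note that the memory limit was also raised to 20GB (cell In [70]) between these two runs, so the speedup cannot be attributed to the indexes alone. DuckDB's documentation describes its ART indexes as mainly benefiting point lookups and highly selective filters rather than full scans like this window query.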
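To put these wall times side by side, here is the arithmetic worked out. The figures are the ones reported in this notebook; the variable names are just labels for this sketch.

```python
# Wall times reported in this notebook, converted to seconds.
parquet_ma = 3 * 60 + 47        # moving averages on the Parquet file
native_before = 4 * 60 + 49     # native table, before the indexes
native_after = 3 * 60 + 6       # native table, after the indexes

reduction_pct = 100 * (native_before - native_after) / native_before
print(f"native: {native_before}s -> {native_after}s ({reduction_pct:.1f}% less)")
print(f"second native run vs first: {native_before / native_after:.2f}x faster")
print(f"second native run vs Parquet: {parquet_ma / native_after:.2f}x faster")
```

This prints a 35.6% reduction (289s to 186s), a 1.55x speedup between the two native runs, and a 1.22x advantage of the second native run over the Parquet run.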

In [74]:
%%time
# Calculate the 50 seconds moving average and 200 seconds moving average and show the output
start_time = time.time()
con.sql('''
                SELECT "Open Time",
                        Close,
                        AVG("Close") OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 50 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma50,
                        AVG("Close") OVER ( ORDER BY "Open Time" 
                                            RANGE BETWEEN INTERVAL 200 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma200
                FROM bc_dataset_duckdb
''').show()
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

┌─────────────────────┬─────────┬───────────────────┬───────────────────┐
│      Open Time      │  Close  │       ma50        │       ma200       │
│      timestamp      │ double  │      double       │      double       │
├─────────────────────┼─────────┼───────────────────┼───────────────────┤
│ 2017-08-17 04:00:28 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:29 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:30 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:31 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:32 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:33 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:34 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:35 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:36 │ 4261.48 │           4261.48 │           4261.48 │
│ 2017-08-17 04:00:37 │ 4261.48 │ 4261

Finally, I calculated the other technical indicators on the native table as well: the 14-second and 28-second moving averages, Bollinger Bands and RSI.

In [76]:
%%time
# Calculate the window functions required for the technical indicators
start_time = time.time()
tech_indicators_duckdb = con.sql('''
                SELECT "Open Time",
                        Close,
                        Volume,
                        AVG("Close") OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma14,
                        
                        AVG("Close") OVER ( ORDER BY "Open Time" 
                                            RANGE BETWEEN INTERVAL 28 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as ma28,
                                              
                        STDDEV_POP("Close") OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) as stddev14,
                        
                        AVG(CASE WHEN Close - Open > 0 THEN Close - Open ELSE 0 END) OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) AS avg_gain,
                                              
                        AVG(CASE WHEN Open - Close > 0 THEN Open - Close ELSE 0 END) OVER ( ORDER BY "Open Time"  
                                            RANGE BETWEEN INTERVAL 14 SECONDS PRECEDING
                                              AND INTERVAL 0 SECONDS FOLLOWING) AS avg_loss
                FROM bc_dataset_duckdb
''')
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

Total time taken: 0.0
CPU times: total: 0 ns
Wall time: 0 ns


Below we see that even after adding indexes, computing the results on the DuckDB table took longer than on the Parquet files. Again, notice that while the overall time is higher than for the computation on the Parquet files, the CPU time is lower.

In [78]:
%%time
# Calculate the technical indicators and show the output
start_time = time.time()
con.sql('''
                SELECT "Open Time",
                        Close,
                        Volume,
                        ma14,
                        ma28,
                        stddev14,
                        ma14 + 2*stddev14 AS upper_bollinger,
                        ma14 - 2*stddev14 AS lower_bollinger,
                        ma14              AS middle_bollinger,
                        avg_gain,
                        avg_loss,
                        100 - (100/(1+(avg_gain/avg_loss))) AS rsi
                FROM tech_indicators_duckdb
''').show()
end_time = time.time()
print(f'Total time taken: {end_time - start_time}')

┌─────────────────────┬─────────┬──────────┬───────────────────┬───┬───────────────────┬──────────┬──────────┬────────┐
│      Open Time      │  Close  │  Volume  │       ma14        │ … │ middle_bollinger  │ avg_gain │ avg_loss │  rsi   │
│      timestamp      │ double  │  double  │      double       │   │      double       │  double  │  double  │ double │
├─────────────────────┼─────────┼──────────┼───────────────────┼───┼───────────────────┼──────────┼──────────┼────────┤
│ 2017-08-17 04:00:28 │ 4261.48 │      0.1 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:29 │ 4261.48 │      0.0 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:30 │ 4261.48 │      0.0 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:31 │ 4261.48 │      0.0 │           4261.48 │ … │           4261.48 │      0.0 │      0.0 │   NULL │
│ 2017-08-17 04:00:32 │ 4261.48 │ 1.6751

In [83]:
con.close()