# Building Fast and Efficient Tabular Data Pipelines 

## Introduction

Tabular data processing usually stays inside a Jupyter notebook, since notebooks are convenient for data exploration and visualization. However, when we handle large amounts of data, we run into computational and memory constraints that slow down the entire pipeline and strain our budget. Buying more memory and faster CPUs with more cores is not a sustainable solution, and in some settings it may not be an option or may not help at all.

### Problems

- Data does not fit in memory, e.g. cancer data
    - The main dataset used by many researchers is the **TCGA** dataset
    - It contains ~3 million rows and ~120 columns, which amounts to multiple gigabytes
    - Loading this data with `pandas.read_csv(file_path)` on an Azure VM with **16 GB** of memory and **4 cores** crashes the VM due to insufficient memory

- Processing steps such as filtering and grouping may create intermediate copies of the data, which slows down the pipeline

- Algorithms such as clustering patients or tumors can take too long on data this big

### Possible Solutions

- Buy more memory and a faster CPU with more cores. **Costs money!**
- Use multiple CPU cores to parallelize the pipeline. **Not always possible!**

## Demo

We start by creating a dummy dataset of 3 million rows and 102 columns with different data types (strings, dates, integers, and floats) and saving it to a CSV file.

```python
import pandas as pd
import numpy as np
import time
```

```python
np.random.seed(0)
nrows, ncols = 3_000_000, 100

float_rows = np.random.rand(nrows, ncols // 2)
int_rows = (np.random.rand(nrows, ncols // 2) * np.random.randint(0, 100)).astype(np.int32)
string_col = [f"TCGA_{np.random.randint(0, 100)}" for _ in range(nrows)]
date_col = [f"{np.random.randint(1980, 2000)}-{np.random.randint(1, 13)}-{np.random.randint(1, 28)}" for _ in range(nrows)]

cols = [f"col_{i}" for i in range(ncols + 2)]

# Build the frame block by block so every column keeps its own dtype.
# np.hstack on mixed string/numeric arrays would upcast every value to a
# fixed-width unicode string, using far more memory than the numbers themselves.
df = pd.concat(
    [
        pd.DataFrame({"col_0": string_col, "col_1": date_col}),
        pd.DataFrame(int_rows, columns=cols[2 : 2 + ncols // 2]),  # col_2 ... col_51: integers
        pd.DataFrame(float_rows, columns=cols[2 + ncols // 2 :]),  # col_52 ... col_101: floats
    ],
    axis=1,
)
df.to_csv("dummy_data.csv", index=False)
del df, int_rows, float_rows, string_col, date_col
```

After saving the file, we read it back with pandas using nothing fancier than the usual `read_csv()` function, and then check the memory usage of each column.

```python
start = time.perf_counter()
df = pd.read_csv("dummy_data.csv")
end = time.perf_counter()
print(f"Read file in {end-start:.2} s")
print(f"Total memory usage: {df.memory_usage(deep=True).sum() // 1000000} MB")
df.memory_usage(deep=True)
```

```
Read file in 2.8e+01 s
Total memory usage: 2789 MB

Index            128
col_0      191699718
col_1      197751198
col_2       24000000
col_3       24000000
             ...    
col_97      24000000
col_98      24000000
col_99      24000000
col_100     24000000
col_101     24000000
Length: 103, dtype: int64
```

```python
df.dtypes
```

```
col_0       object
col_1       object
col_2        int64
col_3        int64
col_4        int64
            ...   
col_97     float64
col_98     float64
col_99     float64
col_100    float64
col_101    float64
Length: 102, dtype: object
```

The dataframe takes about 2.8 GB (2789 MB) of memory. `col_1` was parsed as a generic `object` column rather than a date, and the integers and floats were read with **64-bit** precision. A simple optimization is to pass the column data types to `read_csv()` through `dtype`, have pandas parse the date column through `parse_dates`, and reduce the numeric columns to 32-bit precision. This small addition roughly halves the memory usage (2789 MB → 1415 MB, about **50%** less) and also cuts the read time from about 28 s to about 16 s.
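
The per-column numbers follow from simple arithmetic: 3,000,000 values × 8 bytes = 24,000,000 bytes for a 64-bit column, and half that at 32 bits. A minimal sketch to check this on a small, hypothetical frame (the column names `ints`, `floats`, `dates` and `ids` are made up for illustration). The `category` conversion for the ID strings goes beyond the demo, which leaves `col_0` as `object`; it is included only as a further option for low-cardinality text such as `TCGA_<n>`.

```python
import numpy as np
import pandas as pd

n = 1_000
# Hypothetical small frame mimicking the kinds of columns in the dummy dataset
frame = pd.DataFrame({
    "ints": np.arange(n, dtype=np.int64),
    "floats": np.linspace(0.0, 1.0, n),
    "dates": [f"1990-01-{i % 28 + 1:02d}" for i in range(n)],
    "ids": [f"TCGA_{i % 100}" for i in range(n)],
})

# int32 only holds values up to 2,147,483,647; check before downcasting
assert frame["ints"].max() <= np.iinfo(np.int32).max

# The same conversions that dtype= and parse_dates= perform inside read_csv,
# applied here after the frame has been built
compact = frame.astype({"ints": np.int32, "floats": np.float32, "ids": "category"})
compact["dates"] = pd.to_datetime(frame["dates"], format="%Y-%m-%d")

before = frame.memory_usage(deep=True, index=False)
after = compact.memory_usage(deep=True, index=False)
print(pd.DataFrame({"bytes_before": before, "bytes_after": after}))
```

Downcasting trades precision for memory: `float32` keeps roughly 7 significant decimal digits, and `int32` overflows above 2,147,483,647, so it is worth checking the value ranges before choosing the smaller types.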

```python
# col_2 ... col_51 hold integers, col_52 ... col_101 hold floats
dtypes = dict(zip(cols[2:], [np.int32] * 50 + [np.float32] * 50))
start = time.perf_counter()
df = pd.read_csv("dummy_data.csv", sep=",", dtype=dtypes, parse_dates=["col_1"])
end = time.perf_counter()
print(f"Read file in {end-start:.2} s")
print(f"Total memory usage: {df.memory_usage(deep=True).sum() // 1000000} MB")
df.memory_usage(deep=True)
```

```
Read file in 1.6e+01 s
Total memory usage: 1415 MB

Index            128
col_0      191699718
col_1       24000000
col_2       12000000
col_3       12000000
             ...    
col_97      12000000
col_98      12000000
col_99      12000000
col_100     12000000
col_101     12000000
Length: 103, dtype: int64
```

```python
df.dtypes
```

```
col_0              object
col_1      datetime64[ns]
col_2               int32
col_3               int32
col_4               int32
                ...      
col_97            float32
col_98            float32
col_99            float32
col_100           float32
col_101           float32
Length: 102, dtype: object
```

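On larger datasets, this can save gigabytes of memory and make the computation possible in low-memory environments where we would previously have had to upgrade the machine. However, when the dataset is very large, setting data types may still not be enough to load the whole file. One option, described in the pandas guide on scaling to large datasets [1], is to read the data in chunks with the `chunksize` argument, process each chunk where the operation allows it, and append the result to a file on disk. Below, we compare a full load against a chunked load of the same filtering pipeline, measuring peak memory with the `%memit` magic from memory_profiler [2]. With chunks of 500,000 rows, the peak memory drops by about 1.3 GiB (from 6034 MiB to 4698 MiB), while the processing time stays at about 16 s.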

```python
%load_ext memory_profiler
```


```python
def process():
    start = time.perf_counter()

    # Load the whole file at once
    df = pd.read_csv("dummy_data.csv", header=0, dtype=dtypes, parse_dates=["col_1"])

    # filtration: each step creates a new, smaller DataFrame
    before_date = df[df["col_1"] <= "1990-12-24"]
    col2_gt_0 = before_date[before_date["col_2"] > 0]
    col3_gt_50 = col2_gt_0[col2_gt_0["col_3"] > 50]
    col_100_lt_05 = col3_gt_50[col3_gt_50["col_100"] < 0.5]

    # Overwrite the output file instead of appending to the results of earlier runs
    col_100_lt_05.to_csv("processed_chunks.csv", sep=",", mode="w", header=False, index=False, columns=cols)

    end = time.perf_counter()
    print(f"Processed data in {end-start:.2} s")
```

```python
%memit process()
```

```
Processed data in 1.6e+01 s
peak memory: 6033.54 MiB, increment: 1144.41 MiB
```


```python
def process():
    start = time.perf_counter()

    # Read 500,000 rows at a time, so only one chunk (plus its filtered copies) is in memory
    for i, chunk in enumerate(pd.read_csv("dummy_data.csv", header=0, dtype=dtypes, parse_dates=["col_1"], chunksize=500000)):

        # filtration
        before_date = chunk[chunk["col_1"] <= "1990-12-24"]
        col2_gt_0 = before_date[before_date["col_2"] > 0]
        col3_gt_50 = col2_gt_0[col2_gt_0["col_3"] > 50]
        col_100_lt_05 = col3_gt_50[col3_gt_50["col_100"] < 0.5]

        # Overwrite the output file on the first chunk, then append the following chunks
        col_100_lt_05.to_csv("processed_chunks.csv", sep=",", mode="w" if i == 0 else "a", header=False, index=False, columns=cols)

    end = time.perf_counter()
    print(f"Processed data in {end-start:.2} s")
```

```python
%memit process()
```

```
Processed data in 1.6e+01 s
peak memory: 4697.96 MiB, increment: 0.00 MiB
```
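
The filters work chunk by chunk because every row is kept or dropped on its own. Operations that need the whole dataset, such as a per-group mean, can still be chunked when they decompose into partial results: each chunk contributes per-group sums and counts, and the mean is computed once at the end. A minimal sketch, assuming a small in-memory CSV with hypothetical `group` and `value` columns rather than the demo file (`chunked_group_mean` is a made-up helper name):

```python
import io

import numpy as np
import pandas as pd


def chunked_group_mean(buffer, key, value, chunksize):
    """Per-group mean of `value`, computed one chunk at a time.

    Means cannot be averaged across chunks directly, so each chunk contributes
    partial sums and counts, which are combined at the end.
    """
    sums = None
    counts = None
    for chunk in pd.read_csv(buffer, chunksize=chunksize):
        grouped = chunk.groupby(key)[value]
        s, c = grouped.sum(), grouped.count()
        # add(..., fill_value=0) keeps groups that are missing from some chunks
        sums = s if sums is None else sums.add(s, fill_value=0)
        counts = c if counts is None else counts.add(c, fill_value=0)
    return sums / counts


# Hypothetical small dataset standing in for dummy_data.csv
rng = np.random.default_rng(0)
full = pd.DataFrame({
    "group": rng.integers(0, 5, size=1_000),
    "value": rng.random(1_000),
})
csv_text = full.to_csv(index=False)

chunked = chunked_group_mean(io.StringIO(csv_text), "group", "value", chunksize=128)
in_memory = full.groupby("group")["value"].mean()
print(pd.DataFrame({"chunked": chunked, "in_memory": in_memory}))
```

Statistics such as the median do not decompose into per-chunk partial results this way, so they would need a different strategy.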


Since the file is read in chunks, the chunks can also be handed to a pool of worker processes with `multiprocessing` and filtered in parallel.

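```python
from multiprocessing import Pool, Lock
import os

# With the "fork" start method, workers inherit this lock (and notebook globals
# such as cols), so only one process appends to the output file at a time
lock = Lock()

def process(chunk, i):
    start = time.perf_counter()

    # filtration
    before_date = chunk[chunk["col_1"] <= "1990-12-24"]
    col2_gt_0 = before_date[before_date["col_2"] > 0]
    col3_gt_50 = col2_gt_0[col2_gt_0["col_3"] > 50]
    col_100_lt_05 = col3_gt_50[col3_gt_50["col_100"] < 0.5]

    with lock:
        col_100_lt_05.to_csv("processed_chunks.csv", sep=",", mode="a", header=False, index=False, columns=cols)

    end = time.perf_counter()
    print(f"Processed chunk {i} in {end-start:.2} s, process #: {os.getpid()}")

# Start from an empty output file; chunks are appended in whatever order they finish
open("processed_chunks.csv", "w").close()

pool = Pool(processes=3)

iterator = pd.read_csv("dummy_data.csv", header=0, dtype=dtypes, parse_dates=["col_1"], chunksize=500000)

start = time.perf_counter()
# Chunks are parsed one by one in this (parent) process, then sent to the workers
results = [pool.apply_async(process, (chunk, i)) for i, chunk in enumerate(iterator)]

pool.close()
pool.join()

# apply_async hides exceptions raised in workers; get() re-raises them here
for result in results:
    result.get()

end = time.perf_counter()
print(f"Processed chunks in {end-start:.2} s, process #: {os.getpid()}")
```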


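```
Processed chunk 0 in 0.06 s, process #: 689143
Processed chunk 1 in 0.058 s, process #: 689144
Processed chunk 2 in 0.049 s, process #: 689145
Processed chunk 3 in 0.04 s, process #: 689143
Processed chunk 4 in 0.04 s, process #: 689144
Processed chunk 5 in 0.039 s, process #: 689145
Processed chunks in 1.7e+01 s, process #: 676661
```

Each chunk is filtered in 0.04 to 0.06 s, yet the whole run still takes about 17 s, roughly the same as the single-process chunked version. The chunks are parsed one after another by `read_csv()` in the parent process before being sent to the workers, so reading and parsing the CSV, not the filtering, dominates the run time here; parallelizing the filtering step alone does not speed up this pipeline.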
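
One variation on the multiprocessing cell, sketched under assumptions rather than taken from the original pipeline: the workers only filter and return their chunk, and the parent process does all the writing. `Pool.imap` yields results in input order, so the output keeps the original row order without a lock. The four filters are also folded into a single boolean mask, which avoids building three intermediate DataFrames. The data is a small hypothetical stand-in with only the columns the filters use, and the helper names `filter_chunk`, `write_filtered` and `read_chunks` are made up for illustration. This does not remove the parsing bottleneck, since the chunks are still read in the parent.

```python
import io
import multiprocessing as mp

import numpy as np
import pandas as pd


def filter_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    """Apply the demo's four filters as a single boolean mask."""
    mask = (
        (chunk["col_1"] <= "1990-12-24")
        & (chunk["col_2"] > 0)
        & (chunk["col_3"] > 50)
        & (chunk["col_100"] < 0.5)
    )
    return chunk[mask]


def write_filtered(chunks, out, mapper=map) -> int:
    """Filter each chunk via `mapper` and write the results to `out` in input order.

    Only the calling process writes, so no lock is needed. Returns the number of rows written.
    """
    n_rows = 0
    for i, filtered in enumerate(mapper(filter_chunk, chunks)):
        filtered.to_csv(out, header=(i == 0), index=False)
        n_rows += len(filtered)
    return n_rows


# Hypothetical stand-in for dummy_data.csv with only the columns the filters use
rng = np.random.default_rng(0)
n = 2_000
sample = pd.DataFrame({
    "col_1": pd.Timestamp("1980-01-01") + pd.to_timedelta(rng.integers(0, 20 * 365, n), unit="D"),
    "col_2": rng.integers(0, 100, n),
    "col_3": rng.integers(0, 100, n),
    "col_100": rng.random(n),
})
csv_text = sample.to_csv(index=False)


def read_chunks():
    return pd.read_csv(io.StringIO(csv_text), parse_dates=["col_1"], chunksize=500)


# Serial run with the built-in map, used as the reference result
serial_out = io.StringIO()
serial_rows = write_filtered(read_chunks(), serial_out)

if __name__ == "__main__":
    # "fork" lets workers use functions defined in a notebook or script;
    # fall back to the platform default where fork is unavailable (e.g. Windows)
    ctx = mp.get_context("fork") if "fork" in mp.get_all_start_methods() else mp.get_context()
    parallel_out = io.StringIO()
    with ctx.Pool(processes=3) as pool:
        # Pool.imap sends chunks to the workers but yields the results in input order
        write_filtered(read_chunks(), parallel_out, mapper=pool.imap)
    print(parallel_out.getvalue() == serial_out.getvalue())
```

Keeping all writes in one process trades a little parent-side work for a deterministic output order and no shared lock.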


## References

1. https://pandas.pydata.org/docs/user_guide/scale.html
2. https://pypi.org/project/memory-profiler/
3. https://youtu.be/di-cXkg-hJU?si=VFL6r1PcHW-HGEQw
