# Apache Arrow

## 1 Compare performance of csv, Parquet and Arrow - 1 Change

In [1]:
import pyarrow.parquet as pq
import pyarrow as pa
import pandas as pd
import numpy as np
import os
import psutil

### 1.1 Load and prepare data 

In [2]:
# Palmer Station penguin data (raw CSV); the URL embeds a commit hash
penguins_url = (
    "https://raw.githubusercontent.com/allisonhorst/"
    "palmerpenguins/47a3476d2147080e7ceccef4cf70105c808f2cbf/"
    "data-raw/penguins_raw.csv"
)
df = pd.read_csv(penguins_url)

In [3]:
# Fixed seed so that "Restart Kernel -> Run All" regenerates the same dataset
# (and therefore the same files on disk) every time.
SEED = 42
N_ROWS = 1_000_000
# Numeric measurement columns that receive noise and, later, NaN -> 0 filling
NUMERIC_COLS = ["Culmen Length (mm)", "Culmen Depth (mm)",
                "Flipper Length (mm)", "Body Mass (g)"]

# Increase dataset to 1m rows (sampling with replacement) and reset index
df = df.sample(N_ROWS, replace=True, random_state=SEED).reset_index(drop=True)

# Update sample number (0 to 999'999)
df["Sample Number"] = df.index

# Add some random variation (uniform in [0, 1)) to numeric columns.
# RandomState is used rather than the global np.random state so the noise is
# reproducible and independent of any other code that draws random numbers.
noise = np.random.RandomState(SEED).rand(len(df), len(NUMERIC_COLS))
df[NUMERIC_COLS] = df[NUMERIC_COLS] + noise

# Create dataframe where missing numeric values are filled with zero
df_nonan = df.copy()
df_nonan[NUMERIC_COLS] = df[NUMERIC_COLS].fillna(0)

### 1.2 Write to disk 

In [14]:
def write_arrow_file(table, path):
    """Write a pyarrow Table to ``path`` in the Arrow IPC file format.

    Parameters
    ----------
    table : pyarrow.Table
        Table to serialise. Its own schema is used for the file.
    path : str
        Destination file; an existing file is overwritten.

    The writer is closed in a ``finally`` clause so the file footer is
    written (or the handle released) even if writing the table raises.
    """
    with pa.OSFile(path, 'wb') as sink:
        writer = pa.ipc.RecordBatchFileWriter(sink, table.schema)
        try:
            writer.write_table(table)
        finally:
            writer.close()


# Write to csv
df.to_csv("penguin-dataset.csv")

# Write to parquet
df.to_parquet("penguin-dataset.parquet")

# Write to Arrow: convert from pandas to an Arrow table, then write it out
table = pa.Table.from_pandas(df)
write_arrow_file(table, 'penguin-dataset.arrow')

# Same for the no-NaN frame. The file is written with table_nonan's own
# schema rather than the schema of the table that contains missing values.
table_nonan = pa.Table.from_pandas(df_nonan)
write_arrow_file(table_nonan, 'penguin-dataset-nonan.arrow')

### 1.3 Reading time - calculate average of numeric column

#### 1.3.1 Read csv and calculate mean

In [5]:
%%timeit
pd.read_csv("penguin-dataset.csv")["Flipper Length (mm)"].mean()

3.4 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### 1.3.2 Read parquet and calculate mean

In [6]:
%%timeit
pd.read_parquet("penguin-dataset.parquet", columns=["Flipper Length (mm)"]).mean()

  labels = getattr(columns, 'labels', None) or [
  return pd.MultiIndex(levels=new_levels, labels=labels, names=columns.names)
  labels, = index.labels


45.3 ms ± 989 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


#### 1.3.3 Read Arrow using file API

In [15]:
%%timeit
with pa.OSFile('penguin-dataset.arrow', 'rb') as source:
    table = pa.ipc.open_file(source).read_all().column("Flipper Length (mm)")
result = table.to_pandas().mean()

133 ms ± 2.73 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


#### 1.3.4 Read Arrow with memory-mapped API with missing values

In [16]:
%%timeit
source = pa.memory_map('penguin-dataset.arrow', 'r')
table = pa.ipc.RecordBatchFileReader(source).read_all().column("Flipper Length (mm)")
result = table.to_pandas().mean()

6.19 ms ± 82.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


#### 1.3.5 Read Arrow with memory-mapped API without missing values (zero-copy)

In [17]:
%%timeit
source = pa.memory_map('penguin-dataset-nonan.arrow', 'r')
table = pa.ipc.RecordBatchFileReader(source).read_all().column("Flipper Length (mm)")
result = table.to_pandas().mean()

4.04 ms ± 80.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


### 1.4 Memory consumption - read column

In [18]:
# Measure initial memory consumption: resident set size (RSS) of this kernel
# process before any of the reads below. `>> 20` divides the byte count by
# 2**20, so the stored value is in MiB (rounded down).
memory_init = psutil.Process(os.getpid()).memory_info().rss >> 20

#### 1.4.1 Read csv

In [19]:
# Read the entire CSV file, then keep only the flipper-length column.
# The full DataFrame is not bound to a name, only the selected column is.
col_csv = pd.read_csv("penguin-dataset.csv")["Flipper Length (mm)"]
# Process RSS in MiB after the CSV read; compared against memory_init later.
memory_post_csv = psutil.Process(os.getpid()).memory_info().rss >> 20

#### 1.4.2 Read parquet

In [20]:
# Read the Parquet file, requesting only the flipper-length column.
# Note: the result is a one-column DataFrame, not a Series.
col_parquet = pd.read_parquet("penguin-dataset.parquet", columns=["Flipper Length (mm)"])
# Process RSS in MiB after the Parquet read; compared against memory_post_csv later.
memory_post_parquet = psutil.Process(os.getpid()).memory_info().rss >> 20

#### 1.4.3 Read Arrow using file API

In [21]:
# Open the Arrow file through a regular file handle (closed when the `with`
# block exits), read the whole table, select the flipper-length column and
# convert it to pandas.
with pa.OSFile('penguin-dataset.arrow', 'rb') as source:
    col_arrow_file = pa.ipc.open_file(source).read_all().column("Flipper Length (mm)").to_pandas()
# Process RSS in MiB after the read; compared against memory_post_parquet later.
memory_post_arrowos = psutil.Process(os.getpid()).memory_info().rss >> 20

#### 1.4.4 Read Arrow with memory-mapped API with missing values

In [22]:
# Open the Arrow file (the one that still contains missing values) as a
# memory map instead of a regular file handle. `source` is not explicitly
# closed in this cell.
source = pa.memory_map('penguin-dataset.arrow', 'r')
# Despite the name, table_mmap holds only the selected column of the table.
table_mmap = pa.ipc.RecordBatchFileReader(source).read_all().column("Flipper Length (mm)")
col_arrow_mapped = table_mmap.to_pandas()
# Process RSS in MiB after the read; compared against memory_post_arrowos later.
memory_post_arrowmmap = psutil.Process(os.getpid()).memory_info().rss >> 20

#### 1.4.5 Read Arrow with memory-mapped API without missing values (zero-copy)

In [23]:
# Memory-map the Arrow file written from the frame whose numeric NaNs were
# filled with zero. This rebinds `source`, replacing the previous mapping's name.
source = pa.memory_map('penguin-dataset-nonan.arrow', 'r')
# Despite the name, table_mmap_zc holds only the selected column of the table.
table_mmap_zc = pa.ipc.RecordBatchFileReader(source).read_all().column("Flipper Length (mm)")
# NOTE(review): the section heading calls this conversion zero-copy, but
# to_pandas() is not called with zero_copy_only=True, so nothing here
# enforces that — confirm.
col_arrow_mapped_zc = table_mmap_zc.to_pandas()
# Process RSS in MiB after the read; compared against memory_post_arrowmmap later.
memory_post_arrowmmap_zc = psutil.Process(os.getpid()).memory_info().rss >> 20

#### 1.4.6 Display memory consumption

In [24]:
# Print memory consumption: each figure is the change in process RSS (MiB)
# between two consecutive measurements, in the order the reads were executed.
memory_deltas = [
    ("csv", memory_post_csv - memory_init),
    ("Parquet", memory_post_parquet - memory_post_csv),
    ("Arrow file API", memory_post_arrowos - memory_post_parquet),
    ("Arrow memory-mapped API with NaNs", memory_post_arrowmmap - memory_post_arrowos),
    ("Arrow memory-mapped API (zero-copy)", memory_post_arrowmmap_zc - memory_post_arrowmmap),
]
print("".join(f"{label}: {delta}\n" for label, delta in memory_deltas))

csv: 223
Parquet: -4
Arrow file API: -8
Arrow memory-mapped API with NaNs: 8
Arrow memory-mapped API (zero-copy): 0

