In [4]:
# flake8: noqa

import pyarrow.feather as feather
import pandas as pd
import json
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pandas.util.testing import rands
import gc
import os
import time

pa.set_cpu_count(8)

print(f"using {pa.cpu_count()} cpu cores")
    

def get_timing(f, niter=1):
    start = time.clock_gettime(time.CLOCK_REALTIME)
    for i in range(niter):
        f()
    result = (time.clock_gettime(time.CLOCK_REALTIME) - start) / niter
    return result


files = {
    'fanniemae': {
        'base': '2016Q4',
        'source': {
            'path': '2016Q4.txt',
            'sep': '|',
            'header': None
        }
    },
    'nyctaxi': {
        'base': 'yellow_tripdata_2010-01',
        'source': {
            'path': 'yellow_tripdata_2010-01.csv',
            'sep': ',',
            'header': 0
        }
    }
}


compression_cases = [
    (None, None),   # uncompressed
    ('zstd', 1),    # minimal compression
    ('zstd', 10),   # moderate
    ('lz4', None)   # LZ4 doesn't support compression level
]


def write_files(files, chunksize=1<<16):
    statistics = []
    for name, info in files.items():
        source = info['source']
        print("reading {}".format(source['path']))
        df = pd.read_csv(source['path'], sep=source['sep'], 
                         header=source['header'], 
                         low_memory=False)
        if source['header'] is None:
            df.columns = ['f{}'.format(i) for i in range(len(df.columns))]

        t = (pa.Table.from_pandas(df, preserve_index=False)
             .replace_schema_metadata(None))
        for compression, compression_level in compression_cases:
            path = '{}_{}_{}.feather'.format(info['base'], 
                                             compression or 'uncompressed',
                                             compression_level)
            print((name, compression, compression_level))
            tm = get_timing(lambda: 
                            feather.write_feather(df, path, compression=compression,
                                                  compression_level=compression_level,
                                                  chunksize=chunksize))
            file_size = os.stat(path).st_size
            result = name, compression, compression_level, file_size, tm
            print(result)
            statistics.append(result)
    return statistics

def get_read_results():
    all_results = []
    for name, info in files.items():
        for compression, compression_level in compression_cases:
            path = '{}_{}_{}.feather'.format(info['base'], 
                                             compression or 'uncompressed',
                                             compression_level)
            read_time = get_timing(lambda: feather.read_table(path, memory_map=False),
                                   niter=5)
            result = name, compression, compression_level, read_time
            print(result)
            all_results.append(result) 
    return all_results

using 8 cpu cores


In [5]:
chunksizes = [1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15,
              1 << 16]

results_by_chunksize = {}
for chunksize in chunksizes:
    print(chunksize)
    write_results = write_files(files, chunksize=chunksize)
    read_results = get_read_results()    
    results_by_chunksize[chunksize] = write_results, read_results

1024
reading 2016Q4.txt
('fanniemae', None, None)
('fanniemae', None, None, 5084410194, 11.884642839431763)
('fanniemae', 'zstd', 1)
('fanniemae', 'zstd', 1, 501955562, 11.54361605644226)
('fanniemae', 'zstd', 10)
('fanniemae', 'zstd', 10, 439460538, 40.15117073059082)
('fanniemae', 'lz4', None)
('fanniemae', 'lz4', None, 765604482, 12.363005876541138)
reading yellow_tripdata_2010-01.csv
('nyctaxi', None, None)
('nyctaxi', None, None, 2522035242, 6.970196723937988)
('nyctaxi', 'zstd', 1)
('nyctaxi', 'zstd', 1, 878914098, 7.667033433914185)
('nyctaxi', 'zstd', 10)
('nyctaxi', 'zstd', 10, 828266042, 32.220927715301514)
('nyctaxi', 'lz4', None)
('nyctaxi', 'lz4', None, 1262344938, 7.114352226257324)
('fanniemae', None, None, 2.2620407581329345)
('fanniemae', 'zstd', 1, 3.4737910270690917)
('fanniemae', 'zstd', 10, 3.4430580615997313)
('fanniemae', 'lz4', None, 3.521429014205933)
('nyctaxi', None, None, 1.0237845420837401)
('nyctaxi', 'zstd', 1, 1.8016125202178954)
('nyctaxi', 'zstd', 10, 

In [6]:
reads = []
writes = []

for chunksize, (write_results, read_results) in results_by_chunksize.items():
    write_results = pd.DataFrame.from_records(
        write_results, columns=['dataset', 'codec', 'codec_level', 
                                'file_size', 'write_time'])
    read_results = pd.DataFrame.from_records(
        read_results, columns=['dataset', 'codec', 'codec_level', 
                               'read_time'])
    write_results['chunksize'] = chunksize
    read_results['chunksize'] = chunksize
    
    reads.append(read_results)
    writes.append(write_results)
    
reads = pd.concat(reads, ignore_index=True)
writes = pd.concat(writes, ignore_index=True)

def munge_codecs(codec_s, codec_level_s):
    results = []
    codec_s = codec_s.fillna('uncompressed')
    for codec, codec_level in zip(codec_s, codec_level_s):
        if pd.isnull(codec_level):
            results.append(codec)
        else:
            results.append(codec + '-' + str(int(codec_level)))
    return results

reads['codec'] = munge_codecs(reads['codec'], reads.pop('codec_level'))
writes['codec'] = munge_codecs(writes['codec'], writes.pop('codec_level'))

In [13]:
%matplotlib notebook

In [7]:
reads.to_csv('ipc_read_parallel.csv')
writes.to_csv('ipc_write_parallel.csv')

In [8]:
reads

Unnamed: 0,dataset,codec,read_time,chunksize
0,fanniemae,uncompressed,1.285094,1024
1,fanniemae,zstd-1,3.586269,1024
2,fanniemae,zstd-10,3.704591,1024
3,fanniemae,lz4,3.590986,1024
4,nyctaxi,uncompressed,0.608589,1024
5,nyctaxi,zstd-1,1.891127,1024
6,nyctaxi,zstd-10,1.665766,1024
7,nyctaxi,lz4,1.227717,1024
8,fanniemae,uncompressed,0.472191,2048
9,fanniemae,zstd-1,2.05742,2048
