In [1]:
from zfstools import connection
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import fastparquet
import tqdm
import os

In [2]:
%%bash
# Report the total on-disk size of the raw forex input directory.
du -sh /mnt/sdb/forex

2.2G	/mnt/sdb/forex


### RESET

In [73]:
%%bash
# Reset the experiment: destroy the snapshots of the `tss` dataset (and its
# children), then wipe the files under /tss.
# -H drops the header line (no `tail` needed); `-r tss` limits the listing to
# this experiment's dataset so snapshots of unrelated pools are never destroyed.
zfs list -H -r -t snapshot -o name tss | xargs -I {} zfs destroy {}
# NOTE(review): /tss/spark is reported as busy by rm — presumably it is the
# mountpoint of a child dataset; its contents are removed but the directory stays.
rm -rf /tss/*

rm: cannot remove '/tss/spark': Device or resource busy


### Helper Functions

In [4]:
def read_test():
    """Run three read patterns over every file in snapshot ``1`` of ``/tss``.

    Each pattern walks all files found in ``/tss/.zfs/snapshot/1`` behind a
    tqdm progress bar, so the bars give the per-pattern timing.  Nothing is
    returned; the data read is discarded.
    """
    snapshot_dir = '/tss/.zfs/snapshot/1'
    paths = [os.path.join(snapshot_dir, entry) for entry in os.listdir(snapshot_dir)]

    def full_series():
        # Pattern 1: load each file completely into pandas.
        for path in tqdm.tqdm(paths):
            frame = pq.read_table(path).to_pandas()

    def last_value():
        # Pattern 2: read only the final row group and keep its last row.
        for path in tqdm.tqdm(paths):
            parquet_file = pq.ParquetFile(path)
            final_index = parquet_file.num_row_groups - 1
            newest = parquet_file.read_row_group(final_index).to_pandas()[-1:]

    def all_values_one_day(day):
        # Pattern 3: load each file completely, then keep the rows whose
        # ``seriesdate`` string starts with ``day``.
        for path in tqdm.tqdm(paths):
            frame = pq.read_table(path).to_pandas()
            on_day = frame.seriesdate.str.startswith(day)
            frame_one_day = frame[on_day]

    full_series()
    last_value()
    all_values_one_day('20090701')
    
def split_df(df, n):
    """Split ``df`` into ``n + 1`` consecutive parts.

    The first part holds the first ``len(df) - n`` rows; each of the
    remaining ``n`` parts holds exactly one of the last ``n`` rows, in order.
    Concatenating the parts therefore reproduces ``df``.

    The single-row parts are taken from the tail of ``df``.  Slicing them at
    ``n + i`` (as an earlier version did) returns rows that already belong to
    the first part, and ``df[:-n]`` is empty for ``n == 0``.

    :param df: any sliceable sequence with a length (DataFrame, list, ...).
    :param n: number of trailing single-row parts, ``0 <= n <= len(df)``.
    :returns: list of ``n + 1`` slices of ``df``.
    :raises ValueError: if ``n`` is negative or larger than ``len(df)``.
    """
    total = len(df)
    if n < 0 or n > total:
        raise ValueError('n must be between 0 and len(df) ({}), got {}'.format(total, n))
    head_len = total - n
    tail_parts = [df[head_len + i:head_len + i + 1] for i in range(n)]
    return [df[:head_len]] + tail_parts

### Globals

In [5]:
# Directory holding the raw input files (read with pd.read_csv further down).
forex_loc = '/mnt/sdb/forex'
# Full path of every entry in the input directory, in os.listdir order.
paths = [os.path.join(forex_loc, f) for f in os.listdir(forex_loc)]
# Shared ZFS handle; the store classes call zfs.snapshot_recursively on it.
zfs = connection.ZFSConnection()
# Column names applied to the header-less input files.
names = ['symbol', 'seriesdate', 'low', 'high']

# Number of single-row parts split off the end of each series (see split_df).
n = 100

In [6]:
# Read the first five input files and cut each one into 400 frames; every
# resulting frame is treated as an independent series.
dfs = [
    df
    for p in paths[:5]
    for df in np.array_split(pd.read_csv(p, names=names), 400)
]
# Give each series a unique integer symbol (its position in ``dfs``).
for i, df in enumerate(dfs):
    df.symbol = i
# Per series: [initial part, single-row part 1, ..., single-row part n].
valueses = [split_df(x, n) for x in dfs]
# Total number of rows across all series (cell output).
sum(len(x) for x in dfs)

10436374

In [8]:
class TimeSeriesStoreSingleThread(object):
    """One Parquet file per series under ``/tss``, written sequentially.

    Every ``write``/``append`` call ends with a recursive ZFS snapshot of
    ``tss`` named after ``pid``.  Relies on the notebook globals
    ``fastparquet``, ``tqdm`` and ``zfs``.
    """

    def __init__(self):
        # append() rewrites a file once it has more row groups than this.
        self.max_row_groups = 50

    def write(self, dfs, pid):
        """Write ``dfs[i]`` to ``/tss/<i>.parquet`` and snapshot as ``pid``."""
        numbered = list(enumerate(dfs))
        for series_id, frame in tqdm.tqdm(numbered):
            target = '/tss/{}.parquet'.format(series_id)
            fastparquet.write(target, frame, file_scheme='simple')
        zfs.snapshot_recursively('tss', pid)

    def append(self, dfs, pid):
        """Append ``dfs[i]`` to ``/tss/<i>.parquet`` and snapshot as ``pid``.

        A file holding more than ``max_row_groups`` row groups is first read
        back in full and rewritten with a plain (non-append) write.
        """
        for series_id, frame in enumerate(dfs):
            target = '/tss/{}.parquet'.format(series_id)
            group_count = len(fastparquet.api.ParquetFile(target).row_groups)
            if group_count > self.max_row_groups:
                whole = fastparquet.api.ParquetFile(target).to_pandas()
                fastparquet.write(target, whole, file_scheme='simple')
            fastparquet.write(target, frame, file_scheme='simple', append=True)
        zfs.snapshot_recursively('tss', pid)

In [9]:
# Store under test for the following timing cells: sequential, one file per series.
store = TimeSeriesStoreSingleThread()

In [10]:
# Initial load: write part 0 of every series and snapshot as pid 0.
store.write([v[0] for v in valueses], 0)

100%|██████████| 2000/2000 [00:09<00:00, 200.04it/s]


In [11]:
# Append part 1 of every series (snapshot pid 1); the list is built only for its side effects.
_ = [store.append([v[i] for v in valueses], i) for i in tqdm.tqdm(range(1, 2))]

100%|██████████| 1/1 [00:07<00:00,  7.77s/it]


In [12]:
# Append parts 2, 3 and 4 of every series, one append() and snapshot per part.
_ = [store.append([v[i] for v in valueses], i) for i in tqdm.tqdm(range(2, 5))]

100%|██████████| 3/3 [00:24<00:00,  8.21s/it]


In [13]:
# Run the three read patterns against snapshot 1; one progress bar per pattern.
read_test()

100%|██████████| 2000/2000 [00:07<00:00, 283.57it/s]
100%|██████████| 2000/2000 [00:02<00:00, 995.86it/s] 
100%|██████████| 2000/2000 [00:07<00:00, 274.32it/s]


In [15]:
from threading import Thread

def chunker(seq, size):
    """Return a generator over consecutive ``size``-long slices of ``seq``.

    ``seq`` must support ``len()`` and slicing; the final slice may be
    shorter than ``size``.
    """
    starts = xrange(0, len(seq), size)
    return (seq[start:start + size] for start in starts)

class TimeSeriesStoreMultithread(object):
    """One Parquet file per series under ``/tss``, written by worker threads.

    The series are numbered, cut into batches of ``batch_size`` with
    ``chunker`` and each batch is handled by its own ``Thread``.  After all
    threads have been joined a recursive ZFS snapshot of ``tss`` named
    ``pid`` is taken.  Relies on the notebook globals ``fastparquet``,
    ``chunker``, ``Thread`` and ``zfs``.
    """

    def __init__(self):
        # __append__ rewrites a file once it has more row groups than this.
        self.max_row_groups = 5
        # Number of (index, frame) pairs handed to each worker thread.
        self.batch_size = 1000

    def __write__(self, idx_dfs):
        """Thread body: write each ``(index, frame)`` pair to its own file."""
        for series_id, frame in idx_dfs:
            target = '/tss/{}.parquet'.format(series_id)
            fastparquet.write(target, frame, file_scheme='simple')

    def __append__(self, idx_dfs):
        """Thread body: append each ``(index, frame)`` pair to its file.

        A file holding more than ``max_row_groups`` row groups is first read
        back in full and rewritten with a plain (non-append) write.
        """
        for series_id, frame in idx_dfs:
            target = '/tss/{}.parquet'.format(series_id)
            group_count = len(fastparquet.api.ParquetFile(target).row_groups)
            if group_count > self.max_row_groups:
                whole = fastparquet.api.ParquetFile(target).to_pandas()
                fastparquet.write(target, whole, file_scheme='simple')
            fastparquet.write(target, frame, file_scheme='simple', append=True)

    def _run_batches(self, worker, dfs):
        """Start one thread per batch running ``worker`` and wait for all of them."""
        numbered = list(enumerate(dfs))
        batches = chunker(numbered, self.batch_size)
        threads = [Thread(target=worker, args=[batch]) for batch in batches]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def write(self, dfs, pid):
        """Write ``dfs[i]`` to ``/tss/<i>.parquet`` in threads, then snapshot as ``pid``."""
        self._run_batches(self.__write__, dfs)
        zfs.snapshot_recursively('tss', pid)

    def append(self, dfs, pid):
        """Append ``dfs[i]`` to ``/tss/<i>.parquet`` in threads, then snapshot as ``pid``."""
        self._run_batches(self.__append__, dfs)
        zfs.snapshot_recursively('tss', pid)

In [16]:
# Switch the store under test to the threaded variant.
store = TimeSeriesStoreMultithread()

In [17]:
%%timeit -n1 -r1
# Single timed run: initial write of part 0 of every series (snapshot pid 0).
store.write([v[0] for v in valueses], 0)

1 loop, best of 1: 16.7 s per loop


In [18]:
%%timeit -n1 -r1
# Single timed run: append part 1 of every series (snapshot pid 1).
_ = [store.append([v[i] for v in valueses], i) for i in tqdm.tqdm(range(1, 2))]

100%|██████████| 1/1 [00:16<00:00, 16.16s/it]

1 loop, best of 1: 16.2 s per loop





In [19]:
# Re-run the three read patterns against snapshot 1 after the threaded writes.
read_test()

100%|██████████| 2000/2000 [00:05<00:00, 399.09it/s]
100%|██████████| 2000/2000 [00:02<00:00, 913.70it/s]
100%|██████████| 2000/2000 [00:07<00:00, 253.50it/s]


In [48]:
import pyspark

# Build (or reuse) a Spark session with 16g for both executor and driver.
spark = (
    pyspark.sql.session.SparkSession.Builder()
    .config('spark.executor.memory', '16g')
    .config('spark.driver.memory', '16g')
    .getOrCreate()
)
# Underlying SparkContext of that session.
sc = spark.sparkContext

### Batched Store (one Parquet file for all series)

In [7]:
class TimeSeriesBatched(object):
    """Keep every series in the single file ``/tss/df.parquet``.

    Every ``write``/``append`` call ends with a recursive ZFS snapshot of
    ``tss`` named after ``pid``.  Relies on the notebook globals
    ``fastparquet`` and ``zfs``.

    NOTE(review): ``partition_on`` is passed together with
    ``file_scheme='simple'``; the fastparquet documentation describes
    ``partition_on`` as ignored for the simple scheme — confirm.
    """

    def __init__(self):
        # append() rewrites the file once it has more row groups than this.
        self.max_row_groups = 100

    def write(self, big_df, pid):
        """Write ``big_df`` to ``/tss/df.parquet`` and snapshot as ``pid``."""
        target = '/tss/df.parquet'
        fastparquet.write(target, big_df, file_scheme='simple', partition_on=['symbol'])
        zfs.snapshot_recursively('tss', pid)

    def append(self, big_df, pid):
        """Append ``big_df`` to ``/tss/df.parquet`` and snapshot as ``pid``.

        When the file holds more than ``max_row_groups`` row groups it is
        first read back in full and rewritten with a plain (non-append) write.
        """
        target = '/tss/df.parquet'
        group_count = len(fastparquet.api.ParquetFile(target).row_groups)
        if group_count > self.max_row_groups:
            whole = fastparquet.api.ParquetFile(target).to_pandas()
            fastparquet.write(target, whole, file_scheme='simple')
        fastparquet.write(target, big_df, file_scheme='simple', partition_on=['symbol'], append=True)
        zfs.snapshot_recursively('tss', pid)


In [8]:
# Switch the store under test to the single-file (batched) variant.
store = TimeSeriesBatched()
# Part 0 of every series concatenated into one frame: payload of the initial write.
big_df = pd.concat([v[0] for v in valueses])
# Part 1 of every series concatenated into one frame: payload of the first append.
first_append = pd.concat([v[1] for v in valueses])

In [76]:
%%timeit -n1 -r1
# Single timed run: write the concatenated initial frame (snapshot pid 0).
store.write(big_df, 0)

1 loop, best of 1: 8.75 s per loop


In [10]:
%%timeit -n1 -r1
# Single timed run: append the concatenated part-1 frame (snapshot pid 1).
store.append(first_append, 1)

1 loop, best of 1: 78.5 ms per loop


In [11]:
def run():
    """Append parts ``2 .. n-1`` of every series, one append per part.

    For each ``i`` the i-th part of every series in ``valueses`` is
    concatenated into one frame and handed to ``store.append`` with ``i`` as
    the snapshot id.  The frame built for this iteration is what gets
    appended; an earlier version passed ``first_append`` on every iteration,
    which wrote the same part-1 rows over and over.

    Relies on the notebook globals ``tqdm``, ``pd``, ``n``, ``valueses`` and
    ``store``.
    """
    for i in tqdm.tqdm(range(2, n)):
        df = pd.concat([v[i] for v in valueses])
        store.append(df, i)
        # Drop the reference right away so the frame does not outlive its iteration.
        del df

# Execute the append loop for i = 2 .. n-1 (one snapshot per iteration).
run()

100%|██████████| 98/98 [00:46<00:00,  2.12it/s]


In [12]:
%%bash
# List the ZFS snapshots together with the space each one uses.
zfs list -t snapshot

NAME     USED  AVAIL  REFER  MOUNTPOINT
tss@0     45K      -   154M  -
tss@1     60K      -   154M  -
tss@2   52.5K      -   154M  -
tss@3     42K      -   154M  -
tss@4   29.5K      -   154M  -
tss@5     94K      -   154M  -
tss@6   84.5K      -   154M  -
tss@7   75.5K      -   154M  -
tss@8   67.5K      -   154M  -
tss@9   61.5K      -   154M  -
tss@10    54K      -   154M  -
tss@11    43K      -   154M  -
tss@12    31K      -   154M  -
tss@13  95.5K      -   154M  -
tss@14    86K      -   154M  -
tss@15    77K      -   155M  -
tss@16    69K      -   155M  -
tss@17    62K      -   155M  -
tss@18  54.5K      -   155M  -
tss@19  44.5K      -   155M  -
tss@20  32.5K      -   155M  -
tss@21    97K      -   155M  -
tss@22  87.5K      -   155M  -
tss@23  78.5K      -   155M  -
tss@24  70.5K      -   155M  -
tss@25  63.5K      -   155M  -
tss@26    56K      -   155M  -
tss@27    45K      -   155M  -
tss@28    33K      -   155M  -
tss@29  98.5K      -   155M  -
tss@30  87.5K      -   156M  -

In [14]:
%%timeit -n1 -r1
path = '/tss/.zfs/snapshot/1/df.parquet'
tmp = pq.read_pandas(path).to_pandas()
del tmp

1 loop, best of 1: 2.78 s per loop


In [45]:
class TimeSeriesStoreHive(object):
    """Keep every series in a hive-style dataset at ``/tss/df.parquet``.

    Data is partitioned on ``symbol``.  Every ``write``/``append`` call ends
    with a recursive ZFS snapshot of ``tss`` named after ``pid``.  Relies on
    the notebook globals ``fastparquet`` and ``zfs``.
    """

    def __init__(self):
        # Not read by any method of this class.
        self.max_row_groups = 100

    def write(self, big_df, pid):
        """Write ``big_df`` as a new dataset and snapshot as ``pid``."""
        layout = dict(file_scheme='hive', partition_on=['symbol'])
        fastparquet.write('/tss/df.parquet', big_df, **layout)
        zfs.snapshot_recursively('tss', pid)

    def append(self, big_df, pid):
        """Append ``big_df`` to the existing dataset and snapshot as ``pid``."""
        layout = dict(file_scheme='hive', partition_on=['symbol'])
        fastparquet.write('/tss/df.parquet', big_df, append=True, **layout)
        zfs.snapshot_recursively('tss', pid)


In [39]:
# Switch the store under test to the hive-partitioned variant.
store = TimeSeriesStoreHive()

In [40]:
%%timeit -n1 -r1
# Single timed run: write the concatenated initial frame as a hive dataset (snapshot pid 0).
store.write(big_df, 0)

1 loop, best of 1: 8.63 s per loop


In [41]:
%%timeit -n1 -r1
# Single timed run: append the concatenated part-1 frame to the hive dataset (snapshot pid 1).
store.append(first_append, 1)

1 loop, best of 1: 4.72 s per loop


In [53]:
%%timeit -n1 -r1
# Single timed run: read only the symbol=1 partition directory via Spark and convert to pandas.
tmp = spark.read.parquet('/tss/df.parquet/symbol=1').toPandas()
# Release the frame immediately; only the read time matters here.
del tmp

1 loop, best of 1: 130 ms per loop


In [101]:
class TimeSeriesStoreSpark(object):
    """Store backed by Spark's DataFrame writer at ``/tss/spark/df.parquet``.

    Data is partitioned on ``symbol``.  Every ``write``/``append`` call ends
    with a recursive ZFS snapshot of ``tss/spark`` named after ``pid``.
    Relies on the notebook global ``zfs``.
    """

    def write(self, df, pid):
        """Overwrite the dataset with the Spark DataFrame ``df`` and snapshot as ``pid``."""
        df.write.mode('overwrite').partitionBy('symbol').parquet('/tss/spark/df.parquet')
        zfs.snapshot_recursively('tss/spark', pid)

    def append(self, big_df, pid):
        """Append the Spark DataFrame ``big_df`` to the dataset and snapshot as ``pid``.

        The frame passed in is the one written.  An earlier version ignored
        the argument and always wrote the notebook global ``spark_append``.
        """
        path = '/tss/spark/df.parquet'
        big_df.write.mode('append').partitionBy('symbol').parquet(path)
        zfs.snapshot_recursively('tss/spark', pid)


In [102]:
# Switch the store under test to the Spark-writer variant.
store = TimeSeriesStoreSpark()

In [116]:
# Load the dataset at /tss/df.parquet into Spark and mark it for caching.
spark_df = spark.read.parquet('/tss/df.parquet').cache()
# count() is an action, so it triggers the read before the timed cells run.
_ = spark_df.count()
# Stage the part-1 frame as its own hive-partitioned dataset so Spark can read it back.
fastparquet.write('/tss/an_append.parquet', first_append, file_scheme='hive', partition_on=['symbol'])
spark_append = spark.read.parquet('/tss/an_append.parquet')

In [119]:
%%timeit -n1 -r1
# Single timed run: overwrite /tss/spark/df.parquet from the cached Spark frame (snapshot pid 0).
store.write(spark_df, 0)

1 loop, best of 1: 9.13 s per loop


In [120]:
%%timeit -n1 -r1
# Single timed run: count all rows of the Spark-written dataset (Python 2 print statement).
print spark.read.parquet('/tss/spark/df.parquet').count()

10236374
1 loop, best of 1: 3.03 s per loop


In [121]:
%%timeit -n1 -r1
# Single timed run: append the frame read from /tss/an_append.parquet (snapshot pid 1).
store.append(spark_append, 1)

1 loop, best of 1: 4.17 s per loop


In [122]:
%%timeit -n1 -r1
# Single timed run: count the rows in the symbol=1 partition only (Python 2 print statement).
print spark.read.parquet('/tss/spark/df.parquet/symbol=1').count()

4742
1 loop, best of 1: 79.8 ms per loop
