# Scope
I receive a 140 MB CSV file daily on a shared drive, and I would like to use it for historical analytics. I have two problems I am trying to solve:

    1. I want to create a historical parquet dataset as the storage for the historical analytics

    2. I want to know which combination of tools I should use to perform the analytics.

## Options
Problem 1

    1. Convert each daily file into its own parquet file and write that. I could then use Dask or Pyarrow to read the files. It's highly unlikely we choose this approach: the overhead associated with Parquet files, such as page metadata and footers, is not optimized for many small files, which is what this would produce (each parquet file would be around 20 MB).
    
    2. Create a partitioned parquet dataset. The partitioning strategy is to be determined (e.g. weekly or monthly). Partitioning is relevant because parquet does not have an append mode, so I can't simply append to the same parquet file, or to a file that contains a large number of dates. Reading the parquet file into memory to append the latest day's data gets out of hand quickly, since I am appending all columns: after a few months I would have an in-memory dataframe of 12-16 GB, which will be very difficult for our PC and server to handle. So a scalable solution is required.
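
Both partitioning strategies in option 2 amount to mapping each daily file to a partition directory. Below is a minimal sketch of that mapping. It assumes Hive-style `key=value` directory names, which is the layout `pq.write_to_dataset` produces for `partition_cols`. The helper `partition_dir` and the `year`/`month`/`week` keys are hypothetical choices made for illustration.

```python
from datetime import date


def partition_dir(day: date, scheme: str = "monthly") -> str:
    """Return a Hive-style relative directory for one day's data.

    Hypothetical helper: the scheme names and the year/month/week keys
    are illustrative assumptions, not part of the original notebook.
    """
    if scheme == "monthly":
        return f"year={day.year}/month={day.month:02d}"
    if scheme == "weekly":
        # ISO weeks can straddle a calendar-year boundary, so use the ISO year
        iso_year, iso_week, _ = day.isocalendar()
        return f"year={iso_year}/week={iso_week:02d}"
    raise ValueError(f"unknown scheme: {scheme!r}")


print(partition_dir(date(2013, 8, 29)))            # year=2013/month=08
print(partition_dir(date(2013, 8, 29), "weekly"))  # year=2013/week=35
```

Weekly keys use the ISO year, so a late-December date can land in week 1 of the following year. A monthly scheme avoids that edge case.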

Problem 2

    1. Pyarrow and pandas. If we're smart with data types and only select the columns we want, this should suffice.
    2. Dask.
    3. Combination: Dask for the early steps (reading in and filtering), then convert to pandas once the data is a manageable size.

Data Source:
https://www.kaggle.com/benhamner/sf-bay-area-bike-share

In [1]:
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import csv
import pandas as pd
import dask.dataframe as dd

# Attempt 1

Check that the total bytes allocated by pyarrow start at zero.

In [2]:
pa.total_allocated_bytes()

0

In [28]:
data = csv.read_csv(r"C:\Users\matth\OneDrive\Data\Kaggle\sf-bay-area-bike-share\status.csv")

In [44]:
data.nbytes

3419272022

In [41]:
data.to_pandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Dtype 
---  ------           ----- 
 0   station_id       int64 
 1   bikes_available  int64 
 2   docks_available  int64 
 3   time             object
dtypes: int64(3), object(1)
memory usage: 2.1+ GB


In [40]:
data.to_pandas(categories=['station_id']).info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Dtype   
---  ------           -----   
 0   station_id       category
 1   bikes_available  int64   
 2   docks_available  int64   
 3   time             object  
dtypes: category(1), int64(2), object(1)
memory usage: 1.7+ GB
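
Categories only shrink `station_id`. The `sys.getsizeof` figures at the end of this notebook show about 6.2 GiB for `df`, against the `2.1+ GB` reported here. Judging by that gap, the Python strings in the object-typed `time` column are likely the bigger cost. Below is a minimal sketch on a synthetic frame (the size and values are made up). It measures deep memory before and after three changes: converting `time` to `datetime64`, converting `station_id` to `category`, and downcasting the counts.

```python
import pandas as pd

# Made-up frame mimicking the status.csv columns as pandas reads them
n = 10_000
df = pd.DataFrame({
    "station_id": [2, 3, 4, 5] * (n // 4),
    "bikes_available": [8] * n,
    "docks_available": [7] * n,
    "time": ["2013/08/29 12:06:01"] * n,
})

# deep=True also counts the Python string objects in the object column
before = df.memory_usage(deep=True).sum()

slim = df.assign(
    station_id=df["station_id"].astype("category"),
    bikes_available=pd.to_numeric(df["bikes_available"], downcast="integer"),
    docks_available=pd.to_numeric(df["docks_available"], downcast="integer"),
    time=pd.to_datetime(df["time"], format="%Y/%m/%d %H:%M:%S"),
)
after = slim.memory_usage(deep=True).sum()
print(slim.dtypes)
print(f"{before:,} bytes -> {after:,} bytes")
```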


In [5]:
data.shape

(71984434, 4)

In [29]:
pa.total_allocated_bytes()

10300775040

In [30]:
data.nbytes

3419272022

In [11]:
df = data.to_pandas()

In [12]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Dtype 
---  ------           ----- 
 0   station_id       int64 
 1   bikes_available  int64 
 2   docks_available  int64 
 3   time             object
dtypes: int64(3), object(1)
memory usage: 2.1+ GB


In [13]:
df.station_id.value_counts()

5     1047142
7     1047142
6     1047142
42    1047141
34    1047141
       ...   
31     872235
80     872134
82     840950
83     798868
84     731527
Name: station_id, Length: 70, dtype: int64

In [13]:
data_doubled = pa.concat_tables([data,data])

In [18]:
data_doubled.shape

(143968868, 4)

In [20]:
data_doubled.nbytes/1000

6838544.044

It took 30 seconds to write a 2 GB CSV to a partitioned dataset, which compressed to 338 MB. Playing around, I tried writing the dataset a second time to see whether the write time was the same, and it was. However, I was surprised to see that the data was actually appended, which was contrary to my expectations.

In [9]:
partition_path = r"C:\Users\matth\OneDrive\Data\Kaggle\sf-bay-area-bike-share\bike-data"

In [10]:
pq.write_to_dataset(data,
                    root_path=partition_path,
                    partition_cols=['station_id'])

Reading that same dataset back into a table takes 6.52 seconds.

In [25]:
pq.read_table(partition_path).shape

(143968868, 4)

Writing the double-sized dataset, the write time goes up to 1 minute 16 seconds. So performance seems to be non-linear: a 100% increase in size produced roughly a 150% increase in write time (from 30 s to 76 s).

In [14]:
pq.write_to_dataset(data_doubled, 
                    root_path=partition_path,
                    partition_cols=['station_id'])

Now I want to try reading a specific partition, check its shape, and then try writing just that partition several times to see whether that also works. I see there are 5.2 million records.

In [11]:
data_23 = pq.ParquetDataset(partition_path, filters=[('station_id', '=', '23')]).read()

In [12]:
data_23.schema

bikes_available: int64
docks_available: int64
time: string
station_id: dictionary<values=int64, indices=int32, ordered=0>
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
            b' "bikes_available", "field_name": "bikes_available", "pandas_typ'
            b'e": "int64", "numpy_type": "int64", "metadata": null}, {"name": '
            b'"docks_available", "field_name": "docks_available", "pandas_type'
            b'": "int64", "numpy_type": "int64", "metadata": null}, {"name": "'
            b'time", "field_name": "time", "pandas_type": "unicode", "numpy_ty'
            b'pe": "object", "metadata": null}], "creator": {"library": "pyarr'
            b'ow", "version": "0.16.0"}, "pandas_version": "1.0.1"}'}

I also confirm the filter worked as expected.

In [33]:
data_23.to_pandas().station_id.value_counts()

23    5235705
9           0
30          0
32          0
33          0
       ...   
56          0
55          0
54          0
51          0
10          0
Name: station_id, Length: 70, dtype: int64

In [40]:
data_23.to_pandas().head()

   bikes_available  docks_available                 time  station_id
0                8                7  2013/08/29 12:06:01          23
1                8                7  2013/08/29 12:07:01          23
2                8                7  2013/08/29 12:08:01          23
3                8                7  2013/08/29 12:09:01          23
4                8                7  2013/08/29 12:10:01          23


I will now write this table back to the dataset two more times. If the append works as it did on the whole dataset, I expect just this partition to grow by about 10.5 million records (2 × 5,235,705). The write operations themselves were pretty quick, at around 2.5 seconds each.

In [36]:
pq.write_to_dataset(data_23,
                    root_path=partition_path,
                    partition_cols=['station_id'])

Now to test if the data was correctly appended.

In [38]:
data_23_2 = pq.ParquetDataset(partition_path, filters=[('station_id', '=', '23')]).read()

Looking at the shape of the table I can see that it did increase by the expected size.

In [39]:
data_23_2.shape

(15707115, 4)

Another test I will do is to restart the kernel to lose all variables and try appending an additional 2 times. I'm doing this as another check, since my expectation coming in was that parquet is not appendable, and I want to ensure this works in a real-life scenario where the same Python session will not be used.

In [4]:
data_23_3 = pq.ParquetDataset(partition_path, filters=[('station_id', '=', '23')]).read()

In [5]:
data_23_3.shape

(15707115, 4)

In [6]:
pq.write_to_dataset(data_23_3,
                    root_path=partition_path,
                    partition_cols=['station_id'])

After restarting the kernel, rewriting the data, and reading in the partition I see the expected number of rows.  It appears this solution will work to append data.  

In [11]:
pq.ParquetDataset(partition_path, filters=[('station_id', '=', '23')]).read().shape

(31414230, 4)

In [13]:
pq.ParquetDataset(partition_path, filters=[('station_id', '=', '23')]).read().to_pandas().station_id.unique()

[23]
Categories (1, int64): [23]

After digging into the directory, I see that each additional write added a separate parquet file to the partition rather than appending to an existing file. That is the many-small-files pattern already ruled out as inefficient in Problem 1, option 1, so an alternative solution will have to be investigated. Everything above can be ignored.

# Attempt 2

write_table gives the ability to overwrite a single file, which may come in handy. One possible approach is to partition manually on a date-based key, such as a combination of year and month, and simply overwrite the relevant partition daily. This would limit each parquet file to around 400-500 MB (in the real data), which translates to around 4-5 GB in pandas and is acceptable.

In [2]:
data = csv.read_csv(r"C:\Users\matth\OneDrive\Data\Kaggle\sf-bay-area-bike-share\status.csv")

In [3]:
pq.write_table(data, r"C:\Users\matth\OneDrive\Data\Kaggle\sf-bay-area-bike-share\bikes\bikes.parquet.snappy")

In [72]:
df = data.to_pandas(categories=['station_id'])

In [88]:
data.to_pandas().assign(station_id=lambda x: x.station_id.astype('int8')).info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Dtype 
---  ------           ----- 
 0   station_id       int8  
 1   bikes_available  int64 
 2   docks_available  int64 
 3   time             object
dtypes: int64(2), int8(1), object(1)
memory usage: 1.7+ GB


In [73]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Dtype   
---  ------           -----   
 0   station_id       category
 1   bikes_available  int64   
 2   docks_available  int64   
 3   time             object  
dtypes: category(1), int64(2), object(1)
memory usage: 1.7+ GB


In [74]:
pth = "C:\\Users\\matth\\OneDrive\\Data\\Kaggle\\sf-bay-area-bike-share\\bikes\\bikes_{}.parquet.snappy"

for s in df.station_id.unique():
    df.query("station_id==@s").to_parquet(pth.format(str(s)), index=False)

In [19]:
data_quad = pa.concat_tables([data,data,data,data])

In [20]:
pq.write_table(data_quad, "bikes.parquet.snappy")

In [38]:
pth2 = "C:\\Users\\matth\\OneDrive\\Data\\Kaggle\\sf-bay-area-bike-share\\bikes_preserve"

In [79]:
df2 = pq.ParquetDataset(pth2, read_dictionary=['station_id']).read().to_pandas(categories=['station_id'])

In [84]:
pq.ParquetDataset(pth2, read_dictionary=['station_id']).read()

pyarrow.Table
station_id: int64
bikes_available: int64
docks_available: int64
time: string
metadata
--------
{b'pandas': b'{"index_columns": [], "column_indexes": [], "columns": [{"name":'
            b' "station_id", "field_name": "station_id", "pandas_type": "categ'
            b'orical", "numpy_type": "int8", "metadata": {"num_categories": 70'
            b', "ordered": false}}, {"name": "bikes_available", "field_name": '
            b'"bikes_available", "pandas_type": "int64", "numpy_type": "int64"'
            b', "metadata": null}, {"name": "docks_available", "field_name": "'
            b'docks_available", "pandas_type": "int64", "numpy_type": "int64",'
            b' "metadata": null}, {"name": "time", "field_name": "time", "pand'
            b'as_type": "unicode", "numpy_type": "object", "metadata": null}],'
            b' "creator": {"library": "pyarrow", "version": "0.16.0"}, "pandas'
            b'_version": "1.0.1"}'}

In [80]:
df2.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Dtype   
---  ------           -----   
 0   station_id       category
 1   bikes_available  int64   
 2   docks_available  int64   
 3   time             object  
dtypes: category(1), int64(2), object(1)
memory usage: 1.7+ GB


In [54]:
pth3 = "C:\\Users\\matth\\OneDrive\\Data\\Kaggle\\sf-bay-area-bike-share\\bikes"

In [87]:
pq.ParquetFile(pth3 + "\\bikes_55.parquet.snappy").schema

<pyarrow._parquet.ParquetSchema object at 0x000002C6A9676688>
station_id: INT64
bikes_available: INT64
docks_available: INT64
time: BYTE_ARRAY String
 

In [75]:
df3 = pq.ParquetDataset(pth3).read(use_pandas_metadata=True).to_pandas()

In [82]:
df3.info(null_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Non-Null Count     Dtype 
---  ------           --------------     ----- 
 0   station_id       71984434 non-null  int64 
 1   bikes_available  71984434 non-null  int64 
 2   docks_available  71984434 non-null  int64 
 3   time             71984434 non-null  object
dtypes: int64(3), object(1)
memory usage: 2.1+ GB


In [83]:
sys.getsizeof(df3)

7198443552

In [61]:
df.info(null_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Non-Null Count     Dtype 
---  ------           --------------     ----- 
 0   station_id       71984434 non-null  int64 
 1   bikes_available  71984434 non-null  int64 
 2   docks_available  71984434 non-null  int64 
 3   time             71984434 non-null  object
dtypes: int64(3), object(1)
memory usage: 2.1+ GB


In [57]:
df.equals(df3)

False

In [63]:
df3.equals(df)

False

In [69]:
a = df.groupby('station_id').sum()
a

            bikes_available  docks_available
station_id
2                  13790337         14406923
3                   8859768          6835432
4                   5543081          5953024
5                   8497365         11381034
6                   7967246          7731859
...                     ...              ...
77                 13145043         14229020
80                  5928987          7140769
82                  5125120          7472056
83                  5179287          6799866


In [70]:
b = df3.groupby('station_id').sum()
b

            bikes_available  docks_available
station_id
2                  13790337         14406923
3                   8859768          6835432
4                   5543081          5953024
5                   8497365         11381034
6                   7967246          7731859
...                     ...              ...
77                 13145043         14229020
80                  5928987          7140769
82                  5125120          7472056
83                  5179287          6799866


In [71]:
a.equals(b)

True
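
`df.equals(df3)` returns `False` while the per-station sums match. One likely cause is row order: `df` follows the CSV, whereas `df3` was read back from the per-station files. `equals` compares the index and the position of every value, and it is also dtype-sensitive. Below is a minimal sketch that sorts both frames on key columns and drops the index before comparing. The `same_rows` helper and the choice of `station_id`/`time` as keys are assumptions.

```python
import pandas as pd


def same_rows(left: pd.DataFrame, right: pd.DataFrame, keys: list) -> bool:
    """Hypothetical helper: compare two frames after putting both in the same row order."""
    def normalise(frame: pd.DataFrame) -> pd.DataFrame:
        return frame.sort_values(keys).reset_index(drop=True)
    return normalise(left).equals(normalise(right))


# Made-up rows; b holds the same rows as a, in reverse order
a = pd.DataFrame({
    "station_id": [2, 2, 3],
    "time": ["2013/08/29 12:06:01", "2013/08/29 12:07:01", "2013/08/29 12:06:01"],
    "bikes_available": [8, 7, 5],
})
b = a.iloc[::-1]

print(a.equals(b))                              # False: order and index differ
print(same_rows(a, b, ["station_id", "time"]))  # True
```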

In [30]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71984434 entries, 0 to 71984433
Data columns (total 4 columns):
 #   Column           Dtype 
---  ------           ----- 
 0   station_id       int64 
 1   bikes_available  int64 
 2   docks_available  int64 
 3   time             object
dtypes: int64(3), object(1)
memory usage: 2.1+ GB


In [40]:
df2.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 71984434 entries, 2083920 to 39525030
Data columns (total 4 columns):
 #   Column           Dtype 
---  ------           ----- 
 0   station_id       int64 
 1   bikes_available  int64 
 2   docks_available  int64 
 3   time             object
dtypes: int64(3), object(1)
memory usage: 2.7+ GB


In [32]:
pth3 = r"C:\Users\matth\OneDrive\Data\Kaggle\sf-bay-area-bike-share\bikes\bikes_3.parquet.snappy"
pq.read_table(pth3)

pyarrow.Table
station_id: int64
bikes_available: int64
docks_available: int64
time: string
__index_level_0__: int64
metadata
--------
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"na'
            b'me": null, "field_name": null, "pandas_type": "unicode", "numpy_'
            b'type": "object", "metadata": {"encoding": "UTF-8"}}], "columns":'
            b' [{"name": "station_id", "field_name": "station_id", "pandas_typ'
            b'e": "int64", "numpy_type": "int64", "metadata": null}, {"name": '
            b'"bikes_available", "field_name": "bikes_available", "pandas_type'
            b'": "int64", "numpy_type": "int64", "metadata": null}, {"name": "'
            b'docks_available", "field_name": "docks_available", "pandas_type"'
            b': "int64", "numpy_type": "int64", "metadata": null}, {"name": "t'
            b'ime", "field_name": "time", "pandas_type": "unicode", "numpy_typ'
            b'e": "object", "metadata": null}, {"name": null, "fie

In [81]:
import sys
def sizeof_fmt(num, suffix='B'):
    ''' by Fred Cirera,  https://stackoverflow.com/a/1094933/1870254, modified'''
    for unit in ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi']:
        if abs(num) < 1024.0:
            return "%3.1f %s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f %s%s" % (num, 'Yi', suffix)

for name, size in sorted(((name, sys.getsizeof(value)) for name, value in locals().items()),
                         key= lambda x: -x[1])[:10]:
    print("{:>30}: {:>8}".format(name, sizeof_fmt(size)))

                           _14:  7.2 GiB
                           df3:  6.7 GiB
                            df:  6.2 GiB
                           df2:  6.2 GiB
                           _13:  3.8 GiB
                          data:  3.2 GiB
                           _32: 55.6 MiB
                        si_tbl: 38.5 MiB
                            __:  1.7 KiB
                           ___:  1.7 KiB
