### Data Storage in big data and distributed systems

Efficient storage can dramatically improve performance, particularly when you read the same data from disk repeatedly.

Decompressing text and parsing CSV files is expensive. One of the most effective strategies with medium data is to use a binary storage format like HDF5. The performance gains from doing this are often large enough that you can switch back from dask.dataframe to plain Pandas.
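Besides avoiding text parsing, a binary format stores each column's dtype alongside the data, while CSV forces pandas to re-infer types from text on every load. The sketch below illustrates this with three rows taken from the accounts data shown later. It uses pandas' built-in pickle format purely as a dependency-free stand-in for a binary format such as HDF5 (which additionally requires the PyTables package); pickle itself is not a good choice for long-term or untrusted storage.

```python
import os
import tempfile

import pandas as pd

# A few rows like those in data/accounts.*.csv, with names stored as a categorical
df = pd.DataFrame({
    "id": [9, 15, 381],
    "names": pd.Categorical(["Wendy", "Victor", "Bob"]),
    "amount": [15, 77, 3064],
})

with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "accounts.csv")
    bin_path = os.path.join(tmp, "accounts.pkl")

    df.to_csv(csv_path, index=False)  # text: every value is written as characters
    df.to_pickle(bin_path)            # binary: values and dtypes are stored together

    from_csv = pd.read_csv(csv_path)    # must parse text and guess dtypes again
    from_bin = pd.read_pickle(bin_path) # restores the stored dtypes directly

print(from_csv["names"].dtype)  # a plain string dtype: the categorical information is lost
print(from_bin["names"].dtype)  # category: preserved by the binary format
```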

In this section we'll learn how to efficiently arrange and store your datasets in on-disk binary formats.

Main takeaways:

+ Storage formats affect performance by an order of magnitude
+ Text data will keep even a fast format like HDF5 slow
+ A combination of binary formats, column storage, and partitioned data turns one-second wait times into 80 ms wait times.

In [1]:
import os
filename = os.path.join('data', 'accounts.*.csv')
filename

'data/accounts.*.csv'

In [2]:
import dask.dataframe as dd
df_csv = dd.read_csv(filename)
df_csv.head()


In [3]:
target = os.path.join('data', 'accounts.h5')
target



'data/accounts.h5'

In [4]:
%time df_csv.to_hdf(target, '/data')

CPU times: user 2.47 s, sys: 352 ms, total: 2.82 s
Wall time: 4.85 s


['data/accounts.h5', 'data/accounts.h5', 'data/accounts.h5']

In [5]:

# same data as before
df_hdf = dd.read_hdf(target, '/data')
df_hdf.head()

|   | id | names | amount |
|---|---|---|---|
| 0 | 9 | Wendy | 15 |
| 1 | 15 | Victor | 77 |
| 2 | 381 | Bob | 3064 |
| 3 | 358 | Ingrid | 2041 |
| 4 | 299 | Kevin | 204 |


In [6]:
%time df_csv.amount.sum().compute()

CPU times: user 906 ms, sys: 279 ms, total: 1.19 s
Wall time: 558 ms


4186059719

In [7]:
%time df_hdf.amount.sum().compute()

CPU times: user 427 ms, sys: 150 ms, total: 577 ms
Wall time: 757 ms


4186059719

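Sadly, the two are about the same: reading from HDF5 uses less CPU time (577 ms versus 1.19 s) but is actually slower in wall-clock time (757 ms versus 558 ms).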

The culprit here is the names column, which has object dtype (arbitrary Python objects, here strings) and is therefore hard to store efficiently. This raises two questions:

+ How do we store text data like names efficiently on disk?
+ Why did we have to read the names column when all we wanted was amount?
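One answer to the first question, used in the next cell via categorize, is a categorical dtype: each distinct name is stored once, and every row holds only a small integer code pointing at it. The sketch below measures the in-memory effect with plain pandas on hypothetical data (six names, including an invented "Alice", repeated 10,000 times each); the exact byte counts depend on your Python and pandas versions, so only the relative difference matters.

```python
import pandas as pd

# Hypothetical low-cardinality text column: 6 distinct names, 60,000 rows.
# dtype=object is forced so the comparison matches the object column above.
names = pd.Series(
    ["Alice", "Bob", "Ingrid", "Kevin", "Victor", "Wendy"] * 10_000, dtype=object
)

# object dtype: every row holds a pointer to a Python string object
as_object = names.memory_usage(deep=True)

# category dtype: labels stored once, rows hold integer codes
categorical = names.astype("category")
as_category = categorical.memory_usage(deep=True)

print(f"object dtype:   {as_object:,} bytes")
print(f"category dtype: {as_category:,} bytes")
print(categorical.cat.categories.tolist())  # the six unique labels
print(categorical.cat.codes.dtype)          # int8: one byte per row for fewer than 128 labels
```

The same idea applies on disk: a file only needs to store the small label table plus compact integer codes instead of repeating every string.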


In [8]:
target = os.path.join('data', 'accounts_optimized.h5')
%time df_hdf.categorize(columns=['names']).to_hdf(target, '/data')

CPU times: user 2.79 s, sys: 470 ms, total: 3.26 s
Wall time: 4.77 s


['data/accounts_optimized.h5',
 'data/accounts_optimized.h5',
 'data/accounts_optimized.h5']

In [9]:
df_hdf = dd.read_hdf(target, '/data')
df_hdf.head()

|   | id | names | amount |
|---|---|---|---|
| 0 | 9 | Wendy | 15 |
| 1 | 15 | Victor | 77 |
| 2 | 381 | Bob | 3064 |
| 3 | 358 | Ingrid | 2041 |
| 4 | 299 | Kevin | 204 |


In [10]:

# But loads more quickly
%time df_hdf.amount.sum().compute()

CPU times: user 243 ms, sys: 64.3 ms, total: 307 ms
Wall time: 332 ms


4186059719

### Remote files

Dask can access various cloud- and cluster-oriented data storage services such as Amazon S3 or HDFS.

Advantages:

+ Scalable
+ Secure storage

Disadvantages:

+ Network speed becomes the bottleneck

Setting up dataframes (and other collections) works much as it does for local files. The data used here is publicly accessible without credentials, but in general you can pass an extra storage_options= parameter with further details (such as credentials) about how to interact with the remote storage.

In [9]:
#taxi = dd.read_csv('s3://nyc-tlc/trip data/yellow_tripdata_2015-*.csv')

taxi = dd.read_csv('https://www.dropbox.com/s/17ui51hwpoqeb4p/fhv_tripdata_2015-01.csv?dl=1')
taxi

|   | Dispatching_base_num | Pickup_date | locationID |
|---|---|---|---|
| npartitions=2 |   |   |   |
|   | object | object | float64 |
|   | ... | ... | ... |
|   | ... | ... | ... |


In [10]:
taxi.head()

|   | Dispatching_base_num | Pickup_date | locationID |
|---|---|---|---|
| 0 | B00013 | 2015-01-01 00:30:00 | NaN |
| 1 | B00013 | 2015-01-01 01:22:00 | NaN |
| 2 | B00013 | 2015-01-01 01:23:00 | NaN |
| 3 | B00013 | 2015-01-01 01:44:00 | NaN |
| 4 | B00013 | 2015-01-01 02:00:00 | NaN |


In [11]:
taxi.tail()

|   | Dispatching_base_num | Pickup_date | locationID |
|---|---|---|---|
| 684980 | B02765 | 2015-01-31 23:59:02 | 169.0 |
| 684981 | B02765 | 2015-01-31 23:59:07 | 80.0 |
| 684982 | B02765 | 2015-01-31 23:59:34 | 186.0 |
| 684983 | B02765 | 2015-01-31 23:59:40 | 181.0 |
| 684984 | B02765 | 2015-01-31 23:59:48 | 79.0 |


In [12]:
taxi = taxi.dropna()
taxi.head()

|   | Dispatching_base_num | Pickup_date | locationID |
|---|---|---|---|
| 20904 | B00053 | 2015-01-01 01:05:00 | 45.0 |
| 20905 | B00053 | 2015-01-01 01:30:00 | 141.0 |
| 20906 | B00053 | 2015-01-01 02:00:00 | 191.0 |
| 20907 | B00053 | 2015-01-01 05:45:00 | 143.0 |
| 20908 | B00053 | 2015-01-01 08:40:00 | 243.0 |
