In [1]:
import os

import pyreadr
import numpy as np
import opendatasets as ods
from dask.distributed import Client
from dask import dataframe as ddf

In [2]:
# Show where the kernel started; the relative chdir below is resolved against this directory.
print(f'Working directory: {os.getcwd()}')
# Debugging aid (disabled): list the contents of the 'Git' folder.
# for fname in os.listdir('Git'):
#     print(f' - {fname}')
# Move into the project's Dask source folder so the relative paths used later ('..', 'data') resolve.
# NOTE(review): the path is relative, so running this cell a second time in the same kernel
# will presumably fail because the working directory has already changed.
os.chdir('Git/MachineLearningProjects/MLWithBigDataOnALaptop/src/dask')

Working directory: /opt/notebooks


# Download the data from Kaggle

You'll need to sign up with Kaggle, then go to the account page and create an API token before running this.

In [6]:
# Download the Tennessee Eastman Process dataset from Kaggle into the parent directory (data_dir='..').
# The recorded run prompted for Kaggle credentials, so have your username and API token ready.
ods.download("https://www.kaggle.com/averkij/tennessee-eastman-process-simulation-dataset", data_dir='..')

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username:

## File structure

This dataset consists of four files:
* TEP_FaultFree_Testing.RData (47.3 MB)
* TEP_FaultFree_Training.RData (24.7 MB)
* TEP_Faulty_Testing.RData (836.9 MB)
* TEP_Faulty_Training.RData (494.1 MB)

The "FaultFree" files contain simulation runs that demonstrate completely normal behaviour. The "Faulty" files contain simulations where a fault is introduced either one hour (training data) or eight hours (testing data) into the simulation. Simulations in the training files ran for 500 time steps (25 hours), while simulations in the test sets are longer (960 time steps, 48 hours).

Columns 4 to 55 contain the actual measurements, while column 1 contains the fault number from 0 to 20, where 0 means no fault. To keep this simple, I'm going to convert this to 0 or 1 (no fault or a fault).

## Data structure

Column two contains `simulationRun`, a number from 1 to 500 in the training data, that determines what random seed was used to make that simulation. Importantly, multiple simulations using **the same `simulationRun` value** do exist. This happens in the "Faulty" files, where the simulation is run once for each fault. In the "FaultFree" files, there's only one simulation per `simulationRun`.

This does mean that the first hour of a simulation in the training data appears 21 times (once for the fault-free simulation and 20 times for the fault simulations). I'm going to solve this the easy way by dropping the first hour of each faulty training simulation and the first eight hours of each faulty testing simulation.

# Loading the data

In [3]:
# Create the output folder for the converted HDF5 files; exist_ok=True makes re-running this cell harmless.
os.makedirs('data', exist_ok=True)

## Loading RData files

This format is used by the R community, but for our purposes we need something that (a) works in Python and (b) doesn't need to be loaded entirely into RAM.

The `pyreadr` module loads RData frames into Pandas dataframes. Unfortunately, it loads the entire dataset into RAM. Here's an example of loading an object called `fault_free_training`.

In [4]:
# Read only the `fault_free_training` object from the RData file. read_r returns a mapping of
# object name -> pandas DataFrame, so index it by the same name to get the frame itself.
# The whole frame is held in RAM at this point.
r_data = pyreadr.read_r(
    '../tennessee-eastman-process-simulation-dataset/TEP_FaultFree_Training.RData',
    use_objects=['fault_free_training'])['fault_free_training']
# Last expression of the cell: display a preview of the loaded frame.
r_data

Unnamed: 0,faultNumber,simulationRun,sample,xmeas_1,xmeas_2,xmeas_3,xmeas_4,xmeas_5,xmeas_6,xmeas_7,...,xmv_2,xmv_3,xmv_4,xmv_5,xmv_6,xmv_7,xmv_8,xmv_9,xmv_10,xmv_11
0,0.0,1.0,1,0.25038,3674.0,4529.0,9.2320,26.889,42.402,2704.3,...,53.744,24.657,62.544,22.137,39.935,42.323,47.757,47.510,41.258,18.447
1,0.0,1.0,2,0.25109,3659.4,4556.6,9.4264,26.721,42.576,2705.0,...,53.414,24.588,59.259,22.084,40.176,38.554,43.692,47.427,41.359,17.194
2,0.0,1.0,3,0.25038,3660.3,4477.8,9.4426,26.875,42.070,2706.2,...,54.357,24.666,61.275,22.380,40.244,38.990,46.699,47.468,41.199,20.530
3,0.0,1.0,4,0.24977,3661.3,4512.1,9.4776,26.758,42.063,2707.2,...,53.946,24.725,59.856,22.277,40.257,38.072,47.541,47.658,41.643,18.089
4,0.0,1.0,5,0.29405,3679.0,4497.0,9.3381,26.889,42.650,2705.1,...,53.658,28.797,60.717,21.947,39.144,41.955,47.645,47.346,41.507,18.461
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
249995,0.0,500.0,496,0.29325,3640.1,4473.0,9.1949,26.867,42.379,2700.2,...,53.429,29.249,60.773,21.532,40.451,34.064,48.953,48.291,40.812,18.756
249996,0.0,500.0,497,0.29134,3625.7,4506.2,9.2109,26.889,42.291,2700.6,...,53.830,28.975,61.517,21.750,42.762,42.645,51.055,48.589,40.933,19.360
249997,0.0,500.0,498,0.29438,3600.2,4478.3,9.1957,26.820,42.448,2700.3,...,54.163,28.676,61.656,21.487,42.109,39.770,46.770,48.648,41.465,19.344
249998,0.0,500.0,499,0.25269,3683.5,4486.4,9.2832,27.188,42.757,2697.4,...,53.453,24.889,61.564,21.392,39.334,42.274,43.623,48.797,39.835,18.512


In [5]:
# Inspect the dtypes pyreadr produced: faultNumber and simulationRun arrive as float64, sample as int32.
r_data.dtypes

faultNumber      float64
simulationRun    float64
sample             int32
xmeas_1          float64
xmeas_2          float64
xmeas_3          float64
xmeas_4          float64
xmeas_5          float64
xmeas_6          float64
xmeas_7          float64
xmeas_8          float64
xmeas_9          float64
xmeas_10         float64
xmeas_11         float64
xmeas_12         float64
xmeas_13         float64
xmeas_14         float64
xmeas_15         float64
xmeas_16         float64
xmeas_17         float64
xmeas_18         float64
xmeas_19         float64
xmeas_20         float64
xmeas_21         float64
xmeas_22         float64
xmeas_23         float64
xmeas_24         float64
xmeas_25         float64
xmeas_26         float64
xmeas_27         float64
xmeas_28         float64
xmeas_29         float64
xmeas_30         float64
xmeas_31         float64
xmeas_32         float64
xmeas_33         float64
xmeas_34         float64
xmeas_35         float64
xmeas_36         float64
xmeas_37         float64


### Fixing data types

`faultNumber` and `simulationRun` hold integer values, but as the output above shows they are stored as float64 in this file, while `sample` is stored as int32. Storage also differs between the files (for example, `faultNumber` is loaded as an integer column from the faulty files). When Dask calculates stats for an int32 column, it uses that dtype and experiences overflow. The solution is to convert all three columns to int64.

In [6]:
def convert_to_int64_inplace(df):
    """Cast the ``sample``, ``simulationRun`` and ``faultNumber`` columns to int64.

    The three columns are overwritten on ``df`` itself, so the caller's frame
    is modified; every other column is left untouched. Nothing is returned.

    Args:
        df: DataFrame containing the three columns named above.
    """
    # Same column order as the assignments this loop replaces.
    for column in ('sample', 'simulationRun', 'faultNumber'):
        df[column] = df[column].astype(np.int64)

In [7]:
# Cast sample, simulationRun and faultNumber to int64 directly on the loaded frame.
convert_to_int64_inplace(r_data)

### Converting from Pandas to Dask

Dask dataframes can be created from Pandas dataframes, various files (CSV, HDF5, etc.) and SQL database connections. The `from_pandas` function creates a Dask dataframe where the underlying data is stored in Pandas (i.e. it's still in RAM). This conversion is useful for converting the dataframe to other formats supported by Dask so we can make use of out-of-core processing, or if you want to make use of Dask distributed for multicore processing. Doing this conversion for all four RData files is the most RAM-consuming part of this demo. **Use file formats that Dask can work with natively, e.g. CSV, HDF5. See [this list](https://docs.dask.org/en/latest/dataframe-create.html). Avoid Dask+Bcolz, I couldn't find a way to stop Dask from loading the entire array into RAM.**

To write the dataframe to an HDF file, we use `to_hdf`, while we use `read_hdf` to read it back. When reading it back, Dask is actually just creating a task graph. The HDF5 file is only read into RAM when the task graph is executed.

Dask divides this dataframe into blocks containing `chunksize` rows. Chunks represent the smallest part of the dataframe that will be loaded into RAM. Smaller chunks need less RAM for data, but result in bigger task graphs (which also require RAM). Having too many tasks results in inefficient processing. Using a `chunksize` that results in about 100-1000 chunks seems to work well, but of course make sure you can fit one chunk in about 1/25th to 1/50th of the RAM you want to use. More complex pipelines will decrease this limit.

In [8]:
# Wrap the in-memory pandas frame as a Dask dataframe split into blocks of 5000 rows
# (250000 rows -> 50 partitions, matching the 50 entries in the to_hdf output below).
data = ddf.from_pandas(r_data, chunksize=5000)

In [9]:
# Write every partition into the single file data/train.hdf, one key per partition
# (the '*' in the key is replaced by the partition's name).
# mode='w' starts from an empty file. With the default mode='a', keys left behind by an
# earlier run of this notebook would survive, and the later append=True cell would then
# add the faulty rows to them a second time, duplicating training data on every re-run.
data.to_hdf('data/train.hdf', key='/train_*', mode='w')

['data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf']

In [10]:
# Drop the pandas-backed Dask frame; from here on the data is read back from disk.
del data
# read_hdf only builds a task graph; the '/train_*' wildcard selects every key in the file.
data = ddf.read_hdf('data/train.hdf', key='/train_*')
# Counting rows executes the graph, which is what actually reads the HDF5 file.
len(data)

250000

In [11]:
# Drop both references so the in-memory fault-free frame can be garbage-collected
# before the much larger faulty file is loaded.
del r_data, data

### Loading the part of the training data that contains faults

This is pretty similar to the previous section. The two differences are:
- We have to discard the first 20 samples (the first hour) of each simulation, as they duplicate the fault-free simulations.
- We're going to append this to the fault-free training data, so that all of the training data ends up in the same HDF5 file (`data/train.hdf`), split across many keys.

In [12]:
# Read only the `faulty_training` object from the RData file; read_r returns a mapping of
# object name -> pandas DataFrame, so index it by the same name. The whole frame is loaded into RAM.
r_data = pyreadr.read_r('../tennessee-eastman-process-simulation-dataset/TEP_Faulty_Training.RData', use_objects=['faulty_training'])['faulty_training']
# Last expression of the cell: display a preview of the loaded frame.
r_data

Unnamed: 0,faultNumber,simulationRun,sample,xmeas_1,xmeas_2,xmeas_3,xmeas_4,xmeas_5,xmeas_6,xmeas_7,...,xmv_2,xmv_3,xmv_4,xmv_5,xmv_6,xmv_7,xmv_8,xmv_9,xmv_10,xmv_11
0,1,1.0,1,0.25038,3674.0,4529.0,9.2320,26.889,42.402,2704.3,...,53.744,24.657,62.544,22.137,39.935,42.323,47.757,47.510,41.258,18.447
1,1,1.0,2,0.25109,3659.4,4556.6,9.4264,26.721,42.576,2705.0,...,53.414,24.588,59.259,22.084,40.176,38.554,43.692,47.427,41.359,17.194
2,1,1.0,3,0.25038,3660.3,4477.8,9.4426,26.875,42.070,2706.2,...,54.357,24.666,61.275,22.380,40.244,38.990,46.699,47.468,41.199,20.530
3,1,1.0,4,0.24977,3661.3,4512.1,9.4776,26.758,42.063,2707.2,...,53.946,24.725,59.856,22.277,40.257,38.072,47.541,47.658,41.643,18.089
4,1,1.0,5,0.29405,3679.0,4497.0,9.3381,26.889,42.650,2705.1,...,53.658,28.797,60.717,21.947,39.144,41.955,47.645,47.346,41.507,18.461
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4999995,20,500.0,496,0.23419,3655.3,4461.7,9.3448,27.008,42.481,2703.0,...,53.670,23.350,61.061,20.719,40.999,38.653,47.386,47.528,40.212,17.659
4999996,20,500.0,497,0.26704,3647.4,4540.2,9.3546,27.034,42.671,2704.7,...,54.650,26.362,60.020,20.263,41.579,33.624,47.536,47.647,41.199,18.741
4999997,20,500.0,498,0.26543,3630.3,4571.6,9.4089,27.129,42.470,2705.1,...,54.274,26.521,59.824,20.189,41.505,40.967,52.437,47.802,41.302,23.199
4999998,20,500.0,499,0.27671,3655.7,4498.9,9.3781,27.353,42.281,2705.8,...,53.506,26.781,62.818,20.453,40.208,40.957,47.628,48.086,40.510,15.932


In [13]:
# Cast sample, simulationRun and faultNumber to int64 so the dtypes match the fault-free data written earlier.
convert_to_int64_inplace(r_data)

In [16]:
# Delete data that overlaps with the fault-free training data.
r_data.drop(r_data[r_data['sample'] <= 20].index, inplace=True)
r_data.reset_index(drop=True)
r_data.shape

(4800000, 55)

In [17]:
# Wrap the filtered in-memory frame as a Dask dataframe split into blocks of 5000 rows.
data = ddf.from_pandas(r_data, chunksize=5000)

In [18]:
# Add the faulty partitions to the same HDF5 file that already holds the fault-free training
# data; append=True keeps what is already stored instead of replacing it.
data.to_hdf('data/train.hdf', key='/train_*', append=True)

['data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train.hdf',
 'data/train

In [19]:
# Drop both references so the in-memory faulty training frame can be garbage-collected
# before the test files are loaded.
del r_data, data

### Convert the test data into HDF5 files

Everything we've done above compressed into one cell :)

In [20]:
# --- Fault-free test simulations -> data/test.hdf ---
r_data = pyreadr.read_r('../tennessee-eastman-process-simulation-dataset/TEP_FaultFree_Testing.RData', use_objects=['fault_free_testing'])['fault_free_testing']
convert_to_int64_inplace(r_data)
data = ddf.from_pandas(r_data, chunksize=9600)
del r_data
# mode='w' starts from an empty file. With the default mode='a', keys left behind by an
# earlier run would survive and the append below would duplicate the faulty rows in them.
data.to_hdf('data/test.hdf', key='/test_*', mode='w')
del data

# --- Faulty test simulations, appended to the same file ---
r_data = pyreadr.read_r('../tennessee-eastman-process-simulation-dataset/TEP_Faulty_Testing.RData', use_objects=['faulty_testing'])['faulty_testing']
convert_to_int64_inplace(r_data)
# Delete data that appears in the fault-free testing data: the first eight hours of each
# run, at 20 samples per hour.
r_data.drop(r_data[r_data['sample'] <= 8*20].index, inplace=True)
# reset_index returns a new frame unless inplace=True. The original call discarded that
# result, so the index was never actually reset.
r_data.reset_index(drop=True, inplace=True)
data = ddf.from_pandas(r_data, chunksize=9600)
del r_data
data.to_hdf('data/test.hdf', key='/test_*', append=True)
del data

# Setting up dask.distributed for parallel processing

Not only do you get parallel processing, this is also how you limit memory usage. The `memory_limit` parameter specifies how much RAM to assign to each worker, in this case 1GiB. Therefore, with four workers, the total memory consumption is going to be at most 4GiB. If more RAM is needed, the workers will crash.

In [27]:
# Start a local Dask cluster: 4 workers with 4 threads each, every worker capped at 1GiB of RAM.
dask_client = Client(n_workers=4, threads_per_worker=4, memory_limit='1GiB')

In [22]:
# Open the combined training set lazily; the '/train_*' wildcard selects every key in the file.
training_data = ddf.read_hdf('data/train.hdf', key='/train_*')

In [28]:
# Lazy: this only defines the per-column minimum; nothing is calculated until .compute() is called.
min_values = training_data.min()

In [24]:
# Select one column's minimum from the lazy result, then trigger the calculation.
min_values.loc['xmeas_1'].compute() # Process the whole task graph to arrive at an answer.

xmeas_1   -0.004735
dtype: float64

In [30]:
# Execute the task graph for every column and display the resulting minima.
min_values.compute()

faultNumber         0.000000
simulationRun       1.000000
sample              1.000000
xmeas_1            -0.004735
xmeas_2          3327.400000
xmeas_3          3540.700000
xmeas_4             6.639900
xmeas_5            25.386000
xmeas_6            39.656000
xmeas_7          2413.800000
xmeas_8            61.873000
xmeas_9           119.630000
xmeas_10            0.018396
xmeas_11           68.219000
xmeas_12           44.801000
xmeas_13         2317.100000
xmeas_14           18.436000
xmeas_15           44.983000
xmeas_16         2870.900000
xmeas_17           19.137000
xmeas_18           52.119000
xmeas_19           -3.375200
xmeas_20          232.450000
xmeas_21           79.898000
xmeas_22           62.870000
xmeas_23           23.225000
xmeas_24            7.443800
xmeas_25           18.156000
xmeas_26            5.912600
xmeas_27           12.198000
xmeas_28            0.912180
xmeas_29           20.152000
xmeas_30           11.980000
xmeas_31           12.324000
xmeas_32      

# Cleanup

In [31]:
# Shut down the Dask client started for the parallel-processing section so it stops holding resources.
dask_client.shutdown()

In [32]:
# Remove the now-unusable client handle from the notebook namespace.
del dask_client