# Turbofan POC: Populate nodes with data
CAA 04/08/2020

In this notebook, we will use use a coordinating server to distribute data to the nodes, either from a remote source or from a local directory.

NOTE: Before running this notebook, ensure that you have run `bash init.sh`.

In [5]:
import syft as sy
from syft.grid.clients.dynamic_fl_client import DynamicFLClient
import torch
import pandas as pd
from numpy.random import laplace
from math import floor

from turbofanpoc.federated_trainer.helper.data_helper import _load_data, WINDOW_SIZE, _drop_unnecessary_columns, _transform_to_windowed_data, get_data_loader, _clip_rul

### Define helper functions

In [6]:
def add_rul_to_train_data(train_data):
    """ Calculate and add the RUL to all rows in the given training data.

    :param train_data: The training data
    :return: The training data with added RULs
    """
    # retrieve the max cycles per engine_node: RUL
    train_rul = pd.DataFrame(train_data.groupby('engine_no')['time_in_cycles'].max()).reset_index()

    # merge the RULs into the training data
    train_rul.columns = ['engine_no', 'max']
    train_data = train_data.merge(train_rul, on=['engine_no'], how='left')

    # add the current RUL for every cycle
    train_data['RUL'] = train_data['max'] - train_data['time_in_cycles']
    train_data.drop('max', axis=1, inplace=True)

    return train_data

def round_to_multiple(x, base):
    '''
    Round x down to multiple of base
    '''
    return base * floor(x/base)

def batch(tensor, batch_size):
    features_size = tensor.shape[1:]
    # shuffle and batch
    randi = torch.randperm(tensor.shape[0])
    # remove undersized tensor
    out = tensor[randi].split(batch_size)[:-1]
    out = torch.cat(out).view(-1, batch_size, *features_size)
    return out

def tuple_batch(tensors, batch_size):
    '''
    tensors: tuple of tensors
    '''
    return (batch(t, batch_size) for t in tensors)

### Set up configs

In [11]:
DATA_PATH = "./turbofanpoc/data"
DATA_NAME = "train_data_initial.txt"
MINIBATCH_SIZE = 4
NOISE = 0.2
DP_TYPE = 'local'
LABEL_DISTR_SKEW = 0.2

### Set up network

In [16]:
# Hook Torch
hook = sy.TorchHook(torch)

nodes = ["ws://18.224.229.188:3000/",
         "ws://18.224.229.188:3001/"]

compute_nodes = []
for node in nodes:
    compute_nodes.append(DynamicFLClient(hook, node))



### Load dataset
The code below will load prepared data from the Turbofan POC repository.

In [17]:
data = _load_data(DATA_NAME, DATA_PATH)
data_dropcol = _drop_unnecessary_columns(data)
data_rul = add_rul_to_train_data(data_dropcol)
x, y = _transform_to_windowed_data(data_rul, WINDOW_SIZE)
y = _clip_rul(y)
 # transform to torch tensor
tensor_x = torch.Tensor(x)
tensor_y = torch.Tensor(y)

1245 features with shape (80, 11)
1245 labels with shape (1245, 1)


#### Optional: Add differential privacy to data
We can add noise to the data at this point if we want to simulate the addition of noise by distributed data owners.

In [15]:
def laplacian_mechanism(input_tensor, sensitivity=0.5, epsilon=0.05):
    '''
    sensitivity and epsilon are arbitrarily 
    chosen for now
    '''
    beta = sensitivity / epsilon
    noise = torch.tensor(laplace(0, beta, 1))
    return input_tensor + noise

def add_noise(input_tensor, p_noise):
    '''
    tensor: input tensor
    p_noise: probability with which noise is added
    '''
    be_honest = (torch.rand(input_tensor.shape) < p_noise).float()
    tensor_artificial = laplacian_mechanism(input_tensor)
    # add noise
    mod_tensor = input_tensor.float() * be_honest + (1 - be_honest) * tensor_artificial
    sk_tensor = mod_tensor.float().mean()
    # de-skew result
    noisy_tensor = ((mod_tensor / p_noise) - 0.5) * p_noise / (1 - p_noise)
    return mod_tensor.type(torch.float32)

if DP_TYPE=='local':
    tensor_x = add_noise(tensor_x, NOISE)

#### Optional: Introduce skew (non-IIDness)

In [14]:
def label_distribution_skew(x, y, partitions, skew=1):
    def worker_split(N_labels, N_workers):
        """number of labels to assign to n workers"""
        worker_labels = round(max(1, N_labels / N_workers))
        worker_split = round(max(1, N_workers / N_labels))
        return worker_labels, worker_split

    worker_data = []
    N_labels = torch.histc(y, bins=partitions, max=500)
    n_labels, n_workers = worker_split(N_labels, partitions)
    
    worker_idx = 0
    for label_idx in range(0, N_labels, n_labels):
        mask = np.isin(y, range(label_idx, label_idx+n_labels))
        subset_idx = np.argwhere(mask)[:, 0]
        n_samples = subset_idx.shape[0]
        sample_size = math.floor(skew*n_samples)
        subset_idx = np.random.choice(subset_idx, sample_size, replace=False)
        x_subset = x[subset_idx, ]
        y_subset = y[subset_idx]   
    
        for partition in zip(np.array_split(x_subset, n_workers),
                             np.array_split(y_subset, n_workers)):
            worker_data.append(partition)
    
        x = np.delete(x, subset_idx, axis=0)
        y = np.delete(y, subset_idx)    
        worker_idx = worker_idx + n_workers

    return worker_data, x, y

if LABEL_DISTR_SKEW:
    dataiter = label_distribution_skew(tensor_x, tensor_y, len(compute_nodes), skew=LABEL_DISTR_SKEW)

RuntimeError: bool value of Tensor with more than one value is ambiguous

#### Create dataloader

### Tag and send split datasets to each worker

In [25]:
dataset_train = torch.utils.data.TensorDataset(tensor_x, tensor_y)
if not LABEL_DISTR_SKEW:
    trainloader = torch.utils.data.DataLoader(dataset_train, 
        # split data equally among nodes with shuffle
        batch_size=dataset_train.__len__()//len(compute_nodes),
        shuffle=True,
        drop_last=True,)
        #pin_memory=True) for faster dataloading to CUDA
else: 
        trainloader = torch.utils.data.DataLoader(dataset_train, 
        # split data equally among nodes without shuffle
        batch_size=dataset_train.__len__()//len(compute_nodes),
        shuffle=False,
        drop_last=True,)
        #pin_memory=True) for faster dataloading to CUDA
dataiter = iter(trainloader)

In [26]:
shared_x = []
shared_y = []
for node in compute_nodes:
    # create minibatches
    worker_batch = dataiter.next()
    sensors_train_tfan, labels_train_tfan = tuple_batch(worker_batch, MINIBATCH_SIZE)
    print(sensors_train_tfan.shape, labels_train_tfan.shape)
    # Tag tensors (allows them to be retrieved later)
    if not DP_TYPE:
        tagged_sensors = sensors_train_tfan.tag("#X", "#turbofan", "#dataset").describe("The input datapoints to the Turbofan dataset.")
    elif DP_TYPE=='local':
        tagged_sensors = sensors_train_tfan.tag("#X", "#localdp", "#turbofan", "#dataset").describe("The input datapoints to the Turbofan dataset.")
    tagged_label = labels_train_tfan.tag("#Y", "#turbofan", "#dataset").describe("The input labels to the Turbofan dataset.")
    
    shared_x.append(tagged_sensors.send(node))
    shared_y.append(tagged_label.send(node))

torch.Size([155, 4, 80, 11]) torch.Size([155, 4, 1])
torch.Size([155, 4, 80, 11]) torch.Size([155, 4, 1])


In [27]:
# print("X tensor pointers: ", shared_x1, shared_x2)
# print("Y tensor pointers: ", shared_y1, shared_y2)

print("X tensor pointers: ", shared_x)
print("Y tensor pointers: ", shared_y)

X tensor pointers:  [(Wrapper)>[PointerTensor | me:67070848708 -> alice:7948937110]
	Tags: #turbofan #dataset #localdp #X 
	Shape: torch.Size([155, 4, 80, 11])
	Description: The input datapoints to the Turbofan dataset...., (Wrapper)>[PointerTensor | me:92409215119 -> bob:42608560662]
	Tags: #turbofan #dataset #localdp #X 
	Shape: torch.Size([155, 4, 80, 11])
	Description: The input datapoints to the Turbofan dataset....]
Y tensor pointers:  [(Wrapper)>[PointerTensor | me:66967328021 -> alice:66802965416]
	Tags: #Y #dataset #turbofan 
	Shape: torch.Size([155, 4, 1])
	Description: The input labels to the Turbofan dataset...., (Wrapper)>[PointerTensor | me:95935075564 -> bob:25910322289]
	Tags: #Y #dataset #turbofan 
	Shape: torch.Size([155, 4, 1])
	Description: The input labels to the Turbofan dataset....]


### Disconnect nodes

To ensure that our training process (in the Part 2 notebook), if located on the same server, is not using cached or local data for training.

In [44]:
for node in compute_nodes:
    node.close()