# 1. Create dummy train and test data as np.arrary

In [2]:
import pandas as pd
import numpy as np
import torch
import os
from pyspark.ml.torch.distributor import TorchDistributor
from pyspark.sql import SparkSession

In [2]:
X_train = np.array([[-0.86143957,0.51365619]
                   ,[ 0.15802293, 0.75413978]
                   ,[ 0.10045332,-1.02042385]
                   ,[ 0.21542833, 0.81745144]
                   ,[-0.33609156, 0.83596599]
                   ,[ 0.67947953,-0.23228435]
                   ,[ 0.72633661,-0.68139863]
                   ,[ 0.64147019,-0.78286524]
                   ,[ 0.87929062, 0.49881147]
                   ,[ 0.11631592, 0.77449553]])

y_train = np.array([0, 1, 0, 1, 1, 1, 0, 0, 0, 1])
X_test = np.array([[-0.78586889,  0.59082396],
                [-0.70465859,  0.34926669],
                [-0.70729354, -0.48752429],
                [ 0.11865543, -0.86599624],
                [-0.23169779, -0.91255477],
                [ 0.8423562 ,  0.16031322],
                [-0.14796425,  1.01458079],
                [-0.04146067,  0.9391543 ],
                [ 0.74951054,  0.32433079],
                [-0.70183734,  0.6173987 ]])
y_test = np.array([0, 1, 1, 1, 0, 1, 0, 0, 1, 0])

# 2. Convert the train and test data from np.arrays to PyTorch tensors

In [3]:
from torch.utils.data import Dataset, DataLoader

# Convert data to torch tensors
class Data(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X.astype(np.float32))
        self.y = torch.from_numpy(y.astype(np.float32))
        self.len = self.X.shape[0]
       
    def __getitem__(self, index):
        return self.X[index], self.y[index]
   
    def __len__(self):
        return self.len

batch_size = 5

# Instantiate training and test data
train_data = Data(X_train, y_train)
train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)

test_data = Data(X_test, y_test)
test_dataloader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=True)

In [4]:
# Check if it's working using the train data: Expect 2 batches since batch size is 5
for batch, (X, y) in enumerate(train_dataloader):
    print(f"Batch: {batch+1}")
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")

Batch: 1
X shape: torch.Size([5, 2])
y shape: torch.Size([5])
Batch: 2
X shape: torch.Size([5, 2])
y shape: torch.Size([5])


# 3. Create a simple two-layer neural network that uses the rectified linear unit(ReLU) activation function (torch.nn.functional.relu)

In [5]:
import torch
from torch import nn
from torch import optim

input_dim = 2
hidden_dim = 10
output_dim = 1

class NeuralNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(NeuralNetwork, self).__init__()
        self.layer_1 = nn.Linear(input_dim, hidden_dim)
        nn.init.kaiming_uniform_(self.layer_1.weight, nonlinearity="relu")
        self.layer_2 = nn.Linear(hidden_dim, output_dim)
       
    def forward(self, x):
        x = torch.nn.functional.relu(self.layer_1(x))
        x = torch.nn.functional.sigmoid(self.layer_2(x))

        return x

In [6]:
model = NeuralNetwork(input_dim, hidden_dim, output_dim)
model

NeuralNetwork(
  (layer_1): Linear(in_features=2, out_features=10, bias=True)
  (layer_2): Linear(in_features=10, out_features=1, bias=True)
)

# 4. Define train function with binary crossentropy loss and stochastic gradient descent optimizer with a learning rate of 0.1

In [7]:
torch.cuda.device_count()

1

In [8]:
num_epochs = 5
learning_rate = 0.1
loss_values = []
device = 'cuda' if torch.distributed.is_nccl_available() else 'cpu'
back_end = 'nccl' if device=='cuda' else 'gloo'

In [11]:
def train_distribute():
    #### These imports HAVE TO be included INSIDE of this train function in order to be installed on each worker. ####
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.utils.data.distributed import DistributedSampler
    
    print("Running distributed training")

    os.environ['MASTER_ADDR'] = '192.168.1.70'
    os.environ['MASTER_PORT'] = '4040'
    dist.init_process_group(backend=back_end, init_method='env://', rank = 1, world_size = 1)
    
    #### Added Distributed Train Dataloader ####
    train_data = Data(X_train, y_train)
    train_sampler = DistributedSampler(dataset=train_data)
    train_dataloader = DataLoader(train_data, batch_size=batch_size, sampler=train_sampler) 
    
    #### Added Distributed Model ####
    model = NeuralNetwork(input_dim, hidden_dim, output_dim).to(device)
    ddp_model = DDP(model) # device_ids=[local_rank], output_device=local_rank)
    
    #### Train Model ####
    learning_rate = 0.1
    loss_fn = nn.BCELoss()
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=learning_rate)
    for epoch in range(num_epochs):
        ddp_model.train()
        for batch_idx, (X, y) in enumerate(train_dataloader):
            X, y = X.to(device), y.to(device)
            # zero the parameter gradients
            optimizer.zero_grad()
            # forward + backward + optimize
            pred = ddp_model(X)
            loss = loss_fn(pred, y.unsqueeze(-1))
            loss_values.append(loss.item())
            loss.backward()
            optimizer.step()
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(X), len(train_dataloader) * len(X),
                100. * batch_idx / len(train_dataloader), loss.item()))

# 5. Test without TorchDistributor
### The following validates the training loop by running training on a single CPU

In [12]:
train_distribute()

Running distributed training


# 6. Single node multi-CPU training

In [3]:
spark = SparkSession \
    .builder \
    .appName("test") \
    .getOrCreate()

In [4]:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.files.ignoreCorruptFiles", True)
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [8]:
# TorchDistributor(num_processes=1, local_mode=False, use_gpu=True).run(train_distribute)
df = spark.read.option("header","true") \
.format("csv") \
.load("iris.csv")

In [9]:
df.show()

+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|         .2| Setosa|
|         4.9|          3|         1.4|         .2| Setosa|
|         4.7|        3.2|         1.3|         .2| Setosa|
|         4.6|        3.1|         1.5|         .2| Setosa|
|           5|        3.6|         1.4|         .2| Setosa|
|         5.4|        3.9|         1.7|         .4| Setosa|
|         4.6|        3.4|         1.4|         .3| Setosa|
|           5|        3.4|         1.5|         .2| Setosa|
|         4.4|        2.9|         1.4|         .2| Setosa|
|         4.9|        3.1|         1.5|         .1| Setosa|
|         5.4|        3.7|         1.5|         .2| Setosa|
|         4.8|        3.4|         1.6|         .2| Setosa|
|         4.8|          3|         1.4|         .1| Setosa|
|         4.3|          3|         1.1| 

In [11]:
spark.stop()