# Train a PyTorch model with SparkXShards

Copyright 2016 The BigDL Authors.

SparkXShards in Orca lets users process large-scale datasets with existing Python code in a distributed, data-parallel fashion. This notebook shows how to train a PyTorch model on SparkXShards data with Orca.

It is adapted from [PyTorch Tutorial: How to Develop Deep Learning Models with Python](https://machinelearningmastery.com/pytorch-tutorial-develop-deep-learning-models/).

In [None]:
# import necessary libraries
import numpy as np
from torch.nn import Linear
from torch.nn import ReLU
from torch.nn import Sigmoid
from torch.nn import Module
from torch.optim import SGD
from torch.nn import BCELoss
from torch.nn.init import kaiming_uniform_
from torch.nn.init import xavier_uniform_

import bigdl.orca.data.pandas
from bigdl.orca import init_orca_context, stop_orca_context
from bigdl.orca.learn.pytorch import Estimator
from bigdl.orca.learn.metrics import Accuracy
from bigdl.orca.data.transformer import StringIndexer
import os

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'


In [None]:
# start an OrcaContext
sc = init_orca_context(cores=4, memory="4g")

## Load data in parallel and get general information

Load the data into `data_shards`, a SparkXShards that can be operated on in parallel. Each element of `data_shards` is a pandas DataFrame read from a file on the cluster. Local code that calls `pd.read_csv(dataFile)` can be distributed by calling `bigdl.orca.data.pandas.read_csv(datapath)` instead.

In [None]:
data_shards = bigdl.orca.data.pandas.read_csv('../ionosphere.csv', header=None)

In [4]:
# show the first couple of rows in the data_shards
data_shards.head(5)

|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 25 | 26 | 27 | 28 | 29 | 30 | 31 | 32 | 33 | 34 |
|---|---|---|---|---|---|---|---|---|---|---|-----|----|----|----|----|----|----|----|----|----|----|
| 0 | 1 | 0 | 0.99539 | -0.05889 | 0.85243 | 0.02306 | 0.83398 | -0.37708 | 1.0 | 0.0376 | ... | -0.51171 | 0.41078 | -0.46168 | 0.21266 | -0.3409 | 0.42267 | -0.54487 | 0.18641 | -0.453 | g |
| 1 | 1 | 0 | 1.0 | -0.18829 | 0.93035 | -0.36156 | -0.10868 | -0.93597 | 1.0 | -0.04549 | ... | -0.26569 | -0.20468 | -0.18401 | -0.1904 | -0.11593 | -0.16626 | -0.06288 | -0.13738 | -0.02447 | b |
| 2 | 1 | 0 | 1.0 | -0.03365 | 1.0 | 0.00485 | 1.0 | -0.12062 | 0.88965 | 0.01198 | ... | -0.4022 | 0.58984 | -0.22145 | 0.431 | -0.17365 | 0.60436 | -0.2418 | 0.56045 | -0.38238 | g |
| 3 | 1 | 0 | 1.0 | -0.45161 | 1.0 | 1.0 | 0.71216 | -1.0 | 0.0 | 0.0 | ... | 0.90695 | 0.51613 | 1.0 | 1.0 | -0.20099 | 0.25682 | 1.0 | -0.32382 | 1.0 | b |
| 4 | 1 | 0 | 1.0 | -0.02401 | 0.9414 | 0.06531 | 0.92106 | -0.23255 | 0.77152 | -0.16399 | ... | -0.65158 | 0.1329 | -0.53206 | 0.02431 | -0.62197 | -0.05707 | -0.59573 | -0.04608 | -0.65697 | g |


In [5]:
# show the number of partitions in data_shards
data_shards.num_partitions()


1

In [6]:
# count total number of rows in the data_shards
len(data_shards)

351
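
To make the idea of partitions and per-shard processing more concrete, here is a minimal single-process sketch. `ToyShards` is a hypothetical stand-in written for this illustration only; it is not Orca's class and it does not use Spark. It only mirrors the three operations used in this notebook: counting partitions, counting rows, and applying a function to every shard.

```python
from typing import Callable, List

Row = List[str]


class ToyShards:
    """Hypothetical single-process stand-in for a sharded dataset (not Orca's class)."""

    def __init__(self, rows: List[Row], num_partitions: int) -> None:
        # Deal rows round-robin so every partition gets a near-equal share.
        self.partitions: List[List[Row]] = [
            rows[i::num_partitions] for i in range(num_partitions)
        ]

    def num_partitions(self) -> int:
        return len(self.partitions)

    def __len__(self) -> int:
        # The total row count is the sum over all partitions.
        return sum(len(part) for part in self.partitions)

    def transform_shard(self, func: Callable[[List[Row]], List[Row]]) -> "ToyShards":
        # Apply func to each partition independently; a cluster could run these in parallel.
        out = ToyShards([], self.num_partitions())
        out.partitions = [func(part) for part in self.partitions]
        return out


# Toy data: 351 rows of [id, label], split into 4 partitions.
rows = [[str(i), "g" if i % 2 else "b"] for i in range(351)]
shards = ToyShards(rows, num_partitions=4)
upper = shards.transform_shard(lambda part: [[r[0], r[1].upper()] for r in part])
print(shards.num_partitions(), len(shards), len(upper))
```

Because the function passed to `transform_shard` only ever sees one partition at a time, it cannot rely on rows that live in another partition. The same constraint is worth keeping in mind when writing functions for a real sharded dataset.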

In [7]:
# column information for the elements of data_shards
columns = data_shards.get_schema()['columns']

## Encode labels

The labels are strings. Users can convert them to integers with `StringIndexer`.
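
As a rough illustration of what string indexing does, the sketch below maps each distinct label to an integer. `fit_label_index` is a hypothetical helper, not part of Orca. It assumes indices start at 1 (consistent with what this notebook observes for its encoded labels) and that they are assigned in order of first appearance; the real `StringIndexer` may order labels differently.

```python
from typing import Dict, List


def fit_label_index(labels: List[str]) -> Dict[str, int]:
    """Map each distinct string to an integer, starting from 1 (hypothetical helper)."""
    mapping: Dict[str, int] = {}
    for label in labels:
        if label not in mapping:
            # The first unseen label gets 1, the next gets 2, and so on.
            mapping[label] = len(mapping) + 1
    return mapping


labels = ["g", "b", "g", "b", "g"]  # last column of the five rows shown earlier
mapping = fit_label_index(labels)
encoded = [mapping[label] for label in labels]
print(mapping, encoded)
```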

In [8]:
label_encoder = StringIndexer(inputCol=columns[-1])
data_shards = label_encoder.fit_transform(data_shards)

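The encoded labels start from 1, so they need to be shifted to zero-based values (0 and 1) before training with `BCELoss`, which expects targets between 0 and 1.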

In [9]:
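def update_label_to_zero_base(df):
    # column '34' holds the encoded label; shift it down by one so it starts from 0
    df['34'] = df['34'] - 1
    # cast every column to float32 to match the model's default parameter dtype
    df = df.astype("float32")
    return df

data_shards = data_shards.transform_shard(update_label_to_zero_base)
print(data_shards.head(5), data_shards.get_schema())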

## Assemble features and labels

In [10]:
data_shards = data_shards.assembleFeatureLabelCols(featureCols=list(columns[:-1]),
                                                   labelCols=[columns[-1]])
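
Conceptually, assembling means splitting every row into a feature vector and a label vector. The sketch below shows that idea on plain Python dictionaries. The `assemble` helper and the `"x"`/`"y"` keys are hypothetical names chosen for this illustration; the layout Orca uses internally is not shown in this notebook.

```python
from typing import Dict, List, Sequence


def assemble(rows: Sequence[Dict[str, float]],
             feature_cols: Sequence[str],
             label_cols: Sequence[str]) -> Dict[str, List[List[float]]]:
    """Split each row into a feature vector and a label vector (hypothetical helper)."""
    return {
        "x": [[row[c] for c in feature_cols] for row in rows],
        "y": [[row[c] for c in label_cols] for row in rows],
    }


columns = ["0", "1", "2", "3"]  # toy schema: three features, then the label
rows = [
    {"0": 1.0, "1": 0.0, "2": 0.5, "3": 1.0},
    {"0": 1.0, "1": 0.0, "2": -0.2, "3": 0.0},
]
batch = assemble(rows, feature_cols=columns[:-1], label_cols=[columns[-1]])
print(batch["x"], batch["y"])
```

Keeping the label as a one-element vector per row gives it the same `(batch, 1)` shape as the output of a model with a single output unit, which is what an element-wise loss needs.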

## Define PyTorch model and train it

Users can build a PyTorch model as usual and train it with the Orca Estimator.

In [11]:
# define an MLP model
class MLP(Module):
    # define model elements
    def __init__(self, n_inputs):
        super(MLP, self).__init__()
        # input to first hidden layer
        self.hidden1 = Linear(n_inputs, 10)
        kaiming_uniform_(self.hidden1.weight, nonlinearity='relu')
        self.act1 = ReLU()
        # second hidden layer
        self.hidden2 = Linear(10, 8)
        kaiming_uniform_(self.hidden2.weight, nonlinearity='relu')
        self.act2 = ReLU()
        # third hidden layer and output
        self.hidden3 = Linear(8, 1)
        xavier_uniform_(self.hidden3.weight)
        self.act3 = Sigmoid()

    # forward propagate input
    def forward(self, X):
        # input to first hidden layer
        X = self.hidden1(X)
        X = self.act1(X)
        # second hidden layer
        X = self.hidden2(X)
        X = self.act2(X)
        # third hidden layer and output
        X = self.hidden3(X)
        X = self.act3(X)
        return X


def model_creator(config):
    model = MLP(config["n_inputs"])
    model.train()
    return model
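
For a sense of scale, the sketch below works out the sampling bounds that the two initializers use for this architecture, along with the total number of trainable parameters. It relies on the documented formulas for `kaiming_uniform_` (default `fan_in` mode, ReLU gain of sqrt(2)) and `xavier_uniform_` (default gain of 1). The helper names are hypothetical and the code does not call PyTorch.

```python
import math
from typing import List, Tuple

# (fan_in, fan_out) of the three Linear layers when n_inputs = 34
layers: List[Tuple[int, int]] = [(34, 10), (10, 8), (8, 1)]


def kaiming_uniform_bound(fan_in: int) -> float:
    # The gain for ReLU is sqrt(2); bound = gain * sqrt(3 / fan_in).
    return math.sqrt(2.0) * math.sqrt(3.0 / fan_in)


def xavier_uniform_bound(fan_in: int, fan_out: int) -> float:
    # With gain 1, bound = sqrt(6 / (fan_in + fan_out)).
    return math.sqrt(6.0 / (fan_in + fan_out))


# Weights plus biases for every layer.
n_params = sum(fan_in * fan_out + fan_out for fan_in, fan_out in layers)

print("hidden1 bound:", round(kaiming_uniform_bound(34), 4))
print("hidden2 bound:", round(kaiming_uniform_bound(10), 4))
print("hidden3 bound:", round(xavier_uniform_bound(8, 1), 4))
print("trainable parameters:", n_params)
```

Weights are drawn uniformly from the interval between minus the bound and plus the bound, so layers with more inputs start with smaller weights.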

In [12]:
# define criterion and optimizer
def optimizer_creator(model, config):
    optimizer = SGD(model.parameters(), lr=config["lr"], momentum=config["momentum"])
    return optimizer

criterion = BCELoss()
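
To show what the `lr` and `momentum` settings do, here is a scalar sketch of the SGD-with-momentum update as documented for `torch.optim.SGD` with zero dampening and no Nesterov momentum. `sgd_momentum_step` is a hypothetical helper for illustration only, and the quadratic objective is a toy problem unrelated to the ionosphere data.

```python
from typing import Tuple


def sgd_momentum_step(param: float, grad: float, buf: float,
                      lr: float = 0.01, momentum: float = 0.9) -> Tuple[float, float]:
    """One scalar update: buf <- momentum * buf + grad, then param <- param - lr * buf."""
    buf = momentum * buf + grad
    param = param - lr * buf
    return param, buf


# Minimise f(w) = (w - 3)^2, whose gradient is 2 * (w - 3).
w, buf = 0.0, 0.0
for _ in range(200):
    w, buf = sgd_momentum_step(w, 2.0 * (w - 3.0), buf)
print(round(w, 3))
```

The buffer accumulates past gradients, so consecutive steps in the same direction grow larger than plain SGD steps with the same learning rate.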


In [13]:
# build Orca Estimator
orca_estimator = Estimator.from_torch(model=model_creator,
                                      optimizer=optimizer_creator,
                                      loss=criterion,
                                      metrics=[Accuracy()],
                                      backend="spark",
                                      config={"n_inputs": 34,
                                              "lr": 0.01,
                                              "momentum": 0.9})


creating: createTorchLoss
creating: createTorchOptim
creating: createZooKerasAccuracy
creating: createEstimator
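
The estimator is given a loss (`BCELoss`) and a metric (`Accuracy`). The sketch below computes both by hand on four made-up predictions, to show what these quantities measure. The binary cross-entropy formula is the standard mean-reduced one. The 0.5 decision threshold in `binary_accuracy` is an assumption made for this illustration and may not match how Orca's `Accuracy` handles single-output models. Both function names are hypothetical.

```python
import math
from typing import Sequence


def bce_loss(probs: Sequence[float], targets: Sequence[float]) -> float:
    """Mean binary cross-entropy; assumes every probability is strictly between 0 and 1."""
    total = 0.0
    for p, y in zip(probs, targets):
        total += -(y * math.log(p) + (1.0 - y) * math.log(1.0 - p))
    return total / len(probs)


def binary_accuracy(probs: Sequence[float], targets: Sequence[float],
                    threshold: float = 0.5) -> float:
    """Fraction of rows whose thresholded prediction equals the target."""
    hits = sum(1 for p, y in zip(probs, targets) if (p >= threshold) == (y == 1.0))
    return hits / len(probs)


probs = [0.9, 0.2, 0.6, 0.4]      # made-up sigmoid outputs
targets = [1.0, 0.0, 0.0, 1.0]    # zero-based labels
print(round(bce_loss(probs, targets), 4), binary_accuracy(probs, targets))
```

The loss penalises confident wrong predictions heavily, while accuracy only counts which side of the threshold each prediction falls on, so the two can move differently during training.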


In [None]:
# train the model
orca_estimator.fit(data=data_shards, epochs=100, batch_size=32)
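
As a back-of-the-envelope check on the training cost, the arithmetic below estimates how many optimizer steps this call performs. It assumes a single worker and that the final partial batch is kept rather than dropped; how Orca actually splits `batch_size` across workers is not covered in this notebook.

```python
import math

n_rows = 351       # len(data_shards) reported earlier
batch_size = 32
epochs = 100

# Assumption: one worker, and the last partial batch is still used.
steps_per_epoch = math.ceil(n_rows / batch_size)
last_batch_size = n_rows - (steps_per_epoch - 1) * batch_size
total_steps = steps_per_epoch * epochs
print(steps_per_epoch, last_batch_size, total_steps)
```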

In [15]:
# stop OrcaContext
stop_orca_context()

Stopping orca context
