# Parallel Training of One Vs. All Logistic Regression Classifiers

In this example, we will show how dagrs can help implement machine learning algorithms to train classifiers in a parallel manner.

Install required packages if you need to run the following cells.


In [None]:
!conda install --yes --file requirements.txt

## Background

Feel free to skip this section if you are an experienced machine learning expert.
This section helps you understand the concept of logistic regression and one vs. all classifiers. 

### Logistic Regression

Logistic Regression (LR) is a statistical method and machine learning (ML) algorithm used for binary classification tasks, where the goal is to predict one of two possible outcomes based on input data.

The core of logistic regression is the sigmoid function (also called the logistic function), which maps any input to a value between 0 and 1. This output can be interpreted as a probability that an instance belongs to a particular class.

The formula for the sigmoid function is:
$$
    \delta(z)=\frac{1}{1+e^{-z}}
$$
The output $\delta(z)$ represents the probability of the instance belonging to class 1. So the probability of belonging to class 0 is 1-$\delta(z)$.

Let's implement function $\delta(z)$ in Python:

In [None]:
import numpy as np

def sigmoid(z):
    return 1/(1+np.exp(-z))

### Cost Function
Logistic regression uses a log-likelihood function (also known as binary cross-entropy) to measure how well the model's predictions match the actual labels. The goal is to minimize this cost function through optimization (usually using gradient descent).

The binary cross-entropy loss for a single training example is:
$$
L(y,\hat{y})=-[ylog(\hat{y})+(1-y)log(1-\hat{y})]
$$

where:
- $y$ is the actual class (either 0 or 1),
- $\hat{y}$ is the predicted probability of the instance being in class 1.

Assume $\theta$ represents the parameters in our model, then the cost function of $\theta$ is
$$
J(\theta)=-\frac{1}{m}\sum^m_{i=1}[y^{(i)}log(\delta(\theta x^{(i)}))+(1-y)^{(i)}log(1-\delta(\theta x^{(i)}))] + \frac{\lambda}{2m}\sum^n_{i=1}\theta_j^2
$$

where $\lambda$ is a regularization parameter. It is used to prevent overfitting by penalizing large values of the model's parameters.

Now let's implement function $J(\theta)$ in Python:

In [None]:
def regularized_cost(Theta,X,y,l):
    ThetaReg=Theta[1:]
    cost=(-y*np.log(sigmoid(X@Theta)))-(1-y)*np.log(1-sigmoid((X@Theta)))
    reg=(ThetaReg@ThetaReg)*l/(2*len(X))
    return np.mean(cost)+reg

### Gradient Function

The gradient function refers to the vector of partial derivatives of a function with respect to its parameters. In machine learning and optimization, the gradient is used to update model parameters during training (e.g., in gradient descent) in order to minimize a cost or loss function.

The gradient of $J(\theta)$ with respect to $\theta_j$ is computed as follows:
$$
\frac{\partial J(\theta)}{\partial \theta_0}=\frac{1}{m}\sum^m_{i=1}(\delta(\theta x^{(i)}) - y^{(i)})x_0^{(i)}, \text{for $j = 0$},
$$
$$
\frac{\partial J(\theta)}{\partial \theta_j}=(\frac{1}{m}\sum^m_{i=1}(\delta(\theta x^{(i)}) - y^{(i)})x_j^{(i)}) + \frac{\lambda}{m}\theta_j, \text{for $j \ge 1$}.
$$

Next let's implement the gradient function in Python:

In [None]:
def regularized_gradient(Theta,X,y,l):
    ThetaReg=Theta[1:]
    cost=(X.T@(sigmoid(X@Theta)-y))*(1/len(X))
    reg=np.concatenate([np.array([0]),(l/len(X))*ThetaReg])
    return cost+reg

### One-vs-All Classifier

The One-vs-All (OvA) classifier, also known as One-vs-Rest (OvR), is a strategy used to extend binary classification algorithms to handle multi-class classification problems. It is commonly used in algorithms like logistic regression, Support Vector Machines, and others when you need to classify data into more than two classes.

For a classification problem with $K$ classes, the OvA strategy trains $K$ separate binary classifiers.For each classifier $k$ (where $k = 1, 2, ..., K$), the classifier is trained to distinguish between class $k$ and all other classes.

For example, if you have a multi-class problem with classes A, B, and C, you'll train three classifiers:
- Classifier 1: Class A vs. (Class B $\cup$ Class C)
- Classifier 2: Class B vs. (Class A $\cup$ Class C)
- Classifier 3: Class C vs. (Class A $\cup$ Class B)

## Training Set

`ex3data1.mat` contains 5000 images of handwritten digits. It is a dataset typically used in machine learning exercises.
The following code displays 100 images in this dataset randomly (this may take a while). 

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import loadmat

def load_data(path):
    data=loadmat(path)
    X=data["X"]
    y=data["y"]
    return X,y

def plot_an_image(X):
    # Pick 100 images randomly
    sample_index=np.random.choice(np.arange(X.shape[0]),100)
    sample_imagex=X[sample_index,:]
    #image (100:400)
    fix,ax_array=plt.subplots(nrows=10,ncols=10,figsize=(8,8),sharey=True,sharex=True)
    for row in range(10):
        for col in range(10):
            ax_array[row,col].matshow(sample_imagex[row*10+col].reshape(20,20),cmap='gray_r')
    plt.xticks([])
    plt.yticks([])
    plt.show()

X, y = load_data('ex3data1.mat')
plot_an_image(X)

## Traditional Approach to LR OvA Solutions

This sections shows how we write a ML algoritm in python to recognize these handwritten digits above.

We will use multiple logistic regression models to build a multi-class classifier. Because there are 10 classes (i.e. from digit 1 to digit 10), we need 10 separate logistic regression classifiers.

The [following code](./raw.py) trains 10 different classifiers (in function `one_vs_all`), and then prints prediction accuracy on the training set.

In [None]:
from scipy.io import loadmat
import numpy as np
from scipy.optimize import minimize
import sys

def one_vs_all(X,y,l,K):
    All_Theta=np.zeros((K,X.shape[1]))
    for i in range(1,K+1):
        Theta=np.zeros(X.shape[1])
        y_i=np.array([1 if labal==i else 0 for labal in y])
        ret=minimize(fun=regularized_cost, x0=Theta,args=(X,y_i,l),method='TNC',jac=regularized_gradient)
        All_Theta[i-1:]=ret.x
    return All_Theta

def load_data(path):
    data=loadmat(path)
    X=data["X"]
    y=data["y"]
    return X,y

def sigmoid(z):
    return 1/(1+np.exp(-z))

def regularized_cost(Theta,X,y,l):
    ThetaReg=Theta[1:]
    cost=(-y*np.log(sigmoid(X@Theta)))-(1-y)*np.log(1-sigmoid((X@Theta)))
    reg=(ThetaReg@ThetaReg)*l/(2*len(X))
    return np.mean(cost)+reg

def regularized_gradient(Theta,X,y,l):
    ThetaReg=Theta[1:]
    cost=(X.T@(sigmoid(X@Theta)-y))*(1/len(X))
    reg=np.concatenate([np.array([0]),(l/len(X))*ThetaReg])
    return cost+reg

def predict(X,All_Theta):
    h=sigmoid(X@All_Theta.T)
    h_argmax=np.argmax(h,axis=1)
    h_argmax=h_argmax+1
    return h_argmax


np.set_printoptions(threshold=sys.maxsize)
X, y = load_data('ex3data1.mat')
X=np.insert(X,0,1,axis=1)
y=y.flatten()
All_Theta=one_vs_all(X, y, 1, 10)
y_predict=predict(X, All_Theta)
accuracy=np.mean(y_predict==y)
print("accuracy=%.2f%%"%(accuracy*100))

## Train Classifiers in a Parallel Manner

If the training set is much larger than we use here, then training each classifier one by one will take a long time.

We can use dagrs to train these models in parallel.

First, create 10 nodes and train a model on each node. Then create a 'root' node collects the training results.

We only use dagrs for shceduling the training and predicting tasks. The [training](./lr_i.py) and [prediction](./lr_root.py) logic will still be written in python scripts.

### Training Script of Each Class
The training [script](./lr_i.py) receives 2 parameters: the training set file path and the class $i$. It calculates the converged $theta_i$, saves it to a file, and prints the path to the saved file to stdout.
```python
import numpy as np
from scipy.io import loadmat
from scipy.optimize import minimize
import sys


def load_data(path):
    data=loadmat(path)
    X=data["X"]
    y=data["y"]
    return X,y

def sigmoid(z):
    return 1/(1+np.exp(-z))

def regularized_cost(Theta,X,y,l):
    ThetaReg=Theta[1:]
    cost=(-y*np.log(sigmoid(X@Theta)))-(1-y)*np.log(1-sigmoid((X@Theta)))
    reg=(ThetaReg@ThetaReg)*l/(2*len(X))
    return np.mean(cost)+reg


def regularized_gradient(Theta,X,y,l):
    ThetaReg=Theta[1:]
    cost=(X.T@(sigmoid(X@Theta)-y))*(1/len(X))
    reg=np.concatenate([np.array([0]),(l/len(X))*ThetaReg])
    return cost+reg

def one_vs_all(X,y,l,i):
    Theta=np.zeros(X.shape[1])
    y_i=np.array([1 if labal==(i + 1) else 0 for labal in y])
    ret=minimize(fun=regularized_cost, x0=Theta,args=(X,y_i,l),method='TNC',jac=regularized_gradient)
    return ret.x

argc = len(sys.argv)
argv = sys.argv

X, y = load_data(argv[1])
X=np.insert(X,0,1,axis=1)
y=y.flatten()

i = int(argv[2])
theta = (one_vs_all(X, y, 1, i))

s = np.array2string(theta, max_line_width=1 << 31)
with open(f"theta_{i}.txt", "w") as f:
    f.write(s)
print(f"theta_{i}.txt")
```

The [prediction](./lr_root.py) script receives 10 arguments, each of which is the path to a file containing $theta_i$. This script will merge each file and get the $theta$ we need. Finally, it will calculate the prediction accuracy on the training set and print the result to stdout.
```python
import numpy as np
from scipy.io import loadmat
import sys
import os

def load_data(path):
    data=loadmat(path)
    X=data["X"]
    y=data["y"]
    return X,y

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def predict(X,All_Theta):
    h=sigmoid(X@All_Theta.T)
    h_argmax=np.argmax(h,axis=1)
    h_argmax=h_argmax+1
    return h_argmax

def main():
    np.set_printoptions(threshold=sys.maxsize)

    argv = sys.argv
    X, y = load_data(argv[1])
    X=np.insert(X,0,1,axis=1)
    y=y.flatten()
    
    # Load theta values for each class from the provided files
    All_Theta = np.zeros((10, X.shape[1]))  # 10 classes, with the same number of features

    for i in range(0, 10):
        # Load theta for class (i+1) from the file
        file = argv[2+i]
        j = int(file[6])
        with open(file, "r") as f:
            s = f.read().strip("[] ")
            All_Theta[j] = np.fromstring(s, sep=" ", dtype=float)
        os.remove(file)

    # Predict the class using the provided theta values
    y_predict = predict(X, All_Theta)

    # Calculate accuracy
    y_predict=predict(X, All_Theta)
    accuracy=np.mean(y_predict==y)

    print(f"Accuracy: {accuracy * 100:.2f}%\n")

if __name__ == "__main__":
    main()
```

### Build the Dagrs Graph
We use a [yml config file](./config.yml) to define nodes and edges. Each training node (node0, node1, ..., node9) is connected with node 'root'.

```yml
  dagrs:
  node0:
    name: "node_0"
    after: []
    cmd: ""
  node1:
    name: "node_1"
    after: []
    cmd: ""
  node2:
    name: "node_2"
    after: []
    cmd: ""
  node3:
    name: "node_3"
    after: []
    cmd: ""
  node4:
    name: "node_4"
    after: []
    cmd: ""
  node5:
    name: "node_5"
    after: []
    cmd: ""
  node6:
    name: "node_6"
    after: []
    cmd: ""
  node7:
    name: "node_7"
    after: []
    cmd: ""
  node8:
    name: "node_8"
    after: []
    cmd: ""
  node9:
    name: "node_9"
    after: []
    cmd: ""
  root:
    name: "root"
    after: [
      "node0",
      "node1",
      "node2",
      "node3",
      "node4",
      "node5",
      "node6",
      "node7",
      "node8",
      "node9",
    ]
    cmd: ""
```

Training nodes will run a `NodeAction`, which starts the training python script with the argument class $i$ and a path to the training set file, waits for it and sends the script's stdout to root.

```rust
const ENV_DATA_SRC: &str = "data_src";

struct NodeAction {
    class: usize,
}

#[async_trait]
impl Action for NodeAction {
    async fn run(
        &self,
        _: &mut InChannels,
        out_channels: &OutChannels,
        env: Arc<EnvVar>,
    ) -> Output {
        let data_src: &&str = env.get_ref(ENV_DATA_SRC).unwrap();

        let cmd_act = CommandAction::new(
            "python",
            vec![
                "examples/lr_i.py".to_string(),
                format!("{}", data_src),
                format!("{}", self.class),
            ]
        );
        let result = cmd_act.run(&mut InChannels::default(), &OutChannels::default(), env).await;
        match result {
            Output::Out(content) => {
                let content: Arc<(Vec<String>, Vec<String>)> =
                    content.unwrap().into_inner().unwrap();
                let (stdout, _) = (&content.0, &content.1);

                let theta = stdout.get(0).unwrap().clone();
                out_channels.broadcast(Content::new(theta)).await
            }
            Output::Err(e) => panic!("{}", e),
            Output::ErrWithExitCode(code, content) => panic!(
                "Exit with {:?}, {:?}",
                code,
                content.unwrap().get::<(Vec<String>, Vec<String>)>()
            ),
        };
        Output::empty()
    }
}

impl NodeAction {
    fn new(class: usize) -> Self {
        Self { class }
    }
}
```

For the root node, it will execute the logic encapsulated in `RootAction`, which will wait all the training nodes to finish, and send training nodes' output to the root python script.

```rust
struct RootAction;
#[async_trait]
impl Action for RootAction {
    async fn run(&self, in_channels: &mut InChannels, _: &OutChannels, env: Arc<EnvVar>) -> Output {
        let data_src: &&str = env.get_ref(ENV_DATA_SRC).unwrap();
        let mut thetas: Vec<String> = in_channels
            .map(|theta| {
                let theta = theta.unwrap();
                let theta = theta.get::<String>().unwrap();
                theta.clone()
            })
            .await;

        let mut args = vec!["examples/lr_root.py".to_string(), format!("{}", data_src)];

        args.append(&mut thetas);

        let cmd_action = CommandAction::new("python", args);
        cmd_action
            .run(&mut InChannels::default(), &OutChannels::default(), env)
            .await
    }
}
```

Run the cell below and you will get the same result with the [traditional approach](#Traditional-Approach-to-LR-OvA-Solutions). 

In [None]:
!cd .. && cargo run --example sklearn --release

## Summary
This example shows how to use dagrs to parallelize the training of multiple OvA classifiers. The way parameters are passed between nodes can be optimized. You can also replace the python script with Rust code if higher performance is required.