## Gradient descent (GD) and stochastic gradient descent (SGD)

#### Dependencies

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import multiprocessing as mp
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler

In [None]:
!pip3 install -U ucimlrepo 

#### Load data

In [None]:
from ucimlrepo import fetch_ucirepo

# Download the UCI Ionosphere dataset (repository id 52).
# Features and targets are returned as pandas DataFrames.
ionosphere = fetch_ucirepo(id=52)
X, y = ionosphere.data.features, ionosphere.data.targets

In [None]:
samples = X.to_numpy()
# Encode the target as 'g' -> -1 and any other value -> +1.
# The logistic loss used below is log(1 + exp(-y * theta^T x)), which is only correct
# for labels in {-1, +1}: with {0, 1} labels every class-0 sample would contribute a
# constant loss and a zero gradient, i.e. that class would be ignored by training.
labels = np.where(y.iloc[:, 0].to_numpy() == 'g', -1, 1)

#### Define GD and SGD functions

In [None]:
def logistic_loss(samples, labels, theta, lamb=0):
    ''' Logistic regression loss, summed over samples.

    Computes  sum_n [ log(1 + exp(-y_n * x_n^T theta)) + (lamb/2) * ||theta||^2 ]
    for labels y_n in {-1, +1}. The L2 term is added once per sample, so the result
    equals n * (mean data loss + (lamb/2) * ||theta||^2).

    Parameters
    ----------
    samples : array-like, shape (n, d)
        Design matrix, one sample per row.
    labels : array-like, shape (n,)
        Labels, expected in {-1, +1}.
    theta : array-like, shape (d,)
        Model parameters.
    lamb : float
        L2 regularization strength.

    Returns
    -------
    float
        The summed loss (0.0 for an empty sample set).
    '''
    samples = np.asarray(samples, dtype=float)
    labels = np.asarray(labels)
    theta = np.asarray(theta, dtype=float)
    margins = labels * (samples @ theta)
    # logaddexp(0, z) == log(1 + exp(z)), but stays finite for large z instead of
    # overflowing exp() to inf
    data_loss = np.sum(np.logaddexp(0, -margins))
    reg = len(labels) * (lamb / 2) * np.linalg.norm(theta) ** 2
    return data_loss + reg

def grad_logistic(samples, labels, theta, lamb=0):
    ''' Gradient of the per-sample-averaged logistic regression loss.

    Returns  (1/n) * sum_n [ -y_n * x_n / (1 + exp(y_n * x_n^T theta)) ] + lamb * theta
    for labels y_n in {-1, +1}.

    Parameters
    ----------
    samples : array-like, shape (n, d)
        Design matrix, one sample per row.
    labels : array-like, shape (n,)
        Labels, expected in {-1, +1}.
    theta : array-like, shape (d,)
        Model parameters.
    lamb : float
        L2 regularization strength.

    Returns
    -------
    np.ndarray, shape (d,)

    Raises
    ------
    ValueError
        If ``samples`` contains no rows (the average is undefined).
    '''
    samples = np.asarray(samples, dtype=float)
    labels = np.asarray(labels)
    theta = np.asarray(theta, dtype=float)
    n_samp = samples.shape[0]
    if n_samp == 0:
        raise ValueError("grad_logistic needs at least one sample")
    margins = labels * (samples @ theta)
    # 1 / (1 + exp(margin)) written as exp(-log(1 + exp(margin))): no overflow warning
    # for large margins, and the weight correctly tends to 0
    weights = np.exp(-np.logaddexp(0, margins))
    return -(samples.T @ (weights * labels)) / n_samp + lamb * theta

def sg_logistic(samples, labels, theta, rng, lamb=0, batch_size=1,):
    ''' Minibatch stochastic gradient of the logistic loss.

    Draws ``batch_size`` row indices uniformly, with replacement, from ``rng`` and
    returns ``grad_logistic`` evaluated on only those rows (with the same ``lamb``).
    '''
    n_rows, _ = samples.shape
    picked = rng.integers(low=0, high=n_rows, size=batch_size)
    return grad_logistic(samples=samples[picked], labels=labels[picked], theta=theta, lamb=lamb)

#### Training

In [None]:
%%time
n_samp, n_feat = samples.shape
rng = np.random.default_rng(42)
theta = rng.uniform(low=-1, high=1, size=n_feat)  # random starting point in [-1, 1)
lamb, lr = 0.01, 0.5
le = []  # loss history: le[k] is the full-data loss before update k
# Every step needs the theta produced by the previous one, so iterations run sequentially.
for e in range(5000):
    loss_e = logistic_loss(samples=samples, labels=labels, theta=theta, lamb=lamb)
    le += [loss_e]
    # SGD on minibatches of 50 rows; for full-batch GD call
    # grad_logistic(samples=samples, labels=labels, theta=theta, lamb=lamb) instead.
    grad = sg_logistic(samples=samples, labels=labels, theta=theta, lamb=lamb, rng=rng, batch_size=50)
    theta = theta - lr * grad


## Distributed GD

#### Load data

In [None]:
from ucimlrepo import fetch_ucirepo

# Download the UCI Adult dataset (repository id 2).
adult = fetch_ucirepo(id=2)
X, y = adult.data.features, adult.data.targets  # both are pandas DataFrames


#### Preprocess data

In [None]:
# Encode only the categorical (non-numeric) columns as integer codes; missing categorical
# entries become their own 'nan' category. Numeric columns keep their real values:
# label-encoding them would replace each value by its rank and distort the feature scale.
# NOTE(review): missing values in numeric columns are not imputed here — check
# X.select_dtypes('number').isna().sum() if the data source changes.
features = X.copy()
cat_cols = features.select_dtypes(exclude='number').columns
features[cat_cols] = features[cat_cols].fillna('nan').apply(LabelEncoder().fit_transform)
# standardize every column to zero mean / unit variance
std_scale = StandardScaler()
samples = std_scale.fit_transform(features.to_numpy(dtype=float))
samples

In [None]:
# Encode income as -1 (<= 50K) / +1 (> 50K). The logistic loss used in training,
# log(1 + exp(-y * theta^T x)), assumes labels in {-1, +1}; with {0, 1} labels every
# class-0 sample would contribute zero gradient.
# A trailing '.' is stripped so variants such as '<=50K.' (written that way in the UCI
# adult test split — NOTE(review): confirm with y['income'].unique()) map like '<=50K'.
income = y['income'].astype(str).str.strip().str.rstrip('.')
labels = np.where(income == '<=50K', -1, 1)
labels

In [None]:
np.shape(samples)  # (rows, columns) of the preprocessed feature matrix

In [None]:
np.shape(labels)  # one entry per sample

#### Define the worker function for distributed GD and SGD

In [None]:
def worker_gd(my_id, samples, labels, lamb, seed, channel, batch_size):
    ''' Worker loop computing logistic-loss gradients on one data shard (GD or SGD).

    Parameters
    ----------
    my_id : int
        Worker identifier, sent back with every gradient.
    samples : np.ndarray, shape (n, d)
        This worker's shard of the design matrix.
    labels : np.ndarray, shape (n,)
        Labels of the shard, expected in {-1, +1} (the loss uses y * theta^T x).
    lamb : float
        L2 regularization strength.
    seed : int or None
        Base seed for minibatch sampling; combined with ``my_id`` so that workers
        draw different minibatches.
    channel : sequence of two queues
        Isolated channel between this worker and the main process:
        ``channel[0]`` is the uplink (worker -> server), ``channel[1]`` the downlink
        (server -> worker). Receiving ``None`` on the downlink stops the worker.
    batch_size : int
        Minibatch size. If it equals the shard size the whole shard is used (vanilla
        GD); otherwise ``batch_size`` rows are drawn with replacement each round (SGD).

    For every ``theta`` received, puts ``(my_id, grad)`` on the uplink, where ``grad``
    is the shard's *summed* gradient (L2 term counted once per sample). In SGD mode the
    minibatch sum is rescaled by ``n / batch_size`` so it estimates that shard sum, and
    the server's division by the total sample count stays correctly scaled.
    '''
    n_samp = samples.shape[0]
    full_batch = batch_size == n_samp
    # One generator for the worker's whole lifetime: re-seeding every round would redraw
    # the same minibatch each time. Mixing in my_id decorrelates the workers.
    rng = np.random.default_rng(None if seed is None else [seed, my_id])
    while True:
        # blocking get: wait for the next model without busy-spinning a CPU core
        theta = channel[1].get()
        if theta is None:
            break

        # Select the rows for this round into new names; never rebind the shard itself,
        # otherwise later rounds would only ever see the first minibatch.
        if full_batch:
            x_b, y_b = samples, labels
        else:
            idx = rng.integers(low=0, high=n_samp, size=batch_size)
            x_b, y_b = samples[idx], labels[idx]

        margins = y_b * (x_b @ theta)
        # 1 / (1 + exp(margin)) computed without overflow for large margins
        weights = np.exp(-np.logaddexp(0, margins))
        grad_sum = -(x_b.T @ (weights * y_b)) + len(y_b) * lamb * theta
        if not full_batch:
            grad_sum = grad_sum * (n_samp / batch_size)

        # send results back to server
        channel[0].put((my_id, grad_sum))

### <span style="color:red">WARNING</span>: If training is interrupted by the user, the worker processes may keep running in the background. In that case, kill them manually from a command-line terminal.

In [None]:
%%time
# initialize variables
n_samp, n_feat = samples.shape
seed = 42
rng = np.random.default_rng(seed)
theta = rng.uniform(high=1, low=-1, size=n_feat)
lamb = 0.01
lr = 0.01
M = 4  # number of worker processes

# Split the data into M contiguous shards; each worker owns one shard and blocks until
# model parameters appear on its downlink queue.
# NOTE(review): with the 'spawn' start method (default on Windows/macOS) a worker function
# defined inside a notebook may not be picklable — verify on your platform.
split_samples = np.array_split(samples, M, axis=0)
split_labels = np.array_split(labels, M, axis=0)
processes = []
comm_channels = []  # comm_channels[i] = [uplink (worker -> server), downlink (server -> worker)]
try:
    for i in range(M):
        channel = [mp.Queue(), mp.Queue()]
        batch_size = split_samples[i].shape[0]  # whole shard -> vanilla GD
        # batch_size = 20  # uncomment for SGD
        process = mp.Process(target=worker_gd, args=(i, split_samples[i], split_labels[i], lamb, seed, channel, batch_size))
        process.start()
        # register only started workers so the cleanup below never joins an unstarted process
        processes.append(process)
        comm_channels.append(channel)

    # run
    for e in range(200):
        loss_e = logistic_loss(samples=samples, labels=labels, theta=theta, lamb=lamb)
        print(loss_e)
        # broadcast the current model to every worker
        for _, downlink in comm_channels:
            downlink.put(theta)

        # Blocking get on each uplink waits for all M gradients without busy polling;
        # summing in worker order also makes the result deterministic.
        acc_grad = 0
        for uplink, _ in comm_channels:
            _, grad_m = uplink.get()  # -> (worker id, summed gradient)
            acc_grad += grad_m

        # gradient descent step on the gradient averaged over all samples
        grad = acc_grad / n_samp
        theta = theta - lr * grad
finally:
    # Shut the workers down in every case (normal end, error, or user interrupt) so no
    # orphan processes keep running: send the None sentinel to all, then join.
    for _, downlink in comm_channels:
        downlink.put(None)
    for process in processes:
        process.join(timeout=5)
        if process.is_alive():  # worker did not stop in time: force-stop it
            process.terminate()
            process.join()
