## ***Vasilii Mosin***

# Logistic Regression with SGD on Spark

In this homework, the task is to implement logistic regression with SGD on Spark (plain Spark only, without MLlib or similar libraries) and test it on the MNIST dataset. The model code must also be encapsulated in a class, in an sklearn-like style.
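
For reference, one way to write the mini-batch step is given here. It treats the ten digit classes as independent (one-vs-rest) logistic outputs, which appears to match the sigmoid-based gradient used in the `fit` method of the class further down. The symbols $\eta$ (learning rate), $B$ (sampled batch), and $e_y$ (one-hot vector for label $y$) are notation introduced for this note and do not come from the original text.

$$
W \leftarrow W - \eta \sum_{(x,\,y) \in B} x \,\bigl(\sigma(W^\top x) - e_y\bigr)^\top,
\qquad \sigma(z) = \frac{1}{1 + e^{-z}}
$$

Here $W$ is the $784 \times 10$ weight matrix and $x$ is a flattened image, so each summand is a $784 \times 10$ outer product. The sum is not divided by $|B|$, so the effective step size grows with the batch size.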

In [1]:
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score

Downloading MNIST..

In [97]:
%%sh

wget -q -nc https://raw.githubusercontent.com/amitgroup/amitgroup/master/amitgroup/io/mnist.py

In [2]:
import mnist

In [99]:
%%sh

mkdir -p mnist && {
    cd mnist;
    wget -q -nc http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz &&
    wget -q -nc http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz &&
    gunzip *.gz
}

Preparing data..

In [3]:
X, y = mnist.load_mnist(dataset='training', path='mnist/')
X = X.reshape(-1, 1, 28, 28)

X_test, y_test = mnist.load_mnist(dataset='testing', path='mnist/')
X_test = X_test.reshape(-1, 1, 28, 28)

Creating train and test RDDs..

In [4]:
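# `sc` is the SparkContext provided by the PySpark notebook. Each record is (index, label, flattened 784-pixel vector).
X_train_rdd = sc.parallelize((i, y[i], X[i].ravel().copy()) for i in range(X.shape[0])).persist()
X_test_rdd = sc.parallelize((i, y_test[i], X_test[i].ravel().copy()) for i in range(X_test.shape[0])).persist()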
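
The record layout can be checked locally without Spark. The sketch below is only an illustration: `X_demo` and `y_demo` are made-up stand-ins for the loaded MNIST arrays, and a plain Python list plays the role of the RDD.

```python
import numpy as np

# Hypothetical stand-in for the loaded images: 3 images of 28x28 pixels.
X_demo = np.arange(3 * 28 * 28, dtype=float).reshape(-1, 1, 28, 28)
y_demo = np.array([5, 0, 4])

# Same record layout as the RDD elements: (index, label, flat pixel vector).
records = [(i, y_demo[i], X_demo[i].ravel().copy()) for i in range(X_demo.shape[0])]

index, label, pixels = records[1]
print(pixels.shape)
```

Each image ends up as a flat vector of 784 values, which is the shape the weight matrix in the model expects.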

Supporting functions..

In [5]:
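def sigmoid(x):
    # Element-wise logistic function.
    return 1 / (1 + np.exp(-x))

def softmax(x):
    # Subtract the maximum before exponentiating for numerical stability.
    x_max = np.max(x)
    exps = np.exp(x - x_max)
    return exps / np.sum(exps)

def labeling(y):
    # One-hot encode a digit label (0-9) as a length-10 vector.
    res = np.zeros(10)
    res[y] = 1
    return res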
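
A quick sanity check of these helpers, as a sketch: the input values are arbitrary and chosen only to show why `softmax` subtracts the maximum before exponentiating. The two functions are copied here so that the snippet runs on its own.

```python
import numpy as np

def softmax(x):
    exps = np.exp(x - np.max(x))
    return exps / np.sum(exps)

def labeling(y):
    res = np.zeros(10)
    res[y] = 1
    return res

# Logits this large would overflow a naive np.exp(x) without the max shift.
scores = np.array([1000.0, 1001.0, 999.0])
probs = softmax(scores)

# Shifting all logits by a constant leaves the probabilities unchanged.
shifted = softmax(scores - 1000.0)

# One-hot vector for the digit 3.
one_hot = labeling(3)
```

Because softmax preserves the ordering of its inputs, taking `np.argmax` of the probabilities picks the same class as taking it of the raw scores.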

Logistic Regression class..

In [21]:
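class LogisticRegression(object):
    """Ten-class logistic regression (independent sigmoid per class), trained with mini-batch SGD on an RDD."""

    def __init__(self):
        np.random.seed(26)
        # One weight column per digit class; no bias term.
        self.W = np.random.uniform(-1, 1, size=(28 * 28, 10))

    def predict(self, X):
        # Records are (index, label, pixels). Softmax preserves ordering, so argmax picks the largest score.
        return X.map(lambda rec: np.argmax(softmax(rec[2].dot(self.W))))

    def fit(self, X, iterations=100, learning_rate=1, batch_size=0.8, early_stopping_rate=0.85):
        for k in tqdm(range(iterations)):
            # batch_size is the fraction of X sampled without replacement.
            X_batch = X.sample(False, batch_size)
            # Per-record gradient: outer(x, sigmoid(x.W) - one_hot(y)); the batch gradient is their sum (not the mean).
            grad = X_batch.map(lambda rec: np.outer(rec[2], sigmoid(rec[2].dot(self.W)) - labeling(rec[1])))\
                        .reduce(lambda a, b: a + b)
            self.W -= learning_rate * grad
            if early_stopping_rate:
                # Stop once accuracy on the current batch exceeds the threshold.
                y_pred_train = self.predict(X_batch).collect()
                y_true_train = X_batch.map(lambda rec: rec[1]).collect()
                accuracy_train = accuracy_score(y_true_train, y_pred_train)
                if accuracy_train > early_stopping_rate:
                    print("Early stopping... Train accuracy achieved.")
                    return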
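
The map/reduce gradient in `fit` can be reproduced locally without Spark. The following is a minimal sketch under stated assumptions: the batch is random toy data rather than MNIST, a Python list stands in for the RDD, and `functools.reduce` stands in for `RDD.reduce`. Records are indexed by position (`rec[1]`, `rec[2]`) so that the snippet also runs on Python 3. It checks that summing the per-record outer products gives the same result as the vectorized form X^T (sigmoid(XW) - Y).

```python
from functools import reduce
import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def labeling(y):
    res = np.zeros(10)
    res[y] = 1
    return res

rng = np.random.RandomState(0)
n_features, n_classes = 784, 10

# Hypothetical toy batch: 20 random "images" with random labels,
# in the same (index, label, pixels) layout as the RDD records.
batch = [(i, rng.randint(n_classes), rng.uniform(0, 1, n_features)) for i in range(20)]
W = rng.uniform(-1, 1, size=(n_features, n_classes))

# map: per-record gradient; reduce: element-wise sum, mirroring fit().
per_record = map(lambda rec: np.outer(rec[2], sigmoid(rec[2].dot(W)) - labeling(rec[1])), batch)
grad = reduce(lambda a, b: a + b, per_record)

# Equivalent vectorized form: X^T (sigmoid(XW) - Y).
X_mat = np.stack([rec[2] for rec in batch])
Y_mat = np.stack([labeling(rec[1]) for rec in batch])
grad_vectorized = X_mat.T.dot(sigmoid(X_mat.dot(W)) - Y_mat)

# One update step with learning_rate=1, the default used in fit().
W_new = W - 1.0 * grad
```

The map/reduce form is what makes the computation distributable: each partition only needs its own records and a copy of `W`, and the partial sums can be combined in any order.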

In [22]:
# creating a model
lr = LogisticRegression()

In [23]:
# fitting the model
lr.fit(X_train_rdd)

 37%|███▋      | 37/100 [16:24<27:42, 26.38s/it]

Early stopping... Train accuracy achieved.





In [24]:
# making predictions
y_pred = lr.predict(X_test_rdd).collect()

In [25]:
# calculating test accuracy
print('Accuracy on the test: ' + str(accuracy_score(y_test, y_pred)))

Accuracy on the test: 0.878
