In [1]:
import numpy as np
import pyspark
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def gradient(p, w):
    x = p.label
    y = p.features
    return -sigmoid(-x * np.dot(w, y)) * x * y 

def CustomSGD(train_data, iter, lr):
    
    w = np.zeros(4)
    
    for _ in range(iter):
        grad = train_data.map(lambda p: gradient(p, w)).reduce(lambda x, y: x + y)
        w -= lr * grad / train_data.count()
    
    return w
            
def predict(model, features):
    y = sigmoid(np.dot(model, features))
    return round(y)

def mapper(line):
    """
    Mapper that converts an input line to a feature vector
    """    
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[-1], values[:-1])

sc = pyspark.SparkContext()

data = sc.textFile("data_banknote_authentication.txt")
parsedData = data.map(mapper)


In [2]:
model = LogisticRegressionWithSGD.train(parsedData)

labelsAndPreds = parsedData.map(lambda point: (point.label, model.predict(point.features)))

trainErr = labelsAndPreds.filter(lambda p: p[0] != p[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

Training Error = 0.04446064139941691


In [3]:
model = CustomSGD(parsedData, iter = 1000, lr=1)

labelsAndPreds = parsedData.map(lambda point: (point.label, predict(model, point.features)))

trainErr = labelsAndPreds.filter(lambda p: p[0] != p[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

Training Error = 0.04518950437317784
