Set up environment variables and create a spark context:

In [1]:
import os

In [2]:
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-1.8.0-openjdk-amd64/'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

In [3]:
os.environ["SPARK_HOME"] = '/home/mdldml/spark-2.2.0-bin-hadoop2.7/'

In [4]:
import sys

sys.path.append(os.environ['SPARK_HOME']+"/python")

In [5]:
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [6]:
import py4j
from pyspark import SparkContext, SparkConf

In [7]:
conf = (SparkConf().setMaster("local[4]")
        .setAppName("HT3")
        .set("spark.executor.memory", "1g"))

In [8]:
sc = SparkContext(conf=conf)

### (0.5) implement TF-IDF feature extractor, test it on Amazon reviews dataset:

In [48]:
from operator import add
from math import log

Tokenization of document:

In [77]:
def tokenize(doc):
    result = doc.lower()
    for ch in '.,?!:;(){}[]-"''':
        result = result.replace(ch, ' ')
    return result.split()[1:]

Function which computes TF-IDF using Spark. Takes RDD[unicode], returns RDD[(documentId, tokenId, score)] and two mappings (correspondence between docs and their ids, similar for tokens, both also RDD):

In [78]:
def compute_tf_idf(docs):
    totalDocs = docs.count()
    docIds = docs.zipWithIndex()
    
    tokens = docs.flatMap(tokenize).distinct()
    tokenIds = tokens.zipWithIndex()

    tokensWithDocIds = docIds.flatMap(lambda (doc, docId): [(token, docId) for token in tokenize(doc)])
    idPairs = tokensWithDocIds.join(tokenIds).map(lambda (token, ids): (ids, 1))
    occurences = idPairs.reduceByKey(add)

    tokensByDoc = occurences.map(lambda ((docId, tokenId), occs): (docId, occs)).reduceByKey(add)
    tf = occurences.map(lambda ((docId, tokenId), occs): (docId, (tokenId, occs))) \
                    .join(tokensByDoc) \
                    .map(lambda (docId, ((tokenId, occs), totalOccs)): ((docId, tokenId), 1.0 * occs / totalOccs))

    docsWithToken = occurences.map(lambda ((docId, tokenId), occs): (tokenId, 1)).reduceByKey(add)
    idf = occurences.map(lambda ((docId, tokenId), occs): (tokenId, (docId, occs))) \
                    .join(docsWithToken) \
                    .map(lambda (tokenId, ((docId, occs), totalOccs)): ((docId, tokenId), log(totalDocs / totalOccs))) \
                    .filter(lambda ((docId, tokenId), idf): idf > 0.0)

    tf_idf = tf.join(idf).map(lambda ((docId, tokenId), (tf, idf)): ((docId, tokenId), tf * idf))
    
    return tf_idf, docIds, tokenIds

Now let's compute top-20 TF-IDF sums for each class:

In [79]:
def top_tf_idf_by_class(docs):
    flip = lambda (k, v) : (v, k)
    
    for docClass in ['__label__1', '__label__2']:
        docSet = docs.filter(lambda doc: doc.startswith(docClass))
        tf_idf, docIds, tokenIds = compute_tf_idf(docSet)
        scoreSums = tf_idf.map(lambda ((docId, tokenId), score): (tokenId, score)).reduceByKey(add)
        tokenSums = tokenIds.map(flip).join(scoreSums).map(lambda (tokenId, (token, score)): (token, score))
        topScores = tokenSums.top(20, key=lambda (token, score): score)
        
        print(docClass + ': ')
        print('\n'.join(['%s: %f' % (token, score) for (token, score) in topScores]))
        print('-' * 10)

In [80]:
docs = sc.textFile("train.ft.txt")
top_tf_idf_by_class(docs)

__label__1: 
book: 16330.895751
was: 12885.357014
my: 11994.103282
that: 11469.250495
movie: 10358.086593
but: 10236.770890
are: 10091.592524
very: 9900.274922
as: 9841.842132
all: 9669.699670
be: 9586.675211
no: 9558.854168
they: 9450.397920
one: 9433.204380
like: 9342.514748
good: 9305.327600
just: 9234.535986
on: 9077.007486
would: 8903.926978
you: 8892.498017
----------
__label__2: 
great: 16505.712062
book: 15774.071349
was: 14761.892442
good: 14002.563714
very: 12586.753536
as: 11526.680868
movie: 11079.791894
are: 10867.738195
love: 10538.755586
all: 10482.726956
you: 10323.048546
that: 10303.384281
read: 10234.146347
they: 9955.414737
so: 9858.152240
my: 9847.553804
not: 9761.534446
one: 9694.977640
be: 9575.281311
with: 9386.017724
----------


### (0.5) implement and train logistic regression on MNIST:

Helper function to load dataset:

In [9]:
import sys
import os
import time
import numpy as np

def load_dataset():
    # We first define a download function, supporting both Python 2 and 3.
    if sys.version_info[0] == 2:
        from urllib import urlretrieve
    else:
        from urllib.request import urlretrieve

    def download(filename, source='http://yann.lecun.com/exdb/mnist/'):
        print("Downloading %s" % filename)
        urlretrieve(source + filename, filename)

    # We then define functions for loading MNIST images and labels.
    # For convenience, they also download the requested files if needed.
    import gzip

    def load_mnist_images(filename):
        if not os.path.exists(filename):
            download(filename)
        # Read the inputs in Yann LeCun's binary format.
        with gzip.open(filename, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=16)
        # The inputs are vectors now, we reshape them to monochrome 2D images,
        # following the shape convention: (examples, channels, rows, columns)
        data = data.reshape(-1, 1, 28, 28)
        # The inputs come as bytes, we convert them to float32 in range [0,1].
        # (Actually to range [0, 255/256], for compatibility to the version
        # provided at http://deeplearning.net/data/mnist/mnist.pkl.gz.)
        return (data / np.float32(256)).squeeze()

    def load_mnist_labels(filename):
        if not os.path.exists(filename):
            download(filename)
        # Read the labels in Yann LeCun's binary format.
        with gzip.open(filename, 'rb') as f:
            data = np.frombuffer(f.read(), np.uint8, offset=8)
        # The labels are vectors of integers now, that's exactly what we want.
        return data

    # We can now download and read the training and test set images and labels.
    X_train = load_mnist_images('train-images-idx3-ubyte.gz')
    y_train = load_mnist_labels('train-labels-idx1-ubyte.gz')
    X_test = load_mnist_images('t10k-images-idx3-ubyte.gz')
    y_test = load_mnist_labels('t10k-labels-idx1-ubyte.gz')

    # We reserve the last 10000 training examples for validation.
    #X_train, X_val = X_train[:-10000], X_train[-10000:]
    #y_train, y_val = y_train[:-10000], y_train[-10000:]

    # We just return all the arrays in order, as expected in main().
    # (It doesn't matter how we do this as long as we can read them again.)
    return X_train, y_train, X_test, y_test

In [10]:
X_train, y_train, X_test, y_test = load_dataset()

Reshape as [batch_size x total_features]:

In [11]:
X_train, X_test = X_train.reshape(len(X_train), -1), X_test.reshape(len(X_test), -1)

Some helper functions:

In [12]:
def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def softmax(x):
    logits = np.exp(x - np.max(x))  # for numeric stability
    return logits / np.sum(logits, axis=0) if logits.ndim == 1 else logits / np.array([np.sum(logits, axis=1)]).T  # 2d version was used for draft non-spark version

Logistic regression train function. Unlike in previous exercise, it takes regular numpy dataset and encapsulates work with Spark: 

In [13]:
def logistic_regression_train(X, y, learning_rate=1.0, epochs=100):
    n, d = X.shape
    m = 1 + np.max(y)
    
    y_wide = np.zeros((n, m))  # 1/0 answer for every class for convinience
    y_wide[np.arange(n), y] = 1.0
    
    W = np.random.random((d, m)) - 0.5
    b = np.zeros(m)
    
    data = sc.parallelize(zip(list(X), list(y), list(y_wide)))  # work with Spark begins here
    
    for i in range(epochs):
        logits = data.map(lambda (X_i, y_i, y_wide_i): softmax(X_i.dot(W) + b))
        preds = logits.map(lambda logits_i: np.argmax(logits_i))
        
        dataWithLogits = data.zip(logits)
        loss = dataWithLogits.map(lambda ((X_i, y_i, y_wide_i), logits_i): -np.log(logits_i[y_i])).mean()
        accuracy = 100.0 * preds.zip(data).map(lambda (preds_i, (X_i, y_i, y_wide_i)): 1.0 if preds_i == y_i else 0.0).mean()
        
        W -= learning_rate * dataWithLogits.map(lambda ((X_i, y_i, y_wide_i), logits_i): np.outer(X_i, logits_i - y_wide_i)).mean()
        b -= learning_rate * dataWithLogits.map(lambda ((X_i, y_i, y_wide_i), logits_i): logits_i - y_wide_i).mean()
        
        print('Epoch %d:' % i)
        print('\tloss:\t\t%f\n\taccuracy:\t%.4f%%' % (loss, accuracy))
    
    return W, b

In [14]:
W, b = logistic_regression_train(X_train, y_train, epochs=100)

Epoch 0:
	loss:		4.341922
	accuracy:	8.6783%
Epoch 1:
	loss:		3.582653
	accuracy:	22.2800%
Epoch 2:
	loss:		2.934465
	accuracy:	24.4550%
Epoch 3:
	loss:		2.511403
	accuracy:	39.2667%
Epoch 4:
	loss:		2.014235
	accuracy:	47.3533%
Epoch 5:
	loss:		1.794536
	accuracy:	51.4800%
Epoch 6:
	loss:		1.370442
	accuracy:	60.9750%
Epoch 7:
	loss:		1.059472
	accuracy:	66.2833%
Epoch 8:
	loss:		1.003032
	accuracy:	67.7467%
Epoch 9:
	loss:		0.966827
	accuracy:	69.0867%
Epoch 10:
	loss:		0.998471
	accuracy:	69.0750%
Epoch 11:
	loss:		0.913067
	accuracy:	71.0683%
Epoch 12:
	loss:		0.901040
	accuracy:	70.8233%
Epoch 13:
	loss:		0.874549
	accuracy:	72.1900%
Epoch 14:
	loss:		0.865013
	accuracy:	72.1583%
Epoch 15:
	loss:		0.806786
	accuracy:	74.3150%
Epoch 16:
	loss:		0.788362
	accuracy:	74.5850%
Epoch 17:
	loss:		0.735690
	accuracy:	76.6150%
Epoch 18:
	loss:		0.710541
	accuracy:	77.2067%
Epoch 19:
	loss:		0.674186
	accuracy:	78.7233%
Epoch 20:
	loss:		0.650285
	accuracy:	79.4500%
Epoch 21:
	loss:		0.6264

Let's also check test dataset accuracy:

In [16]:
def logistic_regression_predict(X, W, b):
    data = sc.parallelize(X)
    logits = data.map(lambda X_i: softmax(X_i.dot(W) + b))
    return logits.map(lambda logits_i: np.argmax(logits_i)).collect()

In [19]:
def logistic_regression_check(X, y, W, b):
    preds = sc.parallelize(logistic_regression_predict(X, W, b))
    y_p = sc.parallelize(y)
    
    accuracy = 100.0 * preds.zip(y_p).map(lambda (preds_i, y_i): 1.0 if preds_i == y_i else 0.0).mean()

    print('accuracy:\t%.4f%%' % accuracy)

In [20]:
logistic_regression_check(X_test, y_test, W, b)

accuracy:	88.9700%
