#DATASCI W261, Machine Learning at Scale
--------
####Assignment: week \#13
####[Lei Yang](mailto:leiyang@berkeley.edu) | [Michael Kennedy](mailto:mkennedy@ischool.berkeley.edu) | [Natarajan Krishnaswami](mailto:natarajan@krishnaswami.org)
####Due: 2016-04-22, 8AM PST

### Start Spark

In [4]:
import os
import sys
# point SPARK_HOME at the local Spark install (hard-coded for this machine)
spark_home = os.environ['SPARK_HOME'] = '/Users/leiyang/Downloads/spark-1.6.1-bin-hadoop2.6/'
if not os.path.isdir(spark_home):
    raise ValueError('SPARK_HOME directory not found: %s' % spark_home)
sys.path.insert(0,os.path.join(spark_home,'python'))
sys.path.insert(0,os.path.join(spark_home,'python/lib/py4j-0.8.2.1-src.zip'))
execfile(os.path.join(spark_home,'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.9 (default, Dec 15 2014 10:37:34)
SparkContext available as sc, HiveContext available as sqlContext.


###HW 13.1: Spark implementation of basic PageRank

Write a basic Spark implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input.
Make sure that your implementation utilizes teleportation ((1 - damping)/number of nodes in the network), 
and further, distributes the mass of dangling nodes with each iteration
so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page,
chooses the next page to which it will move by clicking at random, with probability d,
one of the hyperlinks in the current page. This probability is represented by a so-called
‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer
jumps to any web page in the network. If a page is a dangling end, meaning it has no
outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform
distribution and “teleports” to that page]
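For reference, one iteration of this random-surfer model can be written as below. This is a sketch in our own notation (not taken from the assignment): $|G|$ is the number of nodes, $L(n)$ the set of pages linking to $n$, $C(p)$ the out-degree of $p$, $D$ the set of dangling nodes, and $m = \sum_{p \in D} P(p)$ the mass currently sitting on dangling nodes.

```latex
P'(n) = \frac{\alpha}{|G|} + d\left(\frac{m}{|G|} + \sum_{p \in L(n)} \frac{P(p)}{C(p)}\right), \qquad \alpha = 1 - d

\sum_{n} P'(n) = \alpha + d\left(m + \sum_{p \notin D} P(p)\right) = \alpha + d\,(m + 1 - m) = 1
```

The second line shows why spreading the dangling mass $m$ uniformly keeps each iteration normalized: if the previous ranks sum to 1, so do the new ones.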

In your Spark solution, please use broadcast variables and caching to make sure your code is as efficient as possible.


As you build your code, use the test data

s3://ucb-mids-mls-networks/PageRank-test.txt
Or under the Data Subfolder for HW7 on Dropbox with the same file name. 
(On Dropbox https://www.dropbox.com/sh/2c0k5adwz36lkcw/AAAAKsjQfF9uHfv-X9mCqr9wa?dl=0)

with teleportation parameter set to 0.15 (1 - d, where the damping factor d is set to 0.85), and cross-check
your work with the true result, displayed in the first image
in the Wikipedia article:

https://en.wikipedia.org/wiki/PageRank

and here for reference are the corresponding PageRank probabilities:

A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016
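To sanity-check these numbers without Spark, here is a minimal single-machine power-iteration sketch. The edge list is an assumption: it is our reading of the example graph in the Wikipedia figure (node A is the only dangling node, consistent with the single dangling node reported by the Spark run below), not the literal contents of PageRank-test.txt. With d = 0.85 and 100 iterations it should print values matching the list above to three decimals.

```python
# Hypothetical edge list, reconstructed from the example figure in the
# Wikipedia PageRank article; node A has no out-links (dangling).
EDGES = {
    'A': [],
    'B': ['C'],
    'C': ['B'],
    'D': ['A', 'B'],
    'E': ['B', 'D', 'F'],
    'F': ['B', 'E'],
    'G': ['B', 'E'],
    'H': ['B', 'E'],
    'I': ['B', 'E'],
    'J': ['E'],
    'K': ['E'],
}


def pagerank(graph, damping=0.85, n_iter=100):
    """Power iteration with teleportation and dangling-mass redistribution."""
    nodes = list(graph)
    size = len(nodes)
    rank = {n: 1.0 / size for n in nodes}  # uniform start, sums to 1
    for _ in range(n_iter):
        # total mass sitting on dangling nodes this round
        dangling = sum(rank[n] for n in nodes if not graph[n])
        # every node receives teleportation plus an equal share of dangling mass
        base = (1 - damping) / size + damping * dangling / size
        new_rank = {n: base for n in nodes}
        # non-dangling nodes split their damped mass evenly over out-links
        for src in nodes:
            out = graph[src]
            for dst in out:
                new_rank[dst] += damping * rank[src] / len(out)
        rank = new_rank
    return rank


ranks = pagerank(EDGES)
for node in sorted(ranks):
    print('%s,%.3f' % (node, ranks[node]))
```

Note that 10 iterations are not enough for B and C to settle, because the B↔C cycle converges slowly; this matches the B/C values in the 10-iteration Spark output further down.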

Run this experiment locally first. Report the local configuration that you used and how long in minutes and seconds it takes to complete your job.

Repeat this experiment on AWS. Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete your job. (in your notebook, cat the cluster config file)



###PageRank

In [1]:
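from time import time
from datetime import datetime
import ast

def initialize(line):
    # parse line: node id, tab, then a dict literal {neighbor: weight}
    nid, adj = line.strip().split('\t', 1)
    # evaluate the dict literal safely instead of exec-ing raw input
    adj = ast.literal_eval(adj)
    # initialize node struct: adjacency list 'a' and rank mass 'p'
    node = {'a':adj.keys(), 'p':0}
    # every node starts with mass 1, split evenly over its out-links
    rankMass = 1.0/len(adj)
    # emit pageRank mass and node
    return [(m, rankMass) for m in node['a']] + [(nid.strip('"'), node)]

def accumulateMass(a, b):
    # reduceByKey combiner: each value is either a float mass or a node dict
    if isinstance(a, float) and isinstance(b, float):
        return a+b
    if isinstance(a, float) and not isinstance(b, float):
        b['p'] += a
        return b
    else:
        a['p'] += b
        return a

def getDangling(node):
    # a key whose reduced value is still a bare float never had its own
    # adjacency line, so it is a dangling node: give it an empty node struct
    global nDangling
    if isinstance(node[1], float):
        nDangling += 1
        return (node[0], {'a':[], 'p':node[1]})
    else:
        return node

def redistributeMass(node):
    # add the dangling share, apply damping, then add teleportation; mass is
    # kept unnormalized (it sums to G) and divided by G at the end
    node[1]['p'] = (p_dangling.value+node[1]['p'])*damping + alpha
    return node

def distributeMass(node):
    # send the node's mass evenly along its out-links and keep the node struct;
    # dangling nodes add their mass to the lossMass accumulator instead
    global lossMass
    mass, adj = node[1]['p'], node[1]['a']
    node[1]['p'] = 0
    if len(adj) == 0:
        lossMass += mass
        return [node]
    else:
        rankMass = mass/len(adj)
        return [(x, rankMass) for x in adj]+[node]

def getIndex(line):
    # index line: title \t id \t in-degree \t out-degree -> (id, title)
    elem = line.strip().split('\t')
    return (elem[1], elem[0])

def logTime():
    return str(datetime.now())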
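`getDangling` relies on a side effect of the reduce step: a node that only ever receives mass, and never appears as the source of an adjacency line, ends up with a bare float instead of a node dict. The same condition can be written as a set difference. The sketch below is a plain-Python illustration on made-up lines in the assumed `id<TAB>{dict}` format; `find_dangling` is a hypothetical helper, not part of the Spark job.

```python
import ast


def find_dangling(lines):
    """Return ids that appear only as link targets, never as a source line."""
    sources, targets = set(), set()
    for line in lines:
        nid, adj = line.strip().split('\t', 1)
        sources.add(nid)
        # keys of the adjacency dict are the link targets
        targets.update(ast.literal_eval(adj).keys())
    return targets - sources


print(find_dangling(["B\t{'C': 1}", "C\t{'B': 1}", "D\t{'A': 1, 'B': 1}"]))
```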


### Driver

In [4]:
from operator import add

# load the graph
graph_file = sc.textFile('hdfs://localhost:9000/user/leiyang/PageRank-test.txt')
index_file = sc.textFile('file:///Users/leiyang/Downloads/toy_index.txt')

# initialize variables
nDangling = sc.accumulator(0)
lossMass = sc.accumulator(0.0)
damping = 0.85
alpha = 1 - damping
nTop, nIter = 100, 10
start = time()
print '%s: start PageRank initialization ...' %(logTime())
graph = graph_file.flatMap(initialize).reduceByKey(accumulateMass).map(getDangling).cache()
# get graph size
G = graph.count()
# broadcast dangling mass for redistribution
p_dangling = sc.broadcast(1.0*nDangling.value/G)
graph = graph.map(redistributeMass)
print '%s: initialization completed, dangling node(s): %d, total nodes: %d' %(logTime(), nDangling.value, G)

for i in range(nIter-1):
    print '%s: running iteration %d ...' %(logTime(), i+2)
    lossMass.value = 0.0
    graph = graph.flatMap(distributeMass).reduceByKey(accumulateMass).cache() #checkpoint()?
    # need to call an action here in order to have loss mass
    graph.count()
    print '%s: redistributing loss mass: %.4f' %(logTime(), lossMass.value)
    p_dangling = sc.broadcast(lossMass.value/G)
    graph = graph.map(redistributeMass)

totalMass = graph.aggregate(0, (lambda x, y: y[1]['p'] + x), (lambda x, y: x+y))
print '%s: normalized weight of the graph: %.4f' %(logTime(), totalMass/G)
print '%s: PageRanking completed in %.2f minutes.' %(logTime(), (time()-start)/60.0)
# get the page name by join
topPages = graph.map(lambda n:(n[0],n[1]['p']/G)).sortBy(lambda n: n[1], ascending=False).take(nTop)
rankList = index_file.map(getIndex).join(sc.parallelize(topPages)).map(lambda l: l[1])
# save final rank list
rankList.sortBy(lambda n: n[1], ascending=False).saveAsTextFile('pageRank')
print '%s: results saved, job completed!' %logTime()


2016-04-15 22:52:21.595538: start PageRank initialization ...
2016-04-15 22:52:22.789142: initialization completed, dangling node(s): 1, total nodes: 11
2016-04-15 22:52:22.789784: running iteration 2 ...
2016-04-15 22:52:22.893942: redistributing loss mass: 0.6523
2016-04-15 22:52:22.900249: running iteration 3 ...
2016-04-15 22:52:23.016227: redistributing loss mass: 0.4174
2016-04-15 22:52:23.021943: running iteration 4 ...
2016-04-15 22:52:23.103576: redistributing loss mass: 0.7042
2016-04-15 22:52:23.108007: running iteration 5 ...
2016-04-15 22:52:23.193200: redistributing loss mass: 0.4136
2016-04-15 22:52:23.197213: running iteration 6 ...
2016-04-15 22:52:23.274630: redistributing loss mass: 0.4254
2016-04-15 22:52:23.278632: running iteration 7 ...
2016-04-15 22:52:23.352806: redistributing loss mass: 0.3753
2016-04-15 22:52:23.356641: running iteration 8 ...
2016-04-15 22:52:23.430620: redistributing loss mass: 0.3812
2016-04-15 22:52:23.434674: running iteration 9 ...
2016

In [5]:
!cat pageRank/part*

(u"'Node_b", 0.3632359489889102)
(u"'Node_c", 0.36288372803871793)
(u"'Node_e", 0.08114525762548769)
(u"'Node_d", 0.03938466342002967)
(u"'Node_f", 0.03938466342002967)
(u"'Node_a", 0.03293010178620472)
(u"'Node_h", 0.016207127344124005)
(u"'Node_j", 0.016207127344124005)
(u"'Node_g", 0.016207127344124005)
(u"'Node_i", 0.016207127344124005)
(u"'Node_k", 0.016207127344124005)


###HW 13.2: Applying PageRank to the Wikipedia hyperlinks network

Run your Spark PageRank implementation on the Wikipedia dataset for 10 iterations,
and display the top 100 ranked nodes (with damping factor d = 0.85, i.e., teleportation factor 0.15).

Run your PageRank implementation on the Wikipedia dataset for 50 iterations,
and display the top 100 ranked nodes (with teleportation factor of 0.15). 
Plot the pagerank values for the top 100 pages resulting from the 50 iterations run. Then plot the pagerank values for the same 100 pages that resulted from the 10 iterations run.  Comment on your findings.  Have the top 100 ranked pages changed? Have the pagerank values changed? Explain.

Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete your job.

NOTE: ====  English Wikipedia hyperlink network.data ====
The dataset is available via Dropbox at:

https://www.dropbox.com/sh/2c0k5adwz36lkcw/AAAAKsjQfF9uHfv-X9mCqr9wa?dl=0

on S3 at  s3://ucb-mids-mls-networks/wikipedia/
-- s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt # Graph
-- s3://ucb-mids-mls-networks/wikipedia/indices.txt               # Page titles and page Ids

The dataset is built from the Sept. 2015 XML snapshot of English Wikipedia.
For this directed network, a link between articles: 

A -> B

is defined by the existence of a hyperlink in A pointing to B.
This network also exists in the indexed format:

Data: s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-out.txt
Data: s3://ucb-mids-mls-networks/wikipedia/all-pages-indexed-in.txt
Data: s3://ucb-mids-mls-networks/wikipedia/indices.txt

where the index file (indices.txt) has more detailed data:

(article name) \t (index) \t (in degree) \t (out degree)

In the adjacency dictionary, target nodes are keys and link weights are values.
Here, a weight indicates the number of times a page links to another.
However, for the sake of this assignment, treat this as an unweighted network,
and set all weights to 1 upon data input.
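A minimal sketch of "set all weights to 1 upon data input", assuming each graph line looks like `node_id<TAB>{'target': weight, ...}` (the format the Scala parser further down also assumes). `parse_unweighted` is a hypothetical helper name.

```python
import ast


def parse_unweighted(line):
    """Parse 'node_id<TAB>{target: weight, ...}' and reset every weight to 1.

    The original weight (number of times the page links to the target) is
    discarded, so the network is treated as unweighted.
    """
    node_id, adj = line.rstrip('\n').split('\t', 1)
    targets = ast.literal_eval(adj)
    return node_id, {target: 1 for target in targets}


print(parse_unweighted("2\t{'3': 2, '7': 1}"))
```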


###Submit job to the cluster
- ssh into the master node of the EMR cluster
- do data preparation
- submit job to spark

###Cluster configuration: 1 m1.large master node + 9 m1.large task nodes

In [None]:
/usr/bin/spark-submit \
--master yarn \
--deploy-mode client \
--name LeiPageRankToy \
--py-files PageRank.py \
--executor-memory '4600m' \
--executor-cores 2 \
--driver-memory '4600m' \
--num-executors 11 \
PageRankDriver.py > wiki_10_log

###10 Iterations [log](https://raw.githubusercontent.com/leiyang-mids/w261_project/master/HW13/spark_log_10_iterations?token=AL1BFhvNgpQxAyXpSJR1HHtWVHm3pDwJks5XJE0kwA%3D%3D)

In [1]:
!cat pageRank_time_10_iterations

2016-04-22 21:47:50.407410: start PageRank initialization ...
2016-04-22 21:51:14.131401: initialization completed, dangling node(s): 9410987, total nodes: 15192277
2016-04-22 21:51:14.131474: running iteration 2 ...
2016-04-22 21:54:16.556416: redistributing loss mass: 7608969.0130
2016-04-22 21:54:16.564424: running iteration 3 ...
2016-04-22 21:57:10.348773: redistributing loss mass: 7103036.3037
2016-04-22 21:57:10.356219: running iteration 4 ...
2016-04-22 21:59:57.663450: redistributing loss mass: 6940792.1449
2016-04-22 21:59:57.720320: running iteration 5 ...
2016-04-22 22:02:49.159656: redistributing loss mass: 6884560.5231
2016-04-22 22:02:49.166445: running iteration 6 ...
2016-04-22 22:05:39.179975: redistributing loss mass: 6863177.9617
2016-04-22 22:05:39.186458: running iteration 7 ...
2016-04-22 22:08:22.604490: redistributing loss mass: 6854533.8830
2016-04-22 22:08:22.610610: running iteration 8 ...
2016-04-22 22:11:08.030830: redistributing loss mass: 
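Since the assignment asks for run times in minutes and seconds, a small sketch for turning these timestamped logs into per-step durations may help. It assumes every line starts with the `str(datetime.now())` timestamp written by `logTime()` followed by `: `; `step_durations` and `minutes_seconds` are hypothetical helpers.

```python
from datetime import datetime


def parse_stamp(stamp):
    # str(datetime) drops the fractional part when microseconds are exactly 0
    fmt = '%Y-%m-%d %H:%M:%S.%f' if '.' in stamp else '%Y-%m-%d %H:%M:%S'
    return datetime.strptime(stamp, fmt)


def step_durations(log_lines):
    """Seconds elapsed between consecutive timestamped log lines."""
    # the timestamp ends at the first ': ' (the clock's colons have no space after them)
    stamps = [parse_stamp(line.split(': ', 1)[0]) for line in log_lines]
    return [(later - earlier).total_seconds()
            for earlier, later in zip(stamps, stamps[1:])]


def minutes_seconds(seconds):
    """Split a duration into whole minutes and remaining seconds."""
    minutes, secs = divmod(seconds, 60)
    return int(minutes), secs
```

For example, the two iteration-2 lines of the 10-iteration log above are about 182.4 seconds apart, i.e. 3 minutes 2.4 seconds.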

###50 Iterations [log](https://raw.githubusercontent.com/leiyang-mids/w261_project/master/HW13/pageRank_time_50_iterations?token=AL1BFgR-jEQyDh6rJxB4Ra7r0YRhquhDks5XJE1NwA%3D%3D)

In [2]:
!cat pageRank_time_50_iterations

[hadoop@ip-172-31-9-186 lei]$ cat wiki_50_log
2016-04-22 22:27:22.269262: start PageRank initialization ...
2016-04-22 22:30:40.932047: initialization completed, dangling node(s): 9410987, total nodes: 15192277
2016-04-22 22:30:40.932248: running iteration 2 ...
2016-04-22 22:33:38.183716: redistributing loss mass: 7608969.0130
2016-04-22 22:33:38.191269: running iteration 3 ...
2016-04-22 22:36:25.688220: redistributing loss mass: 7103036.3037
2016-04-22 22:36:25.695027: running iteration 4 ...
2016-04-22 22:39:13.888332: redistributing loss mass: 6940792.1449
2016-04-22 22:39:13.895518: running iteration 5 ...
2016-04-22 22:42:05.660363: redistributing loss mass: 6884560.5231
2016-04-22 22:42:05.667313: running iteration 6 ...
2016-04-22 22:44:48.044150: redistributing loss mass: 6863177.9617
2016-04-22 22:44:48.050751: running iteration 7 ...
2016-04-22 22:47:42.365782: redistributing loss mass: 6854533.8830
2016-04-22 22:47:42.372406: running iteration 8 ...
2016-04

###HW 13.3: Spark GraphX versus your implementation of PageRank

Run the Spark GraphX PageRank implementation on the Wikipedia dataset for 10 iterations,
and display the top 100 ranked nodes (with damping factor d = 0.85, i.e., teleportation factor 0.15).

Run your PageRank implementation on the Wikipedia dataset for 50 iterations,
and display the top 100 ranked nodes (with teleportation factor of 0.15). 
Have the top 100 ranked pages changed? Comment on your findings. Plot both 100 curves.

Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.

Put the runtime results of HW13.2 and HW13.3 in a tabular format (with rows corresponding to implementation and columns corresponding to experiment setup (10 iterations, 50 iterations)). Discuss the run times and explain the differences. 

Plot the pagerank values for the top 100 pages resulting from the 50 iterations run (using GraphX). Then plot the pagerank values for the same 100 pages that resulted from the 50 iterations run of your homegrown pagerank implementation. Comment on your findings. Have the top 100 ranked pages changed? Have the pagerank values changed? Explain.
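To answer "have the top 100 ranked pages changed?" quantitatively, one option is to report the overlap of the two top-k lists and how far shared pages moved. A minimal sketch, assuming each ranking is a list of page titles ordered best first; `compare_top_k` is a hypothetical helper.

```python
def compare_top_k(ranking_a, ranking_b, k=100):
    """Compare two best-first rankings: top-k overlap and per-page rank shifts."""
    top_a, top_b = ranking_a[:k], ranking_b[:k]
    common = set(top_a) & set(top_b)
    # positive shift = the page sits lower in ranking_b than in ranking_a
    pos_b = {page: i for i, page in enumerate(top_b)}
    shifts = {page: pos_b[page] - i for i, page in enumerate(top_a) if page in pos_b}
    return len(common) / float(len(top_a)), shifts


overlap, shifts = compare_top_k(['x', 'y', 'z'], ['y', 'x', 'w'], k=3)
print(overlap, shifts)
```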


### Scala code

In [None]:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._


object WikiPageRank {

  def main(args: Array[String]) {
    val t0 = System.nanoTime()
    val conf = new SparkConf().setAppName("WikiPageRank")
    val sc = new SparkContext(conf)
    // number of PageRank iterations, passed on the command line
    val nIter = args(0).toInt

    // Create an RDD for the edges and vertices
    val links = sc.textFile("hdfs:///user/leiyang/all-pages-indexed-out.txt", 80).flatMap(getLinks)
    val pages = sc.textFile("hdfs:///user/leiyang/indices.txt", 16).map(getPages)

    // Build the initial Graph
    val graph = Graph(pages, links)
    // Run a fixed number of PageRank iterations (resetProb defaults to 0.15)
    val rank = PageRank.run(graph, numIter = nIter).vertices.cache()
    // GraphX scores do not sum to 1, so normalize by the total
    val total = rank.map(l => l._2).sum()
    val tops = rank.sortBy(l => l._2, ascending = false).take(200).map(l => (l._1, l._2 / total))
    // join top ids with titles: (id, (rank, (title, ""))) -> (title, rank)
    val ret = sc.parallelize(tops).join(pages).map(l => (l._2._2._1, l._2._1)).sortBy(l => l._2, ascending = false).take(200)
    val elapse = (System.nanoTime() - t0) / 1000000000.0 / 60.0
    // Show results
    println("PageRanking finishes in " + elapse + " minutes!")
    println(ret.mkString("\n"))
  }

  // Parse "id \t {'target': weight, ...}" into unweighted edges id -> target
  def getLinks(line: String): Array[Edge[String]] = {
    val elem = line.split("\t", 2)
    elem(1).stripPrefix("{").stripSuffix("}").split(",").map { n =>
      val target = n.split(":")(0).trim().stripPrefix("'").stripSuffix("'").toLong
      Edge(elem(0).toLong, target, "")
    }
  }

  // Parse "title \t id \t in-degree \t out-degree" into (id, (title, ""))
  def getPages(line: String): (VertexId, (String, String)) = {
    val elem = line.split("\t")
    (elem(1).toLong, (elem(0), ""))
  }
}


###10 Iterations [log](https://raw.githubusercontent.com/leiyang-mids/w261_project/master/HW13/spark_log_10_iterations_graphX?token=AL1BFpUUjZUxgTLgn3EQJfHN6QF3CNfIks5XJPISwA%3D%3D)

In [3]:
#!/usr/bin/spark-submit \
#--master yarn \
#--deploy-mode client \
#--class WikiPageRank \
#--name "WikiPageRank" \
#--executor-memory '4600m' \
#--num-executors 11 \
#--driver-memory '4600m' \
#target/scala-2.10/pagerank-project_2.10-1.0.jar \
#10 > wiki_10_log_GraphX
#!cat top_200_after_10_iterations_graphX

###HW 13.4: Criteo Phase 2 baseline


SPECIAL NOTE:
Please share your findings as they become available with class via the Google Group. You will get brownie points for this.  Once results are shared please use them and build on them.

The Criteo data for this challenge is located in the following S3/Dropbox buckets:

- On Dropbox see:     https://www.dropbox.com/sh/dnevke9vsk6yj3p/AABoP-Kv2SRxuK8j3TtJsSv5a?dl=0

- Raw Data:  (Training, Validation and Test data)
https://console.aws.amazon.com/s3/home?region=us-west-1#&bucket=criteo-dataset&prefix=rawdata/

- Hashed Data: Training, Validation and Test data in hash encoded (10,000 buckets) and sparse representation
https://console.aws.amazon.com/s3/home?region=us-west-1#&bucket=criteo-dataset&prefix=processeddata/
- source: https://s3-eu-west-1.amazonaws.com/criteo-labs/dac.tar.gz


Using the training dataset, validation dataset and testing dataset in the Criteo bucket perform the following experiment:

- write spark code (borrow from Phase 1 of this project) to train a logistic regression model with the following hyperparameters:

 - Number of buckets for hashing: 1,000
 - Logistic Regression: no regularization term
 - Logistic Regression: step size = 10

- Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.

- Report in tabular form the [AUC](https://en.wikipedia.org/wiki/Receiver_operating_characteristic) value for the Training, Validation, and Testing datasets.
- Report in tabular form  the logLossTest for the Training, Validation, and Testing datasets.

- Don't forget to put a caption on your tables (above each table).
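Before trusting the Spark numbers, it can help to cross-check AUC and log loss on a small sample. A plain-Python sketch: `auc_rank` uses the pairwise (Mann-Whitney) definition of AUC, i.e. the fraction of (positive, negative) pairs the model orders correctly, which is quadratic and only meant for small samples; both function names are hypothetical.

```python
from math import log


def auc_rank(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            wins += 1.0 if p > n else (0.5 if p == n else 0.0)
    return wins / (len(pos) * len(neg))


def mean_log_loss(labels, probs, eps=1e-12):
    """Average binary log loss, clipping probabilities away from 0 and 1."""
    total = 0.0
    for y, p in zip(labels, probs):
        p = min(max(p, eps), 1 - eps)
        total += -log(p) if y == 1 else -log(1 - p)
    return total / len(labels)


print(auc_rank([1, 0, 1, 0], [0.9, 0.8, 0.7, 0.1]))  # 3 of 4 pairs ordered correctly
```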

###Supporting Functions

In [54]:
%%writefile CriteoHelper.py

from collections import defaultdict
from pyspark.mllib.linalg import SparseVector 
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
from sklearn import metrics
from datetime import datetime
from math import log, exp
import numpy as np
import hashlib

# hash function
def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing works.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if(printMapping): print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)

# feature hash
def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Args:
        point (str): A comma separated string where the first value is the label and the rest are
            features.
        numBuckets: The number of buckets to hash to.

    Returns:
        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
            features.
    """    
    elem = point.strip().split(',')    
    rawFea = [(i, elem[i+1]) for i in range(len(elem) - 1)]
    index = np.sort(hashFunction(numBuckets, rawFea, False).keys())    
    return LabeledPoint(elem[0], SparseVector(numBuckets, index, [1]*len(index)))   

# Logistic Regression Modeling & Evaluation
def getP(x, w, intercept):
    """Calculate the probability for an observation given a set of weights and intercept.

    Note:
        We'll bound our raw prediction between 20 and -20 for numerical purposes.

    Args:
        x (SparseVector): A vector with values of 1.0 for features that exist in this
            observation and 0.0 otherwise.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: A probability between 0 and 1.
    """
    rawPrediction = x.dot(w) + intercept
    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return 1/(1+exp(-rawPrediction))


def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probabilty and label.

    Note:
        log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
        and when p is 1 we need to subtract a small value (epsilon) from it.

    Args:
        p (float): A probabilty between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1.

    Returns:
        float: The log loss value.
    """
    epsilon = 10e-12    
    return -log(p+epsilon) if y==1 else -log(1-p+epsilon)

def evaluateResults(lrModel, data):
    """Calculates the log loss for the data given the model.

    Args:
        lrModel (LogisticRegressionModel): A trained logistic regression model.
        data (RDD of LabeledPoint): Labels and features for each observation.

    Returns:
        float: Log loss for the data.
    """    
    return data.map(lambda p: computeLogLoss(getP(p.features, lrModel.weights, lrModel.intercept), p.label)).mean()


# calculate AUC
def getAUC(rddData, lrModel):
    """Computes the area under the ROC curve for the model on rddData.

    Note:
        All (label, score) pairs are collected and sorted on the driver,
        so this is only practical when the dataset fits in driver memory.
    """
    labelsAndScores = rddData.map(lambda lp: (lp.label, getP(lp.features, lrModel.weights, lrModel.intercept)))
    labelsAndWeights = labelsAndScores.collect()
    labelsAndWeights.sort(key=lambda (k, v): v, reverse=True)
    labelsByWeight = np.array([k for (k, v) in labelsAndWeights])

    length = labelsByWeight.size
    truePositives = labelsByWeight.cumsum()
    numPositive = truePositives[-1]
    falsePositives = np.arange(1.0, length + 1, 1.) - truePositives

    truePositiveRate = truePositives / numPositive
    falsePositiveRate = falsePositives / (length - numPositive)
        
    return metrics.auc(falsePositiveRate, truePositiveRate)

def logTime():
    return str(datetime.now())

Overwriting CriteoHelper.py


###Driver

In [53]:
%%writefile Criteo_Dirver_1.py

from time import time
from pyspark import SparkContext

execfile('CriteoHelper.py')

# define parameters
print '%s: start logistic regression job ...' %(logTime())
numBucketsCTR = 1000
lrStep = 10
start = time()
sc = SparkContext()

# data preparation
print '%s: preparing data ...' %(logTime())
rawTrainData = sc.textFile('hdfs:///user/leiyang/criteo/rawTrain', 2).map(lambda x: x.replace('\t', ','))
rawValidationData = sc.textFile('hdfs:///user/leiyang/criteo/rawValidation', 2).map(lambda x: x.replace('\t', ','))
rawTestData = sc.textFile('hdfs:///user/leiyang/criteo/rawTest', 2).map(lambda x: x.replace('\t', ','))

# data encoding
hashTrainData = rawTrainData.map(lambda p: parseHashPoint(p, numBucketsCTR))
hashTrainData.cache()
hashValidationData = rawValidationData.map(lambda p: parseHashPoint(p, numBucketsCTR))
hashValidationData.cache()
hashTestData = rawTestData.map(lambda p: parseHashPoint(p, numBucketsCTR))
hashTestData.cache()

# build model
print '%s: building logistic regression model ...' %(logTime())
model = LogisticRegressionWithSGD.train(hashTrainData, iterations=500, step=lrStep, regType=None, intercept=True)

# get log loss
print '%s: evaluating log loss ...' %(logTime())
logLossVa = evaluateResults(model, hashValidationData)
logLossTest = evaluateResults(model, hashTestData)
logLossTrain = evaluateResults(model, hashTrainData)

# get AUC
print '%s: evaluating AUC ...' %(logTime())
aucVal = getAUC(hashValidationData, model)
aucTrain = getAUC(hashTrainData, model)
aucTest = getAUC(hashTestData, model)
print '\n%s: job completes in %.2f minutes!' %(logTime(), (time()-start)/60.0)

# show results
print '\n\t\t log loss \t\t\t AUC'
print 'Training:\t %.4f\t\t %.4f' %(logLossTrain, aucTrain)
print 'Validation:\t %.4f\t\t %.4f' %(logLossVa, aucVal)
print 'Test:\t\t %.4f\t\t %.4f' %(logLossTest, aucTest)

Overwriting Criteo_Dirver_1.py


###Submit the job to Spark
- cluster: 1 m1.large master node + 9 m1.large task nodes

In [None]:
/usr/bin/spark-submit \
--master yarn \
--deploy-mode client \
--name LeiCriteoJob \
--py-files CriteoHelper.py \
--executor-memory '11000m' \
--executor-cores 2 \
--driver-memory '11000m' \
--num-executors 11 \
Criteo_Dirver_1.py > Criteo_log1

###HW 13.5: Criteo Phase 2 hyperparameter tuning 
SPECIAL NOTE:
Please share your findings as they become available with class via the Google Group. You will get brownie points for this. Once results are shared please use them and build on them.

NOTE: please do HW 13.5 in groups of 3 

Using the training dataset, validation dataset and testing dataset in the Criteo bucket perform the following experiments:

- write spark code (borrow from Phase 1 of this project) to train a logistic regression model with various hyperparameters. Do a grid search of the hyperparameter space and determine optimal settings using the validation set.
 - Number of buckets for hashing: 1,000, 10,000, .... explore different values here
 - Logistic Regression: regularization term: [1e-6, 1e-3] explore other values here also
 - Logistic Regression: step size: explore different step sizes. Focus on a step size of 1 initially. 

- Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete this job.
- Report in tabular form and using heatmaps the AUC values (https://en.wikipedia.org/wiki/Receiver_operating_characteristic) for the Training, Validation, and Testing datasets.
- Report in tabular form and using heatmaps  the logLossTest for the Training, Validation, and Testing datasets.
- Don't forget to put a caption on your tables (above the table) and on your heatmap figures (put caption below figures) detailing the experiment associated with each table or figure (data, algorithm used, parameters and settings explored).
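The grid search itself is just a loop over the Cartesian product of the candidate values, keeping the setting with the best validation score. A minimal sketch, assuming a hypothetical `evaluate(num_buckets, reg_param, step)` callback that trains on the training set (for example by calling `LogisticRegressionWithSGD.train` with `regParam` and `step`) and returns validation log loss; the full `results` dict is what you would tabulate and turn into heatmaps.

```python
from itertools import product


def grid_search(evaluate, buckets, reg_params, step_sizes):
    """Try every hyperparameter combination; keep the lowest validation log loss.

    `evaluate(num_buckets, reg_param, step)` is a hypothetical callback that
    trains a model and returns its log loss on the validation set.
    """
    results = {}
    for combo in product(buckets, reg_params, step_sizes):
        results[combo] = evaluate(*combo)
    best = min(results, key=results.get)
    return best, results


# toy stand-in for a real training run, for illustration only
fake_eval = lambda b, r, s: abs(b - 10000) / 1e4 + r + abs(s - 1)
best, results = grid_search(fake_eval, [1000, 10000], [1e-6, 1e-3], [1, 10])
print(best)
```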

- Discuss the optimal setting to solve this problem  in terms of the following:
 - Features
 - Learning algorithm
 - Spark cluster

Justify your recommendations based on your experimental results and cross reference with table numbers and figure numbers. Also highlight key results with annotations, both textual and line and box based, on your tables and graphs.



###HW13.6 Heritage Healthcare Prize (OPTIONAL)

The slides for Week 13 Live session contain background information for the HHH competition to predict the number of days a patient will spend in hospital. Please review the slides. All the data, RCode, documentation, and slides for HHH problem are located at: 

https://www.dropbox.com/sh/upt0j2q44ncrn1m/AAApdpXNYaEFy8KbMoE90-KSa?dl=0 

In particular have a look at the following R Code:

https://www.dropbox.com/s/jltk9z7jkc1o856/mainDriver.R?dl=0

This code runs and will produce a baseline submission file for HHH. 

Challenge: 

Rewrite this code in Spark (all steps) and produce a submission file. Report your experimental setup and experimental times.

Improve the predictive quality of your system through activities such as:

-- new features
-- feature transformations
-- data sampling/deletion
-- third party data
-- learning algorithms
-- hyperparameter tuning
-- etc.


State your assumptions (Training data, validation data, held out test data). Report your experimental setup and experimental times, and evaluation metrics versus the baseline submission code provided above and discuss.
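When comparing against the baseline submission, note that the Heritage Health Prize scored predicted days in hospital with a root mean squared logarithmic error (RMSLE), which penalizes relative rather than absolute errors. A minimal sketch of that metric; `rmsle` is a hypothetical helper and assumes non-negative predictions and actuals.

```python
from math import log, sqrt


def rmsle(predicted, actual):
    """Root mean squared logarithmic error over paired predictions and actuals."""
    sq = [(log(p + 1) - log(a + 1)) ** 2 for p, a in zip(predicted, actual)]
    return sqrt(sum(sq) / len(sq))


print(rmsle([0, 2, 1], [0, 2, 1]))  # perfect predictions give 0.0
```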


###Start HDFS

In [8]:
!/usr/local/Cellar/hadoop/2*/sbin/start-yarn.sh
!/usr/local/Cellar/hadoop/2*/sbin/start-dfs.sh
!/usr/local/Cellar/hadoop/2*/sbin/mr-jobhistory-daemon.sh --config /usr/local/Cellar/hadoop/2*/libexec/etc/hadoop/ start historyserver 

starting yarn daemons
resourcemanager running as process 17585. Stop it first.
localhost: nodemanager running as process 17686. Stop it first.
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/Cellar/hadoop/2.7.1/libexec/logs/hadoop-leiyang-namenode-Leis-MacBook-Pro.local.out
localhost: starting datanode, logging to /usr/local/Cellar/hadoop/2.7.1/libexec/logs/hadoop-leiyang-datanode-Leis-MacBook-Pro.local.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/Cellar/hadoop/2.7.1/libexec/logs/hadoop-leiyang-secondarynamenode-Leis-MacBook-Pro.local.out
historyserver running as process 18148. Stop it first.


###Stop HDFS

In [6]:
!/usr/local/Cellar/hadoop/2*/sbin/stop-yarn.sh
!/usr/local/Cellar/hadoop/2*/sbin/stop-dfs.sh
!/usr/local/Cellar/hadoop/2*/sbin/mr-jobhistory-daemon.sh --config /usr/local/Cellar/hadoop/2*/libexec/etc/hadoop/ stop historyserver 
sc.stop()

stopping yarn daemons
no resourcemanager to stop
localhost: no nodemanager to stop
no proxyserver to stop
Stopping namenodes on [localhost]
localhost: no namenode to stop
localhost: no datanode to stop
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: no secondarynamenode to stop
no historyserver to stop
