# DATASCI W261: Machine Learning at Scale
## Assignment Week 4
Jackson Lane (jelane@berkeley.edu) <br>
W261-3 <br>
# === INSTRUCTIONS for SUBMISSIONS ===
Follow the instructions for submissions carefully.

Submit your homework via the following form by 8:00 AM of the following Tuesday (West Coast Time):

https://docs.google.com/forms/d/1ZOr9RnIe_A06AcZDB6K1mJN4vrLeSmS2PD6Xm3eOiis/viewform?usp=send_form 

# === Week 4 ASSIGNMENTS ===

# HW 4.0. 
#### What is MrJob? How is it different from Hadoop MapReduce? 

MRJob is a Python library that provides an abstraction over the MapReduce programming framework. It was developed by Yelp and is freely available as open source.  

The main difference is that MRJob is an abstraction, while Hadoop MapReduce is an actual implementation of the MapReduce programming paradigm. The same MRJob job can run against several backends (Amazon EMR, Hadoop MapReduce, and Google Cloud Dataproc), whereas a job written directly against Hadoop MapReduce is tied to a Hadoop cluster. MRJob does ship with its own simple local runner, but that is meant for development and testing; in production, MRJob jobs run on a separate MapReduce backend.  

MRJob also provides a simpler way to write MapReduce jobs in Python than something like Hadoop Streaming. In Hadoop Streaming, each mapper, combiner, and reducer is its own script, and each MR step needs a separate invocation; in MRJob, all of the steps live in a single Python class. MRJob also takes care of splitting the key and value fields in each phase's output, whereas in Hadoop Streaming you have to parse the key and value fields manually.  

The drawback is that MRJob does not expose every capability of Hadoop MapReduce; as with most abstractions, it does not cover every feature of its backends.
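To make the Streaming contrast concrete, here is a minimal plain-Python sketch of the bookkeeping a Hadoop Streaming reducer does by hand: splitting each tab-separated line and detecting where one key's run of values ends. The function name `streaming_sum_reducer` is hypothetical. In MRJob the same logic is just `yield key, sum(values)` inside a `reducer(self, key, values)` method.

```python
from itertools import groupby


def streaming_sum_reducer(lines):
    """Sum integer values per key, the way a Hadoop Streaming reducer must.

    `lines` are "key<TAB>value" strings already sorted by key, as Hadoop
    Streaming delivers them on stdin. The script has to split each line
    itself and notice where one key's run of values ends.
    """
    # Split every line into [key, value] by hand
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    # Consecutive lines with the same key form one group (input is sorted)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        total = sum(int(value) for _, value in group)
        yield key + "\t" + str(total)


# In a real Streaming reducer script (hypothetical usage):
#     import sys
#     for out in streaming_sum_reducer(sys.stdin):
#         print(out)
```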


#### What are the mapper_init, mapper_final(), combiner_final(), reducer_final() methods? When are they called?

The mapper_init (and reducer_init) methods are called once per task at the beginning of the map (or reduce) stage, before the task processes its first input record. They are typically used to initialize counters or other state, or to load side data sets for use in the main mapper (or reducer) method.    

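The mapper_final, combiner_final, and reducer_final methods are called once per task, after the mapper, combiner, or reducer (respectively) has processed all of its input. They can yield additional key-value pairs (for example, totals accumulated in memory during in-mapper combining) and are also used to tear down state and clean up.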
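A minimal plain-Python sketch (no mrjob dependency) of the call order described above, applied to in-mapper combining: `mapper_init` creates a dictionary, `mapper` only accumulates, and `mapper_final` emits the totals. `WordCountTask` and `run_map_task` are hypothetical names; `run_map_task` only imitates what the framework does for one map task and is not part of mrjob's API.

```python
class WordCountTask(object):
    """Hypothetical mapper with MRJob-style hook names (in-mapper combining)."""

    def mapper_init(self):
        # Called once per task, before the first input line
        self.counts = {}

    def mapper(self, _, line):
        # Called once per input line: accumulate, emit nothing yet
        for word in line.split():
            self.counts[word] = self.counts.get(word, 0) + 1

    def mapper_final(self):
        # Called once per task, after the last input line: emit the totals
        for word in sorted(self.counts):
            yield word, self.counts[word]


def run_map_task(task, lines):
    """Hypothetical driver imitating the order in which the hooks run."""
    output = []
    task.mapper_init()
    for line in lines:
        # This mapper returns None, so treat that as "no output"
        output.extend(task.mapper(None, line) or ())
    output.extend(task.mapper_final())
    return output
```

For example, `run_map_task(WordCountTask(), ["a b a", "b c"])` emits one pair per distinct word only at the end of the task, instead of one pair per word occurrence.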


# HW 4.1
#### What is serialization in the context of MrJob or Hadoop? 

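Serialization is the conversion of keys and values to and from bytes so that they can be passed between the phases of a MapReduce job (written to disk or sent over the network). In MRJob it is handled by protocol classes; in Hadoop MapReduce, by Writable types.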

#### When is it used in these frameworks? 

It is used to convert the input text into key-value pairs for the mappers, to write mapper output to disk, to convert the bytes received from the shuffle back into keys and values for reducer input, and to write key-value pairs back to text for the final output.

#### What is the default serialization mode for input and outputs for MrJob? 

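The default serialization mode for input is RawValueProtocol (the INPUT_PROTOCOL default), which passes each input line through unchanged as the value, with a key of None.
The default serialization mode for both internal communication and outputs is JSONProtocol (the INTERNAL_PROTOCOL and OUTPUT_PROTOCOL defaults), which writes the key and the value as JSON separated by a tab.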
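A minimal plain-Python sketch of the line formats these defaults produce, using the standard json module. The function names are hypothetical and only approximate mrjob's protocol classes (for example, whitespace inside encoded lists may differ); they are not mrjob's API.

```python
import json


def raw_value_read(line):
    # Like RawValueProtocol on input: no key, the whole line is the value
    return None, line


def json_write(key, value):
    # Like JSONProtocol on output: JSON-encode key and value, join with a tab
    return json.dumps(key) + "\t" + json.dumps(value)


def json_read(line):
    # Like JSONProtocol on input: split on the first tab, decode both halves
    key, value = line.split("\t", 1)
    return json.loads(key), json.loads(value)
```

One consequence worth noting: a tab inside a string value is escaped by JSON (`json_write("x", "a\tb")` contains only one real tab), so an encoded line always has exactly two tab-separated fields. This matters when Hadoop options such as stream.num.map.output.key.fields count tab-separated fields.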

# HW 4.2: Recall the Microsoft logfiles data from the async lecture. The logfiles are described and located at:

https://kdd.ics.uci.edu/databases/msweb/msweb.html
http://archive.ics.uci.edu/ml/machine-learning-databases/anonymous/

This dataset records which areas (Vroots) of www.microsoft.com each user visited in a one-week timeframe in February 1998.

Here, you must preprocess the data on a single node (i.e., not on a cluster of nodes) from the format:

- C,"10001",10001   #Visitor id 10001
- V,1000,1          #Visit by Visitor 10001 to page id 1000
- V,1001,1          #Visit by Visitor 10001 to page id 1001
- V,1002,1          #Visit by Visitor 10001 to page id 1002
- C,"10002",10002   #Visitor id 10002
- V
- Note: #denotes comments

to the format:

- V,1000,1,C, 10001
- V,1001,1,C, 10001
- V,1002,1,C, 10001

Write the python code to accomplish this.

In [446]:
#!/usr/bin/python
import sys,re

customer = "00000"
with open("anonymous-msweb.data", "r") as myfile, \
     open('Processed-anonymous-msweb.data', 'w') as processed, \
     open('urls.data', 'w') as urls:
    for line in myfile:
        line = line.strip()
        # Keep track of the most recent customer ID (",10001" from 'C,"10001",10001')
        # startswith() also skips blank lines safely, unlike line[0]
        if line.startswith('C'):
            customer = line[line.rfind(","):]
        # Output visit information joined with the customer ID: V,1000,1,C,10001
        elif line.startswith('V'):
            processed.write(line + ',C' + customer + '\n')
        # Save attribute lines (page ID, title, URL) for the URL lookup in HW 4.4
        elif line.startswith('A'):
            urls.write(line + "\n")


# HW 4.3: Find the 5 most frequently visited pages using MrJob from the output of 4.2 (i.e., transformed log file).

In [447]:
%%writefile MRJob4_3.py
#!/usr/bin/python
# MRJob4_3.py
# Author: Jackson Lane
# Description: MRJob code to find the 5 most frequently visited pages from the output of 4.2
# Has two MR steps:
# The first step aggregates the number of visits to each site
# The second step gets the top 5 sites with the most visits

from mrjob.job import MRJob
from mrjob.step import MRStep


class MRJob4_3(MRJob):
    
    def mapper_visitcount(self, _, line):
        # Passes on the page id and the visit count to the reducer
        # Input lines look like: V,1000,1,C,10001
        line = line.strip().split(",")
        yield line[1], int(line[2])

    def reducer_visitcount(self, pageID, counts):
        # Sums up the visit counts for each page
        yield pageID, sum(counts)

    def reducer_top5_init(self):
        # We want to maintain a counter to only print out the top 5 sites
        self.currentrank = 5

    def reducer_top5(self, pageID, visit_counts):
        # Records arrive sorted by visit count (descending), so the first 5 keys are the top 5
        if self.currentrank > 0:
            self.currentrank -= 1
            yield pageID, sum(visit_counts)
    
    def steps(self):
        return [MRStep(mapper=self.mapper_visitcount, reducer=self.reducer_visitcount),
                MRStep(reducer_init=self.reducer_top5_init, reducer=self.reducer_top5,
                jobconf={
                    # Use a single reducer so the top-5 counter sees every page
                    "mapreduce.job.reduces": "1",
                    # Treat both tab-separated fields (pageID, count) as the sort key
                    "stream.num.map.output.key.fields": "2",
                    "mapreduce.job.output.key.comparator.class":
                        "org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
                    # Sort on the second field (the count), numerically, in reverse order
                    "mapreduce.partition.keycomparator.options": "-k2,2nr"
                })]

    
if __name__ == '__main__':
    MRJob4_3.run()

Overwriting MRJob4_3.py


In [448]:
%reload_ext autoreload
%autoreload 2
from MRJob4_3 import MRJob4_3

mr_job = MRJob4_3(args=['Processed-anonymous-msweb.data','-r', 'hadoop', '--strict-protocols'])
with mr_job.make_runner() as runner:
    runner.run()
    print ("PageID\tCount")

    for line in runner.stream_output():
        line = mr_job.parse_output_line(line)
        line=[str(i) for i in line]
        print '\t'.join(line)

PageID	Count
1008	10836
1034	9383
1004	8463
1018	5330
1017	5108
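Because the processed file is small, the MRJob result can be cross-checked on a single machine. A minimal sketch using `collections.Counter` (the function name `top_pages` is hypothetical); for pages with tied counts, `most_common` keeps first-seen order, so ties may be listed differently than in the Hadoop output.

```python
from collections import Counter


def top_pages(lines, n=5):
    """Single-machine equivalent of MRJob4_3: total visits per page, top n."""
    visits = Counter()
    for line in lines:
        # fields: ["V", pageID, count, "C", visitorID]
        fields = line.strip().split(",")
        visits[fields[1]] += int(fields[2])
    return visits.most_common(n)


# Usage (assumes the file written in HW 4.2 is in the working directory):
#     with open("Processed-anonymous-msweb.data") as f:
#         for page_id, count in top_pages(f):
#             print(page_id, count)
```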


# HW 4.4: Find the most frequent visitor of each page using MrJob and the output of 4.2 (i.e., transformed log file). In this output please include the webpage URL, webpageID and Visitor ID.

In [449]:
%%writefile MRJob4_4.py
#!/usr/bin/python
# MRJob4_4.py
# Author: Jackson Lane
# Description: MRJob code to find each site's most frequent visitor.
# Has two MR steps:
# The first step aggregates the number of visits to each site by each customer
# The second step gets the top visitor for each site


from mrjob.job import MRJob
from mrjob.step import MRStep
import csv


class MRJob4_4(MRJob):
    
    def mapper_visitcount(self, _, line):
        # Emits a composite "pageID,visitorID" key with the visit count
        # Input lines look like: V,1000,1,C,10001
        line = line.strip().split(",")
        yield line[1] + "," + line[4], int(line[2])

    def reducer_visitcount(self, keys, counts):
        # Sums up the visit counts for each (page, visitor) pair
        pageID, visitorID = keys.split(",")
        yield pageID, [visitorID, sum(counts)]

    # The reducer init here creates a dictionary with each site's URL
    # It gets this data from the "urls.data" file sent to each reducer with --file
    def reducer_top_init(self):
        self.urls = {}
        with open("urls.data") as urls:
            # Attribute lines look like: A,<pageID>,1,"<title>","<url path>"
            # csv.reader handles titles that contain commas inside the quotes
            for fields in csv.reader(urls):
                self.urls[fields[1]] = "www.microsoft.com" + fields[4].strip()

    def reducer_top(self, pageID, visitors):
        # Pick the visitor with the most visits to this page
        # (no secondary sort needed, so this works with any number of reducers)
        visitorID, visits = max(visitors, key=lambda v: v[1])
        yield self.urls[pageID] + "\t" + pageID, visitorID + "\t" + str(visits)
    
    def steps(self):
        return [MRStep(mapper=self.mapper_visitcount, reducer=self.reducer_visitcount),
                MRStep(reducer_init=self.reducer_top_init, reducer=self.reducer_top)]

    
if __name__ == '__main__':
    MRJob4_4.run()

Overwriting MRJob4_4.py


Driver Class

In [452]:
%reload_ext autoreload
%autoreload 2

from MRJob4_4 import MRJob4_4

mr_job = MRJob4_4(args=['Processed-anonymous-msweb.data', '-r', 'hadoop', '--file', "urls.data", '--strict-protocols'])
with mr_job.make_runner() as runner:
    runner.run()
    print 'URL\t\t\t\tPageID\tVisitorID\tVisits'
    for line in runner.stream_output():
        line =  mr_job.parse_output_line(line)
        print '\t'.join(line)

URL				PageID	VisitorID	Visits
www.microsoft.com/sitebuilder	1026	10016	1
www.microsoft.com/intdev	1027	10017	1
www.microsoft.com/oledev	1028	10017	1
www.microsoft.com/clipgallerylive	1029	10019	1
www.microsoft.com/ntserver	1030	10019	1
www.microsoft.com/msoffice	1031	10019	1
www.microsoft.com/games	1032	10019	1
www.microsoft.com/logostore	1033	10019	1
www.microsoft.com/ie	1034	10020	1
www.microsoft.com/windowssupport	1035	10021	1
www.microsoft.com/organizations	1036	10021	1
www.microsoft.com/windows95	1037	10021	1
www.microsoft.com/sbnmember	1038	10021	1
www.microsoft.com/isp	1039	10021	1
www.microsoft.com/office	1040	10021	1
www.microsoft.com/workshop	1041	10021	1
www.microsoft.com/vstudio	1042	10021	1
www.microsoft.com/smallbiz	1043	10021	1
www.microsoft.com/mediadev	1044	10024	1
www.microsoft.com/netmeeting	1045	10025	1
www.microsoft.com/iesupport	1046	10027	1
www.microsoft.com/publisher	1048	10030	1
www.microsoft.com/supportnet	1049	10031	1
www.microsoft.com/macoffice	1050	10032	1
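In the output above every visit count is 1, so each page's "most frequent visitor" is really a tie among its visitors, and which one is reported depends on the order in which values reach the reducer. A minimal single-machine sketch of the same logic with a deterministic tie-break (smallest visitor ID wins) makes the answer reproducible; the function name and the tie-break rule are illustrative choices, not part of the assignment.

```python
from collections import defaultdict


def top_visitor_per_page(lines):
    """Return {pageID: (visitorID, visits)} for the most frequent visitor of each page.

    Ties on the visit count go to the smallest visitor ID.
    """
    visits = defaultdict(int)
    for line in lines:
        # fields: ["V", pageID, count, "C", visitorID]
        fields = line.strip().split(",")
        visits[(fields[1], fields[4])] += int(fields[2])

    best = {}
    for (page, visitor), count in visits.items():
        current = best.get(page)
        # Prefer more visits; on equal visits prefer the smaller visitor ID
        if current is None or (count, -int(visitor)) > (current[1], -int(current[0])):
            best[page] = (visitor, count)
    return best
```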

# HW 4.5 Clustering Tweet Dataset

Here you will use a different dataset consisting of word-frequency distributions 
for 1,000 Twitter users. These Twitter users use language in very different ways,
and were classified by hand according to the criteria:

0: Human, where only basic human-human communication is observed.

1: Cyborg, where language is primarily borrowed from other sources
(e.g., jobs listings, classifieds postings, advertisements, etc...).

2: Robot, where language is formulaically derived from unrelated sources
(e.g., weather/seismology, police/fire event logs, etc...).

3: Spammer, where language is replicated to high multiplicity
(e.g., celebrity obsessions, personal promotion, etc... )

Using this data, you will implement a 1000-dimensional K-means algorithm in MrJob on the users
by their 1000-dimensional word stripes/vectors using several 
centroid initializations and values of K.

Note that each "point" is a user as represented by 1000 words, and that
word-frequency distributions are generally heavy-tailed power-laws
(often called Zipf distributions), and are very rare in the larger class
of discrete, random distributions. For each user you will have to normalize
by its "TOTAL" column. Try several parameterizations and initializations:

(A) K=4 uniform random centroid-distributions over the 1000 words (generate 1000 random numbers and normalize the vectors)
(B) K=2 perturbation-centroids, randomly perturbed from the aggregated (user-wide) distribution 
(C) K=4 perturbation-centroids, randomly perturbed from the aggregated (user-wide) distribution 
(D) K=4 "trained" centroids, determined by the sums across the classes. Use the 
(row-normalized) class-level aggregates as 'trained' starting centroids (i.e., the training is already done for you!).
Note that you do not have to compute the aggregated distribution or the 
class-aggregated distributions, which are rows in the auxiliary file:

For experiments A, B, C, and D, iterate until a threshold (try 0.001) is reached.
After convergence, print out a summary of the classes present in each cluster.
In particular, report the composition as measured by the total
portion of each class type (0-3) contained in each cluster,
and discuss your findings and any differences in outcomes across parts A-D.
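A minimal NumPy sketch of two pieces of the instructions above: normalizing a user's row by its TOTAL column, and option (A)'s uniform random centroids (1000 random numbers per centroid, normalized). The helper names are hypothetical, and the column layout (TOTAL at index 2, word counts from index 3) is an assumption based on how the code below indexes each line. The `uniform()` function later in this notebook takes a different approach for (A): it samples k users from the data.

```python
import numpy as np


def normalize_row(fields):
    """Turn one data line's fields into a word-frequency distribution.

    Assumes TOTAL is at index 2 and the word counts start at index 3.
    """
    total = float(fields[2])
    return np.array([float(x) for x in fields[3:]]) / total


def uniform_random_centroids(k, dims=1000, seed=None):
    """Option (A): k vectors of uniform random numbers, each normalized to sum to 1."""
    rng = np.random.RandomState(seed)
    points = rng.uniform(size=(k, dims))
    return points / points.sum(axis=1, keepdims=True)
```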

In [440]:
%%writefile Kmeans.py
# Kmeans code taken from Week 5 Readings
# Modified to accommodate 1000 fields instead of 4
# And to take variable k
# And to drop first 3 fields from input lines

from numpy import argmin, array
from mrjob.job import MRJob
from mrjob.step import MRStep
from itertools import chain

# Find the nearest centroid for a data point
def MinDist(datapoint, centroid_points):
    datapoint = array(datapoint)
    centroid_points = array(centroid_points)
    diff = datapoint - centroid_points 
    diffsq = diff*diff
    # Index of the centroid with the smallest squared Euclidean distance
    minidx = argmin(list(diffsq.sum(axis = 1)))
    return minidx

# Check whether centroids converge: every coordinate moved by at most T
def stop_criterion(centroid_points_old, centroid_points_new,T):
    oldvalue = list(chain(*centroid_points_old))
    newvalue = list(chain(*centroid_points_new))
    Diff = [abs(x-y) for x, y in zip(oldvalue, newvalue)]
    Flag = True
    for i in Diff:
        if(i>T):
            Flag = False
            break
    return Flag

class MRKmeans(MRJob):
    centroid_points=[]
    k=4    
    def steps(self):
        return [
            MRStep(mapper_init = self.mapper_init, mapper=self.mapper,combiner = self.combiner,reducer=self.reducer)
               ]
    def mapper_init(self):
        # Load centroids from the file shipped with --file (one comma-separated centroid per line)
        with open("Centroids.txt") as f:
            self.centroid_points = [map(float, s.strip().split(',')) for s in f]
        self.k = len(self.centroid_points)
    # Mapper to assign each user to the closest centroid
    def mapper(self, _, line):
        line = line.split(",")
        # Skip the first three (non-word-count) fields
        D = map(float, line[3:])
        yield int(MinDist(D, self.centroid_points)), [D, 1]
    # Combine sums of data points locally
    def combiner(self, idx, inputdata):
        sums = [0.0] * 1000
        num = 0
        for D, n in inputdata:
            num = num + n
            # Keep floats: int() would truncate non-integer values
            sums = [sums[i] + D[i] for i in range(1000)]
        yield idx, (sums, num)
    # Aggregate the sums for each cluster and compute the new centroid (the mean)
    def reducer(self, idx, inputdata):
        centroid = [0.0] * 1000
        num = 0
        for D, n in inputdata:
            num = num + n
            centroid = [centroid[i] + D[i] for i in range(1000)]
        yield idx, [c / num for c in centroid]
      
if __name__ == '__main__':
    MRKmeans.run()

Overwriting Kmeans.py
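Because the dataset has only 1,000 users, one MapReduce iteration can be cross-checked locally. A minimal NumPy sketch of a single Lloyd iteration with the same assignment rule as `MinDist` (squared Euclidean distance) and the same per-cluster mean as the reducer; the function name is hypothetical, and it is meant for testing on small inputs, not as a replacement for the MRJob. Clusters that receive no points keep their old centroid, as the driver's update loop does.

```python
import numpy as np


def lloyd_iteration(points, centroids):
    """One K-means iteration: assign each point to its nearest centroid, then re-average."""
    points = np.asarray(points, dtype=float)
    centroids = np.asarray(centroids, dtype=float)
    # Squared distance from every point to every centroid, shape (n, k)
    dists = ((points[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
    labels = dists.argmin(axis=1)
    new_centroids = centroids.copy()
    for j in range(len(centroids)):
        members = points[labels == j]
        # Empty clusters keep their previous centroid
        if len(members):
            new_centroids[j] = members.mean(axis=0)
    return new_centroids, labels
```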


In [433]:
# Functions to generate the different types of centroid points
import re
from numpy import random

# The function below was taken from the homework instructions.  
# I changed the name of the function to "perturbed" for clarity

###################################################################################
## Generate random initial centroids around the global aggregate
## Parts (B) and (C) of this question
###################################################################################
def perturbed(k):
    counter = 0
    for line in open("topUsers_Apr-Jul_2014_1000-words_summaries.txt").readlines():
        if counter == 2:        
            data = re.split(",",line)
            globalAggregate = [float(data[i+3])/float(data[2]) for i in range(1000)]
        counter += 1
    # Perturb the global aggregate for each of the k initializations
    centroids = []
    for i in range(k):
        rndpoints = random.sample(1000)
        peturpoints = [rndpoints[n]/10+globalAggregate[n] for n in range(1000)]
        centroids.append(peturpoints)
        # Renormalize the perturbed centroid so its entries sum to 1
        total = 0
        for j in range(len(centroids[i])):
            total += centroids[i][j]
        for j in range(len(centroids[i])):
            centroids[i][j] = centroids[i][j]/total
    return centroids

# Function to generate uniform centroids.
# Pick k distinct random users (rows) from the corpus to serve as our initial centroids
def uniform(k):
    data = []
    with open('topUsers_Apr-Jul_2014_1000-words.txt', 'r') as myfile:
        for line in myfile:
            line = line.strip().split(",")
            # Collect the word count fields into an array
            data.append(line[3:])
    # Randomly select k distinct rows (without replacement) to be our centroids
    return [map(int, data[i]) for i in random.choice(len(data), k, replace=False)]

# Function to get the trained centroids from the 1000 word summaries file
def trained(k):
    with open('topUsers_Apr-Jul_2014_1000-words_summaries.txt','r') as myfile:
        centroids = []
        #Skip first two lines (column names and totals)
        myfile.readline()
        myfile.readline()
        
        #The remaining four lines in the summaries text file correspond to
        #the word counts for each of the four types of users.
        for line in myfile.readlines():
            line = line.strip()
            line = line.split(",")
            # Normalize each word count by the class's TOTAL column (field 2)
            point = [int(i)/float(line[2]) for i in line[3:]]
            centroids.append(point)
        # Return only the first k of the centroids 
        return centroids[:k]


In [434]:
#Functions to report the goodness of fit and purity of the generated k-means clusters

# Reuse the MinDist function from the Kmeans MRJob module written above
from Kmeans import MinDist


def reportPurity(centroids,k):
    # Confusion matrix and summary variables
    confuse_matrix = [[0 for x in range(k)] for y in range(4)]
    predicted_counts = [0 for x in range(k)]
    actual_counts = [0 for x in range(4)]
    # Assign each tweet to the closest cluster, check if cluster matches actual code
    with open('topUsers_Apr-Jul_2014_1000-words.txt') as myfile:
        for line in myfile.readlines():
            line = line.strip().split(",")
            # As before, ignore the first three fields and only consider the actual word count fields
            tweet = [float(i) for i in line[3:]]
            # Get the closest cluster
            predicted = int(MinDist(tweet, centroids))
            # Get which code the tweet actually belongs to
            actual = int(line[1])
            # Update the confusion matrix and totals
            confuse_matrix[actual][predicted] += 1
            predicted_counts[predicted] += 1
            actual_counts[actual] += 1
    
    # With confusion matrix and prediction counts, calculate purity
    purity = sum([max([confuse_matrix[i][j] for i in range(4)])  for j in range(k)])/float(sum(predicted_counts))

            
    # Print out the findings  
    labels = ["Human","Cyborg","Robot","Spammer"]
    print "\tCluster ID"
    print "Actual\t|\t" +'\t'.join(map(str,range(k))) + "\t|Total"
    print "---------------------------------------------------------------"
    for i in range(4):
        print labels[i] + '\t|\t'+ '\t'.join(map(str,confuse_matrix[i])) + "\t|" + str(actual_counts[i])
    print "---------------------------------------------------------------"
    print "Total\t|\t" + "\t".join(map(str,predicted_counts))

    print 
    print "Purity Score:",purity

Driver class

In [443]:
# Since we'll need to run this 4 times, I've turned the driver code from the Week 5 Kmeans code into its own reusable function.  
from numpy import random
from Kmeans import MRKmeans, stop_criterion
mr_job = MRKmeans(args=['topUsers_Apr-Jul_2014_1000-words.txt', '-r', 'hadoop','--file', 'Centroids.txt', '--file','topUsers_Apr-Jul_2014_1000-words.txt'])

# Map each initialization name to the generator function defined above (instead of eval)
centroid_generators = {"uniform": uniform, "perturbed": perturbed, "trained": trained}


def write_centroids(centroid_points):
    # One centroid per line, comma-separated, as read by MRKmeans.mapper_init
    with open('Centroids.txt', 'w') as myfile:
        myfile.writelines(','.join(str(x) for x in point) + '\n' for point in centroid_points)


def run_kmeans(centroid_type, k):
    # First we have to generate the initial centroid points
    centroid_points = centroid_generators[centroid_type](k)
    write_centroids(centroid_points)

    # Update centroids iteratively
    iterations = 0
    while True:
        # Save the previous centroids to check convergence
        centroid_points_old = centroid_points[:]
        with mr_job.make_runner() as runner:
            runner.run()
            # stream_output: read each (cluster index, new centroid) pair
            for line in runner.stream_output():
                key, value = mr_job.parse_output_line(line)
                centroid_points[key] = value
        iterations += 1
        if stop_criterion(centroid_points_old, centroid_points, 0.01):
            break
        write_centroids(centroid_points)
    print "Convergence after", iterations, "iterations"

    # Report the purity and summary
    reportPurity(centroid_points, k)


#### 4.5 A

In [444]:
run_kmeans("uniform", 4)

Convergence after 17 iterations
	Cluster ID
Actual	|	0	1	2	3	|Total
---------------------------------------------------------------
Human	|	0	0	0	752	|752
Cyborg	|	26	12	4	49	|91
Robot	|	3	10	4	37	|54
Spammer	|	0	0	0	103	|103
---------------------------------------------------------------
Total	|	29	22	8	941

Purity Score: 0.794


#### 4.5 B

In [445]:
run_kmeans("perturbed", 2)

Convergence after 11 iterations
	Cluster ID
Actual	|	0	1	|Total
---------------------------------------------------------------
Human	|	752	0	|752
Cyborg	|	76	15	|91
Robot	|	45	9	|54
Spammer	|	103	0	|103
---------------------------------------------------------------
Total	|	976	24

Purity Score: 0.767


#### 4.5 C

In [438]:
run_kmeans("perturbed", 4)

Convergence after 24 iterations
	Cluster ID
Actual	|	0	1	2	3	|Total
---------------------------------------------------------------
Human	|	0	0	752	0	|752
Cyborg	|	4	0	50	37	|91
Robot	|	4	4	44	2	|54
Spammer	|	0	0	103	0	|103
---------------------------------------------------------------
Total	|	8	4	949	39

Purity Score: 0.797


#### 4.5 D

In [439]:
run_kmeans("trained", 4)

Convergence after 19 iterations
	Cluster ID
Actual	|	0	1	2	3	|Total
---------------------------------------------------------------
Human	|	752	0	0	0	|752
Cyborg	|	31	13	0	47	|91
Robot	|	40	3	4	7	|54
Spammer	|	103	0	0	0	|103
---------------------------------------------------------------
Total	|	926	16	4	54

Purity Score: 0.816


#### Discussion

The trained centroids have the best purity score (0.816), which makes sense because they were derived directly from the labeled "training" data, the same corpus the purity score is computed on. This also means the score is probably optimistic: if we applied these clusters to a different corpus, the trained centroids may not generalize as well, since they are likely overfit to this particular data.  

The other three initializations involve randomness and score lower than the trained centroids (0.794, 0.767, and 0.797 for A, B, and C), but because they were not fit to the class labels, their scores may generalize better to other corpora. The k=2 run (B) has the lowest purity because it has to put 4 different classes into just two clusters; each cluster is credited only with its majority class, so 100% purity is mathematically impossible. All four scores should also be read against a simple baseline: Human users make up 752 of the 1,000 users, so putting every user in a single cluster would already score 0.752.
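The k=2 ceiling can be computed directly. A minimal sketch (the helper name is hypothetical): since each cluster is credited with only its majority class, k clusters can credit at most k classes, so the best achievable purity is the share held by the k largest classes. With the class totals from the tables above (752, 91, 54, 103), this gives 0.855 for k=2, so run B's 0.767 is measured against a ceiling well below 1.

```python
def max_purity(class_counts, k):
    """Upper bound on purity with k clusters: credit the k largest classes in full."""
    largest = sorted(class_counts, reverse=True)[:k]
    return sum(largest) / float(sum(class_counts))
```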