# Machine Learning At Scale: HWK 4

Name: Marjorie Sayer

Email: 3marjorie14@gmail.com

Class Name: MIDS 261 Machine Learning At Scale 
Section: 4 (Tuesday 4:00 - 5:30 PM PST)

Week: 4

Date: 2/11/16

### HW 4.0.

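**What is MrJob? How is it different from Hadoop MapReduce?**

MRJob is a Python framework for writing MapReduce jobs. Hadoop MapReduce is the underlying Java-based execution framework. MRJob sits on top of it, so the programmer writes mappers, combiners and reducers as Python methods instead of Java classes. It enables the programmer to:

- run the Python job on Hadoop via Hadoop Streaming (a job can also include steps that run custom Java JARs)
- easily chain a sequence of MapReduce jobs (steps) together
- run jobs locally for testing, on a Hadoop cluster, or on AWS using Elastic MapReduce (EMR)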

**What are the mapper_init(), mapper_final(), combiner_final() and reducer_final() methods? When are they called?**

In the MapReduce context, the basic programming units are mappers and reducers. Combiners are optional optimizations that pre-aggregate mapper output locally before it is shuffled to the reducers. The MRJob context is larger: a MapReduce job or a sequence of jobs. In this larger context, MRJob provides setup hooks (mapper_init(), combiner_init(), reducer_init()) and teardown hooks (mapper_final(), combiner_final(), reducer_final()) around the mapper, combiner and reducer functions. In detail:

- **mapper_init()** is called once per mapper task, before the task processes its first input line. If, for example, mappers require access to a stored file, the file can be opened and loaded in mapper_init().

- **mapper_final()** is called once per mapper task, after mapper() has processed all of that task's input. It can be used to emit output that was accumulated across mapper() calls (for example, in-mapper combining).

- **combiner_final()**, like mapper_final(), is called after combiner() has processed all of its input. Its output must have the same key-value format as the mapper's output, because it is passed on to the reducers in place of the mapper output.

- **reducer_final()** is called once per reducer task, after reducer() has processed all of the keys sent to that task.
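To make the call order concrete, here is a toy, single-process sketch. The class only borrows MRJob's method names and does not import mrjob; `run_local` is a hypothetical stand-in for MRJob's runner that runs one mapper task and one reducer task and replaces Hadoop's shuffle with a sort. It is an illustration of when the hooks fire, not how mrjob is implemented internally.

```python
from itertools import groupby
from operator import itemgetter


class WordCountJob:
    """Toy job that borrows MRJob's hook names; it does not use mrjob itself."""

    def mapper_init(self):
        # Runs once per mapper task, before the first input line.
        self.counts = {}

    def mapper(self, _, line):
        # In-mapper combining: accumulate counts instead of emitting per word.
        for word in line.split():
            self.counts[word] = self.counts.get(word, 0) + 1
        return []  # nothing is emitted per line

    def mapper_final(self):
        # Runs once per mapper task, after the last input line.
        for word, count in self.counts.items():
            yield word, count

    def reducer_init(self):
        # Runs once per reducer task, before the first key.
        self.distinct = 0

    def reducer(self, word, counts):
        self.distinct += 1
        yield word, sum(counts)

    def reducer_final(self):
        # Runs once per reducer task, after the last key.
        yield "__distinct_words__", self.distinct


def run_local(job, lines):
    """Hypothetical driver: one mapper task, a sort as the shuffle, one reducer task."""
    job.mapper_init()
    pairs = []
    for line in lines:
        pairs.extend(job.mapper(None, line))
    pairs.extend(job.mapper_final())
    pairs.sort(key=itemgetter(0))  # stand-in for Hadoop's shuffle and sort
    job.reducer_init()
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        output.extend(job.reducer(key, (value for _, value in group)))
    output.extend(job.reducer_final())
    return output


print(run_local(WordCountJob(), ["a b a", "b c"]))
# [('a', 2), ('b', 2), ('c', 1), ('__distinct_words__', 3)]
```

Because mapper_final() only runs after every line has been seen, the per-line mapper can stay silent and the task emits each word once, which is the in-mapper combining pattern mentioned above.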


### HW 4.1.

**What is serialization in the context of MrJob or Hadoop?**

Serialization is the conversion of in-memory data (for example, Python keys and values) into a stream of bytes so that it can be written to disk or transmitted over a network. Deserialization is the reverse: converting bytes back into data that the next stage can use as input. Both MrJob and Hadoop send data from mappers, optionally through combiners, and then to reducers. In cluster environments, mappers and reducers can be on different machines, so between stages the data is serialized to bytes and deserialized again for input.

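**When is it (serialization) used in these frameworks?**

Serialization is needed wherever data crosses a stage boundary: when raw input is read into the mapper, from mapper output to combiner or reducer input, from combiner to reducer, between chained steps, and from reducer output to the final output files. With Hadoop Streaming (which MRJob uses), each task also exchanges data with Hadoop as lines of text over stdin and stdout, and MRJob's protocols define how keys and values are encoded in those lines.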

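**What is the default serialization mode for input and outputs for MrJob?**

By default, MRJob reads input with `RawValueProtocol`: each input line is passed to the first mapper as a raw string value, with the key set to `None`. Intermediate data between steps and the final output use `JSONProtocol`: the key and the value are each JSON-encoded and written on one line, separated by a tab.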
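A rough, stdlib-only sketch of the line format JSONProtocol produces: the key and value are each JSON-encoded and joined by a tab. The function names are hypothetical and mrjob's own protocol classes may differ in details such as whitespace. The round trip also shows why a Python tuple emitted by a job comes back as a list after deserialization.

```python
import json


def json_protocol_write(key, value):
    """Encode one key-value pair as a line: JSON key, a tab, JSON value."""
    return json.dumps(key) + "\t" + json.dumps(value)


def json_protocol_read(line):
    """Decode a line produced by json_protocol_write back into (key, value)."""
    # json.dumps escapes tabs inside strings, so the first raw tab is the separator.
    raw_key, raw_value = line.split("\t", 1)
    return json.loads(raw_key), json.loads(raw_value)


line = json_protocol_write("1008", (10836, "1008"))
print(repr(line))                # '"1008"\t[10836, "1008"]'
print(json_protocol_read(line))  # ('1008', [10836, '1008'])
```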


### HW 4.2.

Recall the Microsoft logfiles data from the async lecture. The logfiles are described and located at:

https://kdd.ics.uci.edu/databases/msweb/msweb.html
http://archive.ics.uci.edu/ml/machine-learning-databases/anonymous/

This dataset records which areas (Vroots) of www.microsoft.com each user visited in a one-week timeframe in February 1998.

Here, you must preprocess the data on a single node (i.e., not on a cluster of nodes) from the format:

`C,"10001",10001   #Visitor id 10001`

`V,1000,1          #Visit by Visitor 10001 to page id 1000`

`V,1001,1          #Visit by Visitor 10001 to page id 1001`

`V,1002,1          #Visit by Visitor 10001 to page id 1002`

`C,"10002",10002   #Visitor id 10002`

`V`

(Note: # denotes comments.)

to the format:

`V,1000,1,C, 10001`

`V,1001,1,C, 10001`

`V,1002,1,C, 10001`

Write the python code to accomplish this.

```python
%%writefile format_data.py
#!/usr/bin/python
"""This program reformats the Microsoft anonymous visitor data.
In particular, customer IDs and corresponding visited site IDs
are placed on the same line.
"""

import sys

cstring = ""  # ",C, <visitor id>" suffix for the current visitor

for line in sys.stdin:
    if line.startswith("V,"):
        # Visit line V,<page id>,<count>: append the current visitor ID
        recv = line.strip().split(",")
        outstring = "V," + recv[1] + "," + recv[2] + cstring
        print outstring
    elif line.startswith("C,"):
        # Visitor line C,"<id>",<id>: remember the ID for the V lines that follow
        recc = line.strip().split(",")
        cstring = ",C, " + recc[2]
```


```
!chmod a+x format_data.py
!cat anonymous-msweb.data | python format_data.py > hw42output.txt
!head -n10 hw42output.txt
```

```
V,1000,1,C, 10001
V,1001,1,C, 10001
V,1002,1,C, 10001
V,1001,1,C, 10002
V,1003,1,C, 10002
V,1001,1,C, 10003
V,1003,1,C, 10003
V,1004,1,C, 10003
V,1005,1,C, 10004
V,1006,1,C, 10005
```


### HW 4.3.

Find the 5 most frequently visited pages using MrJob from the output of 4.2 (i.e., the transformed log file).

```python
%%writefile hw43five_top.py
#!/usr/bin/python
"""Find the five most frequently visited Vroots.
This program takes the output of HW4.2 and outputs tab-separated lines of
    number of visits -> Vroot
To run:
    python hw43five_top.py hw42output.txt
To store output:
    python hw43five_top.py hw42output.txt > hw43five_top.out
"""
import csv
import heapq

from mrjob.job import MRJob
from mrjob.step import MRStep


def csv_readline(line):
    """Given a string CSV line, return a list of strings."""
    for row in csv.reader([line]):
        return row


class TopPages(MRJob):
    # Not used: on Hadoop, the shuffle could sort the keys numerically in
    # descending order instead of sorting in the reducer, e.g.
    # SORT_VALUES = True
    # JOBCONF = {'mapred.output.key.comparator.class':
    #                'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator',
    #            'mapred.text.key.comparator.options': '-k1nr -k2'}

    def mapper_get_words(self, _, line):
        """Extracts the Vroot that was visited and emits (Vroot, 1)."""
        cell = csv_readline(line)
        if cell and cell[0] == 'V':
            yield cell[1], 1

    def reducer_count_words(self, vroot, visit_counts):
        """Sums the visit counts. Only Vroots with at least 400 visits are kept,
        since (from the quiz) more than 5 Vroots meet that threshold."""
        total = sum(visit_counts)
        if total >= 400:
            yield vroot, total

    def mapper_routekeys(self, vroot, total):
        """Sends every (total, Vroot) pair to a single reducer under the key None."""
        yield None, (total, vroot)

    def reducer_find_5_max(self, _, vlist):
        """Outputs the five Vroots with the most visits as (visits, Vroot)."""
        # No print here: a task's stdout is the job's output stream.
        for total, vroot in heapq.nlargest(5, vlist):
            yield total, vroot

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(mapper=self.mapper_routekeys,
                   reducer=self.reducer_find_5_max)
        ]


if __name__ == '__main__':
    TopPages.run()
```

```python
%load_ext autoreload
%autoreload 2
```

```python
from hw43five_top import TopPages

mr_job = TopPages(args=['./hw42output.txt'])
with mr_job.make_runner() as runner:
    runner.run()
    print "Number of Visits\tVroot ID"
    for line in runner.stream_output():
        print mr_job.parse_output_line(line)
```

```
Number of Visits	Vroot ID
(10836, '1008')
(9383, '1034')
(8463, '1004')
(5330, '1018')
(5108, '1017')
```
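The two steps of TopPages can be sanity-checked without mrjob on a small in-memory sample. The sketch below uses a hypothetical helper, `top_vroots`, that is not part of the homework code: step 1 counts visits per Vroot with `collections.Counter` (as mapper_get_words and reducer_count_words do), and step 2 keeps the five largest (count, Vroot) pairs with `heapq.nlargest` (as reducer_find_5_max does), so ties are broken by Vroot ID in descending order. The sample is the ten lines of hw42output.txt shown in HW 4.2, and the 400-visit threshold is made a parameter because the sample is tiny.

```python
import csv
import heapq
from collections import Counter


def top_vroots(lines, n=5, min_visits=0):
    """In-memory equivalent of the two TopPages steps (hypothetical helper)."""
    # Step 1: count visits per Vroot from the V lines.
    counts = Counter(row[1] for row in csv.reader(lines) if row and row[0] == "V")
    # Step 2: keep the n largest (count, Vroot) pairs above the threshold.
    pairs = ((total, vroot) for vroot, total in counts.items() if total >= min_visits)
    return heapq.nlargest(n, pairs)


sample = [  # the first ten lines of hw42output.txt from HW 4.2
    "V,1000,1,C, 10001", "V,1001,1,C, 10001", "V,1002,1,C, 10001",
    "V,1001,1,C, 10002", "V,1003,1,C, 10002", "V,1001,1,C, 10003",
    "V,1003,1,C, 10003", "V,1004,1,C, 10003", "V,1005,1,C, 10004",
    "V,1006,1,C, 10005",
]
print(top_vroots(sample))
# [(3, '1001'), (2, '1003'), (1, '1006'), (1, '1005'), (1, '1004')]
```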


### HW 4.4.

Find the most frequent visitor of each page using MrJob and the output of 4.2 (i.e., the transformed log file). In this output please include the webpage URL, webpageID and Visitor ID.

### HW 4.5. Clustering Tweet Dataset

Using this data, you will implement a 1000-dimensional K-means algorithm in MrJob on the users
by their 1000-dimensional word stripes/vectors using several 
centroid initializations and values of K.

Note that each "point" is a user as represented by 1000 words, and that
word-frequency distributions are generally heavy-tailed power-laws
(often called Zipf distributions), and are very rare in the larger class
of discrete, random distributions. For each user you will have to normalize
by its "TOTAL" column. Try several parameterizations and initializations:

- (A) K=4 uniform random centroid-distributions over the 1000 words (generate 1000 random numbers and normalize the vectors)
- (B) K=2 perturbation-centroids, randomly perturbed from the aggregated (user-wide) distribution
- (C) K=4 perturbation-centroids, randomly perturbed from the aggregated (user-wide) distribution
- (D) K=4 "trained" centroids, determined by the sums across the classes. Use the (row-normalized) class-level aggregates as 'trained' starting centroids (i.e., the training is already done for you!).
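The normalization by TOTAL and the perturbation initialization of (B) and (C) can be illustrated on toy rows. This stdlib-only sketch assumes each row has the layout `ID, CODE, TOTAL, count_1, ..., count_n`, which is the layout startCentroidsBC reads (TOTAL in column 2, word counts from column 3 on). The helper names and the two-word toy rows are hypothetical. The perturbation (uniform noise scaled by 1/10, then renormalization so each centroid sums to 1) mirrors startCentroidsBC.

```python
import random


def normalize_row(line):
    """Parse 'ID,CODE,TOTAL,c1,...,cn' and divide each word count by TOTAL."""
    fields = line.strip().split(",")
    total = float(fields[2])
    return [float(c) / total for c in fields[3:]]


def aggregate_distribution(lines):
    """User-wide distribution: summed word counts divided by the summed TOTALs."""
    rows = [line.strip().split(",") for line in lines]
    grand_total = sum(float(r[2]) for r in rows)
    n = len(rows[0]) - 3
    return [sum(float(r[j + 3]) for r in rows) / grand_total for j in range(n)]


def perturbed_centroids(aggregate, k, scale=0.1, seed=None):
    """k centroids: aggregate plus uniform noise * scale, each renormalized to sum to 1."""
    rng = random.Random(seed)
    centroids = []
    for _ in range(k):
        point = [a + rng.random() * scale for a in aggregate]
        s = sum(point)
        centroids.append([x / s for x in point])
    return centroids


rows = ["u1,0,4,1,3", "u2,1,6,3,3"]  # hypothetical toy rows with two "words"
print(normalize_row(rows[0]))         # [0.25, 0.75]
agg = aggregate_distribution(rows)
print(agg)                            # [0.4, 0.6]
print(perturbed_centroids(agg, k=2, seed=1))
```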

```python
%%writefile Kmeans.py
from itertools import chain

from numpy import argmin, array
from mrjob.job import MRJob
from mrjob.step import MRStep


# Index of the nearest centroid (squared Euclidean distance) for one data point
def MinDist(datapoint, centroid_points):
    diff = array(datapoint) - array(centroid_points)
    return argmin((diff * diff).sum(axis=1))


# Converged when no centroid coordinate moved by more than T
def stop_criterion(centroid_points_old, centroid_points_new, T):
    oldvalue = list(chain(*centroid_points_old))
    newvalue = list(chain(*centroid_points_new))
    return all(abs(x - y) <= T for x, y in zip(oldvalue, newvalue))


# Sum a stream of (vector, count) pairs into (vector_sum, total_count)
def sum_vectors(inputdata):
    sumvec, num = None, 0
    for vec, n in inputdata:
        sumvec = array(vec) if sumvec is None else sumvec + array(vec)
        num += n
    return sumvec, num


class MRKmeans(MRJob):
    centroid_points = []

    def steps(self):
        return [
            MRStep(mapper_init=self.mapper_init, mapper=self.mapper,
                   combiner=self.combiner, reducer=self.reducer)
        ]

    # Load the current centroids, one comma-separated vector per line.
    # The driver rewrites Centroids.txt between iterations; on a cluster the
    # file must also be shipped to every task node.
    def mapper_init(self):
        with open("Centroids.txt") as f:
            self.centroid_points = [[float(x) for x in s.split(',')]
                                    for s in f if s.strip()]

    # Input line: user ID, class code, TOTAL, then the 1000 word counts.
    # Normalize by TOTAL and emit (nearest centroid index, (vector, 1)).
    def mapper(self, _, line):
        D = line.strip().split(',')
        total = float(D[2])
        vec = [float(x) / total for x in D[3:]]
        yield int(MinDist(vec, self.centroid_points)), (vec, 1)

    # Pre-aggregate vectors and counts locally to cut shuffle traffic
    def combiner(self, idx, inputdata):
        sumvec, num = sum_vectors(inputdata)
        yield idx, (sumvec.tolist(), num)

    # New centroid of cluster idx = sum of its points / number of points
    def reducer(self, idx, inputdata):
        sumvec, num = sum_vectors(inputdata)
        yield idx, (sumvec / num).tolist()


if __name__ == '__main__':
    MRKmeans.run()
```
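One MRKmeans pass (assign each point to its nearest centroid, then average each cluster) can be checked in memory on tiny data before launching the job. The sketch below is a hypothetical stdlib-only helper, not part of the homework code. It uses squared Euclidean distance like MinDist, and a cluster that receives no points keeps its old centroid, just as the driver only overwrites centroids whose index appears in the job output.

```python
def nearest(point, centroids):
    """Index of the centroid with the smallest squared Euclidean distance."""
    dists = [sum((p - c) ** 2 for p, c in zip(point, cen)) for cen in centroids]
    return dists.index(min(dists))


def kmeans_pass(points, centroids):
    """One assign-and-average iteration; empty clusters keep their old centroid."""
    dim = len(centroids[0])
    sums = [[0.0] * dim for _ in centroids]
    counts = [0] * len(centroids)
    for point in points:  # "mapper": assign; accumulate as the combiner would
        idx = nearest(point, centroids)
        counts[idx] += 1
        for j, x in enumerate(point):
            sums[idx][j] += x
    new = []
    for idx, old in enumerate(centroids):  # "reducer": divide sums by counts
        if counts[idx]:
            new.append([s / counts[idx] for s in sums[idx]])
        else:
            new.append(list(old))
    return new


points = [[0.0, 0.0], [0.0, 1.0], [10.0, 10.0], [10.0, 11.0]]
print(kmeans_pass(points, [[1.0, 1.0], [9.0, 9.0], [100.0, 100.0]]))
# [[0.0, 0.5], [10.0, 10.5], [100.0, 100.0]]
```

Feeding the result back in returns the same centroids, which is the fixed point that stop_criterion detects in the driver loop.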

### Driver

Generate initial centroids (random, perturbed, or trained)

New Centroids = initial centroids

While(1):

Calculate new centroids

Stop if the new centroids are close to the old centroids

Update centroids

```python
from numpy import random, array
from Kmeans import MRKmeans, stop_criterion

# Kmeandata.csv must hold the HW 4.5 user rows: ID, code, TOTAL, 1000 word counts
mr_job = MRKmeans(args=['Kmeandata.csv'])


def init_rand_centroids(k, n):
    """
    Initialize k random centroids of dimension n, each normalized to sum to 1.
    For HW4.5, n is 1000.
    """
    centroid_points = []
    for i in range(k):
        inputv = array([random.uniform() for j in range(n)])
        centroid_points.append(list(inputv / inputv.sum()))
    return centroid_points


def startCentroidsBC(k, n):
    """
    Initialize k centroids by randomly perturbing the aggregated (user-wide)
    distribution, read from the third line of the summaries file.
    """
    with open("topUsers_Apr-Jul_2014_1000-words_summaries.txt") as f:
        lines = f.readlines()
    data = lines[2].split(",")
    globalAggregate = [float(data[j + 3]) / float(data[2]) for j in range(n)]
    centroids = []
    for i in range(k):
        rndpoints = random.sample(n)
        # index with j, not n: in Python 2 a list-comprehension variable leaks
        # into the enclosing scope and would overwrite n
        peturpoints = [rndpoints[j] / 10 + globalAggregate[j] for j in range(n)]
        total = sum(peturpoints)
        centroids.append([p / total for p in peturpoints])
    return centroids


def trained_centroids(k, n):
    # (D) not implemented yet: should return the row-normalized class-level aggregates
    raise NotImplementedError("trained centroids (HW 4.5 D)")


def write_centroids(centroid_points):
    """Write one comma-separated centroid per line to Centroids.txt."""
    with open('Centroids.txt', 'w') as f:
        f.writelines(','.join(str(j) for j in c) + '\n' for c in centroid_points)


def initialize_centroids(k, n, flavor):
    """
    Initializes k centroids of dimension n, using the algorithm specified in "flavor",
    writes them to Centroids.txt and returns them.
    """
    if flavor == "random":
        centroid_points = init_rand_centroids(k, n)
    elif flavor == "perturb":
        centroid_points = startCentroidsBC(k, n)
    elif flavor == "trained":
        centroid_points = trained_centroids(k, n)
    else:
        raise ValueError("unknown flavor: " + flavor)
    write_centroids(centroid_points)
    return centroid_points


# Initialize centroids.
k = 4  # can be 4 or 2
n = 1000
flavor = "random"  # can be "random", "perturb" or "trained"
centroid_points = initialize_centroids(k, n, flavor)

# Update centroids iteratively
i = 0
while True:
    # save previous centroids to check convergence
    centroid_points_old = centroid_points[:]
    print "iteration" + str(i) + ":"
    with mr_job.make_runner() as runner:
        runner.run()
        # stream_output gives access to the job's output
        for line in runner.stream_output():
            key, value = mr_job.parse_output_line(line)
            print key, value
            centroid_points[key] = value
    # the next iteration's mappers read the updated centroids from this file
    write_centroids(centroid_points)
    print "\n"
    i = i + 1
    if stop_criterion(centroid_points_old, centroid_points, 0.01):
        break
print "Centroids\n"
print centroid_points
```