# MidTerm DATASCI W261: Machine Learning at Scale

**Name:** Megan Jasek  
**Email:** meganjasek@ischool.berkeley.edu  
**Class Name:** W261-2  
**Week Number:** 8, MidTerm Exam  
**Date:** 6/29/16  

### MRJob for KMeans

In [22]:
%%writefile Kmeans.py
from numpy import argmin, array, random
from mrjob.job import MRJob
from mrjob.step import MRJobStep
from itertools import chain

# Find the index of the centroid nearest to a data point
def MinDist(datapoint, centroid_points):
    """Return the index of the centroid closest to ``datapoint``.

    Both arguments are converted to numpy arrays, the point is subtracted
    from every centroid row, and the Euclidean distance to each centroid is
    computed. The position of the smallest distance is returned (the first
    one on ties, as ``argmin`` does).
    """
    point = array(datapoint)
    centroids = array(centroid_points)

    # Row-wise Euclidean distance from the point to every centroid.
    squared_offsets = (point - centroids) ** 2
    distances = squared_offsets.sum(axis=1) ** 0.5

    return argmin(distances)

#Check whether centroids converge
def stop_criterion(centroid_points_old, centroid_points_new,T):
    """Report whether the centroids have stopped moving.

    Both centroid lists are flattened into coordinate sequences and compared
    pairwise. Returns True when no coordinate changed by more than ``T``,
    False as soon as one absolute difference exceeds ``T``.
    """
    flat_old = list(chain.from_iterable(centroid_points_old))
    flat_new = list(chain.from_iterable(centroid_points_new))
    deltas = [abs(before - after) for before, after in zip(flat_old, flat_new)]
    # Converged only if not a single coordinate moved by more than T.
    return not any(delta > T for delta in deltas)


class MRKmeans(MRJob):
    """One K-means iteration expressed as a single MapReduce step.

    The mapper assigns every input point to its nearest centroid, the
    combiner and reducer sum the points of each cluster, and the reducer
    emits the mean of each cluster as its new centroid.

    Centroids are exchanged through the file 'Centroids.txt' in the current
    working directory: mapper_init reads it and then empties it, and each
    reducer call appends one "x,y" line for its cluster.
    NOTE(review): this presumably relies on mapper and reducer tasks sharing
    one working directory -- confirm before using a distributed runner.
    """
    # Centroids loaded by mapper_init: one [x, y] list per cluster.
    centroid_points=[]
    # Number of clusters. Not read by the methods below; kept so existing
    # callers that reference MRKmeans.k keep working.
    k=3

    def steps(self):
        """Declare the single map/combine/reduce step of the job."""
        return [
            MRJobStep(mapper_init = self.mapper_init, mapper=self.mapper,combiner = self.combiner,reducer=self.reducer)
               ]

    #load centroids info from file
    def mapper_init(self):
        """Load the current centroids from Centroids.txt, then empty the file."""
        # Use a context manager so the read handle is closed before the file
        # is truncated; blank lines are skipped instead of crashing float('').
        with open("Centroids.txt") as centroid_file:
            self.centroid_points = [
                [float(coord) for coord in row.strip().split(',')]
                for row in centroid_file
                if row.strip()
            ]
        # Empty the file so the reducers can append the new centroids.
        with open('Centroids.txt', 'w'):
            pass

    #load data and output the nearest centroid index and data point
    def mapper(self, _, line):
        """Yield (nearest centroid index, (x, y, 1)) for one CSV input line."""
        if not line.strip():
            return
        # Build a real list (not a lazy map object) so it can be indexed.
        D = [float(field) for field in line.split(',')]
        idx = MinDist(D,self.centroid_points)
        yield int(idx), (D[0],D[1],1)

    #Combine sum of data points locally
    def combiner(self, idx, inputdata):
        """Pre-aggregate (sum x, sum y, count) for one cluster."""
        sumx = sumy = num = 0
        for x,y,n in inputdata:
            num += n
            sumx += x
            sumy += y
        yield int(idx),(sumx,sumy,num)

    #Aggregate sum for each cluster and then calculate the new centroids
    def reducer(self, idx, inputdata):
        """Average the summed points of cluster ``idx`` into its new centroid.

        Appends the centroid as an "x,y" line to Centroids.txt and yields
        (idx, (x, y)).
        """
        sumx = sumy = num = 0
        for x, y, n in inputdata:
            num += n
            sumx += x
            sumy += y
        # float() guards against integer division if the sums are integers.
        centroid_x = sumx / float(num)
        centroid_y = sumy / float(num)
        with open('Centroids.txt', 'a') as f:
            f.write(str(centroid_x) + ',' + str(centroid_y) + '\n')
        yield idx,(centroid_x,centroid_y)
        
# Run the K-means job when this file is executed as a script (not on import).
if __name__ == '__main__':
    MRKmeans.run()

Overwriting Kmeans.py


**Driver:**  
Generate random initial centroids  

New Centroids = initial centroids  

While(1):  

* Calculate new centroids
* Stop if the new centroids are close to the old centroids
* Update centroids

In [None]:
%reload_ext autoreload
%autoreload 2

from numpy import random, array
from Kmeans import MRKmeans, stop_criterion
# No trailing space in the file name: 'Kmeandata.csv ' is a different path.
mr_job = MRKmeans(args=['Kmeandata.csv'])

# Generate initial centroids
centroid_points = [[0,0],[6,3],[3,6]]
k = 3
MAX_ITERATIONS = 10
# Largest per-coordinate change still treated as "converged".
CONVERGENCE_THRESHOLD = 0.001


def write_centroids(points, path='Centroids.txt'):
    """Replace ``path`` with one 'x,y' line per centroid, in index order."""
    with open(path, 'w') as f:
        f.writelines(','.join(str(j) for j in i) + '\n' for i in points)


write_centroids(centroid_points)

# Update centroids iteratively
for i in range(MAX_ITERATIONS):
    # Save the previous centroids to check convergence.
    centroid_points_old = [list(c) for c in centroid_points]
    print("iteration"+str(i+1)+":")
    with mr_job.make_runner() as runner:
        runner.run()
        # stream_output: get access to the job output
        for line in runner.stream_output():
            key,value =  mr_job.parse_output_line(line)
            print("%s %s" % (key, value))
            centroid_points[key] = value
    print("\n")
    # The job appends centroids in the order the reducer is called and writes
    # nothing for a cluster that received no points, so rewrite the file from
    # the full list to keep line number == cluster index for the next pass.
    write_centroids(centroid_points)
    # Stop as soon as no centroid coordinate moved by more than the threshold.
    if stop_criterion(centroid_points_old, centroid_points, CONVERGENCE_THRESHOLD):
        break
print("Centroids\n")
print(centroid_points)

In [4]:
%%writefile customers.dat
1|Alice Bob|31|CA
2|Sam Sneed|51|NV
3|Jon Sneed|37|CA
4|Arnold Wesise|17|NY
5|Henry Bob|25|NV
6|Yo Yo Ma|37|NY
7|Jon York|41|WA
8|Alex Ball|26|WA
9|Jim Davis|19|CA

Overwriting customers.dat


In [5]:
%%writefile orders.dat
1|Apple
3|Garlic
2|Milk
1|Iphone
4|Ipad
5|Book
7|Potato
8|Tomato
9|Orange
5|shoes

Overwriting orders.dat


## MRJob class for ReducerSideInnerJoin

In [7]:
%%writefile reducersideinnerjoin.py
from mrjob.job import MRJob
from mrjob.step import MRStep
 
class innerjoin(MRJob):
    """Reducer-side inner join of '|'-delimited records on their first field.

    Lines with exactly four fields are tagged as the left table, any other
    line with at least two fields as the right table. For every key the
    reducer emits one row per (right record, left record) pair, so keys
    present in only one table produce no output.
    """

    def mapper(self, _, line):
        """Tag each record with its source table and key it by the first field."""
        x = line.split("|")
        if len(x) == 4:
            yield x[0], ("lefttable", x[1], x[2], x[3])
        elif len(x) >= 2:
            yield x[0], ("righttable", x[1])
        else:
            # A line without a delimiter (e.g. a blank line) has no payload
            # field; count it instead of failing the task with IndexError.
            self.increment_counter('join', 'malformed_lines', 1)

    def reducer(self, key, values):
        """Pair every right-table record with every left-table record of ``key``."""
        customers = []
        orders = []
        for val in values:
            # Drop the table tag and normalise to a list so the concatenation
            # below works whether the values arrive as lists or tuples.
            fields = list(val[1:])
            if val[0] == u'lefttable':
                customers.append(fields)
            else:
                orders.append(fields)
        for o in orders:
            for c in customers:
                yield None, [key] + c + o

    def steps(self):
        """Declare the single map/reduce step of the job."""
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)]

# Run the inner-join job when this file is executed as a script (not on import).
if __name__ == '__main__':
    innerjoin.run()

Overwriting reducersideinnerjoin.py


In [9]:
!python reducersideinnerjoin.py 'customers.dat' 'orders.dat'

No configs found; falling back on auto-configuration
Creating temp directory /tmp/reducersideinnerjoin.hadoop.20160629.220526.650317
Running step 1 of 1...
Streaming final output from /tmp/reducersideinnerjoin.hadoop.20160629.220526.650317/output...
null	["7", "Jon York", "41", "WA", "Potato"]
null	["8", "Alex Ball", "26", "WA", "Tomato"]
null	["9", "Jim Davis", "19", "CA", "Orange"]
null	["1", "Alice Bob", "31", "CA", "Apple"]
null	["1", "Alice Bob", "31", "CA", "Iphone"]
null	["2", "Sam Sneed", "51", "NV", "Milk"]
null	["3", "Jon Sneed", "37", "CA", "Garlic"]
null	["4", "Arnold Wesise", "17", "NY", "Ipad"]
null	["5", "Henry Bob", "25", "NV", "Book"]
null	["5", "Henry Bob", "25", "NV", "shoes"]
Removing temp directory /tmp/reducersideinnerjoin.hadoop.20160629.220526.650317...


## Run the code through a Python driver

####  Reminder: You cannot use the programmatic runner functionality in the same file as your job class. That is because the file with the job class is sent to Hadoop to be run. Therefore, the job file cannot attempt to start the Hadoop job, or you would be recursively creating Hadoop jobs!

Use make_runner() to run an MRJob
1. Separate the driver from the MapReduce job
2. Now we can run it within a Python notebook
3. In Python, typically one class is in each file. Each mrjob job is a separate class and should be in a separate file

In [8]:
%reload_ext autoreload
%autoreload 2

from reducersideinnerjoin import innerjoin
# Build the join job over both input tables and run it from the notebook.
mr_job = innerjoin(args=['customers.dat','orders.dat'])
count = 0
with mr_job.make_runner() as runner:
    runner.run()
    # Each streamed line is one joined record: show its value and tally it.
    for line in runner.stream_output():
        key, value = mr_job.parse_output_line(line)
        print(value)
        count += 1
print("\n")
print("There are %s records" % count)

[u'7', u'Jon York', u'41', u'WA', u'Potato']
[u'8', u'Alex Ball', u'26', u'WA', u'Tomato']
[u'9', u'Jim Davis', u'19', u'CA', u'Orange']
[u'1', u'Alice Bob', u'31', u'CA', u'Apple']
[u'1', u'Alice Bob', u'31', u'CA', u'Iphone']
[u'2', u'Sam Sneed', u'51', u'NV', u'Milk']
[u'3', u'Jon Sneed', u'37', u'CA', u'Garlic']
[u'4', u'Arnold Wesise', u'17', u'NY', u'Ipad']
[u'5', u'Henry Bob', u'25', u'NV', u'Book']
[u'5', u'Henry Bob', u'25', u'NV', u'shoes']


There are 10 records


In [19]:
%%writefile reducersideleftjoin.py
from mrjob.job import MRJob
from mrjob.step import MRStep
 
class leftjoin(MRJob):
    """Reducer-side left join of '|'-delimited records on their first field.

    Lines with exactly four fields are tagged as the left table, any other
    line with at least two fields as the right table. Every left-table
    record is emitted once per matching right-table record, or once with a
    trailing None when its key has no right-table record.
    """

    def mapper(self, _, line):
        """Tag each record with its source table and key it by the first field."""
        x = line.split("|")
        if len(x) == 4:
            yield x[0], ("lefttable", x[1], x[2], x[3])
        elif len(x) >= 2:
            yield x[0], ("righttable", x[1])
        else:
            # A line without a delimiter (e.g. a blank line) has no payload
            # field; count it instead of failing the task with IndexError.
            self.increment_counter('join', 'malformed_lines', 1)

    def reducer(self, key, values):
        """Emit each left record of ``key`` joined to its right records, if any."""
        customers = []
        orders = []
        for val in values:
            # Drop the table tag and normalise to a list so the concatenation
            # below works whether the values arrive as lists or tuples.
            fields = list(val[1:])
            if val[0] == u'lefttable':
                customers.append(fields)
            else:
                orders.append(fields)
        for c in customers:
            if not orders:
                # Unmatched left record: keep it, padding the right side.
                yield None, [key] + c + [None]
            for o in orders:
                yield None, [key] + c + o

    def steps(self):
        """Declare the single map/reduce step of the job."""
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)]

# Run the left-join job when this file is executed as a script (not on import).
if __name__ == '__main__':
    leftjoin.run()

Overwriting reducersideleftjoin.py


In [20]:
!python reducersideleftjoin.py 'customers.dat' 'orders.dat'

No configs found; falling back on auto-configuration
Creating temp directory /tmp/reducersideleftjoin.hadoop.20160629.221207.205060
Running step 1 of 1...
Streaming final output from /tmp/reducersideleftjoin.hadoop.20160629.221207.205060/output...
null	["6", "Yo Yo Ma", "37", "NY", null]
null	["7", "Jon York", "41", "WA", "Potato"]
null	["8", "Alex Ball", "26", "WA", "Tomato"]
null	["9", "Jim Davis", "19", "CA", "Orange"]
null	["1", "Alice Bob", "31", "CA", "Apple"]
null	["1", "Alice Bob", "31", "CA", "Iphone"]
null	["2", "Sam Sneed", "51", "NV", "Milk"]
null	["3", "Jon Sneed", "37", "CA", "Garlic"]
null	["4", "Arnold Wesise", "17", "NY", "Ipad"]
null	["5", "Henry Bob", "25", "NV", "Book"]
null	["5", "Henry Bob", "25", "NV", "shoes"]
Removing temp directory /tmp/reducersideleftjoin.hadoop.20160629.221207.205060...


In [19]:
%%writefile reducersideleftjoin.py
from mrjob.job import MRJob
from mrjob.step import MRStep
 
class leftjoin(MRJob):
    """Reducer-side left join of '|'-delimited records on their first field.

    Lines with exactly four fields are tagged as the left table, any other
    line with at least two fields as the right table. Every left-table
    record is emitted once per matching right-table record, or once with a
    trailing None when its key has no right-table record.
    """

    def mapper(self, _, line):
        """Tag each record with its source table and key it by the first field."""
        x = line.split("|")
        if len(x) == 4:
            yield x[0], ("lefttable", x[1], x[2], x[3])
        elif len(x) >= 2:
            yield x[0], ("righttable", x[1])
        else:
            # A line without a delimiter (e.g. a blank line) has no payload
            # field; count it instead of failing the task with IndexError.
            self.increment_counter('join', 'malformed_lines', 1)

    def reducer(self, key, values):
        """Emit each left record of ``key`` joined to its right records, if any."""
        customers = []
        orders = []
        for val in values:
            # Drop the table tag and normalise to a list so the concatenation
            # below works whether the values arrive as lists or tuples.
            fields = list(val[1:])
            if val[0] == u'lefttable':
                customers.append(fields)
            else:
                orders.append(fields)
        for c in customers:
            if not orders:
                # Unmatched left record: keep it, padding the right side.
                yield None, [key] + c + [None]
            for o in orders:
                yield None, [key] + c + o

    def steps(self):
        """Declare the single map/reduce step of the job."""
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer)]

# Run the left-join job when this file is executed as a script (not on import).
if __name__ == '__main__':
    leftjoin.run()

Overwriting reducersideleftjoin.py


In [20]:
!python reducersideleftjoin.py 'customers.dat' 'orders.dat'

No configs found; falling back on auto-configuration
Creating temp directory /tmp/reducersideleftjoin.hadoop.20160629.221207.205060
Running step 1 of 1...
Streaming final output from /tmp/reducersideleftjoin.hadoop.20160629.221207.205060/output...
null	["6", "Yo Yo Ma", "37", "NY", null]
null	["7", "Jon York", "41", "WA", "Potato"]
null	["8", "Alex Ball", "26", "WA", "Tomato"]
null	["9", "Jim Davis", "19", "CA", "Orange"]
null	["1", "Alice Bob", "31", "CA", "Apple"]
null	["1", "Alice Bob", "31", "CA", "Iphone"]
null	["2", "Sam Sneed", "51", "NV", "Milk"]
null	["3", "Jon Sneed", "37", "CA", "Garlic"]
null	["4", "Arnold Wesise", "17", "NY", "Ipad"]
null	["5", "Henry Bob", "25", "NV", "Book"]
null	["5", "Henry Bob", "25", "NV", "shoes"]
Removing temp directory /tmp/reducersideleftjoin.hadoop.20160629.221207.205060...
