# DATASCI W261: Machine Learning at Scale

## Version 1: One MapReduce Stage (join data at the first reducer)

# Create Matrices

Each input record begins with a matrix identifier: rows of matrix A start with 0, and rows of matrix B start with 1.
$$ \textbf{A} = \left( \begin{array}{cc}
5 & 0  \\
3 & 8  \\
0 & 6 \end{array} \right) $$
$$ \textbf{B} = \left( \begin{array}{cc}
6 & 3  \\
2 & 0 \end{array} \right) $$

In [1]:
# Write both matrices to Matrics.txt, one sparse row per line.
# Format: A/B, rowIndex, columnIndex1, Value1, columnIndex2, Value2,...
#   A/B is the matrix identifier (0 = matrix A, 1 = matrix B); only the
#   non-zero entries of a row are listed as (columnIndex, Value) pairs.
# The first echo uses ">" to create/truncate the file; the rest append with ">>".
# Rows of A:
!echo 0, 0, 0, 5 > Matrics.txt
!echo 0, 1, 0, 3, 1, 8 >> Matrics.txt
!echo 0, 2, 1, 6 >> Matrics.txt
# Rows of B:
!echo 1, 0, 0, 6, 1, 3 >> Matrics.txt
!echo 1, 1, 0, 2 >> Matrics.txt

# mrjob class code

In [6]:
%%writefile MatrixMultiplication.py
#Version 1: One MapReduce Stage (join data at the first reducer)
from mrjob.job import MRJob
from mrjob.compat import jobconf_from_env
class MRMatrixAB(MRJob):
    """Multiply two sparse matrices, C = A * B, in a single MapReduce stage.

    Input records are comma-separated lines of the form
    ``matrixId, rowIndex, colIndex1, value1, colIndex2, value2, ...`` where
    ``matrixId`` is ``0`` for a row of A and ``1`` for a row of B.

    Two jobconf values must be supplied when the job is launched:
    ``row.num.A`` (number of rows of A) and ``col.num.B`` (number of columns
    of B).
    """

    # Emit all the data needed to calculate cell (i, j) of the result matrix
    def mapper(self, _, line):
        """Replicate every listed entry to each result cell that needs it.

        Args:
            _: input key, ignored.
            line: one text record in the format described on the class.

        Yields:
            ``((i, j), (k, value))`` pairs, where ``(i, j)`` is a cell of the
            result matrix and ``k`` is the inner (join) index: the column
            index for an entry of A, the row index for an entry of B.
            Records whose first field is neither ``0`` nor ``1`` yield nothing.
        """
        # Strip every field so whitespace around the commas, or in front of
        # the matrix id, cannot make the id comparison below fail silently.
        fields = [field.strip() for field in line.split(',')]
        # Number of (columnIndex, value) pairs listed on this row. Floor
        # division keeps this an int on both Python 2 and Python 3, which
        # range() requires.
        pair_count = (len(fields) - 2) // 2
        row_num_a = int(jobconf_from_env("row.num.A"))  # we need to know how many rows of A
        col_num_b = int(jobconf_from_env("col.num.B"))  # we need to know how many columns of B

        if fields[0] == '0':
            # Row of A: entry A[row, k] is needed by C[row, q] for every
            # column q of B.
            for p in range(pair_count):
                for q in range(col_num_b):
                    yield ((int(fields[1]), q),
                           (int(fields[p * 2 + 2]), float(fields[p * 2 + 3])))

        elif fields[0] == '1':
            # Row of B: entry B[k, col] is needed by C[q, col] for every
            # row q of A.
            for p in range(pair_count):
                for q in range(row_num_a):
                    yield ((q, int(fields[p * 2 + 2])),
                           (int(fields[1]), float(fields[p * 2 + 3])))

    # Sum up the products for cell (i, j)
    def reducer(self, key, values):
        """Join the entries of one result cell on their inner index and sum.

        Args:
            key: the ``(i, j)`` cell of the result matrix.
            values: iterable of ``(k, value)`` pairs emitted by the mapper.

        Yields:
            ``(key, total)`` where ``total`` is the sum of the products of
            values sharing the same inner index ``k``.
        """
        # The first value seen for an inner index is remembered; a later value
        # with the same index is multiplied with it. This assumes each index
        # arrives at most twice per cell (once from A, once from B), i.e. the
        # input lists every matrix entry at most once.
        first_seen = {}
        total = 0.0
        for idx, value in values:
            if idx in first_seen:
                total = total + value * first_seen[idx]
            else:
                first_seen[idx] = value
        yield key, total

# Script entry point: start the job only when this file is executed directly,
# not when the module is imported.
if __name__ == "__main__":
    MRMatrixAB.run()

Overwriting MatrixMultiplication.py


In [7]:
# Run the job from the shell on Matrics.txt. The two --jobconf values are read
# by the mapper: row.num.A is the number of rows of A, col.num.B the number of
# columns of B.
!python MatrixMultiplication.py Matrics.txt --jobconf row.num.A=3 --jobconf col.num.B=2

Using configs in /etc/mrjob.conf
Creating temp directory /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/MatrixMultiplication.koza.20170215.211413.372373
Running step 1 of 1...
Streaming final output from /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/MatrixMultiplication.koza.20170215.211413.372373/output...
[0, 0]	30.0
[0, 1]	15.0
[1, 0]	34.0
[1, 1]	9.0
[2, 0]	12.0
[2, 1]	0.0
Removing temp directory /var/folders/2f/rb8qqgd55bl77zgchyxsfl7h0000gp/T/MatrixMultiplication.koza.20170215.211413.372373...


# Driver:

In [8]:
from numpy import empty
from MatrixMultiplication import MRMatrixAB
# Dimensions of the result matrix: rows of A x columns of B. They are defined
# once so the jobconf values passed to the job and the shape of the result
# array cannot drift apart.
ROW_NUM_A = 3
COL_NUM_B = 2

mr_job = MRMatrixAB(args=['Matrics.txt',
                          '--jobconf', 'row.num.A=%d' % ROW_NUM_A,
                          '--jobconf', 'col.num.B=%d' % COL_NUM_B])

C = []  # (key, value) pairs in the order the job emits them
# numpy.empty returns uninitialised memory, so zero-fill it explicitly: a cell
# the job never emits then reads as 0 instead of arbitrary garbage.
CC = empty([ROW_NUM_A, COL_NUM_B])
CC.fill(0.0)

# Calculate A*B
# The print calls use a single parenthesised argument so the output is the
# same under Python 2 and Python 3.
print("Matrix C = A * B:")
with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        # key is the [row, column] cell of C, value its computed entry
        key, value = mr_job.parse_output_line(line)
        C.append((key, value))
        CC[key[0], key[1]] = value
        print("%s %s" % (key, value))
print(" ")
print("Matrix C")
print(CC)

Matrix C = A * B:
[0, 0] 30.0
[0, 1] 15.0
[1, 0] 34.0
[1, 1] 9.0
[2, 0] 12.0
[2, 1] 0.0
 
Matrix C
[[ 30.  15.]
 [ 34.   9.]
 [ 12.   0.]]
