載入SparkContext和SparkConf

In [5]:
from pyspark import SparkContext
from pyspark import SparkConf

In [6]:
conf = SparkConf().setAppName('appName').setMaster('local')

In [7]:
sc = SparkContext(conf=conf)

先建立M(i,j)的mapper1, 將index j取出來當成key;
再建立N(j,k)的mapper1, 將index j取出來當成key

In [8]:
def mapper1(line):
    wordlist = line.split(",")
    maplist = []
    key = wordlist.pop(2)
    maplist.append((int(key),wordlist))
    return maplist    

In [9]:
def mapper2(line):
    wordlist = line.split(",")
    maplist = []
    key = wordlist.pop(1)
    maplist.append((int(key),wordlist))
    return maplist    

定義reducer, 將相同key的value list蒐集起來

In [10]:
def reducer1(x,y):
    return x+y

讀取檔案"5ooinput.txt"

In [20]:
data = sc.textFile('2input.txt')

先建立兩個空的list M,N,這兩個list之後會用來蒐集M矩陣和N矩陣的元素

In [21]:
M = []
N = []

將所有資料傳換成list的資料型態, 再用if-else把MN兩個矩陣的元素分開

In [22]:
d = data.collect()

In [23]:
for i in range(len(d)):
    if 'M' in d[i]:
        M.append(d[i])
    else :
        N.append(d[i])

分好之後將M,N各自由list轉成RDD

In [25]:
M

['M,0,0,10',
 'M,0,1,0',
 'M,0,2,20',
 'M,1,0,0',
 'M,1,1,30',
 'M,1,2,0',
 'M,2,0,40',
 'M,2,1,0',
 'M,2,2,50']

In [26]:
rddm = sc.parallelize(M)
rddn = sc.parallelize(N)

屬於M矩陣的元素藉由mapper1轉換成(j,[M,i,m])的形式, 把j拿出來當key; 同理, 將N矩陣的元素轉換成(j,[N,k,n])的形式。

In [27]:
rddm = rddm.flatMap(mapper1)
rddn = rddn.flatMap(mapper2)

為了後續要把對應的矩陣元素相乘，所以在執行reduce後轉換回list的資料型態。

In [28]:
m = rddm.reduceByKey(reducer1).collect()    
n = rddn.reduceByKey(reducer1).collect()  

In [29]:
m

[(0, ['M', '0', '10', 'M', '1', '0', 'M', '2', '40']),
 (1, ['M', '0', '0', 'M', '1', '30', 'M', '2', '0']),
 (2, ['M', '0', '20', 'M', '1', '0', 'M', '2', '50'])]

接下來解釋如何將(j,(M,i,m))和(j,(N,k,n))轉換成((i,k),mn)：

(1)首先建立一個空的list, 命名為aggregate, 用來蒐集轉換後的((i,k),mn)

(2)第一層for loop是將j組轉換後的((i,k),mn)集合起來

(3)第二層有三組for loop, 第一個和第二個loop執行的目的是要將list m和list n的資料排列成[['M','i','m'],...,]的形式方便後續運算值的取用,而a和b則是在loop中的暫時容器, 用來裝排列好的資料, 排列完成後第三組loop才真正執行乘法運算, 運算後再以((i,k),mn)的形式存入list1這個暫時容器, list1在第二層loop結束時存入aggregate

(4)這個三層迴圈需要非常久的時間才能跑完(大概一天多)，所以不是一個好方法，不過礙於作業繳交時限，只能先用這個方法，後續有想到其他解法會再補上。

aggregate = []
for j in range(len(m)):
    data1 = m[j][1]
    a = []
    for i in range(0, len(data1), 3):
        a.append([data1[i],data1[i+1],data1[i+2]])
    data2 = n[j][1]
    b = []
    for i in range(0, len(data2), 3):
        b.append([data2[i],data2[i+1],data2[i+2]])
    list1 = []
    for i in range(len(a)):
        mul1 = int(a[i][2])
        index1 = a[i][1]
        for k in range(len(b)):
            mul2 = int(b[k][2])
            index2 = b[k][1]
            list1.append(((index1,index2),mul1*mul2))
    aggregate.extend(list1)

aggregate已經將資料整理成[(index1,index2),value]的形式，接下來只要將aggregate轉換成RDD的資料型式，再用map-reduce就完成矩陣乘法了。

In [15]:
rdda = sc.parallelize(aggregate)

用reduceByKey將相同key(i,k)的值都相加起來。

In [16]:
outcome = rdda.reduceByKey(reducer1).collect()

再用sort()指令排序

In [17]:
outcome.sort()

開一個新的檔案

In [18]:
file = open("mapreduce_output.txt", "w")

In [19]:
file.write("Output\n")

7

將資料轉換成string之後一一寫入剛剛建立的檔案

In [20]:
for i in range(len(outcome)):
    file.write(str(outcome[i][0][0]))
    file.write(',')
    file.write(str(outcome[i][0][1]))
    file.write(',')
    file.write(str(outcome[i][1]))
    file.write(' \n')

In [21]:
file.close()