In [1]:
import findspark
findspark.init()

# create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my app").master("local[4]").getOrCreate()

# get context from the session
sc = spark.sparkContext

In [11]:
Afile = 'A1600x800.txt'
Bfile = 'B800x80.txt'
Asize = [1600, 800]
Bsize = [800, 80]
X = 10
Y = 10

In [3]:
rdd1 = sc.textFile(Afile).union(sc.textFile(Bfile)).map(lambda t: tuple(t.split(',')))

In [4]:
def matmul_map(t):
    name, i, j, v = t
    # naive assumption: sizes are multiples of X and Y
    X_range = int(Asize[0] / X)
    Y_range = int(Bsize[1] / Y)
    i_block = int(int(i) / X_range)
    j_block = int(int(j) / Y_range)
    if name == "A":  
        for k_block in range(Y):
            yield ((i_block, k_block), ('A', int(i), int(j), float(v)))
    else:
        for k_block in range(X):
            yield ((k_block, j_block), ('B', int(i), int(j), float(v)))

In [5]:
def matmul_red(t):
    key, it = t
    X_height = int(Asize[0] / X)
    Y_width = int(Bsize[1] / Y)
    tmpA = {}
    tmpB = {}
    for name, i, j, v in it:
        if name == 'A':
            tmpA[(i, j)] = v
        else:
            tmpB[(i, j)] = v

    for i_idx in range(X_height):
        for j_idx in range(Y_width):
            i = i_idx + key[0] * X_height
            j = j_idx + key[1] * Y_width
            v = sum([ tmpA.get((i, k), 0.) * tmpB.get((k, j), 0.) for k in range(Asize[1]) ] )
            yield ((i, j), v)

In [13]:
import time
start_time = time.time()
res = rdd1.flatMap(matmul_map).groupByKey().flatMap(matmul_red).map(lambda t: t[1]).reduce(lambda a,b:a+b)
print("execution time = {}, output = {}".format(time.time() - start_time, res))

execution time = 409.89160799980164, output = -10713.0


# 최적 블럭 사이즈 계산

연산 $A \cdot B = C$의 C 행렬을 행을 X개, 열을 Y개의 구간으로 나누어 전체 X * Y (=P)개의 블럭으로 묶어 연산한다고 할때,
(단, A = n by k, B = k by m 행렬)

행렬 A에서 만들어지는 키-값 쌍의 개수는 $n \cdot k \cdot Y$개, B에서 만들어지는 키-값의 쌍 개수는  $k \cdot m \cdot X$개, 즉, 전체 키-값 쌍의 개수는,

$k(n \cdot Y + m \cdot X)$

산술기하평균 부등식에 의해 키-값 쌍 개수의 최솟값은,

$k(n \cdot Y + m \cdot X) \geq 2k\sqrt{n\cdot m\cdot X \cdot Y} = 2k\sqrt{n\cdot m\cdot P}$

등호는 $n \cdot Y = m \cdot X$일 때 성립하므로,

키-값 쌍의 개수를 최소화 하는 X와 Y의 값은

$Y = \sqrt{\frac{mP}{n}}, X = \sqrt{\frac{nP}{m}}$

n = 1600, m = 80, P = 100 이므로 

$Y = \sqrt{\frac{8000}{1600}}=2.23, X = \sqrt{\frac{160000}{80}}=44.7$

따라서 대략 Y = 2, X = 40으로 C를 40 * 40의 정사각형 블럭으로 나누어 계산

In [7]:
X = 40
Y = 2

In [8]:
start_time = time.time()
res = rdd1.flatMap(matmul_map).groupByKey().flatMap(matmul_red).map(lambda t: t[1]).reduce(lambda a,b:a+b)
print("execution time = {}, output = {}".format(time.time() - start_time, res))

-10713.0