#### Setting up environment

In [1]:
import findspark
#findspark.init(spark_path)
findspark.init()

from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext, SparkConf

In [2]:
environment_to_connect = 'local' # CHANGE IT IF CONNECTION TO A CLUSTER

conf = SparkConf().setAppName('matrix_multiplication_method').setMaster(environment_to_connect)
sc = SparkContext(conf=conf)
spark = SQLContext(sc)

In [3]:
import time

#### Source

In [4]:
def convert_row(line, row): # i : row number
    coo_row = []
    for j in range(len(line)):
        value = line[j]
        if value != 0: coo_row.append((row,j,value))
    return(coo_row)

In [5]:
def perform_mult(matrix_A_path, matrix_B_path, num_partitions_matrix_A, num_partitions_matrix_B):
    
    # Read matrices
    matrix_A = sc.textFile(matrix_A_path, num_partitions_matrix_A)
    matrix_B = sc.textFile(matrix_B_path, num_partitions_matrix_B)
    matrix_A_rows = matrix_A.count()
    matrix_B_rows = matrix_B.count()
    
    # Tokenize values and convert them into float
    matrix_A = matrix_A.map( lambda line: list(map(float, line.split(' '))) )
    matrix_B = matrix_B.map( lambda line: list(map(float, line.split(' '))) )
    matrix_A_columns = len(matrix_A.take(1)[0])
    matrix_B_columns = len(matrix_B.take(1)[0])
    print('Matrix A --> Rows: ' + str(matrix_A_rows), 'Columns: ' + str(matrix_A_columns))
    print('Matrix B --> Rows: ' + str(matrix_B_rows), 'Columns: ' + str(matrix_B_columns))
    
    # Convert matrices to coordinates (sparse) format
    matrix_A_list = matrix_A.collect()
    matrix_B_list = matrix_B.collect()
    
    get_coo_matrix = (lambda matrix_list: [(convert_row(matrix_list[index], index)) for index in range(len(matrix_list))])
    
    coo_matrix_A = get_coo_matrix(matrix_A_list)
    coo_matrix_B = get_coo_matrix(matrix_B_list)
    
    # Parallelize matrices with sparse format in order to be processed
    coo_matrix_A = sc.parallelize(coo_matrix_A, num_partitions_matrix_A)
    coo_matrix_B = sc.parallelize(coo_matrix_B, num_partitions_matrix_B)
    
    # Save RDDs in main memory
    coo_matrix_A.cache()
    coo_matrix_B.cache()
    
    # MATRIX MULTIPLICATION --> Two Map/Reduce steps
    
    start_time = time.time()
    
    # Produce key, value pairs (j, (i, Aij)) and (j, (k, Bjk))
    first_map_matrix_A = coo_matrix_A.flatMap( lambda line: [(row[1], (row[0], row[2])) for row in line] )
    first_map_matrix_B = coo_matrix_B.flatMap( lambda line: [(row[0], (row[1], row[2])) for row in line] )
    
    # For each key j: generate a key-value pair, where the key is (i, k) and the value is Aij*Bjk
    # Then, apply the identity function.
    first_reduce_sec_map = first_map_matrix_A.join(first_map_matrix_B).\
    map( lambda line: ((line[1][0][0], line[1][1][0]), line[1][0][1] * line[1][1][1]) )
    
    # Group by key (i,k) and sum the obtained results
    second_reduce = first_reduce_sec_map.reduceByKey( lambda x, y: round(x + y, 4) )
    
    # Sort result. The result consists of pairs ((i,k), v) for the output matrix
    result = second_reduce.sortByKey()
    
    end_time = time.time()
    print("Total execution time for map/reduce steps: {} seconds".format(round(end_time - start_time, 2)))
    
    return(result)

#### Set parameters

In [9]:
matrix_A_path = 'Matriz_Ejemplo_A.dat'
num_partitions_matrix_A = 1

matrix_B_path = 'Matriz_Ejemplo_B.dat'
num_partitions_matrix_B = 1

#### Execute multiplication

In [10]:
result_matrix = perform_mult(matrix_A_path, matrix_B_path, num_partitions_matrix_A, num_partitions_matrix_B)

Matrix A --> Rows: 1000 Columns: 128
Matrix B --> Rows: 128 Columns: 60
Total execution time for map/reduce steps: 30.64 seconds


In [11]:
result_matrix.count()

60000

In [12]:
result_matrix.take(70)

[((0, 0), 2022.5443),
 ((0, 1), 2194.2844),
 ((0, 2), 2145.9141),
 ((0, 3), 2232.1916),
 ((0, 4), 2079.6449),
 ((0, 5), 2286.4625),
 ((0, 6), 2203.4818),
 ((0, 7), 2181.6806),
 ((0, 8), 2179.9472),
 ((0, 9), 2139.8339),
 ((0, 10), 2201.6202),
 ((0, 11), 2246.1318),
 ((0, 12), 2112.8754),
 ((0, 13), 2089.9635),
 ((0, 14), 2230.4036),
 ((0, 15), 2326.9978),
 ((0, 16), 2209.185),
 ((0, 17), 2128.445),
 ((0, 18), 2100.9829),
 ((0, 19), 2264.4892),
 ((0, 20), 2215.9385),
 ((0, 21), 2203.7053),
 ((0, 22), 2202.9464),
 ((0, 23), 2317.13),
 ((0, 24), 2104.3664),
 ((0, 25), 2151.3703),
 ((0, 26), 2175.0896),
 ((0, 27), 2280.6067),
 ((0, 28), 2155.6193),
 ((0, 29), 2260.3449),
 ((0, 30), 2130.6846),
 ((0, 31), 2279.9108),
 ((0, 32), 2179.3534),
 ((0, 33), 2093.5267),
 ((0, 34), 2341.5597),
 ((0, 35), 2124.6643),
 ((0, 36), 2228.5736),
 ((0, 37), 2127.9422),
 ((0, 38), 2137.3472),
 ((0, 39), 2105.4985),
 ((0, 40), 2047.4022),
 ((0, 41), 2122.7344),
 ((0, 42), 2191.6016),
 ((0, 43), 2089.4202),
 (