# Matrix Multiplication using MapReduce

#### Importing SparkContext from pyspark and combinations from itertools

In [0]:
from pyspark import SparkContext
from itertools import combinations

### Example 1 - Input file with two matrices of same dimensions
#### Printing the Input file

In [0]:
dbutils.fs.cp("/FileStore/tables/set2.txt", "file:///tmp/set2.txt")
with open("/tmp/set2.txt", "r") as file:
    print (file.read())

1 2
3 4

5 6
7 8



### Define the functions to be used in the program

The `AddLineNumbers` function receives a list of strings and pairs each string with its row index, producing a list of (index, line) tuples.

In [0]:
def AddLineNumbers(lines):
    result = []
    for idx, item in enumerate(lines):
        result.append((idx, item))
    return result

The `crossMultiply` function takes the grouped entries for a single key `j`, which have the form (0, i, A<sub>ij</sub>) for matrix `A` and (1, k, B<sub>jk</sub>) for matrix `B`. It forms every pair of entries drawn from different matrices and produces ((i, k), A<sub>ij</sub> * B<sub>jk</sub>) for each pair.

In [0]:
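def crossMultiply(items):
    result = []
    for A, B in combinations(items, 2):
        # Only pair entries that come from different matrices
        if A[0] != B[0]:
            # groupByKey does not guarantee order, so put the A entry (tag 0) first
            a, b = (A, B) if A[0] == 0 else (B, A)
            result.append(((a[1], b[1]), a[2] * b[2]))
    return result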
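
To make the pairing concrete, here is a minimal sketch that runs without Spark. It assumes the tags 0 and 1 mark entries of `A` and `B`, as the mappers in this notebook do. The name `cross_multiply_product` is hypothetical, and the use of `itertools.product` is an alternative formulation, not the notebook's own method.

```python
from itertools import product

def cross_multiply_product(items):
    """Hypothetical alternative: pair every A entry with every B entry for one key j."""
    a_entries = [item for item in items if item[0] == 0]  # (0, i, A_ij)
    b_entries = [item for item in items if item[0] == 1]  # (1, k, B_jk)
    return [((a[1], b[1]), a[2] * b[2]) for a, b in product(a_entries, b_entries)]

# Group for key j = 0 in Example 1: column 0 of A and row 0 of B
group_j0 = [(0, 0, 1.0), (0, 1, 3.0), (1, 0, 5.0), (1, 1, 6.0)]
partial_products = cross_multiply_product(group_j0)
print(partial_products)
# [((0, 0), 5.0), ((0, 1), 6.0), ((1, 0), 15.0), ((1, 1), 18.0)]
```

Splitting the group by tag first means the result does not depend on the order in which the grouped entries arrive.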

The `CheckFloatStrings` function checks whether any whitespace-separated token in a list of lines cannot be parsed as a float. It returns `True` if such a token exists and `False` otherwise.

In [0]:
def CheckFloatStrings(lines):
    for line in lines:
        for item in line.split():
            try:
                float(item)
            except ValueError:
                return True
    return False
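
As a small illustration of what `float()` accepts, the sketch below tests a few tokens. The helper name `is_float_token` and the sample tokens are assumptions made for this example. Note that `float()` also parses scientific notation and the special values `nan` and `inf`, so a check based on it does not reject them.

```python
def is_float_token(token):
    """Hypothetical helper: True if float() can parse the token."""
    try:
        float(token)
        return True
    except ValueError:
        return False

# Sample tokens chosen for illustration only
tokens = ["3.0", "1e3", "-7", "nan", "inf", "b", "1,5"]
parsed = {token: is_float_token(token) for token in tokens}
print(parsed)
```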

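The `CheckMatrixDimensions` function checks whether the number of columns in the `A` matrix matches the number of rows in the `B` matrix. It does so by comparing the number of grouped entries for each key `j`, and returns `True` if any group differs in size (the dimensions do not match) and `False` otherwise.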

In [0]:
def CheckMatrixDimensions(rdd):
    records = rdd.collect()
    for itr in range(1,len(records)):
        if len(records[itr][1]) != len(records[0][1]):
            return True
    return False
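
The reasoning behind comparing group sizes can be sketched without Spark. Assuming both matrices are rectangular, each key `j` receives one entry per row of `A` (column `j` of `A`) plus one entry per column of `B` (row `j` of `B`). If `A` has more columns than `B` has rows, or the reverse, some keys receive entries from only one matrix and their groups are smaller. The helper `group_sizes` is hypothetical.

```python
from collections import Counter

def group_sizes(shape_a, shape_b):
    """Hypothetical helper: number of grouped entries per key j for the given shapes."""
    rows_a, cols_a = shape_a
    rows_b, cols_b = shape_b
    sizes = Counter()
    for j in range(cols_a):
        sizes[j] += rows_a  # column j of A contributes one entry per row of A
    for j in range(rows_b):
        sizes[j] += cols_b  # row j of B contributes one entry per column of B
    return dict(sizes)

print(group_sizes((2, 2), (2, 2)))  # {0: 4, 1: 4}
print(group_sizes((3, 4), (3, 4)))  # {0: 7, 1: 7, 2: 7, 3: 3}
```

In the second case, key 3 only receives entries from `A`, so its group is smaller than the others and the check reports a mismatch.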

The `MapperA` function is the initial mapper for the `A` matrix. It receives a list of strings and produces an RDD of (j, (0, i, A<sub>ij</sub>)) pairs, where the tag 0 marks an entry of `A`.<br>
The `MapperB` function does the same for the `B` matrix and produces (j, (1, k, B<sub>jk</sub>)) pairs, where the tag 1 marks an entry of `B`.

In [0]:
def MapperA(lines):
    setA = sc.parallelize(AddLineNumbers(lines)).filter(lambda x: len(x[1]) > 0)
    MappedResult = setA.map(lambda x: [(j,(0, x[0], float(val))) for j, val in enumerate(x[1].split())])\
                       .flatMap(lambda x: x)
    return MappedResult

In [0]:
def MapperB(lines):
    setB = sc.parallelize(AddLineNumbers(lines)).filter(lambda x: len(x[1]) > 0)
    MappedResult = setB.map(lambda x: [(x[0],(1, k, float(val))) for k, val in enumerate(x[1].split())])\
                       .flatMap(lambda x: x)
    return MappedResult
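
For readers without a Spark cluster at hand, the two mappers can be approximated in plain Python. This is a sketch under the assumption that the input lines contain no blank or comment lines; `map_a` and `map_b` are hypothetical names that return lists rather than RDDs.

```python
def map_a(lines):
    """Emit (j, (0, i, A_ij)) for each entry of A; i is the row, j the column."""
    return [(j, (0, i, float(val)))
            for i, line in enumerate(lines)
            for j, val in enumerate(line.split())]

def map_b(lines):
    """Emit (j, (1, k, B_jk)) for each entry of B; j is the row, k the column."""
    return [(j, (1, k, float(val)))
            for j, line in enumerate(lines)
            for k, val in enumerate(line.split())]

# The two matrices from Example 1
mapped = map_a(["1 2", "3 4"]) + map_b(["5 6", "7 8"])
for pair in mapped:
    print(pair)
```

Both mappers key each entry by `j`, the index shared by the column of `A` and the row of `B`, so that entries which must be multiplied together end up in the same group.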

The `reducer1` function is the reducer for the first phase. It takes the grouped entries for each key and applies `crossMultiply` to them.

In [0]:
def reducer1(rdd):
    return rdd.map(lambda x: crossMultiply(x[1]))
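
The input to `reducer1` is the output of a group-by-key step. A minimal plain-Python sketch of that grouping, using the mapped pairs from Example 1, is shown below. The helper `group_by_key` is hypothetical and only imitates the shape of the grouped data; unlike this sketch, Spark does not guarantee the order of values within a group.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Hypothetical helper: collect the values of (key, value) pairs per key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

# Mapped pairs (j, (tag, index, value)) for the matrices in Example 1
mapped = [(0, (0, 0, 1.0)), (1, (0, 0, 2.0)), (0, (0, 1, 3.0)), (1, (0, 1, 4.0)),
          (0, (1, 0, 5.0)), (0, (1, 1, 6.0)), (1, (1, 0, 7.0)), (1, (1, 1, 8.0))]
grouped = group_by_key(mapped)
for key, values in grouped.items():
    print(key, values)
```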

The `reducer2` function is the reducer for the second phase. It sums the partial products for each (i, k) key and sorts the results by key.

In [0]:
def reducer2(rdd):
    return rdd.reduceByKey(lambda x, y: x+y).sortByKey()
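
The second phase amounts to summing values per key. The sketch below imitates that step in plain Python with the partial products from Example 1; `reduce_by_key_sum` is a hypothetical stand-in, not a Spark API.

```python
from collections import defaultdict

def reduce_by_key_sum(pairs):
    """Hypothetical stand-in: sum the values for each key, then sort by key."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return sorted(totals.items())

# Partial products ((i, k), A_ij * B_jk) for j = 0 followed by j = 1
partial = [((0, 0), 5.0), ((0, 1), 6.0), ((1, 0), 15.0), ((1, 1), 18.0),
           ((0, 0), 14.0), ((0, 1), 16.0), ((1, 0), 28.0), ((1, 1), 32.0)]
summed = reduce_by_key_sum(partial)
print(summed)
# [((0, 0), 19.0), ((0, 1), 22.0), ((1, 0), 43.0), ((1, 1), 50.0)]
```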

### Running the Code
Now that all the required functions are defined, we can run the full pipeline on the input file from Example 1.

In [0]:
sc = SparkContext.getOrCreate()

data = sc.textFile("/FileStore/tables/set2.txt")
lines = data.filter(lambda row: not row.startswith('#')).collect()

partitionIndex = lines.index('')
linesA = lines[:partitionIndex]
linesB = lines[partitionIndex+1:]

if CheckFloatStrings(linesA) or CheckFloatStrings(linesB):
    print("File contains non-float characters")
else:
    allMappedResults = MapperA(linesA) + MapperB(linesB)
    reducerInput = allMappedResults.groupByKey().mapValues(lambda x: list(x))
    if CheckMatrixDimensions(reducerInput):
        print("Matrix Dimensions Does not match")
    else:
        reducedResult = reducer1(reducerInput)
        secondPhaseInput = reducedResult.flatMap(lambda x: x)
        finalResult = reducer2(secondPhaseInput)
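
The split of the file into two matrices relies on `lines.index('')`, which raises a `ValueError` if the file has no empty line. A slightly more defensive variant can be sketched in plain Python as follows. The helper `split_matrices` is hypothetical, and treating whitespace-only lines as blank is an assumption that goes beyond what the cell above does.

```python
def split_matrices(lines):
    """Hypothetical helper: split lines at the first blank line into (lines_a, lines_b)."""
    # Drop comment lines, mirroring the filter applied to the RDD
    lines = [line for line in lines if not line.startswith('#')]
    # Assumption: a whitespace-only line also counts as the separator
    blank = next((idx for idx, line in enumerate(lines) if not line.strip()), None)
    if blank is None:
        raise ValueError("expected a blank line between the two matrices")
    lines_a = lines[:blank]
    # Skip any further blank lines, such as trailing ones at the end of the file
    lines_b = [line for line in lines[blank + 1:] if line.strip()]
    return lines_a, lines_b

sample = ["# matrix A", "1 2", "3 4", "", "# matrix B", "5 6", "7 8", ""]
lines_a, lines_b = split_matrices(sample)
print(lines_a, lines_b)
# ['1 2', '3 4'] ['5 6', '7 8']
```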

### Explanation
Initially, we read the file and filter out all the comment lines. <br><br>
Next, we find the empty line in the file that separates the matrices `A` and `B` and store its position in the variable `partitionIndex`. Using this `partitionIndex`, we partition the file into two lists, `linesA` and `linesB`, one for each matrix. <br><br>
Following that, we check whether the matrices contain any non-float strings and stop the program there if they do. If both matrices contain only valid values, we apply the initial mapper functions to the two lists and combine the results in the `allMappedResults` variable. At this point, we have all the results of the map phase.

In [0]:
allMappedResults.collect()

Out[139]: [(0, (0, 0, 1.0)),
 (1, (0, 0, 2.0)),
 (0, (0, 1, 3.0)),
 (1, (0, 1, 4.0)),
 (0, (1, 0, 5.0)),
 (0, (1, 1, 6.0)),
 (1, (1, 0, 7.0)),
 (1, (1, 1, 8.0))]

Next, we group all entries that share a key and store them in the `reducerInput` variable. Before reducing, we check whether the matrix dimensions are compatible for matrix multiplication and display an error if they are not. In this example they match, so we call the reducer for the first phase and store the results in the `reducedResult` variable.

In [0]:
reducedResult.collect()

Out[140]: [[((0, 0), 5.0), ((0, 1), 6.0), ((1, 0), 15.0), ((1, 1), 18.0)],
 [((0, 0), 14.0), ((0, 1), 16.0), ((1, 0), 28.0), ((1, 1), 32.0)]]

Now that we have the reduced result for each key, we flatten the per-key lists into a single collection of ((i, k), product) pairs in the `secondPhaseInput` variable.

In [0]:
secondPhaseInput.collect()

Out[141]: [((0, 0), 5.0),
 ((0, 1), 6.0),
 ((1, 0), 15.0),
 ((1, 1), 18.0),
 ((0, 0), 14.0),
 ((0, 1), 16.0),
 ((1, 0), 28.0),
 ((1, 1), 32.0)]

After this, we run the reducer for the second phase and store the result in the `finalResult` variable.

In [0]:
results = finalResult.collect()
for result in results:
    print(result)

((0, 0), 19.0)
((0, 1), 22.0)
((1, 0), 43.0)
((1, 1), 50.0)
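
As a sanity check, the same product can be computed with a direct triple loop in plain Python. This sketch is independent of the MapReduce pipeline, and `matmul` is a hypothetical helper written for this comparison.

```python
def matmul(a, b):
    """Hypothetical reference: multiply two matrices given as lists of rows."""
    if len(a[0]) != len(b):
        raise ValueError("columns of A must equal rows of B")
    return [[sum(a[i][j] * b[j][k] for j in range(len(b)))
             for k in range(len(b[0]))]
            for i in range(len(a))]

# The two matrices from Example 1
A = [[1.0, 2.0], [3.0, 4.0]]
B = [[5.0, 6.0], [7.0, 8.0]]
expected = matmul(A, B)
for i, row in enumerate(expected):
    for k, value in enumerate(row):
        print(((i, k), value))
```

The printed pairs agree with the output of the second reducer above.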


### Define the code in a function
Now that we have seen how the code works, we can wrap it in a function, `DisplayMatrixMultiplication`, to be used in the remaining examples.

In [0]:
def DisplayMatrixMultiplication(file):
    sc = SparkContext.getOrCreate()

    data = sc.textFile(file)
    lines = data.filter(lambda row: not row.startswith('#')).collect()

    partitionIndex = lines.index('')
    linesA = lines[:partitionIndex]
    linesB = lines[partitionIndex+1:]

    if CheckFloatStrings(linesA) or CheckFloatStrings(linesB):
        print("File contains non-float characters")
    else:
        allMappedResults = MapperA(linesA) + MapperB(linesB)
        reducerInput = allMappedResults.groupByKey().mapValues(lambda x: list(x))
        if CheckMatrixDimensions(reducerInput):
            print("Matrix Dimensions Does not match")
        else:
            reducedResult = reducer1(reducerInput)
            secondPhaseInput = reducedResult.flatMap(lambda x: x)
            finalResult = reducer2(secondPhaseInput).collect()
            print("Result:")
            for result in finalResult:
                print(result)

### Example 2 - Input file containing a non-numeric string in a matrix
#### Printing the Input file

In [0]:
dbutils.fs.cp("/FileStore/tables/set1.txt", "file:///tmp/set1.txt")
with open("/tmp/set1.txt", "r") as file:
    print (file.read())

1 2 3.0
3 4

5 b
7 8



The input file contains a non-float token (`b`) in the second matrix, so the program displays the error message and stops.

In [0]:
DisplayMatrixMultiplication("/FileStore/tables/set1.txt")

File contains non-float characters


### Example 3 - Input file with two 4x4 matrices 
#### Printing the Input file

In [0]:
dbutils.fs.cp("/FileStore/tables/set3.txt", "file:///tmp/set3.txt")
with open("/tmp/set3.txt", "r") as file:
    print (file.read())

# matrix A
 7.13 8.20198 15.0128976   123
43.5  0.00386 39.8       94683.12
 8    7	      6		     5.0
 4    3	      2		     1.0

# matrix B
  1.0	    2	  3	  4
  5.0	    6	  7	  8
345	   88.888 0.543	  5.9204864
  0.001 54321	  0.00231 1.456



In [0]:
DisplayMatrixMultiplication("/FileStore/tables/set3.txt")

Result:
((0, 0), 5227.712571999999)
((0, 1), 6682880.938321869)
((0, 2), 87.23999339680002)
((0, 3), 362.10749606539264)
((1, 0), 13869.202419999998)
((1, 1), 5143285386.28556)
((1, 2), 370.8564272)
((1, 3), 138268.28895872)
((2, 0), 2113.005)
((2, 1), 272196.328)
((2, 2), 76.26955)
((2, 3), 130.80291839999998)
((3, 0), 709.001)
((3, 1), 54524.776)
((3, 2), 34.08831)
((3, 3), 53.296972800000006)


### Example 4 - Input file with two 3x4 matrices, where the number of columns of A does not match the number of rows of B
#### Printing the Input file

In [0]:
dbutils.fs.cp("/FileStore/tables/set5-3.txt", "file:///tmp/set5-3.txt")
with open("/tmp/set5-3.txt", "r") as file:
    print (file.read())

# matrix A
 7.13 8.20198 15.0128976 21.16
43.5  0.00386 39.8       17.97
 8    7	      6		       4
 

# matrix B
  5.0	    6	  7	  8
345	   88.888 0.543	  5.9204864
  0.001 54321	  0.00231 1.456



In [0]:
DisplayMatrixMultiplication("/FileStore/tables/set5-3.txt")

Matrix Dimensions Does not match


The two matrices in the input file have dimensions that are incompatible for matrix multiplication, so the program displays the error message and stops.