# Relational Algebra Operations. MapReduce.

There are a number of operations on large-scale data that are used in database queries. Many traditional database applications involve retrieval of small amounts of data, even though the database itself may be large. For example, a query may ask for the bank balance of one particular account. Such queries are not useful applications of MapReduce. 

However, there are many operations on data that can be described easily in terms of the common database-query primitives, even if the queries themselves are not executed within a database management system. Thus, a good starting point for exploring applications of MapReduce is by considering the standard operations on relations. We assume you are familiar with database systems, the query language SQL, and the relational model, but to review, a ***relation*** is a table with column headers called ***attributes***. Rows of the relation are called ***tuples***. The set of attributes of a relation is called its ***schema***. 

There are several standard operations on relations, often referred to as ***relational algebra***, that are used to implement queries. The queries themselves usually are written in SQL. The relational-algebra operations we will discuss are:

- selection
- projection
- union, intersection and difference
- natural join
- grouping and aggregation

## Selection operations

Apply a condition C to each tuple in the relation and produce as output only those tuples that satisfy C. 

**Operation:** $Select_C(R)$.

**Map:** For each tuple $t$ in $R$, test if it satisfies $C$. If so, produce the key-value pair $(t, t)$. That is, both the key and value are $t$. 

**Reduce:** The Reduce function is the identity. It simply passes each key-value pair to the output.

|        | Input          | Output       |
|--------|----------------|--------------|
| map    | <k1, t>        | list(<t, t>) |
| reduce | (<t, list(t)>) | list(<t, t>) |


Let's create a custom data to work on: 

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')



In [2]:
data = [
    (1, 2), (2, 3), (5, 6), (2, 8), (4, 4), (6, 1), # (1, 2), 
    (6, 2), (6, 3), (7, 8), (9, 8), (3, 3), (0, 1)
]
cols = ['A', 'B']
df = sc.parallelize(data)

`Selection(B <= 3)`

In [3]:
df_map = df.filter(lambda x: x[1] <= 3).map(lambda x: (x, x)) # Notice (1, 2)

In [4]:
df_map.collect()

[((1, 2), (1, 2)),
 ((2, 3), (2, 3)),
 ((6, 1), (6, 1)),
 ((6, 2), (6, 2)),
 ((6, 3), (6, 3)),
 ((3, 3), (3, 3)),
 ((0, 1), (0, 1))]

#### Reduce By Key
Spark RDD reduceByKey() transformation is used to merge the values of each key using an associative reduce function.  It is a wider transformation as it shuffles data across multiple partitions and it operates on pair RDD (key/value pair)

In [5]:
# df_res = df_map.map(lambda x: x[1]) # reduce's place 
df_res = df_map.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).values()

In [6]:
df_res.collect()

[(3, 3), (1, 2), (2, 3), (6, 3), (0, 1), (6, 2), (6, 1)]

## Projection

For some subset S of the attributes of the relation, produce from each tuple only the components for the attributes in S.

**Operation:** $Project_A(R)$.

**Map:** For each tuple $t$ in $R$, construct a tuple $t'$ by eliminating from t those components whose attributes are not in A. Output the key-value pair $(t', t')$.


**Reduce:** For each key $t'$ produced by any of the Map tasks, there will be one or more key-value pairs ($t'$, $t'$). The Reduce function turns $(t', [t', t', . . . , t'])$ into $(t', t')$, so it produces exactly one pair $(t', t')$ for this key $t'$.

|        | Input                           | Output             |
|--------|---------------------------------|--------------------|
| map    | <k1, t>                         | list(<$t'$, $t'$>) |
| reduce | (<$t'$, list($t'$, ..., $t'$)>) | list(<$t'$, $t'$>) |

In [7]:
data = [
    (1, 2, 3), (2, 2, 2), (1, 2, 1), (4, 2, 1), (6, 8, 4), (3, 2, 2), 
    (1, 2, 5), (2, 3, 2), (1, 3, 1), (3, 2, 1), (6, 8, 9), (3, 4, 2)
]
cols = ['A', 'B', 'C']
df = sc.parallelize(data)

In [8]:
df.collect()

[(1, 2, 3),
 (2, 2, 2),
 (1, 2, 1),
 (4, 2, 1),
 (6, 8, 4),
 (3, 2, 2),
 (1, 2, 5),
 (2, 3, 2),
 (1, 3, 1),
 (3, 2, 1),
 (6, 8, 9),
 (3, 4, 2)]

Here, we are going to choose only first second columns and drop the third column. 

In [9]:
df_map = df.map(lambda x: ((x[0], x[1]), (x[0], x[1])))
print(df_map.collect())


y = df_map.groupByKey()
y.collect()

[((1, 2), (1, 2)), ((2, 2), (2, 2)), ((1, 2), (1, 2)), ((4, 2), (4, 2)), ((6, 8), (6, 8)), ((3, 2), (3, 2)), ((1, 2), (1, 2)), ((2, 3), (2, 3)), ((1, 3), (1, 3)), ((3, 2), (3, 2)), ((6, 8), (6, 8)), ((3, 4), (3, 4))]


[((4, 2), <pyspark.resultiterable.ResultIterable at 0x7f8b38f40ba8>),
 ((6, 8), <pyspark.resultiterable.ResultIterable at 0x7f8b38f40780>),
 ((1, 2), <pyspark.resultiterable.ResultIterable at 0x7f8b38f40908>),
 ((2, 3), <pyspark.resultiterable.ResultIterable at 0x7f8b38f406d8>),
 ((3, 4), <pyspark.resultiterable.ResultIterable at 0x7f8b38f40390>),
 ((2, 2), <pyspark.resultiterable.ResultIterable at 0x7f8b38f40668>),
 ((1, 3), <pyspark.resultiterable.ResultIterable at 0x7f8b38f40160>),
 ((3, 2), <pyspark.resultiterable.ResultIterable at 0x7f8b38f40128>)]

In [10]:
[(key[0],[val for val in key[1]]) for key in df_map.groupByKey().collect()]

# Same as .map(lambda x : (x[0], list(x[1]))) below

[((4, 2), [(4, 2)]),
 ((6, 8), [(6, 8), (6, 8)]),
 ((1, 2), [(1, 2), (1, 2), (1, 2)]),
 ((2, 3), [(2, 3)]),
 ((3, 4), [(3, 4)]),
 ((2, 2), [(2, 2)]),
 ((1, 3), [(1, 3)]),
 ((3, 2), [(3, 2), (3, 2)])]

In [11]:
df_map = df.map(lambda x: ((x[0], x[1]), (x[0], x[1]))).groupByKey().map(lambda x : (x[0], list(x[1]))) 
# mapValues(lambda x: list(x))
df_map.collect()

[((4, 2), [(4, 2)]),
 ((6, 8), [(6, 8), (6, 8)]),
 ((1, 2), [(1, 2), (1, 2), (1, 2)]),
 ((2, 3), [(2, 3)]),
 ((3, 4), [(3, 4)]),
 ((2, 2), [(2, 2)]),
 ((1, 3), [(1, 3)]),
 ((3, 2), [(3, 2), (3, 2)])]

#### Map Values
When we use map() with a Pair RDD, we get access to both Key & value. There are times we might only be interested in accessing the value(& not key). In those case, we can use mapValues() instead of map()

In [14]:
dataExtra = [
    ((4, 2), [(4, 2)]),
     ((6, 8), [(6, 8), (6, 8)]),
     ((1, 2), [(1, 2), (1, 2), (1, 2)]),
     ((2, 3), [(2, 3)])]
dfExtra = sc.parallelize(dataExtra)
dfExtra.collect()

[((4, 2), [(4, 2)]),
 ((6, 8), [(6, 8), (6, 8)]),
 ((1, 2), [(1, 2), (1, 2), (1, 2)]),
 ((2, 3), [(2, 3)])]

In [15]:
dfExtra.mapValues(lambda x: x[0]).collect() 
# mapValues just alters the values and if you want to see only the values you use .values()

[((4, 2), (4, 2)), ((6, 8), (6, 8)), ((1, 2), (1, 2)), ((2, 3), (2, 3))]

In [16]:
dfExtra = dfExtra.mapValues(lambda x: x[0]).values()
dfExtra.collect() 

[(4, 2), (6, 8), (1, 2), (2, 3)]

In [17]:
# df_res = df_map.values().map(lambda x: x[0])

# df_map printed above
print(df_map.mapValues(lambda x: x[0]).collect())

df_res = df_map.mapValues(lambda x: x[0]).values() 
df_res.collect()

[((4, 2), (4, 2)), ((6, 8), (6, 8)), ((1, 2), (1, 2)), ((2, 3), (2, 3)), ((3, 4), (3, 4)), ((2, 2), (2, 2)), ((1, 3), (1, 3)), ((3, 2), (3, 2))]


[(4, 2), (6, 8), (1, 2), (2, 3), (3, 4), (2, 2), (1, 3), (3, 2)]

## Union

**Operation:** $Union(R, S)$.

**Map:** Turn each input tuple $t$ either from relation $R$ or $S$ into a key-value pair $(t, t)$.


**Reduce:** Associated with each key $t$ there will be either one or two values. Produce output $(t, t)$ in either case.

|        | Input          | Output       |
|--------|----------------|--------------|
| map    | <k1, t>        | list(<t, t>) |
| reduce | (<t, list(t)>) | list(<t, t>) |

In [18]:
data1 = [(1, 2), (2, 3), (5, 6), (2, 3), (4, 4), (6, 1)]
data2 = [(6, 1), (6, 3), (7, 8), (9, 8), (3, 3), (0, 1)]
cols = ['A', 'B']
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)

In [19]:
rdd1.take(3), rdd2.take(3)

([(1, 2), (2, 3), (5, 6)], [(6, 1), (6, 3), (7, 8)])

In [20]:
# This is the map function and grouping in map reduce
rdd1 = rdd1.map(lambda x: (x, x)).groupByKey().map(lambda x : (x[0], list(x[1])))
rdd1.collect()

[((1, 2), [(1, 2)]),
 ((2, 3), [(2, 3), (2, 3)]),
 ((5, 6), [(5, 6)]),
 ((4, 4), [(4, 4)]),
 ((6, 1), [(6, 1)])]

In [21]:
rdd2 = rdd2.map(lambda x: (x, x)).groupByKey().map(lambda x : (x[0], list(x[1])))
rdd2.collect()

[((3, 3), [(3, 3)]),
 ((6, 3), [(6, 3)]),
 ((7, 8), [(7, 8)]),
 ((0, 1), [(0, 1)]),
 ((6, 1), [(6, 1)]),
 ((9, 8), [(9, 8)])]

#### Reduce By Key
Spark RDD reduceByKey() transformation is used to merge the values of each key using an associative reduce function.  It is a wider transformation as it shuffles data across multiple partitions and it operates on pair RDD (key/value pair)

In [22]:
rdd1.union(rdd2).collect()

[((1, 2), [(1, 2)]),
 ((2, 3), [(2, 3), (2, 3)]),
 ((5, 6), [(5, 6)]),
 ((4, 4), [(4, 4)]),
 ((6, 1), [(6, 1)]),
 ((3, 3), [(3, 3)]),
 ((6, 3), [(6, 3)]),
 ((7, 8), [(7, 8)]),
 ((0, 1), [(0, 1)]),
 ((6, 1), [(6, 1)]),
 ((9, 8), [(9, 8)])]

In [23]:
print(rdd1.union(rdd2).reduceByKey(lambda x, y: x + y)) # Here X and Y are the value (list) with the same key

for element in rdd1.union(rdd2).reduceByKey(lambda x, y: x + y).collect():
    print(element)

PythonRDD[53] at RDD at PythonRDD.scala:53
((1, 2), [(1, 2)])
((5, 6), [(5, 6)])
((6, 3), [(6, 3)])
((0, 1), [(0, 1)])
((4, 4), [(4, 4)])
((6, 1), [(6, 1), (6, 1)])
((9, 8), [(9, 8)])
((3, 3), [(3, 3)])
((2, 3), [(2, 3), (2, 3)])
((7, 8), [(7, 8)])


In [24]:
rdd1.union(rdd2).reduceByKey(lambda x, y: x + y).mapValues(lambda x: x[0]).values().collect()

[(1, 2),
 (5, 6),
 (6, 3),
 (0, 1),
 (4, 4),
 (6, 1),
 (9, 8),
 (3, 3),
 (2, 3),
 (7, 8)]

## Intersection

**Operation:** $Intersection(R, S)$.

**Map:** Turn each input tuple $t$ either from relation $R$ or $S$ into a key-value pair $(t, t)$.


**Reduce:** If key $t$ has value  $list(t, t)$, then produce $(t, t)$. Otherwise, produce nothing.

|        | Input                  | Output                          |
|--------|------------------------|---------------------------------|
| map    | <k1, t>                | list(<t, t>)                    |
| reduce | $(<$$t$, $list(t)$$>)$ | list(<t, t>) if (<t, list(t, t)>) |

In [25]:
data1 = [(1, 2), (2, 3), (5, 6), (2, 3), (4, 4), (6, 1)]
data2 = [(6, 1), (6, 3), (7, 8), (9, 8), (3, 3), (0, 1)]
cols = ['A', 'B']
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)

In [26]:
rdd1.take(3), rdd2.take(3)

([(1, 2), (2, 3), (5, 6)], [(6, 1), (6, 3), (7, 8)])

In [27]:
rdd1 = rdd1.map(lambda x: (x, x)).groupByKey().map(lambda x : (x[0], list(x[1])))
rdd1.collect()

[((1, 2), [(1, 2)]),
 ((2, 3), [(2, 3), (2, 3)]),
 ((5, 6), [(5, 6)]),
 ((4, 4), [(4, 4)]),
 ((6, 1), [(6, 1)])]

In [28]:
rdd2 = rdd2.map(lambda x: (x, x)).groupByKey().map(lambda x : (x[0], list(x[1])))
rdd2.collect()

[((3, 3), [(3, 3)]),
 ((6, 3), [(6, 3)]),
 ((7, 8), [(7, 8)]),
 ((0, 1), [(0, 1)]),
 ((6, 1), [(6, 1)]),
 ((9, 8), [(9, 8)])]

In [29]:
# This is wrong but Idky intersection doesnt work
# Wrong because (2,3) isnt an intersection it just appears twice
rdd1.union(rdd2).reduceByKey(lambda x, y: x + y).values().filter(lambda x: len(x) > 1).map(lambda x: x[0]).collect()

[(6, 1), (2, 3)]

## Difference

**Operation:** $Minus(R, S)$.

**Map:** For each row `r` create a key-value pair `(r, T1)` if row is from table 1 else product key-value pair `(r, T2)`.


**Reduce:** Output the row if and only if the value in the list is `T1` , otherwise output nothing.

In [30]:
data1 = [(1, 2), (2, 3), (5, 6), (6, 1), (6, 3), (7, 8)]
data2 = [(2, 3), (4, 4), (6, 1), (9, 8), (3, 3), (0, 1)]
cols = ['A', 'B']
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)

In [31]:
rdd1.collect()

[(1, 2), (2, 3), (5, 6), (6, 1), (6, 3), (7, 8)]

In [32]:
rdd1.map(lambda x: (x, 'T1')).groupByKey().collect()

[((1, 2), <pyspark.resultiterable.ResultIterable at 0x7f8b38f48d30>),
 ((2, 3), <pyspark.resultiterable.ResultIterable at 0x7f8b38f439e8>),
 ((5, 6), <pyspark.resultiterable.ResultIterable at 0x7f8b38fb0588>),
 ((6, 3), <pyspark.resultiterable.ResultIterable at 0x7f8b38fb0d30>),
 ((7, 8), <pyspark.resultiterable.ResultIterable at 0x7f8b38fb0b38>),
 ((6, 1), <pyspark.resultiterable.ResultIterable at 0x7f8b38fb0a58>)]

In [33]:
# .groupByKey() This gives a group by object
# .mapValues(lambda x: list(x)) This gives the printability
rdd1_map = rdd1.map(lambda x: (x, 'T1')).groupByKey().mapValues(lambda x: list(x))
rdd1_map.collect()

[((1, 2), ['T1']),
 ((2, 3), ['T1']),
 ((5, 6), ['T1']),
 ((6, 3), ['T1']),
 ((7, 8), ['T1']),
 ((6, 1), ['T1'])]

In [34]:
rdd2_map = rdd2.map(lambda x: (x, 'T2')).groupByKey().mapValues(lambda x: list(x))
rdd2_map.collect()

[((3, 3), ['T2']),
 ((2, 3), ['T2']),
 ((0, 1), ['T2']),
 ((4, 4), ['T2']),
 ((6, 1), ['T2']),
 ((9, 8), ['T2'])]

In [35]:
rdd_res = (rdd1_map + rdd2_map).reduceByKey(lambda x, y: x + y) # In reduce by key you will see the x and y 
# are the values of the keys which are the same and the operation isbeing done on them
rdd_res.collect()

[((3, 3), ['T2']),
 ((1, 2), ['T1']),
 ((2, 3), ['T1', 'T2']),
 ((5, 6), ['T1']),
 ((6, 3), ['T1']),
 ((7, 8), ['T1']),
 ((0, 1), ['T2']),
 ((4, 4), ['T2']),
 ((6, 1), ['T1', 'T2']),
 ((9, 8), ['T2'])]

In [36]:
rdd_res.filter(lambda x: x[1] == ['T1']).keys().collect()

[(1, 2), (5, 6), (6, 3), (7, 8)]

## Grouping and Aggregation

**Map:** For each row in the table, take the attributes using which grouping is to be done as the key, and value will be the ones on which aggregation is to be performed. For example, If a relation has 4 columns `A, B, C, D` and we want to group by `A`, `B` and do an aggregation on `C` we will make `(A, B)` as the key and `C` as the value.

**Reduce:** Apply the aggregation operation (`sum, max, min, avg, …`) on the list of values and output the result.

In [37]:
data = [
    (1, 2, 3, 1), (2, 2, 2, 1), (1, 2, 1, 1), (4, 2, 1, 1), (6, 8, 4, 1), (3, 2, 2, 1), 
    (1, 2, 5, 1), (2, 3, 2, 1), (1, 3, 1, 1), (3, 2, 1, 1), (6, 8, 9, 1), (3, 4, 2, 1)
]
cols = ['A', 'B', 'C', 'D'] # just to show
rdd = sc.parallelize(data)

In [38]:
rdd.take(5)

[(1, 2, 3, 1), (2, 2, 2, 1), (1, 2, 1, 1), (4, 2, 1, 1), (6, 8, 4, 1)]

In [39]:
rdd.map(lambda x: ((x[0], x[1]), x[2])).collect()

[((1, 2), 3),
 ((2, 2), 2),
 ((1, 2), 1),
 ((4, 2), 1),
 ((6, 8), 4),
 ((3, 2), 2),
 ((1, 2), 5),
 ((2, 3), 2),
 ((1, 3), 1),
 ((3, 2), 1),
 ((6, 8), 9),
 ((3, 4), 2)]

In [40]:
rdd_map = rdd.map(lambda x: ((x[0], x[1]), x[2])).groupByKey().mapValues(lambda x: list(x))
rdd_map.collect()

[((4, 2), [1]),
 ((6, 8), [4, 9]),
 ((1, 2), [3, 1, 5]),
 ((2, 3), [2]),
 ((3, 4), [2]),
 ((2, 2), [2]),
 ((1, 3), [1]),
 ((3, 2), [2, 1])]

In [41]:
rdd_map.mapValues(sum).collect()

[((4, 2), 1),
 ((6, 8), 13),
 ((1, 2), 9),
 ((2, 3), 2),
 ((3, 4), 2),
 ((2, 2), 2),
 ((1, 3), 1),
 ((3, 2), 3)]

## Natural Join Using Map Reduce

The natural join will keep the rows that matches the values in the common column for both tables. To perform natural join we will have to keep track of from which table the value came from. If the values for the same key are from different tables we need to form pairs of those values along with key to get a single row of the output. Join can explode the number of rows as we have to form each and every possible combination of the values for both tables.

- **Map Function**: For two relations `Table 1(A, B)` and `Table 2(B, C)` the map function will create key-value pairs of form `b`: `[(T1, a)]` for table 1 where T1 represents the fact that the value `a` came from table 1, for table 2 key-value pairs will be of the form `b`: `[(T2, c)]`. 

- **Reduce Function**: For a given key `b` construct all possible combinations for the values where one value is from table `T1` and the other value is from table `T2`. The output will consist of key-value pairs of form `b: [(a, c)]` which represent one row `a, b, c` for the output table.

For an example lets consider joining `Table 1` and `Table 2`, where `B` is the common column. 

In [None]:
spark.stop() # if there is a session object named spark

In [42]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("Relational-Algebra").getOrCreate()

In [43]:
data1 = [(1, 2), (2, 3), (5, 6), (6, 1), (6, 3), (7, 6)]
data2 = [(2, 3), (4, 4), (6, 1), (9, 8), (3, 4), (2, 1)]
cols1 = ['A', 'B'] 
cols2 = ['B', 'C'] 
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)

In [44]:
spark.createDataFrame(rdd1, schema=cols1).show()
# spark.createDataFrame(rdd2, schema=cols2).show()

+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  2|  3|
|  5|  6|
|  6|  1|
|  6|  3|
|  7|  6|
+---+---+



In [110]:
+---+---+   +---+---+
|  A|  B|   |  B|  C|
+---+---+   +---+---+
|  1|  2|   |  2|  3|
|  2|  3|   |  4|  4|
|  5|  6|   |  6|  1|
|  6|  1|   |  9|  8|
|  6|  3|   |  3|  4|
|  7|  6|   |  2|  1|
+---+---+   +---+---+

The data after applying the map function and grouping at the map workers will look like:

In [45]:
rdd1_map = rdd1.map(lambda x: (x[1], ('T1', x[0])))
rdd1_map.collect()

[(2, ('T1', 1)),
 (3, ('T1', 2)),
 (6, ('T1', 5)),
 (1, ('T1', 6)),
 (3, ('T1', 6)),
 (6, ('T1', 7))]

In [46]:
rdd2_map = rdd2.map(lambda x: (x[0], ('T2', x[1])))
rdd2_map.collect()

[(2, ('T2', 3)),
 (4, ('T2', 4)),
 (6, ('T2', 1)),
 (9, ('T2', 8)),
 (3, ('T2', 4)),
 (2, ('T2', 1))]

Next we add the datasets together, using the `union` function. (or you can use just `+`)

In [47]:
rdd = rdd1_map.union(rdd2_map)
rdd.collect()

[(2, ('T1', 1)),
 (3, ('T1', 2)),
 (6, ('T1', 5)),
 (1, ('T1', 6)),
 (3, ('T1', 6)),
 (6, ('T1', 7)),
 (2, ('T2', 3)),
 (4, ('T2', 4)),
 (6, ('T2', 1)),
 (9, ('T2', 8)),
 (3, ('T2', 4)),
 (2, ('T2', 1))]

Until now, we've been working on mapping, next, we will use `groupByKey` to reduce, which sends items with the same key to the same reducer and it collects records with the same key into a list. 

In [48]:
rdd_grouped = (rdd1_map.union(rdd2_map)).groupByKey().mapValues(lambda x: list(x))
rdd_grouped.collect()

[(1, [('T1', 6)]),
 (9, [('T2', 8)]),
 (2, [('T1', 1), ('T2', 3), ('T2', 1)]),
 (3, [('T1', 2), ('T1', 6), ('T2', 4)]),
 (4, [('T2', 4)]),
 (6, [('T1', 5), ('T1', 7), ('T2', 1)])]

After we map a function that will create a row by taking one value from table T1 and other one from T2. If there are only values from T1 or T2 in the values list that won’t constitute a row in output.

In [49]:
def cross_case(seq):
  left, right = [], []
  for row in seq:
    if row[0] == 'T1':
      left.append(row)
    elif row[0] == 'T2':
      right.append(row)

  return [(v, w) for v in left for w in right]

In [50]:
# If look at B column then 2,3,6 have values in both tables
# However, Issue is Values are being repeated below
result = rdd_grouped.mapValues(lambda x: cross_case(x))
result.collect()

[(1, []),
 (9, []),
 (2, [(('T1', 1), ('T2', 3)), (('T1', 1), ('T2', 1))]),
 (3, [(('T1', 2), ('T2', 4)), (('T1', 6), ('T2', 4))]),
 (4, []),
 (6, [(('T1', 5), ('T2', 1)), (('T1', 7), ('T2', 1))])]

This is the result of joining two tables. 

As we need to keep context from which table a value came from, we can’t get rid of the data that needs to be sent across the workers for application of reduce task, this operation also becomes costly as compared to others we discussed so far. The fact that for each list of values we need to create pairs also plays a major factor in the computation cost associated with this operation.

## Matrix Multiplication

In [63]:
import numpy as np
from pprint import pprint
from collections import defaultdict

m = np.matrix(
    [ 
        [1, 2, 0],
        [3, 0, 5],
        [0, 7, 0],
        [4, 0, 0],
        [1, 8, 2]
    ]
)

n = np.matrix(
    [
        [1, 4, 5, 0],
        [0, 4, 0, 1],
        [6, 3, 9, 1],
    ]
)
shape_m = m.shape
shape_n = n.shape

m_n = m * n
pprint(m_n)

matrix([[ 1, 12,  5,  2],
        [33, 27, 60,  5],
        [ 0, 28,  0,  7],
        [ 4, 16, 20,  0],
        [13, 42, 23, 10]])


In [64]:
m_n = [[i, j, m_n[i,j]] for i in range(m_n.shape[0]) for j in range(m_n.shape[1]) if m_n[i,j] != 0]
pprint(m_n)

[[0, 0, 1],
 [0, 1, 12],
 [0, 2, 5],
 [0, 3, 2],
 [1, 0, 33],
 [1, 1, 27],
 [1, 2, 60],
 [1, 3, 5],
 [2, 1, 28],
 [2, 3, 7],
 [3, 0, 4],
 [3, 1, 16],
 [3, 2, 20],
 [4, 0, 13],
 [4, 1, 42],
 [4, 2, 23],
 [4, 3, 10]]


In [65]:
print('M Shape: ', shape_m)
print('N Shape: ', shape_n)

M Shape:  (5, 3)
N Shape:  (3, 4)


In [66]:
# Representing matrices in the form of i, j , m[i][j] and i, j, n[i][j]
m = [[i, j, m[i,j]] for i in range(m.shape[0]) for j in range(m.shape[1]) if m[i,j] != 0]

n = [[i, j, n[i,j]] for i in range(n.shape[0]) for j in range(n.shape[1]) if n[i, j] != 0]

print('M')
pprint(m)
print('N')
pprint(n)

# Without the 0s

# [Row, Coulumn, Value]

M
[[0, 0, 1],
 [0, 1, 2],
 [1, 0, 3],
 [1, 2, 5],
 [2, 1, 7],
 [3, 0, 4],
 [4, 0, 1],
 [4, 1, 8],
 [4, 2, 2]]
N
[[0, 0, 1],
 [0, 1, 4],
 [0, 2, 5],
 [1, 1, 4],
 [1, 3, 1],
 [2, 0, 6],
 [2, 1, 3],
 [2, 2, 9],
 [2, 3, 1]]


In [67]:
# create key value pairs
# key = j, values = [mij, ...]

ma = defaultdict(list)
for j in range(len(m)):
    ma[m[j][1]].append((m[j][0], m[j][2])) # This way Key is column and list have row and value
    
# key = j, value = njk
na = defaultdict(list)
for j in range(len(n)):
    na[n[j][0]].append((n[j][1], n[j][2])) # This way Key is row and list have column and value

    
    
# Because R1xC1 * R2xC2
# If C1 == R2, the new matrix shape is R1xC2. So imagine those two to act just like join column in inner join
pprint(ma)
pprint(na)

# {Column: [(Row, Value)]} for ma
# {Row: [(Column, Value)]} for na

defaultdict(<class 'list'>,
            {0: [(0, 1), (1, 3), (3, 4), (4, 1)],
             1: [(0, 2), (2, 7), (4, 8)],
             2: [(1, 5), (4, 2)]})
defaultdict(<class 'list'>,
            {0: [(0, 1), (1, 4), (2, 5)],
             1: [(1, 4), (3, 1)],
             2: [(0, 6), (1, 3), (2, 9), (3, 1)]})


In [69]:
# Reduce keys for each possible j value
# Group by keys from ma, na
# For each j value we will take each i, mij value from ma and multiply it by k, mjk
# The key will now be this (i, k) value



op = defaultdict(list)
for j in range(shape_m[1]): # Couldve done shape_n[0] cause both are 3 and looking at # keys for ma and na dicts
    if j in ma and j in na:
        for mi in ma[j]: # For all list items(tuples) for key 0,1,2 in ma
            for ni in na[j]: # For all list items(tuples) for key 0,1,2 in na
                i = mi[0] # Picking Row from ma
                k = ni[0] # picking column from na
                op[(i,k)].append(mi[1] * ni[1]) # On Row Col position inserting the values
pprint(op)

defaultdict(<class 'list'>,
            {(0, 0): [1],
             (0, 1): [4, 8],
             (0, 2): [5],
             (0, 3): [2],
             (1, 0): [3, 30],
             (1, 1): [12, 15],
             (1, 2): [15, 45],
             (1, 3): [5],
             (2, 1): [28],
             (2, 3): [7],
             (3, 0): [4],
             (3, 1): [16],
             (3, 2): [20],
             (4, 0): [1, 12],
             (4, 1): [4, 32, 6],
             (4, 2): [5, 18],
             (4, 3): [8, 2]})


In [70]:
# Group by the keys again and sum the values to get the final result which is the multiplication of m and n
ans = list()
for k, v in op.items():
    ans.append([k[0], k[1], sum(v)])
pprint(sorted(ans))

[[0, 0, 1],
 [0, 1, 12],
 [0, 2, 5],
 [0, 3, 2],
 [1, 0, 33],
 [1, 1, 27],
 [1, 2, 60],
 [1, 3, 5],
 [2, 1, 28],
 [2, 3, 7],
 [3, 0, 4],
 [3, 1, 16],
 [3, 2, 20],
 [4, 0, 13],
 [4, 1, 42],
 [4, 2, 23],
 [4, 3, 10]]


Above, you saw how to code matrix calculation using just usual numpy and pythonic code (list comprehension). Next using this knowledge, try to convert it to PySpark code. 

## Task

In [111]:
# CODE HERE
m = np.matrix(
    [ 
        [1, 2, 0],
        [3, 0, 5],
        [0, 7, 0],
        [4, 0, 0],
        [1, 8, 2]
    ]
)

n = np.matrix(
    [
        [1, 4, 5, 0],
        [0, 4, 0, 1],
        [6, 3, 9, 1],
    ]
)
shape_m = m.shape
shape_n = n.shape

m_n = m * n



# Representing matrices in the form of i, j , m[i][j] and i, j, n[i][j]
M = [(i, j, m[i,j]) for i in range(m.shape[0]) for j in range(m.shape[1]) if m[i,j] != 0]

N = [(i, j, n[i,j]) for i in range(n.shape[0]) for j in range(n.shape[1]) if n[i, j] != 0]

print('M')
pprint(M)
print('N')
pprint(N)

# Without the 0s

# [Row, Coulumn, Value]

M
[(0, 0, 1),
 (0, 1, 2),
 (1, 0, 3),
 (1, 2, 5),
 (2, 1, 7),
 (3, 0, 4),
 (4, 0, 1),
 (4, 1, 8),
 (4, 2, 2)]
N
[(0, 0, 1),
 (0, 1, 4),
 (0, 2, 5),
 (1, 1, 4),
 (1, 3, 1),
 (2, 0, 6),
 (2, 1, 3),
 (2, 2, 9),
 (2, 3, 1)]


In [112]:
M = sc.parallelize(M)
N = sc.parallelize(N)

In [113]:
M.map(lambda x: (x[1], (x[0], x[2], 'M'))).collect()

[(0, (0, 1, 'M')),
 (1, (0, 2, 'M')),
 (0, (1, 3, 'M')),
 (2, (1, 5, 'M')),
 (1, (2, 7, 'M')),
 (0, (3, 4, 'M')),
 (0, (4, 1, 'M')),
 (1, (4, 8, 'M')),
 (2, (4, 2, 'M'))]

In [114]:
print('M')
NewM = M.map(lambda x: (x[1], (x[0], x[2], 'M'))).groupByKey().mapValues(lambda x: list(x))
pprint(NewM.collect())
print('N')
NewN = N.map(lambda x: (x[0], (x[1], x[2], 'N'))).groupByKey().mapValues(lambda x: list(x))
pprint(NewN.collect())


M
[(0, [(0, 1, 'M'), (1, 3, 'M'), (3, 4, 'M'), (4, 1, 'M')]),
 (1, [(0, 2, 'M'), (2, 7, 'M'), (4, 8, 'M')]),
 (2, [(1, 5, 'M'), (4, 2, 'M')])]
N
[(0, [(0, 1, 'N'), (1, 4, 'N'), (2, 5, 'N')]),
 (1, [(1, 4, 'N'), (3, 1, 'N')]),
 (2, [(0, 6, 'N'), (1, 3, 'N'), (2, 9, 'N'), (3, 1, 'N')])]


In [115]:
grouped = (NewM + NewN).reduceByKey(lambda x, y: (x + y))
grouped.collect()

[(0,
  [(0, 1, 'M'),
   (1, 3, 'M'),
   (3, 4, 'M'),
   (4, 1, 'M'),
   (0, 1, 'N'),
   (1, 4, 'N'),
   (2, 5, 'N')]),
 (1, [(0, 2, 'M'), (2, 7, 'M'), (4, 8, 'M'), (1, 4, 'N'), (3, 1, 'N')]),
 (2,
  [(1, 5, 'M'),
   (4, 2, 'M'),
   (0, 6, 'N'),
   (1, 3, 'N'),
   (2, 9, 'N'),
   (3, 1, 'N')])]

In [116]:
def multiplyMatrix(row):
#     MTable, NTable = [], []
#     for e in row:
#         if e[2] == "M":
    return [(100,200),(200,300),(400,500)]
       
    
# left, right = [], []
# for row in seq:
# if row[0] == 'T1':
#   left.append(row)
# elif row[0] == 'T2':
#   right.append(row)

# return [(v, w) for v in left for w in right]

In [117]:
grouped.map(lambda x: multiplyMatrix(x[1])).collect()

[[(100, 200), (200, 300), (400, 500)],
 [(100, 200), (200, 300), (400, 500)],
 [(100, 200), (200, 300), (400, 500)]]

In [100]:
pprint((NewM + NewN).collect())
pprint((NewM + NewN).map(lambda x: (x[0], list(x[1][0]))).collect())
# .collect()# .reduceByKey(lambda x, y: x + y).collect()

[(0, [(0, 1), (1, 3), (3, 4), (4, 1)]),
 (0, [(0, 1), (1, 4), (2, 5)]),
 (1, [(0, 2), (2, 7), (4, 8)]),
 (1, [(1, 4), (3, 1)]),
 (2, [(1, 5), (4, 2)]),
 (2, [(0, 6), (1, 3), (2, 9), (3, 1)])]
[(0, [0, 1]), (0, [0, 1]), (1, [0, 2]), (1, [1, 4]), (2, [1, 5]), (2, [0, 6])]


Stop all previous sessions/contexts

In [None]:
# code here
spark.stop()

Create spark session

In [None]:
# code here 
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("Matrix-Multiplication").getOrCreate()
sc = spark.sparkContext

[Matrix Multiplication Example](https://raw.githubusercontent.com/tnurbek/ds702/main/Lab2/1_Go9f2bN64Y4wrFLhd0Cl3w.png)

In [None]:
matrix1 = [[1, 2, 0], [3, 0, 5], [0, 7, 0], [4, 0, 0], [1, 8, 2]]
matrix2 = [[1, 4, 5, 0], [0, 4, 0, 1], [6, 3, 9, 1]]

matrix1 = [[i, j, matrix1[i][j]] for i in range(len(matrix1)) for j in range(len(matrix1[0])) if matrix1[i][j] != 0]


In [None]:
matrix1 = sc.parallelize(matrix1)

In [None]:
matrix1.map(lambda x: ((x[0], x[1]), x[2])).collect()

[((0, 0), 1),
 ((0, 1), 2),
 ((1, 0), 3),
 ((1, 2), 5),
 ((2, 1), 7),
 ((3, 0), 4),
 ((4, 0), 1),
 ((4, 1), 8),
 ((4, 2), 2)]

In [None]:
# code here...