In [3]:
from pyspark import SparkConf, SparkContext

In [4]:
# Build the configuration for a local-mode Spark application.
conf = SparkConf().setMaster("local").setAppName("hellow_9313")
# getOrCreate reuses the already-running context when this cell is re-executed;
# calling SparkContext(conf=conf) a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf)

In [22]:
# Map phase: turn every word into a (word, 1) pair, ready for per-key counting.
data = sc.parallelize(
    ["hello", "world", "hello", "world", "word", "count", "hello"]
)
rdd = data.map(lambda word: (word, 1))
rdd.collect()

[('hello', 1),
 ('world', 1),
 ('hello', 1),
 ('world', 1),
 ('word', 1),
 ('count', 1),
 ('hello', 1)]

## CombineByKey

### Example 1

The key and value correspond to **word** and **count**. Use *combineByKey* to compute the total count of each word.

In [None]:
# ans

In [46]:
# sol
def combiner(v):
    """Create a key's initial combiner from the first value seen for it.

    The first value is used unchanged as the starting combiner.
    """
    initial = v
    return initial
def mergeValue(c, v):
    """Fold one more value for a key into that key's running combiner.

    c: the combiner accumulated so far
    v: the next value seen for the same key
    return: ``c + v``
    """
    updated = c + v
    return updated
def mergeCombiner(c1, c2):
    """Merge two partial combiners that were built for the same key.

    c1, c2: partial totals for one key (numbers, since ``combiner`` returns the
        value itself and ``mergeValue`` adds values together)
    return: the combined total ``c1 + c2``

    The previous body returned ``c1.extend(c2)``: numbers have no ``extend``
    method, and ``list.extend`` returns ``None`` in any case, so the merge
    failed as soon as one key had partial results to merge.
    """
    return c1 + c2
# Aggregate the (word, 1) pairs per key using the three callbacks defined above.
combine = rdd.combineByKey(
    createCombiner=combiner,
    mergeValue=mergeValue,
    mergeCombiners=mergeCombiner,
)
combine.collect()

[('hello', 3), ('world', 2), ('word', 1), ('count', 1)]

### Example 2

The key and value correspond to **word** and **score**. Use *combineByKey* to compute the maximum score for each word.

In [None]:
# ans

In [54]:
# sol
# (word, score) pairs; the combiner for each key is the largest score seen so far.
data1 = sc.parallelize([
    ("A", 1), ("A", 10), ("B", 1), ("B", 2), ("A", 4), ("A", 6),
    ("C", 1), ("C", 1), ("C", 1), ("C", 1), ("C", 1), ("C", 1),
])
data1.combineByKey(
    lambda first_score: first_score,  # createCombiner: start from the first score
    max,                              # mergeValue: keep the larger of combiner and new score
    max,                              # mergeCombiners: keep the larger of two partial maxima
).collect()

[('A', 10), ('B', 2), ('C', 1)]

### Example 3

The key and value correspond to **student ID** and **score**. Use *combineByKey* to compute the **sum** and **count** of scores for each student.

In [None]:
# ans

In [64]:
# sol
# (student_id, score) pairs; each combiner is a (running_sum, running_count) tuple.
data2 = sc.parallelize(
    [("1", 80), ("1", 50), ("1", 65), ("2", 10), ("2", 20)]
)
data2.combineByKey(
    lambda score: (score, 1),
    lambda acc, score: (acc[0] + score, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1]),
).collect()

[('1', (195, 3)), ('2', (30, 2))]

---
## Merge

### Union

In [9]:
# Two pair RDDs with identical records; the recorded output shows union keeps
# the duplicates rather than de-duplicating them.
a1 = sc.parallelize(
    [("A1", 1), ("A2", 2)]
)
b1 = sc.parallelize(
    [("A1", 1), ("A2", 2)]
)
(
    a1
    .union(b1)
    .collect()
)

[('A1', 1), ('A2', 2), ('A1', 1), ('A2', 2)]

### Zip

In [10]:
# zip pairs records by position: the i-th record of a2 with the i-th record of b2.
a2 = sc.parallelize(
    [("A1", 1), ("A2", 2)]
)
b2 = sc.parallelize(
    [("B1", 1), ("B2", 2)]
)
(
    a2
    .zip(b2)
    .collect()
)

[(('A1', 1), ('B1', 1)), (('A2', 2), ('B2', 2))]

### Join

In [12]:
# Inner join: only keys present in both RDDs survive ("B" here).
a3 = sc.parallelize(
    [("A", 1), ("B", 2)]
)
b3 = sc.parallelize(
    [("B", 3), ("C", 4)]
)
(
    a3
    .join(b3)
    .collect()
)

[('B', (2, 3))]

In [23]:
# a.leftOuterJoin(b): every key of a is kept; a key missing from b gets None
# as its right-hand value, and keys that exist only in b are dropped.

# Case 1: every key is present in both a and b.
a31, b31 = (
    sc.parallelize([("A", 1), ("B", 2)]),
    sc.parallelize([("A", 3), ("B", 4)]),
)
print("case 1:", a31.leftOuterJoin(b31).collect())

# Case 2: key "A" is in a but not in b.
a32, b32 = (
    sc.parallelize([("A", 1), ("B", 2)]),
    sc.parallelize([("B", 4)]),
)
print("case 2:", a32.leftOuterJoin(b32).collect())

# Case 3: key "B" is in b but not in a.
a33, b33 = (
    sc.parallelize([("A", 1)]),
    sc.parallelize([("A", 3), ("B", 4)]),
)
print("case 3:", a33.leftOuterJoin(b33).collect())

case 1: [('A', (1, 3)), ('B', (2, 4))]
case 2: [('A', (1, None)), ('B', (2, 4))]
case 3: [('A', (1, 3))]


In [27]:
# Compare the three outer-join flavours on the same pair of RDDs (a3, b3).
# The labels are printed exactly as before, including the "joni" spelling.
for label, joined in (
    ("full outer join:", a3.fullOuterJoin(b3)),
    ("left outer joni:", a3.leftOuterJoin(b3)),
    ("right outer join:", a3.rightOuterJoin(b3)),
):
    print(label, joined.collect())

full outer join: [('C', (None, 4)), ('A', (1, None)), ('B', (2, 3))]
left outer joni: [('A', (1, None)), ('B', (2, 3))]
right outer join: [('C', (None, 4)), ('B', (2, 3))]


---
## Co-occurrence Matrix

In [32]:
# A one-sentence corpus, distributed as an RDD of strings.
document = [
    "A boy can do anything for a girl",
]
rdd = sc.parallelize(document)

def stripe(x):
    """
    Return the second whitespace-separated token of a sentence.

    x: input sentence (a string)
    return: the token at index 1 of ``x.split()``

    Raises IndexError when the sentence has fewer than two tokens.
    NOTE(review): this looks like a placeholder for a "neighbours within
    distance 3" stripe; no neighbour window is computed here -- confirm intent.
    """
    tokens = x.split()
    return tokens[1]
# Apply stripe to every sentence and bring the results back to the driver.
(
    rdd
    .map(stripe)
    .collect()
)

['boy']

In [69]:
# Small corpus of (doc_id, sentence) records.
rdd = sc.parallelize([
    (1, "A boy can do anything a girl"),
    (2, "The car turned the corner"),
    (3, "She opened the door"),
])
def parse(x):
    """Count how often each term occurs in one document.

    x: a record whose first element is the document id and whose second
        element is the document text
    return: a list of ``(term, [doc_id, count])`` pairs, one per distinct
        lower-cased term, in the order the terms first appear (dict
        insertion order)
    """
    doc_id, text = x[0], x[1]
    seen = {}
    # Terms are lower-cased and split on whitespace only. Punctuation is NOT
    # stripped, so "door." and "door" would be counted as different terms.
    for term in text.lower().split():
        seen[term] = seen.get(term, 0) + 1
    return [(t, [doc_id, seen[t]]) for t in seen]
# Flatten the per-document (term, [doc_id, count]) lists into a single list.
(
    rdd
    .flatMap(parse)
    .collect()
)

[('a', [1, 2]),
 ('boy', [1, 1]),
 ('can', [1, 1]),
 ('do', [1, 1]),
 ('anything', [1, 1]),
 ('girl', [1, 1]),
 ('the', [2, 2]),
 ('car', [2, 1]),
 ('turned', [2, 1]),
 ('corner', [2, 1]),
 ('she', [3, 1]),
 ('opened', [3, 1]),
 ('the', [3, 1]),
 ('door', [3, 1])]