###### Marvel-graph.txt:
```txt
4395 2237 1767 472.....
```
A hero's connections may span multiple lines, so the per-line counts have to be summed per hero ID.

###### Marvel-names.txt
```txt
5300 "SPENCER, TRACY"
5301 "SPERZEL, ANTON"
...
```

# Most Popular Superhero: STRATEGY
- Map each line of input to (heroID, number of co-occurrences on that line)
- Add up the co-occurrences per heroID using reduceByKey()
- Flip (map) the RDD to (number, heroID) so we can use max() on the RDD to find the hero with the most co-occurrences
- Look up the name of the winner and display the result
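
The steps above can be traced without Spark on a few lines of made-up data. This is only a sketch: `sample_lines` and `sample_names` are hypothetical, and a plain dictionary stands in for `reduceByKey()`. Hero 1 is deliberately split across two lines to show why the per-line counts must be summed.

```python
from collections import defaultdict

# Hypothetical graph lines; hero 1 appears on two separate lines.
sample_lines = [
    "1 2 3",
    "2 1 3",
    "1 4",
    "3 1 2",
    "4 1",
]
# Hypothetical names table.
sample_names = {1: "HERO A", 2: "HERO B", 3: "HERO C", 4: "HERO D"}

# Step 1: map each line to (heroID, number of co-occurrences on that line).
pairs = []
for line in sample_lines:
    fields = line.split()
    pairs.append((int(fields[0]), len(fields) - 1))

# Step 2: sum the counts per heroID (the role reduceByKey() plays in Spark).
totals = defaultdict(int)
for hero_id, count in pairs:
    totals[hero_id] += count

# Step 3: flip to (count, heroID) so that max() compares the counts first.
flipped = [(count, hero_id) for hero_id, count in totals.items()]

# Step 4: take the maximum and look up the winner's name.
best_count, best_id = max(flipped)
print(sample_names[best_id], best_count)
```

Tuples compare element by element, which is why putting the count first makes `max()` pick the hero with the most co-occurrences.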

In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
conf = SparkConf().setMaster('local').setAppName('PopularHero')
sc = SparkContext(conf=conf)

In [3]:
def count_cooccurences(line):
    elements = line.split()
    return (int(elements[0]), len(elements) - 1)

In [4]:
def parseName(line):
    fields = line.split('\"')
    return (int(fields[0]), fields[1].encode('utf8'))

In [5]:
names = sc.textFile('Marvel-Names.txt')
names_RDD = names.map(parseName)

In [6]:
lines = sc.textFile('Marvel-Graph.txt')
pairings = lines.map(count_cooccurences)
total_friends_by_character = pairings.reduceByKey(lambda x,y: x+ y)
flipped = total_friends_by_character.map(lambda x: (x[1],x[0]))

In [7]:
most_popular = flipped.max()
most_popular_name = names_RDD.lookup(most_popular[1])[0]

In [8]:
print (most_popular_name , " is the most popular superhero, with " ,str(most_popular[0]) , " co-appearances.")

b'CAPTAIN AMERICA'  is the most popular superhero, with  1933  co-appearances.
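
The `b'...'` prefix in the output appears because `parseName` calls `.encode('utf8')`, which returns a `bytes` object in Python 3. The sketch below shows the difference on one illustrative line, assuming the double-quoted format that `split('"')` expects.

```python
# Illustrative line in the format that split('"') expects.
line = '5300 "SPENCER, TRACY"'

fields = line.split('"')
hero_id = int(fields[0])               # int() tolerates the trailing space
name_bytes = fields[1].encode('utf8')  # bytes in Python 3, printed with a b prefix
name_str = fields[1]                   # plain str, printed without a prefix

print(hero_id, name_bytes)
print(hero_id, name_str)
```

Under Python 3, returning `fields[1]` directly would print the name without the prefix.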


# Superhero Degrees of Separation: BFS

## Implementing BFS in Spark
- Represent each line as a node with connections, a color, and a distance.
```
For example:
5983 1165 3836 4361 1282

becomes
(5983, (1165, 3836, 4361, 1282), 9999, WHITE)
```

- Our initial condition is that a node is infinitely distant (9999) and white.

```python
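def convert_to_bfs(line):
    # start_character_id is assumed to be defined at module level.
    fields = line.split()
    hero_id = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    # Default state: unvisited and "infinitely" far away.
    color = 'WHITE'
    distance = 9999

    # The starting character is the only node that begins gray at distance 0.
    if hero_id == start_character_id:
        color = 'GRAY'
        distance = 0
    return (hero_id, (connections, distance, color))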
```
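
As a quick check of the node representation, the sketch below converts the sample line twice: once as an ordinary node and once as the starting character. The function `to_bfs_node` and its `start_id` parameter are hypothetical; the start ID is passed in explicitly so the snippet is self-contained.

```python
def to_bfs_node(line, start_id):
    """Turn one graph line into (hero_id, (connections, distance, color))."""
    fields = line.split()
    hero_id = int(fields[0])
    connections = [int(field) for field in fields[1:]]
    if hero_id == start_id:
        # The starting character begins gray at distance 0.
        return (hero_id, (connections, 0, 'GRAY'))
    # Every other node begins white and "infinitely" distant.
    return (hero_id, (connections, 9999, 'WHITE'))

line = "5983 1165 3836 4361 1282"
ordinary = to_bfs_node(line, start_id=5306)
start = to_bfs_node(line, start_id=5983)
print(ordinary)
print(start)
```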

### Iteratively Process the RDD
- Just like each step of our BFS example
- Go through, looking for gray nodes to expand
- Color nodes we're done with black
- Update the distances as we go

###### The Mapper:
- Creates a new node for each connection of a gray node, with the distance incremented by one, the color gray, and no connections
- Colors the gray node we just processed black
- Copies the node itself into the results

###### The Reducer:
- Combines together all nodes for the same hero ID
- Preserves the shortest distance, and the darkest color found
- Preserves the list of connections from the original node
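
One pass of the mapper and reducer described above can be traced on a three-node chain in plain Python. This is a sketch under stated assumptions: the names `expand`, `merge`, and `RANK` are hypothetical, the graph is made up, and grouping by key with a dictionary stands in for Spark's `flatMap` and `reduceByKey`.

```python
from collections import defaultdict

RANK = {'WHITE': 0, 'GRAY': 1, 'BLACK': 2}  # darker colors rank higher

def expand(node):
    """Mapper: emit a gray stub per neighbor of a gray node, then the node itself."""
    hero_id, (connections, distance, color) = node
    out = []
    if color == 'GRAY':
        for neighbor in connections:
            out.append((neighbor, ([], distance + 1, 'GRAY')))
        color = 'BLACK'  # this node has now been processed
    out.append((hero_id, (connections, distance, color)))
    return out

def merge(a, b):
    """Reducer: keep the connections, the shortest distance, the darkest color."""
    edges = a[0] + b[0]
    distance = min(a[1], b[1])
    color = a[2] if RANK[a[2]] >= RANK[b[2]] else b[2]
    return (edges, distance, color)

# Hypothetical graph 1 - 2 - 3, starting from hero 1.
nodes = [
    (1, ([2], 0, 'GRAY')),
    (2, ([1, 3], 9999, 'WHITE')),
    (3, ([2], 9999, 'WHITE')),
]

# Map step: every node may emit several entries.
mapped = [entry for node in nodes for entry in expand(node)]

# Reduce step: combine all entries that share a hero ID.
grouped = defaultdict(list)
for hero_id, data in mapped:
    grouped[hero_id].append(data)

reduced = {}
for hero_id, values in grouped.items():
    result = values[0]
    for value in values[1:]:
        result = merge(result, value)
    reduced[hero_id] = result

for hero_id in sorted(reduced):
    print(hero_id, reduced[hero_id])
```

After this pass hero 1 is black, hero 2 is gray at distance 1 with its original connections restored, and hero 3 is still white.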

# How Do We Know When We're Done?
- An accumulator allows many executors to increment a shared variable

For example:
```python
hitCounter = sc.accumulator(0)
```
sets up a shared accumulator with an initial value of 0.

- For each iteration, if the character we're interested in is hit, we increment the hitCounter accumulator
- After each iteration, we check whether hitCounter is greater than zero; if so, we're done.
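
The stopping rule can be illustrated without Spark. In the sketch below, `CounterStandIn` is a hypothetical stand-in that only mimics the `add()` and `.value` usage of a Spark accumulator; the graph and IDs are made up. It shows that the counter can end a pass at any positive value, because the target may be reached along several edges in the same pass.

```python
class CounterStandIn:
    """Hypothetical stand-in for an accumulator: add() to update, .value to read."""
    def __init__(self, initial):
        self.value = initial

    def add(self, amount):
        self.value += amount

# Hypothetical adjacency list and endpoints.
graph = {1: [2, 3], 2: [1, 4], 3: [1, 4], 4: [2, 3]}
start_id, target_id = 1, 4

hit_counter = CounterStandIn(0)
frontier = [start_id]
visited = {start_id}
iterations = 0

# Keep expanding the frontier until some pass has hit the target.
while frontier and hit_counter.value == 0:
    iterations += 1
    next_frontier = []
    for node in frontier:
        for neighbor in graph[node]:
            if neighbor == target_id:
                hit_counter.add(1)  # one increment per edge that reaches the target
            if neighbor not in visited:
                visited.add(neighbor)
                next_frontier.append(neighbor)
    frontier = next_frontier

print(iterations, hit_counter.value)
```

Here the target is reached in the second pass from two different neighbors, so the counter ends at 2 rather than 1.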

In [18]:
# The characters we wish to find the degree of separation between:
startCharacterID = 5306 #SpiderMan
targetCharacterID = 14  #ADAM 3,031 (who?)

In [19]:
# Our accumulator, used to signal when we find the target character during
# our BFS traversal.
hitCounter = sc.accumulator(0)

In [20]:
def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    color = 'WHITE'
    distance = 9999

    if (heroID == startCharacterID):
        color = 'GRAY'
        distance = 0

    return (heroID, (connections, distance, color))

In [22]:
# Creating RDD
def createStartingRdd():
    inputFile = sc.textFile("Marvel-Graph.txt")
    return inputFile.map(convertToBFS)

In [23]:
def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    #If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        #We've processed this node, so color it black
        color = 'BLACK'

    #Emit the input node so we don't lose it.
    results.append( (characterID, (connections, distance, color)) )
    return results

In [24]:
def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)

In [25]:
# Main program
iterationRdd = createStartingRdd()

In [26]:
for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that the mapped.count() action here forces the RDD to be evaluated,
    # and that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)

Running BFS iteration# 1
Processing 8330 values.
Running BFS iteration# 2
Processing 220615 values.
Hit the target character! From 1 different direction(s).
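
The iteration in which the target is hit corresponds to the degrees of separation (two in the run above). On a graph small enough for one machine, a result like this can be cross-checked with an ordinary queue-based BFS. This is a minimal sketch with a hypothetical function name and a made-up graph, not the Spark job itself.

```python
from collections import deque

def degrees_of_separation(graph, start_id, target_id):
    """Return the number of hops from start_id to target_id, or None if unreachable."""
    distances = {start_id: 0}
    queue = deque([start_id])
    while queue:
        node = queue.popleft()
        if node == target_id:
            return distances[node]
        for neighbor in graph.get(node, []):
            if neighbor not in distances:
                distances[neighbor] = distances[node] + 1
                queue.append(neighbor)
    return None

# Hypothetical graph: 1 - 2 - 3, with hero 4 isolated.
toy_graph = {1: [2], 2: [1, 3], 3: [2], 4: []}
print(degrees_of_separation(toy_graph, 1, 3))
print(degrees_of_separation(toy_graph, 1, 4))
```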
