# Shortest path algorithm

### Conventional approach: Dijkstra's algorithm on an adjacency matrix

In [103]:
import sys 
import pandas as pd

class SPT():
    """Dijkstra's single-source shortest paths on a weighted adjacency matrix.

    ``matrix[i][j] > 0`` is the weight of the edge from node i to node j;
    a 0 entry means there is no edge.
    """

    # The graph is represented as an adjacency matrix.
    def __init__(self, Num_nodes, Name_nodes):
        """Create an edgeless graph.

        Args:
            Num_nodes: number of nodes N.
            Name_nodes: N labels, used as the row index of the printed table.
        """
        self.names = Name_nodes
        self.N = Num_nodes
        # N x N matrix of zeros (no edges) until the caller fills it in.
        self.matrix = [[0] * Num_nodes for _ in range(Num_nodes)]

    def minDist(self, dist_from_source, visited):
        """Return the unvisited node with the smallest tentative distance.

        Args:
            dist_from_source: tentative distance per node (``sys.maxsize`` = not reached).
            visited: per-node flag, True once the node's distance is final.

        Returns:
            The index of that node, or ``None`` when no unvisited node has a
            distance below ``sys.maxsize`` (the rest is unreachable).
        """
        best = sys.maxsize
        current = None  # previously left unbound here -> UnboundLocalError
        for n in range(self.N):
            if not visited[n] and dist_from_source[n] < best:
                best = dist_from_source[n]
                current = n
        return current

    def shortest_path(self, source):
        """Compute and print the shortest distance from ``source`` to every node.

        Args:
            source: index of the start node.

        Unreachable nodes keep the sentinel distance ``sys.maxsize``.
        The result is printed as a one-column DataFrame indexed by node name.
        """
        # Every node starts "infinitely" far away except the source itself.
        dist_from_source = [sys.maxsize] * self.N
        dist_from_source[source] = 0
        visited = [False] * self.N

        for _ in range(self.N):
            current_n = self.minDist(dist_from_source, visited)
            if current_n is None:
                # Remaining nodes cannot be reached from the source.
                break
            # Its distance is now final.
            visited[current_n] = True

            row = self.matrix[current_n]
            for node in range(self.N):
                weight = row[node]
                # Relax the edge current_n -> node if it exists, the target is
                # not finalised, and going through current_n is shorter.
                if weight > 0 and not visited[node]:
                    candidate = dist_from_source[current_n] + weight
                    if candidate < dist_from_source[node]:
                        dist_from_source[node] = candidate

        df = pd.DataFrame(dist_from_source, index=self.names, columns=['Shortest distance'])
        print(df)
        
    

In [105]:
g = SPT(5, ['A', 'B', 'C', 'D', 'E'])
# Symmetric weight matrix (an undirected graph); 0 means "no edge".
g.matrix = [
    [0, 6, 0, 1, 0],
    [6, 0, 5, 2, 2],
    [0, 5, 0, 0, 5],
    [1, 2, 0, 0, 1],
    [0, 2, 5, 1, 0],
]

# Distances from node 'A' (index 0).
g.shortest_path(0)

   Shortest distance
A                  0
B                  3
C                  7
D                  1
E                  2


### MapReduce framework (PySpark)
* `links.txt` contains the adjacency-list representation of a directed graph.

In [None]:
import findspark
# Locate the Spark installation and make pyspark importable.
findspark.init()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Local mode with a single worker thread.
conf = SparkConf().setAppName('appName').setMaster('local')
# getOrCreate reuses an already running context, so re-running this cell does
# not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [112]:
class SPT():
    """Single-source shortest paths with Spark RDDs over ``links.txt``.

    Each input line is ``node dst1 w1 dst2 w2 ...``: a node followed by
    (target, integer weight) pairs for its outgoing edges. Distances are
    relaxed one hop per iteration. Relies on the module-level SparkContext
    ``sc``.
    """

    def __init__(self, source_node):
        """Load the graph and initialise distances.

        Args:
            source_node: name of the start node; it starts at distance 0 and
                every other node starts at infinity.
        """
        def parse(line):
            # split() on any whitespace: trailing/repeated spaces must not
            # produce empty tokens that break the (target, weight) pairing.
            tokens = line.split()
            targets = tokens[1:]
            return tokens[0], [(dst, int(w)) for dst, w in zip(targets[::2], targets[1::2])]

        # (node, [(target, weight), ...]); persisted because it is joined in
        # every iteration.
        self.graph = (sc.textFile('links.txt')
                      .filter(lambda line: line.strip())
                      .map(parse)
                      .persist())
        self.src = source_node
        # Include nodes that only appear as edge targets; otherwise they would
        # never receive a distance.
        nodes = self.graph.flatMap(lambda kv: [kv[0]] + [dst for dst, _ in kv[1]]).distinct()
        self.dist = nodes.map(lambda n: (n, 0 if n == source_node else float('inf'))).persist()

    def shortest_path(self, max_iter=20):
        """Relax edges until no distance changes or ``max_iter`` rounds ran.

        Prints the collected (node, distance) pairs after every round.

        Args:
            max_iter: maximum number of relaxation rounds (previously fixed at 20).

        Returns:
            The RDD of (node, distance) pairs; unreachable nodes stay ``inf``.
        """
        inf = float('inf')  # local so the Spark closures do not capture self
        previous = None
        for i in range(max_iter):
            # For every reached node u at distance d, propose d + w for each
            # out-edge (v, w); keep the smallest proposal per target v.
            candidates = (self.graph.join(self.dist)
                          .flatMap(lambda kv: [(dst, w + kv[1][1]) for dst, w in kv[1][0]]
                                   if kv[1][1] != inf else [])
                          .reduceByKey(min))

            # Keep the old distance unless a shorter candidate was found.
            new_dist = (self.dist.leftOuterJoin(candidates)
                        .mapValues(lambda d: d[0] if d[1] is None else min(d))
                        .persist())
            current = new_dist.collect()  # materialises the cached new round
            self.dist.unpersist()
            self.dist = new_dist

            print('iteration: {} \n{}'.format(i, current))

            # Nothing improved in this round -> distances are final.
            snapshot = dict(current)
            if snapshot == previous:
                break
            previous = snapshot
        return self.dist
    
# NOTE(review): "" is a placeholder start node. Unless links.txt yields a node
# literally named "", every distance stays inf for the whole run.
t = SPT(source_node="")
t.shortest_path()

True

### You are given a directed graph with positive edge weights. The graph is represented in adjacency-list format in the input file: in each line, the first entry is a node of the graph, and the subsequent entries describe that node's outgoing edges and their lengths (weights). Each pair of values gives the end point of a directed edge and the length of that edge. For example, the following input describes a graph with three nodes (A, B, C). There is an edge from A to B with length 1 and an edge from A to C with length 4. Similarly, there is an edge from B to A with length 3 and an edge from C to B with length 2.
```
A B 1 C 4
B A 3
C B 2
```
### Given a node as input, write Spark code that finds all nodes within distance 10 (ten) of this node. You can write a function that takes the starting node as an input parameter, or use a variable in your code that represents the starting node. Essentially, you need to calculate the shortest distance from the user-given starting node to every other node, and output the nodes whose shortest distance is 10 or less. Use the directedgraph* files provided to you as your dataset.

In [None]:
class SPT():
    """Single-source shortest paths with Spark RDDs over the Q4 input files.

    Each input line is ``node dst1 w1 dst2 w2 ...``: a node followed by
    (target, integer weight) pairs for its outgoing edges. Relies on the
    module-level SparkContext ``sc``.
    """

    def __init__(self, source_node, path=r"C:\Users\taylankabbani2019\Downloads\Q4\*"):
        """Load every file matching ``path`` and initialise distances.

        Args:
            source_node: name of the start node (distance 0; all others inf).
            path: file or glob passed to ``sc.wholeTextFiles``.
        """
        def parse(line):
            # split() on any whitespace copes with trailing spaces and
            # Windows line endings that the old ' \n' split depended on.
            tokens = line.split()
            targets = tokens[1:]
            return tokens[0], [(dst, int(w)) for dst, w in zip(targets[::2], targets[1::2])]

        # (node, [(target, weight), ...]); persisted because it is joined in
        # every iteration.
        self.graph = (sc.wholeTextFiles(path)
                      .flatMap(lambda name_content: name_content[1].splitlines())
                      .filter(lambda line: line.strip())
                      .map(parse)
                      .persist())
        self.src = source_node
        # Include nodes that only appear as edge targets; otherwise they would
        # never receive a distance.
        nodes = self.graph.flatMap(lambda kv: [kv[0]] + [dst for dst, _ in kv[1]]).distinct()
        self.dist = nodes.map(lambda n: (n, 0 if n == source_node else float('inf'))).persist()

    def shortest_path(self, max_iter=None):
        """Relax edges until no distance improves (or ``max_iter`` rounds ran).

        The old stopping rule ("no node still at inf") looped forever when a
        node was unreachable. It could also stop before shorter multi-hop
        paths were found.

        Args:
            max_iter: optional cap on relaxation rounds; None means run to
                convergence (at most |V| - 1 rounds for positive weights).

        Returns:
            The RDD of (node, distance) pairs; unreachable nodes stay ``inf``.
        """
        inf = float('inf')  # local so the Spark closures do not capture self
        rounds = 0
        while max_iter is None or rounds < max_iter:
            # For every reached node u at distance d, propose d + w for each
            # out-edge (v, w); keep the smallest proposal per target v.
            candidates = (self.graph.join(self.dist)
                          .flatMap(lambda kv: [(dst, w + kv[1][1]) for dst, w in kv[1][0]]
                                   if kv[1][1] != inf else [])
                          .reduceByKey(min))

            # Keep the old distance unless a shorter candidate was found.
            new_dist = (self.dist.leftOuterJoin(candidates)
                        .mapValues(lambda d: d[0] if d[1] is None else min(d))
                        .persist())

            # Number of nodes whose distance strictly decreased this round.
            improved = (self.dist.join(new_dist)
                        .filter(lambda kv: kv[1][1] < kv[1][0])
                        .count())

            self.dist.unpersist()
            self.dist = new_dist
            rounds += 1
            if improved == 0:
                break
        return self.dist

    def nodes_within(self, max_dist=10):
        """Return an RDD of (node, distance) pairs with distance <= ``max_dist``.

        Meant to be called after ``shortest_path``; the source node itself
        (distance 0) is included.
        """
        return self.dist.filter(lambda kv: kv[1] <= max_dist)

    def col(self):
        """Print the first five (node, distance) pairs."""
        print(self.dist.take(5))
        
# Start node for the query.
t = SPT('node5089')
distances = t.shortest_path()
# Task output: every node whose shortest distance from the start is <= 10,
# ordered by distance.
within_10 = distances.filter(lambda kv: kv[1] <= 10).sortBy(lambda kv: kv[1]).collect()
print(within_10)