# Ecole polytechnique 



## BIG DATA FOR BUSINESS: DATABASE MANAGEMENT


### _Single-source shortest path based on Dijkstra's algorithm_


#### *Souhail El Aissaoui - Hassan Lantry*

# Introduction

<font color=blue> _The task is to find the shortest paths from a source node to all other nodes in the graph._</font>
    
<font color = black> This problem is solved by Dijkstra’s algorithm, which is sequential. The project has a double purpose.<br>
    
First, get familiar with Dijkstra’s algorithm; then devise a MapReduce version of it. <br>

As you will realise, the process is iterative, so the MapReduce job must be run several times.
We do so with two implementations: Python with Hadoop Streaming, and Spark via PySpark.

Optional: perform scalability experiments as for previous projects. A single comparison on a reasonably big graph would be sufficient. </font>

###  1. *__Project goal__*


The goal is to find the shortest paths from a source node to all other nodes in a graph, a problem solved by Dijkstra’s algorithm. The project has two purposes: first, to get familiar with Dijkstra’s algorithm; second, to implement a MapReduce version of it. The algorithm is iterative, so the MapReduce job must be run several times to reach the final solution. We provide two implementations: one in Python with Hadoop Streaming and one in Spark (Python).




###  2. *__Dijkstra Algorithm__*

![title](img/dji.png)

Dijkstra's algorithm (or Dijkstra's Shortest Path First algorithm, SPF algorithm) is an algorithm for finding the shortest paths between nodes in a graph, which may represent, for example, road networks. It was conceived by computer scientist Edsger W. Dijkstra in 1956 and published three years later.

The algorithm exists in many variants. Dijkstra's original variant found the shortest path between two nodes, but a more common variant fixes a single node as the "source" node and finds the shortest paths from the source to all other nodes in the graph, producing a shortest-path tree.
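
As a point of comparison for the MapReduce version, the single-source variant described above can be sketched sequentially with a priority queue. This is a minimal illustration, not the code used in the project: the function name `dijkstra` and the toy graph are assumptions, and the graph is given as a dictionary mapping each node to a list of `(neighbor, weight)` pairs.

```python
import heapq

def dijkstra(graph, source):
    """Return the shortest distance from `source` to every reachable node.

    `graph` maps a node to a list of (neighbor, weight) pairs,
    with non-negative weights.
    """
    dist = {source: 0}
    heap = [(0, source)]  # (distance so far, node)
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist.get(u, float("inf")):
            continue  # stale heap entry: a shorter route to u was already found
        for v, w in graph.get(u, []):
            nd = d + w
            if nd < dist.get(v, float("inf")):
                dist[v] = nd
                heapq.heappush(heap, (nd, v))
    return dist

# Hypothetical toy graph, not taken from graph.txt
toy_graph = {1: [(2, 1), (3, 4)], 2: [(3, 1), (4, 5)], 3: [(4, 1)], 4: []}
toy_dist = dijkstra(toy_graph, 1)
print(toy_dist)
```

The priority queue is what makes this version inherently sequential: nodes are settled one at a time in order of distance, which is the step the MapReduce version replaces with repeated frontier expansions.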

###  3. *__Let's Implement it in Python Hadoop__*

In [1]:
import pandas as pd
import math
df = pd.read_csv('graph.txt', sep=r'\s+', names=('from', 'B', 'to', 'D'))  # whitespace-separated columns
df.head()

Unnamed: 0,from,B,to,D
0,4,5247,274,274
1,578,1672,502,502
2,94,6623,359,359
3,532,2373,800,800
4,64,1007,5,5


####  3.1 *__Data preprocessing__*

In [2]:
# Preprocessing: keep only the edge endpoints and build one adjacency list per node
df = df.drop(['B', 'D'], axis=1)
df = df.groupby('from')['to'].apply(list)
df2 = pd.DataFrame()
df2['from'] = df.index
df2['to'] = df.values
df2['d'] = math.inf        # distance from the source, unknown at start
df2['state'] = 'WHITE'     # WHITE = not reached, GRAY = frontier, BLACK = processed
df2['path'] = '-'
# Setting the initial and final nodes:
initial = 1
final = 316
df2.loc[df2['from'] == initial, 'd'] = 0
df2.loc[df2['from'] == initial, 'state'] = 'GRAY'

# Write one record per node: <from> TAB <to>/<d>/<state>/<path>
with open("preprofin.txt", "w") as file:
    for index, row in df2.iterrows():
        file.write(str(row['from']) + "\t" + str(row['to']) + "/" + str(row['d']) + "/" + str(row['state']) + "/" + str(row['path']) + "\n")
df2.head()

Unnamed: 0,from,to,d,state,path
0,1,"[48, 2, 63, 136, 564, 73, 377, 141, 63, 4]",0.0,GRAY,-
1,2,"[1, 164, 129, 19, 9, 321, 12, 536]",inf,WHITE,-
2,4,"[274, 766, 74, 23, 762, 142, 782, 394]",inf,WHITE,-
3,5,"[51, 199, 633, 524, 79, 394]",inf,WHITE,-
4,8,"[79, 5, 134]",inf,WHITE,-
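
Each line of `preprofin.txt` therefore has the layout `<node> TAB <neighbours>/<distance>/<state>/<path>`. The helper below is a small sketch of how such a line can be read back and written again. The names `Record`, `parse_record` and `format_record` are hypothetical and do not appear in the project code, and `ast.literal_eval` is used here as a safer stand-in for `eval`.

```python
import ast
from collections import namedtuple

# Hypothetical container for one node record
Record = namedtuple("Record", "node neighbours distance state path")

def parse_record(line):
    """Split '<node>\\t<neighbours>/<distance>/<state>/<path>' into typed fields."""
    key, value = line.rstrip("\n").split("\t")
    neighbours, distance, state, path = value.split("/")
    if neighbours != "EMPTY":
        neighbours = ast.literal_eval(neighbours)  # "[48, 2]" -> [48, 2]
    return Record(key, neighbours, float(distance), state, path)

def format_record(rec):
    """Inverse of parse_record."""
    value = "/".join([str(rec.neighbours), str(rec.distance), rec.state, rec.path])
    return rec.node + "\t" + value

sample = "2\t[1, 164, 129]/inf/WHITE/-"
rec = parse_record(sample)
print(rec)
```

`float("inf")` accepts the string `inf` written by pandas, so unreached nodes need no special case.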


####  3.2 *__Mapper.py__*

In [None]:
#!/usr/bin/python
import ast
import sys

# Each input line is: <node> TAB <neighbours>/<distance>/<state>/<path>
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the key from the value, then the value into its four fields
    val = line.split("\t")
    words = val[1].split("/")
    key = val[0]
    words[1] = float(words[1])
    if words[2] == 'GRAY':
        # a frontier node is now processed: re-emit it as BLACK
        toprint = "/".join([words[0], str(words[1]), "BLACK", words[3]])
        print('%s\t%s' % (key, toprint))
        if words[0] != "EMPTY":
            # propose distance + 1 to every neighbour, extending the path with this node
            words[0] = ast.literal_eval(words[0])
            for i in words[0]:
                toprint = "/".join(["EMPTY", str(words[1] + 1), "GRAY", str(words[3]) + str(key) + ","])
                print('%s\t%s' % (i, toprint))
    else:
        # WHITE and BLACK nodes pass through unchanged
        toprint = "/".join([words[0], str(words[1]), words[2], words[3]])
        print('%s\t%s' % (key, toprint))
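
To make the mapper's behaviour concrete, the sketch below applies the same rule to a single line and returns the emitted lines as a list instead of printing them. The function name `expand` and the sample line are assumptions made for illustration.

```python
import ast

def expand(line):
    """Return the lines the mapper would emit for one input line."""
    key, value = line.strip().split("\t")
    neighbours, distance, state, path = value.split("/")
    distance = float(distance)
    if state != "GRAY":
        # WHITE and BLACK nodes are passed through unchanged
        return ["%s\t%s" % (key, "/".join([neighbours, str(distance), state, path]))]
    # A GRAY node turns BLACK ...
    emitted = ["%s\t%s" % (key, "/".join([neighbours, str(distance), "BLACK", path]))]
    # ... and proposes distance + 1 to each of its neighbours
    if neighbours != "EMPTY":
        for n in ast.literal_eval(neighbours):
            candidate = "/".join(["EMPTY", str(distance + 1), "GRAY", path + key + ","])
            emitted.append("%s\t%s" % (n, candidate))
    return emitted

emitted = expand("1\t[48, 2]/0.0/GRAY/-")
for e in emitted:
    print(e)
```

For the source node with neighbours 48 and 2, this yields one BLACK record for node 1 and one GRAY candidate for each neighbour, with distance `1.0` and path `-1,`.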

####  3.2 *__Reducer.py__*

In [None]:
#!/usr/bin/python
import ast
import sys

# Input comes from STDIN, sorted by key, so all records of a node are contiguous
currentline = ""
currentdistance = float('inf')
currentneighbours = "EMPTY"
currentColor = "WHITE"
currentPath = ""
firstone = True
for line in sys.stdin:
    line = line.strip()
    val = line.split("\t")
    words = val[1].split("/")
    key = val[0]
    if words[0] != 'EMPTY':
        words[0] = ast.literal_eval(words[0])
    words[1] = float(words[1])
    if key != currentline:
        # new key: emit the merged record of the previous node, then start over
        if not firstone:
            toprint = "/".join([str(currentneighbours), str(currentdistance), currentColor, currentPath])
            print('%s\t%s' % (currentline, toprint))
        firstone = False
        currentline = key
        currentdistance = words[1]
        currentneighbours = words[0]
        currentColor = words[2]
        currentPath = words[3]
    # keep the smallest distance and the path that produced it
    if currentdistance > words[1]:
        currentdistance = words[1]
        currentPath = words[3]
    # keep the adjacency list carried by the node's own record
    if words[0] != 'EMPTY':
        currentneighbours = words[0]
    # keep the darkest colour: BLACK > GRAY > WHITE
    if words[2] == "BLACK":
        currentColor = words[2]
    if words[2] == "GRAY":
        if currentColor == "WHITE":
            currentColor = words[2]

# emit the last node (nothing to emit if this reducer received no input)
if not firstone:
    toprint = "/".join([str(currentneighbours), str(currentdistance), currentColor, currentPath])
    print('%s\t%s' % (currentline, toprint))
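
The merge rule applied by the reducer to all records sharing one key can be isolated in a few lines. The sketch below is an illustration only: the function name `merge` and the `RANK` table are assumptions, and each record is given as a `(neighbours, distance, state, path)` tuple rather than a text line.

```python
# Colour ordering used for the merge: the darkest colour wins
RANK = {"WHITE": 0, "GRAY": 1, "BLACK": 2}

def merge(values):
    """Merge all (neighbours, distance, state, path) records of one node."""
    neighbours, distance, state, path = values[0]
    for n, d, s, p in values[1:]:
        if d < distance:          # shorter distance: keep it with its path
            distance, path = d, p
        if n != "EMPTY":          # only the node's own record carries the adjacency list
            neighbours = n
        if RANK[s] > RANK[state]:
            state = s
    return neighbours, distance, state, path

own_record = ("[1, 164]", float("inf"), "WHITE", "-")
candidate = ("EMPTY", 1.0, "GRAY", "-1,")
merged = merge([own_record, candidate])
print(merged)
```

The result does not depend on the order in which the records arrive, which matters because Hadoop only guarantees grouping by key, not an order within a key.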

####  3.3 *Chaining the MapReduce jobs*

In [None]:
The algorithm is iterative, so we have to chain MapReduce jobs, each one reading the output of the previous one, to obtain the final result.
Here are the commands used to run the MapReduce jobs:


#Assuming that we are connected to the cluster:
cd Desktop/workspace/work/mr/
wget https://www.dropbox.com/s/ec4hmdsvr50qxbe/preprofin.txt 
#Here we download the preprocessed input file to be treated

hdfs dfs -mkdir /user/hadoop/mr
hdfs dfs -mkdir /user/hadoop/mr/input1
hdfs dfs -put preprofin.txt /user/hadoop/mr/input1

wget https://www.dropbox.com/s/64wlmdtq9ka8jes/mapper.py
wget https://www.dropbox.com/s/6ppdmz7oukk3hj4/reducer.py
chmod a+x reducer.py
chmod a+x mapper.py

perl -pe 's/\r$//g' < mapper.py > mapperL.py
perl -pe 's/\r$//g' < reducer.py > reducerL.py
#This step converts Windows line endings (CRLF) to Unix line endings (LF). Without it, the MapReduce job was not successful.

chmod a+x reducerL.py
chmod a+x mapperL.py

#The commands below chain three MapReduce jobs: each job reads the output directory of the previous one.

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/hadoop/mr/input1 \
-output /user/hadoop/mr/output1 \
-file /home/hadoop/Desktop/workspace/work/mr/mapperL.py \
-mapper /home/hadoop/Desktop/workspace/work/mr/mapperL.py \
-file /home/hadoop/Desktop/workspace/work/mr/reducerL.py \
-reducer /home/hadoop/Desktop/workspace/work/mr/reducerL.py

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/hadoop/mr/output1 \
-output /user/hadoop/mr/output2 \
-file /home/hadoop/Desktop/workspace/work/mr/mapperL.py \
-mapper /home/hadoop/Desktop/workspace/work/mr/mapperL.py \
-file /home/hadoop/Desktop/workspace/work/mr/reducerL.py \
-reducer /home/hadoop/Desktop/workspace/work/mr/reducerL.py

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/hadoop/mr/output2 \
-output /user/hadoop/mr/output3 \
-file /home/hadoop/Desktop/workspace/work/mr/mapperL.py \
-mapper /home/hadoop/Desktop/workspace/work/mr/mapperL.py \
-file /home/hadoop/Desktop/workspace/work/mr/reducerL.py \
-reducer /home/hadoop/Desktop/workspace/work/mr/reducerL.py
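
Since the three commands differ only in their input and output directories, they can be generated in a loop. The sketch below builds the same command lines from the paths used above. The function name `streaming_command` and the three constants are hypothetical, and the script only prints the commands; launching them (for instance with `subprocess.check_call`) is left as a comment because it requires a cluster.

```python
HDFS_BASE = "/user/hadoop/mr"
WORKDIR = "/home/hadoop/Desktop/workspace/work/mr"
STREAMING_JAR = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar"

def streaming_command(iteration):
    """Build the hadoop streaming command for job number `iteration` (1-based)."""
    if iteration == 1:
        source = HDFS_BASE + "/input1"
    else:
        source = "%s/output%d" % (HDFS_BASE, iteration - 1)
    return [
        "hadoop", "jar", STREAMING_JAR,
        "-input", source,
        "-output", "%s/output%d" % (HDFS_BASE, iteration),
        "-file", WORKDIR + "/mapperL.py",
        "-mapper", WORKDIR + "/mapperL.py",
        "-file", WORKDIR + "/reducerL.py",
        "-reducer", WORKDIR + "/reducerL.py",
    ]

for k in range(1, 4):
    print(" ".join(streaming_command(k)))
    # On a cluster, the job could be launched with:
    # subprocess.check_call(streaming_command(k))
```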



### 3.4 Implementation

In [None]:
#graycount.py
# Counts the GRAY (frontier) nodes in a job output file given as first argument.

import sys

inp = sys.argv[1]
nb = 0
with open(inp, "r") as r:
    for line in r:
        # remove leading and trailing whitespace
        line = line.strip()
        # split the key from the value, then the value into its four fields
        val = line.split("\t")
        words = val[1].split("/")

        if words[2] == 'GRAY':
            nb += 1
print(nb)

In [None]:
Mapreduce job execution:
    
19/02/24 15:08:58 INFO mapreduce.Job: Job job_1551012986990_0017 completed successfully
19/02/24 15:08:58 INFO mapreduce.Job: Counters: 51
        File System Counters
                FILE: Number of bytes read=7642
                FILE: Number of bytes written=3033178
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=102096
                HDFS: Number of bytes written=11361
                HDFS: Number of read operations=69
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=14
        Job Counters
                Killed map tasks=1
                Launched map tasks=16
                Launched reduce tasks=7
                Data-local map tasks=8
                Rack-local map tasks=8
                Total time spent by all maps in occupied slots (ms)=7778400
                Total time spent by all reduces in occupied slots (ms)=3998304
                Total time spent by all map tasks (ms)=162050
                Total time spent by all reduce tasks (ms)=41649
                Total vcore-milliseconds taken by all map tasks=162050
                Total vcore-milliseconds taken by all reduce tasks=41649
                Total megabyte-milliseconds taken by all map tasks=248908800
                Total megabyte-milliseconds taken by all reduce tasks=127945728
        Map-Reduce Framework
                Map input records=393
                Map output records=403
                Map output bytes=11574
                Map output materialized bytes=12014
                Input split bytes=2272
                Combine input records=0
                Combine output records=0
                Reduce input groups=393
                Reduce shuffle bytes=12014
                Reduce input records=403
                Reduce output records=393
                Spilled Records=806
                Shuffled Maps =112
                Failed Shuffles=0
                Merged Map outputs=112
                GC time elapsed (ms)=4112
                CPU time spent (ms)=19740
                Physical memory (bytes) snapshot=7867559936
                Virtual memory (bytes) snapshot=84767440896
                Total committed heap usage (bytes)=7212630016
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=99824
        File Output Format Counters
                Bytes Written=11361
            
#SECOND RUN:

19/03/01 21:57:40 INFO mapreduce.Job: Job job_1551476123970_0002 completed successfully
19/03/01 21:57:40 INFO mapreduce.Job: Counters: 51
        File System Counters
                FILE: Number of bytes read=8076
                FILE: Number of bytes written=3425652
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=21942
                HDFS: Number of bytes written=11558
                HDFS: Number of read operations=78
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=14
        Job Counters
                Killed map tasks=1
                Launched map tasks=19
                Launched reduce tasks=7
                Data-local map tasks=16
                Rack-local map tasks=3
                Total time spent by all maps in occupied slots (ms)=8418768
                Total time spent by all reduces in occupied slots (ms)=5070336
                Total time spent by all map tasks (ms)=175391
                Total time spent by all reduce tasks (ms)=52816
                Total vcore-milliseconds taken by all map tasks=175391
                Total vcore-milliseconds taken by all reduce tasks=52816
                Total megabyte-milliseconds taken by all map tasks=269400576
                Total megabyte-milliseconds taken by all reduce tasks=162250752
        Map-Reduce Framework
                Map input records=393
                Map output records=434
                Map output bytes=12422
                Map output materialized bytes=10828
                Input split bytes=2660
                Combine input records=0
                Combine output records=0
                Reduce input groups=395
                Reduce shuffle bytes=10828
                Reduce input records=434
                Reduce output records=395
                Spilled Records=868
                Shuffled Maps =133
                Failed Shuffles=0
                Merged Map outputs=133
                GC time elapsed (ms)=4402
                CPU time spent (ms)=21330
                Physical memory (bytes) snapshot=9021509632
                Virtual memory (bytes) snapshot=94561951744
                Total committed heap usage (bytes)=8306819072
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=19282
        File Output Format Counters
                Bytes Written=11558

### 3.5 Local simulation of the job

In [5]:
import ast

def mapper(inp, out):
    # Same logic as mapper.py, reading from and writing to local files
    with open(inp, "r") as r, open(out, "w") as file:
        for line in r:
            # remove leading and trailing whitespace
            line = line.strip()
            # split the key from the value, then the value into its four fields
            val = line.split("\t")
            words = val[1].split("/")
            key = val[0]
            words[1] = float(words[1])
            if words[2] == 'GRAY':
                # a frontier node is now processed: re-emit it as BLACK
                toprint = "/".join([words[0], str(words[1]), "BLACK", words[3]])
                file.write(key + "\t" + toprint + "\n")
                if words[0] != "EMPTY":
                    # propose distance + 1 to every neighbour
                    words[0] = ast.literal_eval(words[0])
                    for i in words[0]:
                        toprint = "/".join(["EMPTY", str(words[1] + 1), "GRAY", str(words[3]) + str(key) + ","])
                        file.write(str(i) + "\t" + toprint + "\n")
            else:
                toprint = "/".join([words[0], str(words[1]), words[2], words[3]])
                file.write(key + "\t" + toprint + "\n")

In [6]:
import ast

def reducer(inp, out):
    # Same logic as reducer.py; the input file must already be sorted by key
    currentline = ""
    currentdistance = float('inf')
    currentneighbours = "EMPTY"
    currentColor = "WHITE"
    currentPath = ""
    firstone = True
    with open(inp, "r") as r, open(out, "w") as file:
        for line in r:
            line = line.strip()
            val = line.split("\t")
            words = val[1].split("/")
            key = val[0]
            if words[0] != 'EMPTY':
                words[0] = ast.literal_eval(words[0])
            words[1] = float(words[1])
            if key != currentline:
                # new key: write the merged record of the previous node, then start over
                if not firstone:
                    toprint = "/".join([str(currentneighbours), str(currentdistance), currentColor, currentPath])
                    file.write(currentline + "\t" + toprint + "\n")
                firstone = False
                currentline = key
                currentdistance = words[1]
                currentneighbours = words[0]
                currentColor = words[2]
                currentPath = words[3]
            # keep the smallest distance and the path that produced it
            if currentdistance > words[1]:
                currentdistance = words[1]
                currentPath = words[3]
            if words[0] != 'EMPTY':
                currentneighbours = words[0]
            # keep the darkest colour: BLACK > GRAY > WHITE
            if words[2] == "BLACK":
                currentColor = words[2]
            if words[2] == "GRAY":
                if currentColor == "WHITE":
                    currentColor = words[2]

        # write the last node (nothing to write if the input was empty)
        if not firstone:
            toprint = "/".join([str(currentneighbours), str(currentdistance), currentColor, currentPath])
            file.write(currentline + "\t" + toprint + "\n")

In [7]:
#graycount.py
def graycount(inp):
    # Print and return the number of GRAY (frontier) nodes in a job output file
    nb = 0
    with open(inp, "r") as r:
        for line in r:
            # remove leading and trailing whitespace
            line = line.strip()
            # split the key from the value, then the value into its four fields
            val = line.split("\t")
            words = val[1].split("/")

            if words[2] == 'GRAY':
                nb += 1
    print(nb)
    return nb

In [8]:
# Shuffle and sort: emulates Hadoop's shuffle phase by sorting the mapper output by key
import pandas as pd
def SS(inp, out):
    df = pd.read_csv(inp, sep="\t", names=('from', 'info'))
    df = df.sort_values(by=['from'])
    df.to_csv(out, index=False, header= False, sep = "\t")

In [10]:
#first job:

mapper("preprofin.txt", "map1.txt")
SS("map1.txt", "SS1.txt")
reducer("SS1.txt", "out1.txt")
graycount("out1.txt")

# iterate until no GRAY node remains

i = 1
while graycount("out" + str(i) + ".txt") != 0:
    mapper("out" + str(i) + ".txt", "map" + str(i +1) + ".txt")
    SS("map" + str(i+1) + ".txt", "SS" + str(i+1) + ".txt")
    reducer("SS" + str(i+1) + ".txt", "out" + str(i+1) + ".txt")
    i += 1
    
print("Number of jobs needed to obtain final results: ")
print(i)

9
9
9
36
67
77
70
31
15
4
1
0
Number of jobs needed to obtain final results: 
10


### 3.6 MapReduce job Results

In [15]:
r = open("finalOUTPUT.txt", "r")
lines = r.readlines()
for l in lines[1:10]:
    print(l)


112     [547, 110]/inf/WHITE/-

119     [19]/4.0/BLACK/-1,2,164,195,

126     [445, 189, 196, 151, 251, 657, 133]/3.0/BLACK/-1,141,16,

133     [141, 32, 379, 136, 8]/2.0/BLACK/-1,377,

16      [54, 5, 66, 1, 516, 81, 35, 126, 269, 78, 766, 1, 547, 20]/2.0/BLACK/-1,141,

161     [157, 158, 629]/5.0/BLACK/-1,136,26,286,348,

168     [51]/7.0/BLACK/-1,73,449,567,219,86,226,

175     [214, 422]/4.0/BLACK/-1,136,26,203,

182     [619]/5.0/BLACK/-1,2,9,352,105,
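
In these result lines, the last field records the nodes visited on the way from the source, written as `-` followed by comma-terminated node ids. The sketch below turns one result line into an explicit list of nodes ending with the destination. The function name `decode_path` is an assumption, and the line is split on the first run of whitespace so that both tab-separated and space-padded lines are accepted.

```python
import math

def decode_path(result_line):
    """Return the node sequence from the source to this node, or None if unreached."""
    key, value = result_line.strip().split(None, 1)  # split on the first whitespace run
    neighbours, distance, state, path = value.split("/")
    if math.isinf(float(distance)):
        return None  # the node was never reached from the source
    hops = [int(n) for n in path.lstrip("-").split(",") if n]
    return hops + [int(key)]

route = decode_path("119\t[19]/4.0/BLACK/-1,2,164,195,")
print(route)
```

For node 119, the decoded route has five nodes and therefore four edges, which matches the distance `4.0` stored in the same line.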



###  4. *__Let's Implement it in Spark__*

####  4.1 *__Spark Implementation__*

After the Hadoop implementation, we wrote a Spark script that follows the same logic: a loop that stops when no node's distance changes from one iteration to the next.

In [None]:
# `sc` is the SparkContext (created automatically in the pyspark shell).
# NOTE: the parser below expects one node per line, space-separated:
#   <id> <distance> <neighbor>,<weight>:<neighbor>,<weight>:
# which is not the tab/slash layout written in section 3.1.
textFile = sc.textFile("preprofin.txt")

# Incremented each time the second value of a merge is strictly shorter than the first
count = sc.accumulator(0)

def customSplitNodesTextFile(node):
    # Parse an input line into (id, (distance, neighbors, path))
    fields = node.split(' ')
    if len(fields) < 3:
        nid, distance = fields
        neighbors = None
    else:
        nid, distance, neighbors = fields
        # drop the empty item left after the trailing ':'
        neighbors = neighbors.split(':')[:-1]
    path = nid
    return (nid, (int(distance), neighbors, path))

def customSplitNodesIterative(node):
    # Make sure the path of a node ends with the node itself
    nid = node[0]
    distance = node[1][0]
    neighbors = node[1][1]
    path = node[1][2]
    elements = path.split('->')
    if elements[-1] != nid:
        path = path + '->' + nid
    return (nid, (int(distance), neighbors, path))

def customSplitNeighbor(parentPath, parentDistance, neighbor):
    # Build a candidate record for a neighbor; candidates carry no adjacency list
    if neighbor is not None:
        nid, distance = neighbor.split(',')
        distance = parentDistance + int(distance)
        path = parentPath + '->' + nid
        return (nid, (int(distance), None, path))

def minDistance(nodeValue1, nodeValue2):
    # Keep the adjacency list of whichever record has one
    if nodeValue1[1] is not None:
        neighbors = nodeValue1[1]
    else:
        neighbors = nodeValue2[1]
    dist1 = nodeValue1[0]
    dist2 = nodeValue2[0]
    if dist1 <= dist2:
        distance = dist1
        path = nodeValue1[2]
    else:
        count.add(1)
        distance = dist2
        path = nodeValue2[2]
    return (distance, neighbors, path)

def formatResult(node):
    nid = node[0]
    minDistance = node[1][0]
    path = node[1][2]
    return nid, minDistance, path

nodes = textFile.map(customSplitNodesTextFile)

oldCount = 0
iterations = 0
while True:
    iterations += 1
    nodesValues = nodes.map(lambda x: x[1])
    # "Map" step: every node with neighbors proposes a distance to each of them
    neighbors = nodesValues.filter(lambda nodeData: nodeData[1] is not None).flatMap(
        lambda nodeData: [
            customSplitNeighbor(nodeData[2], nodeData[0], neighbor)
            for neighbor in nodeData[1]
        ]
    )
    mapper = nodes.union(neighbors)
    # "Reduce" step: keep the shortest distance (and its path) per node
    reducer = mapper.reduceByKey(minDistance)
    nodes = reducer.map(customSplitNodesIterative)
    nodes.count()  # action that forces evaluation of the lazy RDD transformations
    # stop when the accumulator did not move during this iteration
    if oldCount == count.value:
        break
    oldCount = count.value

print('Finished after: ' + str(iterations) + ' iterations')
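
The loop-until-stable pattern used by the Spark script can be sketched without Spark, which makes the stopping condition easier to follow. This is an illustration under stated assumptions, not the project code: the function name `relax_until_stable` and the toy graph are hypothetical, the "map" step is a list comprehension and the "reduce" step is a per-node minimum.

```python
def relax_until_stable(edges, source):
    """Repeat map/reduce-style relaxation rounds until no distance changes.

    `edges` maps a node to a list of (neighbor, weight) pairs.
    Returns (distances, number of rounds executed).
    """
    inf = float("inf")
    dist = {n: inf for n in edges}
    for nbrs in edges.values():
        for v, _ in nbrs:
            dist.setdefault(v, inf)  # nodes that only appear as targets
    dist[source] = 0
    iterations = 0
    while True:
        iterations += 1
        # "map": every reached node proposes a distance to each neighbor
        candidates = [(v, dist[u] + w)
                      for u, nbrs in edges.items() if dist[u] != inf
                      for v, w in nbrs]
        # "reduce": keep the minimum proposal per node
        changed = False
        new_dist = dict(dist)
        for v, d in candidates:
            if d < new_dist[v]:
                new_dist[v] = d
                changed = True
        dist = new_dist
        if not changed:
            break  # a full round without improvement: distances are final
    return dist, iterations

# Hypothetical toy graph
toy_edges = {1: [(2, 1), (3, 4)], 2: [(3, 1)], 3: []}
final_dist, rounds = relax_until_stable(toy_edges, 1)
print(final_dist, rounds)
```

The last round only confirms that nothing changed, so the number of rounds is one more than the number of rounds that actually improved a distance.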

