# How to run this script
1. Open your terminal
2. Set your working directory to the folder containing this Jupyter Notebook
3. Run 'pyspark'
4. Copy and paste the INITIALIZATION section of the script, then press Enter
5. Copy and paste the rest of the script (the LOOP section), then press Enter

# Examples
You can try to run the script with the following startNode and endNode:
- 'Shortest path found!': startNode, endNode = 50, 359
- 'There is no path from node 50 to node 823.': startNode, endNode = 50, 823

In [None]:
######################
##  INITIALIZATION  ##
######################

# Nodes between which the shortest path is searched.
startNode, endNode = 50, 823
# Examples you can try:
#   - "Shortest path found!" with startNode, endNode = 50, 359
#   - "There is no path from node 50 to node 823." with startNode, endNode = 50, 823

def _parseEdge(line):
    """Turn one tab-separated line of the graph file into a DataFrame row.

    The line is read as "<start>\t<distance>\t<end>". The returned tuple is
    (startPath, endPath, totalPath, cumDistance, ignore): both nodes are kept
    as strings, totalPath starts as the two-node list [start, end], the
    distance is converted to int and the ignore flag starts at 0.
    """
    # Split once instead of once per extracted field.
    fields = line.split('\t')
    return (fields[0], fields[2], [fields[0], fields[2]], int(fields[1]), 0)


# Loading and formatting the txt file ("sc" is the SparkContext provided by the pyspark shell)
rawFile = sc.textFile("../data/graph.txt")
file = rawFile.map(_parseEdge)
columns = ["startPath", "endPath", "totalPath", "cumDistance", "ignore"]

# Creating a DataFrame with one row per edge of the graph
df = spark.createDataFrame(file, schema = columns)



######################
##       LOOP       ##
######################

def _closestEdge(node):
    """Return the cheapest usable row of the "df" temp view starting at `node`.

    Only rows with startPath = node and ignore = 0 are considered. The result
    is a list holding zero or one Row with the fields id, startPath, endPath,
    totalPath and minCumDistance; ties on the distance are broken by the
    smallest id so the choice is reproducible.

    The filter is applied to the selected row itself: matching rows only on
    the minimal distance would also return rows that start at other nodes (or
    are flagged as ignored) and merely share the same distance.

    NOTE(review): `node` is interpolated unquoted into the SQL text, so node
    labels are presumably numeric -- confirm against the data file.
    """
    query = ('SELECT id, startPath, endPath, totalPath, cumDistance AS minCumDistance '
             'FROM df WHERE startPath = {} AND ignore = 0 '
             'ORDER BY cumDistance, id LIMIT 1').format(node)
    return spark.sql(query).collect()


# (1) Checking that the startNode and the endNode exist
if df.filter(df.startPath == startNode).count() == 0:
    print("The startNode is not valid.")
elif df.filter(df.endPath == endNode).count() == 0:
    print("The endNode is not valid.")
else:
    #
    # (2) Creating a temporary table (for SQL usage) and giving every row an id.
    #     The ordering uses the real columns: a double-quoted "startPath" is
    #     read as a string literal by Spark SQL under its default settings,
    #     which would order the rows by a constant.
    df.createOrReplaceTempView("df")
    df = spark.sql('SELECT ROW_NUMBER() OVER (ORDER BY startPath, endPath) AS id, * FROM df')
    df.createOrReplaceTempView("df")
    #
    numberIterations = 0
    bestPathFound = False
    noRoadToEndNode = False
    #
    # Every iteration either stops the search or replaces two rows
    # (startNode -> x and x -> y) by one merged row (startNode -> y).
    while not bestPathFound and not noRoadToEndNode:
        numberIterations = numberIterations + 1
        print("Iteration number ", numberIterations)
        #
        # Look for a usable pair (startNode -> x, x -> y); dead ends are
        # flagged as ignored and the lookup is retried.
        while True:
            #
            # (3) Identifying the closest point "x" from the starting node
            closestNeighbor1 = _closestEdge(startNode)
            if len(closestNeighbor1) == 0:
                print("There is no path from node {} to node {}.".format(startNode, endNode))
                noRoadToEndNode = True
                break
            if closestNeighbor1[0]["endPath"] == str(endNode):
                print("Shortest path found!")
                bestPathFound = True
                break
            #
            # (4) Identifying the closest point from "x"
            closestNeighbor2 = _closestEdge(closestNeighbor1[0]["endPath"])
            #
            # (5) Verifying that "x" has at least one neighbor
            if len(closestNeighbor2) > 0:
                break # "x" has at least one neighbor -> we can continue
            # "x" is a dead end: flag the row leading to it so it is skipped from now on
            df = spark.sql('SELECT id, startPath, endPath, totalPath, cumDistance, CASE WHEN ignore = 1 THEN 1 WHEN id = {} THEN 1 ELSE 0 END AS ignore FROM df'.format(closestNeighbor1[0]['id']))
            df.createOrReplaceTempView("df")
        #
        if bestPathFound or noRoadToEndNode:
            break
        #
        # (6) Merging the two paths & removing the rows used for the merge
        firstLeg = closestNeighbor1[0]
        secondLeg = closestNeighbor2[0]
        newId = spark.sql('SELECT MAX(id) AS maxId FROM df').collect()[0]["maxId"] + 1
        # The second path starts with "x", which already ends the first path,
        # so its first element is dropped when concatenating.
        mergedPath = list(firstLeg['totalPath']) + list(secondLeg['totalPath'])[1:]
        mergedDistance = int(firstLeg['minCumDistance']) + int(secondLeg['minCumDistance'])
        # Values are listed in the column order of df (id, startPath, endPath,
        # totalPath, cumDistance, ignore) because union() matches by position.
        newRow = spark.createDataFrame([(newId,
                                         firstLeg['startPath'],
                                         secondLeg['endPath'],
                                         mergedPath,
                                         mergedDistance,
                                         0)])
        #
        df = spark.sql("SELECT * FROM df WHERE id != {} AND id != {}".format(firstLeg['id'], secondLeg['id']))
        df = df.union(newRow)
        df.createOrReplaceTempView("df")
    #
    # Displaying the rows that link startNode to endNode
    if bestPathFound:
        spark.sql('SELECT * FROM df WHERE startPath = {} AND endPath = {}'.format(startNode, endNode)).show()