In [1]:
import pyspark
import os

In [2]:
#only for local use
#in case problem with PYTHONHASHSEED occurs on cluster see https://www.thoughtvector.io/blog/python-3-on-spark-return-of-the-pythonhashseed/
#don't forget to unset on local env afterwards
os.environ["PYTHONHASHSEED"] = "42"

In [3]:
sc = pyspark.SparkContext("local[*]")

In [4]:
def read_graph(path_to_file):
    """ read graph in the moodle"""
    res = sc.textFile(path_to_file).map(lambda x : x.split("\t"))
    res = res.flatMap(lambda x : [(x[0], (x[1], int(x[2]))), (x[1], (x[0], int(x[3])))])
    return res

In [27]:
def read_twitter_graph(path_to_file):
    """ read the twitter graph"""
    res = sc.textFile(path_to_file).map(lambda x : tuple(x.split(" ")))
    return res

In [4]:
def read_generated_graph(path_to_file):
    res = sc.textFile(path_to_file).map(lambda x : (x.split("\t")[0], (x.split("\t")[1], int(x.split("\t")[2]))))
    return res

In [5]:
test = read_generated_graph("../graph-data/weighted-graph-100.txt")

In [6]:
test.collect()

[('1', ('71', 5)),
 ('1', ('93', 19)),
 ('1', ('14', 7)),
 ('1', ('23', 11)),
 ('1', ('67', 4)),
 ('2', ('4', 7)),
 ('2', ('92', 18)),
 ('2', ('37', 18)),
 ('2', ('79', 12)),
 ('2', ('22', 1)),
 ('2', ('52', 8)),
 ('2', ('55', 17)),
 ('2', ('81', 10)),
 ('3', ('96', 12)),
 ('3', ('8', 4)),
 ('3', ('35', 5)),
 ('3', ('15', 10)),
 ('4', ('85', 10)),
 ('4', ('87', 20)),
 ('5', ('74', 15)),
 ('5', ('35', 1)),
 ('5', ('40', 3)),
 ('6', ('52', 2)),
 ('6', ('24', 20)),
 ('6', ('67', 18)),
 ('6', ('40', 7)),
 ('6', ('15', 18)),
 ('6', ('19', 2)),
 ('6', ('58', 12)),
 ('7', ('5', 7)),
 ('7', ('89', 13)),
 ('7', ('17', 13)),
 ('7', ('43', 14)),
 ('8', ('2', 14)),
 ('8', ('55', 16)),
 ('8', ('39', 11)),
 ('8', ('66', 11)),
 ('9', ('24', 18)),
 ('9', ('87', 17)),
 ('9', ('59', 14)),
 ('9', ('20', 14)),
 ('9', ('17', 14)),
 ('9', ('70', 1)),
 ('9', ('80', 15)),
 ('9', ('100', 19)),
 ('9', ('10', 12)),
 ('10', ('76', 2)),
 ('10', ('66', 9)),
 ('10', ('31', 13)),
 ('10', ('92', 13)),
 ('10', ('61', 6

## graph non pondéré

In [5]:
#very small graph
directions = sc.parallelize([(1,2),(1,3),(2,4),(3,5),(4,5)])
begin = 1
end = 5
shortest_paths = sc.parallelize([(begin, (0,[]))])

In [50]:
directions = read_twitter_graph("twitter_combined.txt")

In [51]:
begin, end = '15519274', '309366491'
shortest_paths = sc.parallelize([(begin, (0,[]))])
early_stop = 15

In [48]:
%%time
i = 0
while True:
    res = shortest_paths.lookup(end)
    i+=1
    print(i)
    if not (res == [] and i < early_stop):
        break

    shortest_paths = shortest_paths.join(directions).map(lambda x : (x[1][1], (x[1][0][0] + 1 , x[1][0][1] + [x[0]])))

1
2
3
4
CPU times: user 140 ms, sys: 50 ms, total: 190 ms
Wall time: 57.5 s


## graph pondéré

#### V1 (brute force)

In [None]:
#connections are denoted (origin, end, weight)

In [17]:
def compute_path(x):
    #x is the result of the join operation
    # the join should be in format (origin, ((weight_to_origin, path_to_origin, paths_visited_to_origin), (destination, weight_to_destination)))
    return (x[1][1][0], (x[1][0][0] + x[1][1][1], x[1][0][1] + [x[0]], {x[0]}))

In [18]:
def shortest_path_to_point(x,y):
    if x[0] <= y[0]:
        res = (x[0], x[1], x[2] | y[2])
    else:
        res = (y[0], y[1], x[2] | y[2])
    return res

In [6]:
#for a small graph test
directions = sc.parallelize([(1,(2,1)),(1,(3, 5)),(3,(2,2)),(2,(4,2)),(3,(5,2)),(4,(5,2))])
begin = 1
objective = 5
paths_to_objective = set(directions.map(lambda x : (x[1][0] , x[0])).filter(lambda x : x[0] == objective).collect()[0][1])
shortest_paths = sc.parallelize([(begin, (0,[],set()))])

In [19]:
directions = read_generated_graph("../graph-data/weighted-graph-100.txt")
begin, objective = directions.keys().takeSample(False, 2)
paths_to_objective = set(directions.map(lambda x : (x[1][0] , x[0])).filter(lambda x : x[0] == objective).lookup(objective))
shortest_paths = sc.parallelize([(begin, (0,[],set()))])
early_stop = 15

In [20]:
%%time
i = 0
while True:
    #stopping criteria
    i += 1
    print(i)
    if i > early_stop :
        break
    path_found = shortest_paths.filter(lambda x : x[0] == objective).collect()
    print(path_found)
    print(shortest_paths.lookup(objective))
    if path_found != [] and path_found[0][1][2] == paths_to_objective :
        break
        
    #real thing
    shortest_paths = shortest_paths.join(directions).map(compute_path).union(shortest_paths)
    shortest_paths = shortest_paths.reduceByKey(shortest_path_to_point)

1
[]
[]
2
[]
[]
3
[]
[]
4
[]
[]
5
[('33', (37, ['99', '25', '15', '36'], {'36'}))]
[]
6
[('33', (36, ['99', '25', '15', '88', '36'], {'62', '73', '36'}))]
[]
7
[('33', (36, ['99', '25', '15', '88', '36'], {'62', '42', '73', '36'}))]
[]
CPU times: user 450 ms, sys: 100 ms, total: 550 ms
Wall time: 49.7 s


In [23]:
shortest_paths.filter(lambda x : x[0] == objective).collect()

[('33', (36, ['99', '25', '15', '88', '36'], {'36', '42', '62', '73'}))]

In [24]:
begin, objective

('99', '33')

#### V2

In [27]:
def compute_path(x):
    #x is the result of the join operation
    # the join should be in format (origin, ((weight_to_origin, path_to_origin, paths_visited_to_origin), (destination, weight_to_destination)))
    return (x[1][1][0], {
                         "weight_of_path" : x[1][0]["weight_of_path"] + x[1][1][1], 
                         "path" : x[1][0]["path"] + [x[0]], 
                         "explored_path" : {x[0]}
    })

In [28]:
def shortest_path_to_point(x,y):
    if x["weight_of_path"] <= y["weight_of_path"]:
        res = {"weight_of_path" : x["weight_of_path"], 
               "path" : x["path"], 
               "explored_path" : x["explored_path"] | y["explored_path"]}
    else:
        res = {"weight_of_path" : y["weight_of_path"], 
               "path" : y["path"], 
               "explored_path" : x["explored_path"] | y["explored_path"]}
    return res

In [23]:
# for small graph
begin = 1
objective = 5
early_stop = 1
directions = sc.parallelize([(1,(2,1)),(1,(3, 5)),(3,(2,2)),(2,(4,2)),(3,(5,2)),(4,(5,2))])
paths_to_objective = set(directions.map(lambda x : (x[1][0] , x[0])).lookup(5))
shortest_paths = sc.parallelize([(begin, {"weight_of_path" :0, "path" : [], "explored_path" : set()})])
points_to_drop = sc.broadcast(set())
continue_criteria = True

In [34]:
directions = read_generated_graph("../graph-data/weighted-graph-100.txt")
begin, objective = "99", "33"
paths_to_objective = set(directions.map(lambda x : (x[1][0] , x[0])).filter(lambda x : x[0] == objective).lookup(objective))
shortest_paths = sc.parallelize([(begin, {"weight_of_path" :0, "path" : [], "explored_path" : set()})])
early_stop = 15
continue_criteria = True
points_to_drop = sc.broadcast(set())

In [None]:
%%time
i = 0
while continue_criteria:
    new_paths = shortest_paths.join(directions).map(compute_path)
    #print(new_paths.collect())
    try:
        min_new_paths =sc.broadcast(new_paths.map(lambda x : x[1]["weight_of_path"]).min())
        points_to_drop = sc.broadcast(set(shortest_paths.filter(lambda x : x[1]["weight_of_path"] < min_new_paths.value).keys().collect()) | points_to_drop.value)
    except ValueError:
        #if no new paths are detected
        min_new_paths = sc.broadcast(float("inf"))
    #print(min_new_paths.value)
    shortest_paths = new_paths.union(shortest_paths).reduceByKey(shortest_path_to_point).filter(lambda x : x[0] not in points_to_drop.value)
    directions = directions.filter(lambda x : x[0] not in points_to_drop.value and x[1][0] not in points_to_drop.value)
    #print(shortest_paths.collect())
    
    #stopping criteria
    continue_criteria = i < early_stop
    i +=1
    print(i)
    try: 
        #replace by lookup on real machine when hash problem is resolved
        path_to_objective = shortest_paths.filter(lambda x : x[0] == objective).collect()
        continue_criteria = not path_to_objective[0][1]["weight_of_path"] <= min_new_paths.value
    except IndexError:
        continue_criteria = min_new_paths.value != float("inf")
        pass
    except KeyError:
        continue_criteria = min_new_paths.value != float("inf")
        pass
        

1
2
3
4
5
6
7
8
9
10
11


In [31]:
new_paths.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 155.0 failed 1 times, most recent failure: Lost task 2.0 in stage 155.0 (TID 1795, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-27-f7f1fb2be594>", line 5, in compute_path
TypeError: tuple indices must be integers or slices, not str

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor39.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-27-f7f1fb2be594>", line 5, in compute_path
TypeError: tuple indices must be integers or slices, not str

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [128]:
shortest_paths.filter(lambda x : x[0] == objective).collect()

[('6344',
  {'explored_path': {'5'},
   'path': ['226', '7664', '5'],
   'weight_of_path': 1000})]

SyntaxError: invalid syntax (<ipython-input-145-1533247940e0>, line 1)