In [14]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
sqlContext.sql("set spark.sql.shuffle.partitions=4")
sqlContext.sql("set spark.default.parallelism=4")

DataFrame[key: string, value: string]

In [2]:
# Config
file = "data/test-graph.txt"
max_iter = 100
num_partition = 4
debug = True

In [3]:
# Load the data
textFile = sc.textFile(file)
print("All lines:", textFile.count())
dataFile = textFile.filter(lambda l: len(l) > 0 and l[0] != '#')
print("Correct lines:", dataFile.count())

All lines: 16
Correct lines: 11


In [4]:
# Create edges RDD
def to_int_tuple(line, delim='\t'):
    strings = line.split(delim)[:2]
    return (int(strings[0]), int(strings[1]))
    
edgesRDD = dataFile.map(to_int_tuple)
edgesRDD.partitionBy(num_partition)
edgesRDD.cache()

# Check that all rows have exactly 2 entries
assert edgesRDD.filter(lambda t: len(t) == 2).count() == dataFile.count()

In [5]:
edges = edgesRDD.toDF(["fro", "to"])
edges.cache()

DataFrame[fro: bigint, to: bigint]

In [9]:
def compose(df1, df2):
    """ Compose 2 relations represented by PairRDDs. """
    r1 = df1.toDF('fro', 'inter')
    r2 = df2.toDF('inter', 'to')
    return r1.join(r2, 'inter').drop('inter').coalesce(num_partition)

In [16]:
print("############# RDD: method 1 (single steps) ###############")
new_paths = edges
all_paths = edges

start = time.time()
true_start = start

print("NUM e:", edges.rdd.getNumPartitions())
print("NUM n:", new_paths.rdd.getNumPartitions())

# invariant:
###  - all_paths and new_paths are on 'num_partitions' partitions

for i in range(1, max_iter):
    print("________________________________")
    print("Iteration #%d:" % (i,))
    new_paths = compose(new_paths, edges)
    # Leave only really new paths
    print("NUM n:", new_paths.rdd.getNumPartitions())
    new_paths = new_paths.subtract(all_paths).distinct().coalesce(num_partition)
    new_paths.cache()
    print("Number of new paths: %d\n" % (new_paths.count(),))
    
    # Add new paths to all paths
    print("NUM a:", all_paths.rdd.getNumPartitions())
    all_paths = all_paths.unionAll(new_paths).coalesce(num_partition)
    all_paths.cache()
    
    if debug:
        print(new_paths.take(1000), '\n')
        
    end = time.time()
    print("Iteration time: %f s." % (end - start,))
    start = end
    
    # Finish, when no more paths added
    if new_paths.rdd.isEmpty():
        print("No new paths, finishing...")
        break

print("\n\n________________________________")
print("Total paths found: %d" % (all_paths.count(),))
print("Number of iterations: #%d" % (i,))

if debug:
    print()
    print(all_paths.take(1000), '\n')

true_end = time.time()
method1_time = true_end - true_start
print("\nCollecting time: %f s." % (true_end - start,))
print("Total time elapsed: %f s." % (method1_time,))
print("________________________________\n\n")

############# RDD: method 1 (single steps) ###############
NUM e: 2
NUM n: 2
________________________________
Iteration #1:
NUM n: 4
Number of new paths: 13

NUM a: 2
[Row(fro=1, to=3), Row(fro=5, to=9), Row(fro=9, to=6), Row(fro=10, to=9), Row(fro=3, to=5), Row(fro=4, to=6), Row(fro=6, to=10), Row(fro=5, to=7), Row(fro=10, to=7), Row(fro=2, to=4), Row(fro=5, to=8), Row(fro=10, to=8), Row(fro=0, to=2)] 

Iteration time: 1.448267 s.
________________________________
Iteration #2:
NUM n: 4
Number of new paths: 13

NUM a: 4
[Row(fro=4, to=7), Row(fro=9, to=7), Row(fro=9, to=9), Row(fro=0, to=3), Row(fro=2, to=5), Row(fro=4, to=8), Row(fro=6, to=6), Row(fro=1, to=4), Row(fro=3, to=6), Row(fro=5, to=10), Row(fro=9, to=8), Row(fro=4, to=9), Row(fro=10, to=10)] 

Iteration time: 1.629313 s.
________________________________
Iteration #3:
NUM n: 4
Number of new paths: 7

NUM a: 4
[Row(fro=3, to=9), Row(fro=0, to=4), Row(fro=2, to=6), Row(fro=3, to=8), Row(fro=1, to=5), Row(fro=3, to=7), Row(fro=

Py4JJavaError: An error occurred while calling o2207.count.
: java.lang.OutOfMemoryError: Java heap space


In [None]:
print("############# RDD: method 2 (paths combining) ###############")
new_paths = edges
all_paths = edges

start = time.time()
true_start = start

# invariant:
###  - all_paths and new_paths are on 'num_partitions' partitions

for i in range(1, max_iter):
    print("________________________________")
    print("Iteration #%d:" % (i,))
    new_paths = compose(all_paths, all_paths)
    # Leave only really new paths
    new_paths = new_paths.subtract(all_paths).distinct().coalesce(num_partition)
    new_paths.cache()
    print("Number of new paths: %d\n" % (new_paths.count(),))
    
    # Finish, when no more paths added
    if new_paths.rdd.isEmpty():
        print("No new paths, finishing...")
        break
    
    # Add new paths to all paths
    all_paths = all_paths.unionAll(new_paths).coalesce(num_partition)
    all_paths.cache()
    
    if debug:
        print(new_paths.take(1000), '\n')
        
    end = time.time()
    print("Iteration time: %f s." % (end - start,))
    start = end

print("\n\n________________________________")
print("Total paths found: %d" % (all_paths.count(),))
print("Number of iterations: #%d" % (i,))

if debug:
    print()
    print(all_paths.take(1000), '\n')

true_end = time.time()
method2_time = true_end - true_start
print("\nCollecting time: %f s." % (true_end - start,))
print("Total time elapsed: %f s." % (method2_time,))
print("________________________________\n\n")

In [None]:
print("############# RDD: method 3 (paths combining 'optimized') ###############")

start = time.time()
true_start = start

all_paths = edges
new_paths = compose(edges, edges)
new_paths = new_paths.subtract(all_paths).distinct().coalesce(num_partition)
# invariant:
###  - all_paths and new_paths are disjoint
###  - all_paths and new_paths are on 'num_partitions' partitions

for i in range(2, max_iter):
    print("________________________________")
    print("Iteration #%d:" % (i,))
    # Obtain new paths by composing old ones
    all_x_new_paths = compose(all_paths, new_paths)
    new_x_all_paths = compose(new_paths, all_paths)
    new_x_new_paths = compose(new_paths, new_paths)
    # Leave only really new paths
    all_paths = all_paths.unionAll(new_paths).coalesce(num_partition)
    all_paths.cache()
    new_paths = all_x_new_paths.unionAll(new_x_all_paths).unionAll(new_x_new_paths)
    new_paths = new_paths.subtract(all_paths).distinct().coalesce(num_partition)
    new_paths.cache()
    print("Number of new paths: %d\n" % (new_paths.count(),))
    
    # Finish, when no more paths added
    if new_paths.rdd.isEmpty():
        print("No new paths, finishing...")
        break
    
    if debug:
        print(new_paths.take(1000), '\n')
        
    end = time.time()
    print("Iteration time: %f s." % (end - start,))
    start = end

print("\n\n________________________________")
print("Total paths found: %d" % (all_paths.count(),))
print("Number of iterations: #%d" % (i,))

if debug:
    print()
    print(all_paths.take(1000), '\n')

true_end = time.time()
method3_time = true_end - true_start
print("\nCollecting time: %f s." % (true_end - start,))
print("Total time elapsed: %f s." % (method3_time,))
print("________________________________\n\n")

In [None]:
print("########### Summary ############")
print("Method 1: %f s." % (method1_time,))
print("Method 2: %f s." % (method2_time,))
print("Method 3: %f s." % (method3_time,))