# Quick setup for PySpark and GraphFrames

In [None]:
# Install a headless Java 8 JDK (output discarded) and download the
# Spark 3.0.3 binary distribution built for Hadoop 2.7.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
!tar xf /content/spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
!wget -q https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar

In [None]:
## Install the GraphFrames library on Colab:
## move the downloaded jar into the jars/ directory of the unpacked Spark distribution.
!mv /content/graphframes-0.8.2-spark3.0-s_2.12.jar /content/spark-3.0.3-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar

In [None]:
# Python-side packages used by the rest of the notebook.
# NOTE(review): versions are unpinned, so the pip-installed pyspark may not match
# the Spark 3.0.3 distribution downloaded earlier — consider pinning.
!pip -q install findspark pyspark graphframes

[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[K     |████████████████████████████████| 198 kB 64.2 MB/s 
[K     |████████████████████████████████| 154 kB 68.6 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
import os
# Tell the tooling where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.3-bin-hadoop2.7",
})

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the notebook-wide session: "local" master, app name "Colab",
# Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# confirm the spark installation
spark

In [None]:
# Additional environment for PySpark: HADOOP_HOME mirrors SPARK_HOME, the driver
# Python is set to a Jupyter notebook, and the GraphFrames package is requested
# through the submit arguments.
# NOTE(review): a SparkSession has already been created in an earlier cell, so
# these settings presumably do not affect that running session — confirm.
os.environ.update({
    "HADOOP_HOME": os.environ["SPARK_HOME"],
    "PYSPARK_DRIVER_PYTHON": "jupyter",
    "PYSPARK_DRIVER_PYTHON_OPTS": "notebook",
    "PYSPARK_SUBMIT_ARGS": "--packages graphframes:graphframes:0.8.2-spark3.0-s_2.12 pyspark-shell",
})

In [None]:
from graphframes import *

In [None]:
def init_spark(app_name="HelloWorldApp", execution_mode="local[*]"):
  """Return a ``(SparkSession, SparkContext)`` pair.

  The session comes from ``getOrCreate()`` on a builder configured with
  ``execution_mode`` as the master and ``app_name`` as the application name;
  the context is that session's ``sparkContext``.
  """
  builder = SparkSession.builder.master(execution_mode).appName(app_name)
  session = builder.getOrCreate()
  return session, session.sparkContext

## Example: GraphFrame

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Only the SparkContext returned by the helper is kept; it backs the SQLContext.
_, sc = init_spark()
sqlContext = SQLContext(sc)

## The remainder of this cell comes from the GraphFrames quick-start guide:
## https://graphframes.github.io/graphframes/docs/_site/quick-start.html#getting-started-with-apache-spark-and-spark-packages

# Vertex DataFrame: the "id" column uniquely identifies each vertex.
v = sqlContext.createDataFrame(
    [
        ("a", "Alice", 34),
        ("b", "Bob", 36),
        ("c", "Charlie", 30),
    ],
    ["id", "name", "age"],
)

# Edge DataFrame: "src" and "dst" hold the ids of the connected vertices.
e = sqlContext.createDataFrame(
    [
        ("a", "b", "friend"),
        ("b", "c", "follow"),
        ("c", "b", "follow"),
    ],
    ["src", "dst", "relationship"],
)

# Assemble the graph from the two DataFrames.
from graphframes import *
g = GraphFrame(v, e)

# In-degree of every vertex.
g.inDegrees.show()

# Number of "follow" edges; as the last expression it becomes the cell output.
g.edges.filter("relationship = 'follow'").count()

+---+--------+
| id|inDegree|
+---+--------+
|  c|       1|
|  b|       2|
+---+--------+



2

# Importing the Data into Apache Spark

In [None]:
!mkdir data
!wget -q https://github.com/chunsejin/KGProjects_DAU/raw/master/data/transport-nodes.csv
!wget -q https://github.com/chunsejin/KGProjects_DAU/raw/master/data/transport-relationships.csv
!mv transport-relationships.csv data
!mv transport-nodes.csv data

Saving transport-nodes.csv to transport-nodes.csv
Saving transport-relationships.csv to transport-relationships.csv


{'transport-nodes.csv': b'id,latitude,longitude,population\n"Amsterdam",52.379189,4.899431,821752\n"Utrecht",52.092876,5.104480,334176\n"Den Haag",52.078663,4.288788,514861\n"Immingham",53.61239,-0.22219,9642\n"Doncaster",53.52285,-1.13116,302400\n"Hoek van Holland",51.9775,4.13333,9382\n"Felixstowe",51.96375,1.3511,23689\n"Ipswich",52.05917,1.15545,133384\n"Colchester",51.88921,0.90421,104390\n"London",51.509865,-0.118092,8787892\n"Rotterdam",51.9225,4.47917,623652\n"Gouda",52.01667,4.70833,70939\n',
 'transport-relationships.csv': b'src,dst,relationship,cost\n"Amsterdam","Utrecht","EROAD",46\n"Amsterdam","Den Haag","EROAD",59\n"Den Haag","Rotterdam","EROAD",26\n"Amsterdam","Immingham","EROAD",369\n"Immingham","Doncaster","EROAD",74\n"Doncaster","London","EROAD",277\n"Hoek van Holland","Den Haag","EROAD",27\n"Felixstowe","Hoek van Holland","EROAD",207\n"Ipswich","Felixstowe","EROAD",22\n"Colchester","Ipswich","EROAD",32\n"London","Colchester","EROAD",106\n"Gouda","Rotterdam","EROAD",2

In [None]:
from pyspark.sql.types import *
from graphframes import *

In [None]:
# // tag::load-graph-frame[]
def create_transport_graph(nodes_path="data/transport-nodes.csv",
                           rels_path="data/transport-relationships.csv"):
    """Load the transport CSV files and return them as a GraphFrame.

    Args:
        nodes_path: CSV file with a header row and the columns
            ``id, latitude, longitude, population``.
        rels_path: CSV file with a header row and the columns
            ``src, dst, relationship, cost``.

    Returns:
        GraphFrame whose vertices are the rows of ``nodes_path`` and whose
        edges are every relationship plus its reverse (src/dst swapped), so
        each connection can be traversed in both directions.

    Both files are read with an explicit schema. In particular ``cost`` is
    read as an integer instead of being left as a string column, so arithmetic
    and comparisons on it do not depend on implicit string casts.

    Relies on the module-level ``spark`` session.
    """
    node_fields = [
        StructField("id", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("population", IntegerType(), True)
    ]
    nodes = spark.read.csv(nodes_path, header=True,
                           schema=StructType(node_fields))

    rel_fields = [
        StructField("src", StringType(), True),
        StructField("dst", StringType(), True),
        StructField("relationship", StringType(), True),
        StructField("cost", IntegerType(), True)
    ]
    rels = spark.read.csv(rels_path, header=True,
                          schema=StructType(rel_fields))

    # union() matches columns by position, so the reversed edges must keep the
    # same column order as `rels`: src, dst, relationship, cost.
    reversed_rels = rels.select(rels["dst"].alias("src"),
                                rels["src"].alias("dst"),
                                rels["relationship"],
                                rels["cost"])

    relationships = rels.union(reversed_rels)

    return GraphFrame(nodes, relationships)
# // end::load-graph-frame[]


In [None]:
# Let's create the graph:
# call the function defined above and store the result in the variable g.
g = create_transport_graph()

## GraphFrame Basics

In [None]:
g.vertices.show()

+----------------+---------+---------+----------+
|              id| latitude|longitude|population|
+----------------+---------+---------+----------+
|       Amsterdam| 52.37919| 4.899431|    821752|
|         Utrecht|52.092876|  5.10448|    334176|
|        Den Haag|52.078663| 4.288788|    514861|
|       Immingham| 53.61239| -0.22219|      9642|
|       Doncaster| 53.52285| -1.13116|    302400|
|Hoek van Holland|  51.9775|  4.13333|      9382|
|      Felixstowe| 51.96375|   1.3511|     23689|
|         Ipswich| 52.05917|  1.15545|    133384|
|      Colchester| 51.88921|  0.90421|    104390|
|          London|51.509865|-0.118092|   8787892|
|       Rotterdam|  51.9225|  4.47917|    623652|
|           Gouda| 52.01667|  4.70833|     70939|
+----------------+---------+---------+----------+



12

In [None]:
# Number of nodes (vertices) in the graph
g.vertices.count()

In [None]:
# Exercise: print the edges of the graph, and also print the number of edges.

## 그래프의 밀집도(Density)
$D = \frac{2R}{N(N-1)}$

In [None]:
# Graph density D = 2R / (N(N-1)), with N the vertex count and R the edge count.
# NOTE(review): create_transport_graph() unions every relationship with its
# reverse, so g.edges.count() counts each connection twice — confirm whether R
# should be halved before applying the formula.
N = g.vertices.count()
R = g.edges.count()
# Explicit `*` and parentheses are required: `2R` is not valid Python, and
# `/ N * (N-1)` would multiply by (N-1) instead of dividing by it.
# With fewer than two vertices no edge is possible, so report 0.0 rather than
# dividing by zero.
D = 2 * R / (N * (N - 1)) if N > 1 else 0.0
D

## InDegree/OutDegree

In [None]:
vertexInDegrees = g.inDegrees
vertexInDegrees.show()

+----------------+--------+
|              id|inDegree|
+----------------+--------+
|       Doncaster|       2|
|       Rotterdam|       3|
|          London|       2|
|        Den Haag|       4|
|       Immingham|       2|
|       Amsterdam|       3|
|      Colchester|       2|
|         Utrecht|       2|
|           Gouda|       3|
|         Ipswich|       2|
|Hoek van Holland|       3|
|      Felixstowe|       2|
+----------------+--------+



In [None]:
# Exercise: compute the out-degree of each vertex in the graph.

In [None]:
# Find the smallest population among all vertices of the graph.
# This queries the vertex DataFrame (global aggregate, no grouping columns).
g.vertices.groupBy().min("population").show()

+---------------+
|min(population)|
+---------------+
|           9382|
+---------------+



## Filter

In [None]:
# Count the number of "EROAD" in the graph.
# This queries the edge DataFrame.
numERoads = g.edges.filter("relationship = 'EROAD'").count()
numERoads

30

## Plotting with Folium


In [None]:
import folium

In [None]:
coordinates = g.vertices.select('id','latitude','longitude').collect()
coordinates

[Row(id='Amsterdam', latitude=52.379188537597656, longitude=4.899431228637695),
 Row(id='Utrecht', latitude=52.09287643432617, longitude=5.104479789733887),
 Row(id='Den Haag', latitude=52.07866287231445, longitude=4.288787841796875),
 Row(id='Immingham', latitude=53.612388610839844, longitude=-0.2221899926662445),
 Row(id='Doncaster', latitude=53.522850036621094, longitude=-1.131160020828247),
 Row(id='Hoek van Holland', latitude=51.977500915527344, longitude=4.13332986831665),
 Row(id='Felixstowe', latitude=51.963748931884766, longitude=1.351099967956543),
 Row(id='Ipswich', latitude=52.05916976928711, longitude=1.1554499864578247),
 Row(id='Colchester', latitude=51.88920974731445, longitude=0.9042099714279175),
 Row(id='London', latitude=51.509864807128906, longitude=-0.11809200048446655),
 Row(id='Rotterdam', latitude=51.92250061035156, longitude=4.479169845581055),
 Row(id='Gouda', latitude=52.01667022705078, longitude=4.708330154418945)]

In [None]:
print(coordinates[0]['latitude'], coordinates[0]['longitude'])

52.379188537597656 4.899431228637695


In [None]:
# Show the locations on a map, centred on the first collected vertex.
# Note: the name `map` shadows the Python builtin of the same name.
start_row = coordinates[0]
map = folium.Map(
    location=[start_row['latitude'], start_row['longitude']],
    zoom_start=7,
    control_scale=True,
    width=600,
    height=400,
)

In [None]:
# One marker per vertex; the popup shows the vertex id.
for coord in coordinates:
  marker = folium.Marker([coord['latitude'], coord['longitude']], popup=coord["id"])
  marker.add_to(map)

In [None]:
map

# Breadth First Search with Apache Spark

In [None]:
(g.vertices
.filter("population > 100000 and population < 300000")
.sort("population")
.show())

+----------+--------+---------+----------+
|        id|latitude|longitude|population|
+----------+--------+---------+----------+
|Colchester|51.88921|  0.90421|    104390|
|   Ipswich|52.05917|  1.15545|    133384|
+----------+--------+---------+----------+



In [None]:
# Breadth-first search that starts at the vertex with id 'Den Haag' and ends at
# any vertex whose population is strictly between 100,000 and 300,000
# (Den Haag itself is explicitly excluded as a target).
from_expr = "id='Den Haag'"
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
result = g.bfs(from_expr, to_expr)

In [None]:
print(result.columns)

['from', 'e0', 'v1', 'e1', 'v2', 'e2', 'to']


In [None]:
# Keep only the columns whose name does not start with "e" — this drops the
# e0/e1/e2 columns from the column list printed above and leaves from, v1, v2, to.
columns = [column for column in result.columns if not column.startswith("e")]
result.select(columns).show()

+--------------------+--------------------+--------------------+--------------------+
|                from|                  v1|                  v2|                  to|
+--------------------+--------------------+--------------------+--------------------+
|[Den Haag, 52.078...|[Hoek van Holland...|[Felixstowe, 51.9...|[Ipswich, 52.0591...|
+--------------------+--------------------+--------------------+--------------------+



# Shortest Path (Weighted) with Apache Spark

In [None]:
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F

In [None]:
# UDF returning `path` with `id` appended, typed as array<string>.
add_path_udf = F.udf(
    f=lambda path, id: path + [id],
    returnType=ArrayType(StringType()),
)

In [None]:
# // tag::custom-shortest-path[]
def shortest_path(g, origin, destination, column_name="cost"):
    """Find the lowest-cost path from ``origin`` to ``destination``.

    Works Dijkstra-style: every vertex starts unvisited with distance infinity
    (0 for ``origin``); each iteration settles the unvisited vertex with the
    smallest distance and offers ``distance + edge[column_name]`` to the
    destinations of its outgoing edges.

    Args:
        g: GraphFrame to search; its edges must have a ``column_name`` column.
        origin: id of the start vertex.
        destination: id of the target vertex.
        column_name: name of the edge column used as the edge weight.

    Returns:
        A DataFrame. When ``destination`` is not a vertex of ``g``, it is empty
        and has the vertex schema plus an empty-array ``path`` column.
        Otherwise it contains the destination vertex row with an added
        ``distance`` column and a ``path`` column listing the vertex ids from
        the origin up to and including the destination.

    Relies on the module-level ``spark``, ``sc`` and ``add_path_udf``.

    NOTE(review): when the destination cannot be reached it still appears to be
    visited eventually and returned with an infinite distance — confirm.
    """
    # Unknown destination: return an empty result right away.
    if g.vertices.filter(g.vertices.id == destination).count() == 0:
        return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
                .withColumn("path", F.array()))

    # Initial state: nothing visited, distance 0 at the origin and infinity
    # elsewhere, empty path everywhere.
    vertices = (g.vertices.withColumn("visited", F.lit(False))
                .withColumn("distance", F.when(g.vertices["id"] == origin, 0)
                            .otherwise(float("inf")))
                .withColumn("path", F.array()))
    # presumably cached so the query plan does not keep growing across iterations — confirm
    cached_vertices = AM.getCachedDataFrame(vertices)
    g2 = GraphFrame(cached_vertices, g.edges)

    # Loop while at least one vertex is still unvisited.
    while g2.vertices.filter('visited == False').first():
        # Settle the unvisited vertex with the smallest distance so far.
        current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id

        # Only edges leaving the current vertex send a message (the `when` has
        # no `otherwise`, so every other edge sends null). The message is a
        # struct of (candidate distance, path up to and including the source).
        msg_distance = AM.edge[column_name] + AM.src['distance']
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
        # min() over the structs keeps, per destination vertex, the message
        # with the smallest first field, i.e. the smallest candidate distance.
        new_distances = g2.aggregateMessages(F.min(AM.msg).alias("aggMess"),
                                             sendToDst=msg_for_dst)

        # Visited stays true once set, and becomes true for the current vertex.
        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
        # Adopt the offered distance/path only when it beats the stored distance.
        # col1/col2 are the two fields of the message struct built above.
        new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                  (new_distances.aggMess["col1"] < g2.vertices.distance),
                                  new_distances.aggMess["col1"]) \
            .otherwise(g2.vertices.distance)
        new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                              (new_distances.aggMess["col1"] < g2.vertices.distance),
                              new_distances.aggMess["col2"].cast("array<string>")) \
            .otherwise(g2.vertices.path)

        # Left join keeps vertices that received no message; the new values are
        # computed under temporary names and then renamed over the old columns.
        new_vertices = (g2.vertices.join(new_distances, on="id", how="left_outer")
                        .drop(new_distances["id"])
                        .withColumn("visited", new_visited_col)
                        .withColumn("newDistance", new_distance_col)
                        .withColumn("newPath", new_path_col)
                        .drop("aggMess", "distance", "path")
                        .withColumnRenamed('newDistance', 'distance')
                        .withColumnRenamed('newPath', 'path'))
        cached_new_vertices = AM.getCachedDataFrame(new_vertices)
        g2 = GraphFrame(cached_new_vertices, g2.edges)
        # Stop as soon as the destination has been settled; its own id is
        # appended so the returned path ends at the destination.
        if g2.vertices.filter(g2.vertices.id == destination).first().visited:
            return (g2.vertices.filter(g2.vertices.id == destination)
                    .withColumn("newPath", add_path_udf("path", "id"))
                    .drop("visited", "path")
                    .withColumnRenamed("newPath", "path"))
    # Fallback when the loop ends without returning: empty result.
    return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
            .withColumn("path", F.array()))
# // end::custom-shortest-path[]

In [None]:
result = shortest_path(g, "Amsterdam", "Colchester", "cost")
result.select("id", "distance", "path").show(truncate=False)

+----------+--------+------------------------------------------------------------------------+
|id        |distance|path                                                                    |
+----------+--------+------------------------------------------------------------------------+
|Colchester|347.0   |[Amsterdam, Den Haag, Hoek van Holland, Felixstowe, Ipswich, Colchester]|
+----------+--------+------------------------------------------------------------------------+



In [None]:
path = result.select('path').collect()

In [None]:
def add_line_to_map(path, m):
  """Draw a path through the graph as a red polyline on a folium map.

  Args:
    path: collected rows whose first row's first field is the ordered list of
      vertex ids to connect, e.g. ``result.select('path').collect()``.
    m: folium map (or other folium container) the polyline is added to.

  Returns:
    None. Nothing is drawn when ``path`` is empty, which is what collecting an
    empty result DataFrame produces.

  Coordinates are looked up in the module-level graph ``g``; an id that is not
  a vertex of ``g`` raises ``IndexError``.
  """
  if not path:
    return

  coords = []
  for node_id in path[0][0]:
    # Compare through a Column expression instead of formatting the id into a
    # SQL string, so ids containing quotes (e.g. "King's Lynn") still work.
    point = g.vertices.filter(g.vertices["id"] == node_id).collect()[0]
    coords.append([point['latitude'], point['longitude']])

  # The stroke width option of a folium PolyLine is `weight`; `line_weight` is
  # not a recognised option, so the requested width was never applied.
  folium.PolyLine(coords, color="red", weight=5).add_to(m)

In [None]:
add_line_to_map(path, map)

In [None]:
map

# All Pairs Shortest Path with Apache Spark

In [None]:
# Shortest paths from every vertex to the three landmark vertices listed here.
# No edge weight column is passed; the distances shown in the output appear to
# be hop counts (number of edges) rather than summed costs.
result = g.shortestPaths(["Colchester", "Immingham", "Hoek van Holland"])
result.sort(["id"]).select("id", "distances").show(truncate=False)

+----------------+--------------------------------------------------------+
|id              |distances                                               |
+----------------+--------------------------------------------------------+
|Amsterdam       |[Immingham -> 1, Hoek van Holland -> 2, Colchester -> 4]|
|Colchester      |[Hoek van Holland -> 3, Immingham -> 3, Colchester -> 0]|
|Den Haag        |[Hoek van Holland -> 1, Immingham -> 2, Colchester -> 4]|
|Doncaster       |[Hoek van Holland -> 4, Immingham -> 1, Colchester -> 2]|
|Felixstowe      |[Immingham -> 4, Hoek van Holland -> 1, Colchester -> 2]|
|Gouda           |[Hoek van Holland -> 2, Immingham -> 3, Colchester -> 5]|
|Hoek van Holland|[Immingham -> 3, Hoek van Holland -> 0, Colchester -> 3]|
|Immingham       |[Hoek van Holland -> 3, Immingham -> 0, Colchester -> 3]|
|Ipswich         |[Immingham -> 4, Hoek van Holland -> 2, Colchester -> 1]|
|London          |[Hoek van Holland -> 4, Immingham -> 2, Colchester -> 1]|
|Rotterdam  

# Single Source Shortest Path


In [None]:
from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F

In [None]:
# UDF returning `path` with `id` appended, typed as array<string>.
add_path_udf = F.udf(
    f=lambda path, id: path + [id],
    returnType=ArrayType(StringType()),
)

In [None]:
# // tag::sssp[]
def sssp(g, origin, column_name="cost"):
    """Compute lowest-cost paths from ``origin`` to every vertex of ``g``.

    Works Dijkstra-style: every vertex starts unvisited with distance infinity
    (0 for ``origin``); each iteration settles the unvisited vertex with the
    smallest distance and offers ``distance + edge[column_name]`` to the
    destinations of its outgoing edges. The loop ends once every vertex has
    been visited.

    Args:
        g: GraphFrame to search; its edges must have a ``column_name`` column.
        origin: id of the start vertex.
        column_name: name of the edge column used as the edge weight.

    Returns:
        DataFrame with one row per vertex: the vertex columns plus ``distance``
        and ``path``, where ``path`` lists the vertex ids from the origin up to
        and including the vertex itself.

    Relies on the module-level ``add_path_udf``.
    """
    # Initial state: nothing visited, distance 0 at the origin and infinity
    # elsewhere, empty path everywhere.
    vertices = g.vertices \
        .withColumn("visited", F.lit(False)) \
        .withColumn("distance",
            F.when(g.vertices["id"] == origin, 0).otherwise(float("inf"))) \
        .withColumn("path", F.array())
    # presumably cached so the query plan does not keep growing across iterations — confirm
    cached_vertices = AM.getCachedDataFrame(vertices)
    g2 = GraphFrame(cached_vertices, g.edges)

    # Loop while at least one vertex is still unvisited.
    while g2.vertices.filter('visited == False').first():
        # Settle the unvisited vertex with the smallest distance so far.
        current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id

        # Only edges leaving the current vertex send a message (the `when` has
        # no `otherwise`, so every other edge sends null). The message is a
        # struct of (candidate distance, path up to and including the source).
        msg_distance = AM.edge[column_name] + AM.src['distance']
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
        # min() over the structs keeps, per destination vertex, the message
        # with the smallest first field, i.e. the smallest candidate distance.
        new_distances = g2.aggregateMessages(
            F.min(AM.msg).alias("aggMess"), sendToDst=msg_for_dst)

        # Visited stays true once set, and becomes true for the current vertex.
        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
        # Adopt the offered distance/path only when it beats the stored distance.
        # col1/col2 are the two fields of the message struct built above.
        new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                  (new_distances.aggMess["col1"] < g2.vertices.distance),
                                  new_distances.aggMess["col1"]) \
                            .otherwise(g2.vertices.distance)
        new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                              (new_distances.aggMess["col1"] < g2.vertices.distance),
                              new_distances.aggMess["col2"].cast("array<string>")) \
                        .otherwise(g2.vertices.path)

        # Left join keeps vertices that received no message; the new values are
        # computed under temporary names and then renamed over the old columns.
        new_vertices = g2.vertices.join(new_distances, on="id", how="left_outer") \
            .drop(new_distances["id"]) \
            .withColumn("visited", new_visited_col) \
            .withColumn("newDistance", new_distance_col) \
            .withColumn("newPath", new_path_col) \
            .drop("aggMess", "distance", "path") \
            .withColumnRenamed('newDistance', 'distance') \
            .withColumnRenamed('newPath', 'path')
        cached_new_vertices = AM.getCachedDataFrame(new_vertices)
        g2 = GraphFrame(cached_new_vertices, g2.edges)

    # Append each vertex's own id so every path ends at that vertex.
    return g2.vertices \
                .withColumn("newPath", add_path_udf("path", "id")) \
                .drop("visited", "path") \
                .withColumnRenamed("newPath", "path")


In [None]:
# UDF that drops the first and last element of `path`, leaving only the
# intermediate stops, typed as array<string>.
via_udf = F.udf(
    f=lambda path: path[1:-1],
    returnType=ArrayType(StringType()),
)

In [None]:
result = sssp(g, "Amsterdam", "cost")
(result
.withColumn("via", via_udf("path"))
.select("id", "distance", "via")
.sort("distance")
.show(truncate=False))

+----------------+--------+-------------------------------------------------------------+
|id              |distance|via                                                          |
+----------------+--------+-------------------------------------------------------------+
|Amsterdam       |0.0     |[]                                                           |
|Utrecht         |46.0    |[]                                                           |
|Den Haag        |59.0    |[]                                                           |
|Gouda           |81.0    |[Utrecht]                                                    |
|Rotterdam       |85.0    |[Den Haag]                                                   |
|Hoek van Holland|86.0    |[Den Haag]                                                   |
|Felixstowe      |293.0   |[Den Haag, Hoek van Holland]                                 |
|Ipswich         |315.0   |[Den Haag, Hoek van Holland, Felixstowe]                     |
|Colcheste

# Minimum Spanning Tree

In [None]:
# Left as a challenge for you.

# Random Walk

In [None]:
# Left as a challenge for you.

# Appendix: Timing and Profiling
%time: Time the execution of a single statement

%timeit: Time repeated execution of a single statement for more accuracy

%prun: Run code with the profiler

%lprun: Run code with the line-by-line profiler

%memit: Measure the memory use of a single statement

%mprun: Run code with the line-by-line memory profile