## Set Parameters

In [1]:
NN = 7 # Number of nearest neighbours for each city
spark.conf.set("spark.sql.shuffle.partitions", "4")

## Imports

In [2]:
from graphframes import GraphFrame

In [3]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import math

## Load and adjust basic DataFrame

In [4]:
citiesDFSchema = StructType([
  StructField("city", StringType(), False),
  StructField("city_ascii", StringType(), True),
  StructField("latitude", FloatType(), False),
  StructField("longitude", FloatType(), False),
  StructField("country", StringType(), True),
  StructField("iso2", StringType(), True),
  StructField("iso3", StringType(), True),
  StructField("admin_name", StringType(), True),
  StructField("capital", StringType(), True),
  StructField("population", FloatType(), True),
  StructField("id", LongType(), True)
])

In [5]:
citiesDF = spark.read.format("csv")\
  .option("sep", ",")\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .schema(citiesDFSchema)\
  .load("/Users/petergerngross/Programming/Data/csv/Graph/simplemaps_worldcities_basicv1/worldcities.csv")\
  .drop("population")\
  .drop("id")\
  .drop("city_ascii")\
  .drop("country")\
  .drop("iso3")\
  .drop("admin_name")\
  .withColumnRenamed("city", "id")\
  .withColumn("capNum", when(col("capital") == "primary", 4)\
                        .when(col("capital") == "admin", 3)\
                        .when(col("capital") == "minor", 2)\
                        .otherwise(1))\
  .drop("capital")\
  .where("capNum != 1")\
  .drop("capNum")
  

citiesDF.printSchema()

root
 |-- id: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- iso2: string (nullable = true)



In [6]:
citiesDF.show(10)

+-----------+--------+---------+----+
|         id|latitude|longitude|iso2|
+-----------+--------+---------+----+
|  Malishevë| 42.4822|  20.7458|  XK|
|    Prizren| 42.2139|  20.7397|  XK|
|Zubin Potok| 42.9144|  20.6897|  XK|
|   Kamenicë| 42.5781|  21.5803|  XK|
|       Viti| 42.3214|  21.3583|  XK|
|   Shtërpcë| 42.2394|  21.0272|  XK|
|     Shtime| 42.4331|  21.0397|  XK|
|   Vushtrri| 42.8231|  20.9675|  XK|
|    Dragash| 42.0265|  20.6533|  XK|
|   Podujevë| 42.9111|  21.1899|  XK|
+-----------+--------+---------+----+
only showing top 10 rows



In [7]:
citiesDF.count()

5063

## Function for calculating distances from geographic coordinates

In [8]:
def deg2rad(degrees):
  return math.pi * degrees / 180

def geoDistFlat(phi1, lambda1, phi2, lambda2):
  phiMRad = deg2rad((phi1 + phi2) / 2)
  k1 = 111.13209 - 0.56605 * math.cos(2 * phiMRad) + 0.00120 * math.cos(4 * phiMRad)
  k2 = 111.41513 * math.cos(phiMRad) - 0.09455 * math.cos(3 * phiMRad) + 0.00012 * math.cos(5 * phiMRad)
  return math.sqrt(math.pow(k1 * (phi2 - phi1), 2) + math.pow(k2 * (lambda2 - lambda1), 2))

### Register the function in Spark

In [9]:
geoDistUDF = udf(geoDistFlat)

## Create Edges DataFrame

In [10]:
cities1DF = citiesDF\
  .drop("iso2")\
  .withColumnRenamed("id", "src")\
  .withColumnRenamed("latitude", "latSrc")\
  .withColumnRenamed("longitude", "longSrc")
    
cities2DF = cities1DF\
  .withColumnRenamed("src", "dst")\
  .withColumnRenamed("latSrc", "latDst")\
  .withColumnRenamed("longSrc", "longDst")

In [11]:
cityConnectsDF = cities1DF.crossJoin(cities2DF)\
  .where("src != dst")\
  .withColumn("cityDistance", round(geoDistUDF(col("latSrc"), col("longSrc"), col("latDst"), col("longDst"))).cast("int"))\
  .select("src", "dst", "cityDistance")

In [12]:
cityConnectsDF.printSchema()

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- cityDistance: integer (nullable = true)



In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window = Window.partitionBy(cityConnectsDF["src"]).orderBy(cityConnectsDF["cityDistance"])

cityConnectsNNDF = cityConnectsDF.select('*', rank().over(window).alias('rank'))\
  .filter(col('rank') <= NN)\
  .drop(col("rank"))

In [14]:
cityConnectsNNDF.count()

35473

In [15]:
cityConnectsNNDF.where("src = 'Berlin' OR dst = 'Berlin'").show()

+-------------------+-------------------+------------+
|                src|                dst|cityDistance|
+-------------------+-------------------+------------+
|          Magdeburg|             Berlin|         129|
|            Potsdam|             Berlin|          26|
|           Szczecin|             Berlin|         125|
|             Berlin|            Potsdam|          26|
|             Berlin|            Cottbus|         105|
|             Berlin|           Szczecin|         125|
|             Berlin|Gorzów Wielkopolski|         126|
|             Berlin|          Magdeburg|         129|
|             Berlin|            Leipzig|         149|
|             Berlin|       Zielona Góra|         157|
|            Cottbus|             Berlin|         105|
|       Zielona Góra|             Berlin|         157|
|Gorzów Wielkopolski|             Berlin|         126|
+-------------------+-------------------+------------+



## Create and modify GraphFrames

In [16]:
cityGraph = GraphFrame(citiesDF, cityConnectsNNDF)
cityGraph.cache()

GraphFrame(v:[id: string, latitude: float ... 2 more fields], e:[src: string, dst: string ... 1 more field])

In [17]:
print("The cityGraph has "\
      + str(cityGraph.vertices.count()) + " nodes and "\
      + str(cityGraph.edges.count()) + " edges.")

The cityGraph has 5063 nodes and 35473 edges.


In [18]:
citySubgraph = cityGraph.filterVertices("iso2 IN ('DE','AT','CH')")
print("The citySubgraph has "\
      + str(citySubgraph.vertices.count()) + " nodes and "\
      + str(citySubgraph.edges.count()) + " edges.")

The citySubgraph has 88 nodes and 496 edges.


In [19]:
citySubgraph.outDegrees.where("id IN ('Frankfurt','Stralsund')").show()

+---------+---------+
|       id|outDegree|
+---------+---------+
|Stralsund|        2|
|Frankfurt|        7|
+---------+---------+



## Shortest Paths with BFS

In [20]:
paths = citySubgraph.bfs("id = 'Frankfurt'", "id = 'Basel'")

In [21]:
paths.printSchema()

root
 |-- from: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- latitude: float (nullable = true)
 |    |-- longitude: float (nullable = true)
 |    |-- iso2: string (nullable = true)
 |-- e0: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- cityDistance: integer (nullable = true)
 |-- v1: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- latitude: float (nullable = true)
 |    |-- longitude: float (nullable = true)
 |    |-- iso2: string (nullable = true)
 |-- e1: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- cityDistance: integer (nullable = true)
 |-- v2: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- latitude: float (nullable = true)
 |    |-- longitude: float (nullable = true)
 |    |-- iso2: string (nullable = true)
 |-- e2: struct (nullable = false)
 |    |-- src: s

In [22]:
paths.select(concat(col("from.id"), lit(", ")\
              ,col("v1.id"), lit(", ")\
              ,col("v2.id"), lit(", ")\
              ,col("v3.id"), lit(", ")\
              ,col("to.id")\
             ).alias("path"),\
             (col("e0.cityDistance")\
             + col("e1.cityDistance")\
             + col("e2.cityDistance")\
             + col("e3.cityDistance"))\
             .alias("totalDistance")\
            ).show(3,False)

+-------------------------------------------------+-------------+
|path                                             |totalDistance|
+-------------------------------------------------+-------------+
|Frankfurt, Mainz, Karlsruhe, Freiburg, Basel     |311          |
|Frankfurt, Mannheim, Karlsruhe, Freiburg, Basel  |293          |
|Frankfurt, Heidelberg, Karlsruhe, Freiburg, Basel|297          |
+-------------------------------------------------+-------------+

