<a href="https://colab.research.google.com/github/smduarte/spbd-2223/blob/main/lab10/spbd2122_graphframes_connected_components.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SPBD 2022
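## GraphFrames Connected Components Example

This example builds a graph whose vertices are grid cells and whose edges link horizontally or vertically adjacent cells. It then uses GraphFrames `connectedComponents` to group the cells into connected regions.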




In [None]:
#@title Spark Setup
!pip install pyspark findspark --quiet

     |████████████████████████████████| 281.4 MB 38 kB/s 
     |████████████████████████████████| 199 kB 47.0 MB/s 
  Building wheel for pyspark (setup.py) ... done


In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np


spark = SparkSession.builder.master('local[*]').appName('connectedComponents') \
        .config('spark.jars.packages', 'graphframes:graphframes:0.8.2-spark3.1-s_2.12')\
        .getOrCreate()

# Grid cells as (id, i, j): a label plus the cell's integer coordinates.
cells = [("a", 0, 0), ("b", 0, 1), ("c", 2, 2), ("d", 3, 2), ("e", 3, 3), ("f", 0, 2)]

cells_df = spark.createDataFrame(data=cells, schema=["id", "i", "j"])
cells_df.show(truncate=False)

# Pair every cell with every cell (itself included). The right-hand copy's
# columns get a "2" suffix so both sides stay addressable after the join.
pairs_df = cells_df.crossJoin(
    cells_df.withColumnRenamed("i", "i2")
            .withColumnRenamed("j", "j2")
            .withColumnRenamed("id", "id2")
)
pairs_df.show(truncate=False)

def _areNeighbours(li, lj, ri, rj):
    """Return whether cell (li, lj) pairs with cell (ri, rj) as neighbours.

    A pair matches when the left cell is the right cell itself, or sits one
    step further along exactly one axis (li == ri + 1 with equal j, or
    lj == rj + 1 with equal i). Each pair of distinct neighbours therefore
    matches in one direction only. The self-match keeps every cell in the
    filtered pairs, even a cell with no neighbours.

    Only ``-``, comparisons, ``&`` and ``|`` are used. The predicate therefore
    works on plain Python ints (returning a bool) and on Spark ``Column``
    objects (returning a boolean ``Column`` that Spark evaluates natively,
    with no Python UDF round-trip).
    """
    dx = li - ri
    dy = lj - rj
    # The parentheses are required: & and | bind tighter than comparisons in Python.
    return ((dx >= 0) & (dx <= 1) & (dy == 0)) | ((dx == 0) & (dy >= 0) & (dy <= 1))

# Applied to Columns below; yields a BooleanType column that can be filtered on
# directly (a bare udf() would default to a StringType result).
areNeighbours = _areNeighbours

df = pairs_df.filter(areNeighbours(pairs_df.i, pairs_df.j, pairs_df.i2, pairs_df.j2))
df.printSchema()

df.show(truncate=False)


+---+---+---+
|id |i  |j  |
+---+---+---+
|a  |0  |0  |
|b  |0  |1  |
|c  |2  |2  |
|d  |3  |2  |
|e  |3  |3  |
|f  |0  |2  |
+---+---+---+

+---+---+---+---+---+---+
|id |i  |j  |id2|i2 |j2 |
+---+---+---+---+---+---+
|a  |0  |0  |a  |0  |0  |
|a  |0  |0  |b  |0  |1  |
|a  |0  |0  |c  |2  |2  |
|b  |0  |1  |a  |0  |0  |
|b  |0  |1  |b  |0  |1  |
|b  |0  |1  |c  |2  |2  |
|c  |2  |2  |a  |0  |0  |
|c  |2  |2  |b  |0  |1  |
|c  |2  |2  |c  |2  |2  |
|a  |0  |0  |d  |3  |2  |
|a  |0  |0  |e  |3  |3  |
|a  |0  |0  |f  |0  |2  |
|b  |0  |1  |d  |3  |2  |
|b  |0  |1  |e  |3  |3  |
|b  |0  |1  |f  |0  |2  |
|c  |2  |2  |d  |3  |2  |
|c  |2  |2  |e  |3  |3  |
|c  |2  |2  |f  |0  |2  |
|d  |3  |2  |a  |0  |0  |
|d  |3  |2  |b  |0  |1  |
+---+---+---+---+---+---+
only showing top 20 rows

root
 |-- id: string (nullable = true)
 |-- i: long (nullable = true)
 |-- j: long (nullable = true)
 |-- id2: string (nullable = true)
 |-- i2: long (nullable = true)
 |-- j2: long (nullable = true)

+---+---

In [None]:
# Vertex table: one row per distinct cell id found on the left side of the pairs.
vertices = df.select("id").distinct()
vertices.show()

# Edge table: GraphFrames expects the endpoint columns to be named src and dst.
edges = df.selectExpr("id AS src", "id2 AS dst")
edges.show()

+---+
| id|
+---+
|  c|
|  b|
|  a|
|  f|
|  d|
|  e|
+---+

+---+---+
|src|dst|
+---+---+
|  a|  a|
|  b|  a|
|  b|  b|
|  c|  c|
|  d|  c|
|  f|  b|
|  d|  d|
|  e|  d|
|  e|  e|
|  f|  f|
+---+---+



In [None]:
from graphframes import GraphFrame

# The default connectedComponents algorithm requires a Spark checkpoint
# directory; use a dedicated subdirectory rather than the working directory.
spark.sparkContext.setCheckpointDir("checkpoints")
g = GraphFrame(vertices, edges)

# Passing algorithm="graphx" selects the GraphX-based implementation instead.
result = g.connectedComponents()
# Tie-break on id so the row order within each component is deterministic.
result.select("id", "component").orderBy("component", "id").show()

+---+------------+
| id|   component|
+---+------------+
|  a|412316860416|
|  b|412316860416|
|  f|412316860416|
|  c|670014898176|
|  d|670014898176|
|  e|670014898176|
+---+------------+

