**import statement**

In [1]:
# from pyspark.sql import SparkSession
# sparkSession = SparkSession.builder.enableHiveSupport().master("local").getOrCreate()
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
sparkSession = SparkSession(sc)

from pyspark.sql.types import StructType, StructField, IntegerType, StringType 
from pyspark.sql import Row

**Graph in Edge list**

In [2]:
edgeList = [(1, 2), (1, 3), (1, 4), (2, 3), (2, 5), (3, 4), (3, 5), (3, 6), (3, 7)] 
graphData = sparkSession.sparkContext.parallelize(edgeList).map(lambda (src, dst): Row(src, dst))

**Define data frame schema**

In [3]:
graphSchemaAB = StructType([StructField('A', IntegerType(), nullable=False),
                            StructField('B', StringType(), nullable=False)])
graphSchemaBCl = StructType([StructField('B', IntegerType(), nullable=False),
                            StructField('C1', StringType(), nullable=False)])

**create dfs**

In [4]:
abDF = sparkSession.createDataFrame(graphData, graphSchemaAB)
bcDF = sparkSession.createDataFrame(graphData, graphSchemaBCl) 

In [5]:
abDF.show()

+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  3|
|  2|  5|
|  3|  4|
|  3|  5|
|  3|  6|
|  3|  7|
+---+---+



In [6]:
bcDF.show()

+---+---+
|  B| C1|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  3|
|  2|  5|
|  3|  4|
|  3|  5|
|  3|  6|
|  3|  7|
+---+---+



**test without reserved pairs**

In [52]:
abDF.join(bcDF, on="B").groupBy("A", "C1").count().orderBy("A", "C1").show()

+---+---+-----+
|  A| C1|count|
+---+---+-----+
|  1|  3|    1|
|  1|  4|    1|
|  1|  5|    2|
|  1|  6|    1|
|  1|  7|    1|
|  2|  4|    1|
|  2|  5|    1|
|  2|  6|    1|
|  2|  7|    1|
+---+---+-----+



The results are not right because the following:<br>
if we have A -> B -> C then B is a mutual friend, no matter what B > C or B < C <br>
in edge list, we include only consider pairs (B,C) where B < C<br>
so if we have A -> B, C -> B then B -> C is not included => wrong results

In [7]:
ab1DF = abDF.union(abDF.select("B", "A"))
# ab1DF.show()

In [8]:
bc1DF = bcDF.union(bcDF.select("C1", "B"))

In [12]:
jonDF = ab1DF.join(bc1DF, on="B")

In [41]:
# jonDF.drop('B').show()

**number of common friends**

In [14]:
jonDF.drop('B').groupBy("A", "C1").count().filter("A < C1").orderBy('A').show()

+---+---+-----+
|  A| C1|count|
+---+---+-----+
|  1|  3|    2|
|  1|  6|    1|
|  1|  4|    1|
|  1|  2|    1|
|  1|  7|    1|
|  1|  5|    2|
|  2|  6|    1|
|  2|  5|    1|
|  2|  7|    1|
|  2|  4|    2|
|  2|  3|    2|
|  3|  5|    1|
|  3|  4|    1|
|  4|  6|    1|
|  4|  7|    1|
|  4|  5|    1|
|  5|  6|    1|
|  5|  7|    1|
|  6|  7|    1|
+---+---+-----+



**Triangles Count**

In [20]:
# ab1DF.show()

In [21]:
# bc1DF.show()

In [17]:
ac2DF = ab1DF.select("A", "B").withColumnRenamed("B", "C2")

In [22]:
# ac2DF.show()

In [23]:
triDF = ab1DF.join(bc1DF, on="B")\
             .join(ac2DF, on="A")\
             .filter("C1 = C2")

In [34]:
triDF.show()

+---+---+---+---+
|  A|  B| C1| C2|
+---+---+---+---+
|  1|  3|  4|  4|
|  4|  3|  1|  1|
|  3|  5|  2|  2|
|  3|  1|  2|  2|
|  2|  3|  1|  1|
|  2|  3|  5|  5|
|  3|  2|  5|  5|
|  1|  4|  3|  3|
|  1|  2|  3|  3|
|  3|  4|  1|  1|
|  3|  2|  1|  1|
|  5|  2|  3|  3|
|  2|  5|  3|  3|
|  2|  1|  3|  3|
|  4|  1|  3|  3|
|  1|  3|  2|  2|
|  3|  1|  4|  4|
|  5|  3|  2|  2|
+---+---+---+---+



In [30]:
from pyspark.sql.functions import array, col, explode

In [33]:
triDF.select(array(col("A"), col("B"), col('C1')).alias("triangleVerticies"))\
     .select(explode("triangleVerticies").alias("triangleVertex"))\
     .groupBy("triangleVertex")\
     .count()\
     .show(100)

+--------------+-----+
|triangleVertex|count|
+--------------+-----+
|             3|   18|
|             5|    6|
|             1|   12|
|             4|    6|
|             2|   12|
+--------------+-----+



why the results are multiplied by 6?

In [35]:
abDF.show()

+---+---+
|  A|  B|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  3|
|  2|  5|
|  3|  4|
|  3|  5|
|  3|  6|
|  3|  7|
+---+---+



In [36]:
bcDF.show()

+---+---+
|  B| C1|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  3|
|  2|  5|
|  3|  4|
|  3|  5|
|  3|  6|
|  3|  7|
+---+---+



In [39]:
acDF = abDF.select("A", "B").withColumnRenamed("B", "C2")
acDF.show()

+---+---+
|  A| C2|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  3|
|  2|  5|
|  3|  4|
|  3|  5|
|  3|  6|
|  3|  7|
+---+---+



In [40]:
triDF1 = abDF.join(bcDF, on="B").join(acDF, on="A").filter("C1 = C2")
triDF1.show()

+---+---+---+---+
|  A|  B| C1| C2|
+---+---+---+---+
|  1|  3|  4|  4|
|  1|  2|  3|  3|
|  2|  3|  5|  5|
+---+---+---+---+



**number of triangle**
if we have A -> B -> C -> A then (A,B,C) is a triangle, without depending on direction

In [47]:
triDF1.select(array(col("A"), col("B"), col("C1")).alias("triangleVertices"))\
      .select(explode("triangleVertices").alias("triangleVertex"))\
      .groupBy("triangleVertex")\
      .count()\
      .orderBy("count")\
      .show()

+--------------+-----+
|triangleVertex|count|
+--------------+-----+
|             4|    1|
|             5|    1|
|             2|    2|
|             1|    2|
|             3|    3|
+--------------+-----+

