In [None]:
import org.apache.spark.graphx.util.GraphGenerators
import scala.util.Random
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.types.{IntegerType, LongType, DoubleType, StringType, StructField, StructType}

  

>     import org.apache.spark.graphx.util.GraphGenerators
>     import scala.util.Random
>     import org.apache.spark.sql.{Row, DataFrame}
>     import org.apache.spark.sql.expressions.Window
>     import org.apache.spark.sql.{functions=>F}
>     import org.apache.spark.sql.types.{IntegerType, LongType, DoubleType, StringType, StructField, StructType}

In [None]:
// Size parameters shared by both generators below: node count (requested) and edge count.
val numNodes: Int = 1520925
val numEdges: Int = 2152835

  

>     numNodes: Int = 1520925
>     numEdges: Int = 2152835

In [None]:
/**
 * Relabels the endpoints of an edge list with dense, consecutive node ids and
 * orients every edge canonically as (smaller id, larger id).
 *
 * Steps:
 *  1. Every distinct value appearing in `src` or `dst` gets an id `1..k`,
 *     assigned in ascending order of the original value.
 *  2. Each edge is rewritten with these ids and oriented so that `src <= dst`.
 *  3. Duplicate rows are removed: an edge and its reverse collapse into one row.
 *     Self-loops are removed as well.
 *  4. The surviving rows receive a 1-based `id`, ordered by (`src`, `dst`).
 *
 * Both `row_number` windows have no `partitionBy`. Spark therefore moves all
 * rows into a single partition to evaluate them. That is what makes the ids
 * globally consecutive, but it limits scalability on very large inputs.
 *
 * @param edgeDF edge list with columns `src` and `dst`. Any other columns are
 *               carried through and take part in the de-duplication. Rows with
 *               a null endpoint find no match in the id join and are dropped.
 * @return the edges with integer `src`/`dst` node ids and an integer `id`
 *         column; other input columns come first, as before
 */
def makeEdgesCanonical(edgeDF: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
  // 1. Dense, 1-based node ids ordered by the original node value.
  val nodeIds = edgeDF.select(F.col("src").alias("node"))
    .union(edgeDF.select(F.col("dst").alias("node")))
    .distinct()
    .withColumn("id", F.row_number().over(Window.orderBy("node")))

  val dstIds = nodeIds.withColumnRenamed("node", "dst").withColumnRenamed("id", "dst__")
  val srcIds = nodeIds.withColumnRenamed("node", "src").withColumnRenamed("id", "src__")

  // 2. Attach the new ids. Joining on the column name keeps a single `src`/`dst`
  //    column (no ambiguous duplicates), which is then dropped.
  val edgesWithBothIds = edgeDF
    .join(dstIds, Seq("dst"))
    .join(srcIds, Seq("src"))
    .drop("src", "dst")

  // 3. Orient each edge as (smaller id, larger id), then remove duplicates and self-loops.
  val canonicalEdges = edgesWithBothIds
    .withColumn("src", F.least(F.col("src__"), F.col("dst__")))
    .withColumn("dst", F.greatest(F.col("src__"), F.col("dst__")))
    .drop("src__", "dst__")
    .distinct()
    .where(F.col("src") =!= F.col("dst"))

  // 4. Consecutive 1-based edge ids in (src, dst) order; the block's value is the result.
  val edgeOrder = Window.orderBy(F.col("src"), F.col("dst"))
  canonicalEdges.withColumn("id", F.row_number().over(edgeOrder))
}

  

>     makeEdgesCanonical: (edgeDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame

  

### Generate Erdős–Rényi graph (uniform edge sampling)

In [None]:
// Uniform edge sampling: each of the `numEdges` edges draws both endpoints
// independently and uniformly from `0 until numNodes`.
//
// Every edge index gets its own generator seeded from `42 + idx`, so the result
// does not depend on how the RDD is partitioned. java.util.Random (wrapped by
// scala.util.Random) is a linear congruential generator. Its first outputs are
// nearly linear in the seed, so consecutive seeds would yield strongly
// correlated endpoints. SplittableRandom passes each output through a mixing
// function, so nearby seeds give unrelated values.
// NOTE(review): `nodes` is not referenced by the cells shown here.
val nodes = sc.parallelize(0 until numNodes)
val randomEdges = sc.parallelize(0 until numEdges).map { idx =>
  val rng = new java.util.SplittableRandom(42L + idx)
  val u = rng.nextInt(numNodes)
  val v = rng.nextInt(numNodes)
  // makeEdgesCanonical re-orients edges anyway; emitting (min, max) keeps the raw RDD canonical too.
  Row(math.min(u, v), math.max(u, v))
}

val schema = new StructType()
  .add(StructField("src", IntegerType, true))
  .add(StructField("dst", IntegerType, true))

// makeEdgesCanonical re-indexes nodes densely and drops duplicate edges and self-loops.
// Nodes that received no edge do not appear in the result.
val groupedCanonicalEdges = makeEdgesCanonical(spark.createDataFrame(randomEdges, schema))
val test = groupedCanonicalEdges.describe()
test.show()

  

>     +-------+-----------------+------------------+-----------------+
>     |summary|              src|               dst|               id|
>     +-------+-----------------+------------------+-----------------+
>     |  count|          2152831|           2152831|          2152831|
>     |   mean|503954.5198787085|1007302.6882690745|        1076416.0|
>     | stddev|356344.9662673391|356346.95043392474|621468.9230224366|
>     |    min|                1|              1230|                1|
>     |    max|          1510148|           1511245|          2152831|
>     +-------+-----------------+------------------+-----------------+
>
>     nodes: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[587] at parallelize at command-685894176419819:1
>     randomEdges: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[589] at map at command-685894176419819:2
>     schema: org.apache.spark.sql.types.StructType = StructType(StructField(src,IntegerType,true), StructField(dst,IntegerType,true))
>     groupedCanonicalEdges: org.apache.spark.sql.DataFrame = [src: int, dst: int ... 1 more field]
>     test: org.apache.spark.sql.DataFrame = [summary: string, src: string ... 2 more fields]

In [None]:
// Persist the uniform random graph as Parquet, replacing any earlier output at this path.
groupedCanonicalEdges.write
  .mode("overwrite")
  .format("parquet")
  .save("/projects/group21/uniform_random_graph")

  

  

### Generate R-MAT graph (recursive-matrix edge sampling)

In [None]:
// R-MAT graph from GraphX's generator. Self-loops are removed and edges are put into
// canonical orientation before makeEdgesCanonical re-indexes the nodes densely.
val rmatGraphraw = GraphGenerators
  .rmatGraph(
    sc = spark.sparkContext,
    requestedNumVertices = numNodes,
    numEdges = numEdges)
  .removeSelfEdges()
  .convertToCanonicalEdges()

// GraphX vertex ids are Longs, hence the LongType columns in the schema below.
val rmatedges = rmatGraphraw.edges.map(e => Row(e.srcId, e.dstId))

val schema = StructType(Seq(
  StructField("src", LongType, nullable = true),
  StructField("dst", LongType, nullable = true)))

val rmatGroupedCanonicalEdges = makeEdgesCanonical(spark.createDataFrame(rmatedges, schema))
val test = rmatGroupedCanonicalEdges.describe()
test.show()

  

>     +-------+-----------------+------------------+-----------------+
>     |summary|              src|               dst|               id|
>     +-------+-----------------+------------------+-----------------+
>     |  count|          2150915|           2150915|          2150915|
>     |   mean|270135.6247001857|481234.69323520456|        1075458.0|
>     | stddev|225243.9236773024|244586.28516170362|620915.8214645528|
>     |    min|                1|                 3|                1|
>     |    max|           895135|            895435|          2150915|
>     +-------+-----------------+------------------+-----------------+
>
>     rmatGraphraw: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@a9b071f
>     rmatedges: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[516] at map at command-685894176419823:2
>     schema: org.apache.spark.sql.types.StructType = StructType(StructField(src,LongType,true), StructField(dst,LongType,true))
>     rmatGroupedCanonicalEdges: org.apache.spark.sql.DataFrame = [src: int, dst: int ... 1 more field]
>     test: org.apache.spark.sql.DataFrame = [summary: string, src: string ... 2 more fields]

In [None]:
// Persist the R-MAT graph as Parquet, replacing any earlier output at this path.
rmatGroupedCanonicalEdges.write
  .mode("overwrite")
  .format("parquet")
  .save("/projects/group21/rmat_random_graph")