In [10]:
def prepare(graph):
    vertices = graph.vertices.indexedVertices
        .select(col("LONG_ID").as("ID"), col("ATTR"))
    
    edges = graph.indexedEdges
        .select(col("LONG_SRC").as("SRC"), col("LONG_DST").as("DST"))
    
    orderedEdges = {
        edges.filter(col("src") != col("dst"))
          .select(minValue(col("src"), col("dst")).as("src"), 
                  maxValue(col("src"), col("dst")).as("dst"))
          .distinct()
    }
    
    GraphFrame(vertices, orderedEdges)
  

SyntaxError: invalid syntax (<ipython-input-10-9cddc8007ac8>, line 8)

In [None]:
def minNbrs(edges):
    symmetrized_edges = ee.unionAll(ee.select(col("dst").as("src"), col("src").as("dst")))
    
    symmetrized_edges.groupBy("src").agg(min(col("dst")).as("min_nbr"), count("*").as("cnt"))

In [None]:
def skewedJoin(edges, minNbrs, broadcastThreshold):
    hubs = {
        minNbrs.filter(col("cnt") > broadcastThreshold)
            .select("src")
            .as[Long]
            .collect()
            .toSet
    }
    GraphFrame.skewedJoin(edges, minNbrs, "src", hubs)
  

In [None]:
def run(graph, broadcastThreshold):
    
    g = prepare(graph)
    vv = g.vertices
    ee = g.edges.persist(StorageLevel.MEMORY_AND_DISK)
    
    converged = False
    iteration = 1
    prevSum = None
    
    while (!converged):
        # large-star step
        # compute min neighbors including self
        minNbrs1 = minNbrs(ee)
            .withColumn("min_nbr", minValue(col("src"), col("min_nbr")).as("min_nbr"))
            .persist(StorageLevel.MEMORY_AND_DISK)
            
        # connect all strictly larger neighbors to the min neighbor (including self)
        ee = skewedJoin(ee, minNbrs1, broadcastThreshold)
            .select(col("dst").as("src"), col("min_nbr").as("dst")) # src > dst
            .distinct()
            .persist(StorageLevel.MEMORY_AND_DISK)

        # small-star step
        # compute min neighbors
        minNbrs2 = minNbrs(ee)
            .persist(StorageLevel.MEMORY_AND_DISK)

        # connect all smaller neighbors to the min neighbor
          ee = skewedJoin(ee, minNbrs2, broadcastThreshold)
            .select(col("min_nbr").as("src"), col("dst")) # src <= dst
        
        # connect self to the min neighbor
          ee = ee
            .unionAll(
              minNbrs2.select( # src <= dst
                minValue(col("src"), col("min_nbr")).as("src"),
                maxValue(col("src"), col("min_nbr")).as("dst")))
            .filter(col("src") !== col("dst")) # src < dst
            .distinct()
            
        ee.persist(StorageLevel.MEMORY_AND_DISK)

        # test convergence
        (currSum, cnt) = ee.select(sum(col("src")), count("*")).rdd.first()
        if (currSum == prevSum) {
            # This also covers the case when cnt = 0 and currSum is None, which means no edges.
            converged = true
        } else {
            prevSum = currSum
        }
        iteration += 1
    
    vv.join(ee, vv("id") === ee("dst"), "left_outer")
        .select(vv("attr"), when(ee("src").isNull, vv("id")).otherwise(ee("src")).as("component"))
        .select(col(s"$ATTR.*"), col("component"))
  