In [None]:
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}

/**
 * Parses one line of the hero-names file into a graph vertex.
 *
 * The line is split on double quotes: the text before the first quote is read as the
 * numeric hero ID and the text after it (up to the next quote) as the hero name.
 *
 * @param line a single raw line of the names file
 * @return `Some((heroId, name))` when the line contains a quote, its leading part parses
 *         as a number, and that number is below 6487; `None` for every other line
 */
def parseNames(line:String):Option[(VertexId, String)] = {
  import scala.util.Try

  // Only hero IDs strictly below this bound are turned into vertices.
  val maxHeroIdExclusive = 6487L
  val fields = line.split("\"")

  if (fields.length > 1) {
    // A non-numeric ID now yields None instead of throwing NumberFormatException,
    // so a single malformed line is dropped by flatMap rather than failing the job.
    Try(fields(0).trim.toLong).toOption
      .filter(_ < maxHeroIdExclusive)
      .map(heroId => (heroId, fields(1)))
  } else {
    None
  }
}

/**
 * Builds the outgoing edges described by one line of the graph file.
 *
 * The first token on the line is the origin vertex ID; every following token is a
 * destination vertex ID. One edge with attribute 0 is produced per destination.
 *
 * @param line a single raw line of whitespace-separated numeric IDs
 * @return one `Edge(origin, destination, 0)` per destination token, in input order;
 *         an empty list when the line has fewer than two tokens
 * @throws NumberFormatException if the line has at least two tokens and any of them
 *                               is not a valid Long
 */
def makeEdges(line:String):List[Edge[Int]] = {
  // Trim and split on whitespace runs: a whitespace-only line used to produce an empty
  // array (so fields(0) threw), and leading/doubled spaces produced empty tokens.
  val fields = line.trim.split("\\s+")

  if (fields.length < 2) {
    // No destinations, so nothing to emit; the origin is intentionally not parsed here.
    Nil
  } else {
    // Parse the origin once instead of once per destination.
    val origin = fields(0).toLong
    fields.iterator.drop(1).map(target => Edge(origin, target.toLong, 0)).toList
  }
}

// vertices: (heroId, heroName) pairs; lines rejected by parseNames are dropped
val names = sc.textFile("Marvel-names.txt")
val vertices = names.flatMap(nameLine => parseNames(nameLine))

// edges: one Edge per (origin, destination) pair found on each line
val lines = sc.textFile("Marvel-graph.txt")
val edges = lines.flatMap(graphLine => makeEdges(graphLine))

// graph: vertex IDs that appear only in edges get the default name; cached for reuse
val default = "Nobody"
val graph = Graph(vertices, edges, defaultVertexAttr = default).cache()

// degrees of separation calculation

// starting vertex: SpiderMan
val root:VertexId = 5306L
// distance 0.0 at the root, PositiveInfinity (not reached yet) at every other vertex
val initialGraph = graph.mapVertices { (vertexId, _) =>
  if (vertexId == root) 0.0 else Double.PositiveInfinity
}

// pregel algorithm
// pregel sends an initial message of PositiveInfinity to every vertex and runs for at most 10 iterations,
// so distances greater than 10 hops from the root are left at PositiveInfinity
val bfs = initialGraph.pregel(Double.PositiveInfinity, 10)(
  // vertex program - keeps the shorter of the current distance and the incoming one
  // (the initial PositiveInfinity message therefore leaves every vertex unchanged)
  (id, attr, msg) => math.min(attr, msg),

  // send message function - offers the edge's destination a distance one hop longer than the
  // source's, but only when that improves on what the destination already has; an unreached
  // source (PositiveInfinity) never sends, because Infinity + 1 < x is always false
  triplet => {
    val candidate = triplet.srcAttr + 1
    if (candidate < triplet.dstAttr) {
      Iterator((triplet.dstId, candidate))
    } else {
      Iterator.empty
    }
  },

  // reduce operation - when a vertex receives several messages in one iteration, keep the minimum
  (a, b) => math.min(a, b)
)

// print 10 vertices as (id, (distance, name)); take(10) returns the first 10 found, not a ranking
for (entry <- bfs.vertices.join(vertices).take(10)) println(entry)
// like in previous exercise: distance from SpiderMan to ADAM (vertex id 14)
println("\n\nDegrees from SpiderMan to ADAM")
for (entry <- bfs.vertices.filter { case (vertexId, _) => vertexId == 14 }.collect()) println(entry)