# IMDB

This notebook processes IMDB data to find the following information:
* The most connected actor.
* Degrees of separation between two actors.

To achieve the results, the process will have to perform the following tasks:
* Read IMDB title princials file.
* Map all lines to Title type (defined below) and reduce to one line per title.
* Extract Actors/Actresses relationships from Title to create the Connection type.
* Reduce Network to find the most connected Actor/Actress.
* Perform BREADTH-FIRST SEARCH to find the (smallest) degrees of separation between two given Actors/Actresses. 

References:
* [IMDB data description](https://www.imdb.com/interfaces/)
* [IMDB datasets](https://datasets.imdbws.com/)
* [BREADTH-FIRST SEARCH](https://en.wikipedia.org/wiki/Breadth-first_search)

In [1]:
// Imports and types definition

import org.apache.spark.util.LongAccumulator
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

type Title = (String, Array[String]) // (Title id, list of actors/actresses)
type Connection = (String, Array[String]) // (Actor/Actress id, connected actors/actresses)

type BFSData = (Array[String], Int, Int) // (List of id, process status, separation)
type BFSNode = (String, BFSData) // (Actor/Actress id, BFSData)

Intitializing Scala interpreter ...

Spark Web UI available at http://78598ab614c0:4040
SparkContext available as 'sc' (version = 3.1.1, master = local[*], app id = local-1625706066641)
SparkSession available as 'spark'


import org.apache.spark.util.LongAccumulator
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
defined type alias Title
defined type alias Connection
defined type alias BFSData
defined type alias BFSNode


## First part: find the most connected acter/actress

In [2]:
// Read IMDB dataset, keep only actor and actress

def createRaw(sc:SparkContext):RDD[(String,String)] = {
    // Load base file
    val base = sc.textFile("data/title.principals.tsv")
    .map(x => (x.split("\t")(0), x.split("\t")(2), x.split("\t")(3)))
    .filter(x => x._3 == "actor" || x._3 == "actress")
    .map(x => (x._1, x._2)) // Make pair RDD
    
    // Load titles to be removed (not a movie or isAdult = 1)
    val adult = sc.textFile("data/title.basics.tsv")
    .map(x => (x.split("\t")(0),x.split("\t")(1),x.split("\t")(4)))
    .filter(x => x._2 != "movie" || x._3 == 1)
    .map(x => (x._1, 0)) // Make pair RDD
    
    return base.subtractByKey(adult)
}

val raw = createRaw(sc)

raw.take(10).foreach(println)


(tt6925244,nm0103977)
(tt9472442,nm0352195)
(tt9472442,nm0470050)
(tt9472442,nm0021373)
(tt9472442,nm0093210)
(tt8374136,nm8045290)
(tt8374136,nm8745061)
(tt8374136,nm7329793)
(tt6229652,nm7899734)
(tt6229652,nm1699249)


createRaw: (sc: org.apache.spark.SparkContext)org.apache.spark.rdd.RDD[(String, String)]
raw: org.apache.spark.rdd.RDD[(String, String)] = SubtractedRDD[10] at subtractByKey at <console>:44


In [3]:
// Print to debug RDD
def printTitle(title:Title){
    println(title._1)
    for(t <- title._2){
        println(s"\t${t}")
    }
}

// Map the raw input to the Title type
def mapTitle(line:(String, String)):Title = {
    var arr:ArrayBuffer[String] = new ArrayBuffer()
    arr += line._2
    (line._1, arr.toArray)
}

// Recude the raw input to have one line poer title
def reduceTitle(title1:Array[String], title2:Array[String]):Array[String] = {
    var arr:ArrayBuffer[String] = new ArrayBuffer()
    for(t <- title1){
        arr += t
    }
    
    for(t <- title2){
        arr += t
    }
    
    arr.toArray.distinct
}

val titleRDD = raw.map(mapTitle).reduceByKey(reduceTitle)

titleRDD.take(5).foreach(printTitle)

tt5674064
	nm1794460
	nm0434263
	nm8105573
	nm0474609
tt1620421
	nm1938461
	nm3310504
	nm2933317
	nm2714718
tt1401235
	nm1543296
	nm0465631
	nm0619952
	nm2975382
tt0125125
	nm0741411
	nm0214150
	nm0220116
	nm0222540
	nm0498140
	nm0277618
	nm0445173
	nm0446178
	nm0593722
	nm0000561
tt0106810
	nm0293739
	nm0053903
	nm0433564
	nm0419951


printTitle: (title: Title)Unit
mapTitle: (line: (String, String))Title
reduceTitle: (title1: Array[String], title2: Array[String])Array[String]
titleRDD: org.apache.spark.rdd.RDD[(String, Array[String])] = ShuffledRDD[12] at reduceByKey at <console>:61


In [4]:
// Print to debug RDD
def printConnection(con:Connection){
    println(con._1)
    for(c <- con._2){
        println(s"\t${c}")
    }
}

def titleToConnection(title:Title):Array[Connection] = {
    var outArray:ArrayBuffer[Connection] = new ArrayBuffer()
    
    val inArray = title._2
    
    for(k <- inArray){
        var conArray:ArrayBuffer[String] = new ArrayBuffer()
        for(v <- inArray){
            if(k != v){ // Skip self
                conArray += v
            }
        }
        outArray += ((k, conArray.toArray))
    }
    outArray.toArray
}

def reduceConnection(c1:Array[String], c2:Array[String]): Array[String] = {
    var outArray:ArrayBuffer[String] = new ArrayBuffer()
    
    for(c <- c1){
        outArray += c
    }
    for(c <- c2){
        outArray += c
    }
    outArray.toArray.distinct
}

val connectionRDD = titleRDD.flatMap(titleToConnection).reduceByKey(reduceConnection)

//connectionRDD.take(5).foreach(printConnection)

printConnection: (con: Connection)Unit
titleToConnection: (title: Title)Array[Connection]
reduceConnection: (c1: Array[String], c2: Array[String])Array[String]
connectionRDD: org.apache.spark.rdd.RDD[(String, Array[String])] = ShuffledRDD[14] at reduceByKey at <console>:70


In [5]:
def printMostConnected(sc:SparkContext, ac:(String, Int)){
    val nameLookup = sc.textFile("data/name.basics.tsv")
    .map(x => (x.split("\t")(0),x.split("\t")(1)))
    .filter(x => x._1 == ac._1)
    .take(1)(0)
    
    println(s"${nameLookup._2} is the most connected actor/actress with ${ac._2} connections.")
}

val mostConnected = connectionRDD.map(x => (x._1, x._2.length)).toDS().sort(desc("_2")).take(1)(0)

printMostConnected(sc, mostConnected)

Brahmanandam is the most connected actor/actress with 929 connections.


printMostConnected: (sc: org.apache.spark.SparkContext, ac: (String, Int))Unit
mostConnected: (String, Int) = (nm0103977,929)


## Second part: perform BFS to find degrees of separation between to given actors/actresses

In [6]:
// Process status => 0 = not processed / 1 = currently processing / 2 = already processed

def toBFSNode(line: Connection):BFSNode = {
    (line._1, (line._2, 0, 9999))
}

val bfsBase = connectionRDD.map(toBFSNode)

// Select an actor/actress to start
val fromAct = "nm3053338" // Robert De Niro
// Select an actor/actress as target
val toAct = "nm3053338" // Margot Robbie

var bfs = bfsBase.map( x => if (x._1 == fromAct) (x._1, (x._2._1, 1, 0)) else x )

// Accumulator
var hitCounter:Option[LongAccumulator] = None

// flatMap: explode nodes to be processed in next iteration
def bfsMap(node:BFSNode):Array[BFSNode] = {
    var outArray:ArrayBuffer[BFSNode] = new ArrayBuffer()
    
    var thisNode = node
    
    val id = node._1
    val data = node._2
    
    val connections:Array[String] = data._1
    val color = data._2
    val currentDistance = data._3
    
    // If we find the target id, increase the count to stop the loop
    if (id == toAct) {
        if (hitCounter.isDefined) {
            hitCounter.get.add(1)
        }
        outArray += ((id, (connections, 2, currentDistance + 1)))
        return outArray.toArray
    }
    
    if(color == 1){ // Gray node: This is the node we need to process
        thisNode = (id, (connections, 2, currentDistance))
        
        // Add each connection to the output Array
        for(c <- connections){
            val emptyArray:Array[String] = new Array(0)
            outArray += ((c, (emptyArray,1,currentDistance + 1)))
        }
    }
    outArray += thisNode
    outArray.toArray
}


// Reduce function: combine nodes in different states
def bfsReduce(node1:BFSData, node2:BFSData):BFSData = {
    val node1Connections = node1._1
    val node2Connections = node2._1
    val node1Color = node1._2
    val node2Color = node2._2
    val node1Distance = node1._3
    val node2Distance = node2._3
    
    var outArray:ArrayBuffer[String] = new ArrayBuffer()
    node1Connections.foreach( x => outArray += x)
    node2Connections.foreach( x => outArray += x)

    val outColor = List(node1Color, node2Color).max
    val outDistance = List(node1Distance, node2Distance).min
    
    (outArray.toArray, outColor, outDistance)
}

hitCounter = Some(sc.longAccumulator("Hit Counter"))
var iteration:Int = 0
while(hitCounter.get.value == 0){
    val greyNodes = bfs.filter(x => x._2._2 == 1 )
    println(s"Iteration ${iteration}. Processing ${greyNodes.count()} nodes.")
    
    val mapped = bfs.flatMap(bfsMap)
    
    bfs = mapped.reduceByKey(bfsReduce)
    
    iteration += 1
}

println(s"Separation between ${fromAct} and ${toAct} is ${iteration - 1}.")

Iteration 0. Processing 1 nodes.
Iteration 1. Processing 0 nodes.
Separation between nm3053338 and nm3053338 is 1.


toBFSNode: (line: Connection)BFSNode
bfsBase: org.apache.spark.rdd.RDD[BFSNode] = MapPartitionsRDD[24] at map at <console>:41
fromAct: String = nm3053338
toAct: String = nm3053338
bfs: org.apache.spark.rdd.RDD[(String, (Array[String], Int, Int))] = ShuffledRDD[31] at reduceByKey at <console>:116
hitCounter: Option[org.apache.spark.util.LongAccumulator] = Some(LongAccumulator(id: 213, name: Some(Hit Counter), value: 1))
bfsMap: (node: BFSNode)Array[BFSNode]
bfsReduce: (node1: BFSData, node2: BFSData)BFSData
hitCounter: Option[org.apache.spark.util.LongAccumulator] = Some(LongAccumulator(id: 213, name: Some(Hit Counter), value: 1))
iteration: Int = 2
