## Analyzing Click Stream Data with the GraphX Library

In [1]:
val maxEdgesFirstShell = 25
val maxEdgesNthShell = 20

maxEdgesFirstShell = 25
maxEdgesNthShell = 20


20

We will take a look at the Wikipedia ClickStream Dataset (TODO:  Link to original dataset).  This dataset reports the number of times a user goes from one Wikipedia site to another.  The first two columns are the page numbers of the "from" site and "to" site, respectively (as indexed numbers).  The third column is the number of clicks (the edge values).  The remaining columns are the "from" and "to" sites (as names). If a "from" site is outside of the Wikipedia corpus, it is listed under one of the "other-" names (for example "other-google").  We will remove these sites, because those entries are not interesting to count and their numbers are much larger than the clickstream traffic within Wikipedia.
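
To make the column layout concrete, here is a minimal sketch that parses one row in plain Scala, without Spark. The `ClickRow` class, the `parseRow` helper, and both sample rows are hypothetical and only illustrate the five columns described above. The sketch also assumes that a row from outside Wikipedia has an empty first field, which is what the ETL code later in this notebook checks for.

```scala
// Hypothetical container for one row: ids, click count, then titles.
final case class ClickRow(prevId: Option[Long], currId: Long, n: Long,
                          prevTitle: String, currTitle: String)

def parseRow(line: String): ClickRow = {
  // A limit of -1 keeps trailing empty fields instead of dropping them.
  val f = line.split("\t", -1)
  ClickRow(
    prevId    = if (f(0).isEmpty) None else Some(f(0).toLong),
    currId    = f(1).toLong,
    n         = f(2).toLong,
    prevTitle = f(3),
    currTitle = f(4)
  )
}

// Made-up row: traffic arriving from outside Wikipedia has no source id.
val external = parseRow("\t42\t1000\tother-google\tExample_Page")
// Made-up row: a click from one Wikipedia page to another.
val internal = parseRow("1234\t42\t15\tAnother_Page\tExample_Page")
```

Modelling the source id as an `Option` is a choice made for this sketch only; the notebook itself keeps every field as a `String` and fills the gaps with placeholder values.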

In [2]:
val lines = sc.textFile("headPlusWatsonPlusTeslaPlusApple.tsv")
lines.take(20).foreach(println)

Name: org.apache.hadoop.mapred.InvalidInputException
Message: Input path does not exist: file:/Users/nilmeier@us.ibm.com/Box Sync/git/DSatEnterpriseScale/headPlusWatsonPlusTeslaPlusApple.tsv
StackTrace:   at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
  at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(R

## Defining Some ETL Functions

There is some ETL that we will need to carry out in order to get a graph model that is worth using.  One such function is a simple indexing of the "other" sites so that we can filter them out later on.

In [3]:
// simple mapping of the "other" source names to a unique index
def getIndex(x: String): Int = x match {
    case "other-wikipedia" => 1
    case "other-empty"     => 2
    case "other-internal"  => 3    
    case "other-google"    => 4
    case "other-yahoo"     => 5
    case "other-bing"      => 6
    case "other-facebook"  => 7
    case "other-twitter"   => 8 
    case "other-other"     => 9 
    case _ => 0
}

lastException: Throwable = null
getIndex: (x: String)Int


Here are some more ETL functions.  We need to populate empty fields with placeholder values of the same type as the rest of the column.  The RDD API here is a bit less robust than the DataFrames API, but nothing we can't handle!

In [4]:
// Rows with an empty source id come from outside Wikipedia: replace the id with
// the small index from getIndex so these rows can be filtered out later.
def assignOtherSourceIndex(partsLine:Array[String]): String = {
    // println("assigning other source to " + partsLine(0))  // debugging aid; prints once per record
    if (partsLine(0) == "") getIndex(partsLine(3)).toString
    else partsLine(0)
}
// filling empty number fields with a large number
def fillWithNum(element: String): String = {
    if (element == ""){ 999999999.toString }
    else {element}
}

// filling empty column fields with a filler string
def fillWithString(element: String, colNum: Int): String = {
    if (element == ""){"column"+colNum+"Fill"}
    else{element}

}

assignOtherSourceIndex: (partsLine: Array[String])String
fillWithNum: (element: String)String
fillWithString: (element: String, colNum: Int)String
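
The way these helpers feed the later filter can be tried on a plain Scala collection before running it on an RDD. The sketch below is an illustration only: `otherIndex` and `sourceIndex` are hypothetical stand-ins for `getIndex` and `assignOtherSourceIndex`, and the two sample rows are made up.

```scala
// Lookup table with the same codes as getIndex; unknown names map to 0.
val otherIndex: Map[String, Int] = Map(
  "other-wikipedia" -> 1, "other-empty" -> 2, "other-internal" -> 3,
  "other-google" -> 4, "other-yahoo" -> 5, "other-bing" -> 6,
  "other-facebook" -> 7, "other-twitter" -> 8, "other-other" -> 9
)

// Source id of a row, or the small "other" code when the id field is empty.
def sourceIndex(fields: Array[String]): Int =
  if (fields(0).isEmpty) otherIndex.getOrElse(fields(3), 0) else fields(0).toInt

// Made-up rows: one from an external source, one from inside Wikipedia.
val sampleRows = Seq(
  "\t42\t1000\tother-google\tExample_Page",
  "1234\t42\t15\tAnother_Page\tExample_Page"
)

// Any index of 9 or less marks an "other" source, so only larger ids are kept.
val kept = sampleRows.map(_.split("\t", -1)).filter(fields => sourceIndex(fields) > 9)
```

One consequence of filtering on the index, in this sketch and presumably in the notebook as well, is that a genuine page id of 9 or less would also be dropped.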


Now we're going to take our raw RDD and apply all of our ETL functions.  We're filtering out redlinks, as well as all clickstreams from outside Wikipedia, and also handling empty fields.

In [5]:
val parts = lines.map(l=>l.split("\t")).filter(l => !(l.contains("redlink"))). //splitting on tabs and filtering out redlinks
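    map(l => Array(assignOtherSourceIndex(l),                  // source id, or a small index for an "other" source
                   fillWithNum(l(1)), fillWithNum(l(2)),       // target id and click count
                   fillWithString(l(3),4), fillWithString(l(4),5))).  // source and target titles
    filter(y => y(0).toInt > 9)                                // dropping the "other" sources (indices 0-9)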

parts.take(20).foreach(x => println(x(0)+"\t" + x(1) + "\t" + x(2) + "\t" + x(3) + "\t" + x(4)))

Name: Unknown Error
Message: <console>:31: error: not found: value lines
       val parts = lines.map(l=>l.split("\t")).filter(l => !(l.contains("redlink"))). //splitting on tabs and filtering out redlinks
                   ^

StackTrace: 

We can convert this into a DataFrame.  For this notebook, the DataFrame is only used briefly to show the column labels.  The `parts` RDD is used to construct the graph.

In [6]:
case class Fields(prev_id: Int, curr_id: Int, n: Int, 
           prev_title: String, curr_title: String)

defined class Fields


In [7]:
val clicksDataFrame = parts.map(
     p => Fields(p(0).toInt, p(1).toInt, p(2).toInt, p(3), p(4))).toDF

// registering dataframe
clicksDataFrame.registerTempTable("clicks")
clicksDataFrame.show(20)

Name: Unknown Error
Message: <console>:27: error: not found: value parts
       val clicksDataFrame = parts.map(
                             ^

StackTrace: 

We're also going to explicitly deduplicate our vertices before building the graph.  The GraphX API would handle this for us, but doing it up front saves us some time later on.

In [8]:
//nodes should be deduplicated
val nodes1 = (parts.map{p => Array(p(0).toString, p(3)) })
nodes1.cache()
val uniqueNodes1 = nodes1.map(x => x(0)+"-0-"+x(1)).distinct.  //trick for accessing distinct
    map(x => Array(x.split("-0-")(0), x.split("-0-")(1)))  //resplitting to original structure

val nodes2 = (parts.map{p => Array(p(1).toString, p(4)) })
nodes2.cache()             
val uniqueNodes2 = nodes2.map(x => x(0)+"-0-"+x(1)).distinct.  //trick for accessing distinct
    map(x => Array(x.split("-0-")(0), x.split("-0-")(1)))  //resplitting to original structure

uniqueNodes1.cache
uniqueNodes2.cache
val uniqueNodesBoth = (uniqueNodes1 ++ uniqueNodes2).map(x => x(0)+"-0-"+x(1)).distinct.  //trick for accessing distinct
    map(x=>Array(x.split("-0-")(0), x.split("-0-")(1))).  //resplitting to original structure
    map{ x=> (x(0).toInt.toLong, (x(1), x(1))) }  //converting to vertex RDD entries: (id, (name, name))
uniqueNodesBoth.count

Name: Unknown Error
Message: <console>:26: error: not found: value parts
       val nodes1 = (parts.map{p => Array(p(0).toString, p(3)) })
                     ^
<console>:31: error: not found: value parts
       val nodes2 = (parts.map{p => Array(p(1).toString, p(4)) })
                     ^

StackTrace: 
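
The string-joining trick in the cell above is presumably needed because Scala arrays compare by reference, so `distinct` cannot recognise two arrays with the same contents as duplicates. The following sketch shows the effect on a plain collection with made-up node data, along with tuples as one possible alternative. It is not a drop-in replacement for the cell above.

```scala
// Made-up (id, name) pairs; the first two rows are duplicates.
val asArrays = Seq(Array("12", "Page_A"), Array("12", "Page_A"), Array("34", "Page_B"))
val asTuples = asArrays.map(a => (a(0), a(1)))

// Arrays compare by reference, so no duplicate is removed here.
val distinctArrays = asArrays.distinct
// Tuples compare by value, so the repeated pair collapses to one.
val distinctTuples = asTuples.distinct
```

Tuples also avoid a weakness of the join-and-split approach: a title that happens to contain the separator `-0-` would be split in the wrong place.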

Now we construct the graph.  A graph consists of an RDD of vertices and an RDD of edges, along with a default vertex attribute that is assigned to any vertex an edge refers to but the vertex RDD lacks (a dangling edge).  The cell below prints the first 10 vertices and edges.  Note that the vertices have some extra label annotation, while the edges have only the integer indexing and the edge value.  This is because the edges RDD is much larger than the vertices RDD.

In [9]:
// importing the graphx library
import org.apache.spark.graphx._
case class nodeFields(nodeID: Int, nodeName: String)
 
val edges = parts.map(x => Edge(x(0).toInt.toLong,x(1).toInt.toLong, x(2)) )
val defaultNode = ("default Node", "Missing")

//Graph is a wrapper around a vertex list and an edge list, with the defaultNode as well.
//It does some internal bookkeeping to maintain consistency before packaging.
val graph = Graph(uniqueNodesBoth,edges,defaultNode)
//main graph will be searched very frequently
graph.cache()

println("\nfirst 10 vertices")
graph.vertices.take(10).foreach(println)

println("\nfirst 10 edges")
graph.edges.take(10).foreach(println)

Name: Unknown Error
Message: <console>:17: error: not found: value parts
       val edges = parts.map(x => Edge(x(0).toInt.toLong,x(1).toInt.toLong, x(2)) )
                   ^
<console>:22: error: not found: value uniqueNodesBoth
Error occurred in an application involving default arguments.
       val graph = Graph(uniqueNodesBoth,edges,defaultNode)
                         ^

StackTrace: 
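
The role of `defaultNode` can be imitated on plain collections. The sketch below uses made-up vertices and edges and only mimics the idea of giving a default attribute to ids that appear in an edge but not in the vertex list. It is not how GraphX is implemented.

```scala
// Made-up vertex table: id -> (name, name), as in uniqueNodesBoth.
val vertices: Map[Long, (String, String)] = Map(
  (1L, ("Page_A", "Page_A")),
  (2L, ("Page_B", "Page_B"))
)
// Made-up edges (source id, target id, click count); id 3 has no vertex entry.
val edges: Seq[(Long, Long, String)] = Seq((1L, 2L, "15"), (2L, 3L, "4"))
val defaultNode: (String, String) = ("default Node", "Missing")

// Every id that any edge refers to.
val referencedIds: Set[Long] = edges.flatMap { case (src, dst, _) => Seq(src, dst) }.toSet

// Known vertices keep their attribute; dangling ids receive the default.
val completed: Map[Long, (String, String)] =
  referencedIds.map(id => id -> vertices.getOrElse(id, defaultNode)).toMap ++ vertices
```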

## Graph Processing Functions

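We want to find the top N edges (by click count) connected to any particular node and discard the remaining edges.  This is known as graph pruning.  The function below keeps every edge whose count is at least that of the N-th largest, so ties at the cutoff can leave slightly more than N edges.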

In [10]:
def pruneGraphByMaxEdges(maxEdges: Int,  bigGraph:  Graph[(String, String),String]): 
                                                    Graph[(String, String),String] = {
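    // click count of the maxEdges-th heaviest edge; this is the cutoff
    val minCount = bigGraph.triplets.sortBy(_.attr.toInt, ascending=false).
                                    map(x=>x.attr.toInt).take(maxEdges).last

    // keeping every edge at least as heavy as the cutoff
    bigGraph.subgraph(epred = x => x.attr.toInt >= minCount)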
}

Name: Unknown Error
Message: <console>:26: error: not found: type Graph
                                                           Graph[(String, String),String] = {
                                                           ^
<console>:25: error: not found: type Graph
       def pruneGraphByMaxEdges(maxEdges: Int,  bigGraph:  Graph[(String, String),String]):
                                                           ^

StackTrace: 
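
The cutoff logic of the pruning function can be checked on a small in-memory edge list. This is a minimal sketch under the assumption that edges are plain case-class values rather than GraphX triplets; `WeightedEdge`, `pruneByMaxEdges`, and the sample data are hypothetical names used only here.

```scala
final case class WeightedEdge(src: String, dst: String, clicks: Int)

def pruneByMaxEdges(maxEdges: Int, edges: Seq[WeightedEdge]): Seq[WeightedEdge] = {
  // Click counts of the maxEdges heaviest edges, heaviest first.
  val heaviest = edges.map(_.clicks).sorted(Ordering[Int].reverse).take(maxEdges)
  // The last of those counts is the cutoff; None means there is nothing to keep.
  heaviest.lastOption match {
    case Some(minCount) => edges.filter(_.clicks >= minCount)
    case None           => Seq.empty
  }
}

// Made-up edges pointing at a page "W"; B and C tie at 30 clicks.
val sample = Seq(
  WeightedEdge("A", "W", 50), WeightedEdge("B", "W", 30),
  WeightedEdge("C", "W", 30), WeightedEdge("D", "W", 5)
)
val top2 = pruneByMaxEdges(2, sample)
```

Asking for the top 2 edges returns three here, because the tie at the cutoff is kept. Unlike the notebook function, this sketch also returns an empty result instead of failing when there are no edges.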

With our pruning function in hand, we want to start with a particular node and build a *shell* of N nodes around that central node.   Once that graph is built, we want to add a shell to each node of that graph.  We can do this as many times as we like, but we're only creating 3 shells in this notebook.  Ultimately, this will give us a list of all important Wikipedia sites that are 4 clicks or fewer away from the source site. 
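
The shell idea can be tried on a toy edge list before reading the Spark version. The sketch below is a simplified illustration, not the notebook's implementation: `Click`, `topEdges`, `addShell`, and the sample data are hypothetical, pruning ignores ties, and the endpoint used to pick the pages to expand is passed in as a parameter so that both choices can be compared. The function in the next cell collects its names from `srcAttr`, which corresponds to `_.src` here.

```scala
final case class Click(src: String, dst: String, clicks: Int)

// Simplified pruning: exactly the maxEdges heaviest edges (no tie handling).
def topEdges(maxEdges: Int, edges: Seq[Click]): Seq[Click] =
  edges.sortBy(e => -e.clicks).take(maxEdges)

// Adds one shell: for every page name selected by `endpoint` in the current
// edge list, pull that page's heaviest outgoing edges from the full edge list.
def addShell(current: Seq[Click], all: Seq[Click], maxEdges: Int,
             endpoint: Click => String): Seq[Click] = {
  val names = current.map(endpoint).distinct
  val added = names.flatMap(name => topEdges(maxEdges, all.filter(_.src == name)))
  (current ++ added).distinct
}

// Made-up click data.
val all = Seq(
  Click("A", "W", 9), Click("A", "Q", 2),
  Click("W", "X", 7), Click("W", "Y", 1), Click("X", "Z", 3)
)
val firstShell = Seq(Click("A", "W", 9))

val bySrc1 = addShell(firstShell, all, 2, _.src)
val bySrc2 = addShell(bySrc1, all, 2, _.src)
val byDst1 = addShell(firstShell, all, 2, _.dst)
val byDst2 = addShell(byDst1, all, 2, _.dst)
```

In this toy data, selecting by source stops growing after the first call, because every added edge starts at a page that was already expanded, while selecting by destination keeps moving outward with each call.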

In [11]:
def addShellToGraph(thisGraph: Graph[(String, String),String], shellNum:Int ): Graph[(String, String),String]  = {

    // each source name is needed only once; repeats would just redo the same subgraph search
    val searchStringList = thisGraph.triplets.map(x => x.srcAttr._1).distinct.collect
    def recursiveGraphBuild(i:Int,prevGraph: Graph[(String, String),String] ):Graph[(String, String),String] = {
        
        val currentGraph =  pruneGraphByMaxEdges(maxEdgesNthShell,
                            graph.subgraph(epred = x => x.srcAttr._1 == searchStringList(i))
                            )

        val nextGraph = Graph( graph.vertices, 
                                (prevGraph.edges++currentGraph.edges).distinct     )

        if (i == searchStringList.length - 1 ) {   
            return nextGraph
        }
        else {return recursiveGraphBuild(i+1, nextGraph)}
    }
    return recursiveGraphBuild(0,thisGraph)
}


Name: Unknown Error
Message: <console>:27: error: not found: type Graph
       def addShellToGraph(thisGraph: Graph[(String, String),String], shellNum:Int ): Graph[(String, String),String]  = {
                                                                                      ^
<console>:27: error: not found: type Graph
       def addShellToGraph(thisGraph: Graph[(String, String),String], shellNum:Int ): Graph[(String, String),String]  = {
                                      ^
<console>:30: error: not found: type Graph
           def recursiveGraphBuild(i:Int,prevGraph: Graph[(String, String),String] ):Graph[(String, String),String] = {
                                                                                     ^
<console>:30: error: not found: type Graph
           def recursiveGraphBuild(i:Int,prevGraph: Graph[(String, String),String] ):Graph[(String, String),String] = {
                                                    ^
<console>:32: error: not found: value pruneGra

Now let's build our clickstream graph!  We'll start with the Watson site.  Some other sites are listed in the cell below as well.  Feel free to return to this and generate graphs with those after running through the first example. 

In [12]:
// MAIN QUERY  ============================================
//Here is a list of sites that work well for the prototype dataset

val centerVertex = "Watson_(computer)"
//val centerVertex = "Heroes"
//val centerVertex = "Tesla_Motors"
//val centerVertex = "Apple_Inc."

centerVertex = Watson_(computer)


Watson_(computer)

First, we generate the first shell of sites around the center vertex (Watson).

In [13]:
val smallGraph = pruneGraphByMaxEdges(maxEdgesFirstShell, 
                 graph.subgraph(epred = x => x.dstAttr._1.contains(centerVertex) && 
                               !(x.srcAttr._1.contains("other-")) && 
                               !(x.srcAttr._1 == "Main_Page"))
                )
smallGraph.cache
//smallGraph.triplets.count

Name: Unknown Error
Message: <console>:29: error: not found: value pruneGraphByMaxEdges
       val smallGraph = pruneGraphByMaxEdges(maxEdgesFirstShell,
                        ^
<console>:30: error: not found: value graph
                        graph.subgraph(epred = x => x.dstAttr._1.contains(centerVertex) &&
                        ^
<console>:30: error: not found: value epred
                        graph.subgraph(epred = x => x.dstAttr._1.contains(centerVertex) &&
                                       ^

StackTrace: 

Here is a list of *all* of the vertices connected to Watson in this first shell.  We'll build our larger graph from this one.  We're calling the `triplets` method, which returns each edge together with the attributes of the two vertices it connects (one is always Watson).

In [14]:
smallGraph.triplets.collect.foreach(println)

Name: Unknown Error
Message: <console>:26: error: not found: value smallGraph
       smallGraph.triplets.collect.foreach(println)
       ^

StackTrace: 

Now we'll build out the additional shells from this original graph.  We're using a very small dataset here, so it only takes about a minute.

In [15]:
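//smallGraph.triplets.count
// NOTE: these two values are not read by the functions above; pruning uses
// maxEdgesFirstShell and maxEdgesNthShell from the first cell
val maxEdgesPerFirstShell = 50
val maxEdgesPerNthShell = 20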

val t0 = System.nanoTime
// called 3 times (manually)
val graph1p1 = addShellToGraph(smallGraph,2)
val graph2p1 = addShellToGraph(graph1p1,3)
// fourth click takes longer and is not very informative (so far)
val graph3p1 = addShellToGraph(graph2p1,4)
//val graph2 = graph3p1
graph3p1.edges.count 
val dt = (((System.nanoTime-t0)/1.0e6).round/1.0e3).toString  // elapsed seconds, rounded to the nearest millisecond

Name: Unknown Error
Message: <console>:31: error: not found: value addShellToGraph
       val graph1p1 = addShellToGraph(smallGraph,2)
                      ^
<console>:31: error: not found: value smallGraph
       val graph1p1 = addShellToGraph(smallGraph,2)
                                      ^
<console>:32: error: not found: value addShellToGraph
       val graph2p1 = addShellToGraph(graph1p1,3)
                      ^
<console>:34: error: not found: value addShellToGraph
       val graph3p1 = addShellToGraph(graph2p1,4)
                      ^

StackTrace: 
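
The timing pattern in the cell above can be wrapped in a small helper. This is a sketch in plain Scala; `timed` is a hypothetical name, and the summation stands in for the real workload. With Spark, the timed body has to include an action such as the `count` above, because transformations are evaluated lazily.

```scala
// Runs `body` once and returns its result together with the elapsed seconds.
def timed[A](body: => A): (A, Double) = {
  val t0 = System.nanoTime
  val result = body
  val seconds = (System.nanoTime - t0) / 1.0e9
  (result, seconds)
}

// Stand-in workload; in the notebook this would be the shell-building calls.
val (total, elapsedSeconds) = timed { (1 to 1000).sum }

// Rounded to the nearest millisecond for display.
val roundedSeconds = math.round(elapsedSeconds * 1000) / 1000.0
```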

## Visualizing the Graph

Since we're using Scala here, we're just going to output the graph as a web page that we will view in a separate browser tab.  The rest of the code here is just reformatting: it writes a JSON file with the graph data and an HTML file that uses the d3 library.  After this has been run, go to the `site` directory and type:

`(py35) python -m http.server`

and go to `localhost:8000` (or `remotehost:8000`) to see your graph!


In [16]:
val outputVertexList = (graph3p1.triplets.map(x=>x.srcAttr._1) ++ graph3p1.triplets.map(x=>x.dstAttr._1)).distinct

// getting unique indices for vertices that will be referenced in the 'links' section of the json output.
val outputVertexListZipped=outputVertexList.collect.zipWithIndex

def getLinkIndex(name: String): Int = { outputVertexListZipped.filter(x=>x._1==name)(0)._2}
centerVertex


Name: Unknown Error
Message: <console>:27: error: not found: value graph3p1
       val outputVertexList = (graph3p1.triplets.map(x=>x.srcAttr._1) ++ graph3p1.triplets.map(x=>x.dstAttr._1)).distinct
                               ^
<console>:27: error: not found: value graph3p1
       val outputVertexList = (graph3p1.triplets.map(x=>x.srcAttr._1) ++ graph3p1.triplets.map(x=>x.dstAttr._1)).distinct
                                                                         ^

StackTrace: 
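
The `getLinkIndex` function above scans the whole vertex array on every call. For larger graphs, one alternative is to build the name-to-index lookup once as a `Map`. The sketch below assumes the vertex names are unique, which the `distinct` call above provides, and uses made-up names.

```scala
// Made-up vertex names standing in for outputVertexList.collect.
val names = Array("Watson_(computer)", "IBM", "Jeopardy!")

// Built once; each later lookup is a hash lookup instead of a scan.
val indexByName: Map[String, Int] = names.zipWithIndex.toMap

def getLinkIndex(name: String): Int = indexByName(name)
```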

In [17]:
// JSON WRITING ===========================================
import sys.process._
import java.io._

val dirname = "site"
val pw = new PrintWriter(new File(dirname + "/" + centerVertex + ".json"))

// a quick way to normalize edges (not rigorous):
val maxEdgeVal = graph3p1.triplets.sortBy(_.attr.toInt, ascending=false).
                                        map(x=>x.attr.toInt).take(1)(0)

// recall that only *edges* are filtered, the full node list is kept at all times. 
// (it saves time when rebuilding Graphs)
       
//formatting vertices (lots of delimiter issues)                        
val jsonVertices = outputVertexList.map(x=>x.replace("""\""" ,"""\\""")). // backslash
                                    map(x=>x.replace("\"","\\\"")). // quote delimiters
                                    map(x=>"    {\"name\":\"" + x + "\",\"group\":1}").
                                    collect

Name: Unknown Error
Message: <console>:35: error: not found: value graph3p1
val maxEdgeVal = graph3p1.triplets.sortBy(_.attr.toInt, ascending=false).
                 ^
<console>:35: error: not found: value ascending
val maxEdgeVal = graph3p1.triplets.sortBy(_.attr.toInt, ascending=false).
                                                        ^
<console>:42: error: not found: value outputVertexList
val jsonVertices = outputVertexList.map(x=>x.replace("""\""" ,"""\\""")). // backslash
                   ^

StackTrace: 
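
The two `replace` calls in the cell above escape backslashes and double quotes so that page titles are valid inside a JSON string. Here is the same logic as a standalone sketch; `escapeJson` and `vertexJson` are hypothetical helper names, and the sketch covers only these two characters, not control characters.

```scala
def escapeJson(s: String): String = {
  // Backslashes must be escaped first, otherwise the backslashes added
  // for the quotes in the next step would be doubled as well.
  val backslashesEscaped = s.replace("\\", "\\\\")
  backslashesEscaped.replace("\"", "\\\"")
}

// One vertex entry in the format used by the notebook.
def vertexJson(name: String): String =
  "    {\"name\":\"" + escapeJson(name) + "\",\"group\":1}"
```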

In [18]:
val jsonEdges = graph3p1.triplets.map(x => (x.srcAttr._1,x.dstAttr._1,x.attr)).collect.map(y =>  
                                    "    {\"source\":" + getLinkIndex(y._1).toString +
                                    ",\"target\":" + getLinkIndex(y._2).toString +
                                    ",\"value\":" + (y._3.toFloat/maxEdgeVal*100).ceil.toInt.
                                    toString  + "}"  
)

Name: Unknown Error
Message: <console>:25: error: not found: value graph3p1
       val jsonEdges = graph3p1.triplets.map(x => (x.srcAttr._1,x.dstAttr._1,x.attr)).collect.map(y =>
                       ^
<console>:26: error: not found: value getLinkIndex
                                           "    {\"source\":" + getLinkIndex(y._1).toString +
                                                                ^
<console>:27: error: not found: value getLinkIndex
                                           ",\"target\":" + getLinkIndex(y._2).toString +
                                                            ^

StackTrace: 
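
The `value` field written above scales each click count to a percentage of the largest count and rounds up. A minimal sketch of that arithmetic, with the hypothetical helper name `scaledValue` and made-up counts:

```scala
// Percentage of the largest click count, rounded up to a whole number.
def scaledValue(clicks: Int, maxClicks: Int): Int =
  (clicks.toFloat / maxClicks * 100).ceil.toInt

val quarter  = scaledValue(50, 200)   // a quarter of the maximum
val heaviest = scaledValue(200, 200)  // the heaviest edge itself
val lightest = scaledValue(1, 200)    // a very light edge
```

Because of the rounding up, even the lightest edge receives a value of at least 1 as long as its count is positive.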

In [19]:
// writing json

pw.write("{\n")                             // main header
pw.write("  \"nodes\":[\n")                 // nodes header

for (i<-0 until jsonVertices.length){       // nodes
    if(i == jsonVertices.length-1){pw.write(jsonVertices(i))}
    else {pw.write(jsonVertices(i)+",")}
    pw.write("\n")}
pw.write("  ],\n")                          // end nodes header
pw.write("  \"links\":[\n")      

Name: Unknown Error
Message: <console>:37: error: not found: value pw
       pw.write("  \"links\":[\n")
       ^
<console>:27: error: not found: value pw
       pw.write("{\n")                             // main header
       ^
<console>:28: error: not found: value pw
       pw.write("  \"nodes\":[\n")                 // nodes header
       ^
<console>:30: error: not found: value jsonVertices
       for (i<-0 until jsonVertices.length){       // nodes
                       ^
<console>:31: error: not found: value pw
           if(i == jsonVertices.length-1){pw.write(jsonVertices(i))}
                                          ^
<console>:31: error: not found: value jsonVertices
           if(i == jsonVertices.length-1){pw.write(jsonVertices(i))}
                                                   ^
<console>:32: error: not found: value pw
           else {pw.write(jsonVertices(i)+",")}
                 ^
<console>:32: error: not found: value jsonVertices
           else {pw.write(jsonV

In [20]:
for (i<-0 until jsonEdges.length){          // links
    if(i == jsonEdges.length-1){pw.write(jsonEdges(i))}
    else {pw.write(jsonEdges(i)+",")}
    pw.write("\n")
}
pw.write("  ]\n") 
pw.write("}")                               // main footer
pw.close

Name: Unknown Error
Message: <console>:34: error: not found: value pw
       pw.close
       ^
<console>:25: error: not found: value jsonEdges
       for (i<-0 until jsonEdges.length){          // links
                       ^
<console>:26: error: not found: value pw
           if(i == jsonEdges.length-1){pw.write(jsonEdges(i))}
                                       ^
<console>:26: error: not found: value jsonEdges
           if(i == jsonEdges.length-1){pw.write(jsonEdges(i))}
                                                ^
<console>:27: error: not found: value pw
           else {pw.write(jsonEdges(i)+",")}
                 ^
<console>:27: error: not found: value jsonEdges
           else {pw.write(jsonEdges(i)+",")}
                          ^
<console>:28: error: not found: value pw
           pw.write("\n")
           ^
<console>:30: error: not found: value pw
       pw.write("  ]\n")
       ^
<console>:31: error: not found: value pw
       pw.write("}")                        
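
The two loops above exist only to avoid a trailing comma after the last entry. As an alternative sketch, `mkString` places the separator between entries, so the whole document can be assembled as one string and written in a single call. `graphJson` and the sample entries are hypothetical.

```scala
// Assembles the document from entries that are already formatted as JSON objects.
def graphJson(vertices: Seq[String], links: Seq[String]): String =
  "{\n" +
  "  \"nodes\":[\n" + vertices.mkString(",\n") + "\n  ],\n" +
  "  \"links\":[\n" + links.mkString(",\n") + "\n  ]\n" +
  "}"

// Made-up entries in the same format as jsonVertices and jsonEdges.
val sampleVertices = Seq(
  "    {\"name\":\"A\",\"group\":1}",
  "    {\"name\":\"B\",\"group\":1}"
)
val sampleLinks = Seq("    {\"source\":0,\"target\":1,\"value\":100}")

val doc = graphJson(sampleVertices, sampleLinks)
```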

In [21]:

// writing new html file from template
import scala.io.Source._
val lines = fromFile(dirname +"/template.html").getLines.toArray
val pw = new java.io.PrintWriter(new File(dirname+"/"+centerVertex+".html"))

//lines.map(x=>x.replace("NAME_OF_SITE", centerVertex)).foreach(y=>pw.write(y+"\n"))
lines.map(x => x.replace("NAME_OF_SITE", centerVertex)). 
      map(x => x.replace("TIME_FOR_QUERY", " (took " + dt + "s)")).
      foreach(y=>pw.write(y+"\n"))

pw.close

Name: Unknown Error
Message: <console>:29: error: not found: value dirname
       val lines = fromFile(dirname +"/template.html").getLines.toArray
                            ^
<console>:30: error: not found: type File
       val pw = new java.io.PrintWriter(new File(dirname+"/"+centerVertex+".html"))
                                            ^
<console>:30: error: not found: value dirname
       val pw = new java.io.PrintWriter(new File(dirname+"/"+centerVertex+".html"))
                                                 ^

StackTrace: 
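
The placeholder substitution in the last cell can be sketched without touching the file system. `fillTemplate` is a hypothetical helper, and the template lines below are made up; the real `template.html` is not shown in this notebook.

```scala
// Applies every placeholder -> value replacement to every template line.
def fillTemplate(lines: Seq[String], values: Map[String, String]): Seq[String] =
  lines.map { line =>
    values.foldLeft(line) { case (acc, (key, value)) => acc.replace(key, value) }
  }

// Made-up template lines using the two placeholders from the cell above.
val template = Seq(
  "<title>NAME_OF_SITE</title>",
  "<p>NAME_OF_SITETIME_FOR_QUERY</p>"
)
val filled = fillTemplate(template, Map(
  "NAME_OF_SITE"   -> "Watson_(computer)",
  "TIME_FOR_QUERY" -> " (took 1.5s)"
))
```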