# Event Detection on Twitter dataset

### Storing Tweet info in a data structure (Record class)

In [1]:
import io.arabesque.embedding._
import io.arabesque.pattern._
import java.io._
import java.util.Date
import java.util.Calendar
import java.text.SimpleDateFormat;
import scala.collection.mutable.Map
import scala.io.Source

class Record(line: String, isPreEvent: Boolean) extends Serializable {
    
    var tweep = ""
    var retweep = ""
    var tweetDatetime = java.util.Calendar.getInstance().getTime()
    var tweet = ""
    var hashtag = ""
    var isRetweet = false
    var link = ""
    
    val lineComps = line.split("\t")
    
    this.tweep = lineComps(0).replace("@", "")
    val format = new java.text.SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZ yyyy")
    this.tweetDatetime = format.parse(lineComps(2))
    this.tweet = lineComps(4)
    this.isRetweet = lineComps(4).startsWith("RT ")
    if ( this.isRetweet ) {
        if ( isPreEvent )
            this.retweep = lineComps(6).replace("@", "")
        else
            this.retweep = lineComps(4).split(":")(0).split(" ")(1).replace("@", "")
    }
        
    override def toString = "$this.tweep, $this.tweetDatetime, $this.tweet, $this.retweep, $this.isRetweet, $this.hashtag, $this.link"
}

### Define logic to read, parse and store pre & post event tweets

In [2]:
import io.arabesque.embedding._
import io.arabesque.pattern._
import java.io._
import scala.collection.mutable.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.io.Source._

val minRecordsThreshold = 500
var splitMonth = 4

val processingDir = "/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/"
var tweepDict = Map.empty[String, ArrayBuffer[Record]]
var retweepDict = Map.empty[String, ArrayBuffer[Record]]
var retweetCountDict = Map.empty[String, Int]
var records = new ArrayBuffer[Record]()

def addToDictionary(dictionary: Map[String, ArrayBuffer[Record]], key: String, record: Record) = {
    if ( !dictionary.contains(key) )
        dictionary(key) = new ArrayBuffer[Record]()
    
    dictionary(key) += record
}

def getWeekOfDay(day: Int) = (math.floor(math.min(27, day) / 7).toInt * 7) + 1

val preTimeFormat = "MM-yyyy"
val postTimeFormat = "MM-yyyy"
var formatter = new java.text.SimpleDateFormat(preTimeFormat)
var dayFormatter = new java.text.SimpleDateFormat("dd")

def getTimeKeyForDate(date: Date):String = {
    
    val weekNumber = getWeekOfDay(dayFormatter.format(date).toInt)
    val timeKey = "%02d".format(weekNumber) +
                        "-" + formatter.format(date)
    
    return timeKey
}

def parseEventFile(path: String) = {
    
    tweepDict.clear()
    retweepDict.clear()
    records.clear()
    
    var i = 0
    val file = new File(path)
    for(line <- fromFile(file).getLines) {
        val record = new Record(line, file.getName().startsWith("Pre"))
        records += record
        addToDictionary(tweepDict, record.tweep, record)
        addToDictionary(retweepDict, record.retweep, record)
    }
}

var timeKeyCounter = Map.empty[String, Int]
var graphDict = Map.empty[String, Map[String, ArrayBuffer[(String, Record)]]]
var tweepVidDict = Map.empty[String, Map[String, Int]]


def buildGraph(timeFormat: String, splitMonth: Int) = {
    timeKeyCounter.clear()
    graphDict.clear()
    tweepVidDict.clear()
    formatter = new java.text.SimpleDateFormat(timeFormat)
    dayFormatter = new java.text.SimpleDateFormat("dd")
    
    for ( record <- records ) {
        if ( record.isRetweet ) {
            
            val timeKey = getTimeKeyForDate(record.tweetDatetime)
            val tweep = record.tweep
            val retweep = record.retweep
            
            if ( !graphDict.contains(timeKey) ) {
                graphDict(timeKey) = Map.empty[String, ArrayBuffer[(String, Record)]]
                tweepVidDict(timeKey) = Map.empty[String, Int]
                timeKeyCounter(timeKey) = 0
            }
            
            if ( !graphDict(timeKey).contains(tweep) ) {
                graphDict(timeKey)(tweep) = new ArrayBuffer[(String, Record)]()
                tweepVidDict(timeKey)(tweep) = timeKeyCounter(timeKey)
                timeKeyCounter(timeKey) += 1
            }
            
            if ( !graphDict(timeKey).contains(retweep) ) {
                graphDict(timeKey)(retweep) = new ArrayBuffer[(String, Record)]()
                tweepVidDict(timeKey)(retweep) = timeKeyCounter(timeKey)
                timeKeyCounter(timeKey) += 1
            }
            
            val t1 = (retweep, record)
            val t2 = (tweep, record)
            
            graphDict(timeKey)(tweep) += t1
            graphDict(timeKey)(retweep) += t2
        }
    }
    
    println("number of records: " + records.size)
}

def exportToGraph(filePrefix: String, splitMonth: Int) = {
    val exportDir = processingDir + "graph/" + filePrefix + "/" + splitMonth
    
    val edFile = new File(exportDir)
    
    if ( !edFile.exists() )
        edFile.mkdir()
    
    graphDict.foreach(pair => {
        
        val vidTweepDict = Map.empty[Int, String]
        var graphList = new ArrayBuffer[(Int, ArrayBuffer[Int])]()
        
        val timeKey = pair._1
        val tweepsDict = pair._2
        val vidDict = tweepVidDict(timeKey)
        
        val tweepsKeys = tweepsDict.keys
        
        if ( tweepsKeys.size > minRecordsThreshold ) {
            
            var tweetCount = 0
            
            tweepsKeys.foreach(tweep => {
                val tweepList = tweepsDict(tweep)
                val tweepVid = vidDict(tweep)
                
                vidTweepDict(tweepVid) = tweep
                val t = (tweepVid, tweepList.map(pair => vidDict(pair._1)).distinct.sorted)
                graphList += t
                tweetCount += tweepList.size
            })
            
//             println("filling data for " + timeKey + " with records count = " + tweepsKeys.size)
            println("number of retweets for key " + timeKey + " = " + tweetCount)
            retweetCountDict(timeKey) = tweetCount / 2
            //scala.util.Sorting.stableSort(graphList, (e1: (Int, ArrayBuffer[Int]), e2: (Int, ArrayBuffer[Int])) => e1._1 < e2._1)
            graphList = graphList.sortBy(_._1)
            
            val file = new File(exportDir + "/" + timeKey + ".txt")
            val bw = new BufferedWriter(new FileWriter(file))
            val reverseFile = new File(exportDir + "/" + timeKey + ".reverse.txt")
            val rbw = new BufferedWriter(new FileWriter(reverseFile))

            println("exporting " + timeKey + " with records count = " + tweepsKeys.size)

            graphList.map(pair => {
                val tweepVid = pair._1
                val tweepList = pair._2
                rbw.write(tweepVid + "\t" + vidTweepDict(tweepVid) + "\n")
                bw.write(tweepVid + " 0 " + tweepList.distinct.sorted.mkString(" ") + "\n")
            })

            bw.close()
            rbw.close()
        } else {
//             println("skipping data for " + timeKey + " with records count = " + tweepsKeys.size)
        }
    })
}

def clusterTweetsByTime(timeFormat: String) = {
    
    val timeDict = Map.empty[String, ArrayBuffer[Record]]
    formatter = new java.text.SimpleDateFormat(timeFormat)
    dayFormatter = new java.text.SimpleDateFormat("dd")
    var count = 0
    
    for(record <- records ) {
        if ( record.isRetweet ){
            val timeKey = getTimeKeyForDate(record.tweetDatetime)
            addToDictionary(timeDict, timeKey, record)
            count += 1
        }
    }
    println("number of retweets = " + count)
    
    for (key <- timeDict.keys) {
        println("date " + key + " =\t" + timeDict(key).size)
    }
}

### Parse Tweets & Conversion to Arabesque input graph format

In [3]:
//processing pre event tweets and put it in input graph format
//generating .graph files & reverse mapping files as well
parseEventFile(processingDir + "Pre-event-tweets.txt")
buildGraph(preTimeFormat, splitMonth)
exportToGraph("pre", splitMonth)

//processing post event tweets and put it in input graph format
//generating .graph files & reverse mapping files as well
parseEventFile(processingDir + "Post-event-tweets.txt")
buildGraph(postTimeFormat, splitMonth)
exportToGraph("post", splitMonth)

// retweetCountDict = Map(retweetCountDict.toSeq.sortBy(_._1): _*)
// val keys = retweetCountDict.keys.toSeq.sorted
// for ( key <- keys) {
//     val count = retweetCountDict(key)
//     println(key + ": " + count)
// }
retweetCountDict.foreach(pair => println(pair._1 + ": " + pair._2))

number of records: 445000
number of retweets for key 22-09-2015 = 1460
exporting 22-09-2015 with records count = 639
number of retweets for key 01-02-2016 = 6916
exporting 01-02-2016 with records count = 2304
number of retweets for key 01-11-2015 = 1640
exporting 01-11-2015 with records count = 713
number of retweets for key 15-09-2015 = 1170
exporting 15-09-2015 with records count = 507
number of retweets for key 22-10-2015 = 2180
exporting 22-10-2015 with records count = 843
number of retweets for key 01-12-2015 = 1924
exporting 01-12-2015 with records count = 796
number of retweets for key 01-03-2016 = 17420
exporting 01-03-2016 with records count = 4584
number of retweets for key 15-10-2015 = 1186
exporting 15-10-2015 with records count = 538
number of retweets for key 08-10-2015 = 1206
exporting 08-10-2015 with records count = 502
number of retweets for key 22-01-2016 = 12924
exporting 22-01-2016 with records count = 3264
number of retweets for key 22-11-2015 = 3422
exporting 22-1

### Read Arabesque input files and other helper functions

In [4]:
import io.arabesque.ArabesqueContext
import io.arabesque.ArabesqueGraph
import io.arabesque.ArabesqueResult
import scala.collection.mutable.ListBuffer

val motifSize = 4
val prePath = "/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/graph/pre/"
val postPath = "/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/graph/post/"

def getListOfFilesRecursively(dir: String):ListBuffer[File] = {
    val allFiles = ListBuffer.empty[File]
    val d = new File(dir)
    if (d.exists && d.isDirectory) {
        allFiles ++= d.listFiles.filter(_.isFile).toList
        val allDirs = d.listFiles.filter(_.isDirectory).toList
        for(file <- allDirs) {
            //println(file.getPath)
            allFiles ++= getListOfFilesRecursively(file.getPath)
        }
    } else {
        allFiles ++= List[File]()
    }
    
    return allFiles
}

var files = getListOfFilesRecursively(prePath + splitMonth + "/")
files ++= getListOfFilesRecursively(postPath + splitMonth + "/")

var filesList = new ArrayBuffer[String]()
var revFilesList = new ArrayBuffer[String]()

files.foreach(file => {
    if ( !file.getName().contains("reverse") )
        filesList += file.getAbsolutePath()
    else
        revFilesList += file.getAbsolutePath()
})

filesList = filesList.sorted
revFilesList = revFilesList.sorted

var allPatternsDict = Map.empty[String, Map[String, Long]]

def skip(list:Array[Int], n:Int): ArrayBuffer[Int] = {
    val result = new ArrayBuffer[Int]()
    for (i <- (0 to (list.size - 1))) {
        if ( i % 2 == 0)
            result += list(i)
    }
    
    return result
}

def addPatternToDict(dict: Map[String, Map[String, Long]], key: String, pattern: String, frequency: Long) {
        
    println("adding pattern key:" + key + " pattern: " + pattern + " freq: " + frequency)
    var edges = skip(pattern.replace("[","").replace("]","").replace("-", ",").split(",").map(_.toInt), 2)
    val size = edges.reduceLeft(_ max _) + 1

    if( !dict.contains(key) )
        dict(key) = Map.empty[String, Long]

    dict(key)(pattern) = frequency
}


### Run Arabesque and store the frequency of the generated patterns

In [5]:
val cachePath = "/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/Aggregations-cache.txt"
val cacheAggs = new File(cachePath)
val cabw = new BufferedWriter(new FileWriter(cacheAggs))

for ( i <- filesList.indices ){
    
    //get local path for the input graph
    val filePath = filesList(i)
    val revFilePath = revFilesList(i)
    
    println(filePath)
    println(revFilePath + "\n")
    
    // arabesque context is built on top of SparkContext
    val ac: ArabesqueContext = new ArabesqueContext (sc)

    // several arabesque graphs are built on top of ArabesqueContext
    val ag: ArabesqueGraph = ac.textFile (filePath, true)
    
    // generating motifs of size 3
    val motifs = ag.motifs (motifSize).set ("agg_ic", true).set ("comm_ss", "embedding")

    // embeddings RDD
    val embeddings = motifs.embeddings
    println (motifs.config.getOutputPath)

    // getting aggregations, one by one ()
    val aggKeys = motifs.registeredAggregations
    println (s"\naggKeys = ${aggKeys.mkString(" ")}")
    val motifsAgg = motifs.aggregation (aggKeys(0))
    println ("\naggregations: " + motifsAgg)

    // getting all aggregations
    val allAggs = motifs.aggregations
    println ("\nall aggregations: " + allAggs)

    allAggs.foreach(pair => {
        println("\ndict key is " + pair._1)
        val dict = pair._2
        dict.foreach(pair => {
            val pattern = pair._1.toString
            val freq = pair._2.toString
            val timeKey = filePath.split("/").last.replace(".txt", "")
            addPatternToDict(allPatternsDict, timeKey, pattern.toString, freq.toLong)
            cabw.write(timeKey + " " + pattern.toString + " " + freq.toLong + "\n")
        })
    })

    println(allPatternsDict)

    ac.stop
}

cabw.close

/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/graph/post/4/08-04-2016.txt
/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/graph/post/4/08-04-2016.reverse.txt

/tmp/arabesque-43259901-f659-47c0-8458-4c414ec08334/graph-6ee837e1-3468-42d5-b4f2-45b9b6bdd809/motifs-5553d1de-b809-4bc1-8f11-b11c6615da48

aggKeys = motifs

aggregations: Map([1,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] -> 83120, [0,0-1,0],[0,0-2,0],[2,0-3,0],[1,0-3,0] -> 29354, [0,0-2,0],[2,0-3,0],[1,0-3,0] -> 2715417, [2,0-3,0],[1,0-3,0],[0,0-3,0] -> 2631889, [0,0-1,0],[1,0-2,0],[0,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] -> 38, [1,0-2,0],[0,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] -> 2755)

all aggregations: Map(motifs -> Map([1,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] -> 83120, [0,0-1,0],[0,0-2,0],[2,0-3,0],[1,0-3,0] -> 29354, [0,0-2,0],[2,0-3,0],[1,0-3,0] -> 2715417, [2,0-3,0],[1,0-3,0],[0,0-3,0] -> 2631889, [0,0-1,0],[1,0-2,0],[0,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] -> 38, [1,0-2,0],[0,0-2,0],[2,0-3,0],[1,0-3,0]

### Define the logic that processes Arabesque output

In [5]:
import scala.math._

val cachePath = "/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/Aggregations-cache.txt"

def loadData() = {
    if ( allPatternsDict.size == 0 ) {
        println("\n\n****************************************************")
        println("************ Loading data from the cache *************")
        println("****************************************************\n\n\n")
        val file = new File(cachePath)
        for(line <- fromFile(file).getLines) {
            val lineComps = line.split(" ")
            addPatternToDict(allPatternsDict, lineComps(0), lineComps(1), lineComps(2).toLong)
        }
    }
}

def addEdge(graphDict: Map[Int, ArrayBuffer[Int]], v1: Int, v2: Int) = {
    if ( !graphDict.contains(v1) )
        graphDict(v1) = new ArrayBuffer[Int]()
    
    if ( !graphDict.contains(v2) )
        graphDict(v2) = new ArrayBuffer[Int]()
    
    graphDict(v1) += v2
    graphDict(v2) += v1
}

def isStarPattern(pattern: String): Boolean = {
    var edges = skip(pattern.replace("[","").replace("]","").replace("-", ",").split(",").map(_.toInt), 2)
    val verticesCount = edges.reduceLeft(_ max _) + 1
    
    val graph = Map.empty[Int, ArrayBuffer[Int]]
    
    if ( (edges.size / 2) != (verticesCount - 1))
        return false
    
    for ( i <- skip((0 to edges.size - 1).toArray, 2)) {
        val v1 = edges(i)
        val v2 = edges(i+1)
        addEdge(graph, v1, v2)
    }
    
    var edgesCount = 0
    
    for (v <- graph.keys) {
        val edgeList = graph(v)
        
        if ( edgeList.size == verticesCount - 1)
            edgesCount += 1
    }
    
    if (edgesCount == 1)
        return true
    
    return false
}

def isConnectedPattern(pattern: String): Boolean = {
    return false
}

def getPatternFrequency(timeKey: String, pattern: String): Long = {
    
    if ( allPatternsDict.contains(timeKey) && allPatternsDict(timeKey).contains(pattern) )
        return allPatternsDict(timeKey)(pattern)
    
    return 0
}

def calculateEuclideanDistance(v1: ArrayBuffer[Long], v2: ArrayBuffer[Long]): Long = {
        
    if ( v1.size != v2.size )
        return -1
        
    var result: Long = 0
    for ( i <- v1.indices) {
        val dist = abs(v1(i) - v2(i))
        result += (dist * dist)
    }
    
    return round(sqrt(result).toDouble)
}

### Calculation of the Euclidean Distance between consecutive periods

In [6]:
loadData

val sortedTimeKeys = allPatternsDict.keys.toList.sortBy(dateString => {
    val format = new java.text.SimpleDateFormat("dd-MM-yyyy")
    format.parse(dateString)
})

println(sortedTimeKeys)

val distancesDict = Map.empty[String, Long]

for ( i <- (1 to sortedTimeKeys.size - 1) ) {
    val tk1 = sortedTimeKeys(i-1)
    val tk2 = sortedTimeKeys(i)
    
    println("key1: " + tk1 + " key2: " + tk2)
    
    val tk1Patterns = allPatternsDict(tk1)
    val tk2Patterns = allPatternsDict(tk2)
    
    var tkPatterns = tk1Patterns
    
    if ( tk2Patterns.size > tk1Patterns.size )
        tkPatterns = tk2Patterns
    
    println("keys are: " + tkPatterns)
    
    val v1 = new ArrayBuffer[Long]()
    val v2 = new ArrayBuffer[Long]()
    
    for ( pattern <- tkPatterns.keys ) {
        v1 += getPatternFrequency(tk1, pattern)
        v2 += getPatternFrequency(tk2, pattern)
    }
    
    val euclideanDistance = calculateEuclideanDistance(v1, v2)
    distancesDict(tk2) = euclideanDistance
}

println("Euclidean distances: " + distancesDict)

val statsFile = new File("/home/local/QCRI/abghanem/arabesque/twitter/datasets/Islands/stats.txt")
val sfbw = new BufferedWriter(new FileWriter(statsFile))

sfbw.write("Time-Key Euc-Dist-motifs-4 retween-count\n")

var eucDisDataPoints = ""
var retweetCountDataPoints = ""

for( i <- sortedTimeKeys.indices) {
    val timekey: String = sortedTimeKeys(i)
    println("time key: " + timekey)
    var retweetCount: Long = 0
    var distance: Long = 0
    
    if ( retweetCountDict.contains(timekey) )
        retweetCount = retweetCountDict(timekey)
    if ( distancesDict.contains(timekey) )
        distance = distancesDict(timekey)
    if ( retweetCount > 0 && distance > 0 )
        sfbw.write(timekey + " " + distance + " " + retweetCount + "\n")
    
    val tkcs = timekey.split("-")
    eucDisDataPoints += "{ x: new Date(" + tkcs(2) + "," + tkcs(1) + "," + tkcs(0) + "), y: " + distance/1000.0f + "},"
    retweetCountDataPoints += "{ x: \"" + timekey + "\", y: " + retweetCount + "},"
}

sfbw.close()

println(eucDisDataPoints)



****************************************************
************ Loading data from the cache *************
****************************************************



adding pattern key:08-04-2016 pattern: [1,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] freq: 83120
adding pattern key:08-04-2016 pattern: [0,0-1,0],[0,0-2,0],[2,0-3,0],[1,0-3,0] freq: 29354
adding pattern key:08-04-2016 pattern: [0,0-2,0],[2,0-3,0],[1,0-3,0] freq: 2715417
adding pattern key:08-04-2016 pattern: [2,0-3,0],[1,0-3,0],[0,0-3,0] freq: 2631889
adding pattern key:08-04-2016 pattern: [0,0-1,0],[1,0-2,0],[0,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] freq: 38
adding pattern key:08-04-2016 pattern: [1,0-2,0],[0,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] freq: 2755
adding pattern key:15-04-2016 pattern: [1,0-2,0],[2,0-3,0],[1,0-3,0],[0,0-3,0] freq: 20671
adding pattern key:15-04-2016 pattern: [0,0-1,0],[0,0-2,0],[2,0-3,0],[1,0-3,0] freq: 10585
adding pattern key:15-04-2016 pattern: [0,0-2,0],[2,0-3,0],[1,0-3,0] freq: 1027909
adding patter

### Plot the Euclidean Distance against weekly time period

In [7]:
kernel.magics.html("""

<div id="chartContainer" style="height: 500px; width: 100%%;">
</div>

<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.min.js"></script>

<script type="text/javascript">
    
    setTimeout(function () {
    
    var chart = new CanvasJS.Chart("chartContainer",
    {

        title:{
            text: "Event Identification on Twitter",
            fontSize: 25
        },
        animationEnabled: true,
        axisX:{
            gridColor: "Silver",
            tickColor: "silver",
            labelAngle: 50,
            labelFontSize: 15,
            interval: 14,
            intervalType: "day",
            valueFormatString: "DD MMM YYYY"
        },                        
                        toolTip:{
                          shared:true
                        },
        theme: "theme2",
        axisY: {
            gridColor: "Silver",
            tickColor: "silver",
            titleFontSize: 20,
            title: "Euclidean Distance (K)",
        },
        legend:{
            verticalAlign: "center",
            horizontalAlign: "right"
        },
        data: [
        {        
            type: "line",
            showInLegend: true,
            lineThickness: 2,
            name: "motifs size 4",
            markerType: "square",
            color: "#F08080",
            dataPoints: [%s]
            }
            ],
          legend:{
            cursor:"pointer",
            itemclick:function(e){
              if (typeof(e.dataSeries.visible) === "undefined" || e.dataSeries.visible) {
              e.dataSeries.visible = false;
              }
              else{
                e.dataSeries.visible = true;
              }
              chart.render();
            }
          }
        });

    chart.render();
    
}, 10);
</script>

""".format(eucDisDataPoints))