## Anomaly detection and pattern extraction with Spark, Cassandra and Scala

Today, geo-located data is available in a number of domains, ranging from healthcare to financial markets, to social services. In all these domains, extracting patterns and detecting anomalies and novelties from data has very concrete business outcomes. 

Anomaly detection can be defined as the process of finding the samples in a dataset that do not follow the expected patterns and behave as though they were produced by a different mechanism. From detection follows action. Depending on the domain and the use case, such samples are called anomalies or novelties, and these signals trigger applications such as personalized marketing and fraud alerting and notification.

As more data gets ingested and produced via digital services, it is key to perform this sort of analytics at scale. In the open source space, technologies such as Spark and Cassandra are instrumental in implementing and executing modern data pipelines at scale.

This Oriole is divided into two parts. In the first part, I will show how to collect data from Cassandra and bring it into Spark for further analysis. In the second part, I will explore three techniques for detecting anomalies:

  - Statistics and Histograms
  - Process Mining and Graph Analytics
  - Clustering for Geo-Located Data with DBSCAN

For this analysis, we are going to use the Gowalla Dataset [1]. The Gowalla dataset consists of a table of events, registered by anonymized users. Each event registers a user checking into a geolocated venue at a specific timestamp. The dataset is available at https://snap.stanford.edu/data/loc-gowalla.html

[1] E. Cho, S. A. Myers, J. Leskovec. Friendship and Mobility: User Movement in Location-Based Social Networks. ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD), 2011.



#### Setup

This notebook runs Scala code and interfaces with a Spark cluster using the [Apache Toree](https://toree.incubator.apache.org/) project. Spark reads the data from Cassandra tables through the [Cassandra-Spark connector](https://github.com/datastax/spark-cassandra-connector).

At the time of compiling this notebook, Spark 1.6.1 and Cassandra 3.5 were used. The command below installs the Spark Scala kernel on Jupyter. More instructions on this topic are available on the Apache Toree [website](https://toree.incubator.apache.org/) and [github pages](https://github.com/apache/incubator-toree).

```
sudo jupyter-toree install --spark_home=${SPARK_HOME} \
  --spark_opts='--packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.0,graphframes:graphframes:0.1.0-spark1.6 --conf spark.cassandra.connection.host=localhost --conf spark.executor.memory=1g --conf spark.driver.memory=1g'
```

In [1]:
// Spark version
sc.version

1.6.1

### Connecting to Cassandra

In [2]:
//sql context
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sqlContext  = new SQLContext(sc)
import sqlContext.implicits._

In [3]:
// spark-cassandra connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

import org.apache.spark.sql.cassandra.CassandraSQLContext
val cc = new CassandraSQLContext(sc)

#### SQL queries in Cassandra

Cassandra is exposed through a SQL context, so there is no need to learn a separate syntax: Spark maps the query onto the features available in the underlying storage system. The simple query below reads the id and the name of venues from a Cassandra table. Remember that SQL statements are _staged_ but not _executed_ until an [action](http://spark.apache.org/docs/latest/programming-guide.html#actions) needs to be computed. Examples of actions are **count**(), **first**() and **collect**().

In [49]:
val venues   = cc.sql("select vid, name from lbsn.venues").as("venues")

In [50]:
venues.count()

17291
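
The difference between staging and executing can be illustrated without a cluster. The following plain-Scala sketch is only an analogy, not Spark code: a lazy `Iterator` plays the role of the staged query, and `size` plays the role of an action such as **count**(). The `LazyDemo` class and its `evaluated` counter are hypothetical names introduced for this illustration.

```scala
// Analogy only: a lazy Iterator stands in for a staged Spark query.
class LazyDemo(data: Seq[Int]) {
  // Counts how many elements the predicate has actually inspected
  var evaluated = 0

  // "Staging": building the filtered iterator does not touch the data yet
  def staged(): Iterator[Int] =
    data.iterator.filter { x => evaluated += 1; x % 2 == 0 }
}
```

Calling `staged()` leaves `evaluated` at zero; the predicate only runs over the data once the iterator is consumed, for example with `size`.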

Sometimes the full query cannot be pushed to Cassandra, because it cannot be mapped onto the functions that this data store supports. For instance, Cassandra cannot easily execute joins. In this case, Spark plans and partitions the query, _pushing down_ what can be done in Cassandra and performing the rest of the query in Spark.

More information can be found on Cassandra Documentation about [using Spark SQL to query data](http://docs.datastax.com/en/datastax_enterprise/5.0/datastax_enterprise/spark/sparkSqlOverview.html) or on the [Cassandra Spark Connector](https://github.com/datastax/spark-cassandra-connector) pages.

For example, the following query selects only the events registered in the New York area. As filtering in Cassandra cannot be done on columns that are not indexed, this specific query first moves the data from Cassandra to Spark and then performs the filtering in Spark. In general, it is good practice to push down filters and discard as much data as early as possible. This keeps the volume of data transferred from one system to the other to a minimum.

In [95]:
val events   = cc.sql("""select ts, uid, lat, lon, vid from lbsn.events where
                            lon>-74.2589 and lon<-73.7004 and 
                            lat> 40.4774 and lat< 40.9176
                      """).as("events").orderBy("uid", "ts").repartition(8, $"uid").cache()

In [94]:
events.count()

112383

In [53]:
events.first()

[2010-09-24 06:41:01.0,110775,40.7745781069,-73.8720059395,12505]

Before diving into anomaly detection of geo-located data, let's perform some more basic queries. The cells below show how to count the events registered by user uid=0 and how to retrieve the name of venue id 12525.

In [54]:
// User 0: how many checkins?
events.where("uid=0").count()

25

In [55]:
venues.where("vid = 12525").first()

[12525,The Museum of Modern Art (MoMA)]

#### Joining Cassandra tables in Spark.

One of the advantages of connecting Cassandra and Spark is that you can now merge and join Cassandra tables.

In [56]:
val df_ny = events.
  join(venues, events("events.vid") === venues("venues.vid"), "inner").
  select("ts", "uid", "lat", "lon", "events.vid","venues.name")

df_ny.filter($"uid"===0).show(5,false)

+---------------------+---+------------------+------------------+------+---------------------------------+
|ts                   |uid|lat               |lon               |vid   |name                             |
+---------------------+---+------------------+------------------+------+---------------------------------+
|2010-10-11 16:21:28.0|0  |40.6438845363     |-73.78280639649999|23261 |JFK John F. Kennedy International|
|2010-10-11 12:21:20.0|0  |40.74137425       |-73.9881052167    |16907 |Shake Shack                      |
|2010-10-11 12:20:42.0|0  |40.741388197      |-73.98945450779999|12973 |Flatiron Building                |
|2010-10-10 16:06:30.0|0  |40.724910334499995|-73.9946207517    |341255|Emilio's Ballato Restaurant      |
|2010-10-10 14:00:37.0|0  |40.729768314      |-73.9985353275    |260957|The Half Pint                    |
+---------------------+---+------------------+------------------+------+---------------------------------+
only showing top 5 rows



The Spark table joined above is going to be the starting point for our anomaly analysis.  
Each row records the event's timestamp, the user id, the geo-location (latitude and longitude) of the venue and finally the venue id and name.

#### Executing SQL statements as code.

Spark dataframes can also be filtered and transformed programmatically via a number of [pre-defined functions](https://spark.apache.org/docs/1.6.1/api/scala/#org.apache.spark.sql.functions$), such as min, sum, stddev, and many more. Some of those are shown in the next code sections. 

Next to the default set of pre-defined DataFrame and column functions, it is possible to define user-defined functions (UDFs). In the code below, we create two UDFs that transform the timestamp into the day of the week and the hour of the day, computed according to a given local timezone.

In [57]:
// UDF functions for SQL-like operations on columns
import org.joda.time.DateTime
import org.joda.time.DateTimeZone

import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

val  dayofweek = udf( (ts: Timestamp, tz: String) => {
  val dt = new DateTime(ts,DateTimeZone.forID(tz))
  // Joda-Time numbers days from Monday = 1 to Sunday = 7, so Monday becomes 0
  dt.getDayOfWeek() - 1
})

val  localhour = udf( (ts: Timestamp, tz: String) => {
  val dt = new DateTime(ts,DateTimeZone.forID(tz))
  dt.getHourOfDay()
})

In [58]:
val newyork_tz = "America/New_York"

val df = df_ny.
  withColumn("dow",  dayofweek($"ts", lit(newyork_tz))).
  withColumn("hour", localhour($"ts", lit(newyork_tz))).
  as("events")
  
df.show(5, false)


+---------------------+------+------------------+--------------+-------+---------------------+---+----+
|ts                   |uid   |lat               |lon           |vid    |name                 |dow|hour|
+---------------------+------+------------------+--------------+-------+---------------------+---+----+
|2010-09-24 06:41:01.0|110775|40.7745781069     |-73.8720059395|12505  |LGA LaGuardia Airport|4  |2   |
|2010-09-22 16:27:24.0|110775|40.7288199592     |-74.0019902199|521470 |Mermaid Oyster Bar   |2  |12  |
|2010-09-21 05:01:29.0|110775|40.705070443000004|-74.011467788 |3361070|Wall Street          |1  |1   |
|2010-09-21 05:00:35.0|110775|40.7050351565     |-74.0109678168|301297 |Starbucks Coffee     |1  |1   |
|2010-09-20 04:41:49.0|110775|40.705070443000004|-74.011467788 |3361070|Wall Street          |0  |0   |
+---------------------+------+------------------+--------------+-------+---------------------+---+----+
only showing top 5 rows
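
The same conversion can be checked outside Spark. The sketch below uses `java.time` (an assumption: it requires Java 8 or later, whereas the UDFs above use Joda-Time) and assumes that the stored timestamp is interpreted as UTC. The `LocalClock` object is a hypothetical helper, not part of the notebook.

```scala
import java.time.{Instant, ZoneId}

// Hypothetical helper mirroring the two UDFs above with java.time
object LocalClock {
  // Monday = 0 ... Sunday = 6, the same convention as the `dow` column
  def dayOfWeek(epochMillis: Long, tz: String): Int =
    Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(tz)).getDayOfWeek.getValue - 1

  // Hour of the day (0 to 23) in the given timezone
  def hourOfDay(epochMillis: Long, tz: String): Int =
    Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(tz)).getHour
}
```

Under these assumptions, the instant `2010-09-24T06:41:01Z` evaluated in `America/New_York` gives day 4 (a Friday) and hour 2, which agrees with the first row of the table above.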



## Anomaly Detection

### Histogram based
#### Basic statistics in Spark

The following code section shows how to collect global statistics and histograms per hour of the day and per day of the week. Histograms can be made more specific by aggregating the events according to a number of factors, such as:

 - venue
 - geographical area
 - popular users
 - 1st, 2nd friend's circle
 
If you are interested in more slicing and dicing options, Spark has a [cube function](https://spark.apache.org/docs/1.5.1/api/scala/index.html#org.apache.spark.sql.DataFrame) as well.  
To start, let's compute the histogram, accumulating all events and aggregating by hour of the day and by day of the week.  


In [59]:
// histogram day of the week events
df.groupBy($"dow").count().show()

+---+-----+
|dow|count|
+---+-----+
|  0|14751|
|  1|15341|
|  2|15705|
|  3|16324|
|  4|16868|
|  5|17916|
|  6|15478|
+---+-----+



In [60]:
// histogram hour of the day events
df.groupBy($"hour").count().show(24,false)

+----+-----+
|hour|count|
+----+-----+
|0   |3927 |
|1   |4953 |
|2   |5411 |
|3   |6047 |
|4   |7362 |
|5   |8102 |
|6   |7668 |
|7   |7341 |
|8   |7831 |
|9   |7628 |
|10  |8308 |
|11  |8614 |
|12  |6669 |
|13  |5525 |
|14  |4452 |
|15  |2835 |
|16  |2200 |
|17  |1128 |
|18  |694  |
|19  |536  |
|20  |390  |
|21  |843  |
|22  |1398 |
|23  |2521 |
+----+-----+



#### Statistics in Spark: venue-specific histograms

Moving on, let's have a look at how to create a specific histogram for each venue.   
In this case, we will store the histogram as a vector. First, let's convert the day of the week to a vector.

In [61]:
import breeze.linalg._
import breeze.linalg.DenseVector

import org.apache.spark.mllib.linalg.{Vector,Vectors}

def r(x: Double, d:Int) = { 
    import scala.math.{pow, round}
    val p = pow(10,d); round(x*p)/p 
}

In [62]:
// vector histogram: the RDD way

def toVector(i: Int, length:Int) = {
  DenseVector((0 to length-1).map(x => if (x == i) 1.0 else 0.0).toArray)
}

In [63]:
df.select($"vid", $"dow").map(r => (r.getLong(0),toVector(r.getInt(1), 7))).first()

(12505,DenseVector(0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0))

#### Pair RDDs: reduce by Key

We will now **reduceByKey** those weekly and daily vectors by applying vector arithmetic. In this way, we can collect the probability of an event happening on a specific day of the week for each venue in the dataset. This is a much more detailed analysis, since different venues, such as restaurants, museums and train stations, have different daily and weekly histogram patterns.

In [64]:
val dow_hist = df.
  select($"vid", $"dow").
  map(r => (r.getLong(0),toVector(r.getInt(1), 7))).
  reduceByKey(_ + _).
  mapValues(x => Vectors.dense((x / sum(x)).toArray.map(r(_,2)))).
  toDF("vid", "dow_hist")

  

In [65]:
dow_hist.show(3, false)

+------+-------------------------------+
|vid   |dow_hist                       |
+------+-------------------------------+
|513544|[0.67,0.0,0.0,0.0,0.0,0.33,0.0]|
|932648|[0.25,0.0,0.0,0.5,0.0,0.0,0.25]|
|178464|[0.0,0.0,0.0,0.0,1.0,0.0,0.0]  |
+------+-------------------------------+
only showing top 3 rows
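
To make the vector arithmetic explicit, here is a minimal sketch of the same idea on plain Scala collections, without Spark or Breeze. The `VenueHistogram` object and its method names are hypothetical; the logic follows the cells above: one-hot encode each event, sum the vectors per venue, then divide by the total.

```scala
// Plain-Scala sketch of the per-venue histogram (no Spark, no Breeze)
object VenueHistogram {
  // One-hot vector of the given length with 1.0 at position i
  def oneHot(i: Int, length: Int): IndexedSeq[Double] =
    IndexedSeq.tabulate(length)(j => if (j == i) 1.0 else 0.0)

  // Element-wise sum of two vectors of equal length
  def add(a: IndexedSeq[Double], b: IndexedSeq[Double]): IndexedSeq[Double] =
    a.zip(b).map { case (x, y) => x + y }

  // (venue id, bin index) pairs -> normalized histogram per venue
  def histograms(events: Seq[(Long, Int)], bins: Int): Map[Long, IndexedSeq[Double]] =
    events.groupBy(_._1).map { case (vid, evs) =>
      val counts = evs.map(e => oneHot(e._2, bins)).reduce(add)
      val total = counts.sum
      vid -> counts.map(_ / total)
    }
}
```

A venue with two check-ins on day 0 and one on day 5 ends up with roughly two thirds of its mass in bin 0 and one third in bin 5, the same shape as the first row of the table above.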



#### Scoring events according to venues histograms

In [66]:
val df_probs = df.
  join(dow_hist, df("vid") === dow_hist("vid"), "inner").
  select($"ts", $"uid", round($"lat",4), round($"lon",4), $"events.vid", $"dow", $"dow_hist")

df_probs.show(5,false)

+---------------------+------+------------+------------+-----+---+-----------------------------------+
|ts                   |uid   |round(lat,4)|round(lon,4)|vid  |dow|dow_hist                           |
+---------------------+------+------------+------------+-----+---+-----------------------------------+
|2009-10-19 08:48:08.0|24951 |40.7491     |-73.968     |11831|0  |[0.1,0.14,0.21,0.16,0.11,0.16,0.13]|
|2010-08-13 22:28:29.0|125327|40.7491     |-73.968     |11831|4  |[0.1,0.14,0.21,0.16,0.11,0.16,0.13]|
|2010-10-09 06:00:59.0|111   |40.7491     |-73.968     |11831|5  |[0.1,0.14,0.21,0.16,0.11,0.16,0.13]|
|2010-03-10 10:50:05.0|49440 |40.7491     |-73.968     |11831|2  |[0.1,0.14,0.21,0.16,0.11,0.16,0.13]|
|2009-12-17 10:00:45.0|157344|40.7491     |-73.968     |11831|3  |[0.1,0.14,0.21,0.16,0.11,0.16,0.13]|
+---------------------+------+------------+------------+-----+---+-----------------------------------+

#### Histogram based anomaly detection
We will now score the probability of a given event based on the histograms we have just computed. For that, we are going to craft a new UDF that selects a given element of a vector based on the value provided by a different column.

In [67]:
val  nth = udf( (i:Int, arr: Vector) => {
  arr.toArray.lift(i).getOrElse(0.0)
})

df_probs.select($"ts", $"uid", $"vid", $"dow", nth($"dow", $"dow_hist").as("dow_prob")).show(3,false)

+---------------------+------+-----+---+--------+
|ts                   |uid   |vid  |dow|dow_prob|
+---------------------+------+-----+---+--------+
|2009-10-19 08:48:08.0|24951 |11831|0  |0.1     |
|2010-08-13 22:28:29.0|125327|11831|4  |0.11    |
|2010-10-09 06:00:59.0|111   |11831|5  |0.16    |
+---------------------+------+-----+---+--------+
only showing top 3 rows



#### Histograms for anomaly detection: putting it all together

Let's repeat the same exercise for the histograms binned by hour of the day. Finally, let's merge the two and compute the probability of each event, given the venue, the hour of the day, and the day of the week. Events with lower probabilities are less typical for their venue and are therefore candidate anomalies.

In [68]:
// same for hour of the day

val hour_hist = df.
  select($"vid", $"hour").
  map(r => (r.getLong(0),toVector(r.getInt(1), 24))).
  reduceByKey(_ + _).
  mapValues(x => Vectors.dense((x / sum(x)).toArray.map(r(_,2)))).
  toDF("vid", "hour_hist")

hour_hist.show(1, false)

+------+----------------------------------------------------------------------------------------------------+
|vid   |hour_hist                                                                                           |
+------+----------------------------------------------------------------------------------------------------+
|513544|[0.0,0.0,0.0,0.0,0.0,0.33,0.0,0.0,0.0,0.33,0.0,0.0,0.33,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|
+------+----------------------------------------------------------------------------------------------------+
only showing top 1 row



In [69]:
val df_probs = df.
  join(dow_hist, df("vid") === dow_hist("vid"), "inner").
  join(hour_hist, df("vid") === hour_hist("vid"), "inner").
  select( 
    $"ts", 
    $"uid", 
    round($"lat",4), 
    round($"lon",4),
    $"events.vid", 
    nth($"hour", $"hour_hist").as("hour_prob"), 
    nth($"dow",  $"dow_hist").as("dow_prob"))

df_probs.show(5,false)

+---------------------+------+------------+------------+-----+---------+--------+
|ts                   |uid   |round(lat,4)|round(lon,4)|vid  |hour_prob|dow_prob|
+---------------------+------+------------+------------+-----+---------+--------+
|2009-10-19 08:48:08.0|24951 |40.7491     |-73.968     |11831|0.11     |0.1     |
|2010-08-13 22:28:29.0|125327|40.7491     |-73.968     |11831|0.02     |0.11    |
|2010-10-09 06:00:59.0|111   |40.7491     |-73.968     |11831|0.08     |0.16    |
|2010-03-10 10:50:05.0|49440 |40.7491     |-73.968     |11831|0.08     |0.21    |
|2009-12-17 10:00:45.0|157344|40.7491     |-73.968     |11831|0.08     |0.16    |
+---------------------+------+------------+------------+-----+---------+--------+
only showing top 5 rows
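
The table keeps the two probabilities in separate columns. One possible way to turn them into a single score is sketched below in plain Scala. Multiplying the two values assumes that hour and day of the week are independent for a given venue, and the threshold is a free parameter; both are assumptions of this sketch, not choices made in the notebook. `EventScore` is a hypothetical name.

```scala
// Hypothetical scoring helper combining the two histogram look-ups
object EventScore {
  // Probability stored in bin i, or 0.0 when i is out of range (same as `nth`)
  def nth(i: Int, hist: IndexedSeq[Double]): Double =
    hist.lift(i).getOrElse(0.0)

  // Joint score under the (assumed) independence of hour and day of week
  def joint(hour: Int, dow: Int,
            hourHist: IndexedSeq[Double], dowHist: IndexedSeq[Double]): Double =
    nth(hour, hourHist) * nth(dow, dowHist)

  // Flag an event when its score falls below a chosen threshold
  def isUnusual(score: Double, threshold: Double): Boolean = score < threshold
}
```

With this convention, the second row above (`hour_prob` 0.02, `dow_prob` 0.11) receives a much lower score than the other rows, which share an `hour_prob` of 0.08 or more.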



### Process mining based

The first step in process mining is to collect sequences of events. In particular, the following code takes chronologically consecutive events for a specific user and bundles them in pairs. Each pair consists of two venue ids, source and destination, defining where the user is coming from and going to, respectively.

The steps in the following code are:

  - Convert the DataFrame to an RDD
  - Select uid as the key for the PairRDD
  - Reshape the PairRDD from "tall" to "wide"
  - Sort chronologically all the checked-in venues for each user
  - Extract pairs from each sequence of checked-in venues per user
  - Reshape the PairRDD from "wide" to "tall" again
  - Convert back the PairRDD to a DataFrame

In [96]:
// process mining
val g_df = events.
  select($"ts", $"uid", $"vid").
  rdd.
  map(row => (row.getLong(1), List( (row.getTimestamp(0), row.getLong(2)) ))).
  reduceByKey(_ ++ _).
  mapValues( x =>
    x.sortWith(_._1.getTime < _._1.getTime).
      map(_._2)
  ).
  mapValues(_.sliding(2).toList).
  flatMap(_._2).
  // keep only complete (source, destination) pairs
  collect { case List(a, b) => (a, b) }.
  toDF("src", "dst").
  repartition(8,$"src").
  cache()

This newly created DataFrame is used to create a graph, where the nodes are the venues and the edges are the transitions of users checking in from one venue to the next.

In [97]:
g_df.show(5)

+-------+-------+
|    src|    dst|
+-------+-------+
| 226623| 620588|
| 487279| 226768|
|  11807| 176668|
| 195095|1005664|
|1080823|1005664|
+-------+-------+
only showing top 5 rows
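
The reshaping steps of the process-mining cell can be mimicked on plain Scala collections, which makes the role of `sliding(2)` easier to see. This is a sketch with a hypothetical `Transitions` object; timestamps are plain `Long` values here rather than `Timestamp` objects.

```scala
// Plain-Scala sketch of the pair extraction (no Spark)
object Transitions {
  // (uid, timestamp, vid) check-ins -> (src, dst) venue pairs, per user
  def pairs(checkins: Seq[(Long, Long, Long)]): Seq[(Long, Long)] =
    checkins
      .groupBy(_._1)            // "tall" to "wide": one sequence per user
      .toSeq
      .flatMap { case (_, evs) =>
        evs.sortBy(_._2)        // chronological order
          .map(_._3)            // keep the venue ids only
          .sliding(2)           // consecutive windows of two venues
          .collect { case Seq(a, b) => (a, b) } // drop incomplete windows
      }
}
```

A user with a single check-in produces one window of length one, which the `collect` step discards, so such users contribute no edges to the graph.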



In [98]:
val edges_df = g_df.
  groupBy($"src",$"dst").
  count().
  select($"src",$"dst").
  filter($"src" !== $"dst")

edges_df.show(5)

+-------+-------+
|    src|    dst|
+-------+-------+
| 226623| 620588|
| 487279| 226768|
|  11807| 176668|
| 195095|1005664|
|1080823|1005664|
+-------+-------+
only showing top 5 rows



In [99]:
val nodes_df = edges_df.
  select($"src").
  unionAll(edges_df.select($"dst")).
  distinct().
  toDF("id")

nodes_df.show(5)

+------+
|    id|
+------+
|404831|
| 80431|
|818431|
| 11831|
|718231|
+------+
only showing top 5 rows



#### Graph and Page Rank

The above table describes how users are moving from venue to venue. We can now calculate which venues attract more users, using the PageRank algorithm. PageRank outputs a probability distribution that represents the likelihood that a person randomly walking in the city will arrive at any particular venue. This analysis can be executed in Spark using [GraphFrames](http://graphframes.github.io/), a package for Apache Spark that provides DataFrame-based graph analytics, including the PageRank algorithm.
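
To give an intuition for what such a ranking computes, here is a minimal power-iteration sketch in plain Scala. It is not the GraphFrames implementation: the ranks here are normalized to sum to one and the rank of venues without outgoing edges is spread uniformly, which are assumptions of this sketch and need not match the scaling used by the library. `PageRankSketch` is a hypothetical name, and every edge is assumed to connect ids listed in `nodes`.

```scala
// Textbook PageRank by power iteration on plain collections
object PageRankSketch {
  def pageRank(nodes: Seq[Long], edges: Seq[(Long, Long)],
               resetProb: Double, iterations: Int): Map[Long, Double] = {
    val n = nodes.size.toDouble
    // Outgoing neighbours per source node
    val out: Map[Long, Seq[Long]] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }
    var rank: Map[Long, Double] = nodes.map(id => id -> (1.0 / n)).toMap

    for (_ <- 1 to iterations) {
      // Rank held by nodes without outgoing edges is shared by everyone
      val dangling = nodes.filterNot(id => out.contains(id)).map(id => rank(id)).sum
      // Each node passes its rank in equal parts along its outgoing edges
      val contribs: Seq[(Long, Double)] = for {
        (src, dsts) <- out.toSeq
        dst <- dsts
      } yield (dst, rank(src) / dsts.size)
      val received = contribs.groupBy(_._1).map { case (id, cs) => id -> cs.map(_._2).sum }
      rank = nodes.map { id =>
        val incoming = received.getOrElse(id, 0.0) + dangling / n
        id -> (resetProb / n + (1 - resetProb) * incoming)
      }.toMap
    }
    rank
  }
}
```

In a toy graph where two venues both lead to a third one, the third venue ends up with the highest rank, which is the "attracts more users" notion described above.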

In [100]:
import org.graphframes.GraphFrame

val v = nodes_df.orderBy("id")
val e = edges_df.orderBy("src", "dst")

val g = GraphFrame(v, e)

#### Node degree analysis

In [101]:
val vertices_indeg = g.inDegrees.filter($"inDegree">150).sort(desc("inDegree"))

In [102]:
val popular_venues = vertices_indeg.
    join(venues, vertices_indeg("id") === venues("vid"), "inner").
    sort($"inDegree".desc).
    select("vid", "inDegree", "name")
    
println(s"Top in-degree nodes: ${ popular_venues.count()}")
popular_venues.show(10, false)

Top in-degree nodes: 19
+------+--------+---------------------------------+
|vid   |inDegree|name                             |
+------+--------+---------------------------------+
|12505 |657     |LGA LaGuardia Airport            |
|23261 |600     |JFK John F. Kennedy International|
|11844 |580     |Times Square                     |
|13022 |442     |Grand Central Terminal           |
|24963 |352     |EWR Newark Liberty International |
|11875 |278     |Madison Square Garden            |
|12525 |273     |The Museum of Modern Art (MoMA)  |
|106840|247     |Union Square                     |
|11834 |234     |Bryant Park                      |
|11720 |232     |Yankee Stadium                   |
+------+--------+---------------------------------+
only showing top 10 rows



#### Page Rank analysis

In [103]:
val vertices  = vertices_indeg.select("id")

val edges_src = edges_df.join(vertices, vertices("id")  === edges_df("src"), "inner").select("src", "dst")
val edges     = edges_src.join(vertices, vertices("id") === edges_src("dst"), "inner").select("src", "dst")

println(s"graph: vertices=${vertices.count()} edges=${edges.count()}")

graph: vertices=19 edges=243


In [104]:
edges.show(5)

+-----+-----+
|  src|  dst|
+-----+-----+
|14151|13022|
|14151|14148|
|14151|11875|
|14151|12505|
|14151|11844|
+-----+-----+
only showing top 5 rows



In [108]:
val v = vertices.orderBy("id").repartition(8, $"id").cache()
val e = edges.orderBy("src", "dst").repartition(8, $"src", $"dst").cache()

In [109]:
val g = GraphFrame(v, e)
val results = g.pageRank.resetProbability(0.05).maxIter(10).run()

In [110]:
val result_pr = results.vertices.select("id", "pagerank")
val popular_venues = result_pr.
    join(venues, result_pr("id") === venues("vid"), "inner").
    select("vid", "pagerank", "name")

popular_venues.sort($"pagerank".desc).show(13, false)

+-----+-------------------+---------------------------------+
|vid  |pagerank           |name                             |
+-----+-------------------+---------------------------------+
|23261|0.5858428512529236 |JFK John F. Kennedy International|
|11844|0.5616098666039386 |Times Square                     |
|11875|0.5596497250101465 |Madison Square Garden            |
|12505|0.5534573371424486 |LGA LaGuardia Airport            |
|12313|0.4955013646050896 |Empire State Building            |
|16907|0.47343577069389187|Shake Shack                      |
|13022|0.47223222676253773|Grand Central Terminal           |
|24963|0.46775299982424834|EWR Newark Liberty International |
|14148|0.43629932411573197|Radio City Music Hall            |
|12525|0.43565144007204715|The Museum of Modern Art (MoMA)  |
|14151|0.42922538125116216|Rockefeller Center               |
|11720|0.4169975718000968 |Yankee Stadium                   |
|11834|0.4093383935567544 |Bryant Park                      |
+-----+-------------------+---------------------------------+

As shown above, this algorithm provides a "popularity" factor for each checked-in venue. This feature can be used to further discriminate anomalies based on the rank of the venue, for instance by combining it with the probability of checking in at a specific time of the day.

### Geo-Location: density based

We will now cluster events with the [DBSCAN algorithm](https://en.wikipedia.org/wiki/DBSCAN). DBSCAN clusters events according to their density. Since the clusters emerge locally by looking for neighboring points, clusters of various shapes can be detected. Points that are isolated and too far from any other point are assigned to a special cluster of outliers. These properties make DBSCAN a good candidate for clustering geolocated events.

Let's prepare the data by transforming the events DataFrame into a PairRDD. In particular, for geolocated data, we choose the key to be the user identifier and the value to be the aggregated list of all check-ins posted by that user. The geolocated data is arranged in an n-by-2 matrix, where the first column represents the latitude and the second column the longitude.

In [111]:
val top_users = events.
    groupBy($"uid").
    count().
    filter($"count" > 1000)
    
top_users.sort(desc("count")).show()

+----+-----+
| uid|count|
+----+-----+
| 578| 1641|
|  22| 1290|
| 842| 1145|
|4985| 1063|
+----+-----+



In [112]:
val top_events = events.
    join(top_users, top_users("uid")  === events("uid"), "inner").
    select("ts", "events.uid", "lat", "lon")

top_events.show(5, false)

+---------------------+----+-------------+------------------+
|ts                   |uid |lat          |lon               |
+---------------------+----+-------------+------------------+
|2009-12-30 10:23:39.0|4985|40.7663583   |-73.9833973667    |
|2009-12-30 10:23:53.0|4985|40.7665252686|-73.9829705584    |
|2009-12-30 10:24:02.0|4985|40.76665848  |-73.98390340000002|
|2009-12-30 10:24:26.0|4985|40.7671166667|-73.9823264       |
|2009-12-30 10:24:35.0|4985|40.7673913454|-73.9814347029    |
+---------------------+----+-------------+------------------+
only showing top 5 rows



In [113]:
val e_df = top_events.
  select("uid","lat","lon").
  rdd.map(row => (row.getLong(0), Array(row.getDouble(1), row.getDouble(2))) ).
  reduceByKey( _ ++ _).
  mapValues(v => new DenseMatrix(v.length/2,2,v, 0, 2, true))


In [114]:
def formatUserEvents(x: Tuple2[Long, DenseMatrix[Double]]) : Unit = {
    val arr = x._2
    val n = math.min( 5 , arr.rows) - 1
    val slice = arr(0 to n, ::)
    println(s"uid = ${x._1}")
    println(s"events count = ${arr.rows}")
    println("lat,lon = ")
    println(slice)
    if (arr.rows > 5) println(s"... ${arr.rows- 5} more rows")
    println("-"*30)
}

See below a formatted output describing the events related to three users:

In [115]:
e_df.take(3).foreach(e => formatUserEvents(e))

uid = 4985
events count = 1063
lat,lon = 
40.7663583     -73.9833973667      
40.7665252686  -73.9829705584      
40.76665848    -73.98390340000002  
40.7671166667  -73.9823264         
40.7673913454  -73.9814347029      
... 1058 more rows
------------------------------
uid = 578
events count = 1641
lat,lon = 
40.7444201864  -73.98721218109999  
40.7457482667  -73.9850020333      
40.7443825333  -73.9783781667      
40.7428595014  -73.9768588543      
40.7428595014  -73.9768588543      
... 1636 more rows
------------------------------
uid = 842
events count = 1145
lat,lon = 
40.7164383   -74.0125343         
40.71685842  -74.01399805        
40.71575957  -74.01116800000001  
40.717054    -74.013316          
40.71685483  -74.01230673        
... 1140 more rows
------------------------------


We will now cluster the events for each user according to the DBSCAN algorithm, which groups each user's events into clusters. The rest of the code below reduces those clusters to bounding boxes. Next, we will use the extracted bounding boxes to score events.

In [116]:
import breeze.numerics._
import breeze.linalg._

def euclideanDistance (a: DenseVector[Double], b: DenseVector[Double]) = norm(a-b, 2)

// 1 degree of latitude at 40 degrees north is about 111034.61 meters,
// so eps = 0.002 decimal degrees (https://en.wikipedia.org/wiki/Decimal_degrees)
// is a radius of roughly 220 meters (0.002 * 111034.61)

val eps = 0.002
val min_points = 3
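
Because the distance above is Euclidean in decimal degrees, the same `eps` covers a different number of meters north-south than east-west. The sketch below uses the haversine formula on a spherical Earth (an approximation, with a mean radius of 6371 km assumed) to make that visible. `GeoDistance` is a hypothetical helper and is not used by the clustering code in this notebook.

```scala
// Great-circle distance in meters between two (lat, lon) points in degrees
object GeoDistance {
  val EarthRadiusMeters = 6371000.0 // mean Earth radius, spherical approximation

  def haversine(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
        math.pow(math.sin(dLon / 2), 2)
    2 * EarthRadiusMeters * math.asin(math.sqrt(a))
  }
}
```

Around New York, a step of 0.002 degrees in latitude is roughly 220 m, while the same step in longitude is only about 170 m, so the neighbourhoods used by the clustering are slightly elongated.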

In [117]:
import nak.cluster._
import nak.cluster.GDBSCAN._

def dbscan(v : breeze.linalg.DenseMatrix[Double]) = {

  val gdbscan = new GDBSCAN(
    DBSCAN.getNeighbours(eps, distance=euclideanDistance),
    DBSCAN.isCorePoint(min_points)
  )

  // core DBSCAN algorithm
  val clusters = gdbscan cluster v
  
  // reducing the clusters to bounding boxes
  // for simplicity: each user could 
  clusters.map(
    cluster => (
      cluster.id.toInt, 
      cluster.points.size, 
      cluster.points.map(_.value(0)).min,
      cluster.points.map(_.value(1)).min,
      cluster.points.map(_.value(0)).max,
      cluster.points.map(_.value(1)).max
    )
  )
}

In [118]:
val bboxRdd = e_df.mapValues(dbscan(_)).cache()
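
For readers who want to see what happens inside the clustering step, here is a compact DBSCAN sketch in plain Scala. It is not the `nak` implementation used above: the label convention (0 for noise, 1, 2, ... for clusters) and the brute-force neighbour search are choices made for this illustration, and `DbscanSketch` is a hypothetical name.

```scala
import scala.collection.mutable

// Minimal DBSCAN on 2-D points with Euclidean distance
object DbscanSketch {
  type Point = (Double, Double)
  val Noise = 0 // label for outliers in this sketch

  def dist(a: Point, b: Point): Double =
    math.sqrt(math.pow(a._1 - b._1, 2) + math.pow(a._2 - b._2, 2))

  // Returns one label per input point: 0 for noise, 1, 2, ... for clusters
  def cluster(points: IndexedSeq[Point], eps: Double, minPoints: Int): Array[Int] = {
    val unvisited = -1
    val labels = Array.fill(points.size)(unvisited)
    // Indices of all points within eps of point i (including i itself)
    def neighbours(i: Int): Seq[Int] =
      points.indices.filter(j => dist(points(i), points(j)) <= eps)

    var current = 0
    for (i <- points.indices) {
      if (labels(i) == unvisited) {
        val seeds = neighbours(i)
        if (seeds.size < minPoints) labels(i) = Noise
        else {
          current += 1 // i is a core point: start a new cluster
          labels(i) = current
          val queue = mutable.Queue.empty[Int]
          queue ++= seeds
          while (queue.nonEmpty) {
            val j = queue.dequeue()
            if (labels(j) == Noise) labels(j) = current // border point
            if (labels(j) == unvisited) {
              labels(j) = current
              val nj = neighbours(j)
              if (nj.size >= minPoints) queue ++= nj // j is core: keep expanding
            }
          }
        }
      }
    }
    labels
  }
}
```

Points that never gather `minPoints` neighbours within `eps`, and are not reachable from a core point, keep the noise label; these are the isolated check-ins that the bounding boxes will later leave out.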

Let's convert the RDD back to a DataFrame. Now we have a table describing clusters. Each row defines a cluster in terms of user id, cluster id, the number of events in the cluster and the bounding box of the cluster. Each user can have multiple clusters, and some users might have no cluster at all.

In [119]:
val bbox_df = bboxRdd.
  flatMapValues(x => x).
  map(x => (x._1, x._2._1, x._2._2,x._2._3,x._2._4,x._2._5,x._2._6)).
  toDF("uid", "cid", "csize", "lat_min", "lon_min", "lat_max", "lon_max").
  filter($"cid" > 0)

bbox_df.show(10)

+----+---+-----+------------------+------------------+------------------+--------------+
| uid|cid|csize|           lat_min|           lon_min|           lat_max|       lon_max|
+----+---+-----+------------------+------------------+------------------+--------------+
|4985|  4|   14|     40.7665739833|    -73.9829850197|40.769262733299996|  -73.98122905|
|4985| 10|   25|       40.76349341|      -73.98234528|     40.7652683667|-73.9789733887|
|4985| 15|    6|        40.7631676|       -73.9792362|     40.7649154663|-73.9781703833|
|4985| 24|   12|     40.7626658695|    -73.9765036134|     40.7647866916|-73.9739406109|
|4985| 29|    7|     40.7604105667|      -73.97418069|40.762592967399996|  -73.97242375|
|4985| 38|   19|     40.7620566327|    -73.9716011167|40.764846633299996|-73.9698984337|
|4985| 40|   35|40.759771316700004|    -73.9695874833|       40.76248225|  -73.96590763|
|4985| 44|   21|       40.75

#### Scoring Events: looking for outliers

We will now score events and check whether some of them are located outside the computed clusters' bounding boxes. First, we join the table of events with the table of clusters. We then filter out the users that do not have enough points: those users have no clusters associated with them, so there is not sufficient data to determine outliers. With the parameters set above (`eps = 0.002`, `min_points = 3`), a user needs at least 3 events within a radius of 0.002 degrees to form a DBSCAN cluster.

In [120]:
val bbox_events = events.
  join(bbox_df, events("events.uid") === bbox_df("uid"), "full").
  select($"events.ts",$"events.uid",$"lat",$"lon",$"lat_min",$"lon_min",$"lat_max",$"lon_max").
  filter($"lat_min".isNotNull)

bbox_events.show(5,false)

+---------------------+---+----------+-----------+-------------+------------------+------------------+--------------+
|ts                   |uid|lat       |lon        |lat_min      |lon_min           |lat_max           |lon_max       |
+---------------------+---+----------+-----------+-------------+------------------+------------------+--------------+
|2010-09-22 11:54:40.0|842|40.7164383|-74.0125343|40.71524655  |-74.01068991      |40.716528869      |-74.009803494 |
|2010-09-22 11:54:40.0|842|40.7164383|-74.0125343|40.7173408987|-74.01089783329999|40.719733983299996|-74.0092711075|
|2010-09-22 11:54:40.0|842|40.7164383|-74.0125343|40.7180543262|-74.01522362      |40.719745         |-74.0147852898|
|2010-09-22 11:54:40.0|842|40.7164383|-74.0125343|40.7199164581|-74.01152372359999|40.722198721999995|-74.00957512  |
|2010-09-22 11:54:40.0|842|40.7164383|-74.0125343|40.7191226005|-74.0093686167    |40.720835

In [121]:
case class EventBbox(
  ts: Timestamp,
  uid: Long, 
  lat:Double, 
  lon: Double, 
  lat_min:Double, 
  lon_min:Double, 
  lat_max:Double, 
  lon_max:Double)
  
case class EventDetected(
  ts: Timestamp,
  uid: Long, 
  lat: Double, 
  lon: Double, 
  bbox: Boolean
)

In [122]:
def bbox_check(x: EventBbox): Boolean = {
  // true when the event lies inside the cluster's bounding box
  x.lon >= x.lon_min &&
  x.lon <= x.lon_max &&
  x.lat >= x.lat_min &&
  x.lat <= x.lat_max
}

The following code uses the newer Dataset API, in which the rows of a DataFrame are handled as typed objects. In particular, we convert each row into an `EventBbox` object, check whether the event lies within the boundary of the given cluster, and store the result in an `EventDetected` object. Since each user might have more than one cluster, we check each event against all the user's clusters and consider it an outlier if none of the checks returns a positive outcome.

In [123]:
val scored_events = bbox_events.
  as[EventBbox].
  map(x => EventDetected(x.ts, x.uid, x.lat, x.lon, bbox_check(x))).
  groupBy($"ts", $"uid").
  reduce( (x,y) => if (x.bbox) x else y ).
  map(x => x._2).
  cache()
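
The rule "an event is an outlier when no bounding box of its user contains it" can be written down without Spark. The following plain-Scala sketch uses a hypothetical `BBoxScore` object with its own `BBox` case class; it illustrates the logic of the Dataset code above and is not a replacement for it.

```scala
// Plain-Scala sketch of the bounding-box outlier rule
object BBoxScore {
  case class BBox(latMin: Double, lonMin: Double, latMax: Double, lonMax: Double) {
    // true when the point lies inside the box, borders included
    def contains(lat: Double, lon: Double): Boolean =
      lat >= latMin && lat <= latMax && lon >= lonMin && lon <= lonMax
  }

  // Smallest box enclosing a non-empty set of (lat, lon) points
  def bbox(points: Seq[(Double, Double)]): BBox =
    BBox(points.map(_._1).min, points.map(_._2).min,
         points.map(_._1).max, points.map(_._2).max)

  // An event is an outlier when none of the user's boxes contains it
  def isOutlier(lat: Double, lon: Double, boxes: Seq[BBox]): Boolean =
    !boxes.exists(_.contains(lat, lon))
}
```

The `reduce` step in the Dataset code plays the role of `exists` here: it keeps a `true` flag as soon as one of the user's clusters contains the event.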

The outlier scoring for `uid=22` is shown below.

In [128]:
scored_events.
  filter(_.uid==22).
  show(10)

+--------------------+---+------------------+------------------+-----+
|                  ts|uid|               lat|               lon| bbox|
+--------------------+---+------------------+------------------+-----+
|2009-12-25 09:11:...| 22|       40.80989134|      -73.95480966|false|
|2010-02-18 14:54:...| 22|40.744951366399995|    -73.9878090536| true|
|2010-03-24 19:50:...| 22|     40.7299255408|    -73.9808499813| true|
|2009-10-20 14:21:...| 22|        40.7244409|    -73.9924251833| true|
|2009-10-29 17:35:...| 22|     40.7253249667|    -74.0042984333| true|
|2009-11-07 11:47:...| 22|     40.6912468675|    -74.1795158386|false|
|2009-12-09 16:18:...| 22|        40.7508811|-74.00584190000001| true|
|2010-01-08 12:25:...| 22|     40.7298523686|-73.99140715600001| true|
|2010-03-02 19:27:...| 22|     40.7420385013|    -74.0047001839| true|
|2010-03-02 19:29:...| 22|        40.7421403|    -74.0034926333| t

As you can see, some items (those with `bbox` set to `false`) are found outside those bounding boxes. Although this is not yet a strong indicator of an anomaly per se, it can constitute a very relevant signal if combined with the other signals seen above. Many improvements can be made to this core idea, for instance by including relations and interactions between users, or by refining the analysis of the clusters, using convex hulls instead of bounding boxes and so forth.

I hope you enjoyed this notebook, thanks for keeping up with me till here. Best wishes for your data science projects!