# Plongeur

A *topological data analysis* library.

> Core algorithm written in [Scala](http://www.scala-lang.org/), using Apache [Spark](http://spark.apache.org/).
> 
> Executed in a [Jupyter](http://jupyter.org/) notebook, using the Apache [Toree](https://github.com/apache/incubator-toree) kernel and [declarative widgets](http://jupyter-incubator.github.io/declarativewidgets/docs.html).
>
> Graphs rendered with [Sigma](http://sigmajs.org/)/[Linkurious](https://github.com/Linkurious/linkurious.js), wrapped in a [Polymer](https://www.polymer-project.org/1.0/) component.
> 
> Reactive machinery powered by [Rx](http://reactivex.io/), via [RxScala](https://github.com/ReactiveX/RxScala).


#### Maven dependencies

In [1]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps org.scalanlp breeze_2.10 0.11.2 --transitive
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps org.scalanlp breeze-macros_2.10 0.11.2
%AddDeps com.github.haifengl smile-core 1.2.0 --transitive
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository 
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.51 --repository file:/Users/tmo/.m2/repository

Marking org.apache.spark:spark-mllib_2.10:1.6.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps9111157238150301946/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps9111157238150301946/https/repo1.maven.org/maven2/org/apache/spark/spark-mllib_2.10/1.6.2/spark-mllib_2.10-1.6.2.jar
Marking org.scalanlp:breeze_2.10:0.11.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps9111157238150301946/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps9111157238150301946/https/repo1.maven.org/maven2/org/spire-math/spire-macros_2.10/0.7.4/spire-macros_2.10-0.7.4.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps9111157238150301946/https/repo1.maven.org/maven2/junit/junit/4.8.2/junit-4.8.2.jar

In [2]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar

Starting download from http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
Finished download of declarativewidgets.jar


#### Import classes

In [3]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._

import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.TDAMachine
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.Filters._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._
import org.tmoerman.plongeur.tda.Colour._
import org.tmoerman.plongeur.tda.Brewer
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.Model.{DataPoint, TDAContext, dp}
import org.tmoerman.plongeur.tda.knn.FastKNN.FastKNNParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn.{FastKNN, SampledKNN, _}
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix

In [4]:
import declarativewidgets._
initWidgets

import declarativewidgets.WidgetChannels.channel

In [5]:
import java.util.concurrent.atomic.AtomicReference

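// Holds at most one live Rx subscription, so that a re-evaluated cell can release the previous one.
case class SubRef(ref: AtomicReference[Option[Subscription]] = new AtomicReference[Option[Subscription]](None)) extends Serializable {

    // Store the new subscription and unsubscribe the one it replaces, if any.
    def update(sub: Subscription): Unit = ref.getAndSet(Option(sub)).foreach(old => old.unsubscribe())

    // Option(null) is None, so this unsubscribes the current subscription and leaves the holder empty.
    def reset(): Unit = update(null)

}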

#### Import polymer elements

This cell triggers Bower installations of the specified web components.

If it doesn't work, check whether Bower has sufficient permissions to install into the Jupyter `/nbextensions` folder.

In [6]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html' 
        is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html' 
        is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html' 
        is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>

#### Reactive TDA Machine

Keep the references to Rx subscriptions in separate holders, so that each one can be replaced or unsubscribed independently.

In [7]:
val in$_subRef = SubRef()

Instantiate a `PublishSubject`. This stream of `TDAParams` instances represents the input of a `TDAMachine`. A subscription on the subject writes each new value to the channel `"ch_TDA_1"` under the `"params"` key.

*TODO: unsubscribe previous on re-evaluation*
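
The `SubRef` holder defined earlier is what releases a previous subscription when a new one is stored. The sketch below reproduces that swap logic in isolation so it can be run without RxScala. `FakeSubscription` and `Holder` are hypothetical stand-ins introduced for illustration only; they are not part of RxScala or plongeur.

```scala
import java.util.concurrent.atomic.AtomicReference

object SubRefSketch {
  // Hypothetical stand-in for rx.lang.scala.Subscription (assumption, not the real type).
  final class FakeSubscription(val name: String) {
    @volatile var unsubscribed = false
    def unsubscribe(): Unit = { unsubscribed = true }
  }

  // Same swap logic as SubRef: store the new subscription, unsubscribe the previous one.
  final class Holder {
    private val ref = new AtomicReference[Option[FakeSubscription]](None)
    def update(sub: FakeSubscription): Unit =
      ref.getAndSet(Option(sub)).foreach(_.unsubscribe())
    def reset(): Unit = update(null)
  }

  // Returns the unsubscribed flags of the first and second subscription.
  def demo(): (Boolean, Boolean) = {
    val holder = new Holder
    val first  = new FakeSubscription("first")
    val second = new FakeSubscription("second")
    holder.update(first)  // nothing stored yet, so nothing is unsubscribed
    holder.update(second) // simulates re-evaluating the cell: `first` is released
    (first.unsubscribed, second.unsubscribed)
  }
}
```

Calling `SubRefSketch.demo()` yields `(true, false)`: the first subscription has been released, the second is still live. This only holds in the notebook as long as the cell that creates the holder itself is not re-evaluated, since that would create a fresh, empty holder.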

In [8]:
val in$ = PublishSubject[TDAParams]

in$_subRef.update(in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString)))

Create an initial `TDAParams` instance. In the same cell, we submit the instance to the `PublishSubject`.

For the sake of illustration, we create an HTML snippet that listens to changes on the `"ch_TDA_1"` channel and displays the value of the `"params"` key.

In [9]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>  
    <div style='background: #FFB; padding: 10px;'>
        <span style='font-family: "Courier"'>[[params]]</span>
    </div>
</template>

Notice that when we evaluate the `TDAParams` instantiation cells, the output of the yellow box changes.

#### Initialize RDD

In this example, we are using a synthetic torus-shaped 2D data set.

In [10]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense

def readMixture(file: String): RDD[DataPoint] = {
  sc
    .textFile(file)
    .zipWithIndex
    .map{ case (line, idx) =>
      val columns = line.split(",").map(trim)
      val category = columns.head
      val features = columns.tail.map(_.toDouble)

      dp(idx, dense(features), Map("cat" -> category)) }
}
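
The per-line parsing inside `readMixture` can be tried without a Spark context. The sketch below is a simplified version that assumes the same layout as above: the first comma-separated column is the category and the remaining columns are numeric features. `ParseSketch` and `parseLine` are hypothetical names, and `String.trim` is used in place of the Commons Lang `trim`.

```scala
object ParseSketch {
  // Split one CSV line into (category, features), mirroring the body of the map in readMixture.
  def parseLine(line: String): (String, Array[Double]) = {
    val columns  = line.split(",").map(_.trim)
    val category = columns.head               // first column: the category label
    val features = columns.tail.map(_.toDouble) // remaining columns: numeric features
    (category, features)
  }
}
```

For example, `ParseSketch.parseLine("a, 1.5, -0.25")` returns the category `"a"` together with the features `1.5` and `-0.25`. A non-numeric feature column makes `toDouble` throw a `NumberFormatException`, as it would in `readMixture`.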

In [11]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"

val mixture_path = data_path + "mixture.1000.2.csv"

val rdd = readMixture(mixture_path).cache

val ctx = TDAContext(sc, rdd)

In [12]:
rdd.count

1250

Turn a `TDAResult` into a nested `Map` of nodes and edges that can be set on a channel.

In [13]:
val r = scala.util.Random

def format(result: TDAResult) = Map(
    "nodes" -> result.clusters.map(c =>
      Map(
        "id"     -> c.id.toString,
        "size"   -> c.dataPoints.size,
        "color"  -> c.colours.headOption.getOrElse("#000000"),
        "x"      -> r.nextInt(100),
        "y"      -> r.nextInt(100))),
    "edges" -> result.edges.map(e => {
      val (from, to) = e.toArray match {case Array(f, t) => (f, t)}

      Map(
        "id"     -> s"$from--$to",
        "source" -> from.toString,
        "target" -> to.toString)}))
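
To make the resulting shape concrete, here is a minimal sketch of the same transformation on simplified types. `MiniCluster` and `MiniResult` are hypothetical stand-ins, not plongeur's model classes, and the sketch assumes each edge is a set of exactly two cluster ids. It omits the random `x`/`y` coordinates and sorts the two ids of an edge so that the output is deterministic.

```scala
object FormatSketch {
  // Hypothetical, simplified stand-ins for plongeur's cluster and result types.
  case class MiniCluster(id: Int, size: Int, colours: Seq[String])
  case class MiniResult(clusters: Seq[MiniCluster], edges: Set[Set[Int]])

  def format(result: MiniResult): Map[String, Seq[Map[String, Any]]] = Map(
    "nodes" -> result.clusters.map(c => Map[String, Any](
      "id"    -> c.id.toString,
      "size"  -> c.size,
      "color" -> c.colours.headOption.getOrElse("#000000"))), // black when no colour is set
    "edges" -> result.edges.toSeq.map { e =>
      val Seq(from, to) = e.toSeq.sorted // assumes exactly two ids per edge
      Map[String, Any](
        "id"     -> s"$from--$to",
        "source" -> from.toString,
        "target" -> to.toString)
    })
}
```

With two clusters `1` and `2` joined by one edge, the `"edges"` entry holds a single map with `"id" -> "1--2"`, `"source" -> "1"` and `"target" -> "2"`.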

Run the machine, obtaining an `Observable` of `TDAResult` instances.

In [14]:
val out$: Observable[TDAResult] = TDAMachine.run(ctx, in$)

In [15]:
val out$_subRef = SubRef()

In [16]:
out$_subRef.update(
    out$.subscribe(
        onNext = (r) => channel("ch_TDA_1").set("result", format(r)),
        onError = (e) => println(s"Error in TDA machine: $e")))
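
Conceptually, the cells above wire three stages together: parameters are pushed into `in$`, the machine derives a stream of results from it, and a subscriber forwards each result to the channel. The sketch below imitates that data flow with a tiny hand-rolled stream. `MiniSubject` is a hypothetical stand-in for illustration; it is neither RxScala's `PublishSubject` nor a description of how `TDAMachine.run` works internally.

```scala
import scala.collection.mutable.ListBuffer

object PipelineSketch {
  // Hypothetical, minimal hot stream: values reach only observers subscribed at emission time.
  final class MiniSubject[T] {
    private val observers = ListBuffer.empty[T => Unit]
    def subscribe(f: T => Unit): Unit = { observers += f; () }
    def onNext(value: T): Unit = observers.foreach(_(value))
    // Derive a new stream that emits f(value) for every value emitted here.
    def map[U](f: T => U): MiniSubject[U] = {
      val out = new MiniSubject[U]
      subscribe(v => out.onNext(f(v)))
      out
    }
  }

  def demo(): List[String] = {
    val params  = new MiniSubject[Int]               // plays the role of in$
    val results = params.map(n => s"result for $n") // plays the role of out$
    val seen    = ListBuffer.empty[String]
    results.subscribe(s => { seen += s; () })        // plays the role of the channel update
    params.onNext(1)
    params.onNext(2)
    seen.toList
  }
}
```

`PipelineSketch.demo()` returns `List("result for 1", "result for 2")`. Values pushed before a subscriber is attached are lost in this sketch, which is why the notebook subscribes to `out$` before submitting any `TDAParams`.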

#### Reactive inputs

First, we set up a stream of updates to the `BASE` `TDAParams` instance.

In [17]:
val pipe$_subRef = SubRef()

In [18]:
import org.tmoerman.plongeur.ui.Controls._
kernel.magics.html(controlsCSS)

In [40]:
import TDAParams._

val den = Filter(Density(sigma=1.0), 30, 0.30)

val pc0 = Filter(Feature(0), 20, 0.3)
val pc1 = Filter(Feature(1), 20, 0.3)

val selector = (d: DataPoint) => d.meta.get("cat")
val maxFq = ClusterMaxFrequency(Array("#F00", "#00F", "#999"), selector)
val avgFilterValue = AverageFilterValue(Brewer.palettes("PuOr")(9), den)

val BASE = 
    TDAParams(
        lens = TDALens(pc0, pc1),
        clusteringParams = ClusteringParams(),
        scaleSelection = firstGap(5),
        collapseDuplicateClusters = false,
        colouring = Nop())

in$.onNext(BASE)

In [34]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>    
    <plongeur-graph height="600" data="{{result}}"></plongeur-graph>
</template>

In [36]:
val (sub, html) = BASE.makeControls(channel("ch_TDA_1"), in$)
pipe$_subRef.update(sub)
kernel.magics.html(html)

| Group | Control | Bound value |
|---|---|---|
| Filter `Feature(0)` | Nr of cover bins | `[[filter-0-nrBins]]` |
| Filter `Feature(0)` | Cover overlap | `[[filter-0-overlap]]`% |
| Filter `Feature(1)` | Nr of cover bins | `[[filter-1-nrBins]]` |
| Filter `Feature(1)` | Cover overlap | `[[filter-1-overlap]]`% |
| General | Nr of scale bins | `[[scaleBins]]` |
| General | Collapse duplicates | |


In [27]:
val rawData = rdd.
    map(dp => dp.features.toArray.toList).collect.toList

rawData.take(1)

List(List(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.45157042641657, -0.254897771679638, -0.117268763277652, 0.484539839250765, -1.18693811390646, -0.593712755643318, -0.904161427869958, -0.0680593488088087, 0.301802668750097, -0.594621872021638, 1.03938840029754, -1.3667295355169))

In [28]:
channel("data").set("raw", rawData)

In [29]:
%%html
<template is='urth-core-bind' channel='data'>    
    <urth-viz-scatter
        datarows='[[raw]]'
        primary='0'
        secondary='1'        
        />
</template>