# MDMS with Rheem and plotly

This notebook tests the MDMS together with Rheem, which powers the analytics, and plotly, which visualizes the results.

## Setup

First, we need to prepare the notebook: we configure where to find the data and load the 
MDMS dependencies into the kernel.

In [1]:
// Path to the data files
val pathToData = new java.io.File("data").getAbsolutePath

// MDMS version to use.
val mdmsVersion = "0.0.3-SNAPSHOT"
val rheemVersion = "0.2.0"

// Local Maven repository to load MDMS dependencies from.
val localRepository = "file:///Users/basti/.m2/repository"
classpath.addRepository(localRepository)

pathToData: String = "/Users/basti/Work/Notebooks/mdms-on-jupyter/data"
mdmsVersion: String = "0.0.3-SNAPSHOT"
rheemVersion: String = "0.2.0"
localRepository: String = "file:///Users/basti/.m2/repository"

In [2]:
// We do a little hacking to try to use slf4j-nop as the logging implementation.
// Too much logging output severely slows down the notebook.
classpath.add("org.slf4j" % "slf4j-nop" % "1.7.2")
locally {
    val loggerSource = Class.forName("org.slf4j.impl.StaticLoggerBinder").getProtectionDomain.getCodeSource
    println(s"Using logger implementation from $loggerSource.")
}

Adding 2 artifact(s)
Using logger implementation from (file:/Users/basti/.coursier/cache/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-nop/1.7.2/slf4j-nop-1.7.2.jar <no signer certificates>).




In [3]:
println("Resolving dependencies from:")
classpath.repositories.foreach(repo => println(s"* $repo"))

classpath.add("de.hpi.isg" % "mdms-analytics" % mdmsVersion)
classpath.add("de.hpi.isg" % "mdms-model" % mdmsVersion)
classpath.add("de.hpi.isg" % "mdms-dependencies" % mdmsVersion)
classpath.add("de.hpi.isg" % "mdms-simple" % mdmsVersion)

classpath.add("org.qcri.rheem" % "rheem-api" % rheemVersion)
classpath.add("org.qcri.rheem" % "rheem-basic" % rheemVersion)
classpath.add("org.qcri.rheem" % "rheem-java" % rheemVersion)

classpath.add("org.plotly-scala" %% "plotly-jupyter-scala" % "0.1.0")

Resolving dependencies from:
* ivy2Local
* https://repo1.maven.org/maven2
* https://oss.sonatype.org/content/repositories/releases
* file:///Users/basti/.m2/repository
Adding 177 artifact(s)
Adding 0 artifact(s)
Adding 0 artifact(s)
Adding 0 artifact(s)
Adding 0 artifact(s)
Adding 0 artifact(s)
Adding 0 artifact(s)
Adding 18 artifact(s)




## Create & Populate a Metadata Store

A metadata store is responsible for storing metadata and making it accessible via APIs. There are various storage backends for metadata stores. In this notebook, we use the simplest one, the `DefaultMetadataStore`, which keeps all of its information in memory and leverages Java's standard serialization.

The first kind of metadata we need to populate our metadata store with is a (relational) schema with its tables and their columns. All dependencies, integrity constraints, and other metadata that we add later on reference these schema elements. This is crucial for integrating and jointly querying different metadata types.
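
To illustrate why shared schema elements matter, here is a minimal sketch in plain Scala. It does not use the MDMS classes: `Column`, `Table`, `Schema`, `InclusionDependency`, and `DistinctValueCount` are hypothetical stand-ins, and the schema and numbers are made up. The only point is that two metadata types that both refer to columns by ID can be combined without any extra mapping.

```scala
// Hypothetical, simplified model for illustration only; these are NOT the MDMS classes.
object SchemaSketch {
  final case class Column(id: Int, name: String)
  final case class Table(id: Int, name: String, columns: Seq[Column])
  final case class Schema(name: String, tables: Seq[Table])

  // Two kinds of metadata; both refer to schema elements by column ID only.
  final case class InclusionDependency(dependentColumnId: Int, referencedColumnId: Int)
  final case class DistinctValueCount(columnId: Int, numDistinctValues: Long)

  // Made-up example schema.
  val schema: Schema = Schema("example", Seq(
    Table(1, "customers", Seq(Column(10, "id"), Column(11, "name"))),
    Table(2, "orders", Seq(Column(20, "id"), Column(21, "customer_id")))
  ))

  // Resolve column IDs to qualified names via the schema.
  val columnNames: Map[Int, String] = (for {
    table <- schema.tables
    column <- table.columns
  } yield column.id -> s"${table.name}.${column.name}").toMap

  // Made-up metadata.
  val ind: InclusionDependency =
    InclusionDependency(dependentColumnId = 21, referencedColumnId = 10)
  val distinctValues: Map[Int, Long] =
    Seq(DistinctValueCount(21, 3L), DistinctValueCount(10, 5L))
      .map(s => s.columnId -> s.numDistinctValues)
      .toMap

  // Because both metadata types share column IDs, they can be looked up side by side.
  def describe(columnId: Int): String =
    s"${columnNames(columnId)} (${distinctValues(columnId)} distinct values)"

  val description: String =
    s"${describe(ind.dependentColumnId)} is included in ${describe(ind.referencedColumnId)}"
}
```

Evaluating `SchemaSketch.description` combines the schema, the IND, and the statistics in a single string.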

In [4]:
import de.hpi.isg.mdms.model.DefaultMetadataStore
import de.hpi.isg.mdms.analytics.visualization.{HTMLTable, Chart, RenderingConfig}
import de.hpi.isg.mdms.tools.apps.{CreateSchemaForCsvFilesApp, MetanomeDependencyImportApp, MetanomeStatisticsImportApp}
import de.hpi.isg.mdms.domain.constraints._

import de.hpi.isg.mdms.analytics.ConstraintImplicits._


// Create an in-memory metadata store
val store = new DefaultMetadataStore
val schemaName = "musicbrainz"

// Import Schema
CreateSchemaForCsvFilesApp.fromParameters(store,
    s"$pathToData/data-sample",
    schemaName,
    "tab",
    "none",
    "false")


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/basti/.coursier/cache/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-nop/1.7.2/slf4j-nop-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/basti/.coursier/cache/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.13/slf4j-simple-1.7.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.helpers.NOPLoggerFactory]


import de.hpi.isg.mdms.model.DefaultMetadataStore
import de.hpi.isg.mdms.analytics.visualization.{HTMLTable, Chart, RenderingConfig}
import de.hpi.isg.mdms.tools.apps.{CreateSchemaForCsvFilesApp, MetanomeDependencyImportApp, MetanomeStatisticsImportApp}
import de.hpi.isg.mdms.domain.constraints._
import de.hpi.isg.mdms.analytics.ConstraintImplicits._
store: de.hpi.isg.mdms.model.DefaultMetadataStore = MetadataStore[1 schemas, 0 constraint collections]
schemaName: String = "musicbrainz"

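Next, we import functional dependencies (FDs), inclusion dependencies (INDs), unique column combinations (UCCs), and single-column statistics. In general, these can be obtained using [Metanome](https://www.metanome.de).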

In [5]:
// Import Multi-Column Dependencies
MetanomeDependencyImportApp.fromParameters(store,
    s"$pathToData/mc-results/FDs.txt",
    schemaName,
    "FD")

MetanomeDependencyImportApp.fromParameters(store,
    s"$pathToData/mc-results/INDs.txt",
    schemaName,
    "IND")

MetanomeDependencyImportApp.fromParameters(store,
    s"$pathToData/mc-results/UCCs.txt",
    schemaName,
    "UCC")

// Import Single-Column Statistics
MetanomeStatisticsImportApp.fromParameters(store,
    s"$pathToData/sc-results",
    schemaName)



## Analytics

Having initialized the metadata store with a schema and corresponding metadata, we can finally do analytics to serve various use cases, e.g., data exploration.

As a first step, we simply count the constraints of each imported type.

In [6]:
import plotly._
import plotly.element._
import plotly.layout._
import plotly.JupyterScala._

plotly.JupyterScala.init()

import plotly._
import plotly.element._
import plotly.layout._
import plotly.JupyterScala._

In [7]:
import de.hpi.isg.mdms.analytics._
import org.qcri.rheem.core.api._
import org.qcri.rheem.api._
import org.qcri.rheem.java.Java

val rheemConfig = new Configuration
val rheemCtx = new RheemContext(rheemConfig).withPlugin(Java.basicPlugin).withPlugin(Java.graphPlugin)

import de.hpi.isg.mdms.analytics._
import org.qcri.rheem.core.api._
import org.qcri.rheem.api._
import org.qcri.rheem.java.Java
rheemConfig: org.qcri.rheem.core.api.Configuration = Configuration[(no name)]
rheemCtx: org.qcri.rheem.core.api.RheemContext = org.qcri.rheem.core.api.RheemContext@5addf079

In [8]:
import scala.collection.JavaConversions._
store.getConstraintCollections.foreach(cc => println(f"ID ${cc.getId}%12d: ${cc.getDescription}"))

ID   2096544347: FD import (Thu Sep 22 11:20:46 CEST 2016)
ID   1913119238: IND import (Thu Sep 22 11:20:47 CEST 2016)
ID    926104209: UCC import (Thu Sep 22 11:20:49 CEST 2016)
ID   1653643669: Single column statistics import


import scala.collection.JavaConversions._

In [13]:
val fdCcId = 2096544347
val indCcId = 1913119238
val uccCcId = 926104209
val statisticsCcId = 1653643669

fdCcId: Int = 2096544347
indCcId: Int = 1913119238
uccCcId: Int = 926104209
statisticsCcId: Int = 1653643669

Next, we count the constraints in each collection and visualize the results in a histogram.

In [14]:
locally {
    val planBuilder = new PlanBuilder(rheemCtx)

    val (constraintNames, constraintCounts) = Seq(
        "FDs" -> store.loadFDs(fdCcId)(planBuilder).count.collect().head.toInt,
        "INDs" -> store.loadINDs(indCcId)(planBuilder).count.collect().head.toInt,
        "UCCs" -> store.loadUCCs(uccCcId)(planBuilder).count.collect().head.toInt,
        "statistics" -> store.loadStatistics(statisticsCcId)(planBuilder).count.collect().head.toInt
        ).unzip

    Bar(constraintNames, constraintCounts).plot()
}

res13: String = "plot-2114779130"

We proceed with a simple aggregation of constraints. The next query creates a histogram of columns according to their number of distinct values.

In [15]:
locally {
    implicit val planBuilder = new PlanBuilder(rheemCtx)
    
    def calcHistogramKey(statistics: ColumnStatistics): Int = {
        val base = 2
        val numDistinctValues = statistics.getNumDistinctValues
        if (numDistinctValues == 0L) 0 
        else {
            // Start at 1 and multiply by `base` while the next power is still below the value.
            var lower = 1
            while (lower * base < numDistinctValues) lower *= base
            lower
        }
    }
    
    val distinctValueHistogram = store.loadStatistics(statisticsCcId)
        .map(stats => (calcHistogramKey(stats), 1))
        .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
        .collect()
        .toSeq.sortBy(_._1)
        .map { case (key, count) => (s">= $key", count) }
    
    val (x, y) = distinctValueHistogram.unzip
    Bar(x, y).plot()
}

res14: String = "plot-714592476"
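
The bucketing used in this query can be tried out without a metadata store. The following sketch mirrors the loop from `calcHistogramKey` on plain Scala collections; `HistogramSketch`, `bucketKey`, and the sample counts are made up for illustration, and `groupBy` stands in for the `reduceByKey` call of the Rheem plan.

```scala
// Stand-alone sketch of the exponential bucketing; names and sample data are hypothetical.
object HistogramSketch {
  // Returns 0 for an empty column; otherwise starts at 1 and multiplies by `base`
  // while the next power is still below the number of distinct values.
  def bucketKey(numDistinctValues: Long, base: Long = 2L): Long =
    if (numDistinctValues == 0L) 0L
    else {
      var lower = 1L
      while (lower * base < numDistinctValues) lower *= base
      lower
    }

  // Made-up distinct-value counts, one per column.
  val counts: Seq[Long] = Seq(0L, 1L, 2L, 3L, 4L, 5L, 100L, 1000L)

  // Group the columns by bucket key and count the columns in each bucket.
  val histogram: Seq[(Long, Int)] = counts
    .groupBy(bucketKey(_))
    .map { case (key, values) => key -> values.size }
    .toSeq
    .sortBy(_._1)
}
```

With this rule, counts of 3 and 4 both land in the bucket with key 2, and 100 lands in the bucket with key 64.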

Last but not least, we perform a slightly more intricate analysis. For each (unary) IND, we collect the numbers of distinct values of both the dependent and the referenced column, so that we can map each IND into a coordinate system. The resulting scatter chart reveals the different types of INDs found. The key point here is that the MDMS design makes it easy to integrate different metadata types.

In [16]:
locally {
    implicit val planBuilder = new PlanBuilder(rheemCtx)
    import org.qcri.rheem.basic.data.{Tuple2 => T2}
    
    def toExpBase(value: Long, base: Int): Long = {
        if (value == 0L) 0L
        else {
            // Use `base` consistently and a Long accumulator to match the return type.
            var lower = 1L
            while (lower * base < value) lower *= base
            lower
        }
    }
    
    def toIntervalCenter(value: Long, interval: Long) = value - (value % interval) + (interval / 2)
    
    def roundDistinctValues(numDistinctValues: Long): Long = numDistinctValues //toIntervalCenter(numDistinctValues, interval = 200)
    
    val distinctValues = store.loadStatistics(statisticsCcId)
        .map(stats => (stats.getTargetReference.getTargetId, stats.getNumDistinctValues))
    
    val indDvCounts = store.loadINDs(indCcId)
        .filter(_.getArity == 1)
        .map {
            unaryInd => 
                val targetRef = unaryInd.getTargetReference
                (targetRef.getDependentColumns()(0), targetRef.getReferencedColumns()(0))
        }
        .join[(Int, Long), Int](_._1, distinctValues, _._1)
        .map(joinLine => (joinLine.field1._2, joinLine.field0._2))
        .join[(Int, Long), Int](_._2, distinctValues, _._1)
        .map(joinLine => (joinLine.field0._1, joinLine.field1._2))
        .filter { case (depDvc, refDvc) => depDvc <= refDvc}
        .map(indDvs => ((roundDistinctValues(indDvs._1), roundDistinctValues(indDvs._2)), 1))
        .reduceByKey(_._1, (indDvs1, indDvs2) => (indDvs1._1, indDvs1._2 + indDvs2._2))
        .collect()
        .toSeq
    
    val (indDvs, counts) = indDvCounts.unzip
    val (depDvs, refDvs) = indDvs.unzip
    
    val layout = Layout(
        title = "Distinct values of LHS and RHS of all unary INDs",
        xaxis = Axis(`type` = AxisType.Log, title = "distinct values in dependent column"),
        yaxis = Axis(`type` = AxisType.Log, title = "distinct values in referenced column")
    )
    
    val maxDv = (1.1 * refDvs.max).toInt
    val diagonal = Scatter(Seq(0, 1, maxDv), Seq(0, 1, maxDv), mode = ScatterMode(ScatterMode.Lines), name = "diagonal")
    
    val scatter = Scatter(
        depDvs.map(_.toInt).toSeq,
        refDvs.map(_.toInt).toSeq,
        mode = ScatterMode(ScatterMode.Markers),
        marker = Marker(size = counts.map(count => (0.1 * math.sqrt(count)).toInt + 3).toSeq),
        name = "INDs [#]",
        text = counts.map(count => f"$count%,d INDs")
    )
    
    JupyterScala.plot(Seq(diagonal, scatter), layout)
}

res15: String = "plot-1115052257"
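
The two joins in this last query can be hard to follow in their tuple form. As a rough illustration, the sketch below performs the same steps on plain Scala collections: it resolves both sides of each unary IND to a distinct-value count, keeps only the pairs where the dependent side is not larger than the referenced side, and counts how many INDs fall on each point. `IndJoinSketch`, the column IDs, and all numbers are made up; the map lookups stand in for the Rheem joins.

```scala
// Stand-alone sketch of the IND/statistics join; names and sample data are hypothetical.
object IndJoinSketch {
  // Made-up statistics: column ID -> number of distinct values.
  val distinctValues: Map[Int, Long] = Map(1 -> 10L, 2 -> 100L, 3 -> 10L, 4 -> 50L)

  // Made-up unary INDs as (dependent column ID, referenced column ID).
  val unaryInds: Seq[(Int, Int)] = Seq((1, 2), (3, 2), (1, 3), (2, 4), (3, 4))

  val points: Seq[((Long, Long), Int)] = unaryInds
    // Look up both columns; INDs without statistics on either side are dropped.
    .flatMap { case (dep, ref) =>
      for {
        depDv <- distinctValues.get(dep)
        refDv <- distinctValues.get(ref)
      } yield (depDv, refDv)
    }
    // Same filter as in the query: keep pairs with depDv <= refDv.
    .filter { case (depDv, refDv) => depDv <= refDv }
    // Count the INDs per (depDv, refDv) coordinate.
    .groupBy(identity)
    .map { case (point, occurrences) => point -> occurrences.size }
    .toSeq
    .sortBy(_._1)
}
```

Each element of `IndJoinSketch.points` corresponds to one marker in the scatter chart: the coordinate pair gives its position and the count determines its size.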