# Spark LDA

*ammonite-spark* loads Spark in a way that does not depend on a specific Spark distribution,
so you can use it with any Spark 2.x version.
The only limitation is that the Scala version of Spark and the running Almond kernel must match, so make sure your
kernel uses the same Scala version as your Spark cluster.
Spark 2.0.x - 2.3.x requires Scala 2.11. Spark 2.4.x supports both Scala 2.11 and 2.12.
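To check the rule above from inside a kernel, a small helper can compare the kernel's Scala binary version with the versions a given Spark release supports. This is a minimal sketch: `ScalaCompat` and its methods are hypothetical names, and the version table only encodes the Spark 2.x rules stated above.

```scala
import scala.util.Properties

// Hypothetical helper encoding the Spark 2.x / Scala compatibility rules stated above.
object ScalaCompat {
  /** Scala binary versions a Spark 2.x release supports (empty if not covered by the rules above). */
  def supportedScala(sparkVersion: String): Set[String] =
    sparkVersion.split('.').toList match {
      case "2" :: minor :: _ if minor.nonEmpty && minor.forall(_.isDigit) && minor.toInt <= 3 =>
        Set("2.11")
      case "2" :: "4" :: _ => Set("2.11", "2.12")
      case _               => Set.empty
    }

  /** Binary version of the Scala library the kernel runs on, e.g. "2.12". */
  def kernelScalaBinary: String =
    Properties.versionNumberString.split('.').take(2).mkString(".")

  /** True if the kernel's Scala binary version is one the given Spark release supports. */
  def compatible(sparkVersion: String): Boolean =
    supportedScala(sparkVersion).contains(kernelScalaBinary)
}
```

For example, `ScalaCompat.compatible("2.4.5")` should be true on a Scala 2.11 or 2.12 kernel and false on any other.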


In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5` // Or use any other 2.x version here
import org.apache.spark.sql._
import $ivy.`org.apache.spark::spark-mllib:2.4.5`



[32mimport [39m[36m$ivy.$                                   // Or use any other 2.x version here
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36m$ivy.$                                    
[39m

Usually you want to disable logging in order to avoid polluting your cell outputs.

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

Then create a `SparkSession` using the `NotebookSparkSessionBuilder` provided by *almond-spark*.

## Running in local mode
This will run Spark in the same JVM as your kernel.

In [3]:
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@69bff21e

When running this, you should see that the cell output contains a link to the Spark UI.

Note the use of `NotebookSparkSession.builder()` instead of the `SparkSession.builder()` you would use when, for example, writing a standalone Spark job.

The builder returned by `NotebookSparkSession.builder()` extends the one returned by `SparkSession.builder()`,
so you can call `.appName("foo")`, `.config("key", "value")`, etc. on it.


In [4]:
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector

import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector

In [5]:
import scala.io.Source
val url = "https://raw.githubusercontent.com/neelsmith/nomisma/master/cex/devastation-2010-04-10.cex"

[32mimport [39m[36mscala.io.Source
[39m
[36murl[39m: [32mString[39m = [32m"https://raw.githubusercontent.com/neelsmith/nomisma/master/cex/devastation-2010-04-10.cex"[39m

In [6]:
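// `lines` still includes the CEX header row, so the header becomes document 0
// (drop it with `lines.tail` if you don't want that). Column 7 is the `ObvType` field.
val lines = Source.fromURL(url).getLines.toVector
val obvDescs = lines.map(ln => ln.split("#")(7)).
  map(_.toLowerCase).
  map(_.replaceAll("[^a-z ]", ""))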

[36mlines[39m: [32mscala[39m.[32mpackage[39m.[32mVector[39m[[32mString[39m] = [33mVector[39m(
  [32m"ID#Label#Denomination#Metal#Authority#Mint#Region#ObvType#ObvLegend#ObvPortraitId#RevType#RevLegend#RevPortraitId#StartDate#EndDate"[39m,
  [32m"http://numismatics.org/aod/id/7513.81516.2.a#AoD 7513.81516.2.a#none#ar#German Empire#http://www.geonames.org/2861650#none#Facing bust#GENERAL FELDMARSCHALL von HINDENBURG##Nude man wrestling nude giant to the ground#TITANEN KAMPF 1914-1915##1915#1915"[39m,
  [32m"http://numismatics.org/aod/id/7513.13525.20.a#AoD 7513.13525.20.a#none#none#German Empire#http://www.geonames.org/2825297#none#Group of spike-helmeted German soldiers running to attack to r.; carrying rifles with bayonets and swords.#1914/15/ FELDZUG/ GEGEN FRANKREICH, RUSSLAND,/ ENGLAND, JAPAN U.S.W.#####1915#1915"[39m,
  [32m"http://numismatics.org/aod/id/7513.7155.300.c#AoD 7513.7155.300.c#none#fe#German Weimar Republic#http://www.geonames.org/2867714#none#Head o
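The cell above picks the obverse description by position (`(7)`) and keeps the header row. A slightly more defensive variant looks the column up by its header name and skips the header. This is a plain-Scala sketch: `CexObverse` and `obverseDescriptions` are hypothetical names, and it assumes every data row has at least as many `#`-separated fields as the header.

```scala
// Hypothetical helper: extract the ObvType column by header name instead of by position.
object CexObverse {
  // Column name as it appears in the CEX header row shown above.
  val ObvTypeColumn = "ObvType"

  /** Same normalization as the notebook cell: lowercase, keep only a-z and spaces. */
  def normalize(s: String): String = s.toLowerCase.replaceAll("[^a-z ]", "")

  /** Normalized ObvType values for every data row; the first line is treated as the header. */
  def obverseDescriptions(lines: Seq[String]): Seq[String] = lines match {
    case header +: rows =>
      val idx = header.split("#").indexOf(ObvTypeColumn)
      require(idx >= 0, s"no $ObvTypeColumn column in header")
      // limit -1 keeps trailing empty fields, so column positions stay stable
      rows.map(row => normalize(row.split("#", -1)(idx)))
    case _ => Seq.empty
  }
}
```

In the notebook this would replace the `obvDescs` cell with `CexObverse.obverseDescriptions(lines)`; note that it shifts every document id down by one, since the header no longer becomes document 0.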

### Create `DataFrame` with text corpus

In [9]:
// Create RDD:
val txtRdd = spark.sparkContext.parallelize(obvDescs).zipWithIndex



[36mtxtRdd[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mLong[39m)] = ZippedWithIndexRDD[4] at zipWithIndex at cmd8.sc:1

In [11]:
// Import implicits only *after* the SparkSession has been created: they come from the session instance.
import spark.sqlContext.implicits._

val corpus_df = txtRdd.toDF("corpus", "id")


[32mimport [39m[36mspark.sqlContext.implicits._
[39m
[36mcorpus_df[39m: [32mDataFrame[39m = [corpus: string, id: bigint]

### Tokenize

In [12]:
val tokenizer = new RegexTokenizer().setPattern("[\\W_]+").setMinTokenLength(4).setInputCol("corpus").setOutputCol("tokens")
val tokenized_df = tokenizer.transform(corpus_df)


[36mtokenizer[39m: [32mRegexTokenizer[39m = regexTok_8a36b65647f2
[36mtokenized_df[39m: [32mDataFrame[39m = [corpus: string, id: bigint ... 1 more field]

### Filter out stop words

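The stop-word list below is only a placeholder that demonstrates the technique; a real analysis needs a proper list. Note that the tokenizer above already drops tokens shorter than four characters, so none of these four words can reach this step and the list has no effect here.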

In [15]:
val stopWords = Array("a", "and", "the", "of")

val remover = new StopWordsRemover().setStopWords(stopWords).setInputCol("tokens").setOutputCol("filtered")
val filtered_df = remover.transform(tokenized_df)





[36mstopWords[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m([32m"a"[39m, [32m"and"[39m, [32m"the"[39m, [32m"of"[39m)
[36mremover[39m: [32mStopWordsRemover[39m = stopWords_b13d3899e309
[36mfiltered_df[39m: [32mDataFrame[39m = [corpus: string, id: bigint ... 2 more fields]

### Compute counts of each token for each text


In [16]:
val vectorizer = new CountVectorizer().setInputCol("filtered").setOutputCol("features").setVocabSize(10000).setMinDF(5).fit(filtered_df)
val countVectors = vectorizer.transform(filtered_df).select("id", "features")



[36mvectorizer[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mml[39m.[32mfeature[39m.[32mCountVectorizerModel[39m = cntVec_4844bbbb7f20
[36mcountVectors[39m: [32mDataFrame[39m = [id: bigint, features: vector]

### Create LDA model

In [17]:
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(countVectors)

20/04/15 18:23:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
20/04/15 18:23:17 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


[36mlda[39m: [32mLDA[39m = lda_3aed844a700e
[36mmodel[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mml[39m.[32mclustering[39m.[32mLDAModel[39m = lda_3aed844a700e

### Compute topics from model

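Each topic is described by the indices of its most heavily weighted terms (`termIndices`, which point into the `CountVectorizer` vocabulary) and the corresponding weights (`termWeights`).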


In [19]:
val topics = model.describeTopics() // DataFrame with columns topic, termIndices, termWeights
topics.show(false)

+-----+------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                                     |termWeights                                                                                                                                                                                                                       |
+-----+------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |[1, 2, 12, 21, 11, 3, 23, 33, 41, 15]           |[0.06508165168997954, 0.026391935224544322, 0.023888805007839874, 0.018253076995016077, 0.01
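Because `termIndices` are positions in the `CountVectorizer` vocabulary, turning them into words is a lookup in `vectorizer.vocabulary` (the `Array[String]` exposed by `CountVectorizerModel`). A plain-Scala sketch of that lookup follows; `TopicLabels.labelTopic` is a hypothetical helper, and the toy vocabulary in the usage note is illustrative only.

```scala
// Hypothetical helper: attach vocabulary words to one topic's term indices and weights.
object TopicLabels {
  def labelTopic(
      termIndices: Seq[Int],
      termWeights: Seq[Double],
      vocabulary: IndexedSeq[String]
  ): Seq[(String, Double)] = {
    require(termIndices.length == termWeights.length, "indices and weights must align")
    termIndices.map(i => vocabulary(i)).zip(termWeights)
  }

  // In the notebook, one might apply it to the (small) topics DataFrame, e.g.:
  //   val vocab = vectorizer.vocabulary.toIndexedSeq
  //   topics.collect().map(r => labelTopic(
  //     r.getAs[Seq[Int]]("termIndices"), r.getAs[Seq[Double]]("termWeights"), vocab))
}
```

With a toy vocabulary `Vector("bust", "head", "wreath")`, indices `Seq(1, 0)` and weights `Seq(0.6, 0.4)` become `Seq(("head", 0.6), ("bust", 0.4))`.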

### Compute distribution of topics per document

In [23]:
val transformed = model.transform(countVectors)
transformed.printSchema()
transformed.show(3, false)


root
 |-- id: long (nullable = false)
 |-- features: vector (nullable = true)
 |-- topicDistribution: vector (nullable = true)



+---+---------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                   |topicDistribution                                                                                                                                                                                              |
+---+---------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0  |(452,[],[])                                                                |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]        

[36mtransformed[39m: [32mDataFrame[39m = [id: bigint, features: vector ... 1 more field]

## Syncing Dependencies

If extra dependencies are loaded via ``import $ivy.`…` `` after the `SparkSession` has been created, call `NotebookSparkSession.sync()` so that the newly added JARs are passed to the Spark executors.

Make sure the version of Spark used to start the master and executors matches the one loaded in the notebook session
(via e.g. ``import $ivy.`org.apache.spark::spark-sql:X.Y.Z` ``), and that the machine running the kernel can reach, and be reached from,
all nodes of the standalone cluster.


[32mimport [39m[36morg.apache.spark.ml.clustering.LDA
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.RegexTokenizer
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.StopWordsRemover
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.CountVectorizer
[39m
[32mimport [39m[36morg.apache.spark.mllib.linalg.Vector
[39m

In [None]:
import $ivy.`org.typelevel::cats-core:1.6.0`

NotebookSparkSession.sync() // cats should be available on workers

## Datasets and DataFrames

If you try to create a `Dataset` or a `DataFrame` from some data structure containing a case class and you get an `org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class ...` when calling `.toDS`/`.toDF`, try the following workaround:

Add `org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)` in the same cell where you define case classes involved.

In [None]:
import spark.implicits._

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);

case class Person(id: String, value: Int)

val ds = List(Person("Alice", 42), Person("Bob", 43), Person("Charlie", 44)).toDS

This workaround won't be necessary anymore in future Spark versions.

### Rich Display of Datasets and DataFrames

As of now, *almond-spark* doesn't include native rich display capabilities for Datasets and DataFrames, so by default tables are only rendered as ASCII text.

In [None]:
ds.show()

It's not too hard to add your own displayer though. Here's an example:

In [None]:
// based on a snippet by Ivan Zaitsev
// https://github.com/almond-sh/almond/issues/180#issuecomment-364711999
implicit class RichDF(val df: DataFrame) {
  def showHTML(limit:Int = 20, truncate: Int = 20) = {
    import xml.Utility.escape
    val data = df.take(limit)
    val header = df.schema.fieldNames.toSeq
    val rows: Seq[Seq[String]] = data.map { row =>
      row.toSeq.map { cell =>
        val str = cell match {
          case null => "null"
          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
          case array: Array[_] => array.mkString("[", ", ", "]")
          case seq: Seq[_] => seq.mkString("[", ", ", "]")
          case _ => cell.toString
        }
        if (truncate > 0 && str.length > truncate) {
          // do not show ellipses for strings shorter than 4 characters.
          if (truncate < 4) str.substring(0, truncate)
          else str.substring(0, truncate - 3) + "..."
        } else {
          str
        }
      }: Seq[String]
    }

    publish.html(s"""
      <table class="table">
        <tr>
        ${header.map(h => s"<th>${escape(h)}</th>").mkString}
        </tr>
        ${rows.map { row =>
          s"<tr>${row.map { c => s"<td>${escape(c)}</td>" }.mkString}</tr>"
        }.mkString
        }
      </table>""")
  }
}

In [None]:
ds.toDF.showHTML()