# Spark LDA

An example of topic modelling a corpus of texts using Spark ML's LDA.

In the first two sections, you make the main decisions about how to topic model your corpus: you set key values, and you download and clean up your texts.


## Settings

- `k` is the traditional name for the number of topics to find
- `iterations` is the maximum number of iterations the LDA algorithm should run
- `stopWords` is an Array of words to omit from the model
- `vocabSize` is the maximum number of distinct terms to keep (the most frequent terms in the corpus)
- `minimumTokenLength` is the length, in characters, of the shortest token to keep
- `termsToDisplay` is the number of terms to use in describing a topic

In [None]:
val k = 10
val iterations = 20
// Stop words must match tokens exactly as the tokenizer emits them. It splits on
// non-word characters such as | and ', so write "autw" rather than "autw|".
val stopWords = Array("de", "kai", "to", "thn", "gar", "twn", "h", "tou", "ws", "o", "ths", "ton", "dia", "mh", "oti", "ou", "pros", "eis", "men", "oi", "ouk", "en", "tous", "epi", "ta", "tw", "tois", "auton", "ei", "nun", "peri", "hn", "oun", "autw", "autou", "alla", "tas", "all", "esti", "estin", "te", "th", "touto", "tauta", "apo", "ek", "meta", "ti", "ec", "anti", "oude")

val vocabSize = 10000
val minimumTokenLength = 4
val termsToDisplay = 15

// Cosmetic setting for table display:
val maxWidth = 1000

## Download data and clean up text


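This example uses CEX-format data from the Homer Multitext project: the Venetus A scholia whose passage references begin with `9`, and the Upsilon scholia from the `twins9corpus.cex` file.
We tidy up the text of each passage by:

- keeping only lexical tokens (dropping punctuation and other non-lexical tokens)
- converting all text to lower case
- stripping accents and breathings, and converting the Greek to a simple ASCII transcription
- rejoining the tokens into a single space-separated string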
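
The notebook's own cleanup relies on the `greek` library's `LiteraryGreekString` (in `curateNode` below) to produce an ASCII transcription. As a rough, library-free illustration of the idea of stripping accents and breathings from Unicode Greek, here is a sketch using the JDK's `java.text.Normalizer`. This is a swapped-in technique, not what the notebook does: `stripGreek` is a hypothetical helper, and it keeps Greek letters (and discards iota subscript) rather than producing ASCII.

```scala
import java.text.Normalizer

// Hypothetical helper, NOT the greek library's LiteraryGreekString:
// lower-case, decompose every character (NFD), drop all combining marks
// (accents, breathings, iota subscript), then keep only runs of letters.
def stripGreek(s: String): String = {
  val decomposed = Normalizer.normalize(s.toLowerCase, Normalizer.Form.NFD)
  val noMarks = decomposed.replaceAll("\\p{M}+", "")
  // split on anything that is not a letter, drop empty pieces, rejoin with spaces
  noMarks.split("[^\\p{L}]+").filter(_.nonEmpty).mkString(" ")
}

println(stripGreek("Μῆνιν ἄειδε, θεά"))   // μηνιν αειδε θεα
```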

In [None]:
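// Add the author's Maven repository, used to resolve the libraries imported below.
// Note: Bintray was shut down in 2021, so this URL may no longer resolve.
val personalRepo = coursierapi.MavenRepository.of("https://dl.bintray.com/neelsmith/maven")
interp.repositories() ++= Seq(personalRepo)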

In [None]:
import $ivy.`edu.holycross.shot.cite::xcite:4.3.0`
import $ivy.`edu.holycross.shot::ohco2:10.20.3`
import $ivy.`edu.holycross.shot::greek:5.5.1`
import $ivy.`edu.holycross.shot.mid::orthography:2.0.0`

In [None]:
import edu.holycross.shot.cite._
import edu.holycross.shot.ohco2._
import edu.holycross.shot.greek._
import edu.holycross.shot.mid.orthography._


val venetusAUrl = "https://raw.githubusercontent.com/neelsmith/summer2020nbs/master/data/hmt-2020i-noIliad.cex"
val twins9Url = "https://raw.githubusercontent.com/neelsmith/summer2020nbs/master/data/twins9corpus.cex"


In [None]:
// Create source corpora from the CEX files
val twins9 = CorpusSource.fromUrl(twins9Url)
val venetusA = CorpusSource.fromUrl(venetusAUrl)

In [None]:

// - tokenize, keep only lexical tokens
// - make LiteraryGreekStrings from lexical tokens, drop accents and breathings
// - recompose into a single stripped-down string for each line
def curateNode(cn: CitableNode, siglum: String) : CitableNode = {
  if (cn.text.isEmpty){
    println("EMPTY TEXT: " + cn.urn)
    cn
  } else {

    val lexTokens = LiteraryGreekString.tokenizeNode(cn).filter(_.tokenCategory == Some(LexicalToken))
    val lgs = lexTokens.map(tkn => LiteraryGreekString(tkn.text).toLower.stripBreathingAccent.ascii)
    val simpleAscii = lgs.mkString(" ")
    CitableNode(cn.urn.addVersion(s"${siglum}_simpleascii"),simpleAscii)
  }
}


def asciiCorpus(c: Corpus, siglum: String) : Corpus = {
  Corpus(c.nodes.map(n => curateNode(n, siglum)))
}

In [None]:
val upsilonScholia = twins9 ~~ CtsUrn("urn:cts:greekLit:tlg5026.e3.hmt:")
val vaScholia = venetusA ~~  CtsUrn("urn:cts:greekLit:tlg5026:") 
val va9 = Corpus(vaScholia.nodes.filter(n => n.urn.passageComponent.startsWith("9")))
/*
val vaMain = venetusA ~~  CtsUrn("urn:cts:greekLit:tlg5026.msA.hmt:")
val vaAim =  venetusA ~~  CtsUrn("urn:cts:greekLit:tlg5026.msAim.hmt:")
val vaAint =  venetusA ~~  CtsUrn("urn:cts:greekLit:tlg5026.msAint.hmt:")
val vaAext =  venetusA ~~  CtsUrn("urn:cts:greekLit:tlg5026.msAext.hmt:")
val vaAilt =  venetusA ~~  CtsUrn("urn:cts:greekLit:tlg5026.msAil.hmt:")
*/
val allScholia = upsilonScholia ++ va9


In [None]:
allScholia.size


In [None]:
// curateNode appends "_simpleascii" to the siglum, so no trailing underscore here
val scholiaAscii = asciiCorpus(allScholia, "msa_and_upsilon")

In [None]:
scholiaAscii.size


## Set up a Spark notebook session

Import libraries, silence Spark's logging, and start up a local Spark notebook session. These three cells fall in the category of "stuff you copy and paste in to set up a Jupyter notebook with Spark and don't think about too much."

In [None]:
// Use Spark 2.4 or later: the zip_with SQL function used below was added in 2.4
import $ivy.`org.apache.spark::spark-sql:2.4.5`
import $ivy.`org.apache.spark::spark-mllib:2.4.5`
import org.apache.spark.sql._


In [None]:
// Turn off Spark's verbose logging
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

In [None]:
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

## Topic modelling with Spark LDA

After importing a small mountain of Spark libraries, the following cells go through the basic steps of topic modelling:

1. Create a text corpus
2. Tokenize
3. Filter stop words
4. Count word occurrences for each text
5. Create the LDA model by "fitting" it to our data
6. Apply the model to compute the topics and their distribution in each document of our corpus


In [None]:
import org.apache.spark.ml.clustering.LDA
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._

### 1. Create `DataFrame` with text corpus

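Getting your clean text into a Spark `DataFrame` is an awkward, two-step process: first create an RDD, then convert it to a `DataFrame`. (This should be simpler in future versions of Spark.)

The important output is `corpus_df`, a `DataFrame` with one row for every text: the cleaned text in the `corpus` column, and its position in the corpus in the `id` column.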
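
The `id` column comes from `zipWithIndex`, which pairs each text with its 0-based position; that is why a document's `id` can later be used to index back into `scholiaText`. A minimal plain-Scala illustration with made-up sample texts (Spark's `RDD.zipWithIndex` yields `Long` indices, while plain Scala collections yield `Int`):

```scala
// Made-up sample texts standing in for scholiaText.
val sampleTexts = Vector("menin aeide thea", "andra moi ennepe mousa", "arxomenos se")

// Pair each text with its 0-based position, as RDD.zipWithIndex does.
val withIds: Vector[(String, Int)] = sampleTexts.zipWithIndex

withIds.foreach { case (text, id) => println(s"$id: $text") }
// 0: menin aeide thea
// 1: andra moi ennepe mousa
// 2: arxomenos se
```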


In [None]:
// Create RDD:
val scholiaText = scholiaAscii.nodes.map(n => n.text)
val txtRdd = spark.sparkContext.parallelize(scholiaText).zipWithIndex



In [None]:
// Import implicits *after* creation of context.
import spark.sqlContext.implicits._

val corpus_df = txtRdd.toDF("corpus", "id")

While we're at it, we can paste in this handy snippet defining a function that will beautify our display of Spark `DataFrame`s in HTML. (We'll use the `showHTML` function later.)

In [None]:
// based on a snippet by Ivan Zaitsev
// https://github.com/almond-sh/almond/issues/180#issuecomment-364711999
implicit class RichDF(val df: DataFrame) {
  def showHTML(limit:Int = 20, truncate: Int = 20) = {
    import xml.Utility.escape
    val data = df.take(limit)
    val header = df.schema.fieldNames.toSeq
    val rows: Seq[Seq[String]] = data.map { row =>
      row.toSeq.map { cell =>
        val str = cell match {
          case null => "null"
          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
          case array: Array[_] => array.mkString("[", ", ", "]")
          case seq: Seq[_] => seq.mkString("[", ", ", "]")
          case _ => cell.toString
        }
        if (truncate > 0 && str.length > truncate) {
          // do not show ellipses for strings shorter than 4 characters.
          if (truncate < 4) str.substring(0, truncate)
          else str.substring(0, truncate - 3) + "..."
        } else {
          str
        }
      }: Seq[String]
    }

    publish.html(s"""
      <table class="table">
        <tr>
        ${header.map(h => s"<th>${escape(h)}</th>").mkString}
        </tr>
        ${rows.map { row =>
          s"<tr>${row.map { c => s"<td>${escape(c)}</td>" }.mkString}</tr>"
        }.mkString
        }
      </table>""")
  }
}

### 2. Tokenize
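
With its default settings, `RegexTokenizer` lower-cases each text and treats the pattern as the *gaps* between tokens; `setMinTokenLength` then drops shorter tokens. The following is a plain-Scala approximation of that behaviour with this notebook's pattern, not Spark's implementation. It shows two consequences: tokens shorter than `minimumTokenLength` never reach the stop-word filter, and characters such as `|`, `'` and `_` act as separators, so no token can contain them.

```scala
// Plain-Scala approximation of RegexTokenizer with gaps = true,
// pattern "[\\W_]+" and a minimum token length (4 in this notebook).
def tokenize(text: String, minLength: Int = 4): Seq[String] =
  "[\\W_]+".r
    .split(text.toLowerCase)        // the pattern matches the gaps between tokens
    .toSeq
    .filter(_.length >= minLength)  // drop tokens that are too short

println(tokenize("autw| legei ta loipa, kai to_auto").mkString(" "))
// autw legei loipa auto
```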

In [None]:
val tokenizer = new RegexTokenizer().setPattern("[\\W_]+").setMinTokenLength(minimumTokenLength).setInputCol("corpus").setOutputCol("tokens")
val tokenized_df = tokenizer.transform(corpus_df)


### 3. Filter out stop words

We'll need a serious stop-word list at some point (see `stopWords` in the Settings cell), but here's the technique.
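
`StopWordsRemover` drops every token that appears in the stop list, matching case-insensitively by default. A plain-Scala approximation (not Spark's implementation) makes clear why each stop word must be written exactly as the tokenizer emits it: an entry such as `"autw|"` can never match, because the tokenizer has already split off the `|`.

```scala
// Plain-Scala approximation of StopWordsRemover with its default
// case-insensitive matching: drop every token found in the stop list.
def removeStopWords(tokens: Seq[String], stopWords: Seq[String]): Seq[String] = {
  val stopSet = stopWords.map(_.toLowerCase).toSet
  tokens.filterNot(t => stopSet.contains(t.toLowerCase))
}

val tokens = Seq("autw", "legei", "alla", "loipa")
// "alla" is removed, but "autw|" never matches the token "autw"
println(removeStopWords(tokens, Seq("alla", "autw|")).mkString(" "))
// autw legei loipa
```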

In [None]:
val remover = new StopWordsRemover().setStopWords(stopWords).setInputCol("tokens").setOutputCol("filtered")
val filtered_df = remover.transform(tokenized_df)





### 4. Compute counts of each token for each text
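
`CountVectorizer` first builds a vocabulary of at most `vocabSize` terms, ranked by how often they occur across the whole corpus; `setMinDF(5)` additionally discards terms that appear in fewer than five texts. Each text then becomes a sparse vector of counts indexed by vocabulary position. Below is a plain-Scala sketch of those two steps with made-up documents; the helper names are hypothetical, and ties in frequency are broken alphabetically here for determinism (Spark's own tie-breaking may differ).

```scala
// Build a vocabulary: keep terms found in at least minDF documents,
// rank them by total frequency (highest first), and keep vocabSize of them.
def buildVocabulary(docs: Seq[Seq[String]], vocabSize: Int, minDF: Int): Vector[String] = {
  val termFreq = docs.flatten.groupBy(identity).map { case (t, occ) => t -> occ.size }
  val docFreq = docs.flatMap(_.distinct).groupBy(identity).map { case (t, ds) => t -> ds.size }
  termFreq.toVector
    .filter { case (t, _) => docFreq(t) >= minDF }  // drop terms in too few documents
    .sortBy { case (t, n) => (-n, t) }              // most frequent first, then alphabetical
    .take(vocabSize)
    .map(_._1)
}

// Sparse count vector for one document: vocabulary index -> count.
// Terms outside the vocabulary are ignored.
def countVector(doc: Seq[String], vocabulary: Vector[String]): Map[Int, Int] = {
  val index = vocabulary.zipWithIndex.toMap
  doc.flatMap(t => index.get(t).toList)
    .groupBy(identity)
    .map { case (i, hits) => i -> hits.size }
}

val docs = Seq(
  Seq("menin", "aeide", "thea"),
  Seq("menin", "achilleos", "menin"),
  Seq("thea", "menin")
)
val vocab = buildVocabulary(docs, vocabSize = 3, minDF = 2)
println(vocab)                        // Vector(menin, thea)
println(countVector(docs(1), vocab))  // Map(0 -> 2)
```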


In [None]:
// Keep at most vocabSize terms, and only terms that occur in at least 5 texts
val vectorizer = new CountVectorizer()
  .setInputCol("filtered")
  .setOutputCol("features")
  .setVocabSize(vocabSize)
  .setMinDF(5)
  .fit(filtered_df)
val countVectors = vectorizer.transform(filtered_df).select("id", "features")



### 5. Create ("fit") LDA model
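
Fitting estimates the parameters of the standard LDA generative model. For reference, in common notation (not Spark's parameter names), with $K$ topics (`k` here) and a vocabulary of $V$ terms, the model assumes:

$$
\begin{aligned}
\phi_t &\sim \mathrm{Dirichlet}(\beta) && \text{for each topic } t = 1, \dots, K \text{ (a distribution over the } V \text{ terms)}\\
\theta_d &\sim \mathrm{Dirichlet}(\alpha) && \text{for each document } d \text{ (a distribution over the } K \text{ topics)}\\
z_{d,n} &\sim \mathrm{Categorical}(\theta_d) && \text{the topic of the } n\text{th token of document } d\\
w_{d,n} &\sim \mathrm{Categorical}(\phi_{z_{d,n}}) && \text{the token itself, drawn from that topic's terms}
\end{aligned}
$$

In Spark's `LDA`, the priors $\alpha$ and $\beta$ are set with `setDocConcentration` and `setTopicConcentration` (left at their defaults in this notebook). The estimated topic-term distributions $\phi_t$ are what `describeTopics` summarizes, and the estimated document-topic distributions $\theta_d$ are what `model.transform` reports in its `topicDistribution` column.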

In [None]:
val lda = new LDA().setK(k).setMaxIter(iterations)
val model = lda.fit(countVectors)

### 6. Compute topics and their distribution in each document

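Each topic is a set of terms with corresponding weights. `describeTopics(termsToDisplay)` returns one row per topic, with the vocabulary indices of its `termsToDisplay` highest-weighted terms in `termIndices` and their weights in `termWeights`.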
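
A plain-Scala sketch of how such a row can be derived from one topic's weight for every vocabulary term: normalize the weights so they sum to 1 (an assumption about how Spark scales them), sort from highest to lowest, and keep the first `n`. The helper `topTerms` and the sample weights are hypothetical.

```scala
// Return the indices and normalized weights of a topic's n heaviest terms.
def topTerms(topicWeights: Array[Double], n: Int): (Seq[Int], Seq[Double]) = {
  val total = topicWeights.sum
  val ranked = topicWeights
    .map(_ / total)                 // normalize so the weights sum to 1
    .zipWithIndex                   // (weight, term index)
    .sortBy { case (w, _) => -w }   // highest weight first
    .take(n)
  (ranked.map(_._2).toSeq, ranked.map(_._1).toSeq)
}

// Hypothetical weights for one topic over a four-term vocabulary.
val (termIndices, termWeights) = topTerms(Array(1.0, 5.0, 3.0, 1.0), 2)
println(termIndices.mkString(", "))  // 1, 2
println(termWeights.mkString(", "))  // 0.5, 0.3
```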


In [None]:
val topics = model.describeTopics(termsToDisplay)


In [None]:
topics.showHTML(truncate=1000)

#### Label topics

Digression: for human readers, we'll replace the index number of each term with the actual term.

1. Create a new DataFrame with ordered lists of terms by looking up the term for each term index.
2. Number the rows of this DataFrame so we can join it with the existing topic data.
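
The lookup, and the later pairing of each term with its weight (the job `zip_with` does in the cells below), can be sketched in plain Scala with made-up data. Because each sample row keeps its topic number alongside its term indices, no separate numbering or join is needed here; in Spark, a UDF closing over `vectorizer.vocabulary` would be one alternative to the `zipWithIndex` and join approach. The exact string formatting of the weights may differ from Spark's `concat`.

```scala
// Made-up vocabulary and describeTopics-style rows: (topic, termIndices, termWeights).
val vocabulary = Vector("menin", "thea", "aeide", "achilleos")
val topicRows = Seq(
  (0, Seq(0, 2), Seq(0.6, 0.3)),
  (1, Seq(3, 1), Seq(0.5, 0.4))
)

val labelled = topicRows.map { case (topic, indices, weights) =>
  val terms = indices.map(vocabulary(_))  // term index -> term
  val termsWithWeight = terms.zip(weights).map { case (t, w) => s"$t $w" }
  (topic, termsWithWeight)
}
labelled.foreach(println)
// (0,List(menin 0.6, aeide 0.3))
// (1,List(achilleos 0.5, thea 0.4))
```
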

In [None]:
val topicLabels = topics.select("termIndices").map { case Row(r:  WrappedArray[Integer]) => r.map( i => vectorizer.vocabulary(i) ) }
val labelsNumberedLong = topicLabels.rdd.zipWithIndex.toDF("terms", "topicLong")
val labelsIndexed = labelsNumberedLong.withColumn("topic", $"topicLong".cast(IntegerType)).drop("topicLong")

val topicsWithTerms = labelsIndexed.join(topics, labelsIndexed.col("topic") === topics.col("topic")).drop(labelsIndexed.col("topic"))





In [None]:
val weightedLabels = topicsWithTerms.withColumn("termsWithWeight", expr("zip_with(terms, termWeights, (t,w) -> concat(t, ' ', w))"))


In [None]:
// Flat view
weightedLabels.select("topic", "termsWithWeight").showHTML(truncate=1000)



In [None]:
// Exploded view
val explodedTerms = weightedLabels.select(col("*"),explode(col("termsWithWeight"))).select("topic","col")

explodedTerms.showHTML(explodedTerms.count.toInt, 1000)

### Compute distribution of topics per document
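
`model.transform` adds a `topicDistribution` column: for each document, a vector of length `k` giving the weight of each topic. In Spark that column holds an `org.apache.spark.ml.linalg.Vector`, whose `toArray` yields an `Array[Double]` (it also has an `argmax` method). A plain-Scala sketch of finding a document's dominant topic and its share, using made-up distributions and a hypothetical helper:

```scala
// For one document's topic distribution, return the index of the
// highest-weighted topic and that topic's share of the document.
def dominantTopic(distribution: Array[Double]): (Int, Double) = {
  val (weight, topic) = distribution.zipWithIndex.maxBy(_._1)
  (topic, weight)
}

// Hypothetical distributions over k = 3 topics for two documents.
val distributions = Seq(Array(0.05, 0.05, 0.90), Array(0.60, 0.30, 0.10))
distributions.map(dominantTopic).foreach { case (topic, share) =>
  println(f"topic $topic%d: ${share * 100}%.1f%%")
}
// topic 2: 90.0%
// topic 0: 60.0%
```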

In [None]:
val transformed = model.transform(countVectors)
transformed.printSchema()
transformed.showHTML(3, 1000)


### Exploring results

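In one run of this notebook, almost 91% of document 2 (the third document) was assigned to topic 5. Your figures will depend on your corpus and settings, so adjust the document and topic numbers below to match your own results. Let's compare the contents of document 2 and the topic definition.

Because each document's `id` is its position in the corpus, we can index directly into our original collections of texts to see document 2: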

In [None]:
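scholiaText(2)        // the curated text that was modelled
scholiaAscii.nodes(2) // the same passage with its URN
allScholia.nodes(2)   // the same passage before curation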

We can set a condition on the `weightedLabels` data frame to filter it to topic 5.

In [None]:
val topic5 = weightedLabels.filter(weightedLabels("topic") === 5).select("termsWithWeight")



We can break the resulting array out to one element per line with Spark SQL's `explode` function.
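
`explode` turns each row holding an array into one row per array element, repeating the other selected columns. In plain Scala collections the same reshaping is a `flatMap`, sketched here with made-up data. Like `explode`, this drops rows whose array is empty; Spark's `explode_outer` would keep them with a null value.

```scala
// Made-up (topic, termsWithWeight) rows.
val rows = Seq(
  (5, Seq("menin 0.6", "thea 0.3")),
  (6, Seq("aeide 0.5")),
  (7, Seq.empty[String])            // produces no output rows
)

// One output row per term, with the topic number repeated.
val exploded = rows.flatMap { case (topic, terms) => terms.map(term => (topic, term)) }
exploded.foreach(println)
// (5,menin 0.6)
// (5,thea 0.3)
// (6,aeide 0.5)
```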


In [None]:
topic5.select( explode(col("termsWithWeight"))).showHTML(truncate=maxWidth)
