# Create an index of lemmas from any corpus with Spark

The goal of this notebook is twofold. For another project I'm currently working on (the [Quickref](https://gitlab.common-lisp.net/quickref/quickref) project), I need a big list of various [lemmas](https://en.wikipedia.org/wiki/Lemma), but none of the classical lists fit my needs. So I decided to develop this tool (currently in notebook form) to create a list of the most common lemmas of a corpus. I chose Spark to do it, first because I have big corpora, and second because I want to practice Scala and Spark.

I imagine this tool as a pipeline of transformations that extracts lemmas from a pool of files. Therefore, I'm considering the following architecture:
1. list('path/to/files')
2. map(path => content)
3. flatMap(content => token)
4. filter(token => clean_token)
5. map_reduce(frequency of tokens)
6. sort

All these steps will be completely distributed with Spark Datasets.
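To make the architecture concrete, here is a minimal single-machine sketch of steps 3 to 6 using plain Scala collections instead of Spark Datasets. It assumes the documents are already loaded as in-memory strings (step 2), and it replaces CoreNLP lemmatization with a simple regex split, so words are counted as written rather than as lemmas. The name `PipelineSketch` is hypothetical.

```scala
// Single-machine sketch of steps 3 to 6 with plain Scala collections.
// Assumption: a regex split stands in for CoreNLP tokenization + lemmatization.
object PipelineSketch {
  // Same "clean token" rule as the filter used later in the notebook
  private val CleanToken = "[A-Za-z][A-Za-z-]+[A-Za-z]"

  def run(contents: Seq[String]): Seq[(String, Int)] = {
    val cleanTokens = contents
      .filter(_.length > 1)              // skip empty documents
      .flatMap(_.split("[^A-Za-z-]+"))   // step 3: content => tokens
      .filter(_.matches(CleanToken))     // step 4: token => clean token
      .map(_.toLowerCase)
    cleanTokens
      .groupBy(identity)                                              // step 5: group identical tokens...
      .map { case (token, occurrences) => (token, occurrences.size) } // ...and count them
      .toSeq
      .sortBy { case (_, count) => -count }                           // step 6: most frequent first
  }
}
```

For example, `PipelineSketch.run(Seq("# cl-grnm\nThe Lisp, the lisp: the way.", ""))` skips the empty document and returns `("the", 3)` first, then `("lisp", 2)`; the relative order of the tokens counted only once is not specified.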

# Imports
For this project, we need some extra packages that have to be downloaded and imported. We do it with the `%AddJar` magic of Toree (the Scala Jupyter kernel used for this notebook).

In [1]:
%AddJar http://dl.bintray.com/spark-packages/maven/databricks/spark-corenlp/0.3.1-s_2.11/spark-corenlp-0.3.1-s_2.11.jar

Starting download from http://dl.bintray.com/spark-packages/maven/databricks/spark-corenlp/0.3.1-s_2.11/spark-corenlp-0.3.1-s_2.11.jar
Finished download of spark-corenlp-0.3.1-s_2.11.jar


In [2]:
%AddJar https://repo1.maven.org/maven2/edu/stanford/nlp/stanford-corenlp/3.9.1/stanford-corenlp-3.9.1-models.jar

Starting download from https://repo1.maven.org/maven2/edu/stanford/nlp/stanford-corenlp/3.9.1/stanford-corenlp-3.9.1-models.jar
Finished download of stanford-corenlp-3.9.1-models.jar


In [3]:
import org.apache.spark.sql.{Row, SparkSession}
import scala.io.Source
import com.databricks.spark.corenlp.functions._

In [4]:
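// Create a Spark session (in Toree, this may simply return the session the kernel already created)

val sparkSession = SparkSession.builder
  .appName("lemmas_index")
  .master("local")
  .getOrCreate()
val sc = sparkSession.sparkContext

// Needed for .toDS(), Dataset encoders and the 'column symbol syntax used below
import sparkSession.implicits._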

sparkSession = org.apache.spark.sql.SparkSession@424d5745
sc = org.apache.spark.SparkContext@6846c0b8


org.apache.spark.SparkContext@6846c0b8

# Loading

First, I need to load my corpus. The corpus I use in this notebook consists of all the READMEs of the libraries referenced by [Quicklisp](https://www.quicklisp.org/beta/). I put it in the same directory as the notebook to simplify this step. Because I don't think I have the right to upload this corpus to my repository, I only use it locally. Just replace the value of `corpus_directory` with the path to your own corpus.
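The two cells below list the files on the driver and then read them inside a Dataset `map`, which works here because Spark runs in `local` mode, so every task sees the same file system. As a plain-Scala reference for what those two cells compute, here is a minimal driver-side sketch; `CorpusLoader.readCorpus` is a hypothetical helper, and it assumes the files are UTF-8 encoded. On a real cluster, Spark's `sc.wholeTextFiles("./corpus/")`, which returns an RDD of (path, content) pairs, would be a more natural way to read the files in a distributed manner.

```scala
import java.io.File
import scala.io.Source

// Hypothetical helper: list the regular files of a corpus directory, read each one,
// and drop (near-)empty documents, mirroring the two loading cells of the notebook.
object CorpusLoader {
  def readCorpus(dir: File): Seq[(String, String)] = {
    // listFiles returns null when `dir` does not exist or is not a directory
    val files = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty).filter(_.isFile)
    files.sortBy(_.getName).map { file =>
      val source = Source.fromFile(file, "UTF-8") // assumption: README files are UTF-8
      try (file.getPath, source.mkString)
      finally source.close()
    }.filter { case (_, content) => content.length > 1 }
  }
}
```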

In [5]:
// Create a Dataset from the list of files in the corpus directory (paths converted to strings)

import java.io.File

val corpus_directory = new File("./corpus/")
val pathsDS = corpus_directory.listFiles.filter(_.isFile).map(_.getPath).toList.toDS()
pathsDS.show()

+--------------------+
|               value|
+--------------------+
|./corpus/cl-grnm....|
|./corpus/cl-cooki...|
|./corpus/osicat.r...|
|./corpus/oe-encod...|
|./corpus/lastfm.r...|
|./corpus/inferior...|
|./corpus/asd-gene...|
|./corpus/linewise...|
|./corpus/listofli...|
|./corpus/cl-scram...|
|./corpus/definiti...|
|./corpus/intel-he...|
|./corpus/pooler.r...|
|./corpus/lichat-t...|
|./corpus/cl-steam...|
|./corpus/lichat-s...|
|./corpus/cl-olefs...|
|./corpus/docutils...|
|./corpus/ernestin...|
|./corpus/png-read...|
+--------------------+
only showing top 20 rows



corpus_directory = ./corpus
pathsDS = [value: string]




[value: string]

In [6]:
// Read the content of every file and replace each path with it. We also remove empty
// (or single-character) documents, which are useless for further computations

val contentsDS = pathsDS.map { path =>
  val source = Source.fromFile(path, "UTF-8") // explicit encoding rather than the platform default
  try source.mkString
  finally source.close()
}.filter(c => c.length > 1)
contentsDS.show(1)

+--------------------+
|               value|
+--------------------+
|# cl-grnm

Common...|
+--------------------+
only showing top 1 row



contentsDS = [value: string]


[value: string]

# Tokenization and Lemmatization
Thanks to the spark-corenlp package, we can easily lemmatize our text. The `lemma` function provided by spark-corenlp already tokenizes the text, so we can skip the separate tokenization step. Once we have the lemmatized token lists, we need to filter all the crap out of them.

In [7]:
// Lemmatize the content of each document (empty documents were already removed above)

val rawLemmaDS = contentsDS.select(lemma('value).as('lemma))
rawLemmaDS.show()

+--------------------+
|               lemma|
+--------------------+
|[#, cl-grnm, comm...|
|[#, cl-cookie, -l...|
|[-lsb-, !, -lsb-,...|
|[*, introduction,...|
|[#, lastfm, Inter...|
|[INFERIOR-SHELL, ...|
|[#, asd-generator...|
|[linewise-templat...|
|[*, overview, the...|
|[#, cl-scram, ##,...|
|[Project, 's, hom...|
|[#, intel-hex, -,...|
|[##, about, Licha...|
|[##, about, cl-st...|
|[##, about, Licha...|
|[#, +, title, :, ...|
|[., ., -, *, -, r...|
|[Ernestine, =====...|
|[portable, Networ...|
+--------------------+
only showing top 20 rows



rawLemmaDS = [lemma: array<string>]


[lemma: array<string>]

In [8]:
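// Filter all the crap: keep only tokens made of letters and hyphens that start and end
// with a letter (so at least 3 characters long), then lowercase them
val lemmasByDocDS = rawLemmaDS.as[Seq[String]].map { tokens =>
  tokens.filter(_.matches("^[A-Za-z][A-Za-z-]+[A-Za-z]$")).map(_.toLowerCase)
}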
lemmasByDocDS.show()

+--------------------+
|               value|
+--------------------+
|[cl-grnm, common-...|
|[cl-cookie, build...|
|[build, status, o...|
|[introduction, th...|
|[lastfm, interfac...|
|[inferior-shell, ...|
|[asd-generator, a...|
|[linewise-templat...|
|[overview, the, l...|
|[cl-scram, introd...|
|[project, home, d...|
|[intel-hex, libra...|
|[pooler, trivial,...|
|[about, lichat-tc...|
|[about, cl-steamw...|
|[about, lichat-se...|
|[title, cl-olef, ...|
|[rst, cl-docutils...|
|[ernestine, build...|
|[portable, networ...|
+--------------------+
only showing top 20 rows



lemmasByDocDS = [value: array<string>]




[value: array<string>]
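To see what the regular expression in the cell above keeps, here is the same filter applied to a few hand-picked tokens resembling the raw lemmas shown earlier. The name `TokenFilterDemo` is only for illustration.

```scala
// Applies the notebook's token filter to a plain Scala sequence of tokens
object TokenFilterDemo {
  // Letters and hyphens only, starting and ending with a letter: at least 3 characters
  val CleanToken = "^[A-Za-z][A-Za-z-]+[A-Za-z]$"

  def clean(tokens: Seq[String]): Seq[String] =
    tokens.filter(_.matches(CleanToken)).map(_.toLowerCase)
}
```

`TokenFilterDemo.clean(Seq("#", "cl-grnm", "-lsb-", "!", "Project", "'s", "the", "a", "is", "3d", "common-lisp"))` returns `Seq("cl-grnm", "project", "the", "common-lisp")`. Because the pattern needs at least three characters, short words such as "a", "is" or "to" never appear in the final ranking. Also note that `String.matches` already requires the whole string to match, so the `^` and `$` anchors are redundant (but harmless).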

In [9]:
// Transform Dataset[array<string>] into Dataset[string] by flatMapping it (one row per lemma)

val lemmasDS = lemmasByDocDS.flatMap(_.iterator)
lemmasDS.show()

+---------------+
|          value|
+---------------+
|        cl-grnm|
|    common-lisp|
| implementation|
|            the|
|grid-restrained|
|            and|
|    traditional|
|    nelder-mead|
|      algorithm|
|     authorship|
|           this|
|        package|
|     originally|
|          write|
|          mario|
|         mommer|
|            and|
|           this|
|           fork|
|       maintain|
+---------------+
only showing top 20 rows



lemmasDS = [value: string]


[value: string]

# Regroup and sort it
Now for the last step: we group identical lemmas together, count them, and then sort the dataset by lemma frequency.
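Conceptually, `groupBy('value).count()` is the map_reduce step of the architecture: Spark first counts lemmas within each partition, then shuffles those partial counts and merges them per lemma. The plain-Scala sketch below simulates that idea, with each inner `Seq` standing in for one partition (an illustrative assumption; Spark's actual physical plan is more involved, and `DistributedCountSketch` is a hypothetical name). It also breaks ties alphabetically: `orderBy('count.desc)` alone gives no guaranteed order among lemmas with the same count, whereas `orderBy('count.desc, 'value)` would.

```scala
// Simulates distributed counting: per-partition partial counts, then a merge step.
object DistributedCountSketch {
  // "Map side": count the lemmas of one partition locally
  def partialCounts(partition: Seq[String]): Map[String, Long] =
    partition.foldLeft(Map.empty[String, Long]) { (acc, lemma) =>
      acc.updated(lemma, acc.getOrElse(lemma, 0L) + 1L)
    }

  // "Reduce side": merge two sets of partial counts by summing per lemma
  def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (lemma, n)) =>
      acc.updated(lemma, acc.getOrElse(lemma, 0L) + n)
    }

  // Count across all partitions, then sort by decreasing count, ties broken alphabetically
  def countAndSort(partitions: Seq[Seq[String]]): Seq[(String, Long)] =
    partitions
      .map(partialCounts)
      .foldLeft(Map.empty[String, Long])(merge)
      .toSeq
      .sortBy { case (lemma, n) => (-n, lemma) }
}
```

For instance, `DistributedCountSketch.countAndSort(Seq(Seq("the", "lisp", "the"), Seq("lisp", "the", "use")))` returns `Seq(("the", 3L), ("lisp", 2L), ("use", 1L))`.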

In [10]:
// Group identical lemmas and count the occurrences of each one

val lemmasCount = lemmasDS.groupBy('value).count()
lemmasCount.show()

+----------------+-----+
|           value|count|
+----------------+-----+
|      convergent|    3|
|             few|  127|
|            some|  896|
|           input|  470|
|           still|  207|
|       recognize|   39|
|           those|  185|
|             art|    5|
|           trail|   17|
|       unlicense|   18|
|        incoming|   14|
|         degrade|    1|
|          harder|    6|
|          online|   23|
|        priority|   75|
|    evaluateable|    2|
|     requirement|   49|
|browser-specific|    1|
|     interaction|   22|
|         persist|   14|
+----------------+-----+
only showing top 20 rows



lemmasCount = [value: string, count: bigint]


[value: string, count: bigint]

In [11]:
// The last step is to sort all the lemmas by decreasing frequency!
val sortedLemmas = lemmasCount.orderBy('count.desc)
sortedLemmas.show()

+--------+-----+
|   value|count|
+--------+-----+
|     the|29883|
|     and|10443|
|     for| 6983|
|     you| 5976|
|     use| 5186|
|    this| 4930|
|function| 4757|
|    that| 4503|
|    lisp| 4180|
|    with| 4140|
|     can| 3624|
|     not| 3392|
|   value| 2972|
|    name| 2938|
|  return| 2690|
|    will| 2610|
|    list| 2446|
|    have| 2380|
|    from| 2360|
|    test| 2200|
+--------+-----+
only showing top 20 rows



sortedLemmas = [value: string, count: bigint]


[value: string, count: bigint]

# Export
Tada! We have our new list of lemmas extracted from a corpus and sorted by frequency! That was not so hard! To finalize this project, we just need to export it as a CSV file.
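Spark's CSV writer produces a directory rather than a single file. When the final list is small enough to fit in driver memory, an alternative is to collect it, for example with `sortedLemmas.as[(String, Long)].collect().toSeq`, and write one plain CSV file from the driver. Here is a minimal sketch of that second step, using a hypothetical `CsvExport.writeCsv` helper:

```scala
import java.io.{File, PrintWriter}

// Hypothetical helper: write (lemma, count) rows as a single CSV file with a header line.
// Intended for small results that were already collected on the driver.
object CsvExport {
  def writeCsv(rows: Seq[(String, Long)], file: File): Unit = {
    val out = new PrintWriter(file, "UTF-8")
    try {
      out.println("lemma,count")
      rows.foreach { case (lemma, count) => out.println(s"$lemma,$count") }
    } finally out.close()
  }
}
```

No CSV quoting is needed here because the token filter only lets letters and hyphens through.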

In [13]:
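// Coalesce all partitions into one so that Spark writes a single part file. Note that Spark
// creates a *directory* named lemmas_frequency.csv that contains this part-*.csv file.

sortedLemmas.coalesce(1).write.option("header", "true").csv("./lemmas_frequency.csv")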

