In [ ]:
import scala.collection.JavaConversions._
import scala.util.control.Breaks._
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.language.postfixOps

import java.io._
import java.nio.file.{Files, Path, Paths}

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.ml.feature.IDF

//import opennlp.tools.langdetect._ // custom implementation
import opennlp.tools.lemmatizer.DictionaryLemmatizer
import opennlp.tools.postag.{POSModel, POSTaggerME}
import opennlp.tools.tokenize.{TokenizerME, TokenizerModel}

//object Helpers {
    def using[A <: { def close(): Unit }, B](resource: A)(f: A => B): B =
        try {
            f(resource)
        } catch {
            case _ : Throwable => throw new Exception("file exception")
        } finally {
            resource.close()
        }
//}

//import Helpers._
type MapLangToStrings = scala.collection.mutable.Map[String, List[String]]
type MapLangToStringsI = scala.collection.immutable.Map[String, List[String]]

// since sparkContext is unavailable as var here, I can't just use it like in "Spark Dataset 101", "Spark 101"
// could be related to customDepts bug, because I added opennlp dependency:
// https://github.com/spark-notebook/spark-notebook/issues/563
val spark = SparkSession
  .builder()
  .appName("words")
  .config("spark.driver.allowMultipleContexts", "true")
  .master("local")
  .getOrCreate()
import spark.implicits._
val sparkContext = spark.sparkContext

object NLP {  
  def getLangs : Seq[String] = {
    val de : String = "de"
    val en : String = "en"
    val fr : String = "fr"
    Seq(de, en, fr)
  }
  
  def getStopwordsPerLang(langs : Seq[String]) : MapLangToStringsI = {
    langs map { lang => 
      (lang, using(scala.io.Source.fromFile(s"notebooks/words/vocabs/$lang-stopwords.txt")) 
        { source => source.getLines.toList })
    } toMap  
  }
  
  def getFilesPaths : Seq[String] = {
      Files.newDirectoryStream(Paths.get("notebooks/words/text-data"))
           .filter(_.getFileName.toString.endsWith(".txt"))
           .map(_.toString)
           .toSeq
  }
}

case class TfIdfFile(val id:(String,String), val tokens:Array[String], val idfCoefs:Vector[Double]) extends Serializable

class NLP(val stopwordsPerLang: MapLangToStringsI, val textfilesPaths: Seq[String]) extends Serializable {
  def this() = this(NLP.getStopwordsPerLang(NLP.getLangs), NLP.getFilesPaths)

  def process = {
    getFilePathsPerLang(textfilesPaths) map { case (lang, textPaths) => //Future {
        val onlp = new OpenNLP(lang)
  
        val ls : Seq[((String,String), Array[String])] = textPaths map { path =>
          using(scala.io.Source.fromFile(path)) { source =>
            val text = source.getLines.mkString
            val unnoisedText = removeTextNoise(text)
                                               
            val tokens = onlp.tokenize(unnoisedText)
            val tokensExcludeStopWords = removeStopWords(lang, tokens, stopwordsPerLang)

            val lemmas = onlp.lemmatize(tokensExcludeStopWords)
            val lemmd = (tokensExcludeStopWords zip lemmas) map (tuple => if(tuple._2 != "O") tuple._2 else tuple._1 ) // if no lemma => original
            ((lang,path),lemmd.toArray)
          }}
        val df = spark.createDataFrame(ls).toDF("id", "words") 
        
        val tf_ = new CountVectorizer()
              .setInputCol("words")
              .setOutputCol("rawFeatures") 
              .fit(df)
        val tf = tf_.transform(df)        

        val tfidf = new IDF()
              .setInputCol("rawFeatures")
              .setOutputCol("features")
              .fit(tf)
              .transform(tf)
        //tfidf.printSchema
        //val vocabulary = tf_.vocabulary
                                                         
        //for(v <- tfidf) println(v(0) + "\n" + v(1) + "\n" + v(3) + "\n\n")
        //for(v <- tfidf) { println(v(3)(0)); println(v(3)(1)); println(v(3)(2)); }
        (tf_.vocabulary, tfidf.map(t => 
                                        TfIdfFile(t(0).asInstanceOf[(String, String)], 
                                                  t(1).asInstanceOf[Array[String]], 
                                                  t(3).asInstanceOf[Vector[Double]])).collect)
        // 0 - (lang,filePath)
        // 1 - words - output to text
        // 3 - vocab ids + tfidf coefs
    } foreach { case (voc, tifs) => 
      for(v <- voc) print(v + " ")
      for(tif <- tifs) {
        println(tif.id)         
        for(t <- tif.tokens) print(t + " ")
        println(" ")
        for(c <- tif.idfCoefs) print(c + " ")
      }
    }
    
    //}          //break; // test single file
  }
  
  def removeTextNoise(text:String) : String = {
    val removedNumbers = text.filter(!_.isDigit)
    // https://stackoverflow.com/questions/30074109/removing-punctuation-marks-form-text-in-scala-spark
    val removedWordsOfSizeLessEqual2AndPunctuation = removedNumbers.replaceAll("""([\p{Punct}]|\b\p{IsLetter}{1,2}\b)\s*""", " ")
    // https://stackoverflow.com/questions/6198986/how-can-i-replace-non-printable-unicode-characters-in-java
    val removedUnicodes = removedWordsOfSizeLessEqual2AndPunctuation.replaceAll("""[\p{C}]""", " ")
    val replacedEscapeSeqWithSpace =  removedUnicodes.replaceAll("""[\t\n\r\f\v]""", " ")
    replacedEscapeSeqWithSpace
  }

  def removeStopWords(lang: String, tokens:Seq[String], stopwordsPerLang : MapLangToStringsI) : Seq[String] = {
     tokens.filter(!stopwordsPerLang(lang).contains(_))
  }
  
  def getFilePathsPerLang(textfilePaths : Seq[String]) : MapLangToStrings = { // TODO: to MapLangToStringsI
    val textfilesPathsPerLang: MapLangToStrings = scala.collection.mutable.Map.empty
  
    for(file <- textfilePaths) {
      using(scala.io.Source.fromFile(file)) { source => 
        val firstLine = source.getLines.next() // detect language with first line, TODO: use a few random lines in the middle of the text
        detectLang(firstLine, stopwordsPerLang) match  {
          case Some(lang) => {
            var list = textfilesPathsPerLang.getOrElse(lang, List.empty)
            textfilesPathsPerLang.update(lang, list:+file)        
          }
          case None => println("Language was not detected for file: $file") // side effect
        }                                              
      }
    }
    textfilesPathsPerLang
  } 
  
  /*
    Before I googled Apache OpenNLP, I implemented custom language recognizer based on -stopwords.txt.
    Since some external libs are using dictionary approach anyway (https://github.com/optimaize/language-detector):
    stopwords are commonly found in the speech,
    stopwords dictionary is relatively small and stopwords of 3 langs provided differ a lot.
  */
  def detectLang(line : String, stopwordsPerLang : MapLangToStringsI) : Option[String] = {
    val langs = line.split(" ").flatMap(item => stopwordsPerLang.filter(_._2.exists(_.equalsIgnoreCase(item))).map(_._1))
                    .groupBy(f => f)
                    .map(g => (g._1, g._2.size))
    if(langs.isEmpty) None
    else Some(langs.maxBy(_._2)._1)
  } 
}

class OpenNLP(val tokenizerModel: TokenizerModel, val posModel : POSModel, val lemmatizer : DictionaryLemmatizer) {
  def this(lang:String) = this(OpenNLP.loadTokenizerModel(lang), OpenNLP.loadPOSModel(lang), OpenNLP.loadLemmatizer(lang))

  val tokenizer = new TokenizerME(tokenizerModel)
  val posTagger = new POSTaggerME(posModel)

  def tokenize(text: String): Seq[String] = {
    val positions = tokenizer.tokenizePos(text)
    val strings = positions.map {
      pos => text.substring(pos.getStart, pos.getEnd)
    }
    strings.filter(_.length > 1).map(s => s.toLowerCase) // additional cleanup after regexps & to lower case
  }
  
  def lemmatize(tokens:Seq[String]): Seq[String] = {
    val tags = posTagger.tag(tokens.toArray)
    lemmatizer.lemmatize(tokens.toArray, tags)
  }
}

object OpenNLP {
  def loadTokenizerModel(lang:String): TokenizerModel = {
    using(new FileInputStream(s"notebooks/words/vocabs/$lang-token.bin")) { stream =>
      new TokenizerModel(stream)
    }
  }
  
  def loadPOSModel(lang:String): POSModel = {
    using(new FileInputStream(s"notebooks/words/vocabs/$lang-pos-maxent.bin")) { stream =>
      new POSModel(stream)
    }
  }
  
  def loadLemmatizer(lang:String): DictionaryLemmatizer = {
    using(new FileInputStream(s"notebooks/words/vocabs/$lang-lemmatizer-columns-reordered.txt")) { stream =>
      new DictionaryLemmatizer(stream)
    }
  }
}

val nlp = new NLP()
nlp.process

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:839)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:839)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:371)
  at org.apache.spark.sql.execution.Spark