In [None]:
%scala
// Azure Blob Storage coordinates. The account name and access key are placeholders
// that must be filled in before the notebook is run.
val storage_container = "data"
val storage_account_name = "<NAME>"
val storage_account_access_key = "<KEY>"
// Root wasbs:// URL of the container, assembled from the container and account names.
val storage_base_url = {
  val blobHost = s"${storage_account_name}.blob.core.windows.net"
  s"wasbs://${storage_container}@${blobHost}"
}

// Format of the input files; each data URL globs every file of that type in its folder.
val file_type = "parquet"
val train_data_url = Seq(storage_base_url, "data/train/train_data", s"*.${file_type}").mkString("/")
val test_data_url = Seq(storage_base_url, "data/test/test_data", s"*.${file_type}").mkString("/")

In [None]:
%scala
import org.apache.spark.sql.functions.{col, when, split, size, lower, concat_ws, regexp_replace, input_file_name}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}
import org.apache.spark.sql.SparkSession

// Obtain (or reuse) the Spark session, passing the storage account access key as a
// session config entry named after the account's blob endpoint.
val spark = {
  val accountKeyConf = s"fs.azure.account.key.${storage_account_name}.blob.core.windows.net"
  SparkSession.builder()
    .appName("Pre-Processing-2")
    .config(accountKeyConf, storage_account_access_key)
    .getOrCreate()
}
// Handle to the underlying SparkContext of the session.
val sc = spark.sparkContext

In [None]:
%scala
// English stop words handed to the StopWordsCleaner stage, grouped by word class.
// The groups are concatenated in order, so the resulting array keeps a fixed ordering.
val stopWordsEN: Array[String] = Array.concat(
  // Personal, possessive and reflexive pronouns
  Array("i", "me", "my", "myself", "we", "our", "ours", "ourselves"),
  Array("you", "you're", "you've", "you'll", "you'd", "your", "yours", "yourself", "yourselves"),
  Array("he", "him", "his", "himself", "she", "she's", "her", "hers", "herself"),
  Array("it", "it's", "its", "itself", "they", "them", "their", "theirs", "themselves"),
  // Interrogatives and demonstratives
  Array("what", "which", "who", "whom", "this", "that", "that'll", "these", "those"),
  // Auxiliary verbs
  Array("am", "is", "are", "was", "were", "be", "been", "being"),
  Array("have", "has", "had", "having", "do", "does", "did", "doing"),
  // Articles, prepositions and conjunctions
  Array("a", "an", "the", "of", "at", "by", "for", "with", "and", "as"),
  Array("about", "against", "between", "into", "through", "during", "before", "after", "above", "below"),
  Array("to", "from", "up", "down", "in", "out", "on", "off", "over", "under"),
  // Adverbs and quantifiers
  Array("again", "further", "then", "once", "here", "there"),
  Array("when", "where", "why", "how", "all", "any", "both", "each"),
  // Modals and contraction fragments
  Array("s", "t", "can", "will", "just", "should", "should've"),
  Array("now", "d", "ll", "m", "o", "re", "ve", "y", "ma")
)

In [None]:
%scala
import com.johnsnowlabs.nlp.annotator.{DocumentNormalizer, Tokenizer, LemmatizerModel, PerceptronModel, StopWordsCleaner}
import com.johnsnowlabs.nlp.base.DocumentAssembler
import org.apache.spark.ml.Pipeline
import spark.implicits._

// Entry stage of the NLP pipeline: reads the raw "comment" column and emits it as the
// "document" annotation column consumed by the following stages.
val documentAssembler = {
  val assembler = new DocumentAssembler()
  assembler.setInputCol("comment")
  assembler.setOutputCol("document")
  assembler
}

// Cleans the assembled document before tokenization and lower-cases it.
// Every match of the patterns below is replaced with a single space:
//   1. "<[^>]*>"    - whole markup tags such as "<br />"; the "*" is required so that tags
//                     with more than one character between the brackets are matched.
//   2. [^\w\d\s]    - any remaining character that is not a word character or whitespace.
val documentNormalizer = new DocumentNormalizer()
  .setInputCols("document")
  .setOutputCol("normalizedDocument")
  .setAction("clean")
  .setPatterns(Array("<[^>]*>", """[^\w\d\s]"""))
  .setReplacement(" ")
  .setPolicy("pretty_all")
  .setLowercase(true)

// Splits the normalized document into tokens.
val tokenizer = new Tokenizer()
  .setInputCols("normalizedDocument")
  .setOutputCol("token")

// Filters the tokens against the custom English stop-word list defined in stopWordsEN.
val stopWords = new StopWordsCleaner()
  .setInputCols(Array("token"))
  .setOutputCol("cleanTokens")
  .setStopWords(stopWordsEN)

// Lemmatizes the stop-word-filtered tokens using the library's default pretrained model.
val lemmatizer = LemmatizerModel.pretrained()
  .setInputCols("cleanTokens")
  .setOutputCol("lemma")

// Tags the same filtered tokens with part-of-speech labels using the pretrained
// "pos_ud_ewt" English model; the tagger also takes the normalized document as input.
val posTagger = PerceptronModel.pretrained("pos_ud_ewt", "en")
  .setInputCols("normalizedDocument", "cleanTokens")
  .setOutputCol("posTag")

// Chains all annotators in execution order:
// assemble -> normalize -> tokenize -> remove stop words -> lemmatize -> POS-tag.
val pipeline = {
  val stages: Array[org.apache.spark.ml.PipelineStage] = Array(
    documentAssembler,
    documentNormalizer,
    tokenizer,
    stopWords,
    lemmatizer,
    posTagger
  )
  new Pipeline().setStages(stages)
}

In [None]:
%scala
// Explicit schema applied when reading the input files: three nullable integer columns
// (id, rating, type) followed by the nullable free-text comment column.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("rating", IntegerType, nullable = true),
  StructField("type", IntegerType, nullable = true),
  StructField("comment", StringType, nullable = true)
))

// Load the raw train and test splits from blob storage with the explicit schema.
val trainDF = spark.read.schema(schema).format(file_type).load(train_data_url)

val testDF = spark.read.schema(schema).format(file_type).load(test_data_url)

// Columns kept after annotation: the identifiers, the label ("type" renamed to "sentiment"),
// and the plain string results extracted from the lemma and POS-tag annotations.
val outputColumns = Seq(
  col("id"),
  col("rating"),
  col("type").as("sentiment"),
  col("lemma")("result").as("comment"),
  col("posTag")("result").as("posTag")
)

// Fit the pipeline once, on the training split only, and reuse the fitted model for both
// splits. This avoids a second fit and keeps the test set out of any fitting step.
val fittedPipeline = pipeline.fit(trainDF)

// Both results are cached because they are shown and written out by later cells.
val transformedTrainDF = fittedPipeline.transform(trainDF)
  .select(outputColumns: _*)
  .cache()

val transformedTestDF = fittedPipeline.transform(testDF)
  .select(outputColumns: _*)
  .cache()

In [None]:
%scala
// Preview the first 5 transformed training rows, truncating each cell to 50 characters.
transformedTrainDF.show(numRows = 5, truncate = 50)

In [None]:
%scala
// Preview the first 5 transformed test rows, truncating each cell to 50 characters.
transformedTestDF.show(numRows = 5, truncate = 50)

In [None]:
%scala
// Export both transformed splits as CSV with a header row, one output partition per split.
// The "comment" and "posTag" array columns are flattened to "|"-delimited strings first,
// since Spark's CSV writer does not support array columns.
// NOTE(review): the Parquet export cell saves to these same directories with mode
// "overwrite", so whichever export runs last replaces the other - confirm this is intended.
Seq(
  transformedTrainDF -> s"${storage_base_url}/data/train/transformed_train",
  transformedTestDF -> s"${storage_base_url}/data/test/transformed_test"
).foreach { case (df, outputPath) =>
  df.withColumn("comment", concat_ws("|", col("comment")))
    .withColumn("posTag", concat_ws("|", col("posTag")))
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .format("csv")
    .save(outputPath)
}

In [None]:
%scala
// Export both transformed splits as Parquet, one output partition per split.
// NOTE(review): the CSV export cell saves to these same directories with mode "overwrite",
// so whichever export runs last replaces the other - confirm this is intended.
Seq(
  transformedTrainDF -> s"${storage_base_url}/data/train/transformed_train",
  transformedTestDF -> s"${storage_base_url}/data/test/transformed_test"
).foreach { case (df, outputPath) =>
  df.repartition(1)
    .write
    .mode("overwrite")
    .format("parquet")
    .save(outputPath)
}