# Data Preparation/Processing using PySpark

The [Apache Spark](https://spark.apache.org/) website describes it as a `Lightning-fast unified analytics engine`.

PySpark is a flavour of Spark used for processing and analysing massive volumes of data. We all know how long data processing and cleaning can take as the size of the datasets increases. Enter PySpark! It distributes the work, which can decrease the running time considerably.

PySpark is the Python API of Spark, which means Spark jobs can be written in ordinary Python. Machine learning (ML) pipelines, exploratory data analysis (at scale), ETLs for data platforms, and much more can all be run in a distributed manner. One of the best parts of PySpark is that if you are already familiar with Python, it's really easy to learn.

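There is another language called *Scala* used for big data; Spark itself is written in it. Scala is often cited as being considerably faster than Python for Spark workloads, although the gap depends heavily on the job. But, since that requires learning another language, let's stick to PySpark.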

## SparkNLP module
Need to write

## PySpark ML module
Need to write

## Install pyspark library

In [1]:
!pip install -q pyspark==3.3.0 spark-nlp==5.0.2


In [2]:
import sparknlp
spark = sparknlp.start()

print("Spark NLP version:", sparknlp.version())
print("Apache Spark version:", spark.version)

Spark NLP version: 5.0.2
Apache Spark version: 3.3.0


In [3]:
# ! cd ~/.ivy2/cache/com.johnsnowlabs.nlp/spark-nlp_2.12/jars && ls -lt

In [4]:
# from py4j.java_gateway import java_import
# java_import(spark._sc._jvm, "org.apache.spark.sql.api.python.*")

## Import packages

In [5]:
# pyspark packages
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# pyspark ml packages
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF

# sparknlp packages
from sparknlp.base import *
from sparknlp.annotator import *

# other packages
import re
import numpy as np
import pandas as pd
from functools import reduce

## Load Data
Read data from local directory

In [7]:
pandasDF = pd.read_csv('dataset.csv')
pandasDF.head()

Unnamed: 0,ID,TITLE,ABSTRACT,Computer Science,Physics,Mathematics,Statistics,Quantitative Biology,Quantitative Finance
0,1,Reconstructing Subject-Specific Effect Maps,Predictive models allow subject-specific inf...,1,0,0,0,0,0
1,2,Rotation Invariance Neural Network,Rotation invariance and translation invarian...,1,0,0,0,0,0
2,3,Spherical polyharmonics and Poisson kernels fo...,We introduce and develop the notion of spher...,0,0,1,0,0,0
3,4,A finite element approximation for the stochas...,The stochastic Landau--Lifshitz--Gilbert (LL...,0,0,1,0,0,0
4,5,Comparative study of Discrete Wavelet Transfor...,Fourier-transform infra-red (FTIR) spectra o...,1,0,0,1,0,0


In [8]:
pandasDF.shape

(20972, 9)

### Convert pandas dataframe into spark dataframe

Note: In practice the dataset is usually read from a database or from data lake storage in the cloud.

In [9]:
# building a spark session is required
# call this spark session - pandas to spark
sparkS = SparkSession.builder.appName("pandas to spark").getOrCreate()

In [10]:
sparkDF = sparkS.createDataFrame(pandasDF)



In [11]:
sparkDF.show(5)

+---+--------------------+--------------------+----------------+-------+-----------+----------+--------------------+--------------------+
| ID|               TITLE|            ABSTRACT|Computer Science|Physics|Mathematics|Statistics|Quantitative Biology|Quantitative Finance|
+---+--------------------+--------------------+----------------+-------+-----------+----------+--------------------+--------------------+
|  1|Reconstructing Su...|  Predictive mode...|               1|      0|          0|         0|                   0|                   0|
|  2|Rotation Invarian...|  Rotation invari...|               1|      0|          0|         0|                   0|                   0|
|  3|Spherical polyhar...|  We introduce an...|               0|      0|          1|         0|                   0|                   0|
|  4|A finite element ...|  The stochastic ...|               0|      0|          1|         0|                   0|                   0|
|  5|Comparative study...|  Fourie

## Processing text columns

### User-defined functions required

Two functions are defined in this notebook to process the text columns in the dataframe and obtain the topics from the data using the `sparknlp` and `pyspark.ml` packages.
- `clean_sentence` - removes punctuation, special characters, numbers, additional spaces and any words with length < 3
- `getTopics` - obtains the terms of each topic from the `sparkLDA` model

In [12]:
@udf("string")
def clean_sentence(sentence):

  '''function to clean up the sentences
  this removes punctuation, special characters,
  numbers and additional spaces in between
  words, and removes any words whose length < 3.'''

  # null values reach the UDF as None; pass them through unchanged
  if sentence is None:
    return None

  # replace non-alpha characters with spaces
  sentence = re.sub(r"[^a-z A-Z]", " ", sentence)

  # collapse runs of whitespace into a single space
  sentence = re.sub(r"\s+", " ", sentence)

  # only retain words of length >= 3
  sentence = " ".join([ele for ele in sentence.split() if len(ele) >= 3])

  return sentence
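
As a quick check of the cleaning steps described above, the same logic can be run as ordinary Python, outside Spark. This is only a sketch: `clean_sentence_plain` is a hypothetical name for a plain-function copy of the UDF body, and the sample sentence is made up for illustration.

```python
import re

def clean_sentence_plain(sentence):
    """Plain-Python copy of the cleaning steps (hypothetical helper name)."""
    if sentence is None:
        return None
    # replace everything except ASCII letters and spaces with a space
    sentence = re.sub(r"[^a-z A-Z]", " ", sentence)
    # collapse runs of whitespace into a single space
    sentence = re.sub(r"\s+", " ", sentence)
    # keep only words with at least three characters
    return " ".join(word for word in sentence.split() if len(word) >= 3)

# made-up sample text
sample = "Wavelet-based de-noising of FTIR spectra (2nd ed.), at 3.5 GHz!"
cleaned = clean_sentence_plain(sample)
print(cleaned)  # Wavelet based noising FTIR spectra GHz
```

Note that hyphenated words are split before the length filter is applied, so `de-noising` loses its two-letter prefix.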

In [13]:
@udf(ArrayType(StringType()))
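def getTopics(token_list):

  '''function to get the terms from each of the
  topics from sparkLDA model'''

  # `vocab` is a global list; it is assigned from tfModel.vocabulary
  # in the topic modeling section, before this UDF is executed
  tlist = [vocab[token_id] for token_id in token_list]
  return tlist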
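
`getTopics` looks terms up in a global `vocab` list that only exists once the topic modeling section has run. One possible alternative, sketched here in plain Python, is to bind the vocabulary explicitly through a small factory function. `make_topic_lookup` and the four-word vocabulary are hypothetical and not part of the notebook; wrapping the returned function with `udf(...)` is left as an assumption.

```python
def make_topic_lookup(vocabulary):
    """Return a function that maps term indices to terms of `vocabulary`."""
    vocabulary = list(vocabulary)  # copy, so later changes cannot leak in

    def lookup(term_indices):
        return [vocabulary[index] for index in term_indices]

    return lookup

# hypothetical vocabulary; in the notebook this would be tfModel.vocabulary
lookup = make_topic_lookup(["model", "algorithm", "system", "energy"])
topic_terms = lookup([3, 0, 1])
print(topic_terms)  # ['energy', 'model', 'algorithm']
```

The design choice is simply to make the dependency on the vocabulary visible at the point where the lookup function is created, instead of relying on cell execution order.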

### Combine text columns
The two text columns, `Title` and `Abstract`, are combined into a new column `Content`, which is the input for all the text processing and clean-up and eventually the topic modeling. Only the text columns are retained, as the rest are not required for topic modeling.

In [14]:
sparkDF = sparkDF.withColumn("Content", concat(col("Title"), lit(" "), col("Abstract")))
sparkDF = sparkDF.select("Title", "Abstract", "Content")
sparkDF.show(5)

+--------------------+--------------------+--------------------+
|               Title|            Abstract|             Content|
+--------------------+--------------------+--------------------+
|Reconstructing Su...|  Predictive mode...|Reconstructing Su...|
|Rotation Invarian...|  Rotation invari...|Rotation Invarian...|
|Spherical polyhar...|  We introduce an...|Spherical polyhar...|
|A finite element ...|  The stochastic ...|A finite element ...|
|Comparative study...|  Fourier-transfo...|Comparative study...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [15]:
# apply the above function to clean up text and create a new column
# the regexp_replace call below first removes any run of three
# identical consecutive word characters

sparkDF = sparkDF.withColumn("Text", clean_sentence(regexp_replace(col("Content"), \
                                                            r"(\w)\1{2}", "")))
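
To see what the pattern `(\w)\1{2}` removes, it can be tried with Python's `re` module, which should behave the same way as Spark's Java regex engine for simple ASCII input like this. The sample strings are made up for illustration.

```python
import re

# same pattern as passed to regexp_replace in the cell above
TRIPLE = r"(\w)\1{2}"

samples = ["coool model", "1000 iterations", "zzzz", "bookkeeper"]
stripped = [re.sub(TRIPLE, "", text) for text in samples]

for before, after in zip(samples, stripped):
    print(repr(before), "->", repr(after))
# 'coool model' -> 'cl model'
# '1000 iterations' -> '1 iterations'
# 'zzzz' -> 'z'
# 'bookkeeper' -> 'bookkeeper'
```

The run of three is deleted outright, not collapsed to a single letter, and `\w` also matches digits and underscores, so numbers such as `1000` are affected as well.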

In [16]:
sparkDF

DataFrame[Title: string, Abstract: string, Content: string, Text: string]

### Define spark-nlp pipelines
Define the following spark-nlp stages before topic modeling:
- split the text into sentences and tokenize
- part of speech tagging
- chunk on the tags for nouns, adjectives and base-form verbs
- lemmatize
- remove stopwords

In [17]:
documentAssembler = DocumentAssembler() \
                      .setInputCol("Text") \
                      .setOutputCol("document")

sentence = SentenceDetector() \
            .setInputCols("document") \
            .setOutputCol("sentence")

tokenizer = Tokenizer() \
              .setInputCols(["sentence"]) \
              .setOutputCol("token")

POSTag = PerceptronModel.pretrained() \
          .setInputCols("document", "token") \
          .setOutputCol("pos")

chunker = Chunker() \
            .setInputCols("sentence", "pos") \
            .setOutputCol("chunk") \
            .setRegexParsers(["<NN>", "<NNS>", "<NNP>", "<VB>", "<JJ>", "<ADJ>"])

lemmatizer = LemmatizerModel.pretrained() \
              .setInputCols(["token"]) \
              .setOutputCol("lemmatized")

stopwordsCleaner = StopWordsCleaner() \
                    .setStopWords(StopWordsRemover \
                            .loadDefaultStopWords("english")) \
                    .setInputCols(["lemmatized"]) \
                    .setOutputCol("unigram")

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [46]:
pipeline1 = Pipeline().setStages(
              [documentAssembler,
               sentence,
               tokenizer,
               POSTag,
               chunker,
               lemmatizer,
               stopwordsCleaner
               ])

In [59]:
df = pipeline1.fit(sparkDF).transform(sparkDF)

In [55]:
# df.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               Title|            Abstract|             Content|                Text|            document|            sentence|               token|                 pos|               chunk|          lemmatized|             unigram|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Reconstructing Su...|  Predictive mode...|Reconstructing Su...|Reconstructing Su...|[{document, 0, 17...|[{document, 0, 17...|[{token, 0, 13, R...|[{pos, 0, 13, NNP...|[{chunk, 32, 37, ...|[{token, 0, 13, R...|[{token, 0, 13, R...|
|Rotation Invarian...|  Rotation invari...|Rotation Invarian...|Rota

In [60]:
finalDF = df.select("Text", col("unigram.result").alias("unigrams")) \
                      .withColumn("unigrams", clean_sentence(
                          regexp_replace(clean_sentence(
                              concat_ws(", ", array_distinct(
                                  split(regexp_replace(
                                          concat_ws(", ", col("unigrams")),
                                        "[^A-Za-z0-9]", " "),
                                      " "))
                              )), r"\s*[A-Z]\w*\s*", " "
                          ))
                      )
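
The nested expression above is easier to follow when unrolled step by step. The following plain-Python sketch mirrors each Spark function in order, from the innermost call outwards. It assumes that `array_distinct` keeps the first occurrence of each element in order; `clean_sentence_plain` and `unigrams_to_text` are hypothetical helper names, and the token lists are made up.

```python
import re

def clean_sentence_plain(sentence):
    """Plain-Python copy of the clean_sentence steps (hypothetical name)."""
    sentence = re.sub(r"[^a-z A-Z]", " ", sentence)
    sentence = re.sub(r"\s+", " ", sentence)
    return " ".join(word for word in sentence.split() if len(word) >= 3)

def unigrams_to_text(tokens):
    """Unrolled version of the nested column expression (hypothetical name)."""
    joined = ", ".join(tokens)                          # concat_ws(", ", unigrams)
    alnum_only = re.sub(r"[^A-Za-z0-9]", " ", joined)   # inner regexp_replace
    parts = alnum_only.split(" ")                       # split(..., " ")
    distinct = list(dict.fromkeys(parts))               # array_distinct (order kept)
    cleaned = clean_sentence_plain(", ".join(distinct)) # concat_ws + clean_sentence
    no_caps = re.sub(r"\s*[A-Z]\w*\s*", " ", cleaned)   # outer regexp_replace
    return clean_sentence_plain(no_caps)                # outer clean_sentence

tokens = ["Rotation", "invariance", "translation", "invariance", "CNN", "network"]
print(unigrams_to_text(tokens))                  # invariance translation network
print(unigrams_to_text(["mRNA", "expression"]))  # expression
```

Two effects are worth noting: duplicate terms within a document are dropped, so each term counts at most once per document downstream, and the capital-letter pattern cuts a token from its first uppercase letter onwards, which removes acronyms and also truncates mixed-case tokens such as `mRNA`.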

In [58]:
# finalDF.show(5)

+--------------------+--------------------+
|                Text|            unigrams|
+--------------------+--------------------+
|Reconstructing Su...|model allow subje...|
|Rotation Invarian...|invariance transl...|
|Spherical polyhar...|polyharmonics ker...|
|finite element ap...|finite element ap...|
|Comparative study...|study decompositi...|
+--------------------+--------------------+
only showing top 5 rows



In [61]:
# split the unigrams into separate terms
# filter to keep only the non-null entries

finalDF = finalDF.filter(col("unigrams").isNotNull()) \
        .withColumn("terms", split(col("unigrams"), " ")) \
        .select("Text", "unigrams", "terms")

In [62]:
finalDF.show(5)

+--------------------+--------------------+--------------------+
|                Text|            unigrams|               terms|
+--------------------+--------------------+--------------------+
|Reconstructing Su...|model allow subje...|[model, allow, su...|
|Rotation Invarian...|invariance transl...|[invariance, tran...|
|Spherical polyhar...|polyharmonics ker...|[polyharmonics, k...|
|finite element ap...|finite element ap...|[finite, element,...|
|Comparative study...|study decompositi...|[study, decomposi...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



## Topic modeling
Use the pyspark ml libraries to obtain the topics and their terms along with the term probabilities.

In [63]:
tfzer = CountVectorizer(inputCol = "terms", outputCol = "tfFeatures")
tfModel = tfzer.fit(finalDF)
tfResult = tfModel.transform(finalDF)

In [64]:
idfzer = IDF(inputCol = "tfFeatures", outputCol = "tfidfFeatures")
idfModel = idfzer.fit(tfResult)
tfidfResult = idfModel.transform(tfResult)
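
For intuition about what the two cells above compute, here is a small plain-Python sketch of term counts and TF-IDF weights. It uses the smoothed formula given in the Spark ML documentation for `IDF`, namely `idf(t) = log((m + 1) / (df(t) + 1))`, where `m` is the number of documents and `df(t)` the number of documents containing term `t`. The three-document corpus is made up, and dictionaries stand in for the sparse vectors that Spark produces.

```python
import math
from collections import Counter

# made-up corpus: each document is a list of terms, as in the `terms` column
docs = [
    ["model", "learn", "model"],
    ["energy", "model"],
    ["energy", "temperature"],
]

m = len(docs)

# document frequency: in how many documents each term appears
df = Counter(term for doc in docs for term in set(doc))

# smoothed inverse document frequency, as documented for Spark ML's IDF
idf = {term: math.log((m + 1) / (count + 1)) for term, count in df.items()}

# TF-IDF per document: raw term count times the term's idf
tfidf = [
    {term: count * idf[term] for term, count in Counter(doc).items()}
    for doc in docs
]

for term in sorted(idf):
    print(f"{term:12s} df={df[term]} idf={idf[term]:.4f}")
print(tfidf[0])
```

Terms that occur in fewer documents, such as `learn` here, receive a larger weight than terms that occur in most documents, such as `model`.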

In [65]:
lda = LDA(k=5, maxIter = 5, featuresCol = "tfidfFeatures")
ldaModel = lda.fit(tfidfResult)

In [67]:
ldaModel.describeTopics(10).show(5)

+-----+--------------------+--------------------+
|topic|         termIndices|         termWeights|
+-----+--------------------+--------------------+
|    0|[53, 22, 1, 18, 1...|[0.00188610619231...|
|    1|[16, 133, 116, 4,...|[0.00128907686748...|
|    2|[251, 114, 335, 2...|[0.00198972364245...|
|    3|[26, 12, 27, 8, 5...|[0.00202644655618...|
|    4|[29, 7, 28, 10, 5...|[0.00256160719331...|
+-----+--------------------+--------------------+



In [68]:
vocab = tfModel.vocabulary

topics = ldaModel.describeTopics(20).withColumn("TopicTerms", \
                                                getTopics(col("termIndices"))) \
                                    .withColumnRenamed("termWeights", "TermProb") \
                                    .select("Topic", "TopicTerms", "TermProb")

In [69]:
topics.show()

+-----+--------------------+--------------------+
|Topic|          TopicTerms|            TermProb|
+-----+--------------------+--------------------+
|    0|[prove, algorithm...|[0.00188610619231...|
|    1|[system, control,...|[0.00128907686748...|
|    2|[temperature, ene...|[0.00198972364245...|
|    3|[function, two, g...|[0.00202644655618...|
|    4|[learn, propose, ...|[0.00256160719331...|
+-----+--------------------+--------------------+

