In [None]:
#helps to find the pyspark path
import findspark
findspark.init()

import sparknlp
spark = sparknlp.start()


In [None]:
spark

In [None]:
#import sql functions
from pyspark.sql import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id

In [None]:
# Read JSON file into dataframe
df = spark.read.json("arxiv-metadata-oai-snapshot.json")

In [None]:
df.dtypes

In [None]:
df.count()

In [None]:
#read categories description file
cat_desc_df=spark.read.option("header",True).csv('Category_Descriptions.csv')

In [None]:
cat_desc_df.show(truncate=False)

In [None]:
#create temporary view for meta data
df.createOrReplaceTempView("arxiv_metadata")

In [None]:
#select required cols from the above temp view
df_sub = spark.sql("SELECT id,title,abstract,categories,authors FROM arxiv_metadata")

In [None]:
df_sub.dtypes

In [None]:
#fetch the year from versions column; if a doc has multiple versions then fetch max versions year
latest_version = df.select('versions').rdd.map(lambda x: int(x[-1][-1][0][-17:-13])).collect()

In [None]:
latest_version[0:10]

In [None]:
#attach the publication year to the selected columns. The year is computed as a column
#expression on the same rows it describes: last element of `versions`, its `created`
#timestamp, then the 4 characters starting 17 from the end (the same slice as
#`created[-17:-13]`). This replaces collecting the years to the driver and re-aligning
#them through row numbers generated on two independently built dataframes. That
#approach depended on both dataframes keeping the same row order and forced every row
#through a single un-partitioned window.
#assumes the version struct has a `created` field (JSON schema inference orders struct
#fields alphabetically, so it is the first field read by the cell above) -- TODO confirm
import pyspark.sql.functions as F

year_col = (F.substring(F.element_at('versions', -1)['created'], -17, 4)
            .cast('int')
            .alias('Year'))

transformed_df = df.select('id', 'title', 'abstract', 'categories', 'authors', year_col)

#left join with the category descriptions: not every category is in the description file,
#so those rows keep a null cat_desc
transformed_df = transformed_df.join(cat_desc_df,
                                     transformed_df.categories == cat_desc_df.categories,
                                     "left").select(transformed_df['*'], cat_desc_df.cat_desc)
transformed_df.show()

In [None]:
auth_id_df=transformed_df.select(['id','authors','Year','categories'])

from pyspark.sql.functions import split, explode
auth_id_df=auth_id_df.withColumn('authors',explode(split('authors',',|\n| and ')))

In [None]:
auth_id_df.show(truncate=False)

In [None]:
#create temporary view on top the transormed data
transformed_df.createOrReplaceTempView("transformed_df")

auth_id_df.createOrReplaceTempView("auth_id_df")

In [None]:
import plotly.express as px

In [None]:
#EDA: overall corpus size

n_papers = transformed_df.count()
print('Total Number of Research Papers:', n_papers)


In [None]:
#Yearly trend
rp_year_df=spark.sql('''
                        SELECT Year
                            ,count(DISTINCT id) as CNT
                        FROM transformed_df 
                        WHERE Year<2022
                        GROUP BY 1
                        ORDER BY CNT ASC
                        
                        ''').toPandas()

In [None]:
#yearly trend
fig_violationtrend = px.line(rp_year_df, x="Year", 
                             y='CNT', 
                             title='Yearly Trend of Research Publications')

fig_violationtrend.update_layout(yaxis_title="#of Research Papers Published")
fig_violationtrend.show()

In [None]:
##
rp_cat_year_df=spark.sql('''
                        SELECT TEMP_B.* FROM(
                        SELECT TEMP_A.*,ROW_NUMBER() OVER(PARTITION BY TEMP_A.Year 
                                            ORDER BY TEMP_A.CNT DESC) as row_n
                        FROM
                        (
                        SELECT categories,Year,count(DISTINCT id) as CNT
                        FROM transformed_df 
                        WHERE Year>2016 AND Year<2022
                        GROUP BY 1,2) TEMP_A
                        )TEMP_B WHERE TEMP_B.row_n<=3
                        
                        ''').toPandas()


In [None]:
rp_cat_year_df

In [None]:
#bars of papers per year, coloured by each year's top-3 categories, with counts shown on the bars
fig_rp_cat_year_df = px.bar(
    rp_cat_year_df,
    x="Year",
    y='CNT',
    color='categories',
    text_auto=True,
)
fig_rp_cat_year_df.update_layout(
    title='Research Papers of Top 3 Categories Year Wise (last 5 years)',
    yaxis_title="#of Research Papers Published",
)
fig_rp_cat_year_df.show()

In [None]:
#number of distinct author names after trimming. Blank pieces left over from splitting
#the raw author strings are excluded so they are not counted as an author. The scalar is
#extracted so the printout shows the number instead of a list of Row objects.
n_distinct_authors = spark.sql(
    "SELECT count(DISTINCT trim(authors)) AS n_authors FROM auth_id_df WHERE trim(authors) <> ''"
).first()['n_authors']
print('Total Number of Distinct Authors:', n_distinct_authors)

<b>Pipeline for Spark NLP operations</b> (https://nlp.johnsnowlabs.com/docs/en/annotators)

•	<b>Document Assembler:</b> Prepares data into a format that Spark NLP can process (see the link above for details)

•	<b>Sentence Detector:</b> Detects sentence boundaries using any provided approach.

•	<b>Tokenizer:</b> Tokenizes raw text into word pieces, tokens

•	<b>Normalizer:</b> Removes all dirty characters from text following a regex pattern and transforms words based on a provided dictionary.

•	<b>Lemmatizer:</b> Finds lemmas out of words with the objective of returning a base dictionary word.

•	<b>Stopwords Cleaner:</b> This annotator takes a sequence of strings (e.g. the output of a Tokenizer, Normalizer, Lemmatizer, and Stemmer) and drops all the stop words from the input sequences.

•	<b>Finisher:</b> Converts annotation results into a format that is easier to use. It is useful for extracting the results from Spark NLP pipelines.




In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline

In [None]:
#English stopword list from NLTK. If the corpus is missing, download it once so the
#notebook also runs from a fresh environment (stopwords.words raises LookupError then).
import nltk
from nltk.corpus import stopwords

try:
    eng_stopwords = stopwords.words('english')
except LookupError:
    nltk.download('stopwords', quiet=True)
    eng_stopwords = stopwords.words('english')

In [None]:
#annotate the raw abstract text as Spark NLP documents, the input type for the stages below
doc_assembler = DocumentAssembler().setInputCol('abstract').setOutputCol('doc')

#split each document into sentences
sentence_detector = SentenceDetector().setInputCols(['doc']).setOutputCol('sentences')

#each word in the doc/sentence is tokenized
tokenization = Tokenizer().setInputCols(['sentences']).setOutputCol('tokenizer_out')

#normalization: lower-case tokens and strip every character that is not a word character,
#digit or whitespace. A raw string passes the regex escapes through unchanged; in a plain
#string "\w" is an invalid Python escape sequence and emits a warning.
normalization = Normalizer().setInputCols(['tokenizer_out']).setOutputCol('normalized_out') \
     .setLowercase(True).setCleanupPatterns([r"[^\w\d\s]"])

#lemmatization: map each token to its base form using a lemma dictionary file whose keys
#and values are separated by '->' and whose values are tab-separated
#NOTE(review): the doubled ".txt.txt" extension looks accidental -- confirm it matches the file on disk
lemmatization = Lemmatizer().setInputCols(["normalized_out"]).setOutputCol("lemma_out") \
    .setDictionary("AntBNC_lemmas_ver_001.txt.txt",value_delimiter ="\t", key_delimiter = "->")

#remove stopwords (like is, the, etc.) case-insensitively, using the NLTK English list
rm_stopwords= StopWordsCleaner().setInputCols(['lemma_out']).setOutputCol('rm_stopwords_out') \
     .setCaseSensitive(False).setStopWords(eng_stopwords)


#readable output: expose the cleaned tokens as a plain array column. The column name
#'finshed_lemma' (sic) is referenced by later cells. setCleanAnnotations(False) keeps the
#intermediate annotation columns.
readable_out_finisher = Finisher().setInputCols(['rm_stopwords_out']) \
     .setOutputCols('finshed_lemma').setCleanAnnotations(False)



In [None]:
#chain the annotators in execution order:
#assemble -> sentences -> tokens -> normalize -> lemmatize -> drop stopwords -> finish
nlp_stages = [
    doc_assembler,
    sentence_detector,
    tokenization,
    normalization,
    lemmatization,
    rm_stopwords,
    readable_out_finisher,
]
nlpPipeline = Pipeline(stages=nlp_stages)

In [None]:
#filter the data for TF-IDF operation as this operation is taking time, 
#filter on categories only starts with 'cs' and year>2016

trans_sub_df = spark.sql('''
                            SELECT * 
                            FROM transformed_df 
                            where categories LIKE 'cs.CV' 
                                and Year>2016 AND Year<2022
                            ''')

In [None]:
trans_sub_df.count()

In [None]:
trans_sub_df.dtypes

In [None]:
abstract_df=trans_sub_df.select('abstract')

In [None]:
abstract_nlp_trans = nlpPipeline.fit(abstract_df).transform(abstract_df)

In [None]:
abstract_lemma_df = abstract_nlp_trans.select('finshed_lemma')

In [None]:
type(abstract_lemma_df)

In [None]:
abstract_lemma_df.count()

<b>Term Frequency and IDF</b> (https://spark.apache.org/docs/latest/ml-features.html#tf-idf)

<b>Term Frequency:</b> 

•	We count how often each word appears in the text. The output is a term-frequency vector.

•	This can be done with CountVectorizer (CV) or HashingTF; each method has advantages and disadvantages.

•	<b>HashingTF:</b>

    • It is faster but suffers from hash collisions, i.e., the same hash index can be assigned to multiple terms.
    • Because of collision effect results might not be that accurate when compared to CV method 
    • It is harder to interpret, since the TF vector cannot easily be traced back to the original words in a doc

•	<b>CountVectorizer:</b>

    • Gives more accurate results
    • Easy to interpret: the TF vectors can be traced back to the exact words
    • Suffers performance issues on larger datasets


<b>IDF (Inverse Document Frequency):</b> 

•	It takes the feature vectors created by the TF methods above and down-weights features that appear frequently across the collection of documents/texts (the corpus).

•	Output from this can be used in ML algorithms like classification, topic modeling etc.



In [None]:
#term frequency computation
from pyspark.ml.feature import CountVectorizer
#vocabulary filters actually applied below:
#minDF = 10   -> ignore terms that appear in fewer than 10 documents
#maxDF = 0.75 -> ignore terms that appear in more than 75% of the documents
MIN_DF = 10
MAX_DF = 0.75

countvectorizer = CountVectorizer().setInputCol("finshed_lemma").setOutputCol("features_cv") \
                 .setMinDF(MIN_DF).setMaxDF(MAX_DF)
model_cv = countvectorizer.fit(abstract_lemma_df)
model_cv_out = model_cv.transform(abstract_lemma_df)

In [None]:
#IDF computation
from pyspark.ml.feature import IDF
model_idf = IDF(inputCol="features_cv", outputCol="features_idf").fit(model_cv_out)
#.setInputCol("features_cv").setOutputCol("features_idf")

#model_IDF_fit = model_idf.fit(model_cv_out)
model_IDF_out = model_idf.transform(model_cv_out)

In [None]:
type(model_IDF_out)

<b>NLU (Natural Language Understanding) </b>
(Ref: https://medium.com/nanonets/topic-modeling-with-lsa-psla-lda-and-lda2vec-555ff65b0b05)

We try to understand the document by analysing its topics.

<b>Topic modeling: </b>

    •From the corpus of documents/texts, we try to recognize and extract topics across the collection of documents
    •Each document can have mixture of topics and each topic consists of collection of words

<b>There are many ways to extract the latent/hidden topics from a collection of documents. Some of them are listed below</b>

    •LDA (Latent Dirichlet Allocation)
    •LSA (Latent Semantic Analysis)
    •pLSA (Probabilistic Latent Semantic Analysis)
    •BERT Topic modelling

<b>LDA (Latent Dirichlet Allocation): </b> (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.LDA.html)

    •Finds document-topic and word-topic distributions using Dirichlet priors.
    •Put simply, a Dirichlet is a distribution over distributions: given a particular type of distribution, it describes which probability distributions we expect to see.
    •The LDA model takes a collection of documents as input, where each document is a vector of length vocabSize and each entry is the count of the corresponding word in the document.





In [None]:
#topic modeling using LDA(Latent Dirichlet Allocation)
from pyspark.ml.clustering import LDA
lda = LDA(k=20,maxIter=100,featuresCol='features_idf')
model_LDA = lda.fit(model_IDF_out)

In [None]:
transformed_LDA = model_LDA.transform(model_IDF_out)
#transformed_LDA.show(truncate=False)

In [None]:
transformed_LDA.dtypes

In [None]:
#function to fetch words from vocabulary
import pyspark.sql.types as T
docs_word_list = model_cv.vocabulary
def fetch_words(token_list):
    return [docs_word_list[i] for i in token_list]
fun_to_words = F.udf(fetch_words, T.ArrayType(T.StringType()))

In [None]:
#
num_top_words = 50
topics_LDA = model_LDA.describeTopics(num_top_words).withColumn('words_in_topic', fun_to_words(F.col('termIndices')))
#topics_LDA.select('topic', 'words_in_topic').show(truncate=False)

In [None]:
#attach paper ids to the TF-IDF rows by position: both frames get a 1-based row number
#from a window ordered by monotonically_increasing_id(), and rows with equal numbers are joined.
#NOTE(review): this relies on model_IDF_out (derived from trans_sub_df through the NLP
#pipeline, CountVectorizer and IDF) keeping trans_sub_df's row order, which Spark does not
#guarantee. Carrying 'id' alongside 'abstract' through the pipeline would avoid this join.
#The un-partitioned window also pulls every row into a single partition.
row_order = Window.orderBy(monotonically_increasing_id())

docid_df = trans_sub_df.select('id').withColumn("row_id", row_number().over(row_order))
model_IDF_out = model_IDF_out.withColumn("row_id", row_number().over(row_order))

Final_model_IDF_out = model_IDF_out.join(docid_df, on="row_id").drop("row_id")

In [None]:
#describeTopics yields one row per topic (topic id, term indices/weights and the
#words_in_topic column), not one row per paper, so there is no paper id to attach.
#Joining it to docid_df by row number would pair topic i with the i-th paper id, an
#arbitrary association. Keep the topic table as-is.
Final_topics_LDA = topics_LDA

In [None]:
#attach paper ids to the per-document LDA output by row position, matching the 1-based
#row numbers assigned to docid_df.
#NOTE(review): correct only if transformed_LDA keeps trans_sub_df's row order, which Spark
#does not guarantee; carrying 'id' through the pipeline would make this join unnecessary.
lda_row_order = Window.orderBy(monotonically_increasing_id())
transformed_LDA = transformed_LDA.withColumn("row_id", row_number().over(lda_row_order))

Final_transformed_LDA = transformed_LDA.join(docid_df, on="row_id").drop("row_id")

In [None]:
#schema of every frame that gets written to parquet
for heading, frame in [('trans_sub_df DataTypes:\n', trans_sub_df),
                       ('Final_model_IDF_out DataTypes:\n', Final_model_IDF_out),
                       ('Final_topics_LDA DataTypes:\n', Final_topics_LDA),
                       ('Final_transformed_LDA DataTypes:\n', Final_transformed_LDA)]:
    print(heading, frame.dtypes, '\n')

In [None]:

trans_sub_df.write.mode('overwrite').parquet("trans_sub_df.parquet")

In [None]:
Final_model_IDF_out.write.mode('overwrite').parquet("trans_sub_df_tfidf.parquet")

In [None]:
Final_topics_LDA.write.mode('overwrite').parquet("topics_LDA.parquet")

In [None]:
Final_transformed_LDA.write.mode('overwrite').parquet("transformed_LDA_TopicDist.parquet")

In [None]:
import pandas as pd
test_2=pd.read_parquet("trans_sub_df_tfidf.parquet",engine='fastparquet')

In [None]:
spark.stop()