In [1]:
# Import Spark NLP
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

In [2]:
# Build (or reuse) the Spark session; the Spark NLP jar is pulled in via spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Spark NLP")
    .config("spark.driver.memory", "32G")
    .config("spark.driver.maxResultSize", "2G")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0")
    .config("spark.kryoserializer.buffer.max", "1000M")
    .getOrCreate()
)


:: loading settings :: url = jar:file:/home/3147567/.conda/envs/reddit_env/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/3147567/.ivy2/cache
The jars for the packages stored in: /home/3147567/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e45ca135-8ed1-40ae-884d-60e42ac84df7;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;4.1.0 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.code.findbugs#annotations;3.0.1 in central
	found net.jcip#jcip-annotations;1.0 in central
	found com.google.code.findbugs#jsr305;3.0.1 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#l

In [3]:
# Display the version of the running Spark session.
spark.version

'3.1.2'

In [4]:
# Load the three submission parquet files (neutral / pro / anti, going by the file names).
NeutralFile = spark.read.parquet("../../Files/Submissions/score/done/Neutr_vacc_d.parquet")
ProFile = spark.read.parquet("../../Files/Submissions/score/done/Pro_vacc_d.parquet")
AntiFile = spark.read.parquet("../../Files/Submissions/score/done/Anti_vacc_d.parquet")

                                                                                

In [5]:
from sparknlp.base.document_assembler import DocumentAssembler
from sparknlp.base.finisher import Finisher
from sparknlp.annotator.stop_words_cleaner import StopWordsCleaner
from sparknlp.annotator.normalizer import Normalizer
from sparknlp.annotator.token import Tokenizer
from pyspark.ml.clustering import LDA
import pandas as pd
from pyspark.sql.functions import size, explode

In [7]:
import functools
def unionAll(dfs):
    """Stack several DataFrames row-wise into one.

    Each frame after the first is re-projected onto the column order of the
    accumulated result before being unioned, so frames that hold the same
    columns in a different order still line up.

    Args:
        dfs: iterable of DataFrames sharing the same column names.

    Returns:
        The union of all frames; a single-element input is returned as is.

    Raises:
        TypeError: if ``dfs`` is empty (raised by ``functools.reduce``).
    """
    def _append(stacked, nxt):
        # Align the incoming frame to the accumulated column order first.
        aligned = nxt.select(stacked.columns)
        return stacked.union(aligned)

    return functools.reduce(_append, dfs)

In [8]:
# Stack the neutral, pro and anti frames row-wise into a single DataFrame.
Total = unionAll([NeutralFile, ProFile, AntiFile])

In [6]:
# Draw roughly 20% of the neutral submissions. The fixed seed keeps the sample (and
# everything fitted on it further down) stable across kernel restarts.
sample_n = NeutralFile.sample(fraction=0.2, seed=42)

In [10]:
# Stratified 10% sample of the combined data, drawn per `class_II` value (0.0 / 1.0 / 2.0)
# with a fixed seed.
# NOTE(review): `sample` is not read by any later cell visible here; the pipeline below
# is fitted on `sample_n` instead - confirm which one is intended.
sample = Total.sampleBy("class_II", fractions={
    0.0: 0.10,
    1.0: 0.10,
    2.0: 0.10
}, seed=42)

In [7]:

# Preprocessing pipeline: cleanText -> document -> token -> cleanTokens -> tokens (plain array).

# Wrap the "cleanText" column as a Spark NLP document annotation.
document_assembler = (
    DocumentAssembler()
    .setInputCol("cleanText")
    .setOutputCol("document")
)
# .setCleanupMode("") is left disabled.

# Split each document into token annotations.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# A Normalizer stage (to clean unwanted characters) is left commented out:
# normalizer = Normalizer() \
#     .setInputCols(["token"]) \
#     .setOutputCol("normalized")

# Remove stop words, ignoring case.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols("token")
    .setOutputCol("cleanTokens")
    .setCaseSensitive(False)
)

# Turn the cleaned annotations into a plain array column named "tokens".
finisher = (
    Finisher()
    .setInputCols(["cleanTokens"])
    .setOutputCols(["tokens"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

# Stage order matters: each stage consumes the previous stage's output column.
pipeline_stages = [
    document_assembler,
    tokenizer,
    # normalizer,
    stopwords_cleaner,
    finisher,
]
nlp_pipeline = Pipeline(stages=pipeline_stages)

In [8]:
# Fit the preprocessing pipeline on the neutral sample (`sample_n`).
nlp_model = nlp_pipeline.fit(sample_n)

In [9]:
# Apply the fitted pipeline to the same sample; the Finisher stage adds the plain
# `tokens` array column used by the cells below.
processed_df  = nlp_model.transform(sample_n)

In [10]:
# Keep only the metadata columns and the token arrays, then count the documents
# (the count triggers the Spark job).
tokens_df = processed_df.select('subreddit', 'score', 'created_utc','tokens')
tokens_df.count()

                                                                                

473254

In [11]:
from pyspark.ml.feature import CountVectorizer

# Bag-of-words features over the token arrays: vocabulary capped at 5000 terms,
# with document-frequency bounds minDF=3.0 and maxDF=0.3.
cv = CountVectorizer(
    inputCol="tokens",
    outputCol="features",
    minDF=3.0,
    maxDF=0.3,
    vocabSize=5000,
)

# Learn the vocabulary from the corpus.
cv_model = cv.fit(tokens_df)

# Add the term-count vectors as the "features" column.
vectorized_tokens = cv_model.transform(tokens_df)

                                                                                

In [12]:
# Fit a k-topic LDA model on the count vectors and print the top words of each topic.
k = 7
# The seed pins the random initialisation so a re-run starts from the same state
# instead of producing a different set of topics every time.
lda = LDA(k=k, maxIter=10, topicConcentration=0.5, seed=42)
model = lda.fit(vectorized_tokens)
ll = model.logLikelihood(vectorized_tokens)
lp = model.logPerplexity(vectorized_tokens)

topics = model.describeTopics(maxTermsPerTopic=10)
vocab = cv_model.vocabulary
print(f"Log Likelihood of the model {ll}")
print(f"Log Perplexity of the model {lp}")

# `topics` has one row per topic, so collect it and map term indices to words on the
# driver rather than shipping the whole vocabulary list to executors inside a lambda.
topics_words = [
    [vocab[term_idx] for term_idx in row['termIndices']]
    for row in topics.collect()
]
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
        print(word)
    print("*"*25)

22/10/28 09:40:10 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/10/28 09:40:10 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

Log Likelihood of the model -37739477.21228218
Log Perplexity of the model 7.649306893528074
topic: 0
*************************
[NUM]
remove
coronavirus
police
biden
say
man
covid
covid[NUM]
woman
*************************
topic: 1
*************************
m
like
good
think
remove
look
pandemic
covid
new
people
*************************
topic: 2
*************************
[NUM]
virus
china
people
_
corona
coronavirus
world
market
covid[NUM]
*************************
topic: 3
*************************
[NUM]
people
[num]
year
like
know
time
state
think
work
*************************
topic: 4
*************************
[NUM]
new
case
coronavirus
india
covid[NUM]
report
 
china
york
*************************
topic: 5
*************************
trump
mask
[NUM]
coronavirus
people
say
covid
delete
wear
house
*************************
topic: 6
*************************
[NUM]
coronavirus
test
covid[NUM]
[url]
covid[num]
covid
death
case
positive
*************************


In [20]:
# Score the vectorized documents with the fitted LDA model.
transformed = model.transform(vectorized_tokens)

In [13]:
# Describe each topic over up to 5000 terms (the vocabSize passed to CountVectorizer).
topics = model.describeTopics(maxTermsPerTopic = 5000)

In [54]:
# Top 50 terms per topic.
# NOTE(review): this rebinds `topics`, replacing the 5000-term version assigned in the
# previous cell; later cells that read `topics` see whichever of the two ran last.
topics = model.describeTopics(maxTermsPerTopic = 50)

In [None]:
# Translate every topic's term indices into vocabulary words, then print one
# banner-delimited word list per topic.
banner = "*" * 25
topics_rdd = topics.rdd
topics_words = (
    topics_rdd
    .map(lambda row: row['termIndices'])
    .map(lambda idx_list: [vocab[idx] for idx in idx_list])
    .collect()
)
for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print(banner)
    for word in topic:
        print(word)
    print(banner)

topic: 0
*************************
[NUM]
covid
dose
get
covid[NUM]
day
feel
pfizer
vaccinate
shot
vaccination
m
say
like
covid[num]
good
people
second
go
new
effect
remove
coronavirus
week
arm
time
booster
moderna
[num]
work
experience
hour
start
know
year
ve
take
want
health
need
receive
pain
bad
help
month
mandate
think
today
unvaccinated
symptom
*************************
topic: 1
*************************
[NUM]
covid
dose
get
covid[NUM]
day
feel
vaccinate
pfizer
shot
vaccination
m
say
like
covid[num]
second
good
people
go
new
remove
effect
coronavirus
[num]
week
arm
time
booster
moderna
experience
work
hour
start
know
year
ve
take
want
receive
need
health
pain
bad
month
help
mandate
think
symptom
today
unvaccinated
*************************
topic: 2
*************************
[NUM]
covid
dose
get
covid[NUM]
day
feel
pfizer
vaccinate
shot
vaccination
m
say
like
covid[num]
good
people
second
go
remove
new
effect
coronavirus
week
time
arm
booster
moderna
[num]
work
experience
hour
start

In [14]:
# Get topic-term distribution
# Request every vocabulary term here instead of reading the shared `topics` variable:
# other cells rebind `topics` with different maxTermsPerTopic values, which would
# silently truncate this export depending on execution order.
full_topics = model.describeTopics(maxTermsPerTopic=model.vocabSize())
term_topics = full_topics.coalesce(1).toPandas()

# One {term index: weight} mapping per topic row, for however many topics the model has.
topic_term_weights = [
    dict(zip(term_indices, term_weights))
    for term_indices, term_weights in zip(term_topics['termIndices'], term_topics['termWeights'])
]
# Rows are topics, columns are term indices (vocabulary positions).
df = pd.DataFrame(topic_term_weights)
df.to_pickle('../../Files/models/topics/term_topics_n_7_2.pkl')

In [15]:
# get doc_topic distribution
transformed = model.transform(vectorized_tokens)
out = transformed[['subreddit', 'score','created_utc', 'topicDistribution']]

pdf = out.toPandas()
# Expand each topic vector into topic_0 ... topic_{k-1}. The column names follow the
# width of the vectors themselves, so nothing here needs editing when k changes.
topic_probs = pd.DataFrame(pdf['topicDistribution'].to_list())
topic_probs.columns = [f'topic_{i}' for i in range(topic_probs.shape[1])]
pdf[list(topic_probs.columns)] = topic_probs

# The raw vector column is redundant once it has been expanded.
pdf = pdf.drop(columns='topicDistribution')
pdf.to_pickle("../../Files/models/topics/doctop_n_7_distr_2.pickle")

                                                                                

In [16]:
# Get doc length: number of entries in each document's `tokens` array
# (i.e. after stop-word removal), saved as a one-column pandas frame.
countdf = tokens_df.select(size('tokens').alias('doc_len'))
counts = countdf.toPandas()
counts.to_pickle('../../Files/models/topics/doclen_n_7_2.pkl')

                                                                                

In [17]:
# Get Vocabulary
vocab = cv_model.vocabulary
# One term per line, in vocabulary-index order. The encoding is explicit so that
# non-ASCII tokens are written identically regardless of the machine's locale.
with open('../../Files/models/topics/n_7_vocab_2.txt', 'w', encoding='utf-8') as file:
    file.writelines(f'{item} \n' for item in vocab)

In [18]:
# get term frequency
import pyspark.sql.functions as f

# Collapse the whole corpus into one pseudo-document, so that the CountVectorizer
# output for that single row is the corpus-wide count of each vocabulary term.
combined_df = (
    tokens_df.select(f.explode('tokens').alias('col'))
    .select(f.collect_list('col').alias('tokens'))
)
counts = cv_model.transform(combined_df).select('features').collect()

# toArray() gives one entry per vocabulary index, zeros included. A sparse vector's
# `.values` holds only the stored entries, so zipping it with the vocabulary would
# shift every count after the first missing term onto the wrong word.
term_counts = counts[0]['features'].toArray()
Tf = dict(zip(cv_model.vocabulary, term_counts))
Tf_v = list(Tf.values())
with open('../../Files/models/topics/n_7_tf_2.txt', 'w', encoding='utf-8') as file:
    for item in Tf_v:
        file.write(f'{item} \n')

                                                                                

# DON'T LOOK HERE (scratch cells)

In [126]:
# Pulls every token array to the driver as Row objects.
# NOTE(review): `doc_len` is not read by any later cell visible here.
doc_len = tokens_df.select('tokens').collect()

                                                                                

In [143]:


# Token count per document, exposed as the `doc_len` column.
countdf = tokens_df.select(size('tokens').alias('doc_len'))

In [145]:
# Materialise the document lengths as a pandas DataFrame on the driver.
docLen = countdf.toPandas()

                                                                                

In [147]:
# Write the document lengths through Spark's CSV writer.
# NOTE(review): Spark normally creates a directory of part files at this path rather
# than a single .csv file - confirm that is what downstream code expects.
countdf.write.csv("../../Files/models/doclen.csv")

                                                                                

In [14]:
# Score the vectorized documents with the fitted LDA model.
transformed = model.transform(vectorized_tokens)

In [74]:
# Keep the metadata columns plus the per-document topic mixture.
out = transformed[['subreddit', 'score','created_utc', 'topicDistribution']]

In [103]:
import pyspark.sql.functions as f

In [109]:
# Explode all token arrays and gather them back into a single row, i.e. one
# pseudo-document holding every token of the corpus.
combined_df = (
tokens_df.select(f.explode('tokens').alias('col'))
      .select(f.collect_list('col').alias('tokens'))
)

In [112]:
# Vectorize the single pseudo-document and collect its `features` vector to the driver.
counts = cv_model.transform(combined_df).select('features').collect()

                                                                                

In [114]:
# Pair each vocabulary term with its corpus-wide count.
# NOTE(review): if `features` is a sparse vector, `.values` holds only the stored
# entries, so this zip lines up with the vocabulary only when every term has a
# stored count - confirm.
Tf = dict(zip(cv_model.vocabulary, counts[0]['features'].values))

In [122]:
# Counts only, in the dict's insertion (vocabulary) order.
Tf_v = list(Tf.values())

In [77]:
import pandas as pd
# Bring the doc-topic frame to the driver as pandas.
# NOTE(review): the recorded output shows a SocketTimeoutException from the
# "serve-DataFrame" thread, yet the next cell displays `pdf` - confirm the frame is complete.
pdf = out.toPandas()

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)
                                                                                

In [78]:
# Preview the first rows of the doc-topic frame.
pdf.head()

Unnamed: 0,subreddit,score,created_utc,topicDistribution
0,altnewz,8,1586963819,"[0.3334551586768651, 0.3339027337180743, 0.332..."
1,altnewz,1,1600297040,"[0.3336362057577024, 0.3326746584696878, 0.333..."
2,altnewz,1,1599356238,"[0.33321316659717315, 0.333259202922333, 0.333..."
3,altnewz,1,1599130056,"[0.327864080763441, 0.35364011028479353, 0.318..."
4,altnewz,1,1602832667,"[0.33341178135066096, 0.3327875545975474, 0.33..."


In [83]:
# Work on a copy so `pdf` itself keeps the raw `topicDistribution` column.
pdf2 = pdf.copy()

In [81]:
# Rebinds `pdf2` to a frame holding only the three topic columns; the metadata
# columns of `pdf` are not carried over.
# NOTE(review): hard-codes three topics, and per the execution counts this cell
# (In [81]) ran before the `pdf.copy()` cell above (In [83]).
pdf2 = pd.DataFrame(pdf['topicDistribution'].to_list(), columns=['topic_0', 'topic_1', 'topic_2'])

In [85]:
# Add the three topic-probability columns to `pdf2` (hard-coded for a 3-topic model).
pdf2[['topic_0', 'topic_1', 'topic_2']] = pd.DataFrame(pdf['topicDistribution'].to_list(), columns=['topic_0', 'topic_1', 'topic_2'])

In [89]:
# Preview the expanded doc-topic frame.
pdf2.head()

Unnamed: 0,subreddit,score,created_utc,topic_0,topic_1,topic_2
0,altnewz,8,1586963819,0.333455,0.333903,0.332642
1,altnewz,1,1600297040,0.333636,0.332675,0.333689
2,altnewz,1,1599356238,0.333213,0.333259,0.333528
3,altnewz,1,1599130056,0.327864,0.35364,0.318496
4,altnewz,1,1602832667,0.333412,0.332788,0.333801


In [91]:
# Persist the expanded doc-topic frame.
pdf2.to_pickle('../../Files/models/topics_a_3_td.pickle')

In [88]:
# Remove the raw vector column from `pdf2` in place.
# NOTE(review): the execution counts (In [88] here vs In [91] on the to_pickle cell
# that precedes it in the file) indicate this ran before the pickle was written -
# confirm the intended order.
pdf2.drop(columns='topicDistribution', inplace=True)

In [96]:
# Persist `pdf` as it currently stands in the kernel.
pdf.to_pickle("../../Files/models/topics_n_7_distr.pickle")

In [16]:
# Print the words of every topic: look up each term index of `topics` in the
# CountVectorizer vocabulary (`vocab`) and list the words between star banners.
star_line = "*" * 25
topics_rdd = topics.rdd
index_lists = topics_rdd.map(lambda row: row['termIndices'])
topics_words = index_lists.map(lambda idx_list: [vocab[idx] for idx in idx_list]).collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print(star_line)
    for word in topic:
        print(word)
    print(star_line)

topic: 0
*************************
NUM
people
vaccine
like
m
know
think
covid
go
want
get
time
say
work
thing
life
good
feel
need
year
ve
way
come
tell
vaccinate
day
death
right
die
try
bad
mask
look
world
find
take
post
believe
url
virus
remove
new
make
start
person
test
live
see
num
cause
happen
kill
government
talk
stop
help
long
mean
man
job
point
let
ask
child
vaccination
lot
actually
health
shit
end
fuck
family
give
care
friend
love
human
use
ass
case
woman
wear
change
big
well
question
reason
week
guy
shot
kid
leave
hate
month
mandate
pay
medium
doctor
real
state
*************************
topic: 1
*************************
people
NUM
vaccine
like
m
know
think
covid
go
want
get
time
say
work
thing
feel
life
good
ve
need
year
way
come
tell
vaccinate
day
right
try
death
die
bad
mask
world
look
find
take
believe
post
url
new
remove
make
start
virus
person
live
see
talk
cause
happen
kill
num
help
stop
mean
government
fuck
long
job
point
man
let
ask
lot
actually
child
end
health
test


In [27]:
# Bring the topic descriptions (topic, termIndices, termWeights) to the driver as pandas.
term_topics = topics.coalesce(1).toPandas()

In [32]:
import pandas as pd

In [28]:
# Preview the topic descriptions.
term_topics.head()

Unnamed: 0,topic,termIndices,termWeights
0,0,"[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,...","[0.02407657465086515, 0.02137233258698163, 0.0..."
1,1,"[1, 0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,...","[0.021785314593184744, 0.02101035696233249, 0...."
2,2,"[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13,...","[0.02578380621754275, 0.021577177569482697, 0...."


In [33]:
# Spread each topic's term-index list across columns (one row per topic).
split_df = pd.DataFrame(term_topics['termIndices'].tolist())

In [35]:
# Term indices of the first topic row.
indeces = term_topics['termIndices'][0]

In [38]:
# Look up the vocabulary word for each of those indices.
words = [vocab[word] for word in indeces ]

In [57]:
# Per-topic {word: weight} mappings for the first three topics, keyed by vocabulary word.
wordict = dict(zip([vocab[word] for word in term_topics['termIndices'][0]], term_topics['termWeights'][0]))
wordict1 = dict(zip([vocab[word] for word in term_topics['termIndices'][1]], term_topics['termWeights'][1]))
wordict2 = dict(zip([vocab[word] for word in term_topics['termIndices'][2]], term_topics['termWeights'][2]))

In [127]:
# Same three mappings, but keyed by term index instead of word; this rebinds the
# `wordict*` names assigned in the previous cell.
wordict = dict(zip(term_topics['termIndices'][0], term_topics['termWeights'][0]))
wordict1 = dict(zip(term_topics['termIndices'][1], term_topics['termWeights'][1]))
wordict2 = dict(zip(term_topics['termIndices'][2], term_topics['termWeights'][2]))

In [128]:
# One row per topic, one column per key; keys missing from a topic become NaN.
df = pd.DataFrame([wordict, wordict1, wordict2])

In [71]:
# Persist the topic-term frame to a test file.
df.to_pickle('../../Files/models/test.pkl')

In [150]:
# Preview the topic-term weights.
df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,95,94,99,100,96,97,98,102,101,104
0,0.024077,0.021372,0.020059,0.012459,0.01053,0.009011,0.008714,0.008228,0.007627,0.00747,...,0.001962,0.001941,0.001934,0.001925,0.00192,,,,,
1,0.02101,0.021785,0.019893,0.01275,0.010856,0.009169,0.008917,0.008264,0.007767,0.007692,...,0.001941,0.001982,,,0.001946,0.001979,0.001957,0.001925,,
2,0.025784,0.021577,0.019976,0.012191,0.010509,0.00899,0.008712,0.008391,0.007616,0.007419,...,0.001917,0.001927,,,0.001934,,0.001918,,0.001904,0.001902


In [34]:
# Add a `words` column: each topic's term indices translated to vocabulary words.
# (The original line was an unfinished generator expression on the undefined name
# `terms_topics`; it is completed here against `term_topics`.)
term_topics['words'] = term_topics['termIndices'].apply(
    lambda term_indices: [vocab[term_idx] for term_idx in term_indices]
)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,90,91,92,93,94,95,96,97,98,99
0,0,1,2,3,4,5,6,7,8,9,...,90,91,89,93,92,95,94,99,100,96
1,1,0,2,3,4,5,6,7,8,9,...,90,92,94,97,93,98,88,96,95,102
2,0,1,2,3,4,5,6,7,8,9,...,90,91,92,96,94,93,98,95,101,104


In [126]:
# Persist a fitted model.
# NOTE(review): `model4` is not defined in any cell visible here. Spark ML `save`
# normally writes a directory, so the '.pickle' suffix does not make this a pickle
# file - confirm.
model4.save('../../Files/models/topic_a_7_d.pickle')

                                                                                

In [8]:
from pyspark.ml.clustering import DistributedLDAModel

In [9]:
# Reload the model saved above.
# NOTE(review): DistributedLDAModel is presumably only what LDA yields with the EM
# optimizer; the LDA(...) calls visible in this notebook leave the optimizer at its
# default - confirm the saved model's type matches this loader.
model5 = DistributedLDAModel.load('../../Files/models/topic_a_7_d.pickle')

                                                                                

In [15]:
# Describe the reloaded model's topics with the default number of terms; rebinds `topics`.
topics = model5.describeTopics()

                                                                                

In [23]:
# Show the words behind each topic of the reloaded model: term indices are looked
# up in `vocab`, then printed one word per line between star banners.
topics_rdd = topics.rdd
topics_words = (
    topics_rdd
    .map(lambda row: row['termIndices'])
    .map(lambda idx_list: [vocab[idx] for idx in idx_list])
    .collect()
)
idx = 0
while idx < len(topics_words):
    topic = topics_words[idx]
    print("topic: {}".format(idx))
    print("*" * 25)
    for word in topic:
        print(word)
    print("*" * 25)
    idx += 1

topic: 0
*************************
NUM
people
vaccine
like
m
know
think
covid
go
want
*************************
topic: 1
*************************
NUM
people
vaccine
like
m
know
think
covid
go
want
*************************
topic: 2
*************************
NUM
people
vaccine
like
m
know
think
covid
go
want
*************************


In [130]:
# Persist another fitted model.
# NOTE(review): `model2` is not defined in any cell visible here.
model2.save('../../Files/models/topic_a_all.pickle')

In [151]:
# Persist another fitted model.
# NOTE(review): `model3` is not defined in any cell visible here.
model3.save('../../Files/models/topic_p_n.pickle')

In [12]:
# Re-create the LDA estimator with 7 topics and 10 iterations (no topicConcentration
# or seed given). NOTE(review): no fit on this estimator follows in the cells visible here.
lda = LDA(k=7, maxIter=10)