In [152]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Window, Row
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql import types as T
from nltk.corpus import stopwords
# English stop-word list from NLTK; handed to the StopWordsCleaner stage further down.
# NOTE(review): presumably requires the NLTK "stopwords" corpus to be installed locally - confirm.
eng_stopwords = stopwords.words('english')

In [2]:
import sparknlp
# Start the Spark session through Spark NLP; every later cell uses `spark`.
spark = sparknlp.start()

In [3]:
# conf = SparkConf().set("spark.jars", "./spark-nlp_2.11-2.4.5.jar")

In [4]:
from pyspark.ml import Pipeline

from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

In [7]:
# Display the active session as a sanity check.
spark

In [8]:
# Show the installed Spark NLP version (the local lemmatizer model below must be compatible with it).
sparknlp.version()

'2.4.5'

In [9]:
# Load the raw tweet export from the "tweets" path as CSV with a header row.
# multiLine lets a quoted field span several lines, and escape='"' treats a
# doubled quote inside a quoted field as a literal quote.
df = (
    spark.read
    .option("encoding", "UTF-8")
    .option("delimiter", ",")
    .option("parserLib", "univocity")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("tweets", header=True)
)

In [285]:
# Preview the first ten tweets; the user_id column is left out of the displayed table.
tweets_preview = df.limit(10).toPandas()
tweets_preview.drop(columns='user_id')

Unnamed: 0,status_id,created_at,text,lang
0,1238253442063310848,2020-03-13T00:00:00Z,The UFC is about to be the most popular sport ...,en
1,1238253441778098177,2020-03-13T00:00:00Z,The great toilet paper depression of 2020 #Toi...,en
2,1238253440486313988,2020-03-13T00:00:00Z,The 'Spotlight Show' with @janeyleegrace on @u...,en
3,1238253439051870208,2020-03-13T00:00:00Z,Because we all the time in the world right? @s...,en
4,1238253440821649408,2020-03-13T00:00:00Z,French pastry chef shows off Easter eggs model...,en
5,1238253442034020354,2020-03-13T00:00:00Z,ICYMI - Hour 2 of #TheGamePlan with @DaveWNSP ...,en
6,1238253441564266496,2020-03-13T00:00:00Z,"With rising #Coronavirus cases in India, which...",en
7,1238253441517928448,2020-03-13T00:00:00Z,#ICYMI: #Ontario #MPPs may temporarily suspend...,en
8,1238253440603541504,2020-03-13T00:00:00Z,Despite having only 3 confirmed #coronavirus c...,en
9,1238253440461135873,2020-03-13T00:00:00Z,Autonomous #Robots Are Helping Kill #Coronavir...,en


In [37]:
# Entry stage of the NLP pipeline: wraps the raw "text" column as a Spark NLP document.
# No output column is set, so the annotator's default name is used
# (the tokenizer below reads from "document").
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
)

# sentence_detector = SentenceDetector() \
#     .setInputCols(["document"]) \
#     .setOutputCol("sentence")

# Split each document into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Clean up tokens: text matching the cleanup patterns is removed
# ('[^A-Za-z]' = every non-letter character, 'http.*' = "http" and whatever follows it),
# and what is left is lower-cased.
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normal")
    .setCleanupPatterns(['[^A-Za-z]', 'http.*'])
    .setLowercase(True)
)

# Drop English stop words (NLTK list in `eng_stopwords`), matching case-insensitively.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(["normal"])
    .setOutputCol("clean")
    .setCaseSensitive(False)
    .setStopWords(eng_stopwords)
)

# lemmatizer = LemmatizerModel.pretrained() \
#     .setInputCols(["clean"]) \
#     .setOutputCol("lemma")

# Lemmatize the cleaned tokens with a model loaded from a local directory
# (path is relative to the notebook's working directory), so no download is needed.
lemmatizer = (
    LemmatizerModel.load('lemma_antbnc_en_2.0.2_2.4_1556480454569/')
    .setInputCols(["clean"])
    .setOutputCol("lemma")
)

# Stem the lemmas (this is why downstream words look like "coronaviru" or "peopl").
stemmer = (
    Stemmer()
    .setInputCols(["lemma"])
    .setOutputCol("stem")
)

# word_embeddings=ElmoEmbeddings.pretrained()\
#     .setInputCols(["document", "lemma"])\
#     .setOutputCol("embedding")

# word_embeddings=ElmoEmbeddings.load('elmo_en_2.4.0_2.4_1580488815299/')\
#     .setInputCols(["document", "lemma"])\
#     .setOutputCol("embedding")


# sentence_embeddings=SentenceEmbeddings()\
#     .setInputCols(["document", "embedding"])\
#     .setOutputCol("sentence_embedding")\
#     .setPoolingStrategy("AVERAGE")


# Build n-grams up to length 2 from the stems. With cumulative mode enabled the
# output holds both single stems and bigrams; bigram parts are joined with "_"
# instead of the default space.
ngrams_cum = (
    NGramGenerator()
    .setInputCols(["stem"])
    .setOutputCol("ngram")
    .setN(2)
    .setEnableCumulative(True)
    .setDelimiter("_")
)


# Final stage: expose the "ngram" annotations as a regular column named "tokens",
# which is exploded into one row per word further down.
# The chain is parenthesised so it no longer ends on a dangling line-continuation
# backslash (a syntax error if the cell ends there, and a silent join with any
# statement added below it).
finisher = (
    Finisher()
    .setInputCols(["ngram"])
    .setOutputCols(['tokens'])
)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [38]:
# Assemble the stages in dependency order: each stage reads the output column of
# an earlier one. The sentence detector and the embedding stages are currently
# disabled and therefore not listed.
pipeline_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    stopwords_cleaner,
    lemmatizer,
    stemmer,
    ngrams_cum,
    finisher,
]
pipeline = Pipeline(stages=pipeline_stages)

In [39]:
# Fit the pipeline on the tweets, then apply the fitted model to the same data.
pipeline_model = pipeline.fit(df)
result = pipeline_model.transform(df)

In [40]:
# One output row per (tweet, word).
# `created_at` is parsed with a date-only pattern and exposed as a timestamp column "date".
# NOTE: this cell overwrites `result` and drops the "tokens" column, so it cannot
# be re-run without first re-running the transform cell above.
tweet_day = (
    F.from_unixtime(F.unix_timestamp('created_at', 'yyyy-MM-dd'))
    .cast('timestamp')
    .alias('date')
)
exploded_word = F.explode("tokens").alias("word")
result = result.select('status_id', 'created_at', tweet_day, exploded_word)

In [41]:
# For every (word, date) pair count the number of distinct tweets containing the word.
tweets_per_word_day = F.countDistinct("status_id").alias("count")
result = result.groupBy(['word', 'date']).agg(tweets_per_word_day)

In [42]:
# Collect the aggregated word counts to the driver as a pandas DataFrame and display them.
result_df = result.toPandas()
result_df

Unnamed: 0,word,date,count
0,ufc,2020-03-13,1
1,popular,2020-03-13,1
2,sport,2020-03-13,15
3,entir,2020-03-13,1
4,world,2020-03-13,17
...,...,...,...
6636,univers_amp,2020-03-13,1
6637,self,2020-03-13,2
6638,perhap,2020-03-13,1
6639,face_disciplin,2020-03-13,1


In [110]:
# Save the per-word, per-day counts as Parquet so later cells can reload them
# without re-running the NLP pipeline.
# The CSV-specific options that used to be set here (encoding, delimiter,
# parserLib, multiLine, escape) were removed: they are not Parquet options and
# had no effect on this write.
(
    result.write
    .mode("overwrite")
    .parquet("word_count.parquet")
)

In [None]:
# Reload the saved word counts. Parquet is self-describing, so the CSV-specific
# reader options that used to be set here (encoding, delimiter, parserLib,
# multiLine, escape) were removed - they had no effect on a Parquet read.
result = spark.read.parquet("word_count.parquet")

In [119]:
# Total mentions of each word over all dates, most frequent first.
total_mentions = F.sum('count').alias('count')
result2 = (
    result
    .groupby('word')
    .agg(total_mentions)
    .orderBy(F.desc('count'))
)

In [120]:
# Vocabulary for the pivot: the 1000 most frequent words.
# A word literally equal to "date" is excluded - presumably because it would
# clash with the "date" key column of the pivoted table.
top_word_rows = result2.limit(1000).select('word').collect()
words = [row[0] for row in top_word_rows if row[0] != 'date']

In [121]:
# Wide table: one row per date, one column per vocabulary word holding that
# day's count; words absent on a given day are filled with 0.
result3 = (
    result
    .groupBy('date')
    .pivot('word', words)
    .max('count')
    .orderBy('date')
    .fillna(0)
)

In [122]:
# Display the pivoted table (one row per date, one column per word).
result3.toPandas()

Unnamed: 0,date,coronaviru,covid,coronaviruspandem,peopl,get,go,ne,time,spread,...,french,egg,pastri_chef,easteregg_coronaviru,ncaatourna,hour_thegameplan,nfl_ncaa,railwai,rise_coronaviru,effort_bid
0,2020-03-13,239,143,77,32,29,27,24,24,23,...,1,1,1,1,1,1,1,1,1,1


In [248]:
# Load the confirmed-case counts. No schema inference is requested, so the
# columns arrive as strings; `numconf` is cast to a number in a later cell.
df_cases = spark.read.csv('covid_numconf.csv', header=True)

In [249]:
SECONDS_PER_DAY = 86400


def days(i):
    """Convert a number of days into the equivalent number of seconds.

    Args:
        i: Number of days (any numeric value).

    Returns:
        ``i * 86400``, i.e. the same duration expressed in seconds.
    """
    return i * SECONDS_PER_DAY

In [272]:
from pyspark.sql.window import Window

In [283]:
# Normalise the case table: parse `date` into a timestamp, cast `numconf` to a
# long, and add a constant `temp` column used only as the window partition key.
case_date = (
    F.from_unixtime(F.unix_timestamp('date', 'yyyy-MM-dd'))
    .cast('timestamp')
    .alias('date')
)
confirmed_cases = F.col('numconf').cast('Long')
partition_key = F.lit(1).alias('temp')
df_cases = df_cases.select(case_date, confirmed_cases, partition_key)

# A single partition ordered by date, so lead() looks ahead in calendar order.
window = Window.partitionBy('temp').orderBy('date')

# Case counts 3 and 7 rows (days, assuming one row per day) ahead of each date;
# the last rows have no look-ahead value and get null.
df_cases = (
    df_cases
    .withColumn("numconf_lead3", F.lead('numconf', 3).over(window))
    .withColumn("numconf_lead7", F.lead('numconf', 7).over(window))
)

In [284]:
# Display the case table with its look-ahead columns.
df_cases.toPandas()

Unnamed: 0,date,numconf,temp,num_conf_lead3,num_conf_lead7
0,2020-03-01,24,1,39.0,62.0
1,2020-03-02,28,1,45.0,77.0
2,2020-03-03,33,1,51.0,90.0
3,2020-03-04,39,1,57.0,103.0
4,2020-03-05,45,1,62.0,138.0
5,2020-03-06,51,1,77.0,176.0
6,2020-03-07,57,1,90.0,193.0
7,2020-03-08,62,1,103.0,249.0
8,2020-03-09,77,1,138.0,324.0
9,2020-03-10,90,1,176.0,424.0


In [136]:
# Attach the case counts to the word pivot by date. A left join keeps every
# date from the pivot; dates without case data get null case columns.
result4 = result3.join(df_cases, on='date', how='left')

In [137]:
# Spot check: correlation between the daily count of "coronaviru" and confirmed cases.
# NOTE(review): the recorded result is NaN - presumably because the pivot shown
# above contains a single date, which is too little data for a correlation. Confirm.
result4.stat.corr("coronaviru", "numconf")

nan

In [None]:
# Save the joined word-count / case-count table as Parquet.
# The CSV-specific options that used to be set here (encoding, delimiter,
# parserLib) were removed: they are not Parquet options and had no effect.
(
    result4.write
    .mode("overwrite")
    .parquet("word_count_pivot.parquet")
)

In [None]:
# Mark result4 for caching so the correlation computations below can reuse it
# instead of recomputing the join each time.
result4.persist()

In [None]:
# Correlation of every word's daily count with the case count three rows ahead.
# All coefficients are computed in ONE aggregation pass; calling
# result4.stat.corr once per word launched a separate Spark job for each of the
# (up to 1000) words.
# Rows where either value is null are ignored by F.corr. An undefined
# correlation is reported as NaN, matching what stat.corr returns.
if words:
    _lead3_row = result4.agg(*[F.corr(w, 'numconf_lead3') for w in words]).first()
    corr_coefs_lead3 = [float('nan') if value is None else value for value in _lead3_row]
else:
    corr_coefs_lead3 = []

In [151]:
# Correlation of every word's daily count with the same-day confirmed case count.
# All coefficients are computed in ONE aggregation pass; calling
# result4.stat.corr once per word launched a separate Spark job for each of the
# (up to 1000) words and was too slow to finish interactively.
# Rows where either value is null are ignored by F.corr. An undefined
# correlation is reported as NaN, matching what stat.corr returns.
if words:
    _corr_row = result4.agg(*[F.corr(w, 'numconf') for w in words]).first()
    corr_coefs = [float('nan') if value is None else value for value in _corr_row]
else:
    corr_coefs = []

KeyboardInterrupt: 

In [140]:
# Get a handle on the SparkContext (reuses the running one if there is one);
# needed for the RDD API used in the next cell.
sc = SparkContext.getOrCreate()

In [None]:
# Pair each word with its correlation coefficient and turn the pairs into a
# two-column DataFrame (word, corr).
word_corr_pairs = zip(words, corr_coefs)
rdd = sc.parallelize(word_corr_pairs)
corr_rdd = rdd.map(lambda pair: Row(word=pair[0], corr=pair[1]))
corr_df = spark.createDataFrame(corr_rdd)

In [None]:
# Save the word/correlation table, replacing any previous run's output.
(
    corr_df.write
    .mode("overwrite")
    .parquet("word_correlation.parquet")
)