In [87]:
import pandas as pd # provide sql-like data manipulation tools. very handy.
pd.options.mode.chained_assignment = None
import numpy as np # high dimensional vector computing library.
from copy import deepcopy
from string import punctuation
from random import shuffle

import gensim
from gensim.models.word2vec import Word2Vec # the word2vec model gensim class
LabeledSentence = gensim.models.doc2vec.LabeledSentence # we'll talk about this down below

from tqdm import tqdm
tqdm.pandas(desc="progress-bar")

from nltk.tokenize import sent_tokenize, word_tokenize 

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F

spark=SparkSession.builder \
                .master("local[4]") \
                .appName("string operations") \
                .config("spark.driver.memory","2g") \
                .config("spark.executor.memory","4g") \
                .getOrCreate()

In [90]:
sc=spark.sparkContext

# Load Data

In [88]:
def parseLine(line):
    processed_article = re.sub(r'@\w+', '', line)
    processed_article = re.sub(r'http\S+', '', processed_article)
    processed_article = re.sub(r'\s+', ' ', processed_article)
    
    return (processed_article)

In [89]:
# Okurken okunan sütunlar secilebilir.
df=spark.read \
         .option("sep",",") \
         .option("header","True") \
         .option("inferSchema","True") \
         .csv("realDonaldTrumps_tweets.csv") \
         .cache()

In [91]:
df.toPandas().head(10)

Unnamed: 0,t_id,t_date,t_text
0,1121407497741447168,4/25/2019 13:35,b'RT @RepAndyBiggsAZ: I\xe2\x80\x99m grateful ...
1,1121407411875647488,4/25/2019 13:35,b'RT @RepMattGaetz: Volume 1 of the Mueller Re...
2,1121406614093860864,4/25/2019 13:32,b'RT @RepMarkMeadows: We knew they wouldn\xe2\...
3,1121406415707484160,4/25/2019 13:31,"""b'RT @dcexaminer: """"The collusion delusion fe..."
4,1121406299013513217,4/25/2019 13:31,b'RT @RepMarkMeadows: Reminder: Democrats dema...
5,1121406061519544322,4/25/2019 13:30,b'RT @Jim_Jordan: It\xe2\x80\x99s time to figu...
6,1121405753364025344,4/25/2019 13:29,"""b""""RT @GOPoversight: Democrats are obsessed w..."
7,1121405495405948929,4/25/2019 13:27,"""b'RT @Jim_Jordan: Peter Strzok told us that h..."
8,1121388967444799488,4/25/2019 12:22,b'Welcome to the race Sleepy Joe. I only hope ...
9,1121385795481423873,4/25/2019 12:09,b'.....Despite the fact that the Mueller Repor...


# Clean Text

In [99]:
from pyspark.sql.functions import *

In [100]:
df2=df.withColumn("t_date",F.to_timestamp(F.col("t_date"),"MM/dd/yyyy HH:mm")) \
        .withColumn("t_text", F.lower(F.col("t_text"))) \
        .withColumn("t_text",F.regexp_replace(F.col("t_text"),"b'","")) \
        .withColumn("t_text",F.regexp_replace(F.col("t_text"),"''","")) \
        .withColumn("t_text",F.regexp_replace(F.col("t_text"),"''",""))

In [101]:
df2.printSchema()

root
 |-- t_id: long (nullable = true)
 |-- t_date: timestamp (nullable = true)
 |-- t_text: string (nullable = true)



In [102]:
df2.toPandas().head(10)

Unnamed: 0,t_id,t_date,t_text
0,1121407497741447168,2019-04-25 13:35:00,rt @repandybiggsaz: i\xe2\x80\x99m grateful fo...
1,1121407411875647488,2019-04-25 13:35:00,rt @repmattgaetz: volume 1 of the mueller repo...
2,1121406614093860864,2019-04-25 13:32:00,rt @repmarkmeadows: we knew they wouldn\xe2\x8...
3,1121406415707484160,2019-04-25 13:31:00,"""rt @dcexaminer: """"the collusion delusion fell..."
4,1121406299013513217,2019-04-25 13:31:00,rt @repmarkmeadows: reminder: democrats demand...
5,1121406061519544322,2019-04-25 13:30:00,rt @jim_jordan: it\xe2\x80\x99s time to figure...
6,1121405753364025344,2019-04-25 13:29:00,"""b""""rt @gopoversight: democrats are obsessed w..."
7,1121405495405948929,2019-04-25 13:27:00,"""rt @jim_jordan: peter strzok told us that he ..."
8,1121388967444799488,2019-04-25 12:22:00,welcome to the race sleepy joe. i only hope yo...
9,1121385795481423873,2019-04-25 12:09:00,.....despite the fact that the mueller report ...


In [103]:
# rt olanları cıkartalım.
df3=df2 \
        .filter(~(
                F.col("t_text").contains("rt")  
))

print(df2.count())
print(df3.count())

800
475


In [104]:
# bastan ve sondan boslukları silelim
df4=df3 \
        .withColumn("t_id",F.trim(F.col("t_id"))) \
        .withColumn("t_date",F.trim(F.col("t_date"))) \
        .withColumn("t_text",F.trim(F.col("t_text"))) 
    
df5.toPandas().head(10)

Unnamed: 0,t_id,t_date,t_text
0,1121388967444799488,2019-04-25 12:22:00,welcome to the race sleepy joe. i only hope yo...
1,1121382698742841344,2019-04-25 11:57:00,....mueller was not fired and was respectfully...
2,1121187494312255489,2019-04-24 23:01:00,the great state of tennessee is so close to pa...
3,1121138875638792192,2019-04-24 19:48:00,.@senmikelee of the great state of utah has wr...
4,1121121705970286592,2019-04-24 18:40:00,"as one united nation, we will work, we will pr..."
5,1121120816983363584,2019-04-24 18:36:00,"today, @flotus melania and i were honored to j..."
6,1121053578603397120,2019-04-24 14:09:00,i didn\xe2\x80\x99t call bob costa of the wash...
7,1121049166615142400,2019-04-24 13:52:00,"....congress has no time to legislate, they on..."
8,1121048120312389634,2019-04-24 13:47:00,"no collusion, no obstruction - there has never..."
9,1121044792945926144,2019-04-24 13:34:00,can anyone comprehend what a great job border ...


In [105]:
# http removes
# removing https
expr = "http.*"
df5 = df4.filter(~(F.col("t_text").rlike(expr)))

+-------------------+-------------------+--------------------+
|               t_id|             t_date|              t_text|
+-------------------+-------------------+--------------------+
|1121388967444799488|2019-04-25 12:22:00|welcome to the ra...|
|1121382698742841344|2019-04-25 11:57:00|....mueller was n...|
|1121187494312255489|2019-04-24 23:01:00|the great state o...|
|1121121705970286592|2019-04-24 18:40:00|as one united nat...|
|1121120816983363584|2019-04-24 18:36:00|today, @flotus me...|
|1121049166615142400|2019-04-24 13:52:00|....congress has ...|
|1121048120312389634|2019-04-24 13:47:00|no collusion, no ...|
|1121044792945926144|2019-04-24 13:34:00|can anyone compre...|
|1121025624632647682|2019-04-24 12:18:00|.....are there no...|
|1121021160827887616|2019-04-24 12:00:00|mexico\xe2\x80\x9...|
|1121006942502182913|2019-04-24 11:04:00|\xe2\x80\x9cforme...|
|1120851364035399680|2019-04-24 00:46:00|thanks rush! @fox...|
|1120820536605585408|2019-04-23 22:43:00|you mean the s

In [114]:
# lets check if there is any https
df5.where(F.col("t_text").like("%http%")).show()

+----+------+------+
|t_id|t_date|t_text|
+----+------+------+
+----+------+------+



# Tokenizing

In [115]:
from pyspark.ml.feature import RegexTokenizer

regexTokenizer = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 't_text', outputCol = 'text_token')
df_token = regexTokenizer.transform(df5)
df_token.show(3)

+-------------------+-------------------+--------------------+--------------------+
|               t_id|             t_date|              t_text|          text_token|
+-------------------+-------------------+--------------------+--------------------+
|1121138875638792192|2019-04-24 19:48:00|.@senmikelee of t...|[senmikelee, of, ...|
|1121053578603397120|2019-04-24 14:09:00|i didn\xe2\x80\x9...|[i, didn, xe2, x8...|
|1120742807847800839|2019-04-23 17:34:00|great golf champi...|[great, golf, cha...|
+-------------------+-------------------+--------------------+--------------------+
only showing top 3 rows



In [116]:
# remove stopwords
from pyspark.ml.feature import StopWordsRemover

swr = StopWordsRemover(inputCol = 'text_token', outputCol = 'text_sw_removed')
tweet_swr = swr.transform(df_token)
tweet_swr.show(5)

+-------------------+-------------------+--------------------+--------------------+--------------------+
|               t_id|             t_date|              t_text|          text_token|     text_sw_removed|
+-------------------+-------------------+--------------------+--------------------+--------------------+
|1121138875638792192|2019-04-24 19:48:00|.@senmikelee of t...|[senmikelee, of, ...|[senmikelee, grea...|
|1121053578603397120|2019-04-24 14:09:00|i didn\xe2\x80\x9...|[i, didn, xe2, x8...|[didn, xe2, x80, ...|
|1120742807847800839|2019-04-23 17:34:00|great golf champi...|[great, golf, cha...|[great, golf, cha...|
|1120655455594799105|2019-04-23 11:47:00|keep america great!'|[keep, america, g...|[keep, america, g...|
|1120655199130017792|2019-04-23 11:46:00|the wall is being...|[the, wall, is, b...|[wall, rapidly, b...|
+-------------------+-------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



# Fit and Train Word2Vec

In [117]:
from pyspark.ml.feature import Word2Vec

#create an average word vector for each document
word2vec = Word2Vec(minCount = 1, inputCol = 'text_sw_removed', outputCol = 'result')
model = word2vec.fit(tweet_swr)
result = model.transform(tweet_swr)

In [118]:
result.show(3)
result.select('result').show(1, truncate = True)

+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|               t_id|             t_date|              t_text|          text_token|     text_sw_removed|              result|
+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|1121138875638792192|2019-04-24 19:48:00|.@senmikelee of t...|[senmikelee, of, ...|[senmikelee, grea...|[4.87581986051641...|
|1121053578603397120|2019-04-24 14:09:00|i didn\xe2\x80\x9...|[i, didn, xe2, x8...|[didn, xe2, x80, ...|[-2.5814099769507...|
|1120742807847800839|2019-04-23 17:34:00|great golf champi...|[great, golf, cha...|[great, golf, cha...|[-0.0012052971214...|
+-------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows

+--------------------+
|              result|
+--------------------+
|[4.87581986051641...|
+

In [119]:
result.printSchema()
#result.write.saveAsTable("result", format="parquet", mode = "overwrite")

root
 |-- t_id: string (nullable = true)
 |-- t_date: string (nullable = true)
 |-- t_text: string (nullable = true)
 |-- text_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text_sw_removed: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- result: vector (nullable = true)



# Define Similarity

In [124]:
import numpy as np
import pandas as pd
import pickle
import copy

In [125]:
# Calculate cosine similarity between two vectors 
def cossim(v1, v2): 
    return np.dot(v1, v2) / np.sqrt(np.dot(v1, v1)) / np.sqrt(np.dot(v2, v2)) 

In [126]:
def content_recommend(tweet_id, docvecs):
    input_vec = docvecs.select('result').filter(docvecs['t_id'] == tweet_id).collect()[0][0]   
    docvecs = docvecs.select('t_id','result').rdd.map(lambda x: (x[0], x[1])).collect()
    
    #compute similarity array
    t = sc.parallelize((i[0], float(cossim(input_vec, i[1]))) for i in docvecs)
    
    # recommendation's cossim values
    similarity = spark.createDataFrame(t).\
    withColumnRenamed('_1', 'tweet_id').\
    withColumnRenamed('_2', 'similarity').\
    orderBy("similarity", ascending = False)
  
    return similarity 


In [128]:
content_recommend("1121138875638792192", result).show(truncate = False)

+-------------------+------------------+
|tweet_id           |similarity        |
+-------------------+------------------+
|1121138875638792192|1.0               |
|1102791903324631040|0.8312465787510966|
|1107997913584726020|0.8298340329952292|
|1102007681588563969|0.8219903746021223|
|1115218802290122752|0.82022217914165  |
|1103787204965478404|0.8051104511645759|
|1114523496237883392|0.8010976712993788|
|1105960676097441793|0.7678433874948783|
|1111209625825640448|0.7441570601964962|
|1105834434962571265|0.7205428236125595|
|1114207309184602112|0.716563608100392 |
|1106161655292010502|0.7149127738590835|
|1108559080204001280|0.6795696823521122|
|1102661631568461824|0.6271673827337592|
|1106529365549084672|0.6235181291720807|
|1100110413209858048|0.6116754976954513|
|1104018553987100673|0.6042849130319551|
|1114888062884954114|0.5761606924612956|
|1112900816371900418|0.556188789385397 |
|1121053578603397120|0.5434824179661336|
+-------------------+------------------+
only showing top

In [129]:
result.where(F.col("t_id") == 1121138875638792192).collect()

[Row(t_id='1121138875638792192', t_date='2019-04-24 19:48:00', t_text=".@senmikelee of the great state of utah has written a wonderful new book entitled, \\xe2\\x80\\x9cour lost declaration.\\xe2\\x80\\x9d highly recommended!'", text_token=['senmikelee', 'of', 'the', 'great', 'state', 'of', 'utah', 'has', 'written', 'a', 'wonderful', 'new', 'book', 'entitled', 'xe2', 'x80', 'x9cour', 'lost', 'declaration', 'xe2', 'x80', 'x9d', 'highly', 'recommended'], text_sw_removed=['senmikelee', 'great', 'state', 'utah', 'written', 'wonderful', 'new', 'book', 'entitled', 'xe2', 'x80', 'x9cour', 'lost', 'declaration', 'xe2', 'x80', 'x9d', 'highly', 'recommended'], result=DenseVector([0.0, -0.0004, 0.0012, -0.0031, 0.0009, 0.0007, -0.0015, -0.0005, -0.0006, 0.0007, 0.0004, -0.0008, 0.0012, 0.0003, -0.0003, -0.0007, 0.0015, -0.0005, 0.0004, 0.0024, -0.0013, -0.0025, -0.0013, -0.0033, -0.0011, -0.0005, 0.0008, -0.0008, 0.0014, 0.0, 0.0004, -0.0008, 0.0017, -0.0019, 0.0009, -0.0001, 0.0018, -0.0004, 0.0

In [130]:
#test similarity between words
synonyms = model.findSynonyms("president", 5)   # its okay for certain words (cuisines, foods), real bad for others
synonyms.show(15)

+--------+-------------------+
|    word|         similarity|
+--------+-------------------+
|  number|0.25912711024284363|
| victory|0.25897228717803955|
|standard| 0.2551022469997406|
| illegal|  0.248518168926239|
|  voting| 0.2445465326309204|
+--------+-------------------+



In [133]:
synonyms = model.findSynonyms("crazy", 5)   # its okay for certain words (cuisines, foods), real bad for others
synonyms.show(15)

+------------+-------------------+
|        word|         similarity|
+------------+-------------------+
|        done| 0.2754474878311157|
|        lied|0.24875687062740326|
|presidential|0.23190045356750488|
|    employed|0.22457554936408997|
|       world|0.22156548500061035|
+------------+-------------------+



# Test Results

In [131]:
#test similarity between words
def keyword_recommend(input_str, docvecs):
    # run input_str through preprocessing pipeline
    x = sc.parallelize([(1, input_str)]).toDF(['t_id', 't_text'])
    from pyspark.ml.feature import RegexTokenizer
    regexTokenizer = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 't_text', outputCol = 'text_token')
    x_token = regexTokenizer.transform(x)
    from pyspark.ml.feature import StopWordsRemover
    swr = StopWordsRemover(inputCol = 'text_token', outputCol = 'text_sw_removed')
    x_swr = swr.transform(x_token)
    
    # run word2vec model on input string
    input_vec = model.transform(x_swr)
    input_vec = input_vec.select('result').collect()[0][0]
    
    docvecs = docvecs.select('t_id','result').rdd.map(lambda x: (x[0], x[1])).collect()
    
    #compute similarity array
    t = sc.parallelize((i[0], float(cossim(input_vec, i[1]))) for i in docvecs)
    
    # recommendation's cossim values
    similarity = spark.createDataFrame(t).\
    withColumnRenamed('_1', 'tweet_id').\
    withColumnRenamed('_2', 'similarity').\
    orderBy("similarity", ascending = False)
  
    return similarity 

In [134]:
keyword_recommend("president", result).show(truncate = False)

+-------------------+--------------------+
|tweet_id           |similarity          |
+-------------------+--------------------+
|1106554754715533313|0.5943848628534582  |
|1102360081247682561|0.4244376060880388  |
|1120093671356022785|0.3333256513802402  |
|1104017664131907584|0.22012572493032603 |
|1114156288555016193|0.15761668589896682 |
|1117496743032250368|0.12131485974817567 |
|1119962857347735552|0.11917203055649125 |
|1114290701334802433|0.11301846708165722 |
|1101289057060077569|0.10899428842914181 |
|1101289772528603137|0.10329962419042553 |
|1118317574301863936|0.09604921024046569 |
|1104033876291305473|0.09472673678995967 |
|1116157899724611584|0.08581757036733936 |
|1117751619469361152|0.0802298430747358  |
|1117843954437767168|0.07274550926957822 |
|1106665436865851394|0.06353980612359773 |
|1105871954001739782|0.060546781694689425|
|1117486994018451458|0.05883977018714115 |
|1108190837257764864|0.058780359227617984|
|1117829011860787200|0.04988724600038479 |
+----------