## Combine files saved from the Twitter Consumer
After creating multiple files, saved through the Twitter consumer, we combine all of them into one and clean out the emptry rows. 

In [1]:
import glob
import os
import shutil

In [2]:
# As the names of the folders are unpredictable, we make sure we loop through all folders and all files within
filepaths = glob.glob("data/elon-*/part-0*")
filepaths2 = glob.glob("data/elon-*")

In [3]:
with open("data/collect.txt","a") as writefile:
    for filepath in filepaths:
        with open(filepath,"r") as readfile:
            data = readfile.read()
            writefile.write(data)
        os.remove(filepath)
for filepath2 in filepaths2:
    shutil.rmtree(filepath2)

In [4]:
# clear empty rows
with open('./data/collect.txt') as infile, open('data/output.txt', 'w') as outfile:
    for line in infile:
        if not line.strip(): continue  # skip the empty line
        outfile.write(line)

## Create Spark context

In [5]:
# Import findspark 
import findspark

# Initialize and provide path
findspark.init("/usr/share/spark/spark-2.3.2-bin-hadoop2.7/")

# Or use this alternative
#findspark.init()

In [6]:
import pyspark
from pyspark.sql import SQLContext

# create spark contexts
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

## Read in the data for Natural Language Processing
Here we read in the one merged file with all Tweets, creating a data frame

In [7]:
# Load a text file and convert each line to a Row.
data_rdd = sc.textFile('data/output.txt') 
parts_rdd = data_rdd.map(lambda l: l.split("\t"))

# Filter bad rows out
#garantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)

#typed_rdd = garantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))

# Inspect the first 2 lines 
#typed_rdd.take(5)
parts_rdd.take(5)

[['RT @MariyaAlexander: Howard Schultz is basically a less likable Jill Stein, a dumber Gary Johnson, and a more embarrassing Elon Musk. '],
 ['RT @TheWilsonCenter: Congratulations to @wapodavenport for his book "The Space Barons: Elon Musk, Jeff Bezos, and the Quest to Colonize the… '],
 ['@thomas_violence Hahahahahahahahahhahahaha you fucking idiot , I mean lots come to mind but let’s put a well known… https://t.co/3NMFBY2pzr '],
 ['RT @Tesla_Bear: (1)'],
 ['MOST important part of the conference call: Elon Musk desperately avoiding to give ANY detail about demand in Europe a… ']]

In [8]:
data_df = sqlContext.createDataFrame(parts_rdd, ["text"])

# data_df.printSchema()
data_df.show(5)

+--------------------+
|                text|
+--------------------+
|RT @MariyaAlexand...|
|RT @TheWilsonCent...|
|@thomas_violence ...|
| RT @Tesla_Bear: (1)|
|MOST important pa...|
+--------------------+
only showing top 5 rows



## NLP
After creating the data frame, we move on to the natural language processing. 

In [9]:
import nltk
# Note: Uncomment the two lines below before the first execution of the code:
#nltk.download("stopwords")
#nltk.download("wordnet")

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import preproc as pp

# Register all the functions in Preproc with Spark Context

'''Use langid module to classify the language to make sure we are applying the correct cleanup actions for English
https://github.com/saffsd/langid.py '''
check_lang_udf = udf(pp.check_lang, StringType())

'''Stop words usually refer to the most common words in a language, there is no single universal list of stop words  
used by all natural language processing tools. 
Reduces Dimensionality removes stop words of a single Tweets (cleaned_str/row/document)'''
remove_stops_udf = udf(pp.remove_stops, StringType())

'''catch-all to remove other 'words' that I felt didn't add a lot of value
Reduces Dimensionality, gets rid of a lot of unique urls'''
remove_features_udf = udf(pp.remove_features, StringType())

'''Process of classifying words into their parts of speech and labeling them accordingly is known as part-of-speech
tagging, POS-tagging, or simply tagging. Parts of speech are also known as word classes or lexical categories. The
collection of tags used for a particular task is known as a tagset. Our emphasis in this chapter is on exploiting
tags, and tagging text automatically. http://www.nltk.org/book/ch05.html'''
tag_and_remove_udf = udf(pp.tag_and_remove, StringType())

'''Tweets are going to use different forms of a word, such as organize, organizes, and organizing. Additionally, 
there are families of derivationally related words with similar meanings, such as democracy, democratic, and 
democratization. In many situations, it seems as if it would be useful for a search for one of these words to return 
documents that contain another word in the set.
Reduces Dimensionality and boosts numerical measures like TFIDF
http://nlp.stanford.edu/IR-book/html/htmledition/stemming-and-lemmatization-1.html
lemmatization of a single Tweets (cleaned_str/row/document)'''
lemmatize_udf = udf(pp.lemmatize, StringType())

'''check to see if a row only contains whitespace '''
check_blanks_udf = udf(pp.check_blanks, StringType())

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/student/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [11]:
'''remove stop words to reduce dimensionality
list of stop words: https://gist.github.com/sebleier/554280'''
rm_stops_df = data_df.withColumn("stop_text", remove_stops_udf(data_df["text"]))
rm_stops_df.show(5)

+--------------------+--------------------+
|                text|           stop_text|
+--------------------+--------------------+
|RT @MariyaAlexand...|RT @MariyaAlexand...|
|RT @TheWilsonCent...|RT @TheWilsonCent...|
|@thomas_violence ...|@thomas_violence ...|
| RT @Tesla_Bear: (1)| RT @Tesla_Bear: (1)|
|MOST important pa...|MOST important pa...|
+--------------------+--------------------+
only showing top 5 rows



In [12]:
# remove other non essential words, think of it as my personal stop word list
rm_features_df = rm_stops_df.withColumn("feat_text", remove_features_udf(rm_stops_df["stop_text"]))
rm_features_df.show(5)

+--------------------+--------------------+--------------------+
|                text|           stop_text|           feat_text|
+--------------------+--------------------+--------------------+
|RT @MariyaAlexand...|RT @MariyaAlexand...|  howard schultz ...|
|RT @TheWilsonCent...|RT @TheWilsonCent...|  congratulations...|
|@thomas_violence ...|@thomas_violence ...|hahahahahahahahah...|
| RT @Tesla_Bear: (1)| RT @Tesla_Bear: (1)|                    |
|MOST important pa...|MOST important pa...|most important pa...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [13]:
# tag the words remaining and keep only Nouns, Verbs and Adjectives
tagged_df = rm_features_df.withColumn("tagged_text", tag_and_remove_udf(rm_features_df["feat_text"]))
tagged_df.show(5)

+--------------------+--------------------+--------------------+--------------------+
|                text|           stop_text|           feat_text|         tagged_text|
+--------------------+--------------------+--------------------+--------------------+
|RT @MariyaAlexand...|RT @MariyaAlexand...|  howard schultz ...| howard schultz l...|
|RT @TheWilsonCent...|RT @TheWilsonCent...|  congratulations...| congratulations ...|
|@thomas_violence ...|@thomas_violence ...|hahahahahahahahah...| hahahahahahahaha...|
| RT @Tesla_Bear: (1)| RT @Tesla_Bear: (1)|                    |                    |
|MOST important pa...|MOST important pa...|most important pa...| important part c...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [14]:
# lemmatization of remaining words to reduce dimensionality & boost measures
lemm_df = tagged_df.withColumn("lemm_text", lemmatize_udf(tagged_df["tagged_text"]))
lemm_df.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|           stop_text|           feat_text|         tagged_text|           lemm_text|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|RT @MariyaAlexand...|RT @MariyaAlexand...|  howard schultz ...| howard schultz l...|howard schultz li...|
|RT @TheWilsonCent...|RT @TheWilsonCent...|  congratulations...| congratulations ...|congratulation bo...|
|@thomas_violence ...|@thomas_violence ...|hahahahahahahahah...| hahahahahahahaha...|hahahahahahahahah...|
| RT @Tesla_Bear: (1)| RT @Tesla_Bear: (1)|                    |                    |                    |
|MOST important pa...|MOST important pa...|most important pa...| important part c...|important part co...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [15]:
# remove all rows containing only blank spaces - can be skipped as it was cleaned before?
check_blanks_df = lemm_df.withColumn("is_blank", check_blanks_udf(lemm_df["lemm_text"]))
no_blanks_df = check_blanks_df.filter(check_blanks_df["is_blank"] == "False")

no_blanks_df.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------+
|                text|           stop_text|           feat_text|         tagged_text|           lemm_text|is_blank|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------+
|RT @MariyaAlexand...|RT @MariyaAlexand...|  howard schultz ...| howard schultz l...|howard schultz li...|   False|
|RT @TheWilsonCent...|RT @TheWilsonCent...|  congratulations...| congratulations ...|congratulation bo...|   False|
|@thomas_violence ...|@thomas_violence ...|hahahahahahahahah...| hahahahahahahaha...|hahahahahahahahah...|   False|
| RT @Tesla_Bear: (1)| RT @Tesla_Bear: (1)|                    |                    |                    |   False|
|MOST important pa...|MOST important pa...|most important pa...| important part c...|important part co...|   False|
+--------------------+--------------------+--------------------+--------

In [16]:
data_set = no_blanks_df.select(no_blanks_df['lemm_text'])

data_set.show(5)

+--------------------+
|           lemm_text|
+--------------------+
|howard schultz li...|
|congratulation bo...|
|hahahahahahahahah...|
|                    |
|important part co...|
+--------------------+
only showing top 5 rows



## Tokenize for Word2Vec

In [17]:
''' https://spark.apache.org/docs/1.6.2/ml-features.html
TF: HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors.
In text processing, a “set of terms” might be a bag of words. The algorithm combines Term Frequency (TF) counts with 
the hashing trick for dimensionality reduction.'''

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

tokenizer = Tokenizer(inputCol="lemm_text", outputCol="words")
wordsData = tokenizer.transform(data_set)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)


In [18]:
# imports needed and set up logging
import gzip
import gensim 
import logging

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

In [19]:
featurizedData.show()

+--------------------+--------------------+--------------------+
|           lemm_text|               words|         rawFeatures|
+--------------------+--------------------+--------------------+
|howard schultz li...|[howard, schultz,...|(20,[0,3,7,8,9,15...|
|congratulation bo...|[congratulation, ...|(20,[0,5,6,7,13,1...|
|hahahahahahahahah...|[hahahahahahahaha...|(20,[4,9,12,15,18...|
|                    |                  []|     (20,[12],[1.0])|
|important part co...|[important, part,...|(20,[0,1,3,6,7,9,...|
|elon musk share i...|[elon, musk, shar...|(20,[0,3,13,16,17...|
|                    |                  []|     (20,[12],[1.0])|
|           bait elon|        [bait, elon]|(20,[0,19],[1.0,1...|
|washington post p...|[washington, post...|(20,[0,4,6,11,12,...|
|  elon musk fly mile|[elon, musk, fly,...|(20,[0,15,16,18],...|
|         elon review|      [elon, review]|(20,[0,4],[1.0,1.0])|
|dank meme bot mem...|[dank, meme, bot,...|(20,[1,5,7,14],[1...|
|heard real time f...|[he

In [20]:
''' Note: the execution here takes a bit longer that the rest of the cells
https://stackoverflow.com/questions/38610559/convert-spark-dataframe-column-to-python-list '''
tweets = featurizedData.select("words").rdd.flatMap(lambda x: x).collect()

## Train model and check results

The following two cells were kindly provided to us by Kavita Ganesan. The GitHub repository we downloaded the needed code is: https://github.com/kavgan/nlp-text-mining-working-examples/tree/master/word2vec . In her blog post "Gensim Word2Vec Tutorial - Full Working Example"( http://kavita-ganesan.com/gensim-word2vec-tutorial-starter-code/#.XFHO8M9KhZJ ) she explains the steps and provides a link to her jupyter notebooks.

In [21]:
'''Training the model is fairly straightforward. You just instantiate Word2Vec and pass the reviews that we read 
in the previous step. So, we are essentially passing on a list of lists. Where each list within the main list 
contains a set of tokens from a user review. Word2Vec uses all these tokens to internally create a vocabulary. 
And by vocabulary, I mean a set of unique words.'''

model = gensim.models.Word2Vec (tweets, size=150, window=10, min_count=2, workers=10)
model.train(tweets,total_examples=len(tweets),epochs=10)

2019-01-31 12:02:18,495 : INFO : collecting all words and their counts
2019-01-31 12:02:18,496 : INFO : PROGRESS: at sentence #0, processed 0 words, keeping 0 word types
2019-01-31 12:02:18,498 : INFO : collected 1246 word types from a corpus of 5582 raw words and 833 sentences
2019-01-31 12:02:18,499 : INFO : Loading a fresh vocabulary
2019-01-31 12:02:18,501 : INFO : effective_min_count=2 retains 565 unique words (45% of original 1246, drops 681)
2019-01-31 12:02:18,502 : INFO : effective_min_count=2 leaves 4901 word corpus (87% of original 5582, drops 681)
2019-01-31 12:02:18,506 : INFO : deleting the raw counts dictionary of 1246 items
2019-01-31 12:02:18,507 : INFO : sample=0.001 downsamples 81 most-common words
2019-01-31 12:02:18,508 : INFO : downsampling leaves estimated 3335 word corpus (68.1% of prior 4901)
2019-01-31 12:02:18,510 : INFO : estimated required memory for 565 words and 150 dimensions: 960500 bytes
2019-01-31 12:02:18,511 : INFO : resetting layer weights
2019-01-

2019-01-31 12:02:18,875 : INFO : worker thread finished; awaiting finish of 5 more threads
2019-01-31 12:02:18,876 : INFO : worker thread finished; awaiting finish of 4 more threads
2019-01-31 12:02:18,877 : INFO : worker thread finished; awaiting finish of 3 more threads
2019-01-31 12:02:18,878 : INFO : worker thread finished; awaiting finish of 2 more threads
2019-01-31 12:02:18,879 : INFO : worker thread finished; awaiting finish of 1 more threads
2019-01-31 12:02:18,880 : INFO : worker thread finished; awaiting finish of 0 more threads
2019-01-31 12:02:18,881 : INFO : EPOCH - 2 : training on 5582 raw words (3279 effective words) took 0.0s, 264229 effective words/s
2019-01-31 12:02:18,893 : INFO : worker thread finished; awaiting finish of 9 more threads
2019-01-31 12:02:18,895 : INFO : worker thread finished; awaiting finish of 8 more threads
2019-01-31 12:02:18,896 : INFO : worker thread finished; awaiting finish of 7 more threads
2019-01-31 12:02:18,897 : INFO : worker thread fin

2019-01-31 12:02:19,278 : INFO : worker thread finished; awaiting finish of 6 more threads
2019-01-31 12:02:19,279 : INFO : worker thread finished; awaiting finish of 5 more threads
2019-01-31 12:02:19,280 : INFO : worker thread finished; awaiting finish of 4 more threads
2019-01-31 12:02:19,281 : INFO : worker thread finished; awaiting finish of 3 more threads
2019-01-31 12:02:19,281 : INFO : worker thread finished; awaiting finish of 2 more threads
2019-01-31 12:02:19,282 : INFO : worker thread finished; awaiting finish of 1 more threads
2019-01-31 12:02:19,285 : INFO : worker thread finished; awaiting finish of 0 more threads
2019-01-31 12:02:19,286 : INFO : EPOCH - 10 : training on 5582 raw words (3387 effective words) took 0.0s, 329413 effective words/s
2019-01-31 12:02:19,287 : INFO : training on a 55820 raw words (33437 effective words) took 0.5s, 65520 effective words/s


(33437, 55820)

In [29]:
#check whichever word you want
w1 = "elon"
model.wv.most_similar (positive=w1)

[('musk', 0.9999439716339111),
 ('good', 0.9999069571495056),
 ('tesla', 0.9999029636383057),
 ('make', 0.9998981952667236),
 ('tsla', 0.9998976588249207),
 ('world', 0.9998967051506042),
 ('read', 0.999894917011261),
 ('thing', 0.9998947381973267),
 ('profit', 0.9998939037322998),
 ('think', 0.9998922348022461)]