# Analyzing Tweets from Text Files

Group Project for Big Data Programming, Fall 2017

### Project Contributors:
Caleb Hulburt   
Mohammad Azim   
Yao Jin   
Xian Lai   

---

This application analyzes tweets in a distributed manner using the Spark framework. The tweets come from two files, "MAGA.txt" and "resist.txt". Each file contains tweets for one hashtag (#maga or #resist), pulled from the Twitter API with Tweepy.

Our goal is to merge the two datasets, extract features by term frequency, and rank those features by a naive Bayes-style ratio of conditional probabilities. The main steps are:
1. Preprocess each tweet into a clean list of words with stop words removed.
2. Merge the two tweet datasets, count the frequency of each word, and keep the 5000 most frequent words.
3. Use these 5000 words as the features of the merged dataset, so that each data point has 5000 features; each feature value is either true or false, indicating whether that word appears in the tweet.
4. Calculate the "informativeness" of each feature.

The benefit of using Spark for this application is that each data point, namely each tweet, can be processed independently of the other tweets, and most preprocessing and analysis steps are applied to each tweet (or each word) on its own. Parallelizing these steps can therefore significantly shorten the running time.

In [1]:
```python
import re
from math import log
from pyspark import SparkContext, SparkConf
```

First, we set up the Spark configuration in **local mode with 2 worker threads** and create a SparkContext with this configuration.

In [2]:
```python
conf = SparkConf().setAppName("tweeter_spark").setMaster("local[2]")
sc   = SparkContext(conf=conf)
```

Next, we load the stop-word vocabulary used for filtering later. It is stored as a set in the STOPWORDS variable, so checking whether a word is a stop word is fast.

In [3]:
```python
# read one stop word per line, skipping blank lines and stray whitespace
# (such as a trailing "\r" from Windows line endings)
with open("../data/stop_words.txt", 'r') as f:
    STOPWORDS = {line.strip() for line in f if line.strip()}
```

### 1. preprocessing
We wrap the tweet preprocessing in the function **prepTweets**.

In [4]:
```python
def prepTweets(file, label):
    """ Preprocess the tweets by splitting each line into words, removing
    useless symbols, transforming all words into lower case, and removing
    the label word and stop words.
    
    inputs:
    -------
    - file: the text file containing the data.
    - label: the label of the tweets inside this file.
    
    output:
    -------
    - tweets: the cleaned tweets dataset as an RDD. Each row contains the 
        cleaned words that appear in one tweet.
    - n_tweet: the number of tweets in this dataset.
    """
    # replace the character ' with a space
    # split the line into a list of words on spaces
    # remove every character that is not a letter, digit or whitespace
    #   (underscores are removed as well)
    # convert the words to lower case
    # remove the label word
    # remove empty strings
    # remove stop words
    # cache the RDD, since it is reused by count() here and by later steps
    tweets = sc.textFile(file)\
        .map(lambda x: x.replace("'", " "))\
        .map(lambda x: x.split(" "))\
        .map(lambda x: [re.sub(r'([^\s\w]|_)+', '', word) for word in x])\
        .map(lambda x: [word.lower() for word in x])\
        .map(lambda x: [word for word in x if word != label])\
        .map(lambda x: [word for word in x if word != ''])\
        .map(lambda x: [word for word in x if word not in STOPWORDS])\
        .cache()
    return tweets, tweets.count()

resistTweets, n_resist = prepTweets("../data/resist.txt", 'resist')
magaTweets, n_maga     = prepTweets("../data/MAGA.txt", 'maga')
print("Number of resist tweets: %d" % n_resist)
print("Number of maga tweets: %d" % n_maga)
```

```
Number of resist tweets: 11737
Number of maga tweets: 10849
```
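Since every step in prepTweets transforms one line independently, the cleaning logic can be checked on a single string without Spark. The sketch below is a hypothetical plain-Python helper, `clean_tweet`, that applies the same chain of operations in the same order; the sample tweet and the small stop-word set are made up for illustration.

```python
import re

def clean_tweet(line, label, stopwords):
    """Hypothetical plain-Python mirror of the per-line steps in prepTweets."""
    words = line.replace("'", " ").split(" ")                # apostrophes -> spaces, split on spaces
    words = [re.sub(r'([^\s\w]|_)+', '', w) for w in words]  # drop symbols and underscores
    words = [w.lower() for w in words]                       # lower-case every word
    words = [w for w in words if w != label]                 # drop the label word
    words = [w for w in words if w != '']                    # drop empty strings
    return [w for w in words if w not in stopwords]          # drop stop words

# made-up tweet and stop-word set, for illustration only
print(clean_tweet("RT @user: We'll #Resist at the town hall!", "resist", {"we", "at", "the"}))
# ['rt', 'user', 'll', 'town', 'hall']
```

Splitting on apostrophes leaves fragments such as `ll` (from "We'll"), which is likely why short tokens like `re` show up among the frequent words in step 2.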


### 2. feature extraction
In this step, we merge the two tweet datasets, flatten the RDD so that each row is a single word, count the frequency of each word, and keep the 5000 most frequent words as our feature words.

In [5]:
```python
tweets   = resistTweets.union(magaTweets)
# flatten the lists so that each row holds one word
# pair each word with a count of 1
# reduce by key to get the word frequencies
# sort by frequency and take the 5000 most frequent words
featureCnt = tweets.flatMap(lambda x: x)\
    .map(lambda x: (x, 1))\
    .reduceByKey(lambda a, b: a+b)\
    .sortBy(lambda x: x[1], ascending=False)\
    .take(5000)
    
features, counts = zip(*featureCnt)
features[:10]
```

```
('rt',
 'trump',
 'funder',
 'amp',
 'realdonaldtrump',
 'trumpresign',
 'theresistance',
 'president',
 'cnn',
 'potus')
```
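For intuition, the flatMap, map, reduceByKey, sortBy and take chain computes the same thing as an ordinary local word count. The sketch below uses `collections.Counter` on a few made-up tokenized tweets; `top_k_words` is a hypothetical helper name, and the order of words with tied counts may differ from Spark's.

```python
from collections import Counter
from itertools import chain

def top_k_words(tokenized_tweets, k):
    """Local equivalent of flatMap -> map((w, 1)) -> reduceByKey(+) -> sortBy(desc) -> take(k)."""
    counts = Counter(chain.from_iterable(tokenized_tweets))  # flatten and count each word
    return counts.most_common(k)                             # (word, count) pairs, most frequent first

# made-up tokenized tweets
toy_tweets = [["trump", "rally"], ["trump", "vote"], ["vote", "trump"]]
featureCnt = top_k_words(toy_tweets, 2)
features, counts = zip(*featureCnt)
print(features, counts)  # ('trump', 'vote') (3, 2)
```

In Spark itself, `RDD.takeOrdered(5000, key=lambda x: -x[1])` would return the same top words without sorting the whole RDD first, which may matter on larger vocabularies; ties may again be ordered differently.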

### 3. transform dataset
Each row of the preprocessed dataset is simply the list of words that appear in one tweet. Now that we have the features, we transform each row into a fixed-length vector of 5000 boolean values, one per feature word.

In [6]:
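```python
# convert each tweet's word list to a set for fast membership tests, then
# build a boolean vector with one entry per feature word
dataset_r = resistTweets.map(set).map(lambda s: [word in s for word in features])
dataset_m = magaTweets.map(set).map(lambda s: [word in s for word in features])
```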
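To make the transformation concrete, here is the same idea in plain Python with a made-up three-word feature list; `to_binary_vector` is a hypothetical name for the per-tweet function passed to `map`.

```python
def to_binary_vector(words, features):
    """Return one boolean per feature word: True if it appears in the tweet."""
    present = set(words)                          # set gives O(1) membership tests
    return [word in present for word in features]

features = ("trump", "vote", "rally")             # made-up feature words
print(to_binary_vector(["vote", "today", "vote"], features))  # [False, True, False]
```

A word repeated in a tweet still yields a single True, and words outside the feature list are ignored, so every tweet maps to a vector of the same length.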

### 4. calculate "informativeness" for features
Since all feature values are binary (True or False), each feature follows a Bernoulli distribution. We can estimate the probability of a feature being true conditioned on a label by summing its column over that label's tweets and dividing by the number of tweets with that label.

In [7]:
```python
def elementWiseAdd(list_1, list_2):
    """ combine 2 lists together with element-wise addition"""
    return [a + b for a, b in zip(list_1, list_2)]

# summing the boolean vectors column by column gives, for each feature word,
# the number of tweets in which it appears (True counts as 1)
wordCount_r = dataset_r.reduce(elementWiseAdd)
wordCount_m = dataset_m.reduce(elementWiseAdd)
wordCounts  = list(zip(features, wordCount_r, wordCount_m))

print("(word, word count in resist tweets, word count in maga tweets)")
wordCounts[:30]
```

```
(word, word count in resist tweets, word count in maga tweets)

[('rt', 7317, 7669),
 ('trump', 1939, 2549),
 ('funder', 2422, 4),
 ('amp', 1103, 1063),
 ('realdonaldtrump', 780, 1376),
 ('trumpresign', 1271, 68),
 ('theresistance', 1358, 187),
 ('president', 204, 1188),
 ('cnn', 1009, 261),
 ('potus', 156, 1064),
 ('retweet', 1003, 190),
 ('speakerryan', 1041, 45),
 ('white', 658, 376),
 ('tonight', 847, 118),
 ('agree', 925, 70),
 ('america', 258, 609),
 ('able', 864, 14),
 ('question', 822, 8),
 ('trumpre', 754, 27),
 ('civil', 767, 13),
 ('town', 763, 9),
 ('gop', 416, 341),
 ('re', 444, 280),
 ('hall', 759, 7),
 ('unscripted', 757, 0),
 ('httpstco', 350, 394),
 ('nazis', 580, 157),
 ('people', 233, 397),
 ('charlottesville', 383, 242),
 ('left', 197, 422)]
```
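The reduce step and the Bernoulli estimate described above can be illustrated locally with `functools.reduce` on a few made-up boolean vectors. Summing the vectors column by column counts, for each feature, the tweets that contain it (so these counts are document frequencies rather than raw occurrence counts); dividing by the number of tweets gives the estimate of p(word = true | label).

```python
from functools import reduce

def elementWiseAdd(list_1, list_2):
    """Combine two lists with element-wise addition (same helper as in the notebook)."""
    return [a + b for a, b in zip(list_1, list_2)]

# made-up binary vectors for three tweets with one label,
# features ("trump", "vote", "rally")
vectors = [[True, False, True],
           [True, True, False],
           [False, True, False]]

column_sums = reduce(elementWiseAdd, vectors)            # True counts as 1 -> [2, 2, 1]
probs = [count / len(vectors) for count in column_sums]  # estimate of p(word = true | label)
print(column_sums, probs)
```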

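We then compute the informativeness of each feature, defined as the larger of the two ratios between its conditional probabilities under the two labels:
$$\text{inf}(w) = \max\left( \frac{p(w=\text{true}\mid\text{label}=\text{resist})}{p(w=\text{true}\mid\text{label}=\text{maga})}, \frac{p(w=\text{true}\mid\text{label}=\text{maga})}{p(w=\text{true}\mid\text{label}=\text{resist})}\right)$$
A value close to 1 means the word is about equally common under both labels, while a large value means its presence strongly favours one label.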

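To compute it, we divide each count by the size of the corresponding dataset and take the ratio. However, some words never appear in one of the datasets (for example, 'unscripted' has a count of 0 in the maga tweets), which would cause a division by zero and a logarithm of zero. To avoid this, we smooth the counts by adding 1 to each feature's count in both datasets, i.e. $\hat p(w=\text{true}\mid\text{label}=\text{resist}) = (c_{\text{resist}}(w)+1)/n_{\text{resist}}$, and likewise for maga.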
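As a check on the formula and the smoothing, the sketch below recomputes the scores for two words using the counts printed above ('unscripted': 757 resist / 0 maga; 'funder': 2422 / 4) and the dataset sizes n_resist = 11737 and n_maga = 10849 from step 1. `smoothed_scores` is a hypothetical helper that mirrors the mapValues chain in the next cell for a single word.

```python
from math import log

def smoothed_scores(count_resist, count_maga, n_resist, n_maga):
    """Add-one smoothed log-probabilities and informativeness for one word."""
    p_r = (count_resist + 1) / n_resist   # smoothed p(word = true | resist)
    p_m = (count_maga + 1) / n_maga       # smoothed p(word = true | maga)
    inf = round(max(p_r / p_m, p_m / p_r), 5)
    return log(p_r), log(p_m), inf

n_resist, n_maga = 11737, 10849           # dataset sizes printed in step 1
print(smoothed_scores(757, 0, n_resist, n_maga))    # 'unscripted'
print(smoothed_scores(2422, 4, n_resist, n_maga))   # 'funder'
```

The informativeness values agree with the 700.6511 and 447.93605 reported for these words in the sorted output below. Because the same +1 is added to both counts, a word absent from one dataset gets probability 1/n there instead of 0, which keeps both the ratio and the logarithm finite.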

In [8]:
```python
def calcInf(cps):
    """ Take the conditional probabilities of one feature and calculate
    the informativeness of this feature.
    """
    return round(max(cps[0]/cps[1], cps[1]/cps[0]), 5)

# parallelize the word counts into an RDD
# organize the rows as (word, (count_resist, count_maga))
# add one to each count to avoid division by zero
# calculate the conditional probabilities
# calculate the log probabilities and the informativeness
infs = sc.parallelize(wordCounts)\
    .map(lambda x: (x[0], (x[1], x[2])))\
    .mapValues(lambda x: (x[0]+1, x[1]+1))\
    .mapValues(lambda x: (x[0]/n_resist, x[1]/n_maga))\
    .mapValues(lambda x: (log(x[0]), log(x[1]), calcInf(x)))
    
# sort the feature words by informativeness and collect them to the driver
informativeness = infs\
    .sortBy(lambda x: x[1][2], ascending=False)\
    .collect()
    
infWords, _ = zip(*informativeness)
print("(word, logProb(word|label='resist'), logProb(word|label='maga'), informativeness)")
informativeness[:30]
```

(word, logProb(word|label='resist'), logProb(word|label='maga'), informativeness)


[('unscripted', (-2.7398181384577525, -9.29182818882245, 700.6511)),
 ('funder', (-1.5777398032835983, -7.68239027638835, 447.93605)),
 ('745pm', (-3.2860021110249527, -9.29182818882245, 405.78606)),
 ('assemble', (-3.299763796097634, -9.29182818882245, 400.24001)),
 ('tweetfest', (-3.3043934339963763, -9.29182818882245, 398.39133)),
 ('blackouttrump', (-3.376540096793555, -9.29182818882245, 370.66107)),
 ('dobbs', (-9.370501524100124, -3.571516412215039, 329.96451)),
 ('fbn7p', (-9.370501524100124, -3.5913846154317643, 323.47341)),
 ('kthopkins', (-9.370501524100124, -3.611655579805383, 316.9823)),
 ('six', (-9.370501524100124, -3.9165497811382854, 233.67979)),
 ('usaassociation', (-9.370501524100124, -4.008624460084462, 213.12462)),
 ('bfraser747', (-9.370501524100124, -4.018828630258704, 210.96092)),
 ('maga', (-3.981429794283623, -9.29182818882245, 202.43086)),
 ('draintheswamp', (-9.370501524100124, -4.093331157556625, 195.81501)),
 ('loudobbs', (-8.677354343540179, -3.46288257121