<p><span style="font-size: 36pt; font-family: georgia, palatino, serif; color: #800000;">Learning Topical Social Sensors</span></p>

<h1><span style="color: #000080;"><strong>How useful is Twitter to you in terms of finding the right information?</strong></span></h1>

![caption](https://github.com/demoonism/TwitterSensor/blob/master/Screenshot/search.JPG?raw=true)

<p style="text-align: center;"><span style="text-decoration: underline;"><span style="font-size: 20pt;"><em><strong>We can do better than this!</strong> </em></span></span></p>

<p style="text-align: left;"><strong>In this project, we are aiming to train a classifier to identify targeted information on Twitter with high precision. </strong></p>
<p style="text-align: left;"><strong>For example, if you are interested in:</strong></p>
<p style="text-align: left;"><em><strong>&bull; Global social issues</strong></em><br /><em><strong>&bull; Politics in the Pacific Northwest</strong></em><br /><em><strong>&bull; Public transit in New York City</strong></em></p>
<p style="text-align: left;"><strong>The classifier would serve as a "sensor" to identify topical tweets based on your tailored interests!</strong></p>

<h1><span style="color: #000080;"><strong>Challenges</strong></span></h1>

<p style="text-align: left;"><strong>(1) &nbsp;Billions of potential features, thousands of useful ones (Hashtags, users, mentions, terms, locations)</strong></p>
<p style="text-align: left;"><strong>(2) &nbsp;Need a lot of labeled data to learn feature weights well</strong></p>

<h1><span style="color: #000080;"><strong>Solution</strong></span></h1>

<p><span style="font-size: 12pt;"><strong>(1) Careful feature engineering and feature selection using Apache Spark.</strong></span></p>
<p><span style="font-size: 10pt;"><strong>We performed feature selection and transformation with Apache Spark on a standalone server with eight 1 TB hard disks, two 20-core CPUs (40 threads) and 256 GB of RAM.</strong></span></p>
<p><span style="font-size: 12pt;"><strong>(2)</strong> <strong>Hashtags!</strong>&nbsp;</span></p>
<p><span style="font-size: 10pt;"><strong>Hashtags&nbsp;originated on IRC, were&nbsp;adopted later (and perhaps most famously) on Twitter, and&nbsp;now appear on other social media platforms such as Instagram,&nbsp;Tumblr, and Facebook. They usually serve as surrogates for topics. Therefore, for each topic,&nbsp;we leverage a (small)&nbsp;set of user-curated topical hashtags to efficiently provide&nbsp;a large number of supervised topic labels for social media&nbsp;content.</strong></span></p>
<p><span style="font-size: 10pt;"><strong>We used 4 independent annotators to query the Twitter search API to identify candidate hashtags for each topic. A&nbsp;hashtag is assigned to a topic set if at least 3 of the 4 annotators agree on the assignment.</strong></span></p>
<p><span style="font-size: 10pt;"><strong>For example, for the topic "Natural Disaster", the set of hashtags is ["sandy", "drought", "storm", "hurricane", "tornado", ...]. If a tweet contains one or more of the pre-determined hashtags, we say it is "topical" for that topic and label it 1 (0 otherwise). We will revisit this in the feature selection section.</strong></span></p>
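<p><span style="font-size: 10pt;"><strong>The agreement rule and the labeling rule above can be sketched in plain Python. This is only an illustration: the function names and the annotator votes are made up for the example and are not taken from the project code.</strong></span></p>

```python
from collections import Counter


def agreed_hashtags(votes_per_annotator, min_votes=3):
    """Keep the hashtags proposed by at least `min_votes` annotators."""
    counts = Counter(tag for votes in votes_per_annotator for tag in set(votes))
    return {tag for tag, n in counts.items() if n >= min_votes}


def label_tweet(tweet_hashtags, topic_hashtags):
    """Return 1 if the tweet carries any topical hashtag, 0 otherwise."""
    return int(any(tag.lower() in topic_hashtags for tag in tweet_hashtags))


# Hypothetical votes from four annotators for the topic "Natural Disaster".
votes = [
    ["sandy", "storm", "hurricane", "rain"],
    ["sandy", "storm", "drought"],
    ["sandy", "hurricane", "storm"],
    ["hurricane", "drought", "sandy"],
]

topic = agreed_hashtags(votes)
print(sorted(topic))                         # hashtags with 3 or more votes
print(label_tweet(["Sandy", "nyc"], topic))  # topical tweet
print(label_tweet(["thuglife"], topic))      # non-topical tweet
```

<p><span style="font-size: 10pt;"><strong>Here "rain" and "drought" are dropped because fewer than three annotators proposed them.</strong></span></p>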
<p><span style="font-size: 10pt;">&nbsp;</span></p>
<p><span style="font-size: 18pt; color: #ff0000;"><strong>Catch!</strong></span></p>
<p><strong><span style="font-size: 10pt;">Hashtags are also part of our features, so wouldn't the classifier simply learn to memorize the labeling hashtags?</span></strong></p>
<p><strong><span style="font-size: 10pt;">To make sure the classifier generalizes, we remove the training hashtags from the validation and test sets, so that it has to predict from the other learned features rather than just memorizing hashtags. This is illustrated further in the Train-Validation split section later.</span></strong></p>
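<p><strong><span style="font-size: 10pt;">As a rough sketch of the idea (the exact procedure is the one described in the Train-Validation split section; the helper name and the sample data below are assumptions made for illustration), removing the training hashtags from a held-out tweet could look like this:</span></strong></p>

```python
def strip_training_hashtags(tweet_hashtags, train_hashtags):
    """Drop the hashtags that were used to label the training data."""
    return [tag for tag in tweet_hashtags if tag.lower() not in train_hashtags]


# Hypothetical labeling hashtags used for training, and one validation tweet.
train_hashtags = {"sandy", "storm"}
validation_tweet = {"hashtag": ["Sandy", "redcross", "nyc"], "topical": 1}

# The label is kept, but the hashtags that produced it are hidden from the model.
visible_hashtags = strip_training_hashtags(validation_tweet["hashtag"], train_hashtags)
print(visible_hashtags)
```

<p><strong><span style="font-size: 10pt;">The classifier then only sees "redcross" and "nyc" for this tweet, so a correct prediction has to come from features other than the labeling hashtag itself.</span></strong></p>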

<h1><span style="color: #000080;"><strong>Now that we have labeled data, what features could be useful for predicting topicality?</strong></span></h1>

![caption](https://github.com/demoonism/TwitterSensor/blob/master/Screenshot/twt.JPG?raw=true)

<p style="text-align: left;"><span style="font-size: 18pt; color: #000080;"><strong>Why might these tweet features be useful?</strong> </span></p>
<p style="text-align: left;"><br /><span style="font-size: 10pt;"><strong>&bull; Users: who tweets on the topic?</strong></span><br /><span style="font-size: 10pt;"> <em><strong>-</strong></em> <span style="text-decoration: underline;"><em><strong>Tweets from the weather channel might be a good indicator for Natural Disasters</strong></em></span></span></p>
<p style="text-align: left;"><br /><span style="font-size: 10pt;"><strong>&bull; Hashtags: What hashtags co-occur with the topic?</strong></span><br /><span style="font-size: 10pt;"> <em><strong>-</strong></em> <span style="text-decoration: underline;"><em><strong>#teaparty could imply LGBT rights</strong></em></span></span></p>
<p style="text-align: left;"><br /><span style="font-size: 10pt;"><strong>&bull; Mentions:</strong></span><br /><span style="font-size: 10pt;"> <em><strong>-</strong></em> <span style="text-decoration: underline;"><em><strong>@Redcross might be relevant to Natural Disaster</strong></em></span></span></p>
<p style="text-align: left;"><br /><span style="font-size: 10pt;"><strong>&bull; Locations:</strong></span><br /><span style="font-size: 10pt;"> <em><strong>-</strong></em> <span style="text-decoration: underline;"><em><strong>The Philippines, where many natural disasters have happened in the last few years, is a decent indicator of relevant topics</strong></em></span></span></p>
<p style="text-align: left;"><br /><span style="font-size: 10pt;"><strong>&bull; Terms:</strong></span><br /><span style="font-size: 10pt;"> <em><strong>-</strong></em> <span style="text-decoration: underline;"><em><strong> Word features are strong indicators of a particular topic</strong></em></span></span></p>

<h1><span style="color: #000080;"><strong>Implementation</strong></span></h1>

<p><strong><span style="font-size: 10pt;">The original Twitter data were collected over 2 years and amount to over 2 TB of compressed data, consisting of hundreds of millions of tweets.</span></strong></p>
<p><strong><span style="font-size: 10pt;">How do we go from the raw data to an efficient classifier?</span></strong></p>
<p><strong><span style="font-size: 10pt;">The following three-step process serves as an end-to-end pipeline for ETL and ML training.</span></strong></p>
<p>&nbsp;</p>

<p><span style="color: #000080;"><strong><span style="font-family: georgia, palatino, serif; font-size: 12pt;">Starting the Spark app.</span></strong></span></p>
<p><strong><span style="font-size: 10pt;">Note that the Spark context must be launched before running this notebook.</span></strong></p>

<ul style="list-style-type: disc;">
<li><strong><span style="font-size: 10pt;">Spark dir: "/usr/local/share/spark-2.0.1-bin-hadoop2.7"</span></strong></li>
<li><strong><span style="font-size: 10pt;">Spark config dir: "/usr/local/share/spark-2.0.1-bin-hadoop2.7/conf"</span></strong></li>
</ul>
<p><strong><span style="font-size: 10pt;">The Spark configuration is set through the spark-env.sh file. You will need to adjust the number of executors, the memory and the CPU cores depending on the task. Once these are configured, use the command "start-master.sh" to start the master node, and then "start-slave.sh spark://d3m1:7077" to start the slave node. Note that you must specify the master node URL so the slave node knows which master to communicate with. When you are done, run "stop-master.sh" and "stop-slave.sh" to stop the corresponding services.<br /> </span></strong></p>
<p><strong><span style="font-size: 10pt;">Once the services have been started, run the interactive shell with a Jupyter (IPython) notebook: <br /></span></strong></p>
<ul>
<li><strong><span style="font-size: 10pt;">PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark --master spark://d3m1:7077&nbsp;</span></strong></li>
</ul>
<p><strong><span style="font-size: 10pt;">or, if you are connecting to the server over ssh, add the options "--no-browser --port=8889" to PYSPARK_DRIVER_PYTHON_OPTS.</span></strong></p>

<p><span style="color: #000080;"><strong><span style="font-family: georgia, palatino, serif; font-size: 24pt;">Step One: Pre-Processing</span></strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>Each valid tweet crawled from the server is a JSON object with over 100 attributes. An example is shown below:</strong></span></p>

<p><span style="font-size: 10pt;"><strong>Sample Tweet</strong></span></p>
<p><span style="font-size: 8pt;"><strong>{</strong>"created_at":"Thu Jan 31 12:58:06 +0000 2013",</span><br /><span style="font-size: 8pt;"> "id":296965581582786560,</span><br /><span style="font-size: 8pt;"> "id_str":"296965581582786560",</span><br /><span style="font-size: 8pt;"> "text":"Im ready for whatever",</span><br /><span style="font-size: 8pt;"> "source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e",</span><br /><span style="font-size: 8pt;"> "truncated":false,</span><br /><span style="font-size: 8pt;"> "in_reply_to_status_id":null,</span><br /><span style="font-size: 8pt;"> "in_reply_to_status_id_str":null,</span><br /><span style="font-size: 8pt;"> "in_reply_to_user_id":null,</span><br /><span style="font-size: 8pt;"> "in_reply_to_user_id_str":null,</span><br /><span style="font-size: 8pt;"> "in_reply_to_screen_name":null,</span><br /><span style="font-size: 8pt;"> "user":{</span><br /><span style="font-size: 8pt;"> "id":1059349532,</span><br /><span style="font-size: 8pt;"> "id_str":"1059349532",</span><br /><span style="font-size: 8pt;"> "name":"Don Dada",</span><br /><span style="font-size: 8pt;"> "screen_name":"ImDatNiggaBD",</span><br /><span style="font-size: 8pt;"> "location":"South Side Of Little Rock",</span><br /><span style="font-size: 8pt;"> "url":null,</span><br /><span style="font-size: 8pt;"> "description":"Weed Smoker (Kush)",</span><br /><span style="font-size: 8pt;"> "protected":false,</span><br /><span style="font-size: 8pt;"> "followers_count":109,</span><br /><span style="font-size: 8pt;"> "friends_count":110,</span><br /><span style="font-size: 8pt;"> "listed_count":0,</span><br /><span style="font-size: 8pt;"> "created_at":"Fri Jan 04 02:37:28 +0000 2013",</span><br /><span style="font-size: 8pt;"> "favourites_count":14,</span><br /><span style="font-size: 8pt;"> "utc_offset":null,</span><br /><span style="font-size: 8pt;"> "time_zone":null,</span><br /><span 
style="font-size: 8pt;"> "geo_enabled":false,</span><br /><span style="font-size: 8pt;"> "verified":false,</span><br /><span style="font-size: 8pt;"> "statuses_count":1312,</span><br /><span style="font-size: 8pt;"> "lang":"en",</span><br /><span style="font-size: 8pt;"> "contributors_enabled":false,</span><br /><span style="font-size: 8pt;"> "is_translator":false,</span><br /><span style="font-size: 8pt;"> "profile_background_color":"C0DEED",</span><br /><span style="font-size: 8pt;"> "profile_background_image_url":"http:\/\/a0.twimg.com\/images\/themes\/theme1\/bg.png",</span><br /><span style="font-size: 8pt;"> "profile_background_image_url_https":"https:\/\/si0.twimg.com\/images\/themes\/theme1\/bg.png",</span><br /><span style="font-size: 8pt;"> "profile_background_tile":false,</span><br /><span style="font-size: 8pt;"> "profile_image_url":"http:\/\/a0.twimg.com\/profile_images\/3184813228\/d6d3a95d902f088f412cf1bd90c126c7_normal.jpeg",</span><br /><span style="font-size: 8pt;"> "profile_image_url_https":"https:\/\/si0.twimg.com\/profile_images\/3184813228\/d6d3a95d902f088f412cf1bd90c126c7_normal.jpeg",</span><br /><span style="font-size: 8pt;"> "profile_banner_url":"https:\/\/si0.twimg.com\/profile_banners\/1059349532\/1359068332",</span><br /><span style="font-size: 8pt;"> "profile_link_color":"0084B4",</span><br /><span style="font-size: 8pt;"> "profile_sidebar_border_color":"C0DEED",</span><br /><span style="font-size: 8pt;"> "profile_sidebar_fill_color":"DDEEF6",</span><br /><span style="font-size: 8pt;"> "profile_text_color":"333333",</span><br /><span style="font-size: 8pt;"> "profile_use_background_image":true,</span><br /><span style="font-size: 8pt;"> "default_profile":true,</span><br /><span style="font-size: 8pt;"> "default_profile_image":false,</span><br /><span style="font-size: 8pt;"> "following":null,</span><br /><span style="font-size: 8pt;"> "follow_request_sent":null,</span><br /><span style="font-size: 8pt;"> 
"notifications":null},</span><br /><span style="font-size: 8pt;"> "geo":null,</span><br /><span style="font-size: 8pt;"> "coordinates":null,</span><br /><span style="font-size: 8pt;"> "place":null,</span><br /><span style="font-size: 8pt;"> "contributors":null,</span><br /><span style="font-size: 8pt;"> "retweet_count":0,</span><br /><span style="font-size: 8pt;"> "entities":{"hashtags":[],</span><br /><span style="font-size: 8pt;"> "urls":[],</span><br /><span style="font-size: 8pt;"> "user_mentions":[]},</span><br /><span style="font-size: 8pt;"> "favorited":false,</span><br /><span style="font-size: 8pt;"> "retweeted":false,</span><br /><span style="font-size: 8pt;"> "lang":"en"<strong>}</strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>Obviously, not all attributes are relevant to our analysis. In the context of this paper, the only relevant fields for our features are:</strong></span></p>
<p><span style="color: #0000ff;"><em><strong>Hashtags, From_User, Create_Time, Location, Mentions</strong></em></span></p>
<p><span style="font-size: 13.3333px;"><strong>Moreover, the raw text is quite dirty. We need to perform some data cleaning to get proper features.</strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>Since this step is fairly involved and independent of the analysis here, I keep it in a separate notebook. </strong></span></p>
<blockquote>
<p><span style="font-size: 13.3333px;"><strong>Spark-Twt-PreProcessing.ipynb</strong></span></p>
</blockquote>
<p><span style="font-size: 13.3333px;"><strong> You should be able to follow it as an independent module.</strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>The resulting data looks like this:</strong></span></p>

<p><strong>Processed-tweet:</strong></p>
<p><strong>{</strong>u'Create_time': 1359737884.0,<br /> u'from_id': 87151732,<br /> u'from_user': u'ishiPTI',<br /> u'hashtag': u'thuglife',<br /> u'location': u'loc_lakeshore',<br /> u'mention': u'BushraShekhani',<br /> u'term': u'I am ready for whatever',<br /> u'tweet_id': 297312861586325504<strong>}</strong></p>
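<p><span style="font-size: 13.3333px;"><strong>The actual logic lives in Spark-Twt-PreProcessing.ipynb and may differ; the following is only a minimal Python 3 sketch of how a raw tweet could be flattened into a record of this shape. The function name, the choice of user.location as the location source, and the trimmed sample tweet are assumptions made for illustration.</strong></span></p>

```python
import json
from datetime import datetime


def flatten_tweet(raw_line):
    """Reduce a raw tweet JSON string to the few fields used as features."""
    tweet = json.loads(raw_line)
    # Twitter timestamps look like "Thu Jan 31 12:58:06 +0000 2013".
    created = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
    entities = tweet.get("entities", {})
    return {
        "create_time": created.timestamp(),
        "from_id": tweet["user"]["id"],
        "from_user": tweet["user"]["screen_name"],
        # Multiple hashtags/mentions are kept as one space-separated string.
        "hashtag": " ".join(h["text"] for h in entities.get("hashtags", [])),
        "location": tweet["user"].get("location") or "",
        "mention": " ".join(m["screen_name"] for m in entities.get("user_mentions", [])),
        "term": tweet["text"],
        "tweet_id": tweet["id"],
    }


# Trimmed, partly hypothetical tweet with only the attributes we need.
raw = json.dumps({
    "created_at": "Thu Jan 31 12:58:06 +0000 2013",
    "id": 296965581582786560,
    "text": "Im ready for whatever #thuglife",
    "user": {"id": 1059349532, "screen_name": "example_user", "location": "Little Rock"},
    "entities": {"hashtags": [{"text": "thuglife"}], "urls": [], "user_mentions": []},
})

record = flatten_tweet(raw)
print(record)
```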

<p><span style="font-size: 13.3333px;"><strong>Now that we have a small (sort of) and clean dataset to work with, it is time to move on to Spark to perform some real analysis.</strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>We are on to the real coding part. Please note that the Spark code presented here probably violates every single good coding practice, not to mention OOP. The point is to make this notebook as illustrative as possible, so you will probably see a lot of redundant code. Feel free to refactor as you wish.</strong></span></p>

<p><span style="color: #000080;"><strong><span style="font-family: georgia, palatino, serif; font-size: 24pt;">Step Two: Feature Extraction</span></strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>We need to turn the raw JSON data into a feature matrix. There are two key requirements here: </strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>1. Data processing must be extremely efficient since we only have 40 cores and 256 GB of RAM.</strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>2. The resulting matrix must be sparse to facilitate the training step&nbsp;later.</strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>These are achieved through the following pipeline. </strong></span></p>
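<p><span style="font-size: 13.3333px;"><strong>Before the pipeline itself, here is a tiny plain-Python illustration of what "sparse" means for requirement 2: each tweet is stored as the list of column indices of the features it contains, instead of a full row with billions of zeros. The feature names and helper functions are hypothetical and not part of the Spark code.</strong></span></p>

```python
def build_vocabulary(records):
    """Assign a column index to every distinct feature string."""
    vocab = {}
    for features in records:
        for feature in features:
            vocab.setdefault(feature, len(vocab))
    return vocab


def to_sparse(features, vocab):
    """Return the sorted column indices of the features present in one tweet."""
    return sorted({vocab[f] for f in features if f in vocab})


# Hypothetical per-tweet feature lists (user, hashtag and location features).
records = [
    ["user_weatherchannel", "hashtag_storm", "loc_new_york"],
    ["user_ishipti", "hashtag_thuglife", "loc_lakeshore"],
    ["user_weatherchannel", "hashtag_drought"],
]

vocab = build_vocabulary(records)
rows = [to_sparse(r, vocab) for r in records]
print(len(vocab))  # number of columns in the feature matrix
print(rows)        # one short index list per tweet
```

<p><span style="font-size: 13.3333px;"><strong>Memory then grows with the number of features that actually occur in each tweet, not with the size of the vocabulary.</strong></span></p>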

In [6]:
### Step 1

## Notebook property setup.
## Spark SQL
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf, col, lit, monotonically_increasing_id, explode
from pyspark.sql import functions as F
from pyspark import SparkContext


## Spark ML
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.param import Param, Params
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer, IDF, StopWordsRemover, CountVectorizer, VectorAssembler

## Helper
import re
import string
import sys
import time
import os.path
import json
from datetime import datetime
from operator import add
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
## Enable inline graphs
%matplotlib inline

## Display precision for pandas dataframe
pd.set_option('display.precision', 10)

workdir_1e = "data/Eng_Json/" #Parsed json for year 2013
#workdir_2b = "/mnt/2b53fde0-61da-4eeb-a038-9910540ff9ad/Eng_Json/" #Parsed json for year 2014
workdir_4e = "data/final_parquet" #Dir to hold input data in parquet format 
workdir_66 = "data/Training_data" #Dir to hold training data
workdir_b9 = "data/Feature_Vector" #Dir to hold processed Feature vectors
# Sample bash code to change folder access.
# !chgrp danielshi /mnt/66e695cd-1a0c-4e3b-9a50-55e01b788529/Training_data/
# !chmod g+s /mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Feature_Vector/
# !setfacl -d -m g::rwx /mnt/b93e71ec-8ddf-4033-bd42-770c05bc68aa/Feature_Vector/

In [2]:

## Helper function to keep track of the run time of a spark task.
def getTime(start):
    sec = time.time() - start
    m, s = divmod(sec, 60)
    h, m = divmod(m, 60)
    print('Spark operation takes - %d:%02d:%02d which is %d seconds in total' % (h,m,s,sec))
    
# Load a JSON object; if a line is invalid, substitute an empty dict (which has len() == 0).
def loadJson(d):
    try:
        js = json.loads(d)
    except Exception:
        # Covers malformed JSON (ValueError) as well as any other parsing failure.
        js = {}
    return js

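def translating(x):
    # Lowercase and strip ASCII punctuation. A regex is used because
    # str.translate(None, ...) only works on Python 2 byte strings.
    return re.sub('[%s]' % re.escape(string.punctuation), '', x.lower())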

def loc_clean(d):

    if d is None or d.strip(' ') == '':
        loc_term = "empty_location"
    else:
        loc_term = 'loc_' + "_".join(map(translating, d.strip(' ').split(" ")))
        
    return loc_term

loc_udf = udf(loc_clean, StringType())


def hash_clean(d):

    if d is None or d.strip(' ') == '':
        hashtags = "empty_hashtag"
    else:
        hashtags = d
        
    return hashtags

hash_udf = udf(hash_clean, StringType())


def mention_clean(d):

    if d is None or d.strip(' ') == '':
        mentions = "empty_mention"
    else:
        mentions = d
        
    return mentions

mention_udf = udf(mention_clean, StringType())


def clean_term(d):
    # Remove mentions, hashtags, URLs and punctuation from the tweet text.
    tags = d['hashtag'].split()
    mention = d['mention'].split()
    # Drop non-ASCII characters, then decode back to text so the string operations below work.
    text = d['term'].encode('ascii', 'ignore').decode('ascii')
    for ppl in mention:
        text = text.replace('@'+ppl, '')
    for tag in tags:
        text = text.replace('#'+tag, '')

    text = re.sub(r'(https?://\S+)', '', text).replace(":", "").lower()

    # Strip punctuation and collapse whitespace; mark tweets with nothing left.
    terms = " ".join(re.sub('[%s]' % re.escape(string.punctuation), '', text).split())
    if terms == '':
        terms = "empty_tweet"

    updated = {'create_time': d['create_time'],
      'from_id': d['from_id'],
      'from_user': d['from_user'],
      'hashtag': d['hashtag'],
      'location': d['location'],
      'mention': d['mention'],
      'term': terms,
      'tweet_id': d['tweet_id']}
    return updated


def finalCLeaning(file_obj, output):
    data_1 = file_obj.map(loadJson)
    cleaned_dat = data_1.map(clean_term)
    df_p1 = sqlContext.createDataFrame(cleaned_dat, schema)
    df_p1.write.save(workdir_66+output, format="parquet")
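
<p><span style="font-size: 13.3333px;"><strong>To see what the term cleaning does without starting Spark, the same steps can be tried on a single string. This is a standalone Python 3 sketch; clean_text is a hypothetical name, not a function of this notebook.</strong></span></p>

```python
import re
import string

PUNCT_RE = re.compile("[%s]" % re.escape(string.punctuation))


def clean_text(text, hashtags, mentions):
    """Strip mentions, hashtags, URLs and punctuation from a tweet's text."""
    # Drop non-ASCII characters such as the en dash in the sample below.
    text = text.encode("ascii", "ignore").decode("ascii")
    for name in mentions.split():
        text = text.replace("@" + name, "")
    for tag in hashtags.split():
        text = text.replace("#" + tag, "")
    text = re.sub(r"https?://\S+", "", text).lower()
    # Remove punctuation and collapse repeated whitespace.
    text = " ".join(PUNCT_RE.sub("", text).split())
    return text or "empty_tweet"


sample = ("Breaking \u2013 Jelly Bean update for the Verizon Galaxy S3 leaked!: "
          "Android 4.1.2 http://t.co/nPbyaHqS")
print(clean_text(sample, "", ""))
print(clean_text("@bob #tag http://x.co", "tag", "bob"))
```

<p><span style="font-size: 13.3333px;"><strong>The second call shows the placeholder that is produced when nothing is left after cleaning.</strong></span></p>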


<p><span style="font-size: 18px;"><strong>Reading Data </strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>After preprocessing, tweets are saved as JSON files with one tweet per line. We need to load and parse these data into DataFrames. Note that the input argument of sc.textFile can be either a file or a directory; Spark will create the partitions automatically. Also note that the pre-processed data are stored in two directories for I/O balancing.</strong></span></p>

In [3]:
# full
sc = SparkContext.getOrCreate()  # reuses the context if one was already started (e.g. by the pyspark shell)

data_Eng = sc.textFile(workdir_1e)
data = data_Eng.map(loadJson)
# Take a look at the (parsed) first line of our input files
data.take(1)

[{u'create_time': 1360036679.0,
  u'from_id': u'132727520',
  u'from_user': u'AndroidJunkies',
  u'hashtag': u'',
  u'location': u'',
  u'mention': u'',
  u'term': u'Breaking \u2013 Jelly Bean update for the Verizon Galaxy S3 leaked!: Android 4.1.2, the latest flavor of Jelly Bean o... http://t.co/nPbyaHqS',
  u'tweet_id': u'298566098620665858'}]

<h1><span style="color: #000080;"><strong>Turning to dataframe</strong></span></h1>


<p><span style="font-size: 13.3333px;"><strong>An RDD (Resilient Distributed Dataset) is more of a black-box collection of records: Spark does not know their internal structure. (Available in Spark since 1.0.) </strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>A DataFrame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable and each row contains one case. A DataFrame therefore carries additional metadata (its schema), which allows Spark to run certain optimizations on the finalized query. (Added in Spark 1.3.) </strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>In summary, you can write traditional map-reduce style code on both RDDs and DataFrames, but DataFrames also support SQL commands and built-in analytical functions. For performance reasons, let's turn our RDD into a DataFrame first.</strong></span></p>

In [4]:
## Define Dataframe schema.
schema = StructType([StructField('create_time', DoubleType(), False),
                     StructField('from_id', StringType(), False),
                     StructField('from_user', StringType(), False),
                     StructField('hashtag', StringType(), True),
                     StructField('location', StringType(), True),
                     StructField('mention', StringType(), True),
                     StructField('term', StringType(), True),
                     StructField('tweet_id', StringType(), False)
                    ])


In [8]:
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(data, schema)

<p><span style="font-size: 13.3333px;"><strong>Save the DataFrame in parquet format for easy loading. NOTE: you will see a lot of I/O code commented out. These lines produce intermediate results used to build other DataFrames later. We don't need to run them every time; only run them if you want to reproduce the results.</strong></span></p>


In [None]:
#df.write.save(workdir_4e+"/Eng_DF", format="parquet")

<p><span style="font-size: 13.3333px;"><strong>The input is shown in tabular form (DataFrame) below. Note that the hashtag field just happened to be null for the first few records. </strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>Clean the hashtag, mention, username and location columns by removing nulls and unwanted characters/punctuation. We use user-defined functions (UDFs) here. Essentially, this is the same concept as applying a custom map operation to a column.</strong></span></p>

In [9]:
clean_stage1 = df.withColumn("clean_loc", loc_udf(df.location)).\
                withColumn("clean_hash", hash_udf(df.hashtag)).\
                withColumn("clean_mention", mention_udf(df.mention))
clean_stage1.show(5)

Py4JJavaError: An error occurred while calling o78.showString.
: java.lang.IllegalArgumentException
	at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
	...
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
	...
	at java.base/java.lang.Thread.run(Thread.java:844)

<p><span style="font-size: 13.3333px;"><strong>Note: this cell failed in the captured run, so the expected table is missing. The java.base frames in the trace indicate a Java 9 or newer JVM, and an IllegalArgumentException raised from ClassReader during closure cleaning is a common symptom of running Spark 2.x on a JVM newer than Java 8. Rerunning the cell with Java 8 should produce the table.</strong></span></p>


In [None]:
#clean_stage1.write.save(workdir_4e+"/Eng_DF_clean_stage1", format="parquet")

In [None]:
#clean_stage1 = spark.read.parquet(workdir_4e+"/Eng_DF_clean_stage1")

In [None]:
# Map back to the old column names
Pre_cleansing_df = clean_stage1.select("create_time", "from_id", "from_user", "tweet_id", "term",
                                       "clean_loc", "clean_hash", "clean_mention")

clean_stage2 = (Pre_cleansing_df
                .withColumnRenamed("clean_loc", "location")
                .withColumnRenamed("clean_hash", "hashtag")
                .withColumnRenamed("clean_mention", "mention"))

In [None]:
#clean_stage2.write.json(workdir_66+"/Eng_DF_clean_stage2")

In [None]:
#clean_stage2 = sc.textFile(workdir_66+"/Eng_DF_clean_stage2/*.json")

In [None]:
# Reframing
# Clean the term column. This used to be the most time-consuming step of the processing, so it was separated
# from the previous stage. After some optimization, it now takes roughly the same time as the other cleaning steps.
# Feel free to combine this with the previous stage if you want.

# Note that the convention followed here is more "Hadoop-like", since intermediate results are saved separately. Although not
# necessary, it ensures we have something to fall back on if we run into a problem at some stage.

#finalCLeaning(clean_stage2, "/Staging_final")
# sqlContext is defined above; `spark` only exists when the notebook is launched through the pyspark shell.
Stg_final = sqlContext.read.parquet(workdir_66+"/Staging_final")

<p><span style="font-size: 13.3333px;"><strong>Now that our DataFrame is nice and clean, the next task is to label the dataset. Remember our criterion: a tweet is topical if it contains at least one hashtag from the pre-defined list for a given topic. Note that a tweet can contain multiple hashtags; if any one of them is relevant, we consider the tweet relevant. Therefore, what we need to do here is flatten the hashtag list, find the unique list of tweet IDs that contain relevant hashtags, and then join it back to the original DataFrame. Obviously, there is more than one way to achieve this. I try to stick with DataFrame-only operations here for efficiency.</strong></span></p>
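<p><span style="font-size: 13.3333px;"><strong>The three operations (flatten, distinct, join back) can be pictured on a toy example in plain Python before looking at the Spark version. The sample tweets and the topic list are hypothetical, and the sketch assigns 0 to non-topical tweets directly.</strong></span></p>

```python
# Toy data in the same shape as the cleaned records (hashtags are space separated).
tweets = [
    {"tweet_id": "1", "hashtag": "Wimbledon tennis"},
    {"tweet_id": "2", "hashtag": "thuglife"},
    {"tweet_id": "3", "hashtag": "empty_hashtag"},
]
topic_hashtags = {"wimbledon", "tennis"}  # hypothetical topic list

# 1. Flatten: one (tweet_id, hashtag) pair per hashtag, lowercased.
exploded = [(t["tweet_id"], tag)
            for t in tweets
            for tag in t["hashtag"].lower().split()]

# 2. Distinct: the unique IDs of tweets carrying at least one topical hashtag.
topical_ids = {tweet_id for tweet_id, tag in exploded if tag in topic_hashtags}

# 3. Join back: every original tweet gets a label, 1 if its ID is topical.
labeled = [dict(t, topical=1 if t["tweet_id"] in topical_ids else 0) for t in tweets]

print(exploded)
print(topical_ids)
print([(t["tweet_id"], t["topical"]) for t in labeled])
```

<p><span style="font-size: 13.3333px;"><strong>Tweet 1 matches two topical hashtags but appears only once in the ID set, which is why the distinct step is needed before joining.</strong></span></p>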

In [None]:
# Import the hashtag dict from another file.
from hashtag_dict import topic_dict

In [None]:
#Access a particular topic
#topic_dict['Soccer']

<p><span style="font-size: 13.3333px;"><strong>Each tweet can contain multiple hashtags, so we need to normalize this attribute into one row per (tweet, hashtag) pair. This will facilitate the labeling step later.</strong></span></p>

In [None]:
tokenizer = Tokenizer(inputCol="hashtag", outputCol="each_hashtag")
hashtags_df = tokenizer.transform(Stg_final)

hashtag =  hashtags_df.select("tweet_id","create_time","each_hashtag")
hash_exploded = hashtag.withColumn('each_hashtag', explode('each_hashtag'))

In [None]:
#hash_exploded.write.save(workdir_4e+"/hash_exploded", format="parquet")
hash_exploded = spark.read.parquet(workdir_4e+"/hash_exploded")
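
<p><span style="font-size: 13.3333px;"><strong>As a rough illustration of what the tokenize-and-explode step does, here is a minimal plain-Python sketch. It is not the Spark code: the function name and the toy rows are hypothetical, and it only assumes that the hashtag column is a whitespace-separated string.</strong></span></p>

```python
# Plain-Python stand-in for Tokenizer + explode (toy data, not the Spark API).
def explode_hashtags(rows):
    """Turn (tweet_id, create_time, hashtag_string) rows into one row per hashtag."""
    exploded = []
    for tweet_id, create_time, hashtag_string in rows:
        # Like Spark's Tokenizer: lowercase, then split on whitespace.
        for tag in hashtag_string.lower().split():
            exploded.append((tweet_id, create_time, tag))
    return exploded


# Hypothetical toy rows; the real data comes from Stg_final.
toy_tweets = [
    (1, "2013-06-24", "Wimbledon tennis"),
    (2, "2013-06-25", "nba"),
]
toy_exploded = explode_hashtags(toy_tweets)
```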

<p><span style="font-size: 13.3333px;"><strong>If a hashtag is in the predefined list, we mark the corresponding tweet as topical. A distinct operation then gives a unique list of topical tweet ids for a particular topic.</strong></span></p>

In [None]:
print("Tennis")
print("num hashtag: " + str(len(topic_dict["Tennis"])))
tennis_topical_ids = hash_exploded.select(hash_exploded.tweet_id).where(hash_exploded.each_hashtag.isin(topic_dict["Tennis"])).distinct().cache()
#print(tennis_topical_ids.count())

<p><span style="font-size: 13.3333px;"><strong>Now we have a list of tweet ids that are topical for tennis.</strong></span></p>

In [None]:
tennis_topical_ids.write.save(workdir_66+"/tennis_topical_ids", format="parquet")
tennis_topical_ids = spark.read.parquet(workdir_66+"/tennis_topical_ids")

In [None]:
tennis_topical_ids.count()

<p><span style="font-size: 13.3333px;"><strong> Join the ids back to obtain the full label</strong></span></p>

In [None]:
def getLabeledDf(df_topic):
    Labeled_topical = df_topic.withColumn("topical", lit(1))
    Labled_df = Stg_final.join(Labeled_topical, Stg_final.tweet_id == Labeled_topical.tweet_id, "left").\
                                       select(Stg_final.create_time,\
                                              Stg_final.from_id,\
                                              Stg_final.from_user,\
                                              Stg_final.hashtag,\
                                              Stg_final.location,\
                                              Stg_final.mention,\
                                              Stg_final.tweet_id,\
                                              Stg_final.term,\
                                              F.when(Labeled_topical.topical == 1, 1.0).otherwise(0.0).alias("label")).distinct()
    
    return Labled_df

In [None]:
Labled_df = getLabeledDf(tennis_topical_ids)
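
<p><span style="font-size: 13.3333px;"><strong>The labeling logic (collect the distinct topical ids, then left-join them back so that unmatched tweets get label 0.0) can be sketched in plain Python as follows. The helper name and the toy data are assumptions made for illustration only.</strong></span></p>

```python
def label_tweets(tweets, exploded, topic_hashtags):
    """Attach label 1.0 to tweets with at least one topical hashtag, else 0.0."""
    # Equivalent of .where(isin(...)).distinct(): a set of topical tweet ids.
    topical_ids = {tweet_id for tweet_id, tag in exploded if tag in topic_hashtags}
    # Equivalent of the left join + when/otherwise: every tweet is kept.
    return [
        dict(tweet, label=1.0 if tweet["tweet_id"] in topical_ids else 0.0)
        for tweet in tweets
    ]


# Hypothetical toy data.
toy_tweet_rows = [
    {"tweet_id": 1, "hashtag": "wimbledon tennis"},
    {"tweet_id": 2, "hashtag": "nba"},
    {"tweet_id": 3, "hashtag": ""},
]
toy_pairs = [(1, "wimbledon"), (1, "tennis"), (2, "nba")]
toy_labeled = label_tweets(toy_tweet_rows, toy_pairs, {"wimbledon", "usopen"})
```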

In [None]:
#Labled_df.write.save(workdir_4e+"/Labled_df", format="parquet")

In [None]:
Labled_df = spark.read.parquet(workdir_4e+"/Labled_df")

In [None]:
tennis_labels = Labled_df.select("label","tweet_id")

In [None]:
tennis_text = Labled_df.select("tweet_id","create_time","from_user","hashtag","location","mention","term")

<p><span style="font-size: 13.3333px;"><strong>Saving the full label column for later use.</strong></span></p>

In [None]:
#tennis_labels.write.save(workdir_66+"/tennis_topical_labels", format="parquet")
tennis_labels = spark.read.parquet(workdir_66+"/tennis_topical_labels")

In [None]:
#tennis_text.write.save(workdir_66+"/tennis_raw_feature_text", format="parquet")
tennis_text = spark.read.parquet(workdir_66+"/tennis_raw_feature_text")

<h1><span style="color: #000080;"><strong>Vectorizing user, hashtag, location, mention, term into feature vectors</strong></span></h1>

<p><span style="font-size: 13.3333px;"><strong>We have an overwhelming number of features; it is essential to threshold them to avoid overfitting. We use the same thresholds as described in the paper. Note that the threshold applies to document frequency (DF), not term frequency (TF).</strong></span></p>
![caption](https://github.com/demoonism/TwitterSensor/blob/master/Screenshot/featurecount.JPG?raw=true)

<p><span style="font-size: 13.3333px;"><strong>In this section, we vectorize each feature according to the count threshold above. Note that we cannot chain this in a pipeline, as the threshold must be applied to the original dataframe.</strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>Term Feature Threshold. Remove stop words first, then keep only terms with a document-frequency count of at least 50 (minDF=50).</strong></span></p>

In [None]:
term_tokenizer = Tokenizer(inputCol="term", outputCol="words")
term_remover = StopWordsRemover(inputCol=term_tokenizer.getOutputCol(), outputCol="filtered")
term_cv = CountVectorizer(inputCol=term_remover.getOutputCol(), outputCol="term_features", minDF=50)
pipeline_term = Pipeline(stages=[term_tokenizer,term_remover,term_cv])

In [None]:
loading = time.time()

model = pipeline_term.fit(Labled_df)
Feat_term = model.transform(Labled_df).select("term_features", "tweet_id")

getTime(loading)

In [None]:
#Feat_term.write.save(workdir_b9+"/Feature_term", format="parquet")
Feat_term = spark.read.parquet(workdir_b9+"/Feature_term")

In [None]:
# Check which features pass the threshold.
Feat_term.show()
#http://stackoverflow.com/questions/32285699/how-to-get-word-details-from-tf-vector-rdd-in-spark-ml-lib
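
<p><span style="font-size: 13.3333px;"><strong>To make the DF-versus-TF distinction concrete, here is a small plain-Python sketch of a document-frequency threshold. It only mimics the effect of minDF on a toy corpus: the function name is hypothetical, and the alphabetical ordering of the vocabulary is a simplification (CountVectorizer orders by corpus frequency).</strong></span></p>

```python
from collections import Counter


def build_vocabulary(documents, min_df):
    """Keep tokens that appear in at least min_df different documents."""
    df_counts = Counter()
    for tokens in documents:
        # set() counts a token once per document, however often it repeats.
        df_counts.update(set(tokens))
    return sorted(token for token, df in df_counts.items() if df >= min_df)


# Toy corpus: "serve" occurs 4 times in total but in only 2 documents.
toy_docs = [
    ["serve", "serve", "serve"],
    ["serve", "ace"],
    ["ace", "net"],
]
toy_vocab = build_vocabulary(toy_docs, min_df=2)
```

<p><span style="font-size: 13.3333px;"><strong>With a threshold of 3, "serve" would be dropped even though its raw term count is 4, which is exactly why the threshold has to be read as DF.</strong></span></p>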

<p><span style="font-size: 13.3333px;"><strong>Hashtag Feature Threshold. </strong></span></p>

In [None]:
hashtag_tokenizer = Tokenizer(inputCol="hashtag", outputCol="tags")
hashtag_cv = CountVectorizer(inputCol=hashtag_tokenizer.getOutputCol(), outputCol="hashtag_features", minDF=159)
pipeline_hashtag = Pipeline(stages=[hashtag_tokenizer,hashtag_cv])

In [None]:
loading = time.time()

model = pipeline_hashtag.fit(Labled_df)
Feat_hashtag = model.transform(Labled_df).select("hashtag_features", col("tweet_id").alias("id2"))

getTime(loading)

In [None]:
Feat_hashtag.write.save(workdir_b9+"/Feature_hashtag", format="parquet")
Feat_hashtag = spark.read.parquet(workdir_b9+"/Feature_hashtag")

<p><span style="font-size: 13.3333px;"><strong>Mention Feature Threshold. </strong></span></p>

In [None]:
mention_tokenizer = Tokenizer(inputCol="mention", outputCol="mentions")
mention_cv = CountVectorizer(inputCol=mention_tokenizer.getOutputCol(), outputCol="mention_features", minDF=159)
pipeline_mention = Pipeline(stages=[mention_tokenizer,mention_cv])

In [None]:
loading = time.time()

model = pipeline_mention.fit(Labled_df)
Feat_mention = model.transform(Labled_df).select("mention_features", col("tweet_id").alias("id3"))

getTime(loading)

In [None]:
Feat_mention.write.save(workdir_b9+"/Feature_mention", format="parquet")
Feat_mention = spark.read.parquet(workdir_b9+"/Feature_mention")

<p><span style="font-size: 13.3333px;"><strong>User Feature Threshold. </strong></span></p>

In [None]:
user_tokenizer = Tokenizer(inputCol="from_user", outputCol="users")
user_cv = CountVectorizer(inputCol=user_tokenizer.getOutputCol(), outputCol="user_features", minDF=159)
pipeline_user = Pipeline(stages=[user_tokenizer,user_cv])

In [None]:
loading = time.time()

model = pipeline_user.fit(Labled_df)
Feat_user = model.transform(Labled_df).select("user_features", col("tweet_id").alias("id4"))

getTime(loading)

In [None]:
Feat_user.write.save(workdir_b9+"/Feature_user", format="parquet")
Feat_user = spark.read.parquet(workdir_b9+"/Feature_user")

<p><span style="font-size: 13.3333px;"><strong>Location Feature Threshold. </strong></span></p>

In [None]:
loc_tokenizer = Tokenizer(inputCol="location", outputCol="locs")
loc_cv = CountVectorizer(inputCol=loc_tokenizer.getOutputCol(), outputCol="loc_features", minDF=50)
pipeline_loc = Pipeline(stages=[loc_tokenizer,loc_cv])

In [None]:
loading = time.time()

model = pipeline_loc.fit(Labled_df)
Feat_loc = model.transform(Labled_df).select("loc_features", "hashtag", "create_time", col("tweet_id").alias("id5"))

getTime(loading)

In [None]:
Feat_loc.write.save(workdir_b9+"/Feature_loc", format="parquet")
Feat_loc = spark.read.parquet(workdir_b9+"/Feature_loc")

<p><span style="font-size: 13.3333px;"><strong>Joining all feature DFs above into one. </strong></span></p>

In [None]:
Feat_1 = Feat_term.join(Feat_hashtag,\
                         Feat_term.tweet_id == Feat_hashtag.id2,\
                         "inner").select(Feat_term.term_features,\
                                         Feat_hashtag.hashtag_features,\
                                         Feat_hashtag.id2)
Feat_2 = Feat_1.join(Feat_mention,\
                     Feat_1.id2 == Feat_mention.id3,\
                     "inner").select(Feat_1.term_features,\
                                     Feat_1.hashtag_features,\
                                     Feat_mention.mention_features,\
                                     Feat_mention.id3)
Feat_3 = Feat_2.join(Feat_user,\
                     Feat_2.id3 == Feat_user.id4,\
                     "inner").select(Feat_2.term_features,\
                                     Feat_2.hashtag_features,\
                                     Feat_2.mention_features,\
                                     Feat_user.user_features,\
                                     Feat_user.id4)
Feat_all = Feat_3.join(Feat_loc,\
                     Feat_3.id4 == Feat_loc.id5,\
                     "inner").select(Feat_3.term_features,\
                                     Feat_3.hashtag_features,\
                                     Feat_3.mention_features,\
                                     Feat_3.user_features,\
                                     Feat_loc.loc_features,\
                                     Feat_loc.create_time,\
                                     Feat_loc.hashtag,\
                                     Feat_loc.id5)

In [None]:
Feat_all.write.save(workdir_66+"/Feature_agg", format="parquet")
Features_vect = spark.read.parquet(workdir_66+"/Feature_agg")

In [None]:
# Another way to do this is to chain everything into a pipeline.
'''
term_tokenizer = Tokenizer(inputCol="term", outputCol="words")
term_remover = StopWordsRemover(inputCol=term_tokenizer.getOutputCol(), outputCol="filtered")
term_cv = CountVectorizer(inputCol=term_remover.getOutputCol(), outputCol="term_features", minDF=50)

hashtag_tokenizer = Tokenizer(inputCol="hashtag", outputCol="tags")
hashtag_cv = CountVectorizer(inputCol=hashtag_tokenizer.getOutputCol(), outputCol="hashtag_features", minDF=159)

mention_tokenizer = Tokenizer(inputCol="mention", outputCol="mentions")
mention_cv = CountVectorizer(inputCol=mention_tokenizer.getOutputCol(), outputCol="mention_features", minDF=159)

user_tokenizer = Tokenizer(inputCol="from_user", outputCol="users")
user_cv = CountVectorizer(inputCol=user_tokenizer.getOutputCol(), outputCol="user_features", minDF=159)

loc_tokenizer = Tokenizer(inputCol="location", outputCol="locs")
loc_cv = CountVectorizer(inputCol=loc_tokenizer.getOutputCol(), outputCol="loc_features", minDF=50)

pipeline = Pipeline(stages=[term_tokenizer,term_remover,term_cv,hashtag_tokenizer,hashtag_cv,mention_tokenizer, \
                            mention_cv,user_tokenizer, user_cv, loc_tokenizer, loc_cv])

loading = time.time()

model = pipeline.fit(clean_data)
Input = model.transform(clean_data)

getTime(loading)
'''

<p><span style="font-size: 13.3333px;"><strong>Lastly, we join the dataframe with the topical labels we built earlier. Now we have both the separate feature vectors and the corresponding labels.</strong></span></p>

In [None]:
Labled_Feat = Features_vect.join(tennis_labels,\
                                 Features_vect.id5 == tennis_labels.tweet_id,\
                                 "inner").select(Features_vect.term_features,\
                                                 Features_vect.hashtag_features,\
                                                 Features_vect.mention_features,\
                                                 Features_vect.user_features,\
                                                 Features_vect.loc_features,\
                                                 Features_vect.create_time,\
                                                 Features_vect.hashtag,\
                                                 tennis_labels.label,\
                                                 tennis_labels.tweet_id)

In [None]:
Labled_Feat.write.save(workdir_b9+"/Features_with_label", format="parquet")

In [None]:
Labled_Feat = spark.read.parquet(workdir_b9+"/Features_with_label")

In [None]:
Labled_Feat.count()

<p><span style="font-size: 13.3333px;"><strong>At this point, each feature vector is still in its own column. We need to combine them into one feature matrix. Before we do that, however, let's split our dataset. The reason is that Apache Parquet is not very good at handling sparse data, and saving the combined vectors will likely run into memory errors. We hold off the combining step until later.</strong></span></p>

<h1><span style="color: #000080;"><strong>Temporal Split</strong></span></h1>

<p><span style="font-size: 13.3333px;"><strong>Now that we have our feature vectors, it is time to establish the training, validation and test sets for training the classifier.</strong></span></p>
<p><span style="font-size: 13.3333px;"><strong>To ensure our classifier generalizes to a wide range of features and does not simply remember past hashtags, we perform a temporal split and exclude training hashtags from validation and test.</strong></span></p>

![caption](https://github.com/demoonism/TwitterSensor/blob/master/Screenshot/Capture.JPG?raw=true)

In [None]:
# Note that the 50%-10%-40% split ratio is not mandatory. I suggest examining the dataframe for each topic
# and making your decision on the go.

<h1><span style="color: #000080;"><strong>Hashtag Birthday</strong></span></h1>

<p><span style="font-size: 13.3333px;"><strong>Hashtag birthday is the first timestamp at which a particular hashtag appears in the tweet corpus between 2013 and 2014. We determine it by finding the minimum "create time" for each hashtag.</strong></span></p>

In [None]:
df_birthday = hash_exploded.join(tennis_labels,\
                                 hash_exploded.tweet_id == tennis_labels.tweet_id,\
                                 "inner").select(hash_exploded.create_time,\
                                                 hash_exploded.each_hashtag,\
                                                 hash_exploded.tweet_id)

In [None]:
## Find the "birthday", i.e. the earliest appearance time, of each hashtag,
## keeping only the hashtags on the topic list and ordering them by birthday.
Ordered_Hashtag_set = df_birthday.\
                      groupby("each_hashtag").\
                      agg({"create_time": "min"}).\
                      orderBy('min(create_time)', ascending=True).\
                      withColumnRenamed("min(create_time)", "birthday").\
                      where(df_birthday.each_hashtag.isin(topic_dict["Tennis"])).cache()

In [None]:
# quick check
Ordered_Hashtag_set.count()
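
<p><span style="font-size: 13.3333px;"><strong>The birthday computation is a group-by with a minimum. A minimal plain-Python sketch, assuming integer timestamps and hypothetical toy rows, looks like this:</strong></span></p>

```python
def hashtag_birthdays(exploded, topic_hashtags):
    """Return (hashtag, birthday) pairs for topical hashtags, oldest first."""
    birthdays = {}
    for create_time, tag in exploded:
        if tag not in topic_hashtags:
            continue
        # Keep the minimum create_time seen so far for this hashtag.
        if tag not in birthdays or create_time < birthdays[tag]:
            birthdays[tag] = create_time
    return sorted(birthdays.items(), key=lambda item: item[1])


# Hypothetical toy rows: (create_time, hashtag).
toy_occurrences = [
    (30, "usopen"),
    (10, "wimbledon"),
    (20, "wimbledon"),
    (5, "nba"),
    (25, "usopen"),
]
toy_birthdays = hashtag_birthdays(toy_occurrences, {"wimbledon", "usopen"})
```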

In [None]:
time_span = Ordered_Hashtag_set.count()

# Get the index of the corresponding split points (75% and 85%). Again, you need to look at how much data we have for train-valid-test.
# The split ratio should be determined on a case-by-case basis.

train_val_split_Ht = np.floor(np.multiply(time_span, 0.75)).astype(int)
val_test_split_Ht =  np.floor(np.multiply(time_span, 0.85)).astype(int)

In [None]:
# Converting to Pandas for random row access.
pd_Ordered_Hashtag_set = Ordered_Hashtag_set.toPandas()


In [None]:
# locate the timestamp of the cutoff point. Will be used later to split Dataframe.
train_val_time = pd_Ordered_Hashtag_set.iloc[train_val_split_Ht]['birthday']
val_test_time = pd_Ordered_Hashtag_set.iloc[val_test_split_Ht]['birthday']


In [None]:
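# Note: .encode('utf-8') below is a Python 2 idiom. Under Python 3 it yields bytes, so skip the encode lines there.
train_hashtags = pd_Ordered_Hashtag_set[:train_val_split_Ht]["each_hashtag"].tolist()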
train_hashtags = [x.encode('utf-8') for x in train_hashtags]

val_hashtags = pd_Ordered_Hashtag_set[train_val_split_Ht:val_test_split_Ht]["each_hashtag"].tolist()
val_hashtags = [x.encode('utf-8') for x in val_hashtags]

test_hashtags = pd_Ordered_Hashtag_set[val_test_split_Ht:]["each_hashtag"].tolist()
test_hashtags = [x.encode('utf-8') for x in test_hashtags]

In [None]:
len(train_hashtags)

In [None]:
print(train_val_time)
print(val_test_time)
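
<p><span style="font-size: 13.3333px;"><strong>The index arithmetic behind the split is easy to get wrong by one, so here is a small worked sketch in plain Python. It assumes 20 hypothetical hashtags already ordered by birthday and uses the same 75% and 85% cut points as the cells above.</strong></span></p>

```python
import math


def split_hashtags(ordered, train_frac=0.75, val_frac=0.85):
    """ordered: list of (hashtag, birthday) sorted by birthday, oldest first."""
    n = len(ordered)
    train_end = math.floor(n * train_frac)
    val_end = math.floor(n * val_frac)
    return {
        "train": [tag for tag, _ in ordered[:train_end]],
        "val": [tag for tag, _ in ordered[train_end:val_end]],
        "test": [tag for tag, _ in ordered[val_end:]],
        # Birthdays at the two boundaries become the time cutoffs.
        "train_val_time": ordered[train_end][1],
        "val_test_time": ordered[val_end][1],
    }


# Hypothetical toy input: hashtag i is born at time i.
toy_ordered = [("tag%02d" % i, i) for i in range(20)]
toy_split = split_hashtags(toy_ordered)
```

<p><span style="font-size: 13.3333px;"><strong>With 20 hashtags this gives 15 training, 2 validation and 3 test hashtags, and each cutoff time is the birthday of the first hashtag after the boundary.</strong></span></p>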

<p><span style="font-size: 13.3333px;"><strong>Now we have identified the hashtags to be used in training, validation and test set, we can proceed to split our dataframe.</strong></span></p>

![caption](https://github.com/demoonism/TwitterSensor/blob/master/Screenshot/remove_twit.JPG?raw=true)

<h1><span style="color: #000080;"><strong>Train-Valid-Test split</strong></span></h1>

## Training Labeling

<p><span style="font-size: 13.3333px;"><strong>All data points that occurred on or before the train/valid split time are labeled as training data.</strong></span></p>

In [None]:
Training_set = Labled_Feat.select(Labled_Feat.create_time,\
                              Labled_Feat.tweet_id,\
                              Labled_Feat.term_features,\
                              Labled_Feat.hashtag_features,\
                              Labled_Feat.mention_features,\
                              Labled_Feat.user_features,\
                              Labled_Feat.loc_features,\
                              Labled_Feat.hashtag,\
                              Labled_Feat.label).where(col("create_time") <= train_val_time)


In [None]:
loading = time.time()

tr_pos_sample = Training_set.where(col("label") == 1.0).count()

getTime(loading)

In [None]:
# Sanity check: how many positive examples do we have in training when splitting at the current ratio? We need to ensure
# that we have enough data points to train on.
tr_pos_sample

## Validation Labeling

<p><span style="font-size: 13.3333px;"><strong>All data points that occurred between the train/valid and valid/test split times are labeled as validation data. We also need to remove from the validation set any tweet containing a hashtag that appeared in the training set.</strong></span></p>

In [None]:
Validation_set = Labled_Feat.select(Labled_Feat.create_time,\
                                Labled_Feat.tweet_id,\
                                Labled_Feat.term_features,\
                                Labled_Feat.hashtag_features,\
                                Labled_Feat.mention_features,\
                                Labled_Feat.user_features,\
                                Labled_Feat.loc_features,\
                                Labled_Feat.hashtag,\
                                Labled_Feat.label).where((col("create_time") > train_val_time) & (col("create_time") <= val_test_time))

In [None]:
hashtag_tokenizer = Tokenizer(inputCol="hashtag", outputCol="each_hashtag")

In [None]:
val_hashtags_df = hashtag_tokenizer.transform(Validation_set)
hashtag =  val_hashtags_df.select("tweet_id","each_hashtag")
val_hash_exploded = hashtag.withColumn('each_hashtag', explode('each_hashtag'))

In [None]:
val_hash_exploded.write.save(workdir_66+"/valid_hash_exploded", format="parquet")

In [None]:
val_hash_exploded = spark.read.parquet(workdir_66+"/valid_hash_exploded")

In [None]:
Invalid_Val_ids = val_hash_exploded.select("tweet_id").\
                                           where(val_hash_exploded.each_hashtag.isin(train_hashtags)).\
                                           distinct()

In [None]:
Invalid_Val_ids_list = Invalid_Val_ids.distinct().rdd.flatMap(lambda x: x).collect()

In [None]:
## If a tweet contains a hashtag that appeared in the training set, discard the record

Validation_set_no_train = Validation_set.where(~Validation_set.tweet_id.isin(Invalid_Val_ids_list))

In [None]:
val_pos_sample = Validation_set_no_train.where(col("label") == 1.0).count()

In [None]:
val_pos_sample
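
<p><span style="font-size: 13.3333px;"><strong>The validation filter combines a time window with a hashtag exclusion. The plain-Python sketch below shows that logic on hypothetical toy tweets with integer timestamps; the function name is an assumption, and the real notebook does this with dataframe operations.</strong></span></p>

```python
def filter_window(tweets, start, end, banned_hashtags):
    """Keep tweets with start < create_time <= end and no banned hashtag."""
    kept = []
    for tweet in tweets:
        in_window = start < tweet["create_time"] <= end
        tags = set(tweet["hashtag"].lower().split())
        # Drop the whole tweet if any of its hashtags was used for training.
        if in_window and not (tags & banned_hashtags):
            kept.append(tweet)
    return kept


# Hypothetical toy data; "wimbledon" plays the role of a training hashtag.
toy_candidates = [
    {"tweet_id": 1, "create_time": 5, "hashtag": "wimbledon"},
    {"tweet_id": 2, "create_time": 12, "hashtag": "wimbledon final"},
    {"tweet_id": 3, "create_time": 13, "hashtag": "usopen"},
    {"tweet_id": 4, "create_time": 14, "hashtag": ""},
    {"tweet_id": 5, "create_time": 25, "hashtag": "usopen"},
]
toy_validation = filter_window(toy_candidates, 10, 20, {"wimbledon"})
```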

## Test Labeling

<p><span style="font-size: 13.3333px;"><strong>All data points that occurred after the valid/test split time are labeled as test data. We also need to remove any record with a hashtag that appeared in the training or validation set.</strong></span></p>

In [None]:
Test_set = Labled_Feat.select(Labled_Feat.create_time,\
                            Labled_Feat.tweet_id,\
                            Labled_Feat.term_features,\
                            Labled_Feat.hashtag_features,\
                            Labled_Feat.mention_features,\
                            Labled_Feat.user_features,\
                            Labled_Feat.loc_features,\
                            Labled_Feat.hashtag,\
                            Labled_Feat.label).where(col("create_time") > val_test_time)

In [None]:
test_hashtags_df = hashtag_tokenizer.transform(Test_set)
hashtag = test_hashtags_df.select("tweet_id","each_hashtag")
test_hash_exploded = hashtag.withColumn('each_hashtag', explode('each_hashtag'))

In [None]:
test_hash_exploded.write.save(workdir_66+"/te_hash_exploded", format="parquet")

In [None]:
test_hash_exploded = spark.read.parquet(workdir_66+"/te_hash_exploded")

In [None]:
Invalid_Test_ids = test_hash_exploded.select("tweet_id").where((test_hash_exploded.each_hashtag.isin(train_hashtags)) | (test_hash_exploded.each_hashtag.isin(val_hashtags))).distinct()

In [None]:
Invalid_Test_ids_list = Invalid_Test_ids.distinct().rdd.flatMap(lambda x: x).collect()

In [None]:
Test_set_no_train_no_vaild = Test_set.where(~Test_set.tweet_id.isin(Invalid_Test_ids_list)).\
                                         dropDuplicates(['term_features', 'hashtag_features', 'mention_features', 'user_features', 'loc_features'])

## Sampling data to balance label

<p><span style="font-size: 13.3333px;"><strong> Down-sampling negative data to balance out the training data</strong></span></p>

In [None]:
# Keep every positive sample and down-sample the negatives (to 0.1%) to form the final training set.
Training_set_balanced = Training_set.sampleBy("label", fractions={0.0: 0.001, 1.0: 1}, seed=0) 

In [None]:
Training_set_balanced.write.save(workdir_b9+"/Train_balanced", format="parquet")

In [None]:
Training_set_balanced = spark.read.parquet(workdir_b9+"/Train_balanced")

In [None]:
Validation_set_no_train_balanced = Validation_set_no_train.sampleBy("label", fractions={0.0: 0.0001, 1.0: 1}, seed=0) 
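
<p><span style="font-size: 13.3333px;"><strong>Stratified sampling of this kind keeps each row independently with a probability that depends on its label. The plain-Python sketch below illustrates the idea on hypothetical toy rows; it uses Python's random module, so it will not reproduce the exact rows that Spark selects for the same seed.</strong></span></p>

```python
import random


def sample_by_label(rows, fractions, seed=0):
    """Keep each row with probability fractions[label] (no replacement)."""
    rng = random.Random(seed)
    # rng.random() is in [0, 1), so a fraction of 1.0 keeps every row.
    return [row for row in rows if rng.random() < fractions[row["label"]]]


# Hypothetical toy data: 10 positives and 1000 negatives.
toy_pool = [{"tweet_id": i, "label": 1.0} for i in range(10)]
toy_pool += [{"tweet_id": 10 + i, "label": 0.0} for i in range(1000)]

toy_balanced = sample_by_label(toy_pool, {0.0: 0.1, 1.0: 1.0})
kept_pos = sum(1 for row in toy_balanced if row["label"] == 1.0)
kept_neg = len(toy_balanced) - kept_pos
```

<p><span style="font-size: 13.3333px;"><strong>Because the sampling is random, the number of negatives kept is only approximately the requested fraction, so it is worth counting both classes after sampling.</strong></span></p>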

In [None]:
Validation_set_no_train_balanced.write.save(workdir_b9+"/Validation_balanced", format="parquet")

In [None]:
Validation_set_no_train_balanced = spark.read.parquet(workdir_b9+"/Validation_balanced")

In [None]:
Test_set_no_train_no_vaild.write.save(workdir_b9+"/Test_balanced", format="parquet")

<p><span style="font-size: 13.3333px;"><strong>Now that we have a much smaller dataset to work with, it is time to concatenate the feature vectors into a single feature matrix. Note that we keep the tweet id in the output because we want a mapping back to the original tweet for manual examination.</strong></span></p>

In [None]:
def Assembling(ds):
    assembler = VectorAssembler(inputCols = ["term_features","hashtag_features","mention_features","user_features","loc_features"], outputCol="features")
    assembled_dataset = assembler.transform(ds).\
                    select("tweet_id","create_time","features", "label")
    return assembled_dataset
    

In [None]:
Tr_Features = Assembling(Training_set_balanced)

In [None]:
Val_Features =  Assembling(Validation_set_no_train_balanced)

In [None]:
Te_Features =  Assembling(Test_set_no_train_no_vaild)
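
<p><span style="font-size: 13.3333px;"><strong>Conceptually, assembling concatenates the per-feature vectors and shifts each vector's indices by the total size of the vectors before it. Here is a minimal plain-Python sketch of that index arithmetic, using a hypothetical (size, {index: value}) representation for sparse vectors rather than Spark's own vector classes.</strong></span></p>

```python
def assemble(sparse_vectors):
    """Concatenate sparse vectors given as (size, {index: value}) pairs."""
    offset = 0
    merged = {}
    for size, entries in sparse_vectors:
        for index, value in entries.items():
            # Shift each index by the combined size of the earlier vectors.
            merged[offset + index] = value
        offset += size
    return offset, merged


# Hypothetical toy vectors for one tweet.
toy_term = (4, {1: 2.0})
toy_hashtag = (3, {0: 1.0, 2: 1.0})
toy_loc = (2, {})
toy_assembled = assemble([toy_term, toy_hashtag, toy_loc])
```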

In [None]:
Train = Tr_Features.withColumn("type", lit("train"))

In [None]:
Valid = Val_Features.withColumn("type", lit("valid"))

In [None]:
combined = Train.union(Valid)

In [None]:
from pyspark.mllib.util import MLUtils

In [None]:
# Save as libSVM format.
#MLUtils.saveAsLibSVMFile(combined.select("features").rdd, workdir_b9+"/combined_dataset")

In [None]:
combined.select("features","label","type").write.save(workdir_b9+"/combined_dataset", format="parquet")

In [None]:
combined = spark.read.parquet(workdir_b9+"/combined_dataset").persist()

In [None]:
combined.show(5)

<p><span style="color: #000080;"><strong><span style="font-family: georgia, palatino, serif; font-size: 24pt;">Step Three: Training Classifier</span></strong></span></p>

<p><span style="font-size: 13.3333px;"><strong>To train an effective classifier, we follow two steps: 1. feature selection, 2. parameter tuning. Here we use Chi-square as our feature selection method and tune the L2 penalty and the number of epochs accordingly.</strong></span></p>

## Feature Selection

<p><span style="font-size: 13.3333px;"><strong>Wrap the feature selector into a pipeline and use grid search to determine the optimal number of features. Chi-square is a feature selection technique similar to mutual information; we use it because it is built into Spark.</strong></span></p>

In [None]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

selector = ChiSqSelector(featuresCol="features",
                         outputCol="Features_matrix", labelCol="label")
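
<p><span style="font-size: 13.3333px;"><strong>For intuition about what the selector ranks by, here is a plain-Python sketch of the chi-square statistic for a single binary feature against a binary label (a 2x2 contingency table). This is a simplified illustration on hypothetical toy data, not Spark's implementation, which tests each feature as a categorical variable.</strong></span></p>

```python
def chi_square_2x2(feature_present, labels):
    """Chi-square statistic for a binary feature versus a binary label."""
    a = b = c = d = 0
    for present, label in zip(feature_present, labels):
        if present and label:
            a += 1  # feature present, topical
        elif present:
            b += 1  # feature present, not topical
        elif label:
            c += 1  # feature absent, topical
        else:
            d += 1  # feature absent, not topical
    n = a + b + c + d
    denominator = (a + b) * (c + d) * (a + c) * (b + d)
    if denominator == 0:
        return 0.0  # the feature or the label is constant
    return n * (a * d - b * c) ** 2 / denominator


# Hypothetical toy data: 3 topical and 5 non-topical tweets.
toy_labels = [1, 1, 1, 0, 0, 0, 0, 0]
chi_informative = chi_square_2x2([1, 1, 1, 0, 0, 0, 0, 0], toy_labels)
chi_noisy = chi_square_2x2([1, 0, 0, 1, 0, 0, 0, 0], toy_labels)
```

<p><span style="font-size: 13.3333px;"><strong>A feature that perfectly tracks the label gets the highest score, while a feature that is nearly independent of the label scores close to zero and is dropped first.</strong></span></p>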

<h1><span style="color: #000080;"><strong>Train Logistic Regression and Hyperparameter Tuning</strong></span></h1>

<p><span style="font-size: 13.3333px;"><strong>We tune the number of selected features together with two logistic regression hyperparameters, namely the L2 penalty and the maximum number of iterations.</strong></span></p>

In [None]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Spark does not have a built-in evaluator class for average precision or P@K. We can extend the base Evaluator class
#to implement our own evaluator. See LogLossEvaluator.py for more details.
from LogLossEvaluator import BinaryRankingEvaluator

blor = LogisticRegression(featuresCol='Features_matrix', labelCol='label')

TrainingPipeline = Pipeline(stages=[selector,blor])

Ranker = BinaryRankingEvaluator(metric = "AP")

# Run cross-validation, and choose the best set of parameters.

<p><span style="font-size: 13.3333px;"><strong>We extend the CrossValidator class to use our custom evaluator (P@k and MAP). The evaluator code can be found in LogLossEvaluator.py.</strong></span></p>

In [None]:
import numpy as np

from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.sql.functions import rand

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from LogLossEvaluator import BinaryRankingEvaluator
from CrossValidator import CrossValidatorVerbose

result = []
Ranker = BinaryRankingEvaluator(metric = "AP")
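
<p><span style="font-size: 13.3333px;"><strong>For reference, the two ranking metrics can be written in a few lines of plain Python. This sketch is not the code in LogLossEvaluator.py; the function names and toy scores are hypothetical and only show how average precision and P@k are defined.</strong></span></p>

```python
def precision_at_k(ranked_labels, k):
    """Fraction of positives among the k highest-ranked items."""
    top = ranked_labels[:k]
    return sum(top) / len(top) if top else 0.0


def average_precision(scores, labels):
    """Mean of the precision values measured at each positive item."""
    pairs = sorted(zip(scores, labels), key=lambda pair: pair[0], reverse=True)
    hits = 0
    total = 0.0
    for rank, (_, label) in enumerate(pairs, start=1):
        if label == 1:
            hits += 1
            total += hits / rank  # precision at this rank
    return total / hits if hits else 0.0


# Hypothetical toy predictions: positives are ranked 1st and 3rd.
toy_scores = [0.9, 0.8, 0.7, 0.6]
toy_truth = [1, 0, 1, 0]
toy_ap = average_precision(toy_scores, toy_truth)
toy_p_at_2 = precision_at_k(toy_truth, 2)
```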

In [None]:
# numTopFeatures: number of features to use; the last value (100000) stands for all features.
# regParam: L2 penalty
# maxIter: epochs (maximum number of iterations)

paramGrid = ParamGridBuilder().\
    addGrid(selector.numTopFeatures, [100, 1000, 10000, 100000]).\
    addGrid(blor.regParam, [0.01, 0.1, 1, 10]).\
    addGrid(blor.maxIter, [2, 20, 100, 200, 500]).\
    build()
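
<p><span style="font-size: 13.3333px;"><strong>It helps to know how large this grid is before launching the search. The short plain-Python sketch below enumerates the same value lists and counts the model fits, assuming the custom validator fits every parameter combination once per fold, as Spark's standard CrossValidator does.</strong></span></p>

```python
from itertools import product

# Same candidate values as the grid above.
num_top_features = [100, 1000, 10000, 100000]
reg_params = [0.01, 0.1, 1, 10]
max_iters = [2, 20, 100, 200, 500]

toy_grid = [
    {"numTopFeatures": n, "regParam": r, "maxIter": m}
    for n, r, m in product(num_top_features, reg_params, max_iters)
]

num_folds = 5
# One fit per combination per fold (the final refit on all data is extra).
total_fits = len(toy_grid) * num_folds
```

<p><span style="font-size: 13.3333px;"><strong>That is 80 combinations and 400 pipeline fits, so trimming the grid is the easiest way to shorten the run.</strong></span></p>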

In [None]:
cvImplicit = CrossValidatorVerbose(estimator=TrainingPipeline, numFolds=5, estimatorParamMaps=paramGrid,evaluator=Ranker)

In [None]:
cvModel = cvImplicit.fit(combined)

In [None]:
cvModel.save(workdir_b9+"/myModel")

In [None]:
len(cvModel.avgMetrics)

In [None]:
for a,b in zip(paramGrid, cvModel.avgMetrics):
    print(b,a)
    print("\n")

# Evaluation

In [None]:
# Inspect the fitted stages (feature selector and logistic regression) of the best pipeline.
cvModel.bestModel.stages

In [None]:
val_pred = cvModel.bestModel.transform(Val_Features)


In [None]:
te_pred = cvModel.bestModel.transform(Te_Features)
pr = te_pred  # later cells refer to the test predictions as `pr`
metric = Ranker.evaluate(pr)
print("Test AP: ", metric)

In [None]:
pr.select('probability', 'rawPrediction', 'prediction', 'label').where(col("label") == 1.0).show(40)

In [None]:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

neg_slicer = VectorSlicer(inputCol="probability", outputCol="0_prob", indices=[0])

pos_slicer = VectorSlicer(inputCol="probability", outputCol="1_prob", indices=[1])


output_stg1 = neg_slicer.transform(pr)
output = pos_slicer.transform(output_stg1)


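# `output` only carries the Te_Features columns plus the model outputs; the tweet text is joined in the next cell.
Ranked_prediction  = output.select("tweet_id","label","prediction","1_prob").sort(col("1_prob").desc())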


In [None]:
full_pred = output.join(tennis_text, "tweet_id").select("label","prediction","term","hashtag","from_user")\
                                             .sort(col("1_prob").desc())


In [None]:
# full_pred.show(100, truncate = False)
# Validation_set_no_train.select("hashtag").where(col("label") == 1.0).show(100, truncate =  False)