# Tweets Dataset
In this example, we use the sentiment140 corpus, comprised of about 1.6M tweets, each with the following information:
- Polarity: 0 = negative, 2 = neutral, 4 = positive
- ID
- Date
- Query
- User
- Text

We also use the AFINN word list, which provides a **sentiment score** for 2476 English words, to help our classifier achieve better accuracy. Each row of this dataset contains:
- Word
- Score

# Data Preparation

In [3]:
import pandas as pd

In [4]:
# Alternative kept for reference: read the raw data directly into Spark
#df_tweets = sqlContext.read.format("csv").load("/FileStore/tables/pw4anhpx1491487613286/")

# If the data is small enough, load it with pandas instead.
# header=None: the first line of the file is treated as data, so the six
# column names are supplied explicitly.
_tweets_csv_path = "/dbfs/FileStore/tables/pw4anhpx1491487613286/training_1600000_processed_noemoticon-efba6.csv"
_tweets_columns = ["polarity", "id", "date", "query", "user", "text"]
df_tweets = pd.read_csv(_tweets_csv_path, header=None, names=_tweets_columns)

In [5]:
# Alternative kept for reference: read the raw data directly into Spark
#df_wordlist = sqlContext.read.format("csv").load("/FileStore/tables/pw4anhpx1491487613286/")

# If the data is small enough, load it with pandas instead.
# The file is tab-separated and header=None treats its first line as data,
# so the two column names are supplied explicitly.
_wordlist_path = "/dbfs/FileStore/tables/pw4anhpx1491487613286/AFINN_111-47bc9.txt"
_wordlist_columns = ["word", "score"]
df_wordlist = pd.read_csv(_wordlist_path, sep="\t", header=None, names=_wordlist_columns)

# Feature Generation
We represent each tweet as follows:
- A TF-IDF vector of words from the text of the tweet
- A positive and negative sentiment score, based on the sentiment wordlist
- Temporal features including month, day-of-week, and hour-of-day

### Define Spark UDFs

In [8]:
import re, string
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, ArrayType, FloatType

# Tokenizer used by the PySpark `tokenize` UDF, with tweet-specific clean-up.
# `punct` matches any single character from string.punctuation.
punct = re.compile('[%s]' % re.escape(string.punctuation))

def tok_str(text, ngrams=1, minChars=2):
  """Split a tweet into lower-case word tokens, optionally appending n-grams.

  Processing steps, in order:
    1. every run of whitespace is collapsed to a single space;
    2. the text is lower-cased and split on spaces;
    3. pieces shorter than `minChars` and @usernames are dropped;
    4. any piece starting with "http" is replaced by the constant "URL";
    5. punctuation is stripped from the remaining pieces, and pieces that end
       up empty or shorter than `minChars` are dropped as well.

  Args:
    text: the tweet text (a text string).
    ngrams: 1 returns only the single-word tokens; any other value n returns
      the single-word tokens followed by every run of n consecutive tokens
      joined with a space.
    minChars: minimum token length, enforced both before and after
      punctuation stripping.

  Returns:
    A list of text tokens.
  """
  # `unicode` on Python 2, `str` on Python 3 -- keeps the original text
  # coercion without depending on a Python-2-only builtin.
  to_text = type(u'')

  # change any whitespace to regular space
  text = re.sub(r'\s+', ' ', text)

  tokens = []
  for piece in text.lower().split(' '):
    piece = to_text(piece)

    # Skip blanks (from leading/trailing spaces), short words and usernames.
    # The emptiness check also protects piece[0] when minChars is 0.
    if not piece or len(piece) < minChars or piece[0] == '@':
      continue

    # replace any url by the constant word "URL"
    if piece[:4] == "http":
      tokens.append("URL")
      continue

    # Strip punctuation, then re-check the length: a piece such as "!!" passes
    # the first length check but would otherwise survive as an empty token.
    word = punct.sub('', piece)
    if word and len(word) >= minChars:
      tokens.append(word)

  if ngrams == 1:
    return tokens
  return tokens + [' '.join(tokens[i:i + ngrams]) for i in range(len(tokens) - ngrams + 1)]
  
# Spark UDF over tok_str: the column value is coerced with unicode() and
# tokenized with ngrams=2; the result is declared to Spark as an array of strings.
tokenize = F.udf(lambda s: tok_str(unicode(s), ngrams=2), ArrayType(StringType()))

In [9]:
# Lookup table word -> sentiment score, read by pscore() and nscore().
# The two columns are paired directly: DataFrame.iterrows() yields
# (index, row) tuples, so building the dict from its items would key the table
# by row number and no token would ever be found in it.
wordlist = dict(zip(df_wordlist["word"], df_wordlist["score"]))

# Word-list based sentiment scoring, exposed to Spark through UDFs
def pscore(words):
  """Return the mean of the positive word-list scores among `words`.

  Tokens missing from `wordlist` and tokens whose score is not strictly
  positive are ignored. Returns 0.0 when no token qualifies.
  """
  positives = []
  for token in words:
    if token in wordlist:
      value = wordlist[token]
      if value > 0:
        positives.append(value)

  if len(positives) == 0:
    return 0.0
  return float(sum(positives)) / len(positives)

# Spark UDF wrapper around pscore; the result is declared to Spark as a float.
pos_score = F.udf(lambda w: pscore(w), FloatType())

def nscore(words):
  """Return the mean of the negative word-list scores among `words`.

  Tokens missing from `wordlist` and tokens whose score is not strictly
  negative are ignored. Returns 0.0 when no token qualifies; otherwise the
  result is itself negative.
  """
  negatives = []
  for token in words:
    if token in wordlist:
      value = wordlist[token]
      if value < 0:
        negatives.append(value)

  if len(negatives) == 0:
    return 0.0
  return float(sum(negatives)) / len(negatives)

# Spark UDF wrapper around nscore; the result is declared to Spark as a float.
neg_score = F.udf(lambda w: nscore(w), FloatType())

### Transform Data

In [11]:
# Preview the first five rows of the raw tweets (5 is head()'s default)
df_tweets.head(5)

In [12]:
# Keep only the columns used downstream. The explicit copy() makes this an
# independent frame, so the column assignments below are not chained
# assignments on a slice of df_tweets (pandas' SettingWithCopyWarning case).
df_tweets_selected = df_tweets.loc[:, ['text', 'query', 'polarity', 'date']].copy()

# Patterns applied to the raw `date` string; only the first capture group of
# each pattern is used.
# NOTE(review): assumes every date contains an HH:MM:SS time, a three-letter
# English weekday and a three-letter English month -- a value without a match
# makes re.search return None and the .group(1) call fail. Confirm against the data.
hour_criteria = '([0-9]{2}):([0-9]{2}):([0-9]{2})'
dayofweek_criteria = '(Sun|Mon|Tue|Wed|Thu|Fri|Sat)'
month_criteria = '(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)'

# Temporal features are kept as strings (e.g. the two-digit hour); they are
# derived row by row from the `date` column.
df_tweets_selected['hour'] = [re.search(hour_criteria, x).group(1) for x in df_tweets_selected['date']]
df_tweets_selected['dayofweek'] = [re.search(dayofweek_criteria, x).group(1) for x in df_tweets_selected['date']]
df_tweets_selected['month'] = [re.search(month_criteria, x).group(1) for x in df_tweets_selected['date']]

In [13]:
# Hand the prepared pandas frame over to Spark, then print its first five rows
tw = sqlContext.createDataFrame(df_tweets_selected)
tw.show(n=5)

In [14]:
# drop tweets with neutral sentiment (polarity 2) ...
tw2 = tw.filter("polarity != 2")
# ... and use the tokenize UDF to turn each tweet's text into a bag of words
tw2 = tw2.withColumn('words', tokenize(tw['text']))

# feature matrix: temporal columns, the tokens, a Pos/Neg sentiment label
# (polarity 4 -> "Pos", anything else -> "Neg") and the two word-list scores
_fm_columns = [
  "hour",
  "dayofweek",
  "month",
  "words",
  F.when(tw2.polarity == 4, "Pos").otherwise("Neg").alias("sentiment"),
  pos_score(tw2["words"]).alias("pscore"),
  neg_score(tw2["words"]).alias("nscore"),
]
tw3 = tw2.select(*_fm_columns)

# store resulting feature matrix in Spark SQL temporary table
tw3.registerTempTable("fm")

# Train Classifier

### Pipeline

In [17]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler, IDF, RegexTokenizer, HashingTF
from pyspark.ml import Pipeline

# modeling parameters
numFeatures = 5000   # numFeatures given to HashingTF
minDocFreq = 50      # minDocFreq given to IDF
numTrees = 100       # numTrees given to the random forest

# index the string-valued columns (three temporal features and the sentiment
# label) so they can be used as categorical variables
# NOTE(review): eval_metrics treats label 1 as the positive class; which
# sentiment is indexed as 1 depends on StringIndexer's ordering -- confirm
# before reading precision/recall as "Pos" metrics.
inx1 = StringIndexer(outputCol="hour-inx", inputCol="hour")
inx2 = StringIndexer(outputCol="month-inx", inputCol="month")
inx3 = StringIndexer(outputCol="dow-inx", inputCol="dayofweek")
inx4 = StringIndexer(outputCol="label", inputCol="sentiment")

# compute TF-IDF on the words list for each tweet
hashingTF = HashingTF(inputCol="words", outputCol="hash-tf", numFeatures=numFeatures)
idf = IDF(inputCol="hash-tf", outputCol="hash-tfidf", minDocFreq=minDocFreq)

# gather every feature column into the single "features" vector
va = VectorAssembler(
  inputCols=["hour-inx", "month-inx", "dow-inx", "hash-tfidf", "pscore", "nscore"],
  outputCol="features",
)

# random forest classifier trained against the indexed label
rf = RandomForestClassifier(labelCol="label", numTrees=numTrees, maxDepth=4, maxBins=32, seed=42)

# chain the stages in execution order: indexers, TF-IDF, assembler, classifier
p = Pipeline(stages=[
  inx1, inx2, inx3, inx4,
  hashingTF, idf,
  va,
  rf,
])

### Modeling

In [19]:
# 70/30 train/test split of the feature matrix stored in the "fm" temp table.
# The split is seeded (same value as the classifier's seed) so that repeated
# runs use the same partitioning and the reported metrics are comparable.
(trainSet, testSet) = sqlContext.table("fm").randomSplit([0.7, 0.3], seed=42)

# both sets are marked for caching before they are used for fitting / scoring
trainData = trainSet.cache()
testData = testSet.cache()

# fit every pipeline stage on the training portion only
model = p.fit(trainData)

### Evaluation

In [21]:
def eval_metrics(lap):
  """Compute precision, recall and accuracy for binary predictions.

  Args:
    lap: pandas DataFrame with a 'label' and a 'prediction' column; the value
      1 is treated as the positive class and 0 as the negative class.

  Returns:
    dict with the float entries 'precision', 'recall' and 'accuracy'. A ratio
    whose denominator is zero (no predicted positives, no actual positives,
    or an empty frame) is reported as 0.0 instead of raising
    ZeroDivisionError.
  """
  actual_pos = lap['label'] == 1
  actual_neg = lap['label'] == 0
  predicted_pos = lap['prediction'] == 1
  predicted_neg = lap['prediction'] == 0

  # confusion-matrix cells, counted as floats so the ratios below are true divisions
  tp = float((actual_pos & predicted_pos).sum())
  tn = float((actual_neg & predicted_neg).sum())
  fp = float((actual_neg & predicted_pos).sum())
  fn = float((actual_pos & predicted_neg).sum())

  def ratio(numerator, denominator):
    # undefined ratios are reported as 0.0 rather than crashing the evaluation
    return numerator / denominator if denominator else 0.0

  precision = ratio(tp, tp + fp)
  recall = ratio(tp, tp + fn)
  accuracy = ratio(tp + tn, tp + tn + fp + fn)
  return {'precision': precision, 'recall': recall, 'accuracy': accuracy}

In [22]:
# Score the test data with the fitted pipeline
results = model.transform(testData)

# Only the label and the prediction are needed for the metrics; bring those
# two columns back as a pandas frame and evaluate them
_label_and_prediction = results.select("label", "prediction")
lap = _label_and_prediction.toPandas()
m = eval_metrics(lap)

print(m)