Sentiment Classification of Tweets 


In [1]:
import re
from os.path import join
import numpy as np
from scipy.sparse import dok_matrix, csr_matrix
from itertools import count
from math import log
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn import svm
import pickle
from nltk.stem import WordNetLemmatizer
from nltk.stem.porter import *
from nltk.corpus import stopwords


In [2]:
# File names of the held-out test sets; they are joined with the
# 'semeval-tweets' directory when the data is loaded and evaluated.
testsets = [
    'twitter-test1.txt',
    'twitter-test2.txt',
    'twitter-test3.txt',
]

In [3]:
# Evaluation code for the test sets
def read_test(testset):
    '''
    Read a test set file and return its ground-truth labels.

    Every non-blank line is split on tabs; the first column is used as the
    tweet id and the second as the sentiment label. Further columns are
    ignored. If an id occurs more than once, the last label wins.

    :param testset: str, path of the test set file to read
    :return: dict mapping tweet id (str) to ground-truth label (str)
    :raises IndexError: if a non-blank line has fewer than two columns
    '''
    id_gts = {}
    with open(testset, 'r', encoding='utf8') as fh:
        for line in fh:
            # Remove the line terminator before splitting so that a label in
            # the last column is not stored with a trailing newline.
            line = line.rstrip('\r\n')
            if not line.strip():
                continue  # tolerate blank lines, e.g. at the end of the file
            fields = line.split('\t')
            id_gts[fields[0]] = fields[1]

    return id_gts


def confusion(id_preds, testset, classifier):
    '''
    Print a row-normalised confusion matrix of predictions against a test set.

    Rows are predicted classes and columns are ground-truth classes, both in
    the order positive, negative, neutral. Each cell is the fraction of the
    tweets predicted as the row class whose ground truth is the column class;
    a row without any prediction is printed as zeros. Tweets of the test set
    that are missing from id_preds are counted as predicted 'neutral'.

    :param id_preds: a dictionary of predictions formatted as {<tweetid>:<sentiment>, ... }
    :param testset: str, the file name of the testset to compare
    :param classifier: str, the name of the classifier (not used by this function)
    :raises KeyError: if a predicted or ground-truth label is not one of the three classes
    '''
    id_gts = read_test(testset)

    gts = ['positive', 'negative', 'neutral']

    # conf[predicted][ground_truth] -> number of tweets
    conf = {c1: {c2: 0 for c2 in gts} for c1 in gts}

    for tweetid, gt in id_gts.items():
        pred = id_preds.get(tweetid, 'neutral')
        conf[pred][gt] += 1

    print(''.ljust(12) + '  '.join(gts))

    for c1 in gts:
        row_total = sum(conf[c1].values())
        print(c1.ljust(12), end='')
        for c2 in gts:
            if row_total > 0:
                print('%.3f     ' % (conf[c1][c2] / float(row_total)), end='')
            else:
                print('0.000     ', end='')
        print('')

    print('')


def evaluate(id_preds, testset, classifier):
    '''
    Print the macro-averaged F1 over the 'positive' and 'negative' classes.

    The score is the mean of the per-class F1 of 'positive' and 'negative'.
    'neutral' is not averaged in, but neutral predictions and neutral ground
    truths still contribute false positives / false negatives to the other
    two classes. Tweets of the test set that are missing from id_preds are
    counted as predicted 'neutral'.

    Output format: "<testset> (<classifier>): <score with 3 decimals>".

    :param id_preds: a dictionary of predictions formatted as {<tweetid>:<sentiment>, ... }
    :param testset: str, the file name of the testset to compare
    :param classifier: str, the name of the classifier (used in the printed line)
    :raises KeyError: if a predicted or ground-truth label is not one of the three classes
    '''
    id_gts = read_test(testset)

    acc_by_class = {
        label: {'tp': 0, 'fp': 0, 'fn': 0}
        for label in ('positive', 'negative', 'neutral')
    }

    for tweetid, gt in id_gts.items():
        pred = id_preds.get(tweetid, 'neutral')
        if gt == pred:
            acc_by_class[gt]['tp'] += 1
        else:
            acc_by_class[gt]['fn'] += 1
            acc_by_class[pred]['fp'] += 1

    # Only the per-class F1 of positive and negative is needed for the score.
    # No micro-average is computed: it is not reported, and its divisions are
    # undefined when the test set is empty or no prediction is correct.
    f1_total = 0
    for cat in ('positive', 'negative'):
        acc = acc_by_class[cat]

        p = 0
        if (acc['tp'] + acc['fp']) > 0:
            p = float(acc['tp']) / (acc['tp'] + acc['fp'])

        r = 0
        if (acc['tp'] + acc['fn']) > 0:
            r = float(acc['tp']) / (acc['tp'] + acc['fn'])

        f1 = 0
        if (p + r) > 0:
            f1 = 2 * p * r / (p + r)

        f1_total += f1

    semevalmacrof1 = f1_total / 2

    print(testset + ' (' + classifier + '): %.3f' % semevalmacrof1)

#### Load training set, dev set and testing set


In [4]:
# Shared NLP resources for tweet preprocessing.
lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()  # NOTE(review): not referenced by any code shown in this notebook
# Lower-cased English stop words, kept in a set so that the per-token
# membership test in preprocess_tweet is a hash lookup instead of a list scan.
sw = {word.lower() for word in stopwords.words('english')}

def preprocess_tweet(tweet):
    '''
    Normalise a raw tweet into a space-separated string of lower-cased tokens.

    Steps, in order: remove newlines; cut the tweet at the first URL that is
    preceded by a space; remove @mentions and #hashtags; drop every character
    that is not an ASCII letter, digit or space; drop digits; prefix the token
    that follows 'not' / 'no' / 'never' with 'not_'; lemmatize; drop stop
    words and one-character tokens; lower-case the result.

    :param tweet: str, raw tweet text
    :return: str, the preprocessed tweet (may be empty)
    '''
    tweet = re.sub('\n', '', tweet)
    # NOTE(review): '.*' removes everything from the URL to the end of the
    # tweet, and a URL at the very start of the tweet (no leading space) is not
    # matched. Left as is on purpose: changing the produced tokens changes the
    # features, and the saved SVM model loaded later in this notebook was
    # presumably trained on exactly this output - confirm before tightening.
    tweet = re.sub(r' https?://.*', '', tweet)
    # Raw strings avoid the invalid '\@' / '\#' escape sequences.
    tweet = re.sub(r'@[a-zA-Z0-9]+', '', tweet)
    tweet = re.sub(r'#[a-zA-Z0-9]+', '', tweet)
    tweet = re.sub(r'[^a-zA-Z0-9 ]', '', tweet)
    tweet = re.sub(r'[0-9]+', '', tweet)
    # Single-letter words are not removed by a regex here; they are dropped by
    # the length filter below (after negation marking).

    tokens = tweet.split()
    # Negation marking runs left to right on the list being rewritten, so a
    # token that already received the 'not_' prefix no longer counts as a
    # negation word for its successor.
    for i in range(1, len(tokens)):
        if tokens[i - 1].lower() in ('not', 'no', 'never'):
            tokens[i] = 'not_' + tokens[i]

    # NOTE(review): the stop-word test happens before lower-casing, so
    # capitalised stop words (e.g. 'The') are kept. Same compatibility caveat
    # as for the URL pattern above.
    kept = [word for word in map(lemmatizer.lemmatize, tokens)
            if word not in sw and len(word) > 1]
    return ' '.join(kept).lower()

# Load training set, dev set and testing set.
# Each dictionary below is keyed by dataset file name and holds one entry per
# tweet, so the lists of one dataset are parallel to each other.
data = {}
tweetids = {}   # first tab-separated column of each line
tweetgts = {}   # second column (ground-truth label)
tweets = {}     # third column (raw tweet text)

tweets_preprocessed = {}            # preprocessed tweets as lists of tokens
tweets_preprocessed_not_split = {}  # preprocessed tweets as single strings
data_as_csr = {}

for dataset in ['twitter-training-data.txt', 'twitter-dev-data.txt'] + testsets:
    data[dataset] = []
    tweets[dataset] = []
    tweetids[dataset] = []
    tweetgts[dataset] = []

    tweets_preprocessed[dataset] = []
    tweets_preprocessed_not_split[dataset] = []

    testset_path = join('semeval-tweets', dataset)
    # token -> column index, rebuilt for every dataset
    vocabulary = {}
    indices = count()
    with open(testset_path, 'r', encoding='utf8') as fh:
        for line in fh:
            if not line.strip():
                continue  # tolerate blank lines, e.g. at the end of the file
            fields = line.split('\t')
            tweetids[dataset].append(fields[0])
            tweetgts[dataset].append(fields[1])
            tweets[dataset].append(fields[2])

            # Preprocess once and derive both representations from the result.
            tweet_text = preprocess_tweet(fields[2])
            tweet_prep = tweet_text.split()
            for token in tweet_prep:
                if token not in vocabulary:
                    vocabulary[token] = next(indices)

            tweets_preprocessed[dataset].append(tweet_prep)
            tweets_preprocessed_not_split[dataset].append(tweet_text)

    # NOTE(review): this stores the dok_matrix class itself, not a matrix
    # instance; no bag-of-words matrix is ever filled in, and neither `data`
    # nor `data_as_csr` is read by the later cells shown in this notebook.
    data[dataset] = dok_matrix

    #data_as_csr[dataset] = data[dataset].tocsr()

#### Build sentiment classifiers


In [5]:
# Load the subjectivity lexicon into one set of words per sentiment class.
# Each line is read as whitespace-separated key=value fields: the word is the
# value of the third field and the polarity is the value of the last field.
lexicons = {'positive': set(), 'negative': set(), 'neutral': set()}

# Maps a polarity value from the lexicon file to one of the three classes.
type_mapping = {'positive':'positive',
               'negative':'negative',
               'neutral':'neutral',
               'both':'neutral',
               'weakneg':'negative',
               'strongneg':'negative'}

with open('subjclueslen1-HLTEMNLP05.tff', 'r', encoding='utf8') as lexicon_file:
    for line in lexicon_file:
        parts = line.split()
        if not parts:
            continue  # tolerate blank lines
        # Take the text after '=' instead of slicing at a fixed offset: the
        # key of the last field apparently differs in length between lines, so
        # a fixed offset would cut the first letter off values like 'strongneg'.
        word = parts[2].partition('=')[2]
        polarity = parts[-1].partition('=')[2]
        # An unmapped polarity raises KeyError so that it is not silently lost.
        lexicons[type_mapping[polarity]].add(word)

In [6]:
def bayes_train(tweetgts, tweets, lexicon=False):
    '''
    Accumulate naive Bayes token counts into the module-level tables.

    Updates in place the globals `vocabulary` (set of seen tokens) and
    `vocabulary_counts_positive` / `_negative` / `_neutral` (token -> number
    of occurrences in training tweets of that class). These must already
    exist; only their '{lexicon}' entry is reset to 0 here.

    With lexicon=True, the pseudo-token '{lexicon}' of each class table counts
    the training tokens found in that class's lexicon, regardless of the label
    of the tweet they occur in.

    A tweet whose label is not positive/negative/neutral is reported with
    "problem" and skipped, so its tokens cannot be credited to another class.

    :param tweetgts: iterable of labels, parallel to `tweets`
    :param tweets: iterable of tweets, each an iterable of tokens
    :param lexicon: bool, whether to maintain the '{lexicon}' pseudo-token counts
    '''
    counts_by_class = {
        'positive': vocabulary_counts_positive,
        'negative': vocabulary_counts_negative,
        'neutral': vocabulary_counts_neutral,
    }
    for counts in counts_by_class.values():
        counts['{lexicon}'] = 0

    for gts, tweet in zip(tweetgts, tweets):
        class_counts = counts_by_class.get(gts)
        if class_counts is None:
            print("problem")
            continue
        for token in tweet:
            if lexicon:
                for polarity, counts in counts_by_class.items():
                    if token in lexicons[polarity]:
                        counts['{lexicon}'] += 1
            vocabulary.add(token)
            class_counts[token] = class_counts.get(token, 0) + 1

def bayes_n_doc(c):
    '''Return how many tweets of the training set carry the label c.'''
    training_labels = tweetgts['twitter-training-data.txt']
    return sum(1 for label in training_labels if label == c)

def bayes_log_prior(c):
    '''
    Return the base-10 log of the share of training tweets labelled c.

    math.log raises ValueError when no training tweet has label c.
    '''
    n_class = bayes_n_doc(c)
    n_total = len(tweetgts['twitter-training-data.txt'])
    return log(n_class / n_total, 10)

def bayes_count(w,c):
    '''
    Return the training count of token w in class c, or 0 if w was not seen.

    Returns None when c is not 'positive', 'negative' or 'neutral'.
    '''
    if c == 'positive':
        class_counts = vocabulary_counts_positive
    elif c == 'negative':
        class_counts = vocabulary_counts_negative
    elif c == 'neutral':
        class_counts = vocabulary_counts_neutral
    else:
        return None
    return class_counts.get(w, 0)
    
def bayes_log_likelihood(w,c):
    '''
    Return the base-10 log of the add-one smoothed likelihood of token w in class c.

    The numerator is the count of w in class c plus one; the denominator is
    the sum of all counts of class c plus the size of the global vocabulary.
    '''
    if c == 'positive':
        d = vocabulary_counts_positive
    elif c == 'negative':
        d = vocabulary_counts_negative
    elif c == 'neutral':
        d = vocabulary_counts_neutral
    smoothed_count = bayes_count(w, c) + 1
    smoothed_total = sum(d.values()) + len(vocabulary)
    return log(smoothed_count / smoothed_total, 10)

def bayes_sum(tweet,c,lexicon):
    '''
    Return the naive Bayes log score of a tokenised tweet for class c.

    The score starts from the log prior of c. With lexicon enabled, a token
    found in the lexicon of class c contributes the log likelihood of the
    pseudo-token '{lexicon}'; any other token contributes its own log
    likelihood if it is in the global vocabulary, and nothing otherwise.
    '''
    total = bayes_log_prior(c)
    use_lexicon = lexicon and c in ('positive', 'negative', 'neutral')
    for token in tweet:
        if use_lexicon and token in lexicons[c]:
            total = total + bayes_log_likelihood('{lexicon}', c)
        elif token in vocabulary:
            total = total + bayes_log_likelihood(token, c)
    return total

def bayes_predict(tweet,lexicon=False):
    '''
    Return the class with the highest naive Bayes score for a tokenised tweet.

    Classes are scored in the order positive, negative, neutral; on a tie the
    earlier class in that order is returned.
    '''
    classes = ('positive', 'negative', 'neutral')
    scores = [bayes_sum(tweet, label, lexicon) for label in classes]
    best = max(range(len(classes)), key=scores.__getitem__)
    return classes[best]

In [7]:
def classes_for_svm(tweetgts):
    '''
    Encode sentiment labels as integers: negative -> 0, neutral -> 1, positive -> 2.

    Any other label is encoded as None. Returns a list in input order.
    '''
    def encode(label):
        if label == 'negative':
            return 0
        if label == 'neutral':
            return 1
        if label == 'positive':
            return 2
        return None

    return [encode(label) for label in tweetgts]

In [9]:
def svmoutputtoclass(a):
    '''Map an integer class id (0/1/2) back to its label; None for anything else.'''
    if a == 0:
        return 'negative'
    if a == 1:
        return 'neutral'
    if a == 2:
        return 'positive'
    return None


# Train / load each classifier and print its score on the dev set and on every test set.
for classifier in ['naive_bayes', 'svm']:

    print('Training',classifier)

    if classifier=='naive_bayes':
        for features in ['bow','bow+lexicons']:
            use_lexicon = features=='bow+lexicons'

            # The model depends only on the training data and the feature
            # setting, so it is trained once per feature set and then reused
            # for every evaluation set.
            vocabulary = set()
            vocabulary_counts_positive = {}
            vocabulary_counts_negative = {}
            vocabulary_counts_neutral = {}
            bayes_train(tweetgts['twitter-training-data.txt'],
                        tweets_preprocessed['twitter-training-data.txt'],
                        lexicon=use_lexicon)

            for testset in ['twitter-dev-data.txt']+testsets:
                id_preds = {}
                for tweetid,tweet in zip(tweetids[testset],tweets_preprocessed[testset]):
                    id_preds[tweetid] = bayes_predict(tweet,lexicon=use_lexicon)
                testset_path = join('semeval-tweets', testset)
                evaluate(id_preds, testset_path, features+'-'+classifier)

    if classifier=='svm':
        # The vectorizer is always fitted on the training tweets, even when the
        # classifier itself is loaded from disk.
        vectorizer = TfidfVectorizer(min_df = 5,max_df = 0.8,sublinear_tf = True,use_idf = True)
        train_vectors = vectorizer.fit_transform(tweets_preprocessed_not_split['twitter-training-data.txt'])
        #uncomment this to build SVM model instead of loading from pickle
        #clf = svm.SVC()
        #clf.fit(train_vectors, classes_for_svm(tweetgts['twitter-training-data.txt']))
        # NOTE(review): unpickling can execute arbitrary code; only load a
        # model file that you created yourself. The loaded model presumably
        # expects the feature space produced by the vectorizer above - confirm
        # after any change to the preprocessing or vectorizer settings.
        with open("svmmodel.p", "rb") as model_file:
            clf = pickle.load(model_file)
        for testset in ['twitter-dev-data.txt']+testsets:
            test_vectors = vectorizer.transform(tweets_preprocessed_not_split[testset])
            predictions = clf.predict(test_vectors)
            id_preds = {}
            for tweetid, prediction in zip(tweetids[testset], predictions):
                id_preds[tweetid] = svmoutputtoclass(prediction)

            testset_path = join('semeval-tweets', testset)
            evaluate(id_preds, testset_path, classifier)

Training naive_bayes
semeval-tweets\twitter-dev-data.txt (bow-naive_bayes): 0.598
semeval-tweets\twitter-test1.txt (bow-naive_bayes): 0.486
semeval-tweets\twitter-test2.txt (bow-naive_bayes): 0.468
semeval-tweets\twitter-test3.txt (bow-naive_bayes): 0.485
semeval-tweets\twitter-dev-data.txt (bow+lexicons-naive_bayes): 0.562
semeval-tweets\twitter-test1.txt (bow+lexicons-naive_bayes): 0.512
semeval-tweets\twitter-test2.txt (bow+lexicons-naive_bayes): 0.506
semeval-tweets\twitter-test3.txt (bow+lexicons-naive_bayes): 0.508
Training svm
semeval-tweets\twitter-dev-data.txt (svm): 0.577
semeval-tweets\twitter-test1.txt (svm): 0.498
semeval-tweets\twitter-test2.txt (svm): 0.522
semeval-tweets\twitter-test3.txt (svm): 0.495
