In [2]:
from pyspark import SparkConf, SparkContext, SQLContext
import atexit

conf = SparkConf().setAppName("TwitterSentiment")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
atexit.register(lambda: sc.stop())

<function __main__.<lambda>>

In [1]:
import re

def removeURLS(text):
    text = text.lower().strip()
    text = re.sub('((www\.[^\s]+)|(https?://[^\s]+))', '', text)
    #text = str(text.encode('utf-8') if text else text)   
    return text

In [2]:
rdd = (sc.textFile('hdfs://spark.rcg.usm.maine.edu:9000/sgangarapu/tweetTrain.csv').map(removeURLS))

In [3]:
rdd.take(5)

[u'kat,"apr 15, 2016 7:12:04 pm",style ',
 u'dee,"apr 15, 2016 7:12:05 pm","morning bonding with my buddy.. \U0001f4aa\U0001f3fb\U0001f600\U0001f3c3\U0001f3fd @ st. jude church, malacanang palace ',
 u'mike stunson,"apr 15, 2016 7:12:05 pm",bases loaded and two outs for cam kelly in the first for murray.',
 u'"\u2022\u2022princ\xa3 ak\u20ac\u20acm\u2022\u2022","apr 15, 2016 7:12:05 pm",rt @showersideas: why is donkey kong called donkey kong when he\'s a monkey?',
 u'macc buddha,"apr 15, 2016 7:12:05 pm","rt @giants: fact/fiction - #giants vs. packers is the best game on big blue\'s schedule?']

In [1]:
df = sqlCtx.read.format('com.databricks.spark.csv').option('header', 'false').load('hdfs://spark.rcg.usm.maine.edu:9000/sgangarapu/tweetTrain.csv')

In [3]:
df.dtypes

[('C0', 'string'), ('C1', 'string'), ('C2', 'string')]

In [4]:
df.count()

302164

In [2]:
df = df.drop('C0')
df = df.drop('C1')
df = df.selectExpr("C2 as text")
df = df.na.drop()
df.show()

+--------------------+
|                text|
+--------------------+
|style https://t.c...|
|Morning bonding w...|
|Bases loaded and ...|
|RT @ShowersIdeas:...|
|@1LoganHenderson ...|
|RT @fubaglady: I ...|
|#INTL morning.. c...|
|.@DRM00RE @Ass2Da...|
|Received this won...|
|RT @jesse_duplant...|
|Evangelio y Vida ...|
|@Wronganswerpal a...|
|RT @AldubEUzone_G...|
|Manchester City a...|
|2016 Lamborghini ...|
|RT @bruhitszach: ...|
|RT @GREATBLACKOTA...|
|The Heart And Sou...|
|اني اراك بعين قلب...|
|only real niggas ...|
+--------------------+



In [8]:
df.count()

227994

In [3]:
rd = df.map(list)
rd.take(5)

[[u'style https://t.co/DEaspvM7bV'],
 [u'Morning bonding with my buddy.. \U0001f4aa\U0001f3fb\U0001f600\U0001f3c3\U0001f3fd @ St. Jude Church, Malacanang Palace https://t.co/M5Xg7ujF0S'],
 [u'Bases loaded and two outs for Cam Kelly in the first for Murray.'],
 [u"RT @ShowersIdeas: Why is Donkey Kong called Donkey Kong when he's a monkey?"],
 [u"@1LoganHenderson I'M SO EXCITED"]]

In [25]:
rd.count()

227994

In [4]:
import re

rd = rd.map(lambda x: re.sub('[^a-zA-Z\\s]', '', x[0]).lower())
rd.take(5)

[u'style httpstcodeaspvmbv',
 u'morning bonding with my buddy   st jude church malacanang palace httpstcomxgujfs',
 u'bases loaded and two outs for cam kelly in the first for murray',
 u'rt showersideas why is donkey kong called donkey kong when hes a monkey',
 u'loganhenderson im so excited']

In [5]:
stopWords = []
stopWords = sc.textFile("hdfs://spark.rcg.usm.maine.edu:9000/sgangarapu/stop-words.txt").flatMap(lambda x: x.split(' ')).collect()

In [6]:
from pyspark.sql import Row

row = Row("Text") # Or some other column name
data = rd.map(row).toDF()
data = data.na.drop()
data.show()

+--------------------+
|                Text|
+--------------------+
|style httpstcodea...|
|morning bonding w...|
|bases loaded and ...|
|rt showersideas w...|
|loganhenderson im...|
|rt fubaglady i la...|
|intl morning come...|
|drmre assday  htt...|
|received this won...|
|rt jesseduplantis...|
|evangelio y vida ...|
|wronganswerpal ag...|
|rt aldubeuzoneger...|
|manchester city a...|
| lamborghini hura...|
|rt bruhitszach fo...|
|rt greatblackotak...|
|the heart and sou...|
|                    |
|only real niggas ...|
+--------------------+



In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def f(s):
    tokens = s.split(' ')
    return [t for t in tokens if t.lower() not in stopWords]

t = ArrayType(StringType())
data1 = data.withColumn('Filtered', udf(f, t)(data['Text']))

In [8]:
data1.show()

+--------------------+--------------------+
|                Text|            Filtered|
+--------------------+--------------------+
|style httpstcodea...|ArrayBuffer(style...|
|morning bonding w...|ArrayBuffer(morni...|
|bases loaded and ...|ArrayBuffer(bases...|
|rt showersideas w...|ArrayBuffer(showe...|
|loganhenderson im...|ArrayBuffer(logan...|
|rt fubaglady i la...|ArrayBuffer(fubag...|
|intl morning come...|ArrayBuffer(intl,...|
|drmre assday  htt...|ArrayBuffer(drmre...|
|received this won...|ArrayBuffer(recei...|
|rt jesseduplantis...|ArrayBuffer(jesse...|
|evangelio y vida ...|ArrayBuffer(evang...|
|wronganswerpal ag...|ArrayBuffer(wrong...|
|rt aldubeuzoneger...|ArrayBuffer(aldub...|
|manchester city a...|ArrayBuffer(manch...|
| lamborghini hura...|ArrayBuffer(lambo...|
|rt bruhitszach fo...|ArrayBuffer(bruhi...|
|rt greatblackotak...|ArrayBuffer(great...|
|the heart and sou...|ArrayBuffer(heart...|
|                    |       ArrayBuffer()|
|only real niggas ...|ArrayBuffe

In [9]:
posWords = []
posWords = sc.textFile("hdfs://spark.rcg.usm.maine.edu:9000/sgangarapu/pos-words.txt").flatMap(lambda x: x.split(' ')).collect()

negWords = []
negWords = sc.textFile("hdfs://spark.rcg.usm.maine.edu:9000/sgangarapu/neg-words.txt").flatMap(lambda x: x.split(' ')).collect()

In [12]:
from pyspark.sql.types import FloatType

def k(s):
    numWords = len(s)
    numPosWords = 0
    numNegWords = 0
    for t in s:
        if t in posWords:
            numPosWords += 1
        elif t in negWords:
            numNegWords += 1
    return [numPosWords, numNegWords]

data2 = data1.withColumn('PosNeg', udf(k, ArrayType(FloatType()))(data1['Filtered']))

In [13]:
data2.show()

+--------------------+--------------------+-----------------+
|                Text|            Filtered|           PosNeg|
+--------------------+--------------------+-----------------+
|style httpstcodea...|ArrayBuffer(style...|ArrayBuffer(0, 0)|
|morning bonding w...|ArrayBuffer(morni...|ArrayBuffer(0, 0)|
|bases loaded and ...|ArrayBuffer(bases...|ArrayBuffer(0, 0)|
|rt showersideas w...|ArrayBuffer(showe...|ArrayBuffer(0, 0)|
|loganhenderson im...|ArrayBuffer(logan...|ArrayBuffer(1, 0)|
|rt fubaglady i la...|ArrayBuffer(fubag...|ArrayBuffer(0, 1)|
|intl morning come...|ArrayBuffer(intl,...|ArrayBuffer(0, 0)|
|drmre assday  htt...|ArrayBuffer(drmre...|ArrayBuffer(1, 0)|
|received this won...|ArrayBuffer(recei...|ArrayBuffer(1, 0)|
|rt jesseduplantis...|ArrayBuffer(jesse...|ArrayBuffer(4, 0)|
|evangelio y vida ...|ArrayBuffer(evang...|ArrayBuffer(0, 0)|
|wronganswerpal ag...|ArrayBuffer(wrong...|ArrayBuffer(0, 1)|
|rt aldubeuzoneger...|ArrayBuffer(aldub...|ArrayBuffer(0, 0)|
|manches

In [19]:
def l(s):
    if s[0] > s[1]:
        return 'positive'
    elif s[1] > s[0]:
        return 'negative'
    else:
        return 'neutral'

data2 = data2.withColumn('Senti', udf(l, StringType())(data2['PosNeg']))

In [20]:
data2.show()

+--------------------+--------------------+-----------------+--------+
|                Text|            Filtered|           PosNeg|   Senti|
+--------------------+--------------------+-----------------+--------+
|style httpstcodea...|ArrayBuffer(style...|ArrayBuffer(0, 0)| neutral|
|morning bonding w...|ArrayBuffer(morni...|ArrayBuffer(0, 0)| neutral|
|bases loaded and ...|ArrayBuffer(bases...|ArrayBuffer(0, 0)| neutral|
|rt showersideas w...|ArrayBuffer(showe...|ArrayBuffer(0, 0)| neutral|
|loganhenderson im...|ArrayBuffer(logan...|ArrayBuffer(1, 0)|positive|
|rt fubaglady i la...|ArrayBuffer(fubag...|ArrayBuffer(0, 1)|negative|
|intl morning come...|ArrayBuffer(intl,...|ArrayBuffer(0, 0)| neutral|
|drmre assday  htt...|ArrayBuffer(drmre...|ArrayBuffer(1, 0)|positive|
|received this won...|ArrayBuffer(recei...|ArrayBuffer(1, 0)|positive|
|rt jesseduplantis...|ArrayBuffer(jesse...|ArrayBuffer(4, 0)|positive|
|evangelio y vida ...|ArrayBuffer(evang...|ArrayBuffer(0, 0)| neutral|
|wrong

In [1]:
sc.stop()