# Sentiment Analysis (Big Data Environment)

## Tweets Dataset

Dataset: the sentiment140 corpus, which comprises about 1.6M tweets. Each tweet has the following fields:

1. Polarity of the tweet: 0 = negative, 2 = neutral, 4 = positive <br>
2. ID of the tweet <br>
3. Date of the tweet <br>
4. The query (if available) <br>
5. The user who posted the tweet <br>
6. The text of the tweet
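
As a minimal sketch of how one row maps onto these six fields, the snippet below parses a sample line with Python's `csv` module. The sample row is hypothetical (it is not taken from the corpus), and it assumes the raw file double-quotes its fields. Under that assumption a comma inside the tweet text stays in one field, whereas a plain split on `,` would produce an extra column.

```python
import csv
import io
from typing import NamedTuple


class Tweet(NamedTuple):
    polarity: int
    id: str
    date: str
    query: str
    user: str
    text: str


# Hypothetical sample row; the double-quoted layout is an assumption about the raw file.
SAMPLE = '"4","123","Mon Apr 06 22:19:45 PDT 2009","NO_QUERY","some_user","great day, loving it"\n'


def parse_rows(handle):
    """Yield one Tweet per CSV row, converting the polarity to an int."""
    for row in csv.reader(handle):
        polarity, *rest = row
        yield Tweet(int(polarity), *rest)


tweets = list(parse_rows(io.StringIO(SAMPLE)))
naive_columns = SAMPLE.strip().split(",")  # ignores quoting, so the text is cut in two

print(tweets[0].text)
print(len(tweets[0]), "fields with csv,", len(naive_columns), "with a plain split")
```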

The AFINN word list is also used to improve accuracy. It provides a sentiment score for 2477 English words and phrases. Each row of this dataset contains:

1. Word <br>
2. Score
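
A small sketch of reading such rows into a lookup table, assuming one tab-separated `word<TAB>score` pair per line (the same delimiter the `sentiment_words` table uses). The helper name `load_wordlist` and the three-line sample are illustrative only.

```python
# Illustrative excerpt in the assumed "word<TAB>score" layout.
SAMPLE_LINES = ["abandon\t-2", "ability\t2", "aboard\t1"]


def load_wordlist(lines):
    """Build a {word: score} dict from tab-separated lines, skipping blank lines."""
    scores = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        # Split on the last tab so a multi-word entry keeps its spaces.
        word, score = line.rsplit("\t", 1)
        scores[word] = int(score)
    return scores


afinn = load_wordlist(SAMPLE_LINES)
print(afinn)
```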

### Data Preparation

Load the data into Hive tables.

Create a Spark session:

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession. \
builder. \
config("spark.sql.warehouse.dir","/user/itv000684/warehouse"). \
appName("sentiment Analysis"). \
enableHiveSupport(). \
master("yarn"). \
getOrCreate()

In [3]:
spark.sql("SELECT current_database()").show()

+------------------+
|current_database()|
+------------------+
|           default|
+------------------+



In [19]:
#spark.sql("CREATE DATABASE IF NOT EXISTS itv000684_tweetdata")

In [4]:
spark.sql("USE itv000684_tweetdata")

In [5]:
spark.sql("SELECT current_database()").show()

+-------------------+
| current_database()|
+-------------------+
|itv000684_tweetdata|
+-------------------+



In [51]:
#spark.sql("DROP TABLE tweets")

In [6]:
spark.sql("SHOW TABLES").show()

+-------------------+---------------+-----------+
|           database|      tableName|isTemporary|
+-------------------+---------------+-----------+
|itv000684_tweetdata|sentiment_words|      false|
|itv000684_tweetdata|         tweets|      false|
+-------------------+---------------+-----------+



In [30]:
#spark.sql("DROP TABLE tweets")

In [31]:
spark.sql("""CREATE TABLE tweets (
polarity INT,
id STRING,
date STRING,
query STRING,
user STRING,
text STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
tblproperties("skip.header.line.count" = "0")
""")

In [26]:
spark.sql("""CREATE TABLE sentiment_words (
word STRING,
score INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
""")

In [7]:
spark.sql("SHOW TABLES").show()

+-------------------+---------------+-----------+
|           database|      tableName|isTemporary|
+-------------------+---------------+-----------+
|itv000684_tweetdata|sentiment_words|      false|
|itv000684_tweetdata|         tweets|      false|
+-------------------+---------------+-----------+



In [32]:
spark.sql("LOAD DATA INPATH '/user/itv000684/sentiment_analysis/data' INTO TABLE tweets")

In [34]:
spark.sql("LOAD DATA INPATH '/user/itv000684/sentiment_analysis/wordlist' INTO TABLE sentiment_words")

In [8]:
spark.sql("SELECT * FROM tweets").show()

+--------+----------+--------------------+--------+---------------+--------------------+
|polarity|        id|                date|   query|           user|                text|
+--------+----------+--------------------+--------+---------------+--------------------+
|       0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|       0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|       0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|       0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|       0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|       0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|       0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|       0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|       0|1467811795|

In [35]:
datas = spark.read.format('csv'). \
schema('''polarity INT,
id STRING,
date STRING,
query STRING,
user STRING,
text STRING
'''). \
load('/user/itv000684/sentiment_analysis/data')

In [36]:
datas

polarity,id,date,query,user,text
0,1467810369,Mon Apr 06 22:19:...,NO_QUERY,_TheSpecialOne_,@switchfoot http:...
0,1467810672,Mon Apr 06 22:19:...,NO_QUERY,scotthamilton,is upset that he ...
0,1467810917,Mon Apr 06 22:19:...,NO_QUERY,mattycus,@Kenichan I dived...
0,1467811184,Mon Apr 06 22:19:...,NO_QUERY,ElleCTF,my whole body fee...
0,1467811193,Mon Apr 06 22:19:...,NO_QUERY,Karoli,@nationwideclass ...
0,1467811372,Mon Apr 06 22:20:...,NO_QUERY,joy_wolf,@Kwesidei not the...
0,1467811592,Mon Apr 06 22:20:...,NO_QUERY,mybirch,Need a hug
0,1467811594,Mon Apr 06 22:20:...,NO_QUERY,coZZ,@LOLTrish hey lo...
0,1467811795,Mon Apr 06 22:20:...,NO_QUERY,2Hood4Hollywood,@Tatiana_K nope t...
0,1467812025,Mon Apr 06 22:20:...,NO_QUERY,mimismo,@twittera que me ...


In [37]:
datas.write.saveAsTable(name='tweets', format='orc', mode='overwrite')

In [38]:
spark.sql("SELECT * FROM tweets").show()

+--------+----------+--------------------+--------+---------------+--------------------+
|polarity|        id|                date|   query|           user|                text|
+--------+----------+--------------------+--------+---------------+--------------------+
|       0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|       0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|       0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|       0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|       0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|       0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|       0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|       0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|       0|1467811795|

In [40]:
spark.sql("SELECT count(*) FROM tweets").show()

+--------+
|count(1)|
+--------+
| 1600498|
+--------+



In [41]:
spark.sql("SELECT * FROM sentiment_words").show()

+----------+-----+
|      word|score|
+----------+-----+
|   abandon|   -2|
| abandoned|   -2|
|  abandons|   -2|
|  abducted|   -2|
| abduction|   -2|
|abductions|   -2|
|     abhor|   -3|
|  abhorred|   -3|
| abhorrent|   -3|
|    abhors|   -3|
| abilities|    2|
|   ability|    2|
|    aboard|    1|
|  absentee|   -1|
| absentees|   -1|
|   absolve|    2|
|  absolved|    2|
|  absolves|    2|
| absolving|    2|
|  absorbed|    1|
+----------+-----+
only showing top 20 rows



In [43]:
spark.sql("SELECT count(*) FROM sentiment_words").show()

+--------+
|count(1)|
+--------+
|    2477|
+--------+



The data has been loaded into the Hive tables successfully.

### Feature Generation

We represent each tweet with the following features: <br>
1. A TF-IDF based vector of the words in the tweet text. <br>
2. A positive and a negative sentiment score, based on the sentiment word list. <br>
3. Temporal features, including month, day-of-week and hour-of-day.
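
The second and third feature groups can be sketched in plain Python as follows. This is only an illustration under stated assumptions: the positive and negative scores are taken to be separate sums over matching tokens, the date string is assumed to start with `<day-of-week> <month> <day> <HH:MM:SS>`, and the function names and the tiny word list are hypothetical stand-ins.

```python
# Tiny stand-in for the sentiment word list.
WORD_SCORES = {"abandon": -2, "ability": 2, "aboard": 1}


def sentiment_scores(tokens, word_scores):
    """Return (positive_total, negative_total) over tokens found in the word list."""
    matched = [word_scores.get(t, 0) for t in tokens]
    positive = sum(s for s in matched if s > 0)
    negative = sum(-s for s in matched if s < 0)
    return positive, negative


def temporal_features(date_string):
    """Extract month, day-of-week and hour from the assumed date layout."""
    day_of_week, month, _day, clock = date_string.split()[:4]
    return {"month": month, "day_of_week": day_of_week, "hour": int(clock.split(":")[0])}


scores = sentiment_scores(["ability", "to", "abandon", "aboard"], WORD_SCORES)
when = temporal_features("Mon Apr 06 22:19:45 PDT 2009")
print(scores, when)
```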

In [5]:
from pyspark.conf import SparkConf

In [6]:
spark.sparkContext._conf.getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.sql.repl.eagerEval.enabled', 'true'),
 ('spark.eventLog.dir', 'hdfs:///spark-logs'),
 ('spark.app.name', 'sentiment Analysis'),
 ('spark.yarn.historyServer.address', 'm02.itversity.com:18080'),
 ('spark.yarn.jars', ''),
 ('spark.history.provider',
  'org.apache.spark.deploy.history.FsHistoryProvider'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.history.fs.logDirectory', 'hdfs:///spark-logs'),
 ('spark.submit.deployMode', 'client'),
 ('spark.history.fs.update.interval', '10s'),
 ('spark.driver.extraJavaOptions', '-Dderby.system.home=/tmp/derby/'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'm02.itversity.com'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.driver.port', '35235'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://m02.itversity.com:19088/proxy/application_16247098913

In [11]:
conf = spark.sparkContext._conf.setAll([
    ('spark.executor.memory', '4g'),
    ('spark.executor.instances', 4),
    ('spark.sql.warehouse.dir', '/user/itv000684/warehouse'),
    ('spark.sql.autoBroadcastJoinThreshold', 400 * 1024 * 1024),  # 400 MB
])

In [10]:
spark.sparkContext.stop()

In [12]:
spark = SparkSession.builder.config(conf=conf). \
appName("sentiment Analysis"). \
enableHiveSupport(). \
master("local"). \
getOrCreate()

In [13]:
from pyspark import HiveContext
hc = HiveContext(spark.sparkContext)

In [1]:
import re
import string
import pyspark.sql.functions as func
from pyspark.sql.types import StringType, ArrayType, FloatType

# matches any single ASCII punctuation character
puncture = re.compile('[%s]' % re.escape(string.punctuation))

def token_string(text, ngrams=1, minChars=2):
    # collapse runs of whitespace into a single space
    text = re.sub(r'\s+', ' ', text)
    # change to lowercase and split into tokens
    tokens = text.lower().split(' ')
    # remove short words and usernames
    tokens = [t for t in tokens if len(t) >= minChars and t[0] != '@']
    # replace any links by URL
    tokens = ["URL" if t[:4] == "http" else t for t in tokens]
    # remove punctuation from tokens
    tokens = [puncture.sub('', t) for t in tokens]
    if ngrams == 1:
        return tokens
    else:
        # append the n-grams (joined by spaces) after the single tokens
        return tokens + [' '.join(tokens[i:i+ngrams]) for i in range(len(tokens)-ngrams+1)]

# Python 3: str and range replace the Python 2 unicode and xrange
tokenizer = func.udf(lambda s: token_string(str(s), ngrams=2), ArrayType(StringType()))

In [3]:
tokenizer("Now ,you can e,xecute the code")

Column<b'<lambda>(Now ,you can e,xecute the code)'>
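
The call above returns a `Column` rather than a list of tokens because a UDF only builds a lazy column expression, and a plain string argument is interpreted as a column name. To check the tokenization logic itself, the underlying Python function can be called directly. The snippet below is a standalone Python 3 sketch of the same steps, without Spark.

```python
import re
import string

# Matches any single ASCII punctuation character.
puncture = re.compile('[%s]' % re.escape(string.punctuation))


def token_string(text, ngrams=1, minChars=2):
    # Collapse whitespace, lowercase, and split into tokens.
    tokens = re.sub(r'\s+', ' ', text).lower().split(' ')
    # Drop short tokens and @usernames.
    tokens = [t for t in tokens if len(t) >= minChars and t[0] != '@']
    # Replace links with a placeholder, then strip punctuation.
    tokens = ["URL" if t[:4] == "http" else t for t in tokens]
    tokens = [puncture.sub('', t) for t in tokens]
    if ngrams == 1:
        return tokens
    # Append the n-grams after the single tokens.
    return tokens + [' '.join(tokens[i:i + ngrams])
                     for i in range(len(tokens) - ngrams + 1)]


bigram_tokens = token_string("Now ,you can e,xecute the code", ngrams=2)
print(bigram_tokens)
```

On a DataFrame, the UDF would typically be applied to a column, for example `df.select(tokenizer(func.col("text")))`, where `df` stands for any DataFrame with a `text` column.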