In [1]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

from glob import glob
import numpy as np

from nltk.sentiment.vader import SentimentIntensityAnalyzer



In [2]:
# Spark configuration for this notebook: application name, a local master with
# 20 worker threads, and a scratch directory under the working directory.
sparkConf = (
    SparkConf()
    .setAppName("Sentiment Metadata")
    .setMaster("local[20]")
    .setAll([("spark.local.dir", "./tmp")])
)

# getOrCreate() reuses a session/context that is already running in this kernel,
# so re-executing the cell does not fail the way a second SparkContext(...) call
# does. NOTE: when a context already exists, its original settings stay in
# effect; restart the kernel to apply configuration changes.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
sc = spark.sparkContext

In [3]:
# List the timeline shards on disk. The [0-9] character class matches exactly
# one digit, so only shards numbered 0-9 are picked up by this pattern.
FILENAMES = glob(
    "/homed/smishra8/backupdisk/DataFiles/TwitterDatasets/SentimentMetaDataUsers/"
    "user_timelines.[0-9].json"
)
FILENAMES

['/homed/smishra8/backupdisk/DataFiles/TwitterDatasets/SentimentMetaDataUsers/user_timelines.0.json',
 '/homed/smishra8/backupdisk/DataFiles/TwitterDatasets/SentimentMetaDataUsers/user_timelines.1.json',
 '/homed/smishra8/backupdisk/DataFiles/TwitterDatasets/SentimentMetaDataUsers/user_timelines.2.json']

In [None]:
%%time
# Load every file matching user_timelines.*.json. Despite its name, tweet_rdd is
# a DataFrame: later cells call .select() on it and reach the RDD through .rdd.
tweet_rdd = spark.read.json(
    "/homed/smishra8/backupdisk/DataFiles/TwitterDatasets/SentimentMetaDataUsers/"
    "user_timelines.*.json"
)

In [None]:
# Peek at the first record (returned as a one-element list) to inspect the schema.
tweet_rdd.take(1)

## Get sentiment

In [None]:
# Module-level VADER analyzer; parse_row looks up `sid` by name when it scores
# each tweet's text.
sid = SentimentIntensityAnalyzer()

In [None]:
# Show the text column of the first record only.
tweet_rdd.select("full_text").first()

In [None]:
def parse_row(row):
    """Flatten one tweet record into a tuple of tweet, user and sentiment features.

    Reads the module-level ``sid`` analyzer to score the tweet text.

    Args:
        row: One tweet record exposing ``id``, ``entities``, ``created_at``,
            ``in_reply_to_status_id``, ``is_quote_status``, ``full_text`` and a
            nested ``user`` struct (``id``, ``created_at``, ``verified``,
            ``url``, ``entities``, ``followers_count``, ``friends_count``,
            ``statuses_count``).

    Returns:
        A 19-element tuple, in this order::

            (tid, len_hashtags, len_urls, len_user_mentions, created_at,
             is_reply, is_quoted, is_retweet,
             uid, user_since, is_verified, has_url,
             followers_count, friends_count, statuses_count,
             compound, neg, neu, pos)

        The last four values are the polarity scores returned by ``sid``.
    """
    # Tweet metadata
    tid = row.id
    entities = row.entities
    # A null entities struct, or a null/empty list inside it, counts as zero
    # instead of raising on attribute access / len(None).
    if entities is None:
        len_hashtags = len_urls = len_user_mentions = 0
    else:
        len_hashtags = len(entities.hashtags or ())
        len_urls = len(entities.urls or ())
        len_user_mentions = len(entities.user_mentions or ())

    created_at = row.created_at
    is_reply = row.in_reply_to_status_id is not None
    is_quoted = row.is_quote_status
    # The `retweeted` field only reports whether the authenticating account has
    # retweeted this tweet. A tweet that *is* a retweet carries an embedded
    # `retweeted_status` object, so test for that instead. getattr() covers
    # datasets whose schema has no such column at all.
    is_retweet = getattr(row, "retweeted_status", None) is not None

    # User metadata
    user = row.user
    uid = user.id

    user_since = user.created_at
    is_verified = user.verified

    # A profile "has a URL" if any of: the top-level url field, URLs found in
    # the description, or URLs attached to the url entity is non-empty.
    main_url = user.url
    user_entities = user.entities
    desc_entities = user_entities.description if user_entities is not None else None
    url_entities = user_entities.url if user_entities is not None else None
    desc_url = desc_entities.urls if desc_entities else False
    url_url = url_entities.urls if url_entities else False
    has_url = bool(main_url or desc_url or url_url)

    followers_count = user.followers_count
    friends_count = user.friends_count
    statuses_count = user.statuses_count

    # Vader scores; a missing text is scored as the empty string so a single
    # null record cannot abort the whole job.
    scores = sid.polarity_scores(row.full_text or "")
    return (
        (tid, len_hashtags, len_urls, len_user_mentions, created_at, is_reply, is_quoted, is_retweet)
        + (uid, user_since, is_verified, has_url, followers_count, friends_count, statuses_count)
        + (scores["compound"], scores["neg"], scores["neu"], scores["pos"])
    )

In [None]:
# Try parse_row on the first two records before building the full pipeline.
(
    tweet_rdd.rdd
    .map(parse_row)
    .take(2)
)

In [None]:
def tuple2tsv(x):
    """Render a sequence of values as one tab-separated line.

    Args:
        x: Iterable of values, e.g. the tuple returned by ``parse_row``.

    Returns:
        ``str(value)`` of every element joined by tab characters, without a
        trailing newline. An empty input yields ``""``.
    """
    # Values are written verbatim: a tab or newline inside a value is not
    # escaped and would shift the columns of that line.
    return "\t".join(str(k) for k in x)

In [None]:
# Per-record pipeline: tweet record -> feature tuple -> TSV line.
metadata_string = (
    tweet_rdd.rdd
    .map(parse_row)
    .map(tuple2tsv)
)

In [None]:
# Preview the first two TSV lines before writing anything to disk.
metadata_string.take(2)

In [None]:
# Write the TSV lines under ./output/metadata (relative to the working directory).
# NOTE(review): Spark normally refuses to write into an output directory that
# already exists — confirm the path is cleared before re-running this cell.
metadata_string.saveAsTextFile("./output/metadata")

In [14]:
# Completion marker: printed once a top-to-bottom run reaches the last cell.
print("Done")

Done
