# Loading the Data

In [13]:
def loadTwitterData(filePath):
    """
    Read gzipped JSON tweet files into a Spark DataFrame of English tweets

    INPUTS:
    filePath = path prefix; every file matching filePath + '*.gz' is read

    OTHER FUNCTIONS AND FILES CALLED BY THIS FUNCTION:
    -  the notebook's `spark` session (not defined in this function)

    RETURNS:
    - a Spark DataFrame with the columns body, movieName, searchPattern,
        postedTime, retweetCount and date, keeping only rows whose
        twitter_lang is "en" and dropping rows that contain nulls
    """
    rawTweets = spark.read.json(filePath + '*.gz')

    wantedFields = ['body',
                    'gnip.matching_rules.tag',
                    'gnip.matching_rules.value',
                    'postedTime',
                    'retweetCount']
    tweets = rawTweets.select(*wantedFields) \
                      .filter(rawTweets.twitter_lang == "en") \
                      .na.drop()

    # date of the post, obtained by casting postedTime
    tweets = tweets.withColumn('date', tweets['postedTime'].cast('date'))

    # the matching-rule fields become plain strings ...
    tweets = tweets.withColumn('tag', tweets['tag'].cast('string'))
    tweets = tweets.withColumn('value', tweets['value'].cast('string'))

    # ... and receive the names the rest of the notebook relies on
    return tweets.withColumnRenamed("tag", "movieName") \
                 .withColumnRenamed("value", "searchPattern")

# Running the VADER Classifier

In [14]:
from pyspark.sql.functions import col, udf, avg

# sentiment analysis
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# classifying vader scores into bins
from pyspark.ml.feature import Bucketizer

In [15]:
# one analyzer instance, shared by every call made through the UDF below
analyzer = SentimentIntensityAnalyzer()


def compoundScore(text):
    """
    The VADER compound sentiment score of one piece of text

    INPUTS:
    text = the text to score (the UDF below passes one column value per call)

    OTHER FUNCTIONS AND FILES CALLED BY THIS FUNCTION:
    -  vaderSentiment polarity analyzer: the module-level `analyzer` object

    FILES CREATED BY THIS FUNCTION: None

    RETURNS:
    - the value stored under the 'compound' key of the analyzer's
        polarity_scores output (None if that key is missing); this is the
        summed valence of the words found in the lexicon, normalized to lie
        between -1 (most extreme negative) and +1 (most extreme positive)
    """
    return analyzer.polarity_scores(text).get('compound')


# Spark SQL wrapper so compoundScore can be applied to a DataFrame column
compound_udf = udf(compoundScore)

In [16]:
def returnCompoundScore(dataset, textColumn = 'body' , outputColumn = 'vaderScore'):
    """
    Append the VADER compound score of each row's text as a new column

    INPUTS:
    dataset      = a Spark DataFrame containing textColumn
    textColumn   = name of the column holding the text to score
    outputColumn = name of the column that receives the score

    OTHER FUNCTIONS AND FILES CALLED BY THIS FUNCTION:
    -  compound_udf: the Spark UDF wrapping compoundScore

    RETURNS:
    - dataset with outputColumn added, cast to Double
    """
    print('Computing VADER Scores for each tweet')

    scoreColumn = compound_udf(col(textColumn)).cast('Double')
    return dataset.withColumn(outputColumn, scoreColumn)

def vaderClassify(dataset, textColumn, thresholds = [-1.0, -0.5, 0.5, 1.0]):
    
    
#     # return vader score from text as column 'vaderScore'
#     outCol  = 'vaderScore'
#     sentimentData = returnCompoundScore(dataset, textColumn, outCol)
    
    print 'Classifying all tweets in to buckets using the cutoffs', thresholds[1], 'and', thresholds [2]
    
    # classify using thresholds, returns a Classifier
    bucketizer = Bucketizer(splits = thresholds, inputCol = "vaderScore", outputCol = "vaderClassifier")
    
    print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
    
    bucketedData = bucketizer.transform(dataset)
    return bucketedData

## Finding Unique Movies to Filter on

In [17]:
from pyspark.sql.functions import col

In [18]:
def uniqueMovies(dataset, movieKey):
    """
    List the distinct single-movie values found in one column

    INPUTS:
    dataset  = a Spark DataFrame containing the column movieKey
    movieKey = name of the column holding the movie tag of each tweet

    RETURNS:
    - a Python list of str, one entry per distinct value of movieKey that
        contains no comma, with the first and last character removed

    NOTE: values containing a comma are skipped entirely because a comma is
    taken to mean that several movies are attached to the same tweet.
    """
    # find all unique movie names, some tweets have multiple movies attached
    # to them and the names are separated by commas
    singleTagRows = dataset.select(movieKey) \
                           .where(~col(movieKey).like('%,%')) \
                           .distinct() \
                           .collect()

    # Only one column was selected, so position 0 is the movieKey value.
    # Reading it by position (instead of the attribute `.movieName`) makes the
    # function work for any movieKey, not just 'movieName'.
    # The [1:-1] slice strips the markup to return the name only.
    return [str(iRow[0][1:-1]) for iRow in singleTagRows]

## Movie-Level Analysis

In [19]:
from pyspark.sql.functions import lit

def singleMovieTweets(dataset, identifier):
    """
    Keep only the rows whose movieName contains the given identifier

    INPUTS:
    dataset    = a Spark DataFrame with a movieName column
    identifier = text that must appear somewhere inside movieName; it is
        placed unescaped between two '%' wildcards of a SQL LIKE pattern

    RETURNS:
    - the filtered Spark DataFrame
    """
    likePattern = '%{0}%'.format(identifier)
    return dataset.filter(dataset.movieName.like(likePattern))
    
def vaderCountsByClassification(dataset, identifier):
    """
    Count rows per (date, vaderClassifier) pair and label them with a movie

    INPUTS:
    dataset    = a Spark DataFrame with date and vaderClassifier columns
    identifier = value written into every row of the movieName column

    RETURNS:
    - a Spark DataFrame with the columns date, vaderClassifier, nTweets and
        movieName, ordered by date then vaderClassifier, both descending
    """
    tally = dataset.groupby(dataset.date, dataset.vaderClassifier).count()

    return tally.withColumnRenamed("count", "nTweets") \
                .withColumn('movieName', lit(identifier)) \
                .orderBy(['date', 'vaderClassifier'], ascending=False)

In [20]:
from pyspark.sql.functions import mean, stddev, min, max, count

def vaderStats(dataset, identifier, vaderCol = 'vaderScore'):
    """
    Daily summary statistics of one score column, labelled with a movie

    INPUTS:
    dataset    = a Spark DataFrame with a date column and the column vaderCol
    identifier = value written into every row of the movieName column
    vaderCol   = name of the column to summarise

    RETURNS:
    - a Spark DataFrame with the columns date, avgScore, stdDev, minScore,
        maxScore, totalTweets and movieName, ordered by date descending
    """
    score = col(vaderCol)

    # mean, stddev, min, max and count are the pyspark.sql.functions versions
    # imported by this notebook (min and max shadow the Python builtins).
    # Every aggregate is named up front, so no renaming pass is needed later.
    summaries = [mean(score).alias("avgScore"),
                 stddev(score).alias("stdDev"),
                 min(score).alias("minScore"),
                 max(score).alias("maxScore"),
                 count(score).alias("totalTweets")]

    perDay = dataset.groupby('date').agg(*summaries)
    perDay = perDay.withColumn('movieName', lit(identifier))

    return perDay.orderBy(['date'], ascending=False)

In [21]:
def computeVaderCounts(dataset, movieList):
    """
    Daily tweet counts per classification for every movie, as one pandas frame

    INPUTS:
    dataset   = a classified Spark DataFrame (needs the columns used by
        singleMovieTweets and vaderCountsByClassification)
    movieList = the movie identifiers to process

    OTHER FUNCTIONS AND FILES CALLED BY THIS FUNCTION:
    -  singleMovieTweets
    -  vaderCountsByClassification

    RETURNS:
    - a pandas DataFrame: the union of the per-movie count tables, each movie
        appearing exactly once

    RAISES:
    - ValueError if movieList yields no movies
    """
    # Explicit accumulator. The earlier locals()/globals() test followed by a
    # second plain `if` unioned the first movie with itself, so its rows
    # appeared twice in the result.
    allVaderCounts = None

    for movieName in movieList: # for all unique movies

        # get tweets for one movie
        indivTweets = singleMovieTweets(dataset, movieName)
        # count tweets by type
        indivCounts = vaderCountsByClassification(indivTweets, movieName)

        if allVaderCounts is None:
            allVaderCounts = indivCounts
        else:
            allVaderCounts = allVaderCounts.union(indivCounts)

        print('%s passed Lazy Evaluation at the tweet count stage' % (movieName,))

    if allVaderCounts is None:
        raise ValueError('movieList is empty: there are no movies to count tweets for')

    print('Converting Count Data to Pandas DF, this may take a while...')
    pandasVaderCounts = allVaderCounts.toPandas()

    print('complete!')
    return pandasVaderCounts

In [22]:
def computeVaderStats(dataset, movieList):
    """
    Daily VADER summary statistics for every movie, as one pandas frame

    INPUTS:
    dataset   = a scored Spark DataFrame (needs the columns used by
        singleMovieTweets and vaderStats)
    movieList = the movie identifiers to process

    OTHER FUNCTIONS AND FILES CALLED BY THIS FUNCTION:
    -  singleMovieTweets
    -  vaderStats

    RETURNS:
    - a pandas DataFrame: the union of the per-movie daily summaries, each
        movie appearing exactly once

    RAISES:
    - ValueError if movieList yields no movies
    """
    # Explicit accumulator. The earlier locals()/globals() test followed by a
    # second plain `if` unioned the first movie with itself, so its rows
    # appeared twice in the result.
    allVaderStats = None

    for movieTitle in movieList: # for all unique movies

        # get tweets for one movie
        indivTweets = singleMovieTweets(dataset, movieTitle)
        # summarise the scores per day
        indivStats = vaderStats(indivTweets, movieTitle)

        if allVaderStats is None:
            allVaderStats = indivStats
        else:
            allVaderStats = allVaderStats.union(indivStats)

        print('%s passed Lazy Evaluation at the daily summary stage' % (movieTitle,))

    if allVaderStats is None:
        raise ValueError('movieList is empty: there are no movies to summarise')

    print('Converting daily stats to Pandas DF, this may take a while...')
    pandasVaderStats = allVaderStats.toPandas()

    print('complete!')
    return pandasVaderStats

In [23]:
def _saveVaderCsv(dataset, inPath, outPath, suffix):
    """
    Shared worker for vaderCounts2csv and vaderStats2csv

    Writes dataset to a file inside outPath whose name is the last component
    of inPath followed by suffix, and returns the path that was written.
    """
    # imported here so the function does not depend on an import made in
    # some other cell of the notebook
    import os

    # make output directory if not created
    if not os.path.exists(outPath):
        os.makedirs(outPath)

    # os.path.join places the file inside outPath whether or not outPath ends
    # with a separator; plain string concatenation only worked with a
    # trailing slash
    fileName = os.path.basename(os.path.normpath(inPath)) + suffix
    outFile  = os.path.join(outPath, fileName)
    dataset.to_csv(outFile, index=False, encoding='utf-8')

    return outFile

def vaderCounts2csv(dataset, inPath, outPath):
    """
    Save the VADER count table to <outPath>/<last part of inPath>Counts.csv

    INPUTS:
    dataset = an object with a pandas-style to_csv method
    inPath  = path the data was loaded from; its last component names the file
    outPath = output directory, created if it does not exist

    FILES CREATED BY THIS FUNCTION: the csv file described above

    RETURNS: None
    """
    outFile = _saveVaderCsv(dataset, inPath, outPath, 'Counts.csv')

    print('VADER Counts saved to csv file %s' % (outFile,))

def vaderStats2csv(dataset, inPath, outPath):
    """
    Save the VADER stats table to <outPath>/<last part of inPath>Stats.csv

    INPUTS:
    dataset = an object with a pandas-style to_csv method
    inPath  = path the data was loaded from; its last component names the file
    outPath = output directory, created if it does not exist

    FILES CREATED BY THIS FUNCTION: the csv file described above

    RETURNS: None
    """
    outFile = _saveVaderCsv(dataset, inPath, outPath, 'Stats.csv')

    print('VADER Stats saved to csv file %s' % (outFile,))

## Running the analysis

In [30]:
def parseMovieData(filePath, textCol = 'body', thresholds = [-1.0, -0.5, 0.5, 1.0]):
    
    # TODO: check if zero length files, and clean them out before loading the data
    
    print 'I am loading the data from ', filePath
    # load the data
    df = loadTwitterData(filePath)
    print 'data loaded ...'
    
    ## --- Compute Sentiment and Classify --- ##
    
    # sentiment data
    sentimentData = returnCompoundScore(df, textCol)
    # compute Classification for each tweet
    classifiedData = vaderClassify(sentimentData, textCol, thresholds)
    
    # identify unique movies
    moviesUnique = uniqueMovies(df, 'movieName')
    
    print 'I found ', len(moviesUnique), ' movies in ', filePath
    print 'The movies are:'
    print '\n'.join(str(iMovie) for iMovie in moviesUnique) 
    
    ## --- Tweet Counts by Movie - day --- #
    
    # compute daily counts for each classification for each movie 
    vaderCounts = computeVaderCounts(classifiedData, moviesUnique)
    
    # save vaderCounts to a csv file
#     outCounts  = 'out/vaderCounts/'
#     vaderCounts2csv(vaderCounts, filePath, outCounts)
    
    ## --- Summary Stats by Movie-day --- ##
    # TODO: compute summary stats per day of the vader output
    
#     # compute daily counts for each classification for each movie 
#     vaderStats = computeVaderStats(sentimentData, moviesUnique)
    
#     # save vaderCounts to a csv file
#     outStats  = 'out/vaderStats/'
#     vaderStats2csv(vaderStats, filePath, outStats)
    
    return vaderCounts #, vaderStats

## Test on a Folder that has multiple movies

In [31]:
dataPath = '/twitter/movie/**/'

In [32]:
# parseMovieData returns a single DataFrame (the stats output is disabled), so
# it is bound to one name: the `vaderOutput` that the following cells inspect.
vaderOutput = parseMovieData(dataPath, thresholds = [-1.0, -1.0/3.0, 1.0/3.0, 1.0])

I am loading the data from  /twitter/movie/**/


KeyboardInterrupt: 

In [None]:
vaderOutput.head(10)

In [None]:
vaderOutput.movieName.unique()

In [1]:
type(vaderOutput)

NameError: name 'vaderOutput' is not defined

## Test on a folder that has one movie

In [None]:
dataPath2 = '/twitter/movie/DeerAntMan/'

In [None]:
vaderOutput2 = parseMovieData(dataPath2, thresholds = [-1.0, -1.0/3.0, 1.0/3.0, 1.0])

In [None]:
vaderOutput2.head(10)

In [None]:
vaderOutput2.movieName.unique()

## Saving Output (Experimental)

This section should not be needed as long as the Spark → pandas → CSV route (`toPandas()` followed by `to_csv`) is not overly burdensome.

In [None]:
#from pyspark.sql import DataFrameWriter

In [None]:
# vaderCounts.repartition(1).write.format('com.databricks.spark.csv') \
#     .mode("overwrite").option("header", "true") \
#     .save('file:///home/lachlan/out/test_save/vaderCount_test.csv')
    
# vaderCounts.repartition(1).write.csv(path='file:///home/lachlan/out/test_save/vaderTest.csv',
#                       header="true", mode="overwrite")

# NOTE: the unfinished expression below is a SyntaxError when executed, so it
# is kept only as a comment until the intended call is filled in.
# vaderOutput2.

In [None]:
#spark.version

In [None]:
#!ls -a /home/lachlan/out/test_save/

In [None]:
#!rm -rf /home/lachlan/out/test_save/vaderTest.csv

In [None]:
#!head /home/lachlan/out/test_save/vaderCount_test.csv