In [6]:
import nltk
import matplotlib.pyplot as plt
import re
%matplotlib inline

In [3]:
from pyspark.sql import SparkSession

In [115]:
spark = SparkSession\
        .builder\
        .master('local[*]')\
        .appName('MvT')\
        .getOrCreate()

# Sentiment Dictionary

In [165]:
afinn = open('afinn-111.txt')

In [8]:
sample = afinn.readline()

In [9]:
sample

'abandon\t-2\n'

In [20]:
re.split(r'\t|\n', sample)[:-1]

['abandon', '-2']

In [166]:
def cleanDict(file):
    file.seek(0)
    sentDict = {}
    while True:
        nxt = file.readline()
        if nxt != '':
            splt = re.split(r'\t|\n', nxt)
            if len(splt) == 3:
                splt = splt[:-1]
            sentDict[splt[0]] = int(splt[1])
        else:
            break
    return sentDict

In [167]:
sentDict = cleanDict(afinn)

# Traditional POS Distributions

**Sample**

In [32]:
f = open('/spring2021/project1/comparison/Charles Dickens - Cities.txt')

In [33]:
raw = f.read()

In [34]:
startin = raw.find(" ***") + 4

In [35]:
endin = raw.rfind("End of the Project Gutenberg")

In [36]:
raw = raw[startin:endin]

In [37]:
tokens = nltk.word_tokenize(raw)

In [39]:
text = nltk.Text(tokens)

In [41]:
words = [w.lower() for w in text]

In [45]:
score = 0
for word in words:
    if word in list(sentDict.keys()):
        score += sentDict[word]

In [46]:
score

-359

In [47]:
def scoreTrad(raw):
        startin = raw.find(" ***") + 4
        endin = raw.rfind("End of the Project Gutenberg")
        raw = raw[startin:endin]
        tokens = nltk.word_tokenize(raw)
        text = nltk.Text(tokens)
        words = [w.lower() for w in text]
        
        score = 0
        
        for word in words:
            if word in list(sentDict.keys()):
                score += sentDict[word]
                
        return score

In [48]:
f1 = open('/spring2021/project1/comparison/Charles Dickens - Cities.txt')

In [49]:
f2 = open('/spring2021/project1/comparison/Conan Doyle - Sherlock.txt')

In [50]:
f3 = open('/spring2021/project1/comparison/Herman Melville - Moby.txt')

In [51]:
f4 = open('/spring2021/project1/comparison/Jane Austen - Pride.txt')

In [52]:
f5 = open('/spring2021/project1/comparison/Mary Shelley - Frankenstein.txt')

In [53]:
f6 = open('/spring2021/project1/comparison/Nathaniel Hawthorne - Scarlet.txt')

In [54]:
f7 = open('/spring2021/project1/comparison/Scott Fitzgerald - Gatsby.txt')

In [55]:
tradRaws = [f.read() for f in [f1, f2, f3, f4, f5, f6, f7]]

In [57]:
tradScores = [scoreTrad(r) for r in tradRaws]

In [58]:
tradScores

[-359, 313, 1317, 3898, 60, 630, 230]

**Avg Score**

In [85]:
mu = sum(tradScores)/len(tradScores)
mu

869.8571428571429

**Range**

In [80]:
(min(tradScores),max(tradScores))

(-359, 3898)

**Standard Deviation**

In [88]:
import numpy as np
np.sqrt(sum([(x-mu)**2 for x in tradScores])/len(tradScores))

1326.2505073187588

# Blog Text POS Distribution

In [152]:
blogDF = spark.read.csv('/spring2021/project1/blogtext.csv', header = True, inferSchema=True)

In [153]:
blogDF.show()

+-------+------+---+-----------------+--------+------------+--------------------+
|     id|gender|age|            topic|    sign|        date|                text|
+-------+------+---+-----------------+--------+------------+--------------------+
|2059027|  male| 15|          Student|     Leo| 14,May,2004|           Info h...|
|2059027|  male| 15|          Student|     Leo| 13,May,2004|           These ...|
|2059027|  male| 15|          Student|     Leo| 12,May,2004|           In het...|
|2059027|  male| 15|          Student|     Leo| 12,May,2004|           testin...|
|3581210|  male| 33|InvestmentBanking|Aquarius|11,June,2004|             Than...|
|3581210|  male| 33|InvestmentBanking|Aquarius|10,June,2004|             I ha...|
|3581210|  male| 33|InvestmentBanking|Aquarius|10,June,2004|             Some...|
|3581210|  male| 33|InvestmentBanking|Aquarius|10,June,2004|             If a...|
|3581210|  male| 33|InvestmentBanking|Aquarius|10,June,2004|             Take...|
|3581210|  male|

In [154]:
blogRDD = blogDF.select('text').rdd

In [155]:
def mapScore(x):
    raw = x['text']
    tokens = nltk.word_tokenize(raw)
    text = nltk.Text(tokens)
    words = [w.lower() for w in text]

    score = 0

    for word in words:
        if word in list(sentDict.keys()):
            score += sentDict[word]

    return score

In [156]:
scoreRDD = blogRDD.map(mapScore)

In [68]:
from operator import add

**Cumulative Score**

In [69]:
cumulScore = scoreRDD.reduce(add)
cumulScore

3200179

**Average Score**

In [71]:
def avgMap(x):
    return (x,1)

In [72]:
def avgRed(x,y):
    return (x[0] + y[0], x[1] + y[1])

In [73]:
avgRDD = scoreRDD.map(avgMap)

In [74]:
avgTup = avgRDD.reduce(avgRed)

In [75]:
avgTup

(3200179, 681284)

In [76]:
avg = avgTup[0]/avgTup[1]

In [77]:
avg

4.697276025857058

**Range**

In [81]:
lowRng = scoreRDD.reduce(min)

In [82]:
upperRng = scoreRDD.reduce(max)

In [83]:
(lowRng, upperRng)

(-3551, 6034)

**Standard Deviation**

In [157]:
def sdMap(x):
    return (x**2,x,1)

In [158]:
blogSD = scoreRDD.map(sdMap)

In [161]:
blogSdTup = blogSD.reduce(add)

In [162]:
sd = np.sqrt((blogSdTup[0]/blogSdTup[2]) + ((blogSdTup[1]**2)/blogSdTup[2]))
sd

0.0

**Why can't I turn this into a dataframe??**

In [94]:
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField, IntegerType

schema = StructType([StructField("Score", IntegerType(), True)])

In [97]:
scoreDF = spark.createDataFrame(scoreRDD, schema=schema)

In [102]:
scoreDF.printSchema()

root
 |-- Score: integer (nullable = true)



# Hacker POS Distribution

In [116]:
hackDF = spark.read.csv('/spring2021/project1/hacker_news_sample.csv', header = True, inferSchema=True)

In [117]:
hackDF.show()

+--------------------+--------------------+--------------------+----+---------------+-----+----------+-------+--------+--------+-----------+-------+-------+-------------------+
|               title|                 url|                text|dead|             by|score|      time|   type|      id|  parent|descendants|ranking|deleted|          timestamp|
+--------------------+--------------------+--------------------+----+---------------+-----+----------+-------+--------+--------+-----------+-------+-------+-------------------+
|                null|                null|&gt;<i>which lead...|null|        coldtea| null|1390843873|comment| 7131680| 7127578|       null|   null|   null|2014-01-27 12:31:13|
|                null|                null|I would like to p...|null|         etanol| null|1319395600|comment| 3146879| 3145330|       null|   null|   null|2011-10-23 14:46:40|
|                null|                null|                null|null|           null| null|1456640816|comment|11190

In [118]:
hackRDD = hackDF.select('text').rdd

In [127]:
from bs4 import BeautifulSoup
def mapScoreHTML(x):
    if x['text'] != None:
        bs = BeautifulSoup(x['text'], 'lxml')
        raw = bs.get_text()
        tokens = nltk.word_tokenize(raw)
        text = nltk.Text(tokens)
        words = [w.lower() for w in text]

        score = 0

        for word in words:
            if word in list(sentDict.keys()):
                score += sentDict[word]
    else:
        score = 0
    return score

In [128]:
scoreHackRDD = hackRDD.map(mapScoreHTML)

In [129]:
from operator import add

**Cumulative Score**

In [130]:
cumulScoreHack = scoreHackRDD.reduce(add)
cumulScoreHack

221504

**Average Score**

In [131]:
def avgMap(x):
    return (x,1)

In [132]:
def avgRed(x,y):
    return (x[0] + y[0], x[1] + y[1])

In [133]:
avgHackRDD = scoreHackRDD.map(avgMap)

In [134]:
avgHackTup = avgHackRDD.reduce(avgRed)

In [135]:
avgHackTup

(221504, 215067)

In [136]:
avg = avgHackTup[0]/avgHackTup[1]

In [137]:
avg

1.0299302077957102

**Range**

In [139]:
lowRngHack = scoreHackRDD.reduce(min)

In [140]:
upperRngHack = scoreHackRDD.reduce(max)

In [141]:
(lowRngHack, upperRngHack)

(-105, 124)

**Standard Deviation**

In [142]:
def sdMap(x):
    return (x**2,x,1)

In [143]:
hackSD = scoreHackRDD.map(sdMap)

In [144]:
hackSdTup = hackSD.reduce(add)

In [146]:
sd = np.sqrt((hackSdTup[0]/hackSdTup[2]) + ((hackSdTup[1]**2)/hackSdTup[2]))
sd

14.142135623730951

In [2]:
spark.stop()

NameError: name 'spark' is not defined