In [1]:
import pymongo
import urllib.parse
import json
import re
import spacy
import en_core_web_sm
import emoji
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer
from nltk import FreqDist
import nltk
import pandas as pd
from nltk.sentiment.vader import SentimentIntensityAnalyzer as SIA
import time

In [2]:
import pprint
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# New API: SparkSession on the standalone master, with dynamic allocation.
# The external shuffle service is disabled, so shuffle tracking is enabled
# instead. Driver and block-manager ports are pinned to fixed starting values.
spark_session = (
    SparkSession.builder
    .appName("test_notebook")
    .master("spark://host-192-168-2-176-de1:7077")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.dynamicAllocation.shuffleTracking.enabled", True)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "100s")
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# Old API (RDD): the SparkContext behind the session.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/15 14:03:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/15 14:03:47 WARN Utils: Service 'sparkDriver' could not bind on port 9998. Attempting port 9999.
22/03/15 14:03:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/03/15 14:03:48 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 10005. Attempting port 10006.
22/03/15 14:03:49 WARN ExecutorAllocationManager: Dynamic allocation without a shuffle service is an experimental feature.


In [3]:
# Address of the Spark web UI for this application.
print(spark_session.sparkContext.uiWebUrl)

http://host-192-168-2-176-de1:4041


In [4]:
# Only the two fields the analysis reads; both nullable, so `body` may be None.
schema = (
    StructType()
    .add("subreddit", StringType(), True)
    .add("body", StringType(), True)
)

In [5]:
# Raw comment dump on HDFS, parsed as JSON with the explicit schema above.
df = spark_session.read.json(
    "hdfs://host-192-168-2-176-de1:9000/comments/RC_2011-07",
    schema=schema,
)

In [9]:
# One-time conversion of the JSON dump to Parquet (uncomment to regenerate):
#df.write.parquet("hdfs://host-192-168-2-176-de1:9000/comments/parquet/reddit.parquet")

                                                                                

In [7]:
# Same comments stored as Parquet by the commented-out one-time write cell.
# NOTE(review): on a fresh cluster this fails until that write has been run.
parqDF = (
    spark_session.read
    .format("parquet")
    .load("hdfs://host-192-168-2-176-de1:9000/comments/parquet/reddit.parquet")
)

In [13]:
#df = df.limit(100000)

In [None]:
# Every distinct subreddit, collected to the driver to build the id tables.
# NOTE(review): this cell is marked "In [None]" (not executed in the saved
# session), yet the id-table cell depends on `categories`; run it first.
categories = (
    df.select('subreddit')
    .distinct()
    .collect()
)

In [6]:
# Benchmark: top-50 subreddits by comment count, reading the JSON dump.
# perf_counter() is monotonic and meant for measuring durations; time.time()
# follows the wall clock and can jump if the system clock is adjusted.
start = time.perf_counter()
most_popular_categories = (
    df.groupBy('subreddit').count().sort('count', ascending=False).head(50)
)
end = time.perf_counter()
print("the operation takes {0} seconds".format(end - start))

[Stage 2:>                                                          (0 + 2) / 2]

the operation takes 49.16558814048767 seconds


                                                                                

In [9]:
# Same benchmark against the Parquet copy, for comparison with the JSON run.
# perf_counter() is monotonic and meant for measuring durations; time.time()
# follows the wall clock and can jump if the system clock is adjusted.
start = time.perf_counter()
most_popular_categories = (
    parqDF.groupBy('subreddit').count().sort('count', ascending=False).head(50)
)
end = time.perf_counter()
print("the operation takes {0} seconds".format(end - start))



the operation takes 4.818623304367065 seconds


22/03/15 14:38:41 ERROR TaskSchedulerImpl: Lost executor 1 on 192.168.2.176: Command exited with code 137
22/03/15 14:38:41 ERROR TransportRequestHandler: Error sending result RpcResponse[requestId=5012939683213738533,body=NioManagedBuffer[buf=java.nio.HeapByteBuffer[pos=0 lim=81 cap=156]]] to /192.168.2.176:46188; closing connection
io.netty.channel.StacklessClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)
22/03/15 14:46:29 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
22/03/15 14:46:29 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:919)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:154)
	at org.apache.spark.deploy.clien

In [None]:
# Display the most recently computed top-50 list of Row objects.
most_popular_categories

In [8]:
# Dense integer ids for the distinct subreddits (in collect() order), plus the
# reverse table used at the end to turn ids back into names.
cat_to_id = {}
id_to_cat = {}
i = 0
for row in categories:
    name = row['subreddit']
    cat_to_id[name] = i
    id_to_cat[i] = name
    i += 1

In [9]:
def transform_to_id(cat):
    """Return the integer id assigned to the subreddit name `cat`.

    Looks the name up in the module-level `cat_to_id` table; a name that is
    not in the table raises KeyError.
    """
    category_id = cat_to_id[cat]
    return category_id

# Declare IntegerType so the UDF yields ints directly; udf() defaults to
# StringType when no return type is given, which would turn ids into strings.
transform_to_id_udf = udf(transform_to_id, IntegerType())

In [10]:
# Tag every comment with its subreddit's integer id (appended as `cat_id`).
df = df.withColumn(
    "cat_id",
    transform_to_id_udf(df.subreddit).cast("int"),
)

In [11]:
# Top-50 subreddit ids by comment count, as Row(cat_id, count) objects.
most_popular_categories = (
    df.groupBy('cat_id')
    .count()
    .orderBy('count', ascending=False)
    .head(50)
)

                                                                                

In [12]:
# cat_id of each of the 50 most-commented subreddits, in ranking order.
categories_to_keep = [row['cat_id'] for row in most_popular_categories]

In [13]:
# Switch to the RDD API; each element becomes [subreddit, body, cat_id]
# (schema column order, with cat_id appended by withColumn).
rdd = df.rdd.map(lambda row: list(row))



In [14]:
# Keep only comments from the top-50 subreddits; x is [subreddit, body, cat_id].
# A set gives O(1) membership instead of scanning the 50-item list per row.
_keep_ids = set(categories_to_keep)
filtered_rdd = rdd.filter(lambda x: x[2] in _keep_ids)

In [15]:
# Action: number of comments left after the top-50 filter (runs the job).
filtered_rdd.count()

                                                                                

10

In [16]:
def _strip_emoji(text):
    """Return `text` with every emoji removed.

    `body` is declared nullable in the schema, so a comment can arrive with
    no text; treat that as an empty string instead of letting the regex
    `.sub` call raise TypeError on None inside an executor.
    """
    if text is None:
        return u''
    # NOTE(review): get_emoji_regexp() is deprecated and was removed in
    # emoji 2.0 (emoji.replace_emoji(text, replace='') is the successor);
    # kept here to match the installed version — confirm before upgrading.
    return emoji.get_emoji_regexp().sub(u'', text)


# (cat_id, body with emoji removed)
filtered_rdd1 = filtered_rdd.map(lambda x: (x[2], _strip_emoji(x[1])))

In [17]:
# Tokens are URLs (kept whole), dollar amounts such as "$4.99", or runs of
# word characters. Regex alternation is tried left to right at each position,
# so the URL branch must come before `\w+`; otherwise "https" is taken as a
# word and the URL is split apart. A raw string avoids invalid-escape warnings.
tokenizer = RegexpTokenizer(r'http\S+|\$[\d\.]+|\w+')
# (cat_id, list of tokens)
filtered_rdd2 = filtered_rdd1.map(lambda x: (x[0], tokenizer.tokenize(x[1])))

In [18]:
# (cat_id, lower-cased tokens)
filtered_rdd3 = filtered_rdd2.mapValues(lambda words: [w.lower() for w in words])

In [19]:
# spaCy's English stop-word set (the model is used here only for its Defaults).
nlp = en_core_web_sm.load()
all_stopwords = nlp.Defaults.stop_words

# (cat_id, tokens with stop words removed)
filtered_rdd4 = filtered_rdd3.mapValues(
    lambda words: [w for w in words if w not in all_stopwords]
)

In [20]:
# WordNet data for the lemmatizer. NOTE(review): this downloads on the driver
# only; executors presumably need the same corpora on their NLTK data path.
nltk.download('wordnet')
nltk.download('omw-1.4')

lemmatizer = WordNetLemmatizer()

# (cat_id, lemmatized tokens); lemmatize() is called with its default POS.
filtered_rdd5 = filtered_rdd4.mapValues(
    lambda words: [lemmatizer.lemmatize(w) for w in words]
)

[nltk_data] Downloading package wordnet to /home/ubuntu/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /home/ubuntu/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


In [21]:
# VADER: score each token on its own and keep only its 'compound' value.
nltk.download('vader_lexicon')

sia = SIA()

# (cat_id, per-token compound scores)
filtered_rdd6 = filtered_rdd5.mapValues(
    lambda words: [sia.polarity_scores(w)['compound'] for w in words]
)

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/ubuntu/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [22]:
def find_positive_negative(scores):
    """Classify a comment from its per-token sentiment scores.

    A score above 0.1 counts as a positive token and one below -0.1 as a
    negative token; scores in [-0.1, 0.1] are ignored.

    Returns 1 when positive tokens are at least as numerous as negative ones
    (so a tie, including a comment with no polar tokens at all, yields 1),
    otherwise 0.
    """
    # Running (positive - negative) balance; each comparison adds 0 or 1.
    tally = 0
    for score in scores:
        tally += (score > 0.1) - (score < -0.1)
    return 1 if tally >= 0 else 0

# UDF form of the classifier. IntegerType() matches the 0/1 ints it returns;
# without an explicit return type udf() defaults to StringType.
find_positive_negative_udf = udf(find_positive_negative, IntegerType())

In [23]:
# (cat_id, 1 if the comment is classified positive else 0)
filtered_rdd7 = filtered_rdd6.mapValues(lambda scores: find_positive_negative(scores))

In [24]:
# (cat_id, number of positive comments)
final_rdd = filtered_rdd7.reduceByKey(add)

In [27]:
# Action: bring the (cat_id, positive-comment count) pairs to the driver.
# Despite the name, collect() returns a plain Python list, not an RDD.
reduced_rdd = final_rdd.collect()

In [28]:
# cat_id -> total number of comments, for each of the top-50 subreddits.
count_dic = {row['cat_id']: row['count'] for row in most_popular_categories}

In [29]:
# Result rows of [category name, percentage of positive comments].
final_list = list()

In [30]:
# Percentage of positive comments per subreddit, rounded to 2 decimals.
# round() rather than int() truncation: e.g. 0.57 * 10000 == 5699.999...,
# which truncates to 56.99. Building the list here (instead of appending to
# an existing one) makes the cell safe to re-run.
final_list = [
    [id_to_cat[cat_id], round(100 * positive / count_dic[cat_id], 2)]
    for cat_id, positive in reduced_rdd
]

In [31]:
# Display the [subreddit, percentage positive] rows.
final_list

In [32]:
# One row per subreddit: its name and percentage of positive comments.
final_df = pd.DataFrame(data=final_list, columns=['category', 'frequency'])

In [33]:
# Written tab-separated despite the .csv name, and including the row index.
final_df.to_csv('frequency_table.csv', sep='\t', index=True)

In [34]:
# Shut down the Spark session (and its SparkContext) for this notebook.
spark_session.stop()