In [28]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session through the SparkSession builder API.
spark = (
    SparkSession.builder
    .master("spark://192.168.2.186:7077")  # master URL of the cluster to connect to
    .appName("Group14_Application")
    .config("spark.cores.max", 4)  # value passed for the spark.cores.max setting
    .getOrCreate()
)


In [29]:
# Read the sample data set (JSON) into a DataFrame.
data = spark.read.json(path="sample_data.json")

                                                                                

In [30]:
# Sanity check that the data loaded correctly; vertical=True prints one field per line.
data.show(vertical=True)

[Stage 1:>                                                          (0 + 1) / 1]

-RECORD 0--------------------------------------
 author                 | Dethcola             
 author_cakeday         | null                 
 author_flair_css_class |                      
 author_flair_text      | Clairemont           
 body                   | A quarry             
 can_gild               | true                 
 controversiality       | 0                    
 created_utc            | 1506816000           
 distinguished          | null                 
 edited                 | false                
 gilded                 | 0                    
 id                     | dnqik14              
 is_submitter           | false                
 link_id                | t3_73ieyz            
 parent_id              | t3_73ieyz            
 permalink              | /r/sandiego/comme... 
 retrieved_on           | 1509189606           
 score                  | 3                    
 stickied               | false                
 subreddit              | sandiego      

                                                                                

In [5]:
# Number of comments (rows) in the data set.
data.count()

                                                                                

10000

In [6]:
# Number of partitions of the DataFrame's underlying RDD.
# TODO: decide whether this should be increased when more workers are added.
data.rdd.getNumPartitions()

2

In [5]:
# Question 1: which subreddits have the most comments?
# Count the comments per subreddit, then list the 20 largest counts first.
data1 = data.groupBy("subreddit").count()
data1.orderBy(data1["count"].desc()).show(n=20)

                                                                                

+--------------------+-----+
|           subreddit|count|
+--------------------+-----+
|           AskReddit|  486|
|                 CFB|  403|
|          CrazyIdeas|  261|
|                news|  158|
|         ConciseIAmA|  147|
|         4chan4trump|  136|
|            politics|  117|
|RocketLeagueExchange|   96|
|          The_Donald|   90|
|                 nba|   90|
|          edc_raffle|   77|
|           worldnews|   74|
|               funny|   73|
|           teenagers|   69|
|     leagueoflegends|   65|
|        Ice_Poseidon|   59|
|                pics|   58|
|      DestinyTheGame|   58|
|              hockey|   55|
|                 nfl|   53|
+--------------------+-----+
only showing top 20 rows



[Stage 6:>                                                          (0 + 2) / 2]

# Simon's Question: Word sentiment based on comment scores

In [31]:
# Keep only the comment text and its score; every other column is dropped.
data2 = data.select("body", "score")
# Preview the first 20 rows, one field per line.
data2.show(n=20, vertical=True)
# Switch to the RDD API for the word-level processing in the following cells.
data2_rdd = data2.rdd


-RECORD 0---------------------
 body  | A quarry             
 score | 3                    
-RECORD 1---------------------
 body  | [Salutations! I'm... 
 score | 3                    
-RECORD 2---------------------
 body  | I got into baseba... 
 score | 2                    
-RECORD 3---------------------
 body  | FUCKING TORY         
 score | 18                   
-RECORD 4---------------------
 body  | I see a water dra... 
 score | 1                    
-RECORD 5---------------------
 body  | Wait. The Michiga... 
 score | 1                    
-RECORD 6---------------------
 body  | ye fam               
 score | 2                    
-RECORD 7---------------------
 body  | 143417804| &gt; U... 
 score | 1                    
-RECORD 8---------------------
 body  | That is some chic... 
 score | 2                    
-RECORD 9---------------------
 body  | Does he even know... 
 score | 1                    
-RECORD 10--------------------
 body  | Tequila.             
 score |

In [62]:
# Tokenise every comment body into cleaned, lowercase words, keeping the comment score.
import re

# Runs of non-word characters; note that \W leaves letters, digits and underscores in place.
_NON_WORD = re.compile(r'\W+')
# Tokens longer than this are discarded.
_MAX_WORD_LEN = 20


def _clean_words(body):
    """Split one comment body into cleaned words.

    The body is split on whitespace; each token has its non-word characters
    removed and is lowercased. Tokens that end up empty (for example a lone
    "-" or "...") or longer than _MAX_WORD_LEN characters are dropped, so the
    empty string is never counted as a word downstream.

    Args:
        body: the comment text, or None when the field is missing.

    Returns:
        A list of cleaned words; empty when body is None or has no usable tokens.
    """
    if body is None:
        return []
    cleaned = (_NON_WORD.sub('', token).lower() for token in body.split())
    return [word for word in cleaned if 0 < len(word) <= _MAX_WORD_LEN]


# Each record becomes (list_of_words, score).
data2_rdd_split = data2_rdd.map(lambda x: (_clean_words(x[0]), x[1]))

# Show the data
data2_rdd_split.take(5)

[(['a', 'quarry'], 3),
 (['salutations', 'im', 'not', 'sure', 'what', 'you'], 3),
 (['i',
   'got',
   'into',
   'baseball',
   'at',
   'about',
   'he',
   'same',
   'time',
   'matt',
   'cain',
   'started',
   'playing',
   'in',
   'the',
   'majors',
   'crazy',
   'to',
   'see',
   'him',
   'go',
   'i',
   'teared',
   'up',
   'a',
   'bit',
   'too'],
  2),
 (['fucking', 'tory'], 18),
 (['i', 'see', 'a', 'water', 'dragon'], 1)]

In [63]:
# Explode each (words, score) record into one (word, score) pair per word.
def _pair_words_with_score(record):
    """Return a list of (word, score) tuples for one (words, score) record."""
    words, score = record
    return [(word, score) for word in words]


data2_rdd_split_score = data2_rdd_split.flatMap(_pair_words_with_score)
# Preview the first few pairs
data2_rdd_split_score.take(5)

[('a', 3), ('quarry', 3), ('salutations', 3), ('im', 3), ('not', 3)]

In [64]:
# Add up the scores of every occurrence of a word, then order the
# (word, total) pairs from the highest total to the lowest.
data2_scored_words = (
    data2_rdd_split_score
    .reduceByKey(lambda total, score: total + score)
    .sortBy(lambda word_total: word_total[1], ascending=False)
)
# The 20 words with the highest summed score (most "positive")
data2_scored_words.take(20)




                                                                                

[('the', 69407),
 ('to', 59758),
 ('a', 59282),
 ('i', 47020),
 ('and', 43919),
 ('in', 33640),
 ('that', 31250),
 ('is', 29369),
 ('of', 27689),
 ('it', 26459),
 ('you', 19329),
 ('people', 17299),
 ('for', 17028),
 ('was', 16540),
 ('this', 15263),
 ('be', 14804),
 ('with', 14030),
 ('as', 13502),
 ('me', 13443),
 ('have', 12938)]

In [65]:
# Re-sort from the lowest total to the highest to surface the most "negative" words.
# This rebinds data2_scored_words, so later cells see the ascending order.
data2_scored_words = data2_scored_words.sortBy(
    lambda word_total: word_total[1],
    ascending=True,
)

data2_scored_words.take(20)

[('businesses', -60),
 ('openmindedness', -40),
 ('arresting', -39),
 ('uplay', -36),
 ('psus', -31),
 ('padding', -31),
 ('opinions', -31),
 ('rubs', -31),
 ('classy', -30),
 ('overused', -29),
 ('stink', -28),
 ('fantasized', -28),
 ('tyrone', -28),
 ('fascist', -26),
 ('two3', -24),
 ('jorts', -24),
 ('decision', -24),
 ('stat', -24),
 ('scraping', -24),
 ('giveaways', -23)]

In [68]:
# Replace each word's summed score with a "Positive" / "Negative" label.
def _label_word(word_total):
    """Map a (word, total) pair to (word, label).

    Totals greater than zero are labelled "Positive"; everything else,
    including a total of exactly zero, is labelled "Negative".
    """
    word, total = word_total
    if total > 0:
        return (word, "Positive")
    return (word, "Negative")


data2_score_based = data2_scored_words.map(_label_word)
# Preview the first 20 labelled words
data2_score_based.take(20)

[('businesses', 'Negative'),
 ('openmindedness', 'Negative'),
 ('arresting', 'Negative'),
 ('uplay', 'Negative'),
 ('psus', 'Negative'),
 ('padding', 'Negative'),
 ('opinions', 'Negative'),
 ('rubs', 'Negative'),
 ('classy', 'Negative'),
 ('overused', 'Negative'),
 ('stink', 'Negative'),
 ('fantasized', 'Negative'),
 ('tyrone', 'Negative'),
 ('fascist', 'Negative'),
 ('two3', 'Negative'),
 ('jorts', 'Negative'),
 ('decision', 'Negative'),
 ('stat', 'Negative'),
 ('scraping', 'Negative'),
 ('giveaways', 'Negative')]

In [70]:
# Convert the (word, label) pairs to a DataFrame.
# Note: the "Score" column holds the "Positive"/"Negative" label, not a number.
data2_score_based_df = data2_score_based.toDF(["Word", "Score"])
# Save the result as CSV. The default save mode raises an error when the
# target path already exists, which made this cell fail on every re-run;
# "overwrite" replaces the previous output so the notebook can be run again.
data2_score_based_df.write.mode("overwrite").csv("data2_score_based.csv")

                                                                                

# Stop the Spark session

In [27]:
# Stop the Spark session now that the analysis is finished.
spark.stop()