In [None]:
# Start Spark and SQL
import findspark
# Locate the local Spark installation before pyspark is imported.
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Build the session first so that the "Mini-Project" app name is applied when
# the underlying context is created. Creating a bare SparkContext() beforehand
# would make getOrCreate() reuse that context and its default app name, and a
# second SparkContext() call fails while one is still running, which makes
# the cell impossible to re-run. getOrCreate() is safe to call repeatedly.
spark = SparkSession.builder.appName("Mini-Project").getOrCreate()

# Reuse the context owned by the session instead of constructing a second one.
sc = spark.sparkContext

# Legacy SQL entry point; kept so the name `sqlContext` stays defined.
sqlContext = SQLContext(sc)

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pandas as pd
import numpy as np
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from matplotlib import pyplot as plt

In [None]:
# read in csvs
# Load both files as pandas DataFrames: the following cells call pandas-only
# methods on these names (.astype, .set_index, .join, .loc, .iloc), which an
# RDD of raw text lines does not provide.
# NOTE(review): assumes each CSV has a header row providing the `subreddit`
# column (plus `body` / `avg_score`, which later cells read) - confirm.
subreddit = pd.read_csv("subreddit_body.csv")
top_1000 = pd.read_csv("subreddit_score.csv")

In [None]:
# Cast the join key to str in both frames so the index labels compare equal
subreddit["subreddit"] = subreddit["subreddit"].astype(str)
top_1000["subreddit"] = top_1000["subreddit"].astype(str)

# Left join on subreddit: every row of top_1000 is kept and the columns of
# `subreddit` are attached to it. The key is then moved from the index back
# into an ordinary column.
join_data = (
    top_1000.set_index('subreddit')
    .join(subreddit.set_index('subreddit'))
    .reset_index()
)

In [None]:
# Drop rows whose body is one of the two placeholder strings.
# Rows with a missing body are kept, exactly as with two separate != filters.
join_data = join_data[~join_data["body"].isin(["[deleted]", "[removed]"])]

In [None]:
#Create unique list of subreddits
distinct_subreddit = join_data.subreddit.unique()

#Create a dataframe dictionary for unique subreddits
# A single groupby pass replaces one full-frame boolean scan per subreddit;
# sort=False keeps the groups in first-appearance order, like unique().
subreddit_dict = {
    key: frame for key, frame in join_data.groupby("subreddit", sort=False)
}

#Create dataframe for unique subreddits with longest body (most words)
# Taking max() of strings compares them lexicographically, which picks the
# alphabetically last body rather than the longest one. Rank bodies by their
# whitespace-separated word count instead.
# Rows without a body are skipped: they have no text to measure, and a
# missing value in Max_body could not be tokenized later anyway.
bodies = (
    join_data.dropna(subset=["body"])
    .assign(body=lambda frame: frame["body"].astype(str))
    .reset_index(drop=True)  # unique positional labels for idxmax / .at
)
word_counts = bodies["body"].str.split().str.len()
# Row label of the wordiest body per subreddit (first one wins on ties).
longest_idx = word_counts.groupby(bodies["subreddit"], sort=False).idxmax()

max_body = [
    {'Subreddit': key, 'Max_body': bodies.at[row, "body"]}
    for key, row in longest_idx.items()
]
# Explicit columns keep the schema stable even when no rows qualify.
subreddit_maxbody = pd.DataFrame(max_body, columns=['Subreddit', 'Max_body'])

In [None]:
# Hand the pandas result (Subreddit, Max_body) over to Spark as a DataFrame
df_sub_maxbod = spark.createDataFrame(data=subreddit_maxbody)

In [None]:
# run k-means clustering on body for 10 clusters
N_CLUSTERS = 10
# Three hashed dimensions: the plotting cell labels the cluster centers with
# exactly three coordinates (x, y, z).
N_HASH_FEATURES = 3
# Fixed seed so cluster ids stay the same between runs; later cells refer to
# a specific cluster number, which is meaningless if the ids can change.
KMEANS_SEED = 42

# Materialize the input once so the pipeline stages reuse the cached data.
df_sub_maxbod.cache().count()

# tokenize, remove stopwords, and vectorize text
tokenizer = Tokenizer(inputCol="Max_body", outputCol="tokens")
remover = StopWordsRemover(inputCol="tokens", outputCol="stopWordsRemovedTokens")
hashingTF = HashingTF(inputCol="stopWordsRemovedTokens", outputCol="rawFeatures",
                      numFeatures=N_HASH_FEATURES)
idf = IDF(inputCol="rawFeatures", outputCol="features")
# featuresCol is spelled out to make the link to the IDF output explicit.
kmeans = KMeans(k=N_CLUSTERS, featuresCol="features", seed=KMEANS_SEED)

# pipeline and fit model
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, kmeans])
model = pipeline.fit(df_sub_maxbod)

#store results
# transform() appends the intermediate columns and the k-means `prediction`.
results = model.transform(df_sub_maxbod)
results.cache()

In [None]:
# Count the rows (one per subreddit) in each cluster, listed by cluster id
results.groupBy("prediction").count().sort(col("prediction").asc()).show()

In [None]:
# The fitted k-means model is the final pipeline stage; read its centers
*_, model_stage = model.stages
centers = model_stage.clusterCenters()

In [None]:
# Set up cluster centers and prediction labels to plot
columns = ["x", "y", "z"]
for_plot = pd.DataFrame(data=centers, columns=columns)
# One label per center, derived from the data rather than a fixed range so
# the plot stays correct if the number of clusters changes.
pred = range(len(for_plot))
for_plot['pred'] = list(pred)

#Make color palette
colors = ( '#46f0f0', '#f032e6', '#bcf60c', '#fabebe', '#008080', '#e6beff',
          '#9a6324', '#fffac8', '#800000', '#aaffc3', '#808000', '#ffd8b1')

#Create plot
fig = plt.figure(figsize=(12, 12), dpi=100)
# add_subplot is the supported way to request a 3D axes; passing keyword
# arguments to fig.gca() is deprecated and rejected by newer Matplotlib.
ax = fig.add_subplot(111, projection='3d')
ax.set_title('Plot of Cluster Centers for Each Group')
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')

for position, center in enumerate(for_plot.itertuples(index=False)):
    # Cycle through the palette so no center is dropped when there are more
    # clusters than colors.
    color = colors[position % len(colors)]
    ax.scatter(center.x, center.y, center.z, c=[color], edgecolors='none',
               s=200, label=center.pred)

# Build the legend once, after every center has been drawn.
ax.legend()

plt.show()

In [None]:
# List the subreddits that k-means assigned to cluster 5
pred_5 = results.where(col("prediction") == 5)
pred_5.select("prediction", "Subreddit").show()

In [None]:
# Look up the average score recorded for the "anriokita" subreddit
is_target = top_1000["subreddit"] == "anriokita"
sub_df = top_1000.loc[is_target]
# First matching row only; indexing fails if the subreddit is not present.
print("Average Score:", sub_df["avg_score"].iloc[0])

In [None]:
# Top 10 highest scoring subreddits' clusters
# `top_10` is not created by any earlier cell, so derive it here from the
# score table instead of relying on leftover kernel state.
# NOTE(review): ranks by `avg_score`, the score column read elsewhere in this
# notebook - confirm this is the intended ranking measure.
top_10 = top_1000.nlargest(10, "avg_score")
top_10_names = top_10["subreddit"].astype(str).tolist()

pred_top_10 = results.filter(results.Subreddit.isin(top_10_names))
pred_top_10.select("prediction", "Subreddit").show()

In [None]:
# Stop the SparkContext (`sc`) started in the first cell now that the analysis is done.
sc.stop()