In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter
import datetime
import itertools
from bs4 import BeautifulSoup

In [None]:
import os
# Set before the SparkContext below is created: requests the Spark-Cassandra connector
# package (2.3.0, Scala 2.11 build) and points the connector at 127.0.0.1.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'

In [None]:
from pyspark import SparkConf, SparkContext
# getOrCreate reuses a context that is already running, so re-running this cell
# no longer fails the way a second bare SparkContext(...) constructor call does.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("analysis"))

In [None]:
from pyspark.sql import SQLContext
# SQL entry point bound to the SparkContext; used below to read the Cassandra table.
sqlContext = SQLContext(sc)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [None]:
# Load table `posts` from keyspace `pw_project` as a Spark DataFrame
# through the Cassandra data source.
table_df = sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table='posts', keyspace='pw_project')\
    .load()

In [None]:
# Confirm what kind of object the reader returned.
type(table_df)

In [None]:
# Column names together with their types.
table_df.dtypes

In [None]:
# Print a preview of the rows.
table_df.show()

In [None]:
# Number of rows in the loaded table.
table_df.count()

In [None]:
def is_accepted(x):
    """Flag whether a post has an accepted answer.

    Args:
        x: value of ``accepted_answer_id``; ``-1`` marks "no accepted answer".

    Returns:
        1 when an accepted answer id is present, 0 for ``-1`` or a missing
        (``None``) value.
    """
    # A null id compares unequal to -1 and would otherwise be counted as accepted.
    if x is None or x == -1:
        return 0
    return 1
# Wrap the Python function as a Spark UDF that yields an integer column.
udf_is_accepted = udf(is_accepted, IntegerType())

# Add the 0/1 `is_accepted` flag derived from `accepted_answer_id`.
table_df = table_df.withColumn("is_accepted", udf_is_accepted("accepted_answer_id"))

In [None]:
def plot_histogram(var):
    """Print and plot a 10-bin histogram of one numeric column of ``table_df``.

    Args:
        var: name of the column to summarise.
    """
    # histogram(10) yields the bin edges (one more than the bins) and the per-bin counts.
    bin_edges, bin_counts = table_df.select(var).rdd.flatMap(lambda row: row).histogram(10)
    bin_edges = [round(edge, 2) for edge in bin_edges]
    # zip stops at the shorter list, so each count is paired with its bin's left edge.
    print(list(zip(bin_edges, bin_counts)))
    n_bins = len(bin_counts)
    fig, ax = plt.subplots()
    # Bars are centred on i + 0.5 with width 1, so each one fills the gap between
    # edge ticks i and i + 1; align is explicit so this does not rely on a default.
    ax.bar(np.arange(n_bins) + 0.5, bin_counts, 1, color='b', align='center')
    # Name the variable on the figure so the plot can be read on its own.
    ax.set_xlabel(var)
    ax.set_ylabel('Frequencies')
    ax.set_title('Histogram of {}'.format(var))
    ax.set_xticks(np.arange(n_bins + 1))
    ax.set_xticklabels(bin_edges)
    plt.show()

def plot_var_per_date(var):
    """Print and plot the monthly total of one column of ``table_df``.

    Every row is bucketed by the first day of its ``creation_date`` month and
    the values of ``var`` are summed per bucket.

    Args:
        var: name of the column to total per month.
    """
    def month_start(row):
        # Collapse the timestamp to the first day of its month.
        stamp, value = row[0], row[1]
        return (datetime.date(stamp.year, stamp.month, 1), value)

    dated_values = table_df.select(["creation_date", var]).rdd.map(month_start)
    monthly_totals = dated_values.reduceByKey(lambda left, right: left + right)
    var_sum_per_date = monthly_totals.sortBy(lambda pair: pair[0]).collect()
    print(var_sum_per_date)

    months, totals = zip(*var_sum_per_date)
    fig, ax = plt.subplots()
    ax.plot(months, totals)
    ax.xaxis.set_major_formatter(DateFormatter("%m/%y"))
    plt.show()

def summarize_cont(var):
    """Show summary statistics and plots for one numeric column of ``table_df``.

    Displays ``describe`` for all rows and again for the rows where the column
    is positive, then draws the histogram and the per-month totals.

    Args:
        var: name of the column to summarise.
    """
    # show() prints the table itself and returns None, so wrapping it in print()
    # only added a stray "None" line after each table.
    table_df.describe([var]).show()
    table_df.where(table_df[var] > 0).describe([var]).show()
    plot_histogram(var)
    plot_var_per_date(var)

def summarize_id(var):
    """Print how many rows of ``table_df`` hold a positive value in column ``var``."""
    positive_rows = table_df.where(table_df[var] > 0)
    print(positive_rows.count())

In [None]:
# Answer count: summary statistics, histogram and monthly totals.
summarize_cont("answer_count")

In [None]:
# Comment count: summary statistics, histogram and monthly totals.
summarize_cont("comment_count")

In [None]:
# Favorite count: summary statistics, histogram and monthly totals.
summarize_cont("favorite_count")

In [None]:
# Score: summary statistics, histogram and monthly totals.
summarize_cont("score")

In [None]:
# View count: summary statistics, histogram and monthly totals.
summarize_cont("view_count")

In [None]:
# Summary statistics of the 0/1 accepted-answer flag.
table_df.describe(['is_accepted']).show()

In [None]:
# Preview the tags column.
table_df.select('tags').show()

In [None]:
# most popular tags
# count: number of occurrences of each tag, keeping the 50 most frequent
count_per_tag = table_df.select('tags').rdd\
    .flatMap(lambda x: x)\
    .flatMap(lambda x: tuple(x))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .takeOrdered(50, key=lambda x: -x[1])
# Tag names only, in descending order of frequency.
count_per_tag_list = list(zip(*count_per_tag))[0]
count_per_tag

In [None]:
def plot_var_per_tag(df):
    """Draw a bar chart of per-tag values.

    Args:
        df: sequence of ``(tag, value)`` pairs, plotted in the order given.
    """
    xlabels, values = zip(*df)
    n_tags = len(xlabels)
    # Bar centres; the ticks sit on the same positions so each label lines up with its bar.
    centers = np.arange(n_tags) + 0.5
    fig, ax = plt.subplots()
    ax.bar(centers, values, 1, color='b', align='center')
    ax.set_xticks(centers)
    # The tag names used to be unpacked and then blanked out, which left the
    # bars unidentifiable; draw them rotated so they fit under the bars.
    ax.set_xticklabels(xlabels, rotation=90, fontsize=7)
    plt.show()

plot_var_per_tag(count_per_tag)

In [None]:
def var_per_tag(var, limit=50):
    """Total a column per tag and return the tags with the largest totals.

    Args:
        var: name of the numeric column of ``table_df`` to sum.
        limit: number of top tags to return; defaults to 50, the value that
            used to be hard-coded.

    Returns:
        List of ``(tag, total)`` pairs ordered by descending total.
    """
    # Each row is treated as a (value, tags) pair: flatMapValues emits one
    # (value, tag) record per tag, which is then flipped to (tag, value).
    res = table_df.select([var, 'tags']).rdd\
        .flatMapValues(lambda x: x)\
        .map(lambda x: (x[1], x[0]))\
        .reduceByKey(lambda a, b: a + b)\
        .takeOrdered(limit, key=lambda x: -x[1])
    return res

In [None]:
def var_avg_per_tag(var, tag_list):
    """Average a column per tag, restricted to the given tags.

    Args:
        var: name of the numeric column of ``table_df`` to average.
        tag_list: iterable of tag names to keep.

    Returns:
        List of up to 50 ``(tag, mean)`` pairs ordered by descending mean.
    """
    # Built once: constant-time membership instead of scanning the sequence for every record.
    wanted_tags = frozenset(tag_list)
    # The aggregate carries (running sum, running count) per tag; the mean is sum / count.
    res = table_df.select([var, 'tags']).rdd\
        .flatMapValues(lambda x: x)\
        .map(lambda x: (x[1], x[0]))\
        .filter(lambda x: x[0] in wanted_tags)\
        .aggregateByKey((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda a, b: (a[0] + b[0], a[1] + b[1]))\
        .mapValues(lambda x: x[0]/x[1])\
        .takeOrdered(50, key=lambda x: -x[1])
    return res

In [None]:
# top tags
# score: tags ranked by total score
score_per_tag = var_per_tag('score')
# Tag names only, ordered by descending total.
score_per_tag_list = list(zip(*score_per_tag))[0]
print(score_per_tag)
plot_var_per_tag(score_per_tag)
# Mean score per tagged post for those same tags.
score_avg_per_tag = var_avg_per_tag('score', score_per_tag_list[:100])
print(score_avg_per_tag)
plot_var_per_tag(score_avg_per_tag)

In [None]:
# views: tags ranked by total view count
view_per_tag = var_per_tag('view_count')
# Tag names only, ordered by descending total.
view_per_tag_list = list(zip(*view_per_tag))[0]
print(view_per_tag)
plot_var_per_tag(view_per_tag)
# Mean view count per tagged post for those same tags.
view_avg_per_tag = var_avg_per_tag('view_count', view_per_tag_list[:100])
print(view_avg_per_tag)
plot_var_per_tag(view_avg_per_tag)

In [None]:
# comments: tags ranked by total comment count
comment_per_tag = var_per_tag('comment_count')
# Tag names only, ordered by descending total.
comment_per_tag_list = list(zip(*comment_per_tag))[0]
print(comment_per_tag)
plot_var_per_tag(comment_per_tag)
# Mean comment count per tagged post for those same tags.
comment_avg_per_tag = var_avg_per_tag('comment_count', comment_per_tag_list[:100])
print(comment_avg_per_tag)
plot_var_per_tag(comment_avg_per_tag)

In [None]:
# favorites: tags ranked by total favorite count
favorite_per_tag = var_per_tag('favorite_count')
# Tag names only, ordered by descending total.
favorite_per_tag_list = list(zip(*favorite_per_tag))[0]
print(favorite_per_tag)
plot_var_per_tag(favorite_per_tag)
# Mean favorite count per tagged post for those same tags.
favorite_avg_per_tag = var_avg_per_tag('favorite_count', favorite_per_tag_list[:100])
print(favorite_avg_per_tag)
plot_var_per_tag(favorite_avg_per_tag)

In [None]:
# is accepted: tags ranked by the sum of the 0/1 accepted-answer flag
accepted_per_tag = var_per_tag('is_accepted')
# Tag names only, ordered by descending total.
accepted_per_tag_list = list(zip(*accepted_per_tag))[0]
print(accepted_per_tag)
plot_var_per_tag(accepted_per_tag)
# Mean of the flag per tag, i.e. the share of tagged posts with an accepted answer.
accepted_avg_per_tag = var_avg_per_tag('is_accepted', accepted_per_tag_list[:100])
print(accepted_avg_per_tag)
plot_var_per_tag(accepted_avg_per_tag)

In [None]:
# popularity of tags within given month
# filtered to top tags for a given criterion
def cum_sum(input):
    """Turn ``(key, value)`` pairs into ``(key, running total)`` pairs.

    Args:
        input: iterable of two-element pairs.

    Returns:
        List of pairs ordered by ascending key, each carrying the cumulative
        sum of the values up to and including that key.
    """
    ordered = sorted(input, key=lambda pair: pair[0])
    running = np.cumsum([pair[1] for pair in ordered]).tolist()
    return [(pair[0], total) for pair, total in zip(ordered, running)]

# Cumulative monthly occurrence count for each of the first 9 tags in
# count_per_tag_list, collected as (tag, month_start, cumulative_count) triples.
count_per_tag_month = table_df.select(['creation_date', 'tags']).rdd\
    .flatMapValues(lambda x: x)\
    .filter(lambda x: x[1] in count_per_tag_list[0:9])\
    .map(lambda x: ((x[1], datetime.date(x[0].year, x[0].month, 1)), 1))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .groupByKey()\
    .flatMapValues(lambda x: cum_sum(x))\
    .map(lambda x: (x[0], x[1][0], x[1][1]))\
    .collect()

count_per_tag_month

In [None]:
def plot_var_per_tag_month(df, tag_list):
    """Draw a 3x3 grid of time series, one panel per tag.

    Every panel shows all other tags as thin grey lines and highlights its own
    tag in colour.

    Args:
        df: iterable of ``(tag, date, value)`` triples.
        tag_list: tags to plot, one panel each (the grid holds at most 9).
    """
    palette = plt.get_cmap('Set1')

    # Group the points per tag once instead of rescanning df for every tag in every panel.
    series = {tag: ([], []) for tag in tag_list}
    for row in df:
        if row[0] in series:
            series[row[0]][0].append(row[1])
            series[row[0]][1].append(row[2])

    fig = plt.figure(figsize=(10, 10))

    for num, tag in enumerate(tag_list, start=1):
        ax = fig.add_subplot(3, 3, num)

        # Background: every other tag, de-emphasised.
        for other in tag_list:
            if other != tag:
                ax.plot(series[other][0], series[other][1], marker='', color='grey', linewidth=0.6, alpha=0.3)

        # Foreground: this panel's own tag.
        ax.plot(series[tag][0], series[tag][1], marker='', color=palette(num), linewidth=2.4, alpha=0.9, label=tag)

        # tick_params expects booleans; the string 'off' is truthy and leaves
        # the labels visible on current Matplotlib releases.
        if num < 7:
            ax.tick_params(labelbottom=False)
        if num not in (1, 4, 7):
            ax.tick_params(labelleft=False)

        ax.xaxis.set_major_formatter(DateFormatter("%m/%y"))

        ax.set_title(tag, loc='left', fontsize=12, fontweight=0, color=palette(num))

    plt.show()


plot_var_per_tag_month(count_per_tag_month, count_per_tag_list[:9])

In [None]:
def var_per_tag_month(var, tag_list):
    """Cumulative monthly total of a column for each of the given tags.

    Args:
        var: name of the numeric column of ``table_df`` to accumulate.
        tag_list: tags to keep.

    Returns:
        List of ``(tag, month_start, cumulative_total)`` triples.
    """
    def keyed_by_date_and_value(row):
        # ((creation_date, value), tags) so flatMapValues can emit one record per tag.
        return ((row[0], row[1]), row[2])

    def keyed_by_tag_and_month(record):
        (created, value), tag = record
        return ((tag, datetime.date(created.year, created.month, 1)), value)

    per_post_tag = table_df.select(['creation_date', var, 'tags']).rdd\
        .map(keyed_by_date_and_value)\
        .flatMapValues(lambda tags: tags)\
        .filter(lambda record: record[1] in tag_list)
    monthly = per_post_tag\
        .map(keyed_by_tag_and_month)\
        .reduceByKey(lambda left, right: left + right)
    running = monthly\
        .map(lambda item: (item[0][0], (item[0][1], item[1])))\
        .groupByKey()\
        .flatMapValues(lambda points: cum_sum(points))
    return running.map(lambda item: (item[0], item[1][0], item[1][1])).collect()

# Cumulative monthly score for the first 9 tags of score_per_tag_list.
score_per_tag_month = var_per_tag_month('score', score_per_tag_list[:9])
print(score_per_tag_month)
plot_var_per_tag_month(score_per_tag_month, score_per_tag_list[:9])

In [None]:
# Cumulative monthly view count for the first 9 tags of view_per_tag_list.
view_per_tag_month = var_per_tag_month('view_count', view_per_tag_list[:9])
print(view_per_tag_month)
plot_var_per_tag_month(view_per_tag_month, view_per_tag_list[:9])

In [None]:
# Cumulative monthly comment count for the first 9 tags of comment_per_tag_list.
comment_per_tag_month = var_per_tag_month('comment_count', comment_per_tag_list[:9])
print(comment_per_tag_month)
plot_var_per_tag_month(comment_per_tag_month, comment_per_tag_list[:9])

In [None]:
# Cumulative monthly favorite count for the first 9 tags of favorite_per_tag_list.
favorite_per_tag_month = var_per_tag_month('favorite_count', favorite_per_tag_list[:9])
print(favorite_per_tag_month)
plot_var_per_tag_month(favorite_per_tag_month, favorite_per_tag_list[:9])

In [None]:
# Cumulative monthly sum of the accepted-answer flag for the first 9 tags of accepted_per_tag_list.
accepted_per_tag_month = var_per_tag_month('is_accepted', accepted_per_tag_list[:9])
print(accepted_per_tag_month)
plot_var_per_tag_month(accepted_per_tag_month, accepted_per_tag_list[:9])

In [None]:
# Number of times each pair of distinct tags appears together on a post (top 999 pairs).
# Tags are sorted before pairing so a pair always gets the same key whatever order
# a post lists its tags in; otherwise (a, b) and (b, a) would be tallied separately.
count_per_tag_comb = table_df.select('tags').rdd\
    .flatMap(lambda x: x)\
    .flatMap(lambda tags: itertools.combinations(sorted(tags), 2))\
    .filter(lambda pair: pair[0] != pair[1])\
    .map(lambda pair: (pair, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .takeOrdered(999, key=lambda x: -x[1])

count_per_tag_comb[:50]

In [None]:
def plot_var_per_tag_comb(df, limit = 100):
    """Draw a symmetric heat map of values attached to tag pairs.

    Args:
        df: list of ``((tag_a, tag_b), value)`` entries.
        limit: number of leading entries of ``df`` to plot.
    """
    top = df[:limit]
    # Mirror every pair so the matrix is symmetric.
    mirrored = top + [((pair[1], pair[0]), value) for pair, value in top]
    rows, row_pos = np.unique([pair[0] for pair, _ in mirrored], return_inverse=True)
    cols, col_pos = np.unique([pair[1] for pair, _ in mirrored], return_inverse=True)
    pivoted_arr = np.zeros((len(rows), len(cols)))
    # Accumulate instead of assigning: when both (a, b) and (b, a) are present,
    # plain fancy-index assignment keeps only the last value written to a cell.
    np.add.at(pivoted_arr, (row_pos, col_pos), [value for _, value in mirrored])

    fig, ax = plt.subplots(figsize=(10, 10))
    im = ax.imshow(pivoted_arr)
    # Colour scale, so the cell shades can be read as values.
    fig.colorbar(im, ax=ax)

    # imshow puts matrix columns on the x axis and matrix rows on the y axis.
    ax.set_xticks(np.arange(len(cols)))
    ax.set_yticks(np.arange(len(rows)))
    ax.set_xticklabels(cols)
    ax.set_yticklabels(rows)

    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
    fig.tight_layout()
    plt.show()

plot_var_per_tag_comb(count_per_tag_comb, 100)

In [None]:
def var_per_tag_comb(var):
    """Total a column per pair of distinct tags that occur on the same post.

    Args:
        var: name of the numeric column of ``table_df`` to sum.

    Returns:
        List of up to 50 ``((tag_a, tag_b), total)`` entries ordered by
        descending total, with ``tag_a < tag_b``.
    """
    # Tags are sorted before pairing so a pair always maps to the same key,
    # whatever order a post lists its tags in. Each row is treated as a
    # (value, tags) pair, giving one (value, pair) record per tag pair.
    res = table_df.select([var, 'tags']).rdd\
        .flatMapValues(lambda tags: itertools.combinations(sorted(tags), 2))\
        .map(lambda x: (x[1], x[0]))\
        .filter(lambda x: x[0][0] != x[0][1])\
        .reduceByKey(lambda a, b: a + b)\
        .takeOrdered(50, key=lambda x: -x[1])
    return res

In [None]:
# Tag pairs ranked by total score, shown as a heat map.
score_per_tag_comb = var_per_tag_comb('score')
print(score_per_tag_comb)
plot_var_per_tag_comb(score_per_tag_comb, 100)

In [None]:
# Tag pairs ranked by total view count, shown as a heat map.
view_per_tag_comb = var_per_tag_comb('view_count')
print(view_per_tag_comb)
plot_var_per_tag_comb(view_per_tag_comb)

In [None]:
# Tag pairs ranked by total comment count, shown as a heat map.
comment_per_tag_comb = var_per_tag_comb('comment_count')
print(comment_per_tag_comb)
plot_var_per_tag_comb(comment_per_tag_comb)

In [None]:
# Tag pairs ranked by total favorite count, shown as a heat map.
favorite_per_tag_comb = var_per_tag_comb('favorite_count')
print(favorite_per_tag_comb)
plot_var_per_tag_comb(favorite_per_tag_comb)

In [None]:
# count of words
def count_words(x):
    """Count the whitespace-separated words in a text.

    Args:
        x: the text; non-string values are converted with ``str`` and
            ``None`` is treated as empty.

    Returns:
        Number of words (0 for empty or missing text).
    """
    if x is None:
        return 0
    # split() with no argument splits on any run of whitespace; split(" ")
    # produced empty "words" for repeated spaces and ignored newlines and tabs.
    return len(str(x).split())
# Register the word counter as a Spark UDF that yields an integer column.
udf_count_words = udf(count_words, IntegerType())

# Add the word count of `body` as column `count_words`.
table_df = table_df.withColumn("count_words", udf_count_words("body"))

In [None]:
# Tags ranked by total word count of their posts.
words_per_tag = var_per_tag('count_words')
# Tag names only, ordered by descending total.
words_per_tag_list = list(zip(*words_per_tag))[0]
print(words_per_tag)
plot_var_per_tag(words_per_tag)
# Mean word count per tagged post for those same tags.
words_avg_per_tag = var_avg_per_tag('count_words', words_per_tag_list[:100])
print(words_avg_per_tag)
plot_var_per_tag(words_avg_per_tag)

In [None]:
# count unique words
def count_unique_words(x):
    """Count the distinct whitespace-separated words in a text, ignoring case.

    Args:
        x: the text; non-string values are converted with ``str`` and
            ``None`` is treated as empty.

    Returns:
        Number of distinct lower-cased words (0 for empty or missing text).
    """
    if x is None:
        return 0
    # split() with no argument splits on any run of whitespace; split(" ")
    # counted the empty string as a word and ignored newlines and tabs.
    return len(set(str(x).lower().split()))
# Register the unique-word counter as a Spark UDF that yields an integer column.
udf_count_unique_words = udf(count_unique_words, IntegerType())

# Add the distinct-word count of `body` as column `count_unique_words`.
table_df = table_df.withColumn("count_unique_words", udf_count_unique_words("body"))

In [None]:
# Tags ranked by total distinct-word count of their posts.
words_unique_per_tag = var_per_tag('count_unique_words')
# Tag names only, ordered by descending total.
words_unique_per_tag_list = list(zip(*words_unique_per_tag))[0]
print(words_unique_per_tag)
plot_var_per_tag(words_unique_per_tag)
# Mean distinct-word count per tagged post for those same tags.
words_unique_avg_per_tag = var_avg_per_tag('count_unique_words', words_unique_per_tag_list[:100])
print(words_unique_avg_per_tag)
plot_var_per_tag(words_unique_avg_per_tag)

In [None]:
import string
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

In [None]:
# English stop-word list and stemmer shared by the text-cleaning helpers below.
stop_words = stopwords.words('english')
st = PorterStemmer()

def count_words2(x):
    """Count the words left after lower-casing, punctuation removal and stop-word filtering.

    Args:
        x: the text; ``None`` is treated as empty.

    Returns:
        Number of remaining words (0 for empty or missing text).
    """
    if x is None:
        return 0
    x = x.lower().translate(str.maketrans('', '', string.punctuation))
    # Stemming maps each kept token to exactly one stem, so it cannot change
    # the count; it is skipped here.
    return sum(1 for i in x.split() if i not in stop_words)
udf_count_words2 = udf(count_words2, IntegerType())

# Add the cleaned word count of `body` as column `count_words2`.
table_df = table_df.withColumn("count_words2", udf_count_words2("body"))

In [None]:
# Tags ranked by the total of `count_words2` over their posts.
words2_per_tag = var_per_tag('count_words2')
# Tag names only, ordered by descending total.
words2_per_tag_list = list(zip(*words2_per_tag))[0]
print(words2_per_tag)
plot_var_per_tag(words2_per_tag)
# Mean of `count_words2` per tagged post for those same tags.
words2_avg_per_tag = var_avg_per_tag('count_words2', words2_per_tag_list[:100])
print(words2_avg_per_tag)
plot_var_per_tag(words2_avg_per_tag)

In [None]:
# count unique words
def count_unique_words2(x):
    """Count the distinct stems left after punctuation removal and stop-word filtering.

    This used to be a verbatim copy of ``count_unique_words``, which made the
    ``count_unique_words2`` column a duplicate of ``count_unique_words``. It now
    applies the same cleaning as ``count_words2`` and counts distinct stems.

    Args:
        x: the text; non-string values are converted with ``str`` and
            ``None`` is treated as empty.

    Returns:
        Number of distinct stems (0 for empty or missing text).
    """
    if x is None:
        return 0
    x = str(x).lower().translate(str.maketrans('', '', string.punctuation))
    return len({st.stem(i) for i in x.split() if i not in stop_words})
udf_count_unique_words2 = udf(count_unique_words2, IntegerType())

# Add the distinct-stem count of `body` as column `count_unique_words2`.
table_df = table_df.withColumn("count_unique_words2", udf_count_unique_words2("body"))

In [None]:
# Tags ranked by the total of `count_unique_words2` over their posts.
words2_unique_per_tag = var_per_tag('count_unique_words2')
# Tag names only, ordered by descending total.
words2_unique_per_tag_list = list(zip(*words2_unique_per_tag))[0]
print(words2_unique_per_tag)
plot_var_per_tag(words2_unique_per_tag)
# Mean of `count_unique_words2` per tagged post for those same tags.
words2_unique_avg_per_tag = var_avg_per_tag('count_unique_words2', words2_unique_per_tag_list[:100])
print(words2_unique_avg_per_tag)
plot_var_per_tag(words2_unique_avg_per_tag)

In [None]:
import re
stop_words = stopwords.words('english')
st = PorterStemmer()
# Matches every character that is neither an ASCII letter nor a space.
regex = re.compile('[^a-zA-Z ]')

def clean_text(x):
    """Normalise a text to space-joined stems of its non-stop-word tokens.

    Args:
        x: the raw text; ``None`` is treated as empty.

    Returns:
        Lower-cased, letters-only, stop-word-free, stemmed text as a single
        string ('' for empty or missing text).
    """
    if x is None:
        return ''
    # Substitute a space rather than '': removing newlines, tabs and punctuation
    # outright glued neighbouring words together. The regex already drops all
    # punctuation, so no separate translate() pass is needed.
    letters_only = regex.sub(' ', x).lower()
    return ' '.join(st.stem(i) for i in letters_only.split() if i not in stop_words)
udf_clean_text = udf(clean_text, StringType())

# Add the cleaned body as column `body2`.
table_df = table_df.withColumn("body2", udf_clean_text("body"))

In [None]:
# Collect the cleaned bodies and unwrap each single-column row
# into a plain list of values.
posts = table_df.select('body2').collect()
posts = [i[0] for i in posts]

In [None]:
import gensim

In [None]:
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import CountVectorizer

In [None]:
NUM_TOPICS = 10
# Fixed seed so the LDA fit yields the same topics on every run.
RANDOM_SEED = 0

# Keep terms that occur in at least 5 posts and in at most 90% of the posts.
vectorizer = CountVectorizer(min_df=5, max_df=0.9)
posts_vectorized = vectorizer.fit_transform(posts)

lda_model = LatentDirichletAllocation(n_components=NUM_TOPICS, max_iter=10, learning_method='online', random_state=RANDOM_SEED)
lda_model.fit_transform(posts_vectorized)

In [None]:
# Newer scikit-learn releases replace get_feature_names() with
# get_feature_names_out(); use whichever this version provides.
get_names = getattr(vectorizer, "get_feature_names_out", None) or vectorizer.get_feature_names
feature_names = get_names()
# Number of highest-weight terms printed per topic.
TOP_WORDS = 20
for topic_idx, topic in enumerate(lda_model.components_):
    # Normalise the topic's term weights so they sum to 1.
    topic = topic/topic.sum()
    message = "Topic #%d: \n" % topic_idx
    # Indices of the TOP_WORDS largest weights, largest first.
    top_terms = topic.argsort()[::-1][:TOP_WORDS]
    message += " + ".join("{:f}*{}".format(topic[i], feature_names[i]) for i in top_terms)
    print(message)

In [None]:
import pyLDAvis.sklearn
 
# Switch pyLDAvis to notebook display mode.
pyLDAvis.enable_notebook()
# Prepare the visualisation from the fitted model, the document-term matrix
# and the vectorizer, with mds='tsne' selecting the topic layout method.
panel = pyLDAvis.sklearn.prepare(lda_model, posts_vectorized, vectorizer, mds='tsne')
panel