In [None]:
pip install pyspark

In [None]:
import pyspark
from pyspark import SparkConf, SparkContext

# getOrCreate returns the already-running context when this cell is re-run,
# instead of failing because a SparkContext already exists in the kernel.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("BDA"))

In [None]:
# Colab-only: opens a browser upload dialog and returns the uploaded files.
# Presumably used to upload Software_5.json.gz, which the next cell reads
# from the working directory — confirm when running outside Colab.
from google.colab import files
uploaded = files.upload()

In [None]:
import json
# Read the review file line by line and parse every line as one JSON document.
json_file = sc.textFile("Software_5.json.gz")
records = json_file.map(json.loads)
# Peek at a single parsed record.
records.take(1)

In [None]:
import re
from operator import add
def reg_filter(word):
    """Return True when the lower-cased ``word`` begins with an accepted token.

    An accepted token is either a single punctuation mark (. , ! ? ; ") or an
    optional ``#``/``@`` followed by letters, digits, ``_`` or ``-``, with an
    optional apostrophe suffix of 1-3 lowercase letters (e.g. ``'s``).
    ``re.match`` anchors only at the start, so characters after the token
    (such as a trailing comma) do not cause rejection.
    """
    token_pattern = r'((?:[\.,!?;"])|(?:(?:\#|\@)?[A-Za-z0-9_\-]+(?:\'[a-z]{1,3})?))'
    return re.match(token_pattern, word.lower()) is not None


# Find the top 1k common words across reviews using the word-count algorithm.
# `reviews` is reused by the relative-frequency cell below, so cache it rather
# than re-reading and re-parsing the input for every action.
reviews = records.filter(lambda review: 'reviewText' in review).cache()
review_words = reviews.flatMap(lambda review: review['reviewText'].split())
filtered_review_words = review_words.filter(reg_filter)
word_freq_kvs = filtered_review_words.map(lambda word: (word.lower(), 1)).reduceByKey(add)
# takeOrdered keeps a bounded top-1000 per partition instead of shuffling and
# sorting every distinct word, which sortBy(...).take(1000) would do.
onek_common_words = [word for word, _ in word_freq_kvs.takeOrdered(1000, key=lambda kv: -kv[1])]
onek_common_words_shared = sc.broadcast(onek_common_words)
# Show only the head of the list to keep the notebook output readable.
onek_common_words_shared.value[:20]


In [None]:
# Find the relative frequency of each of the 1k common words in every review;
# the output feeds 1k separate linear regressions, grouped by common word.
from collections import Counter


def relative_frequencies(review_text, common_words=None):
    """Return ``[(word, relative_frequency), ...]`` for every common word.

    ``review_text`` is tokenised like the word-count step. It is split on
    whitespace, tokens accepted by ``reg_filter`` are kept, and they are
    lower-cased. A word's relative frequency is its count among the kept
    tokens divided by the number of kept tokens. A review with no kept tokens
    gets 0.0 for every word.

    ``common_words`` defaults to the broadcast top-1k list
    (``onek_common_words_shared.value``). Pass a list explicitly to use a
    different vocabulary. The output order follows ``common_words``.
    """
    if common_words is None:
        common_words = onek_common_words_shared.value
    qualified_words = [w.lower() for w in review_text.split() if reg_filter(w)]
    total = len(qualified_words)
    if total == 0:
        return [(cw, 0.0) for cw in common_words]
    # One counting pass (O(n + k)) instead of list.count() per common word (O(n * k)).
    counts = Counter(qualified_words)
    return [(cw, counts[cw] / total) for cw in common_words]

def _word_keyed(record):
    """Re-key one flattened review entry by its common word.

    ``record`` is ``((overall, verified), (word, rel_freq))``; the result is
    ``(word, (rel_freq, overall, verified))``.
    """
    (overall, verified), (word, rel_freq) = record
    return word, (rel_freq, overall, verified)


# Key each review by (overall rating, verified as int) and attach its
# per-common-word relative frequencies.
review_with_rel_freqs = reviews.map(
    lambda review: (
        (review['overall'], int(review['verified'])),
        relative_frequencies(review['reviewText']),
    )
)
# One row per (review, common word), keyed by the word so that all rows of a
# word can be collected for its regression.
review_with_rel_freqs_flattened = review_with_rel_freqs.flatMapValues(lambda pairs: pairs).map(_word_keyed)
review_with_rel_freqs_grouped_by_word = review_with_rel_freqs_flattened.groupByKey().map(
    lambda kv: (kv[0], list(kv[1]))
)
review_with_rel_freqs_grouped_by_word.take(1)

In [None]:
import numpy as np
from scipy import stats as ss

def compute_beta(list_of_tuples):
    """Fit a standardised simple linear regression ``rating ~ relative_frequency``.

    Parameters
    ----------
    list_of_tuples : sequence of tuples
        Element ``[0]`` is the predictor (a word's relative frequency in one
        review) and element ``[1]`` is the response (that review's rating).
        Any further elements are ignored.

    Returns
    -------
    list
        ``[intercept, slope, p_value]``. Both variables are z-scored before
        fitting, so the slope equals the Pearson correlation and the intercept
        is ~0. ``p_value`` is the two-sided t-test p-value for ``slope == 0``
        with ``N - 2`` degrees of freedom. Degenerate inputs (fewer than 3
        rows, or a constant predictor or response) return ``[0.0, 0.0, 1.0]``.
    """
    rel_freq = np.array([row[0] for row in list_of_tuples], dtype=float)
    ratings = np.array([row[1] for row in list_of_tuples], dtype=float)

    N = len(ratings)
    m = 1  # number of predictors
    dof = N - (m + 1)
    if dof <= 0:
        # No residual degrees of freedom: the slope cannot be tested.
        return [0.0, 0.0, 1.0]

    std_X = np.std(rel_freq)
    std_Y = np.std(ratings)
    # z-scoring a constant column would divide by zero; there is no slope to test.
    if std_X == 0 or std_Y == 0:
        return [0.0, 0.0, 1.0]

    X = (rel_freq - np.mean(rel_freq)) / std_X
    Y = (ratings - np.mean(ratings)) / std_Y

    # Design matrix columns are [X, 1], so beta[0] is the slope and beta[1] the intercept.
    X_N = np.column_stack((X, np.ones(N)))
    # lstsq solves the least-squares problem without explicitly inverting X^T X.
    beta = np.linalg.lstsq(X_N, Y, rcond=None)[0]

    y_pred = beta[1] + beta[0] * X
    rss = np.sum(np.square(y_pred - Y))
    s_square = rss / dof

    # SE(slope) = sqrt(s^2 / sum((x - mean_x)^2)), evaluated on the same
    # standardised X the slope was fitted on.
    standard_error = np.sqrt(s_square / np.sum(np.square(X - np.mean(X))))
    t_value = beta[0] / standard_error

    # Two-sided p-value; sf avoids the precision loss of 1 - cdf for large |t|.
    p_value = 2 * ss.t.sf(abs(t_value), dof)

    return [beta[1], beta[0], p_value]


# Fit one regression per common word. Each value is [intercept, slope, p_value].
# Cache because the result feeds two separate actions below.
betas_p_values = review_with_rel_freqs_grouped_by_word.mapValues(compute_beta).cache()
# Rank by slope (index 1 of the value). Positive: largest slopes first.
t20_positive_correlated_words = betas_p_values.takeOrdered(20, key=lambda d: -d[1][1])
# Negative: ascending order, so the most negative slopes come first.
t20_negative_correlated_words = betas_p_values.takeOrdered(20, key=lambda d: d[1][1])

t20_positive_correlated_words, t20_negative_correlated_words