# Purpose
This notebook gathers the rating data by game and user, splits it into training, validation, and test sets, and saves both the full rating matrix and each split in Parquet format so that we don't have to reprocess the data for each recommendation algorithm we try.

In [15]:
from boardgamegeek import BGGClient
import requests
import seaborn as sns
from bs4 import BeautifulSoup
import re
from proxy_requests.proxy_requests import ProxyRequests
from retry import retry
import time
import os
import matplotlib.pyplot as plt
from collections import Counter, defaultdict
import numpy as np
import pandas as pd
import random
import pickle
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import UserDefinedFunction as udf
from pyspark.sql.types import IntegerType,FloatType,StructField,StructType
from lxml import etree

In [2]:
def local_path(path):
    """Return a ``file://`` URI for *path* relative to the current working directory.

    ``path`` is appended verbatim after the absolute working directory, so
    ``..`` segments and glob characters are passed through unchanged.
    """
    working_dir = os.path.abspath(os.path.curdir)
    return '/'.join(['file://' + working_dir, path])

# Start a Spark context on the local[*] master under the application name 'temp'.
sc = SparkContext(master='local[*]', appName='temp')

# Read each scraped XML file whole (only these files carry the stats);
# downstream code treats every element as a (path, contents) pair.
files = sc.wholeTextFiles(
    local_path('../DataAcquisition/Data/*/*.xml')
)

In [3]:
def user_rating_parser(xml):
    """Yield one ``(user, [[game_id], [rating]])`` pair per rating in a scraped file.

    ``xml`` is indexed as ``xml[0]`` (file path) and ``xml[1]`` (file
    contents). The game id is the integer in the file name before the first
    underscore. Each value is a pair of single-element lists so that pairs
    sharing a user can be merged by element-wise list concatenation.
    """
    # Game id comes from the file name, e.g. ".../<game_id>_<rest>".
    file_name = xml[0].rsplit('/', 1)[-1]
    game_id = int(file_name.partition('_')[0])

    parsed = BeautifulSoup(xml[1])

    # Every <comment> element carries a username and a numeric rating attribute.
    for entry in parsed.find_all('comment'):
        username = entry.get('username')
        score = float(entry.get('rating'))
        yield (username, [[game_id], [score]])
        
def user_rating_columnwise(xml):
    """Yield a flat ``(user, game_id, rating)`` triple per rating in a scraped file.

    ``xml[0]`` is the file path, whose file name starts with the integer
    game id followed by an underscore; ``xml[1]`` is the XML text.
    """
    # Strip the directories, then keep what precedes the first underscore.
    base_name = xml[0].rsplit('/', 1)[-1]
    game_id = int(base_name.split('_', 1)[0])

    document = BeautifulSoup(xml[1])
    rating_nodes = document.find_all('comment')

    # One output row per <comment> element.
    for node in rating_nodes:
        yield (node.get('username'), game_id, float(node.get('rating')))
        
def train_val_test(xml):
    """Yield every rating in one scraped XML file, tagged with its data split.

    Parameters
    ----------
    xml : sequence
        ``xml[0]`` is the file path (file name starts with ``<game_id>_``),
        ``xml[1]`` is the XML text of that file.

    Yields
    ------
    tuple
        ``(split, [user, game_id, rating])`` where ``split`` is ``'train'``,
        ``'validation'`` or ``'test'`` (roughly 80% / 15% / 5%).

    Notes
    -----
    The split is derived from a stable hash of ``(user, game_id)`` instead of
    a fresh random draw. This function runs inside a lazily evaluated RDD
    that is filtered once per split; with an unseeded random draw every
    re-evaluation reassigns the ratings, so the same rating could end up in
    several splits or in none. A hash gives the same answer on every
    evaluation and in every worker process.
    """
    # Local import: hashlib is not among the notebook's top-level imports.
    import hashlib

    # get game id
    game_id = int(((xml[0]).split('/')[-1]).split('_')[0])

    soup = BeautifulSoup(xml[1])

    for comment in soup.find_all('comment'):

        user = comment.get('username')
        rating = float(comment.get('rating'))

        # Map (user, game_id) to a reproducible fraction in [0, 1).
        # sha256 is used rather than hash() because str hashing is salted
        # per interpreter process.
        key = '{}:{}'.format(user, game_id).encode('utf-8')
        digest = hashlib.sha256(key).digest()
        fraction = int.from_bytes(digest[:8], 'big') / 2.0 ** 64

        if fraction < 0.8:
            split = 'train'
        elif fraction < 0.95:
            split = 'validation'
        else:
            split = 'test'

        yield (split, [user, game_id, rating])
    

def add_values(x, y):
    """Combine two two-slot records by adding their matching slots.

    Slot 0 of ``x`` is added to slot 0 of ``y`` and likewise for slot 1; with
    list slots this concatenates them. Always returns a new two-item list.
    """
    return [x[slot] + y[slot] for slot in (0, 1)]

In [4]:
# Parse every file into (user, [[game_id], [rating]]) pairs, merge the pairs
# of each user by concatenating their lists, and pull the result to the driver.
user_item_matrix = (
    files
    .flatMap(user_rating_parser)
    .reduceByKey(add_values)
    .collect()
)

In [5]:
# Load the list of top games. The next cell reads column 0 of each row as the
# raw game id and column 2 as the game's "fancy name".
games = pd.read_csv('../DataAcquisition/top_games.csv')

In [6]:
# Build two lookup tables from the games list:
#   game_id_dict     raw game id -> dense 0-based index (row order of `games`;
#                    the original note expects indices between 0 and 999)
#   id_to_fancy_name dense index -> the game's "fancy name"
game_id_dict = {}
id_to_fancy_name = {}
game_id = 0
for row in games.values:
    raw_id, fancy_name = row[0], row[2]
    game_id_dict[raw_id] = game_id
    id_to_fancy_name[game_id] = fancy_name
    game_id += 1

In [7]:
# Persist the index -> name dictionary so other notebooks can load it.
with open('index_to_name.pkl', 'wb') as pickle_file:
    pickle.dump(id_to_fancy_name, pickle_file, protocol=pickle.HIGHEST_PROTOCOL)

In [8]:
# User names must become integers too. Ids are handed out from 1 upward in the
# order users appear; `users` stays a defaultdict(int), so looking up a name
# that was never assigned an id yields 0.
users = defaultdict(int)
user_count = 1

for record in user_item_matrix:
    username = record[0]
    if username not in users:
        users[username] = user_count
        user_count += 1

In [9]:
# SQL entry point bound to the SparkContext created earlier; it is set up
# before the RDD -> DataFrame conversions in the following cells.
# NOTE(review): presumably required for RDD.toDF to be available -- confirm
# against the PySpark version in use.
sqlContext = SQLContext(sc)

In [16]:
# Explicit schema for the rating triples: integer user, integer item, float rating.
data_schema = [
    StructField('user', IntegerType(), True),
    StructField('item', IntegerType(), True),
    StructField('rating', FloatType(), True),
]
final_struc = StructType(fields=data_schema)

# Re-parse the files as flat (user, game_id, rating) rows, swapping the raw
# user name and game id for their integer indices and rounding the rating
# to two decimals.
user_item_matrix = files.flatMap(user_rating_columnwise).map(
    lambda row: (
        int(users[row[0]]),
        int(game_id_dict[row[1]]),
        round(float(row[2]), 2),
    )
)
alternate_form = user_item_matrix.toDF(schema=final_struc)

In [17]:
# Sanity check: print the DataFrame's column names and types.
alternate_form.printSchema()

root
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: float (nullable = true)



In [18]:
# Write the full user/item/rating matrix to Parquet.
# The DataFrame was created with an explicit schema, so its columns are already
# named user/item/rating (see the printed schema); renaming "_1"/"_2"/"_3"
# would be a no-op and has been dropped.
alternate_form.write.save("parquet_user_rating_matrix", format="parquet")

In [19]:
# Tag every rating with its split and translate the raw user name / game id
# into their integer indices: elements are [split, (user, item, rating)].
#
# The RDD is cached because three separate filters (train / validation / test)
# consume it. Without caching, each filter re-parses every XML file and
# re-evaluates the split assignment, so any non-deterministic assignment would
# differ from one filter to the next. Caching is best effort: partitions that
# do not fit in memory are still recomputed.
user_item_matrix_labeled = files.flatMap(train_val_test).map(lambda x: [x[0],(users[x[1][0]],game_id_dict[x[1][1]],
                                      round(float(x[1][2]),2))]).cache()

In [20]:
# Keep only the rows labeled 'train', drop the label, and view them as a DataFrame.
train = (
    user_item_matrix_labeled
    .filter(lambda labeled: labeled[0] == 'train')
    .map(lambda labeled: labeled[1])
)
alternate_form = train.toDF(schema=final_struc)
alternate_form.printSchema()

root
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: float (nullable = true)



In [21]:
# Write the training split to Parquet.
# Columns are already named user/item/rating by the explicit schema (see the
# printed schema), so the former "_1"/"_2"/"_3" renames were no-ops and have
# been dropped.
alternate_form.write.save("parquet_train", format="parquet")

In [22]:
# Keep only the rows labeled 'validation', drop the label, and write them to Parquet.
# toDF(schema=final_struc) already names the columns user/item/rating, so the
# former "_1"/"_2"/"_3" renames were no-ops and have been dropped.
val = user_item_matrix_labeled.filter(lambda x: x[0] == 'validation').map(lambda x: x[1])
alternate_form = val.toDF(schema=final_struc)
alternate_form.write.save("parquet_validation", format="parquet")

In [23]:
# Keep only the rows labeled 'test', drop the label, and write them to Parquet.
# toDF(schema=final_struc) already names the columns user/item/rating, so the
# former "_1"/"_2"/"_3" renames were no-ops and have been dropped.
test = user_item_matrix_labeled.filter(lambda x: x[0] == 'test').map(lambda x: x[1])
alternate_form = test.toDF(schema=final_struc)
alternate_form.write.save("parquet_test", format="parquet")