In [10]:
import pandas as pd

sqlContext = SQLContext(spark)
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
import os.path

spark = SparkSession.builder.getOrCreate()

In [2]:
DATA_FOLDER = 'data/'
RESULT_FOLDER = 'results/'

RATINGS_KINDLE = 'ratings_Kindle_Store.csv'
REVIEWS_KINDLE = 'reviews_Kindle_Store_5.json'
RATINGS_BOOKS = 'ratings_Books.csv'
REVIEWS_BOOKS = 'reviews_Books_5.json'

BOOKS_SELECTED_USERS = 'books_selected_users.parquet'
KINDLE_SELECTED_USERS = 'kindle_selected_users.parquet'

RATINGS_SCHEMA = StructType([
    StructField("User", StringType(), True),
    StructField("Asin", IntegerType(), True),
    StructField("Ratings", FloatType(), True),
    StructField("Timestamp", IntegerType(), True)])

RATINGS_USER = 0
RATINGS_ASIN = 1
RATINGS_RATINGS = 2
RATINGS_TIMESTAMP = 3

In [3]:
def user_is_in(data, sel_users):
    '''
    Indicates if the user corresponding to a data sample belongs to the given array
    :param : {'User', ...} tuple
    :param : String[]
    :return : Boolean
    '''
    return data['User'] in sel_users

def write_parquet(df, path, force = False):
    '''
    Writes the given dataframe into a parquet file
    :param : DataFrame
    :param : String
    :param : Boolean
    '''
    if os.path.isfile(path) and not force:
        print("You already have an existing file with this name.")
        print("If you want to overwrite it, set force = True")
    else:
        df.write.mode('overwrite').parquet(path)
        
def get_selected_users(regenerate = False):
    '''
    Return the dataframes containing the ratings on books and kindle of users who bought both type of platforms
    :param : Boolean
    :return : DataFrame, DataFrame
    '''
    books_distinct_users = df_ratings_books.select('User').distinct()
    kindle_distinct_users = df_ratings_kindle.select('User').distinct()
    both_users = books_distinct_users.join(kindle_distinct_users, 'User').distinct().persist()

    if os.path.isfile(RESULT_FOLDER + BOOKS_SELECTED_USERS) and not regenerate:
        books_sel_users = spark.read.parquet(RESULT_FOLDER + BOOKS_SELECTED_USERS)
    else:
        books_sel_users = df_ratings_books.join(both_users, 'User')
        
    if os.path.isfile(RESULT_FOLDER + KINDLE_SELECTED_USERS and not regenerate):
        kindle_sel_users = spark.read.parquet(RESULT_FOLER + KINDLE_SELECTED_USERS)
    else:
        kindle_sel_users = df_ratings_kindle.join(both_users, 'User')
        
    return books_sel_users, kindle_sel_users

# Load data

In [4]:
# Ratings datasets

# Kindle
df_ratings_kindle = spark.read.csv(DATA_FOLDER + RATINGS_KINDLE, header = False, schema = RATINGS_SCHEMA)
print(df_ratings_kindle.head(5))

# Books
df_ratings_books = spark.read.csv(DATA_FOLDER + RATINGS_BOOKS, header = False, schema = RATINGS_SCHEMA)
print(df_ratings_books.head(5))

[Row(User='A2GZ9GFZV1LWB0', Asin=1603420304, Ratings=4.0, Timestamp=1405209600), Row(User='A1K7VSUDCVAPW8', Asin=1603420304, Ratings=3.0, Timestamp=1282176000), Row(User='A35J5XRE5ZT6H2', Asin=1603420304, Ratings=4.0, Timestamp=1365206400), Row(User='A3DGZNFSMNWSX5', Asin=1603420304, Ratings=4.0, Timestamp=1285632000), Row(User='A2CVDQ6H36L4VL', Asin=1603420304, Ratings=5.0, Timestamp=1342396800)]
[Row(User='AH2L9G3DQHHAJ', Asin=116, Ratings=4.0, Timestamp=1019865600), Row(User='A2IIIDRK3PRRZY', Asin=116, Ratings=1.0, Timestamp=1395619200), Row(User='A1TADCM7YWPQ8M', Asin=868, Ratings=4.0, Timestamp=1031702400), Row(User='AWGH7V0BDOJKB', Asin=13714, Ratings=4.0, Timestamp=1383177600), Row(User='A3UTQPQPM4TQO0', Asin=13714, Ratings=5.0, Timestamp=1374883200)]


In [5]:
# Reviews datasets

# Kindle
df_reviews_kindle = spark.read.json(DATA_FOLDER + REVIEWS_KINDLE)
print(df_reviews_kindle.head(1))

# Books
df_reviews_books = spark.read.json(DATA_FOLDER + REVIEWS_BOOKS)
df_reviews_books.head(1)

[Row(asin='B000F83SZQ', helpful=[0, 0], overall=5.0, reviewText="I enjoy vintage books and movies so I enjoyed reading this book.  The plot was unusual.  Don't think killing someone in self-defense but leaving the scene and the body without notifying the police or hitting someone in the jaw to knock them out would wash today.Still it was a good read for me.", reviewTime='05 5, 2014', reviewerID='A1F6404F1VG29J', reviewerName='Avidreader', summary='Nice vintage story', unixReviewTime=1399248000)]


[Row(asin='000100039X', helpful=[0, 0], overall=5.0, reviewText='Spiritually and mentally inspiring! A book that allows you to question your morals and will help you discover who you really are!', reviewTime='12 16, 2012', reviewerID='A10000012B7CGYKOMPQ4L', reviewerName='Adam', summary='Wonderful!', unixReviewTime=1355616000)]

# Basic statistics
Compute basic statistics on both datasets to verify if any difference exists between the two (very different means of the ratings would mean that one is generally prefered to the other, and very different variances would mean that some opinions on one of the support is not as unilateral as the other).

In [46]:
# Averages

# Books
books_ratings = df_ratings_books.select('Ratings').rdd.map(lambda x : x[0]).persist()
books_nb_sample = books_ratings.count()
books_average = books_ratings.mean()
books_variance = books_ratings.variance()
books_ratings.unpersist()

# Kindle
kindle_ratings = df_ratings_kindle.select('Ratings').rdd.map(lambda x : x[0]).persist()
kindle_nb_sample = kindle_ratings.count()
kindle_average = kindle_ratings.mean()
kindle_variance = kindle_ratings.variance()
kindle_ratings.unpersist()

# Print results
print("-- Books -- Count : {}, Mean : {}, Variance : {}".format(books_nb_sample, books_average, books_variance))
print("-- Kindle -- Count : {}, Mean : {}, Variance : {}".format(kindle_nb_sample, kindle_average, kindle_variance))

-- Books -- Count : 22507155, Mean : 4.29575892644, Variance : 1.23544712428
-- Kindle -- Count : 3205467, Mean : 4.23210689737, Variance : 1.28548661246


## Results
As expected, there is no differences in the basic statistics. The means are similar as well as the variances. This means that the two supports are appreciated the same way.

# Select users who bought both Kindles and Books

In [7]:
# Select users who bought both kindle and books
books_distinct_users = df_ratings_books.select('User').distinct()
kindle_distinct_users = df_ratings_kindle.select('User').distinct()
#both_users = books_distinct_users.join(kindle_distinct_users, 'User').distinct().persist()

In [8]:
books_distinct_users.head(5)

[Row(User='A00115723IFW2X9CUR3ST'),
 Row(User='A0161036FHTX15O1IG9T'),
 Row(User='A022858327OPV8WND7KHM'),
 Row(User='A028136819UJ4AT33LID2'),
 Row(User='A03032083BKNO315AGVT0')]

In [9]:
write_parquet(books_distinct_users, RESULT_FOLDER + 'books_users')
write_parquet(kindle_distinct_users, RESULT_FOLDER + 'kindle_users')

In [22]:
books_distinct_users.registerTempTable("books_users")
kindle_distinct_users.registerTempTable("kindle_users")

In [27]:
df_common_users = spark.sql("""SELECT books_users.User
             FROM books_users 
             WHERE books_users.User IN 
             (SELECT kindle_users.User 
             FROM kindle_users)""")

In [28]:
write_parquet(df_common_users, RESULT_FOLDER + 'common_users')

In [29]:
df_common_users = sqlContext.read.parquet(RESULT_FOLDER + "common_users") 

In [36]:
df_common_users.count()

1361903