# 02 - Data Sampling

The goal of this notebook is to develop a sampling method for selecting movie ratings from the original 5-core Amazon Movies and TV dataset. We start by writing a sampling function that takes a proportion as a parameter, then pass in different proportions to obtain datasets of different sizes. These samples will later be used to study how overall recommendation accuracy and run-time change as the sample size grows from small to large.

In [21]:
# Import necessary Python modules
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").appName("main").getOrCreate()
from pyspark.sql.functions import col

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

sns.set(style='whitegrid')
%matplotlib inline

In [27]:
# Create the file path and read the JSON file
data_path = os.path.join('..','..','data')
data_file = os.path.join(data_path, 'reviews.json')

# Create a Spark DataFrame called "sdf" using data from the JSON file
sdf = spark.read.json(data_file)

# Output the structure (column names and types) of the DataFrame 
sdf.printSchema()

# Output the number of total ratings (number of rows in this DataFrame)
total_ratings = sdf.count()
print 'Total ratings:', total_ratings

root
 |-- _corrupt_record: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)



2383182

# Derek's thoughts

I changed the sampling methodology a bit. It turns out we can't filter movies and users just once: removing users changes the number of reviews per movie, and vice versa. So we have to filter iteratively until convergence. I initially wrote make_sample for Spark DataFrames, but running it locally is quite slow, so I wrote an equivalent implementation for pandas, which was lightning fast! For data sampling I did everything in pandas, but left the Spark code here anyway.
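
To illustrate why one pass is not enough, here is a minimal sketch on a made-up toy dataset. The helper names `filter_once` and `filter_to_convergence` and the toy rows are hypothetical and are not part of this notebook's sampling function. After a single pass with a threshold of 2, movie `m3` is left with only one rating, so a second pass is needed.

```python
import pandas as pd

# Hypothetical toy ratings: one row per (user, movie) pair.
toy = pd.DataFrame({
    'reviewerID': ['u1', 'u1', 'u2', 'u2', 'u3', 'u3', 'u3', 'u4'],
    'asin':       ['m1', 'm2', 'm1', 'm2', 'm1', 'm2', 'm3', 'm3'],
})

def filter_once(df, k):
    """Keep rows whose movie and whose user each have at least k ratings in df."""
    movie_counts = df.groupby('asin')['reviewerID'].transform('count')
    user_counts = df.groupby('reviewerID')['asin'].transform('count')
    return df[(movie_counts >= k) & (user_counts >= k)]

def filter_to_convergence(df, k):
    """Repeat filter_once until a pass removes no rows."""
    while True:
        filtered = filter_once(df, k)
        if len(filtered) == len(df):
            return filtered
        df = filtered

one_pass = filter_once(toy, 2)
converged = filter_to_convergence(toy, 2)

# Smallest number of ratings per movie after one pass and after convergence.
min_after_one_pass = one_pass.groupby('asin').size().min()
min_after_convergence = converged.groupby('asin').size().min()
```

In this toy case the single pass drops user `u4`, which pushes `m3` below the threshold; the converged result contains only `m1` and `m2`.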

# Methodology Notes

The density of the original 5-core dataset is approximately 0.00027. If we directly take a small random sample at this sparsity level, the density of the resulting dataset could be even lower, which risks degrading the quality of our recommendations. To correct for this, we keep each sample above a certain density level by controlling the minimum number of ratings a movie and a user can have. 
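
As a rough sketch of what density means here: it is the number of ratings divided by the number of possible user-movie pairs, which matches the ratio printed by the sampling function below. The helper name `rating_density` and the toy data are hypothetical, and the sketch assumes one row per (user, movie) pair.

```python
import pandas as pd

def rating_density(df, user_col='reviewerID', item_col='asin'):
    """Ratings divided by (distinct users * distinct items).

    Assumes each (user, item) pair appears at most once in df.
    """
    n_users = df[user_col].nunique()
    n_items = df[item_col].nunique()
    if n_users == 0 or n_items == 0:
        return 0.0
    return len(df) / float(n_users * n_items)

# Toy example: 3 users, 2 movies, 4 ratings out of 6 possible pairs.
toy = pd.DataFrame({
    'reviewerID': ['u1', 'u1', 'u2', 'u3'],
    'asin':       ['m1', 'm2', 'm1', 'm2'],
})
toy_density = rating_density(toy)
```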

To achieve this goal, we first take a random sample based on a given fraction, and then we iteratively filter the sample using a given threshold on the number of ratings, until every remaining movie and every remaining user has at least that many ratings. 

By default, a movie or a user should have at least five ratings in order to be included in a sample. However, this threshold can be changed based on different sample sizes and desired density levels. We also set the default random seed to be 1 in order to obtain the same random samples every time we run the program.
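
The effect of fixing the seed can be checked with a small sketch. The toy frame and the column name `rating_id` are made up for illustration; `DataFrame.sample` with `frac` and `random_state` is the same call the sampling function below relies on.

```python
import pandas as pd

# Hypothetical stand-in for the ratings table.
toy = pd.DataFrame({'rating_id': range(100)})

# Two draws with the same seed select the same rows in the same order.
first = toy.sample(frac=0.1, random_state=1)
second = toy.sample(frac=0.1, random_state=1)
same_rows = first.equals(second)
```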

In [28]:
# spark version
# def make_sample_sdf(in_df, in_prop, in_threshold=5, in_seed=1):

#     min_reviews_per_movie = 0
#     min_reviews_per_user = 0
#     n_iterations = 1
#     sample_df = in_df.sample(False, in_prop, in_seed)
#     n_samples = sample_df.count()
#     print 'Number of initial samples:', n_samples
    
#     while (min_reviews_per_movie < in_threshold or min_reviews_per_user < in_threshold) and sample_df.count() > 0:
#         print 'Iteration:', n_iterations
    
#         sample_item_counts = sample_df.groupby('asin').count()
#         sample_item_counts.cache()
#         items_geq = sample_item_counts.filter(sample_item_counts['count'] >= in_threshold)
#         items_geq.cache()
#         n_items = items_geq.count()
#         print '\tNumber of items with at least {} reviews:'.format(in_threshold), n_items 
#         min_reviews_per_movie = sample_item_counts.select('count').rdd.map(lambda x: x['count'])\
#             .reduce(lambda x, y: min(x, y))
#         print '\tMin reviews per movie:', min_reviews_per_movie
        
#         sample_user_counts = sample_df.groupby('reviewerID').count()
#         sample_user_counts.cache()
#         users_geq = sample_user_counts.filter(sample_user_counts['count'] >= in_threshold)
#         users_geq.cache()
#         n_users = users_geq.count()
#         print '\tNumber of users with at least {} reviews:'.format(in_threshold), n_users
#         min_reviews_per_user = sample_user_counts.select('count').rdd.map(lambda x: x['count'])\
#             .reduce(lambda x, y: min(x, y))
#         print '\tMin reviews per user:', min_reviews_per_user

#         asin_list = items_geq.select('asin').collect()
#         asin_list = [x['asin'].encode() for x in asin_list]
#         #print('asin_list:', len(asin_list))
    
#         user_list = users_geq.select('reviewerID').collect()
#         user_list = [x['reviewerID'].encode() for x in user_list]
#         #print('user_list:', len(user_list))
        
#         sample_df = sample_df.filter(sample_df['asin'].isin(asin_list) & sample_df['reviewerID'].isin(user_list))
#         #in_df.show()
#         print '\tNumber of final samples:', sample_df.count()
        
#         n_iterations += 1
        
#     print 'Density:', sample_df.count() / float(n_users * n_items)
#     return sample_df


# pandas version

# Define a function to make multiple samples of different sizes
def make_sample_pdf(in_df, in_prop, in_threshold=5, in_seed=1):

    # Initialize variables
    min_reviews_per_movie = 0
    min_reviews_per_user = 0
    n_iterations = 1
    
    # Before filtering, we first take a random sample based on the given fraction, then 
    # output the number of rows in the initial sample.
    sample_df = in_df.sample(frac = in_prop, random_state = in_seed)
    n_samples = sample_df.shape[0]
    print 'Number of initial samples:', n_samples
    
    # We iteratively filter the sample to make it denser than the original dataset by applying
    # a threshold on the number of ratings/reviews, until every remaining movie and user has at
    # least that many ratings/reviews.
    while (min_reviews_per_movie < in_threshold or min_reviews_per_user < in_threshold) and len(sample_df) > 0:
        print 'Iteration:', n_iterations
    
    
        # We count ratings per movie and keep movies that meet the threshold k, then output
        # the number of items with at least k reviews and the minimum ratings per movie.
        sample_item_counts = sample_df.groupby('asin').count()
        items_geq = sample_item_counts[sample_item_counts['overall'] >= in_threshold]
        n_items = items_geq.shape[0]
        print '\tNumber of items with at least {} reviews:'.format(in_threshold), n_items 
        min_reviews_per_movie = sample_item_counts['overall'].min()
        print '\tMin reviews per movie:', min_reviews_per_movie
        
        # We count ratings per user and keep users that meet the threshold k, then output
        # the number of users with at least k reviews and the minimum ratings per user.
        sample_user_counts = sample_df.groupby('reviewerID').count()
        users_geq = sample_user_counts[sample_user_counts['overall'] >= in_threshold]
        n_users = users_geq.shape[0]
        print '\tNumber of users with at least {} reviews:'.format(in_threshold), n_users
        min_reviews_per_user = sample_user_counts['overall'].min()
        print '\tMin reviews per user:', min_reviews_per_user
        
        # Collect the movie IDs and user IDs that met the threshold in this pass.
        asin_list = items_geq.index
        user_list = users_geq.index
        
        # Keep only ratings whose movie and user both met the threshold, then output the number of remaining rows
        sample_df = sample_df[sample_df['asin'].isin(asin_list) & sample_df['reviewerID'].isin(user_list)]
        #in_df.show()
        print '\tNumber of final samples:', sample_df.shape[0]
        
        # Increment the iteration counter by 1
        n_iterations += 1
    
    # Output the density and proportion of the final sample
    print 'Density:', sample_df.shape[0] / float(n_items * n_users)
    print 'Proportion:', sample_df.shape[0] / float(total_ratings)
    return sample_df

In [29]:
# Convert certain columns from Spark DataFrame to a Pandas DataFrame
pdf = sdf.select(*('asin', 'reviewerID', 'overall', 'reviewTime')).toPandas()

# Creating samples

# Derek's notes

The problem with using smaller samples of data while maintaining the density of the original data is simply that the original data is too sparse. This puts a lower bound on how small we can make our sampled data, so a solution was to make the original dataset more dense (i.e. require that each movie has at least $k$ reviews and each user made at least $k$ reviews for $k \geq 5$). In the make_sample_pdf function, I call $k$ the threshold. Basically, if we first decide what proportion of the data we want to sample, we can then adjust the threshold to control the density of the sample. The parameters I chose and the resulting densities were:

|Dataset|% Sampled|$k$|Density|
|---|---|---|---|
|pdf_sample_100|100|20|0.008|
|pdf_sample_50|50|13|0.007|
|pdf_sample_25|25|9|0.007|
|pdf_sample_10|10|5|0.005|


# Methodology

Next we apply the sampling function to select samples of a few different sizes, each with a corresponding threshold on the number of ratings. The table below provides a basic description of each sample.

|Dataset|% Sampled (?)|$k$|Density|% Sampled (result)|
|---|---|---|---|---|
|pdf_sample_100|100|20|0.008|18|
|pdf_sample_50|50|13|0.007|7|
|pdf_sample_25|25|9|0.007|2|
|pdf_sample_10|10|5|0.005|?|
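
One way to sanity-check a finished sample against its $k$ is sketched below. The helper name `meets_threshold` and the toy data are hypothetical and are not part of the original notebook; the idea is simply to confirm that every user and every movie left in a sample has at least $k$ ratings.

```python
import pandas as pd

def meets_threshold(df, k, user_col='reviewerID', item_col='asin'):
    """Return True if every user and every item in df has at least k rows."""
    if df.empty:
        return False
    min_per_user = df.groupby(user_col).size().min()
    min_per_item = df.groupby(item_col).size().min()
    return bool(min_per_user >= k and min_per_item >= k)

# Toy example: 2 users who each rated both of 2 movies.
toy = pd.DataFrame({
    'reviewerID': ['u1', 'u1', 'u2', 'u2'],
    'asin':       ['m1', 'm2', 'm1', 'm2'],
})
ok_for_2 = meets_threshold(toy, 2)
ok_for_3 = meets_threshold(toy, 3)
```

Under these assumptions, a call such as `meets_threshold(pdf_sample_25, 9)` could be run after the corresponding sampling cell.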

In [30]:
# Call the function above to make a sample of proportion == 100% and threshold == 20
pdf_sample_100 = make_sample_pdf(pdf, 1.0, 20)

Number of initial samples: 2383182
Iteration: 1
	Number of items with at least 20 reviews: 17798
	Min reviews per movie: 5
	Number of users with at least 20 reviews: 14176
	Min reviews per user: 5
	Number of final samples: 673342
Iteration: 2
	Number of items with at least 20 reviews: 9477
	Min reviews per movie: 1
	Number of users with at least 20 reviews: 10909
	Min reviews per user: 1
	Number of final samples: 533762
Iteration: 3
	Number of items with at least 20 reviews: 8624
	Min reviews per movie: 8
	Number of users with at least 20 reviews: 8515
	Min reviews per user: 1
	Number of final samples: 482382
Iteration: 4
	Number of items with at least 20 reviews: 7777
	Min reviews per movie: 10
	Number of users with at least 20 reviews: 8173
	Min reviews per user: 14
	Number of final samples: 461668
Iteration: 5
	Number of items with at least 20 reviews: 7637
	Min reviews per movie: 15
	Number of users with at least 20 reviews: 7738
	Min reviews per user: 5
	Number of final samples: 4

In [31]:
# Call the function above to make a sample of proportion == 50% and threshold == 13
pdf_sample_50 = make_sample_pdf(pdf, 0.5, 13)

Number of initial samples: 1191591
Iteration: 1
	Number of items with at least 13 reviews: 14489
	Min reviews per movie: 1
	Number of users with at least 13 reviews: 10385
	Min reviews per user: 1
	Number of final samples: 290920
Iteration: 2
	Number of items with at least 13 reviews: 6910
	Min reviews per movie: 1
	Number of users with at least 13 reviews: 7430
	Min reviews per user: 1
	Number of final samples: 215980
Iteration: 3
	Number of items with at least 13 reviews: 6154
	Min reviews per movie: 6
	Number of users with at least 13 reviews: 5517
	Min reviews per user: 1
	Number of final samples: 189966
Iteration: 4
	Number of items with at least 13 reviews: 5502
	Min reviews per movie: 4
	Number of users with at least 13 reviews: 5221
	Min reviews per user: 5
	Number of final samples: 179590
Iteration: 5
	Number of items with at least 13 reviews: 5379
	Min reviews per movie: 11
	Number of users with at least 13 reviews: 4932
	Min reviews per user: 8
	Number of final samples: 1748

In [32]:
# Call the function above to make a sample of proportion == 25% and threshold == 9
pdf_sample_25 = make_sample_pdf(pdf, 0.25, 9)

Number of initial samples: 595796
Iteration: 1
	Number of items with at least 9 reviews: 11136
	Min reviews per movie: 1
	Number of users with at least 9 reviews: 7020
	Min reviews per user: 1
	Number of final samples: 120133
Iteration: 2
	Number of items with at least 9 reviews: 4593
	Min reviews per movie: 1
	Number of users with at least 9 reviews: 4612
	Min reviews per user: 1
	Number of final samples: 81173
Iteration: 3
	Number of items with at least 9 reviews: 3981
	Min reviews per movie: 3
	Number of users with at least 9 reviews: 3183
	Min reviews per user: 1
	Number of final samples: 68196
Iteration: 4
	Number of items with at least 9 reviews: 3434
	Min reviews per movie: 4
	Number of users with at least 9 reviews: 2967
	Min reviews per user: 4
	Number of final samples: 62654
Iteration: 5
	Number of items with at least 9 reviews: 3317
	Min reviews per movie: 6
	Number of users with at least 9 reviews: 2745
	Min reviews per user: 5
	Number of final samples: 60076
Iteration: 6
	

In [None]:
# Call the function above to make a sample of proportion == 10% and threshold == 5
pdf_sample_10 = make_sample_pdf(pdf, 0.10, 5)

Number of initial samples: 238318
Iteration: 1
	Number of items with at least 5 reviews: 8928
	Min reviews per movie: 1


Finally, all samples are saved as CSV files so that they can be easily converted back to Spark DataFrames in the next notebook.

In [104]:
# Save data sampled to CSV files
pdf_sample_10.to_csv(os.path.join(data_path, 'reviews_sample_10.csv'))
pdf_sample_25.to_csv(os.path.join(data_path, 'reviews_sample_25.csv'))
pdf_sample_50.to_csv(os.path.join(data_path, 'reviews_sample_50.csv'))
pdf_sample_100.to_csv(os.path.join(data_path, 'reviews_sample_100.csv'))
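
A note on reading these files back: with its default arguments, `to_csv` also writes the DataFrame index as an unnamed first column, and sampled rows keep their original, non-contiguous index. The sketch below uses a made-up toy frame and temporary file names to show the difference between the default and `index=False`. Whether the extra column matters depends on how the next notebook loads the files, which is not shown here.

```python
import os
import tempfile
import pandas as pd

# Hypothetical toy sample; the index mimics row labels left over from sampling.
toy = pd.DataFrame(
    {'asin': ['m1', 'm2'], 'reviewerID': ['u1', 'u2'], 'overall': [5.0, 3.0]},
    columns=['asin', 'reviewerID', 'overall'],
    index=[17, 42],
)

tmp_dir = tempfile.mkdtemp()
path_with_index = os.path.join(tmp_dir, 'with_index.csv')
path_without_index = os.path.join(tmp_dir, 'without_index.csv')

# Default behaviour: the index is written as an extra, unnamed column.
toy.to_csv(path_with_index)
cols_with_index = list(pd.read_csv(path_with_index).columns)

# index=False writes only the data columns.
toy.to_csv(path_without_index, index=False)
cols_without_index = list(pd.read_csv(path_without_index).columns)
```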