In [50]:
import pyspark

from pyspark import SparkConf,SparkContext
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
from pyspark.sql import SQLContext

from pyspark.sql.functions import monotonically_increasing_id,row_number 

from pyspark.sql.functions import isnan, count, when, col, desc, udf, col,rand
from pyspark.sql.functions import sort_array, asc, avg
from pyspark.sql.functions import min as Fmin
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import stddev as Fstddev
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler


from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

from pyspark.sql import Window

from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import rank 
import pyspark.sql.functions as F
from pyspark.sql import DataFrameStatFunctions as statFunc
from pyspark.sql.functions import first
from pyspark.sql.functions import lit

from pyspark.sql.functions import col, countDistinct


import pandas as pd
import sklearn.metrics as metrics
import numpy as np
from sklearn.neighbors import NearestNeighbors
from scipy.spatial.distance import correlation
from sklearn.metrics.pairwise import pairwise_distances
import ipywidgets as widgets
from IPython.display import display, clear_output
from contextlib import contextmanager
import warnings
warnings.filterwarnings('ignore')
import os, sys
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [51]:
Training_filename ='TrainingRatings.txt'
Testing_filename = 'TestingRatings.txt'
Movie_filename = 'movie_titles.txt'
#'s3://netflixfinal/movie_titles.txt'

In [52]:
from pyspark.sql.types import *

movies_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('yearofrelease', IntegerType()),
   StructField('title', StringType())]
)
Training_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('userId', IntegerType()),
   StructField('ratings',DoubleType())]
)

Testing_df_schema = StructType(
  [StructField('movieId', IntegerType()),
   StructField('userId', IntegerType()),
   StructField('ratings',DoubleType())]
)

In [53]:
#Creating the training,testing and movie dataframes

df_training = sqlContext.read.format('txt').options(inferSchema=True).schema(Training_df_schema).csv(Training_filename)
 
df_testing = sqlContext.read.format('txt').options(inferSchema=True).schema(Testing_df_schema).csv(Testing_filename)
 
df_movies = sqlContext.read.format('txt').options(inferSchema=True).schema(movies_df_schema).csv(Movie_filename)
 



#Count of each dataframes     
training_count = df_training.count()
testing_count = df_testing.count()
movies_count = df_movies.count()


print('There are %s samples in training set , %s samples in testing set and %s samples in movies in the datasets' % (training_count,testing_count, movies_count))
print('Training:')
df_training.show(5)
print('Testing:')
df_testing.show(5)
print('Movies:')
df_movies.show(5,truncate=False)

There are 3255352 samples in training set , 100478 samples in testing set and 17770 samples in movies in the datasets
Training:
+-------+-------+-------+
|movieId| userId|ratings|
+-------+-------+-------+
|      8|1744889|    1.0|
|      8|1395430|    2.0|
|      8|1205593|    4.0|
|      8|1488844|    4.0|
|      8|1447354|    1.0|
+-------+-------+-------+
only showing top 5 rows

Testing:
+-------+-------+-------+
|movieId| userId|ratings|
+-------+-------+-------+
|      8| 573364|    1.0|
|      8|2149668|    3.0|
|      8|1089184|    3.0|
|      8|2465894|    3.0|
|      8| 534508|    1.0|
+-------+-------+-------+
only showing top 5 rows

Movies:
+-------+-------------+----------------------------+
|movieId|yearofrelease|title                       |
+-------+-------------+----------------------------+
|1      |2003         |Dinosaur Planet             |
|2      |2004         |Isle of Man TT 2004 Review  |
|3      |1997         |Character                   |
|4      |1994      

In [54]:
## HOW MANY DISTINCT USERS AND DISTINCT ITEMS ARE THERE IN THE TEST SET ?
#finding out the distinct users in the training set and testing set 

print("The distinct users in training set are :",df_training.select('userID').distinct().count())
print("The distinct users in testing set are :",df_testing.select('userID').distinct().count())


The distinct users in training set are : 28978
The distinct users in testing set are : 27555


In [55]:
#finding out the distinct items(movies) in the training set and testing set 

print("The distinct items in training set are :",df_training.select('movieID').distinct().count())
print("The distinct items in testing set are :",df_testing.select('movieID').distinct().count())

The distinct items in training set are : 1821
The distinct items in testing set are : 1701


In [56]:
# Finding Overall Rates for users and Movies in both the sets
## Movie Rating from Training Set
#checking overall rate of the movies  in the training set

Movie_rate_train = df_training.groupBy('movieId')

Movie_rate_train.avg('ratings').orderBy("avg(ratings)", ascending=False).show()

+-------+-----------------+
|movieId|     avg(ratings)|
+-------+-----------------+
|   3033|              4.5|
|  12293|4.464598134454594|
|  16147|4.422818791946309|
|  14283|4.418618618618619|
|   1256|4.414830736163353|
|   5760|4.410690051153565|
|   7569| 4.38388625592417|
|   4238|4.372822299651568|
|  14648| 4.35288414929714|
|   3290|4.339868147120056|
|  10947|4.339148474704135|
|  10080|4.318296583066823|
|   7016|4.318181818181818|
|  17085|4.283419689119171|
|   3928|4.234884732492388|
|   4207|4.234758013827781|
|  12184|4.230476707863643|
|  15557|4.218554861730597|
|   7445|4.198757763975156|
|    634|4.195422535211268|
+-------+-----------------+
only showing top 20 rows



In [57]:
## User Rating from Training Set


User_rate_train = df_training.groupBy('userId')
User_rate_train.avg('ratings').orderBy("avg(ratings)", ascending=False).show()

+-------+------------------+
| userId|      avg(ratings)|
+-------+------------------+
| 336578|               5.0|
|1482568|               5.0|
|2451667|               5.0|
| 784490|               5.0|
|2307226|               5.0|
|1663569|               5.0|
|1309838|               5.0|
|1193505|               5.0|
|1745577|               5.0|
| 396595|               5.0|
| 794999| 4.996108949416342|
|1379159| 4.989247311827957|
| 175935| 4.987341772151899|
| 642384| 4.987179487179487|
|1299754| 4.987179487179487|
|2635943| 4.982142857142857|
| 132333| 4.977777777777778|
|  27318|4.9753086419753085|
|1293835|4.9753086419753085|
| 223526|  4.97457627118644|
+-------+------------------+
only showing top 20 rows



In [58]:
## Movie Ratings from Testing Set


#checking overall rate of the movies  in the testing set
Movie_rate_test = df_testing.groupBy('movieID')
Movie_rate_test.avg('ratings').orderBy("avg(ratings)", ascending=False).show()

+-------+------------+
|movieID|avg(ratings)|
+-------+------------+
|   3033|         5.0|
|  13760|         5.0|
|  12695|         5.0|
|   6830|         5.0|
|  10404|         5.0|
|  13989|         5.0|
|  15536|         5.0|
|  12544|         5.0|
|  11657|         5.0|
|  13606|         5.0|
|   7823|         5.0|
|  12705|         5.0|
|   3164|         5.0|
|   3722|         5.0|
|  10743|         5.0|
|   5225|         5.0|
|   9930|         5.0|
|  11159|         5.0|
|  13965|         5.0|
|   9920|         5.0|
+-------+------------+
only showing top 20 rows



In [59]:
## User Ratings from Testing Set

User_rate_test = df_training.groupBy('userID')
User_rate_test.avg('ratings').orderBy("avg(ratings)", ascending=False).show()


+-------+------------------+
| userID|      avg(ratings)|
+-------+------------------+
| 336578|               5.0|
|1663569|               5.0|
|2307226|               5.0|
|1193505|               5.0|
|1482568|               5.0|
| 784490|               5.0|
|1309838|               5.0|
|2451667|               5.0|
|1745577|               5.0|
| 396595|               5.0|
| 794999| 4.996108949416342|
|1379159| 4.989247311827957|
| 175935| 4.987341772151899|
| 642384| 4.987179487179487|
|1299754| 4.987179487179487|
|2635943| 4.982142857142857|
| 132333| 4.977777777777778|
|1293835|4.9753086419753085|
|  27318|4.9753086419753085|
| 223526|  4.97457627118644|
+-------+------------------+
only showing top 20 rows



In [60]:
#Identifying the number of users have watched each movie from the testing set

df_testing.groupBy('movieId').count().orderBy("count", ascending=False).show(10)

print("This user has watched this much movies")


+-------+-----+
|movieId|count|
+-------+-----+
|   6971|  811|
|   4640|  756|
|   6287|  737|
|   9728|  706|
|   8915|  695|
|   4432|  692|
|   8596|  691|
|   6408|  680|
|   1406|  652|
|   1744|  650|
+-------+-----+
only showing top 10 rows

This user has watched this much movies


In [61]:
#Identifying the number of movies each user has watched from testing set

df_testing.groupBy('userId').count().orderBy("count", ascending=False).show(10)

+-------+-----+
| userId|count|
+-------+-----+
|1664010|   70|
| 305344|   52|
|2439493|   52|
| 387418|   51|
|1314869|   38|
|2118461|   34|
|1932594|   28|
| 491531|   27|
|2606799|   27|
| 727242|   25|
+-------+-----+
only showing top 10 rows



In [62]:
#Identifying the number of movies each user has watched from training set 

df_training.groupBy('userId').count().orderBy("count", ascending=True).show(10) 


+-------+-----+
| userId|count|
+-------+-----+
| 202623|   68|
| 482107|   69|
|1405451|   69|
| 886158|   69|
|2474140|   69|
|1583736|   69|
|1922286|   69|
|1714390|   69|
|1382057|   69|
|1893836|   69|
+-------+-----+
only showing top 10 rows



In [63]:
#Identifying the number of users have watched each movie from training set 

df_training.groupBy('movieId').count().orderBy("count", ascending=False).show(10)

print("Movie with Id=6971 has been watched 25468 times ")

+-------+-----+
|movieId|count|
+-------+-----+
|   6971|25468|
|   6287|24393|
|   4640|23525|
|   9728|23184|
|   8596|23005|
|   4432|22565|
|  10947|21209|
|   6408|21198|
|   1202|20997|
|  13651|20902|
+-------+-----+
only showing top 10 rows

Movie with Id=6971 has been watched 25468 times 


# Joining the train and test dataframes with the movies dataframe

In [64]:
df_training_joined = df_training.join(df_movies,on=['movieId'],how='inner')

df_testing_joined = df_testing.join(df_movies,on=['movieId'],how='inner')

In [65]:
type(df_training_joined)

type(df_testing_joined)

pyspark.sql.dataframe.DataFrame

In [66]:
df_training_joined.show()

+-------+-------+-------+-------------+--------------------+
|movieId| userId|ratings|yearofrelease|               title|
+-------+-------+-------+-------------+--------------------+
|      8|1744889|    1.0|         2004|What the #$*! Do ...|
|      8|1395430|    2.0|         2004|What the #$*! Do ...|
|      8|1205593|    4.0|         2004|What the #$*! Do ...|
|      8|1488844|    4.0|         2004|What the #$*! Do ...|
|      8|1447354|    1.0|         2004|What the #$*! Do ...|
|      8| 306466|    4.0|         2004|What the #$*! Do ...|
|      8|1331154|    4.0|         2004|What the #$*! Do ...|
|      8|1818178|    3.0|         2004|What the #$*! Do ...|
|      8| 991725|    4.0|         2004|What the #$*! Do ...|
|      8|1987434|    4.0|         2004|What the #$*! Do ...|
|      8|1765381|    4.0|         2004|What the #$*! Do ...|
|      8| 433803|    3.0|         2004|What the #$*! Do ...|
|      8|1148143|    2.0|         2004|What the #$*! Do ...|
|      8|1174811|    5.0

In [67]:
user_df = sqlContext.read.format('csv').options(header=True, inferSchema=False).schema(df_schema).load('s3://netfinal/self-user.txt')
user_df.show(2, truncate=False)

NameError: name 'df_schema' is not defined