In [9]:
import os
import sys
from time import time
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row,SparkSession
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS as ML_ALS
from pyspark.ml.evaluation import RegressionEvaluator
from time import time
from pyspark.storagelevel import StorageLevel
from pyspark.mllib.recommendation import Rating,ALS as MLLIB_ALS

In [10]:
# Path for spark source folder
os.environ['SPARK_HOME'] = "G:\spark-2.2.0-bin-hadoop2.7"

# Append pyspark to Python Path
sys.path.append("G:\spark-2.2.0-bin-hadoop2.7\python")

sc = SparkContext().getOrCreate()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-2-0a58089c11ed>:7 

In [11]:
class DataFrame_with_out_RDD():
    def __init__(self, sql_context, movies_csv, ratings_csv):
        self.movies_csv = movies_csv
        self.ratings_csv = ratings_csv
        self.sql_context = sql_context
        self.als = ML_ALS()

    
    def get_spark_df(self, csv_file, schema):
        t0 = time()
        spark_df = self.sql_context.read.format('csv') \
            .option('delimeter', '\t') \
            .option("header", "true") \
            .load(csv_file, schema=StructType(schema))
        tt = time() - t0
        #print("*************************Time Consumed during creating {0} {1} *******************".format(schema,round(tt,3)))        
        return spark_df
    
    def get_movies_schema(self):
        movies_schema = [
            StructField("movieId", IntegerType(), True),
            StructField("title", StringType(), True),
            StructField("genres", StringType(), True)
        ]
        return movies_schema

    def get_ratings_schema(self):
        ratings_schema = [
            StructField("userID", IntegerType(), True),
            StructField("movieID", IntegerType(), True),
            StructField("rating", DoubleType(), True),
        ]
        return ratings_schema
    
    def get_best_rank(self,ranks,errors,models,count,min_error,reg_val,training_df,validation_df,test_df):
        best_rank = -1
        for rank in ranks:
            als = self.als.setRank(rank)
            model = als.fit(training_df)

            #make the prediction on the validation set
            predict_df = model.transform(validation_df)
            predicted_rating_df = predict_df.filter(predict_df.prediction !=float('nan'))
            predicted_rating_df.cache()
            
            #Run the previously created RMSE evaluator,reg_val,on the predicted_rating_df
            error = reg_val.evaluate(predicted_rating_df)
            errors[count] = error
            models[count] = model
            print('For rank %s the RMSE is %s'% (rank,error))

            if error < min_error:
                min_error = error
                best_rank = count
            count +=1
        return best_rank
    
    def build_model(self):
        t0 = time()
        
        movies_schema = DataFrame_with_out_RDD.get_movies_schema(self)
        raw_movies_df = DataFrame_with_out_RDD.get_spark_df(self, self.movies_csv, movies_schema)
        # print movies data
        '''
        print("Movies Data: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        raw_movies_df.show()
        '''
        
        # removed genres from movies_df
        movies_df = raw_movies_df.drop('genres')

        '''
        print("After removed genres from movies dataframe !!!!!!!!!!!!!")
        movies_df.show()
        '''
        
        ratings_schema = DataFrame_with_out_RDD.get_ratings_schema(self)
        ratings_df = DataFrame_with_out_RDD.get_spark_df(self, self.ratings_csv, ratings_schema)
        
        print("Rating Data: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        ratings_df.show()
        
        
        # cached both DataFrames
        movies_df.cache()
        ratings_df.cache()

        # Splitting the ratings DataFrame
        (training_df, validation_df, test_df) = ratings_df.randomSplit([0.6, 0.2, 0.2], seed=42)

        # Now cached the data splits
        training_df.cache()
        validation_df.cache()
        test_df.cache()

        # initailze the ALS() method from ml.recommendations
        print("Initialize the ALS ")
        als =self.als
        MAX_ITERATIONS  = 10
        REG_PARAM = 0.01
        SEED_VALUE = 42

        als.setMaxIter(MAX_ITERATIONS)\
            .setSeed(SEED_VALUE)\
            .setRegParam(REG_PARAM)\
            .setUserCol('userID')\
            .setItemCol('movieID')\
            .setRatingCol('rating')\
           
        ranks=[1,3,5,7]
        errors = [0,0,0,0]
        models = [0,0,0,0]
        count = 0
        min_error = float('inf') #greater than any other number
        
        #Create RMSE evaluator using the label and prediction columns
        reg_val = RegressionEvaluator(predictionCol='prediction',labelCol='rating',metricName='rmse')

        best_rank = DataFrame_with_out_RDD.get_best_rank(self,ranks,errors,models,count,min_error,reg_val,training_df,validation_df,test_df)
        
        als.setRank(ranks[best_rank])
        print('The best model was trained with rank {0}'.format(ranks[best_rank]))
        
        #Select the best model 
        b_model = models[best_rank]
        
        #Apply model on test dataset 
        test_predict_df = b_model.transform(test_df)
        test_predict_df.cache()
        
        test_predict_df = test_predict_df.filter(test_predict_df.prediction != float('NaN'))
        test_predict_df.cache()
        
        #Compute the test accuracy 
        test_RMSE = reg_val.evaluate(test_predict_df)
        
        print('The model had a RMSE on the test set of {0}'.format(test_RMSE))        
        tt = time() - t0
        print("Model build in %s seconds"%round(tt,3))
        print("Representation of DataFrame: ")
        print(test_predict_df.show(10))

In [12]:
class DataFrame_with_RDD():
    def __init__(self,SC,rating_csv):
        self.SC = SC
        self.rating_csv = rating_csv
        
    def get_values(self,ranks,models,min_error,MSErrors,ratings,MAX_ITERATIONS,SEED_VALUE):
        count = 0
        best_rank = -1
        for rank in ranks:
            model = MLLIB_ALS.train(ratings=ratings, rank=rank, iterations=MAX_ITERATIONS, seed=SEED_VALUE)
            test_data = ratings.map(lambda p: (p[0], p[1]))
            prediction = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
            ratesAndPred = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(prediction)
            MSError = ratesAndPred.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()
            print("Error of %s = %s " % (rank, MSError))
            MSErrors[count] = MSError
            models[count] = model
            if MSError < min_error:
                min_error = MSError
                best_rank = count
            count += 1
        return best_rank,test_data,prediction,ratesAndPred
    
    def get_ratings_rdd(self):
        t0 = time()
        ratings_rdd = self.SC.textFile(self.rating_csv)
        header = ratings_rdd.first()
        ratings_rdd = ratings_rdd.filter(lambda row: row != header) 
        ratings = ratings_rdd.map(lambda l: l.split(','))\
                .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
        time_taken = time() - t0
        #print("*************************Time Consumed during creating rdd {0} *******************".format(round(time_taken,3)))
        print(ratings.take(10))
        return ratings
        
    def build_model(self):
        t0 = time()
        ratings = DataFrame_with_RDD.get_ratings_rdd(self)
        ranks = [3, 5, 7]
        Errors = [0, 0, 0, 0]
        models = [0, 0, 0, 0]
        count = 0
        min_error = float('inf')  # greater than any other number
        MAX_ITERATIONS = 10
        SEED_VALUE = 42
        best_rank,test_data,prediction,rateAndPred = DataFrame_with_RDD.get_values(self,ranks,models,min_error,\
                                                                    Errors,ratings,MAX_ITERATIONS,SEED_VALUE)
        print('************* The best model was trained with rank {0} *******************'.format(ranks[best_rank],Errors[best_rank]))
        time_taken = time() - t0
        print("****************** Time Consumed during model creation {0} *******************".format(round(time_taken,3)))
        print("Representation of RDD: ")
        print(rateAndPred.take(10))

In [13]:
if __name__=='__main__':
    sql_context = SQLContext(sc)
    print("Dataframe !!!!!!!!!!!!!!!!!!")
    movie_csv_file = "C:\\Users\\Rohit Singh\\Desktop\\Internship Task\\ml-latest-small\\movies.csv"
    rating_csv_file = "C:\\Users\\Rohit Singh\\Desktop\\Internship Task\\ml-latest-small\\ratings.csv"

    ALS_with_Dataframe =  DataFrame_with_out_RDD(sql_context,movie_csv_file,rating_csv_file)
    ALS_with_Dataframe.build_model()

Dataframe !!!!!!!!!!!!!!!!!!
Rating Data: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+------+-------+------+
|userID|movieID|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
|     1|   1263|   2.0|
|     1|   1287|   2.0|
|     1|   1293|   2.0|
|     1|   1339|   3.5|
|     1|   1343|   2.0|
|     1|   1371|   2.5|
|     1|   1405|   1.0|
|     1|   1953|   4.0|
|     1|   2105|   4.0|
|     1|   2150|   3.0|
|     1|   2193|   2.0|
|     1|   2294|   2.0|
|     1|   2455|   2.5|
|     1|   2968|   1.0|
|     1|   3671|   3.0|
+------+-------+------+
only showing top 20 rows

Initialize the ALS 
For rank 1 the RMSE is 1.3311724107946843
For rank 3 the RMSE is 1.0384365334796244
For rank 5 the RMSE is 1.1228417078768396
For rank 7 the RMSE is 1.1756376244225129
The best model was trained with rank 3
The model had a RMSE on the test set of 1.0397066285097418
Model build in 405.602 second

In [14]:
    print("RDD !!!!!!!!!!")
    SC = sc
    ALS_with_RDD =  DataFrame_with_RDD(SC,rating_csv_file)
    ALS_with_RDD.build_model()

RDD !!!!!!!!!!
[Rating(user=1, product=31, rating=2.5), Rating(user=1, product=1029, rating=3.0), Rating(user=1, product=1061, rating=3.0), Rating(user=1, product=1129, rating=2.0), Rating(user=1, product=1172, rating=4.0), Rating(user=1, product=1263, rating=2.0), Rating(user=1, product=1287, rating=2.0), Rating(user=1, product=1293, rating=2.0), Rating(user=1, product=1339, rating=3.5), Rating(user=1, product=1343, rating=2.0)]
Error of 3 = 0.511299294590665 
Error of 5 = 0.407893721305383 
Error of 7 = 0.3477487383536872 
************* The best model was trained with rank 7 *******************
****************** Time Consumed during model creation 106.683 *******************
Representation of RDD: 
[((1, 31), (2.5, 2.416029994638364)), ((1, 1129), (2.0, 2.4972826898952007)), ((1, 2193), (2.0, 1.8273437104380568)), ((2, 110), (4.0, 4.175767829222066)), ((2, 144), (3.0, 2.866384563264269)), ((2, 266), (5.0, 4.072009131080138)), ((2, 292), (3.0, 3.458009204354747)), ((2, 364), (3.0, 4.

# Comaprison between DatFrame and RDD Using PySpark 

##  Time Consumed while using  ALS() for recommendation:
         Dataframe take more time as compare to Rdd while using ALS().

##  ml use by  Dataframe : 
        from pyspark.ml.recommendation import ALS as ML_ALS

##  mllib use by RDD:
        from pyspark.mllib.recommendation import Rating,ALS as MLLIB_ALS 

##  RDD: 
        RDD is stand for Resilent Distributed Dataset.
        RDD use collection and better for unstructured data.
        RDD is good when we dont want impose schema such as columns format
        RDD can be converted into Dataframe 

##  Dataframe: 
        Dataframe is also a distributed collection of data.
        IN DataFrame data is organised into named column like a relational table.
        It is designed for large dataset processing.APIs of Dataframe are easy to use 
        similar to SQL language.
        
# Union of RDD and Dataframe is Datasets