### Creating Spark session
Spark session is "the gateway to the structured data processing".
It can be used to create datasets, dataframes, user defined functions and execute SQL.
It replaces SQLContext used in previous versions of Apache Spark.

In [1]:
import sagemaker_pyspark
from pyspark.sql import SparkSession

# This enables s3 support in Spark. You may need to restart the kernel!
classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = SparkSession.builder \
    .master("local") \
    .appName("Teemuko mle") \
    .config("spark.driver.extraClassPath", classpath) \
    .getOrCreate()

### Loading the CSV-data from S3

In [2]:
# Movie ratings data
filePath = "s3a://sagemaker-tmukoo/ratings-5000.csv"
#filePath = "s3a://sagemaker-tmukoo/ratings.csv"


ratings = spark.read.load(filePath, format="csv", inferSchema="true", header="true")
#df=spark.read.csv(filePath,header=True)

In [3]:
# Movie metadata
filePath = "s3a://sagemaker-tmukoo/movies_metadata.csv"


movies = spark.read.load(filePath, format="csv", inferSchema="true", header="true")
#df=spark.read.csv(filePath,header=True)

## Data exploration

In [4]:
ratings.take(3)

[Row(userId=1, movieId=110, rating=1.0, timestamp=1425941529),
 Row(userId=1, movieId=147, rating=4.5, timestamp=1425942435),
 Row(userId=1, movieId=858, rating=5.0, timestamp=1425941523)]

In [5]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [6]:
movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

ratings.describe().show()

In [7]:
# How many distinct userIds?
ratings.select('userId').distinct().count()

49

In [8]:
# How many distinct movieIds?
ratings.select('movieId').distinct().count()

2347

In [9]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rc("font",size=15)
ratings.select('rating').toPandas().rating.sort_values().value_counts(sort=False).plot(kind='bar')
plt.title('Rating distribution')
plt.xlabel('Rating')
plt.ylabel('Count')
plt.show()

<Figure size 640x480 with 1 Axes>

#### Overall statistics 
from pyspark.sql.functions import mean, min, max
ratings.select([mean('rating'), min('rating'), max('rating')]).show()

In [10]:
# Average rating per movie and rating counts
average_ratings=ratings.groupBy(ratings.movieId).agg({"movieId": "count", "rating": "avg"}).orderBy(["count(movieId)"],ascending=0)
average_ratings.join(movies, movies.id == ratings.movieId).select('movieId','title','avg(rating)','count(movieId)').show()


+-------+--------------------+------------------+--------------+
|movieId|               title|       avg(rating)|count(movieId)|
+-------+--------------------+------------------+--------------+
|   1408|    Cutthroat Island|               4.0|             2|
|    524|              Casino|              2.25|             2|
|      5|          Four Rooms|               3.5|             2|
|    902| {'name': 'Victoi...|             3.875|             4|
|     63|      Twelve Monkeys|               2.5|             1|
|   2054|  Mr. Holland's Opus|2.5833333333333335|             6|
|    880|      Antonia's Line|               2.5|             2|
|    568|           Apollo 13|               3.0|             1|
|   1873|      Beyond Rangoon|               2.5|             2|
|   3512|Under Siege 2: Da...|               3.0|             1|
|   1909|    Don Juan DeMarco|               4.0|             3|
|   4954|           Drop Zone|               3.0|             1|
|    628|Interview with t

### Define helper functions

In [11]:
# Find movieId:s and ratings for a specific user
def users_ratings_df(user):
    return ratings.filter(ratings["userId"]==user).select('movieId','rating')
#.collect()

users_ratings_df(2).collect()
#for movieId,rating in users_ratings(3):
#    print(movieId)

[Row(movieId=5, rating=3.0),
 Row(movieId=25, rating=3.0),
 Row(movieId=32, rating=2.0),
 Row(movieId=58, rating=3.0),
 Row(movieId=64, rating=4.0),
 Row(movieId=79, rating=4.0),
 Row(movieId=141, rating=3.0),
 Row(movieId=260, rating=4.0),
 Row(movieId=339, rating=5.0),
 Row(movieId=377, rating=4.0),
 Row(movieId=605, rating=4.0),
 Row(movieId=628, rating=4.0),
 Row(movieId=648, rating=4.0),
 Row(movieId=762, rating=3.0),
 Row(movieId=780, rating=3.0),
 Row(movieId=786, rating=1.0),
 Row(movieId=788, rating=1.0),
 Row(movieId=1210, rating=4.0),
 Row(movieId=1233, rating=4.0),
 Row(movieId=1356, rating=5.0),
 Row(movieId=1475, rating=3.0),
 Row(movieId=1552, rating=2.0)]

### Pivoting data for analysis

# Create a movieId vs userId average rating pivot (using average if the movie has been rated twice) 
ratings_pivot=ratings.groupBy('userId')\
.pivot('movieId')\
.agg({"rating": "avg"})

#specific_film_ratings=ratings_pivot.select('858')
#from pyspark.sql import functions as F
#ratings_pivot.filter(specific_film_ratings._1.isNotNull)
ratings_pivot.filter(ratings_pivot['858'].isNotNull()).show()

# Machine learning to the rescue!
Note! SparkML will eventually replace MLlib. => don't use MLlib.

## ALS based Collaborative Filtering
https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html

In [12]:
# DON'T USE! from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

#lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
#parts = lines.map(lambda row: row.value.split("::"))
#ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]), timestamp=long(p[3])))
#ratings = spark.createDataFrame(ratingsRDD)

(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

Root-mean-square error = 3.0873995150333977


In [13]:
# Tämä kai olisi kullekin käyttäjälle ALS:n suosittelemat leffat
userRecs.take(3)

[Row(userId=31, recommendations=[Row(movieId=4973, rating=8.152472496032715), Row(movieId=58559, rating=7.841063022613525), Row(movieId=68157, rating=7.421996116638184), Row(movieId=7153, rating=7.082418918609619), Row(movieId=4993, rating=6.970481872558594), Row(movieId=2791, rating=6.839625358581543), Row(movieId=1080, rating=6.831757545471191), Row(movieId=2571, rating=6.804441928863525), Row(movieId=4886, rating=6.6022443771362305), Row(movieId=1136, rating=6.383308410644531)]),
 Row(userId=34, recommendations=[Row(movieId=318, rating=7.932281970977783), Row(movieId=1073, rating=6.483544826507568), Row(movieId=47, rating=5.941385746002197), Row(movieId=2804, rating=5.867222309112549), Row(movieId=1210, rating=5.41481876373291), Row(movieId=592, rating=5.408281326293945), Row(movieId=5952, rating=5.395789623260498), Row(movieId=457, rating=5.299190044403076), Row(movieId=6539, rating=5.297870635986328), Row(movieId=2502, rating=5.294939041137695)]),
 Row(userId=28, recommendations=[

In [14]:
movieRecs.take(3)

[Row(movieId=1580, recommendations=[Row(userId=38, rating=6.308559894561768), Row(userId=1, rating=5.330371856689453), Row(userId=41, rating=5.036062240600586), Row(userId=15, rating=5.0004072189331055), Row(userId=4, rating=4.731659889221191), Row(userId=46, rating=4.526854991912842), Row(userId=16, rating=4.504119873046875), Row(userId=31, rating=4.449660301208496), Row(userId=33, rating=4.372954368591309), Row(userId=9, rating=4.342031478881836)]),
 Row(movieId=471, recommendations=[Row(userId=24, rating=2.992443084716797), Row(userId=44, rating=2.3984858989715576), Row(userId=13, rating=2.364163875579834), Row(userId=4, rating=1.8684618473052979), Row(userId=28, rating=1.5902862548828125), Row(userId=38, rating=1.580761432647705), Row(userId=15, rating=1.5459280014038086), Row(userId=17, rating=1.5233920812606812), Row(userId=25, rating=1.290235161781311), Row(userId=19, rating=1.238815426826477)]),
 Row(movieId=1591, recommendations=[Row(userId=8, rating=3.9521634578704834), Row(u

In [15]:
movies.select('id','title').filter("title like '%Toy Story'").show()

+---+---------+
| id|    title|
+---+---------+
|862|Toy Story|
+---+---------+



In [16]:
# Mitä tää data kertoo? Elokuvan perusteella lähinnä mun makua olevat käyttäjät?
# Onko tää joku Tinderin korvike?
movieRecs.filter("movieId = 1580").select('recommendations').collect()

[Row(recommendations=[Row(userId=38, rating=6.308559894561768), Row(userId=1, rating=5.330371856689453), Row(userId=41, rating=5.036062240600586), Row(userId=15, rating=5.0004072189331055), Row(userId=4, rating=4.731659889221191), Row(userId=46, rating=4.526854991912842), Row(userId=16, rating=4.504119873046875), Row(userId=31, rating=4.449660301208496), Row(userId=33, rating=4.372954368591309), Row(userId=9, rating=4.342031478881836)])]

# Palataan takaisin manuaaliseen recommendation enginen koodaukseen...

# Pearson correlation test
# Result... nulls are counted as zeros in Spark => crap results
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]

data = [(Vectors.dense([7,6,7,4,5,4]),),
        (Vectors.sparse(6, [(0,6), (1,7), (3,4), (4,3), (5,4) ]),),
        (Vectors.sparse(6, [(1,3),(2,3),(3,1),(4,1)]),),
        (Vectors.dense([1,2,2,3,3,4]),),
        (Vectors.sparse(6, [(0,1),(2,1),(3,2),(4,3),(5,3)]),),
        (Vectors.sparse(6, [(0,5),(1,4),(3,3),(5,4)]),)]

df = spark.createDataFrame(data, ["features"])

r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

In [17]:
# Testing spark sum aggregation for user's ratings
both_rated=[5,25,32]
person1_preferences_sum = users_ratings_df(2).filter(users_ratings_df(2).movieId.isin(both_rated))\
.agg({"rating": "sum"}).select('sum(rating)').collect()[0][0]
#.groupBy().power('rating',2)
#    
#    .agg({"movieId": "sum"})
print(person1_preferences_sum)

8.0


### Pandas UDF

In [20]:
# This installs pyarrow to the conda_python3 kernel environment
# conda install --yes --name python3 --channel conda-forge pyarrow
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Use pandas_udf to define a Pandas UDF
@pandas_udf('double', PandasUDFType.SCALAR)
# Input/output are both a pandas.Series of doubles

def pandas_plus_one(v):
    return v + 1

@pandas_udf('double', PandasUDFType.SCALAR)
# Input/output are both a pandas.Series of doubles

def pandas_power2(v):
    return v*v

#df.withColumn('v2', pandas_plus_one(df.v))

In [25]:
person1=2
person1_square_preferences_sum = sum(\
                                     users_ratings_df(person1).\
filter(users_ratings_df(person1).movieId.isin(both_rated)).select('rating')*\
                                      users_ratings_df(person1).\
filter(users_ratings_df(person1).movieId.isin(both_rated)).select('rating')\
                                      )
                                     
#print(person1_square_preferences_sum)

TypeError: unsupported operand type(s) for *: 'DataFrame' and 'DataFrame'

In [223]:
from pyspark.sql.functions import *
from math import sqrt,pow 

def pearson_correlation(person1,person2):

##### To get both rated items
#    both_rated = {}
    #for item in dataset[person1]:
    #    if item in dataset[person2]:
    #        both_rated[item] = 1
 
#    for movieId,rating in users_ratings_df(person1).collect():
#        if movieId in users_ratings_df(person2).collect():
#            both_rated[movieId] = 1

#    person1_ratings = users_ratings_df(34).alias('df1')
#    person2_ratings = users_ratings_df(2).alias('df2')
#    both_rated=person1_ratings.join(person2_ratings, df1.movieId == df2.movieId)\
#    .select('df1.*').select('movieId').rdd.flatMap(lambda x: x).collect()
    
#    return both_rated
###########################


#    number_of_ratings = len(both_rated)
    
    # Checking for number of ratings in common
#    if number_of_ratings == 0:
#        return 0
 
    # Add up all the preferences of each user
#    person1_preferences_sum = sum([dataset[person1][item] for item in both_rated])
#    person2_preferences_sum = sum([dataset[person2][item] for item in both_rated])

#    person1_preferences_sum = users_ratings_df(person1).\
#    filter(users_ratings_df(person1).movieId.isin(both_rated))\
#    .agg({"rating": "sum","rating": "avg"}).select('sum(rating)').collect()[0][0]
        
#    person2_preferences_sum = users_ratings(person2).\
#    filter(users_ratings_df(person2).movieId.isin(both_rated))\
#    .agg({"rating": "sum"}).select('sum(rating)').collect()[0][0]
    
    
    # Sum up the squares of preferences of each user
#    person1_square_preferences_sum = sum([pow(dataset[person1][item],2) for item in both_rated])
#    person2_square_preferences_sum = sum([pow(dataset[person2][item],2) for item in both_rated])

#    person1_square_preferences_sum = sum(users_ratings_df(person1).\
#    filter(users_ratings_df(person1).movieId.isin(both_rated)).select('rating'),2)
 
 
    # Sum up the product value of both preferences for each item
#    product_sum_of_both_users = sum([dataset[person1][item] * dataset[person2][item] for item in both_rated])

    
    # Spark way of calculating all the above:
    # Method to find common MovieIds
    
    person1_ratings = users_ratings_df(person1).alias('df1')
    person2_ratings = users_ratings_df(person2).alias('df2')

    # pow(df2.rating, 2) does not work in here, for some reason.
    common_ratings = person1_ratings.join(person2_ratings, df1.movieId == df2.movieId).select(\
                                                                         df1.movieId, \
                                                                         df1.rating.alias('rating1'), \
                                                                         df2.rating.alias('rating2'), \
                                                                         (df1.rating*df1.rating).alias('rating1^2'),\
                                                                        (df2.rating*df2.rating).alias('rating2^2'),\
                                                                         (df1.rating*df2.rating).alias('ratings_product')\
                                                                        )
    common_ratings.show()
    number_of_ratings = common_ratings.count()
    
    # Checking for number of ratings in common
    if number_of_ratings == 0:
        return 0
  
 
    common_ratings_agg = common_ratings.agg({\
                                             "movieId":"count",\
                                             "rating1": "sum",\
                                             "rating2": "sum",\
                                             "rating1^2": "sum",\
                                             "rating2^2": "sum",\
                                             "ratings_product": "sum"}).collect()[0]
    
    # Unpacking the numebers from the named tuple:
    (number_of_ratings2,\
     person1_preferences_sum,\
     person2_preferences_sum,\
     person1_square_preferences_sum,\
     person2_square_preferences_sum,\
     product_sum_of_both_users) = \
    (common_ratings_agg["count(movieId)"],\
     common_ratings_agg["sum(rating1)"],\
     common_ratings_agg["sum(rating2)"],\
     common_ratings_agg["sum(rating1^2)"],\
     common_ratings_agg["sum(rating2^2)"],\
     common_ratings_agg["sum(ratings_product)"],\
    )
    
    #return person2_preferences_sum
    #return number_of_ratings2

    # Calculate the pearson score
    numerator_value = product_sum_of_both_users - (person1_preferences_sum*person2_preferences_sum/number_of_ratings)
    denominator_value = sqrt((person1_square_preferences_sum - pow(person1_preferences_sum,2)/number_of_ratings) * (person2_square_preferences_sum -pow(person2_preferences_sum,2)/number_of_ratings))
    if denominator_value == 0:
        return 0
    else:
        r = numerator_value/denominator_value
        return r
    
print(pearson_correlation(2,24))
#print pearson_correlation('Lisa Rose','Gene Seymour')

+-------+-------+-------+---------+---------+---------------+
|movieId|rating1|rating2|rating1^2|rating2^2|ratings_product|
+-------+-------+-------+---------+---------+---------------+
|     25|    3.0|    3.0|      9.0|      9.0|            9.0|
|     32|    2.0|    2.0|      4.0|      4.0|            4.0|
|     58|    3.0|    3.0|      9.0|      9.0|            9.0|
|    260|    4.0|    4.0|     16.0|     16.0|           16.0|
|    377|    4.0|    4.0|     16.0|     16.0|           16.0|
|    648|    4.0|    4.0|     16.0|     16.0|           16.0|
|    788|    1.0|    1.0|      1.0|      1.0|            1.0|
|   1210|    4.0|    4.0|     16.0|     16.0|           16.0|
|   1233|    4.0|    4.0|     16.0|     16.0|           16.0|
+-------+-------+-------+---------+---------+---------------+

1.0


In [147]:
common_ratings=person1_ratings.join(person2_ratings, df1.movieId == df2.movieId).select(\
                                                                         df1.movieId, \
                                                                         df1.rating.alias('rating1'), \
                                                                         df2.rating.alias('rating2'), \
                                                                         (pow(df1.rating,2)).alias('rating1^2'),\
                                                                        (pow(df2.rating,2)).alias('rating2^2'),\
                                                                         (df1.rating*df2.rating).alias('ratings_product')\
                                                                        )


In [55]:
for mo,ra in users_ratings_df(2).collect():
    print(users_ratings_df(34).movieId.contains(mo))

Column<b'contains(movieId, 5)'>
Column<b'contains(movieId, 25)'>
Column<b'contains(movieId, 32)'>
Column<b'contains(movieId, 58)'>
Column<b'contains(movieId, 64)'>
Column<b'contains(movieId, 79)'>
Column<b'contains(movieId, 141)'>
Column<b'contains(movieId, 260)'>
Column<b'contains(movieId, 339)'>
Column<b'contains(movieId, 377)'>
Column<b'contains(movieId, 605)'>
Column<b'contains(movieId, 628)'>
Column<b'contains(movieId, 648)'>
Column<b'contains(movieId, 762)'>
Column<b'contains(movieId, 780)'>
Column<b'contains(movieId, 786)'>
Column<b'contains(movieId, 788)'>
Column<b'contains(movieId, 1210)'>
Column<b'contains(movieId, 1233)'>
Column<b'contains(movieId, 1356)'>
Column<b'contains(movieId, 1475)'>
Column<b'contains(movieId, 1552)'>


In [333]:
# Method to find common MovieIds
from pyspark.sql.functions import *
from math import sqrt 
person1_ratings = users_ratings_df(34) #.alias('df1')
person2_ratings = users_ratings_df(2) #.alias('df2')

common_ratings = person1_ratings.join(person2_ratings.toDF('movieId','rating2'), "movieId")\
.select(person1_ratings.movieId,"rating2")#,(pow(rating2,2)).alias('rating1^2'))
#common_ratings = person1_ratings.withColumnRenamed("rating", "rating1").join(person2_ratings.withColumnRenamed("rating", "rating2"), \

                                                                             #                                                        person1_ratings.movieId == person2_ratings.movieId).\
#select(person1_ratings.movieId,"rating1",("rating2"))#,(pow(rating1,2)).alias('rating1^2'))#,\
#                                                                        (pow(rating2,2)).alias('rating2^2'),\
#                                                                         (rating1*rating2).alias('ratings_product')\
#)
#.select(\
#                                                                         df1.movieId, \
#                                                                        df1.rating1.alias('rating1'))#, \
  #                                                                       df2.rating2.alias('rating2'), \
  #                                                                       (pow(df1.rating,2)).alias('rating1^2'),\
  #                                                                      (pow(df2.rating,2)).alias('rating2^2'),\
  #                                                                       (df1.rating*df2.rating).alias('ratings_product')\
  #                                                                      )

common_ratings.show()
#(numb,x,y,z,f,g)=
foo=common_ratings.agg({"movieId":"count",\
                                "rating1": "sum",\
                    "rating2": "sum",\
                    "rating1^2": "sum",\
                    "rating2^2": "sum",\
                    "ratings_product": "sum"}) #.collect()[0]
#foo["sum(rating1)"]
foo.show()
#.select('movieId').rdd.flatMap(lambda x: x).collect()
#numb,sqrt(x),type(y),z,f,g,common_ratings.count()

NameError: name 'rating2' is not defined

In [None]:
#From https://blog.epigno.systems/2018/02/21/machine-learning-with-pyspark-feature-selection/
#prepare the data
features = ["temperature", "exhaust_vacuum", "ambient_pressure", "relative_humidity"]
lr_data = data.select(col("energy_output").alias("label"), *features).dropna()

vector = VectorAssembler(inputCols=columns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

#stages = [vector, scaler]
stages = [scaler]

pipe = Pipeline(stages=stages)

# we'll be using this data frame
data_for_correlation = pipe.fit(lr_data).transform(lr_data).select("scaled_features")


#The correlation step
correlation = Correlation.corr(data_for_correlation, "scaled_features", "pearson").collect()[0][0].toArray()

 # rename _1, _2 ... columns to their original name
df = pd.DataFrame(correlation)
df["features"] = pd.Series(columns)

 # let's see the results
display(spark.createDataFrame(df, schema=columns))

import pandas as pd
spark.createDataFrame(data).toPandas()

In [None]:
# count mean ratings


pearsonCorr = Correlation.corr(df, "features").collect()[0][0]
print(str(pearsonCorr).replace('nan', 'NaN'))

#import numpy as np
#np.average((Vectors.sparse(4, [(0, 2.0), (3, 2.0)]),))
#neo=spark.createDataFrame([(Vectors.sparse(4, [(0, 2.0), (3, 2.0)]),)], ["rating"])

from pyspark.sql.functions import mean, min, max
#neo.select([mean('rating'), min('rating'), max('rating')]).show()

df[0]