In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.mllib.linalg.distributed import RowMatrix,CoordinateMatrix
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.functions import broadcast, col, array, udf
from pyspark.sql import functions as F, SQLContext
import numpy as np
from operator import mul
import math

In [2]:
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local[2]"))

In [3]:
sqlContext = SQLContext(sc)

In [4]:
import matplotlib.pyplot as plt

In [5]:
spark = SparkSession.builder.appName("tSalesItem").enableHiveSupport().getOrCreate()

In [6]:
movies = spark.read.csv('movies.csv',header=True)
actors = spark.read.csv('movie_actors.csv',header=True)
countries = spark.read.csv('movie_countries.csv',header=True)
directors = spark.read.csv('movie_directors.csv',header=True)
genres = spark.read.csv('movie_genres.csv',header=True)
ratings = spark.read.csv('user_ratedmovies-timestamps.csv',header=True)

In [7]:
genres = genres.groupBy(genres.movieID).agg(F.collect_list('genre').alias('genre'))

In [8]:
movies = movies.dropDuplicates(['title'])

In [9]:
# plt.plot(movies.id, movies.rtAudienceNumRatings)
# movies.plot(x='id', y='rtAudienceNumRatings')
movies = movies.filter(movies.rtAudienceNumRatings > 1800)
movies = movies.filter(movies.rtAllCriticsNumReviews > 150)
movieIds = movies.select("id")

In [10]:
actors = actors.join(movieIds, actors.movieID == movieIds.id, how='inner').drop('id')
countries = countries.join(movieIds, countries.movieID == movieIds.id, how='inner').drop('id')
directors = directors.join(movieIds, directors.movieID == movieIds.id, how='inner').drop('id')
genres = genres.join(movieIds, genres.movieID == movieIds.id, how='inner').drop('id')
ratings = ratings.join(movieIds, ratings.movieID == movieIds.id, how='inner').drop('id')

In [11]:
movies = movies.select("id", "title", "year", "rtAllCriticsRating"\
                       , "rtTopCriticsRating", "rtAudienceRating")


In [12]:
temp_movies_ids = movies[["id","id"]].toPandas().copy()
temp_movies_ids.columns = ['id1', 'id2']
temp_movies_ids['val'] = 0.0
temp_movies_ids = spark.createDataFrame(temp_movies_ids)

temp_movies_ids = temp_movies_ids.withColumn("id1", temp_movies_ids["id1"].cast(IntegerType()))
temp_movies_ids = temp_movies_ids.withColumn("id2", temp_movies_ids["id2"].cast(IntegerType()))

temp_movies_id1 = temp_movies_ids.select(col('id1'), col('val').alias('val1'))
temp_movies_id2 = temp_movies_ids.select('id2', 'val')

# temp_movies_id1.columns = ['id']
# temp_movies_id2 = temp_movies_ids['id2']
# # temp_movies_id2.columns = ['id']
# # temp_movies_id1 = spark.createDataFrame(temp_movies_id1)
# # temp_movies_id2 = spark.createDataFrame(temp_movies_id2)
result = temp_movies_id1.crossJoin(temp_movies_id2)

# result = broadcast(temp_movies_id1).crossJoin(temp_movies_id2)
# # pd.merge(temp_movies_id1, temp_movies_id2, how="cross")

# temp_movies_ids = spark.createDataFrame(temp_movies_ids)

In [13]:

result = result.select('id1', 'id2', 'val')
# result = result.select(col('id1'), col('id2'), col('val').alias('val1'))

In [14]:
result = result.filter(result.id2 > result.id1)

In [15]:
result = result.join(genres, result.id1 == genres.movieID, how='left')\
.withColumnRenamed("genre", "g1")
result = result.drop(result.movieID)
result = result.join(genres, result.id2 == genres.movieID, how='left')\
.withColumnRenamed("genre", "g2")
result = result.drop(result.movieID)

In [16]:
result.show()

+----+----+---+--------------------+--------------------+
| id1| id2|val|                  g1|                  g2|
+----+----+---+--------------------+--------------------+
|1483|2366|0.0|   [Drama, Thriller]|[Action, Adventur...|
|1133|2366|0.0|     [Comedy, Drama]|[Action, Adventur...|
|1889|2366|0.0|[Crime, Mystery, ...|[Action, Adventur...|
|2323|2366|0.0|       [Documentary]|[Action, Adventur...|
| 417|2366|0.0|   [Comedy, Romance]|[Action, Adventur...|
|1473|2366|0.0|[Action, Comedy, ...|[Action, Adventur...|
|2346|2366|0.0|  [Sci-Fi, Thriller]|[Action, Adventur...|
|1240|2366|0.0|[Action, Sci-Fi, ...|[Action, Adventur...|
|1119|2366|0.0|             [Drama]|[Action, Adventur...|
|2205|2366|0.0|   [Comedy, Romance]|[Action, Adventur...|
|1066|2366|0.0|[Comedy, Musical,...|[Action, Adventur...|
| 648|2366|0.0|[Action, Adventur...|[Action, Adventur...|
|1872|2366|0.0|             [Drama]|[Action, Adventur...|
|1791|2366|0.0|[Crime, Drama, Th...|[Action, Adventur...|
|2226|2366|0.0

In [17]:

def genres_addition(x):
    movie_id1 = x.id1
    movie_id2 = x.id2
    val = x.val
    movie1_genres = x.g1
    movie2_genres = x.g2
    if movie1_genres is not None and movie2_genres is not None:
        intersection_size = len(list(set(movie1_genres) & set(movie2_genres)))
        union_size = len(list(set(movie1_genres) | set(movie2_genres)))
        if union_size!=0:
            genre_ratio = float(intersection_size)/union_size
        else:
            genre_ratio=0
        val = val + 0.2*genre_ratio
        
    return (movie_id1, movie_id2, val, movie1_genres, movie2_genres)

In [18]:
rdd2=result.rdd.map(lambda x: genres_addition(x))
# rdd2=df.rdd.map(lambda x: 
#     (x[0]+","+x[1],x[2],x[3]*2)
#     )  
result_genre=rdd2.toDF(["id1","id2","val", "g1", "g2"]   )
result_genre = result_genre.drop("g1", "g2")
# result_genre.show()


In [19]:
result_genre.show()

+----+----+-------------------+
| id1| id2|                val|
+----+----+-------------------+
|1483|2366|                0.0|
|1133|2366|                0.0|
|1889|2366|                0.0|
|2323|2366|                0.0|
| 417|2366|                0.0|
|1473|2366|0.03333333333333333|
|2346|2366|                0.0|
|1240|2366|0.04000000000000001|
|1119|2366|                0.0|
|2205|2366|                0.0|
|1066|2366|                0.0|
| 648|2366|0.08000000000000002|
|1872|2366|                0.0|
|1791|2366|                0.0|
|2226|2366|                0.0|
|  45|2366|                0.0|
|1655|2366|0.04000000000000001|
| 889|2366|                0.0|
|1409|2366|                0.0|
|1327|2366|0.06666666666666667|
+----+----+-------------------+
only showing top 20 rows



In [20]:
actors = actors.withColumn("ranking", actors["ranking"].cast(IntegerType()))

actors = actors.sort("movieID", "ranking")
actors = actors.drop("ranking", "actorName")
actors = actors.groupBy(actors.movieID).agg(F.collect_list('actorID').alias('actorID'))

In [21]:
result_temp_actor = result_genre.join(actors, result_genre.id1 == actors.movieID, how='left')\
.withColumnRenamed("actorID", "a1")
result_temp_actor = result_temp_actor.drop(result_temp_actor.movieID)
result_temp_actor = result_temp_actor.join(actors, result_temp_actor.id2 == actors.movieID, how='left')\
.withColumnRenamed("actorID", "a2")
result_temp_actor = result_temp_actor.drop(result_temp_actor.movieID)
# result_temp_actor.show()

In [22]:
def actors_addition(x):
    movie_id1 = x.id1
    movie_id2 = x.id2
    val = x.val
    movie1_actors = x.a1
    
    movie2_actors = x.a2
    
    if movie1_actors is not None and movie2_actors is not None:
        movie1_actors = movie1_actors[:3]
        movie2_actors = movie2_actors[:3]
        intersection_size = len(list(set(movie1_actors) & set(movie2_actors)))
        union_size = len(list(set(movie1_actors) | set(movie2_actors)))
        if union_size!=0:
            actor_ratio = float(intersection_size)/union_size
        else:
            actor_ratio=0
        val = val + 0.2*actor_ratio
    return (movie_id1, movie_id2, val, movie1_actors, movie2_actors)

In [23]:
rdd_actor=result_temp_actor.rdd.map(lambda x: actors_addition(x))
# rdd2=df.rdd.map(lambda x: 
#     (x[0]+","+x[1],x[2],x[3]*2)
#     )  
result_actor=rdd_actor.toDF(["id1","id2","val", "a1", "a2"]   )
result_actor = result_actor.drop("a1", "a2")
result_actor.show()

+-----+-----+-------------------+
|  id1|  id2|                val|
+-----+-----+-------------------+
| 3280|45726|                0.0|
| 5418|45726|                0.0|
|25999|45726|                0.0|
| 1409|45726|               0.05|
| 3937|45726|                0.0|
| 6287|45726|                0.2|
| 8360|45726|0.03333333333333333|
|30707|45726|                0.0|
| 5888|45726|                0.0|
| 7032|45726|                0.0|
| 8977|45726|                0.0|
| 7139|45726|                0.0|
| 4220|45726|                0.2|
| 5364|45726|                0.0|
| 7325|45726|0.10666666666666667|
| 8965|45726|                0.0|
|32988|45726|                0.0|
|31685|45726|                0.1|
| 8907|45726|0.06666666666666667|
| 5400|45726|                0.0|
+-----+-----+-------------------+
only showing top 20 rows



In [24]:
result_temp_country = result_actor.join(countries, result_actor.id1 == countries.movieID, how='left')\
.withColumnRenamed("country", "c1")
result_temp_country = result_temp_country.drop(result_temp_country.movieID)
result_temp_country = result_temp_country.join(countries, result_temp_country.id2 == countries.movieID, how='left')\
.withColumnRenamed("country", "c2")
result_temp_country = result_temp_country.drop(result_temp_country.movieID)
result_temp_country.show()

+-----+-----+-------------------+-------+---+
|  id1|  id2|                val|     c1| c2|
+-----+-----+-------------------+-------+---+
| 3280|45726|                0.0|    USA|USA|
| 5418|45726|                0.0|    USA|USA|
|25999|45726|                0.0|    USA|USA|
| 1409|45726|               0.05|    USA|USA|
| 3937|45726|                0.0|    USA|USA|
| 6287|45726|                0.2|    USA|USA|
| 8360|45726|0.03333333333333333|    USA|USA|
|30707|45726|                0.0|    USA|USA|
| 5888|45726|                0.0|    USA|USA|
| 7032|45726|                0.0|    USA|USA|
| 8977|45726|                0.0|Germany|USA|
| 7139|45726|                0.0|Ireland|USA|
| 4220|45726|                0.2|    USA|USA|
| 5364|45726|                0.0|    USA|USA|
| 7325|45726|0.10666666666666667|    USA|USA|
| 8965|45726|                0.0|    USA|USA|
|32988|45726|                0.0|    USA|USA|
|31685|45726|                0.1|    USA|USA|
| 8907|45726|0.06666666666666667| 

In [25]:
def equality_addition(x, l):
    movie_id1 = x.id1
    movie_id2 = x.id2
    val = x.val
    movie1_att = x.c1
    movie2_att = x.c2
    if movie1_att is not None and movie2_att is not None:
        if movie1_att==movie2_att:
            val = val + l
    return (movie_id1, movie_id2, val, movie1_att, movie2_att)

In [26]:
rdd_country=result_temp_country.rdd.map(lambda x: equality_addition(x, 0.05))
# rdd2=df.rdd.map(lambda x: 
#     (x[0]+","+x[1],x[2],x[3]*2)
#     )  
result_country=rdd_country.toDF(["id1","id2","val", "c1", "c2"]   )
result_country = result_country.drop("c1", "c2")
result_country.show()

+-----+-----+-------------------+
|  id1|  id2|                val|
+-----+-----+-------------------+
| 3280|45726|               0.05|
| 5418|45726|               0.05|
|25999|45726|               0.05|
| 1409|45726|                0.1|
| 3937|45726|               0.05|
| 6287|45726|               0.25|
| 8360|45726|0.08333333333333334|
|30707|45726|               0.05|
| 5888|45726|               0.05|
| 7032|45726|               0.05|
| 8977|45726|                0.0|
| 7139|45726|                0.0|
| 4220|45726|               0.25|
| 5364|45726|               0.05|
| 7325|45726|0.15666666666666668|
| 8965|45726|               0.05|
|32988|45726|               0.05|
|31685|45726|0.15000000000000002|
| 8907|45726|0.11666666666666667|
| 5400|45726|               0.05|
+-----+-----+-------------------+
only showing top 20 rows



In [27]:
directors = directors.drop(directors.directorName)

In [28]:
result_temp_director = result_country.join(directors, result_country.id1 == directors.movieID, how='left')\
.withColumnRenamed("directorID", "c1")
result_temp_director = result_temp_director.drop(result_temp_director.movieID)
result_temp_director = result_temp_director.join(directors, result_temp_director.id2 == directors.movieID, how='left')\
.withColumnRenamed("directorID", "c2")
result_temp_director = result_temp_director.drop(result_temp_director.movieID)
result_temp_director.show()

+-----+-----+-------------------+--------------------+-------------+
|  id1|  id2|                val|                  c1|           c2|
+-----+-----+-------------------+--------------------+-------------+
| 3280|45726|               0.05|         ben_affleck|anthony_russo|
| 5418|45726|               0.05|          doug_liman|anthony_russo|
|25999|45726|               0.05|           sean_penn|anthony_russo|
| 1409|45726|                0.1|         tony_gilroy|anthony_russo|
| 3937|45726|               0.05|         gary_fleder|anthony_russo|
| 6287|45726|               0.25|         peter_segal|anthony_russo|
| 8360|45726|0.08333333333333334|        kelly_asbury|anthony_russo|
|30707|45726|               0.05|      clint_eastwood|anthony_russo|
| 5888|45726|               0.05|       larry_charles|anthony_russo|
| 7032|45726|               0.05|        george_lucas|anthony_russo|
| 8977|45726|                0.0|        oliver_stone|anthony_russo|
| 7139|45726|                0.0| 

In [29]:
rdd_director=result_temp_director.rdd.map(lambda x: equality_addition(x, 0.15))
# rdd2=df.rdd.map(lambda x: 
#     (x[0]+","+x[1],x[2],x[3]*2)
#     )  
result_director=rdd_director.toDF(["id1","id2","val", "c1", "c2"]   )
result_director = result_director.drop("c1", "c2")
result_director.show()

+-----+-----+-------------------+
|  id1|  id2|                val|
+-----+-----+-------------------+
| 3280|45726|               0.05|
| 5418|45726|               0.05|
|25999|45726|               0.05|
| 1409|45726|                0.1|
| 3937|45726|               0.05|
| 6287|45726|               0.25|
| 8360|45726|0.08333333333333334|
|30707|45726|               0.05|
| 5888|45726|               0.05|
| 7032|45726|               0.05|
| 8977|45726|                0.0|
| 7139|45726|                0.0|
| 4220|45726|               0.25|
| 5364|45726|               0.05|
| 7325|45726|0.15666666666666668|
| 8965|45726|               0.05|
|32988|45726|               0.05|
|31685|45726|0.15000000000000002|
| 8907|45726|0.11666666666666667|
| 5400|45726|               0.05|
+-----+-----+-------------------+
only showing top 20 rows



In [30]:
result_temp_movie = result_director.join(movies, result_director.id1 == movies.id, how='left')\
.withColumnRenamed("year", "y1").withColumnRenamed("rtAllCriticsRating", 't1')\
.withColumnRenamed("rtTopCriticsRating", "p1").withColumnRenamed("rtAudienceRating", 'a1')
result_temp_movie = result_temp_movie.drop('id','title')
result_temp_movie = result_temp_movie.join(movies, result_temp_movie.id2 == movies.id, how='left')\
.withColumnRenamed("year", "y2").withColumnRenamed("rtAllCriticsRating", 't2')\
.withColumnRenamed("rtTopCriticsRating", "p2").withColumnRenamed("rtAudienceRating", 'a2')
result_temp_movie = result_temp_movie.drop('id', 'title')
# result_temp_movie.show()

In [31]:
def movies_addition(x):
    movie_id1 = x.id1
    movie_id2 = x.id2
    val = x.val
    movie1_y = int(x.y1)
    movie1_t = float(x.t1)
    movie1_p = float(x.p1)
    movie1_a = float(x.a1)
    
    movie2_y = int(x.y2)
    movie2_t = float(x.t2)
    movie2_p = float(x.p2)
    movie2_a = float(x.a2)
    
    if movie1_y is not None and movie2_y is not None:
        year_diff = abs(movie1_y - movie2_y)
        if year_diff <= 2:
            val = val + 0.05
        elif year_diff <=5:
            val = val + 0.05*0.7
        elif year_diff <= 7:
            val = val + 0.05*0.5
    t_ratio=0
    p_ratio=0
    a_ratio=0
    if movie1_t is not None and movie2_t is not None:
        if max(movie1_t, movie2_t) !=0:
            t_ratio = min(movie1_t, movie2_t)/max(movie1_t, movie2_t)
    
    if movie1_p is not None and movie2_p is not None:
        if max(movie1_p, movie2_p)!=0:
            p_ratio = min(movie1_p, movie2_p)/max(movie1_p, movie2_p)

    if movie1_a is not None and movie2_a is not None:        
        if max(movie1_a, movie2_a)!=0:
            a_ratio = min(movie1_a, movie2_a)/max(movie1_a, movie2_a)

    total_ratio = (1.0/3)*t_ratio+(1.0/3)*p_ratio+(1.0/3)*a_ratio 
    val = val + 0.2*total_ratio
    
    return (movie_id1, movie_id2, val, movie1_y, movie1_t, movie1_p, movie1_a,\
           movie2_y, movie2_t, movie2_p, movie2_a)

In [32]:
rdd_movie=result_temp_movie.rdd.map(lambda x: movies_addition(x))
# rdd2=df.rdd.map(lambda x: 
#     (x[0]+","+x[1],x[2],x[3]*2)
#     )  
result_movie=rdd_movie.toDF(["id1","id2","val", "y1", "t1", "p1", "a1"\
                             , "y2", "t2", "p2", "a2"]   )
result_movie = result_movie.drop("y1", "t1", "p1", "a1"\
                             , "y2", "t2", "p2", "a2")
# result_movie.show()

In [33]:
ratings = ratings.drop('timestamp')

In [34]:
ratings = ratings.withColumn("userID", ratings["userID"].cast(IntegerType()))
ratings = ratings.withColumn("movieID", ratings["movieID"].cast(IntegerType()))
ratings = ratings.withColumn("rating", ratings["rating"].cast(DoubleType()))

In [35]:
ratings_temp = ratings.groupby(ratings.movieID).agg(F.collect_list('userID').alias('userID')\
                                     , F.collect_list('rating').alias('rating'))

In [36]:
result_temp_sim = result_movie.join(ratings_temp, result_movie.id1 == ratings_temp.movieID, how='left')\
.withColumnRenamed("userID", "u1").withColumnRenamed("rating", 'r1')
result_temp_sim = result_temp_sim.drop('movieID')
result_temp_sim = result_temp_sim.join(ratings_temp, result_temp_sim.id2 == ratings_temp.movieID, how='left')\
.withColumnRenamed("userID", "u2").withColumnRenamed("rating", 'r2')
result_temp_sim = result_temp_sim.drop('movieID')
# result_temp_sim.show()

In [37]:
def sim_addition(z):
    movie_id1 = z.id1
    movie_id2 = z.id2
    val = z.val
    u1 = z.u1
    u2 = z.u2
    r1 = z.r1
    r2 = z.r2
    if u1 is not None and u2 is not None:
        both = list(set(u1) & set(u2))
        indices_u1 = [u1.index(x) for x in both]
        indices_u2 = [u2.index(x) for x in both]
        if len(indices_u1) != 0:
            v1 = [r1[i] for i in indices_u1]
            v2 = [r2[i] for i in indices_u2]
            sim = sum([x*y for x,y in zip(v1,v2)])/(math.sqrt(sum(i**2 for i in v1))\
                                                    *math.sqrt(sum(i**2 for i in v2)))
            val = val+sim*0.2
    
    return (movie_id1, movie_id2, val)

In [38]:
rdd_sim=result_temp_sim.rdd.map(lambda x: sim_addition(x)) 
result_sim=rdd_sim.toDF(["id1","id2","val"])
#result_sim = result_sim.drop("u1", "r1", "u2", "r2")

# result_sim.show()

In [41]:
result_sim.registerTempTable("sim")

In [42]:
ratings.registerTempTable("ratings")
movies.registerTempTable("movies")



In [98]:
userID = 622

In [99]:
user = sqlContext.sql("SELECT m.movieID, m.rating FROM ratings m "+
            "where userID = "+str(userID))
user.registerTempTable("user")

In [100]:
not_watched = sqlContext.sql("select m.id as id from movies m left join user u on u.movieID = m.id "+
              "where u.movieID IS NULL order by m.id")
not_watched.registerTempTable("not_watched")

In [101]:
user = sqlContext.sql("select * from user u, not_watched nw ")

In [102]:
user = user.withColumn("id", user["id"].cast(IntegerType()))
user.registerTempTable("user")
result_sim = result_sim.withColumn("id1", result_sim["id1"].cast(IntegerType()))
result_sim = result_sim.withColumn("id2", result_sim["id2"].cast(IntegerType()))

In [103]:
temp = sqlContext.sql("select u.movieID, u.rating, s.val as val1, u.id from user u left join sim s on s.id1 = u.id "+
            "and u.movieID = s.id2")
temp.registerTempTable("temp")
# temp = user.join(result_sim, (user.id == result_sim.id1) \
#         , how='left')
# temp.show()

In [104]:
result_final_join =  sqlContext.sql("select u.movieID, u.rating,u.val1 ,s.val as val2, u.id from temp u left join sim s on s.id2 = u.id "+
            "and u.movieID = s.id1")
result_final_join.registerTempTable("result_final_join")

In [105]:
result_join = sqlContext.sql("select movieID as watched, rating, "
                             +" CASE WHEN val1 is null  THEN val2  ELSE val1 END AS val , id as notWatched "+
                               " from result_final_join" )
result_join.registerTempTable("result_join")

In [106]:
# result_join.registerTempTable("result_join")

In [107]:
result_intermediate = sqlContext.sql("select notWatched, sum(rating*val) as num, sum(val) as dem from result_join" +
                                     " group by notWatched")
result_intermediate.registerTempTable("result_intermediate")

In [108]:
final = sqlContext.sql("select notWatched, (num/dem) as rating from result_intermediate order by  rating desc")
final.registerTempTable("final")

In [109]:
sqlContext.sql("SELECT m.title FROM movies m join ratings r on m.id = r.movieID  where userID ="+str(userID)).toPandas()

Unnamed: 0,title
0,Terminator Salvation
1,War of the Worlds
2,American Beauty
3,Shrek
4,The Lord of the Rings: The Fellowship of the Ring
5,The Bourne Identity
6,Minority Report
7,Red Dragon
8,The Lord of the Rings: The Two Towers
9,Catch Me If You Can


In [110]:
sqlContext.sql("SELECT m.title as suggested FROM movies m join final f on m.id = f.notWatched ").limit(10).toPandas()

Unnamed: 0,suggested
0,3:10 to Yuma
1,King Kong
2,Across the Universe
3,Crash
4,The Prestige
5,Sunshine Cleaning
6,Madagascar
7,Melinda and Melinda
8,The Number 23
9,Hidalgo
