In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import Row

# Single Spark session shared by every cell below; "local[*]" runs Spark in-process on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Engine")
    .getOrCreate()
)

In [2]:
class Contents():
    """Video catalogue and viewing data loaded from CSV exports, plus lookup/filter helpers.

    After ``load()`` the instance holds five cached DataFrames:

    * ``contents``          -- ContentId, Codename
    * ``categories``        -- CategoryId, CategoryCodename, CategoryTypeId
    * ``contentCategories`` -- ContentId, CategoryId
    * ``categoryTypes``     -- CategoryTypeId, CategoryTypeCodename
    * ``usercontent``       -- UserId, ContentId (plus ContentCount after ``transform()``)

    The class relies on the notebook-level ``spark`` session.
    """

    def load(self, basePath="./csv"):
        """Read the five ``video.*.csv`` exports found under ``basePath``.

        basePath -- directory containing the CSV files; defaults to the
                    notebook's original ``./csv`` location.

        Returns ``self`` so calls can be chained.
        """
        def readCsv(fileName):
            # Identical reader options for every export: first row is the header,
            # lines starting with '-' are skipped, column types are inferred.
            return spark.read.csv(path="{0}/{1}".format(basePath, fileName), header=True, comment="-", inferSchema=True)

        self.contents = readCsv("video.contents.csv")\
            .select("ContentId", "Codename").cache()
        # Codename exists in several tables, so it is renamed per table to keep joins unambiguous.
        self.categories = readCsv("video.categories.csv")\
            .select("CategoryId", col("Codename").alias("CategoryCodename"), "CategoryTypeId").cache()
        self.contentCategories = readCsv("video.contentcategories.csv")\
            .select("ContentId", "CategoryId").cache()
        self.categoryTypes = readCsv("video.categorytypes.csv")\
            .select("CategoryTypeId", col("Codename").alias("CategoryTypeCodename")).cache()
        self.usercontent = readCsv("video.usercontent.csv")\
            .select("UserId", "ContentId").cache()
        return self

    def transform(self, contentCutOff=20, excludedCategoryTypeId=4):
        """Filter ``usercontent`` down to the rows used for training.

        contentCutOff          -- passed to ``filterContentLowerBound``.
        excludedCategoryTypeId -- passed to ``filterCategoryTypes``; the default 4 is
                                  the notebook's original value (NOTE(review): its
                                  meaning is not visible here -- confirm against
                                  video.categorytypes.csv).

        Returns ``self`` so calls can be chained.
        """
        # The per-user lower bound is intentionally left disabled, as in the original.
        # If it is ever enabled it must run BEFORE filterContentLowerBound, because
        # filterUserLowerBound's select() does not keep the ContentCount column.
        #self.filterUserLowerBound()
        self.filterContentLowerBound(contentCutOff)
        self.filterCategoryTypes(excludedCategoryTypeId)
        self.usercontent.cache()
        return self

    def cache(self):
        """Run ``count()`` on every DataFrame so the cached data is materialised now.

        Returns ``self`` so calls can be chained.
        """
        self.contents.count()
        self.categories.count()
        self.contentCategories.count()
        self.categoryTypes.count()
        self.usercontent.count()
        return self

    def getUsersWithContent(self, contentName):
        """Return (UserId, ContentId, Codename) rows whose Codename contains ``contentName``.

        ``contentName`` is embedded in a SQL LIKE pattern, so ``%`` and ``_`` inside it
        still act as wildcards. The pattern is passed as a value rather than spliced
        into an expression string, so quotes in the name cannot break the query.
        """
        pattern = "%{0}%".format(contentName)
        return self.usercontent\
            .join(self.contents, "ContentId")\
            .filter(col("Codename").like(pattern))\
            .select("UserId", "ContentId", "Codename")

    def getUserHistory(self, userId):
        """Return (UserId, ContentId, Codename) for every ``usercontent`` row of ``userId``."""
        return self.usercontent\
            .filter(col("UserId") == userId)\
            .join(self.contents, "ContentId")\
            .select("UserId", "ContentId", "Codename")

    def getContentsWithCategoryType(self, categoryTypeId):
        """Return contents joined to their categories, restricted to one CategoryTypeId.

        A content appears once per matching category.
        """
        return self.contents\
            .join(self.contentCategories, "ContentId")\
            .join(self.categories, "CategoryId")\
            .filter(col("CategoryTypeId") == categoryTypeId)

    def getContentCategories(self, contentId):
        """Return the categories (with their category type) attached to ``contentId``."""
        return self.contentCategories\
            .filter(col("ContentId") == contentId)\
            .join(self.categories, "CategoryId")\
            .join(self.categoryTypes, "CategoryTypeId")\
            .select("ContentId", "CategoryId", "CategoryCodename", "CategoryTypeId", "CategoryTypeCodename")

    def filterContentLowerBound(self, cutOffNo=20):
        """Keep only rows whose ContentId occurs in MORE than ``cutOffNo`` rows.

        Replaces ``self.usercontent`` with columns UserId, ContentId, ContentCount,
        where ContentCount is the number of ``usercontent`` rows for that content.
        """
        grouped = self.usercontent.groupBy("ContentId").count().filter(col("count") > cutOffNo)
        # Inner join drops the contents that did not pass the threshold.
        self.usercontent = self.usercontent.join(grouped, "ContentId").select("UserId", "ContentId", col("count").alias("ContentCount"))

    def filterUserLowerBound(self, cutOffNo=2):
        """Keep only rows whose UserId occurs in MORE than ``cutOffNo`` rows.

        Replaces ``self.usercontent`` with columns UserId, ContentId, UserCount;
        any other column (e.g. ContentCount) is dropped by the select.
        """
        grouped = self.usercontent.groupBy("UserId").count().filter(col("count") > cutOffNo)
        self.usercontent = self.usercontent.join(grouped, "UserId").select("UserId", "ContentId", col("count").alias("UserCount"))

    def filterCategoryTypes(self, categoryTypeId):
        """Remove from ``usercontent`` every content that has a category of ``categoryTypeId``."""
        # left_anti keeps only the usercontent rows with NO match on the right-hand side.
        self.usercontent = self.usercontent.join(self.getContentsWithCategoryType(categoryTypeId), "ContentId", "left_anti")
        
class Recommender():
    """Implicit-feedback ALS recommender with LSH-based item-to-item lookup.

    Construction trains everything eagerly:

    * an ALS model on ``contents.usercontent`` (UserId, ContentId, ContentCount),
    * the top-N recommendations for every user (cached),
    * a BucketedRandomProjectionLSH model over the ALS item factors.

    The class relies on the notebook-level ``spark`` session.
    """

    def __init__(self, contents, rank=20, maxIter=10, numRecommendations=20, seed=None):
        """Fit the ALS and LSH models.

        contents           -- a loaded/transformed ``Contents`` instance; its
                              ``usercontent`` must carry the ContentCount column.
        rank, maxIter      -- ALS hyper-parameters (defaults are the original values).
        numRecommendations -- how many items are precomputed per user.
        seed               -- optional seed applied to both ALS and the LSH model for
                              repeatable runs; ``None`` keeps the library defaults.
        """
        self.contents = contents
        als = ALS(userCol="UserId", itemCol="ContentId", ratingCol="ContentCount", implicitPrefs=True, rank=rank, maxIter=maxIter)
        if seed is not None:
            als.setSeed(seed)
        self.alsModel = als.fit(self.contents.usercontent)
        self.userRecommendations = self.alsModel.recommendForAllUsers(numRecommendations).cache()

        # ALS exposes item factors as array columns; the LSH estimator needs ML Vectors.
        self.listToVector = udf(lambda l: Vectors.dense(l), VectorUDT())
        itemFactors = self.alsModel.itemFactors
        self.itemsFactors = itemFactors.select(
            itemFactors["id"],
            self.listToVector(itemFactors["features"]).alias("features")).cache()
        brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=1.0)
        if seed is not None:
            brp.setSeed(seed)
        self.brpModel = brp.fit(self.itemsFactors)

    def getUserToItem(self, userId):
        """Return the precomputed recommendations of ``userId`` joined with content names.

        Result columns: ContentId, rating, Codename.

        Raises IndexError when the model holds no recommendations for ``userId``
        (for example, a user absent from the training data).
        """
        rows = self.userRecommendations.filter(col("UserId") == userId).take(1)
        if not rows:
            raise IndexError("no recommendations available for UserId {0}".format(userId))
        # The recommendations column is a list of (ContentId, rating) rows for this user.
        userRecommendations = spark.createDataFrame(rows[0].recommendations)
        return userRecommendations.join(self.contents.contents, "ContentId")

    def getItemToItem(self, itemId, k=20):
        """Return up to ``k`` approximate nearest neighbours of ``itemId`` in factor space.

        Result columns: ContentId, Codename, distCol (distance to the query item).

        Raises IndexError when ``itemId`` has no item factors in the model.
        """
        rows = self.itemsFactors.filter(col("id") == itemId).take(1)
        if not rows:
            raise IndexError("no item factors available for ContentId {0}".format(itemId))
        features = rows[0].features
        nearest = self.brpModel.approxNearestNeighbors(self.itemsFactors, features, k)
        catalogue = self.contents.contents
        return nearest.join(catalogue, catalogue.ContentId == nearest.id)\
            .select("ContentId", "Codename", "distCol")

In [3]:
%%time
# Load the CSV exports, apply the training filters and materialise the caches.
c = (
    Contents()
    .load()
    .transform()
    .cache()
)
# Constructing the recommender trains the ALS and LSH models, so this is the slow step.
r = Recommender(contents=c)

Wall time: 1min 4s


In [7]:
%%time
# Top recommendations for one user, with full (untruncated) content names.
r.getUserToItem(userId=59399).show(n=20, truncate=False)

+---------+------------------+--------------------------------------------------+
|ContentId|rating            |Codename                                          |
+---------+------------------+--------------------------------------------------+
|3281985  |1.5151088237762451|szybcy-i-wsciekli-24                              |
|2370089  |1.384731650352478 |upadek-2                                          |
|2370086  |1.299767255783081 |krolewna-sniezka-2                                |
|4132082  |1.2502920627593994|empire-state-ryzykowna-gra-1                      |
|2369939  |1.2452311515808105|asterix-i-obelix-w-sluzbie-jej-krolewskiej-mosci-6|
|2383850  |1.204926609992981 |spirited-away-w-krainie-bogow                     |
|2370085  |1.1927030086517334|burza                                             |
|4132077  |1.1880314350128174|hotelowa-milosc                                   |
|2568666  |1.177538275718689 |odcinek-38-jake-i-piraci-z-nibylandii             |
|2568660  |1.173

In [10]:
%%time
# Approximate nearest neighbours of one content item in ALS factor space.
r.getItemToItem(itemId=2050700, k=20).collect()

Wall time: 2.66 s


[Row(ContentId=2050700, Codename='nimfomanka-cz-1', distCol=0.0),
 Row(ContentId=2050699, Codename='nimfomanka-cz-2', distCol=0.07662286816769384),
 Row(ContentId=2194083, Codename='the-walking-dead-sezon-5-odcinek-16-conquer', distCol=0.21481132830723365),
 Row(ContentId=2186797, Codename='the-walking-dead-sezon-1-odcinek-1-days-gone-by', distCol=0.2212716422535205),
 Row(ContentId=2369950, Codename='titanic-zrodlo-tragedii-7', distCol=0.2218088481660394),
 Row(ContentId=2370123, Codename='ceremonia-wreczenia-oscarow', distCol=0.22436899193476254),
 Row(ContentId=1819757, Codename='egzoplanety-blizniacze-ziemie', distCol=0.22682481562964776),
 Row(ContentId=1587735, Codename='stawka-wieksza-niz-zycie-podwojny-nelson-3', distCol=0.22840263180305034),
 Row(ContentId=2227155, Codename='joe-10', distCol=0.23126860758112833),
 Row(ContentId=1763469, Codename='33-sceny-z-zycia', distCol=0.23174045734617119),
 Row(ContentId=2274120, Codename='flip-i-flap-prawdziwa-historia-7', distCol=0.2321

In [15]:
# Users who watched any content whose codename contains "spongebob".
c.getUsersWithContent(contentName="spongebob").show(n=20, truncate=False)

+------+---------+--------------------------------------------+
|UserId|ContentId|Codename                                    |
+------+---------+--------------------------------------------+
|59399 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|137378|2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|89163 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|136525|2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|13273 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|75072 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|167687|2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|36224 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|35030 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|172415|2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|148180|2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|16548 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|128106|2410457  |spongebob-kanciastopor

In [6]:
# Viewing history of the same user whose recommendations are shown in this notebook.
c.getUserHistory(userId=59399).show(n=40, truncate=False)

+------+---------+--------------------------------------------+
|UserId|ContentId|Codename                                    |
+------+---------+--------------------------------------------+
|59399 |3507547  |clangers-s1-odc4                            |
|59399 |2410457  |spongebob-kanciastoporty-sezon-01-odcinek-29|
|59399 |2417980  |psi-patrol-sezon-01-odcinek-06              |
|59399 |3447786  |dora-poznaje-swiat-sezon-05-odcinek-01      |
|59399 |2058145  |odcinek-7-klinika-dla-pluszakow             |
|59399 |2561504  |dora-poznaje-swiat-sezon-08-odcinek-17      |
|59399 |1763395  |odcinek-9-penn-zero-bohater-na-pol-etatu-1  |
|59399 |1942683  |dzielny-lew-eryk-s2-odc-04                  |
|59399 |2058146  |odcinek-8-klinika-dla-pluszakow             |
|59399 |3728703  |bing-odc-154                                |
|59399 |1674819  |odcinek-30-jake-i-piraci-z-nibylandii       |
|59399 |1956222  |dzielny-lew-eryk-s2-odc-07                  |
|59399 |1987643  |inwazja-korlikow-sezon

In [9]:
# Categories of the item used as the item-to-item query.
c.getContentCategories(contentId=2050700).show()

+---------+----------+----------------+--------------+--------------------+
|ContentId|CategoryId|CategoryCodename|CategoryTypeId|CategoryTypeCodename|
+---------+----------+----------------+--------------+--------------------+
|  2050700|       402|     film-dramat|             9|movie-internal-genre|
|  2050700|        43|            film|             5|               genre|
|  2050700|       312|        dramat-1|             7|      movie-category|
+---------+----------+----------------+--------------+--------------------+



In [84]:
# NOTE(review): scratch cell of bare integer literals; only the last one is echoed as output.
# 59399 matches the UserId passed to getUserToItem/getUserHistory in this notebook;
# 12581 is presumably another candidate UserId -- confirm, or delete this cell.
12581
59399

59399

In [85]:
# NOTE(review): scratch literal; same value as the ContentId passed to
# getItemToItem/getContentCategories in this notebook -- consider deleting this cell.
2050700

2050700