In [1]:
import boto3, os
import pyarrow.parquet as pq
import pandas as pd
import re
import pyspark.sql.functions as f

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, FloatType, DoubleType, DateType, LongType
from pyspark.sql.functions import explode
from pyspark.sql.functions import col


s3 = boto3.resource('s3')

conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.1.2')
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')   
    
spark = SparkSession.builder \
    .config(conf=conf) \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

#lista plikow parquet na s3

loc = "s3a://"
i = 0
for obj in s3.Bucket(name='moje-wiaderko').objects.all():
    if re.match(".*\.parquet",obj.key):
        plik = os.path.join(obj.bucket_name, obj.key)
        print(loc+plik)
        if i == 0:
            data = spark.read.parquet(loc+plik)
            i = 1
        else:
            data = data.union(spark.read.parquet(loc+plik))



:: loading settings :: url = jar:file:/home/ubuntu/miniconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ece4fc40-f1fe-44b7-aeb8-16c6699ca242;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.1.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.271 in central
:: resolution report :: resolve 398ms :: artifacts dl 11ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.271 from central in [default]
	org.apache.hadoop#hadoop-aws;3.1.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-------------------------------

s3a://moje-wiaderko/part-00000-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet


22/01/28 12:42:28 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

s3a://moje-wiaderko/part-00001-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00002-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00003-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00004-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00005-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00006-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00007-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00008-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00009-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00010-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00011-f250440e-2fcc-4627-b102-98ebe8862d8c-c000.snappy.parquet
s3a://moje-wiaderko/part-00012-f

In [2]:
data.count()

                                                                                

2899949

In [3]:
%%time

#tworzymy nowy df zeby dodac unikalny userID dla uzytkownikow

#wybieramy tylko kolumne z uzytkownikami
users = data.select(data.profilename).distinct()

#zamieniamy na rdd zeby skorzystac z zipWithIndex()
rdd_users = users.rdd.zipWithIndex()
users = rdd_users.toDF()

#odpakowanie kolumny
users = users.withColumn('profilename', users['_1'].getItem("profilename"))
users = users.select(users.profilename, users._2.alias("userid") )
users.show()

                                                                                

+--------------+------+
|   profilename|userid|
+--------------+------+
| turboespresso|     0|
|     annunz123|     1|
|      MrWalker|     2|
|     MrManning|     3|
|          Roli|     4|
|        macguy|     5|
|     Unibonger|     6|
|    Gachupines|     7|
|         Edmoe|     8|
| DCopperfields|     9|
|      Pawola22|    10|
|BrotherGrendel|    11|
|      TechMyst|    12|
|        Sammer|    13|
|       reviled|    14|
|       jseifer|    15|
|       roner77|    16|
|      veronika|    17|
|      PapaMick|    18|
|BrotherJoe1975|    19|
+--------------+------+
only showing top 20 rows

CPU times: user 64.9 ms, sys: 23.4 ms, total: 88.3 ms
Wall time: 17 s


In [4]:
%%time

#tworzymy nowy df zeby dodac unikalny styleID dla stylu

#wybieramy tylko kolumne z uzytkownikami
styles = data.select(data.style).distinct()

#zamieniamy na rdd zeby skorzystac z zipWithIndex()
rdd_styles = styles.rdd.zipWithIndex()
styles = rdd_styles.toDF()

#odpakowanie kolumny
styles = styles.withColumn('style', styles['_1'].getItem("style"))
styles = styles.select(styles.style, styles._2.alias("styleid") )
styles = styles.select('*', (styles.styleid+1).alias('style_id')).drop('styleid')
styles.show()



+--------------------+--------+
|               style|style_id|
+--------------------+--------+
|              Porter|       1|
|       Baltic Porter|       2|
|        Sak - Junmai|       3|
|       Sak - Infused|       4|
|         Belgian Ale|       5|
|            Pilsener|       6|
|           Irish Ale|       7|
|   California Common|       8|
| American Strong Ale|       9|
|       Lambic - Faro|      10|
|   American Pale Ale|      11|
|     Lambic - Gueuze|      12|
|Classic German Pi...|      13|
|              Bitter|      14|
|     Berliner Weisse|      15|
|  Belgian Strong Ale|      16|
|          Doppelbock|      17|
|Imperial/Strong P...|      18|
|            Mild Ale|      19|
|         Sweet Stout|      20|
+--------------------+--------+
only showing top 20 rows

CPU times: user 62.5 ms, sys: 15.3 ms, total: 77.8 ms
Wall time: 11.7 s


                                                                                

In [13]:
# ilość recenzji każdego piwa

beer_review_count = data.groupBy(data.beerid).agg(f.count("beer_name").alias("beer_review_count")).orderBy(f.desc('beer_review_count'))
beer_review_count.show(20)



+------+-----------------+
|beerid|beer_review_count|
+------+-----------------+
|  1267|             3696|
|   734|             3662|
| 10569|             3230|
|   473|             3126|
|   365|             3119|
|   158|             3110|
|    53|             3056|
|   680|             2904|
|  1315|             2872|
|   835|             2813|
|   132|             2812|
|   399|             2787|
|  2530|             2673|
| 14709|             2670|
|   589|             2668|
|   742|             2653|
|   139|             2632|
|   422|             2632|
|    37|             2624|
|   370|             2586|
+------+-----------------+
only showing top 20 rows



                                                                                

In [14]:
%%time

#zmniejszamy ilosc kolumn zeby bylo bardziej czytelne
#sumujemy oceny cząstkowe do jednej wartości "rating"
data_s = data.select(data.reviewid, data.profilename, data.beerid, data.beer_name, data.style, \
                     (data.appearance+data.aroma+data.palate+data.taste+data.overall).alias("rating"))
data_s.show()

+--------+-------------+------+--------------------+--------------------+------------------+
|reviewid|  profilename|beerid|           beer_name|               style|            rating|
+--------+-------------+------+--------------------+--------------------+------------------+
|      20|        vyvvy| 91592|Barley Island Bar...| Imperial/Double IPA|               3.0|
|      40|        MI2CA| 77833|Barley Island Sin...|     Traditional Ale|               3.7|
|      60|      emacgee| 77833|Barley Island Sin...|     Traditional Ale|               3.2|
|      80|        thedm| 77833|Barley Island Sin...|     Traditional Ale|3.4000000000000004|
|     100|       golubj| 77833|Barley Island Sin...|     Traditional Ale|3.4000000000000004|
|     120|CaptainCougar| 58511|Barley Island Bar...|India Pale Ale (IPA)|              3.75|
|     140|     3fourths| 58511|Barley Island Bar...|India Pale Ale (IPA)|              3.05|
|     160|     Ughsmash| 58511|Barley Island Bar...|India Pale Ale (IP

In [15]:
%%time

#join tabel, dodajemy userid, style_id i beer_review_count

data_short = data_s.join(users, on=["profilename"], how="inner")
data_short = data_short.join(styles, on =['style'], how='left')
data_short = data_short.join(beer_review_count, on=['beerid'], how='left')
data_short.show()



+------+-----+------------+--------+----------------+------------------+------+--------+-----------------+
|beerid|style| profilename|reviewid|       beer_name|            rating|userid|style_id|beer_review_count|
+------+-----+------------+--------+----------------+------------------+------+--------+-----------------+
|   148|Klsch|    bhensonb|  457830|Hollywood Blonde|              3.25|   140|      37|              136|
|   148|Klsch|    bhensonb|  342795|Hollywood Blonde|              3.25|   140|      37|              136|
|   148|Klsch|Skinnyviking|  342822|Hollywood Blonde|               2.3|   714|      37|              136|
|   148|Klsch|Skinnyviking|  457857|Hollywood Blonde|               2.3|   714|      37|              136|
|   148|Klsch|     DarkElf|  342842|Hollywood Blonde|              2.95|   983|      37|              136|
|   148|Klsch|     DarkElf|  457877|Hollywood Blonde|              2.95|   983|      37|              136|
|   148|Klsch|  LoveCaissa|  342820|H

                                                                                

In [16]:
# ilu jest użytkowników?

users.count()

                                                                                

29174

In [17]:
# segregujemy użytkowników według ilości recenzji

data_beerid_count = data_short.groupBy("userid").agg(f.count("beerid").alias("beerid_count")).orderBy(f.desc("beerid_count"))
data_beerid_count.show()



+------+------------+
|userid|beerid_count|
+------+------------+
| 18621|       16236|
| 16107|       15167|
|  6272|       14052|
|  7580|       13365|
| 14974|        9933|
|  2607|        9422|
| 22461|        9349|
| 11687|        8269|
| 13031|        8225|
|   134|        8171|
| 18910|        8147|
| 15848|        8034|
| 12417|        7966|
| 19341|        7581|
| 27679|        7353|
| 14194|        6650|
| 16404|        6538|
|   848|        6516|
| 13730|        6487|
| 12270|        6409|
+------+------------+
only showing top 20 rows



                                                                                

In [18]:
# Dla sprawdzenia implicitPrefs musimy zamienić nasze oceny na wartości w przedziale od 0 do 1,
# dodatkowo przemnożmy przez nuer stylu żeby wprowadzić go do oceny
# (rating/5)*style_id/100

#user_beer_rating = data_short.drop('profilename','beer_name','reviewid','style','count')
user_beer_rating = data_short.select(data_short.userid, data_short.beerid,
                                     ((data_short.rating/5)*data_short.style_id/100).alias("r_i"))
user_beer_rating.show()



+------+------+-------------------+
|userid|beerid|                r_i|
+------+------+-------------------+
|   140|   148|0.24050000000000002|
|   140|   148|0.24050000000000002|
|   714|   148|             0.1702|
|   714|   148|             0.1702|
|   983|   148|0.21830000000000002|
|   983|   148|0.21830000000000002|
|   998|   148|0.19240000000000002|
|   998|   148|0.19240000000000002|
|  1145|   148|0.16279999999999997|
|  1145|   148|0.16279999999999997|
|  1698|   148|0.21830000000000002|
|  1698|   148|0.21830000000000002|
|  1846|   148|             0.1406|
|  1846|   148|             0.1406|
|  2296|   148|             0.2257|
|  2296|   148|             0.2257|
|  2288|   148|0.20720000000000002|
|  2288|   148|0.20720000000000002|
|  2460|   148|             0.2257|
|  2460|   148|             0.2257|
+------+------+-------------------+
only showing top 20 rows



                                                                                

In [19]:
# implicitPrefs ale tym razem ratingi będą tylko przyjmować wartość -1 dla ocen mniejszych niz 3 
# i 1 dla większych od 3

user_beer_rating1 = data_short.select('userid', 'beerid', "rating", f.when(data_short.rating > 3, 1).
                                     otherwise(-1).alias("r_1")).drop(data_short.rating)
user_beer_rating1.show()



+------+------+---+
|userid|beerid|r_1|
+------+------+---+
|   140|   148|  1|
|   140|   148|  1|
|   714|   148| -1|
|   714|   148| -1|
|   983|   148| -1|
|   983|   148| -1|
|   998|   148| -1|
|   998|   148| -1|
|  1145|   148| -1|
|  1145|   148| -1|
|  1698|   148| -1|
|  1698|   148| -1|
|  1846|   148| -1|
|  1846|   148| -1|
|  2296|   148|  1|
|  2296|   148|  1|
|  2288|   148| -1|
|  2288|   148| -1|
|  2460|   148|  1|
|  2460|   148|  1|
+------+------+---+
only showing top 20 rows



                                                                                

In [20]:
# implicitPrefs ale tym razem ratingi będą tylko przyjmować wartość -1 dla ocen mniejszych niz 2.5 
# i 1 dla większych od 2.5, dodatkowo przemnożymy przez style_id/100

user_beer_rating2 = data_short.select('userid', 'beerid', 'style_id', "rating", f.when(data_short.rating > 2.5, data_short.style_id/100).
                                     otherwise(-1*data_short.style_id/100).alias("r_1")).drop('rating', 'style_id')

user_beer_rating2.show()



+------+------+-----+
|userid|beerid|  r_1|
+------+------+-----+
|   140|   148| 0.37|
|   140|   148| 0.37|
|   714|   148|-0.37|
|   714|   148|-0.37|
|   983|   148| 0.37|
|   983|   148| 0.37|
|   998|   148| 0.37|
|   998|   148| 0.37|
|  1145|   148|-0.37|
|  1145|   148|-0.37|
|  1698|   148| 0.37|
|  1698|   148| 0.37|
|  1846|   148|-0.37|
|  1846|   148|-0.37|
|  2296|   148| 0.37|
|  2296|   148| 0.37|
|  2288|   148| 0.37|
|  2288|   148| 0.37|
|  2460|   148| 0.37|
|  2460|   148| 0.37|
+------+------+-----+
only showing top 20 rows



                                                                                

In [21]:
# implicitPrefs ale tym razem weźmiemy znormalizowane rating, style_id i beer_reviev_count 

user_beer_rating3 = data_short.select(data_short.userid, data_short.beerid, (((data_short['rating']-2.75)/4.5)+((data_short['style_id']-45)/88)+((data_short['beer_review_count']-1848.5)/3698)).alias("r_3"))
user_beer_rating3.show()



+------+------+--------------------+
|userid|beerid|                 r_3|
+------+------+--------------------+
|   140|   148|-0.44288613555785006|
|   140|   148|-0.44288613555785006|
|   714|   148| -0.6539972466689612|
|   714|   148| -0.6539972466689612|
|   983|   148| -0.5095528022245166|
|   983|   148| -0.5095528022245166|
|   998|   148| -0.5873305800022944|
|   998|   148| -0.5873305800022944|
|  1145|   148| -0.6762194688911835|
|  1145|   148| -0.6762194688911835|
|  1698|   148| -0.5095528022245166|
|  1698|   148| -0.5095528022245166|
|  1846|   148|   -0.74288613555785|
|  1846|   148|   -0.74288613555785|
|  2296|   148| -0.4873305800022945|
|  2296|   148| -0.4873305800022945|
|  2288|   148| -0.5428861355578499|
|  2288|   148| -0.5428861355578499|
|  2460|   148| -0.4873305800022945|
|  2460|   148| -0.4873305800022945|
+------+------+--------------------+
only showing top 20 rows



                                                                                

In [23]:
#dzielimy dane na dwie czesci

(training, test) = data_short.randomSplit([0.8, 0.2])

In [24]:
%%time

#robimy cross validator ktory powinien sprawdzić które warunki będą najlepsze
#jeżeli będzie działac, trzeba popracować

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

# właściwe oceny w tabeli
als = ALS(implicitPrefs=False, userCol="userid", itemCol="beerid",
          ratingCol="rating", nonnegative = True, coldStartStrategy="drop")

# dane implicit
# als = ALS(implicitPrefs=True, userCol="userid", itemCol="beerid",
#           ratingCol="r_3", nonnegative = False, coldStartStrategy="drop")

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10]) \
            .addGrid(als.maxIter, [10]) \
            .addGrid(als.regParam, [.1]) \
            .build()

# evaluator dla obliczenia RMSE
# labelCol zmieniamy np. na r_3 jeżeli używamy implicit
evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
# ile modeli będzie testowanie?
print ("Ilość modeli do sprawdzania według param_grid: ", len(param_grid))

# Corss Validator
cv = CrossValidator(estimator = als, estimatorParamMaps = param_grid, evaluator = evaluator,
                    numFolds = 3, parallelism = 3)
model = cv.fit(training)

# TrainValidationSplit działa podobnie do CrossValidator tylko że dzieli dane tylko raz, a CrossValidator
# standardowo 3, lub więcej ustawiając odpowiednio numFolds=
# tvs = TrainValidationSplit(estimator = als, estimatorParamMaps = param_grid, evaluator = evaluator, parallelism = 3)
# model = tvs.fit(training)

best_model = model.bestModel
print("Rank= " + str(best_model.rank))
print("MaxIter= " + str(best_model._java_obj.parent().getMaxIter()))
print("RegParm= " + str(best_model._java_obj.parent().getRegParam()))

predictions = best_model.transform(test)
predictions.show(n = 10)

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Ilość modeli do sprawdzania według param_grid:  1


22/01/28 13:17:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/01/28 13:17:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

Rank= 10
MaxIter= 10
RegParm= 0.1


                                                                                

+------+-----+------------+--------+----------------+------------------+------+--------+-----------------+----------+
|beerid|style| profilename|reviewid|       beer_name|            rating|userid|style_id|beer_review_count|prediction|
+------+-----+------------+--------+----------------+------------------+------+--------+-----------------+----------+
|   148|Klsch|    SledgeJr|  457828|Hollywood Blonde|               2.5|  3490|      37|              136| 2.6701343|
|   148|Klsch|austinpowers|  342850|Hollywood Blonde|3.4499999999999997| 13734|      37|              136| 2.6767302|
|   148|Klsch|    Dogbrick|  342796|Hollywood Blonde|              2.75| 10495|      37|              136|  2.751981|
|   148|Klsch|    Dogbrick|  457831|Hollywood Blonde|              2.75| 10495|      37|              136|  2.751981|
|   148|Klsch|        saxo|  342828|Hollywood Blonde|2.1999999999999997|  1145|      37|              136|   2.58084|
|   148|Klsch|        saxo|  457863|Hollywood Blonde|2.1



Root-mean-square error = 0.4525638401104922
CPU times: user 2.69 s, sys: 705 ms, total: 3.39 s
Wall time: 11min 7s


                                                                                

In [25]:
#zapisywanie modelu

# print(model.avgMetrics[0])
# model_path = "/home/ubuntu/environment/model"
# print(model_path)
# best_model.write().save(model_path)

#odczyt modelu
#cvModelRead = CrossValidatorModel.read().load(model_path)

In [26]:
%%time


# tworzymy n rekomendacji dla każdego użytkownika
ALS_recommendationsForAllUsers = best_model.recommendForAllUsers(numItems = 10) # n — 10

#odpakowanie kolumny i przypisanie pojedyńczej rekomendacji na wiersz

nrecommendations = ALS_recommendationsForAllUsers.withColumn("rec_exp", explode("recommendations")).select('userId', col("rec_exp.beerid"), col("rec_exp.rating"))
nrecommendations.limit(20).show()



+------+------+---------+
|userId|beerid|   rating|
+------+------+---------+
|   148|137480| 5.172485|
|   148| 26230| 5.011108|
|   148|137481| 4.915808|
|   148|127221| 4.910299|
|   148|120472|4.8892198|
|   148| 13818|4.8716607|
|   148|163190| 4.826746|
|   148|  8485|4.8074527|
|   148|132888|4.8008986|
|   148|110739|4.7745523|
|   463|137480| 5.019601|
|   463| 26230| 4.965478|
|   463|137481|4.7703347|
|   463|127221|4.7567167|
|   463| 13818|4.7178736|
|   463|163190|4.6834674|
|   463|  8485|4.6711993|
|   463|132888| 4.648534|
|   463|110739|4.6352158|
|   463| 83760|4.5806475|
+------+------+---------+

CPU times: user 140 ms, sys: 30.3 ms, total: 170 ms
Wall time: 3min 11s


                                                                                

In [27]:
# sprawdzamy jakie piwo ile razy zostało polecone
nrecommendations.groupBy("beerid").agg(f.count("userid")).sort('count(userid)', ascending=False).show()



+------+-------------+
|beerid|count(userid)|
+------+-------------+
|137480|        24544|
| 26230|        23694|
|137481|        22256|
|127221|        20965|
| 13818|        20393|
|  8485|        19433|
|163190|        19365|
|120472|        19148|
|132888|        16266|
|110739|        15600|
|158827|         7268|
| 78762|         5266|
| 86102|         3240|
| 83760|         3092|
| 67957|         2539|
|116588|         2254|
| 41065|         2219|
| 88202|         1674|
| 54852|         1567|
|  1225|         1414|
+------+-------------+
only showing top 20 rows



                                                                                

In [28]:
# segregujemy użytkowników według ilości recenzji i filtrujemy

data_beerid_count = data_short.groupBy("userid").agg(f.count("beerid").alias("beerid_count")).orderBy(f.desc("beerid_count"))
data_beerid_count.filter(data_beerid_count.beerid_count > 20).filter(data_beerid_count.beerid_count < 35).show()



+------+------------+
|userid|beerid_count|
+------+------------+
| 19917|          34|
| 22618|          34|
|  8902|          34|
| 27863|          34|
|  3108|          34|
|  2925|          34|
|  1729|          34|
|  2924|          34|
| 27715|          34|
| 10104|          34|
|  1010|          34|
| 18681|          34|
|  8586|          34|
|  5075|          34|
|  5778|          34|
|  5918|          34|
| 21150|          34|
| 27276|          34|
|  9972|          34|
|  7010|          34|
+------+------------+
only showing top 20 rows



                                                                                

In [29]:
#czy to ma sens? sprawdzamy rekomendacje dla konkrtetnego uzytkownika
data_spr = data.select(data.beerid, data.beer_name, data.style)
recommend_data = nrecommendations.join(data_spr, on='beerid')


In [36]:
# rekomendacje dla konkretnego użytkownika
# uzytkownicy do sprawdzenia 1196, 6334, 234, 1010

recommend_data.filter('userid = 6334').distinct().limit(20).show()



+------+------+---------+--------------------+-------------------+
|beerid|userId|   rating|           beer_name|              style|
+------+------+---------+--------------------+-------------------+
|163190|  6334|  5.86417|Austin Beerworks ...|     Imperial Stout|
|137480|  6334| 5.416985|Saint Boniface Li...|Imperial/Double IPA|
| 80513|  6334|5.2034893|      Birra di Capri|        Belgian Ale|
| 26230|  6334|5.6114264|Wildbru Dunkle We...|       Dunkelweizen|
|116588|  6334| 5.299364|Wildfire Pirates ...|              Cider|
| 83760|  6334|5.2477856|Kamikokoro Toukag...|      Sak - Namasak|
|158827|  6334| 5.402828|Draught House Orf...|          Amber Ale|
|120472|  6334| 5.337894|Poutnk Pelhrimov ...|  Bohemian Pilsener|
| 28648|  6334|5.1919584|Nihonkai (Sea of ...|        Sak - Koshu|
|  8485|  6334| 5.214427|       Edsten Triple|       Abbey Tripel|
+------+------+---------+--------------------+-------------------+



                                                                                

In [37]:
#sprawdzamy co w rzeczywistosci ocenial ten uzytkownik
# uzytkownicy do sprawdzenia 1196, 6334, 234, 1010

aaa = data_short.select('beerid', 'style', 'beer_name', 'rating')
aaa.filter('userid = 6334').sort('rating', ascending=False).limit(20).show()



+------+--------------------+--------------------+------------------+
|beerid|               style|           beer_name|            rating|
+------+--------------------+--------------------+------------------+
|  2228|Belgian White (Wi...|Blue Moon Belgian...|              4.75|
|  4456|           Dry Stout|Guinness Extra St...|              3.85|
|   365|   American Pale Ale|Sierra Nevada Pal...|3.8000000000000003|
|  1267|           Dry Stout|    Guinness Draught|               3.2|
|   483| American Dark Lager|  Michelob AmberBock|              2.95|
+------+--------------------+--------------------+------------------+



                                                                                

In [32]:
# zapisanie do csv naszych rekomendacji

# nrecommendations.write.csv("10recomendations.csv")