In [1]:
# Stop a SparkContext left over from a previous run of this notebook so the
# bootstrap cell below can start from a clean state.
try:
    sc.stop()
except NameError:
    # Fresh kernel: `sc` has not been created yet, so there is nothing to stop.
    pass
except Exception as stop_error:
    # Stay best-effort (do not abort the notebook) but do not hide the failure.
    print('Could not stop the existing SparkContext:', stop_error)

In [2]:
import json
import os
import subprocess
import sys

# Environment consumed by the PySpark launcher: interpreter used by the workers,
# Spark installation directory and the arguments passed to spark-submit.
os.environ["PYSPARK_PYTHON"] = '/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"] = '/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"] = '--num-executors 2 pyspark-shell'

# SPARK_HOME was set just above, so it is always present here.
spark_home = os.environ["SPARK_HOME"]

# Make the PySpark sources and the bundled py4j archive importable.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

# Run PySpark's interactive shell bootstrap in this namespace (its banner reports
# that the session is available as `spark`). The context manager closes the
# script file instead of leaking the handle.
with open(os.path.join(spark_home, 'python/pyspark/shell.py')) as shell_script:
    exec(shell_script.read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [3]:
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions as sf
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, MapType, DoubleType

import re
import numpy as np
import pandas as pd

# Explicitly switch off the 'spark.sql.execution.arrow.enabled' setting for this session.
spark.conf.set('spark.sql.execution.arrow.enabled', 'false')

In [4]:
# Display the active SparkSession object as the cell output.
spark

## Исследуем данные

In [5]:
# User/item purchase pairs: the train and test files are read with one explicit schema.
schema = (
    StructType()
    .add("user_id", IntegerType())
    .add("item_id", IntegerType())
    .add("purchase", DoubleType())
)

df_train = spark.read.csv('/labs/slaba03/laba03_train.csv', schema=schema, header=True)
test = spark.read.csv('/labs/slaba03/laba03_test.csv', schema=schema, header=True)

# Viewing log and item catalogue are read without a schema; the catalogue is tab-separated.
df_views = spark.read.csv('/labs/slaba03/laba03_views_programmes.csv', header=True)
df_items = spark.read.csv('/labs/slaba03/laba03_items.csv', sep='\t', header=True)

In [6]:
# Partition count, row count and a five-row preview of the training frame.
train_partitions = df_train.rdd.getNumPartitions()
train_rows = df_train.count()
print('Num partitions ', train_partitions)
print('Row count ', train_rows)
df_train.show(5)

Num partitions  2
Row count  5032624
+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  74107|     0.0|
|   1654|  89249|     0.0|
|   1654|  99982|     0.0|
|   1654|  89901|     0.0|
|   1654| 100504|     0.0|
+-------+-------+--------+
only showing top 5 rows



In [7]:
# Partition count, row count and a two-row preview of the test frame.
test_partitions = test.rdd.getNumPartitions()
test_rows = test.count()
print('Num partitions ', test_partitions)
print('Row count ', test_rows)
test.show(2)

Num partitions  2
Row count  2156840
+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  94814|    null|
|   1654|  93629|    null|
+-------+-------+--------+
only showing top 2 rows



In [8]:
# Partition count, row count and a two-row preview of the viewing log.
views_partitions = df_views.rdd.getNumPartitions()
views_rows = df_views.count()
print('Num partitions ', views_partitions)
print('Row count ', views_rows)
df_views.show(2)

Num partitions  7
Row count  20845607
+-------+-------+----------+----------+---------+
|user_id|item_id|  ts_start|    ts_end|item_type|
+-------+-------+----------+----------+---------+
|      0|7101053|1491409931|1491411600|     live|
|      0|7101054|1491412481|1491451571|     live|
+-------+-------+----------+----------+---------+
only showing top 2 rows



In [9]:
# Partition count, row count and one untruncated record of the item catalogue.
items_partitions = df_items.rdd.getNumPartitions()
items_rows = df_items.count()
print('Num partitions ', items_partitions)
print('Row count ', items_rows)
df_items.show(1, truncate=False, vertical=True)

Num partitions  2
Row count  635568
-RECORD 0--------------------------------------------------------------------
 item_id                     | 65667                                         
 channel_id                  | null                                          
 datetime_availability_start | 1970-01-01T00:00:00Z                          
 datetime_availability_stop  | 2018-01-01T00:00:00Z                          
 datetime_show_start         | null                                          
 datetime_show_stop          | null                                          
 content_type                | 1                                             
 title                       | на пробах только девушки (all girl auditions) 
 year                        | 2013.0                                        
 genres                      | Эротика                                       
 region_id                   | null                                          
only showing top 1 row



In [10]:
# Set 'spark.sql.shuffle.partitions' to 200 for this session.
spark.conf.set("spark.sql.shuffle.partitions", 200)

## Посмотрим, сколько пользователей из обучающей выборки совершали покупки

In [11]:
# Number of distinct users present in the training data.
unique_users = df_train.select('user_id').distinct()
unique_users.count()

1941

In [12]:
# Number of distinct users with at least one row where purchase == 1.
buyers = df_train.where(sf.col('purchase') == 1).select('user_id').distinct()
buyers.count()

1675

### Посмотрим покупки одного пользователя

In [13]:
# Purchased rows (purchase == 1) of user 1654.
is_user_1654 = sf.col('user_id') == 1654
was_purchased = sf.col('purchase') == 1
df_train.where(is_user_1654 & was_purchased).show(5)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|   9897|     1.0|
|   1654|   7394|     1.0|
|   1654|   9064|     1.0|
|   1654|  73216|     1.0|
|   1654|  88816|     1.0|
+-------+-------+--------+



In [14]:
# Catalogue records of the five items listed in the previous output.
purchased_item_ids = [9897, 7394, 9064, 73216, 88816]
df_items.where(sf.col('item_id').isin(purchased_item_ids)) \
        .show(5, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------
 item_id                     | 7394                                          
 channel_id                  | null                                          
 datetime_availability_start | 1970-01-01T00:00:00Z                          
 datetime_availability_stop  | 2099-12-31T21:00:00Z                          
 datetime_show_start         | null                                          
 datetime_show_stop          | null                                          
 content_type                | 1                                             
 title                       | лиса и заяц                                   
 year                        | 1973.0                                        
 genres                      | Мультфильмы,Союзмультфильм,Наши               
 region_id                   | null                                          
-RECORD 1-------------------------------------------------------

### Посмотрим, как распределены между собой метки классов

In [15]:
# Row count per value of the `purchase` label, collected to the driver.
label_counts = df_train.groupBy("purchase").count()
label_counts.collect()

[Row(purchase=0.0, count=5021720), Row(purchase=1.0, count=10904)]

### Train-Validation Split

In [16]:
# Stratified sample on the label: the same 0.8 fraction for both classes, fixed seed.
sampling_fractions = {label: 0.8 for label in (0, 1)}
train = df_train.stat.sampleBy("purchase", fractions=sampling_fractions, seed=5757)

# Validation set = every (user_id, item_id) pair of df_train that is absent from the sample.
split_keys = ["user_id", "item_id"]
valid = df_train.join(train, on=split_keys, how="leftanti")

# Reload the raw test frame so the feature joins below start from the file contents.
test = spark.read.csv('/labs/slaba03/laba03_test.csv', schema=schema, header=True)

In [17]:
# Preview the first five rows of the training split.
train.show(5)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|  74107|     0.0|
|   1654|  89249|     0.0|
|   1654|  99982|     0.0|
|   1654|  89901|     0.0|
|   1654|  84350|     0.0|
+-------+-------+--------+
only showing top 5 rows



In [18]:
# Number of rows in the training split.
train.count()

4026845

In [19]:
# Preview the first five rows of the validation split.
valid.show(5)

+-------+-------+--------+
|user_id|item_id|purchase|
+-------+-------+--------+
|   1654|   7679|     0.0|
|   1654|  67318|     0.0|
| 510087|  10820|     0.0|
| 510087|  67040|     0.0|
| 510087|  72905|     0.0|
+-------+-------+--------+
only showing top 5 rows



## Создадим фичи, которые отражают склонность к покупке пользователя и "покупаемость" item'а

In [20]:
# Total of the `purchase` label per user, computed on the training split only and cached.
train_purchases = (
    train.groupBy('user_id')
    .agg(sf.sum('purchase').alias('user_purchases'))
    .select('user_purchases', 'user_id')
    .cache()
)

train_purchases.show(2)

+--------------+-------+
|user_purchases|user_id|
+--------------+-------+
|           0.0| 901457|
|           1.0| 927211|
+--------------+-------+
only showing top 2 rows



In [21]:
# Total of the `purchase` label per item, computed on the training split only and cached.
item_purchases = (
    train.groupBy('item_id')
    .agg(sf.sum('purchase').alias('item_purchases'))
    .select('item_purchases', 'item_id')
    .cache()
)

item_purchases.show(2)

+--------------+-------+
|item_purchases|item_id|
+--------------+-------+
|           2.0|  90019|
|           1.0|   8638|
+--------------+-------+
only showing top 2 rows



In [22]:
# How many purchases did each user make, and how many times was each item bought?
# Every split receives the same train-derived totals through left joins.
train, valid, test = [
    split.join(train_purchases, on='user_id', how='left')
         .join(item_purchases, on='item_id', how='left')
    for split in (train, valid, test)
]

In [23]:
# Number of training rows per user ("attempts"), cached for the joins that follow.
train_user_attempts = (
    train.groupBy('user_id').count()
    .withColumnRenamed('count', 'user_attempts')
    .select('user_attempts', 'user_id')
    .cache()
)

# Number of training rows per item, cached likewise.
train_item_attempts = (
    train.groupBy('item_id').count()
    .withColumnRenamed('count', 'item_attempts')
    .select('item_attempts', 'item_id')
    .cache()
)

train_user_attempts.show(2)

+-------------+-------+
|user_attempts|user_id|
+-------------+-------+
|         2089| 754230|
|         2058| 761341|
+-------------+-------+
only showing top 2 rows



In [24]:
# Preview two rows of the per-item attempt counts.
train_item_attempts.show(2)

+-------------+-------+
|item_attempts|item_id|
+-------------+-------+
|         1117|  95940|
|         1067|  74757|
+-------------+-------+
only showing top 2 rows



In [25]:
# Attach the train-derived per-user and per-item attempt counts to every split.
train, valid, test = [
    split.join(train_user_attempts, on='user_id', how='left')
         .join(train_item_attempts, on='item_id', how='left')
    for split in (train, valid, test)
]

In [26]:
# Preview two rows of the test split with the joined count features.
test.show(2)

+-------+-------+--------+--------------+--------------+-------------+-------------+
|item_id|user_id|purchase|user_purchases|item_purchases|user_attempts|item_attempts|
+-------+-------+--------+--------------+--------------+-------------+-------------+
|  94814|   1654|    null|           4.0|           0.0|         2014|         1097|
|  93629|   1654|    null|           4.0|           4.0|         2014|         1099|
+-------+-------+--------+--------------+--------------+-------------+-------------+
only showing top 2 rows



In [27]:
# user_addict = purchases per attempt for the user; same expression on every split.
user_addict_ratio = sf.col('user_purchases') / sf.col('user_attempts')
train, valid, test = [
    split.withColumn('user_addict', user_addict_ratio)
    for split in (train, valid, test)
]

In [28]:
# item_addict = purchases per attempt for the item; same expression on every split.
item_addict_ratio = sf.col('item_purchases') / sf.col('item_attempts')
train, valid, test = [
    split.withColumn('item_addict', item_addict_ratio)
    for split in (train, valid, test)
]

In [29]:
# Preview two rows of the training split with all count and ratio features.
train.show(2)

+-------+-------+--------+--------------+--------------+-------------+-------------+--------------------+--------------------+
|item_id|user_id|purchase|user_purchases|item_purchases|user_attempts|item_attempts|         user_addict|         item_addict|
+-------+-------+--------+--------------+--------------+-------------+-------------+--------------------+--------------------+
|  74107|   1654|     0.0|           4.0|           0.0|         2014|         1090|0.001986097318768...|                 0.0|
|  89249|   1654|     0.0|           4.0|           2.0|         2014|         1090|0.001986097318768...|0.001834862385321101|
+-------+-------+--------+--------------+--------------+-------------+-------------+--------------------+--------------------+
only showing top 2 rows



In [30]:
# Just in case, replace nulls with 0 in every split (the left joins above can leave gaps).
train, valid, test = [split.fillna(0) for split in (train, valid, test)]

In [31]:
# Drop the cached aggregate frames now that they have been joined into the splits.
for cached_aggregate in (train_purchases, item_purchases, train_user_attempts):
    cached_aggregate.unpersist()
# Kept as the final expression so the cell still echoes the returned DataFrame.
train_item_attempts.unpersist()

DataFrame[item_attempts: bigint, item_id: int]

In [32]:
from pyspark.ml.feature import VectorAssembler
# Columns that go into the `features` vector for GBT.
cols = ['item_purchases', 'user_purchases', 'user_addict', 'item_addict']
assembler = VectorAssembler(outputCol="features", inputCols=cols)

train_data, valid_data, test_data = [
    assembler.transform(split) for split in (train, valid, test)
]
# Only the training features are cached.
train_data = train_data.cache()

In [33]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees classifier on the `purchase` label, wrapped in a one-stage pipeline.
gbt = GBTClassifier().setLabelCol("purchase")

pipeline = Pipeline().setStages([gbt])

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC-AUC evaluator on the `purchase` label. It is only constructed here;
# evaluator.evaluate(...) is called further down once predictions exist.
evaluator = BinaryClassificationEvaluator(labelCol="purchase", metricName='areaUnderROC')

In [35]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [36]:
# Hyper-parameter grid for the GBT stage: 2 x 2 x 2 = 8 combinations.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(gbt.maxDepth, [3, 4])
grid_builder.addGrid(gbt.minInstancesPerNode, [2, 3])
grid_builder.addGrid(gbt.maxBins, [50, 55])
paramGrid = grid_builder.build()

In [37]:
# 3-fold cross-validation of the pipeline over the grid, evaluating up to 3 models in parallel.
cv_settings = dict(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=3,
)
crossval = CrossValidator(**cv_settings)

In [38]:
# Run the cross-validated grid search on the cached training features.
cv_model = crossval.fit(train_data)

In [39]:
# Average evaluator metric per parameter combination, in grid order.
cv_model.avgMetrics

[0.9191091461891099,
 0.919282972638775,
 0.9190849339367069,
 0.9210114068350821,
 0.9345468253944449,
 0.9339005288112304,
 0.9346320848739754,
 0.9339005288112302]

In [40]:
# Parameter map whose average cross-validation metric is the highest.
best_index = np.argmax(cv_model.avgMetrics)
cv_model.getEstimatorParamMaps()[best_index]

{Param(parent='GBTClassifier_ac487a28726a', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4,
 Param(parent='GBTClassifier_ac487a28726a', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 3,
 Param(parent='GBTClassifier_ac487a28726a', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 50}

In [41]:
# Score the validation features with the fitted cross-validator model.
predictions_valid = cv_model.transform(valid_data)

In [42]:
# ROC-AUC of the cross-validated model on the validation split.
evaluator.evaluate(predictions_valid)

0.8928295148884524

In [43]:
# Release the cached training features.
# NOTE(review): the next cell fits on `train_data` again after this unpersist — confirm the ordering is intended.
train_data.unpersist()

DataFrame[item_id: int, user_id: int, purchase: double, user_purchases: double, item_purchases: double, user_attempts: bigint, item_attempts: bigint, user_addict: double, item_addict: double, features: vector]

In [44]:
from pyspark.ml.classification import GBTClassifier

# Refit a single GBT with the parameter values reported by the grid search above.
best_gbt_params = dict(maxDepth=4, minInstancesPerNode=3, maxBins=50)
gbt = GBTClassifier(labelCol="purchase", **best_gbt_params)

gbt_model = gbt.fit(train_data)
predictions_valid = gbt_model.transform(valid_data)

In [45]:
# Importance of each assembled feature, in the order of the assembler's input columns.
gbt_model.featureImportances

SparseVector(4, {0: 0.4014, 1: 0.3427, 2: 0.121, 3: 0.1349})

In [46]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# ROC-AUC of the refitted model on the validation split.
evaluator = BinaryClassificationEvaluator() \
    .setLabelCol("purchase") \
    .setMetricName('areaUnderROC')
score = evaluator.evaluate(predictions_valid)
score

0.8920894555767243

## Создадим фичу, используя жанры

In [47]:
# Print item catalogue records vertically and without truncation to inspect the text fields.
df_items.show(vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------------------------------------------------------------
 item_id                     | 65667                                                                                  
 channel_id                  | null                                                                                   
 datetime_availability_start | 1970-01-01T00:00:00Z                                                                   
 datetime_availability_stop  | 2018-01-01T00:00:00Z                                                                   
 datetime_show_start         | null                                                                                   
 datetime_show_stop          | null                                                                                   
 content_type                | 1                                                                                      
 title                       | на пробах только 

In [48]:
from pyspark.ml.feature import RegexTokenizer

# Keep only the item fields used as features; missing genres become the placeholder '_'
# and missing years become '1899' before the year is cast to an integer.
items_genres_years = (
    df_items.select('item_id', 'genres', 'year')
    .na.fill({'genres': u'_'})
    .na.fill({'year': u'1899'})
)
items_genres_years = items_genres_years.withColumn('year', sf.col('year').cast(IntegerType()))

# gaps=False: the pattern describes the tokens themselves (runs of Cyrillic letters or '_').
tokenizer = RegexTokenizer(
    inputCol="genres",
    outputCol="genre_tokens",
    gaps=False,
    pattern=u"[_А-Яа-яёЁ]+",
    toLowercase=False,
)

items_genres_years_tk = tokenizer.transform(items_genres_years)

In [49]:
from pyspark.ml.feature import CountVectorizer

# Learn the genre-token vocabulary and encode every item's tokens as a count vector.
# Dedicated names are used on purpose: `cv_model` already holds the fitted
# CrossValidatorModel from the grid search, and rebinding it to a CountVectorizerModel
# would break those earlier cells when they are re-run.
genre_vectorizer = CountVectorizer(inputCol="genre_tokens", outputCol="genre_vector")
genre_vectorizer_model = genre_vectorizer.fit(items_genres_years_tk)
items_features_vec = genre_vectorizer_model.transform(items_genres_years_tk)

In [50]:
# Preview five items with their genre tokens and the resulting count vectors.
items_features_vec.show(5)

+-------+-------+----+------------+---------------+
|item_id| genres|year|genre_tokens|   genre_vector|
+-------+-------+----+------------+---------------+
|  65667|Эротика|2013|   [Эротика]|(96,[22],[1.0])|
|  65669|Эротика|2011|   [Эротика]|(96,[22],[1.0])|
|  65668|Эротика|2011|   [Эротика]|(96,[22],[1.0])|
|  65671|Эротика|2011|   [Эротика]|(96,[22],[1.0])|
|  65670|Эротика|2010|   [Эротика]|(96,[22],[1.0])|
+-------+-------+----+------------+---------------+
only showing top 5 rows



In [51]:
# Attach the per-item genre/year features to every split.
train_genres, valid_genres, test_genres = [
    split.join(items_features_vec, on='item_id', how='left')
    for split in (train, valid, test)
]

In [52]:
# Columns that go into the `features` vector for GBT: the base features plus year and genres.
cols = ['item_purchases', 'user_purchases', 'user_addict', 'item_addict', 'year', 'genre_vector']
assembler = VectorAssembler(outputCol="features", inputCols=cols)

train_data, valid_data, test_data = [
    assembler.transform(split) for split in (train_genres, valid_genres, test_genres)
]
# Only the training features are cached.
train_data = train_data.cache()

In [53]:
from pyspark.ml.classification import GBTClassifier

# Same GBT configuration as before, now trained on the features that include year and genres.
genre_gbt_params = dict(maxDepth=4, minInstancesPerNode=3, maxBins=50)
gbt = GBTClassifier(labelCol="purchase", **genre_gbt_params)

gbt_model = gbt.fit(train_data)
predictions_valid = gbt_model.transform(valid_data)

In [54]:
# Importance of each slot of the assembled feature vector for the genre-aware model.
gbt_model.featureImportances

SparseVector(101, {0: 0.3998, 1: 0.3117, 2: 0.1558, 3: 0.0983, 5: 0.0033, 21: 0.0084, 26: 0.0017, 27: 0.0075, 49: 0.0107, 60: 0.0014, 73: 0.0, 74: 0.0015})

In [55]:
# ROC-AUC of the genre-aware model on the validation split.
evaluator = BinaryClassificationEvaluator() \
    .setLabelCol("purchase") \
    .setMetricName('areaUnderROC')
score = evaluator.evaluate(predictions_valid)
score

0.8934890901380091

In [56]:
# Release the cached genre-aware training features.
train_data.unpersist()

DataFrame[item_id: int, user_id: int, purchase: double, user_purchases: double, item_purchases: double, user_attempts: bigint, item_attempts: bigint, user_addict: double, item_addict: double, genres: string, year: int, genre_tokens: array<string>, genre_vector: vector, features: vector]

## Добавим вектор пользовательской истории

In [57]:
# Добавим вектор пользовательской истории
from pyspark.sql.functions import monotonically_increasing_id

# Pick the 500 items that occur most often in `train` and give each a dense
# 0-based `item_row_id`, which later serves as a column index of the history matrix.
#
# row_number() over an explicit ordering yields consecutive ids by construction,
# rather than relying on monotonically_increasing_id() producing consecutive values
# on a single partition. The `item_id` tie-break makes both the selected items and
# their ids reproducible between runs.
from pyspark.sql.window import Window

items_count = train.groupBy('item_id').count().withColumnRenamed('count', 'item_count')

# A window without partitionBy ranks all rows together; that is acceptable here
# because `items_count` has only one row per item.
rank_window = Window.orderBy(sf.col('item_count').desc(), sf.col('item_id'))
items_desc_count = (
    items_count
    .withColumn('item_row_id', (sf.row_number().over(rank_window) - 1).cast('long'))
    .where(sf.col('item_row_id') < 500)
)

items_desc_count.cache()

DataFrame[item_id: int, item_count: bigint, item_row_id: bigint]

In [58]:
# Preview two of the selected items with their counts and row ids.
items_desc_count.show(2)

+-------+----------+-----------+
|item_id|item_count|item_row_id|
+-------+----------+-----------+
|  66185|      1172|          0|
|  94000|      1164|          1|
+-------+----------+-----------+
only showing top 2 rows



In [59]:
# Training rows restricted to the selected items, reduced to (user, item row id, label) and cached.
top_item_rows = train.join(items_desc_count, 'item_id')
train_truncated = top_item_rows.select('user_id', 'item_row_id', 'purchase').cache()

train_truncated.show(2)

+-------+-----------+--------+
|user_id|item_row_id|purchase|
+-------+-----------+--------+
| 886063|        436|     0.0|
| 920599|        436|     0.0|
+-------+-----------+--------+
only showing top 2 rows



In [60]:
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark.ml.linalg import VectorUDT

from pyspark.sql.functions import udf

In [61]:
# Build one history vector per user from the (user_id, item_row_id, purchase) triples.

def _to_ml_vector(mllib_vector):
    """Convert an mllib vector into its pyspark.ml counterpart."""
    return mllib_vector.asML()

as_ml = udf(_to_ml_vector, VectorUDT())

# Each row becomes a matrix entry: row = user_id, column = item_row_id, value = purchase.
matrix_entries = train_truncated.rdd.map(
    lambda row: MatrixEntry(row[0], row[1], row[2])
)
train_matrix = CoordinateMatrix(matrix_entries)

train_row_mat_i = train_matrix.toIndexedRowMatrix()

# The row index is the user id; `history_vec` is the ml-typed copy of the row vector.
indexed_rows_df = train_row_mat_i.rows.toDF()
train_mat_df = indexed_rows_df.withColumnRenamed('index', 'user_id') \
                              .withColumn("history_vec", as_ml("vector"))

In [62]:
# Preview ten users with their history vectors.
train_mat_df.show(10)

+-------+--------------------+--------------------+
|user_id|              vector|         history_vec|
+-------+--------------------+--------------------+
| 922400|(500,[6,7,8,9,11,...|(500,[6,7,8,9,11,...|
| 940600|(500,[1,2,4,6,7,8...|(500,[1,2,4,6,7,8...|
| 866400|(500,[0,1,4,5,6,7...|(500,[0,1,4,5,6,7...|
| 857000|(500,[0,1,2,3,4,8...|(500,[0,1,2,3,4,8...|
| 899200|(500,[0,1,4,6,7,8...|(500,[0,1,4,6,7,8...|
| 879401|(500,[1,3,6,8,10,...|(500,[1,3,6,8,10,...|
| 749801|(500,[0,2,3,4,6,7...|(500,[0,2,3,4,6,7...|
| 792601|(500,[2,3,4,5,6,7...|(500,[2,3,4,5,6,7...|
| 905401|(500,[0,2,4,9,11,...|(500,[0,2,4,9,11,...|
| 905201|(500,[0,2,3,4,6,7...|(500,[0,2,3,4,6,7...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [63]:
# Release the cached truncated training frame.
train_truncated.unpersist()

DataFrame[user_id: int, item_row_id: bigint, purchase: double]

In [64]:
# Join the per-user history vectors and the per-item genre features onto every split.
train_hist, valid_hist, test_hist = [
    split.join(train_mat_df, 'user_id', 'left')
         .join(items_features_vec, on='item_id', how='left')
    for split in (train, valid, test)
]

In [65]:
# Preview two rows of the fully joined training frame.
train_hist.show(2)

+-------+-------+--------+--------------+--------------+-------------+-------------+--------------------+-------------------+--------------------+--------------------+--------------------+----+--------------------+--------------------+
|item_id|user_id|purchase|user_purchases|item_purchases|user_attempts|item_attempts|         user_addict|        item_addict|              vector|         history_vec|              genres|year|        genre_tokens|        genre_vector|
+-------+-------+--------+--------------+--------------+-------------+-------------+--------------------+-------------------+--------------------+--------------------+--------------------+----+--------------------+--------------------+
|   8389| 773825|     0.0|           2.0|           5.0|         2068|         1061|9.671179883945841E-4|0.00471253534401508|(500,[1,2,3,5,6,7...|(500,[1,2,3,5,6,7...|Мультфильмы,Детск...|1981|[Мультфильмы, Дет...|(96,[7,16,21,25],...|
|   8389| 867836|     0.0|           0.0|           5.0|

In [66]:
# Final feature vector: base count/ratio features, the user history vector and the genre vector.
history_feature_cols = [
    'item_purchases', 'user_purchases',
    'user_addict', 'item_addict',
    'history_vec', 'genre_vector',
]
assembler = VectorAssembler(outputCol="features", inputCols=history_feature_cols)

train_data, valid_data, test_data = [
    assembler.transform(split) for split in (train_hist, valid_hist, test_hist)
]
# Only the training features are cached.
train_data = train_data.cache()

In [67]:
# Sanity check: count training rows whose assembled feature vector is null.
rows_without_features = train_data.filter(sf.col('features').isNull())
rows_without_features.count()

0

In [68]:
# Same GBT configuration, trained on the features that include the history vector.
history_gbt_params = dict(maxDepth=4, minInstancesPerNode=3, maxBins=50)
gbt = GBTClassifier(labelCol="purchase", **history_gbt_params)

gbt_model = gbt.fit(train_data)
predictions_valid = gbt_model.transform(valid_data)

In [69]:
# ROC-AUC of the history-aware model on the validation split.
evaluator = BinaryClassificationEvaluator() \
    .setLabelCol("purchase") \
    .setMetricName('areaUnderROC')
score = evaluator.evaluate(predictions_valid)
score

0.8943848968336421

In [70]:
# Release the cached history-aware training features.
train_data.unpersist()

DataFrame[item_id: int, user_id: int, purchase: double, user_purchases: double, item_purchases: double, user_attempts: bigint, item_attempts: bigint, user_addict: double, item_addict: double, vector: vector, history_vec: vector, genres: string, year: int, genre_tokens: array<string>, genre_vector: vector, features: vector]

In [71]:
# Score the test features with the last fitted GBT model.
test_predictions = gbt_model.transform(test_data)

In [72]:
# Show one full, untruncated prediction record to inspect the output columns.
test_predictions.show(1, truncate=False, vertical=True)

-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [73]:
# Collect (user_id, item_id, probability) to pandas, ordered by user and item.
predictions_pd = (
    test_predictions
    .select("user_id", "item_id", sf.col("probability").alias("purchase"))
    .toPandas()
    .sort_values(by=['user_id', 'item_id'])
)
# Keep only element 1 of each probability vector as the submitted `purchase` score.
predictions_pd['purchase'] = predictions_pd['purchase'].map(lambda probability: probability[1])
predictions_pd.to_csv('lab03.csv', index=False)

In [74]:
#sc.stop()