In [1]:
import pandas as pd
import numpy as np
import os
import sys
import matplotlib.pyplot as plt
import seaborn as sns
import json
import os
import shutil

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType
from pyspark.sql import Window

In [3]:
# Data locations, all relative to the notebook's working directory.
transactions_data_path = "../data/transactions.csv"  # not referenced by the cells below
catalogue_path = "../data/catalogue.json"  # not referenced by the cells below
test_path = "../data/test_users.json"  # JSON file whose 'users' entry lists the user ids to score
ratings_path = "../data/StepDan_ratings_prq"  # parquet dataset loaded into `df`

In [4]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("OkkoRecSystem")
    .getOrCreate()
)

In [5]:
# Load the ratings dataset from the parquet directory configured in `ratings_path`.
df = spark.read.parquet(ratings_path)

In [None]:
# Preview the first rows of the loaded ratings.
df.show()

In [None]:
# Number of rows in the loaded ratings.
df.count()

In [6]:
# Column names as they appear in the loaded ratings data.
userCol = 'user_uid'
itemCol = 'element_uid'
ratingCol = 'rate'

# Rename them to the user_id / item_id / rate names used by the rest of the notebook.
df = (
    df
    .withColumnRenamed(itemCol, 'item_id')
    .withColumnRenamed(ratingCol, 'rate')
    .withColumnRenamed(userCol, 'user_id')
)

In [7]:
# Start `rating_df` from the renamed frame and cast the rating and both id
# columns to integers.
rating_df = (
    df
    .withColumn("rate", col("rate").cast('int'))
    .withColumn("user_id", col("user_id").cast('int'))
    .withColumn("item_id", col("item_id").cast('int'))
)

In [8]:
# Cast both id columns to integers (the same cast the previous cell applies).
rating_df = (
    rating_df
    .withColumn("user_id", col("user_id").cast('int'))
    .withColumn("item_id", col("item_id").cast('int'))
)

In [9]:
# Rows per user, with a flag marking users that have at least three of them.
film_cnt = (
    rating_df
    .groupBy('user_id')
    .count()
    .withColumn('enough_films', col('count') >= 3)
)

In [10]:
# Attach the per-user counts and keep only rows of users flagged as having
# enough films.
rating_df = (
    rating_df
    .join(film_cnt, on='user_id', how='left')
    .filter(col('enough_films') == True)
)

In [None]:
# Number of rating rows left after the per-user filter.
rating_df.count()

### OOT split

### randomSplit 

### Per-user chronological split into first-level train / second-level train / test

In [None]:
# Keep only the four columns used by the split and the model.
fin_cols = ['user_id', 'item_id', 'ts', 'rate']
rating_df = rating_df.select(*fin_cols)

In [None]:
def train_test_split(df, test_size=0.2):
    """Split every user's rows chronologically into two training sets and a test set.

    Rows are numbered by ascending ``ts`` within each ``user_id``. For a user
    with ``n`` rows, the last ``ceil(n * test_size)`` rows go to the test set.
    The remaining rows are halved: the earlier half (which takes the extra row
    when the remainder is odd) becomes the first training set and the later
    half the second one.

    Parameters
    ----------
    df : Spark DataFrame
        Must contain at least the ``user_id`` and ``ts`` columns.
    test_size : float
        Fraction of each user's rows assigned to the test set.

    Returns
    -------
    tuple
        ``(first_train, second_train, test)`` DataFrames, each with exactly
        the columns of ``df``.
    """
    cols = df.columns

    # Position of every row inside its user's timeline (1 = earliest).
    # Rows of one user sharing the same ``ts`` are numbered in arbitrary order.
    window = Window.partitionBy('user_id')\
                .orderBy('ts')\
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    indexed = df.withColumn('group_index', row_number().over(window))

    # Group sizes must come from the frame being split. Reading the
    # notebook-level ``rating_df`` here (as the previous version did) gives
    # wrong boundaries whenever the function is called with any other frame.
    group_sizes = df.groupBy('user_id').count()\
                    .select('user_id', col('count').alias('group_size'))\
                    .withColumn('test', ceil(col('group_size') * test_size))\
                    .withColumn('train_second', ((col('group_size') - col('test')) * 0.5).cast('int'))\
                    .withColumn('train_first', (col('group_size') - col('train_second') - col('test')))\
                    .withColumn('train_second_index', col('train_first') + col('train_second'))
    with_bounds = indexed.join(group_sizes, on='user_id', how='left')

    # Boundaries are inclusive upper row indexes of the first and second part;
    # everything after the second boundary belongs to the test part.
    in_first = col('group_index') <= col('train_first')
    in_second = (col('group_index') > col('train_first')) & \
                (col('group_index') <= col('train_second_index'))
    in_test = col('group_index') > col('train_second_index')

    first_train = with_bounds.filter(in_first).select(cols)
    second_train = with_bounds.filter(in_second).select(cols)
    test = with_bounds.filter(in_test).select(cols)
    return first_train, second_train, test

In [None]:
# Per-user chronological split: train_als gets the earliest rows, train_cb the
# middle ones and test the latest ones.
train_als, train_cb, test = train_test_split(rating_df)

### First level model building

In [None]:
# Preview the ratings that the model below is fitted on.
rating_df.show()

In [12]:
# Show the rows whose rating is missing.
rating_df.filter(col('rate').isNull()).show()

+-------+-------+------------------+----+-----+------------+
|user_id|item_id|                ts|rate|count|enough_films|
+-------+-------+------------------+----+-----+------------+
| 170501|   1570| 42957432.60121477|null|    3|        true|
| 170501|    524|42158500.037479214|null|    3|        true|
| 170501|   3057|  42226336.1775519|null|    3|        true|
|  50891|   5266| 42210037.78274297|null|    4|        true|
|  50891|   2174|44274120.350332566|null|    4|        true|
|  50891|    439| 42209787.20296109|null|    4|        true|
|  50891|   3866|  42208715.3444934|null|    4|        true|
| 514373|   3101| 42227380.03900147|null|    5|        true|
| 514373|   8987| 42226967.27363485|null|    5|        true|
| 514373|    327| 42584781.91646723|null|    5|        true|
| 514373|   3370| 42576591.71197143|null|    5|        true|
| 514373|   2598| 42594735.64953747|null|    5|        true|
| 133052|   1653|  42944692.2293622|null|    3|        true|
| 133052|   7767| 428295

In [11]:
# First-level model: implicit-feedback ALS over (user_id, item_id, rate).
# The explicit seed makes repeated runs start from the same factor
# initialisation instead of a per-session default.
# NOTE(review): the model is fitted on the full `rating_df`, while later cells
# compare its recommendations against `train_cb`, which is a subset of the
# same data - confirm this overlap is intended.
# NOTE(review): the null-check cell shows rows with a null `rate`; confirm how
# they should be treated before fitting.
als = ALS(maxIter=10, regParam=0.01, userCol="user_id", itemCol="item_id", ratingCol="rate",
          coldStartStrategy="drop", implicitPrefs=True, seed=42)

model = als.fit(rating_df)

In [14]:
@udf(returnType=ArrayType(IntegerType()))
def get_film_ids(arr):
    """Return the first field of every element of ``arr`` as an integer array.

    It is applied in this notebook to the ``recommendations`` column produced
    by the ALS model (presumably the first field of each entry is the item id;
    verify against the model's output schema).

    A null input yields null instead of raising inside the UDF.
    """
    if arr is None:
        return None
    return [x[0] for x in arr]

### Test on boosters

In [15]:
# Load the ids of the users to score. The parsed JSON gets its own name so it
# does not overwrite the `test` DataFrame returned by train_test_split.
with open(test_path, "r") as f:
    test_payload = json.load(f)

cSchema = StructType([StructField('user_id', IntegerType(), False)])

# One single-column row per user id.
test_users = [[user_id] for user_id in test_payload['users']]

test_df = spark.createDataFrame(test_users, schema=cSchema)

# 20 recommendations for each requested user.
ans = model.recommendForUserSubset(test_df, 20)

ans = ans.select(col('user_id').cast(StringType()).alias('user_id'),
                 get_film_ids(col('recommendations')).alias('reccomendations'))

ans_df = ans.toPandas()

# Map user id -> list of recommended ids in one pass. Converting every id to a
# plain int keeps the payload JSON-serialisable even if pandas hands back
# numpy values.
result = {
    user_id: [int(item_id) for item_id in recs]
    for user_id, recs in zip(ans_df['user_id'], ans_df['reccomendations'])
}

ans_df.index = ans_df.user_id

# Not used by any later cell; kept so the notebook namespace stays unchanged.
a = ans_df.reccomendations.to_json(orient = 'index', force_ascii=False)

with open('../data/answerStepDan3.json', "w") as f:
    json.dump(result, f)

### Create predictions

In [None]:
# Distinct items present in the first training split (not referenced by later cells).
all_movies = train_als.select('item_id').distinct()

In [None]:
# Distinct users present in the first training split (not referenced by later cells).
all_users = train_als.select('user_id').distinct()

In [None]:
# Ask the fitted ALS model for 100 recommended items per user.
rec_df = model.recommendForAllUsers(100)

In [None]:
%%time
# Spread the recommendations over 500 partitions before they are written out.
rec_df = rec_df.repartition(500)

In [None]:
%%time
prediction_path = '../data/predictions_prq'

if os.path.exists(prediction_path):
    shutil.rmtree(prediction_path)
    rec_df.write.parquet(prediction_path)
else:
    rec_df.write.parquet(prediction_path)

### Test dataset for second model

In [None]:
# Reload the recommendations from the parquet directory written above.
pred_df = spark.read.parquet('../data/predictions_prq')

In [None]:
# Add `rec_films`: the array returned by get_film_ids for each user's recommendations.
pred_df = pred_df.withColumn("rec_films", get_film_ids(col('recommendations')))

In [None]:
# Per user, the list of items appearing in the first training split.
user_watched_films = (
    train_als
    .groupBy('user_id')
    .agg(collect_list('item_id').alias('watched_films'))
)

In [None]:
# Left join: users without rows in train_als get a null `watched_films`.
pred_df = pred_df.join(user_watched_films, on='user_id', how='left')

In [None]:
# `new_films`: recommended items that are not in the user's `watched_films`.
pred_df = pred_df.withColumn('new_films', array_except('rec_films', 'watched_films'))

In [None]:
# Per user, the list of items appearing in the second training split.
user_future_films = (
    train_cb
    .groupBy('user_id')
    .agg(collect_list('item_id').alias('future_films'))
)

In [None]:
# Left join: users without rows in train_cb get a null `future_films`.
pred_df = pred_df.join(user_future_films, on='user_id', how='left')

In [None]:
# Preview the recommendations together with the watched / new / future item lists.
pred_df.show()

In [None]:
# Recommended-and-new items that also show up in the user's `future_films`.
rec_intersect = pred_df.select(
    'user_id',
    array_intersect('new_films', 'future_films').alias('rec_intersection'),
)

In [None]:
@udf(returnType=IntegerType())
def NotEmpty(x):
    """Classify an array value with an integer code.

    Returns:
        0 when ``x`` is null, 1 when it is an empty array, 2 when it holds at
        least one element.
    """
    # Identity check is the idiomatic (and overload-proof) way to test for None.
    if x is None:
        return 0
    return 2 if len(x) > 0 else 1

In [None]:
# Despite its name, `intersection_len` holds NotEmpty's category code
# (0 = null, 1 = empty, 2 = non-empty), not the number of elements.
rec_intersect = rec_intersect.withColumn('intersection_len', NotEmpty('rec_intersection'))

In [None]:
# Total number of rows in pred_df, used as the denominator for the percentages below.
s = pred_df.count()

In [None]:
# Percentage of pred_df rows falling into each NotEmpty category.
(
    rec_intersect
    .groupBy('intersection_len')
    .count()
    .withColumn('count', col('count') / s * 100)
    .show()
)

### Build dataset for second model

In [None]:
# Per user: recommended new items that appear in `future_films` (positives)
# and those that do not (negatives).
train = pred_df.select(
    'user_id',
    array_intersect('new_films', 'future_films').alias('positives'),
    array_except('new_films', 'future_films').alias('negatives'),
)

In [None]:
# Preview the per-user positives / negatives arrays.
train.show()

In [None]:
# Flatten the per-user arrays into one (user_id, item_id, target) row per item:
# target 0 for negatives, target 1 for positives.
train_negatives = train.select(
    'user_id',
    explode(col('negatives')).alias('item_id'),
    lit(0).alias('target'),
)

train_positives = train.select(
    'user_id',
    explode(col('positives')).alias('item_id'),
    lit(1).alias('target'),
)

# `train` is rebound from the array-per-user frame to the flattened one, so
# this cell cannot be re-run without first re-running the cell that builds the
# positives / negatives arrays.
train = train_positives.unionAll(train_negatives)

In [None]:
# Preview the flattened (user_id, item_id, target) rows.
train.show()

### Building the second model

In [None]:
# Number of rows in the second-level training set.
train.count()

In [None]:
# Shut down the Spark session at the end of the notebook.
spark.stop()