In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'
os.environ["PYSPARK_SUBMIT_ARGS"]='--num-executors 4 pyspark-shell'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.7
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.


In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark import Row
import json

conf = SparkConf()

In [3]:
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
spark.stop()

In [5]:
spark = SparkSession.builder.appName('Mikhail Pronin lab03').getOrCreate()

In [6]:
spark

In [8]:
!hdfs dfs -ls /labs/slaba03/

Found 4 items
-rw-r--r--   3 hdfs hdfs   91066524 2022-01-06 18:46 /labs/slaba03/laba03_items.csv
-rw-r--r--   3 hdfs hdfs   29965581 2022-01-06 18:46 /labs/slaba03/laba03_test.csv
-rw-r--r--   3 hdfs hdfs   74949368 2022-01-06 18:46 /labs/slaba03/laba03_train.csv
-rw-r--r--   3 hdfs hdfs  871302535 2022-01-06 18:46 /labs/slaba03/laba03_views_programmes.csv


In [9]:
items = spark.read.option("delimiter","\\t").csv("/labs/slaba03/laba03_items.csv", header=True).cache()

In [10]:
items = items.withColumn("item_id",col("item_id").cast(IntegerType()))\
             .withColumn("channel_id",col("channel_id").cast(IntegerType()))\
             .withColumn("datetime_availability_start",col("datetime_availability_start").cast(DateType()))\
             .withColumn("datetime_availability_stop",col("datetime_availability_stop").cast(DateType()))\
             .withColumn("datetime_show_start",col("datetime_show_start").cast(DateType()))\
             .withColumn("datetime_show_stop",col("datetime_show_stop").cast(DateType()))\
             .withColumn("content_type",col("content_type").cast(IntegerType()))\
             .withColumn("year",col("year").cast(IntegerType()))\
             .withColumn("genres",col("genres").cast(StringType()))\
             .withColumn("region_id",col("region_id").cast(IntegerType()))\

In [11]:
items.agg({"year": "min"}).first()[0]

1916

In [12]:
train = spark.read.csv("/labs/slaba03/laba03_train.csv", header=True).cache()

In [13]:
train = train.withColumn("user_id",col("user_id").cast(IntegerType()))\
             .withColumn("item_id",col("item_id").cast(IntegerType()))\
             .withColumn("purchase",col("purchase").cast(IntegerType()))

In [14]:
train_data = train.filter(train.purchase == 1).collect()

In [15]:
train_data = spark.sparkContext.parallelize(train_data).toDF()

In [16]:
train_data.take(5)

[Row(user_id=1654, item_id=9897, purchase=1),
 Row(user_id=1654, item_id=7394, purchase=1),
 Row(user_id=1654, item_id=9064, purchase=1),
 Row(user_id=1654, item_id=73216, purchase=1),
 Row(user_id=1654, item_id=88816, purchase=1)]

In [17]:
user_purchase = train_data.groupBy("user_id").sum("purchase").alias("user_purchase").collect()

In [18]:
user_purchase = spark.sparkContext.parallelize(user_purchase).toDF()

In [19]:
newColumns_user = ['user_id', 'cnt_purchase_user']
user_purchase = user_purchase.toDF(*newColumns_user)

In [20]:
item_purchase = train_data.groupBy("item_id").sum("purchase").alias("item_purchase").collect()

In [21]:
item_purchase = spark.sparkContext.parallelize(item_purchase).toDF()

In [22]:
newColumns_items = ['item_id', 'cnt_purchase_movie']
item_purchase = item_purchase.toDF(*newColumns_items)

In [23]:
train = train.join(user_purchase, on = 'user_id', how = 'left')

In [24]:
train = train.join(item_purchase, on = 'item_id', how = 'left')

In [25]:
train = train.na.fill(value=0, subset=["cnt_purchase_movie"])
train = train.na.fill(value=0, subset=["cnt_purchase_user"])

In [26]:
from pyspark.ml.linalg import Vectors, VectorUDT

In [27]:
ud_f = f.udf(lambda r : Vectors.dense(r),VectorUDT())

In [28]:
prog = spark.read.csv("/labs/slaba03/laba03_views_programmes.csv", header=True).cache()

In [29]:
prog = prog.withColumn("user_id",col("user_id").cast(IntegerType()))\
           .withColumn("item_id",col("item_id").cast(IntegerType()))\
           .withColumn("ts_start",col("ts_start").cast(IntegerType()))\
           .withColumn("ts_end",col("ts_end").cast(IntegerType()))\
           .withColumn("item_type",col("item_type").cast(StringType()))\
           .withColumn("watching_time",col("ts_end") - col("ts_start"))

In [31]:
watching_time = prog.groupBy(["user_id"]).sum("watching_time").alias("watching_time").collect()

In [32]:
watching_time = spark.sparkContext.parallelize(watching_time).toDF()

In [33]:
movies_watched = prog.groupBy(["user_id"]).count().alias("movies_watched").collect()

In [34]:
movies_watched = spark.sparkContext.parallelize(movies_watched).toDF()

In [35]:
watching_time = watching_time.join(movies_watched, on = "user_id", how = 'inner')

In [36]:
train = train.join(watching_time, on = "user_id", how = 'left')

In [37]:
train = train.join(items[['item_id', 'year']], on = 'item_id', how = 'left')

In [38]:
newColumns = ['user_id','item_id','purchase','cnt_purchase_user','cnt_purchase_movie','watching_time', 'count_watched', 'year']
train = train.toDF(*newColumns)

In [39]:
train = train.na.fill(value=0, subset=["year"])
train = train.na.fill(value=0, subset=["watching_time"])
train = train.na.fill(value=0, subset=["count_watched"])

In [40]:
train

DataFrame[user_id: int, item_id: int, purchase: int, cnt_purchase_user: bigint, cnt_purchase_movie: bigint, watching_time: bigint, count_watched: bigint, year: int]

In [41]:
train = train.withColumn("decade",(f.floor((col("year")-1900)/10)).cast(IntegerType()))

In [42]:
train.columns

['user_id',
 'item_id',
 'purchase',
 'cnt_purchase_user',
 'cnt_purchase_movie',
 'watching_time',
 'count_watched',
 'year',
 'decade']

In [43]:
#from pyspark.sql.functions import isnan, when, count, col
#train.select([count(when(col(c).isNull(), c)).alias(c) for c in train.columns]).show()

In [44]:
from pyspark.ml.feature import VectorAssembler
 
featuresCols = train.columns
featuresCols.remove('purchase')
featuresCols.remove('user_id')
featuresCols.remove('item_id')
 
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="features")

In [45]:
from pyspark.ml import Pipeline

In [46]:
pipeline = Pipeline(stages=[vectorAssembler])

In [47]:
pipelineModel = pipeline.fit(train)

In [48]:
train = pipelineModel.transform(train)

In [49]:
train

DataFrame[user_id: int, item_id: int, purchase: int, cnt_purchase_user: bigint, cnt_purchase_movie: bigint, watching_time: bigint, count_watched: bigint, year: int, decade: int, features: vector]

In [50]:
train_dataset = train.sampleBy("purchase", fractions={0: 0.8, 1: 0.8}, seed=5757)

In [51]:
test_dataset = train.join(train_dataset, on=["user_id", "item_id"], how="leftanti")

In [53]:
from pyspark.ml.classification import GBTClassifier

In [54]:
gbt = GBTClassifier(maxIter=8, maxDepth=5, labelCol="purchase", seed=42)

In [55]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [56]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="features", 
                                          labelCol=gbt.getLabelCol(),
                                          metricName='areaUnderROC')

In [57]:
gbtModel = gbt.fit(train_dataset)

In [58]:
predictions = gbtModel.transform(test_dataset)

In [59]:
print("Area under ROC curve:", evaluator.evaluate(predictions))

Area under ROC curve: 0.825882829162064


In [112]:
test = spark.read.csv("/labs/slaba03/laba03_test.csv", header=True).cache()

In [113]:
test = test.withColumn("user_id",col("user_id").cast(IntegerType()))\
             .withColumn("item_id",col("item_id").cast(IntegerType()))\
             .withColumn("purchase",col("purchase").cast(IntegerType()))

In [97]:
test.take(5)

[Row(user_id=1654, item_id=94814, purchase=None),
 Row(user_id=1654, item_id=93629, purchase=None),
 Row(user_id=1654, item_id=9980, purchase=None),
 Row(user_id=1654, item_id=95099, purchase=None),
 Row(user_id=1654, item_id=11265, purchase=None)]

In [114]:
test = test.join(user_purchase, on = 'user_id', how = 'left')
test = test.join(item_purchase, on = 'item_id', how = 'left')
test = test.join(watching_time, on = "user_id", how = 'left')
test = test.join(items[['item_id', 'year']], on = 'item_id', how = 'left')

In [115]:
test = test.na.fill(value=0, subset=["year"])

test = test.withColumn("decade",(f.floor((col("year")-1900)/10)).cast(IntegerType()))

In [118]:
test_Columns = ['item_id','user_id','purchase','cnt_purchase_user','cnt_purchase_movie','watching_time', 'count_watched', 'year','decade']
test = test.toDF(*test_Columns)

In [121]:
test = test.na.fill(value=0, subset=["watching_time"])
test = test.na.fill(value=0, subset=["count_watched"])
test = test.na.fill(value=0, subset=["cnt_purchase_user"])
test = test.na.fill(value=0, subset=["cnt_purchase_movie"])

In [122]:
final_prediction_data = pipelineModel.transform(test)

In [123]:
predictions_submit = gbtModel.transform(final_prediction_data)

In [124]:
predictions_submit = predictions_submit[['user_id','item_id','probability']]

In [125]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

In [126]:
tr=udf(lambda v:float(v[1]),FloatType())

In [127]:
predictions_submit = predictions_submit.withColumn("purchase_try", tr('probability'))

In [128]:
predictions_submit = predictions_submit[['user_id','item_id','purchase_try']]

In [129]:
final_columns = ['user_id', 'item_id', 'purchase']
predictions_submit = predictions_submit.toDF(*final_columns)

In [76]:
predictions_submit.take(5)

[Row(user_id=5385, item_id=728960, purchase=0.07331158220767975),
 Row(user_id=8484, item_id=728960, purchase=0.07331158220767975),
 Row(user_id=72926, item_id=728960, purchase=0.07331158220767975),
 Row(user_id=89053, item_id=728960, purchase=0.07349050790071487),
 Row(user_id=94988, item_id=728960, purchase=0.07574103772640228)]

In [130]:
predictions_submit

DataFrame[user_id: int, item_id: int, purchase: float]

In [131]:
predictions_submit = predictions_submit.sort(predictions_submit.user_id.asc(),
                                             predictions_submit.item_id.asc()).toPandas()

In [133]:
predictions_submit.head(5)

Unnamed: 0,user_id,item_id,purchase
0,1654,336,0.073641
1,1654,678,0.073641
2,1654,691,0.073641
3,1654,696,0.073668
4,1654,763,0.073641


In [132]:
predictions_submit.to_csv('lab03.csv')

In [134]:
spark.stop()