In [1]:
#
# Copyright © 2019 Sunho Kim. All rights reserved.
#

In [2]:
cd ..

/gorani/gorani/backend/dataserver


In [3]:
from pyspark.sql import SparkSession, DataFrame

spark = SparkSession\
    .builder\
    .appName('Recommend Books')\
    .getOrCreate()
sc = spark.sparkContext

In [4]:
# parameters
book_number = 1

In [10]:
from gorani.spark import read_api_all, read_data_all

rate_df = read_api_all(spark, 'rates')\
        .where('kind = "recommended_book"')
rate_df.show()

eb_df = read_data_all(spark, 'experienced_books')
eb_df.show()

rb_df = read_data_all(spark, 'readable_books')
rb_df.show()

rcb_df = read_api_all(spark, 'recommended_books')
rcb_df.show()

cluster_df = read_data_all(spark, 'book_cluster')
cluster_df.show()

+--------------------+-------+---------+----------------+----+--------------------+--------------------+
|                  id|user_id|target_id|            kind|rate|          created_at|          updated_at|
+--------------------+-------+---------+----------------+----+--------------------+--------------------+
|ae142f49-b63e-443...|     10|        2|recommended_book|  -1|2019-05-25 21:13:...|2019-05-25 22:42:...|
|21c98ea1-670a-41a...|     10|        3|recommended_book|  -1|2019-05-26 05:23:...|2019-05-26 06:01:...|
|1110fe5a-26af-4bc...|     10|        5|recommended_book|   1|2019-05-26 06:02:...|2019-05-26 06:02:...|
+--------------------+-------+---------+----------------+----+--------------------+--------------------+

+-------+-------+
|user_id|book_id|
+-------+-------+
|     10|      4|
+-------+-------+

+-------+-------+
|user_id|book_id|
+-------+-------+
|     10|      1|
|     10|      2|
|     10|      3|
|     10|      4|
|     10|      5|
|     10|      6|
|      1|  

In [11]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

cluster_count_df = rate_df.join(cluster_df, rate_df['target_id'] == cluster_df['id'], 'inner')\
    .select(F.col('target_id').alias('book_id'), 'user_id', 'cluster', 'rate')\
    .groupBy('user_id', 'cluster').agg(F.sum('rate').alias('count'))\
    .select('user_id', 'cluster', F.when(F.col('count') < 0, F.lit(0)).otherwise(F.col('count').cast(IntegerType())).alias('count'))

cluster_count_sum_df = cluster_count_df.groupBy('user_id')\
                    .agg(F.sum('count').alias('sum'))

cluster_count_df = cluster_count_df.join(cluster_count_sum_df.alias('f'),\
                    cluster_count_df['user_id'] == cluster_count_sum_df['user_id'], 'left')\
                    .drop(F.col('f.user_id'))

rcbn_df = rcb_df.groupBy('user_id')\
    .agg(F.count(F.lit(1)).alias('old_rec'))\

cluster_count_df = cluster_count_df.join(rcbn_df.alias('f2'),\
        cluster_count_df['user_id'] == rcbn_df['user_id'], 'left')\
        .drop(F.col('f2.user_id'))\
        .select('user_id', 'cluster', 'count', 'sum', F.when(F.isnull('old_rec'), 0).otherwise(F.col('old_rec')).alias('old_rec'))

cluster_count_df.show()

+-------+-------+-----+---+-------+
|user_id|cluster|count|sum|old_rec|
+-------+-------+-----+---+-------+
|     10|      1|    0|  0|      0|
|     10|      0|    0|  0|      0|
+-------+-------+-----+---+-------+



In [12]:
df = cluster_df.join(rb_df, cluster_df['id'] == rb_df['book_id'], 'inner').drop(F.col('id'))
df.show()
df = df.join(eb_df, (df['book_id'] == eb_df['book_id']) & (df['user_id'] == eb_df['user_id']), 'left_anti')
candidate_df = df.join(rate_df, (df['book_id'] == rate_df['target_id']) & (df['user_id'] == rate_df['user_id']), 'left_anti')
candidate_df.show()

+-------+-------+-------+
|cluster|user_id|book_id|
+-------+-------+-------+
|      0|     10|      1|
|      0|     10|      2|
|      1|     10|      3|
|      1|     10|      4|
|      1|     10|      5|
|      1|     10|      6|
|      0|      1|      1|
|      0|      1|      2|
|      1|      1|      3|
|      1|      1|      4|
|      1|      1|      5|
|      1|      1|      6|
+-------+-------+-------+

+-------+-------+-------+
|cluster|user_id|book_id|
+-------+-------+-------+
|      1|      1|      6|
|      1|      1|      3|
|      0|      1|      1|
|      0|      1|      2|
|      1|     10|      6|
|      1|      1|      5|
|      0|     10|      1|
|      1|      1|      4|
+-------+-------+-------+



In [13]:
need_cluster_df = cluster_count_df.select('user_id', 'cluster', 'old_rec',\
(F.col('count') / F.when(F.col('sum') == 0, F.lit(0.1)).otherwise(F.col('sum'))\
* (F.lit(book_number) - F.col('old_rec'))).cast(IntegerType()).alias('need'))

need_cluster_df.show()
need_cluster = need_cluster_df.collect()

+-------+-------+-------+----+
|user_id|cluster|old_rec|need|
+-------+-------+-------+----+
|     10|      1|      0|   0|
|     10|      0|      0|   0|
+-------+-------+-------+----+



In [16]:
from gorani.spark import write_api
from gorani.utils import uuid

out = []
for row in need_cluster:
    out += candidate_df.where((F.col('cluster') == row['cluster']) & (F.col('user_id') == row['user_id']))\
                .orderBy(F.rand())\
                .limit(row['need'])\
                .drop(F.col('cluster'))\
                .collect()
    if row['need'] != (book_number - row['old_rec']):
        need = book_number - row['need']
        out += candidate_df\
            .where(F.col('user_id') == row['user_id'])\
            .orderBy(F.rand())\
            .limit(need)\
            .drop(F.col('cluster'))\
            .collect()

if len(out) != 0:
    result_df = spark.createDataFrame(out)
    result_df = result_df.withColumn('id', uuid())\
        .withColumn('updated_at', F.current_timestamp())\
        .withColumn('created_at', F.current_timestamp())
    result_df.show()
    write_api('recommended_books', result_df)
    

print('success')

Py4JJavaError: An error occurred while calling o650.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 248.0 failed 4 times, most recent failure: Lost task 1.3 in stage 248.0 (TID 5508, 183.99.225.238, executor 0): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO recommended_books ("user_id","book_id","id","updated_at","created_at") VALUES (10,6,'d9411e66-4375-486c-ba9b-75382f9e13b4','2019-05-26 15:05:34.071+09','2019-05-26 15:05:34.071+09') was aborted: ERROR: duplicate key value violates unique constraint "recommended_books_pkey"
  Detail: Key (user_id, book_id)=(10, 6) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:50)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2184)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:481)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:840)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1538)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "recommended_books_pkey"
  Detail: Key (user_id, book_id)=(10, 6) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
	... 18 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:506)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO recommended_books ("user_id","book_id","id","updated_at","created_at") VALUES (10,6,'d9411e66-4375-486c-ba9b-75382f9e13b4','2019-05-26 15:05:34.071+09','2019-05-26 15:05:34.071+09') was aborted: ERROR: duplicate key value violates unique constraint "recommended_books_pkey"
  Detail: Key (user_id, book_id)=(10, 6) already exists.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:50)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2184)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:481)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:840)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1538)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "recommended_books_pkey"
  Detail: Key (user_id, book_id)=(10, 6) already exists.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
	... 18 more
