In [1]:
%%init_spark
# Spark launcher settings for this notebook session.
# Run Spark in local mode on all available cores.
launcher.master = "local[*]"
# Memory given to the driver JVM.
launcher.conf.spark.driver.memory= "16G"
# S3A client tuning: size of the connection pool and upper bound on worker threads.
launcher.conf.set("spark.hadoop.fs.s3a.connection.maximum", "8192")
launcher.conf.set("spark.hadoop.fs.s3a.threads.max", "2048")

In [2]:
import java.time.LocalDate
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SparkSession, Row, DataFrame, Dataset}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.UserDefinedFunction
import spark.implicits._
import org.apache.spark.sql.types._

Intitializing Scala interpreter ...

Spark Web UI available at http://localhost:4041
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1598919844691)
SparkSession available as 'spark'


import java.time.LocalDate
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SparkSession, Row, DataFrame, Dataset}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.UserDefinedFunction
import spark.implicits._
import org.apache.spark.sql.types._


In [3]:
val model = "manual"

model: String = manual


In [4]:
// Location of the banner-events base table; getUserSales reads it as parquet.
val groundTruthPath: String =
  "s3a://midgar-aws-workspace/prod/shinra/telescope/base/l1_banner_events_base_table"


groundTruthPath: String = s3a://midgar-aws-workspace/prod/shinra/telescope/base/l1_banner_events_base_table


# category meta

In [5]:
// Category metadata, read from a CSV snapshot whose first line holds the column names.
val cats = spark.read
  .option("header", "true")
  .csv("s3a://midgar-aws-workspace/prod/shinra/modelling/journeyautomation/tenant/paytm-india/resources/phaseRules/dt=2020-08-20")

cats: org.apache.spark.sql.DataFrame = [l1_category_cma_id: string, l1_category_cma_name: string ... 5 more fields]


In [6]:
// Lookup table between category id and category name, with the column names
// the rest of the notebook joins on (`cma_id`, `category_name`).
val pairs = cats.select(
  col("l1_category_cma_id").as("cma_id"),
  col("l1_category_cma_name").as("category_name")
)

pairs: org.apache.spark.sql.DataFrame = [cma_id: string, category_name: string]


In [8]:
pairs.filter($"cma_id" === 59).show()

+------+-------------+
|cma_id|category_name|
+------+-------------+
|    59|   O2OC-Deals|
+------+-------------+



# ground truth

In [23]:
/**
 * Loads the (customer, category) pairs that have a positive `sales` value
 * within a date range.
 *
 * Reads the parquet data at `groundTruthPath`, keeps rows whose `dt` lies
 * between `startDateStr` and `endDateStr` (both ends inclusive), and drops
 * rows whose `customer_id` is missing, is the literal string "null", or
 * whose `sales` is missing or not positive.
 *
 * @param spark        session used to read the parquet data
 * @param startDateStr first `dt` value to include
 * @param endDateStr   last `dt` value to include
 * @return a frame with columns `customer_id` (cast to long, never null) and `cma_id`
 */
def getUserSales(spark:SparkSession, startDateStr: String,endDateStr: String): DataFrame={
    spark.read.parquet(groundTruthPath)
    .filter($"dt".between(startDateStr,endDateStr))
    .filter($"customer_id".isNotNull && $"customer_id" =!= "null" && $"sales".isNotNull &&  $"sales" > 0)
    .select($"customer_id".cast(LongType), $"cma_id")
    // The cast yields null for ids that are not numeric; drop those rows so the
    // result matches the prediction frames, which apply the same post-cast filter.
    .filter($"customer_id".isNotNull)
}

getUserSales: (spark: org.apache.spark.sql.SparkSession, startDateStr: String, endDateStr: String)org.apache.spark.sql.DataFrame


In [24]:
/**
 * Groups the category ids by customer.
 *
 * @param sales frame whose rows are `(customer_id: Long, cma_id: String)`,
 *              such as the output of `getUserSales`
 * @return one `(customerId, categoryIds)` pair per customer; the array keeps
 *         every occurrence (duplicates are not removed). Rows that do not
 *         have exactly a non-null Long followed by a non-null String are
 *         skipped instead of failing the job with a `MatchError`.
 */
def getTopSalesCategories(sales: DataFrame): RDD[(Long, Array[String])] = {
  sales.rdd
    // `collect` with a partial function keeps only the rows the pattern matches;
    // type patterns never match null, so rows with a null field are dropped here.
    .collect { case Row(customerId: Long, cmaId: String) => (customerId, cmaId) }
    .groupByKey()
    .mapValues(_.toArray)
}

getTopSalesCategories: (sales: org.apache.spark.sql.DataFrame)org.apache.spark.rdd.RDD[(Long, Array[String])]


In [25]:
val salesCategories = getUserSales(spark, "2020-08-22", "2020-08-22")


salesCategories: org.apache.spark.sql.DataFrame = [customer_id: bigint, cma_id: string]


In [26]:
val t = salesCategories.select("customer_id").count()
t

t: Long = 9243874
res5: Long = 9243874


In [52]:
// Sales rows per category, plus each category's share of all sales rows (`t`),
// labelled with the category name. The join is an inner join, so categories
// without an entry in `pairs` are dropped. The frame is ordered by volume,
// collapsed to one partition and cached because later cells read it repeatedly.
// (No sort before the join: a join does not preserve row order, so only the
// final orderBy matters.)
val actual_sales = salesCategories
  .groupBy("cma_id")
  .agg(count("customer_id").as("count"))
  .withColumn("percent", round($"count" / t, 6))
  .join(pairs, Seq("cma_id"))
  .orderBy(desc("count"))
  .coalesce(1)
  .cache()

actual_sales: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cma_id: string, count: bigint ... 2 more fields]


In [53]:
actual_sales.count()

res19: Long = 51


In [54]:
actual_sales.show(50, false)

+------+-------+--------+--------------------------------------+
|cma_id|count  |percent |category_name                         |
+------+-------+--------+--------------------------------------+
|443   |1458358|0.157765|RU-Prepaid                            |
|352   |1175501|0.127165|WLT-add money                         |
|6797  |1175212|0.127134|PAYTM-UPI P2P                         |
|357   |519627 |0.056213|WLT-P2P                               |
|6808  |482024 |0.052145|3PP-ON-Digital Content & Entertainment|
|420   |443737 |0.048003|3PP-OFF-Food                          |
|421   |284924 |0.030823|3PP-OFF-Gas and Petrol                |
|6809  |215447 |0.023307|3PP-ON-Food Delivery                  |
|38    |213819 |0.023131|RU-DTH                                |
|39    |179160 |0.019381|RU-Electricity                        |
|429   |171286 |0.01853 |3PP-OFF-Milk Dairy and Cooperative    |
|6810  |169122 |0.018296|3PP-ON-O2O                            |
|423   |151646 |0.016405|

# predictions

In [56]:
spark.read.parquet("s3a://midgar-aws-workspace/home/mchen/model_analysis/l1_base_event_add_dt/dt=2020-08-22").printSchema

root
 |-- customer_id: string (nullable = true)
 |-- category_name: string (nullable = true)
 |-- banner_id: string (nullable = true)
 |-- serving_strategy: string (nullable = true)
 |-- final_serving_strategy: string (nullable = true)
 |-- customer_autopilot_group: integer (nullable = true)
 |-- banner_event_click: long (nullable = true)
 |-- banner_event_impression: long (nullable = true)
 |-- clicks_180_days: long (nullable = true)
 |-- sales_180_days: long (nullable = true)
 |-- interactions_180_days: long (nullable = true)
 |-- cma_id: string (nullable = true)
 |-- category_clicks_180_days: long (nullable = true)
 |-- category_sales_180_days: long (nullable = true)
 |-- category_interactions_180_days: long (nullable = true)
 |-- pred_topk_cma_id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pred_topk_category_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rank_shown_category: string (nullable = true)
 |-- last_int

In [57]:
// Snapshot of served banner predictions for the evaluation day.
val predictionsPath = "s3a://midgar-aws-workspace/home/mchen/model_analysis/l1_base_event_add_dt/dt=2020-08-22"

/**
 * Loads the predictions served under one strategy.
 *
 * Keeps rows with a usable `customer_id` (not null, not the string "null")
 * whose `final_serving_strategy` equals `strategy`. Only `customer_id` and
 * `category_name` are taken from the snapshot; `cma_id` comes from the join
 * with `pairs` on `category_name`, so categories unknown to `pairs` are dropped.
 *
 * @param strategy value of `final_serving_strategy` to keep
 * @return a frame with columns `customer_id` (long, never null), `cma_id`, `category_name`
 */
def loadPredictions(strategy: String): DataFrame =
  spark.read
    .parquet(predictionsPath)
    .filter($"customer_id".isNotNull && $"customer_id" =!= "null" && $"final_serving_strategy" === strategy)
    .select("customer_id", "category_name")
    .join(pairs, Seq("category_name"))
    .select($"customer_id".cast(LongType), $"cma_id", $"category_name")
    // The cast yields null for ids that are not numeric; drop them.
    .filter($"customer_id".isNotNull)

val autopilot_predictions = loadPredictions("autopilot")

val manual_predictions = loadPredictions("manual")

autopilot_predictions: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_id: bigint, cma_id: string ... 1 more field]
manual_predictions: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [customer_id: bigint, cma_id: string ... 1 more field]


In [58]:
// Number of prediction rows per serving strategy.
val autopilot_total = autopilot_predictions.count()
val manual_total = manual_predictions.count()
//total

autopilot_total: Long = 2943276
manual_total: Long = 21595834


In [63]:
/**
 * Counts predictions per category and expresses each count as a share of `total`.
 *
 * @param predictions frame with `customer_id`, `cma_id` and `category_name` columns
 * @param total       denominator for the `percent` column
 * @return columns `cma_id`, `category_name`, `count`, `percent` (rounded to
 *         6 decimals), sorted by `percent` descending, in one cached partition
 */
def categoryShare(predictions: DataFrame, total: Long): DataFrame =
  predictions
    .groupBy("cma_id", "category_name")
    .agg(count("customer_id").as("count"))
    .withColumn("percent", round($"count" / total, 6))
    .sort($"percent".desc)
    .coalesce(1)
    .cache

val autopilotCount = categoryShare(autopilot_predictions, autopilot_total)

val manualCount = categoryShare(manual_predictions, manual_total)

autopilotCount: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cma_id: string, category_name: string ... 2 more fields]
manualCount: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cma_id: string, category_name: string ... 2 more fields]


In [64]:
autopilotCount.count()
manualCount.count()

res25: Long = 33


In [65]:
autopilotCount.show(50, false)

+------+---------------+------+--------+
|cma_id|category_name  |count |percent |
+------+---------------+------+--------+
|443   |RU-Prepaid     |518755|0.176251|
|14    |EC-Apparel     |361743|0.122905|
|59    |O2OC-Deals     |279755|0.095049|
|352   |WLT-add money  |274753|0.093349|
|23    |EC-Mobiles     |198769|0.067533|
|9     |PAYTM-Money    |165211|0.056132|
|39    |RU-Electricity |157869|0.053637|
|19    |EC-CE          |134240|0.045609|
|45    |RU-Insurance   |117290|0.03985 |
|25    |EC-Groceries   |98753 |0.033552|
|10    |PAYTM-Games    |90099 |0.030612|
|18    |EC-HnK         |87187 |0.029622|
|12    |PAYTM-Gold     |75910 |0.025791|
|15    |EC-FashAcc     |56379 |0.019155|
|445   |RU-Credit Card |52997 |0.018006|
|53    |O2OC-Flights   |50431 |0.017134|
|6803  |PAYTM-Mini Apps|40581 |0.013788|
|56    |O2OC-Events    |38805 |0.013184|
|22    |EC-LA          |33346 |0.01133 |
|1     |PAYTM          |27993 |0.009511|
|16    |EC-Footwear    |24870 |0.00845 |
|40    |RU-FastT

In [66]:
manualCount.show(50, false)

+------+---------------+-------+--------+
|cma_id|category_name  |count  |percent |
+------+---------------+-------+--------+
|10    |PAYTM-Games    |4381249|0.202875|
|1     |PAYTM          |2815187|0.130358|
|352   |WLT-add money  |1596737|0.073937|
|443   |RU-Prepaid     |1478787|0.068476|
|25    |EC-Groceries   |1362711|0.063101|
|23    |EC-Mobiles     |1256235|0.05817 |
|14    |EC-Apparel     |1109263|0.051365|
|19    |EC-CE          |930570 |0.04309 |
|45    |RU-Insurance   |868589 |0.04022 |
|22    |EC-LA          |779242 |0.036083|
|18    |EC-HnK         |770794 |0.035692|
|39    |RU-Electricity |765018 |0.035424|
|12    |PAYTM-Gold     |706981 |0.032737|
|16    |EC-Footwear    |463998 |0.021486|
|9     |PAYTM-Money    |439709 |0.020361|
|445   |RU-Credit Card |406662 |0.018831|
|6803  |PAYTM-Mini Apps|373462 |0.017293|
|15    |EC-FashAcc     |311806 |0.014438|
|59    |O2OC-Deals     |233421 |0.010809|
|26    |EC-LaptopsPC   |139786 |0.006473|
|56    |O2OC-Events    |105928 |0.

In [67]:
import org.apache.commons.math3.stat.correlation.KendallsCorrelation

/**
 * Kendall rank correlation between two paired sequences of counts.
 *
 * @param arrA first sequence
 * @param arrB second sequence, paired position by position with `arrA`
 * @return the value computed by commons-math `KendallsCorrelation.correlation`
 */
def computeKendall(arrA: Seq[Long], arrB: Seq[Long]): Double = {
    // The commons-math API works on primitive double arrays.
    def asDoubles(values: Seq[Long]): Array[Double] = values.map(_.toDouble).toArray

    new KendallsCorrelation().correlation(asDoubles(arrA), asDoubles(arrB))
}

/**
 * Jaccard similarity of two id sets: |A ∩ B| / |A ∪ B|.
 *
 * @param arrA first set of ids
 * @param arrB second set of ids
 * @return a value in [0, 1]; two empty sets are treated as identical and give
 *         1.0 (the usual convention) instead of the NaN that 0/0 would produce
 */
def jc(arrA:Set[Int], arrB:Set[Int]):Double ={
    val unionSize = arrA.union(arrB).size
    if (unionSize == 0) 1.0
    else arrA.intersect(arrB).size.toDouble / unionSize.toDouble
}

import org.apache.commons.math3.stat.correlation.KendallsCorrelation
computeKendall: (arrA: Seq[Long], arrB: Seq[Long])Double
jc: (arrA: Set[Int], arrB: Set[Int])Double


In [123]:
import org.apache.spark.sql.functions.format_number

/**
 * Compares the category ranking implied by prediction counts with the ranking
 * implied by sales counts, at ten cut-offs (k = 0.1, 0.2, ..., 1.0 of the
 * number of sales categories).
 *
 * For each cut-off it reports:
 *  - `kendall`: Kendall correlation between sales counts and prediction counts
 *    of the top sales categories (left join on `cma_id`);
 *  - `jc_similarity`: Jaccard similarity between the top predicted ids and the
 *    top sales ids.
 *
 * @param predictionsPerCategory frame with `cma_id` and `count` columns (predictions per category)
 * @param salesPerCategory       frame with `cma_id` and `count` columns (sales per category)
 * @return one row per cut-off with columns `top_percent_category` (k * 100),
 *         `kendall` and `jc_similarity` (both formatted as "%1.2f" strings)
 */
def evaluate(predictionsPerCategory: DataFrame, salesPerCategory: DataFrame): DataFrame = {
    // Both rankings are sorted by volume, collapsed to one partition and cached
    // because every cut-off below reads them again.
    val rankedPredictions = predictionsPerCategory
    .select($"cma_id".cast(IntegerType), $"count".alias("pred_count"))
    .orderBy(desc("pred_count")).coalesce(1).cache

    val rankedSales = salesPerCategory
    .select($"cma_id".cast(IntegerType), $"count".alias("test_count"))
    .orderBy(desc("test_count")).coalesce(1).cache
    val size = rankedSales.count

    // Integer range instead of a Double range (deprecated in newer Scala); the k values are the same.
    (1 to 10).map(_ / 10.0).map(k => {
        // NOTE(review): the "- 1" means the 100% cut-off covers size - 1 categories — confirm this is intended.
        val topk = (size*k).toInt - 1
        val topPredictions = rankedPredictions.limit(topk)
        val topSales = rankedSales.limit(topk)

        // Top sales categories missing from the top predictions get a random
        // filler in [0, topk] as their prediction count.
        // NOTE(review): rand() is unseeded, so `kendall` can differ between runs.
        val joined = (topSales.join(topPredictions, Seq("cma_id"), "left")
                      .withColumn("pred_count", coalesce($"pred_count", (round(rand()*topk)).cast(LongType))))

        // Collect both columns in one pass so the two vectors are guaranteed to
        // be aligned row by row and the random filler is evaluated only once.
        val countPairs = joined.select("pred_count", "test_count").collect()
        val predCounts = countPairs.map(_.getLong(0)).toSeq
        val salesCounts = countPairs.map(_.getLong(1)).toSeq

        val predictedIds = topPredictions.select("cma_id").collect().map(_.getInt(0)).toSet
        val salesIds = topSales.select("cma_id").collect().map(_.getInt(0)).toSet

        (k*100, "%1.2f".format(computeKendall(predCounts,salesCounts)), "%1.2f".format(jc(predictedIds,salesIds)))
    }).toDF("top_percent_category", "kendall", "jc_similarity")
}

import org.apache.spark.sql.functions.format_number
evaluate: (predictionsPerCategory: org.apache.spark.sql.DataFrame, salesPerCategory: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [124]:
val evalAutopilot = evaluate(autopilotCount, actual_sales)

evalAutopilot: org.apache.spark.sql.DataFrame = [top_percent_category: double, kendall: string ... 1 more field]


In [125]:
evalAutopilot.show(10, false)

+--------------------+-------+-------------+
|top_percent_category|kendall|jc_similarity|
+--------------------+-------+-------------+
|10.0                |1.00   |0.33         |
|20.0                |0.46   |0.13         |
|30.0                |0.15   |0.12         |
|40.0                |0.34   |0.09         |
|50.0                |0.10   |0.14         |
|60.0                |0.19   |0.23         |
|70.0                |-0.08  |0.27         |
|80.0                |-0.17  |0.35         |
|90.0                |-0.12  |0.39         |
|100.0               |-0.06  |0.45         |
+--------------------+-------+-------------+



In [126]:
val evalManual = evaluate(manualCount, actual_sales)

evalManual: org.apache.spark.sql.DataFrame = [top_percent_category: double, kendall: string ... 1 more field]


In [127]:
evalManual.show(10, false)

+--------------------+-------+-------------+
|top_percent_category|kendall|jc_similarity|
+--------------------+-------+-------------+
|10.0                |0.67   |0.33         |
|20.0                |0.40   |0.13         |
|30.0                |0.54   |0.12         |
|40.0                |0.37   |0.09         |
|50.0                |0.06   |0.14         |
|60.0                |-0.12  |0.23         |
|70.0                |-0.08  |0.29         |
|80.0                |0.00   |0.36         |
|90.0                |-0.14  |0.43         |
|100.0               |-0.09  |0.48         |
+--------------------+-------+-------------+



In [93]:
actual_sales.show()

+------+-------+--------+--------------------+
|cma_id|  count| percent|       category_name|
+------+-------+--------+--------------------+
|   443|1458358|0.157765|          RU-Prepaid|
|   352|1175501|0.127165|       WLT-add money|
|  6797|1175212|0.127134|       PAYTM-UPI P2P|
|   357| 519627|0.056213|             WLT-P2P|
|  6808| 482024|0.052145|3PP-ON-Digital Co...|
|   420| 443737|0.048003|        3PP-OFF-Food|
|   421| 284924|0.030823|3PP-OFF-Gas and P...|
|  6809| 215447|0.023307|3PP-ON-Food Delivery|
|    38| 213819|0.023131|              RU-DTH|
|    39| 179160|0.019381|      RU-Electricity|
|   429| 171286| 0.01853|3PP-OFF-Milk Dair...|
|  6810| 169122|0.018296|          3PP-ON-O2O|
|   423| 151646|0.016405|  3PP-OFF-Healthcare|
|   358|  91531|0.009902|WLT-transfer to bank|
|    44|  90610|0.009802|      RU-Google Play|
|   391|  88438|0.009567|       3PP-ON-Others|
|   444|  59678|0.006456|         RU-Postpaid|
|  6811|  54754|0.005923|3PP-ON-Travel & T...|
|   426|  449

In [94]:
manualCount.show()

+------+---------------+-------+--------+
|cma_id|  category_name|  count| percent|
+------+---------------+-------+--------+
|    10|    PAYTM-Games|4381249|0.202875|
|     1|          PAYTM|2815187|0.130358|
|   352|  WLT-add money|1596737|0.073937|
|   443|     RU-Prepaid|1478787|0.068476|
|    25|   EC-Groceries|1362711|0.063101|
|    23|     EC-Mobiles|1256235| 0.05817|
|    14|     EC-Apparel|1109263|0.051365|
|    19|          EC-CE| 930570| 0.04309|
|    45|   RU-Insurance| 868589| 0.04022|
|    22|          EC-LA| 779242|0.036083|
|    18|         EC-HnK| 770794|0.035692|
|    39| RU-Electricity| 765018|0.035424|
|    12|     PAYTM-Gold| 706981|0.032737|
|    16|    EC-Footwear| 463998|0.021486|
|     9|    PAYTM-Money| 439709|0.020361|
|   445| RU-Credit Card| 406662|0.018831|
|  6803|PAYTM-Mini Apps| 373462|0.017293|
|    15|     EC-FashAcc| 311806|0.014438|
|    59|     O2OC-Deals| 233421|0.010809|
|    26|   EC-LaptopsPC| 139786|0.006473|
+------+---------------+-------+--