# Connection and data loading

In [1]:
%%init_spark
launcher.master = "yarn"
launcher.conf.spark.app.name = "Similar_A/B"
launcher.conf.spark.yarn.queue="root.analyst.editor-bu"
launcher.conf.spark.executor.cores=5
launcher.conf.spark.executor.memory="15g"
launcher.conf.spark.driver.memory="10g"
launcher.conf.spark.dynamicAllocation.enabled="true"
launcher.conf.spark.shuffle.service.enabled="true"
launcher.conf.spark.dynamicAllocation.maxExecutors=50
launcher.jars=["/opt/shared/postgresql_1.jar"]
launcher.conf.spark.sql.shuffle.partitions=250
launcher.conf.spark.serializer="org.apache.spark.serializer.KryoSerializer"
launcher.conf.spark.yarn.executor.memoryOverhead="5120m"

In [2]:
import org.joda.time.format.DateTimeFormat
import java.util.Properties
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.{DateTime, Days}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.joda.time.format.DateTimeFormat
import org.joda.time.{DateTime, Days}
// Formatter for "yyyy-MM-dd" date strings.
// NOTE(review): same pattern as `partitionFormatter`; confirm whether this one is still used.
val formatter = DateTimeFormat.forPattern("yyyy-MM-dd")

import scala.util.Try
import java.sql.{Connection, DriverManager, ResultSet}
// Session handle used for `import sparkSession.implicits._` (enables the $"col" syntax).
// NOTE(review): the %%init_spark cell already starts a YARN session, so getOrCreate() is
// expected to return that session and `.master("local")` / `.appName("example")` likely do
// not take effect here; outside the notebook this would start a local session — confirm.
val sparkSession = SparkSession.builder.master("local").appName("example").getOrCreate()
import sparkSession.implicits._
import org.apache.spark.SparkContext


// getOrCreate() again: resolves to the already-running session.
val spark = SparkSession.builder().appName("test").getOrCreate()

// "yyyy/MM/dd" builds the day directories under /analytics/events/PARQUET/...;
// "yyyy-MM-dd" builds the partition_date=... directories of the big table.
val pathFormatter = DateTimeFormat.forPattern("yyyy/MM/dd")
val partitionFormatter = DateTimeFormat.forPattern("yyyy-MM-dd")

// JDBC settings for the DWH PostgreSQL database (used with df.write.jdbc).
val props = new Properties()
val JDBC_URL = "jdbc:postgresql://172.16.33.44:5432/dwh"

props.setProperty("driver", "org.postgresql.Driver")
// NOTE(review): not a documented PostgreSQL JDBC connection property as far as I know — confirm it has any effect.
props.setProperty("max_connections", "10000")
// SECURITY(review): credentials are hard-coded in the notebook; consider loading them
// from the environment or a secret store.
props.setProperty("user", "dwh")
props.setProperty("password", "4F51hnXVMZoDcHrLvf")
// Presumably seconds (PostgreSQL JDBC convention) — TODO confirm for this driver version.
props.setProperty("loginTimeout", "30")
props.setProperty("socketTimeout", "1800")


// Alias: later cells read data through `sqlContext`; it is the SparkSession itself.
val sqlContext = spark


/**
 * Writes `df` to the DWH PostgreSQL table `table` over JDBC, using JDBC_URL and props.
 *
 * @param df     rows to write
 * @param table  target table name
 * @param append true appends rows (SaveMode.Append); false overwrites (SaveMode.Overwrite)
 *
 * Write failures are printed and swallowed (best-effort publish), as before.
 */
def publish(df:DataFrame, table:String, append:Boolean): Unit = {
  // The previous version opened a separate DriverManager connection that was never used
  // (only closed); df.write.jdbc manages its own connections, so that extra round-trip and
  // the duplicated credential string are gone.
  val mode = if (append) Append else Overwrite
  try {
    df.write.mode(mode).jdbc(JDBC_URL, table, props)
  } catch {
    case e: Exception =>
      e.printStackTrace()
  }
}



/**
 * Loads the big-table parquet partitions for every calendar day from `from` to `to`
 * (both inclusive). Partition directories are named partition_date=yyyy-MM-dd.
 * NOTE(review): if `from` is after `to` no paths are passed to the reader.
 */
def getBigTable(from:DateTime, to:DateTime) : DataFrame = {
  val firstDay = from.withTimeAtStartOfDay()
  val dayCount = Days.daysBetween(firstDay, to.withTimeAtStartOfDay()).getDays
  val partitionPaths =
    for (offset <- 0 to dayCount)
      yield "/analytics/big-table/partition_date=" + from.plusDays(offset).toString(partitionFormatter)
  spark.read.parquet(partitionPaths: _*)
}

/**
 * Reads one mobile event type of `app` for each calendar day from `from` to `to`
 * (both inclusive), from /analytics/events/PARQUET/mobile_events/<app>/<yyyy/MM/dd>/<event>.
 *
 * @param mergeSchemaOption value forwarded to the parquet reader's "mergeSchema" option
 */
def getEvent(app:String, event:String, from:DateTime, to:DateTime, mergeSchemaOption: String = "false") : DataFrame = {
  val dayOffsets = 0 to Days.daysBetween(from.withTimeAtStartOfDay(), to.withTimeAtStartOfDay()).getDays
  val eventDirs = dayOffsets.map { offset =>
    val day = from.plusDays(offset).toString(pathFormatter)
    "/analytics/events/PARQUET/mobile_events/" + app + "/" + day + "/" + event
  }
  val reader = spark.read.option("mergeSchema", mergeSchemaOption)
  reader.parquet(eventDirs: _*)
}

/** Reads the social_events parquet data for every calendar day from `from` to `to` (inclusive). */
def getSocialEvent(from:DateTime, to:DateTime) : DataFrame = {
  val start = from.withTimeAtStartOfDay()
  val end = to.withTimeAtStartOfDay()
  val dirs = for {
    offset <- 0 to Days.daysBetween(start, end).getDays
    day = from.plusDays(offset).toString(pathFormatter)
  } yield s"/analytics/events/PARQUET/social_events/$day/"
  spark.read.parquet(dirs: _*)
}

/** Reads the mobile_devices parquet snapshots for every calendar day from `from` to `to` (inclusive). */
def getActiveDevices(from:DateTime, to:DateTime): DataFrame = {
  def dirFor(offset: Int): String =
    "/analytics/events/PARQUET/mobile_devices/" + from.plusDays(offset).toString(pathFormatter) + "/"

  val lastOffset = Days.daysBetween(from.withTimeAtStartOfDay(), to.withTimeAtStartOfDay()).getDays
  spark.read.parquet((0 to lastOffset).map(dirFor): _*)
}

/**
 * Registers one mobile event type of `app`, for every calendar day from `from` to `to`
 * (inclusive), as a temporary view named `event` so it can be queried with spark.sql.
 */
def table(app:String, event:String, from:DateTime, to:DateTime) : Unit = {
  val days = Days.daysBetween(from.withTimeAtStartOfDay(), to.withTimeAtStartOfDay()).getDays
  val files = (0 to days).map { d =>
    val dateStr = from.plusDays(d).toString(pathFormatter)
    s"/analytics/events/PARQUET/mobile_events/$app/$dateStr/$event"
  }
  // registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its
  // replacement with the same replace-if-exists behaviour.
  spark.read.parquet(files: _*).createOrReplaceTempView(event)
}

/** Reads the parquet snapshot stored under /analytics/entities/<entity>. */
def getEntity(entity: String): DataFrame =
  spark.read.parquet("/analytics/entities/" + entity)

// Shorthand readers for individual entity snapshots (each delegates to getEntity).
def getUsers() : DataFrame = getEntity("users")

def getPhotos() : DataFrame = getEntity("photos")

def getContests() : DataFrame = getEntity("contests")

def getTags() : DataFrame = getEntity("tags")

def getStreams() : DataFrame = getEntity("streams")

/** Device attributes entity (stored as "device_attributes"). */
def getDevices() : DataFrame = getEntity("device_attributes")

def getRequests() : DataFrame = getEntity("requests")

/** The "common" mobile event of `app` for every day from `from` to `to` (inclusive). */
def getCommon(app:String, from:DateTime, to:DateTime) : DataFrame =
  getEvent(app, "common", from, to)


/** Start of the current day in the JVM default time zone. */
def today(): DateTime =
  DateTime.now().withTimeAtStartOfDay()

/** Start of the previous day (today() minus one day). */
def yesterday(): DateTime = today().minusDays(1)

/**
 * The "common" event of `app` from `days` days ago through today (both inclusive).
 * today() is evaluated once so both bounds come from the same instant; evaluating it twice
 * could widen the window by a day if the call straddles midnight.
 */
def getCommonLastNDays(app:String, days:Int) : org.apache.spark.sql.DataFrame = {
  val end = today()
  getEvent(app, "common", end.minusDays(days), end)
}

/**
 * Event `event` of `app` from `days` days ago through today (both inclusive).
 * today() is evaluated once for the same reason as in getCommonLastNDays.
 */
def getEventLastNDays(app:String, event:String, days:Int) : DataFrame = {
  val end = today()
  getEvent(app, event, end.minusDays(days), end)
}

/**
 * Reads environment variable `name`, falling back to `default` when it is unset
 * (or when `name` itself is null).
 *
 * System.getenv returns null for a missing variable instead of throwing, so the previous
 * `Try(...).getOrElse(default)` never used the default and returned null.
 */
def getStringParam(name:String, default:String): String = {
  Try(System.getenv(name)).toOption.flatMap(Option(_)).getOrElse(default)
}

/** Reads environment variable `name` as a Long; `default` when it is unset or not a valid number. */
def getLongParam(name:String, default:Long): Long =
  Try(System.getenv(name)).map(_.toLong).getOrElse(default)

/** Parses environment variable `name` with pathFormatter ("yyyy/MM/dd"); `default` when unset or unparsable. */
def getDateParam(name:String, default:DateTime): DateTime = {
  val parsed = Try(System.getenv(name)).map(raw => pathFormatter.parseDateTime(raw))
  parsed.getOrElse(default)
}



/**
 * Reads the mobile_devices parquet snapshots for every calendar day from `from` to `to`
 * (inclusive). This was a verbatim copy of getActiveDevices (same paths, same reader), so it
 * delegates instead of keeping a second copy of the path-building logic.
 */
def getMobileDevices(from:DateTime, to:DateTime): DataFrame =
  getActiveDevices(from, to)



/**
 * Unions several "com.picsart.studio" events into one frame of
 * (device_id, platform, date, country_code), with country_code lower-cased.
 *
 * @param event pairs of (event name, SQL WHERE condition); an empty condition means no filter.
 *              The condition is spliced into SQL text verbatim, so it must come from trusted code.
 * @param from  first day (inclusive)
 * @param to    last day (inclusive)
 * @return the union of all events (duplicates kept, columns matched by position), or null
 *         when `event` is empty (kept for compatibility with existing callers).
 */
def union_events(event:Array[(String,String)], from:DateTime, to:DateTime) : DataFrame = {
  val perEvent = event.map { case (eventName, condition) =>
    val whereClause = if (condition != "") "where " + condition else ""
    // Still exposed as the temp view "second" so the SQL text stays the same;
    // createOrReplaceTempView replaces the deprecated registerTempTable.
    getEvent("com.picsart.studio", eventName, from, to).createOrReplaceTempView("second")
    spark.sql(s""" select * from second  $whereClause """)
      .select($"device_id", $"platform", to_date($"timestamp").as("date"), lower($"country_code").as("country_code"))
  }
  // union replaces the deprecated unionAll (same semantics); left fold keeps the input order.
  perEvent.reduceOption(_ union _).orNull
}
 
 
/**
 * Deletes rows from a DWH PostgreSQL table over plain JDBC and prints the statement first.
 *
 * @param table_name table to delete from
 * @param condition  optional raw SQL condition (without the "where" keyword); empty deletes
 *                   every row. Both arguments are concatenated into the SQL text, so they
 *                   must come from trusted code, never from user input.
 */
def erase_table(table_name:String, condition:String =""): Unit = {
  // Same URL/user/password as before, taken from the shared JDBC settings instead of a
  // second hard-coded copy.
  val conn = DriverManager.getConnection(JDBC_URL, props.getProperty("user"), props.getProperty("password"))
  try {
    val stmt = conn.createStatement()
    try {
      val sql =
        if (condition != "") s"DELETE FROM $table_name where $condition"
        else s"DELETE FROM $table_name"
      println(sql)
      stmt.executeUpdate(sql)
    } finally {
      stmt.close()
    }
  } finally {
    // The previous version closed the statement but leaked the connection.
    conn.close()
  }
}

/**
 * Daily distinct "com.picsart.studio" device counts for every calendar day from `from` to
 * `to` (inclusive), read from the per-day mobile_devices snapshots.
 *
 * @return one row per day: (date, count(DISTINCT device_id)); null when `from` is after
 *         `to` (no days), as before.
 */
def getActive(from:DateTime, to:DateTime): DataFrame = {
  val days = Days.daysBetween(from.withTimeAtStartOfDay(), to.withTimeAtStartOfDay()).getDays
  // Each day's rows are tagged with the snapshot day; the directory uses yyyy/MM/dd,
  // so "/" is replaced with "-" before to_date.
  val perDay = (0 to days).map { d =>
    val dateStr = from.plusDays(d).toString(pathFormatter)
    spark.read.parquet(s"/analytics/events/PARQUET/mobile_devices/$dateStr/")
      .filter($"app" === "com.picsart.studio")
      .select("device_id").distinct
      .withColumn("date", lit(dateStr))
      .withColumn("date", regexp_replace(col("date"), "/", "-"))
      .groupBy(to_date($"date").as("date")).agg(countDistinct("device_id"))
  }
  // union replaces the deprecated unionAll; the left fold keeps day order.
  perDay.reduceOption(_ union _).orNull
}

Intitializing Scala interpreter ...

Spark Web UI available at http://ny-dev-master.picsart.loc:8088/proxy/application_1567159442863_1617,http://ny-hbase-master.picsart.loc:8088/proxy/application_1567159442863_1617
SparkContext available as 'sc' (version = 2.4.3, master = yarn, app id = application_1567159442863_1617)
SparkSession available as 'spark'


import org.joda.time.format.DateTimeFormat
import java.util.Properties
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.{DateTime, Days}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.joda.time.format.DateTimeFormat
import org.joda.time.{DateTime, Days}
formatter: org.joda.time.format.DateTimeFormatter = org.joda.time.format.DateTimeFormatter@3b97b9c5
import scala.util.Try
import java.sql.{Connection, DriverManager, ResultSet}
sparkSession: o...

# Similar

In [4]:
// Analysis window; the day-range readers (getEvent etc.) treat both ends as inclusive.
val (from, to) = (DateTime.parse("2019-07-09"), DateTime.parse("2019-07-23"))


// var dates_seq : Seq[(String)] = Seq()

// for (f <- 0 to Days.daysBetween(from, to).getDays()) {
//     val b = (from.plusDays(f).toString())
//     dates_seq = dates_seq :+ b
// }

// var dates = dates_seq.toDF("current_date").select(to_date($"current_date").as("current_date"))

// var countries_arr = Array("am", "ar","br","ca","cn","co","fr","de","in","id","it","jp","my","mx","ph","ru","kr","es","tr","gb","us","vn")


// var countries = sqlContext.read.parquet("/analyst-shared/countries").select($"country_name", lower($"country_code").as("country_code"))

// var users = sqlContext.read.parquet("/analytics/entities/users_orders").
//             select($"device_id".as("did"), $"order_id".as("order_id")).distinct().
//             filter($"did".isNotNull)


// var country_codes = getDevices().select($"device_id", lower($"country_code").as("country_code")).
//                     withColumn("country_code", when(lower($"country_code").isin(countries_arr:_*), lower($"country_code")).otherwise("other")).
//                     groupBy($"device_id").agg(first($"country_code").as("country_code")).distinct().
//                     join(users, $"device_id" === $"did", "inner").
//                     select($"did", $"order_id", $"country_code").distinct()


// var orders = sqlContext.read.parquet("/analytics/entities/orders").
//             withColumn("platform", 
//                 when(($"market" === "google"), lit("android")).
//                 when(($"market" === "apple"), lit("apple")).
//                 otherwise(lit("other"))).
//             filter($"platform"==="apple").as("a").
//                 join(country_codes.as("b"),$"b.order_id" === $"a._id", "inner")./*drop($"b.order_id").*/
//                 select(
//                     $"a._id", $"b.country_code", $"b.order_id", $"b.did", 
//                     $"a.platform", $"events", $"renewals").as("a").
//                     join(countries.as("b"), $"a.country_code" === $"b.country_code", "left").
//                 select($"order_id", $"_id", $"did", $"platform", $"events", $"renewals", when($"b.country_code".isNotNull, $"b.country_name").otherwise(lit("Other")).as("country")).
//                 drop($"order_id")


// var orders_rn = orders.
//                 select($"_id", $"did", $"platform", $"country", $"events", explode($"renewals").as("rn")).
//                 select($"_id", $"did", $"platform", $"country", $"events", $"rn.purchaseDate".as("RN_purchase"), $"rn.expireDate".as("RN_expire"), $"rn.paymentStatus".as("RN_status")).
//                 withColumn("dif", round(($"RN_expire".cast("long") - $"RN_purchase".cast("long"))/24D/3600D, 0)).
//                 filter($"dif" >= 6 && $"RN_status" === "PAID")
    
// var orders_ev = orders. 
//                 select($"_id", $"did",  $"platform", $"country", $"renewals", explode($"events").as("ev")).
//                 select($"_id", $"did",  $"platform", $"country", $"renewals", $"ev.timestamp".as("rf_date"), $"ev.eventType".as("event")).
//                 filter($"event" === "SUBSCRIPTION_REFUNDED")


// var orders_without_refunds = orders_rn.as("a").join(orders_ev.as("b"), $"a._id" === $"b._id" && $"rf_date" >= $"RN_purchase" && $"rf_date" <= $"RN_expire", "left").
//                             select($"a._id", $"a.did", $"a.platform", $"a.country", $"RN_status".as("status"), $"RN_purchase".as("start"),
//                             when($"rf_date".isNotNull, $"rf_date").otherwise($"RN_expire").as("finish"))



// var in_grace =  orders.filter(array_contains($"events.eventType", "SUBSCRIPTION_IN_GRACE_PERIOD")).
//                 select($"_id", $"did", $"platform", $"country", $"renewals", explode($"events").as("ev")).
//                 select($"_id", $"did", $"platform", $"country", $"renewals", to_date($"ev.timestamp").as("GP_start"), $"ev.timestamp".as("timestamp"), $"ev.eventType".as("event")).
//                 withColumn("GP_next_event", lead($"event", 1).over(Window.partitionBy($"_id").orderBy($"timestamp"))).
//                 withColumn("GP_next_event_timestamp", lead($"GP_start", 1).over(Window.partitionBy($"_id").orderBy($"timestamp"))).
//                 withColumn("GP_start_plus7", when($"event" === "SUBSCRIPTION_IN_GRACE_PERIOD", date_add($"GP_start",7))).
//                 filter($"event" === "SUBSCRIPTION_IN_GRACE_PERIOD").
//                 withColumn("min", when($"GP_start_plus7".gt($"GP_next_event_timestamp"), $"GP_next_event_timestamp").otherwise($"GP_start_plus7")).
//                 withColumn("GP_finish", 
//                 when($"GP_next_event".isNotNull, $"min").
//                 when($"GP_next_event".isNull, $"GP_start_plus7")).
//                 select($"_id", $"did", $"platform", $"country", lit("In_Grace_Period").as("status"), $"GP_start".as("start"), $"GP_finish".as("finish"))


// var all_active_users = orders_without_refunds.unionAll(in_grace).
//                         select($"_id", $"did", $"platform", $"country", $"status", to_date($"start").as("start"), to_date($"finish").as("finish"))


// var actives = dates.as("a").join(all_active_users.as("b"), $"a.current_date" >= $"b.start" && $"a.current_date" <= $"b.finish", "left").
//                 select($"current_date", $"_id", $"did", $"b.platform", $"country")
                
// val installs = sqlContext.read.parquet("/analytics/entities/installs")
//     .filter($"app"==="com.picsart.studio")
//     .select(to_date($"timestamp").as("install_date"), $"device_id")
//     .groupBy($"device_id")
//     .agg(min($"install_date").as("install_date"))
//     .select($"install_date", $"device_id")




// val experiment = getEvent("com.picsart.studio", "experiment_participate", from, to)
//     .filter($"experiment_id" === "97de" && $"platform"=== "apple")
//     .select($"timestamp", $"device_id", $"variant", $"country_code")
//     .groupBy($"device_id", $"variant", $"country_code").agg(min($"timestamp").as("timestamp"))
//     .select($"timestamp", $"device_id", $"variant", lower($"country_code").as("cc"))
//     .as("a")
//     .join(installs.as("b"), $"a.device_id" === $"b.device_id" && to_date($"a.timestamp") === $"b.install_date", "left")
//     .select($"a.timestamp", $"a.device_id", $"a.variant", (when($"b.device_id".isNull, lit(false)).otherwise(lit(true))).as("is_new"))
//     .as("a")
//     .join(actives.as("b"), $"a.device_id"===$"b.did" && to_date($"a.timestamp") === $"b.current_date", "left")
//     .select($"a.timestamp", $"a.device_id", $"a.variant", $"a.is_new",(when($"b.did".isNull, lit(false)).otherwise(lit(true))).as("is_subscribed"))
//     .write.mode(org.apache.spark.sql.SaveMode.Overwrite).parquet("/user/nare.silanyan/exp")




// val participation = exp.groupBy(to_date($"timestamp").as("date"), $"is_new", $"is_subscribed").pivot("variant").agg(countDistinct($"device_id")).orderBy($"date").show(100,false)

// Experiment participants previously written to /user/nare.silanyan/exp (see the commented
// pipeline above): rows of (timestamp, device_id, variant, is_new, is_subscribed).
val exp = sqlContext.read.parquet("/user/nare.silanyan/exp")


// Sticker item opens from the editor.
// NOTE(review): no platform filter here, unlike the sticker frames below — confirm intended.
val open = getEvent("com.picsart.studio", "edit_item_open", from, to).
    filter($"item"==="sticker" && $"origin"==="editor").
    select(to_date($"timestamp").as("date"), $"device_id", $"editor_sid", $"source")

// Sticker category opens in the editor on apple (editor session present).
val categoryk = getEvent("com.picsart.studio", "edit_sticker_category_open", from, to).
    filter($"origin" === "editor" && $"platform" === "apple" && $"editor_sid".isNotNull).
    select($"timestamp", $"device_id", $"editor_sid", $"source")


// Sticker sub-category opens, same filters and columns as categoryk.
val subcategory_open = getEvent("com.picsart.studio", "edit_sticker_subcategory_open", from, to).
    filter($"origin" === "editor" && $"platform" === "apple" && $"editor_sid".isNotNull).
    select($"timestamp", $"device_id", $"editor_sid", $"source")


// Category and sub-category opens together. union replaces the deprecated unionAll
// (same semantics: columns matched by position, duplicates kept).
val category_open = categoryk.union(subcategory_open).
    select($"timestamp", $"device_id", $"editor_sid", $"source")

// Sticker tries in the editor on apple, with the sticker category.
val sticker_try = getEvent("com.picsart.studio", "edit_sticker_try", from, to).
    filter($"origin"==="editor" && $"platform" === "apple" && $"editor_sid".isNotNull).
    select($"timestamp", $"category", $"device_id", $"editor_sid")


// Sticker applies in the editor on apple, with category and source (e.g. "similar").
val sticker_apply = getEvent("com.picsart.studio", "edit_sticker_apply", from, to).
    filter($"origin" === "editor" && $"platform" === "apple" && $"editor_sid".isNotNull).
    select($"timestamp", $"category", $"device_id", $"editor_sid", $"source")


// Distinct (device, editor session) pairs with an editor_done_click on apple.
// NOTE(review): no origin filter, unlike the sticker events — confirm intended.
val editor_done = getEvent("com.picsart.studio", "editor_done_click", from, to).
    filter($"platform" === "apple" && $"editor_sid".isNotNull).
    select($"device_id", $"editor_sid").distinct()


// Per-day sticker funnel by variant / is_new / is_subscribed:
// category opens -> tries -> applies, counted as distinct editor sessions (*_sid) and
// distinct devices (*_dv). Only category opens at or after the device's experiment
// participation timestamp are counted.
// Tries are matched to opens, and applies to tries, by editor_sid alone (no device or time
// condition), so any try/apply in the same editor session counts.
val funnel = exp
    .as("a")
    .join(category_open.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" <= $"b.timestamp", "inner")
    .select(to_date($"b.timestamp").as("date"), $"b.device_id", $"b.editor_sid", $"a.variant", $"a.is_new", $"a.is_subscribed")
    .as("a")
    .join(sticker_try.as("b"), $"a.editor_sid" === $"b.editor_sid", "left")
    .select(
        $"a.date",$"a.variant", $"a.is_new", $"a.is_subscribed",
        $"a.device_id".as("open_dv"), $"a.editor_sid".as("open_sid"), 
        $"b.device_id".as("try_dv"), $"b.editor_sid".as("try_sid"))  
    .as("c")
    .join(sticker_apply.as("d"), $"c.try_sid" === $"d.editor_sid", "left")
    .select(
        $"c.date", $"c.variant", $"c.is_new", $"c.is_subscribed",
        $"c.open_dv", $"c.open_sid", 
        $"c.try_dv", $"c.try_sid", 
        $"d.device_id".as("apply_dv"), 
        $"d.editor_sid".as("apply_sid"))
    .groupBy($"date", $"variant", $"is_new", $"is_subscribed")
    .agg(
        countDistinct($"open_sid").as("open_sid"),
        countDistinct($"open_dv").as("open_dv"),
        countDistinct($"try_sid").as("try_sid"),
        countDistinct($"try_dv").as("try_dv"),
        countDistinct($"apply_sid").as("apply_sid"),
        countDistinct($"apply_dv").as("apply_dv"))

   

// Per category and variant: average daily distinct editor sessions that tried a sticker of
// the category, and sessions that applied a sticker of the same category in that session.
// Only tries at or after the device's experiment participation timestamp count.
// NOTE(review): the average only covers days on which the category had at least one try in
// the variant; days with no tries produce no row and are not averaged in as zero.
val try_apply_category =exp
    .as("a")
    .join(sticker_try.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" <= $"b.timestamp", "inner")
    .select(to_date($"b.timestamp").as("date"), $"b.device_id", $"b.editor_sid", $"a.variant", $"a.is_new", $"a.is_subscribed", $"b.category")
    .as("a")
    .join(sticker_apply.as("b"), $"a.editor_sid" === $"b.editor_sid" && $"a.category" === $"b.category", "left")
    .select($"a.date", $"a.variant", $"a.is_new", $"a.is_subscribed", $"a.category", $"a.editor_sid".as("try"), $"b.editor_sid".as("apply"))
    .groupBy($"date", $"category", $"variant")
    .agg(countDistinct($"try").as("try_sid"), countDistinct($"apply").as("apply_sid"))
    .groupBy($"category", $"variant")
    .agg(avg($"try_sid"), avg($"apply_sid"))
//     .show(100000,false)



// Per variant: average daily distinct editor sessions with a sticker apply (at or after the
// device's participation timestamp), and how many of those sessions also have an
// editor_done_click (matched by editor_sid). Ordered by variant.
val apply_editor_done = exp
    .as("a")
    .join(sticker_apply.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" <= $"b.timestamp", "inner")
    .select(to_date($"b.timestamp").as("date"), $"b.device_id", $"b.editor_sid", $"a.variant", $"a.is_new", $"a.is_subscribed")
    .as("a")
    .join(editor_done.as("b"), $"a.editor_sid" === $"b.editor_sid", "left")
    .select($"a.date", $"a.variant", $"a.editor_sid".as("apply"), $"b.editor_sid".as("done"))
    .groupBy($"date", $"variant")
    .agg(countDistinct($"apply").as("apply"), countDistinct($"done").as("done"))
    .groupBy($"variant")
    .agg(avg($"apply"), avg($"done"))
    .orderBy($"variant")
//     .show(1000,false)


 // Per variant and category-open source: average daily distinct editor sessions that opened
 // a sticker category at or after the device's participation timestamp; printed.
 category_open.as("a").join(exp.as("b"), $"a.device_id"===$"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner")
.select($"a.timestamp", $"a.editor_sid", $"b.variant", $"a.source")
.groupBy(to_date($"timestamp").as("date"), $"variant", $"source")
.agg(countDistinct($"editor_sid").as("sid"))
.groupBy($"variant", $"source")
.agg(avg($"sid"))
.show(10000,false)








+----------------+-----------+-----------------+
|variant         |source     |avg(sid)         |
+----------------+-----------+-----------------+
|similar stickers|editor     |2.142857142857143|
|original        |default    |38727.0          |
|original        |editor     |4.0              |
|similar stickers|default    |38106.4          |
|similar stickers|more_button|9734.666666666666|
|original        |more_button|9628.2           |
+----------------+-----------+-----------------+



from: org.joda.time.DateTime = 2019-07-09T00:00:00.000Z
to: org.joda.time.DateTime = 2019-07-23T00:00:00.000Z
exp: org.apache.spark.sql.DataFrame = [timestamp: timestamp, device_id: string ... 3 more fields]
open: org.apache.spark.sql.DataFrame = [date: date, device_id: string ... 2 more fields]
categoryk: org.apache.spark.sql.DataFrame = [timestamp: timestamp, device_id: string ... 2 more fields]
subcategory_open: org.apache.spark.sql.DataFrame = [timestamp: timestamp, device_id: string ... 2 more fields]
category_open: org.apache.spark.sql.DataFrame = [timestamp: timestamp, device_id: string ... 2 more fields]
sticker_try: org.apache.spark.sql.DataFrame = [timestamp: timestamp, category: string ... 2 more fields]
sticker_apply: org.apache.spark.sql.DataFrame = [timestamp: timestamp, c...

In [19]:
// import org.apache.hadoop.fs.{Path, FileSystem}
// import org.apache.hadoop.conf.Configuration

// FileSystem.get(new Configuration()).delete(new Path("/user/nare.silanyan/exp"))




import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
res12: Boolean = true


+------------------+
|               app|
+------------------+
|               app|
|com.picsart.studio|
+------------------+



installs: Unit = ()


In [11]:
// category_open.as("a").
// join(exp.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner").
//     select(to_date($"a.timestamp").as("date"), $"a.device_id", $"a.editor_sid",$"b.variant", $"a.source", $"b.is_new", $"b.is_subscribed").
//     groupBy($"date", $"variant", $"source", $"b.is_new", $"b.is_subscribed").
//     agg(
//         countDistinct($"device_id").as("category_open_dv"), 
//         countDistinct($"editor_sid").as("category_open_sid")).show(1000000,false)


// sticker_try.as("a").
//     join(exp.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner").
//     select(to_date($"a.timestamp").as("date"), $"a.device_id", $"a.editor_sid",$"b.variant", $"b.is_new", $"b.is_subscribed").
//     groupBy($"date", $"variant", $"is_new", $"is_subscribed").
//     agg(countDistinct($"device_id").as("try_dv"), count($"editor_sid").as("try_sid"), countDistinct($"editor_sid").as("sid")).show(100000,false)

// Editor sessions that used the "similar" action on a sticker, kept only for actions at or
// after the device's experiment participation timestamp, tagged with variant/is_new/is_subscribed.
val object_action_exp = getEvent("com.picsart.studio", "add_object_action", from, to).
    filter($"object_type"==="sticker" && $"origin"==="editor" && $"action" === "similar").
    select($"timestamp", $"device_id", $"editor_sid").as("a").
    join(exp.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner").
    select(to_date($"a.timestamp").as("date"), $"a.device_id", $"a.editor_sid",$"b.variant", $"b.is_new", $"b.is_subscribed")

//     object_action_exp.groupBy($"date", $"variant", $"is_new", $"is_subscribed").
//     agg(countDistinct($"device_id").as("similar_dv"), count($"editor_sid").as("similar_sid"), countDistinct($"editor_sid").as("sid")).show(100000,false)

// Sticker applies whose source is "similar".
val sticker_apply_similar = sticker_apply.filter($"source" === "similar")

// Per variant: average daily distinct sessions that used "similar" (similar_try_sid) and,
// among them, distinct sessions with a similar-sourced apply (similar_apply_sid), matched
// by editor_sid only. Printed.
object_action_exp.as("a").join(sticker_apply_similar.as("b"), $"a.editor_sid" === $"b.editor_sid", "left")
.select($"a.date", $"a.variant", $"a.is_new", $"a.is_subscribed", $"a.editor_sid".as("similar_try_sid"), $"b.editor_sid".as("similar_apply_sid"))
.groupBy($"date", $"variant")
.agg(countDistinct($"similar_try_sid").as("similar_try_sid"), countDistinct($"similar_apply_sid").as("similar_apply_sid"))
.groupBy($"variant")
.agg(avg($"similar_try_sid"), avg($"similar_apply_sid"))
.show(10000000,false)



// sticker_apply.as("a").
//     join(exp.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner").
//     select(to_date($"a.timestamp").as("date"), $"a.device_id", $"a.editor_sid",$"b.variant", $"a.source", 
//            (when($"a.source"==="similar", lit("similar")).otherwise(lit("no_similar"))).as("new_source"), $"b.is_new", $"b.is_subscribed").
// filter($"new_source" === "no_similar").
//     groupBy($"date", $"variant", $"new_source", $"is_new", $"is_subscribed").
//     agg(countDistinct($"device_id").as("dv"),countDistinct($"editor_sid").as("sid"), count($"editor_sid").as("applies")).show(100000,false)


// sticker_apply.groupBy($"category").agg(countDistinct($"editor_sid").as("apply_sid")).orderBy($"apply_sid".desc).show(1000000,false)

// sticker_try.groupBy($"category").agg(countDistinct($"editor_sid").as("sid")).orderBy($"sid".desc).show(1000000,false)

// val applies_per_applier = sticker_apply.as("a").
//     join(exp.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner").
//     select(to_date($"a.timestamp").as("date"), $"a.device_id", $"a.editor_sid",$"b.variant",$"b.is_new", $"b.is_subscribed").
//     groupBy($"date", $"variant", $"is_new", $"is_subscribed").
//     agg(countDistinct($"device_id").as("dv"),countDistinct($"editor_sid").as("sid"), count($"editor_sid").as("applies")).show(100000,false)



+----------------+--------------------+----------------------+
|variant         |avg(similar_try_sid)|avg(similar_apply_sid)|
+----------------+--------------------+----------------------+
|similar stickers|11894.333333333334  |5372.4                |
|original        |10.428571428571429  |4.5                   |
+----------------+--------------------+----------------------+



object_action_exp: org.apache.spark.sql.DataFrame = [date: date, device_id: string ... 4 more fields]
sticker_apply_similar: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [timestamp: timestamp, category: string ... 3 more fields]


In [30]:


// Latest subscription_done per device per calendar day (row_number over done_timestamp desc).
// The result is then kept only if its source contains "sticker" and the platform is apple.
// It is joined to exp (done at/after participation) to attach variant and is_new.
// NOTE(review): the per-day de-duplication runs BEFORE the sticker/apple filter, so a device
// whose last done of the day had a non-sticker source is dropped even if an earlier done that
// day was sticker-sourced — confirm this is the intended attribution.
var done = getEvent("com.picsart.studio","subscription_done", from, to).
    select($"session_id", $"device_id",$"sub_sid",$"source", $"source_sid",$"timestamp".as("done_timestamp"),$"platform",$"direct_purchase", $"sub_source").
    withColumn("N", row_number().over(Window.partitionBy($"device_id", to_date($"done_timestamp")).orderBy($"done_timestamp".desc))).
    filter($"N" === 1).drop($"N").    
    distinct().
    filter($"source".like("%sticker%") && $"platform" === "apple").
    select($"done_timestamp", $"device_id", $"sub_sid", $"source", $"source_sid").as("a").
    join(exp.as("b"), $"a.device_id"=== $"b.device_id" && $"a.done_timestamp">= $"b.timestamp", "inner").
    select($"a.done_timestamp", $"a.device_id", $"a.sub_sid", $"a.source", $"a.source_sid", $"b.variant", $"b.is_new")





//------------------------------------------------------------------------------ BACKEND DATA
// Distinct (device_id, order_id) links with a known device.
var users = sqlContext.read.parquet("/analytics/entities/users_orders").filter($"device_id". isNotNull).select($"device_id", $"order_id").distinct()

// Apple orders purchased between `from` and `to` (inclusive, compared on the yyyy-MM-dd date).
// NOTE(review): every market other than "google"/"wechat" is labelled "apple".
var orders = sqlContext.read.parquet("/analytics/entities/orders").
    withColumn("purchase_dd", to_date($"purchase_date")).
    withColumn("expire_dd", to_date($"expire_date")).
    withColumn("platform", when(($"market" === "google") || ($"market" === "wechat"), lit("android")).otherwise(lit("apple"))).
    filter($"platform"==="apple").
    filter($"purchase_dd" <= to.toString().slice(0, 10) && $"purchase_dd" >= from.toString().slice(0, 10)).
    select($"_id", $"purchase_dd", $"expire_dd", $"renewals", $"platform")

// One row per renewal of each order, with the owning device; N numbers an order's renewals
// by renewal purchase date.
var users_order = orders.as("a").join(users.as("b"), $"a._id" === $"b.order_id", "inner").
    select($"b.device_id", $"a._id", $"purchase_dd", explode($"a.renewals").as("ev")).
    select($"device_id", $"_id", $"purchase_dd", $"ev.purchaseDate".as("RN_purchase"), $"ev.paymentStatus".as("RN_payment")).
    withColumn("N", row_number().over(Window.partitionBy($"_id").orderBy($"rn_purchase")))

//------------------------------------------------------------------------------ FT's AND PAIDS BY SOURCE (from subscription done)
// Orders whose purchase day equals the day of the device's sticker-sourced subscription_done.
var backend_done = users_order.as("a").
    join(done.as("b"), $"a.device_id" === $"b.device_id" && $"a.purchase_dd" === to_date($"b.done_timestamp"), "inner").
    select($"b.sub_sid", $"b.device_id", $"a.purchase_dd", $"b.source", $"b.variant", $"b.is_new")


// Distinct devices and subscription sessions per purchase day and variant; printed.
backend_done.groupBy($"purchase_dd", $"variant").agg(countDistinct($"device_id"), countDistinct($"sub_sid"))
.show(1000000,false)

    


+-----------+----------------+-------------------------+-----------------------+
|purchase_dd|variant         |count(DISTINCT device_id)|count(DISTINCT sub_sid)|
+-----------+----------------+-------------------------+-----------------------+
|2019-07-19 |original        |3                        |3                      |
|2019-07-13 |similar stickers|4                        |4                      |
|2019-07-22 |similar stickers|5                        |5                      |
|2019-07-20 |similar stickers|4                        |4                      |
|2019-07-10 |original        |3                        |3                      |
|2019-07-17 |similar stickers|2                        |2                      |
|2019-07-12 |original        |4                        |4                      |
|2019-07-22 |original        |8                        |8                      |
|2019-07-15 |original        |6                        |6                      |
|2019-07-23 |similar sticker

done: org.apache.spark.sql.DataFrame = [done_timestamp: timestamp, device_id: string ... 5 more fields]
users: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [device_id: string, order_id: string]
orders: org.apache.spark.sql.DataFrame = [_id: string, purchase_dd: date ... 3 more fields]
users_order: org.apache.spark.sql.DataFrame = [device_id: string, _id: string ... 4 more fields]
backend_done: org.apache.spark.sql.DataFrame = [sub_sid: string, device_id: string ... 4 more fields]


In [13]:
 // Null DataFrame placeholder. NOTE(review): presumably intended as a union seed; it is a
 // `val`, so it cannot be reassigned, and calling any method on it throws NullPointerException.
 val df: DataFrame = null

df: org.apache.spark.sql.DataFrame = null


In [31]:
// Retention for a single-day cohort of new users.
//
// For the cohort day `dd`, this builds two 7-day sticker-apply retention aggregates for
// rows of /user/nare.silanyan/exp with is_new = true and a timestamp on `dd`:
//   * new_sticker_apply_0_1: per (apply_date, variant, category), the distinct devices
//     with an editor/Apple sticker apply on `dd` at or after their exp timestamp
//     (apply_dv_0), and how many of those devices applied any sticker on dd + 7 days
//     (apply_dv_1).
//   * similar_new_sticker_apply_0_1: the same, but day-0 applies are restricted to
//     source == "similar" and the result is grouped per (apply_date, variant).
// Both are tagged join_condition = "not_category" because the day-7 join ignores category.
//
// `sqlContext` and `getEvent` are defined outside this view. `getEvent` presumably loads
// events of the given app/name for the inclusive day range [from, to]; TODO confirm.
//
// NOTE(review): every write/publish/return below is commented out and the return type is
// Unit. As written, the function builds both DataFrames and then discards them, so calling
// it persists nothing. Re-enable exactly one sink before relying on its output.



def retention(dd: DateTime): Unit={
    
    // New users whose exp event falls on the cohort day `dd`.
    val new_user = sqlContext.read.parquet("/user/nare.silanyan/exp").
    filter($"is_new"=== true && to_date($"timestamp")=== dd.toString.slice(0,10)).
    select($"timestamp", $"is_subscribed", $"device_id", $"variant")



// Day-0 editor sticker applies on Apple, one row per distinct (timestamp, device, category, source).
val sticker_apply = getEvent("com.picsart.studio", "edit_sticker_apply", dd, dd).
    filter($"origin" === "editor" && $"platform" === "apple" && $"device_id".isNotNull).
    select($"timestamp", $"device_id", $"category", $"source").distinct()


// Editor sticker applies on Apple exactly 7 days after `dd` (a single day, not a week-long window).
val sticker_apply_1 = getEvent("com.picsart.studio", "edit_sticker_apply", dd.plusDays(7), dd.plusDays(7)).
filter($"origin"==="editor" && $"platform" === "apple" && $"device_id".isNotNull).
select(to_date($"timestamp").as("apply_date_1"), $"device_id", $"category").distinct()

    
    
    
// Day-0 applies by cohort users that happened at or after the user's exp timestamp.
val new_sticker_apply = sticker_apply.as("a").
    join(new_user.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner").
    select($"b.variant", $"a.device_id".as("apply_dv_0"), to_date($"a.timestamp").as("apply_date"), $"a.category")
    
// Left join keeps every day-0 device; apply_dv_1 is non-null only for devices that also
// applied on day 7. The category match is intentionally disabled (hence "not_category").
val new_sticker_apply_0_1 = new_sticker_apply
    .as("a")
    .join(sticker_apply_1.as("b"), $"a.apply_dv_0" === $"b.device_id" /*&& $"a.category" === $"b.category"*/, "left")
    .select($"a.variant", $"a.apply_dv_0", $"a.apply_date", $"b.apply_date_1", $"b.device_id".as("apply_dv_1"), $"a.category")
    .groupBy($"apply_date", $"variant", $"category")
    .agg(countDistinct($"apply_dv_0").as("apply_dv_0"), countDistinct($"apply_dv_1").as("apply_dv_1"))
    .withColumn("join_condition", lit("not_category"))
    
// NOTE(review): `df` is a null `val` in an earlier cell, so re-enabling this would NPE.
//     df.unionAll(new_sticker_apply_0_1)

    
    
    
    
    
 
    
// Same as new_sticker_apply, but day-0 applies are limited to source == "similar".
val similar_new_sticker_apply = sticker_apply.filter($"source" === "similar").as("a").
    join(new_user.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner").
    select($"b.variant", $"a.device_id".as("apply_dv_0"), to_date($"a.timestamp").as("apply_date"))
       
// Day-7 applies of any source: the "similar" restriction is commented out, so any
// sticker apply on day 7 counts as retained.
val similar_sticker_apply_1 = getEvent("com.picsart.studio", "edit_sticker_apply", dd.plusDays(7), dd.plusDays(7)).
filter($"origin"==="editor" && $"platform" === "apple" && $"device_id".isNotNull /*&& $"source" === "similar"*/).
select(to_date($"timestamp").as("apply_date_1"), $"device_id").distinct()
    
    
// Distinct day-0 "similar" appliers and how many of them applied again on day 7,
// per (apply_date, variant).
val similar_new_sticker_apply_0_1 = similar_new_sticker_apply
    .as("a")
    .join(similar_sticker_apply_1.as("b"), $"a.apply_dv_0" === $"b.device_id" , "left")
    .select($"a.variant", $"a.apply_dv_0", $"a.apply_date", $"b.apply_date_1", $"b.device_id".as("apply_dv_1"))
    .groupBy($"apply_date", $"variant")
    .agg(countDistinct($"apply_dv_0").as("apply_dv_0"), countDistinct($"apply_dv_1").as("apply_dv_1"))
        .withColumn("join_condition", lit("not_category"))

        
    
    
 
// Alternative sinks; all are disabled, so the results above are currently discarded.
//     return new_sticker_apply_0_1
//     return similar_new_sticker_apply_0_1
    
    
// new_sticker_apply_0_1
//     .write.mode(org.apache.spark.sql.SaveMode.Append)
//     .parquet("/user/nare.silanyan/7_day_retention_sticker_apply_applied")
    
// similar_new_sticker_apply_0_1
//     .write.mode(org.apache.spark.sql.SaveMode.Append)
//     .parquet("/user/nare.silanyan/7_day_retention_similar_sticker_apply")
    
//     publish(new_sticker_apply_0_1, "7_day_retention_sticker_apply_category_applied", true)
//     publish(similar_new_sticker_apply_0_1, "7_day_retention_similar_sticker_apply_category_applied", true)

    
}




retention: (dd: org.joda.time.DateTime)Unit


In [32]:
// Run the single-day retention for every cohort day in [from_ret, to_ret], inclusive.
val from_ret = DateTime.parse("2019-07-09")
val to_ret = DateTime.parse("2019-07-16")
// Exclusive upper bound, computed once instead of on every loop test.
val end_ret = to_ret.plusDays(1)
// Kept as a top-level var so that, as before, it ends one day past `to_ret`.
var dd = from_ret

while (dd.isBefore(end_ret)) {
    retention(dd)
    val next = dd.plusDays(1)
    // Only announce the next day when the loop will actually process it.
    if (next.isBefore(end_ret))
        println(s"Finished for: ${dd.toString("yyyy-MM-dd")}  Started for: ${next.toString("yyyy-MM-dd")}")
    else
        println(s"Finished for: ${dd.toString("yyyy-MM-dd")}")
    dd = next
}

Finished for: 2019-07-09  Started for: 2019-07-10
Finished for: 2019-07-10  Started for: 2019-07-11
Finished for: 2019-07-11  Started for: 2019-07-12
Finished for: 2019-07-12  Started for: 2019-07-13
Finished for: 2019-07-13  Started for: 2019-07-14
Finished for: 2019-07-14  Started for: 2019-07-15
Finished for: 2019-07-15  Started for: 2019-07-16
Finished for: 2019-07-16  Started for: 2019-07-17


from_ret: org.joda.time.DateTime = 2019-07-09T00:00:00.000Z
to_ret: org.joda.time.DateTime = 2019-07-16T00:00:00.000Z
dd: org.joda.time.DateTime = 2019-07-17T00:00:00.000Z


In [6]:
// Average daily 7-day "similar sticker" retention per variant.
// First, per-day rows are summed into one total per (apply_date, variant).
// Then those daily totals are averaged per variant.
// NOTE(review): a day on which a variant has no rows is absent rather than counted as zero,
// so the average is over days with data only. Confirm that is the intended denominator.
sqlContext.read
  .parquet("/user/nare.silanyan/7_day_retention_similar_sticker_apply")
  .where($"join_condition" === "not_category")
  .groupBy($"apply_date", $"variant")
  .agg(
    sum($"apply_dv_0").as("apply_dv_0"),
    sum($"apply_dv_1").as("apply_dv_1"))
  .groupBy($"variant")
  .agg(
    avg($"apply_dv_0").as("apply_dv_0"),
    avg($"apply_dv_1").as("apply_dv_1"))
  .sort($"variant", $"apply_dv_0".desc)
  .show(100, false)

+----------------+----------+----------+
|variant         |apply_dv_0|apply_dv_1|
+----------------+----------+----------+
|original        |2.0       |0.0       |
|similar stickers|302.75    |24.0      |
+----------------+----------+----------+



In [22]:

// import org.apache.hadoop.fs.{Path, FileSystem}
// import org.apache.hadoop.conf.Configuration
// FileSystem.get(new Configuration).delete(new Path("/user/nare.silanyan/7_day_retention_sticker_apply_category_applied"))

import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
res14: Boolean = true


In [3]:
/**
 * Sticker-apply retention for new users whose exp event falls in [dd, tt].
 *
 * Day-0 activity consists of editor sticker applies on Apple between `dd` and `tt`.
 * A device counts as retained when it has an editor sticker apply on Apple between
 * dd + 7 and dd + 14 days.
 *
 * `sqlContext` and `getEvent` are defined outside this view. `getEvent` presumably loads
 * events of the given app/name for the inclusive day range [from, to]; TODO confirm.
 *
 * @param dd          first cohort day (inclusive)
 * @param tt          last cohort day (inclusive)
 * @param similarOnly true (the default, matching the previous behavior) returns the
 *                    "similar"-source retention per variant. false returns the
 *                    all-sticker retention per (variant, category).
 * @return when similarOnly:  variant, count(DISTINCT apply_dv_0), count(DISTINCT apply_dv_1)
 *         otherwise:         variant, category, count(DISTINCT apply_dv_0),
 *                            count(DISTINCT apply_dv_week)
 *
 * NOTE(review): the week-2 window is anchored on the cohort start `dd`, not on each user's own
 * start day. Users who joined close to `tt` therefore get a shorter gap before their window.
 * Confirm this is intended.
 * NOTE(review): the all-sticker branch matches day-0 applies to users on the same calendar day,
 * while the similar branch uses apply timestamp >= exp timestamp. Confirm which is intended.
 */
def retention(dd: DateTime, tt: DateTime, similarOnly: Boolean = true): DataFrame = {
    val fromDay = dd.toString("yyyy-MM-dd")
    val toDay = tt.toString("yyyy-MM-dd")

    // New users whose exp event falls inside the cohort window.
    val new_user = sqlContext.read.parquet("/user/nare.silanyan/exp")
        .filter($"is_new" === true && to_date($"timestamp") >= fromDay && to_date($"timestamp") <= toDay)
        .select($"timestamp", $"is_subscribed", $"device_id", $"variant")

    // Day-0 editor sticker applies on Apple inside the cohort window.
    val sticker_apply = getEvent("com.picsart.studio", "edit_sticker_apply", dd, tt)
        .filter($"origin" === "editor" && $"platform" === "apple" && $"device_id".isNotNull)
        .select($"timestamp", $"device_id", $"category", $"source").distinct()

    // Only the requested branch is built, so the unused week-2 event read is skipped.
    if (similarOnly) {
        // Day-0 "similar" applies made at or after the user's exp timestamp.
        val similar_new_sticker_apply = sticker_apply.filter($"source" === "similar").as("a")
            .join(new_user.as("b"), $"a.device_id" === $"b.device_id" && $"a.timestamp" >= $"b.timestamp", "inner")
            .select($"b.variant", $"a.device_id".as("apply_dv_0"), to_date($"a.timestamp").as("apply_date"))

        // Devices with a "similar" apply in the week-2 window.
        val similar_sticker_apply_1 = getEvent("com.picsart.studio", "edit_sticker_apply", dd.plusDays(7), dd.plusDays(14))
            .filter($"origin" === "editor" && $"platform" === "apple" && $"device_id".isNotNull && $"source" === "similar")
            .select(to_date($"timestamp").as("apply_date_1"), $"device_id").distinct()

        // Left join keeps every day-0 device; apply_dv_1 is non-null only for retained ones.
        similar_new_sticker_apply
            .as("a")
            .join(similar_sticker_apply_1.as("b"), $"a.apply_dv_0" === $"b.device_id", "left")
            .select($"a.variant", $"a.apply_dv_0", $"a.apply_date", $"b.apply_date_1", $"b.device_id".as("apply_dv_1"))
            .groupBy($"variant")
            .agg(countDistinct($"apply_dv_0"), countDistinct($"apply_dv_1"))
    } else {
        // Devices with any editor sticker apply in the week-2 window.
        val sticker_apply_1 = getEvent("com.picsart.studio", "edit_sticker_apply", dd.plusDays(7), dd.plusDays(14))
            .filter($"origin" === "editor" && $"platform" === "apple" && $"device_id".isNotNull)
            .select($"device_id", $"category").distinct()

        // Day-0 applies matched to users on the same calendar day.
        val new_sticker_apply = sticker_apply.as("a")
            .join(new_user.as("b"), $"a.device_id" === $"b.device_id" && to_date($"a.timestamp") === to_date($"b.timestamp"), "inner")
            .select($"b.variant", $"a.device_id".as("apply_dv_0"), to_date($"a.timestamp").as("apply_date"), $"a.category")

        // Category is intentionally not part of the week-2 match.
        new_sticker_apply
            .as("a")
            .join(sticker_apply_1.as("b"), $"a.apply_dv_0" === $"b.device_id", "left")
            .select($"a.variant", $"a.apply_dv_0", $"b.device_id".as("apply_dv_week"), $"a.category")
            .groupBy($"variant", $"category")
            .agg(countDistinct($"apply_dv_0"), countDistinct($"apply_dv_week"))
    }
}





retention: (dd: org.joda.time.DateTime, tt: org.joda.time.DateTime)org.apache.spark.sql.DataFrame


In [13]:
// Cohort window: new users with an exp event between these two days, inclusive.
val from_ret = DateTime.parse("2019-07-09")
val to_ret = DateTime.parse("2019-07-15")

// `dd`/`tt` mirror the window bounds under the names retention(...) uses.
val (dd, tt) = (from_ret, to_ret)

retention(dd, tt).show(100, false)


+----------------+--------------------------+--------------------------+
|variant         |count(DISTINCT apply_dv_0)|count(DISTINCT apply_dv_1)|
+----------------+--------------------------+--------------------------+
|similar stickers|2635                      |456                       |
|original        |1                         |0                         |
+----------------+--------------------------+--------------------------+



from_ret: org.joda.time.DateTime = 2019-07-09T00:00:00.000Z
to_ret: org.joda.time.DateTime = 2019-07-15T00:00:00.000Z
dd: org.joda.time.DateTime = 2019-07-09T00:00:00.000Z
tt: org.joda.time.DateTime = 2019-07-15T00:00:00.000Z


In [9]:
val from = DateTime.parse("2019-08-30")

// Distinct editor_sid values with an editor sticker apply on `from`, per subcategory.
// The grouping reads only subcategory and editor_sid, so no intermediate projection is needed.
val sticker_apply_subcategory =
  getEvent("com.picsart.studio", "edit_sticker_apply", from, from)
    .where($"origin" === "editor")
    .groupBy($"subcategory")
    .agg(countDistinct($"editor_sid"))





from: org.joda.time.DateTime = 2019-08-30T00:00:00.000Z
sticker_apply_subcategory: org.apache.spark.sql.DataFrame = [subcategory: string, count(DISTINCT editor_sid): bigint]
