In [1]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

In [2]:
// Load the CSV dump (first line is a header row). No schema inference is requested, so every
// column, including `ts`, is read as a string. The frame is cached because later cells build on it.
val df = spark.read
  .option("header", "true")
  .csv("/tmp/nithin/3p/dump/ct_rr.csv")
  .cache()

Waiting for a Spark session to start...

df = [ts: string, number: string ... 4 more fields]


[ts: string, number: string ... 4 more fields]

In [4]:
// Peek at the first two rows of the raw dump.
df.show(numRows = 2)

+-------------------+------+----------+-----------------+---------+-----------------+
|                 ts|number|  pick_lat|         pick_lng| drop_lat|         drop_lng|
+-------------------+------+----------+-----------------+---------+-----------------+
|2018-04-07 07:07:17| 14626|12.3136215|76.65819499999998|12.287301|76.60228000000002|
|2018-04-07 07:32:27| 85490| 12.943947|        77.560745|12.954014|         77.54377|
+-------------------+------+----------+-----------------+---------+-----------------+
only showing top 2 rows



In [15]:
/** Aggregation granularities; each value also carries a display name in `aggTypeName`.
  *
  * `Enumeration` hands out ids in declaration order (hourly, daily, monthly), so the
  * declarations below must keep their order.
  */
object AggType extends Enumeration {
  type AggType = Value

  /** Enumeration value that additionally records its display name. */
  protected case class Val(aggTypeName: String) extends super.Val

  /** Views any `AggType.Value` as a [[Val]] (e.g. to read `aggTypeName`). Every value declared
    * in this object is a `Val`, so the cast holds for all of them.
    */
  implicit def valueToFormatVal(x: Value): Val = x.asInstanceOf[Val]

  val hourly = Val("hourly")
  val daily = Val("daily")
  val monthly = Val("monthly")
}

defined object AggType




In [17]:
import java.time.LocalDate
/** Prints the average per-`number` row count for three bucketings of the rows whose `ts`
  * string falls between `startDate` and `endDate`: hour of day, calendar date and calendar
  * month.
  *
  * The range filter compares `ts` as a string with the `yyyy-MM-dd` strings of the two
  * dates. A timestamp such as "2018-04-10 07:00:00" therefore sorts after "2018-04-10" and
  * is left out; in practice only rows before `endDate` are counted.
  *
  * @param df        frame with (at least) a `ts` timestamp-string column and a `number` column
  * @param startDate first day of the range
  * @param endDate   end of the range (see the note on string comparison above)
  */
def avg_agg_stats(df: DataFrame, startDate: LocalDate, endDate: LocalDate): Unit = {
  val tmp = df
    .filter('ts.between(startDate.toString, endDate.toString))
    .withColumn("date", to_date('ts))
    .withColumn("year", year('ts))
    .withColumn("month", month('ts))
    .withColumn("hour", hour('ts))
    .cache

  // The cached frame is only needed for the actions below. Release it afterwards (also on
  // failure) instead of leaving one cached copy behind per call.
  try {
    // Mean, over the (number, bucket...) groups, of each group's row count.
    def avgCount(bucket: Column*): Double =
      tmp
        .groupBy((col("number") +: bucket): _*)
        .agg(count("*").as("cnt"))
        .select(avg('cnt))
        .head()
        .getAs[Double](0)

    val avg_daily = avgCount('date)
    // NOTE(review): this groups by hour of day across every day in the range, not by
    // calendar hour. Confirm that this is the intended "hourly" statistic.
    val avg_hourly = avgCount('hour)
    // Year is part of the key so the same month of different years is not merged.
    val avg_monthly = avgCount('year, 'month)

    val res = s"Avg. of aggregated counts:\n\t * hourly: ${avg_hourly}\n\t * daily: ${avg_daily}\n\t * monthly: ${avg_monthly}"
    println(res)
  } finally {
    tmp.unpersist()
  }
}

avg_agg_stats: (df: org.apache.spark.sql.DataFrame, startDate: java.time.LocalDate, endDate: java.time.LocalDate)Unit


In [19]:
// Date range for the averages: 2018-04-01 up to 2018-04-10.
val startDate = LocalDate.of(2018, 4, 1)
val endDate = LocalDate.of(2018, 4, 10)
avg_agg_stats(df, startDate, endDate)

Avg. of aggregated counts:
	 * hourly: 2.392940542482486
	 * daily: 3.0919113380526864
	 * monthly: 3.973009245451834


startDate = 2018-04-01
endDate = 2018-04-10


2018-04-10

In [7]:
// Every row of df (no date filter, no cache) plus date, month and hour columns parsed from ts.
val tmp = df
  .withColumn("date", to_date('ts))
  .withColumn("month", month('ts))
  .withColumn("hour", hour('ts))

tmp = [ts: string, number: string ... 7 more fields]


[ts: string, number: string ... 7 more fields]

In [10]:
// Row counts per number within each calendar date, hour of day and month number.
val dailyAggDf = tmp.groupBy("number", "date").agg(count("*").as("daily_cnt"))
val hourlyAggDf = tmp.groupBy("number", "hour").agg(count("*").as("hourly_cnt"))
val monthlyAggDf = tmp.groupBy("number", "month").agg(count("*").as("monthly_cnt"))

// Mean of the per-(number, date) counts; left as the cell's result so the notebook shows it.
dailyAggDf.agg(avg("daily_cnt")).head().getAs[Double](0)

dailyAggDf = [number: string, date: date ... 1 more field]
hourlyAggDf = [number: string, hour: int ... 1 more field]
monthlyAggDf = [number: string, month: int ... 1 more field]


3.3086164774039366

### Cohort 

In [27]:
import java.time.LocalDate
val startDate = LocalDate.of(2018, 4, 1)
val noOfWeeks = 3
val endDate = startDate.plusWeeks(noOfWeeks)

// Distinct (number, weekofyear) pairs for rows whose ts string sorts between the two date
// strings. Timestamps on endDate itself sort after "yyyy-MM-dd" and are therefore excluded.
val filteredDf = df
  .filter('ts.between(lit(startDate.toString), lit(endDate.toString)))
  .withColumn("week", weekofyear('ts))
  .dropDuplicates("number", "week")
  .select('number, 'week)
  .cache()

// Earliest week present in the filtered data.
val minWeek = filteredDf.agg(min('week)).head().getAs[Int](0)

startDate = 2018-04-01
noOfWeeks = 3
endDate = 2018-04-22
filteredDf = [number: string, week: int]
minWeek = 14


14

In [5]:
// Tag each (number, week) row with the earliest week of that number. The running min over an
// ascending order equals the minimum of the whole partition.
val retentionWindow = Window.partitionBy("number").orderBy(asc("week"))
val ret = filteredDf
  .withColumn("first_week", min("week").over(retentionWindow))
  // .withColumn("retention_week", 'week - 'first_week)
  .orderBy("number", "week")

ret.show()

+------+----+----------+
|number|week|first_week|
+------+----+----------+
|  None|  14|        14|
|  None|  15|        14|
|  None|  16|        14|
| 00004|  15|        15|
| 00005|  16|        16|
| 00007|  16|        16|
| 00009|  14|        14|
| 00009|  15|        14|
| 00010|  14|        14|
| 00013|  16|        16|
| 00017|  14|        14|
| 00025|  16|        16|
| 00029|  15|        15|
| 00030|  16|        16|
| 00040|  15|        15|
| 00040|  16|        15|
| 00042|  14|        14|
| 00042|  16|        14|
| 00053|  14|        14|
| 00053|  15|        14|
+------+----+----------+
only showing top 20 rows



retentionWindow = org.apache.spark.sql.expressions.WindowSpec@7ccde36b
ret = [number: string, week: int ... 1 more field]


[number: string, week: int ... 1 more field]

In [6]:
// Number of users per (cohort week, active week), ordered by cohort and then by week.
val dist = ret
  .groupBy("first_week", "week")
  .agg(count("number").as("tot_users"))
  .orderBy("first_week", "week")

dist.show()

+----------+----+---------+
|first_week|week|tot_users|
+----------+----+---------+
|        14|  14|     4345|
|        14|  15|     2718|
|        14|  16|     2312|
|        15|  15|     8848|
|        15|  16|     3971|
|        16|  16|     4513|
+----------+----+---------+



dist = [first_week: int, week: int ... 1 more field]


[first_week: int, week: int ... 1 more field]

In [46]:
// Cohort matrix: one row per first_week, one column per active week.
dist
  .groupBy("first_week")
  .pivot("week")
  .agg(first("tot_users"))
  .orderBy("first_week")
  .show()

+----------+----+----+----+
|first_week|  14|  15|  16|
+----------+----+----+----+
|        14|4345|2718|2312|
|        15|null|8848|3971|
|        16|null|null|4513|
+----------+----+----+----+



In [20]:
// First week in which each number appears. filteredDf (from the cohort cell above) only keeps
// the `number` and `week` columns, so the week is read directly. Recomputing it from `ts`
// fails because that column was dropped by the select.
val firstWeek = filteredDf
  .groupBy('number)
  .agg(min('week).as("first_week"))
firstWeek.count

firstWeek = [number: string, first_week: int]


17706

In [21]:
// Cohort sizes: how many numbers were first seen in each week.
val newUsers = firstWeek
  .groupBy("first_week")
  .agg(count("number").as("new_users"))

newUsers.show()

+----------+---------+
|first_week|new_users|
+----------+---------+
|        16|     4513|
|        15|     8848|
|        14|     4345|
+----------+---------+



newUsers = [first_week: int, new_users: bigint]


[first_week: int, new_users: bigint]

In [33]:
  /** Builds a weekly cohort (retention) table.
    *
    * Rows whose `ts` string falls between `startDate` and `startDate + noOfWeeks` weeks are
    * reduced to distinct (number, week) pairs. The comparison is on strings, so timestamps on
    * the end date itself (e.g. "2018-04-22 07:00:00" > "2018-04-22") are left out.
    *
    * Each number belongs to the cohort of the first week it appears in. The result has one row
    * per cohort (`first_week`) and one column per week label. Each cell holds how many numbers
    * of that cohort were active in that week (null where none were).
    *
    * Week labels match Spark's `weekofyear` for weeks in the same ISO week-based year as
    * `startDate`. Later weeks keep counting upward (52, 53, 54, ...) instead of wrapping back
    * to 1. A range that runs into the next week-based year therefore keeps its weeks in order
    * and in the pivot.
    *
    * @param df        frame with (at least) a `ts` timestamp-string column and a `number` column
    * @param startDate first day of the range
    * @param noOfWeeks length of the range, in weeks
    * @return the pivoted cohort table ordered by `first_week`. Its week columns run from the
    *         earliest week present (the start week when there is no data) through `noOfWeeks`
    *         weeks after it.
    */
  def cohort(df: DataFrame, startDate: LocalDate, noOfWeeks: Int): DataFrame = {
    import java.time.temporal.IsoFields

    val endDate = startDate.plusWeeks(noOfWeeks)

    // Week number of startDate and the Monday that opens that week. A row's label is that
    // number plus the whole weeks elapsed since the Monday, so labels never wrap around.
    val startWeek = startDate.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR)
    val startMonday = startDate.minusDays(startDate.getDayOfWeek.getValue - 1)

    val weekly = df.filter('ts.between(lit(startDate.toString), lit(endDate.toString)))
      .withColumn("week",
        (floor(datediff(to_date('ts), to_date(lit(startMonday.toString))) / 7) + startWeek)
          .cast("int"))
      .select('number, 'week)

    // No cache: the DataFrame returned below is lazy and built on this one. A cached copy
    // could never be released by the caller, so every call would pin another one in memory.
    val filteredDf = weekly.dropDuplicates("number", "week")

    val minRow = weekly.agg(min('week)).head()
    val minWeek = if (minRow.isNullAt(0)) startWeek else minRow.getInt(0)
    val weekRange = minWeek to minWeek + noOfWeeks

    // min over the whole partition = the first week this number appears in
    val retentionWindow = Window.partitionBy('number)

    val distribution = filteredDf
      .withColumn("first_week", min('week).over(retentionWindow))
      .groupBy('first_week, 'week)
      .agg(count('number).as("tot_users"))

    distribution.groupBy('first_week)
      .pivot('week, weekRange)
      .agg(first('tot_users))
      .orderBy('first_week)
  }

cohort: (df: org.apache.spark.sql.DataFrame, startDate: java.time.LocalDate, noOfWeeks: Int)org.apache.spark.sql.DataFrame


In [34]:
// Five-week cohort table starting 2019-01-12, printed without truncating cell values.
val st = LocalDate.of(2019, 1, 12)
val t = cohort(df, st, noOfWeeks = 5)
t.show(truncate = false)

+----------+-----+-----+-----+-----+-----+-----+
|first_week|2    |3    |4    |5    |6    |7    |
+----------+-----+-----+-----+-----+-----+-----+
|2         |12997|9473 |9038 |8946 |9084 |8286 |
|3         |null |22454|13311|13014|13406|12180|
|4         |null |null |11651|5437 |5576 |4848 |
|5         |null |null |null |7848 |3547 |3039 |
|6         |null |null |null |null |6679 |2556 |
|7         |null |null |null |null |null |4397 |
+----------+-----+-----+-----+-----+-----+-----+



st = 2019-01-12
t = [first_week: int, 2: bigint ... 5 more fields]


[first_week: int, 2: bigint ... 5 more fields]

In [32]:
// Three-week cohort table starting 2018-04-01.
cohort(df, startDate = LocalDate.of(2018, 4, 1), noOfWeeks = 3).show()

+----------+----+----+----+
|first_week|  14|  15|  16|
+----------+----+----+----+
|        14|4345|2718|2312|
|        15|null|8848|3971|
|        16|null|null|4513|
+----------+----+----+----+



f: [T](v: T)T


class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema