In [1]:
!pip install pyspark

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz

!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 32 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 31.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=511ec9bfdb0c62f454e49fcb9a8ce27cd6f9e8a1710e2bb672a7974511c29be2
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("app_analytics").getOrCreate()
spark

In [3]:
uicycles = spark.read.format("csv").option("header",True).option("inferSchema",True).load("uicycles.csv")

In [4]:
uicycles.printSchema()

root
 |-- uuid: string (nullable = true)
 |-- os: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- creation_date: string (nullable = true)



In [5]:
uicycles.show(5, truncate = False)

+----------------+-------+----------+---------------------+
|uuid            |os     |event_type|creation_date        |
+----------------+-------+----------+---------------------+
|a9ba6d0cd8586a4f|Android|install   |2016-08-06 19:54:47.0|
|7365ac7d0721bbf0|Android|re-install|2016-08-14 16:04:45.0|
|4bf1a1368533ad74|Android|uninstall |2016-08-06 17:40:18.0|
|8f2a30a777e2714b|Android|install   |2016-08-31 05:25:24.0|
|cd002b38b452db4c|Android|install   |2016-08-20 20:19:06.0|
+----------------+-------+----------+---------------------+
only showing top 5 rows



In [6]:
uicycles.schema

StructType(List(StructField(uuid,StringType,true),StructField(os,StringType,true),StructField(event_type,StringType,true),StructField(creation_date,StringType,true)))

In [7]:
eventdump=spark.read.format('csv').option('header','True').option('escape','"').load("eventdump.csv")

In [8]:
eventdump.show(5, truncate = False)

+----------------+--------------------------+---------------+----------------------------------------------------------------------------------------------------------------------+
|uuid            |event                     |event_timestamp|properties                                                                                                            |
+----------------+--------------------------+---------------+----------------------------------------------------------------------------------------------------------------------+
|43e1796c34ac2a91|Cart is viewed            |07:56.9        |{"email_id": "careers24@gmail.com", "timeStamp": "29-08-2016 07:43:10", "No Of Products": 1, "Cart Value": "1495"}    |
|3065bf9960737af |Cart is viewed            |04:00.1        |{"email_id": "devangnipathak@gmail.com", "timeStamp": "29-08-2016 08:39:03", "No Of Products": 4, "Cart Value": "877"}|
|636677073e9445e3|Search results page viewed|26:23.2        |{"category": "Panties", "timeStamp

In [9]:
eventdump_exp = eventdump.withColumn("properties", explode(split(col("properties"), ", "))).withColumn("key", split(col("properties"), "\":").getItem(0)).withColumn("value", split(col("properties"), "\":").getItem(1)).withColumn("properties", create_map(col("key"), col("value")))

In [10]:
eventdump_exp.show(5, truncate=False)

+----------------+--------------+---------------+-------------------------------------------+---------------+---------------------------+
|uuid            |event         |event_timestamp|properties                                 |key            |value                      |
+----------------+--------------+---------------+-------------------------------------------+---------------+---------------------------+
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{{"email_id ->  "careers24@gmail.com"}     |{"email_id     | "careers24@gmail.com"     |
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{"timeStamp ->  "29-08-2016 07:43:10"}     |"timeStamp     | "29-08-2016 07:43:10"     |
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{"No Of Products ->  1}                    |"No Of Products| 1                         |
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{"Cart Value ->  "1495"}}                  |"Cart Value    | "1495"}                   |
|3065bf9960737af |Cart is viewed|0

In [11]:
eventdump_exp1 = eventdump_exp.withColumn('key', regexp_replace('key', '^\{|\"', '')).withColumn('value', regexp_replace('value', '\}$|\"', ''))

In [12]:
eventdump_exp1.show(5, truncate=False)

+----------------+--------------+---------------+-------------------------------------------+--------------+-------------------------+
|uuid            |event         |event_timestamp|properties                                 |key           |value                    |
+----------------+--------------+---------------+-------------------------------------------+--------------+-------------------------+
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{{"email_id ->  "careers24@gmail.com"}     |email_id      | careers24@gmail.com     |
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{"timeStamp ->  "29-08-2016 07:43:10"}     |timeStamp     | 29-08-2016 07:43:10     |
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{"No Of Products ->  1}                    |No Of Products| 1                       |
|43e1796c34ac2a91|Cart is viewed|07:56.9        |{"Cart Value ->  "1495"}}                  |Cart Value    | 1495                    |
|3065bf9960737af |Cart is viewed|04:00.1        |{{"ema

In [13]:
from pyspark.sql import functions as F
eventdump_fin = eventdump_exp1.groupby('uuid','event','event_timestamp').pivot('key').agg(F.first('value'))

In [14]:
eventdump_fin.printSchema()

root
 |-- uuid: string (nullable = true)
 |-- event: string (nullable = true)
 |-- event_timestamp: string (nullable = true)
 |-- : string (nullable = true)
 |-- 2xl,l size in inches: string (nullable = true)
 |-- 2xl,l size in inches}: string (nullable = true)
 |-- Brand Name: string (nullable = true)
 |-- Cart Value: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Login From: string (nullable = true)
 |-- Login Type: string (nullable = true)
 |-- No Of Products: string (nullable = true)
 |-- PINCode Status: string (nullable = true)
 |-- PUSH MESSAGE: string (nullable = true)
 |-- PinCode: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Product SKU: string (nullable = true)
 |-- Search Query: string (nullable = true)
 |-- Search String: string (nullable = true)
 |-- SignIn From: string (nullable = true)
 |-- SignIn Type: string (nullable = true)
 |-- Sub-Category name: string (nullable = true)
 |-- adid=[120565720720]: string (nullable =

In [15]:
eventdump_fin1 = eventdump_fin.withColumn("timeStamp",to_timestamp(col("timeStamp")," dd-MM-yyyy hh:mm:ss"))

In [16]:
uicycles.createOrReplaceTempView("uicycles")
eventdump_fin1.createOrReplaceTempView("eventsdump")

**## Customer retention trends from their lifetime cycles [frequency chart or histogram plot] [retention is defined as the duration of one install-uninstall cycle, so multiple re-installs have to be treated separately]**

In [17]:
from pyspark.sql.window import Window
from pyspark.sql.functions import * #lot of functions available here

In [18]:
#create windowing object
windowSpec = Window.partitionBy("uuid").orderBy("creation_date")

In [19]:
uicycles1 = uicycles.withColumn("nxt_event_type",lead("event_type",1).over(windowSpec)).withColumn("nxt_creation_date",lead("creation_date",1).over(windowSpec))
uicycles1.createOrReplaceTempView("uicycles1")

In [20]:
uicycles1.show(5, truncate=False)

+----------------+-------+----------+---------------------+--------------+---------------------+
|uuid            |os     |event_type|creation_date        |nxt_event_type|nxt_creation_date    |
+----------------+-------+----------+---------------------+--------------+---------------------+
|null            |null   |install   |2016-08-23 14:59:00.0|install       |2016-08-26 03:59:45.0|
|null            |null   |install   |2016-08-26 03:59:45.0|null          |null                 |
|085d478f5ec1c379|Android|install   |2016-08-29 02:31:34.0|uninstall     |2016-08-30 05:51:25.0|
|085d478f5ec1c379|Android|uninstall |2016-08-30 05:51:25.0|null          |null                 |
|10001b8438ddec6 |Android|install   |2016-08-08 10:36:31.0|uninstall     |2016-08-09 17:33:51.0|
+----------------+-------+----------+---------------------+--------------+---------------------+
only showing top 5 rows



In [21]:
spark.sql("""select datediff(nxt_creation_date, creation_date) as life_cycle_days, count(uuid) as cnt
          from uicycles1 
          where event_type in ("install", "re-install")
          and nxt_event_type = "uninstall"
          group by life_cycle_days
          order by cnt desc""").show(10)

+---------------+-----+
|life_cycle_days|  cnt|
+---------------+-----+
|              1|20828|
|              2| 6675|
|              3| 2425|
|              4| 1715|
|              5| 1246|
|              6| 1146|
|              7|  912|
|              8|  828|
|              9|  674|
|             10|  573|
+---------------+-----+
only showing top 10 rows



**## Find out the time of day when the customers are most active [use your own discretion for time of day bucketing] [activity is defined on the basis of events]**

In [22]:
spark.sql(""" 
        select extract(hour from timeStamp) as time_of_day, count(event) as cnt
        from eventsdump
        group by time_of_day
        order by cnt desc"""
     ).show()

+-----------+------+
|time_of_day|   cnt|
+-----------+------+
|       null|168151|
|         11|112187|
|          0| 82802|
|         10| 78383|
|          9| 68743|
|          8| 61429|
|          1| 59095|
|          7| 54814|
|          6| 53895|
|          4| 50133|
|          5| 49429|
|          2| 48364|
|          3| 46999|
+-----------+------+



In [24]:
spark.sql(""" 
        select case when hour(timeStamp) <=6 then '00-06'
        when hour(timeStamp) <=12 then '06-12'
        when hour(timeStamp) <=18 then '12-18'
        when hour(timeStamp) <=24 then '18-24'
        end as time_of_day, count(event) as cnt
        from eventsdump
        group by time_of_day
        order by cnt desc""").show()

+-----------+------+
|time_of_day|   cnt|
+-----------+------+
|      00-06|390717|
|      06-12|375556|
|       null|168151|
+-----------+------+



**## Purchase value buckets [find purchase/checkout events from event logs and parse the 'properties' column to get total value associated and generate a simple bucketed frequency chart/histogram plot]**

In [25]:
spark.sql(""" 
        select distinct event
        from eventsdump""").show(100, truncate = False)

+-----------------------------------------------------------------+
|event                                                            |
+-----------------------------------------------------------------+
|Sub-Category is viewed                                           |
|Cart is viewed                                                   |
|Checkout is completed by Cash On Delivery                        |
|Place Order is initiated                                         |
|Product review is viewed                                         |
|Checkout is completed by Credit Cards / Debit Cards / Net Banking|
|Product is shared                                                |
|Checkout is completed by PG                                      |
|Checkout is completed by Paid using zCoins                       |
|Signup failed                                                    |
|User has logged in                                               |
|Brand page is viewed                           

In [26]:
spark.sql(""" 
        select total, count(event) as cnt
        from eventsdump
        where event like 'Checkout%'
        group by total
        order by cnt """).show()

+-----+---+
|total|cnt|
+-----+---+
|  805|  1|
|  726|  1|
| 1711|  1|
|  816|  1|
|  484|  1|
| 1026|  1|
| 1883|  1|
| 1654|  1|
|  904|  1|
|  606|  1|
| 2541|  1|
| 2924|  1|
| 1169|  1|
| 5200|  1|
| 1017|  1|
| 2118|  1|
|  705|  1|
| 6565|  1|
| 1873|  1|
| 1142|  1|
+-----+---+
only showing top 20 rows



**## Behavior of purchasing and non-purchasing customers [something along the lines of their in-app event frequency in a given install-uninstall cycle]**

In [27]:
spark.sql("""select cust_type, count(event) as event_freq from (
        select a.*, b.cust_type, b.event, b.timeStamp
        from uicycles1 a
        left join
        (select *, case when event like 'Checkout%' then 'purchasing'
        else 'non-purchasing' end as cust_type
        from eventsdump) b
        on a.uuid = b.uuid
        where a.event_type in ("install", "re-install") and a.nxt_event_type = "uninstall"
        and b.timeStamp between creation_date and nxt_creation_date
        ) where cust_type is not null
        group by cust_type
        order by event_freq""").show()

+--------------+----------+
|     cust_type|event_freq|
+--------------+----------+
|    purchasing|        69|
|non-purchasing|     25020|
+--------------+----------+



**## Week over Week revenue trends for purchasing customers**

In [28]:
spark.sql(""" 
        select year(timeStamp) as year, weekofyear(timeStamp) as week, sum(total) as tot_amt
        from eventsdump
        where event like 'Checkout%'
        group by year, week
        order by tot_amt """).show()

+----+----+---------+
|year|week|  tot_amt|
+----+----+---------+
|2016|  40|    513.0|
|2016|  36|    779.0|
|2016|  26|    889.0|
|2016|  39|   1023.0|
|2016|  27|   1220.0|
|2016|  33|   2398.0|
|2016|  35|1880301.0|
|2016|  34|2472212.0|
+----+----+---------+



**## How their purchases are distributed post install? [the number and value of purchases after installing the app in one retention cycle]**

In [29]:
spark.sql("""select count(event) as num_purchase, sum(total) as tot_amt from (
        select a.*, b.event, b.timeStamp, b.total
        from uicycles1 a
        left join
        (select * from eventsdump
        where event like 'Checkout%') b
        on a.uuid = b.uuid
        where a.event_type in ("install", "re-install") and a.nxt_event_type = "uninstall"
        and b.timeStamp between creation_date and nxt_creation_date)
        order by num_purchase desc""").show()

+------------+-------+
|num_purchase|tot_amt|
+------------+-------+
|          69|60549.0|
+------------+-------+



**## Do they perform purchases in the 2nd, 3rd etc weeks post install? [if their retention cycle is greater than 1 week]**

In [30]:
spark.sql("""select week_post_install, count(event) as num_purchase from 
            (
            select a.*, b.event, ceil(datediff(b.timeStamp, creation_date)/7) as week_post_install, b.total
            from uicycles1 a
            left join
            (select * from eventsdump
            where event like 'Checkout%') b
            on a.uuid = b.uuid
            where a.event_type in ("install", "re-install") and a.nxt_event_type = "uninstall"
            and b.timeStamp between creation_date and nxt_creation_date
            )
        group by week_post_install
        order by week_post_install""").show()

+-----------------+------------+
|week_post_install|num_purchase|
+-----------------+------------+
|                0|          16|
|                1|          40|
|                2|           6|
|                3|           6|
|                4|           1|
+-----------------+------------+



**## Is there a steady inflow of revenue for customers with high retention? [Growth can decline but is it still a positive gradient?]**

In [31]:
spark.sql("""select retention_period, week_post_install, sum(total) as tot_rev from 
            (
            select a.*, datediff(nxt_creation_date, creation_date) as retention_period,
            b.event, ceil(datediff(b.timeStamp, creation_date)/7) as week_post_install, b.total
            from uicycles1 a
            left join
            (select * from eventsdump
            where event like 'Checkout%') b
            on a.uuid = b.uuid
            where a.event_type in ("install", "re-install") and a.nxt_event_type = "uninstall"
            and b.timeStamp between creation_date and nxt_creation_date
            )
        group by retention_period, week_post_install
        order by retention_period desc, week_post_install asc""").show()

+----------------+-----------------+-------+
|retention_period|week_post_install|tot_rev|
+----------------+-----------------+-------+
|              26|                3| 1434.0|
|              24|                4|  663.0|
|              23|                3|  405.0|
|              22|                3| 2266.0|
|              19|                3|  488.0|
|              18|                3| 1810.0|
|              17|                2| 1048.0|
|              16|                2|  364.0|
|              14|                2|  668.0|
|               9|                1| 2115.0|
|               9|                2| 1039.0|
|               8|                1| 3302.0|
|               7|                1| 1292.0|
|               6|                1| 4712.0|
|               5|                1| 5825.0|
|               4|                0| 1119.0|
|               4|                1| 3342.0|
|               3|                0| 2913.0|
|               3|                1| 7741.0|
|         

**## Any other actionable insights that can be drawn from the given data?**

In [33]:
spark.sql(""" 
        select year(timeStamp) as year, month(timeStamp) as month, sum(total) as tot_amt
        from eventsdump
        where event like 'Checkout%'
        group by year, month
        order by tot_amt """).show()

+----+-----+---------+
|year|month|  tot_amt|
+----+-----+---------+
|2016|   10|    513.0|
|2016|    7|   2109.0|
|2016|    9|  53297.0|
|2016|    8|4303416.0|
+----+-----+---------+



In [34]:
spark.sql(""" 
        select weekday(timeStamp) as week_of_day, count(event) as cnt
        from eventsdump
        group by week_of_day
        order by cnt desc"""
     ).show()

+-----------+------+
|week_of_day|   cnt|
+-----------+------+
|       null|168151|
|          6|143533|
|          5|128747|
|          2|121584|
|          1|111686|
|          4| 99648|
|          0| 98787|
|          3| 62288|
+-----------+------+



In [35]:
spark.sql(""" 
        select case when weekday(timeStamp) =1 then 'Monday'
        when weekday(timeStamp) =2 then 'Tuesday'
        when weekday(timeStamp) =3 then 'Wednesday'
        when weekday(timeStamp) =4 then 'Thursday'
        when weekday(timeStamp) =5 then 'Friday'
        when weekday(timeStamp) =6 then 'Saturday'
        when weekday(timeStamp) =0 then 'Sunday'
        end as week_of_day, count(event) as cnt
        from eventsdump
        group by week_of_day
        order by cnt desc""").show()

+-----------+------+
|week_of_day|   cnt|
+-----------+------+
|       null|168151|
|   Saturday|143533|
|     Friday|128747|
|    Tuesday|121584|
|     Monday|111686|
|   Thursday| 99648|
|     Sunday| 98787|
|  Wednesday| 62288|
+-----------+------+

