In [1]:
sc.version

u'1.6.2'

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# Finding Insights/Rough Work -- Start

In [3]:
df = sqlContext.read.parquet("data")

In [4]:
#priniting Schema for checking data types
df.printSchema()

root
 |-- collector_ts: timestamp (nullable = true)
 |-- event_id: string (nullable = true)
 |-- device_tstamp: timestamp (nullable = true)
 |-- cookie_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- email_id: string (nullable = true)
 |-- page_id: string (nullable = true)
 |-- date: string (nullable = true)



In [5]:
#checking number of cookies against email_ids
df.groupby("email_id").agg(F.countDistinct("cookie_id").alias("ccount")).where("ccount>1").show()

+--------------------+------+
|            email_id|ccount|
+--------------------+------+
|4a1331a5b49024cfe...|     2|
|6ba80b6ade87f9c8f...|     3|
|f4b613ea28504de3a...|     2|
|fbb11341b453f2427...|     2|
|f11d2a90fe01341e7...|     2|
|a74c9c61c49dd0599...|     2|
|becddea4c2f71d079...|     2|
|ab2fede4e63f3a59f...|     2|
|76862f3c747f1df5b...|     2|
|041c37f576ed36f70...|     2|
|efa5c97b0fc2dbb2b...|     2|
|6ba6d26bfd5d962ca...|     3|
|8493a3a50f7f380f1...|     2|
|4546e3a4abbfbb965...|     2|
|aa07f6fc71f0ff40f...|     2|
|db813dd981068f380...|     2|
|b13224ba6b46603ec...|     2|
|2c5812458b13371bd...|     2|
|e03055bef523150cd...|     2|
|bfb2ec2e9faa4b0e6...|     2|
+--------------------+------+
only showing top 20 rows



In [6]:
#checking number of email_id against cookies
df.groupby("cookie_id").agg(F.countDistinct("email_id").alias("ecount")).where("ecount>1").show()

+--------------------+------+
|           cookie_id|ecount|
+--------------------+------+
|931caa50-51d5-4d4...|     2|
|91c7343d-046e-481...|     2|
|f1a77923-c554-44b...|     2|
|541cd73b-a785-434...|     2|
|e6250835-6963-4cf...|     2|
|d3306815-7239-43a...|     2|
|10d3e108-a827-4b0...|     2|
|17b0f072-fbbc-472...|     2|
|dde6307f-d5f6-402...|     2|
|f1bd5ff3-87c4-445...|     2|
|5b57cff9-1be0-432...|     2|
|33349554-9a39-414...|     2|
|d4922b93-a097-474...|     2|
|bc2448e4-b209-471...|     2|
|a8fa0c5e-9d45-4fb...|     2|
|3d55e031-133c-4cc...|     2|
|908deb84-5e41-4e1...|     2|
|9495117a-3ab1-4b4...|     2|
|95897c24-8dea-492...|     2|
|f95809a3-b9d9-472...|     2|
+--------------------+------+
only showing top 20 rows



In [7]:
#Check for M to M assciation of session with cookies
df.groupby("session_id").agg(F.countDistinct("cookie_id").alias("ccount")).where("ccount>1").show()

+----------+------+
|session_id|ccount|
+----------+------+
+----------+------+



In [8]:
#checking mail data
df.select("email_id").distinct().sort("email_id").show(truncate=False)

+--------------------------------+
|email_id                        |
+--------------------------------+
|null                            |
|0007f376bec1442f7b848d36dbe02595|
|0019749a13edbf0241bd4926ee6e9449|
|002d59f0502a60130b998035426d4f62|
|004626ad014df56188051434601719d5|
|00618a360475b7b079598b5b4929d499|
|0066e1a8672b73094bbd75ae51fdd4b9|
|0076e5a358b7c26d86bbd22aa620324e|
|007ebc5231d2838675051ac17e3788ec|
|0094b4878bc15208874f2907869faae4|
|009712fc0f2df6472ec17e76017cd4c4|
|00ab7a02bdb070016ed9f143a83d9363|
|00aebab61e2ea891654c622e233edfef|
|00cd2b5aa429e34bfd6d547f9c0c6c2e|
|00e5276b15171ccde699524d427c8bc6|
|00e81428843ba9e66f1a348d3fe14763|
|00eab6d9b81228273dafd5f723607b81|
|00fa412ad5eae8e57b3e0917dc194ef7|
|013c00a4002e23321366f7e64632ddf6|
|013dc93f20ff1817077c33c155044501|
+--------------------------------+
only showing top 20 rows



In [9]:
df.show(truncate=False)

+-----------------------+------------------------------------+-----------------------+------------------------------------+------------------------------------+--------------------------------+--------------------------------+----------+
|collector_ts           |event_id                            |device_tstamp          |cookie_id                           |session_id                          |email_id                        |page_id                         |date      |
+-----------------------+------------------------------------+-----------------------+------------------------------------+------------------------------------+--------------------------------+--------------------------------+----------+
|2019-05-29 19:39:42.82 |002cb280-e563-447f-bbdb-65637277a4eb|2019-05-29 19:39:42.714|3bf98c9d-f648-4695-91cf-fe09d5679ecf|e47d3eff-07fe-4608-86ec-d158a283be82|null                            |625738828b8cf88f9a81d9c9578e2dab|2019-05-29|
|2019-05-30 00:08:52.795|00ac063a-11f6-4fc0-be50

# Rough Work -- End

In [10]:
df = sqlContext.read.parquet("data")

# Question 1

##### [20 points] Create a dataset with site entries as rows (a site entry denotes the first event of a session) including the time of the event and page ID and save it to a file partitioned per day. Print the top 10 entry pages per month (page IDs with highest number of site entries).

**First Part** - Create a dataset with site entries as rows (a site entry denotes the first
event of a session) including the time of the event and page ID and save it to a file
partitioned per day

In [11]:
df.select("session_id", "page_id", "collector_ts").distinct()\
.withColumn(
    "visit_order", 
    F.rank().over(
        W.partitionBy("session_id").orderBy(F.col("collector_ts").asc())
    )
).where("visit_order = 1").drop("visit_order")\
.withColumn(
    "date", 
    F.to_date("collector_ts")
)\
.write.mode("overwrite").parquet("site_entries", partitionBy=["date"])

In [12]:
sqlContext.read.parquet("site_entries").show(truncate=False)

+------------------------------------+--------------------------------+-----------------------+----------+
|session_id                          |page_id                         |collector_ts           |date      |
+------------------------------------+--------------------------------+-----------------------+----------+
|0549aa28-75b6-4a75-b713-d694c1f60921|db694666c288f61fbcbe0c15530922ca|2019-05-04 22:14:18.093|2019-05-04|
|0e995cc6-a1bb-4417-a73b-04310e501bbe|f2709e58e917db3b2f9073017c57bccc|2019-05-04 22:49:00.874|2019-05-04|
|10574ac2-6617-474c-ab12-83d342902e9e|88e51958b73b519fc23c3271a923c776|2019-05-04 14:26:45.115|2019-05-04|
|14efe5a6-f557-433a-9650-d6974f88a63d|8cd220661f62f435e44ff5d1172e7ba7|2019-05-04 14:54:21.447|2019-05-04|
|1979a2d7-8fb6-42b7-892d-e93050797956|db694666c288f61fbcbe0c15530922ca|2019-05-04 23:17:14.467|2019-05-04|
|1dcac440-8791-4557-90b3-401a5ba1c8d1|db694666c288f61fbcbe0c15530922ca|2019-05-04 23:11:26.034|2019-05-04|
|23e8a781-cf87-4c41-b075-22b42ec9e62b

**Second Part** - Print the top 10 entry pages per month (page IDs with highest number of site entries).

In [13]:
sqlContext.read.parquet("site_entries").groupby(
    F.concat(F.year("date"),F.month("date")).alias("month"), 
    "page_id"
).agg(
    F.count("session_id").alias("page_count")
).withColumn(
    "rank", 
    F.row_number().over(
        W.partitionBy("month").orderBy(F.col("page_count").desc())
    )
).filter(
    F.col("rank") <= 10
).show(truncate=False)

+-----+--------------------------------+----------+----+
|month|page_id                         |page_count|rank|
+-----+--------------------------------+----------+----+
|20194|db694666c288f61fbcbe0c15530922ca|14486     |1   |
|20194|d275790e1fd374c7bd17c95852567ef3|1987      |2   |
|20194|424b07aaddd48b108fa399607c3fe448|362       |3   |
|20194|6d79e10ec5d60f4b478e6b889ef45497|342       |4   |
|20194|9360f72c202c5a9e5e0b2576016f2dae|252       |5   |
|20194|a1b8959aab37ab7dc915f9bfaaba0f2a|203       |6   |
|20194|b03237c17b4e1fbe28eb1afef0a7f9d0|194       |7   |
|20194|35ce5943f3b00e9dddacaa0695d917d5|190       |8   |
|20194|73457518974378b52873dc3e9e1882f1|188       |9   |
|20194|f66991ca8f0b4b2f2fa0d9d6628eff95|181       |10  |
|20195|db694666c288f61fbcbe0c15530922ca|51370     |1   |
|20195|d275790e1fd374c7bd17c95852567ef3|5791      |2   |
|20195|9edc0c4fd1fdff33d2d82984f31c364f|2160      |3   |
|20195|c750d6f068d29335aa5211f1bb122ac5|1378      |4   |
|20195|424b07aaddd48b108fa39960

# Question - 2

##### [30 points] Determine following three numbers: the number of unique cookie IDs, the number of unique email IDs, and the number of unique real users (actual physical users like you, or the barista in your favorite coffee shop). Please also provide some verbal discussion of your core assumptions for calculating the number of unique real users, the tradeoffs these assumptions imply and other thoughts you might have.

**First Part** - The number of unique cookie_id (For each event occuring on website have a cookie associated so there is no need to check null entries, if cookie is null then probably block this at triggering end)

In [14]:
df.select("cookie_id").distinct().count()

109939

**Second Part** - The number of unique email_id (A pageview event can be done by anyone which can be a logged in or non-logged in user, so events can have null as mail. To count unique email_id filter out null entries from data set)

In [15]:
df.where("email_id is not null").select("email_id").distinct().count()

3651

**Thrid Part** - The number of unique real users (actual physical users like you, or the barista in your favorite coffee shop) - 

*I am considering real users as unique email ids and cookies which wouldn't have any mail id associated*  

**Formula** - Unique_Mails + (distinct_cookies_without_mail-distinct_cookies_with_mail)

(After caclculations i have more assumptions)

In [16]:
uniqe_mail_ids = df.where("email_id is not null").select("email_id").distinct()
uniqe_mail_ids.count()

3651

In [17]:
cookies_with_email_ids = df.where("email_id is not null").select("cookie_id").distinct()
cookies_with_email_ids.count()

3775

In [18]:
cookies_without_any_email_ids = df.where("email_id is null").select("cookie_id").distinct()
cookies_without_any_email_ids.count()

109898

In [19]:
cookies_without_any_email_ids.subtract(cookies_with_email_ids).count()

106164

In [20]:
#Unique_Mails + (distinct_cookies_without_mail-distinct_cookies_with_mail)
uniqe_mail_ids.unionAll(
    cookies_without_any_email_ids.subtract(
        cookies_with_email_ids
    ).select(F.col("cookie_id").alias("email_id"))
).count()

109815

### Assumptions for Finiding Real Users

*By looking at rough finding its concluded that cookies to email have M:M mapping (for this type of sitatution it becomes difficult to count real users. counting real users depends on business use case whether to count it as one or multi), Below are my other assumptions for finidng real users - *

- Unique Email Ids are our actual users
    - A person can have multiple email ids and can register with Many. In this case we need to do more R&D to label them as same users


- The cookie_id without any email_id associated can be actual user (without login)

It depends on use case whether to consider any one point or both above mentioned points

