# Users analysis

In [0]:
ACCESS_KEY_ID = "INSERT_KEY"
SECRET_ACCESS_KEY = "INSERT_KEY"

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem") 

playlog = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("s3")
playlog.printSchema()

root
 |-- timestamp: integer (nullable = true)
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)



1. Compute a new column `datetime` that converts the timestamp to a datetime, drop the `timestamp` column, and order by `datetime`. Save the result as a new DataFrame `df`, then show the first 5 rows of `df`.

> TIP: use the function `F.from_unixtime(...)`, which converts a Unix timestamp (an integer number of seconds since the epoch) into a datetime string.

In [0]:
import pyspark.sql.functions as F

In [0]:
df = (
  playlog
    .withColumn('datetime', F.from_unixtime('timestamp'))  # seconds since epoch -> datetime string
    .drop('timestamp')
    .orderBy('datetime')
)

df.printSchema()


root
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- datetime: string (nullable = true)



In [0]:
df.show(5)

+----+-----------+-------------------+
|user|       song|           datetime|
+----+-----------+-------------------+
|   4|nRa-eGzpT6o|1965-07-26 03:21:43|
|   0|t1l8Z6gLPzo|2014-02-14 14:18:53|
|  70|VJ6ofd0pB_c|2014-02-14 14:18:57|
|  22|Q24VZL8wpOM|2014-02-14 14:18:57|
|   1|t1l8Z6gLPzo|2014-02-14 14:18:58|
+----+-----------+-------------------+
only showing top 5 rows
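
The first row above is dated 1965, which is before the Unix epoch (1970-01-01). That is what a negative `timestamp` value would produce, so it is presumably a bad record rather than a real play. The sketch below uses only the Python standard library (not Spark) to show the same seconds-to-datetime conversion. The helper `from_unixtime_utc` and both input values are illustrative assumptions, not rows taken from the dataset, and the conversion is done in UTC, whereas `F.from_unixtime` renders in the Spark session time zone.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def from_unixtime_utc(seconds):
    """Format a Unix timestamp (seconds since the epoch) as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    return (EPOCH + timedelta(seconds=seconds)).strftime("%Y-%m-%d %H:%M:%S")

# Hypothetical values chosen for illustration only.
recent = from_unixtime_utc(1392387533)
before_epoch = from_unixtime_utc(-140000000)

print(recent)        # 2014-02-14 14:18:53
print(before_epoch)  # a date in 1965, because the input is negative
```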



Now that we have a datetime column, we can compute new columns, namely:
- [year](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date#pyspark.sql.functions.year)
- [month](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date#pyspark.sql.functions.month)
- [dayofmonth](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date#pyspark.sql.functions.dayofmonth)
- [dayofweek](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date#pyspark.sql.functions.dayofweek)
- [dayofyear](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date#pyspark.sql.functions.dayofyear)
- [weekofyear](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=date#pyspark.sql.functions.weekofyear)

We will put the resulting DataFrame in a variable called `df_enriched`.

2. Add the six columns listed above to `df` and save the result as `df_enriched`.

*Tip: you can use the `reduce` function from the `functools` package to produce all the columns automatically; otherwise, you can create them manually one by one.*
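
One possible shape for the `reduce` approach is sketched here. The helper name `add_date_parts` and its parameters are assumptions made for this example; it takes the functions module as an argument (in the notebook that would be `F`) so the block itself does not need a Spark session. It relies only on `withColumn` and on the six function names listed above.

```python
from functools import reduce

DATE_PARTS = ["year", "month", "dayofmonth", "dayofweek", "dayofyear", "weekofyear"]

def add_date_parts(df, functions, source="datetime", parts=DATE_PARTS):
    """Fold over `parts`, adding one column per name with df.withColumn.

    `functions` is expected to expose a callable for each name in `parts`
    (pyspark.sql.functions does for the six names above).
    """
    return reduce(
        lambda acc, name: acc.withColumn(name, getattr(functions, name)(source)),
        parts,
        df,
    )
```

In the notebook this would be called as `df_enriched = add_date_parts(df, F)`, which should yield the same columns as chaining six `withColumn` calls by hand.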

In [0]:
df_enriched = (
  df
    .withColumn('year', F.year('datetime'))
    .withColumn('month', F.month('datetime'))
    .withColumn('dayofmonth', F.dayofmonth('datetime'))
    .withColumn('dayofweek', F.dayofweek('datetime'))
    .withColumn('dayofyear', F.dayofyear('datetime'))
    .withColumn('weekofyear', F.weekofyear('datetime'))
)

df_enriched.printSchema()

root
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)



In [0]:
df_enriched.show(5)

+----+-----------+-------------------+----+-----+----------+---------+---------+----------+
|user|       song|           datetime|year|month|dayofmonth|dayofweek|dayofyear|weekofyear|
+----+-----------+-------------------+----+-----+----------+---------+---------+----------+
|   4|nRa-eGzpT6o|1965-07-26 03:21:43|1965|    7|        26|        2|      207|        30|
|   0|t1l8Z6gLPzo|2014-02-14 14:18:53|2014|    2|        14|        6|       45|         7|
|  70|VJ6ofd0pB_c|2014-02-14 14:18:57|2014|    2|        14|        6|       45|         7|
|  22|Q24VZL8wpOM|2014-02-14 14:18:57|2014|    2|        14|        6|       45|         7|
|   1|t1l8Z6gLPzo|2014-02-14 14:18:58|2014|    2|        14|        6|       45|         7|
+----+-----------+-------------------+----+-----+----------+---------+---------+----------+
only showing top 5 rows



### Aggregates

#### `firstPlay`, `lastPlay`, `playCount`, `uniquePlayCount`
For each user, we will compute these metrics:
- `firstPlay`: datetime of the user's first play
- `lastPlay`: datetime of the user's last play
- `playCount`: total number of plays
- `uniquePlayCount`: number of distinct songs played

We'll save all these in a new DataFrame: `users`.  
When you're done, print out the first 5 rows of `users` ordered by descending `uniquePlayCount`.

3. Compute, for each user:
- `firstPlay`
- `lastPlay`
- `playCount`
- `uniquePlayCount`

Save the results in a DataFrame named `users`.

In [0]:
# List of aggregate expressions, one per output column
agg_exprs = [
  F.min('datetime').alias('firstPlay'),
  F.max('datetime').alias('lastPlay'),
  F.count('*').alias('playCount'),
  F.count_distinct('song').alias('uniquePlayCount')
]

users = df_enriched.groupBy('user').agg(*agg_exprs).orderBy('uniquePlayCount', ascending=False)
users.show(5)

+-----+-------------------+-------------------+---------+---------------+
| user|          firstPlay|           lastPlay|playCount|uniquePlayCount|
+-----+-------------------+-------------------+---------+---------------+
|  213|2014-02-14 15:34:17|2019-04-02 06:04:08|   278749|         161406|
| 7290|2014-04-30 20:12:41|2019-04-03 06:50:05|   151513|          83831|
|  226|2014-02-14 16:28:13|2019-04-02 13:36:34|    94589|          38883|
|22332|2014-10-29 19:05:20|2018-12-13 06:50:53|    63516|          37584|
|  347|2014-02-14 17:53:17|2019-03-26 20:07:34|    93724|          31240|
+-----+-------------------+-------------------+---------+---------------+
only showing top 5 rows
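
To make the four aggregates concrete, here is a small plain-Python sketch of the same group-by logic. The rows are made up for illustration and are not taken from the play log. Note that `datetime` is a string column; `min` and `max` still give the earliest and latest values because the `YYYY-MM-DD HH:MM:SS` format sorts lexicographically in chronological order.

```python
from collections import defaultdict

# Made-up (user, song, datetime) rows, for illustration only.
plays = [
    (1, "a", "2014-02-14 14:18:58"),
    (1, "b", "2014-03-01 09:00:00"),
    (1, "a", "2014-02-20 18:30:00"),
    (2, "c", "2015-01-05 12:00:00"),
]

# Group the rows by user.
by_user = defaultdict(list)
for user, song, dt in plays:
    by_user[user].append((song, dt))

# Compute the four metrics for each user.
users = {
    user: {
        "firstPlay": min(dt for _, dt in rows),
        "lastPlay": max(dt for _, dt in rows),
        "playCount": len(rows),
        "uniquePlayCount": len({song for song, _ in rows}),
    }
    for user, rows in by_user.items()
}

print(users[1])
```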



4. Run a sanity check that no `firstPlay` is later than the corresponding `lastPlay`

In [0]:
users.filter('firstPlay > lastPlay').count()

Out[7]: 0

5. Another sanity check: we grouped on the `user` column, so each user should appear in exactly one row. Make sure all users are unique in the DataFrame

In [0]:
print("Total users {} \n Distinct users : {}".format(users.count(),users.select('user').distinct().count()))

Total users 45904 
 Distinct users : 45904


### `timespan`
We will compute `timespan`: the overall span of a user's activity in days, rounded down. For example:
- if a user was active for 23 hours on the service, we will say they were active for 0 days
- for 53 hours, that would be 2 days of activity
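
The rounding rule in the two examples above amounts to an integer division by 24 hours. A minimal plain-Python sketch (the helper name `timespan_days` is an assumption made for this example):

```python
from datetime import timedelta

def timespan_days(active):
    """Number of complete 24-hour periods in a timedelta (rounded down)."""
    return active // timedelta(days=1)

print(timespan_days(timedelta(hours=23)))  # 0
print(timespan_days(timedelta(hours=53)))  # 2
```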

We **will not** transform the `users` DataFrame in place, but instead save the result as a new DataFrame: `users_with_timespan`.

6. Compute `timespan` and save the result as a new DataFrame: `users_with_timespan` (note that the code cells below name this variable `users_with_timestamp`)

In [0]:
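# First attempt: datediff compares calendar dates only, so it can be one more
# than the number of complete 24-hour periods between the two datetimes
users_with_timestamp = users.withColumn('timespan', F.datediff('lastPlay', 'firstPlay'))
users_with_timestamp.limit(5).toPandas()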

| | user | firstPlay | lastPlay | playCount | uniquePlayCount | timespan |
|---|---|---|---|---|---|---|
| 0 | 213 | 2014-02-14 15:34:17 | 2019-04-02 06:04:08 | 278749 | 161406 | 1873 |
| 1 | 7290 | 2014-04-30 20:12:41 | 2019-04-03 06:50:05 | 151513 | 83831 | 1799 |
| 2 | 226 | 2014-02-14 16:28:13 | 2019-04-02 13:36:34 | 94589 | 38883 | 1873 |
| 3 | 22332 | 2014-10-29 19:05:20 | 2018-12-13 06:50:53 | 63516 | 37584 | 1506 |
| 4 | 347 | 2014-02-14 17:53:17 | 2019-03-26 20:07:34 | 93724 | 31240 | 1866 |


In [0]:
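# Second attempt: difference in seconds, divided by the seconds in a day and truncated
seconds_active = F.unix_timestamp("lastPlay") - F.unix_timestamp("firstPlay")
users_with_timestamp = users.withColumn("timespan", (seconds_active / (60 * 60 * 24)).cast("int"))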

users_with_timestamp.limit(5).toPandas()

| | user | firstPlay | lastPlay | playCount | uniquePlayCount | timespan |
|---|---|---|---|---|---|---|
| 0 | 213 | 2014-02-14 15:34:17 | 2019-04-02 06:04:08 | 278749 | 161406 | 1872 |
| 1 | 7290 | 2014-04-30 20:12:41 | 2019-04-03 06:50:05 | 151513 | 83831 | 1798 |
| 2 | 226 | 2014-02-14 16:28:13 | 2019-04-02 13:36:34 | 94589 | 38883 | 1872 |
| 3 | 22332 | 2014-10-29 19:05:20 | 2018-12-13 06:50:53 | 63516 | 37584 | 1505 |
| 4 | 347 | 2014-02-14 17:53:17 | 2019-03-26 20:07:34 | 93724 | 31240 | 1866 |
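
The two cells above disagree by one day for most of the rows shown. The plain-Python sketch below reproduces both numbers for users 213 and 347 and suggests why: a difference of calendar dates counts every midnight crossed, while the seconds-based version counts only complete 24-hour periods, which is what the "rounded down" definition asks for. The helper names are assumptions made for this example, and the naive datetimes used here ignore any time zone or daylight-saving effects that a Spark session might apply.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

def calendar_days(first, last):
    """Difference between the calendar dates only, ignoring the time of day."""
    return (datetime.strptime(last, FMT).date() - datetime.strptime(first, FMT).date()).days

def full_days(first, last):
    """Complete 24-hour periods between the two instants (rounded down)."""
    seconds = (datetime.strptime(last, FMT) - datetime.strptime(first, FMT)).total_seconds()
    return int(seconds // 86400)

# firstPlay / lastPlay for users 213 and 347, copied from the tables above.
rows = [
    ("2014-02-14 15:34:17", "2019-04-02 06:04:08"),
    ("2014-02-14 17:53:17", "2019-03-26 20:07:34"),
]
for first, last in rows:
    print(calendar_days(first, last), full_days(first, last))
# 1873 1872
# 1866 1866
```

For user 213 the last play happens earlier in the day than the first play, so the final calendar day is not a complete 24-hour period; for user 347 it is, and both methods agree.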


Let's see what this looks like. We will use Databricks' `display` to plot a histogram of `timespan`.

7. Plot a histogram of `timespan`

In [0]:
display(users_with_timestamp.select('timespan'))

This looks like a power law, so let's try a log transform.

8. Use describe on the `timespan` column

In [0]:
users_with_timestamp.select('timespan').describe().toPandas()

| | summary | timespan |
|---|---|---|
| 0 | count | 45904.0 |
| 1 | mean | 127.10855263157896 |
| 2 | stddev | 320.85394251182385 |
| 3 | min | 0.0 |
| 4 | max | 19583.0 |


9. Plot a histogram of log transformed `timespan`

In [0]:
display(users_with_timestamp.select(F.log('timespan')))

ln(timespan)
7.534762657037537
7.494430215031565
7.534762657037537
7.316548177182976
7.531552381407289
7.237059026124737
7.534228326274089
6.812345094177479
6.716594773520978
7.534762657037537


10. Plot a QQ-Plot of log transformed `timespan`

In [0]:
display(users_with_timestamp.select(F.log('timespan')))

ln(timespan)
7.534762657037537
7.494430215031565
7.534762657037537
7.316548177182976
7.531552381407289
7.237059026124737
7.534228326274089
6.812345094177479
6.716594773520978
7.534762657037537
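
The cell for step 10 is identical to the one for step 9, so the Q-Q plot is presumably selected from the chart options of `display` rather than computed in code. As a rough illustration of what such a plot contains, the sketch below pairs the sorted sample values with standard normal quantiles using only the Python standard library. The helper `qq_points` and the plotting positions `(i + 0.5) / n` are assumptions made for this example (other conventions exist), and the sample is just the ten values printed above, not the full column.

```python
from statistics import NormalDist

# The ten ln(timespan) values shown in the output above.
sample = [
    7.534762657037537, 7.494430215031565, 7.534762657037537,
    7.316548177182976, 7.531552381407289, 7.237059026124737,
    7.534228326274089, 6.812345094177479, 6.716594773520978,
    7.534762657037537,
]

def qq_points(values):
    """Return (theoretical normal quantile, observed value) pairs."""
    ordered = sorted(values)
    n = len(ordered)
    std_normal = NormalDist()  # mean 0, standard deviation 1
    theoretical = [std_normal.inv_cdf((i + 0.5) / n) for i in range(n)]
    return list(zip(theoretical, ordered))

for q, v in qq_points(sample):
    print(f"{q:+.3f}  {v:.3f}")
```

If the points fall close to a straight line, the sample is roughly normally distributed.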


We'll filter out users who stayed for less than a day and plot a histogram of the filtered data.

11. Plot a histogram of log transformed `timespan` for users who stayed at least one full day

In [0]:
display(users_with_timestamp.filter( F.col("timespan") > 0 ).select(F.log("timespan")))
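
The filter on `timespan > 0` matters because the logarithm is undefined at 0. In plain Python, `math.log(0)` raises a `ValueError`; as far as I know, Spark's `log` returns null for such rows instead of failing. A minimal sketch of the same filter-then-transform step, where 0 and 1 are made-up values and 1798 and 1872 come from the table above:

```python
import math

# Timespans in days; 0 stands for a user who was active for less than a day.
timespans = [0, 1, 1798, 1872]

# Keep only strictly positive values before taking the natural log.
log_timespans = [math.log(t) for t in timespans if t > 0]

print(log_timespans)
```

The value for 1872 days is about 7.5348, which is consistent with the first `ln(timespan)` value in the earlier output.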

### `isSingleDayUser`
What percentage of users used the service for less than one day?

12. Compute the percentage of users who used the service for less than a day

In [0]:
users_with_timestamp.filter(F.col('timespan') < 1).count() / users_with_timestamp.count() 

Out[11]: 0.5521523178807947

Wow, that's a lot: about 55% of users! We will flag this as its own column.  
That means we will create a new Boolean column `isSingleDayUser` that is `True` if the user used the service for less than a day and `False` otherwise.

13. Create a new column `isSingleDayUser` that flags whether a user used the service for less than a day

In [0]:
users_with_timestamp = users_with_timestamp.withColumn('isSingleDayUser',F.col('timespan') < 1)
users_with_timestamp.limit(5).toPandas()

| | user | firstPlay | lastPlay | playCount | uniquePlayCount | timespan | isSingleDayUser |
|---|---|---|---|---|---|---|---|
| 0 | 213 | 2014-02-14 15:34:17 | 2019-04-02 06:04:08 | 278749 | 161406 | 1872 | False |
| 1 | 7290 | 2014-04-30 20:12:41 | 2019-04-03 06:50:05 | 151513 | 83831 | 1798 | False |
| 2 | 226 | 2014-02-14 16:28:13 | 2019-04-02 13:36:34 | 94589 | 38883 | 1872 | False |
| 3 | 22332 | 2014-10-29 19:05:20 | 2018-12-13 06:50:53 | 63516 | 37584 | 1505 | False |
| 4 | 347 | 2014-02-14 17:53:17 | 2019-03-26 20:07:34 | 93724 | 31240 | 1866 | False |


### Measure of activity: `activeDaysCount` and `dailyAvgPlayCount`
This one is a bit harder. We want to compute:
- the number of active days for each user (not the `timespan`)
- the average play count on these active days for each user
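
The two metrics above need two levels of grouping: first plays per user per day, then days per user. A plain-Python sketch on made-up rows (none of them taken from the play log) shows the idea:

```python
from collections import Counter, defaultdict

# Made-up (user, datetime) rows, for illustration only.
plays = [
    (1, "2014-02-14 14:18:58"),
    (1, "2014-02-14 20:00:00"),
    (1, "2014-02-14 23:59:59"),
    (1, "2014-03-01 09:00:00"),
    (2, "2015-01-05 12:00:00"),
    (2, "2015-01-05 12:03:00"),
]

# Step 1: plays per (user, calendar day). The first 10 characters are the date.
plays_per_day = Counter((user, dt[:10]) for user, dt in plays)

# Step 2: per user, collect the daily counts, then count and average them.
daily_counts = defaultdict(list)
for (user, _day), count in plays_per_day.items():
    daily_counts[user].append(count)

activity = {
    user: {
        "activeDaysCount": len(counts),
        "dailyAvgPlayCount": sum(counts) / len(counts),
    }
    for user, counts in daily_counts.items()
}

print(activity)
```

User 1 has 4 plays over 2 active days, so the daily average is 2.0 even though the two days are two weeks apart; days without any play do not enter the average.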

14. Create 2 new columns:
- `activeDaysCount`: the number of days on which each user was active
- `dailyAvgPlayCount`: the average daily play count per user (active days only)

An active day is a day on which the user has at least one play.

In [0]:
df_enriched.printSchema()
df_enriched.limit(5).toPandas()

root
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)



| | user | song | datetime | year | month | dayofmonth | dayofweek | dayofyear | weekofyear |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4 | nRa-eGzpT6o | 1965-07-26 03:21:43 | 1965 | 7 | 26 | 2 | 207 | 30 |
| 1 | 0 | t1l8Z6gLPzo | 2014-02-14 14:18:53 | 2014 | 2 | 14 | 6 | 45 | 7 |
| 2 | 22 | Q24VZL8wpOM | 2014-02-14 14:18:57 | 2014 | 2 | 14 | 6 | 45 | 7 |
| 3 | 70 | VJ6ofd0pB_c | 2014-02-14 14:18:57 | 2014 | 2 | 14 | 6 | 45 | 7 |
| 4 | 1 | t1l8Z6gLPzo | 2014-02-14 14:18:58 | 2014 | 2 | 14 | 6 | 45 | 7 |


In [0]:
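# Step 1: one row per (user, day of year, year) with the number of plays on that day
playlogPerUser = df_enriched.groupBy('user', 'dayofyear', 'year').count()

# Step 2: per user, count the active days and average the daily play counts
exprs = [
  F.count('*').alias("activeDaysCount"),
  F.mean("count").alias("dailyAvgPlayCount")
]

users_avg = playlogPerUser.groupBy("user").agg(*exprs)

users_avg.limit(5).show()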

+-----+---------------+------------------+
| user|activeDaysCount| dailyAvgPlayCount|
+-----+---------------+------------------+
| 1829|            100|             11.13|
|12046|              1|              58.0|
|  463|            813| 48.94956949569496|
| 2142|             15|19.266666666666666|
|  148|            921|  54.5928338762215|
+-----+---------------+------------------+



15. Plot a histogram of log of `activeDaysCount`

In [0]:
display(users_avg.select(F.log('activeDaysCount')))

ln(activeDaysCount)
4.605170185988092
0.0
6.70073110954781
2.70805020110221
6.825460036255307
4.382026634673881
1.791759469228055
4.204692619390966
2.9444389791664403
4.691347882229144


16. Plot a histogram of log of `dailyAvgPlayCount`

In [0]:
display(users_avg.select(F.log('dailyAvgPlayCount')))

ln(dailyAvgPlayCount)
2.4096441652874536
4.060443010546419
3.89079057416144
2.958376487010222
3.999902626448877
1.929708174479033
5.883786533309793
3.4135584784857294
2.560892556765922
3.1742238754556467
