In [0]:
playlog = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("s3://full-stack-bigdata-datasets/Big_Data/youtube_playlog.csv")
playlog.printSchema()

root
 |-- timestamp: integer (nullable = true)
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)



In [0]:
from pyspark.sql import functions as F

from pyspark.sql.functions import unix_timestamp, from_unixtime
playlog = playlog \
  .withColumn('datetime', from_unixtime('timestamp')) \
  .drop('timestamp') \
  .orderBy('datetime')

In [0]:
import datetime
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, dayofyear, weekofyear
playlog = playlog \
  .withColumn('year', year('datetime')) \
  .withColumn('month', month('datetime')) \
  .withColumn('dayofmonth', dayofmonth('datetime')) \
  .withColumn('dayofyear', dayofyear('datetime')) \
  .withColumn('weekofyear', weekofyear('datetime'))

playlog.printSchema()
playlog.count(), len(playlog.columns)
playlog.limit(5).toPandas()

"""
from functools import reduce

funcs = [F.year, F.month, F.dayofmonth, F.dayofweek, F.dayofyear, F.weekofyear]

df_enriched = reduce(
  lambda memo_df, f: memo_df.withColumn(f.__name__, f('datetime')),
  funcs, df)

# Or, alternatively
# df_enriched = df.select('*', *(f('datetime').alias(f.__name__) for f in funcs))

df_enriched.show()
"""

root
 |-- user: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)

Out[3]: "\nfrom functools import reduce\n\nfuncs = [F.year, F.month, F.dayofmonth, F.dayofweek, F.dayofyear, F.weekofyear]\n\ndf_enriched = reduce(\n  lambda memo_df, f: memo_df.withColumn(f.__name__, f('datetime')),\n  funcs, df)\n\n# Or, alternatively\n# df_enriched = df.select('*', *(f('datetime').alias(f.__name__) for f in funcs))\n\ndf_enriched.show()\n"

In [0]:
from pyspark.sql import functions as F

def compute_aggregates(df):
  agg_exprs = (
    F.min('datetime').alias('firstPlay'),
    F.max('datetime').alias('lastPlay'),
    F.count('song').alias('playCount'),
    F.countDistinct('song').alias('uniquePlayCount')
  )
  return df.groupBy('user').agg(*agg_exprs)

users = playlog.transform(compute_aggregates)

# Alternative
# users_df = df.transform(compute_aggs('user', aggs))

users.orderBy(F.desc('playCount')).limit(5).toPandas()

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount
0,213,2014-02-14 15:34:17,2019-04-02 06:04:08,278749,161406
1,7290,2014-04-30 20:12:41,2019-04-03 06:50:05,151513,83831
2,435,2014-02-14 19:51:09,2019-04-03 19:36:28,144711,20055
3,21950,2014-10-23 09:09:36,2019-02-06 00:54:54,126285,15075
4,6270,2014-04-13 18:45:54,2018-08-11 20:46:08,125056,9247


In [0]:
users.filter(F.col('firstPlay') > F.col('lastPlay')).count()

Out[5]: 0

In [0]:
print(f"Total users: {users.count()}")
print(f"Distinct users: {users.select('user').distinct().count()}")

Total users: 45904
Distinct users: 45904


In [0]:
from pyspark.sql.types import IntegerType

def compute_timespan(df):
  return df.withColumn('timespan', (
    (F.unix_timestamp('lastPlay') - F.unix_timestamp('firstPlay')) / (60**2 * 24)).cast(IntegerType()))

users_with_timespan = users.transform(compute_timespan)
users_with_timespan.limit(5).toPandas()

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount,timespan
0,26,2014-02-14 14:20:45,2016-04-23 18:44:03,10354,4223,799
1,27,2014-02-14 14:20:48,2014-02-15 12:08:44,55,32,0
2,28,2014-02-14 14:20:49,2019-01-31 18:22:23,15155,7371,1812
3,31,2014-02-14 14:21:15,2018-01-05 08:30:40,43879,20293,1420
4,34,2014-02-14 14:21:28,2016-08-10 17:16:07,411,262,908


In [0]:
display(users_with_timespan.select('timespan'))

timespan
999
1066
1853
1857
883
797
293
88
1874
588


In [0]:
users_with_timespan.select('timespan').describe().toPandas().set_index('summary')

Unnamed: 0_level_0,timespan
summary,Unnamed: 1_level_1
count,45904.0
mean,127.10855263157896
stddev,320.85394251182464
min,0.0
max,19583.0


In [0]:
display(users_with_timespan.select(F.log('timespan')))

ln(timespan)
6.906754778648554
6.97166860472579
7.52456122628536
7.526717561352706
6.78332520060396
6.680854678790215
5.680172609017068
4.477336814478207
7.535830462798367
6.376726947898627


In [0]:
display(users_with_timespan.where(F.col('timespan') != 0).select(F.log('timespan')))

ln(timespan)
6.906754778648554
6.97166860472579
7.52456122628536
7.526717561352706
6.78332520060396
6.680854678790215
5.680172609017068
4.477336814478207
7.535830462798367
6.376726947898627


In [0]:
users_with_timespan \
  .select(F.sum((F.col('timespan') < 1).cast(IntegerType()))) \
  .rdd.map(lambda r: r[0]).first() / users.count() * 100

Out[12]: 55.215231788079464

In [0]:
users_with_single_day = users_with_timespan.withColumn('isSingleDayUser', (F.col('timespan') < 1))
users_with_single_day.limit(5).toPandas()

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount,timespan,isSingleDayUser
0,26,2014-02-14 14:20:45,2016-04-23 18:44:03,10354,4223,799,False
1,27,2014-02-14 14:20:48,2014-02-15 12:08:44,55,32,0,True
2,28,2014-02-14 14:20:49,2019-01-31 18:22:23,15155,7371,1812,False
3,31,2014-02-14 14:21:15,2018-01-05 08:30:40,43879,20293,1420,False
4,34,2014-02-14 14:21:28,2016-08-10 17:16:07,411,262,908,False


In [0]:
def computeDailyStats(df):
  gb = df.groupBy(*(F.col(c) for c in ('user', 'year', 'dayofyear'))).count()
  exprs = (F.mean('count').alias('dailyAvgPlayCount'),
           F.count('count').alias('activeDaysCount'))
  return gb.groupBy('user').agg(*exprs)

users_with_avg = users_with_single_day.join(
  playlog.transform(computeDailyStats), 'user')
users_with_avg.limit(5).toPandas()

Unnamed: 0,user,firstPlay,lastPlay,playCount,uniquePlayCount,timespan,isSingleDayUser,dailyAvgPlayCount,activeDaysCount
0,26,2014-02-14 14:20:45,2016-04-23 18:44:03,10354,4223,799,False,45.612335,227
1,27,2014-02-14 14:20:48,2014-02-15 12:08:44,55,32,0,True,27.5,2
2,28,2014-02-14 14:20:49,2019-01-31 18:22:23,15155,7371,1812,False,25.950342,584
3,31,2014-02-14 14:21:15,2018-01-05 08:30:40,43879,20293,1420,False,69.871019,628
4,34,2014-02-14 14:21:28,2016-08-10 17:16:07,411,262,908,False,15.222222,27


In [0]:
display(users_with_avg.select(F.log('activeDaysCount')))

ln(activeDaysCount)
4.605170185988092
1.791759469228055
5.831882477283517
4.382026634673881
0.0
4.204692619390966
6.825460036255307
1.791759469228055
1.6094379124341005
6.70073110954781


In [0]:
display(users_with_avg.select(F.log('dailyAvgPlayCount')))

ln(dailyAvgPlayCount)
2.4096441652874536
2.908720896564361
3.80385634050828
1.929708174479033
2.6390573296152584
3.4135584784857294
3.999902626448877
5.883786533309793
1.6094379124341005
3.89079057416144
