Import dependencies

In [1]:
import app_events.sparksession as S
import app_events.transformations as T

In [2]:
# Root directory (relative to the working directory) under which this
# notebook writes its generated datasets.
OUTPUT_DIR = ".outputs"

Load events from JSON file

In [3]:
# Read the raw events file through the project's Spark session (`S.spark`).
# NOTE(review): `inferSchema` is documented as a CSV reader option; the JSON
# reader infers the schema by default when none is supplied, so this option
# is presumably a no-op here — confirm before relying on it.
events = (
    S.spark.read
    .option("inferSchema", "true")
    .json('datasets/events.json')
)

Prepare & clean events dataset

In [4]:
# Run the project's preparation step on the raw events
# (the actual cleaning rules live in `T.prepare_events`).
prepared = T.prepare_events(events)

# Destination of the prepared dataset; later cells read partitions back
# from this same path.
prepared_path = f'{OUTPUT_DIR}/datasets/events/'

# Persist as Parquet, partitioned by the `event` column. Mode "overwrite"
# replaces any previous output, so the cell can be re-run safely.
(
    prepared.write
    .partitionBy('event')
    .mode("overwrite")
    .parquet(prepared_path)
)

# Sanity check: total row count and resulting column layout.
print(prepared.count())
prepared.printSchema()

152041
root
 |-- browser_version: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- event: string (nullable = true)
 |-- initiator_id: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- year_week: integer (nullable = true)



Create dataset for `registered` events only

In [5]:
# Fetch the 'registered' events from the prepared dataset written to
# `prepared_path` (selection logic lives in `T.get_event_partition`).
registered = T.get_event_partition(
    S.spark,
    prepared_path,
    'registered',
)

# Sanity check: row count, then column layout.
print(registered.count())
registered.printSchema()

7317
root
 |-- year_week: integer (nullable = true)
 |-- initiator_id: long (nullable = true)



Create dataset for `app_loaded` events only

In [6]:
# Fetch the 'app_loaded' events from the prepared dataset written to
# `prepared_path` (selection logic lives in `T.get_event_partition`).
app_loaded = T.get_event_partition(
    S.spark,
    prepared_path,
    'app_loaded',
)

# Sanity check: row count, then column layout.
print(app_loaded.count())
app_loaded.printSchema()

144724
root
 |-- year_week: integer (nullable = true)
 |-- initiator_id: long (nullable = true)



Join the `registered` and `app_loaded` datasets on the same `id`

In [7]:

# Combine the two per-event datasets; the join itself is implemented in
# `T.join_tables`.
joined = T.join_tables(
    registered,
    app_loaded,
)

# Sanity check: row count, then column layout of the joined result.
print(joined.count())
joined.printSchema()

2051
root
 |-- id: long (nullable = true)
 |-- registered_at: integer (nullable = true)
 |-- app_loaded_at: integer (nullable = true)



Flag each joined row as active or not (adds the `is_active` column; no rows are removed)

In [8]:
# Derive the activity flag for every joined row; the rule that decides
# "active" is defined in `T.add_activity`.
with_activity = T.add_activity(
    joined,
)

# Sanity check: row count, then column layout.
print(with_activity.count())
with_activity.printSchema()

2051
root
 |-- id: long (nullable = true)
 |-- is_active: boolean (nullable = true)



Count the number of unique ids (`unique_ids`) for each value of `is_active`

In [9]:
# Aggregate the flagged rows per activity status
# (aggregation implemented in `T.uniques_activity`).
uniques_activity = T.uniques_activity(
    with_activity,
)

# Display the aggregated table.
uniques_activity.show()

+---------+----------+
|is_active|unique_ids|
+---------+----------+
|     true|       800|
|    false|      1251|
+---------+----------+



Add a `percent` column to the dataset (each status's share of `unique_ids`, expressed as a fraction of 1)

In [10]:
# Extend the per-status counts with a `percent` column
# (computed in `T.add_percent`).
with_percent = T.add_percent(
    uniques_activity,
)

# Display the final table.
with_percent.show()

+---------+----------+------------------+
|is_active|unique_ids|           percent|
+---------+----------+------------------+
|     true|       800|0.3900536323744515|
|    false|      1251|0.6099463676255485|
+---------+----------+------------------+

