In [1]:
# import libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import LongType, IntegerType, DoubleType, StructType, StructField
import plotly.graph_objects as gob

In [2]:
from model import UserLogTransformer, UserLabelTransformer, TrainingAssembler, MasterTransformer, LogCleanTransformer, FEATURE_COLUMNS, FeatureUnassembler, KNOWN_MESSAGES, KNOWN_BROWSERS, KNOWN_OS

In [3]:
# Obtain the notebook's Spark session (created if none exists yet),
# registered under the application name "Churn".
spark = (
    SparkSession.builder
    .appName("Churn")
    .getOrCreate()
)

In [4]:
# DDL-style column list intended for the (currently commented-out)
# spark.read.json call below. The three literals are joined by implicit
# string concatenation, so no separator is inserted between them.
in_schema = (
    "artist STRING, auth STRING, firstName STRING, gender STRING, itemInSession INT, lastName STRING,"
    "length DOUBLE, level STRING, location STRING, method STRING, page STRING, registration LONG,"
    "sessionId INT, song STRING, status int, ts LONG, userAgent STRING, userId STRING"
)

#df_raw = spark.read.json('mini_sparkify_event_data.json', schema=in_schema)\
    #.withColumn('iuid', F.col('userId').cast('long')).drop('userId').withColumnRenamed('iuid', 'userId')
#df_raw.sample(fraction=0.001).show()

In [5]:
#cleaner = LogCleanTransformer()
#cleaned = cleaner.transform(df_raw)

In [6]:
#features_trans = UserLogTransformer()
#features = features_trans.transform(cleaned)

In [7]:
#features.show(5)

In [8]:
#df_os_browser = cleaned.select('userId', 'userAgent',
#                                       F.regexp_replace(F.regexp_extract(F.col('userAgent'), ".*?(\(.*\)).*", 1),
#                                        '[\(\);:;\s\/.,]+', '').alias('os'),
#                                       F.regexp_replace(F.regexp_extract(F.col('userAgent'), ".*\s(.*)", 1),
#                                        '[\(\);:;\s\/.,]+', '').alias('browser'))

In [9]:
#df_os_browser.select(F.col('browser')).distinct().orderBy('browser').agg(F.collect_list('browser').alias('l')).collect()[0]['l']

In [10]:
#df_os_browser.select(F.col('os')).distinct().orderBy('os').agg(F.collect_list('os').alias('l')).collect()[0]['l']

In [53]:
# Explicit schema applied when reading the persisted train/test sets:
# a long user id, a VectorUDT feature column and a nullable integer label.
schema = StructType([
    StructField(name='userId', dataType=LongType(), nullable=False),
    StructField(name='features', dataType=VectorUDT(), nullable=False),
    StructField(name='label', dataType=IntegerType(), nullable=True),
])

# The prediction file is read without a schema, so Spark infers its column types.
df_pred = spark.read.json('app/results_model/lsvc-prediction.json')
df_train = spark.read.json('app/results_model/traindata.json', schema=schema)
df_test = spark.read.json('app/results_model/testdata.json', schema=schema)

# Stack the train and test rows into one frame (both share the schema above).
df_all = df_train.union(df_test)

# Row counts, in order: predictions, train, test, train + test.
df_pred.count(), df_train.count(), df_test.count(), df_all.count()

(34, 414, 34, 448)

In [60]:
# (1 - prediction) * (1 - label) equals 1.0 only when both values are 0,
# so the extra column marks the true negatives.
is_true_negative = (1.0 - F.col('prediction')) * (1.0 - F.col('label'))
df_pred.withColumn('true negative', is_true_negative).show(40)

# Number of rows where label and prediction are both 0.
both_zero = (F.col('label') == 0) & (F.col('prediction') == 0)
df_pred.filter(both_zero).count()

+--------------------+---+---+-----+----------+--------------------+---+------+-------------+
|            features| fn| fp|label|prediction|       rawPrediction| tp|userId|true negative|
+--------------------+---+---+-----+----------+--------------------+---+------+-------------+
|[[0, 1, 2, 3, 4, ...|0.0|0.0|    0|       0.0|[1, [3.6301276405...|0.0|200003|          1.0|
|[[0, 1, 2, 3, 4, ...|0.0|0.0|    1|       1.0|[1, [-0.746249320...|1.0|   198|          0.0|
|[[0, 1, 2, 3, 4, ...|0.0|0.0|    0|       0.0|[1, [2.6468250495...|0.0|     7|          1.0|
|[[0, 1, 2, 5, 6, ...|0.0|0.0|    0|       0.0|[1, [1.6506865061...|0.0|   144|          1.0|
|[[0, 1, 2, 3, 4, ...|0.0|0.0|    0|       0.0|[1, [1.1234051603...|0.0|200026|          1.0|
|[[0, 1, 2, 3, 4, ...|0.0|0.0|    0|       0.0|[1, [1.3641412178...|0.0|   145|          1.0|
|[[0, 1, 2, 5, 6, ...|0.0|0.0|    1|       1.0|[1, [-0.284445690...|1.0|100004|          0.0|
|[[0, 1, 2, 3, 4, ...|0.0|0.0|    0|       0.0|[1, [2.062924

27

In [12]:
# Preview the first five rows of the training set.
df_train.show(n=5)

+------+--------------------+-----+
|userId|            features|label|
+------+--------------------+-----+
|    29|(111,[0,1,2,3,4,5...|    0|
|    26|(111,[0,1,2,3,4,5...|    0|
|100031|(111,[0,1,2,5,6,7...|    0|
|    65|(111,[0,1,2,3,4,5...|    0|
|   191|(111,[0,1,2,7,8,1...|    0|
+------+--------------------+-----+
only showing top 5 rows



In [13]:
# Run the FeatureUnassembler over all users and join each user's label back
# on by userId (presumably the transformer expands the feature vector into
# named columns and drops the label; see model.FeatureUnassembler to confirm).
fu = FeatureUnassembler()

# Cached because the cells below run many separate Spark actions
# (aggregations and collects) on this frame; without caching, each action
# would re-run the transform and the join from scratch.
unassembled = (
    fu.transform(df_all)
    .join(df_all.select('userId', 'label'), on='userId')
    .cache()
)

+------+--------+--------------+------------------+-----------+--------------------+----------------+-------------------+---------------------+--------------------+---------------+-------------------+-----------+--------------------+----------+--------------------+----------+--------------------+------------+-------------------+--------------+------------------+-----------------+--------------------+-------------------+--------------------+--------------+--------------------+--------------------+-------------------+-----------------+--------------------+---------------+-------------------+-------------+-------------------+------+--------+------------+----------------------------+----------------------------+----------------------------+----------------------------+-------------------------------------------------------+-------------------------------------------------------+--------------------------------------------------------+--------------------------------------------------------

In [14]:
# Grouped handle keyed on the label column; the per-label aggregations
# in the following cells are built from it.
unassembled_bylabel = unassembled.groupby('label')

In [15]:
# One aggregate expression per known browser column: sum / count within
# each label group, keeping the browser name as the output column name.
browser_fractions = [
    (F.sum(name) / F.count(name)).alias(name)
    for name in KNOWN_BROWSERS
]
browser_dist = unassembled_bylabel.agg(*browser_fractions)
browser_dist.cache()
browser_dist.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+------------------+
|label|          Firefox240|          Firefox300|          Firefox310|          Firefox320|               Gecko|        Safari53736"|       Safari537749"|      Safari5377514"|       Safari537764"|       Safari537774"|       Safari537782"|       Safari60013"|        Safari60018"|       Safari953753"|           Trident50|         Trident60|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+--------------------+------------------

In [18]:
# Collect the per-label browser fractions in a single Spark job and index the
# rows by label, instead of launching one filtered job per group.
browser_rows = {row['label']: row.asDict() for row in browser_dist.collect()}
bd_kept = browser_rows[0]    # label 0 is plotted as "kept"
bd_churn = browser_rows[1]   # label 1 is plotted as "churned"
kept_browsers = [bd_kept[browser] for browser in KNOWN_BROWSERS]
churn_browsers = [bd_churn[browser] for browser in KNOWN_BROWSERS]

# Grouped bars: one pair (kept / churned) per known browser.
fig = gob.Figure(data=[
        gob.Bar(name="kept", x=KNOWN_BROWSERS, y=kept_browsers),
        gob.Bar(name="churned", x=KNOWN_BROWSERS, y=churn_browsers)
    ]
)
# Axis titles are set so the figure can be read without the surrounding code.
fig.update_layout(barmode='group',
                  title="Fraction of Users per Browser",
                  xaxis_title="Browser",
                  yaxis_title="Fraction of users")
fig.show()

In [21]:
# Per-label fraction (sum / count) of every known OS column.
os_dist = unassembled_bylabel.agg(*[(F.sum(col)/F.count(col)).alias(col) for col in KNOWN_OS])
os_dist.cache()

# Collect once and index the rows by label, instead of launching one
# filtered Spark job per group.
os_rows = {row['label']: row.asDict() for row in os_dist.collect()}
od_kept = os_rows[0]    # label 0 is plotted as "kept"
od_churn = os_rows[1]   # label 1 is plotted as "churned"
kept_os = [od_kept[os_name] for os_name in KNOWN_OS]
churn_os = [od_churn[os_name] for os_name in KNOWN_OS]

# Grouped bars: one pair (kept / churned) per known OS.
fig = gob.Figure(data=[
        gob.Bar(name="kept", x=KNOWN_OS, y=kept_os),
        gob.Bar(name="churned", x=KNOWN_OS, y=churn_os)
    ]
)
# Axis titles are set so the figure can be read without the surrounding code.
fig.update_layout(barmode='group',
                  title="Fraction of Users per OS",
                  xaxis_title="OS",
                  yaxis_title="Fraction of users")
fig.show()

In [29]:
# Bring the 'Roll Advert_freq' values to the driver, split by label value.
ad_freq_by_label = unassembled.select('label', 'Roll Advert_freq')
kept_ad_freq = ad_freq_by_label.where(F.col('label') == 0).collect()
churn_ad_freq = ad_freq_by_label.where(F.col('label') == 1).collect()

In [39]:
# Side-by-side histograms of 'Roll Advert_freq' for the two label groups.
# histnorm='percent' normalises each trace to percentages of its own group.
fig = gob.Figure(data=[
    gob.Histogram(x=[row['Roll Advert_freq'] for row in kept_ad_freq],
                  histnorm='percent', name='kept'),
    gob.Histogram(x=[row['Roll Advert_freq'] for row in churn_ad_freq],
                  histnorm='percent', name='churned'),
])
# The y-axis is logarithmic; the x-axis is labelled with the plotted column's
# name so the figure can be read without the surrounding code.
fig.update_layout(barmode='group', yaxis_type="log",
                  title="Distribution of Advert Frequencies per User",
                  xaxis_title="Roll Advert_freq",
                  yaxis_title="Percent of users")
fig.show()

In [40]:
# Bring the 'period' values to the driver, split by label value.
period_by_label = unassembled.select('label', 'period')
kept_period = period_by_label.where(F.col('label') == 0).collect()
churn_period = period_by_label.where(F.col('label') == 1).collect()

In [45]:
# Side-by-side histograms of 'period' for the two label groups.
# histnorm='percent' normalises each trace to percentages of its own group.
fig = gob.Figure(data=[
    gob.Histogram(x=[row['period'] for row in kept_period],
                  histnorm='percent', name='kept'),
    gob.Histogram(x=[row['period'] for row in churn_period],
                  histnorm='percent', name='churned'),
])
# The x-axis is labelled with the plotted column's name so the figure can be
# read without the surrounding code.
fig.update_layout(barmode='group',
                  title="Distribution of Periods per User",
                  xaxis_title="period",
                  yaxis_title="Percent of users")
fig.show()

In [50]:
# Bring the 'NextSong_freq' values to the driver, split by label value.
nextsong_by_label = unassembled.select('label', 'NextSong_freq')
kept_nsc = nextsong_by_label.where(F.col('label') == 0).collect()
churn_nsc = nextsong_by_label.where(F.col('label') == 1).collect()

In [52]:
# Side-by-side histograms of 'NextSong_freq' for the two label groups.
# histnorm='percent' normalises each trace to percentages of its own group.
fig = gob.Figure(data=[
    gob.Histogram(x=[row['NextSong_freq'] for row in kept_nsc],
                  histnorm='percent', name='kept'),
    gob.Histogram(x=[row['NextSong_freq'] for row in churn_nsc],
                  histnorm='percent', name='churned'),
])
# The x-axis is labelled with the plotted column's name so the figure can be
# read without the surrounding code.
fig.update_layout(barmode='group',
                  title="Distribution of Next Song Frequencies per User",
                  xaxis_title="NextSong_freq",
                  yaxis_title="Percent of users")
fig.show()

In [61]:
# Bring the 'maxLevel' values to the driver, split by label value.
maxlevel_by_label = unassembled.select('label', 'maxLevel')
kept_maxlev = maxlevel_by_label.where(F.col('label') == 0).collect()
churn_maxlev = maxlevel_by_label.where(F.col('label') == 1).collect()

In [66]:
# Side-by-side histograms of 'maxLevel' for the two label groups.
# histnorm='percent' normalises each trace to percentages of its own group.
fig = gob.Figure(data=[
    gob.Histogram(x=[row['maxLevel'] for row in kept_maxlev],
                  histnorm='percent', name='kept'),
    gob.Histogram(x=[row['maxLevel'] for row in churn_maxlev],
                  histnorm='percent', name='churned'),
])
# The x-axis is labelled with the plotted column's name so the figure can be
# read without the surrounding code.
fig.update_layout(barmode='group',
                  title="Distribution of Maximum Subscription Level per User",
                  xaxis_title="maxLevel",
                  yaxis_title="Percent of users")
fig.show()

In [70]:
# Bring the 'changedLevel' values to the driver, split by label value.
changedlevel_by_label = unassembled.select('label', 'changedLevel')
kept_clev = changedlevel_by_label.where(F.col('label') == 0).collect()
churn_clev = changedlevel_by_label.where(F.col('label') == 1).collect()

In [72]:
# Side-by-side histograms of 'changedLevel' for the two label groups.
# histnorm='percent' normalises each trace to percentages of its own group.
fig = gob.Figure(data=[
    gob.Histogram(x=[row['changedLevel'] for row in kept_clev],
                  histnorm='percent', name='kept'),
    gob.Histogram(x=[row['changedLevel'] for row in churn_clev],
                  histnorm='percent', name='churned'),
])
# The x-axis is labelled with the plotted column's name so the figure can be
# read without the surrounding code.
fig.update_layout(barmode='group',
                  title="Distribution of Subscription Level Changes per User",
                  xaxis_title="changedLevel",
                  yaxis_title="Percent of users")
fig.show()