In [1]:
# Notebook parameters.
# Plain literals, grouped by concern. Later cells read these names when
# building the Spark session, registering the churn.etl options and
# composing the input location (input_prefix + input_file).

import os

# Spark session settings
spark_master = "local[*]"
app_name = "churn-analytics"
driver_memory = "8g"
executor_memory = "8g"

# Input location and format
input_prefix = ""
input_file = "churn-etl"
input_kind = "parquet"

# Output location, write mode and format
output_prefix = ""
output_mode = "overwrite"
output_kind = "parquet"


In [2]:
import pyspark

# Build (or reuse) the Spark session from the notebook parameters.
# Event logging is enabled with "." as the event log directory; driver and
# executor memory come from the parameters cell.
session = (
    pyspark.sql.SparkSession.builder
    .master(spark_master)
    .appName(app_name)
    .config("spark.eventLog.enabled", True)
    .config("spark.eventLog.dir", ".")
    .config("spark.driver.memory", driver_memory)
    .config("spark.executor.memory", executor_memory)
    .getOrCreate()
)

# Trailing expression so the notebook displays the session.
session

In [3]:
import churn.eda
import churn.etl

# Pass the notebook-level I/O parameters on to the ETL module.
churn.etl.register_options(
    output_prefix=output_prefix,
    output_mode=output_mode,
    output_kind=output_kind,
    input_kind=input_kind,
)

In [4]:
# The input location is the prefix and file name concatenated as-is
# (no path separator is inserted between them).
input_path = input_prefix + input_file
df = churn.etl.read_df(session, input_path)

# Generating reports

In [5]:
%%time

# Generate the EDA summary for the loaded frame; the %%time cell magic above
# reports how long this cell takes.
# NOTE(review): the cube_*/rollup_* temporary views listed by the following
# cell are presumably registered as a side effect of this call; confirm in
# churn.eda.gen_summary.
summary = churn.eda.gen_summary(df)

CPU times: user 137 ms, sys: 74 ms, total: 211 ms
Wall time: 2min 11s


In [6]:
# Show the tables and temporary views currently registered in the session catalog.
session.catalog.listTables()

[Table(name='cube_12', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='cube_2', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='cube_3', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='cube_4', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='cube_6', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='rollup_12', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='rollup_2', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='rollup_3', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='rollup_4', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='rollup_6', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

### Slicing and dicing by tenure in quarters

In [10]:
# Pull the columns needed for the quarterly charts out of the cube_3 view
# and convert the result to a pandas DataFrame.
grouped_by_quarters = (
    session.table("cube_3")
    .select("3_month_spans", "Contract", "PaperlessBilling", "Churn", "Count")
    .toPandas()
)

In [22]:
# Give the 3-month-span column a descriptive name for plotting.
grouped_by_quarters = grouped_by_quarters.rename(
    columns={"3_month_spans": "tenure_in_quarters"},
)

In [23]:
import altair as alt

# Switch Altair to its 'json' data transformer.
alt.data_transformers.enable("json")

# Rows with any missing value are dropped before plotting.
# NOTE(review): this presumably removes the subtotal rows of the cube (null
# grouping values). If cube_3 has grouping columns beyond the ones selected
# into grouped_by_quarters, sum(Count) could double count; confirm.
quarters_without_nulls = grouped_by_quarters.dropna()

# Bar chart of sum(Count) per tenure_in_quarters, coloured by Churn, with one
# panel column per Contract value. The chart is the cell's trailing
# expression so the notebook renders it.
alt.Chart(quarters_without_nulls).mark_bar().encode(
    x="tenure_in_quarters:O",
    y="sum(Count):Q",
    color="Churn:N",
    column="Contract:N",
)

# Finishing up

In [7]:
# Stop the Spark session created earlier in this notebook so that its
# resources are released once the analysis is finished. Re-run the
# session-building cell (it uses getOrCreate) to obtain a session again.
session.stop()