# SPARK hackfest-in-a-box for TELCO - LAB 1

The objective of this lab is to perform exploratory data analysis and data preparation on a telco customer churn dataset.

## 1 Exploratory data analysis

In statistics, exploratory data analysis (EDA) is an approach to analyzing data sets to summarize their main characteristics, often using statistical graphics and other data visualization methods.

In [None]:
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime
import random

In [None]:
# Pre-defined input variables
print('....Setting input variables')
projectNbr = "YOUR_PROJECT_NBR"
projectID = "YOUR_PROJECT_ID"
appBaseName = "customer-churn-model"
appNameSuffix = "preprocessing"
appName = f"{appBaseName}-{appNameSuffix}"
sourceBucketUri = f"gs://s8s_data_bucket-{projectNbr}/telco_customer_churn_train_data.csv"

In [None]:
# SPARK session creation
print('....Initializing spark & spark configs')
spark = SparkSession.builder.appName(appName).getOrCreate()

In [None]:
# Data loading
print('....Read source data')
rawChurnDF = spark.read.options(inferSchema=True, header=True).csv(sourceBucketUri)

In [None]:
# Show table schema
rawChurnDF.printSchema()

#### Q - What do you think about the inferred data types in the previous cell?
* Show a couple of rows from the table using the `show` function
* Count the number of rows using the `count` function
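
When reviewing the inferred types, it helps to remember that a single non-numeric placeholder can turn a numeric column into a string column. The plain-Python sketch below illustrates that idea only; it is not Spark's actual inference algorithm, and the function name `infer_column_type` and the sample values are hypothetical.

```python
def infer_column_type(values):
    """Return 'int', 'double' or 'string' for a list of raw CSV strings.

    Simplified illustration: a column is numeric only if EVERY non-missing
    value parses as a number. None stands for a missing cell (assumption).
    """
    present = [v for v in values if v is not None]

    def all_parse(cast):
        # True when every present value can be converted with `cast`.
        try:
            for v in present:
                cast(v)
            return True
        except ValueError:
            return False

    if present and all_parse(int):
        return "int"
    if present and all_parse(float):
        return "double"
    return "string"


# Hypothetical sample values, not taken from the lab dataset.
clean_charges = ["29.85", "56.95", "108.15"]
dirty_charges = ["29.85", " ", "108.15"]  # one blank placeholder

print(infer_column_type(["1", "34", "2"]))
print(infer_column_type(clean_charges))
print(infer_column_type(dirty_charges))
```

If a column you expect to be numeric shows up as `string` in `printSchema()`, placeholder values like the blank above are a likely cause worth checking.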

In [None]:
rawChurnDF._______INSERT_CODE_HERE_______(2,vertical=True)
print(rawChurnDF._______INSERT_CODE_HERE_______)

#### Q - Compute the per-column summary statistics. What business insights can you get?
* Use the `describe` function
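
For numeric columns, `describe` reports count, mean, stddev, min and max. As a rough illustration of what those figures mean, here is a minimal stdlib sketch for a single column. The helper `describe_column` and the sample values are hypothetical, and skipping `None` values is an assumption of this sketch.

```python
import statistics


def describe_column(values):
    """Summarise one numeric column: count, mean, stddev, min, max.

    None values are skipped. The standard deviation is the sample one.
    """
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),
        "min": min(present),
        "max": max(present),
    }


# Hypothetical monthly charges, not taken from the lab dataset.
monthly_charges = [20.0, 30.0, None, 70.0, 80.0]
summary = describe_column(monthly_charges)
print(summary)
```

A `count` lower than the total number of rows is a first hint that a column contains missing values.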

In [None]:
rawChurnDF._______INSERT_CODE_HERE_______.show(vertical=True)

#### Q - Clean the data: for each column, count how many rows have `None`, `NULL`, or `' '` values
* Using `pyspark.sql.functions` 
* Using `spark.sql` 
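
The counting logic can be prototyped in plain Python before writing it with Spark. The sketch below is only an illustration: the set of markers treated as missing, the helper names and the sample rows are assumptions, not part of the lab solution.

```python
import math

# Markers treated as "missing" in this sketch (an assumption, extend as needed).
MISSING_MARKERS = {"", " ", "None", "NULL"}


def is_missing(value):
    """True for None, NaN, or one of the placeholder strings above."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return isinstance(value, str) and value in MISSING_MARKERS


def count_missing(rows):
    """Return {column: number of missing-like values} for a list of dict rows."""
    counts = {}
    for row in rows:
        for column, value in row.items():
            counts[column] = counts.get(column, 0) + (1 if is_missing(value) else 0)
    return counts


# Hypothetical rows, not taken from the lab dataset.
rows = [
    {"customerID": "A-1", "tenure": 1, "TotalCharges": "29.85"},
    {"customerID": "A-2", "tenure": 0, "TotalCharges": " "},
    {"customerID": "A-3", "tenure": None, "TotalCharges": "NULL"},
]
print(count_missing(rows))
```

Note that a legitimate `0` is not counted as missing here; only explicit placeholders are.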

In [None]:
rawChurnDF.select([count(_______INSERT_CODE_HERE_______, c )).alias(c) for c in rawChurnDF.columns]).show(vertical=True)

In [None]:
# INSERT CODE: use SQL functions to find nulls, empty strings, 'NULL' literals, NaNs, etc.
rawChurnDF.createOrReplaceTempView("base_customer_churn")
for c in rawChurnDF.columns:
    print('Column: {}'.format(c))
    spark.sql(_______INSERT_CODE_HERE_______).show()

#### Q - Try to identify which columns have categorical values
* Using `countDistinct` from `pyspark.sql.functions` 
* Using `spark.sql` 
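
A column with only a handful of distinct values is a good candidate for a categorical variable. The following stdlib sketch shows the idea on a toy table. The threshold `max_distinct`, the helper names and the sample rows are assumptions made for illustration.

```python
def distinct_counts(rows):
    """Return {column: number of distinct non-null values} for dict rows."""
    seen = {}
    for row in rows:
        for column, value in row.items():
            values = seen.setdefault(column, set())
            if value is not None:  # nulls are not counted as a distinct value
                values.add(value)
    return {column: len(values) for column, values in seen.items()}


def likely_categorical(rows, max_distinct=5):
    """Columns whose distinct-value count is at most max_distinct (assumed threshold)."""
    return [c for c, n in distinct_counts(rows).items() if n <= max_distinct]


# Hypothetical rows, not taken from the lab dataset.
rows = [
    {"customerID": "A-1", "Contract": "Month-to-month", "MonthlyCharges": 29.85},
    {"customerID": "A-2", "Contract": "One year", "MonthlyCharges": 56.95},
    {"customerID": "A-3", "Contract": "Month-to-month", "MonthlyCharges": 53.85},
    {"customerID": "A-4", "Contract": "Two year", "MonthlyCharges": 42.30},
]
print(distinct_counts(rows))
print(likely_categorical(rows, max_distinct=3))
```

On a real table the threshold is a judgment call: identifiers have as many distinct values as rows, while continuous measures fall somewhere in between.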

In [None]:
#INSERT CODE
rawChurnDF.select([_______INSERT_CODE_HERE_______ for c in rawChurnDF.columns]).show(vertical=True)

#### Q - Draw histograms to understand the distributions of continuous variables. What business insights can you get?
* Transform results from `spark.sql` to pandas using the `toPandas()` function 
* Draw the histogram using `hist()` 
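
To make clear what a histogram computes, here is a minimal sketch of equal-width binning in plain Python, with a text rendering instead of a plot. It is an illustration rather than what pandas does internally; the function `histogram` and the sample values are hypothetical.

```python
def histogram(values, bins=10):
    """Count values in `bins` equal-width intervals between min and max.

    Returns (edges, counts). The last bin includes its right edge.
    """
    low, high = min(values), max(values)
    width = (high - low) / bins
    if width == 0:
        # All values are identical: a single bin holds everything.
        return [low, high], [len(values)]
    counts = [0] * bins
    for v in values:
        index = min(int((v - low) / width), bins - 1)  # clamp the max into the last bin
        counts[index] += 1
    edges = [low + i * width for i in range(bins + 1)]
    return edges, counts


# Hypothetical monthly charges, not taken from the lab dataset.
charges = [20.0, 25.0, 30.0, 70.0, 75.0, 80.0, 100.0]
edges, counts = histogram(charges, bins=4)
for left, right, n in zip(edges, edges[1:], counts):
    print(f"{left:6.1f} - {right:6.1f} | {'#' * n}")
```

Gaps or several peaks in the bin counts often point to distinct customer segments, which is the kind of insight the question asks for.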

In [None]:
#INSERT CODE
MonthlyChargesPDF = spark.sql(_______INSERT_CODE_HERE_______).toPandas()
MonthlyChargesPDF._______INSERT_CODE_HERE_______

## 2 Data preprocessing

Data preprocessing is the manipulation or dropping of data before it is used in order to ensure or enhance performance, and is an important step in the data mining process. The phrase "garbage in, garbage out" is particularly applicable to data mining and machine learning projects. Data-gathering methods are often loosely controlled, resulting in out-of-range values (e.g., Income: −100), impossible data combinations (e.g., Sex: Male, Pregnant: Yes), and missing values, etc.

#### Q - List data preprocessing steps
* Based on the insights derived from the previous section, this is the proposed list of preprocessing steps you will apply to the dataset:
    * Replace null, empty, and similar placeholder fields with `None`
    * Drop rows with `None` values
    * Homogenize values across rows (e.g. change `No internet service` or `No phone service` to `No`)
    * Bucketize the `tenure` field
    * Change field names to `snake_case` 
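
Three of the steps above can be expressed as small pure functions, which makes their intent easy to check before translating them to Spark. This is a sketch under stated assumptions: the bucket labels come from the lab's SQL cell, but treating each upper bound as inclusive is an assumption, and all function names are hypothetical.

```python
import re


def homogenize(value):
    """Map 'No internet service' / 'No phone service' to plain 'No'."""
    if value in ("No internet service", "No phone service"):
        return "No"
    return value


def tenure_group(tenure):
    """Bucket tenure (in months) into the labels used by the lab.

    Assumption of this sketch: each upper bound is inclusive.
    """
    if tenure <= 12:
        return "Tenure_0-12"
    if tenure <= 24:
        return "Tenure_12-24"
    if tenure <= 48:
        return "Tenure_24-48"
    if tenure <= 60:
        return "Tenure_48-60"
    return "Tenure_gt_60"


def to_snake_case(name):
    """Convert names such as 'PaperlessBilling' or 'customerID' to snake_case."""
    # Insert '_' wherever a lowercase letter or digit is followed by an uppercase letter.
    return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name).lower()


print(homogenize("No phone service"))
print(tenure_group(13))
print(to_snake_case("StreamingTV"))
```

Whichever boundary convention you choose for the buckets, make sure every tenure value falls into exactly one group.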

#### Q - Write a chain of data transformations, serializing data at each step to ensure traceability and ease debugging
* Generate the following chain of dataframes: `nullsReplacedDF`, `nullDroppedDF`, `partiallyProcessedDF`, `modelTrainingReadyDF` and `persistDF`
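
The value of naming every intermediate result is that any step can be inspected on its own. The plain-Python sketch below shows that pattern on a toy table. The helper `run_pipeline`, the two stage functions and the sample rows are hypothetical, and only blank strings are treated as placeholders here.

```python
def run_pipeline(rows, stages):
    """Apply (name, function) stages in order, keeping every intermediate result.

    Each function takes and returns a list of dict rows. The returned dict maps
    each stage name to its output, so any step can be inspected after the run.
    """
    snapshots = {"raw": rows}
    current = rows
    for name, stage in stages:
        current = stage(current)
        snapshots[name] = current
        print(f"....{name}: {len(current)} rows")
    return snapshots


def replace_blanks(rows):
    # Turn blank-string placeholders into None without mutating the input rows.
    return [{k: (None if v in ("", " ") else v) for k, v in r.items()} for r in rows]


def drop_nulls(rows):
    # Keep only rows that contain no None value.
    return [r for r in rows if all(v is not None for v in r.values())]


# Hypothetical rows, not taken from the lab dataset.
raw = [
    {"customerID": "A-1", "TotalCharges": "29.85"},
    {"customerID": "A-2", "TotalCharges": " "},
]
snapshots = run_pipeline(raw, [("nullsReplaced", replace_blanks),
                               ("nullDropped", drop_nulls)])
```

Comparing row counts between consecutive snapshots shows immediately which step removed data, which is the same check the lab performs before and after dropping `None` values.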

In [None]:
print('....Replacing null, empty values ... with None')
nullsReplacedDF=rawChurnDF.select([when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            (col(c) == ' ')  | \
                            col(c).isNull() | \
                            isnan(c),_______INSERT_CODE_HERE_______).otherwise(col(c)).alias(c) for c in rawChurnDF.columns])

In [None]:
print('....Number of rows before dropping None values')
print(nullsReplacedDF._______INSERT_CODE_HERE_______)
print('....Dropping None values')
nullDroppedDF = nullsReplacedDF._______INSERT_CODE_HERE_______
print('....Number of rows after dropping None values')
print(nullDroppedDF._______INSERT_CODE_HERE_______)

In [None]:
print('....Homogenization of categorical values')
partiallyProcessedDF = nullDroppedDF.select(_______INSERT_CODE_HERE_______).otherwise(col(c)).alias(c) for c in nullDroppedDF.columns])

In [None]:
print('....Bucketizing the tenure field')
partiallyProcessedDF.createOrReplaceTempView("partially_transformed_customer_churn")
modelTrainingReadyDF = spark.sql("""
                                select  customerID 
                                        ,gender as Gender
                                        ,cast(SeniorCitizen as int) SeniorCitizen
                                        ,Partner
                                        ,Dependents
                                        ,cast(tenure as int)  Tenure
                                        ,case when _______INSERT_CODE_HERE_______ then "Tenure_0-12"
                                              when _______INSERT_CODE_HERE_______ then "Tenure_12-24"
                                              when _______INSERT_CODE_HERE_______ then "Tenure_24-48"
                                              when _______INSERT_CODE_HERE_______ then "Tenure_48-60"
                                              when _______INSERT_CODE_HERE_______ then "Tenure_gt_60"
                                        end as Tenure_Group
                                        ,PhoneService
                                        ,MultipleLines
                                        ,InternetService
                                        ,OnlineSecurity
                                        ,OnlineBackup
                                        ,DeviceProtection
                                        ,TechSupport
                                        ,StreamingTV
                                        ,StreamingMovies
                                        ,Contract
                                        ,PaperlessBilling
                                        ,PaymentMethod
                                        ,cast(MonthlyCharges as float) MonthlyCharges
                                        ,cast(TotalCharges as float) TotalCharges
                                        ,lcase(Churn) as Churn
                                from partially_transformed_customer_churn  
                                """)

In [None]:
print('....Format column names in snake_case for consistency')
persistDF = modelTrainingReadyDF.select("customerID", "gender", "SeniorCitizen", "Partner", "Dependents", "tenure", "Tenure_Group", "PhoneService", "MultipleLines", "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection", "TechSupport", "StreamingTV", "StreamingMovies", "Contract", "PaperlessBilling", "PaymentMethod", "MonthlyCharges", "TotalCharges","Churn") \
                                .toDF("customer_id", "gender", "senior_citizen", "partner", "dependents", "tenure", "tenure_group", "phone_service", "multiple_lines", "internet_service", "online_security", "online_backup", "device_protection", "tech_support", "streaming_tv", "streaming_movies", "contract", "paperless_billing", "payment_method", "monthly_charges", "total_charges","churn") 

In [None]:
print('....Save data in BigQuery for next steps')
bqDatasetNm = f"{projectID}.customer_churn_ds"
bigQueryTargetTableFQN = f"{bqDatasetNm}.training_data_notebook"
scratchBucketUri = f"s8s-spark-bucket-{projectNbr}/{appBaseName}/{appNameSuffix}"
spark.conf.set("parentProject", projectID)
spark.conf.set("temporaryGcsBucket", scratchBucketUri)

persistDF.write.format('bigquery') \
.mode("append")\
.option('table', bigQueryTargetTableFQN) \
.save()

## 3 Extra ball
Once your data is in BigQuery, you can use this magic for a full-fledged EDA:
* `%bigquery_stats spark-hackfest-dev.customer_churn_ds.training_data`

**End of LAB 1**