# TracHack Submission Overview

In [6]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pyspark.sql.functions as func
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier,NaiveBayes

To make a submission in TracHack, you need to modify the `select_data`, `featurize_data`, and `train_model` functions. All other functions should not be modified.

In [8]:
from pyspark.sql.types import StructType,ArrayType,IntegerType,FloatType,DoubleType,TimestampType,DateType
from pyspark.sql.functions import *

In [9]:
def flatten(schema, prefix=None):
    """Return the dotted path of every leaf field in a Spark schema.

    Struct-typed fields are expanded recursively into ``parent.child`` paths.
    A field whose type is an array is looked through once, so an array of
    structs is expanded exactly like a plain struct.

    :param schema: the ``StructType`` to walk.
    :param prefix: dotted path of ``schema`` inside its parent schema; ``None``
        (or any falsy value) at the top level.
    :returns: list of column path strings, in schema field order.
    """
    leaves = []
    for child in schema.fields:
        path = prefix + '.' + child.name if prefix else child.name
        child_type = child.dataType
        # Unwrap a single ArrayType layer so arrays of structs get expanded too.
        if isinstance(child_type, ArrayType):
            child_type = child_type.elementType
        if not isinstance(child_type, StructType):
            leaves.append(path)
            continue
        leaves.extend(flatten(child_type, prefix=path))
    return leaves

In [10]:
def getsum(array):
    """Sum the numeric entries of ``array`` as a float.

    Used as a Spark UDF, so a single exception aborts the whole job; nulls are
    therefore handled explicitly instead of raising ``TypeError``.

    :param array: iterable of numbers (``None`` entries allowed), or ``None``
        for a null array column.
    :returns: the sum as a float (``0.0`` for an empty array), or ``None`` when
        ``array`` itself is ``None`` so the UDF yields a null.
    """
    if array is None:
        return None
    # Explicit loop on purpose: in this notebook ``sum`` is shadowed by
    # ``pyspark.sql.functions.sum`` via the star import.
    total = 0.0
    for value in array:
        if value is not None:  # skip null elements instead of failing on 0.0 + None
            total += value
    return total

In [11]:
def time_diff(array1, array2):
    """Return the day gap between paired ``YYYY-MM-DD`` date strings.

    Element ``i`` of the result is ``array2[i] - array1[i]`` in whole days
    (negative when the end date precedes the start date). Pairs are formed
    with ``zip``, so trailing elements of the longer array are ignored.
    Used as a Spark UDF, so nulls are propagated rather than raising.

    :param array1: start dates as ``"%Y-%m-%d"`` strings, or ``None``.
    :param array2: end dates as ``"%Y-%m-%d"`` strings, or ``None``.
    :returns: list of ints (``None`` for any pair with a null date), or
        ``None`` when either array is ``None``.
    :raises ValueError: if a non-null date does not match ``"%Y-%m-%d"``.
    """
    # Local import keeps the UDF independent of module globals.
    from datetime import datetime

    if array1 is None or array2 is None:
        return None

    date_format = "%Y-%m-%d"
    gaps = []
    for start, end in zip(array1, array2):
        if start is None or end is None:
            gaps.append(None)
            continue
        delta = datetime.strptime(end, date_format) - datetime.strptime(start, date_format)
        gaps.append(delta.days)  # timedelta -> int days
    return gaps

In [12]:
def select_data(data_frame, reference_date="2020-03-19"):
    """Selects and transforms the raw GCR (JSON) records data frame into a data frame.

    The nested schema is flattened into dotted leaf columns first. Several
    derived columns are then computed (activation dates, days since
    activation, total data usage, suspension durations), but only the
    columns listed in the final ``select`` are returned; the derived ones are
    available for feature experimentation.

    :param data_frame: Input data frame to select the specific data elements from.
    :param reference_date: (Default="2020-03-19") ``yyyy-MM-dd`` date up to
        which ``days_from_activation_date`` is measured.
    :returns: data frame with ``status``, ``manufacturer``, ``retailer``,
        ``model``, ``volte``, ``lifetime_redemptions`` and ``lifetime_revenues``.
        Nulls in the four categorical columns are filled with ``"unknown"``
        (Spark's ``fillna`` only fills string-typed columns with a string value).
    """
    sum_udf = udf(getsum, DoubleType())
    duration_udf = udf(time_diff, ArrayType(IntegerType()))

    df = data_frame.select(flatten(data_frame.schema))
    # NOTE(review): activation_channel / activation_date are read as arrays
    # (getItem(0)) while first_activation_date is not — confirm against the raw schema.
    df = (df.withColumn('activation_channel', df['activation_channel'].getItem(0))
            .withColumn('activation_date', to_date(df['activation_date'].getItem(0), "yyyy-MM-dd"))
            .withColumn('first_activation_date', to_date(df['first_activation_date'], "yyyy-MM-dd"))
            .withColumn('birth_year', df.birth_year.cast("integer")))

    df = (df.withColumn('days_from_activation_date',
                        datediff(to_date(lit(reference_date)), df.activation_date))
            .withColumn("TotalDataUsage", sum_udf(df.data))
            .withColumn("sus_diff_day", duration_udf(df.start_date, df.end_date)))

    # Candidate extra features (e.g. birth_year, TotalDataUsage) can be added here;
    # featurize_data must then be told about them.
    df_selection = df.select('status', 'manufacturer', 'retailer', 'model', 'volte',
                             "lifetime_redemptions", "lifetime_revenues")
    # Annotate missing values with 'unknown' for the fields that we are interested in.
    df_selection = df_selection.fillna({'manufacturer': "unknown", 'retailer': "unknown",
                                        "model": "unknown", "volte": "unknown"})
    return df_selection

In [13]:
def featurize_data(df, remove_orig_cols=True, categorical_columns=None,
                   numeric_columns=None, label_column="status"):
    """Given a selected data frame, generate a featurized dataframe.

    Each categorical column is string-indexed and then one-hot encoded into
    ``<column>_vec``. The label column is string-indexed into ``label``. The
    one-hot vectors plus the numeric columns are assembled into ``features``.

    The pipeline is fitted on ``df`` as given. ``trachack_submission`` calls
    this before its train/test split, so the indexers see every category.

    :param df: Input data frame to featurize.
    :param remove_orig_cols: (Default=True) If set we remove the original columns from the returned dataframe.
    :param categorical_columns: (Default=None) columns to one-hot encode;
        ``None`` means ``['manufacturer', 'retailer', 'model', 'volte']``.
    :param numeric_columns: (Default=None) numeric columns passed straight to
        the assembler; ``None`` means ``["lifetime_redemptions", "lifetime_revenues"]``.
    :param label_column: (Default="status") column indexed into ``label``.
        With StringIndexer's default ordering the most frequent value gets index 0.
    :returns featurized dataframe that includes two columns - label and features. If remove_orig_cols=False, includes
    the columns from original input data frame as well.
    """
    # None defaults avoid shared mutable default arguments.
    if categorical_columns is None:
        categorical_columns = ['manufacturer', 'retailer', 'model', 'volte']
    if numeric_columns is None:
        numeric_columns = ["lifetime_redemptions", "lifetime_revenues"]
    # Materialise so any iterable works (the lists are traversed twice).
    categorical_columns = list(categorical_columns)
    numeric_columns = list(numeric_columns)

    stages = []
    for column in categorical_columns:
        string_indexer = StringIndexer(inputCol=column, outputCol=column + '_index')
        encoder = OneHotEncoderEstimator(inputCols=[string_indexer.getOutputCol()],
                                         outputCols=[column + "_vec"])
        stages += [string_indexer, encoder]

    stages.append(StringIndexer(inputCol=label_column, outputCol="label"))

    assembler_inputs = [c + "_vec" for c in categorical_columns] + numeric_columns
    stages.append(VectorAssembler(inputCols=assembler_inputs, outputCol="features"))

    pipeline_model = Pipeline(stages=stages).fit(df)
    df_featurized = pipeline_model.transform(df)
    if remove_orig_cols:
        return df_featurized.select(['features', 'label'])
    return df_featurized

In [14]:
def train_model(train_dataset, classifier=None):
    """Given a featurized training dataset, fits a classifier and returns the fitted model.

    By default a gradient-boosted tree classifier
    (``GBTClassifier(labelCol='label', featuresCol='features')``) with default
    hyper-parameters is trained. To compare other pyspark.ml estimators
    (e.g. LogisticRegression, RandomForestClassifier, DecisionTreeClassifier,
    NaiveBayes), pass an unfitted instance via ``classifier``.

    :param train_dataset: A dataframe with two columns - label and features.
    :param classifier: (Default=None) unfitted estimator exposing ``fit``;
        ``None`` selects the default GBT classifier.
    :returns trained model object, i.e. ``classifier.fit(train_dataset)``.
    """
    if classifier is None:
        classifier = GBTClassifier(labelCol='label', featuresCol='features')
    return classifier.fit(train_dataset)

In [15]:
def evaluate_model(model, test_dataset):
    """Given a model and featurized test dataset, returns the auc value.

    Part of the fixed TracHack harness: the notebook instructions say this
    function should not be modified.

    :param model: the pyspark.ml model object to evaluate.
    :param test_dataset: A dataframe with two columns - label and features, to evaluate the model.
    :returns AUC under ROC value.

    """
    # transform() adds the model's prediction columns to the test rows.
    predictions = model.transform(test_dataset)
    # Evaluator built with Spark's default column names ("label" and
    # "rawPrediction"), so the model must emit a rawPrediction column.
    evaluator = BinaryClassificationEvaluator()
    # The metric is supplied as a param-map override for this single call.
    auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
    return auc

In [16]:
from pyspark.sql.functions import sum as sum_
def trachack_submission(spark, data_path, random_seed, test_ratio=0.2, num_folds=10):
    """The end to end model pipeline, printing out the AUC value for each fold and the averaged AUC value.

    Fold ``i`` uses a train/test split seeded with ``random_seed * i``. When
    ``random_seed`` is 0 that would give every fold seed 0 (identical splits),
    so ``i`` is used as the seed instead in that case.

    :param spark: SparkSession object.
    :param data_path: path to the dataset to use for the pipeline.
    :param random_seed: seed as input to the seed selection for randomSplit for train/test split.
    :param test_ratio: (Default=0.2) Percentage of data to use for test/evaluation. Must be between 0 and 1.
    :param num_folds: (Default=10) Number of folds for averaged AUC value.
    :returns: averaged AUC value
    :raises ValueError: if ``num_folds`` < 1 or ``test_ratio`` is not strictly between 0 and 1.
    """
    # In this notebook the builtin ``sum`` is shadowed by pyspark.sql.functions.sum
    # (star import). ``__builtin__`` exists only in Python 2 / IPython;
    # ``builtins`` works in any Python 3 runtime.
    import builtins

    # Fail fast, before the expensive read/featurize steps.
    if num_folds < 1:
        raise ValueError(f"num_folds must be at least 1, got {num_folds}")
    if not 0.0 < test_ratio < 1.0:
        raise ValueError(f"test_ratio must be between 0 and 1, got {test_ratio}")

    # Use spark.read.parquet(data_path) instead for Parquet input.
    df = spark.read.json(data_path)

    df_selected = select_data(df)
    df_featurized = featurize_data(df_selected)

    # Cache this data frame since we will be doing multiple passes to split, train and evaluate
    df_featurized.cache()
    try:
        fold_auc = []
        for i in range(num_folds):
            # Non-zero seeds keep the original random_seed * i scheme.
            fold_seed = random_seed * i if random_seed else i
            train, test = df_featurized.randomSplit([1.0 - test_ratio, test_ratio], seed=fold_seed)

            model = train_model(train)
            auc = evaluate_model(model, test)
            print(f"Fold {i} AUC: {auc}")
            fold_auc.append(auc)
    finally:
        # Release the cached data even if a fold fails.
        df_featurized.unpersist()

    average_auc = builtins.sum(fold_auc) / num_folds
    print(f"Average AUC: {average_auc}")
    return average_auc

In [17]:
# Run the end-to-end pipeline with random seed 123 and report per-fold and average AUC.
trachack_submission(spark, data_path=df_path, random_seed=123)