In [1]:
# Raw TracHack records stored as parquet on the mounted storage account.
df = spark.read.parquet('dbfs:/mnt/bigdataproject649/parquet5/')

# Work on a 10% sample to keep iteration fast. Without an explicit seed Spark picks a
# random one, so the sample (and every downstream number) would change between sessions.
SAMPLE_FRACTION = 0.1
SAMPLE_SEED = 42
df_sample = df.sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)

In [2]:
# Location of the sampled records (bzip2-compressed JSON) that trachack_submission()
# reads back with spark.read.json.
num_files = 1
df_sample_path = "/FileStore/tables/temp5"

# Set to True to (re)materialise df_sample at df_sample_path. It is off by default so a
# re-run does not rewrite the file. The save mode must be set with .mode(): passing it
# as .option("mode", ...) leaves the default "error if exists" mode in place.
WRITE_SAMPLE = False
if WRITE_SAMPLE:
    (df_sample.coalesce(num_files)
        .write.mode("overwrite")
        .option("compression", "bzip2")
        .json(df_sample_path))

In [3]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import datediff
from pyspark.sql.functions import last_day 
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

In [4]:
def _line_id_col():
    """Column expression for the customer's line id, aliased to ``line_id``."""
    return func.col("customer_info.line_id").alias("line_id")


def _net_usage_features(data_frame):
    """Per-line averages of the exploded "network-usage-domestic" records."""
    usage = (data_frame
             .select(_line_id_col(),
                     func.explode("network-usage-domestic").alias("net_usage_by_day"))
             .select("line_id", "net_usage_by_day.*"))
    return usage.groupBy("line_id").agg(
        func.avg("data").alias("daily_data_volume"),
        func.avg("mms_out").alias("daily_mms_volume"),
        func.avg("sms_out").alias("daily_sms_volume"),
        func.avg("voice_duration_out").alias("daily_voice_time"),
        func.avg("voice_num_total").alias("daily_voice_volume"),
    )


def _phone_info(data_frame):
    """All phone_info fields per line, with missing manufacturer/retailer/model set to "unknown"."""
    return (data_frame
            .select(_line_id_col(), func.col("phone_info.*"))
            .fillna({'manufacturer': "unknown", 'retailer': "unknown", "model": "unknown"}))


def _reactivation_counts(data_frame):
    """Number of exploded reactivation records per line, as ``reactive_times``."""
    # explode emits no rows for a line without reactivations, so such lines are absent
    # here; select_data fills their reactive_times with 0 after the left join.
    return (data_frame
            .select(_line_id_col(), func.explode("reactivations").alias("reactive_status"))
            .groupBy("line_id")
            .count()
            .withColumnRenamed("count", "reactive_times"))


def _latest_deactivation(data_frame, cutoff_date):
    """Latest deactivation_date per line (``latest_ddate``), kept only when before cutoff_date."""
    latest = (data_frame
              .select(_line_id_col(), func.explode("deactivations").alias("deactive_status"))
              .select("line_id", "deactive_status.*")
              .groupBy("line_id")
              .agg(func.max("deactivation_date").alias("latest_ddate")))
    return latest.filter(latest["latest_ddate"] < cutoff_date)


def _redemption_revenue(data_frame):
    """Sum of redemption gross_revenue per line, as ``gross_redemption_revenue``."""
    redemptions = (data_frame
                   .select(_line_id_col(), func.explode("redemptions").alias("redemptions_by_day"))
                   .select("line_id", "redemptions_by_day.*"))
    # NOTE(review): the IntegerType cast truncates any fractional revenue before summing
    # -- confirm this is intended.
    return (redemptions
            .select("line_id", func.col("gross_revenue").cast(IntegerType()).alias("gross_revenue"))
            .groupBy("line_id")
            .agg(func.sum("gross_revenue").alias("gross_redemption_revenue")))


def _customer_contact(data_frame, cutoff_date):
    """Customer/contact attributes with a cleaned birth_year, for lines first activated before cutoff_date."""
    customers = data_frame.select(
        _line_id_col(),
        func.col("contact_info.city").alias("city"),
        func.col("contact_info.country").alias("country"),
        func.col("contact_info.state").alias("state"),
        func.col("contact_info.zipcode").alias("zipcode"),
        func.col("status"),
        func.col("customer_info.birth_year").cast(IntegerType()).alias("birth_year"),
        func.col("customer_info.first_activation_date").alias("first_act"),
        func.col("customer_info.lifetime_redemptions").alias("redemption"),
        func.col("customer_info.lifetime_revenues").alias("revenue"),
    )
    birth_year = func.col("birth_year")
    # NOTE(review): 1753 looks like a sentinel/default year; it is mapped to 1976, the same
    # value used for missing years below -- confirm. A native expression replaces the Python
    # UDF (nulls stay null here and are filled by fillna).
    customers = customers.withColumn(
        "birth_year", func.when(birth_year == 1753, 1976).otherwise(birth_year))
    customers = customers.fillna({"birth_year": 1976})
    return customers.filter(customers["first_act"] < cutoff_date)


def select_data(data_frame, cutoff_date='2020-03-19'):
    """Selects and transforms the raw GCR (JSON) records data frame into a feature data frame.

    Starts from the customer/contact attributes of lines first activated before
    ``cutoff_date``. It then left-joins, in order: phone info, reactivation counts,
    redemption revenue, average daily network usage and the latest deactivation date.
    Finally it adds ``diff`` (days from first activation to latest deactivation) and
    fills missing values with defaults.

    :param data_frame: Input data frame to select the specific data elements from.
    :param cutoff_date: (Default='2020-03-19') date string in YYYY-MM-DD form; first
        activations and latest deactivations on/after it are dropped, and it is the fill
        value for a missing ``latest_ddate``.
    :returns selected dataframe that includes the columns of interest.
    """
    features = _customer_contact(data_frame, cutoff_date)
    for part in (_phone_info(data_frame),
                 _reactivation_counts(data_frame),
                 _redemption_revenue(data_frame),
                 _net_usage_features(data_frame),
                 _latest_deactivation(data_frame, cutoff_date)):
        features = features.join(part, on=['line_id'], how='left')

    df_final = features.select('line_id', 'status', 'city', 'state', 'birth_year', 'redemption',
                               'revenue', 'device_type', 'manufacturer', 'operating_system',
                               'volte', 'reactive_times', 'gross_redemption_revenue',
                               'first_act', 'latest_ddate', 'daily_data_volume',
                               'daily_mms_volume', 'daily_sms_volume', 'daily_voice_time',
                               'daily_voice_volume')
    # NOTE(review): diff is computed before latest_ddate is filled below. Lines without an
    # earlier deactivation therefore get diff = 0 (via fillna), not the days up to
    # cutoff_date -- confirm intended.
    df_final = df_final.withColumn('diff', datediff(df_final.latest_ddate, df_final.first_act))
    return df_final.fillna({"volte": 'N', "reactive_times": 0,
                            "latest_ddate": cutoff_date, 'city': 'None',
                            'state': 'None', 'device_type': 'None', 'manufacturer': 'None',
                            'operating_system': 'None',
                            'redemption': 0, 'revenue': 0,
                            'gross_redemption_revenue': 0, 'diff': 0, 'daily_data_volume': 0,
                            'daily_mms_volume': 0, 'daily_sms_volume': 0,
                            'daily_voice_time': 0, 'daily_voice_volume': 0})

In [5]:
def featurize_data(df, remove_orig_cols=True):
    """Turn the selected customer data frame into a featurized data frame.

    Every categorical column goes through a StringIndexer (``<col>_index``) followed by
    one-hot encoding (``<col>_vec``). The ``status`` column is string-indexed into
    ``label``. The one-hot vectors plus the numeric columns are assembled into
    ``features``. The pipeline is fit on ``df`` and then applied to it.

    :param df: Input data frame to featurize.
    :param remove_orig_cols: (Default=True) When set, only the ``features`` and ``label``
        columns are returned.
    :returns featurized dataframe with ``label`` and ``features``; when
        remove_orig_cols=False, the original input columns (and intermediate
        index/vector columns) are kept as well.
    """
    categorical = ['city', 'state', 'device_type', 'manufacturer', 'operating_system', 'volte']
    numeric = ['birth_year', 'redemption', 'revenue', 'reactive_times',
               'gross_redemption_revenue', 'diff', 'daily_data_volume',
               'daily_mms_volume', 'daily_sms_volume', 'daily_voice_time',
               'daily_voice_volume']

    def encode(name):
        # Index first, then one-hot encode the index column produced by the indexer.
        indexer = StringIndexer(inputCol=name, outputCol=name + '_index')
        one_hot = OneHotEncoderEstimator(inputCols=[indexer.getOutputCol()],
                                         outputCols=[name + "_vec"])
        return [indexer, one_hot]

    stages = [stage for name in categorical for stage in encode(name)]
    stages.append(StringIndexer(inputCol="status", outputCol="label"))
    stages.append(VectorAssembler(inputCols=[name + "_vec" for name in categorical] + numeric,
                                  outputCol="features"))

    transformed = Pipeline(stages=stages).fit(df).transform(df)
    return transformed.select(['features', 'label']) if remove_orig_cols else transformed


In [6]:
def train_model(train_dataset, num_trees=150, seed=None):
    """Train a random forest classifier on a featurized dataset and return the fitted model.

    :param train_dataset: A dataframe with two columns - label and features.
    :param num_trees: (Default=150) number of trees in the forest.
    :param seed: (Default=None) optional random seed for the forest; when None, the
        estimator's own default seed is used.
    :returns trained RandomForestClassificationModel.
    """
    params = {'numTrees': num_trees, 'featuresCol': 'features', 'labelCol': 'label'}
    # Only pass seed when given, so the estimator keeps its own default otherwise.
    if seed is not None:
        params['seed'] = seed
    return RandomForestClassifier(**params).fit(train_dataset)


In [7]:
def evaluate_model(model, test_dataset):
    """Score a featurized test set with ``model`` and return its area under the ROC curve.

    :param model: the pyspark.ml model object to evaluate.
    :param test_dataset: A dataframe with two columns - label and features.
    :returns AUC under ROC value computed by a default BinaryClassificationEvaluator.
    """
    evaluator = BinaryClassificationEvaluator()
    scored = model.transform(test_dataset)
    return evaluator.evaluate(scored, {evaluator.metricName: "areaUnderROC"})

In [8]:
def trachack_submission(spark, data_path, random_seed, test_ratio=0.2, num_folds=10):
    """The end to end model pipeline, printing the AUC value for each fold and the averaged AUC.

    :param spark: SparkSession object.
    :param data_path: path to the JSON dataset to use for the pipeline.
    :param random_seed: seed as input to the seed selection for randomSplit for train/test split.
    :param test_ratio: (Default=0.2) Fraction of data to use for test/evaluation. Must be
        strictly between 0 and 1.
    :param num_folds: (Default=10) Number of folds for averaged AUC value. Must be >= 1.
    :returns: averaged AUC value
    :raises ValueError: if num_folds < 1 or test_ratio is not strictly between 0 and 1.
    """
    if num_folds < 1:
        raise ValueError(f"num_folds must be at least 1, got {num_folds}")
    if not 0 < test_ratio < 1:
        raise ValueError(f"test_ratio must be strictly between 0 and 1, got {test_ratio}")

    raw_df = spark.read.json(data_path)
    df_featurized = featurize_data(select_data(raw_df))

    # Cache this data frame since we will be doing multiple passes to split, train and evaluate.
    df_featurized.cache()
    try:
        fold_auc = []
        for i in range(num_folds):
            # NOTE(review): fold seeds are random_seed * i, so fold 0 always uses seed 0
            # whatever random_seed is; kept unchanged for comparability with earlier runs.
            fold_seed = random_seed * i
            train, test = df_featurized.randomSplit([1.0 - test_ratio, test_ratio], seed=fold_seed)
            model = train_model(train)
            auc = evaluate_model(model, test)
            print(f"Fold {i} AUC: {auc}")
            fold_auc.append(auc)
    finally:
        # Release the cached data even if a fold fails.
        df_featurized.unpersist()

    average_auc = sum(fold_auc) / len(fold_auc)
    print(f"Average AUC: {average_auc}")
    return average_auc

In [9]:
# Run the full pipeline on the materialised JSON sample; the returned average AUC is the cell output.
trachack_submission(spark, data_path=df_sample_path, random_seed=123)