## INITIATE SPARK KERNEL

In [None]:
# Trivial statement whose only purpose is to make the notebook start its kernel
# (per the section heading); the printed text itself is not used anywhere.
print("hi")

## IMPORT NECESSARY PACKAGES

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, split, when, substring_index, round
from pyspark.sql.types import StringType
import pandas as pd
import matplotlib
import joblib

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
from ethnicolr import census_ln, pred_census_ln, pred_wiki_name, pred_wiki_ln, pred_fl_reg_ln, pred_fl_reg_name, pred_fl_reg_ln_five_cat, pred_fl_reg_name_five_cat, pred_nc_reg_name

## LOAD REQUIRED DATASETS

In [None]:
# Acxiom parquet dataset; only the 2023-12-01 file for state code 'NY' is kept.
input_path = 's3://cinqcareai-data-lake-prod-unrestricted/layer_f/acxiom/acxiom_mom/'
acxiom = (
    spark.read.option("mergeSchema", "true")
    .parquet(input_path)
    .filter(F.col('file_date') == '2023-12-01')
    .filter(F.col('input_state_3038') == 'NY')
)

In [None]:
# Same file/state slice as `acxiom`, further restricted to rows whose race code
# is 'Black or African American' (used later to oversample that group).
acxiom_aa = (
    spark.read.option("mergeSchema", "true")
    .parquet(input_path)
    .filter(F.col('file_date') == '2023-12-01')
    .filter(F.col('input_state_3038') == 'NY')
    .filter(F.col('race_code_low_detail_ibe3101') == 'Black or African American')
)

## SELECT FEATURES FROM DATASETS

In [None]:
# Columns carried forward from the raw Acxiom data: the race code (source of the
# label), the name fields, and the candidate predictor attributes.
selected_columns = [
    "race_code_low_detail_ibe3101",
    "input_first_name_3131",
    "input_middle_initial_3131",
    "input_last_name_3131",
    "first_name__middle_initial__gender_1st_person_in_household_first_name_ibe8610_01",
    "first_name__middle_initial__gender_1st_person_in_household_gender_ibe8610_03",
    "personicx_lifestage_segment_code_px001270_01",
    "geo_federal_congressional_district_code_ibe2403_01",
    "heavy_transactors_ibe9358",
    "underbanked_ibe9351",
    "economic_stability_indicator_esi_ibe9350",
    "used_multiple_formula_with_minerals_in_the_last_6_months_ap000924",
    "i_use_tv_media_for_information_and_inspiration_ap001578",
    "never_or_rarely_carry_a_balance_on_a_credit_card_financial_ap004921",
]
acxiom_features = acxiom.select(*selected_columns)

# "<first> <last>" join key. F.concat yields NULL when either part is NULL,
# so full_name is NULL for rows missing a first or last name.
acxiom_concat = acxiom_features.withColumn(
    "full_name",
    F.concat(F.col("input_first_name_3131"), F.lit(" "), F.col("input_last_name_3131")),
)

In [None]:
# Same column selection as for the full NY population, applied to the
# 'Black or African American' subset so both frames share one schema.
selected_columns_aa = [
    "race_code_low_detail_ibe3101",
    "input_first_name_3131",
    "input_middle_initial_3131",
    "input_last_name_3131",
    "first_name__middle_initial__gender_1st_person_in_household_first_name_ibe8610_01",
    "first_name__middle_initial__gender_1st_person_in_household_gender_ibe8610_03",
    "personicx_lifestage_segment_code_px001270_01",
    "geo_federal_congressional_district_code_ibe2403_01",
    "heavy_transactors_ibe9358",
    "underbanked_ibe9351",
    "economic_stability_indicator_esi_ibe9350",
    "used_multiple_formula_with_minerals_in_the_last_6_months_ap000924",
    "i_use_tv_media_for_information_and_inspiration_ap001578",
    "never_or_rarely_carry_a_balance_on_a_credit_card_financial_ap004921",
]
acxiom_features_aa = acxiom_aa.select(*selected_columns_aa)

# "<first> <last>" join key; NULL when either name part is NULL (F.concat semantics).
acxiom_concat_aa = acxiom_features_aa.withColumn(
    "full_name",
    F.concat(F.col("input_first_name_3131"), F.lit(" "), F.col("input_last_name_3131")),
)

## CREATE DEPENDENT VARIABLE

In [None]:
# Binary label: 1 when the Acxiom race code is 'Black or African American',
# otherwise 0 (a NULL race code also falls through to 0).
is_target_race = col("race_code_low_detail_ibe3101") == "Black or African American"
label_expr = when(is_target_race, 1).otherwise(0)

acxiom_y = acxiom_concat.withColumn("likely_member", label_expr)
# acxiom_aa was already filtered to the target race code, so this label is 1 on every row.
acxiom_y_aa = acxiom_concat_aa.withColumn("likely_member", label_expr)

## CREATE SAMPLE DATASET

In [None]:
# Random sample (without replacement) of the labelled NY population.
# `fraction` is a per-row inclusion probability, so the row count is approximate.
population_sample_args = {"withReplacement": False, "fraction": 0.0039, "seed": 456}
sample_data = acxiom_y.sample(**population_sample_args)
sample_data.count()

In [None]:
# Additional random sample drawn only from the 'Black or African American'
# subset, at a higher fraction than the general sample.
aa_sample_args = {"withReplacement": False, "fraction": 0.019, "seed": 654}
sample_data_aa = acxiom_y_aa.sample(**aa_sample_args)
sample_data_aa.count()

In [None]:
# Stack the general sample and the extra 'Black or African American' sample.
# DataFrame.union matches columns by position and keeps duplicates, so a person
# drawn into both samples appears twice.
sample_data_final = sample_data.union(sample_data_aa)

In [None]:
# Row count of the combined training sample.
sample_data_final.count()

In [None]:
# Race-code distribution within the general (non-oversampled) sample.
full_races = sample_data.groupBy("race_code_low_detail_ibe3101").count()
full_races.show()

In [None]:
# NOTE(review): this cell is identical to the previous one (same input, same
# output). It may have been meant to inspect `sample_data_aa` or
# `sample_data_final` instead — confirm intent or remove.
full_races = sample_data.groupBy(F.col("race_code_low_detail_ibe3101")).count()
full_races.show()

In [None]:
# Number of unique full names in the combined sample (each unique name is
# scored once by ethnicolr further down).
distinct_names = (
    sample_data_final.select("full_name")
    .distinct()
    .orderBy("full_name")
)
count1 = distinct_names.count()
# distinct_names.show()
print(count1)

In [None]:
# One row per unique full_name, carrying a first/last name pair for ethnicolr.
# Rows sharing a full_name share both parts, so F.first is only a way to pick
# them out of the group.
name_part_aggs = [
    F.first("input_first_name_3131").alias("first_name"),
    F.first("input_last_name_3131").alias("last_name"),
]
ethnicolr_preds = sample_data_final.groupBy("full_name").agg(*name_part_aggs)

# Sorted view is for display only; the unsorted frame is what gets exported.
preds = ethnicolr_preds.orderBy("full_name")
count3 = ethnicolr_preds.count()
preds.show()
print(count3)

In [None]:
# ethnicolr operates on pandas DataFrames, so pull the unique-name table out of
# Spark. toPandas() collects every row to the driver.
pd_preds = ethnicolr_preds.toPandas()

## APPLY ETHNICOLR FUNCTION TO DATA

In [None]:
# Score every unique name with ethnicolr's Florida-registration five-category
# full-name model, in fixed-size batches, and collect the output into `result_df`.
#
# Inputs : pd_preds (pandas DataFrame with "full_name", "first_name", "last_name")
# Output : result_df (pandas DataFrame; every ethnicolr output column except the
#          two name parts, each renamed with the "pred_fl_5_" prefix)

# Input name columns are dropped from the ethnicolr output; full_name is kept as the key.
drop_columns = ["first_name", "last_name"]

# Prefix for new columns
pred_fl_5_name_prefix = 'pred_fl_5_'

# Batch size
batch_size = 12000

# Number of batches: ceiling division, equivalent to math.ceil(len(pd_preds) / batch_size)
num_iterations = -(-len(pd_preds) // batch_size)

# Per-batch results are gathered here and concatenated once after the loop.
# DataFrame.append was removed in pandas 2.0, and appending inside a loop
# re-copies the accumulated frame on every iteration.
batch_results = []

for iteration in range(num_iterations):
    start_index = iteration * batch_size
    end_index = min((iteration + 1) * batch_size, len(pd_preds))

    # Extract the current batch
    current_batch = pd_preds.iloc[start_index:end_index]

    # Run the model on this batch and discard the raw name-part columns
    pred_fl_5_name_va = pred_fl_reg_name_five_cat(current_batch, "last_name", "first_name")
    pred_fl_5_name_va = pred_fl_5_name_va.drop(columns=drop_columns)

    # Prefix every remaining column (including full_name) so the origin of each
    # prediction column is explicit
    new_column_names5 = [pred_fl_5_name_prefix + column for column in pred_fl_5_name_va.columns]
    pred_fl_5_name_va.columns = new_column_names5

    batch_results.append(pred_fl_5_name_va)

# pd.concat raises on an empty list, so keep the empty-DataFrame result when
# there was nothing to score.
if batch_results:
    result_df = pd.concat(batch_results, ignore_index=True)
else:
    result_df = pd.DataFrame()

# Display the final result DataFrame
print("Final Result:")

In [None]:
# This is an alias, not a copy: the in-place rename below also changes result_df.
pred_fl_5_name_match = result_df
# The batch loop prefixed every ethnicolr output column, including the join key;
# restore the key's original name so it can be joined back to the Spark sample.
new_column_names5 = {'pred_fl_5_full_name': 'full_name'}
pred_fl_5_name_match.rename(columns=new_column_names5, inplace=True)

In [None]:
# Keep only the join key and the predicted race category; the remaining
# ethnicolr output columns are not used downstream in this notebook.
ethnicolr_race = pred_fl_5_name_match[["full_name", "pred_fl_5_race"]]

In [None]:
# Convert the pandas prediction table back into a Spark DataFrame for the join.
ethnicolr_race_match = spark.createDataFrame(ethnicolr_race)

In [None]:
# Number of names that came back with a prediction row
# (presumably compared by eye against the unique-name count printed earlier).
ethnicolr_race_match.count()

## USE ETHNICOLR OUTPUT AS NEW FEATURE IN SAMPLE DATA

In [None]:
# Left join keeps every sampled row; rows whose full_name has no prediction
# (including NULL full_name, which never matches) get NULL in pred_fl_5_race.
final_sample_data = sample_data_final.join(ethnicolr_race_match, on = "full_name", how ="left")

In [None]:
# Collapse the ethnicolr category into a binary feature: "black" only for
# "nh_black"; every other value, and a missing (NULL) prediction, becomes "other".
final_sample_data = final_sample_data.withColumn("p_race", when(col("pred_fl_5_race") == "nh_black", "black").otherwise("other"))

In [None]:
# Row count after the join; presumably checked against the pre-join count in
# the next cell to confirm the join did not add or drop rows.
final_sample_data.count()

In [None]:
# Pre-join row count, for comparison with the post-join count above.
sample_data_final.count()

In [None]:
# Race-code frequency tables, most common first: one for the full NY population
# and one for the final (oversampled) modelling sample.
race_code_column = "race_code_low_detail_ibe3101"

full_races = (
    acxiom_features.groupBy(race_code_column)
    .count()
    .orderBy(F.desc("count"))
)
sample_races = (
    final_sample_data.groupBy(race_code_column)
    .count()
    .orderBy(F.desc("count"))
)
# countries = acxiom.groupBy(F.col("country_of_origin_high_detail_ibe3102")).count()
# countries = countries.orderBy(countries['count'].desc())

In [None]:
# Total number of rows across all race codes in the modelling sample.
total_count = sample_races.agg(F.sum("count")).first()[0]

# Share of the sample held by each race code, as a percentage with 2 decimals.
share_of_total = F.round((col("count") / total_count) * 100, 2)
sample_races = sample_races.withColumn("percentage", share_of_total)

# Up to 20 rows, without truncating long race-code labels.
sample_races.show(20, False)

## CLEAN NULL VALUES

In [None]:
# Replace NULLs in each categorical predictor with an explicit placeholder so
# that "missing" becomes its own category when the columns are indexed.
# Maps column name -> placeholder (note the gender column uses "Unknown").
null_placeholders = {
    "first_name__middle_initial__gender_1st_person_in_household_first_name_ibe8610_01": "unknown",
    "input_last_name_3131": "unknown",
    "first_name__middle_initial__gender_1st_person_in_household_gender_ibe8610_03": "Unknown",
    "personicx_lifestage_segment_code_px001270_01": "unknown",
    "geo_federal_congressional_district_code_ibe2403_01": "unknown",
    "heavy_transactors_ibe9358": "unknown",
    "underbanked_ibe9351": "unknown",
    "economic_stability_indicator_esi_ibe9350": "unknown",
    "used_multiple_formula_with_minerals_in_the_last_6_months_ap000924": "unknown",
    "never_or_rarely_carry_a_balance_on_a_credit_card_financial_ap004921": "unknown",
    "i_use_tv_media_for_information_and_inspiration_ap001578": "unknown",
}

filled_final_sample_data_cat = final_sample_data
for column_name, placeholder in null_placeholders.items():
    filled_final_sample_data_cat = filled_final_sample_data_cat.withColumn(
        column_name,
        when(col(column_name).isNull(), placeholder).otherwise(col(column_name)),
    )

## DEFINE THE FEATURE AND LABEL COLUMNS

In [None]:

# Categorical predictors fed to the model. Each one is string-indexed and
# one-hot encoded downstream; "p_race" is the ethnicolr-derived feature.
categorical_feature_columns = [
    "input_last_name_3131",
    "geo_federal_congressional_district_code_ibe2403_01",
    "personicx_lifestage_segment_code_px001270_01",
    "first_name__middle_initial__gender_1st_person_in_household_gender_ibe8610_03",
    "first_name__middle_initial__gender_1st_person_in_household_first_name_ibe8610_01",
    "heavy_transactors_ibe9358",
    "underbanked_ibe9351",
    "economic_stability_indicator_esi_ibe9350",
    "used_multiple_formula_with_minerals_in_the_last_6_months_ap000924",
    "never_or_rarely_carry_a_balance_on_a_credit_card_financial_ap004921",
    "i_use_tv_media_for_information_and_inspiration_ap001578",
    "p_race",
]

# Binary target column (1 = Acxiom race code is 'Black or African American').
label_column = "likely_member"

## SETUP MACHINE LEARNING PIPELINE

In [None]:
# One StringIndexer per categorical feature; handleInvalid="keep" gives labels
# not seen during fitting their own extra index instead of raising.
indexers = [
    StringIndexer(inputCol=feature, outputCol=feature + "_index", handleInvalid="keep")
    for feature in categorical_feature_columns
]

# One OneHotEncoder per feature, reading the matching "<feature>_index" column.
encoder = [
    OneHotEncoder(inputCol=feature + "_index", outputCol=feature + "_encoded")
    for feature in categorical_feature_columns
]

In [None]:
# All indexers run before all encoders, since each encoder consumes the
# "<feature>_index" column its indexer produces.
stages = [*indexers, *encoder]
pipeline = Pipeline(stages=stages)

# The encoding pipeline is fitted on the whole sample (before the train/test
# split) and then applied to that same data.
pipeline_model = pipeline.fit(filled_final_sample_data_cat)
data_encoded = pipeline_model.transform(filled_final_sample_data_cat)

In [None]:
# Persist the fitted indexing/encoding pipeline so the same category-to-index
# mapping can be reloaded later; an existing model at this path is overwritten.
pipeline_model_path = "Ethnicolr/pipeline_model"
pipeline_model.write().overwrite().save(pipeline_model_path)

In [None]:
# Keep only what modelling and reporting need: the name key, the one-hot
# encoded predictors, the raw p_race value, and the label.
encoded_output_columns = [
    "full_name",
    "input_last_name_3131_encoded",
    "geo_federal_congressional_district_code_ibe2403_01_encoded",
    "personicx_lifestage_segment_code_px001270_01_encoded",
    "first_name__middle_initial__gender_1st_person_in_household_gender_ibe8610_03_encoded",
    "first_name__middle_initial__gender_1st_person_in_household_first_name_ibe8610_01_encoded",
    "heavy_transactors_ibe9358_encoded",
    "underbanked_ibe9351_encoded",
    "economic_stability_indicator_esi_ibe9350_encoded",
    "used_multiple_formula_with_minerals_in_the_last_6_months_ap000924_encoded",
    "never_or_rarely_carry_a_balance_on_a_credit_card_financial_ap004921_encoded",
    "i_use_tv_media_for_information_and_inspiration_ap001578_encoded",
    "p_race_encoded",
    "p_race",
    "likely_member",
]
data_encoded = data_encoded.select(*encoded_output_columns)

In [None]:
# data_encoded.printSchema()

In [None]:
# One-hot encoded predictor columns, in the order they are packed into the
# single "features" vector consumed by the classifier.
feature_columns = [
    "input_last_name_3131_encoded",
    "geo_federal_congressional_district_code_ibe2403_01_encoded",
    "personicx_lifestage_segment_code_px001270_01_encoded",
    "first_name__middle_initial__gender_1st_person_in_household_gender_ibe8610_03_encoded",
    "first_name__middle_initial__gender_1st_person_in_household_first_name_ibe8610_01_encoded",
    "heavy_transactors_ibe9358_encoded",
    "underbanked_ibe9351_encoded",
    "economic_stability_indicator_esi_ibe9350_encoded",
    "used_multiple_formula_with_minerals_in_the_last_6_months_ap000924_encoded",
    "never_or_rarely_carry_a_balance_on_a_credit_card_financial_ap004921_encoded",
    "i_use_tv_media_for_information_and_inspiration_ap001578_encoded",
    "p_race_encoded",
]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

## SPLIT DATA INTO TRAINING AND TESTING

In [None]:
# Attach the assembled "features" vector, then split 70/30 into train and test
# with a fixed seed so the split is repeatable.
final_data_encoded = assembler.transform(data_encoded)
split_weights = [0.7, 0.3]
training_data, test_data = final_data_encoded.randomSplit(split_weights, seed=123)

## FIT LOGISTIC REGRESSION MODEL AND EVALUATE PERFORMANCE

In [None]:
# Train a logistic regression on the training split.
lr = LogisticRegression(featuresCol="features", labelCol="likely_member")
lr_model = lr.fit(training_data)

# Score the held-out split; adds the model's prediction columns to test_data.
lr_predictions = lr_model.transform(test_data)

# Area under the ROC curve on the held-out split.
auc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="likely_member")
auc = auc_evaluator.evaluate(lr_predictions)
print(f"Area Under ROC = {auc:f}")

In [None]:
# Get the coefficients
coefficients = lr_model.coefficients
intercept = lr_model.intercept

# Print the coefficients
print("Intercept: {}".format(intercept))

In [None]:
# tp = lr_predictions.filter((col("likely_member") == 1) & (col("prediction") == 1)).count()
# tn = lr_predictions.filter((col("likely_member") == 0) & (col("prediction") == 0)).count()
# fp = lr_predictions.filter((col("likely_member") == 0) & (col("prediction") == 1)).count()
# fn = lr_predictions.filter((col("likely_member") == 1) & (col("prediction") == 0)).count()

# # Calculate Accuracy
# calc_accuracy = (tp + tn) / (tp + tn + fp + fn)

# # Print the accuracy
# print("Accuracy:", calc_accuracy)

## SAVE MODEL PATH

In [None]:
# Persist the fitted logistic regression model; an existing model at this path
# is overwritten.
model_path = "Ethnicolr/lr_model"
lr_model.write().overwrite().save(model_path)

## CLEAN FINAL DATASET AND SAVE TO S3

In [None]:
# Reduce the scored test set to the columns needed for analysis: the name key,
# the ethnicolr-derived flag, the true label and the model's predicted label.
final_lr_predictions = lr_predictions.select("full_name", "p_race", "likely_member", "prediction")

In [None]:
# Write the scored test set to S3 as CSV with a header row. repartition(1)
# collapses the data to one partition so a single part file is produced;
# existing output at the destination is overwritten.
predictions_writer = (
    final_lr_predictions.repartition(1)
    .write.format("csv")
    .mode("overwrite")
    .option("header", "true")
)
predictions_writer.save("s3://cinqcareai-ds-dev-unrestricted/layer_f/ethnicolr_analysis/model_analysis/")