In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col


In [0]:

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Income Prediction") \
    .getOrCreate()

# Read the train and test data
train = spark.read.csv("s3://adhoc-query-data/Leila/check_data/train.tsv", sep="\t", header=True, inferSchema=True)
test = spark.read.csv("s3://adhoc-query-data/Leila/check_data/test.tsv", sep="\t", header=True, inferSchema=True)

In [0]:
test.show(1,truncate=False)

+---+---+---------+------+---------+-------------+--------------+-----------------+------------+-----+----+------------+------------+--------------+--------------+
|id |age|workclass|fnlwgt|education|education-num|marital-status|occupation       |relationship|race |sex |capital-gain|capital-loss|hours-per-week|native-country|
+---+---+---------+------+---------+-------------+--------------+-----------------+------------+-----+----+------------+------------+--------------+--------------+
|0  |25 |Private  |226802|11th     |7            |Never-married |Machine-op-inspct|Own-child   |Black|Male|0           |0           |40            |United-States |
+---+---+---------+------+---------+-------------+--------------+-----------------+------------+-----+----+------------+------------+--------------+--------------+
only showing top 1 row



In [0]:
for col in train.columns:
    num_missing = train.filter(train[col].isNull()).count()
    print(f"Number of missing values in '{col}' column is {num_missing}")

Number of missing values in 'id' column is 0
Number of missing values in 'age' column is 0
Number of missing values in 'workclass' column is 1836
Number of missing values in 'fnlwgt' column is 0
Number of missing values in 'education' column is 0
Number of missing values in 'education-num' column is 0
Number of missing values in 'marital-status' column is 0
Number of missing values in 'occupation' column is 1843
Number of missing values in 'relationship' column is 0
Number of missing values in 'race' column is 0
Number of missing values in 'sex' column is 0
Number of missing values in 'capital-gain' column is 0
Number of missing values in 'capital-loss' column is 0
Number of missing values in 'hours-per-week' column is 0
Number of missing values in 'native-country' column is 583
Number of missing values in 'income' column is 0


In [0]:

# Preprocess the data
train = train.withColumn("label", (train["income"] == ">50K").cast("int"))
train = train.drop("income")
columns_to_drop = ["education-num", "fnlwgt", "capital-loss"]
train = train.drop(*columns_to_drop)
test = test.drop(*columns_to_drop)

# Fill missing values in categorical columns with mode
categorical_columns = ["workclass", "occupation", "native-country"]
for col in categorical_columns:
    mode_value = train.select(col).groupBy(col).count().orderBy("count", ascending=False).first()[col]
    train = train.na.fill({col: mode_value})
    test = test.na.fill({col: mode_value})

In [0]:

no_work_or_pay = ['Never-worked', 'Without-pay']
train = train.withColumn('workclass', F.when(train['workclass'].isin(no_work_or_pay), 'no_work_or_pay').otherwise(train['workclass']))

postgrad_education = ['Masters', 'Prof-school', 'Doctorate']
basic_education = ['Preschool', '1st-4th', '5th-6th', '11th', '9th', '7th-8th', '10th', '12th']
associate = ['Assoc-acdm', 'Assoc-voc']
train = train.withColumn('education', F.when(train['education'].isin(postgrad_education), 'postgrad').when(train['education'].isin(basic_education), 'basic_education').when(train['education'].isin(associate), 'associate').otherwise(train['education']))

single = ['Never-married', 'Separated']
after_marriage = ['Married-spouse-absent', 'Widowed', 'Divorced']
AF_civ = ['Married-AF-spouse', 'Married-civ-spouse']
train = train.withColumn('marital-status', F.when(train['marital-status'].isin(single), 'single').when(train['marital-status'].isin(after_marriage), 'after_marriage').when(train['marital-status'].isin(AF_civ), 'AF_civ').otherwise(train['marital-status']))

service = ['Priv-house-serv', 'Other-service', 'Handlers-cleaners']
other = ['Armed-Forces', 'Farming-fishing', 'Machine-op-inspct','Adm-clerical']
move_repair = ['Transport-moving', 'Craft-repair']
sale_support = ['Sales', 'Tech-support']
train = train.withColumn('occupation', F.when(train['occupation'].isin(service), 'service').when(train['occupation'].isin(other), 'other').when(train['occupation'].isin(move_repair), 'move_repair').when(train['occupation'].isin(sale_support), 'sale_support').otherwise(train['occupation']))

other_countries = ['Trinadad&Tobago', 'Cambodia', 'Thailand', 'Laos', 'Yugoslavia', 'Outlying-US(Guam-USVI-etc)', 'Honduras', 'Hungary', 'Scotland', 'Holand-Netherlands']
wealthy_countries = ['Ireland', 'United-States', 'Cuba', 'China', 'Greece', 'Hong', 'Philippines', 'Germany', 'Canada', 'England', 'Italy', 'Japan', 'Taiwan', 'India', 'France', 'Iran']
less_wealthy = ['Dominican-Republic', 'Columbia', 'Guatemala', 'Mexico', 'Nicaragua', 'Peru', 'Vietnam', 'El-Salvador', 'Haiti', 'Puerto-Rico', 'Portugal']
train = train.withColumn('native-country', F.when(train['native-country'].isin(other_countries), 'other').when(train['native-country'].isin(wealthy_countries), 'wealthy_countries').when(train['native-country'].isin(less_wealthy), 'less_wealthy').otherwise(train['native-country']))


In [0]:
train.select('marital-status').distinct().collect()

Out[6]: [Row(marital-status='single'),
 Row(marital-status='after_marriage'),
 Row(marital-status='AF_civ')]

In [0]:

# Get the columns with string data type
df_cat = train.select([col for col, dtype in train.dtypes if dtype == 'string'])

# Get the list of categorical attribute names
attr_cat = df_cat.columns



In [0]:
# Convert categorical variables into one-hot encoded format

for attr in attr_cat:
    indexer = StringIndexer(inputCol=attr, outputCol=attr+"_index")
    encoder = OneHotEncoder(inputCol=attr+"_index", outputCol=attr+"_encoded")
    pipeline = Pipeline(stages=[indexer, encoder])
    train = pipeline.fit(train).transform(train)
    test = pipeline.fit(test).transform(test)
    # Drop the original categorical column
    train = train.drop(attr)
    test = test.drop(attr)


In [0]:

X = train.select([col for col in train.columns if col not in ['label', 'id']])

# Select only the 'label' column to create target dataframe y
y = train.select('label')

In [0]:
# Ensure both train and test datasets have the same features as the training dataset
features_ls = X.columns
test = test.select(*features_ls).fillna(0)


In [0]:
# Get the schema of the DataFrame
train_schema = train.schema

# Identify string columns in the schema
string_columns = [col.name for col in train_schema if col.dataType == "string"]

# Filter the string columns based on the list of categorical columns
categorical_columns = [col for col in string_columns if col in ["workclass", "occupation", "native-country"]]

# Define the stages for StringIndexer and OneHotEncoder
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index", handleInvalid="keep") for col in categorical_columns]
encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_encoded") for col in categorical_columns]

# Assemble all features into a single feature vector
assembler = VectorAssembler(inputCols=[col+"_encoded" for col in categorical_columns] + [col for col in train.columns if col not in categorical_columns + ["label"]],
                            outputCol="features")

# Apply the VectorAssembler to the train DataFrame
train = assembler.transform(train)


# Split dataset into train and holdout sets
train, holdout = train.randomSplit([0.8, 0.2], seed=42)

# Define Logistic Regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Fit the Logistic Regression model
model = lr.fit(train)

# Make predictions
predictions = model.transform(holdout)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")
auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
print("ROC AUC:", auc)



# Perform cross-validation


paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)

cvModel = crossval.fit(train)

# Get average AUC across all folds
avg_auc = cvModel.avgMetrics[0]
print("Mean AUC score:", avg_auc)


ROC AUC: 0.9030538945304412
Mean AUC score: 0.8881171548231684
