# Adult Dataset

<https://archive.ics.uci.edu/dataset/2/adult>

In [45]:
import sys
sys.path.append("..")
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.sql.functions import col, when, ltrim
from pyspark.sql.types import  IntegerType, DoubleType,StringType, StructField, StructType
from pyspark.sql.session import SparkSession
from helpers.path_translation import translate_to_file_string
from helpers.data_prep_and_print import print_df

In [46]:
# Location of the raw Adult data file, relative to this notebook; the project
# helper turns it into the path string that is later handed to the CSV reader.
adult_data_relative_path = "../../data/adult.data"
input_file = translate_to_file_string(adult_data_relative_path)

In [47]:
# Configure a local Spark session (master "local[*]") named "Adult DataSet",
# then create it or reuse an already running one.
session_builder = SparkSession.builder.master("local[*]").appName("Adult DataSet")
spark = session_builder.getOrCreate()

In [48]:
# Explicit schema for the Adult CSV: one (name, type, nullable) entry per column.
_adult_columns = [
    ("age", IntegerType(), False),
    ("workclass", StringType(), True),
    ("fnlwgt", DoubleType(), False),
    ("education", StringType(), False),
    ("education-num", DoubleType(), False),
    ("marital-status", StringType(), False),
    ("occupation", StringType(), True),
    ("relationship", StringType(), False),
    ("race", StringType(), False),
    ("sex", StringType(), False),
    ("capital-gain", DoubleType(), False),
    ("capital-loss", DoubleType(), False),
    ("hours-per-week", DoubleType(), False),
    ("native-country", StringType(), False),
    ("income", StringType(), False),
]
adultSchema = StructType(
    [StructField(name, dtype, nullable) for name, dtype, nullable in _adult_columns]
)

In [None]:
# Load the header-less, comma-separated Adult data file into a DataFrame using
# the explicit schema (no schema inference).
raw_df = (
    spark.read
    .option("header", "false")
    .option("inferSchema", "false")
    .schema(adultSchema)
    .option("delimiter", ",")
    .csv(input_file)
)

# Derive / clean columns:
#  * label     - True when income is "<=50K". The raw fields keep the blank that
#                follows each comma (see the " ?" / " Male" literals below), so
#                the value is left-trimmed before comparing. The previous
#                comparison against "<=50" never matched, which made the label
#                False for every row.
#  * workclass - the missing-value marker " ?" becomes null, everything else is
#                left-trimmed.
#  * sex       - " Male" -> False, " Female" -> True, anything else -> null.
# NOTE(review): the UCI files use the same " ?" marker in "occupation" and
# "native-country"; those are not converted to null here, so such rows survive
# the later dropna() - confirm whether that is intended.
df = (
    raw_df
    .withColumn("label", ltrim(col("income")) == "<=50K")
    .withColumn(
        "workclass",
        when(col("workclass") == " ?", None).otherwise(ltrim(col("workclass"))),
    )
    .withColumn(
        "sex",
        when(col("sex") == " Male", False)
        .when(col("sex") == " Female", True)
        .otherwise(None),
    )
)
df.printSchema()


### Remove null values

In [None]:
# Drop every row that contains at least one null value, then report how many
# rows were removed by comparing the row counts before and after.
df_without_null = df.dropna()
rows_before = df.count()
rows_after = df_without_null.count()
print(f"# {rows_before - rows_after} Rows deleted")

### Transform string attributes

In [None]:
# Fit one StringIndexer per categorical string column on the null-free data.
# Each indexer writes its numeric index to a "<column>-num" column.
workclass_indexer = StringIndexer(
    inputCol="workclass", outputCol="workclass-num"
).fit(df_without_null)
marital_indexer = StringIndexer(
    inputCol="marital-status", outputCol="marital-status-num"
).fit(df_without_null)
occupation_indexer = StringIndexer(
    inputCol="occupation", outputCol="occupation-num"
).fit(df_without_null)
relationship_indexer = StringIndexer(
    inputCol="relationship", outputCol="relationship-num"
).fit(df_without_null)
race_indexer = StringIndexer(
    inputCol="race", outputCol="race-num"
).fit(df_without_null)
country_indexer = StringIndexer(
    inputCol="native-country", outputCol="native-country-num"
).fit(df_without_null)

# Apply the fitted indexers one after another, workclass first and
# native-country last, so the index columns are appended in that order.
df_indexed = df_without_null
for fitted_indexer in (
    workclass_indexer,
    marital_indexer,
    occupation_indexer,
    relationship_indexer,
    race_indexer,
    country_indexer,
):
    df_indexed = fitted_indexer.transform(df_indexed)

# One-hot encode every index column into a matching "<column>-ohe" vector column.
index_columns = [
    "workclass-num",
    "marital-status-num",
    "occupation-num",
    "relationship-num",
    "race-num",
    "native-country-num",
]
onehot_columns = [
    "workclass-ohe",
    "marital-status-ohe",
    "occupation-ohe",
    "relationship-ohe",
    "race-ohe",
    "native-country-ohe",
]
onehot_encoder = OneHotEncoder(
    inputCols=index_columns, outputCols=onehot_columns
).fit(df_indexed)
df_encoded = onehot_encoder.transform(df_indexed)

# Show the encoded data via the project helper (called with 5; presumably a row limit).
print_df(df_encoded, 5)

In [None]:
# Columns that make up the feature vector: the numeric attributes, the boolean
# "sex" column and the one-hot encoded categorical attributes.
feature_cols = [
    "age",
    "workclass-ohe",
    "education-num",
    "marital-status-ohe",
    "occupation-ohe",
    "relationship-ohe",
    "race-ohe",
    "sex",
    "capital-gain",
    "capital-loss",
    "hours-per-week",
    "native-country-ohe",
]

# Combine all feature columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
labeled_point_ds = assembler.transform(df_encoded)

# Show the assembled data via the project helper (called with 5; presumably a row limit).
print_df(labeled_point_ds, 5)

In [53]:
# Randomly split the assembled data into a training part (weight 0.6) and a
# test part (weight 0.4); the fixed seed keeps the split reproducible.
splits = labeled_point_ds.randomSplit([0.6, 0.4], seed=5756)
train, test = splits

In [54]:
# Stop the Spark session now that the notebook no longer needs it.
spark.stop()