Set constants

In [187]:
import os

# Project root relative to this notebook; every data path hangs off it.
ROOT = ".."
DATA_IN = os.path.join(ROOT, "data")

# Training split, per-column metadata, and held-out test split.
SOURCE_DATASET, SOURCE_ATTRIBUTES, SOURCE_TEST_DATA = (
    os.path.join(DATA_IN, file_name)
    for file_name in ("adult.data", "attributes.csv", "adult.test")
)
MODELS_DIR = os.path.join(ROOT, ".models")

for source_path in (SOURCE_DATASET, SOURCE_ATTRIBUTES, SOURCE_TEST_DATA):
    print(source_path)

../data/adult.data
../data/attributes.csv
../data/adult.test


Initiate Spark session

In [188]:
from pyspark.sql import SparkSession


# getOrCreate() returns the already-running session if there is one, otherwise starts
# a new one named "Income Predictor".
spark = SparkSession.builder.appName("Income Predictor").getOrCreate()

Load the dataset

In [189]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, BooleanType

# Column metadata for the dataset: one row per column with name, type, description, values.
attrs = spark.read.option("header", True).csv(SOURCE_ATTRIBUTES)
attrs.show()

def get_field(t: str):
    """Map a type name from the ``type`` column of attributes.csv to a Spark SQL type.

    Args:
        t: one of "float", "integer", "enum" or "boolean". Surrounding whitespace
           and letter case are ignored.

    Returns:
        A new pyspark ``DataType`` instance. "enum" columns are read as strings
        and converted to numeric indices later in the notebook.

    Raises:
        ValueError: if ``t`` is not a recognised type name. ValueError is a
            subclass of Exception, so existing ``except Exception`` handlers
            still catch it.
    """
    factories = {
        "float": FloatType,
        "integer": IntegerType,
        "enum": StringType,
        "boolean": BooleanType,
    }
    key = t.strip().lower() if isinstance(t, str) else t
    factory = factories.get(key)
    if factory is None:
        # (t,) keeps the message well-formed even if t is itself a tuple.
        raise ValueError("Not expected: %s" % (t,))
    return factory()

def to_struct(row) -> StructField:
    """Build the schema field described by one attributes.csv row.

    The row's ``name`` becomes the column name and ``get_field(row['type'])``
    supplies its Spark type.

    NOTE(review): every field is declared nullable=False, but Spark may relax
    nullability when reading file sources. Confirm with ``df.printSchema()``
    before relying on it.
    """
    column_name = row['name']
    column_type = get_field(row['type'])
    return StructField(column_name, column_type, nullable=False)

# attributes.csv has one row per column (15 rows), so collect it and build the fields
# on the driver. This avoids shipping `to_struct` to Python workers via rdd.map.
# Row order is preserved, and any error from get_field is raised directly instead
# of surfacing as a failed Spark job.
struct_fields = [to_struct(row) for row in attrs.collect()]

schema = StructType(struct_fields)

+-----+-----+--------------------+--------------------+
| name| type|         description|              values|
+-----+-----+--------------------+--------------------+
|  f_1|float|                 age|                null|
|  f_2| enum|           workclass|Private, Self-emp...|
|  f_3|float|              fnlwgt|                null|
|  f_4| enum|           education|Bachelors, Some-c...|
|  f_5|float|       education-num|                null|
|  f_6| enum|      marital-status|Married-civ-spous...|
|  f_7| enum|          occupation|Tech-support, Cra...|
|  f_8| enum|        relationship|Wife, Own-child, ...|
|  f_9| enum|                race|White, Asian-Pac-...|
| f_10| enum|                 sex|        Female, Male|
| f_11|float|        capital-gain|                null|
| f_12|float|        capital-loss|                null|
| f_13|float|      hours-per-week|                null|
| f_14| enum|      native-country|United-States, Ca...|
|label| enum|predictor class: ...|              

In [191]:
def load_data(file, file_schema=None):
    """Read a headerless, comma-separated adult data file into a DataFrame.

    Args:
        file: path to the CSV file (e.g. SOURCE_DATASET or SOURCE_TEST_DATA).
        file_schema: StructType to apply. Defaults to the notebook-level ``schema``
            built from attributes.csv.

    Returns:
        A Spark DataFrame with the requested schema. Leading and trailing
        whitespace around each value is trimmed.

    Notes:
        Lines beginning with '|' are skipped as comments. The UCI distribution of
        adult.test starts with a "|1x3 Cross validator" line that would otherwise
        be parsed as a malformed data row. No data row starts with '|', because
        the first field is numeric.
    """
    return spark.read.load(
        file,
        format="csv",
        sep=",",
        header=False,
        comment="|",
        ignoreLeadingWhiteSpace=True,
        ignoreTrailingWhiteSpace=True,
        schema=schema if file_schema is None else file_schema,
    )

# Training split: show a few parsed rows, then the column types that were applied.
df = load_data(file=SOURCE_DATASET)
df.show(5)
df.printSchema()

+----+----------------+--------+---------+----+------------------+-----------------+-------------+-----+------+------+----+----+-------------+-----+
| f_1|             f_2|     f_3|      f_4| f_5|               f_6|              f_7|          f_8|  f_9|  f_10|  f_11|f_12|f_13|         f_14|label|
+----+----------------+--------+---------+----+------------------+-----------------+-------------+-----+------+------+----+----+-------------+-----+
|39.0|       State-gov| 77516.0|Bachelors|13.0|     Never-married|     Adm-clerical|Not-in-family|White|  Male|2174.0| 0.0|40.0|United-States|<=50K|
|50.0|Self-emp-not-inc| 83311.0|Bachelors|13.0|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|   0.0| 0.0|13.0|United-States|<=50K|
|38.0|         Private|215646.0|  HS-grad| 9.0|          Divorced|Handlers-cleaners|Not-in-family|White|  Male|   0.0| 0.0|40.0|United-States|<=50K|
|53.0|         Private|234721.0|     11th| 7.0|Married-civ-spouse|Handlers-cleaners|      Husband|Black|  

In [192]:
# Every string-typed column, i.e. the "enum" attributes, including `label`.
categoricalColumns = [name for name, dtype in df.dtypes if dtype.startswith('string')]


Next we need to convert the categorical text values to numeric index values.

In [193]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# One StringIndexer per categorical column, each fitted on the training DataFrame.
# Each writes a "<column>i" output (f_2 -> f_2i, label -> labeli).
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}i").fit(df)
    for name in categoricalColumns
]
# Every stage is already a fitted model, so this pipeline only chains the transforms.
pipeline = Pipeline(stages=indexers)


def index_strings(df):
    """Replace every categorical column of ``df`` with its numeric index.

    The stages of ``pipeline`` were fitted on the training data, so
    ``pipeline.fit`` learns nothing new here. Any DataFrame passed in (e.g. the
    test split) is encoded with the training mapping. Categories not seen during
    training raise an error under StringIndexer's default handleInvalid="error".

    Returns:
        ``df`` without the original string columns, with ``labeli`` renamed back
        to ``label``.
    """
    indexed = pipeline.fit(df).transform(df)
    without_strings = indexed.drop(*categoricalColumns)
    return without_strings.withColumnRenamed('labeli', 'label')

# Training data with all categorical columns index-encoded.
df_i = index_strings(df)
df_i.show(5)

+----+--------+----+-------+----+----+----+----+----+----+----+----+-----+-----+-----+
| f_1|     f_3| f_5|   f_11|f_12|f_13|f_2i|f_4i|f_6i|f_7i|f_8i|f_9i|f_10i|f_14i|label|
+----+--------+----+-------+----+----+----+----+----+----+----+----+-----+-----+-----+
|39.0| 77516.0|13.0| 2174.0| 0.0|40.0| 4.0| 2.0| 1.0| 3.0| 1.0| 0.0|  0.0|  0.0|  0.0|
|50.0| 83311.0|13.0|    0.0| 0.0|13.0| 1.0| 2.0| 0.0| 2.0| 0.0| 0.0|  0.0|  0.0|  0.0|
|38.0|215646.0| 9.0|    0.0| 0.0|40.0| 0.0| 0.0| 2.0| 9.0| 1.0| 0.0|  0.0|  0.0|  0.0|
|53.0|234721.0| 7.0|    0.0| 0.0|40.0| 0.0| 5.0| 0.0| 9.0| 0.0| 1.0|  0.0|  0.0|  0.0|
|28.0|338409.0|13.0|    0.0| 0.0|40.0| 0.0| 2.0| 0.0| 0.0| 4.0| 1.0|  1.0|  9.0|  0.0|
|37.0|284582.0|14.0|    0.0| 0.0|40.0| 0.0| 3.0| 0.0| 2.0| 4.0| 0.0|  1.0|  0.0|  0.0|
|49.0|160187.0| 5.0|    0.0| 0.0|16.0| 0.0|10.0| 5.0| 5.0| 1.0| 1.0|  1.0| 11.0|  0.0|
|52.0|209642.0| 9.0|    0.0| 0.0|45.0| 1.0| 0.0| 0.0| 2.0| 0.0| 0.0|  0.0|  0.0|  1.0|
|31.0| 45781.0|14.0|14084.0| 0.0|50.0| 0.0|

In [194]:
from pyspark.ml.feature import VectorAssembler, Normalizer
VECTOR_COL = 'features'
# Every non-label column is an input feature. `.columns` only reads the schema, so
# there is no need to collect the rows to the driver (e.g. via toPandas()) just to
# list column names.
features = df_i.drop("label").columns

assembler = VectorAssembler(inputCols=features, outputCol=VECTOR_COL)

def create_feature_vector(df):
    """Pack the feature columns of ``df`` into a single vector column.

    Returns:
        A DataFrame with exactly two columns: VECTOR_COL, then 'label'.
    """
    return assembler.transform(df).select(VECTOR_COL, 'label')

# Training data as (features vector, label) pairs.
df_vector = create_feature_vector(df_i)
df_vector.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[39.0,77516.0,13....|  0.0|
|(14,[0,1,2,5,6,7,...|  0.0|
|(14,[0,1,2,5,8,9,...|  0.0|
|(14,[0,1,2,5,7,9,...|  0.0|
|[28.0,338409.0,13...|  0.0|
+--------------------+-----+
only showing top 5 rows



Next we need to normalize each feature vector to unit $L^1$ norm.

In [195]:
# Rescale each row's feature vector to unit L1 norm (p=1), output to "<VECTOR_COL>_n".
# NOTE(review): row-wise normalization lets the largest-magnitude feature (fnlwgt,
# ~1e5 in the sample rows) dominate every vector. Per-column scaling
# (e.g. StandardScaler) may be what was intended.
normalizer = Normalizer(inputCol=VECTOR_COL, outputCol=f'{VECTOR_COL}_n', p=1.0)


def normalize_vector(df_vector):
    """Apply ``normalizer`` and keep the result under VECTOR_COL.

    The raw vector column is dropped and the "<VECTOR_COL>_n" output is renamed
    back to VECTOR_COL, so downstream code keeps using the same column name.
    """
    normalized_col = f'{VECTOR_COL}_n'
    return (
        normalizer.transform(df_vector)
        .drop(VECTOR_COL)
        .withColumnRenamed(normalized_col, VECTOR_COL)
    )

# Replace the assembled vectors with their L1-normalized version.
df_vector = normalize_vector(df_vector=df_vector)
df_vector.show(5)


+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[4.88764678605892...|
|  0.0|(14,[0,1,2,5,6,7,...|
|  0.0|(14,[0,1,2,5,8,9,...|
|  0.0|(14,[0,1,2,5,7,9,...|
|  0.0|[8.27161624427264...|
+-----+--------------------+
only showing top 5 rows



In [198]:
from pyspark.ml.classification import LogisticRegression

training_data = df_vector


def _print_param_map(title, fitted_model):
    """Print ``title``, then each param of ``fitted_model`` as ``name: value``, sorted by name."""
    print(title)
    params = fitted_model.extractParamMap()
    for param in sorted(params, key=lambda p: p.name):
        print(f"  {param.name}: {params[param]}")


# A LogisticRegression instance is an Estimator; fit() returns a fitted model.
lr = LogisticRegression(maxIter=10, regParam=0.01, featuresCol=VECTOR_COL, labelCol="label")
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a model using the parameters stored in lr.
model1 = lr.fit(training_data)
# A bare extractParamMap() in the middle of a cell is never displayed (only the last
# expression is), so print the map explicitly under its header.
_print_param_map("Model 1 was fit using parameters: ", model1)

# Parameters passed to fit() as a dict override those set on `lr` for that fit only.
paramMap = {lr.maxIter: 30, lr.regParam: 0.1, lr.threshold: 0.55}  # type: ignore
# Output column name ("probability" is also its default value).
paramMap2 = {lr.probabilityCol: "probability"}  # type: ignore
paramMapCombined = {**paramMap, **paramMap2}

model2 = lr.fit(training_data, paramMapCombined)
_print_param_map("Model 2 was fit using parameters: ", model2)

model = model2

LogisticRegression parameters:
aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under boun

In [207]:
# Apply the same chain used for training: parse -> index -> assemble -> normalize.
# index_strings reuses the indexers fitted on the training split.
test_data = normalize_vector(
    create_feature_vector(
        index_strings(
            load_data(SOURCE_TEST_DATA)
        )
    )
)

test_data.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[1.10186038106739...|
|  0.0|(14,[0,1,2,5,9],[...|
|  1.0|(14,[0,1,2,5,6,7,...|
|  1.0|(14,[0,1,2,3,5,7,...|
|  0.0|[1.73795500627594...|
+-----+--------------------+
only showing top 5 rows



In [208]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.util import JavaMLWritable
from pyspark.sql import DataFrame

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# The fitted model reads VECTOR_COL ('features') and appends its prediction columns.
prediction = model.transform(test_data)
# Accuracy: fraction of test rows whose prediction matches the label.
evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Test Accuracy = %g" % accuracy)
accuracy

Test Accuracy = 0.781095


0.7810945273631841