# 1. Build Classification Model
First, you will need to prepare each of the input features. While age is a numeric feature, state and name are not. These need to be converted into numeric vectors before you can train the model. Use a StringIndexer along with the OneHotEncoderEstimator to convert the name, state, and sex columns into numeric vectors. Use the VectorAssembler to combine the name, state, and age vectors into a single features vector. Your final dataset should contain a column called features containing the prepared vector and a column called label containing the sex of the person.

NOTE: my work in this notebook borrows heavily from https://docs.databricks.com/applications/machine-learning/mllib/binary-classification-mllib-pipelines.html.

**NOTE** I used only one of the `.parquet` files. With the full set, the run took so long that the Databricks cluster shut down before even the initial fit had finished. Even with one file the notebook takes a long time to run, and I recognize that the scores are skewed as a result, but it shows that the approach works, which I believe is the ultimate objective.

In [2]:
# Imports
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Load The Data
Again, I'm loading only one file. In a real-world scenario, I'd load all the files. If this were a dynamic model, I'd use stream processing to add new files periodically and re-run the model on a schedule that either matches the stream's arrival rate or otherwise makes sense for the given business context.

In [4]:
#dbutils.fs.rm('/FileStore/tables/baby-data', True) 
dbutils.fs.ls("/FileStore/tables/bn-parquet")

In [5]:
df = spark.read.parquet("/FileStore/tables/bn-parquet/part_00018_5eb32e4b_dc40_4002_8fc0_581dba1df650_c000_snappy-3ef31.parquet")
print((df.count(), len(df.columns)))
cols = df.columns

## Transform Data
A few things are happening here:

1. Each categorical column is string-indexed so it can be one-hot encoded; OHE then adds a vector slot for each categorical value
2. The OHE is done for name and state
3. The label is set from sex, which is itself index-encoded

In [7]:
categoricalColumns = ["name", "state"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol="sex", outputCol="label")
stages += [label_stringIdx]

numericCols = ["age"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
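
To make the indexing and encoding steps concrete, here is a minimal plain-Python sketch of the idea. It is not the Spark implementation: the helper names and sample rows are made up for illustration, the alphabetical tie-break is an assumption, and dropping the last category mirrors the encoder's default `dropLast=True`. The result is a sparse vector of the same general shape as the `features` values displayed later in this notebook: a total size plus the active positions and their values.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to an index, most frequent value first."""
    counts = Counter(values)
    # Descending frequency; alphabetical tie-break is an assumption of this sketch.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered)}

def one_hot_slot(index, num_categories, drop_last=True):
    """Return (vector_size, active_position or None) for one category index."""
    size = num_categories - 1 if drop_last else num_categories
    return size, (index if index < size else None)

def assemble(row, name_index, state_index):
    """Build a sparse feature vector as (size, {position: value})."""
    name_size, name_pos = one_hot_slot(name_index[row["name"]], len(name_index))
    state_size, state_pos = one_hot_slot(state_index[row["state"]], len(state_index))
    entries = {}
    if name_pos is not None:
        entries[name_pos] = 1.0
    if state_pos is not None:
        # State slots start right after the name slots.
        entries[name_size + state_pos] = 1.0
    # The numeric age occupies the final position.
    entries[name_size + state_size] = float(row["age"])
    return name_size + state_size + 1, entries

# Hypothetical sample rows, not taken from the parquet file.
rows = [
    {"name": "Mary", "state": "MS", "age": 60},
    {"name": "Mary", "state": "MO", "age": 61},
    {"name": "Phillip", "state": "MO", "age": 53},
    {"name": "Ann", "state": "MO", "age": 40},
]
name_index = string_index(r["name"] for r in rows)
state_index = string_index(r["state"] for r in rows)
vectors = [assemble(r, name_index, state_index) for r in rows]
for row, vec in zip(rows, vectors):
    print(row, vec)
```

Because the last category of each column is dropped, the least frequent name or state is represented by all zeros in its block rather than by a slot of its own.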

In [8]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(df)
preppedDataDF = pipelineModel.transform(df)
print((preppedDataDF.count(), len(preppedDataDF.columns)))
# describe is a method; call it and show the summary statistics
preppedDataDF.describe().show()
print(preppedDataDF.head(10))

## Useless LR
I left this cell in, though I'm not quite sure why. It fits a model on the prepped data, but that model is never used: in the next steps the data is split into training and test sets and the LR is rebuilt.

In [10]:
# Fit model to prepped data
lrModel = LogisticRegression().fit(preppedDataDF)

# ROC for training data
display(lrModel, preppedDataDF, "ROC")
display(lrModel, preppedDataDF)

fitted values,residuals
-15.691365487314451,-1.5322389173313912e-07
-11.330694571902546,-1.1998768112439006e-05
-14.709322095219555,-4.090933348735817e-07
-20.6251670358232,-1.1030717632373219e-09
-17.920270910658985,-1.649397082844531e-08
-16.95458223921644,-4.3322994913639166e-08
-14.527364286673585,-4.907334258772004e-07
-17.919169571813025,-1.6512146285774298e-08
-19.659443944302343,-2.897423488219549e-09
-20.21734441319765,-1.6585130071445314e-09


## Split Data
This is a simple split into training and test sets. The data is so strange that I'm not really sure it's even valid. For example, it looks like the exact same observations appear many times in the `.parquet` files. Regardless, the data here is split 70%/30%, train to test.

In [12]:
selectedcols = ["label", "features"] + cols
dataset = preppedDataDF.select(selectedcols)
display(dataset)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

label,features,name,state,sex,age
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
0.0,"List(0, 6756, List(210, 6753, 6755), List(1.0, 1.0, 53.0))",Phillip,MO,M,53
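
The repeated rows in the output above matter for the split: if identical observations land on both sides of a random split, the test set is no longer unseen data. The following plain-Python sketch illustrates the effect on made-up rows. The function names and data are hypothetical, and the shuffle-and-cut split only approximates what `randomSplit` does.

```python
from collections import Counter
import random

def duplicate_share(rows):
    """Fraction of rows that are repeats of an earlier identical row."""
    counts = Counter(rows)
    return 1 - len(counts) / len(rows)

def leaked_share(train, test):
    """Fraction of test rows that also appear, identically, in the training set."""
    seen = set(train)
    return sum(row in seen for row in test) / len(test)

# Made-up rows (name, state, sex, age): 3 distinct observations, each repeated 10 times.
rows = [("Phillip", "MO", "M", 53), ("Mary", "MS", "F", 60), ("Ann", "MO", "F", 40)] * 10

rng = random.Random(100)
shuffled = rows[:]
rng.shuffle(shuffled)
cut = int(0.7 * len(shuffled))  # 70%/30% split, train to test
train, test = shuffled[:cut], shuffled[cut:]

print(duplicate_share(rows))
print(leaked_share(train, test))
```

With ten copies of each observation and only nine rows in the test set, every test row necessarily has an identical twin in the training set, so the leaked share is 1.0 regardless of the shuffle.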


# 2. Fit and Evaluate Model
## First Build the Model
The logistic regression model is fit with the training data and then run against the test data. The output shows not only the predictions but also the probability for each class: the first entry is for label 0.0 and the second for label 1.0 (in the sample rows above, label 0.0 corresponds to sex M). Given the weird data, the model is likely pretty accurate, but not necessarily predictive with out-of-sample data. Without statistical evidence one way or another, my intuition says the data results in an overfitted model.

In [14]:
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Train model with Training Data
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)
selected = predictions.select("label", "prediction", "probability", "age", "name", "state")
display(selected)

label,prediction,probability,age,name,state
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",56,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",60,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",60,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",60,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",61,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",61,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",61,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",62,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",62,Mary,MS
0.0,0.0,"List(1, 2, List(), List(0.5231280822547849, 0.476871917745215))",63,Mary,MS
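
For reading the `probability` column, it may help to recall how a binary logistic regression turns a raw margin into two class probabilities. The sketch below uses the standard logistic function; the function names are made up for illustration. Note that every row shown above has the same probability even though the ages range from 56 to 63, which suggests that age contributes nothing to the margin for these rows. That would be consistent with the fairly strong regularization (`regParam=0.3`, `elasticNetParam=0.8`) shrinking that coefficient to zero, though the output alone does not confirm it.

```python
import math

def binary_lr_probability(margin):
    """Return [P(label=0), P(label=1)] for a raw margin w.x + b."""
    p1 = 1.0 / (1.0 + math.exp(-margin))
    return [1.0 - p1, p1]

def margin_from_probability(p1):
    """Invert the logistic function: the margin that yields P(label=1) = p1."""
    return math.log(p1 / (1.0 - p1))

# Second entry of the probability vector shown in the output above.
margin = margin_from_probability(0.476871917745215)
probs = binary_lr_probability(margin)
print(margin, probs)
```

The recovered margin is slightly negative, which is why the model leans toward label 0.0 for these rows by a small amount.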


## Evaluation
The area under the ROC curve is quite good at `82.44%`, but again, this seems like the result of an overfitted model based on flawed (manufactured) data. I could be wrong, but the repeated observations within the files lead me to this theory: when identical rows land in both the training and test sets, the test score overstates how well the model generalizes.

A great deal more would be required to move this forward:
1. Build deep descriptive analytics for the data to see if it's flawed. If so, rebuild it to more accurately represent the real population.
2. Use k-fold cross-validation instead of a single train/test split.
3. Use hyper-parameter tuning for the LR model.
4. Try other algorithms, especially an ensemble such as Random Forest.
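
Items 2 and 3 could be combined with `CrossValidator` and `ParamGridBuilder` from `pyspark.ml.tuning`. The following is only a sketch: the helper name `build_cross_validator`, the grid values, and the fold count are illustrative assumptions rather than tuned choices, and `lr` and `evaluator` stand for the estimator and evaluator objects created elsewhere in this notebook. Every grid point is fit once per fold, so on data this slow even a small grid is expensive.

```python
def build_cross_validator(lr, evaluator, num_folds=3):
    """Wrap a LogisticRegression estimator in k-fold cross-validation over a small grid.

    The grid values and fold count below are illustrative assumptions.
    """
    # Imported here so the function can be defined without an active Spark runtime.
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    param_grid = (
        ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.3])
        .addGrid(lr.elasticNetParam, [0.0, 0.8])
        .build()
    )
    # 4 grid points x num_folds fits in total.
    return CrossValidator(
        estimator=lr,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=num_folds,
    )

# Usage inside the notebook (not run here):
# cv = build_cross_validator(lr, evaluator)
# cvModel = cv.fit(trainingData)
# print(evaluator.evaluate(cvModel.transform(testData)))
```

Cross-validation does not fix the duplicated observations; if identical rows are spread across folds, the fold scores will be optimistic in the same way as the single split.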

In [16]:
# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
results = evaluator.evaluate(predictions)
print(results)
evaluator.getMetricName()
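
As a reminder of what the reported number means, the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The plain-Python sketch below computes it directly from that definition on made-up labels and scores. It is not how `BinaryClassificationEvaluator` computes the metric internally, and the function name is hypothetical.

```python
def area_under_roc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up labels and scores: 3 of the 4 positive/negative pairs are ordered correctly.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(area_under_roc(labels, scores))  # 0.75
```

A value of 0.5 corresponds to random ranking, so the pairwise loop is only practical for small samples; it is meant to clarify the metric, not to replace the evaluator.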