In [1]:
from pyspark.sql import SparkSession
import os
import subprocess
from pyspark.sql.functions import col, udf, when, filter
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler, Tokenizer, Word2Vec, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
import numpy as np
import matplotlib.pyplot as plt
import scipy
import math
# import pyspark.pandas as ps

This notebook was used to experiment with transforming the extracted data and building logistic regression models off of it, as well as evaluating those models. The predictionPipeline file consolodates this into one script that could be used to make predictions on an arbitrary extracted dataset, using the best model found here.

In [2]:
spark = SparkSession.builder.master("local[4]").appName("Transform").config("spark.ui.port", '4050').getOrCreate()
sc = spark.sparkContext

24/12/19 20:56:23 WARN Utils: Your hostname, Jacobs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
24/12/19 20:56:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/19 20:56:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# get csv file that was created in the extract file
# extract program is set up so that only one csv will be in the extractedDataset folder, so we can use this regex to
# get the right file without having to type the whole name out
fileName = subprocess.check_output('ls extractedDataset | grep ".*.csv"', shell=True, text=True).removesuffix('\n')
path = os.getcwd() + "/extractedDataset/" + fileName
df = spark.read.csv(path, header=True, nullValue='')

In [4]:
# function that applies all the transformations needed to the dataset to perform model training
def transformDataset(df):
    # in the database, they only store a value in the gender field if the artist is female
    # otherwise it is null
    # if there are multiple artists attributed to an object, the gender of each artist is shown, separated by a '|'
    df = genderColumn(df)

    # drop culture because it has a ton of nulls, I think artist nationality along with the department it's from and year
    # it was made will still allow you to somewhat infer the culture it came from
    df = df.drop('culture')
    # can't have null values in model
    df = df.dropna()

    # change nationality column into word2Vec encoding
    df = vectorizeNationality(df)

    # change object name column into word2vec encoding
    df = vectorizeObject(df)

    #change department to one hot encoding, since this is categorical
    df = oneHotDepartment(df)

    #change isHighlight and isTimelineWork to integers
    df = df.withColumn('isHighlightInt', when(df.isHighlight == True, 1).otherwise(0)).drop('isHighlight')
    df = df.withColumn('isTimelineWorkInt', when(df.isTimelineWork == True, 1).otherwise(0)).drop('isTimelineWork')

    # cast accession year and object end date data types to integers

    df = df.withColumn('accessionYearInt', df.accessionYear.cast(IntegerType())).drop('accessionYear')\
         .withColumn('objectDate', df.objectEndDate.cast(IntegerType())).drop('objectEndDate')\
          .drop('objectID')

    
    return df

In [5]:
#create custom function to convert gender column to boolean indicating if artist is female
# 0=Male, 1=Female
# in the database, they only store a value in the gender field if the artist is female
# otherwise it is null
def convertGender(s):
    if s == 'male':
        return 0
    else:
        return 1
# register user defined functions
convertGenderUDF = udf(lambda x:convertGender(x), IntegerType())

# take artistGender column, apply user-defined function to columns and put it in new column 
def genderColumn(df):
    # change nulls to male here because I don't know what datatype spark uses to represent nulls
    # So I cant detect if a value is null when I'm applying convertGender to each individual value in the column
    x = df.na.fill(value='male', subset=['artistGender'])
    # create new column that is made from applying the custom function to the gender column
    x = x.withColumn("isFemale", convertGenderUDF(col("artistGender")))
    x = x.drop("artistGender")
    return x

In [6]:
def vectorizeNationality(df):
    # change strings in nationality column to list of tokens, which is just each word in the string
    nationalityTokenizer = Tokenizer(outputCol="nationalityWords")
    nationalityTokenizer.setInputCol("artistNationality")
    df = nationalityTokenizer.transform(df)

    # word2vec doesn't really extract any interesting patterns from the nationality column
    # but it does help in reducing its dimensions
    # in the bag of words approach, there were 108 dimensions
    word2vec = Word2Vec(vectorSize=5, inputCol="nationalityWords", outputCol='nationalityVec', seed=123)
    model = word2vec.fit(df)
    df = model.transform(df)
    df = df.drop('artistNationality')
    df = df.drop('nationalityWords')
    return df

In [7]:
#exact same process as vectorizeNationality
def vectorizeObject(df):
    objectNameTokenizer = Tokenizer(outputCol="objectNameWords")
    objectNameTokenizer.setInputCol("objectName")
    df = objectNameTokenizer.transform(df)

    word2vecObject = Word2Vec(vectorSize=5, inputCol="objectNameWords", outputCol='objectNameVec', seed=456)
    model = word2vecObject.fit(df)
    df = model.transform(df)
    
    df = df.drop('objectName')
    df = df.drop('objectNameWords')

    return df


In [8]:
def oneHotDepartment(df):
    stringid = StringIndexer(inputCol='department', outputCol='departmentIndex')
    model = stringid.fit(df)
    df = model.transform(df)

    ohe = OneHotEncoder(inputCol='departmentIndex', outputCol='departmentCode')
    model2 = ohe.fit(df)
    df = model2.transform(df)
    
    df = df.drop('department')
    df = df.drop('departmentIndex')
    

    return df


In [9]:
# Split into train and test sets, and transform each one
# standard to put 75% of data in training set and 25% in test set
transformed_df = transformDataset(df)
trainDf, testDf = transformed_df.randomSplit(weights=[0.75, 0.25], seed=123)

24/12/19 20:56:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/19 20:56:31 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [10]:
# transforms the dataset into the form we need to train the model
def transformIntoInput(df, weightVals:dict=None):
    features = df.columns
    features.remove('isHighlightInt')
    vector = VectorAssembler(inputCols=features, outputCol='features', handleInvalid='skip')
    vec = vector.transform(df)
    vec = vec.select(col("isHighlightInt").alias("label"), col("features"))

    if weightVals != None:
        vec = vec.withColumn('weight', when(vec.label == 1, weightVals[1]).otherwise(weightVals[0]))
    return vec


In [11]:
# creates the logistic regression model and datasets
# uses cross validation and evaluates the models produced to find the best model
# by default uses 2/3 of the data in the training set and 1/3 in the validation set
def makeAndEvaluateModel(df, regularizationParam = 0., threshold=0.5):
    inputVec = transformIntoInput(df)
    
    logReg = LogisticRegression()
    logReg =logReg.setMaxIter(10)
    logReg =logReg.setRegParam(regularizationParam)
    logReg =logReg.setFeaturesCol('features')
    logReg =logReg.setProbabilityCol('probability')
    logReg = logReg.setThreshold(threshold)


    grid = ParamGridBuilder().addGrid(logReg.maxIter, [0,1]).build()
    evaluator = BinaryClassificationEvaluator()
    cv = CrossValidator(estimator=logReg, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2)
    cvModel = cv.fit(inputVec)

    return cvModel

In [12]:
# Train the above model using our training set
m = makeAndEvaluateModel(trainDf)

24/12/19 20:56:34 WARN BlockManager: Block rdd_75_0 already exists on this machine; not re-adding it


Below are the metrics of the best model made through cross-validation:

In [13]:
best = m.bestModel

summary = best.summary
print("Accuracy: ",summary.accuracy)
print("precision by label: ",summary.precisionByLabel)
print("recall by label: ", summary.recallByLabel)
print("F score by label: ", summary.fMeasureByLabel())
print("area under ROC: ", summary.areaUnderROC)


Accuracy:  0.9973835688121402
precision by label:  [0.9973835688121402, 0.0]
recall by label:  [1.0, 0.0]
F score by label:  [0.9986900707361802, 0.0]
area under ROC:  0.9164742917103897


This initial model has a very low F score and recall for the positive classification. We can experiment with the threshold to find the optimal F score:

In [14]:
# want to find optimal f score for positive classifications
# Can do this by iterating through a range of thresholds
# and finding the highest f score
# This takes a while so you can change the step value to make it go
# faster
thresholds = np.arange(start=0.01, stop=1, step=0.05)
precision = []
recall = []
f = []
AUC = []
for t in thresholds:
    m = makeAndEvaluateModel(trainDf, 0.01, t)
    best = m.bestModel
    summary = best.summary
    precision.append(summary.precisionByLabel[1])
    recall.append(summary.recallByLabel[1])
    f.append(summary.fMeasureByLabel())
    AUC.append(summary.areaUnderROC)

This shows improvement in the F score for positive classifications, but the recall and precision are still quite low.

In [15]:
# getting the highest F score that was found along with corresponding precision, recall, and threshold
positiveFScores = [x[1] for x in f]
maxF = max(positiveFScores)
index = positiveFScores.index(maxF)
t = thresholds[index]
p = precision[index]
r = recall[index]
auc = AUC[index]
print("threshold: {d}\n\nhighest F score: {a}\nprecision: {b}\nrecall:{c}\nArea under ROC: {e}".format(a = maxF, b=p, c=r, d=t, e=auc))

threshold: 0.060000000000000005

highest F score: 0.3
precision: 0.3
recall:0.3
Area under ROC: 0.9164742917103897


We can get the confusion matrices for the train and test datasets to further evaluate the model selected:

In [16]:
bestModel = makeAndEvaluateModel(trainDf, 0.01, t)
trainModelVector = transformIntoInput(trainDf)
testModelVector = transformIntoInput(testDf)
trainModelVector = bestModel.transform(trainModelVector)
testModelVector = bestModel.transform(testModelVector)

In [17]:
def makeConfusionMatrix(modelVector):
    # get positive predictions
      positivePredictions = modelVector.filter(modelVector.prediction == 1.)
      #true positives and false positives
      truePositives = positivePredictions.filter(positivePredictions.label == 1.).count()
      falsePositives = positivePredictions.filter(positivePredictions.label == 0.).count()
      # get all negative predictions
      negativePredictions = modelVector.filter(modelVector.prediction == 0.)
      # true negatives and false negatives
      trueNegatives = negativePredictions.filter(negativePredictions.label == 0.).count()
      falseNegatives = negativePredictions.filter(negativePredictions.label == 1.).count()
      # print the confusion matrix
      print("Confusion Matrix:\n\
            \t\t\tpositive prediction\tnegative prediction\n\
            positive label\t\t\t{tp}\t{fn}\n\
            negative label\t\t\t{fp}\t{tn}"\
            .format(tp = truePositives, fn = falseNegatives, fp = falsePositives, tn = trueNegatives))
      return [[truePositives, falseNegatives], [falsePositives, trueNegatives]]

Confusion Matrix for training set:

In [18]:
matrix1 = makeConfusionMatrix(trainModelVector)

Confusion Matrix:
            			positive prediction	negative prediction
            positive label			3	7
            negative label			7	3805


Confusion matrix for test set:

In [19]:
matrix2 = makeConfusionMatrix(testModelVector)

Confusion Matrix:
            			positive prediction	negative prediction
            positive label			0	4
            negative label			2	1298


We can see that across both the train and test sets give a poor true positive rate.

In this dataset, I think that finding true positives is more important, so I will try to improve the true positive rate.

Another way of addressing class imbalance is to add a weight associated with each class. One way to do this is to make the inverse of the proportion of the class the weight like so: $$\begin{align}
p_{positive} &= \frac{\Sigma^n_{i=0}{y_i}}{n},\ y\in \{0,1\} \\
W_{positive} &= \frac{1}{p_{positive}} \\
p_{negative} &= 1-p_{positive} \\
W_{negative} &= \frac{1}{p_{negative}} \\
\end{align}$$
Where $y$ is the label column in the dataset. This will give more importance to the minority class in the dataset.
The weights for this dataset are calculated below:

In [20]:
df = transformDataset(df)
totalHighlights = df.select('isHighlightInt').rdd.map(lambda x: x[0]).sum()
propHighlight = totalHighlights/df.count()
weightHighlight = 1/propHighlight
print(weightHighlight)
propNonHighlight = 1 - propHighlight
weightNonHighlight = 1/propNonHighlight
print(weightNonHighlight)

                                                                                

366.2857142857143
1.0027375831052014


Creating a new model that implements the weights:

In [21]:
def makeModelWithWeights(df, regularizationParam = 0., threshold=0.5):
    input_vec = transformIntoInput(df, weightVals={0: weightNonHighlight, 1: weightHighlight})
    logReg = LogisticRegression()
    logReg = logReg.setMaxIter(10)
    logReg = logReg.setRegParam(regularizationParam)
    logReg = logReg.setFeaturesCol('features')
    logReg = logReg.setProbabilityCol('probability')
    logReg = logReg.setWeightCol('weight')
    logReg = logReg.setThreshold(threshold)

    grid = ParamGridBuilder().addGrid(logReg.maxIter, [0,1]).build()
    evaluator = BinaryClassificationEvaluator()
    cv = CrossValidator(estimator=logReg, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2)
    cvModel = cv.fit(input_vec)

    return cvModel

In [22]:
mWeighted = makeModelWithWeights(trainDf)

In [23]:
bestWeighted = mWeighted.bestModel

weightedSummary = bestWeighted.summary
print("Accuracy: ",weightedSummary.accuracy)
print("precision by label: ",weightedSummary.precisionByLabel)
print("recall by label: ", weightedSummary.recallByLabel)
print("F score by label: ", weightedSummary.fMeasureByLabel())
print("area under ROC: ", weightedSummary.areaUnderROC)

Accuracy:  0.7606690396907412
precision by label:  [0.7904613619601705, 0.734559034760129]
recall by label:  [0.7229800629590752, 0.8]
F score by label:  [0.7552162849872751, 0.7658841588977512]
area under ROC:  0.8675760755508924


This model improves the recall and F score drastically, while keeping precision and accuracy fairly high.
We can find the confusion matrices of this model for the train and test sets:

In [24]:
weightedTrainModelVector = transformIntoInput(trainDf, {0: weightNonHighlight, 1: weightHighlight})
weightedTestModelVector = transformIntoInput(testDf, {0: weightNonHighlight, 1: weightHighlight})
weightedTrainModelVector = bestWeighted.transform(weightedTrainModelVector)
weightedTestModelVector = bestWeighted.transform(weightedTestModelVector)

In [25]:
wMatrix1 = makeConfusionMatrix(weightedTrainModelVector)

Confusion Matrix:
            			positive prediction	negative prediction
            positive label			8	2
            negative label			1056	2756


In [26]:
wMatrix2 = makeConfusionMatrix(weightedTestModelVector)

Confusion Matrix:
            			positive prediction	negative prediction
            positive label			2	2
            negative label			399	901


This model improved the true positive rate, but now the rate of false positives is really high. We can try to adjust the threshold like before.

In [27]:
thresholds = np.arange(start=0.01, stop=1, step=0.05)
precision = []
recall = []
f = []
AUC = []
models = []
for t in thresholds:
    m = makeModelWithWeights(trainDf, 0.01, t)
    best = m.bestModel
    summary = best.summary
    precision.append(summary.precisionByLabel[1])
    recall.append(summary.recallByLabel[1])
    f.append(summary.fMeasureByLabel())
    AUC.append(summary.areaUnderROC)
    models.append(best)

# getting the highest F score that was found along with corresponding precision, recall, and threshold
positiveFScores = [x[1] for x in f]
maxF = max(positiveFScores)
index = positiveFScores.index(maxF)
t = thresholds[index]
p = precision[index]
r = recall[index]
auc = AUC[index]
print("threshold: {d}\n\nhighest F score: {a}\nprecision: {b}\nrecall:{c}\nArea under ROC: {e}".format(a = maxF, b=p, c=r, d=t, e=auc))

threshold: 0.7100000000000001

highest F score: 0.78678652278072
precision: 0.8981383912890755
recall:0.7
Area under ROC: 0.8675760755508924


Making a model using this threshold:

In [28]:
finalModel = models[index]
trainModelVector = transformIntoInput(trainDf)
testModelVector = transformIntoInput(testDf)
trainModelVector = finalModel.transform(trainModelVector)
testModelVector = finalModel.transform(testModelVector)

In [29]:
finalSummary = finalModel.summary
print("Accuracy: ",finalSummary.accuracy)
print("precision by label: ",finalSummary.precisionByLabel)
print("recall by label: ", finalSummary.recallByLabel)
print("F score by label: ", finalSummary.fMeasureByLabel())
print("area under ROC: ", finalSummary.areaUnderROC)

Accuracy:  0.8143491407356328
precision by label:  [0.762691415313216, 0.8981383912890755]
recall by label:  [0.9239244491080757, 0.7]
F score by label:  [0.8356013489467639, 0.78678652278072]
area under ROC:  0.8675760755508924


In [30]:
finalMatrix1 = makeConfusionMatrix(trainModelVector)

Confusion Matrix:
            			positive prediction	negative prediction
            positive label			7	3
            negative label			290	3522


In [31]:
finalMatrix1 = makeConfusionMatrix(testModelVector)

Confusion Matrix:
            			positive prediction	negative prediction
            positive label			2	2
            negative label			121	1179


The true positive rate hasn't changed much, but the rate of false positives has improved. This is the final model I will go with.

In [32]:
# saving the model
modelPath = os.getcwd() + "/final_model"
finalModel.write().overwrite().save(modelPath)

                                                                                

In [33]:
spark.stop()