**Notes on running this notebook:**
- Due to the total size of the datasets (~6 GB total), this notebook should be run on a machine with high-memory.
- I successfully ran this notebook (with very limited-testing due to cost) on a AWS EMR Spark cluster, loading the data from a S3 bucket.
- However, most of my testing with this notebook was done with a single 16-core, 64 GB memory Google Cloud VM.
  - This lacks the speed of running a cluster, but is cheaper, and therefore a great method for testing a PySpark implementation on a budget.
  - In the case of running on a single machine/VM, it is important to setup the PySpark configuration to utilize all available computing cores and memory. I have done this below, using SparkConf() when initilizing the SparkContext.


In [None]:
!sudo apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar -xvf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark

In [1]:
import numpy as np 
import pandas as pd 
import time 
import csv
from os import path, environ
import cv2
from scipy.stats import ttest_rel

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning) # ignore FutureWarning from PySpark

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.conf import SparkConf
from pyspark.mllib.feature import Normalizer, ChiSqSelector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.tree import RandomForest, DecisionTree, GradientBoostedTrees
from pyspark.mllib.classification import NaiveBayes, SVMWithSGD, LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import MulticlassMetrics

environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

''' 
Since I am running this on a single VM with 16 CPU cores and 64 GB of memory, 
the next lines update the Spark configuration to use all available resources.
Though I did also test with an AWS EMR cluster where this configuration step wasn't necessary. 
'''
conf = SparkConf().setAppName("App")
conf = (conf.setMaster('local[*]')
        .set('spark.cores.max', '24')
        .set('spark.yarn.am.cores', '24')
        .set('spark.yarn.am.memory', '64G')
        .set('spark.executor.memory', '64G')
        .set('spark.driver.memory', '64G')
        .set('spark.driver.maxResultSize', '64G'))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [2]:
#################
##  Load Data  ##
#################

''' 
Loads files as Pyspark RDDs
'''

fileDir = '../../content/'
x_train_file = path.join(fileDir, 'X_train_sat4.csv')
y_train_file = path.join(fileDir, 'y_train_sat4.csv')
x_test_file = path.join(fileDir, 'X_test_sat4.csv')
y_test_file = path.join(fileDir, 'y_test_sat4.csv')

x_train_orig = sqlContext.read.options(header=False, inferSchema=True).csv(x_train_file).rdd
y_train_orig = sqlContext.read.options(header=False, inferSchema=True).csv(y_train_file).rdd
x_test_orig = sqlContext.read.options(header=False, inferSchema=True).csv(x_test_file).rdd
y_test_orig =sqlContext.read.options(header=False, inferSchema=True).csv(y_test_file).rdd

In [3]:
############################
##  Data Pre-Processing   ## 
##  Function Definitions  ##
############################
'''
  Function to take the image labels (in RDD format) and return a single int for category
  Labels in csv files are lists of 4 binaries with
  ['1','0','0','0'] = 'Barren Land' ==> category 1
  ['0','1','0','0'] = 'Trees ==> category 2
  ['0','0','1','0'] = 'Grassland' ==> category 3
  ['0','0','0','1'] = 'Other' ==> category 4
'''
def convert_label(imageLabel):
  if imageLabel[0] == 1: 
    return 0.0
  if imageLabel[1] == 1:
    return 1.0
  if imageLabel[2] == 1:
    return 2.0
  if imageLabel[3] == 1:
    return 3.0

def join_rdds_lp(rdd_labels, rdd_data):
  '''
  Input: RDD of labels, RDD of data
  Output: RDD of LabeledPoints
  '''
  rdd_labels = rdd_labels.zipWithIndex().map(lambda x: (x[1], x[0]))
  rdd_data = rdd_data.zipWithIndex().map(lambda x: (x[1], x[0]))

  # Join labels and data into a single LabeledPoint RDD that can be passed to a Spark MLlib (RDD-based) classification algorithm
  combined = rdd_labels.join(rdd_data).sortBy(lambda x: x[0]).map(lambda x: x[1]) # get only the label and data tuple after joining / get rid of the index, after sorting by index
  combined = combined.map(lambda x: LabeledPoint(x[0], Vectors.dense(x[1]))) # map to LabeledPoint format for MLlib RDD-based algs
  return combined

def pixels_transform(imageDataRow, rgb_mean=False, add_infra=False, remove_infra=False, infra_only=False):
  row_output = []
  img_reshaped = np.reshape(imageDataRow,(784,4))

  if rgb_mean:
    for pixel_group in img_reshaped:
      tmp_sum = 0
      for num in range(3): # only iterate through the 3 RGB values / ignore the 4th value (infrared)
        tmp_sum += pixel_group[num]
      row_output.append(float(tmp_sum/3))
      if add_infra: 
        row_output.append(float(pixel_group[-1]))
    return tuple(row_output)

  if infra_only:
    for pixel_group in img_reshaped:
      row_output.append(float(pixel_group[-1]))
    return tuple(row_output)

  if remove_infra:
    for pixel_group in img_reshaped:
      for num in range(3):
        row_output.append(pixel_group[num])
    return tuple(row_output)
    
  pass  

def cv2_transforms(img, sobel_edge=False, sobel_plus_all=False, hu_moments=False, hu_moments_plus_all=False, histogram_greyscale=False, histogram_greyscale_plus_all=False):
  '''
  Input: a row from a RDD (in a map transformation)
  Output: a list (back to an RDD in a map transformation)
  '''
  img_rgb = pixels_transform(img, remove_infra=True)
  img_rgb = np.reshape(img_rgb,(28,28,3))
  img_rgb = np.array(img_rgb, 'uint8')
  # Convert to graycsale
  img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
	
  if sobel_edge:
    # Sobel Edge Detection
    sobelxy = cv2.Sobel(src=img_gray, dst=cv2.CV_64F, ddepth=-1, dx=1, dy=1, ksize=1)
    sobelxy = sobelxy.flatten()
    return sobelxy.tolist()

  if sobel_plus_all:
    sobelxy = cv2.Sobel(src=img_gray, dst=cv2.CV_64F, ddepth=-1, dx=1, dy=1, ksize=1)
    sobelxy = sobelxy.flatten()
    img = np.reshape(img, (784,4)).tolist()
    for i in range(784):
      img[i].append(sobelxy[i])
    img = np.array(img).flatten()
    return img.tolist()

  if hu_moments:
    features = cv2.HuMoments(cv2.moments(img_gray)).flatten()
    return features.tolist()

  if hu_moments_plus_all:
    features = cv2.HuMoments(cv2.moments(img_gray)).flatten()
    img = np.reshape(img, (784*4,)).tolist()
    [img.append(i) for i in features]
    img = np.array(img).flatten().tolist()
    return img

  if histogram_greyscale:
    hist = cv2.calcHist([img_rgb],[0],None, [256], [0,256])
    hist= np.array(hist).flatten().tolist()
    return hist

  if histogram_greyscale_plus_all:
    hist = cv2.calcHist([img_rgb],[0],None, [256], [0,256])
    hist = np.array(hist).flatten().tolist()
    img = np.reshape(img, (784*4,)).tolist()
    [img.append(i) for i in hist]
    return img
  
def data_rdd_process(data, 
                         flatten_pixels=False, 
                         infra_only=False, 
                         flatten_plus_infra=False,
                         edges_only=False, 
                         edges_plus_pixels=False,
                         hu_moments=False, 
                         hu_moments_plus_pixels=False,  
                         histogram_greyscale=False, 
                         histogram_greyscale_plus_pixels=False,
                         conv_labels=False):
  '''
  Input: a PySpark RDD 
  Params: flatten_pixels: if True, get the mean of each pixel (instead of separate int for each layer); ignores the infrared value (4th/last int in each pixel group), each image will total in 28x28x1 = 784 features 
          infra_only: if True, get only the infrared pixel value (which is the 4th/last value in each pixel group), each image will total in 28x28x1 = 784 features
          flatten_plus_infra: if True, get both the mean RGB value plus the infrared pixel value, each image will total 28x28x2 = 1,568 features
          edges_only: if True, get edges calculated with cv2.Sobel
          edges_plus_pixels: if True, get edges and add onto the existing list of pixel data 
          hu_moments: if True, calculate Hu-moment data with cv2.HuMoment
          hu_moments_plus_pixels: if True, add HuMoment to the existing list of pixel data
          histogram_greyscale: if True, get greyscale histogram from cv2.CalcHist
          histogram_greyscale_plus_pixels: if True, add greyscale histogram to existing list of pixel data
          conv_labels: if True, map labels from [0,0,0,0] format to floats 0.0, 1.0, 2.0, or 3.0
  Returns: a Pyspark RDD 
  '''
  data = data.map(tuple) # remove any potential schema info, so it's only the raw data as a tuple

  # assertion to check that only one major data transformation parameter is being passed
  assert (flatten_pixels and infra_only and edges_only and edges_plus_pixels and hu_moments and hu_moments_plus_pixels and 
          histogram_greyscale and histogram_greyscale_plus_pixels and flatten_plus_infra) != True

  if edges_only:
    data = data.map(lambda row: cv2_transforms(row, sobel_edge=True))

  if edges_plus_pixels:
    data = data.map(lambda row: cv2_transforms(row, sobel_plus_all=True))

  if hu_moments:
    data = data.map(lambda row: cv2_transforms(row, hu_moments=True))
  
  if hu_moments_plus_pixels:
    data = data.map(lambda row: cv2_transforms(row, hu_moments_plus_all=True))

  if histogram_greyscale:
    data = data.map(lambda row: cv2_transforms(row, histogram_greyscale=True))
  
  if histogram_greyscale_plus_pixels:
    data = data.map(lambda row: cv2_transforms(row, histogram_greyscale_plus_all=True))

  if flatten_pixels:
    data = data.map(lambda row: pixels_transform(row, rgb_mean=True))

  if flatten_plus_infra:
    data = data.map(lambda row: pixels_transform(row, rgb_mean=True, add_infra=True))

  if infra_only:
    data = data.map(lambda row: pixels_transform(row, infra_only=True))

  if conv_labels:
    data = data.map(lambda row: (convert_label(row)))

  return data # return a PySpark RDD


# main data preprocessing function to pass feature extraction parameters to 
# loads in original RDD data
def data_preprocess_main(norm_features=False, chi_sq_sel=False, chi_sq_num=300, **feature_extraction_params):

  timer_start = time.perf_counter() # start timer

  X_train = data_rdd_process(x_train_orig, **feature_extraction_params) 
  print(f'Loaded X_train: {X_train.count()} images')

  Y_train = data_rdd_process(y_train_orig, conv_labels=True) 
  print(f'Loaded Y_train: {Y_train.count()} labels')

  X_test = data_rdd_process(x_test_orig, **feature_extraction_params)
  print(f'Loaded X_test: {X_test.count()} images')

  Y_test = data_rdd_process(y_test_orig, conv_labels=True) 
  print(f'Loaded Y_test: {Y_test.count()} labels')

  if norm_features:
    nor = Normalizer(float("inf"))
    X_train = nor.transform(X_train)
    X_test = nor.transform(X_test)

  # Join training data into a single LabeledPoint RDD
  combined_train = join_rdds_lp(Y_train, X_train)
  print(f'Joined X_train and Y_train: {combined_train.count()} images and labels')

  # ChiSqSelector for features
  if chi_sq_sel:
    print(f'Selecting top {chi_sq_num} features...')
    feat_model = ChiSqSelector(numTopFeatures=chi_sq_num).fit(combined_train)
    X_train = feat_model.transform(X_train)
    combined_train = join_rdds_lp(Y_train, X_train)
    X_test = feat_model.transform(X_test)
    print('ChiSq feature selection complete.')

  timer_end = time.perf_counter() # end timer
  time_elapsed = timer_end - timer_start
  print(f'Data pre-processing time: {time_elapsed: .3f} seconds.\n')

  return combined_train, X_train, Y_train, X_test, Y_test, time_elapsed

In [4]:
###########################
##    Model Evaluation   ##
##  Function Definition  ##
###########################

# Function to train and evaluate a MLlib RDD-based classification model
def train_test_model(str_name, combined_train_rdd, test_images_rdd, test_labels_rdd, preprocess_time=0, get_preds_and_labels_only=False):

  # Setup info for each algorithm with model_data dict
  '''
  type: whether it's an algo from mllib.tree or mllib.classification, because the train() functions are named differently for each
  model: the model object to initialize prior to training
  train_params: a dict of the parameters to use for training the model
  '''
  model_data = {
      'Random Forest': {
         'type': 'mllib_tree',
         'model': RandomForest,
         'train_params': {'numClasses': 4, 'categoricalFeaturesInfo': {}, 'numTrees': 100}}, # set numTrees = 100 because that's the sklearn default
     'Decision Tree': {
         'type': 'mllib_tree',
          'model': DecisionTree,
          'train_params': {'numClasses': 4, 'categoricalFeaturesInfo': {}}},
     'Gradient Boosted Trees': {
         'type': 'mllib_tree',
          'model': GradientBoostedTrees,
          'train_params': {'categoricalFeaturesInfo': {}}},
     'Naive Bayes': {
          'type': 'mllib_class',
          'model': NaiveBayes,
          'train_params': {}},
      'Support Vector Machine': {
          'type': 'mllib_class',
         'model': SVMWithSGD,
         'train_params': {'validateData': False}},    
      'Logistic Regression': {
         'type': 'mllib_class',
         'model': LogisticRegressionWithLBFGS,
         'train_params': {'numClasses': 4}}
      }

  # extract model info from model_data dict
  alg_type = model_data[str_name]['type']
  Model = model_data[str_name]['model']
  train_params = model_data[str_name]['train_params']

  # print the algorithm name (decorated with a border of *'s)
  [print('*', end='') for i in range(len(str_name)+6)]
  print(f'\n** {str_name} **') 
  [print('*', end='') for i in range(len(str_name)+6)]
  print('\n', end='')

  # train model
  timer_start = time.perf_counter() # start timer for training
  # using 'if-else' statement to handle differences between training function names for algos
  if alg_type == 'mllib_tree':
    out_model = Model.trainClassifier(combined_train_rdd, **train_params)
  else:
    out_model = Model.train(combined_train_rdd, **train_params)
  timer_end = time.perf_counter() # end timer for training
  train_time = timer_end - timer_start

  # get predictions for model
  timer_start = time.perf_counter() # start timer for predictions
  predictions = out_model.predict(test_images_rdd) 
  timer_end = time.perf_counter() # stop timer for predictions
  pred_time = timer_end - timer_start
  total_time = preprocess_time + train_time + pred_time

  # make sure prediction is a float, and add an index for joining with ground truth labels for evaluation
  predictions = predictions.map(lambda x: float(x)).zipWithIndex().map(lambda x: (x[1], x[0]))
  test_labels_rdd = test_labels_rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
  # join predictions and ground truth labels as "test_pred" RDD
  test_pred = predictions.join(test_labels_rdd).sortBy(lambda x: x[0]).map(lambda x: (x[1])) 
  if get_preds_and_labels_only:
    return test_pred
  # get evaluation metrics for trained model, using the test_pred RDD
  out_metrics = MulticlassMetrics(test_pred)
  precision = out_metrics.weightedPrecision
  recall = out_metrics.weightedRecall
  f1score = out_metrics.weightedFMeasure()
  accuracy = out_metrics.accuracy
  print(f'Precision score: {precision:.2f}')
  print(f'Recall score: {recall:.2f}')
  print(f'F1 score: {f1score:.2f}')
  print(f'Accuracy score: {accuracy:.2f}')
  print(f'Pre-process time: {preprocess_time:.2f}')
  print(f'Training time: {train_time:.2f}')
  print(f'Prediction time: {pred_time:.2f}', end='\n\n')

  return [str_name, precision, recall, f1score, accuracy, preprocess_time, train_time, pred_time, total_time]

In [None]:
############################
##     Initial MLlib      ##
##  Algorithm Evaluation  ##
############################

# Use main pre-process function on data
# No transformations on data (except for required joins, etc.) for initial algorithm evaluation
combined_train, X_train, Y_train, X_test, Y_test, preprocess_time = data_preprocess_main()

# train and test algorithms and output data to a .csv file
output_data = []
testing_list = ['Decision Tree', 'Random Forest', 'Gradient Boosted Trees', 'Naive Bayes', 'Support Vector Machine', 'Logistic Regression']
for alg in testing_list:
  output_data.append(train_test_model(alg, combined_train, X_test, Y_test, preprocess_time))
df = pd.DataFrame(output_data)
df.columns = ['Algorithm Name', 'Precision Score', 'Recall Score', 'F1 Score', 'Accuracy Score', 'Pre-Processing Time', 'Training Time', 'Prediction Time', 'Total Time']
df = df.sort_values(by='F1 Score', ascending=False)
df.to_csv('1_initial_testing_output_data.csv', index=False)

Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined X_train and Y_train: 400000 images and labels
Data pre-processing time:  184.742 seconds.

*******************
** Decision Tree **
*******************
Precision score: 0.77
Recall score: 0.76
F1 score: 0.76
Accuracy score: 0.76
Pre-process time: 184.74
Training time: 73.27
Prediction time: 0.03

*******************
** Random Forest **
*******************
Precision score: 0.82
Recall score: 0.82
F1 score: 0.81
Accuracy score: 0.82
Pre-process time: 184.74
Training time: 90.40
Prediction time: 0.03

****************************
** Gradient Boosted Trees **
****************************
Precision score: 0.31
Recall score: 0.44
F1 score: 0.33
Accuracy score: 0.44
Pre-process time: 184.74
Training time: 1137.21
Prediction time: 0.02

*****************
** Naive Bayes **
*****************
Precision score: 0.57
Recall score: 0.51
F1 score: 0.51
Accuracy score: 0.51
Pre-pr

In [None]:
################################################################
##             Testing Feature Extraction Methods             ##
##  with Random Forest, Decision Tree, & Logistic Regression  ##
################################################################

# Setup parameter options to enumerate through
feature_extraction_params = ['hu_moments_plus_pixels', 'hu_moments', 'histogram_greyscale', 'histogram_greyscale_plus_pixels',
                               'flatten_pixels', 'infra_only', 'flatten_plus_infra', 'edges_only', 'edges_plus_pixels']

output_data = []
for i in range(len(feature_extraction_params)):
  p = feature_extraction_params[i]
  print(f'Testing with feature extraction parameter: {p}')
  tmp_feature_extraction_param = {}
  tmp_feature_extraction_param[p] = True 
  # Use main pre-process function on data
  combined_train, X_train, Y_train, X_test, Y_test, preprocess_time = data_preprocess_main(**tmp_feature_extraction_param)

  for algo in ['Random Forest', 'Decision Tree', 'Logistic Regression']:
    # train and test algorithms and append to output_data
    output_data.append(train_test_model(algo, combined_train, X_test, Y_test, preprocess_time))
    output_data[-1].insert(1, p) # add the feature extraction method string to the output

df = pd.DataFrame(output_data)
df.columns = ['Algorithm Name', 'Feature Extraction Method', 'Precision Score', 'Recall Score', 'F1 Score', 'Accuracy Score', 'Pre-Processing Time', 'Training Time', 'Prediction Time', 'Total Time']
df = df.sort_values(by='F1 Score', ascending=False)
df.to_csv(f'2_testing_output_feature_extraction.csv', index=False)


Testing with feature extraction parameter: hu_moments_plus_pixels
Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined X_train and Y_train: 400000 images and labels
Data pre-processing time:  627.179 seconds.

*******************
** Random Forest **
*******************
Precision score: 0.83
Recall score: 0.83
F1 score: 0.82
Accuracy score: 0.83
Pre-process time: 627.18
Training time: 144.35
Prediction time: 0.04

*******************
** Decision Tree **
*******************
Precision score: 0.77
Recall score: 0.77
F1 score: 0.76
Accuracy score: 0.77
Pre-process time: 627.18
Training time: 117.33
Prediction time: 0.03

*************************
** Logistic Regression **
*************************
Precision score: 0.76
Recall score: 0.76
F1 score: 0.76
Accuracy score: 0.76
Pre-process time: 627.18
Training time: 3640.47
Prediction time: 0.00

Testing with feature extraction parameter: hu_moments
Loaded X_train: 400000 i

In [None]:
################################################################################
##       Retest all algos to confirm if Logistic Regression is the best       ##
##  performing algorithm when using 'histogram_greyscale' feature extraction  ##
################################################################################

# Use main pre-process function on data with greyscale histogram feature extraction
feat_extract = {'histogram_greyscale': True}
combined_train, X_train, Y_train, X_test, Y_test, preprocess_time = data_preprocess_main(**feat_extract)

# train and test algorithms and output data to a .csv file
output_data = []
testing_list = ['Decision Tree', 'Random Forest', 'Gradient Boosted Trees', 'Naive Bayes', 'Support Vector Machine', 'Logistic Regression']
for alg in testing_list:
  output_data.append(train_test_model(alg, combined_train, X_test, Y_test, preprocess_time))
df = pd.DataFrame(output_data)
df.columns = ['Algorithm Name', 'Precision Score', 'Recall Score', 'F1 Score', 'Accuracy Score', 'Pre-Processing Time', 'Training Time', 'Prediction Time', 'Total Time']
df = df.sort_values(by='F1 Score', ascending=False)
df.to_csv('3_retest_histogram_greyscale.csv', index=False)

Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined X_train and Y_train: 400000 images and labels
Data pre-processing time:  384.123 seconds.

*******************
** Decision Tree **
*******************
Precision score: 0.85
Recall score: 0.86
F1 score: 0.85
Accuracy score: 0.86
Pre-process time: 384.12
Training time: 9.17
Prediction time: 0.04

*******************
** Random Forest **
*******************
Precision score: 0.83
Recall score: 0.83
F1 score: 0.82
Accuracy score: 0.83
Pre-process time: 384.12
Training time: 16.85
Prediction time: 0.03

****************************
** Gradient Boosted Trees **
****************************
Precision score: 0.31
Recall score: 0.44
F1 score: 0.33
Accuracy score: 0.44
Pre-process time: 384.12
Training time: 89.76
Prediction time: 0.03

*****************
** Naive Bayes **
*****************
Precision score: 0.75
Recall score: 0.73
F1 score: 0.73
Accuracy score: 0.73
Pre-proce

In [None]:
###########################################################################
##                   Further Refinement of Best Model                    ## 
##  (Logistic Regression with 'histogram greyscale' feature extraction)  ##
##             with Data Normalization and Feature Selection             ##
###########################################################################

feature_extraction_param = {'histogram_greyscale': True}

output_data = []

# Test Normalizing features
for param in ['Normalize Features', 'ChiSq Feature Selector']:
 
  if param == 'Normalize Features':
    combined_train, X_train, Y_train, X_test, Y_test, preprocess_time = data_preprocess_main(norm_features=True, **feature_extraction_param)
    output_data.append(['Normalize Features', 'N/A'])
    [output_data[-1].append(i) for i in train_test_model('Logistic Regression', combined_train, X_test, Y_test, preprocess_time)]
    output_data[-1].insert(3, 'histogram_greyscale')

  else:
    chi_sq_range = [200, 100]
    for int in chi_sq_range:
      combined_train, X_train, Y_train, X_test, Y_test, preprocess_time = data_preprocess_main(chi_sq_sel=True, chi_sq_num=int, **feature_extraction_param)
      output_data.append(['ChiSq Selection', int])
      [output_data[-1].append(i) for i in train_test_model('Logistic Regression', combined_train, X_test, Y_test, preprocess_time)]
      output_data[-1].insert(3, 'histogram_greyscale')

df = pd.DataFrame(output_data)
df.columns = ['New Param', 'ChiSq Num', 'Algorithm Name', 'Feature Extraction Method', 'Precision Score', 'Recall Score', 'F1 Score', 'Accuracy Score', 'Pre-Processing Time', 'Training Time', 'Prediction Time', 'Total Time']
df = df.sort_values(by='F1 Score', ascending=False)
df.to_csv(f'4_testing_output_extra_params.csv', index=False)

Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined X_train and Y_train: 400000 images and labels
Data pre-processing time:  383.302 seconds.

*************************
** Logistic Regression **
*************************
Precision score: 0.93
Recall score: 0.93
F1 score: 0.93
Accuracy score: 0.93
Pre-process time: 383.30
Training time: 225.86
Prediction time: 0.00

Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined X_train and Y_train: 400000 images and labels
Selecting top 200 features...
ChiSq feature selection complete.
Data pre-processing time:  490.548 seconds.

*************************
** Logistic Regression **
*************************
Precision score: 0.92
Recall score: 0.91
F1 score: 0.91
Accuracy score: 0.91
Pre-process time: 490.55
Training time: 289.20
Prediction time: 0.00

Loaded X_train: 400000 images
Loaded Y_train: 400000 l

In [5]:
#################################################################
##                 10-Fold Cross Validation                    ## 
##             and Paired T-Test of Best Model:                ##
##  normalized, greyscale histogram, with Logistic Regression  ##
#################################################################

# Function for preprocessing data for cross-validation
def crossVal_preprocess(seed, norm=False, **feature_extraction_param):
  if norm:
    nor = Normalizer(float("inf")) # initialize Normalizer

  X_train = data_rdd_process(x_train_orig, **feature_extraction_param)
  if norm:
    X_train = nor.transform(X_train)
  print(f'Loaded X_train: {X_train.count()} images')

  Y_train = data_rdd_process(y_train_orig, conv_labels=True) 
  print(f'Loaded Y_train: {Y_train.count()} labels')

  X_test = data_rdd_process(x_test_orig, **feature_extraction_param)
  if norm:
    X_test = nor.transform(X_test)
  print(f'Loaded X_test: {X_test.count()} images')

  Y_test = data_rdd_process(y_test_orig, conv_labels=True) 
  print(f'Loaded Y_test: {Y_test.count()} labels')

  # Combine datasets into a single PySpark RDD
  all_data_x = X_train.union(X_test).zipWithIndex().map(lambda x: (x[1], x[0]))
  all_data_y = Y_train.union(Y_test).zipWithIndex().map(lambda x: (x[1], x[0]))
  all_data_rdd = all_data_y.join(all_data_x).sortBy(lambda x: x[0]) # sort and leave index after joining
  print(f'Joined all data: {all_data_rdd.count()} images with labels')

  # Split combined RDD into train and test sets, using seed number
  train_rdd, test_rdd = all_data_rdd.randomSplit([0.8, 0.2], seed=seed)

  # Convert train_rdd into LabeledPoint format, sort (to ensure order after splitting), and remove index
  train_rdd = train_rdd.mapValues(lambda x: LabeledPoint(x[0], Vectors.dense(x[1]))).sortBy(lambda x: x[0]).map(lambda x: x[1])

  # Convert test_rdd into test_labels_rdd and test_images_rdd
  # Sort then remove index for each
  test_labels_rdd = test_rdd.mapValues(lambda x: x[0]).sortBy(lambda x: x[0]).map(lambda x: x[1])
  test_images_rdd = test_rdd.mapValues(lambda x: x[1]).sortBy(lambda x: x[0]).map(lambda x: x[1])

  return train_rdd, test_labels_rdd, test_images_rdd

baseline_scores = []
best_model_scores = []
seeds = [44,32,883,12333,5,98,623,1,57,42] # using seed for randomSplit on RDDs (to ensure the same sets of data are evaluated for each model)
for fold in range(10):

  seed = seeds[fold] # set seed value for fold iteration

  # combine all data for unmodified images
  baseline_train, baseline_test_labels, baseline_test_images = crossVal_preprocess(seed) 

  # combine all data with feature extraction
  feature_extraction_param = {'histogram_greyscale': True}
  best_model_train, best_model_test_labels, best_model_test_images = crossVal_preprocess(seed,norm=True, **feature_extraction_param)

  # Evaluations return [str_name, precision, recall, f1score, accuracy, preprocess_time, train_time, pred_time, total_time]
  baseline_eval = train_test_model('Random Forest', baseline_train, baseline_test_images, baseline_test_labels)
  best_model_eval = train_test_model('Logistic Regression', best_model_train, best_model_test_images, best_model_test_labels)
  
  baseline_f1 = baseline_eval[3]
  best_model_f1 = best_model_eval[3]
  print(f'Fold {fold}:{chr(10)}Baseline model weighted-F1 score: {baseline_f1}{chr(10)}Best Model weighted-F1 score: {best_model_f1}{chr(10)}')
  baseline_scores.append(baseline_f1)
  best_model_scores.append(best_model_f1)

# Compute paired t-test
test_result = ttest_rel(baseline_scores, best_model_scores)
t_stat = test_result[0]
p_val = test_result[1]
print(f'T-test statistic: {t_stat}{chr(10)}P-value: {p_val}')

# Create Pandas dataframe from metrics and export to csv 
all_metrics = {'Baseline Model': baseline_scores, 'Best Model': best_model_scores}   
metrics_df = pd.DataFrame(all_metrics)
metrics_df.to_csv('cross_validation_metrics.csv', index=False)

# Create Pandas dataframe for t-test statistics and export to csv
stat_df = pd.DataFrame([[t_stat, p_val]], columns=['Statistic', 'P-Value'])
stat_df.to_csv('paired_t_test_statistics.csv', index=False)

Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined all data: 500000 images with labels
Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined all data: 500000 images with labels
*******************
** Random Forest **
*******************
Precision score: 0.83
Recall score: 0.82
F1 score: 0.82
Accuracy score: 0.82
Pre-process time: 0.00
Training time: 99.16
Prediction time: 0.03

*************************
** Logistic Regression **
*************************
Precision score: 0.93
Recall score: 0.93
F1 score: 0.93
Accuracy score: 0.93
Pre-process time: 0.00
Training time: 149.74
Prediction time: 0.00

Fold 0:
Baseline model weighted-F1 score: 0.8166619604402402
Best Model weighted-F1 score: 0.925250882045282

Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined all data: 500000

In [None]:
############################################################################################################################
##  Save predictions from best model, ground truth labels, original image data, and greyscale histogram transformed data  ##
############################################################################################################################

# Use main pre-process function on data
feat_extract = {'histogram_greyscale': True}
combined_train, X_train, Y_train, X_test, Y_test, preprocess_time = data_preprocess_main(norm_features=True, **feat_extract)

# Train model, get predictions, and save to a .csv file
# Get predictions with labels
preds_labels = train_test_model('Logistic Regression', combined_train, X_test, Y_test, preprocess_time, get_preds_and_labels_only=True)
idx_preds_labels = preds_labels.zipWithIndex().map(lambda x: (x[1], (x[0][0], x[0][1]))) # add index

idx_x_test_orig = x_test_orig.zipWithIndex().map(lambda x: (x[1], x[0])).mapValues(list) # add index
print(x_test_orig.take)
idx_X_test = X_test.zipWithIndex().map(lambda x: (x[1], x[0])) # add index
preds_labels_X = idx_preds_labels.join(idx_x_test_orig).map(lambda x: (x[0], (x[1][0][0], x[1][0][1], x[1][1]))) # join preds, labels, and orig_x_test
preds_labels_X = preds_labels_X.join(idx_X_test).sortBy(lambda x: x[0]).map(lambda x: [x[1][0][0], x[1][0][1], x[1][0][2], x[1][1]]) # join with transformed X_test
preds_labels_X = preds_labels_X.collect()
df = pd.DataFrame(preds_labels_X, columns=['Predicted','Actual','Orig Image', 'Greyscale Histogram Transformed'])
print(df['Orig Image'].head())
df.to_csv('6_predictions.csv', index=False, header=True)

Loaded X_train: 400000 images
Loaded Y_train: 400000 labels
Loaded X_test: 100000 images
Loaded Y_test: 100000 labels
Joined X_train and Y_train: 400000 images and labels
Data pre-processing time:  376.718 seconds.

*************************
** Logistic Regression **
*************************
<bound method RDD.take of MapPartitionsRDD[44] at javaToPython at NativeMethodAccessorImpl.java:0>
0    [130, 120, 113, 143, 137, 134, 127, 159, 133, ...
1    [164, 139, 111, 196, 171, 147, 122, 201, 168, ...
2    [120, 114, 114, 132, 111, 105, 100, 128, 61, 4...
3    [126, 121, 99, 166, 129, 122, 106, 167, 127, 1...
4    [92, 90, 65, 175, 92, 94, 73, 176, 100, 104, 8...
Name: Orig Image, dtype: object
