## Initialize TALC Cluster

In [0]:
# COMPLETED BY: Ajoy

import os
import atexit
import sys
import time

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and then the cluster job.

    Registered with ``atexit`` so the resources are released when the
    script exits or crashes. A failure while stopping the context is
    reported on stderr instead of being silently discarded, and the job
    stop is always attempted afterwards.

    Args:
        sj: Cluster job handle; only its ``stop()`` method is used.
        sc: Spark context; only its ``stop()`` method is used.
    """
    try:
        print('Trapped Exit cleaning up Spark Context')
        sc.stop()
    except Exception as err:
        # Keep going: the job below still has to be released.
        print('Failed to stop Spark Context: ' + repr(err), file=sys.stderr)
    finally:
        # Runs even if stopping the context was interrupted, so the
        # cluster allocation is not left behind.
        try:
            print('Trapped Exit cleaning up Spark Job')
            sj.stop()
        except Exception as err:
            print('Failed to stop Spark Job: ' + repr(err), file=sys.stderr)

findspark.init()

# Parameters for the Spark cluster; they feed the sparkjob request below.
nodes=1
tasks_per_node=4 
memory_per_task=4096 #4 gig per process, adjust accordingly
# Please estimate walltime carefully to keep unused Spark clusters from sitting
# idle so that others may use the resources.
walltime="1:00" # 1 hour per the original note; presumably H:MM -- TODO confirm against sparkhpc
#os.environ['SBATCH_PARTITION']='cpu2019' #Set the appropriate ARC partition

# Request nodes*tasks_per_node cores in total, tasks_per_node cores per
# executor, and memory_per_task memory per core for the given walltime.
sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

# wait_to_start() presumably blocks until the scheduler has started the job;
# the fixed 60-second pause afterwards presumably gives the workers time to
# come up before Spark is launched -- TODO confirm it is still required.
sj.wait_to_start()
time.sleep(60)
sc = sj.start_spark()

# Register the exit handler so the context and the job are stopped at exit.
# NOTE(review): registration happens only after start_spark() has returned,
# so a failure in the lines above leaves the submitted job without this cleanup.
atexit.register(exitHandler,sj,sc)

# You need this line if you want to use SparkSQL; the data-loading cell reads
# the CSV inputs through this context.
sqlCtx=SQLContext(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 16059

INFO:sparkhpc.sparkjob:Submitted cluster 0


# ENSF 612 Term Project: Extending the README Classifier

#### Loading feature matrix and target vector into PySpark dataframe

In [0]:
# COMPLETED BY: Zach Frena
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# COMPLETED BY: Zach Frena
# Input CSV locations. Active choice: original + new samples combined
# (the files can be found following the One Drive link).
FULL_X = "./FULL_X.csv"
FULL_Y = "./TARGET_MATRIX_YTRUE_FULL.csv"

# Alternative: original samples only (can be found following the One Drive link).
# FULL_X = "./OG_FULL_X.csv"
# FULL_Y = "./OG_TARGET_MATRIX_YTRUE_FULL.csv"

# Alternative: new samples only (can be found following the One Drive link).
# FULL_X = "./NEW_FULL_X.csv"
# FULL_Y = "./NEW_TARGET_MATRIX_YTRUE_FULL.csv"

# Reader settings shared by both files.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

spark = sqlCtx


def _load_table(path):
    """Read one delimited file into a DataFrame using the shared reader settings."""
    reader = spark.read.format(file_type)
    reader = reader.option("inferSchema", True)
    reader = reader.option("header", first_row_is_header)
    reader = reader.option("sep", delimiter)
    return reader.load(path)


# X: feature matrix, Y: target matrix.
X = _load_table(FULL_X)
Y = _load_table(FULL_Y)

### Select Input Dataset
This can be the combined old + new dataset (considered the main one), the old dataset only, or the new dataset only, as specified in the comments above.

In [0]:
# COMPLETED BY: Zach Frena
# Report (row count, column count) for the feature and target frames.
x_shape = (X.count(), len(X.columns))
y_shape = (Y.count(), len(Y.columns))
print("Originial + New Samples\n\tX:", x_shape)
print("\tY:", y_shape)

Originial + New Samples
	X: (4331, 13350)
	Y: (4331, 8)


## Preprocessing for ML Pipeline via StopWord Removal and Column Dropping

In [0]:
# COMPLETED BY: Zach Frena
# Baseline feature count, taken before any columns are dropped.
n_features_before = len(X.columns)
print("Number of features before preprocessing:", n_features_before)

Number of features before preprocessing: 13350


In [0]:
# COMPLETED BY: Ziryan Seddek
!pip install nltk
import nltk
from nltk.corpus import stopwords
# Make sure the NLTK stop-word corpus is available locally.
nltk.download('stopwords')

# Extra tokens (mostly punctuation and brackets) to drop in addition to
# NLTK's English stop words.
my_stop_words = [
    '!', '.', ',', '?', '\\', '/', ':', 'n', 'I\'m',
    '[', ']', '(', ')', '{', '}', '_',
]

# Combined list: NLTK English stop words followed by the custom tokens.
stop_en = stopwords.words('english') + my_stop_words
stop_en

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
Out[20]: ['i',
 'me',
 'my',
 'myself',
 'we',
 'our',
 'ours',
 'ourselves',
 'you',
 "you're",
 "you've",
 "you'll",
 "you'd",
 'your',
 'yours',
 'yourself',
 'yourselves',
 'he',
 'him',
 'his',
 'himself',
 'she',
 "she's",
 'her',
 'hers',
 'herself',
 'it',
 "it's",
 'its',
 'itself',
 'they',
 'them',
 'their',
 'theirs',
 'themselves',
 'what',
 'which',
 'who',
 'whom',
 'this',
 'that',
 "that'll",
 'these',
 'those',
 'am',
 'is',
 'are',
 'was',
 'were',
 'be',
 'been',
 'being',
 'have',
 'has',
 'had',
 'having',
 'do',
 'does',
 'did',
 'doing',
 'a',
 'an',
 'the',
 'and',
 'but',
 'if',
 'or',
 'because',
 'as',
 'until',
 'while',
 'of',
 'at',
 'by',
 'for',
 'with',
 'about',
 'against',
 'between',
 'into',
 'through',
 'during',
 'before',
 'after',
 'above',
 'below',
 'to',
 'from',
 'up',
 'down',
 'in',
 'out',
 'on',
 'off',
 'over',
 'under

1. Removing features with "_" present

In [0]:
# COMPLETED BY: Ziryan Seddek 
def _has_no_underscore(word):
    """Return True when the column name contains no '_' character."""
    return '_' not in word


# Distribute the column names as an RDD and keep those without an underscore.
sc = spark.sparkSession.sparkContext
columns_rdd = sc.parallelize(X.columns)
words_no_dash = columns_rdd.filter(_has_no_underscore)

2. Removing Stop Words

In [0]:
# COMPLETED BY: Ziryan Seddek

# Column names are lower-cased before the membership test, so the stop words
# must be lower-cased too; otherwise a mixed-case entry such as "I'm" can
# never match. A frozenset also makes each lookup constant-time.
stop_words_lower = frozenset(stop_word.lower() for stop_word in stop_en)

# Keep only the column names that are not stop words (case-insensitive).
final_columns = words_no_dash.filter(
    lambda word: word.lower() not in stop_words_lower
).collect()

In [0]:
# COMPLETED BY: Ziryan Seddek
# Number of feature columns that survived both filters.
n_final_columns = len(final_columns)
print(n_final_columns)

11969


3. Updating Dataframe Columns to reflect final_columns list

In [0]:
# COMPLETED BY: Ziryan Seddek
# Project the feature frame down to the surviving columns only.
X = X.select(*final_columns)

In [0]:
# COMPLETED BY: Ziryan Seddeck 
# Feature count after preprocessing; shown as the cell result.
n_features_after = len(X.columns)
n_features_after

Out[19]: 11969

## Machine Learning Classification Pipeline

In [0]:
# COMPLETED BY: Tobi Odufeso

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplitModel

import sklearn
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix

In [0]:
# COMPLETED BY: Zach Frena
# COMPLETED BY: Tobi Odufeso
# COMPLETED BY: Ziryan Seddek

def trainModel(traning_data, testing_data, name): 
  """Tune a LinearSVC, score it on the test split and append the results to log files.

  A train/validation split (75% train) searches a grid over ``regParam``
  and ``threshold`` and is scored with a default
  ``MulticlassClassificationEvaluator`` (the log messages label that metric
  as F1). The fitted model is then applied to ``testing_data``.

  Side effects:
    * appends the validation metrics and best ``regParam`` to ``models_metrics.txt``
    * writes the (label, prediction) pairs as CSV into a directory named ``str(name)``
    * appends the test score to ``trained_scores.txt``
    * appends the confusion matrix and classification report to
      ``classification_report.csv``

  Args:
    traning_data: DataFrame used for fitting; presumably carries the default
      ``features`` and ``label`` columns expected by LinearSVC -- confirm at the caller.
    testing_data: DataFrame to predict on; must contain a ``label`` column.
    name: Identifier of the target label; ``str(name)`` is used in log text
      and as the prediction output directory.

  Returns:
    None.
  """
  label_name = str(name)
  formatter = "{0:10.4f}"

  # Pipeline
  print("\nGrid Start")
  SVC = LinearSVC()
  grid = ParamGridBuilder().addGrid(SVC.regParam, [0.00001, 0.0001, 0.001]).addGrid(SVC.threshold, [0.0, 0.5, 1]).build()
  evaluator = MulticlassClassificationEvaluator()
  tvs = TrainValidationSplit(estimator=SVC, estimatorParamMaps=grid, evaluator=evaluator, trainRatio=0.75, parallelism=24, seed=42)
  print("Grid End")

  print("\nTraining Start")
  trained_model = tvs.fit(traning_data)
  print("Training Scores (F1):",trained_model.validationMetrics)
  print("Trining End")

  # Training Scores
  print("\nStart Writting Training Score (F1) to Text File.")
  bestModel = trained_model.bestModel
  # NOTE(review): reads the parameter through the private Java handle, as the
  # original did; a public getter may exist depending on the Spark version.
  message = "The best value for the regularization constant(C) is: "+str(bestModel._java_obj.getRegParam())
  # The context manager closes the file even if one of the writes fails.
  with open("models_metrics.txt", "a") as textfile:
    textfile.write("AVERAGE TRAINING SCORES(F1) FOR '"+label_name+"': "+str(trained_model.validationMetrics)+"\n")
    # One line per grid point: (validation metric, parameter map).
    for model in zip(trained_model.validationMetrics, grid):
      textfile.write(str(model)+"\n")
    print("\n"+message)
    textfile.write(message+"\n\n")
  print("Write Complete")

  # Testing Model
  print("\nTransform Start")
  predictions = trained_model.transform(testing_data)
  print("Transform End")
  y_pred_data = predictions.select("label", "prediction")

  print("\nWritting Predicted Labels to File")
  # NOTE(review): with the writer's default save mode this presumably fails
  # when the directory already exists (e.g. on a re-run) -- confirm.
  y_pred_data.write.csv(label_name) # Create directory with same name as the target label
  print("Writting Complete")

  print("\nWritting F1 Score to File")
  # Evaluate once and reuse the value for both the file and the console.
  test_score = MulticlassClassificationEvaluator().evaluate(y_pred_data)
  with open("trained_scores.txt", "a") as trained_scores_file:
    trained_scores_file.write("WEIGHTED AVERAGE SCORE FOR '"+label_name+"': "+str(test_score) +"\n")
  print("Test Score(F1):", formatter.format(test_score))
  print("Writting Complete")

  # Create Confusion Matrix and Binary Classification Report
  print("\nWritting Confusion Matrix and Binary Classification Report to File")

  # Collect both columns in a single action so every label stays paired
  # with its own prediction, then split them into plain Python lists.
  collected_rows = y_pred_data.collect()
  y_true = [row["label"] for row in collected_rows]
  y_pred = [row["prediction"] for row in collected_rows]

  crp = classification_report(y_true, y_pred, output_dict=True)
  class_report = pd.DataFrame(crp)
  cfm = np.array2string(confusion_matrix(y_true, y_pred))

  # The handle is closed before pandas appends to the same file so the
  # sections land in the intended order.
  with open("classification_report.csv", "a") as cr_textfile:
    cr_textfile.write("CONFUSION MATRIX FOR '"+label_name+"' \n")
    cr_textfile.write("\t"+cfm+"\n\n")
    cr_textfile.write("CLASSIFICATION REPORT FOR '"+label_name+"' \n")
  class_report.to_csv("classification_report.csv", mode="a", header=True)

  # Formating
  with open("classification_report.csv", "a") as cr_textfile:
    cr_textfile.write("\n\n\n")
  print("Writting Complete")


## Training and Prediction using Optimized SVC ML Classifier

In [0]:
# COMPLETED BY: Tobi Odufeso
 
y_name = 8  # SELECT COLUMN IN TARGET MATRIX (make sure to reload data before running)
print("Converting Target Matrix to Selected Vector")
y = Y.select(str(y_name))
label_name = y.columns[0]


def _with_row_index(frame):
    """Return `frame` with a 'rowIdx' column holding a consecutive row number."""
    ordering = Window.orderBy(monotonically_increasing_id())
    return frame.withColumn('rowIdx', row_number().over(ordering))


# Pack every feature column into a single vector column.
assembler = VectorAssembler(inputCols=X.columns, outputCol="features_", handleInvalid="keep")
X = assembler.transform(X)
y_label = y.withColumnRenamed(label_name, "label_")

# Number the rows of both frames so features and labels can be joined on position.
y = _with_row_index(y_label)
X = _with_row_index(X)

joined = X.join(y, on=["rowIdx"]).drop("rowIdx")
features_with_label = (
    joined.select("features_", "label_")
    .withColumnRenamed("features_", "features")
    .withColumnRenamed("label_", "label")
)

# 90/10 train/test split with a fixed seed.
training_data, testing_data = features_with_label.randomSplit([0.9,0.1], seed=42)

trainModel(training_data, testing_data, y_name)

Converting Target Matrix to Selected Vector

Grid Start
Grid End

Training Start
Training Scores (F1): [0.9801935479262307, 0.9808179077732465, 0.9814550464691736, 0.9801935479262307, 0.9808179077732465, 0.9814550464691736, 0.9801935479262307, 0.9808179077732465, 0.9814550464691736]
Trining End

Start Writting Training Score (F1) to Text File.

The best value for the regularization constant(C) is: 1e-05
Write Complete

Transform Start
Transform End

Writting Predicted Labels to File
Writting Complete

Writting F1 Score to File
Test Score(F1):     0.9759
Writting Complete

Writting Confusion Matrix and Binary Classification Report to File
Writting Complete
