In [None]:
import time
import numpy as np
import pandas as pd
from sklearn import metrics
from tabulate import tabulate
from datetime import timedelta
import matplotlib.pyplot as plt

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.feature import ChiSqSelector, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier as pyspark_DecisionTreeClassifier
from pyspark.sql.functions import col

In [None]:
# Start a local Spark session on all available cores, or reuse the one already running.
spark = (
  SparkSession.builder
  .master("local[*]")
  .appName("MLlib lab")
  .getOrCreate()
)

In [None]:
# Input CSV for this run. Other inputs this notebook has previously been pointed at:
#   /mnt/the-data-transformers/spark_output/preprocessed_data_final2.pqt  (parquet, option("maxColumns", 22285))
#   /mnt/the-data-transformers/Leukemia_GSE9476_test.csv                  (option("maxColumns", 22285))
#   /mnt/the-data-transformers/gene_encoded.csv                           (option("maxColumns", 20532), header='true')
DATA_PATH = "/mnt/the-data-transformers/toxicity.csv"

sparkDF = spark.read.csv(DATA_PATH, header='true')

# Label legend recorded with this cell. It is kept as a comment rather than a bare
# string so it is not echoed as the cell's output.
# NOTE(review): these class names look like they belong to the gene-expression input
# (gene_encoded.csv) rather than toxicity.csv — confirm before relying on it.
#   1 PRAD
#   2 LUAD
#   3 BRCA
#   4 KIRC
#   5 COAD

In [None]:
# Cast every column to float so the MLlib stages below receive numeric columns.
# NOTE(review): values Spark cannot parse as float become null — confirm the label column is numeric in the CSV.
float_projection = [col(name).cast("float").alias(name) for name in sparkDF.columns]
sparkDF = sparkDF.select(*float_projection)
sparkDF.head()

In [None]:
# Number of rows in the loaded dataset.
row_count = sparkDF.count()
row_count

In [None]:
# Number of columns (features plus label) before any feature removal.
column_count = len(sparkDF.columns)
column_count

In [None]:
def spark_UnivariateFeatureSelector(i, _vector_sparkDF):
  '''
  Select the top `i` features and report their decision-tree importances.

  INPUT:
  ------
  i: Number of Features to select
  _vector_sparkDF: DataFrame with an assembled "features" vector column and a "type" label column

  OUTPUT:
  -------
  1. The feature importances (one entry per selected feature) of a
     DecisionTreeClassifier trained on the selected features
  '''
  # Rank features against the categorical label and keep the best `i` of them.
  feature_selector = (
    UnivariateFeatureSelector(
      featuresCol="features",
      outputCol="selectedFeatures",
      labelCol="type",
      selectionMode="numTopFeatures",
    )
    .setFeatureType("continuous")
    .setLabelType("categorical")
    .setSelectionThreshold(i)
  )
  selected_df = feature_selector.fit(_vector_sparkDF).transform(_vector_sparkDF)

  # Fit a decision tree on the reduced vector; its importances score each kept feature.
  tree = pyspark_DecisionTreeClassifier(labelCol="type", featuresCol="selectedFeatures")
  tree_model = tree.fit(selected_df)
  return tree_model.featureImportances


In [None]:
# Name of the label column. It is selected next to the features (the tree needs it)
# but is never placed inside an assembled feature vector.
_LABEL_COL = "type"


def _chunk_columns(df, num, i):
  '''
  Return the `i`-th (0-based) run of `num` non-label column names of `df`.

  The last chunk may be shorter than `num`; a chunk past the end is [].
  '''
  feature_cols = [c for c in df.columns if c != _LABEL_COL]
  return feature_cols[i * num:(i + 1) * num]


def get_important_features(num, i, df=None):
  '''
  Function: get_important_features
  Score one chunk of feature columns with a decision tree.

  INPUT:
  ------
  num: the number of feature columns in a chunk/subset
  i: the chunk number (0-based)
  df: DataFrame to read the chunk from; defaults to the notebook-level `sparkDF`

  OUTPUT:
  -------
  1. The feature-importance vector returned by spark_UnivariateFeatureSelector.
     Entry k is intended to belong to column _chunk_columns(df, num, i)[k].

  RAISES:
  -------
  ValueError if chunk `i` contains no feature columns.
  '''
  if df is None:
    df = sparkDF
  chunk_cols = _chunk_columns(df, num, i)
  if not chunk_cols:
    raise ValueError("chunk " + str(i) + " is empty: the DataFrame has fewer than "
                     + str(i * num + 1) + " feature columns")
  print("chunk " + str(i) + ": columns " + chunk_cols[0] + " .. " + chunk_cols[-1]
        + " (" + str(len(chunk_cols)) + " features)")

  # Only the chunk's feature columns go into the vector. If the label were included
  # (as chunk.columns would do), the tree could split on the label itself and the
  # real features would receive (near-)zero importance.
  chunk = df.select(*chunk_cols, _LABEL_COL).distinct()
  vecAssembler = VectorAssembler(inputCols=chunk_cols, outputCol="features")
  newDF = vecAssembler.transform(chunk)

  # Request every feature in the chunk so none is filtered out before the tree is fit.
  # NOTE(review): index alignment with chunk_cols assumes the selector keeps features
  # in input order when all are selected — confirm against the Spark version in use.
  return spark_UnivariateFeatureSelector(len(chunk_cols), newDF)


def remove_features(sparkDF, columns=100, iterations=10):
  '''
  Function: remove_features
  Drop feature columns that a per-chunk decision tree gives zero importance.

  INPUT:
  ------
  sparkDF: the spark DataFrame consisting of the dataset
  columns: number of feature columns scored per chunk
  iterations: maximum number of chunks to score; None scores every chunk.
              Feature columns beyond the last scored chunk are kept untouched.

  OUTPUT:
  -------
  1. The new spark DataFrame that only contains the most important subset of features

  RAISES:
  -------
  ValueError if `columns` is not positive, or if an importance vector does not
  have one entry per column of its chunk.
  '''
  if columns <= 0:
    raise ValueError("columns must be positive, got " + str(columns))
  print("removing features")

  # Derive the chunk count from the actual number of feature columns so a narrow
  # dataset never indexes past its last column.
  feature_count = sum(1 for c in sparkDF.columns if c != _LABEL_COL)
  available_chunks = -(-feature_count // columns)  # ceiling division
  num_chunks = available_chunks if iterations is None else min(iterations, available_chunks)

  toDrop = []
  for i in range(num_chunks):
    print("chunk: " + str(i + 1))
    chunk_cols = _chunk_columns(sparkDF, columns, i)
    # Pass this DataFrame explicitly so scores and column names come from the same frame.
    importances = get_important_features(columns, i, sparkDF).toArray()
    if len(importances) != len(chunk_cols):
      # A length mismatch would attribute scores to the wrong columns; fail loudly.
      raise ValueError("chunk " + str(i) + ": got " + str(len(importances))
                       + " importances for " + str(len(chunk_cols)) + " columns")
    toDrop.extend(name for name, score in zip(chunk_cols, importances) if score == 0)

  # drop low importance columns
  sparkDF = sparkDF.drop(*toDrop)
  print("remaining columns: " + str(len(sparkDF.columns)))

  return sparkDF

sparkDF = remove_features(sparkDF)

In [None]:
# Column count (features plus label) after low-importance features were dropped.
remaining_columns = len(sparkDF.columns)
remaining_columns

In [None]:
# Train a decision tree on the reduced feature set and report held-out accuracy.
LABEL_COL = "type"
SPLIT_SEED = 42  # fixed so the train/test split, and therefore the accuracy, is reproducible

# Every remaining column except the label becomes a feature. Feeding the label in as a
# feature leaks the answer into the model and inflates the accuracy. "features" is also
# skipped so the cell still runs if sparkDF already carries that column from an older run;
# sparkDF itself is no longer overwritten, which keeps this cell re-runnable.
feature_cols = [c for c in sparkDF.columns if c not in (LABEL_COL, "features")]
vecAssembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembledDF = vecAssembler.transform(sparkDF)

# Splitting the data into training & testing BEFORE feature selection, so the selector
# never sees the test labels.
# NOTE(review): the earlier remove_features(sparkDF) pruning was fit on all rows, so
# some selection leakage remains upstream of this split.
(train_raw, test_raw) = assembledDF.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# No selection threshold is set, so the selector's default number of top features applies.
selector = UnivariateFeatureSelector(
  featuresCol="features",
  outputCol="selectedFeatures",
  labelCol=LABEL_COL,
  selectionMode="numTopFeatures"
)
selector.setFeatureType("continuous").setLabelType("categorical")
selectorModel = selector.fit(train_raw)
train = selectorModel.transform(train_raw)
test = selectorModel.transform(test_raw)

# Using pyspark DecisionTreeClassifier to define and fit the ML model
dt = pyspark_DecisionTreeClassifier(labelCol=LABEL_COL, featuresCol="selectedFeatures")
model = dt.fit(train)
predictions = model.transform(test)

# Evaluating the predictions on the held-out split
evaluator = MulticlassClassificationEvaluator(
  labelCol=LABEL_COL,
  predictionCol="prediction",
  metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test accuracy = "+str(accuracy))
