Matthew Koton


Nofar Yungman


# **Initial Setup**

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install findspark

In [None]:
# Install the PySpark Python package from PyPI.
# NOTE(review): the version is not pinned, so it may differ from the Spark 3.1.1
# distribution unpacked in the setup cell — confirm which one is actually used.
!pip install pyspark

In [None]:
# Install or upgrade the Kaggle client package.
# NOTE(review): nothing in the visible cells imports kaggle — confirm it is still needed.
!pip install kaggle --upgrade

In [None]:
import os

# Tell later cells where the Java runtime and the unpacked Spark distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

In [None]:
import findspark
# Keep this call ahead of the pyspark import below: the order of these statements matters.
# NOTE(review): presumably init() locates Spark through the SPARK_HOME variable set
# in the previous cell — confirm.
findspark.init()

import pyspark

In [None]:
from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise start a new one under this app name.
session_builder = SparkSession.builder.appName('treatmeant')
spark = session_builder.getOrCreate()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.ml.feature import Imputer

# **Read Data**

In [None]:
# Read the training CSV: the first row is the header and column types are inferred.
train_reader = spark.read.options(inferSchema=True, header=True)
df_train = train_reader.csv("/content/drive/MyDrive/Big data/hw3/HW3-sparkML/bdp_train.csv")

# Keep the original dimensions; they are needed for the null-column report.
num_columns_original = len(df_train.columns)
df_train_size = df_train.count()

# **Null columns**

In [None]:
# One aggregate per column. when() without otherwise() yields null for non-matching
# rows and count() skips nulls, so each aggregate is the number of null entries.
null_count_exprs = [
    f.count(f.when(f.col(name).isNull(), name)).alias(name)
    for name in df_train.columns
]
# The aggregation produces a single row; turn it into {column name: null count}.
null_counts = df_train.select(null_count_exprs).first().asDict()

In [None]:
# Null ratio (a fraction of all rows, despite the name) at or above which a column is dropped.
null_threshold_percent = 0.1

# Gather the names of all columns whose null ratio reaches the threshold.
to_drop = []
for column_name, null_total in null_counts.items():
    null_ratio = null_total / df_train_size
    if null_ratio >= null_threshold_percent:
        to_drop.append(column_name)

In [None]:
# Remove the high-null columns; the list is unpacked because drop() takes names as separate arguments.
df_train = df_train.drop(*to_drop)

In [None]:
# Column count once the high-null columns have been removed.
num_columns_after_drop = len(df_train.columns)

# Report the threshold followed by the before / dropped / after column counts.
report_lines = (
    ("threashold percentage:", null_threshold_percent),
    ("Number of columns in the original DataFrame:", num_columns_original),
    ("Number of columns dropped:", num_columns_original - num_columns_after_drop),
    ("Number of columns in the new DataFrame:", num_columns_after_drop),
)
for label, value in report_lines:
    print(label, value)

In [None]:
# print schema of our df after dropping null columns
df_train.printSchema()

# **Null rows**

In [None]:
# Disabled experiment, kept for reference: drop rows that have too few non-null values
# across every column except "Index". Only the row count below is active.
# NOTE(review): per the PySpark docs, a given `thresh` overrides `how` in dropna().
#threshold = int(len(df_train.columns)*0.1)
#print(threshold)
#print(df_train.count())
#cols_minus_index = [col for col in df_train.columns if col != "Index"]
#df_train = df_train.dropna(how='any', thresh=threshold, subset=cols_minus_index)
# Current number of training rows (no row filtering is applied in this cell).
print(df_train.count())


# **Get categorical and numeric columns**

In [None]:
# Schema snapshot as (column name, Spark type string) pairs.
column_types = df_train.dtypes

# Columns A1..A83 are handled as categorical, A84..A452 as numeric.
str_column_names = [f"A{i}" for i in range(1, 84)]
non_str_column_names = [f"A{i}" for i in range(84, 453)]

numerical_columns = []
categorical_columns = []

# Types that are left untouched for numeric features.
accepted_numeric_types = ("int", "double")

for col_name, col_type in column_types:
  if col_name in str_column_names:
    categorical_columns.append(col_name)
    # Schema inference may have typed a categorical column as a number; force it to string.
    if col_type != 'string':
      df_train = df_train.withColumn(col_name, df_train[col_name].cast("string"))

  if col_name in non_str_column_names:
    numerical_columns.append(col_name)
    # Fix: the previous test (`!= 'int' or != 'double'`) was true for every type, so
    # double columns were cast to int and lost their fractional part. Only columns that
    # are neither int nor double are converted now, and to double so nothing is truncated.
    # The test set must go through the same rule so both sets share feature types.
    if col_type not in accepted_numeric_types:
      df_train = df_train.withColumn(col_name, df_train[col_name].cast("double"))



# **Create Pipeline**

<u>changing features</u>

In [None]:
# Output column names produced by the string indexers, one per categorical column.
categorical_columns_indexed = [f"{name}_index" for name in categorical_columns]


In [None]:
# One StringIndexer per categorical column, writing to "<column>_index".
# handleInvalid="keep" keeps rows with invalid values instead of failing on them.
indexers = []

for column in categorical_columns:
    indexer = StringIndexer(
        inputCol=column,
        outputCol=f"{column}_index",
        handleInvalid="keep",
    )
    indexers.append(indexer)

In [None]:

# Numeric features: fill missing values with the column mean, into "<column>_imputed".
imputer_numerical = (
    Imputer(strategy="mean")
    .setInputCols(numerical_columns)
    .setOutputCols([name + "_imputed" for name in numerical_columns])
)

# Indexed categorical features: fill missing values with the most frequent value.
imputer_catagorical = (
    Imputer(strategy="mode")
    .setInputCols(categorical_columns_indexed)
    .setOutputCols([name + "_imputed" for name in categorical_columns_indexed])
)



In [None]:
# Names of every imputed feature column: numeric ones first, then the indexed categoricals.
features_combined = [
    f"{name}_imputed" for name in numerical_columns + categorical_columns_indexed
]

In [None]:
# Pack all imputed feature columns into a single vector column, keeping invalid rows.
assembler = VectorAssembler(
    inputCols=features_combined,
    outputCol="features_asm",
).setHandleInvalid('keep')

In [None]:
from pyspark.ml.feature import PCA, StandardScaler

# Centre each feature on its mean; the values are not rescaled to unit variance.
scaler = StandardScaler(withMean=True, withStd=False)
scaler.setInputCol("features_asm")
scaler.setOutputCol("scaledFeatures")

# Reduce the centred vector to 70 principal components, written to "features".
pca = PCA(k=70).setInputCol("scaledFeatures")
pca.setOutputCol("features")

<u>Create ML model</u>

In [None]:
# Random-forest classifier (100 trees) reading the "features" vector and the "CLASSIndex" label.
# NOTE(review): the pipeline defined later ends in the cross-validated XGBoost estimator,
# so this object looks unused in the visible cells — confirm before removing it.
rfc = RandomForestClassifier(labelCol='CLASSIndex', featuresCol='features', numTrees=100)

In [None]:
from xgboost.spark import SparkXGBClassifier

# Estimator settings gathered in one place and unpacked into the constructor.
xgb_settings = dict(
    features_col="features",
    label_col="CLASSIndex",
    num_workers=2,
    learning_rate=0.01,
    max_depth=7,
)
spark_reg_estimator = SparkXGBClassifier(**xgb_settings)


In [None]:
# Look up the estimator's `learning_rate` Param; as the cell's last expression its repr is displayed.
spark_reg_estimator.getParam("learning_rate")

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Hyper-parameter grid searched by cross-validation: one learning rate x two tree depths.
# Grids kept from earlier drafts of this cell, for reference:
#   max_depth [2, 5] with n_estimators [10, 100]
#   learning_rate [0.05, 0.1] with max_depth [5, 75, 150]
paramGrid = (
    ParamGridBuilder()
    .addGrid(spark_reg_estimator.learning_rate, [0.01])
    .addGrid(spark_reg_estimator.max_depth, [5, 7])
    .build()
)

# Scores candidate models against the indexed class label (default metric: area under ROC).
binary_eval = BinaryClassificationEvaluator(labelCol='CLASSIndex')

# Declare the CrossValidator, which performs the model tuning.
# numFolds is spelled out (3 is the library default) and a seed is fixed so that the
# fold assignment, and therefore the selected model, is the same on every run.
cv = CrossValidator(
    estimator=spark_reg_estimator,
    evaluator=binary_eval,
    estimatorParamMaps=paramGrid,
    numFolds=3,
    seed=42,
)

<u>Define pipeline</u>

In [None]:
# Stage order: string indexers -> imputers -> vector assembler -> centring -> PCA -> tuned classifier.
# (Replace `cv` with `spark_reg_estimator` to fit the estimator without cross-validation.)
feature_stages = [imputer_numerical, imputer_catagorical, assembler, scaler, pca]
pipeline = Pipeline(stages=[*indexers, *feature_stages, cv])



# **Fit pipeline**

In [None]:
# create class indexer
indexer = StringIndexer(inputCol='CLASS', outputCol='CLASSIndex')
df_train = indexer.fit(df_train).transform(df_train)

In [None]:
# Disabled step, wrapped in a bare string literal so it never executes: it would drop
# every row containing a null and print the row count before and after.
'''
print(df_train.count())

# remove null rows
df_train = df_train.na.drop()
print(df_train.count())
'''

In [None]:
# Hold out roughly 20% of the rows for validation.
# A fixed seed makes the split, and the reported validation AUC, repeatable across runs.
train_data, val_data = df_train.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Fit every stage on the training split.
# NOTE(review): `pipeline` is rebound from the unfitted Pipeline to the fitted model, which
# later cells rely on; re-running this cell without rebuilding the Pipeline will likely fail.
pipeline = pipeline.fit(train_data)
# Alternative: fit on the full training frame instead of the 80% split.
#pipeline = pipeline.fit(df_train)

In [None]:
# The cross-validation stage is the last stage of the fitted pipeline.
cv_model = pipeline.stages[-1]
best_model = cv_model.bestModel

# Read back the two hyper-parameters that were tuned in the grid.
# Fix: the depth used to be stored under the misleading name `best_n_estimators`.
best_learning_rate = best_model.getOrDefault('learning_rate')
best_max_depth = best_model.getOrDefault('max_depth')

print("best learning_rate:", best_learning_rate, "| best max_depth:", best_max_depth)
# 0.1, 0.5

# **Transform pipeline**

In [None]:
# Run the fitted pipeline on the held-out validation split.
predictions_df = pipeline.transform(val_data)

In [None]:
# Area under the ROC curve on the validation predictions (the evaluator's default metric,
# named explicitly here so the printed label cannot drift from what is measured).
binary_eval = BinaryClassificationEvaluator(labelCol='CLASSIndex', metricName='areaUnderROC')
# Fix: the label used to say "RFC", but the fitted pipeline ends in the cross-validated
# XGBoost classifier, not the random forest.
print('XGBoost (CV) validation AUC:', binary_eval.evaluate(predictions_df))

# **Test Dataset**

In [None]:
# Read the test CSV with the same options as the training file (header row, inferred types).
test_reader = spark.read.options(inferSchema=True, header=True)
df_test = test_reader.csv("/content/drive/MyDrive/Big data/hw3/HW3-sparkML/bdp_test.csv")

In [None]:
# Schema snapshot of the test set as (column name, Spark type string) pairs.
column_types_test = df_test.dtypes

for col_name, col_type in column_types_test:
  if col_name in str_column_names:
    # Categorical columns must be strings for the string indexers.
    if col_type != 'string':
      df_test = df_test.withColumn(col_name, df_test[col_name].cast("string"))

  if col_name in non_str_column_names:
    # Fix: the previous test (`!= 'int' or != 'double'`) was true for every type, so
    # double columns were cast to int and lost their fractional part. Only columns that
    # are neither int nor double are converted now, and to double so nothing is truncated.
    # This has to mirror the rule applied to the training set.
    if col_type not in ("int", "double"):
      df_test = df_test.withColumn(col_name, df_test[col_name].cast("double"))

In [None]:
# Print row count then column count, first for the test frame and then for the training frame.
for frame in (df_test, df_train):
    print(frame.count())
    print(len(frame.columns))

In [None]:
# Score the test set; `pipeline` refers to the fitted model at this point (rebound by the fit cell).
test_predictions_df = pipeline.transform(df_test)

In [None]:
from pyspark.ml.functions import vector_to_array

# Element 1 of the probability vector, exposed as "ProbToYes" next to the row's index.
prob_yes = vector_to_array(f.col('probability')).getItem(1)
test_predictions_df = test_predictions_df.select('index', prob_yes.alias('ProbToYes'))


In [None]:
# printSchema() writes the schema itself and returns None, so it is called directly;
# wrapping it in print() added a stray "None" line to the output.
test_predictions_df.printSchema()
# Number of prediction rows that will be exported.
print(test_predictions_df.count())


In [None]:
# Collapse to a single partition before writing, replacing any output already at this path.
single_partition_writer = test_predictions_df.coalesce(1).write.mode('overwrite')
single_partition_writer.csv(
    '/content/drive/MyDrive/Big data/hw3/HW3-sparkML/pred-no-cv.csv',
    header = 'true',
)