In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, LongType, StringType, DoubleType
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, when
from pyspark import SparkContext, SparkConf
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
import pyspark.sql.functions as F
from itertools import combinations
import os

In [2]:
# Sanity check: show the path of the Python interpreter this kernel is running on
import sys
sys.executable

'/tmp/demos/bin/python3'

## Load Data (local CSV files, with an S3 variant further below)

In [18]:
# Folder whose *.csv files are loaded by read_data (relative path)
DATA_FOLDER = "data"

# Number of folds used by the cross-validators during hyperparameter tuning
NUMBER_OF_FOLDS = 5
# Seed shared by the cross-validators and the train/test split
SPLIT_SEED = 7576
# Weight of the training side in the train/test split; the remainder goes to the test side
TRAIN_TEST_SPLIT = 0.9

## Function for Data Reading

In [4]:
def read_data(spark: SparkSession) -> DataFrame:
    """
    Load every CSV file found in DATA_FOLDER into a single DataFrame.

    The files carry a header row, so column names come from it and Spark
    is asked to infer the column types.
    """
    # Glob pattern matching all CSV files inside the data folder
    csv_glob = os.path.join(DATA_FOLDER, "*.csv")

    csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
    return csv_reader.load(csv_glob)

In [5]:
# Start (or reuse) the Spark session for this notebook, then load the local CSV data
spark = (
    SparkSession.builder
    .appName("Predict Heart Disease")
    .getOrCreate()
)

data = read_data(spark)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/24 01:57:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [23]:
## to fix
def read_data_s3(spark: SparkSession, s3_path: str) -> DataFrame:
    """
    Load the heart disease CSV located at `s3_path` into a DataFrame.

    The file carries a header row, so column names come from it and Spark
    is asked to infer the column types.
    """
    return spark.read.csv(s3_path, header=True, inferSchema=True)

In [24]:
# Read the heart disease data from S3 through the s3a connector.
#
# Credentials are taken from the environment (AWS_ACCESS_KEY_ID,
# AWS_SECRET_ACCESS_KEY and, for temporary credentials, AWS_SESSION_TOKEN) so
# that no secret is ever stored in the notebook. A missing variable raises
# KeyError before any Spark work starts.
#
# NOTE(review): "Class org.apache.hadoop.fs.s3a.S3AFileSystem not found" was
# raised even with spark.jars set. Presumably the JVM was already running when
# this cell executed, so the jars never reached the driver classpath; restart
# the kernel and run this cell before any other session is created. Also
# confirm the hadoop-aws version matches the Hadoop version bundled with Spark.

# Set the paths to the required JAR files
hadoop_aws_jar = "/usr/local/hadoop/share/hadoop/tools/lib/hadoop-aws-3.4.0.jar"
aws_sdk_bundle_jar = "/usr/local/hadoop/share/hadoop/tools/lib/aws-java-sdk-bundle-1.11.874.jar"

# Fail early with a readable message instead of a ClassNotFoundException later on
missing_jars = [jar for jar in (hadoop_aws_jar, aws_sdk_bundle_jar) if not os.path.isfile(jar)]
if missing_jars:
    raise FileNotFoundError(f"Required S3 connector JAR(s) not found: {', '.join(missing_jars)}")

s3_session_builder = (
    SparkSession.builder
    .appName("Predict Heart Disease")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.jars", f"{hadoop_aws_jar},{aws_sdk_bundle_jar}")
)

# Temporary (STS) credentials are only valid together with their session token
# and the matching credentials provider.
aws_session_token = os.environ.get("AWS_SESSION_TOKEN")
if aws_session_token:
    s3_session_builder = (
        s3_session_builder
        .config("spark.hadoop.fs.s3a.session.token", aws_session_token)
        .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
    )

spark = s3_session_builder.getOrCreate()

s3_bucket_path = "s3a://de300spring2024/cheryl_chen/hw3/all_locs.csv"

try:
    data = read_data_s3(spark, s3_bucket_path)
    data.show()
finally:
    # Always release the session, even when the S3 read fails
    spark.stop()

24/05/24 02:49:48 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://de300spring2024/cheryl_chen/hw3/all_locs.csv.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
	at org.apache.spark.sql.execution.datasources.Da

Py4JJavaError: An error occurred while calling o59891.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 29 more


## Writing a Custom Transformer Class

In [6]:
class DataCleaner(Transformer):
    """
    Transformer that prepares the raw heart disease data for modelling.

    It keeps a fixed subset of columns, casts them to float, turns the
    diagnosis column 'num' into a binary label (0 stays 0, any other value
    becomes 1) and applies the row filters and value clippings listed in
    `_transform`.
    """

    def _transform(self, dataset: DataFrame) -> DataFrame:
        """
        Return the cleaned DataFrame.

        Rows are dropped when the label is missing or when a filtered column
        holds a value outside the accepted range. A NULL in a filtered column
        also drops the row, because a comparison with NULL is not true.
        """
        selected_columns = ['age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps', 'fbs',
                            'prop', 'nitr', 'pro', 'diuretic', 'thaldur', 'thalach', 'exang',
                            'oldpeak', 'slope', 'num']

        # Select the necessary columns and cast them to float in one projection;
        # values that cannot be cast become NULL. A single select avoids the
        # ever-growing plan produced by calling withColumn in a loop.
        dt = dataset.select([col(name).cast('float').alias(name) for name in selected_columns])

        # Rows without a diagnosis cannot be labelled. Drop them first, otherwise
        # the otherwise(1) branch below would mark them as positive cases.
        dt = dt.filter(col('num').isNotNull())
        dt = dt.withColumn('num', when(col('num') == 0, 0).otherwise(1))

        # a. Drop negative painloc / painexer and cap both at 1 so they are 0 or 1
        dt = dt.filter((col('painloc') >= 0) & (col('painexer') >= 0))
        dt = dt.withColumn('painloc', when(col('painloc') > 1, 1).otherwise(col('painloc')))
        dt = dt.withColumn('painexer', when(col('painexer') > 1, 1).otherwise(col('painexer')))

        # b. Replace trestbps < 100 with 0
        dt = dt.withColumn('trestbps', when(col('trestbps') < 100, 0).otherwise(col('trestbps')))

        # c. Clip oldpeak to the range [0, 4]
        dt = dt.withColumn('oldpeak', when(col('oldpeak') < 0, 0).when(col('oldpeak') > 4, 4).otherwise(col('oldpeak')))

        # d. Remove rows with missing (-9) thaldur or thalach
        dt = dt.filter((col('thaldur') != -9) & (col('thalach') != -9))

        # e. Remove rows with missing (-9) fbs, prop, nitr, pro or diuretic, then cap prop at 1
        dt = dt.filter((col('fbs') != -9) & (col('prop') != -9) & (col('nitr') != -9) &
                       (col('pro') != -9) & (col('diuretic') != -9))
        dt = dt.withColumn('prop', when(col('prop') > 1, 1).otherwise(col('prop')))

        # f. Keep only rows whose exang is 0/1 and whose slope is 1, 2 or 3
        dt = dt.filter(col('exang').isin([0, 1]))
        dt = dt.filter(col('slope').isin([1, 2, 3]))

        return dt

## The ML Pipeline

In [20]:
def pipeline(data: DataFrame):
    """
    Clean the data, tune a logistic regression and a random forest with
    cross-validation on the training split, and print the name, test AUC and
    parameters of whichever model scores the higher AUC on the test split.
    """
    # Data cleaning
    cleaned = DataCleaner().transform(data)

    # Feature engineering: every cleaned column except the label goes into the feature vector
    predictors = [name for name in cleaned.columns if name != 'num']
    assembler = VectorAssembler(inputCols=predictors, outputCol="features")

    # Candidate classifiers and their hyperparameter grids
    logistic = LogisticRegression(featuresCol='features', labelCol='num')
    forest = RandomForestClassifier(featuresCol='features', labelCol='num')

    logistic_grid = (
        ParamGridBuilder()
        .addGrid(logistic.regParam, [0.01, 0.1, 0.5])
        .addGrid(logistic.elasticNetParam, [0.0, 0.5, 1.0])
        .build()
    )
    forest_grid = (
        ParamGridBuilder()
        .addGrid(forest.numTrees, [10, 20, 50])
        .addGrid(forest.maxDepth, [5, 10, 20])
        .build()
    )

    # One evaluator serves both tuning and the final comparison
    evaluator = BinaryClassificationEvaluator(labelCol='num', rawPredictionCol="rawPrediction", metricName='areaUnderROC')

    def cross_validator(classifier, grid):
        # Assemble features, then fit the classifier, under k-fold cross-validation
        return CrossValidator(
            estimator=Pipeline(stages=[assembler, classifier]),
            estimatorParamMaps=grid,
            evaluator=evaluator,
            numFolds=NUMBER_OF_FOLDS,
            seed=SPLIT_SEED,
        )

    # Train-test split
    train_data, test_data = cleaned.randomSplit([TRAIN_TEST_SPLIT, 1 - TRAIN_TEST_SPLIT], seed=SPLIT_SEED)

    # Tune both models on the training data
    fitted_logistic = cross_validator(logistic, logistic_grid).fit(train_data)
    fitted_forest = cross_validator(forest, forest_grid).fit(train_data)

    # Score both tuned models on the held-out test data
    logistic_auc = evaluator.evaluate(fitted_logistic.transform(test_data))
    forest_auc = evaluator.evaluate(fitted_forest.transform(test_data))

    # NOTE(review): the winner is chosen on the test split, so the reported AUC
    # is an optimistic estimate for the selected model.
    if logistic_auc > forest_auc:
        winner_name, winner, winner_auc = "Logistic Regression", fitted_logistic, logistic_auc
    else:
        winner_name, winner, winner_auc = "Random Forest", fitted_forest, forest_auc

    # The classifier is the last stage of the best pipeline found by cross-validation
    winner_params = winner.bestModel.stages[-1].extractParamMap()

    # Print the details of the best model
    print(f"The best model is: {winner_name}")
    print(f"AUC of the best model: {winner_auc}")
    print("Best parameters:")
    for param, value in winner_params.items():
        print(f"{param}: {value}")


## Main Function

In [21]:
def main():
    """
    Run the whole workflow: create (or reuse) a Spark session, read the local
    CSV data, run the ML pipeline, and stop the session afterwards.
    """
    # Create a Spark session
    spark = SparkSession.builder \
        .appName("Predict Heart Disease") \
        .getOrCreate()

    try:
        data = read_data(spark)
        pipeline(data)
    finally:
        # Stop the session even when reading or training fails
        spark.stop()

main()

                                                                                

The best model is: Logistic Regression
AUC of the best model: 0.8
Best parameters:
LogisticRegression_10645af2273b__aggregationDepth: 2
LogisticRegression_10645af2273b__elasticNetParam: 0.0
LogisticRegression_10645af2273b__family: auto
LogisticRegression_10645af2273b__featuresCol: features
LogisticRegression_10645af2273b__fitIntercept: True
LogisticRegression_10645af2273b__labelCol: num
LogisticRegression_10645af2273b__maxBlockSizeInMB: 0.0
LogisticRegression_10645af2273b__maxIter: 100
LogisticRegression_10645af2273b__predictionCol: prediction
LogisticRegression_10645af2273b__probabilityCol: probability
LogisticRegression_10645af2273b__rawPredictionCol: rawPrediction
LogisticRegression_10645af2273b__regParam: 0.1
LogisticRegression_10645af2273b__standardization: True
LogisticRegression_10645af2273b__threshold: 0.5
LogisticRegression_10645af2273b__tol: 1e-06
