In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, mean, create_map
from pyspark.sql.types import FloatType
from itertools import chain
import requests
from bs4 import BeautifulSoup
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=com.amazonaws:aws-java-sdk-bundle:1.11.375,org.apache.hadoop:hadoop-aws:3.3.4 pyspark-shell"

# Initialize Spark session
spark = SparkSession.builder.appName("HeartDiseasePrediction")\
        .getOrCreate()

spark._jsc.hadoopConfiguration().set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
spark._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")
    
# Set Hadoop configurations for S3 access
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", "ASIAYAAO5HRMMSII5MG7")
hadoop_conf.set("fs.s3a.secret.key", "OXOxUhDW+4aAxjexr3QsgCMHy35J8wWxDqa/shv3")
hadoop_conf.set("fs.s3a.session.token", "IQoJb3JpZ2luX2VjEPb//////////wEaCXVzLWVhc3QtMiJHMEUCIQClVIaclPf1qpD10aFMKFFc4sQ7g0QVsMz63F+Fp3o9wAIgBojPY4RT1xV6MfzYhAxMGowLcpEn2xRuhsfW371brQgq6wIIbxAAGgw1NDk3ODcwOTAwMDgiDMQmT1U2Lq4YJILbFyrIAmYS9ZYmU2bHeq8n3rAVrxIFhPXSZxEUcsEmM7+N8Dp5EiHfQTqm27fcskKl24bN9p4Oc7q7eQH4rGHMCy1GGg2BLBm6S6LtDZDRK8Tgwm/RyFVKYK8mKBGwiu1WiIm4QGLLRKj1lKOP9bmi23ptb7Jb9yXA6dqMc+STQ3fiUWJwaxouQGxpwnhoiV3Quv6yVKPAPH7cYvInpBR6s8qF76mgkHPDFuUzbqFtj3q1+ZKvjQlwYM/+EyE5uhpFJfi+0ozFnfYn+YPrWgMdL3CV8pNaqLuTxh48fOj4uiTH8NFOPVK8CihGY5cAEnSdf//VGscwDyUPEdggrLOroyBVNNf29fiMrXOaTEXZWQ0fqPizM/LFaIHWsiPkPYLjl7NdwniM5pEnsYIp1Fq8Gy82NqgmZXYU+VOyUgxN2MuXmyXbNRjKGrNw078wmrq7sgY6pwGmimQF6BRLVCHfQ9irASgPPIs+ZfKh1Iz+D7MLLJiOUK6x4ftL3EZQzcVEN4p4MNk1X/u9aUx5tYKRVt1/XSpwvwGLf00laLNsJOtYOZ+6x2SAPCWoXmMQK8g52w5lcXkHKoM4nsrnigewC4vuNL/mjAQ7lyzc0mP3dCuVtsA5WETJ3+tBaYFsxAj4sjL49aLMmWXwrMKG+aP3iuAtJ3VlyngRaWqkHg==")

# Load data
s3_bucket = "s3a://de300spring2024"
file_key = 'rachel_yao/heart_disease(in).csv'
data_path = f"{s3_bucket}/{file_key}"
data = spark.read.csv(data_path, header=True, inferSchema=True)

:: loading settings :: url = jar:file:/home/ubuntu/anaconda3/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-70d350b8-2595-43fd-a865-5a0b6c4d83ec;1.0
	confs: [default]
	found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 330ms :: artifacts dl 11ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	:: evicted modules:
	com.amazonaws#aws-java-sdk-bundle;1.11.375 by [com.amazonaws#aws-java-sdk-bundle;1.12.262] in [default]
	--

In [3]:
# Select relevant columns and clean data
retain = ['age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps', 'smoke', 
          'fbs', 'prop', 'nitr', 'pro', 'diuretic', 'thaldur', 'thalach', 
          'exang', 'oldpeak', 'slope', 'target']
data = data.select(retain)

# Fill missing values and clean data
data = data.fillna({
    'painloc': data.groupBy().agg({"painloc": "max"}).collect()[0][0],
    'painexer': data.groupBy().agg({"painexer": "max"}).collect()[0][0],
    'trestbps': data.approxQuantile("trestbps", [0.5], 0)[0],
    'oldpeak': data.approxQuantile("oldpeak", [0.5], 0)[0],
    'thaldur': data.approxQuantile("thaldur", [0.5], 0)[0],
    'thalach': data.approxQuantile("thalach", [0.5], 0)[0],
    'sex': data.groupBy().agg({"sex": "max"}).collect()[0][0],
    'cp': data.groupBy().agg({"cp": "max"}).collect()[0][0],
})
data = data.withColumn('trestbps', when(col('trestbps') < 100, lit(120)).otherwise(col('trestbps')))
data = data.withColumn('oldpeak', when((col('oldpeak') < 0) | (col('oldpeak') > 4), lit(1)).otherwise(col('oldpeak')))
data = data.withColumn('fbs', when(col('fbs') > 1, lit(0)).otherwise(col('fbs')))
data = data.withColumn('prop', when(col('prop') > 1, lit(0)).otherwise(col('prop')))
data = data.withColumn('nitr', when(col('nitr') > 1, lit(0)).otherwise(col('nitr')))
data = data.withColumn('pro', when(col('pro') > 1, lit(0)).otherwise(col('pro')))
data = data.withColumn('diuretic', when(col('diuretic') > 1, lit(0)).otherwise(col('diuretic')))

In [4]:
# scrape smoking rates by age from source 1
def scrape_smoking_rates_by_age(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    tables = soup.find_all('table')
    smoking_data = {}
    for table in tables[1]:
        rows = table.find_all('tr')
        for row in rows[1:]:
            ths = row.find_all('th')
            tds = row.find_all('td')
            age_range = ths[0].text.strip()
            smoking_rate = float(tds[9].text.strip())
            if 'and over' in age_range:
                min_age = int(age_range.split()[0])
                max_age = 120  # assuming 120 as an upper limit for age
            else:
                min_age, max_age = map(int, age_range.split('–'))
            for age in range(min_age, max_age + 1):
                smoking_data[age] = smoking_rate
    return smoking_data

# scrape smoking rates by age and sex from source 2
def scrape_smoking_rates_by_age_and_sex(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.content, 'html.parser')
    # sex
    cards = soup.find_all("div", class_="card-body")
    gender_data = {}
    for card in cards[2:3]:
        rows = card.find_all('li', class_='main')
        for row in rows:
            text = row.text.strip()
            gender = text.split()[6]
            rate = float(text.split()[7].strip('()%'))
            gender_data[gender] = rate
    # age
    age_data = {}
    for card in cards[3:4]:
        rows = card.find_all('li')
        for row in rows:
            text = row.text.strip()
            age_range = text.split()[7]
            if 'and older' in text:
                min_age = int(age_range.split()[0])
                max_age = 120  # assuming 120 as an upper limit for age
            else:
                min_age, max_age = map(int, age_range.split('–'))
                rate = float(text.split()[9].strip('()%'))
            for age in range(min_age, max_age + 1):
                age_data[age] = rate
    return gender_data, age_data

In [5]:
# source 1
source1 = 'https://www.abs.gov.au/statistics/health/health-conditions-and-risks/smoking/latest-release'
smoking_source1 = scrape_smoking_rates_by_age(source1)

# source 2
source2 = 'https://www.cdc.gov/tobacco/data_statistics/fact_sheets/adult_data/cig_smoking/index.htm'
gender_data, age_data = scrape_smoking_rates_by_age_and_sex(source2)

broadcast_smoking_source1 = spark.sparkContext.broadcast(smoking_source1)
broadcast_gender_data = spark.sparkContext.broadcast(gender_data)
broadcast_age_data = spark.sparkContext.broadcast(age_data)

In [6]:
# Create mappings for smoke imputation
smoking_map_source1 = create_map([lit(x) for x in chain(*broadcast_smoking_source1.value.items())])
smoking_map_age = create_map([lit(x) for x in chain(*broadcast_age_data.value.items())])
men_rate = broadcast_gender_data.value['men']
women_rate = broadcast_gender_data.value['women']

# Impute smoke_source1
data = data.withColumn('smoke_source1', 
                       when(col('smoke').isNull(), smoking_map_source1[col('age')])
                       .otherwise(col('smoke')))

# Impute smoke_source2
data = data.withColumn('smoke_source2', 
                       when(col('smoke').isNull() & (col('sex') == 0), smoking_map_age[col('age')])
                       .when(col('smoke').isNull() & (col('sex') == 1), smoking_map_age[col('age')] * (men_rate / women_rate))
                       .otherwise(col('smoke')))

# Drop original smoke column
data = data.drop('smoke')

data = data.dropna()

# Show data
data.show()

+---+---+-------+--------+---+--------+---+----+----+---+--------+-------+-------+-----+-------+-----+------+-------------+------------------+
|age|sex|painloc|painexer| cp|trestbps|fbs|prop|nitr|pro|diuretic|thaldur|thalach|exang|oldpeak|slope|target|smoke_source1|     smoke_source2|
+---+---+-------+--------+---+--------+---+----+----+---+--------+-------+-------+-----+-------+-----+------+-------------+------------------+
| 63|  1|      1|       1|  1|     145|  1|   0|   0|  0|       0|   10.5|    150|    0|    2.3|    3|     0|         14.9|19.325742574257426|
| 67|  1|      1|       1|  4|     160|  0|   1|   0|  0|       0|    9.5|    108|    1|    1.5|    2|     1|          8.7|19.325742574257426|
| 67|  1|      1|       1|  4|     120|  0|   1|   0|  0|       0|    8.5|    129|    1|    2.6|    2|     1|          8.7|19.325742574257426|
| 37|  1|      1|       1|  3|     130|  0|   1|   0|  0|       0|   13.0|    187|    0|    3.5|    3|     0|         10.9|16.342574257425742|

In [7]:
# 3
# Feature engineering
from pyspark.sql import DataFrame

# Ensure 'target' column is categorical
indexer = StringIndexer(inputCol="target", outputCol="label")
data = indexer.fit(data).transform(data)

# Define the feature columns
feature_cols = ['age', 'sex', 'painloc', 'painexer', 'cp', 'trestbps', 'fbs', 'prop', 'nitr', 'pro', 'diuretic', 'thaldur', 'thalach', 'exang', 'oldpeak', 'slope', 'smoke_source1', 'smoke_source2']

for col_name in feature_cols:
    data = data.withColumn(col_name, col(col_name).cast('double'))

# Assemble feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
data = assembler.transform(data)

# Split the data into training and test sets with 90-10 split and stratification
train_data, test_data = data.randomSplit([0.9, 0.1], seed=42)

# Verify the splits
train_data.groupBy("label").count().show()
test_data.groupBy("label").count().show()

# Initialize models
lr = LogisticRegression(featuresCol='features', labelCol='label')
rf = RandomForestClassifier(featuresCol='features', labelCol='label')
gbt = GBTClassifier(featuresCol='features', labelCol='label')

# Set up cross-validation
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# Logistic Regression
lr_param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_param_grid, evaluator=evaluator, numFolds=5)
lr_model = lr_cv.fit(train_data)
lr_accuracy = evaluator.evaluate(lr_model.transform(test_data))

# Random Forest
rf_param_grid = ParamGridBuilder().addGrid(rf.numTrees, [10, 20]).build()
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_param_grid, evaluator=evaluator, numFolds=5)
rf_model = rf_cv.fit(train_data)
rf_accuracy = evaluator.evaluate(rf_model.transform(test_data))

# Gradient Boosting
gbt_param_grid = ParamGridBuilder().addGrid(gbt.maxDepth, [5, 10]).build()
gbt_cv = CrossValidator(estimator=gbt, estimatorParamMaps=gbt_param_grid, evaluator=evaluator, numFolds=5)
gbt_model = gbt_cv.fit(train_data)
gbt_accuracy = evaluator.evaluate(gbt_model.transform(test_data))

def evaluate_model(predictions, label_col="label"):
    evaluator = MulticlassClassificationEvaluator(labelCol=label_col)
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")

# Evaluate models after balancing
lr_predictions = lr_model.transform(test_data)
print("Logistic Regression Metrics:")
evaluate_model(lr_predictions)

rf_predictions = rf_model.transform(test_data)
print("Random Forest Metrics:")
evaluate_model(rf_predictions)

gbt_predictions = gbt_model.transform(test_data)
print("Gradient Boosting Metrics:")
evaluate_model(gbt_predictions)

24/05/23 06:40:50 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  282|
|  1.0|  184|
+-----+-----+

+-----+-----+
|label|count|
+-----+-----+
|  0.0|   25|
|  1.0|   15|
+-----+-----+



24/05/23 06:40:58 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Logistic Regression Metrics:
Accuracy: 0.775
Precision: 0.7897727272727273
Recall: 0.7749999999999999
F1 Score: 0.778046421663443
Random Forest Metrics:
Accuracy: 0.85
Precision: 0.8497150997150997
Recall: 0.85
F1 Score: 0.8475274725274724
Gradient Boosting Metrics:
Accuracy: 0.775
Precision: 0.80625
Recall: 0.775
F1 Score: 0.7785714285714286


In [None]:
print("Random forest seems to perform the best out of the models tested, with a high mean accuracy of 0.85, precision of 0.85, recall of 0.85, and f1 score of 0.85.")