In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, countDistinct, mean, stddev
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "path_to_jar/spark-sas7bdat-3.0.0-s_2.12.jar") \
    .getOrCreate()
print(spark.version)



3.4.1


In [32]:
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars", "path_to_jar/spark-sas7bdat-3.0.0-s_2.12.jar") \
    .getOrCreate()


In [5]:
# Read the XPT file
# df1 = pd.read_sas('LLCP2021.XPT')
df1 = spark.read.csv('LLCP2021.csv', header=True, inferSchema=True)
df_cvd = spark.read.csv('CVD_cleaned.csv', header=True, inferSchema=True)
df_heart = spark.read.csv('heart.csv', header=True, inferSchema=True)

In [49]:
df = df1
# Cross-tabulation between CVDINFR4 and INCOME3
ct = df.crosstab('INCOME3', 'CVDINFR4')

# Cross-tabulation between cp and target
ct_heart = df_heart.crosstab('cp', 'target')

from pyspark.sql.types import DoubleType, FloatType, IntegerType
from pyspark.sql.functions import col


def verify_data_integrity(df):
    results = {
        'total_values': df.count() * len(df.columns),
        'missing_values': {},
        'missing_values_percentage': {},
        'outliers': {},
        'outliers_percentage': {}
    }

    for column in df.columns:
        # Missing values
        missing_values = df.where(col(column).isNull()).count()
        results['missing_values'][column] = missing_values
        results['missing_values_percentage'][column] = (missing_values / df.count()) * 100

        if isinstance(df.schema[column].dataType, (DoubleType, FloatType, IntegerType)):
            quantiles = df.approxQuantile(column, [0.25, 0.75], 0.01)
            Q1, Q3 = quantiles
            IQR = Q3 - Q1
            outliers = df.where((col(column) < Q1 - 1.5*IQR) | (col(column) > Q3 + 1.5*IQR)).count()
            results['outliers'][column] = outliers
            results['outliers_percentage'][column] = (outliers / df.count()) * 100
        else:
            print(f"Skipping outlier check for non-numeric column: {column}")
            results['outliers'][column] = None
            results['outliers_percentage'][column] = None
    return results

df_results = verify_data_integrity(df)


Skipping outlier check for non-numeric column: IDATE
Skipping outlier check for non-numeric column: IMONTH
Skipping outlier check for non-numeric column: IDAY
Skipping outlier check for non-numeric column: IYEAR
Skipping outlier check for non-numeric column: SEQNO
Skipping outlier check for non-numeric column: TOLDCFS
Skipping outlier check for non-numeric column: HAVECFS
Skipping outlier check for non-numeric column: WORKCFS


In [51]:
df_results

{'total_values': 132923979,
 'missing_values': {'_STATE': 0,
  'FMONTH': 0,
  'IDATE': 0,
  'IMONTH': 0,
  'IDAY': 0,
  'IYEAR': 0,
  'DISPCODE': 0,
  'SEQNO': 0,
  '_PSU': 0,
  'CTELENM1': 320907,
  'PVTRESD1': 320907,
  'COLGHOUS': 438663,
  'STATERE1': 320907,
  'CELPHON1': 320907,
  'LADULT1': 320907,
  'COLGSEX': 438663,
  'NUMADULT': 320926,
  'LANDSEX': 388847,
  'NUMMEN': 373255,
  'NUMWOMEN': 373255,
  'RESPSLCT': 374029,
  'SAFETIME': 117786,
  'CTELNUM1': 117787,
  'CELLFON5': 117786,
  'CADULT1': 117786,
  'CELLSEX': 117788,
  'PVTRESD3': 117786,
  'CCLGHOUS': 437438,
  'CSTATE1': 117786,
  'LANDLINE': 117786,
  'HHADULT': 117788,
  'SEXVAR': 0,
  'GENHLTH': 4,
  'PHYSHLTH': 3,
  'MENTHLTH': 2,
  'POORHLTH': 205279,
  'PRIMINSR': 3,
  'PERSDOC3': 2,
  'MEDCOST1': 5,
  'CHECKUP1': 2,
  'EXERANY2': 2,
  'BPHIGH6': 2,
  'BPMEDS': 266560,
  'CHOLCHK3': 2,
  'TOLDHI3': 60836,
  'CHOLMED3': 61571,
  'CVDINFR4': 2,
  'CVDCRHD4': 2,
  'CVDSTRK3': 2,
  'ASTHMA3': 2,
  'ASTHNOW': 377

In [52]:


df_des = df.describe()
# Note: Spark does not directly calculate correlation between all variables and a specific variable
# You would use df.stat.corr('var1', 'var2') for each variable pair you're interested in

df_cvd_results = verify_data_integrity(df_cvd)
df_cvd_des = df_cvd.describe()

df_heart_results = verify_data_integrity(df_heart)
df_heart_des = df_heart.describe()
# Again, df.corr() method for each variable pair

# To get specific columns from describe(), you would typically use select:
df_des.select('summary', 'HEIGHT3', 'WEIGHT2', 'FLSHTMY3').show()

Skipping outlier check for non-numeric column: General_Health
Skipping outlier check for non-numeric column: Checkup
Skipping outlier check for non-numeric column: Exercise
Skipping outlier check for non-numeric column: Heart_Disease
Skipping outlier check for non-numeric column: Skin_Cancer
Skipping outlier check for non-numeric column: Other_Cancer
Skipping outlier check for non-numeric column: Depression
Skipping outlier check for non-numeric column: Diabetes
Skipping outlier check for non-numeric column: Arthritis
Skipping outlier check for non-numeric column: Sex
Skipping outlier check for non-numeric column: Age_Category
Skipping outlier check for non-numeric column: Smoking_History
+-------+------------------+-----------------+------------------+
|summary|           HEIGHT3|          WEIGHT2|          FLSHTMY3|
+-------+------------------+-----------------+------------------+
|  count|            425734|           426877|            211636|
|   mean|  809.564843305914|828.573661

In [3]:
df3 = spark.read.csv('df3.csv', header=True, inferSchema=True)

In [7]:
df = df1
# List of columns to drop
columns_to_drop = [
    'FMONTH', 'IDATE', 'IMONTH', 'IDAY', 'IYEAR', 'DISPCODE',
    'SEQNO', '_PSU', 'CTELENM1', 'PVTRESD1', 'COLGHOUS', 'STATERE1', 'CELPHON1'
]
# Drop the columns
df = df.drop(*columns_to_drop)

In [18]:
len(df.columns)

290

In [20]:
from pyspark.sql.functions import col, mean, when, count
# Calculate the percentage of missing values per column
missing_percentage_expr = [
    (count(when(col(c).isNotNull(), c)) / df.count()).alias(c)
    for c in df.columns
]
missing_percentage = df.select(*missing_percentage_expr)

# Collecting the results to Driver (not recommended for large datasets)
missing_percentage_local = missing_percentage.collect()[0].asDict()
columns_to_drop = [
    col_name for col_name, val in missing_percentage_local.items() if val <= 0.71
]

# Drop the columns with more than 29% missing values
df = df.drop(*columns_to_drop)

# Descriptive statistics
df_des1 = df.describe()

# Show results
df_des1.show()

+-------+------------------+--------------------+--------------------+--------------------+--------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------

In [21]:
len(df.columns)

140

In [22]:
# List of columns to drop
columns_to_drop = [
    '_STATE', 'SAFETIME', 'CTELNUM1', 'CELLFON5', 'CADULT1'
]

# Drop the columns
df = df.drop(*columns_to_drop)

# Descriptive statistics
df_des2 = df.describe()
df_des2.show()

# Assuming verify_data_integrity is a custom function you need to define...
def verify_data_integrity(df):
    # Your custom data integrity checks and logic here...
    pass  # Remove or replace this line according to your implementation

# Call your custom function
df_results1 = verify_data_integrity(df)

# If df2 is just a copy of df after the above operations...
df2 = df

+-------+------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+------------------+------------------+------------------+-

In [24]:
# List of values to replace and their replacement
values_to_replace = [88, 555, 888]
value_replacement = 0

# Creating a new DataFrame df3 with replaced values
df3 = df2
for c in df2.columns:
    df3 = df3.withColumn(
        c,
        when(col(c).isin(values_to_replace), value_replacement).otherwise(col(c))
    )

# Show result
df3.show()

+-------+--------+-------+--------+-------+------+-------+--------+--------+--------+--------+--------+--------+--------+-------+--------+-------+--------+--------+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+-------+-----+--------+--------+--------+-------+--------+-------+-------+-------+----+-----+------+--------+--------+--------+--------+-------+--------+-------+--------+--------+-------+------+--------+--------+--------+--------+--------+------+-------+--------+--------+-------+------------------+--------+------------------+--------+--------+--------+------------------+------------------+-------+--------+--------+--------+--------+--------+--------+--------+--------+------+--------+--------+--------+--------+--------+--------+-------+-------+--------+-----+--------+--------+--------+----+--------+--------+------+------+-----+-----+-------+------+--------+-------+--------+-------+--------+--------+--------+--------+--------+-------------

In [None]:
# Create an empty DataFrame to union all results later
filtered_df = spark.createDataFrame([], df3.schema)

# Iterate through each column in the original dataframe
for col_name in df3.columns:
    # Compute Q1, Q3, and IQR for the column
    Q1, Q3 = df3.approxQuantile(col_name, [0.15, 0.85], 0)
    IQR = Q3 - Q1

    # Lower and Upper bounds for outliers
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    # Filtering the non-outliers
    # The | symbol represents 'or', so we're chaining filters together
    condition = (col(col_name) >= lower_bound) & (col(col_name) <= upper_bound)
    filtered_col_df = df3.filter(condition)

    # Union the results with the previous non-outliers DataFrame
    filtered_df = filtered_df.union(filtered_col_df)

# Drop duplicates from the union operation
filtered_df = filtered_df.distinct()

In [28]:
df3 = spark.read.csv('df3.csv', header=True, inferSchema=True)
df3.count()


83567

In [29]:
len(df3.columns)

112

In [30]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# Drop rows where specified columns have missing values
specified_columns = ['CHOLMED3', 'TOLDHI3', '_RFCHOL3']
df3 = df3.dropna(subset=specified_columns)

# Define a function to impute NaN values with samples from the available values
def impute_with_sample(df: DataFrame, column: str) -> DataFrame:
    # Counting the number of missing values
    num_missing = df.filter(col(column).isNull()).count()

    # If there are missing values, perform imputation
    if num_missing > 0:
        # Creating a window spec to sample from non-null values
        window_spec = Window().orderBy(F.rand())
        non_null_sample = df.filter(col(column).isNotNull()).withColumn("row_num", F.row_number().over(window_spec))

        # Creating a DataFrame of the sampled non-null values
        replacement_sample = non_null_sample.filter(col("row_num") <= num_missing).select(column).withColumnRenamed(column, "replace_val")

        # Imputing the missing values
        window_spec = Window().orderBy(F.rand())
        null_values = df.filter(col(column).isNull()).withColumn("row_num", F.row_number().over(window_spec))
        df = df.alias("a").join(null_values.alias("b"), df.index == null_values.index, how="left_outer")
        df = df.withColumn(column, when(col("b." + column).isNotNull(), col("b." + column)).otherwise(col("a." + column)))
        df = df.drop("b." + column).drop("row_num")

    return df

# Iterate through columns and apply the impute_with_sample function
for column in df3.columns:
    df3 = impute_with_sample(df3, column)

# Run your data integrity checks
# Assuming verify_data_integrity is a custom function you need to define...
def verify_data_integrity(df):
    # Your custom data integrity checks and logic here...
    pass  # Remove or replace this line according to your implementation

# Call your custom function
df3_results = verify_data_integrity(df3)

In [34]:
from pyspark.sql.types import IntegerType, FloatType
# Altering all columns type to IntegerType
for column_name in df3.columns:
    df3 = df3.withColumn(column_name, col(column_name).cast(FloatType()))

# Retrieve and print the data types of each column
df3_type = df3.dtypes
print(df3_type)

[('SEXVAR', 'float'), ('GENHLTH', 'float'), ('PHYSHLTH', 'float'), ('MENTHLTH', 'float'), ('PRIMINSR', 'float'), ('PERSDOC3', 'float'), ('MEDCOST1', 'float'), ('CHECKUP1', 'float'), ('EXERANY2', 'float'), ('BPHIGH6', 'float'), ('CHOLCHK3', 'float'), ('TOLDHI3', 'float'), ('CHOLMED3', 'float'), ('CVDINFR4', 'float'), ('CVDCRHD4', 'float'), ('CVDSTRK3', 'float'), ('CHCSCNCR', 'float'), ('CHCOCNCR', 'float'), ('CHCCOPD3', 'float'), ('ADDEPEV3', 'float'), ('CHCKDNY2', 'float'), ('DIABETE4', 'float'), ('HAVARTH5', 'float'), ('MARITAL', 'float'), ('EDUCA', 'float'), ('RENTHOM1', 'float'), ('VETERAN3', 'float'), ('EMPLOY1', 'float'), ('CHILDREN', 'float'), ('INCOME3', 'float'), ('WEIGHT2', 'float'), ('HEIGHT3', 'float'), ('DEAF', 'float'), ('BLIND', 'float'), ('DECIDE', 'float'), ('DIFFWALK', 'float'), ('DIFFDRES', 'float'), ('DIFFALON', 'float'), ('SMOKE100', 'float'), ('USENOW3', 'float'), ('ECIGNOW1', 'float'), ('ALCDAY5', 'float'), ('FLUSHOT7', 'float'), ('PNEUVAC4', 'float'), ('HIVTST7',

In [35]:
for column_name in df3.columns:
    df3 = df3.withColumn(column_name, col(column_name).cast(IntegerType()))

# Retrieve and print the data types of each column
df3_type = df3.dtypes
print(df3_type)

[('SEXVAR', 'int'), ('GENHLTH', 'int'), ('PHYSHLTH', 'int'), ('MENTHLTH', 'int'), ('PRIMINSR', 'int'), ('PERSDOC3', 'int'), ('MEDCOST1', 'int'), ('CHECKUP1', 'int'), ('EXERANY2', 'int'), ('BPHIGH6', 'int'), ('CHOLCHK3', 'int'), ('TOLDHI3', 'int'), ('CHOLMED3', 'int'), ('CVDINFR4', 'int'), ('CVDCRHD4', 'int'), ('CVDSTRK3', 'int'), ('CHCSCNCR', 'int'), ('CHCOCNCR', 'int'), ('CHCCOPD3', 'int'), ('ADDEPEV3', 'int'), ('CHCKDNY2', 'int'), ('DIABETE4', 'int'), ('HAVARTH5', 'int'), ('MARITAL', 'int'), ('EDUCA', 'int'), ('RENTHOM1', 'int'), ('VETERAN3', 'int'), ('EMPLOY1', 'int'), ('CHILDREN', 'int'), ('INCOME3', 'int'), ('WEIGHT2', 'int'), ('HEIGHT3', 'int'), ('DEAF', 'int'), ('BLIND', 'int'), ('DECIDE', 'int'), ('DIFFWALK', 'int'), ('DIFFDRES', 'int'), ('DIFFALON', 'int'), ('SMOKE100', 'int'), ('USENOW3', 'int'), ('ECIGNOW1', 'int'), ('ALCDAY5', 'int'), ('FLUSHOT7', 'int'), ('PNEUVAC4', 'int'), ('HIVTST7', 'int'), ('FRUIT2', 'int'), ('FRUITJU2', 'int'), ('FVGREEN1', 'int'), ('FRENCHF1', 'int'

In [37]:
from pyspark.sql.functions import monotonically_increasing_id
# 1. Add an 'ID' column to df
df = df.withColumn("ID", monotonically_increasing_id() + 1)

# 2. Split df into df_a and df_b
cols_a = ['ID'] + df.columns[:-1][:200000]  # ID + first 200,000 columns
cols_b = ['ID'] + df.columns[:-1][200000:]  # ID + remaining columns

df_a = df.select(*cols_a)
df_b = df.select(*cols_b)

# 3. Merge df_a and df_b on 'ID' to get df_s
df_s = df_a.join(df_b, on='ID')

# Drop 'ID' from df and df_s for comparison
df = df.drop("ID")
df_s = df_s.drop("ID")

# 4. Validate if df_s is the same as df
if df.subtract(df_s).count() == 0 and df_s.subtract(df).count() == 0:
    print("df_s is the same as df")
else:
    print("df_s and df are different")

df_s is the same as df


In [3]:
df4 = spark.read.csv('df4.csv', header=True, inferSchema=True)

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
import time

# Set up feature matrix X and target vector y
feature_columns = [col_name for col_name in df4.columns if col_name != 'CVDINFR4']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df4)

# Split data into 'label' and 'features'
data = data.select(col("CVDINFR4").alias("label"), "features")

# Initialize the logistic regression model
lr = LogisticRegression(
    maxIter=1000,
    regParam=1.0,
    elasticNetParam=0,
    family="multinomial"
)

# Record the start time
start_time = time.time()

# Train the model
lr_model = lr.fit(data)

# Calculate the time taken
time_taken = time.time() - start_time
print(f"Time taken to train the model: {time_taken:.2f} seconds")

# Predict on the entire dataset
predictions = lr_model.transform(data)

# Calculate and print the training accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print(f"Training Accuracy: {accuracy * 100:.2f}%")

Time taken to train the model: 8.15 seconds
Training Accuracy: 95.45%


In [9]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("DecisionTreeWithPySpark") \
    .getOrCreate()

# Assuming df4 is a PySpark DataFrame with 'CVDINFR4' as the target variable...
# Define the feature columns
feature_columns = [col_name for col_name in df4.columns if col_name != 'CVDINFR4']

# Set up feature matrix X and target vector y
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df4)

# Split data into 'label' and 'features'
data = data.select(col("CVDINFR4").alias("label"), "features")

# Initialize the Decision Tree classifier
tree_clf = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    maxDepth=4,
    minInstancesPerNode=12,
    minInfoGain=0.0,
    impurity='gini'
)

# Record the start time
start_time = time.time()

# Train the model
tree_model = tree_clf.fit(data)

# Calculate the time taken
time_taken = time.time() - start_time

# Predict on the entire dataset
predictions = tree_model.transform(data)

# Calculate and print the training accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

print(f"Training Accuracy: {accuracy * 100:.2f}%")
print(f"Time taken to train the model: {time_taken:.2f} seconds")


Training Accuracy: 97.85%
Time taken to train the model: 4.87 seconds


In [11]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("NeuralNetworkWithPySpark") \
    .getOrCreate()

# Assuming df4 is a PySpark DataFrame with 'CVDINFR4' as the target variable...

# Define the feature columns
feature_columns = [col_name for col_name in df4.columns if col_name != 'CVDINFR4']

# Set up feature matrix X and target vector y
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df4)

# Split data into 'label' and 'features'
data = data.select(col("CVDINFR4").alias("label"), "features")

# Specify layers for the neural network:
# input layer of size number_of_features (e.g., len(feature_columns))
# two intermediate layers of size 100
# output layer of size number_of_classes
# Note: Modify the output size according to the number of label classes
layers = [len(feature_columns), 100, 100, len(feature_columns)]

# Initialize the Multilayer Perceptron classifier
nn_clf = MultilayerPerceptronClassifier(
    maxIter=300,
    layers=layers,
    blockSize=200,
    seed=1234
)

# Record the start time
start_time = time.time()

# Train the model
nn_model = nn_clf.fit(data)

# Calculate the time taken
time_taken = time.time() - start_time

# Predict on the entire dataset
predictions = nn_model.transform(data)

# Calculate and print the training accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

print(f"Training Accuracy: {accuracy * 100:.2f}%")
print(f"Time taken to train the model: {time_taken:.2f} seconds")


Training Accuracy: 95.45%
Time taken to train the model: 510.60 seconds


In [12]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Create a Spark session
spark = SparkSession.builder \
    .appName("TrainTestSplitWithPySpark") \
    .getOrCreate()

# Assuming df4 is a PySpark DataFrame with 'CVDINFR4' as the target variable...

# Define the feature columns
feature_columns = [col_name for col_name in df4.columns if col_name != 'CVDINFR4']

# Set up feature matrix X and target vector y
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df4)

# Split data into 'label' and 'features'
data = data.select(col("CVDINFR4").alias("label"), "features")

# Split the data into training and test sets (80% / 20% split)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Displaying count of train and test data
print(f"Number of training records: {train_data.count()}")
print(f"Number of test records: {test_data.count()}")


Number of training records: 66979
Number of test records: 16588


In [14]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("LogisticRegressionWithPySpark") \
    .getOrCreate()

# Define the feature columns
feature_columns = [col_name for col_name in df4.columns if col_name != 'CVDINFR4']

# Set up feature matrix X and target vector y
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df4)

# Split data into 'label' and 'features'
data = data.select(col("CVDINFR4").alias("label"), "features")

# Split the data into training and test sets (80% / 20% split)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Initialize the logistic regression model
logreg = LogisticRegression(
    maxIter=1000,
    regParam=1.0,  # equivalent to C in sklearn, smaller values specify stronger regularization
    elasticNetParam=0,  # equivalent to l2 penalty in sklearn
    fitIntercept=True
)
# Record the start time
start_time = time.time()
# Train the logistic regression model
logreg_model = logreg.fit(train_data)
# Calculate the time taken
time_taken = time.time() - start_time
# Predict on the test set
predictions = logreg_model.transform(test_data)
# Evaluate the model
# Calculating accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy * 100:.2f}%")
print(f"Time taken to train the model: {time_taken:.2f} seconds")

Test Accuracy: 95.43%
Time taken to train the model: 6.48 seconds


In [16]:
# Define the feature columns
feature_columns = [col_name for col_name in df4.columns if col_name != 'CVDINFR4']

# Set up feature matrix X and target vector y
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df4)

# Split data into 'label' and 'features'
data = data.select(col("CVDINFR4").alias("label"), "features")

# Split the data into training and test sets (80% / 20% split)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Define the model
tree_clf = DecisionTreeClassifier(labelCol="label",
                                  featuresCol="features",
                                  maxDepth=4,
                                  minInstancesPerNode=12,
                                  minInfoGain=0.0)

# Record start time
start_time = time.time()
# Train the model
model = tree_clf.fit(train_data)
# Calculate the time taken
time_taken = time.time() - start_time
print(f"Time taken to train the model: {time_taken:.2f} seconds")
# Make predictions
predictions = model.transform(test_data)
# Evaluate model
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.2f}")

Time taken to train the model: 4.84 seconds
Test Accuracy: 0.98


In [3]:
df4 = spark.read.csv('df4.csv', header=True, inferSchema=True)

In [8]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("NeuralNetworkWithPySpark") \
    .getOrCreate()


# Define the feature columns
feature_columns = [col_name for col_name in df4.columns if col_name != 'CVDINFR4']

# Set up feature matrix X and target vector y
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df4)

# Split data into 'label' and 'features'
data = data.select(col("CVDINFR4").alias("label"), "features")

# Split the data into training and test sets (80% / 20% split)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Specify layers for the neural network:
# input layer of size number_of_features (e.g., len(feature_columns))
# two intermediate layers of size 100
# output layer of size number_of_classes
# Note: Modify the output size according to the number of label classes
layers = [len(feature_columns), 100, 100, len(feature_columns)]

# Initialize the Multilayer Perceptron classifier
nn_clf = MultilayerPerceptronClassifier(
    maxIter=300,
    layers=layers,
    blockSize=200,
    seed=1234
)

# Record the start time
start_time = time.time()

# Train the model
nn_model = nn_clf.fit(train_data)

# Calculate the time taken
time_taken = time.time() - start_time

# Predict on the entire dataset
predictions = nn_model.transform(test_data)

# Calculate and print the training accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy * 100:.2f}%")
print(f"Time taken to train the model: {time_taken:.2f} seconds")


Test Accuracy: 95.43%
Time taken to train the model: 339.39 seconds
