In [1]:
import pandas as pd

# Load the dataset
df = pd.read_csv("KaggleV2-May-2016.csv")

# Preview the data
df.head()


Unnamed: 0,PatientId,AppointmentID,Gender,ScheduledDay,AppointmentDay,Age,Neighbourhood,Scholarship,Hipertension,Diabetes,Alcoholism,Handcap,SMS_received,No-show
0,29872500000000.0,5642903,F,2016-04-29T18:38:08Z,2016-04-29T00:00:00Z,62,JARDIM DA PENHA,0,1,0,0,0,0,No
1,558997800000000.0,5642503,M,2016-04-29T16:08:27Z,2016-04-29T00:00:00Z,56,JARDIM DA PENHA,0,0,0,0,0,0,No
2,4262962000000.0,5642549,F,2016-04-29T16:19:04Z,2016-04-29T00:00:00Z,62,MATA DA PRAIA,0,0,0,0,0,0,No
3,867951200000.0,5642828,F,2016-04-29T17:29:31Z,2016-04-29T00:00:00Z,8,PONTAL DE CAMBURI,0,0,0,0,0,0,No
4,8841186000000.0,5642494,F,2016-04-29T16:07:23Z,2016-04-29T00:00:00Z,56,JARDIM DA PENHA,0,1,1,0,0,0,No


In [2]:
# Check dataset info
df.info()

# Check missing values
print("\nMissing values:\n", df.isnull().sum())

# Check unique values in target column
print("\nTarget column unique values:\n", df['No-show'].value_counts())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 110527 entries, 0 to 110526
Data columns (total 14 columns):
 #   Column          Non-Null Count   Dtype  
---  ------          --------------   -----  
 0   PatientId       110527 non-null  float64
 1   AppointmentID   110527 non-null  int64  
 2   Gender          110527 non-null  object 
 3   ScheduledDay    110527 non-null  object 
 4   AppointmentDay  110527 non-null  object 
 5   Age             110527 non-null  int64  
 6   Neighbourhood   110527 non-null  object 
 7   Scholarship     110527 non-null  int64  
 8   Hipertension    110527 non-null  int64  
 9   Diabetes        110527 non-null  int64  
 10  Alcoholism      110527 non-null  int64  
 11  Handcap         110527 non-null  int64  
 12  SMS_received    110527 non-null  int64  
 13  No-show         110527 non-null  object 
dtypes: float64(1), int64(8), object(5)
memory usage: 11.8+ MB

Missing values:
 PatientId         0
AppointmentID     0
Gender            0
ScheduledDay

In [3]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Hospital-NoShow-Prediction")
         .getOrCreate())

df = (spark.read
             .option("header", True)
             .option("inferSchema", True)
             .csv("KaggleV2-May-2016.csv"))

df.show(5)
df.printSchema()
print("Rows :", df.count())


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/14 19:41:56 WARN Utils: Your hostname, Taukirs-MacBook-Air-779.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.98 instead (on interface en0)
25/07/14 19:41:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/14 19:41:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------------------+-------------+------+-------------------+-------------------+---+-----------------+-----------+------------+--------+----------+-------+------------+-------+
|          PatientId|AppointmentID|Gender|       ScheduledDay|     AppointmentDay|Age|    Neighbourhood|Scholarship|Hipertension|Diabetes|Alcoholism|Handcap|SMS_received|No-show|
+-------------------+-------------+------+-------------------+-------------------+---+-----------------+-----------+------------+--------+----------+-------+------------+-------+
| 2.9872499824296E13|      5642903|     F|2016-04-30 02:38:08|2016-04-29 08:00:00| 62|  JARDIM DA PENHA|          0|           1|       0|         0|      0|           0|     No|
|5.58997776694438E14|      5642503|     M|2016-04-30 00:08:27|2016-04-29 08:00:00| 56|  JARDIM DA PENHA|          0|           0|       0|         0|      0|           0|     No|
|  4.262962299951E12|      5642549|     F|2016-04-30 00:19:04|2016-04-29 08:00:00| 62|    MATA DA PRAIA| 

In [4]:
from pyspark.sql import functions as F

# Create 'label' column: 1 if No-show == "Yes", else 0
df = (
    df.withColumnRenamed("No-show", "NoShow")
      .withColumn("label", F.when(F.col("NoShow") == "Yes", 1).otherwise(0).cast("int"))
)

# Drop unnecessary columns
df = df.drop("PatientId", "AppointmentID", "NoShow")

# Filter out invalid age rows
df = df.filter((F.col("Age") >= 0) & (F.col("Age") <= 100))

# Preview cleaned data
df.select("label", "Gender", "Age", "Neighbourhood", "Scholarship", "SMS_received").show(5)
print("Rows after cleaning:", df.count())


+-----+------+---+-----------------+-----------+------------+
|label|Gender|Age|    Neighbourhood|Scholarship|SMS_received|
+-----+------+---+-----------------+-----------+------------+
|    0|     F| 62|  JARDIM DA PENHA|          0|           0|
|    0|     M| 56|  JARDIM DA PENHA|          0|           0|
|    0|     F| 62|    MATA DA PRAIA|          0|           0|
|    0|     F|  8|PONTAL DE CAMBURI|          0|           0|
|    0|     F| 56|  JARDIM DA PENHA|          0|           0|
+-----+------+---+-----------------+-----------+------------+
only showing top 5 rows
Rows after cleaning: 110519


In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Categorical columns to index and encode
categorical_cols = ["Gender", "Neighbourhood"]

# Numeric columns to keep as-is
numeric_cols = ["Age", "Scholarship", "Hipertension", "Diabetes", "Alcoholism", "Handcap", "SMS_received"]

# Indexers and Encoders
indexers = [StringIndexer(inputCol=col, outputCol=col + "_Index") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col + "_Index", outputCol=col + "_Vec") for col in categorical_cols]

# Final feature columns
feature_cols = [col + "_Vec" for col in categorical_cols] + numeric_cols

# Vector assembler to combine all features into a single vector column
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit and transform the data
model = pipeline.fit(df)
df_transformed = model.transform(df)

# Show features and label
df_transformed.select("features", "label").show(5, truncate=False)


+------------------------------------------+-----+
|features                                  |label|
+------------------------------------------+-----+
|(88,[0,4,81,83],[1.0,1.0,62.0,1.0])       |0    |
|(88,[4,81],[1.0,56.0])                    |0    |
|(88,[0,50,81],[1.0,1.0,62.0])             |0    |
|(88,[0,76,81],[1.0,1.0,8.0])              |0    |
|(88,[0,4,81,83,84],[1.0,1.0,56.0,1.0,1.0])|0    |
+------------------------------------------+-----+
only showing top 5 rows


In [7]:
df.columns


['Gender',
 'ScheduledDay',
 'AppointmentDay',
 'Age',
 'Neighbourhood',
 'Scholarship',
 'Hipertension',
 'Diabetes',
 'Alcoholism',
 'Handcap',
 'SMS_received',
 'label']

In [9]:
# Split the dataset into training (80%) and testing (20%)
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42)


In [10]:
from pyspark.ml.classification import LogisticRegression

# Initialize logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Train the model
lr_model = lr.fit(train_data)


25/07/14 19:44:09 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/07/14 19:44:11 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [11]:
from pyspark.ml.classification import LogisticRegression

# Initialize logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Train the model
lr_model = lr.fit(train_data)


In [12]:
# Make predictions on the test data
predictions = lr_model.transform(test_data)

# Show predictions
predictions.select("features", "label", "prediction", "probability").show(5, truncate=False)


+-------------------------------------------+-----+----------+----------------------------------------+
|features                                   |label|prediction|probability                             |
+-------------------------------------------+-----+----------+----------------------------------------+
|(88,[0,24,81,83,87],[1.0,1.0,66.0,1.0,1.0])|0    |0.0       |[0.7385515994321306,0.26144840056786944]|
|(88,[0,27,81,87],[1.0,1.0,39.0,1.0])       |0    |0.0       |[0.7412969647410792,0.2587030352589208] |
|(88,[0,24,81,83],[1.0,1.0,82.0,1.0])       |0    |0.0       |[0.8568118507769966,0.14318814922300338]|
|(88,[0,27,81],[1.0,1.0,78.0])              |1    |0.0       |[0.8744093395076928,0.12559066049230716]|
|(88,[0,24,81,83,87],[1.0,1.0,56.0,1.0,1.0])|0    |0.0       |[0.7268708092198128,0.27312919078018716]|
+-------------------------------------------+-----+----------+----------------------------------------+
only showing top 5 rows


In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize evaluators
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
recall_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
f1_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Calculate metrics
accuracy = accuracy_evaluator.evaluate(predictions)
precision = precision_evaluator.evaluate(predictions)
recall = recall_evaluator.evaluate(predictions)
f1_score = f1_evaluator.evaluate(predictions)

# Print the results
print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1_score:.4f}")


Accuracy:  0.7959
Precision: 0.6335
Recall:    0.7959
F1 Score:  0.7055
