## **P1: Data Processing Problem**
This dataset captures key healthcare information, including patient demographics, medical conditions, treatments, and billing. Each record represents a patient’s admission, providing insights into diagnoses, treatments, and outcomes. PySpark's distributed computing capabilities enable efficient analysis, making it ideal for uncovering patterns and trends in healthcare data.


### **Dataset Description**

| **Column Name**      | **Description**                                              |
|-----------------------|--------------------------------------------------------------|
| **Name**             | Patient’s name.                                              |
| **Age**              | Age of the patient at admission.                             |
| **Gender**           | Patient’s gender (Male/Female).                              |
| **Blood Type**       | Patient’s blood type (e.g., A+, O-).                         |
| **Medical Condition**| Primary diagnosis or health issue.                           |
| **Date of Admission**| Date of hospital admission.                                  |
| **Doctor**           | Treating doctor’s name.                                      |
| **Hospital**         | Name of the healthcare facility.                             |
| **Insurance Provider**| Patient’s insurance company.                                |
| **Billing Amount**   | Total billed amount for services.                            |
| **Room Number**      | Room assigned to the patient.                                |
| **Admission Type**   | Type of admission: Emergency, Urgent, or Elective.           |
| **Discharge Date**   | Date of discharge from the facility.                         |
| **Medication**       | Medication prescribed/administered.                          |
| **Test Results**     | Diagnostic test outcomes: Normal, Abnormal, or Inconclusive. |

### **The Problem**
The goal is to analyze this dataset to uncover insights that improve hospital performance and patient care. PySpark facilitates efficient processing of large-scale data, enabling us to address these key queries:

1. **Hospital Performance Evaluation**
   - Compare hospitals based on billing, admissions, and a custom KPI.
   - Identify hospitals with high emergency cases for resource allocation.

2. **Seasonal Trends**
   - Analyze seasonal variations in admissions and medical conditions.
   - Predict peak periods for emergency care.

3. **Patient Demographics Analysis**
   - Study medical conditions by gender and age group.
   - Explore insurance trends and their impact on billing and outcomes.

4. **Length of Stay and Admission Trends**
   - Identify conditions associated with extended stays.
   - Calculate daily admissions by type for capacity planning.

5. **Emergency Admission Trends**
   - Detect busiest days for emergency admissions.
   - Analyze weekly admission patterns to optimize resource scheduling.

6. **Patient Clustering**
   - Group patients based on similarities in demographics, medical conditions, and billing.
   
7. **Doctor Clustering**
   - Group doctors based on their specialties, patient outcomes, and billing patterns.

## **P2: Leveraging PySpark to Address the Challenge**

*The dataset was cleaned and checked prior to data extraction. The cleaning procedure is omitted here to comply with the assignment requirements; the full procedure is included in the notebook uploaded to our group's folder.*

#### **1. Cross-Hospital Comparison**

This analysis evaluated hospitals using multiple Key Performance Indicators (KPIs) to assess their performance, workload distribution, and patient care profiles.

**Steps and PySpark Operations**

1. **Metric Calculation**:
   - **Average Billing**: Calculated the mean billing amount per hospital using:
     ```python
     from pyspark.sql.functions import avg, col, datediff
     average_billing = df.groupBy("Hospital").agg(avg("Billing Amount").alias("Average Billing"))
     ```
   - **Average Stay Duration**: Computed the average stay duration for each hospital:
     ```python
     df = df.withColumn("Stay Duration", datediff(col("Discharge Date"), col("Date of Admission")))
     average_stay_duration = df.groupBy("Hospital").agg(avg("Stay Duration").alias("Average Stay Duration"))
     ```
   - **Admission Type Distribution**: Counted the number of admissions per type (Emergency, Elective, Urgent) for each hospital using:
     ```python
     admission_type_distribution = df.groupBy("Hospital") \
         .pivot("Admission Type", ["Emergency", "Elective", "Urgent"]).count().na.fill(0)
     ```

2. **Normalization**:
   - Scaled the calculated metrics to make them comparable by dividing each metric by its maximum value.

3. **KPI Index**:
   - Computed a weighted KPI Index to rank hospitals.

4. **Ranking**:
   - Ranked hospitals by descending KPI Index to identify the top-performing ones:
     ```python
     hospital_ranked = hospital_kpis_with_index.orderBy(col("KPI Index").desc())
     ```
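
The normalization and KPI Index steps above can be sketched in plain Python, independent of Spark. This is only an illustration of the arithmetic, not the notebook's code: the hospital names, metric values, and weights are hypothetical, since the actual weights are not listed in this section.

```python
def max_normalize(values):
    """Scale each value by the column maximum so results lie in [0, 1]."""
    peak = max(values)
    return [v / peak if peak else 0.0 for v in values]


def kpi_index(rows, weights):
    """Return {hospital: weighted sum of max-normalized metrics}."""
    metrics = list(weights)
    normalized = {m: max_normalize([row[m] for row in rows]) for m in metrics}
    return {
        row["Hospital"]: sum(weights[m] * normalized[m][i] for m in metrics)
        for i, row in enumerate(rows)
    }


# Hypothetical per-hospital metrics and weights, for illustration only.
rows = [
    {"Hospital": "A", "Average Billing": 50000.0, "Average Stay Duration": 10.0, "Emergency": 4},
    {"Hospital": "B", "Average Billing": 25000.0, "Average Stay Duration": 20.0, "Emergency": 2},
]
weights = {"Average Billing": 0.5, "Average Stay Duration": 0.25, "Emergency": 0.25}

scores = kpi_index(rows, weights)
# Rank hospitals by descending KPI Index, mirroring the orderBy step.
ranking = sorted(scores, key=scores.get, reverse=True)
```

With these made-up inputs, hospital A scores 0.875 and hospital B scores 0.625, so A ranks first.
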
**Results**:

<pre style="font-size:12px; color:black;">

Top 10 best hospitals based on KPI index:
+--------------------+------------------+---------------------+---------+--------+------+------------------+
|            Hospital|   Average Billing|Average Stay Duration|Emergency|Elective|Urgent|         KPI Index|
+--------------------+------------------+---------------------+---------+--------+------+------------------+
|    Hernandez-Morton|52373.032374241826|                 14.0|        0|       0|     2| 20953.51294969673|
|       Walker-Garcia| 52170.03685355641|                  2.0|        0|       0|     2|20868.714741422562|
|        Ruiz-Anthony|52154.237721878235|                 23.0|        0|       0|     1|20868.645088751295|
|     George-Gonzalez| 52102.24088919256|                  9.0|        1|       0|     0|20843.796355677026|
|        Rocha-Carter|52092.669895844054|                  2.0|        1|       0|     0| 20837.86795833762|
|Briggs Walker Mar...| 52024.72644288463|                 26.0|        0|       1|     0|20817.740577153854|
|and Small Stephen...| 51975.96813526631|                 27.0|        0|       0|     2|20798.587254106522|
|      Clark-Espinoza| 51848.20159668146|                  8.0|        0|       0|     1|20741.730638672587|
|        Stephens Ltd| 51714.30087099009|                 27.0|        0|       0|     1|20693.870348396034|
|Pierce and Miller...| 51722.12273936527|                  7.0|        0|       1|     0| 20690.99909574611|
+--------------------+------------------+---------------------+---------+--------+------+------------------+
only showing top 10 rows

--> queries execution started at 01:02:58.884276 and ended at 01:03:00.845048, execution time: 1960ms
</pre>

#### **2. Seasonal Trends**

This analysis explored seasonal variations in hospital admissions, identifying trends in patient visits and the most common medical conditions during each season.    
Additionally, the season with the highest number of admissions was determined.

**Steps and PySpark Operations**

1. **Adding a "Season" Column**:
   - A new column, `Season`, was added based on the month of the `Date of Admission`. The months were mapped as follows:
     ```python
     df = df.withColumn(
         "Season",
         when(month(col("Date of Admission")).isin(3, 4, 5), "Spring")
         .when(month(col("Date of Admission")).isin(6, 7, 8), "Summer")
         .when(month(col("Date of Admission")).isin(9, 10, 11), "Autumn")
         .when(month(col("Date of Admission")).isin(12, 1, 2), "Winter"))
     ```
2. **Grouping by Season and Medical Condition**:
   - Admissions were grouped by `Season` and `Medical Condition`, and the count of admissions was calculated:
     ```python
     seasonal_counts = df.groupBy("Season", "Medical Condition").agg(
         count("*").alias("Admission Count"))
     ```
3. **Identifying the Most Common Medical Condition per Season**:
   - A **window function** was used to rank medical conditions by admission count within each season:
     ```python
     window_spec = Window.partitionBy("Season").orderBy(col("Admission Count").desc())
     most_common_conditions = seasonal_counts.withColumn(
         "Rank", row_number().over(window_spec)).filter(col("Rank") == 1)
     ```
4. **Finding the Season with the Most Admissions**:
   - Admissions were grouped by `Season`, and the total admissions for each season were calculated:
     ```python
     seasonal_totals = df.groupBy("Season").agg(
         count("*").alias("Total Admissions"))
     ```
   - The season with the highest number of admissions was identified using:
     ```python
     most_admissions_season = seasonal_totals.orderBy(col("Total Admissions").desc())
     ```
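
The month-to-season mapping and the "rank 1 per season" selection above can be sketched in plain Python. This is a minimal illustration on made-up records, not the notebook's code; the function name `top_condition_per_season` is hypothetical.

```python
from collections import Counter

# Same month-to-season mapping as the when/isin chain above.
SEASON_BY_MONTH = {
    3: "Spring", 4: "Spring", 5: "Spring",
    6: "Summer", 7: "Summer", 8: "Summer",
    9: "Autumn", 10: "Autumn", 11: "Autumn",
    12: "Winter", 1: "Winter", 2: "Winter",
}


def top_condition_per_season(records):
    """records: iterable of (month, condition) -> {season: (condition, count)}."""
    counts = Counter((SEASON_BY_MONTH[month], cond) for month, cond in records)
    best = {}
    for (season, cond), n in counts.items():
        # Keep the first condition seen on ties; row_number() also breaks ties arbitrarily.
        if season not in best or n > best[season][1]:
            best[season] = (cond, n)
    return best


# Made-up admissions as (month, condition), for illustration only.
records = [(1, "Asthma"), (2, "Asthma"), (12, "Cancer"),
           (7, "Diabetes"), (7, "Diabetes"), (8, "Obesity")]
top = top_condition_per_season(records)
```
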
 **Results**:

<div style="display: flex; gap: 10px;">

  <div style="width: 35%; font-size:12px; color:black;">
    <pre>
Most Common Medical Conditions per Season:
+------+-----------------+---------------+
|Season|Medical Condition|Admission Count|
+------+-----------------+---------------+
|Autumn|         Diabetes|           2346|
|Spring|           Cancer|           2335|
|Summer|        Arthritis|           2447|
|Winter|        Arthritis|           2348|
+------+-----------------+---------------+
    </pre>
  </div>
  
  <div style="width: 25%; font-size:12px; color:black;">
    <pre>
Season with the Most Admissions:
+------+----------------+
|Season|Total Admissions|
+------+----------------+
|Summer|           14343|
|Spring|           13789|
|Autumn|           13772|
|Winter|           13596|
+------+----------------+
    </pre>
  </div>

   <div style="width: 20%; font-size:12px; color:black;">
    <pre>

Season with the Most Admissions:
+------+----------------+
|Season|Total Admissions|
+------+----------------+
|Summer|           14343|
+------+----------------+
    </pre>
  </div>

</div>

<pre style="font-size:12px; color:black;">
--> queries execution started at 01:05:29.777919 and ended at 01:05:30.217527, execution time: 439ms
</pre>


#### **3. Patient Demographics Analysis** 

This analysis examined the distribution of patients based on their medical conditions, age groups, and gender. Various charts were used to visualize these relationships and provide actionable insights.

**Steps and PySpark Operations**

1. **Counting Patients by Medical Condition**:
   - Grouped the dataset by `Medical Condition` and counted the number of patients for each condition using:
     ```python
     patients_by_condition = df.groupBy("Medical Condition").agg(count("ID").alias("Patients"))
     ```
   - The data was visualized in a **pie chart** to display the proportion of patients by condition.

2. **Counting Patients by Medical Condition and Age**:
   - Grouped by `Medical Condition` and `Age` to count the patients with each medical condition at each age (every distinct `Age` value is treated as its own group):
     ```python
     medical_condition_per_age = df.groupBy("Medical Condition", "Age").agg(
         count("ID").alias("Patients")
     ).orderBy("Age", ascending=True)
     ```
3. **Ranking and Filtering Top Conditions by Age Group**:
   - Applied a **window function** to rank medical conditions within each age group by patient count:
     ```python
     window_spec = Window.partitionBy("Age").orderBy(col("Patients").desc())
     ranked_conditions = medical_condition_per_age.withColumn("Rank", row_number().over(window_spec))
     ```
   - Filtered to retrieve the top-ranked condition for each age group:
     ```python
     top_conditions_by_age_group = ranked_conditions.filter(col("Rank") == 1)
     ```
4. **Counting Patients by Gender and Medical Condition**:
   - Grouped the dataset by `Gender` and `Medical Condition` and calculated the patient count for each combination:
     ```python
     gender_condition = df.groupBy("Gender", "Medical Condition").agg(count("ID").alias("Patients"))
     ```
   - Data was visualized in a **bar chart** to compare male and female patient counts for each condition.
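
The grouping above uses each distinct `Age` value as its own group. If wider age bands were preferred, one possible approach is sketched below in plain Python. The 10-year band width, the helper name `age_band`, and the sample records are assumptions made for illustration; they are not part of the notebook.

```python
from collections import Counter


def age_band(age, width=10):
    """Map an age to a label such as '10-19' (band width is an assumption)."""
    low = (age // width) * width
    return f"{low}-{low + width - 1}"


# Made-up (age, condition) records, for illustration only.
records = [(13, "Asthma"), (17, "Asthma"), (19, "Cancer"),
           (45, "Obesity"), (89, "Arthritis")]

# Count patients per (age band, condition), then keep the top condition per band.
band_counts = Counter((age_band(age), cond) for age, cond in records)
top_by_band = {}
for (band, cond), n in band_counts.items():
    if band not in top_by_band or n > top_by_band[band][1]:
        top_by_band[band] = (cond, n)
```
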

**Results**:

<div style="display: flex; gap: 5px;">

  <div style="width: 40%; font-size:12px; color:black;">
    <pre>
Gender and Medical Conditions:
+------+-----------------+--------+
|Gender|Medical Condition|Patients|
+------+-----------------+--------+
|Female|        Arthritis|    4686|
|Female|     Hypertension|    4612|
|Female|           Cancer|    4602|
|Female|         Diabetes|    4651|
|Female|          Obesity|    4622|
|Female|           Asthma|    4553|
|  Male|     Hypertension|    4633|
|  Male|           Asthma|    4632|
|  Male|        Arthritis|    4622|
|  Male|          Obesity|    4609|
|  Male|         Diabetes|    4653|
|  Male|           Cancer|    4625|
+------+-----------------+--------+
    </pre>
  </div>
  
  <div style="width: 45%; font-size:12px; color:black;">
    <pre>
Medical conditions and their distribution:
+-----------------+--------+
|Medical Condition|Patients|
+-----------------+--------+
|          Obesity|    9231|
|         Diabetes|    9304|
|        Arthritis|    9308|
|     Hypertension|    9245|
|           Cancer|    9227|
|           Asthma|    9185|
+-----------------+--------+
</pre>
  </div>

</div>

<pre style="font-size:12px; color:black;">
--> queries execution started at 01:05:30.324017 and ended at 01:05:31.735653, execution time: 1411ms
</pre>


#### **4. Analyzing Patient Length of Stay and Admission Trends**


This analysis identified the medical condition-admission type combinations with the highest total length of stay and calculated average daily admissions for each admission type, providing insights for hospital resource management.

**Steps and PySpark Operations**
1. **Converting Dates**:
   - Converted `Date of Admission` and `Discharge Date` to `date` type for accurate calculations using:
     ```python
     df = df.withColumn("Date of Admission", to_date("Date of Admission", "yyyy-MM-dd"))
     df = df.withColumn("Discharge Date", to_date("Discharge Date", "yyyy-MM-dd"))
     ```
2. **Grouping by Medical Condition and Admission Type**:
   - Used `groupBy` and `agg` to calculate:
     - `count("*")`: Total patient count.
     - `sum("Stay Duration")`: Total length of stay.
     - `avg("Stay Duration")`: Average length of stay.
     ```python
     # sum and avg here are pyspark.sql.functions; this sum shadows Python's built-in
     group_stats = df.groupBy("Medical Condition", "Admission Type").agg(
         count("*").alias("Patient_Count"),
         sum("Stay Duration").alias("Total Length of Stay"),
         avg("Stay Duration").alias("Avg Length of Stay"))
     ```
3. **Top 5 Groups by Total Length of Stay**:
   - Ordered groups in descending order of total stay duration using `orderBy` and retrieved the top 5:
     ```python
     top_5_groups = group_stats.orderBy(desc("Total Length of Stay")).limit(5)
     ```
4. **Calculating Average Daily Admissions**:
   - Grouped by `Admission Type` and `Date of Admission` to count daily admissions, then averaged these counts:
     ```python
     daily_admissions = df.groupBy("Admission Type", "Date of Admission").count() \
                          .groupBy("Admission Type").agg(
                              avg("count").alias("Avg_Daily_Admissions"))
     ```
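
The stay-duration and average-daily-admissions calculations above can be sketched in plain Python. This is a minimal illustration on made-up dates, not the notebook's code. As with the `groupBy` approach, the average is taken only over days that have at least one admission of the given type.

```python
from collections import Counter, defaultdict
from datetime import date


def stay_duration(admitted, discharged):
    """Whole days between admission and discharge."""
    return (discharged - admitted).days


def avg_daily_admissions(admissions):
    """admissions: iterable of (admission_type, admission_date)."""
    per_day = Counter(admissions)  # (type, date) -> admissions on that day
    daily_counts = defaultdict(list)
    for (adm_type, _day), n in per_day.items():
        daily_counts[adm_type].append(n)
    return {t: sum(c) / len(c) for t, c in daily_counts.items()}


# Made-up admissions, for illustration only.
admissions = [
    ("Emergency", date(2024, 1, 1)),
    ("Emergency", date(2024, 1, 1)),
    ("Emergency", date(2024, 1, 2)),
    ("Elective", date(2024, 1, 1)),
]
averages = avg_daily_admissions(admissions)
two_week_stay = stay_duration(date(2024, 1, 1), date(2024, 1, 15))
```
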
**Results**:


<div style="display: flex; gap: 20px;">

  <div style="width: 55%; font-size:10px; color:black;">
    <pre>
Top 5 groups with the highest total length of stay:
+-----------------+--------------+-------------+--------------------+------------------+
|Medical Condition|Admission Type|Patient_Count|Total Length of Stay|Avg Length of Stay|
+-----------------+--------------+-------------+--------------------+------------------+
|     Hypertension|      Elective|         3221|               49843|15.474386836386216|
|           Asthma|      Elective|         3102|               48836|15.743391360412637|
|         Diabetes|        Urgent|         3229|               48729|15.091049860637968|
|           Asthma|        Urgent|         3081|               48573|15.765335929892892|
|          Obesity|     Emergency|         3126|               48508|15.517594369801664|
+-----------------+--------------+-------------+--------------------+------------------+
    </pre>
  </div>
  
  <div style="width: 45%; font-size:10px; color:black;">
    <pre>

Average daily admissions per admission type:
+--------------+--------------------+
|Admission Type|Avg_Daily_Admissions|
+--------------+--------------------+
|      Elective|  10.210727969348659|
|     Emergency|   9.999452654625069|
|        Urgent|  10.173055859802847|
+--------------+--------------------+
    </pre>
  </div>

</div>

<pre style="font-size:12px; color:black;">
--> queries execution started at 01:05:31.822585 and ended at 01:05:32.089082, execution time: 266ms
</pre>


#### **5. Emergency Admission Trends** 

This analysis examined emergency admissions to identify the busiest days of the week and to study how admissions are distributed across the week.

**Steps and PySpark Operations**
1. **Filtering Emergency Cases**:
  - Filtered the dataset to include only rows where `Admission Type` is **Emergency**:
    ```python
    emergency_cases = df.filter(col("Admission Type") == "Emergency")
    ```
2. **Extracting the Day of the Week**:
  - Added a new column, `Day_of_Week`, by extracting the day of the week from the `Date of Admission`:
    ```python
    emergency_cases = emergency_cases.withColumn("Day_of_Week", date_format("Date of Admission", "E"))
    ```
3. **Counting Admissions by Day**:
  - Grouped by `Day_of_Week` and counted the number of emergency admissions for each day:
    ```python
    admissions_by_day = emergency_cases.groupBy("Day_of_Week").count().orderBy(desc("count"))
    ```
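
The filter, day-of-week extraction, and count steps above can be sketched in plain Python. This is a minimal illustration on made-up dates, not the notebook's code; the helper name `busiest_days` is hypothetical.

```python
from collections import Counter
from datetime import date

# Fixed abbreviations so the result does not depend on the system locale.
DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]


def busiest_days(admissions, top_n=2):
    """admissions: iterable of (admission_type, admission_date)."""
    emergency_days = [
        DAY_NAMES[day.weekday()]
        for adm_type, day in admissions
        if adm_type == "Emergency"
    ]
    return Counter(emergency_days).most_common(top_n)


# Made-up admissions, for illustration only (1 January 2024 was a Monday).
admissions = [
    ("Emergency", date(2024, 1, 2)),
    ("Emergency", date(2024, 1, 9)),
    ("Emergency", date(2024, 1, 5)),
    ("Elective", date(2024, 1, 3)),
]
top_two = busiest_days(admissions)
```
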
**Results**:


<div style="display: flex; gap: 1px;">

  <div style="width: 30%; font-size:12px; color:black;">
    <pre>

Admissions distribution over the week:
+-----------+-----+
|Day_of_Week|count|
+-----------+-----+
|        Tue| 2645|
|        Fri| 2628|
|        Thu| 2609|
|        Sun| 2602|
|        Mon| 2597|
|        Wed| 2597|
|        Sat| 2591|
+-----------+-----+
    </pre>
  </div>
  
  <div style="width: 45%; font-size:12px; color:black;">
    <pre>
  Top 2 Busiest Days for Emergency Admissions:
  +-----------+-----+
  |Day_of_Week|count|
  +-----------+-----+
  |        Tue| 2645|
  |        Fri| 2628|
  +-----------+-----+
    </pre>
  </div>

</div>

<pre style="font-size:12px; color:black;">
--> queries execution started at 01:05:31.822585 and ended at 01:05:32.089082, execution time: 266ms
</pre>


#### **6. Patient Clustering**

This analysis clustered patient admissions to identify patterns in admission criteria, with the aim of using historical data to predict possible medications or treatment plans.  
Admissions were treated as independent entities, so a readmitted patient contributes one record per admission.

**Steps and PySpark Operations**

1. **Data Preparation**:
   - Selected relevant columns (`ID`, `Age`, `Stay Duration`, `Medical Condition`, `Billing Amount`) for clustering.
  
   - **Tokenization**: Split `Medical Condition` into tokens using:
     ```python
     tokenizer = Tokenizer(inputCol="Medical Condition", outputCol="Medical Condition Tokens")
     ```
   - **Hashing**: Converted medical condition tokens into numerical features:
     ```python
     hashingTF = HashingTF(inputCol="Medical Condition Tokens", outputCol="Medical Condition Hashed", numFeatures=42)
     ```
2. **Feature Engineering**:
   - Combined features (`Age`, `Stay Duration`, `Billing Amount`, `Medical Condition Hashed`) using:
     ```python
     assembler = VectorAssembler( inputCols=["Age", "Stay Duration", "Billing Amount", "Medical Condition Hashed"], outputCol="features")
     ```
   - Scaled the features for normalization using:
     ```python
     scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)
     ```
3. **Finding Optimal Number of Clusters (k)**:
   - The best `k` was selected by comparing the **Silhouette Score** for `k` values between 2 and 10.

4. **Training the KMeans Model**:
   - Trained the final KMeans model with the optimal `k` (found to be 6):
     ```python
     kmeans = KMeans(featuresCol="scaled_features", k=best_k)
     model = kmeans.fit(dataset_for_clustering)
     patients = model.transform(dataset_for_clustering)
     ```
5. **Cluster Statistics**:
   - Grouped patients by their clusters to compute aggregate statistics (e.g., average age, stay duration, billing amount); see the output below.
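
To make the Silhouette Score used for choosing `k` more concrete, here is a minimal plain-Python sketch of the metric. It is not the notebook's evaluation code and not Spark's implementation: it uses plain Euclidean distance, assumes at least two clusters, and compares two hand-written labelings of made-up points instead of actual KMeans runs.

```python
from math import dist


def silhouette_score(points, labels):
    """Mean silhouette coefficient (Euclidean distance, at least 2 clusters)."""
    clusters = {}
    for p, label in zip(points, labels):
        clusters.setdefault(label, []).append(p)
    scores = []
    for p, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # convention for singleton clusters
            continue
        # a: mean distance to the other points in the same cluster
        a = sum(dist(p, q) for q in own) / (len(own) - 1)
        # b: mean distance to the nearest other cluster
        b = min(
            sum(dist(p, q) for q in other) / len(other)
            for other_label, other in clusters.items()
            if other_label != label
        )
        scores.append((b - a) / max(a, b) if max(a, b) else 0.0)
    return sum(scores) / len(scores)


# Made-up 2-D points and two candidate labelings, for illustration only.
points = [(0, 0), (0, 1), (10, 0), (10, 1)]
candidates = {"by_x": [0, 0, 1, 1], "by_y": [0, 1, 0, 1]}
best = max(candidates, key=lambda name: silhouette_score(points, candidates[name]))
```

The labeling that groups nearby points together scores close to 1, while the one that mixes distant points scores below 0, so the higher score marks the better clustering.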

**Insights**
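- The six cluster sizes (9304, 9308, 9231, 9245, 9227, 9185) match the per-condition patient counts reported in the Patient Demographics Analysis exactly, which suggests the clusters separate admissions mainly by medical condition. Average age, stay duration, and billing amount are nearly identical across clusters.
- PySpark tools such as **Tokenizer**, **HashingTF**, **VectorAssembler**, and **KMeans** efficiently processed and modeled the data at scale.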
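
The tokenization and hashing steps used in the data preparation can be illustrated with a small plain-Python sketch of the hashing trick. It is not Spark's implementation: `zlib.crc32` stands in for the hash function Spark uses internally, so the bucket indices will differ from those produced by `HashingTF`.

```python
import zlib


def tokenize(text):
    """Lowercase and split on whitespace, as Spark's Tokenizer does."""
    return text.lower().split()


def hashed_term_frequencies(tokens, num_features=42):
    """Count tokens into a fixed-length vector via hash(token) % num_features."""
    vector = [0] * num_features
    for token in tokens:
        # crc32 is a stand-in hash chosen for illustration only.
        index = zlib.crc32(token.encode("utf-8")) % num_features
        vector[index] += 1
    return vector


vec = hashed_term_frequencies(tokenize("Hypertension"))
```

Each condition name ends up as a fixed-length count vector, so it can be combined with numeric columns such as `Age` in a single feature vector.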

**Results**:

<pre style="font-size:8px; color:black;">
Obtained clusters:
+----------+------------------+------------------+-----------+-----------+---------------------+---------------------+---------------------+----------------------+----------------------+----------------------+
|prediction|Number of Patients|Average Age       |Minimum Age|Maximum Age|Average Stay Duration|Minimum Stay Duration|Maximum Stay Duration|Average Billing Amount|Minimum Billing Amount|Maximum Billing Amount|
+----------+------------------+------------------+-----------+-----------+---------------------+---------------------+---------------------+----------------------+----------------------+----------------------+
|1         |9304              |51.55417024935512 |14         |89         |15.422936371453138   |1                    |30                   |25640.13433762909     |31.030955450754846    |52211.85296638021     |
|3         |9308              |51.56532015470563 |13         |89         |15.517404383326172   |1                    |30                   |25498.583437499496    |26.112523116277544    |52170.03685355641     |
|5         |9231              |51.240277326400175|13         |89         |15.464305059040191   |1                    |30                   |25808.21690005182     |36.217270349314276    |52024.72644288463     |
|4         |9245              |51.741914548404544|13         |89         |15.458626284478097   |1                    |30                   |25498.991212821315    |23.866729145456247    |52764.276736469175    |
|2         |9227              |51.558794841226835|13         |89         |15.495827462880676   |1                    |30                   |25164.17765586996     |9.238787497393332     |52373.032374241826    |
|0         |9185              |51.575830157866086|13         |89         |15.69657049537289    |1                    |30                   |25637.40628768246     |32.34872934198097     |52181.837792399056    |
+----------+------------------+------------------+-----------+-----------+---------------------+---------------------+---------------------+----------------------+----------------------+----------------------+

--> queries execution started at 02:10:43.734777 and ended at 02:10:55.449092, execution time: 11714ms
</pre>


#### **7. Doctor Clustering**

In this analysis, we clustered doctors based on their performance metrics and patient profiles to identify patterns in treatment approaches and specialties.    
Although the analysis is different, the PySpark operations are the same as those applied in the clustering of patients, so only the results are reported here.    

**Results**:
<pre style="font-size:12px; color:black;">
Obtained clusters:
+----------+-----------------+--------------------------+----------------------+---------------------+-------------------------+
|prediction|Number of Doctors|Average Number of Patients|Average Billing Amount|Average Stay Duration|Average Unique Conditions|
+----------+-----------------+--------------------------+----------------------+---------------------+-------------------------+
|1         |3397             |3.587871651457168         |25354.59790957738     |15.603407572963658   |3.587871651457168        |
|0         |36944            |1.1723689909051538        |25590.374342654613    |15.488644976180165   |1.1723689909051538       |
+----------+-----------------+--------------------------+----------------------+---------------------+-------------------------+

--> queries execution started at 02:30:31.987874 and ended at 02:30:53.599090, execution time: 21611ms
</pre>

## **P3: Was it really worth it?**

#### **Cluster Mode vs Local Execution**

This final part evaluates the performance of a Spark program when executed in two different environments. The first environment is our university cluster of 10 computers. The second environment involves running the program locally on a single machine, which is tested in the following configurations: a single core (`local[1]`), two cores (`local[2]`), and all available cores (`local[*]`).

**Dataset description**: ≈55,000 rows, 15 columns (≈8 MB)

**Results**:

| **Mode**       | **Average Execution Time (ms)** |
|----------------|---------------------------------|
| `cluster mode` | ≈ 413017                        |
| `local[1]`     | ≈ 39008                         |
| `local[2]`     | ≈ 31258                         |
| `local[*]`     | ≈ 28624                         |
                               
*The reported times represent the average of 10 runs for each configuration.*
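
Averaging over repeated runs can be done with a small timing helper such as the one sketched below. This is a minimal illustration, not the harness used for the measurements: `average_runtime_ms` is a hypothetical helper, and the lambda is a placeholder for the function that would run the Spark queries.

```python
import time


def average_runtime_ms(job, runs=10):
    """Run `job` several times and return the mean wall-clock time in ms."""
    durations = []
    for _ in range(runs):
        start = time.perf_counter()
        job()
        durations.append((time.perf_counter() - start) * 1000)
    return sum(durations) / len(durations)


# Placeholder workload; a real measurement would call the Spark job here.
calls = []
mean_ms = average_runtime_ms(lambda: calls.append(sum(range(1000))), runs=10)
```
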


Cluster mode averaged ≈ 413 s, roughly 10 to 14 times longer than any local configuration (≈ 29 s to 39 s). This gap reflects the impact of workload distribution and system overhead on performance. In cluster mode, Spark splits the dataset into partitions and sends tasks to multiple worker nodes in the cluster. This process involves overhead such as communication between the driver and workers, data shuffling, and task coordination. For a small dataset like the one used in this experiment (≈8 MB), that overhead dominates the actual computation, leading to much slower execution than local processing. Distributed systems like Spark are designed for large-scale datasets, where the benefits of parallel computation outweigh these costs; for small workloads, the overhead can make cluster mode inefficient.

On the other hand, local execution demonstrates a substantial performance advantage due to the absence of such overhead. When running with a single thread (`local[1]`), the program executes each task sequentially. Although this is slower than using multiple threads, it still outperforms cluster mode by avoiding distributed communication. When executed with all available threads (`local[*]`), Spark leverages all CPU cores on the local machine to parallelize the workload, further reducing execution time. This improvement highlights the efficiency of parallel processing at the local level, where Spark can process tasks concurrently without the need for inter-node communication. 

Interestingly, the difference between `local[1]` and `local[*]` is smaller than one might expect: using all cores cuts the average time from ≈ 39.0 s to ≈ 28.6 s, a reduction of about 27%. This could be attributed to the relatively small dataset, where the overhead of managing parallel threads is comparable to the benefits gained from parallel execution.
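
The relative differences discussed above follow directly from the averages in the results table. The short calculation below reproduces them; the variable names are chosen only for this illustration.

```python
# Average execution times in milliseconds, copied from the results table.
times_ms = {
    "cluster mode": 413017,
    "local[1]": 39008,
    "local[2]": 31258,
    "local[*]": 28624,
}

# Speedup of each configuration relative to single-core local execution.
baseline = times_ms["local[1]"]
speedup_vs_local1 = {mode: round(baseline / t, 2) for mode, t in times_ms.items()}

# How many times slower cluster mode is than the fastest local run.
cluster_slowdown = round(times_ms["cluster mode"] / times_ms["local[*]"], 2)
```

Relative to `local[1]`, the speedup is about 1.25 for `local[2]` and about 1.36 for `local[*]`, while cluster mode takes about 14.43 times as long as `local[*]`.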

**Summary**:

| **Execution Environment** | **Performance Impact**                                         |
|----------------------------|---------------------------------------------------------------|
| **Cluster Mode**           | Slower due to distributed computing overhead and small dataset. |
| **Local Mode (1 Thread)**  | Faster than cluster mode but limited by single-thread execution. |
| **Local Mode (local[*])**  | Fastest as it utilizes all available CPU cores for parallelism. |

This experiment underscores the importance of choosing the appropriate execution environment based on the size of the dataset and the characteristics of the workload. Cluster mode, while powerful for large datasets, introduces significant overhead when processing smaller data, making it a suboptimal choice in this scenario. Local execution, particularly with `local[*]`, proves to be the most efficient option, as it capitalizes on the available computational resources without incurring the overhead of distributed communication. 

Ultimately, the choice is a trade-off: for small datasets like the one used here, local execution is preferable, while cluster mode should be reserved for larger datasets where the benefits of distributed computation justify its overhead.
