<a href="https://colab.research.google.com/github/jalkhudari3/DE_project/blob/main/DE_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Project: Data Engineering Pipeline


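The dataset contains healthcare records: patient demographics, admissions, doctors, hospitals, insurance providers, billing amounts, medications, and test results.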

**Project Members**
- Jory Alkhudari - 444005065

- Maya Tayeb - 444002353

- Ghala Alqarni - 444000585

# **Phase 1: Relational Database**

 Import required libraries


In [None]:
import sqlite3
import pandas as pd


Load the healthcare dataset from its raw CSV file on GitHub


In [None]:
# Raw CSV hosted on GitHub (wallflower777/DE_project).
url = (
    'https://raw.githubusercontent.com/wallflower777/DE_project/main/'
    'healthcare_dataset.csv'
)
healthcare_df = pd.read_csv(url)
row_total = len(healthcare_df)
print(f"Loaded {row_total} rows from the healthcare dataset.")

Loaded 55500 rows from the healthcare dataset.


## 1.1 Normalize the schemas

In [None]:
# Clean up column names: trim surrounding whitespace and turn inner spaces
# into underscores (e.g. "Blood Type" -> "Blood_Type") so they are valid
# identifiers and SQL-friendly.
healthcare_df.columns = (
    healthcare_df.columns
    .str.strip()
    .str.replace(' ', '_', regex=False)
)
print(f"Normalized columns: {healthcare_df.columns}")

Normalized columns: Index(['Name', 'Age', 'Gender', 'Blood_Type', 'Medical_Condition',
       'Date_of_Admission', 'Doctor', 'Hospital', 'Insurance_Provider',
       'Billing_Amount', 'Room_Number', 'Admission_Type', 'Discharge_Date',
       'Medication', 'Test_Results'],
      dtype='object')


### Connect to SQLite and create the normalized tables

In [None]:
# Connect to SQLite database
conn = sqlite3.connect("healthcare.db")
cursor = conn.cursor()

# Rebuild the schema from scratch on every run. The load cell inserts every CSV
# row, so keeping tables (and rows) from a previous run of this notebook would
# duplicate all of the data. Child tables are dropped before their parents.
for stale_table in ('Medications', 'Admissions', 'Patients',
                    'Doctors', 'Hospitals', 'InsuranceProviders'):
    cursor.execute(f"DROP TABLE IF EXISTS {stale_table}")

# SQLite ignores FOREIGN KEY clauses unless enforcement is switched on for the
# connection; without this the REFERENCES constraints below are documentation only.
cursor.execute("PRAGMA foreign_keys = ON")

# Create Patients table
cursor.execute('''
CREATE TABLE IF NOT EXISTS Patients (
    patient_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    age INTEGER,
    gender TEXT,
    blood_type TEXT
)
''')

# Create Doctors table (one row per distinct doctor name)
cursor.execute('''
CREATE TABLE IF NOT EXISTS Doctors (
    doctor_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE
)
''')

# Create Hospitals table (one row per distinct hospital name)
cursor.execute('''
CREATE TABLE IF NOT EXISTS Hospitals (
    hospital_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE
)
''')

# Create Insurance Providers table (one row per distinct provider name)
cursor.execute('''
CREATE TABLE IF NOT EXISTS InsuranceProviders (
    insurance_id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE
)
''')

# Create Admissions table (with foreign keys for normalization).
# ON DELETE CASCADE removes a patient's admissions when the patient is deleted,
# instead of leaving orphaned rows behind.
cursor.execute('''
CREATE TABLE IF NOT EXISTS Admissions (
    admission_id INTEGER PRIMARY KEY AUTOINCREMENT,
    patient_id INTEGER,
    doctor_id INTEGER,
    hospital_id INTEGER,
    insurance_id INTEGER,
    medical_condition TEXT,
    date_of_admission DATE,
    billing_amount REAL,
    room_number INTEGER,
    admission_type TEXT,
    discharge_date DATE,
    FOREIGN KEY (patient_id) REFERENCES Patients(patient_id) ON DELETE CASCADE,
    FOREIGN KEY (doctor_id) REFERENCES Doctors(doctor_id),
    FOREIGN KEY (hospital_id) REFERENCES Hospitals(hospital_id),
    FOREIGN KEY (insurance_id) REFERENCES InsuranceProviders(insurance_id)
)
''')

# Create Medications table (with foreign key for normalization); cascades with
# the owning patient like Admissions does.
cursor.execute('''
CREATE TABLE IF NOT EXISTS Medications (
    medication_id INTEGER PRIMARY KEY AUTOINCREMENT,
    patient_id INTEGER,
    medication TEXT,
    test_results TEXT,
    FOREIGN KEY (patient_id) REFERENCES Patients(patient_id) ON DELETE CASCADE
)
''')

def show_table_schema(table_name):
    """Print the column layout of SQLite table ``table_name``.

    Runs ``PRAGMA table_info`` through the notebook-level ``cursor`` and prints
    the result as a pandas DataFrame, one row per column.
    """
    banner = f"\n Structure of table: {table_name}"
    print(banner)
    pragma_rows = cursor.execute(f"PRAGMA table_info({table_name})").fetchall()
    column_labels = ["Column ID", "Column Name", "Data Type", "Not Null", "Default", "Primary Key"]
    print(pd.DataFrame(pragma_rows, columns=column_labels))

# Persist the DDL, then echo every table's structure as a visual check.
conn.commit()

table_names = ['Patients', 'Doctors', 'Hospitals', 'InsuranceProviders', 'Admissions', 'Medications']
for table in table_names:
    show_table_schema(table)

print("Tables created successfully with normalization.")


 Structure of table: Patients
   Column ID Column Name Data Type  Not Null Default  Primary Key
0          0  patient_id   INTEGER         0    None            1
1          1        name      TEXT         1    None            0
2          2         age   INTEGER         0    None            0
3          3      gender      TEXT         0    None            0
4          4  blood_type      TEXT         0    None            0

 Structure of table: Doctors
   Column ID Column Name Data Type  Not Null Default  Primary Key
0          0   doctor_id   INTEGER         0    None            1
1          1        name      TEXT         1    None            0

 Structure of table: Hospitals
   Column ID  Column Name Data Type  Not Null Default  Primary Key
0          0  hospital_id   INTEGER         0    None            1
1          1         name      TEXT         1    None            0

 Structure of table: InsuranceProviders
   Column ID   Column Name Data Type  Not Null Default  Primary Key
0  

## 1.2 Insert the CSV data into the normalized tables

In [None]:
def _lookup_id(cur, table, id_column, name, cache):
    """Return the primary key for ``name`` in a lookup table, inserting it if new.

    ``table`` is one of the (id, UNIQUE name) tables: Doctors, Hospitals,
    InsuranceProviders. Ids are memoised in ``cache`` (name -> id), so each
    distinct name costs the INSERT OR IGNORE + SELECT pair once instead of once
    per CSV row that mentions it.
    """
    if name not in cache:
        cur.execute(f"INSERT OR IGNORE INTO {table} (name) VALUES (?)", (name,))
        cur.execute(f"SELECT {id_column} FROM {table} WHERE name = ?", (name,))
        cache[name] = cur.fetchone()[0]
    return cache[name]


# name -> id caches for the three de-duplicated lookup tables
doctor_ids, hospital_ids, insurance_ids = {}, {}, {}

# itertuples avoids building a pandas Series per row (unlike iterrows); the
# attribute names rely on the underscore column names from the cleanup cell.
for row in healthcare_df.itertuples(index=False):
    # Insert patient data: every CSV row becomes its own patient record.
    cursor.execute("INSERT INTO Patients (name, age, gender, blood_type) VALUES (?, ?, ?, ?)",
                   (row.Name, row.Age, row.Gender, row.Blood_Type))
    patient_id = cursor.lastrowid  # read before any lookup overwrites lastrowid

    # Resolve (or create) the normalized references.
    doctor_id = _lookup_id(cursor, "Doctors", "doctor_id", row.Doctor, doctor_ids)
    hospital_id = _lookup_id(cursor, "Hospitals", "hospital_id", row.Hospital, hospital_ids)
    insurance_id = _lookup_id(cursor, "InsuranceProviders", "insurance_id",
                              row.Insurance_Provider, insurance_ids)

    # Insert admission data with normalized references
    cursor.execute("INSERT INTO Admissions (patient_id, doctor_id, hospital_id, insurance_id, medical_condition, date_of_admission, billing_amount, room_number, admission_type, discharge_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                   (patient_id, doctor_id, hospital_id, insurance_id, row.Medical_Condition, row.Date_of_Admission, row.Billing_Amount, row.Room_Number, row.Admission_Type, row.Discharge_Date))

    # Insert medication data
    cursor.execute("INSERT INTO Medications (patient_id, medication, test_results) VALUES (?, ?, ?)",
                   (patient_id, row.Medication, row.Test_Results))


# Helper: print the first 5 rows of a table.
def preview_table(name):
    """Print a header line, then the first five rows of SQLite table ``name``.

    Reads through the notebook-level ``conn`` connection with pandas.
    """
    header = f"\n Top rows from {name}:"
    print(header)
    sql = f"SELECT * FROM {name} LIMIT 5"
    first_rows = pd.read_sql_query(sql, conn)
    print(first_rows)

# Persist the inserted rows, then show a few rows from every table.
conn.commit()

tables = ['Patients', 'Doctors', 'Hospitals', 'InsuranceProviders', 'Admissions', 'Medications']
for t in tables:
    preview_table(t)

print("\nSample data inserted successfully.")


 Top rows from Patients:
   patient_id           name  age  gender blood_type
0           1  Bobby JacksOn   30    Male         B-
1           2   LesLie TErRy   62    Male         A+
2           3    DaNnY sMitH   76  Female         A-
3           4   andrEw waTtS   28  Female         O+
4           5  adrIENNE bEll   43  Female        AB+

 Top rows from Doctors:
   doctor_id              name
0          1     Matthew Smith
1          2   Samantha Davies
2          3  Tiffany Mitchell
3          4       Kevin Wells
4          5    Kathleen Hanna

 Top rows from Hospitals:
   hospital_id                        name
0            1             Sons and Miller
1            2                     Kim Inc
2            3                    Cook PLC
3            4  Hernandez Rogers and Vang,
4            5                 White-White

 Top rows from InsuranceProviders:
   insurance_id              name
0             1        Blue Cross
1             2          Medicare
2             3       

## Preview the Patients table before CRUD operations

In [None]:
# ORDER BY pins which five rows are shown: without it SQL leaves row order
# unspecified, so the before/after snapshots might not show the same patients.
print(pd.read_sql_query("SELECT * FROM Patients ORDER BY patient_id LIMIT 5", conn))

   patient_id           name  age  gender blood_type
0           1  Bobby JacksOn   30    Male         B-
1           2   LesLie TErRy   62    Male         A+
2           3    DaNnY sMitH   76  Female         A-
3           4   andrEw waTtS   28  Female         O+
4           5  adrIENNE bEll   43  Female        AB+


## 1.3 Implement CRUD Operations


1.3.1 Update data

In [None]:
# Update: age every patient younger than 30 by one year, then persist it.
age_bump_sql = "UPDATE Patients SET age = age + 1 WHERE age < 30"
cursor.execute(age_bump_sql)
conn.commit()
print(" Patients under 30 years old have been aged by 1 year.")


 Patients under 30 years old have been aged by 1 year.


1.3.2 Delete data

In [None]:
# Delete patients older than 80 together with the rows that reference them.
# Admissions and Medications point at Patients(patient_id); deleting only the
# parent rows leaves orphans whenever foreign-key enforcement is off, which is
# SQLite's default for a connection.
over_80_ids = "SELECT patient_id FROM Patients WHERE age > 80"
with conn:  # single transaction: commits on success, rolls back on error
    cursor.execute(f"DELETE FROM Admissions WHERE patient_id IN ({over_80_ids})")
    cursor.execute(f"DELETE FROM Medications WHERE patient_id IN ({over_80_ids})")
    cursor.execute("DELETE FROM Patients WHERE age > 80")
print(" Patients older than 80 years have been deleted.")

 Patients older than 80 years have been deleted.


1.3.3 Read data

In [None]:
# Read: fetch up to five patients older than 50.
print("\n Patients Over 50 Years Old:")
over_50_rows = cursor.execute("SELECT * FROM Patients WHERE age > 50 LIMIT 5").fetchall()
print(over_50_rows)


 Patients Over 50 Years Old:
[(2, 'LesLie TErRy', 62, 'Male', 'A+'), (3, 'DaNnY sMitH', 76, 'Female', 'A-'), (10, 'ChRISTopher BerG', 58, 'Female', 'AB-'), (11, 'mIchElLe daniELs', 72, 'Male', 'O+'), (13, 'connOR HANsEn', 75, 'Female', 'A+')]


## Preview the Patients table after CRUD operations


In [None]:
# Same ordered snapshot as before the CRUD operations, so rows line up 1:1.
print(pd.read_sql_query("SELECT * FROM Patients ORDER BY patient_id LIMIT 5", conn))

   patient_id           name  age  gender blood_type
0           1  Bobby JacksOn   30    Male         B-
1           2   LesLie TErRy   62    Male         A+
2           3    DaNnY sMitH   76  Female         A-
3           4   andrEw waTtS   29  Female         O+
4           5  adrIENNE bEll   43  Female        AB+


## 1.4 Apply indexing for performance optimization


1.4.1 Indexing

In [None]:
# Reuse the connection opened when the tables were created. Reconnecting here
# would rebind `conn`/`cursor` and drop the earlier connection without ever
# closing it explicitly.
print("Step 1: Creating index on Patients(Name) column...")

cursor.execute("CREATE INDEX IF NOT EXISTS idx_patient_name ON Patients (name)")
conn.commit()

print("Index 'idx_patient_name' created (if not exists).")


Step 1: Creating index on Patients(Name) column...
Index 'idx_patient_name' created (if not exists).


1.4.2 Query optimization: confirm the index is used

In [None]:
# Reuse the shared connection (it is closed in the final cell) instead of
# opening and closing a second one here.
print("\nStep 2: Explain query plan for searching patient by name 'John Doe':")

# A bound parameter mirrors how application code would look a patient up and
# keeps the value out of the SQL text.
cursor.execute("EXPLAIN QUERY PLAN SELECT * FROM Patients WHERE name = ?", ("John Doe",))
query_plan = cursor.fetchall()

# Display the query execution plan, one step per line.
for step in query_plan:
    print(f" - {step}")

print("\n Query plan displayed successfully.")


Step 2: Explain query plan for searching patient by name 'John Doe':
 - (3, 0, 0, 'SEARCH Patients USING INDEX idx_patient_name (name=?)')

 Query plan displayed successfully.


# **Phase 2: NoSQL Document Database (TinyDB)**


In [None]:
!pip install tinydb


Collecting tinydb
  Downloading tinydb-4.8.2-py3-none-any.whl.metadata (6.7 kB)
Downloading tinydb-4.8.2-py3-none-any.whl (24 kB)
Installing collected packages: tinydb
Successfully installed tinydb-4.8.2


In [None]:
from tinydb import TinyDB, Query

## 2.1 Transforming Data to Document Format

In [None]:
# Document store backed by a JSON file that persists between runs.
tinydb = TinyDB("healthcare_nosql.json")
# Start from an empty store so re-running the notebook does not append a
# second copy of every document.
tinydb.truncate()

# One document per CSV row, keyed by the normalized column names.
healthcare_json = healthcare_df.to_dict(orient="records")
# Keep the returned doc-id list out of the cell output (it has one id per row).
inserted_ids = tinydb.insert_multiple(healthcare_json)
print(f"Inserted {len(inserted_ids)} documents into TinyDB.")

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,
 185

## 2.2 Basic NoSQL Queries


In [None]:

# Count all documents in the store.
total_records = len(tinydb)
print(f" Total Records: {total_records}\n")

# Print one sample record without materialising every document via .all(),
# and without crashing when the store is empty.
sample_record = next(iter(tinydb), None)
if sample_record is None:
    print(" Sample Record: <store is empty>")
else:
    print(" Sample Record:")
    for key, value in sample_record.items():
        print(f"  {key}: {value}")
print()

# Search for patients in a critical situation.
# 'Critical' is not a Medical_Condition value: that field holds diagnoses such
# as 'Cancer' or 'Hypertension', so filtering on it could never match.
# Emergency admissions are used as the "critical" cohort instead.
# NOTE(review): confirm 'Emergency' is the intended Admission_Type label.
CRITICAL_FIELD, CRITICAL_VALUE = "Admission_Type", "Emergency"
MAX_DETAILED = 3  # cap the per-patient dumps so thousands of matches don't flood the output

Patient = Query()
critical_patients = tinydb.search(Patient[CRITICAL_FIELD] == CRITICAL_VALUE)
print(f" Critical Patients Count: {len(critical_patients)}")
if critical_patients:
    shown = critical_patients[:MAX_DETAILED]
    print(f"Details of the first {len(shown)} critical patients:")
    for i, patient in enumerate(shown, 1):
        print(f" Patient {i}:")
        for key, value in patient.items():
            print(f"   {key}: {value}")
        print()
else:
    print("No critical patients found.")


 Total Records: 55500

 Sample Record:
  Name: Bobby JacksOn
  Age: 30
  Gender: Male
  Blood_Type: B-
  Medical_Condition: Cancer
  Date_of_Admission: 2024-01-31
  Doctor: Matthew Smith
  Hospital: Sons and Miller
  Insurance_Provider: Blue Cross
  Billing_Amount: 18856.281305978155
  Room_Number: 328
  Admission_Type: Urgent
  Discharge_Date: 2024-02-02
  Medication: Paracetamol
  Test_Results: Normal

 Critical Patients Count: 0
No critical patients found.


# **Phase 3: Stream Processing with PySpark**


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## 3.1 Simulating Stream Data

A static Spark DataFrame built from the pandas data stands in for a streaming source.


In [None]:
# Start (or reuse) the Spark session for the streaming simulation.
spark = (
    SparkSession.builder
    .appName('HealthcareStream')
    .getOrCreate()
)
# The pandas frame, converted to a Spark DataFrame, plays the role of the stream.
stream_df = spark.createDataFrame(healthcare_df)

## 3.2 PySpark Processing


In [None]:
from pyspark.sql.types import DoubleType

TOP_N_EXPENSIVE = 20  # number of highest-billed cases to keep

# Cast Billing_Amount to a 64-bit double before sorting. FloatType is 32-bit
# and rounds full-precision amounts such as 18856.281305978155 to about
# 7 significant digits, which then shows up in the displayed and saved results.
stream_df = stream_df.withColumn("Billing_Amount", stream_df["Billing_Amount"].cast(DoubleType()))

# Get the top-N most expensive cases
expensive_cases_df = stream_df.orderBy(stream_df['Billing_Amount'].desc()).limit(TOP_N_EXPENSIVE)
expensive_cases_df.show()

+-----------------+---+------+----------+-----------------+-----------------+-----------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+
|             Name|Age|Gender|Blood_Type|Medical_Condition|Date_of_Admission|           Doctor|            Hospital|Insurance_Provider|Billing_Amount|Room_Number|Admission_Type|Discharge_Date| Medication|Test_Results|
+-----------------+---+------+----------+-----------------+-----------------+-----------------+--------------------+------------------+--------------+-----------+--------------+--------------+-----------+------------+
|    tOdd CARrILlO| 51|Female|        A+|     Hypertension|       2023-09-08| Kathleen Griffin|       Griffin Group|        Blue Cross|     52764.277|        209|      Elective|    2023-10-04|  Ibuprofen|      Normal|
|      kARen klInE| 79|Female|       AB+|           Cancer|       2021-06-19|Dr. Joseph Gordon|    Hernandez-Morton|  UnitedHeal

## 3.3 Save Processed Data


In [None]:
# Write the processed cases with a header row so downstream readers get the
# column names; 'overwrite' keeps re-runs from failing on an existing path.
(
    expensive_cases_df.write
    .mode('overwrite')
    .option('header', True)
    .csv('processed_expensive_patients')
)
spark.stop()

print("Stream processing completed and saved.")

Stream processing completed and saved.


Close database connections


In [None]:
# Close the SQLite connection, then the TinyDB database.
for open_store in (conn, tinydb):
    open_store.close()