In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Creating a spark session and reading the raw data into the dataframe

In [8]:
# Create a spark session
spark = SparkSession.builder.appName("Process_Patients_Data").getOrCreate()

In [9]:
# Reading the raw data from gcs
patients_df = spark.read.format("csv")\
                    .option("header", True)\
                    .option("inferschema", True)\
                    .option("mode", "PERMISSIVE")\
                    .load("gs://dataproc_bucket_prac/patients_data/data/patients_data.csv")

In [10]:
patients_df.show(5)

+---+-------------+---+------+-------------+--------------+----------------+------------+
| id|         name|age|gender|    diagnosis|admission_date|treatment_status|      doctor|
+---+-------------+---+------+-------------+--------------+----------------+------------+
|  1|     John Doe| 32|  Male|     Diabetes|    01-12-2024| Under Treatment|   Dr. Smith|
|  2|   Jane Smith| 28|Female|       Asthma|    05-12-2024|      Discharged|   Dr. Adams|
|  3| Samuel Green| 45|  Male| Hypertension|    25-11-2024| Under Treatment|     Dr. Lee|
|  4|  Emily White| 37|Female|Fractured Arm|    10-12-2024| Under Treatment|  Dr. Carter|
|  5|Michael Brown| 60|  Male|Heart Disease|    01-12-2024|      Discharged|Dr. Williams|
+---+-------------+---+------+-------------+--------------+----------------+------------+
only showing top 5 rows



# Checking for Valid and Invalid records

In [11]:
# Checking for invalid columns dynamically (Columns that are null is this case)
invalid_df = patients_df.filter((" OR ").join([f"{column} IS NULL" for column in patients_df.columns]))
invalid_df.show()

+---+--------------+----+------+---------+--------------+----------------+------------+
| id|          name| age|gender|diagnosis|admission_date|treatment_status|      doctor|
+---+--------------+----+------+---------+--------------+----------------+------------+
| 12|          NULL|  41|Female|   Stroke|          NULL|           Death|Dr. Williams|
| 13|  George White|NULL|  Male|  Malaria|    05-12-2024| Under Treatment|   Dr. Black|
| 17|   John Martin|  43|  Male|   Asthma|    04-12-2024|           Death|        NULL|
| 37|Benjamin Scott|  30|  Male|   Stroke|    06-12-2024|           Death|        NULL|
+---+--------------+----+------+---------+--------------+----------------+------------+



In [12]:
# Creating a dataframe for invalid ids (Single Column "id" Dataframe)
invalid_ids = [row["id"] for row in invalid_df.collect()]
invalid_ids_df = spark.createDataFrame([(id,) for id in invalid_ids], ["id"])
invalid_ids_df.show()

                                                                                

+---+
| id|
+---+
| 12|
| 13|
| 17|
| 37|
+---+



In [13]:
# Getting the valid records by joining using Left-Anti join
valid_df = patients_df.join(broadcast(invalid_ids_df), on="id", how="leftanti")
valid_df.show(40)

+---+---------------+---+------+-------------+--------------+----------------+------------+
| id|           name|age|gender|    diagnosis|admission_date|treatment_status|      doctor|
+---+---------------+---+------+-------------+--------------+----------------+------------+
|  1|       John Doe| 32|  Male|     Diabetes|    01-12-2024| Under Treatment|   Dr. Smith|
|  2|     Jane Smith| 28|Female|       Asthma|    05-12-2024|      Discharged|   Dr. Adams|
|  3|   Samuel Green| 45|  Male| Hypertension|    25-11-2024| Under Treatment|     Dr. Lee|
|  4|    Emily White| 37|Female|Fractured Arm|    10-12-2024| Under Treatment|  Dr. Carter|
|  5|  Michael Brown| 60|  Male|Heart Disease|    01-12-2024|      Discharged|Dr. Williams|
|  6|   Sophia Black| 52|Female|    Pneumonia|    20-11-2024| Under Treatment|   Dr. Patel|
|  7|   Chris Martin| 30|  Male|          Flu|    08-12-2024| Under Treatment| Dr. Johnson|
|  8|   Linda Taylor| 29|Female|     COVID-19|    05-12-2024|      Discharged|  

In [17]:
# Changing the date format for admission_date column in the valid_records_df
final_valid_df = valid_df.withColumn("admission_date", to_date("admission_date", "dd-MM-yyyy"))
final_valid_df.show(5)

+---+-------------+---+------+-------------+--------------+----------------+------------+
| id|         name|age|gender|    diagnosis|admission_date|treatment_status|      doctor|
+---+-------------+---+------+-------------+--------------+----------------+------------+
|  1|     John Doe| 32|  Male|     Diabetes|    2024-12-01| Under Treatment|   Dr. Smith|
|  2|   Jane Smith| 28|Female|       Asthma|    2024-12-05|      Discharged|   Dr. Adams|
|  3| Samuel Green| 45|  Male| Hypertension|    2024-11-25| Under Treatment|     Dr. Lee|
|  4|  Emily White| 37|Female|Fractured Arm|    2024-12-10| Under Treatment|  Dr. Carter|
|  5|Michael Brown| 60|  Male|Heart Disease|    2024-12-01|      Discharged|Dr. Williams|
+---+-------------+---+------+-------------+--------------+----------------+------------+
only showing top 5 rows



In [16]:
# Writing invalid records to the patients_invalid_bucket
patients_invalid_bucket = "gs://dataproc_bucket_prac/patients_data/invalid_data/"
invalid_df.write.mode("append").option("header", "true").csv(patients_invalid_bucket)
print(f"Successfully wrote the invalid records to the bucket : {patients_invalid_bucket}")

                                                                                

Successfully wrote the invalid records to the bucket : gs://dataproc_bucket_prac/patients_data/invalid_data/


In [19]:
# Writing the final valid record to the patients_valid_bucket
patients_valid_bucket = "gs://dataproc_bucket_prac/patients_data/valid_data/"
final_valid_df.write.mode("append").option("header", "true").csv(patients_valid_bucket)
print(f"Successfully wrote the final valid records to the bucket : {patients_invalid_bucket}")

                                                                                

Successfully wrote the final valid records to the bucket : gs://dataproc_bucket_prac/patients_data/invalid_data/


In [20]:
# Writing the final valid records to the BigQuery Table 
bq_table = "synthetic-nova-438808-k6.patients_dataset.patientsData"
bq_temp_bucket = "gs://dataproc_bucket_prac/patients_data/bq_temp_bucket/"
final_valid_df.write \
            .format("bigquery") \
            .option("table", bq_table) \
            .option("temporaryGcsBucket", bq_temp_bucket) \
            .mode("append") \
            .save()
print(f"Successfully wrote the final valid records to the BigQuery Table : {bq_table}")

                                                                                

Successfully wrote the final valid records to the BigQuery Table : synthetic-nova-438808-k6.patients_dataset.patientsData
