In [1]:

from pyspark.sql import SparkSession
import os

In [2]:
# Object-store credentials come from the process environment; an unset variable yields None.
access_key, secret_key = (
    os.environ.get(variable) for variable in ("access_key", "secret_key")
)

In [3]:
# Build (or reuse) the Spark session configured to reach the object store through
# the Hadoop S3A connector, using the credentials read from the environment.
spark = (
    SparkSession.builder
    .appName("MinIO reader")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:32768")
    .config("spark.hadoop.fs.s3a.access.key", access_key)
    .config("spark.hadoop.fs.s3a.secret.key", secret_key)
    .config("spark.hadoop.fs.s3a.connection.maximum", "1000")
    # Address buckets as <endpoint>/<bucket> instead of <bucket>.<endpoint>.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # S3A reads the TLS switch from `fs.s3a.connection.ssl.enabled`; the previously
    # used `fs.s3a.ssl.enabled` is not an S3A property.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.jars", "hadoop-aws-3.3.1.jar,aws-java-sdk-bundle-1.11.901.jar,hadoop-common-3.3.1.jar")
    # Use the static access/secret key pair set above rather than the default provider chain.
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .getOrCreate()
)


In [4]:
def _read_raw_csv(path, **extra_options):
    """Load one raw CSV with a header row and inferred column types.

    A fresh reader is built on every call so that `extra_options` given for
    one file never carry over to another.
    """
    reader = spark.read.format("csv").options(header="true", inferSchema="true", **extra_options)
    return reader.load(path)


# Events: the literal "nan" is read as null; exact duplicate rows are removed.
raw_event = _read_raw_csv("s3a://onedata/raw/event.csv", nullvalue="nan").dropDuplicates()

# Care receptions: exact duplicate rows are removed.
raw_care_reception = _read_raw_csv("s3a://onedata/raw/care_reception.csv").dropDuplicates()

# Participations: one row is kept per (id, event_id) pair.
raw_participation = _read_raw_csv("s3a://onedata/raw/participation.csv").dropDuplicates(["id", "event_id"])

# Patients: one row is kept per id.
raw_patients = _read_raw_csv("s3a://onedata/raw/patients.csv").dropDuplicates(["id"])

In [22]:
from pyspark.sql.functions import (
    array_join,
    col,
    concat,
    count,
    lit,
    regexp_extract,
    split,
    to_timestamp,
    when,
)


In [6]:
def _recurrence_part(key):
    """Return a Column with the value of rule part `key`, or null when the part is absent.

    Expects `recurrence` to already be an array of "KEY=VALUE" strings (the
    ';'-split form). The part is looked up by name rather than by position, so
    rules whose parts are reordered or omitted are not mis-assigned.
    """
    rule_text = array_join(col("recurrence"), ";")
    # regexp_extract yields "" when nothing matches; map that to null.
    value = regexp_extract(rule_text, rf"(?i)\b{key}=([^;]*)", 1)
    return when(value != "", value)


# Columns derived from the recurrence rule.
# NOTE(review): assumes the raw data uses iCalendar-style part names
# (FREQ, UNTIL, INTERVAL, BYDAY) - confirm against event.csv.
columnstoadd = {
    "frequency": _recurrence_part("FREQ"),
    "untill": to_timestamp(_recurrence_part("UNTIL"), "yyyyMMdd'T'HHmmss'Z'"),
    "interval": _recurrence_part("INTERVAL"),
    "by_day": _recurrence_part("BYDAY"),
}

In [7]:
# Output columns of the processed event frame, in order.
_event_output_columns = [
    "id",
    "title",
    to_timestamp("start").alias("start"),
    "end",
    "frequency",
    "untill",
    "interval",
    "by_day",
    col("created_at").alias("created_at_event"),
]

# Split the ';'-separated recurrence string into an array, add the columns
# described by `columnstoadd`, then keep only the output columns.
processed_event = (
    raw_event
    .withColumn("recurrence", split("recurrence", ";"))
    .withColumns(columnstoadd)
    .select(*_event_output_columns)
)


In [8]:
# marital_status as a boolean: 0.0 -> False, 1.0 -> True; no fallback branch is given for other values.
_marital_flag = (
    when(col("marital_status") == 0.0, False)
    .when(col("marital_status") == 1.0, True)
)

# Display name built as "<first_name> <last_name>".
_full_name = concat(col("first_name"), lit(" "), col("last_name"))

processed_patients = (
    raw_patients
    .withColumn("marital_status", _marital_flag)
    .withColumn("fullName", _full_name)
    .select(
        "id",
        "fullName",
        "gender",
        "date_of_birth",
        "email11",
        "organ_donner",
        "blood_donner",
        "marital_status",
        col("created_at").alias("created_at_patients"),
    )
)



In [10]:
# Keep the care-reception fields used downstream; patient_id is cast to int.
processed_care_reception = raw_care_reception.select(
    "participation_id",
    col("patient_id").cast("int"),
    "message",
    "meeting_status",
    "patient_attendant_name",
    "patient_attendant_phone_number",
    "cancellation_reason",
)

In [11]:
# Keep the participation key, its event reference, and the creation time,
# renaming `id` and `created_at` to participation-specific names.
processed_participation = raw_participation.select(
    col("id").alias("participation_id"),
    "event_id",
    col("created_at").alias("created_at_participation"),
)

In [12]:
# Write every processed frame as Parquet, overwriting whatever is already at its target path.
for _frame, _target in (
    (processed_event, "s3a://onedata/processed/event"),
    (processed_patients, "s3a://onedata/processed/patients"),
    (processed_care_reception, "s3a://onedata/processed/care_reception"),
    (processed_participation, "s3a://onedata/processed/participation"),
):
    _frame.write.format("parquet").mode("overwrite").save(_target)


In [13]:
# Inner-join participations to their events on event_id, then drop the event's
# own `id` column so only `event_id` remains.
_participation_side = processed_participation.alias("participation")
_event_side = processed_event.alias("event")

joined_event_participation = _participation_side.join(
    _event_side,
    on=col("participation.event_id") == col("event.id"),
    how="inner",
).drop(processed_event["id"])

    

In [14]:
# Print the joined participation/event rows without truncating long cell values.
joined_event_participation.show(truncate=False)

+----------------+--------+--------------------------+---------+-----------------------+-----------------------+---------+-------------------+--------+--------------------+--------------------------+
|participation_id|event_id|created_at_participation  |title    |start                  |end                    |frequency|untill             |interval|by_day              |created_at_event          |
+----------------+--------+--------------------------+---------+-----------------------+-----------------------+---------+-------------------+--------+--------------------+--------------------------+
|1709            |1335    |2024-10-21 16:52:31.379277|Test     |2024-10-21 19:15:00    |2024-10-21 19:30:00    |DAILY    |2024-12-31 04:30:00|1       |SU,TU,MO,WE,FR,TH,SA|2024-10-21 16:52:31.218149|
|1702            |1328    |2024-10-21 16:52:31.267422|Test     |2024-10-21 17:30:00    |2024-10-21 17:45:00    |DAILY    |2024-12-31 04:30:00|1       |SU,TU,MO,WE,FR,TH,SA|2024-10-21 16:52:31.218149|


In [15]:
# Inner-join care receptions to patients on patient_id, then drop the patient's
# own `id` column so only `patient_id` remains.
_reception_side = processed_care_reception.alias("care_reception")
_patient_side = processed_patients.alias("patients")

joined_reception_patents = _reception_side.join(
    _patient_side,
    on=col("care_reception.patient_id") == col("patients.id"),
    how="inner",
).drop(processed_patients["id"])


In [17]:
# Combine the event/participation rows with the reception/patient rows on
# participation_id (inner join), keeping a single participation_id column.
_matching_participation = (
    col("event.participation_id") == col("care_reception.participation_id")
)

final = (
    joined_event_participation.alias("event")
    .join(
        joined_reception_patents.alias("care_reception"),
        on=_matching_participation,
        how="inner",
    )
    .drop(joined_reception_patents["participation_id"])
)



In [18]:
# Print a preview of the unified frame using show()'s default settings.
final.show()

+----------------+--------+------------------------+-----------+--------------------+--------------------+---------+-------------------+--------+------+--------------------+----------+-------+--------------+----------------------+------------------------------+-------------------+----------------+------+-------------+-------+------------+------------+--------------+--------------------+
|participation_id|event_id|created_at_participation|      title|               start|                 end|frequency|             untill|interval|by_day|    created_at_event|patient_id|message|meeting_status|patient_attendant_name|patient_attendant_phone_number|cancellation_reason|        fullName|gender|date_of_birth|email11|organ_donner|blood_donner|marital_status| created_at_patients|
+----------------+--------+------------------------+-----------+--------------------+--------------------+---------+-------------------+--------+------+--------------------+----------+-------+--------------+-------------

In [19]:
# Store the unified frame as Parquet, overwriting any earlier output at this path.
_unified_writer = final.write.mode("overwrite").format("parquet")
_unified_writer.save("s3a://onedata/final/unified")

In [20]:
# Print the column names, types, and nullability of the unified frame.
final.printSchema()

root
 |-- participation_id: integer (nullable = true)
 |-- event_id: integer (nullable = true)
 |-- created_at_participation: timestamp (nullable = true)
 |-- title: string (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- frequency: string (nullable = true)
 |-- untill: timestamp (nullable = true)
 |-- interval: string (nullable = true)
 |-- by_day: string (nullable = true)
 |-- created_at_event: timestamp (nullable = true)
 |-- patient_id: integer (nullable = true)
 |-- message: string (nullable = true)
 |-- meeting_status: boolean (nullable = true)
 |-- patient_attendant_name: string (nullable = true)
 |-- patient_attendant_phone_number: string (nullable = true)
 |-- cancellation_reason: string (nullable = true)
 |-- fullName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- email11: string (nullable = true)
 |-- organ_donner: boolean (nullable = true)
 |-- blood_donner: 

Count of events each patient attends

In [26]:
# Rows with a non-null event_id, counted per patient and listed highest first.
# The grouping key uses the exact column name `fullName` created upstream, so
# the query does not depend on case-insensitive column resolution.
(
    final
    .groupBy("patient_id", "fullName")
    .agg(count(col("event_id")).alias("cnt"))
    .orderBy(col("cnt").desc())
    .show()
)

+----------+----------------+---+
|patient_id|        fullname|cnt|
+----------+----------------+---+
|       145|       Raj kumar| 30|
|       148|    Rebecca Tran| 29|
|       147|Janice Castaneda| 28|
|       146|  Barry Martinez| 25|
|       149|     Jeremy Ward| 22|
|       153|  Kimberly Smith| 12|
|      1300|   Penny Sanchez|  7|
|      1298|   Daniel Moreno|  7|
|      1299|Tiffany Campbell|  7|
|      1208|      Amy Oliver|  7|
|       174|     Mary Nguyen|  7|
|      1337|   Matthew Boyle|  6|
|      1212|  Tanya Anderson|  6|
|      1213|   Aimee Carlson|  6|
|      1205|      Lisa David|  6|
|      1216|   James Jenkins|  6|
|      1214| Charlene Murray|  6|
|      1209|     David Poole|  6|
|      2367|  Raja Marimuthu|  6|
|       151|    Dinesh kumar|  6|
+----------+----------------+---+
only showing top 20 rows

