In [1]:
# Imports and Spark session setup.
# Every pyspark name used in this notebook is imported here so a fresh kernel can
# run all cells top to bottom.
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql import types as dtypes

# Create the Spark session, or reuse the one already running in this kernel.
spark = SparkSession.builder.appName('rheeza').getOrCreate()

In [2]:
from pyspark.sql import functions as fn

## Understanding the Dataset

In [3]:
# Load the raw trial records. multiLine=True parses the file as ordinary multi-line
# JSON rather than one-record-per-line JSON Lines.
# (The name `trails_df` is kept as-is because later cells reference it.)
trails_df = spark.read.option('multiLine', True).json('dataset.json')

In [5]:
# Preview the first 5 raw records; nested struct values are shown truncated.
trails_df.show(n=5)

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              19|{Ontario, Saul, t...|  Placebo|    1619827200000|      1617235200000|                            52|{BP normalized, r...|
|              14|{Ontario, Saul, n...| Naproxen|    1619827200000|      1617235200000|                            78|    {Follow-up, N/A}|
|              17|{Ontario, Saul, n...|  Placebo|    1619827200000|      1617235200000|                            14|    {Follow-up, N/A}|
|              18|{Ontario, Will, n...| Naproxen|    1619827200000|      1617235200000|                            14|{BP normalized, N/A}|
|              17|{O

In [5]:
# Inspect the inferred schema: `clinician` and `result` are structs, and both
# experiment date columns were read as strings.
trails_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [6]:
# Top-level column names only; each nested struct is listed as a single column.
trails_df.columns

['ageofparticipant',
 'clinician',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result']

## Flattening the nested JSON structs (nested columns)
- from the `clinician` struct, pull out `branch`, `name` and `role`, keeping their original order
- from the `result` struct, pull out `conclusion` and `sideeffectsonparticipant`, keeping their original order

In [4]:
# Columns to keep, in output order. Dotted names pull a field out of a struct:
# three fields from `clinician` and two from `result`.
columns = (
    ['ageofparticipant']
    + ['clinician.branch', 'clinician.name', 'clinician.role']
    + ['drug_used', 'experimentenddate', 'experimentstartdate', 'noofhourspassedatfirstreaction']
    + ['result.conclusion', 'result.sideeffectsonparticipant']
)

In [7]:
# Preview the flattened projection; struct fields appear under their leaf names.
trails_df.select(*columns).show(5)

+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|ageofparticipant| branch|   name|     role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|              19|Ontario|   Saul|therapist|  Placebo|    1619827200000|      1617235200000|                            52|BP normalized|          rashes on neck|
|              14|Ontario|   Saul|    nurse| Naproxen|    1619827200000|      1617235200000|                            78|    Follow-up|                     N/A|
|              17|Ontario|   Saul|    nurse|  Placebo|    1619827200000|      1617235200000|                            14|    Follow-up|                     N/A|
|              18|Onta

In [8]:
# Null count per selected column. when() yields a value only on null rows, and
# count() skips the nulls it produces for every other row.
trails_df.select([
    fn.count(fn.when(fn.col(name).isNull(), name)).alias(name)
    for name in columns
]).show()



+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|ageofparticipant|clinician.branch|clinician.name|clinician.role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|result.conclusion|result.sideeffectsonparticipant|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|               0|               0|             0|           109|        0|                0|                  0|                            73|               53|                              0|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+



## Cleaning

In [None]:
# Cleaning plan:
# 1. flatten the nested df
# 2. address null values
# 3. rename columns


In [9]:
# Flatten: project the nested clinician/result fields up to top-level columns.
# select() returns a new DataFrame, so trails_df itself is left unchanged.
new_trials_df = trails_df.select(*columns)

In [10]:
# Confirm the flattened schema: former struct fields are now top-level columns
# named by their leaf (branch, name, role, conclusion, ...).
new_trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- branch: string (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- sideeffectsonparticipant: string (nullable = true)



In [12]:
# Rename the flattened columns to descriptive snake_case names in one pass
# (DataFrame.withColumnsRenamed, available from Spark 3.4). drug_used is already
# snake_case and keeps its name.
new_trials_df = new_trials_df.withColumnsRenamed({
    'ageofparticipant': 'age_of_participant',
    'branch': 'clinic_branch',
    'name': 'head_clinician',
    'role': 'clinician_role',
    'experimentenddate': 'experiment_end_date',
    'experimentstartdate': 'experiment_start_date',
    'noofhourspassedatfirstreaction': 'hours_passed_at_first_reaction',
    'conclusion': 'experiment_conclusion',
    'sideeffectsonparticipant': 'patient_side_effect',
})

In [13]:
# Check the renamed columns (prints the first 20 rows).
new_trials_df.show(n=20)

+------------------+-------------+--------------+--------------+---------+-------------------+---------------------+------------------------------+---------------------+-------------------+
|age_of_participant|clinic_branch|head_clinician|clinician_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|experiment_conclusion|patient_side_effect|
+------------------+-------------+--------------+--------------+---------+-------------------+---------------------+------------------------------+---------------------+-------------------+
|                19|      Ontario|          Saul|     therapist|  Placebo|      1619827200000|        1617235200000|                            52|        BP normalized|     rashes on neck|
|                14|      Ontario|          Saul|         nurse| Naproxen|      1619827200000|        1617235200000|                            78|            Follow-up|                N/A|
|                17|      Ontario|          Saul| 

# Handling Null Values

In [14]:
# Summary statistics. The `count` row exposes nulls: clinician_role,
# hours_passed_at_first_reaction and experiment_conclusion report fewer non-null
# values than the other columns.
new_trials_df.describe().show()

+-------+------------------+-------------+--------------+--------------+---------+--------------------+---------------------+------------------------------+---------------------+-------------------+
|summary|age_of_participant|clinic_branch|head_clinician|clinician_role|drug_used| experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|experiment_conclusion|patient_side_effect|
+-------+------------------+-------------+--------------+--------------+---------+--------------------+---------------------+------------------------------+---------------------+-------------------+
|  count|              3586|         3586|          3586|          3477|     3586|                3586|                 3586|                          3513|                 3533|               3586|
|   mean|17.507250418293363|         null|          null|          null|     null|1.618381578137200...| 1.615813671834913...|             44.89097637346997|                 null|               null|
| std

In [15]:
# Replace nulls in the two string columns with the placeholder 'unknown'.
# hours_passed_at_first_reaction also contains nulls; it is not filled here.
new_trials_df = new_trials_df.fillna('unknown', subset=['experiment_conclusion', 'clinician_role'])


In [16]:
# Re-check counts: the two filled columns should now match the full row count,
# while hours_passed_at_first_reaction still has nulls.
new_trials_df.describe().show()

+-------+------------------+-------------+--------------+--------------+---------+--------------------+---------------------+------------------------------+---------------------+-------------------+
|summary|age_of_participant|clinic_branch|head_clinician|clinician_role|drug_used| experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|experiment_conclusion|patient_side_effect|
+-------+------------------+-------------+--------------+--------------+---------+--------------------+---------------------+------------------------------+---------------------+-------------------+
|  count|              3586|         3586|          3586|          3586|     3586|                3586|                 3586|                          3513|                 3586|               3586|
|   mean|17.507250418293363|         null|          null|          null|     null|1.618381578137200...| 1.615813671834913...|             44.89097637346997|                 null|               null|
| std

# Transformation

In [None]:
# transform experiment start and end dates from epoch-millisecond strings to timestamps

In [17]:
# Schema after filling: the two filled columns now report nullable = false, and
# the experiment dates are still strings.
new_trials_df.printSchema()

root
 |-- age_of_participant: long (nullable = true)
 |-- clinic_branch: string (nullable = true)
 |-- head_clinician: string (nullable = true)
 |-- clinician_role: string (nullable = false)
 |-- drug_used: string (nullable = true)
 |-- experiment_end_date: string (nullable = true)
 |-- experiment_start_date: string (nullable = true)
 |-- hours_passed_at_first_reaction: long (nullable = true)
 |-- experiment_conclusion: string (nullable = false)
 |-- patient_side_effect: string (nullable = true)



In [18]:
from pyspark.sql import types as dtypes

In [19]:
# Preview only (the result is not assigned). The date strings hold epoch
# milliseconds: cast to long, divide by 1000 to get seconds, and format those
# seconds as a datetime string.
# from_unixtime renders in the Spark session time zone. 1617235200000 ms is
# midnight UTC on 2021-04-01, yet the preview shows 01:00:00, so the session
# zone was UTC+1 here.
new_trials_df.withColumn(
    'start_ts',
    fn.from_unixtime(
        fn.col('experiment_start_date').cast(dtypes.LongType()) / 1000,
        'yyyy-MM-dd HH:mm:ss',
    ),
).show(5)

+------------------+-------------+--------------+--------------+---------+-------------------+---------------------+------------------------------+---------------------+-------------------+-------------------+
|age_of_participant|clinic_branch|head_clinician|clinician_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|experiment_conclusion|patient_side_effect|           start_ts|
+------------------+-------------+--------------+--------------+---------+-------------------+---------------------+------------------------------+---------------------+-------------------+-------------------+
|                19|      Ontario|          Saul|     therapist|  Placebo|      1619827200000|        1617235200000|                            52|        BP normalized|     rashes on neck|2021-04-01 01:00:00|
|                14|      Ontario|          Saul|         nurse| Naproxen|      1619827200000|        1617235200000|                            78|            F

In [20]:
def epoch_millis_to_timestamp(column_name):
    """Return a TimestampType column built from a string column of epoch milliseconds.

    Args:
        column_name: name of a string column holding milliseconds since the Unix
            epoch, e.g. '1617235200000'.

    Returns:
        A TimestampType Column. The string is cast to BIGINT and passed to Spark's
        SQL function `timestamp_millis`, which builds the instant directly from the
        integer value.

    Why not from_unixtime: formatting with from_unixtime and casting the string back
    truncates to whole seconds. It also re-parses a local wall-clock string in the
    session time zone, which is ambiguous during a daylight-saving "fall back" hour.
    """
    return fn.expr(f'timestamp_millis(CAST(`{column_name}` AS BIGINT))')


# Append start_ts / end_ts as real timestamps; the original string columns are kept.
new_trials_df = (
    new_trials_df
    .withColumn('start_ts', epoch_millis_to_timestamp('experiment_start_date'))
    .withColumn('end_ts', epoch_millis_to_timestamp('experiment_end_date'))
)

In [21]:
# Confirm start_ts and end_ts were appended as timestamp columns.
new_trials_df.dtypes

[('age_of_participant', 'bigint'),
 ('clinic_branch', 'string'),
 ('head_clinician', 'string'),
 ('clinician_role', 'string'),
 ('drug_used', 'string'),
 ('experiment_end_date', 'string'),
 ('experiment_start_date', 'string'),
 ('hours_passed_at_first_reaction', 'bigint'),
 ('experiment_conclusion', 'string'),
 ('patient_side_effect', 'string'),
 ('start_ts', 'timestamp'),
 ('end_ts', 'timestamp')]

In [23]:
# Preview the transformed frame, including the new timestamp columns.
new_trials_df.show(n=5)

+------------------+-------------+--------------+--------------+---------+-------------------+---------------------+------------------------------+---------------------+-------------------+-------------------+-------------------+
|age_of_participant|clinic_branch|head_clinician|clinician_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|experiment_conclusion|patient_side_effect|           start_ts|             end_ts|
+------------------+-------------+--------------+--------------+---------+-------------------+---------------------+------------------------------+---------------------+-------------------+-------------------+-------------------+
|                19|      Ontario|          Saul|     therapist|  Placebo|      1619827200000|        1617235200000|                            52|        BP normalized|     rashes on neck|2021-04-01 01:00:00|2021-05-01 01:00:00|
|                14|      Ontario|          Saul|         nurse| Naproxen|      

# Loading (for Clinicians)

In [24]:
# Clinician export: CSV with a header row, one directory per drug
# (clinician/drug_used=<value>/), rows ordered by experiment start time.
# partitionBy moves drug_used out of the file contents and into the directory name.
reordered_columns = [
    'start_ts', 'end_ts', 'clinic_branch', 'head_clinician', 'clinician_role',
    'drug_used', 'age_of_participant', 'hours_passed_at_first_reaction',
    'experiment_conclusion', 'patient_side_effect',
]
(
    new_trials_df
    .select(reordered_columns)
    # For partitioned writes Spark orders each task's rows by the partition column.
    # If the incoming order does not already start with drug_used, it adds its own
    # sort, which need not keep the start_ts order. Leading with drug_used satisfies
    # the writer, so start_ts order survives into every file.
    .sort('drug_used', 'start_ts')
    .write
    .option('header', True)
    .partitionBy('drug_used')
    .mode("overwrite")
    .format("csv")
    .save('clinician')
)

# Loading (for ML Engineers)

In [25]:
# ML export: Parquet partitioned by the month of the experiment start
# (ml_engineers/start_month=<1..12>/), rows ordered by start time in each partition.
reordered_columns = [
    'start_ts', 'end_ts', 'clinic_branch', 'head_clinician', 'clinician_role',
    'drug_used', 'age_of_participant', 'hours_passed_at_first_reaction',
    'experiment_conclusion', 'patient_side_effect',
]
(
    new_trials_df
    .select(reordered_columns)
    # fn.month evaluates start_ts in the Spark session time zone.
    # NOTE(review): start_month is only the month number, so rows from the same month
    # of different years share a partition. Add a year partition if the data spans years.
    .withColumn('start_month', fn.month('start_ts'))
    # Lead with the partition column so the writer does not re-sort and lose start_ts order.
    .sort('start_month', 'start_ts')
    .write
    .partitionBy('start_month')
    .mode("overwrite")
    .format("parquet")
    .save('ml_engineers')
)