In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rheeza').getOrCreate()

## Understand the Dataset

In [8]:
trials_df = spark.read.json('dataset.json', multiLine=True) # read the json file

trials_df.show(5)

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              19|{Ontario, Saul, t...|  Placebo|    1619827200000|      1617235200000|                            52|{BP normalized, r...|
|              14|{Ontario, Saul, n...| Naproxen|    1619827200000|      1617235200000|                            78|    {Follow-up, N/A}|
|              17|{Ontario, Saul, n...|  Placebo|    1619827200000|      1617235200000|                            14|    {Follow-up, N/A}|
|              18|{Ontario, Will, n...| Naproxen|    1619827200000|      1617235200000|                            14|{BP normalized, N/A}|
|              17|{O

In [9]:
trials_df.printSchema() #view the schema

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [10]:
trials_df.columns 

['ageofparticipant',
 'clinician',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result']

In [11]:
columns = ['ageofparticipant'
 ,'clinician.branch'
 ,'clinician.name'
 ,'clinician.role'
 ,'drug_used'
 ,'experimentenddate'
 ,'experimentstartdate'
 ,'noofhourspassedatfirstreaction'
 ,'result.conclusion'
 ,'result.sideeffectsonparticipant'] # flatten the columns (there are nested columns in the jason file)



In [12]:
trials_df.select(columns).show(5)

+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|ageofparticipant| branch|   name|     role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|              19|Ontario|   Saul|therapist|  Placebo|    1619827200000|      1617235200000|                            52|BP normalized|          rashes on neck|
|              14|Ontario|   Saul|    nurse| Naproxen|    1619827200000|      1617235200000|                            78|    Follow-up|                     N/A|
|              17|Ontario|   Saul|    nurse|  Placebo|    1619827200000|      1617235200000|                            14|    Follow-up|                     N/A|
|              18|Onta

In [13]:
from pyspark.sql import functions as fn

In [14]:
trials_df.select([ fn.count(fn.when(fn.col(column).isNull(), column)).alias(column) for column in columns]).show() #count the null value in each column

+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|ageofparticipant|clinician.branch|clinician.name|clinician.role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|result.conclusion|result.sideeffectsonparticipant|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|               0|               0|             0|           109|        0|                0|                  0|                            73|               53|                              0|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+



## Cleaning

In [15]:
# flatten df
# address null values
# rename columns

In [16]:
new_trials_df = trials_df.select(columns)

In [17]:
new_trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- branch: string (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- sideeffectsonparticipant: string (nullable = true)



In [18]:
# rename columns and flatten the df

new_trials_df = new_trials_df.withColumnsRenamed({'ageofparticipant': 'age of participant', 'branch': 'clinician_branch','name': 'clinician_name','role':'clinician_role','experimentenddate':'experiment_end_date','experimentstartdate':' experiment_start_date','noofhourspassedatfirstreaction':'hours_passed_first_reaction','sideeffectsonparticipant':'side_effects_on_participant'})


In [19]:
new_trials_df.printSchema()

root
 |-- age of participant: long (nullable = true)
 |-- clinician_branch: string (nullable = true)
 |-- clinician_name: string (nullable = true)
 |-- clinician_role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experiment_end_date: string (nullable = true)
 |--  experiment_start_date: string (nullable = true)
 |-- hours_passed_first_reaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- side_effects_on_participant: string (nullable = true)



In [20]:
new_trials_df.show() # veiw the dataset for the applied changes

+------------------+----------------+--------------+--------------+---------+-------------------+----------------------+---------------------------+-------------+---------------------------+
|age of participant|clinician_branch|clinician_name|clinician_role|drug_used|experiment_end_date| experiment_start_date|hours_passed_first_reaction|   conclusion|side_effects_on_participant|
+------------------+----------------+--------------+--------------+---------+-------------------+----------------------+---------------------------+-------------+---------------------------+
|                19|         Ontario|          Saul|     therapist|  Placebo|      1619827200000|         1617235200000|                         52|BP normalized|             rashes on neck|
|                14|         Ontario|          Saul|         nurse| Naproxen|      1619827200000|         1617235200000|                         78|    Follow-up|                        N/A|
|                17|         Ontario|        

In [21]:
new_trials_df.dtypes # check the datatypes of the each columns

[('age of participant', 'bigint'),
 ('clinician_branch', 'string'),
 ('clinician_name', 'string'),
 ('clinician_role', 'string'),
 ('drug_used', 'string'),
 ('experiment_end_date', 'string'),
 (' experiment_start_date', 'string'),
 ('hours_passed_first_reaction', 'bigint'),
 ('conclusion', 'string'),
 ('side_effects_on_participant', 'string')]

In [22]:
new_trials_df.describe().show() #summary of the dataset

+-------+------------------+----------------+--------------+--------------+---------+--------------------+----------------------+---------------------------+-------------+---------------------------+
|summary|age of participant|clinician_branch|clinician_name|clinician_role|drug_used| experiment_end_date| experiment_start_date|hours_passed_first_reaction|   conclusion|side_effects_on_participant|
+-------+------------------+----------------+--------------+--------------+---------+--------------------+----------------------+---------------------------+-------------+---------------------------+
|  count|              3586|            3586|          3586|          3477|     3586|                3586|                  3586|                       3513|         3533|                       3586|
|   mean|17.507250418293363|            null|          null|          null|     null|1.618381578137200...|  1.615813671834913...|          44.89097637346997|         null|                       null|


In [23]:
new_trials_df = new_trials_df.na.fill({'conclusion':'unknown','clinician_role':'unknown'}) # filling the null values 

In [24]:

new_trials_df.show()

+------------------+----------------+--------------+--------------+---------+-------------------+----------------------+---------------------------+-------------+---------------------------+
|age of participant|clinician_branch|clinician_name|clinician_role|drug_used|experiment_end_date| experiment_start_date|hours_passed_first_reaction|   conclusion|side_effects_on_participant|
+------------------+----------------+--------------+--------------+---------+-------------------+----------------------+---------------------------+-------------+---------------------------+
|                19|         Ontario|          Saul|     therapist|  Placebo|      1619827200000|         1617235200000|                         52|BP normalized|             rashes on neck|
|                14|         Ontario|          Saul|         nurse| Naproxen|      1619827200000|         1617235200000|                         78|    Follow-up|                        N/A|
|                17|         Ontario|        

In [25]:

new_trials_df.describe().show()

+-------+------------------+----------------+--------------+--------------+---------+--------------------+----------------------+---------------------------+-------------+---------------------------+
|summary|age of participant|clinician_branch|clinician_name|clinician_role|drug_used| experiment_end_date| experiment_start_date|hours_passed_first_reaction|   conclusion|side_effects_on_participant|
+-------+------------------+----------------+--------------+--------------+---------+--------------------+----------------------+---------------------------+-------------+---------------------------+
|  count|              3586|            3586|          3586|          3586|     3586|                3586|                  3586|                       3513|         3586|                       3586|
|   mean|17.507250418293363|            null|          null|          null|     null|1.618381578137200...|  1.615813671834913...|          44.89097637346997|         null|                       null|


# Transformation

In [31]:
new_trials_df.printSchema()

root
 |-- age of participant: long (nullable = true)
 |-- clinician_branch: string (nullable = true)
 |-- clinician_name: string (nullable = true)
 |-- clinician_role: string (nullable = false)
 |-- drug_used: string (nullable = true)
 |-- experiment_end_date: string (nullable = true)
 |--  experiment_start_date: string (nullable = true)
 |-- hours_passed_first_reaction: long (nullable = true)
 |-- conclusion: string (nullable = false)
 |-- side_effects_on_participant: string (nullable = true)



In [None]:
# convert the datatypes to integers

# divide by 1000

# convert to normal date

In [28]:
from pyspark.sql import types as dtypes

In [33]:
new_trials_df.withColumn('start_ts', fn.from_unixtime(fn.col('experiment_start_date').cast(dtypes.LongType())/1000,'yyyy-MM-dd HH:mm:ss') ).show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `experiment_start_date` cannot be resolved. Did you mean one of the following? [` experiment_start_date`, `experiment_end_date`, `clinician_name`, `clinician_role`, `age of participant`].;
'Project [age of participant#309L, clinician_branch#307, clinician_name#303, clinician_role#1391, drug_used#53, experiment_end_date#310,  experiment_start_date#308, hours_passed_first_reaction#306L, conclusion#1392, side_effects_on_participant#305, from_unixtime((cast('experiment_start_date as bigint) / 1000), yyyy-mm-dd, Some(America/Los_Angeles)) AS start_ts#2460]
+- Project [age of participant#309L, clinician_branch#307, clinician_name#303, coalesce(clinician_role#304, cast(unknown as string)) AS clinician_role#1391, drug_used#53, experiment_end_date#310,  experiment_start_date#308, hours_passed_first_reaction#306L, coalesce(conclusion#291, cast(unknown as string)) AS conclusion#1392, side_effects_on_participant#305]
   +- Project [ageofparticipant#51L AS age of participant#309L, branch#288 AS clinician_branch#307, name#289 AS clinician_name#303, role#290 AS clinician_role#304, drug_used#53, experimentenddate#54 AS experiment_end_date#310, experimentstartdate#55 AS  experiment_start_date#308, noofhourspassedatfirstreaction#56L AS hours_passed_first_reaction#306L, conclusion#291, sideeffectsonparticipant#292 AS side_effects_on_participant#305]
      +- Project [ageofparticipant#51L, clinician#52.branch AS branch#288, clinician#52.name AS name#289, clinician#52.role AS role#290, drug_used#53, experimentenddate#54, experimentstartdate#55, noofhourspassedatfirstreaction#56L, result#57.conclusion AS conclusion#291, result#57.sideeffectsonparticipant AS sideeffectsonparticipant#292]
         +- Relation [ageofparticipant#51L,clinician#52,drug_used#53,experimentenddate#54,experimentstartdate#55,noofhourspassedatfirstreaction#56L,result#57] json
