In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('rheeza').getOrCreate()

In [4]:
##understand the dataset
trials_df=spark.read.json('dataset.json', multiLine=True)

In [5]:
trials_df.show(5)

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              19|{Ontario, Saul, t...|  Placebo|    1619827200000|      1617235200000|                            52|{BP normalized, r...|
|              14|{Ontario, Saul, n...| Naproxen|    1619827200000|      1617235200000|                            78|    {Follow-up, N/A}|
|              17|{Ontario, Saul, n...|  Placebo|    1619827200000|      1617235200000|                            14|    {Follow-up, N/A}|
|              18|{Ontario, Will, n...| Naproxen|    1619827200000|      1617235200000|                            14|{BP normalized, N/A}|
|              17|{O

In [58]:
##to see the structure of the data and to plan data processing effectively(flattening the data)
trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [59]:
##to see columns so to perform operations on columns or to be able to select specific columns 
trials_df.columns


['ageofparticipant',
 'clinician',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result']

In [60]:
columns = ['ageofparticipant',
 'clinician.branch',
'clinician.name',
'clinician.role',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result.conclusion',
'result.sideeffectsonparticipant']

In [61]:
trials_df.select(columns).show(5)

+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|ageofparticipant| branch|   name|     role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|              19|Ontario|   Saul|therapist|  Placebo|    1619827200000|      1617235200000|                            52|BP normalized|          rashes on neck|
|              14|Ontario|   Saul|    nurse| Naproxen|    1619827200000|      1617235200000|                            78|    Follow-up|                     N/A|
|              17|Ontario|   Saul|    nurse|  Placebo|    1619827200000|      1617235200000|                            14|    Follow-up|                     N/A|
|              18|Onta

In [62]:
##inorder to access built in functions and expressions
from pyspark.sql import functions as fn

##counting null values per column
trials_df.select([ fn.count(fn.when(fn.col(column).isNull(), column)).alias(column) for column in columns]).show()


##cleaning
##flatten the df
##address null values
##rename columns

In [63]:
new_trials_df = trials_df.select(columns)

In [64]:
new_trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- branch: string (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- sideeffectsonparticipant: string (nullable = true)



In [65]:
new_column_names = {
    'ageofparticipant':'age_of_participant'
    ,'branch': 'clinic_branch'
    ,'name': 'head_clinician'
    ,'role': 'assistants_role'
    ,'experimentenddate':'experiment_end_date'
    ,'experimentstartdate':'experiment_start_date'
    ,'noofhourspassedatfirstreaction':'no_of_hours_passed _at_first_reaction'
    ,'sideeffectsonparticipant':'side_effect_on_participant'
}
new_trials_df=new_trials_df.withColumnsRenamed(new_column_names)
new_trials_df.show(2)


+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+-------------------------------------+-------------+--------------------------+
|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|no_of_hours_passed _at_first_reaction|   conclusion|side_effect_on_participant|
+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+-------------------------------------+-------------+--------------------------+
|                19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                                   52|BP normalized|            rashes on neck|
|                14|      Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                                   78|    Follow-up|                       N/A|
+--------------

In [66]:
new_trials_df.dtypes

[('age_of_participant', 'bigint'),
 ('clinic_branch', 'string'),
 ('head_clinician', 'string'),
 ('assistants_role', 'string'),
 ('drug_used', 'string'),
 ('experiment_end_date', 'string'),
 ('experiment_start_date', 'string'),
 ('no_of_hours_passed _at_first_reaction', 'bigint'),
 ('conclusion', 'string'),
 ('side_effect_on_participant', 'string')]

In [67]:
new_trials_df.describe().show()

+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+-------------------------------------+-------------+--------------------------+
|summary|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used| experiment_end_date|experiment_start_date|no_of_hours_passed _at_first_reaction|   conclusion|side_effect_on_participant|
+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+-------------------------------------+-------------+--------------------------+
|  count|              3586|         3586|          3586|           3477|     3586|                3586|                 3586|                                 3513|         3533|                      3586|
|   mean|17.507250418293363|         null|          null|           null|     null|1.618381578137200...| 1.615813671834913...|                    44.89097637346997|         nul

In [68]:
new_trials_df=new_trials_df.na.fill({'conclusion':'unknown','assistants_role':'unknown'})

In [43]:
new_trials_df.describe().show()

+-------+------------------+-------+-------+---------+---------+--------------------+--------------------+------------------------------+-------------+------------------------+
|summary|  ageofparticipant| branch|   name|     role|drug_used|   experimentenddate| experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+-------+------------------+-------+-------+---------+---------+--------------------+--------------------+------------------------------+-------------+------------------------+
|  count|              3586|   3586|   3586|     3477|     3586|                3586|                3586|                          3513|         3533|                    3586|
|   mean|17.507250418293363|   null|   null|     null|     null|1.618381578137200...|1.615813671834913...|             44.89097637346997|         null|                    null|
| stddev|2.3066401927555233|   null|   null|     null|     null|2.3250351904618263E9|2.2862846039555306E9|         

Transormation
#transorming experiment start and end date to datetype

In [72]:
new_trials_df.printSchema()


root
 |-- age_of_participant: long (nullable = true)
 |-- clinic_branch: string (nullable = true)
 |-- head_clinician: string (nullable = true)
 |-- assistants_role: string (nullable = false)
 |-- drug_used: string (nullable = true)
 |-- experiment_end_date: string (nullable = true)
 |-- experiment_start_date: string (nullable = true)
 |-- no_of_hours_passed _at_first_reaction: long (nullable = true)
 |-- conclusion: string (nullable = false)
 |-- side_effect_on_participant: string (nullable = true)
 |-- start_ts: timestamp (nullable = true)
 |-- end_ts: timestamp (nullable = true)



In [70]:
from pyspark.sql import types as dtypes

In [73]:
##convert datatype to long int
##divide by 1000
##convert from unix to datetime
new_trials_df=new_trials_df\
    .withColumn('start_ts', fn.from_unixtime(fn.col('experiment_start_date').cast(dtypes.LongType())/1000, 'yyyy-MM-dd HH:mm:ss'))\
    .withColumn('start_ts', fn.col('start_ts').cast(dtypes.TimestampType()))\
            .withColumn('end_ts', fn.from_unixtime(fn.col('experiment_end_date').cast(dtypes.LongType())/1000, 'yyyy-MM-dd HH:mm:ss'))\
            .withColumn('end_ts', fn.col('end_ts').cast(dtypes.TimestampType()))



DataFrame[age_of_participant: bigint, clinic_branch: string, head_clinician: string, assistants_role: string, drug_used: string, experiment_end_date: string, experiment_start_date: string, no_of_hours_passed _at_first_reaction: bigint, conclusion: string, side_effect_on_participant: string, start_ts: timestamp, end_ts: timestamp]

In [74]:
new_trials_df.show(3)

+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+-------------------------------------+-------------+--------------------------+-------------------+-------------------+
|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|no_of_hours_passed _at_first_reaction|   conclusion|side_effect_on_participant|           start_ts|             end_ts|
+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+-------------------------------------+-------------+--------------------------+-------------------+-------------------+
|                19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                                   52|BP normalized|            rashes on neck|2021-04-01 01:00:00|2021-05-01 01:00:00|
|                14|      Ontario|          Saul|   