In [0]:
from pyspark.sql import SparkSession

# getOrCreate() hands back an already-running session when there is one.
spark = SparkSession.builder.appName('create_tables').getOrCreate()


def _load_csv_view(view_name, csv_path, delimiter):
    """Read a headered CSV with schema inference and register it as a temp view.

    Args:
        view_name: Name under which the frame is registered as a temp view.
        csv_path: Location of the CSV file to read.
        delimiter: Field separator used inside the file.

    Returns:
        The DataFrame that was read and registered.
    """
    frame = spark.read.csv(csv_path, header=True, inferSchema=True, sep=delimiter)
    frame.createOrReplaceTempView(view_name)
    return frame


# The clinical-trial extracts are pipe-delimited; the pharma file is comma-delimited.
clinicaltrial_2019 = _load_csv_view('clinicaltrial_2019', '/FileStore/tables/clinicaltrial_2019.csv', '|')
clinicaltrial_2020 = _load_csv_view('clinicaltrial_2020', '/FileStore/tables/clinicaltrial_2020.csv', '|')
clinicaltrial_2021 = _load_csv_view('clinicaltrial_2021', '/FileStore/tables/clinicaltrial_2021.csv', '|')
pharma = _load_csv_view('pharma', "/FileStore/tables/pharma.csv", ',')

# Render the 2021 trials frame in the notebook output.
clinicaltrial_2021.display()

Id,Sponsor,Status,Start,Completion,Type,Submission,Conditions,Interventions
NCT02758028,The University of Hong Kong,Recruiting,Aug 2005,Nov 2021,Interventional,Apr 2016,,
NCT02751957,Duke University,Completed,Jul 2016,Jul 2020,Interventional,Apr 2016,"Autistic Disorder,Autism Spectrum Disorder",
NCT02758483,Universidade Federal do Rio de Janeiro,Completed,Mar 2017,Jan 2018,Interventional,Apr 2016,Diabetes Mellitus,
NCT02759848,Istanbul Medeniyet University,Completed,Jan 2012,Dec 2014,Observational,May 2016,"Tuberculosis,Lung Diseases,Pulmonary Disease",
NCT02758860,University of Roma La Sapienza,"Active, not recruiting",Jun 2016,Sep 2020,Observational [Patient Registry],Apr 2016,"Diverticular Diseases,Diverticulum,Diverticulosis",
NCT02757209,Consorzio Futuro in Ricerca,Completed,Apr 2016,Jan 2018,Interventional,Apr 2016,Asthma,"Fluticasone,Xhance,Budesonide,Formoterol Fumarate,Salmeterol Xinafoate"
NCT02752438,Ankara University,Unknown status,May 2016,Jul 2017,Observational [Patient Registry],Apr 2016,Hypoventilation,
NCT02753543,Ruijin Hospital,Unknown status,Nov 2015,Nov 2019,Interventional,Apr 2016,Lymphoma,
NCT02757508,Washington University School of Medicine,Completed,Mar 2016,Jul 2017,Interventional,Apr 2016,,Vitamins
NCT02753530,Orphazyme,Completed,Aug 2017,Jan 2021,Interventional,Apr 2016,Myositis,


In [0]:
#Question 1
# Row count of the 2021 trials view, taken over its Id column.
clinicaldf = spark.table("clinicaltrial_2021")
id_only = clinicaldf.select("Id")
studycount = id_only.count()

# Show the total
print(studycount)

387261


In [0]:
#Question 2
from pyspark.sql.functions import count, desc

# Tally rows for each value of Type ...
rows_per_type = clinicaldf.groupBy("Type").agg(count("*").alias("count"))

# ... and list the most frequent type first.
type_countsdf = rows_per_type.orderBy(desc("count"))

type_countsdf.show()

+--------------------+------+
|                Type| count|
+--------------------+------+
|      Interventional|301472|
|       Observational| 77540|
|Observational [Pa...|  8180|
|     Expanded Access|    69|
+--------------------+------+



In [0]:
#Question 3

from pyspark.sql.functions import split, explode, count

# `clinicaldf` is the input frame. Its `Conditions` column is split on commas
# and exploded so that every listed condition becomes its own `Condition` row.
exploded_conditions = clinicaldf.select(
    explode(split(clinicaldf["Conditions"], ",")).alias("Condition")
).where("Condition is not null")

# Count the rows for each condition, then keep the five largest counts.
condition_totals = exploded_conditions.groupBy("Condition").agg(count("*").alias("Count"))
condition_counts = condition_totals.orderBy("Count", ascending=False).limit(5)

# Show the top five conditions
condition_counts.show()

+-----------------+-----+
|        Condition|Count|
+-----------------+-----+
|        Carcinoma|13389|
|Diabetes Mellitus|11080|
|        Neoplasms| 9371|
| Breast Neoplasms| 8640|
|         Syndrome| 8032|
+-----------------+-----+



In [0]:
# Question 4
from pyspark.sql.functions import desc, col
pharma = spark.table("pharma")
clinicaltrial_2021 = spark.table("clinicaltrial_2021")

# Parent company names, aliased to `pc` so the join condition can reference them.
parent_names = pharma.select(col("Parent_Company").alias("pc"))

# left_anti keeps only the trials whose Sponsor equals no Parent_Company value.
unmatched_trials = clinicaltrial_2021.join(
    parent_names,
    col("Sponsor") == col("pc"),
    "left_anti",
)

# Count non-null Sponsor values per sponsor and expose the total as `Count`.
per_sponsor = (
    unmatched_trials
    .groupBy("Sponsor")
    .agg({"Sponsor": "count"})
    .withColumnRenamed("count(Sponsor)", "Count")
)

# Ten sponsors with the highest counts.
pharmadf = per_sponsor.orderBy(desc("Count")).limit(10)
pharmadf.show()

+--------------------+-----+
|             Sponsor|Count|
+--------------------+-----+
|National Cancer I...| 3218|
|M.D. Anderson Can...| 2414|
|Assistance Publiq...| 2369|
|         Mayo Clinic| 2300|
|Merck Sharp & Doh...| 2243|
|   Assiut University| 2154|
|Novartis Pharmace...| 2088|
|Massachusetts Gen...| 1971|
|    Cairo University| 1928|
|   Hoffmann-La Roche| 1828|
+--------------------+-----+



In [0]:
# Question 5
from pyspark.sql.functions import split, regexp_replace, to_date, count
from pyspark.sql.types import IntegerType

# Stage 1 - 'Completion' view: keep Status, take the text before the first space
# of Completion as the month label, and strip spaces from Completion so it can
# be parsed with the 'MMMyyyy' pattern below.
completion_parts_df = clinicaltrial_2021.select(
    'Status',
    split('Completion', ' ')[0].alias('Completion_month'),
    regexp_replace('Completion', ' ', '').alias('Completion')
)
completion_parts_df.createOrReplaceTempView('Completion')

# Stage 2 - 'Complete' view: parse the space-stripped text into a DATE column.
complete_df = spark.sql("""
    SELECT Status, Completion_month, to_date(Completion, 'MMMyyyy') AS Completion_date 
    FROM Completion
""")
complete_df.createOrReplaceTempView('Complete')

# Stage 3 - completed studies per month for 2021, in calendar order.
# The year is compared with year() on the DATE column rather than by pattern
# matching the date through an implicit date-to-string cast.
monthly_counts_df = spark.sql("""
    SELECT Completion_month, COUNT(*) AS Count 
    FROM Complete 
    WHERE year(Completion_date) = 2021 AND Status = 'Completed' 
    GROUP BY Completion_month 
    ORDER BY MIN(Completion_date)
""")

# Cast Count to an integer column.
result_df = monthly_counts_df.withColumn('Count', monthly_counts_df['Count'].cast(IntegerType()))

result_df.show()

+----------------+-----+
|Completion_month|Count|
+----------------+-----+
|             Jan| 1131|
|             Feb|  934|
|             Mar| 1227|
|             Apr|  967|
|             May|  984|
|             Jun| 1094|
|             Jul|  819|
|             Aug|  700|
|             Sep|  528|
|             Oct|  187|
+----------------+-----+

