In [0]:
file_path = "/FileStore/tables/clinicaltrial_2021_csv.gz"

In [0]:
def file_exists(path):
    try:
        dbutils.fs.ls(path)
        return True
    except Exception as e:
        if 'java.io.FileNotFoundException' in str(e):
            return False
        else:
            raise

In [0]:
if file_exists(file_path) == True:
    fileroot = file_path.split("/")[3].split(".")[0]
    dbutils.fs.cp(file_path,"file:/tmp/")
else:
    print("File is not found in dbfs")

In [0]:
import os
os.environ['fileroot'] = fileroot

In [0]:
%sh
gzip -d /tmp/ /tmp/$fileroot.gz

gzip: /tmp/ is a directory -- ignored


In [0]:
dbutils.fs.mv("file:/tmp/" + fileroot,"/FileStore/tables/")

Out[6]: True

In [0]:
if fileroot.endswith("_csv"):
    fileroot_new = fileroot.replace("_csv",".csv")
dbutils.fs.mv("/FileStore/tables/" + fileroot,"/FileStore/tables/" + fileroot_new)

Out[7]: True

In [0]:
def read_file(file_path,header):
    if header == True:
        rdd = sc.textFile(file_path)
        header = rdd.take(1)[0]
        return(rdd.filter(lambda line: line != header))
    else:
        return(sc.textFile(file_path))

In [0]:
file_path = "/FileStore/tables/"+fileroot_new
clinical_rdd = read_file(file_path,True)
mesh_rdd = read_file("/FileStore/tables/mesh.csv",True)
pharma_rdd = read_file("/FileStore/tables/pharma.csv",True)
clinical_trail = clinical_rdd.map(lambda s: s.split("|"))

Question 1

In [0]:
clinical_rdd.filter(lambda x: x[0]).distinct().count()

Out[10]: 387261

Question 2

In [0]:
clinical_trail.map(lambda x: (x[5],1))\
              .reduceByKey(lambda x,y : x+y)\
              .sortBy(lambda x: -x[1]).collect()

Out[11]: [('Interventional', 301472),
 ('Observational', 77540),
 ('Observational [Patient Registry]', 8180),
 ('Expanded Access', 69)]

Question 3

In [0]:
clinical_trail.map(lambda x: x[7].split(","))\
               .filter(lambda x : x[0]!= '')\
               .flatMap(lambda x :x)\
               .map(lambda x: (x,1))\
               .reduceByKey(lambda x,y : x+y)\
               .sortBy(lambda x: -x[1]).take(5)

Out[12]: [('Carcinoma', 13389),
 ('Diabetes Mellitus', 11080),
 ('Neoplasms', 9371),
 ('Breast Neoplasms', 8640),
 ('Syndrome', 8032)]

Question 4

In [0]:
clinical_trail.map(lambda x: x[7].split(","))\
              .filter(lambda x : x[0]!= '')\
              .flatMap(lambda x :x)\
              .map(lambda x: (x,1))\
              .join(mesh_rdd.map(lambda x: x.split(','))\
              .map(lambda x: (x[0],x[1].split('.')[0])))\
              .map(lambda x: (x[1][1],x[1][0]))\
              .reduceByKey(lambda x,y : x+y)\
              .sortBy(lambda x: -x[1]).take(10)

Out[13]: [('C04', 143994),
 ('C23', 136079),
 ('C01', 106674),
 ('C14', 94523),
 ('C10', 92310),
 ('C06', 85646),
 ('C08', 70720),
 ('C13', 42599),
 ('C18', 41276),
 ('C12', 40161)]

Question 5

In [0]:
parent_company = pharma_rdd.map(lambda x: x.split("\"")).map(lambda x: x[3]).collect()
clinical_trail.map(lambda s: (s[1],1))\
              .reduceByKey(lambda x,y : x+y)\
              .sortBy(lambda x: -x[1])\
              .filter(lambda x: x[0] not in parent_company).take(10)

Out[14]: [('National Cancer Institute (NCI)', 3218),
 ('M.D. Anderson Cancer Center', 2414),
 ('Assistance Publique - Hôpitaux de Paris', 2369),
 ('Mayo Clinic', 2300),
 ('Merck Sharp & Dohme Corp.', 2243),
 ('Assiut University', 2154),
 ('Novartis Pharmaceuticals', 2088),
 ('Massachusetts General Hospital', 1971),
 ('Cairo University', 1928),
 ('Hoffmann-La Roche', 1828)]

Question 6

In [0]:
month_list = [('Jan', 1),('Feb', 2),('Mar', 3),('Apr', 4),('May', 5),('Jun', 6),
              ('Jul', 7),('Aug', 8),('Sep', 9),('Oct', 10),('Nov', 11),('Dec', 12)]
month_rdd = sc.parallelize(month_list)

In [0]:
clinical_trail_completed = clinical_trail.filter(lambda s: s[2] == 'Completed')\
                                         .map(lambda x : x[4].split(' '))
clinical_trail_completed.filter(lambda x : x[0] != '')\
                        .filter(lambda x: (x[1] == '2021'))\
                        .map(lambda x: (x[0],1))\
                        .reduceByKey(lambda x,y : x+y)\
                        .join(month_rdd)\
                        .sortBy(lambda x: x[1][1])\
                        .map(lambda x: (x[0],x[1][0])).collect()

Out[16]: [('Jan', 1131),
 ('Feb', 934),
 ('Mar', 1227),
 ('Apr', 967),
 ('May', 984),
 ('Jun', 1094),
 ('Jul', 819),
 ('Aug', 700),
 ('Sep', 528),
 ('Oct', 187)]