# Technical Project : NIH Chest X-ray Analysis and Disease Detection

##Goals
1.   Test the hypothesis that Infiltration is the most common of the 14 diseases.
2.   Test the hypothesis that people aged 50 to 60 are more likely to have diseases such as Cardiomegaly, Infiltration and Pneumonia.
3.   Predict a person's disease from the corresponding X-ray using deep learning models.


##1. Data Aggregation 

There are two main inputs. The first is the set of image files, which contains 112,120 chest X-rays. The second is a CSV file that provides patient records and the corresponding disease labels for the full dataset. There are 15 classes in total: 14 diseases plus 'No Finding'. The dataset can be accessed from the official NIH website (Summers 2017).

##Goal 1

###CSV dataset

The first step is to load the CSV file, which is stored in DBFS (Databricks File System), into a Spark DataFrame.

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

df = (sqlContext.read.format("csv").
  option("header", "true").
  option("nullValue", "NA").
  option("inferSchema", True).
  option("encoding", "UTF-8").  
  option("ignoreLeadingWhiteSpace", True).
  option("ignoreTrailingWhiteSpace", True).
  option("multiLine", True).
  load("/FileStore/tables/Data_Entry_2017.csv"))

df.printSchema()

The next step is to import the necessary libraries.

In [0]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
from glob import glob
%matplotlib inline
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, countDistinct, udf  # udf is used for the one-hot encoding below

##2. Data Transformation

In [0]:
display(df.limit(7))

Check the column names and rename them to a simpler format.

In [0]:
df = df.withColumnRenamed("Image Index", "imageIndex")\
       .withColumnRenamed("Finding Labels", "labels")\
       .withColumnRenamed("Follow-up #", "followUp")\
       .withColumnRenamed("Patient ID", "id")\
       .withColumnRenamed("Patient Age", "age")\
       .withColumnRenamed("Patient Gender", "gender")\
       .withColumnRenamed("View Position", "viewPosition")\
       .withColumnRenamed("OriginalImage[Width", "width")\
       .withColumnRenamed("Height]", "height")\
       .withColumnRenamed("OriginalImagePixelSpacing[x", "imagePixelSpacingX")\
       .withColumnRenamed("y]", "imagePixelSpacingY")\
       .withColumnRenamed("_c11", "c11")

df.show()
df.printSchema()

In [0]:
df.count()

One column was found to consist entirely of null values. It is not required, so it is dropped.

In [0]:
df.filter("c11 is NULL").count() # 112120: every row is null in this column
df = df.drop('c11') # drop the all-null column
df.show(n=2)
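
A quick way to confirm which columns carry no data is to test every column at once rather than one column by name. The sketch below does this with pandas on a small made-up frame. The `toy` data and the `empty_cols` name are illustrative assumptions, not part of the notebook; on the real data the same check could be run on `df.toPandas()`.

```python
import numpy as np
import pandas as pd

# Toy stand-in for the loaded CSV; "c11" mimics the all-null column
toy = pd.DataFrame({
    "imageIndex": ["00000001_000.png", "00000001_001.png"],
    "age": [58, 58],
    "c11": [np.nan, np.nan],
})

# Names of the columns in which every value is missing
empty_cols = toy.columns[toy.isna().all()].tolist()

# Drop all of them in one step
toy = toy.drop(columns=empty_cols)
print(empty_cols, list(toy.columns))
```

Checking all columns this way avoids hard-coding the column name, which is useful if the export ever contains more than one empty column.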

In [0]:
# Filter out outliers: keep only records with an age below 100
df = df.filter(df.age<100)
df.count()

Next, the labels are converted into a clearer format. The first step is to look at the distribution of findings; the labels are then converted to simple binary indicators.

## 3.Descriptive Analytics

We can plot the dataset as a bar chart to see the distribution of findings.

In [0]:
# For the graph we convert the Spark DataFrame to a pandas DataFrame.
fig, ax1 = plt.subplots(1,1,figsize = (12, 8))
pdf = df.toPandas()
label_counts = pdf['labels'].value_counts()[:15]
ax1.bar(np.arange(len(label_counts))+0.5, label_counts)
ax1.set_xticks(np.arange(len(label_counts))+0.5)
_ = ax1.set_xticklabels(label_counts.index, rotation = 90)

We perform one-hot encoding as part of the data transformation. The features, which are the diseases in this case, are extracted from the labels column and each is given a column of its own. The values 1 and 0 indicate whether or not the patient has the disease.

In [0]:
# One-hot encoding on the Spark DataFrame using a UDF
all_labels = ['Atelectasis', 'Cardiomegaly', 'Consolidation', 'Edema', 'Effusion', 'Emphysema', 'Fibrosis', 'Hernia', 'Infiltration', 'Mass', 'Nodule', 'Pleural_Thickening', 'Pneumonia', 'Pneumothorax']

for c_label in all_labels:  # loop over the disease names
    # UDF returning 1.0 if the disease name occurs in the labels string, else 0.0;
    # label=c_label binds the current name to this particular UDF
    oneHotEncoding = udf(lambda row, label=c_label: 1.0 if label in row else 0.0, DoubleType())
    df = df.withColumn(c_label, oneHotEncoding('labels'))  # add the new column with withColumn
display(df.limit(7))

In [0]:
pdf['labels'] = pdf['labels'].map(lambda x: x.replace('No Finding', ''))
from itertools import chain
all_labels = np.unique(list(chain(*pdf['labels'].map(lambda x: x.split('|')).tolist())))
all_labels = [x for x in all_labels if len(x)>0]
print('All Labels ({}): {}'.format(len(all_labels), all_labels))
for c_label in all_labels:
    if len(c_label)>1: # leave out empty labels
        # integer 0/1 flags, matching the IntegerType disease columns of the Spark schema defined later
        pdf[c_label] = pdf['labels'].map(lambda finding: 1 if c_label in finding else 0)
pdf.sample(2)
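
The substring test `c_label in finding` works here because no disease name is contained in another. A stricter variant, sketched below on made-up rows, splits on the `|` separator so that only whole labels can match. It uses pandas' `Series.str.get_dummies`; the `toy` frame is an illustrative assumption, not data from the notebook.

```python
import pandas as pd

# Toy stand-in for the 'labels' column of the patient records
toy = pd.DataFrame({
    "labels": [
        "Cardiomegaly|Effusion",
        "No Finding",
        "Infiltration",
        "Effusion|Infiltration",
    ]
})

# Treat 'No Finding' as "no disease", as the notebook does
cleaned = toy["labels"].replace("No Finding", "")

# One 0/1 column per distinct label, split on the '|' separator
one_hot = cleaned.str.get_dummies(sep="|")
print(one_hot)
```

The result has one integer column per disease, and the row that was 'No Finding' is all zeros.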

Since there are many categories, we can prune a few by dropping those with only a few examples.

In [0]:
# keep only labels with more than 1000 cases
MIN_CASES = 1000
all_labels = [c_label for c_label in all_labels if pdf[c_label].sum()>MIN_CASES]
print('Clean Labels ({})'.format(len(all_labels)), 
      [(c_label,int(pdf[c_label].sum())) for c_label in all_labels])

##4.Data Visualisation

In [0]:
# since the dataset is very imbalanced, we resample it into a more reasonable collection
# weight is 0.04 + number of findings
sample_weights = pdf['labels'].map(lambda x: len(x.split('|')) if len(x)>0 else 0).values + 4e-2
sample_weights /= sample_weights.sum()
pdf = pdf.sample(40000, weights=sample_weights)

label_counts = pdf['labels'].value_counts()[:15]
fig, ax1 = plt.subplots(1,1,figsize = (12, 8))
ax1.bar(np.arange(len(label_counts))+0.5, label_counts)

ax1.set_xticks(np.arange(len(label_counts))+0.5)
ax1.set_title('Count of Diseases in Patient Group')
_ = ax1.set_xticklabels(label_counts.index, rotation = 90)
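
To make the effect of the sampling weights concrete, the sketch below recomputes them for three made-up label strings using the same formula as the cell above. The `toy_labels` series is an illustrative assumption.

```python
import numpy as np
import pandas as pd

# Toy labels: no finding (empty string), one finding, two findings
toy_labels = pd.Series(["", "Infiltration", "Effusion|Infiltration"])

# Number of findings per row; an empty string counts as zero
n_findings = toy_labels.map(lambda x: len(x.split("|")) if len(x) > 0 else 0).values

# Same weighting as the notebook: a small constant plus the number of findings
weights = n_findings + 4e-2
weights = weights / weights.sum()
print(weights)
```

With these weights a row with two findings is about 51 times as likely to be drawn as a row with none (2.04 versus 0.04), so the 40,000-row sample is strongly tilted towards images that show disease.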

In [0]:
# normalise the label counts to percentages to get a clearer graph
label_counts = 100*np.mean(pdf[all_labels].values,0)
fig, ax1 = plt.subplots(1,1,figsize = (12, 8))
ax1.bar(np.arange(len(label_counts))+0.5, label_counts)
ax1.set_xticks(np.arange(len(label_counts))+0.5)
ax1.set_xticklabels(all_labels, rotation = 90)
ax1.set_title('Adjusted Frequency of Diseases in Patient Group')
_ = ax1.set_ylabel('Frequency (%)')

From the graph above we can see that Infiltration has the highest frequency in the patient group, which makes it the most common disease.

So our hypothesis that Infiltration is the most common of the 14 diseases is supported.
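
The same conclusion can be read off numerically instead of from the chart. The sketch below computes the percentage of rows carrying each label and picks the largest; the `toy` frame is a made-up assumption standing in for the one-hot columns of `pdf`. Note that at this point in the notebook `pdf` is the weighted 40,000-row sample, so the frequencies describe the resampled group; the same calculation could also be run on the full frame before sampling.

```python
import pandas as pd

# Toy one-hot columns standing in for pdf[all_labels]
toy = pd.DataFrame({
    "Effusion":     [1, 0, 0, 1],
    "Infiltration": [1, 1, 0, 1],
    "Mass":         [0, 0, 0, 1],
})

# Percentage of rows in which each disease is present, largest first
prevalence = toy.mean().mul(100).sort_values(ascending=False)

# Label with the highest prevalence
most_common = prevalence.idxmax()
print(prevalence)
print(most_common)
```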

##GOAL 2
##1. Data Aggregation 

The data loading is the same as in Goal 1, so we can reuse the dataframe from Goal 1 for the descriptive analysis.

##2. Data Transformation

In [0]:
from pyspark.sql.types import *
mySchema = StructType([ StructField("imageIndex", StringType(), True)\
                       ,StructField("labels", StringType(), True)\
                       ,StructField("followUp", IntegerType(), True)\
                       ,StructField("id", IntegerType(), True)\
                       ,StructField("age", IntegerType(), True)\
                       ,StructField("gender", StringType(), True)\
                       ,StructField("viewPosition", StringType(), True)\
                       ,StructField("width", DoubleType(), True)\
                       ,StructField("height", DoubleType(), True)\
                       ,StructField("imagePixelSpacingX", DoubleType(), True)\
                       ,StructField("imagePixelSpacingY", DoubleType(), True)\
                       ,StructField("Atelectasis", IntegerType(), True)\
                       ,StructField("Cardiomegaly", IntegerType(), True)\
                       ,StructField("Consolidation", IntegerType(), True)\
                       ,StructField("Edema", IntegerType(), True)\
                       ,StructField("Effusion", IntegerType(), True)\
                       ,StructField("Emphysema", IntegerType(), True)\
                       ,StructField("Fibrosis", IntegerType(), True)\
                       ,StructField("Hernia", IntegerType(), True)\
                       ,StructField("Infiltration", IntegerType(), True)\
                       ,StructField("Mass", IntegerType(), True)\
                       ,StructField("Nodule", IntegerType(), True)\
                       ,StructField("Pleural_Thickening", IntegerType(), True)\
                       ,StructField("Pneumonia", IntegerType(), True)\
                       ,StructField("Pneumothorax", IntegerType(), True)])

In [0]:
# createDataFrame builds a Spark DataFrame from the pandas DataFrame; here we pass an explicit schema
sdf = spark.createDataFrame(pdf,schema=mySchema)

In [0]:
# createOrReplaceTempView registers a temporary view so that SQL queries can be run; the view name is passed as the argument.
sdf.createOrReplaceTempView("patients_details")

##3. Descriptive Analytics

In [0]:
# sqlContext.sql() returns a new Spark DataFrame for the SQL query passed as the argument.
cardio_df = sqlContext.sql("select id, age from patients_details where Cardiomegaly = 1").distinct()
display(cardio_df)

In [0]:
from pyspark.sql.functions import udf

# UDF that maps an age to its age-range bucket
age_range = udf(lambda age: '1 < 20' if age < 20 else 
                   '20-30' if (age >= 20 and age < 30) else
                   '30-40' if (age >= 30 and age < 40) else
                   '40-50' if (age >= 40 and age < 50) else
                   '50-60' if (age >= 50 and age < 60) else
                   '60-70' if (age >= 60 and age < 70) else
                   '70-80' if (age >= 70 and age < 80) else
                    '80+'  if (age >= 80) else '')

cardio_df = cardio_df.withColumn('age_range', age_range(cardio_df.age))
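
The chained conditionals above can also be written as a table lookup, which keeps the bucket edges and labels in one place. This is a minimal sketch in plain Python; `EDGES`, `LABELS` and `age_bucket` are hypothetical names introduced here, and the labels are copied from the UDF above so that sorting by `age_range` behaves the same.

```python
from bisect import bisect_right

# Lower edges of every bucket after the first one
EDGES = [20, 30, 40, 50, 60, 70, 80]
# One label per bucket, in the same order as the UDF above
LABELS = ["1 < 20", "20-30", "30-40", "40-50", "50-60", "60-70", "70-80", "80+"]

def age_bucket(age):
    """Return the age-range label for an integer age."""
    # bisect_right counts how many edges are <= age, which is the bucket index
    return LABELS[bisect_right(EDGES, age)]

print([age_bucket(a) for a in (5, 20, 59, 60, 95)])
```

A function like this could be wrapped with `udf(age_bucket)` in the same way as the lambda, since `udf` returns strings by default.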


In [0]:

# the display function provided by Databricks notebooks can be used to visualize the data
# the count per age range is calculated with groupBy followed by count
display(cardio_df.groupBy(['age_range']).count().sort('age_range'))


In [0]:
# An alternative way to plot the graph is to use matplotlib, but the dataframe has to be converted to pandas first. The toPandas() method performs the conversion.
cardio_pdf = cardio_df.groupBy(['age_range']).count().sort('age_range').toPandas()

cardio_pdf.plot(kind='bar', x='age_range', y='count', label='Cardiomegaly')

In [0]:
#Similarly we can combine two or more dataframes in a single graph.
infiltration_df = sqlContext.sql("select id, age from patients_details where Infiltration = 1").distinct()
pneumonia_df = sqlContext.sql("select id, age from patients_details where Pneumonia = 1").distinct()

##4.Data Visualisation

In [0]:
infiltration_df = infiltration_df.withColumn('age_range', age_range(infiltration_df.age))
pneumonia_df = pneumonia_df.withColumn('age_range', age_range(pneumonia_df.age))

infiltration_pdf = infiltration_df.groupBy(['age_range']).count().sort('age_range').toPandas()
pneumonia_pdf = pneumonia_df.groupBy(['age_range']).count().sort('age_range').toPandas()

# get the current Axes so that several lines can be plotted on it
ax = plt.gca()

cardio_pdf.plot(kind='line', x='age_range', y='count', label='Cardiomegaly', ax=ax , marker='o' )
infiltration_pdf.plot(kind='line', x='age_range', y='count', label='Infiltration', ax=ax , marker='*')
pneumonia_pdf.plot(kind='line', x='age_range', y='count', label='Pneumonia', ax=ax , marker='+' )

The graph compares the three diseases and their occurrence by age group. We can clearly see that the number of patients is highest in the 50 to 60 age range. Each line rises slightly up to the 50-60 range and then gradually decreases.
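
The per-disease count tables plotted above can also be aligned into one wide table before plotting. This matters if one disease has no patients in some age range, because separate tables would then have different rows. The sketch below shows the alignment with pandas on made-up counts; the `cardio` and `pneumonia` frames and their values are illustrative assumptions, not results from the dataset.

```python
import pandas as pd

# Toy count tables shaped like cardio_pdf and pneumonia_pdf
cardio = pd.DataFrame({"age_range": ["20-30", "50-60"], "count": [3, 9]})
pneumonia = pd.DataFrame({"age_range": ["50-60", "60-70"], "count": [4, 2]})

# Outer-join the counts on age_range; a missing bucket becomes 0
combined = (
    pd.concat(
        {
            "Cardiomegaly": cardio.set_index("age_range")["count"],
            "Pneumonia": pneumonia.set_index("age_range")["count"],
        },
        axis=1,
    )
    .fillna(0)
    .astype(int)
    .sort_index()
)
print(combined)
```

Calling `combined.plot(kind='line', marker='o')` would then draw one line per disease against a shared set of age ranges.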