## Image Processing Notebook

### 1. Process Description

We designed the process to use Spark's parallel processing while still letting us train multiple models in parallel. The approach we settled on is to transform the images with PySpark and save the transformed data as Parquet files. Other models can then load the same dataset in parallel, so the image transformation is done only once.

First, we load the required libraries. We need PIL and torchvision for the image transformations.

In [0]:
#Add needed packages
import numpy as np
from PIL import Image
from torchvision import transforms
#explicit imports avoid shadowing Python builtins (sum, max, ...) through a wildcard import
from pyspark.sql.functions import col, lit, rand, udf, when
from pyspark.sql.types import ArrayType, FloatType

We use Azure Blob Storage to store the images and the transformed datasets. The cell below mounts the storage account using its storage key. Each mount point maps to a different container:

- **/mnt/files**: the X-ray images
- **/mnt/dim**: patient metadata (CSV) and the train/test split lists
- **/mnt/datasets**: the transformed datasets

In [0]:
#mount the blob containers to Databricks
#NOTE: the storage key is read from a Databricks secret scope instead of being hardcoded;
#the scope and key names below are placeholders to replace with your own
storage_account = "chestxraystorage1"
account_key = dbutils.secrets.get(scope="<secret-scope>", key="<storage-key-name>")

#only mount the containers that are not mounted yet
mounted = {m.mountPoint for m in dbutils.fs.mounts()}
for container, mount_point in [("images", "/mnt/files"), ("dimension", "/mnt/dim"), ("datasets", "/mnt/datasets")]:
  if mount_point in mounted:
    print(mount_point + ' already mounted')
  else:
    dbutils.fs.mount(
      source = "wasbs://" + container + "@" + storage_account + ".blob.core.windows.net",
      mount_point = mount_point,
      extra_configs = {"fs.azure.account.key." + storage_account + ".blob.core.windows.net": account_key})

### 2. Transform the Patient Demographics

We need to transform the disease labels into 14 binary output columns, one per disease. The result is a PySpark DataFrame that serves as the patient dimension, keyed by the Image Index column (renamed to `name`).

In [0]:
#clean dimension response

df_response = spark.read.csv("/mnt/dim/Data_Entry_2017_v2020.csv",header=True)
df_response = df_response.select(col('Image Index').alias('name'),
                        col('Finding Labels').alias('label'),
                        col('Follow-up #').alias('followUp'),
                        col('Patient ID').alias('pId'),
                        col('Patient Age').alias('Age'),
                        col('Patient Gender').alias('Gender'),
                        col('View Position').alias('Position'),
                        col('OriginalImage[Width').alias('W'),
                        col('Height]').alias('H'),
                        col('OriginalImagePixelSpacing[x').alias('x'),
                        col('y]').alias('y'))

#pass label into response variables

labels = {'y0':'Atelectasis',
          'y1':'Cardiomegaly',
          'y2':'Consolidation',
          'y3':'Edema',
          'y4':'Effusion',
          'y5':'Emphysema',
          'y6':'Fibrosis',
          'y7':'Hernia',
          'y8':'Infiltration',
          'y9':'Mass',
          'y10':'Nodule',
          'y11':'Pleural_Thickening',
          'y12':'Pneumonia',
          'y13':'Pneumothorax'} #this may be used after training

#add one binary column per disease: 1 if the label string contains the disease name, else 0
for y, disease in labels.items():
  df_response = df_response.withColumn(y, when(col('label').contains(disease), lit(1)).otherwise(lit(0)))
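
The same encoding can be sketched in plain Python, which makes the matching rule easy to check outside Spark. This is only an illustration: it assumes that `Finding Labels` joins multiple findings with `|` (for example `Cardiomegaly|Emphysema`), and the helper names `encode_findings` and `decode_findings` are hypothetical. Unlike `contains`, it matches whole tokens; for these 14 names the result is the same, because none of them is a substring of another.

```python
# Plain-Python sketch of the multi-hot label encoding (hypothetical helpers).
# Assumption: findings in 'Finding Labels' are separated by '|'.
DISEASES = [
    "Atelectasis", "Cardiomegaly", "Consolidation", "Edema", "Effusion",
    "Emphysema", "Fibrosis", "Hernia", "Infiltration", "Mass", "Nodule",
    "Pleural_Thickening", "Pneumonia", "Pneumothorax",
]


def encode_findings(label):
    """Return 14 binary flags (y0..y13) for one 'Finding Labels' string."""
    findings = set(label.split("|"))
    return [1 if disease in findings else 0 for disease in DISEASES]


def decode_findings(flags):
    """Map 14 binary flags back to disease names, e.g. after training."""
    return [disease for disease, flag in zip(DISEASES, flags) if flag == 1]


flags = encode_findings("Cardiomegaly|Emphysema")
print(flags)
print(decode_findings(flags))
```

The decoding direction is what the `labels` dictionary above is kept for: turning predicted output columns back into readable disease names.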


### 3. Preparing train, validation, and test datasets

With over 112,000 images, how we split the data is crucial. If we split the images at random, images of the same patient could land in the train, validation, and test datasets at once, which biases the evaluation. For example, a patient with three Hernia-positive images could end up with one image in each dataset, so we would effectively train and validate on the same patient. We therefore split at random by patient rather than by image.

In addition, because of memory limits, we split the train/validation patients into 20 partitions (14 for train and 6 for validation) using a seed of 2510. The test patients are split the same way into 7 partitions.

Finally, we create three lists that hold the corresponding partitions (the 14 train partitions go into the `train` list, the 6 validation partitions into `valid`, and the 7 test partitions into `test`). This makes it easy to loop through each partition and transform its images.

In [0]:
filesDF = spark.read.format("csv").load('/mnt/dim/filesDF.csv',header=True)

seed=2510

df_trainvalid = spark.read.format("text").load("/mnt/dim/train_val_list.txt")
df_test = spark.read.format("text").load("/mnt/dim/test_list.txt")

df_pId = df_response.select('name','pId').distinct()
df_trainvalid = df_trainvalid.join(df_pId, df_trainvalid.value == df_pId.name, 'inner')
df_test = df_test.join(df_pId, df_test.value == df_pId.name, 'inner')

#---------------------
#shuffle the train/validation patients, then split them into 20 equally weighted partitions
df_trainvalidpId = df_trainvalid.select('pId').distinct().orderBy(rand(seed))
trainvalid_parts = df_trainvalidpId.randomSplit([0.05] * 20, seed=seed)

#---------------------
#same for the test patients: 6 partitions of weight 0.15 and 1 of weight 0.10
df_testpId = df_test.select('pId').distinct().orderBy(rand(seed))
test = df_testpId.randomSplit([0.15] * 6 + [0.10], seed=seed)

#----------------------
#the first 14 partitions are used for training, the last 6 for validation
train = trainvalid_parts[:14]
valid = trainvalid_parts[14:]
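
To illustrate why splitting by patient avoids leakage, here is a minimal plain-Python sketch of the same idea on toy data. It is not the Spark implementation: the function `split_by_patient` and the toy image names are hypothetical, and `random.Random` stands in for Spark's random sampling. As with `randomSplit`, the partition sizes only approximate the weights.

```python
import random
from collections import defaultdict


def split_by_patient(image_to_patient, weights, seed=2510):
    """Assign whole patients to partitions so no patient spans two partitions.

    image_to_patient: dict of image name -> patient ID (toy stand-in for df_pId).
    weights: relative partition sizes, e.g. [0.05] * 20.
    """
    images_by_patient = defaultdict(list)
    for image, patient in image_to_patient.items():
        images_by_patient[patient].append(image)

    rng = random.Random(seed)
    partitions = [[] for _ in weights]
    # Sorting the patient IDs keeps the result reproducible for a given seed.
    for patient in sorted(images_by_patient):
        # Draw one partition per patient, proportional to the weights.
        index = rng.choices(range(len(weights)), weights=weights)[0]
        partitions[index].extend(images_by_patient[patient])
    return partitions


# Toy data: patient "1" has two images, which must stay together.
toy = {
    "00000001_000.png": "1",
    "00000001_001.png": "1",
    "00000002_000.png": "2",
    "00000003_000.png": "3",
}
for number, images in enumerate(split_by_patient(toy, [0.5, 0.5])):
    print(number, images)
```

Because the random draw happens once per patient, all images of a patient follow that patient into a single partition.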

### 4. Define the transformation function

We use torchvision to convert the original images (1024x1024x4) to RGB and resize them to 512x512x3 or 256x256x3; the cell below shows the 256x256 version. We also normalize the image tensor with per-channel means and standard deviations (the ImageNet statistics). The transformation function returns a flattened NumPy array, which the UDF converts to a list.

In [0]:
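def img2ArrayPIL(name):
  #load one image, convert to RGB, resize to 256x256, normalize, and flatten
  #returns None when the image cannot be read or transformed
  try:
    im = Image.open("/dbfs/mnt/files/" + name)
    im = im.convert(mode='RGB')
    preprocess = transforms.Compose([
      transforms.Resize((256,256)),
      transforms.ToTensor(),
      transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

    output = preprocess(im).numpy().flatten()
  except Exception:
    output = None

  return output

def img2List(name):
  #the UDF must return a list (or None), not a scalar, to match ArrayType(FloatType())
  arr = img2ArrayPIL(name)
  return None if arr is None else arr.tolist()

img2ArrayUDF = udf(img2List, ArrayType(FloatType()))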
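
The arithmetic behind the transform can be sketched without torchvision. `ToTensor` scales 8-bit values to the range [0, 1] and arranges them as channels x height x width, and `Normalize` subtracts the channel mean and divides by the channel standard deviation. The helper names `normalize_pixel` and `flat_index` are hypothetical and only illustrate the per-value computation and the layout of the flattened array.

```python
# Channel statistics copied from the transform above (ImageNet mean / std).
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]
CHANNELS, HEIGHT, WIDTH = 3, 256, 256


def normalize_pixel(value_0_255, channel):
    """Mimic ToTensor (scale to [0, 1]) followed by Normalize for one value."""
    scaled = value_0_255 / 255.0
    return (scaled - MEAN[channel]) / STD[channel]


def flat_index(channel, row, column):
    """Position of pixel (channel, row, column) in the flattened C x H x W array."""
    return channel * HEIGHT * WIDTH + row * WIDTH + column


print(CHANNELS * HEIGHT * WIDTH)   # length of the 'pixels' array per image
print(normalize_pixel(255, 0))     # a white pixel in the red channel
print(flat_index(2, 255, 255))     # index of the very last value
```

A consumer of the Parquet files can use the same layout to reshape each `pixels` array back to 3 x 256 x 256 before feeding it to a model.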


### 5. Define export function

This function calls the transformation function and saves the output as Parquet files in 8 partitions. With `repartition(8)` we observed the power of Spark: the parallel processing sped up the entire process and reduced the processing time by at least 60%. Previously, exporting 5,000 images at 256x256x1 to CSV took around 90 minutes. With Spark's parallel processing, we can process 5,000 images at up to 256x256x3 in less than 10 minutes.

In [0]:
def extract_features(df, i, dataset, originalDF):
  #keep only the images whose patient belongs to this partition
  resultDF = originalDF.join(df, df.pId == originalDF.pId, 'inner').drop('name').drop('pId').drop('Id')
  #join with the list of image files (filesDF)
  resultDF = resultDF.join(filesDF, resultDF.value == filesDF.name, 'inner')
  #transform the images in parallel across 8 partitions
  resultDF = resultDF.repartition(8).withColumn('pixels',img2ArrayUDF(filesDF.name))
  dT = resultDF.select(col('name').alias('name1'),'pixels')
  #add the 14 binary labels for each image
  dT = dT.join(df_response, dT.name1 == df_response.name, 'inner') \
          .select('name','y0','y1','y2','y3','y4','y5','y6','y7','y8','y9','y10','y11','y12','y13','pixels')
          #.withColumn('response',array('y0','y1','y2','y3','y4','y5','y6','y7','y8','y9','y10','y11','y12','y13')) \
  #write this partition as parquet, e.g. /mnt/datasets/dftrain256_ALL_batchNo0
  dT.repartition(8).write.mode("overwrite").format("parquet").save('/mnt/datasets/df'+dataset+'256_ALL_batchNo'+str(i))
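
A rough back-of-the-envelope estimate shows why the data is processed in small batches and spread over several partitions. The sketch below only counts the raw `pixels` values at 4 bytes each (`FloatType`); it ignores Parquet compression and the overhead of Python and JVM objects, so treat the numbers as an order of magnitude, not a measurement. The function name `pixel_bytes` is hypothetical, and the 5,000-image batch size is taken from the timing example above.

```python
BYTES_PER_FLOAT = 4  # Spark's FloatType is a 4-byte float


def pixel_bytes(images, channels=3, height=256, width=256):
    """Uncompressed size in bytes of the 'pixels' column for a number of images."""
    return images * channels * height * width * BYTES_PER_FLOAT


per_image = pixel_bytes(1)
per_batch = pixel_bytes(5000)
per_file = per_batch / 8  # repartition(8) spreads the rows over 8 tasks and files

print(per_image)                      # bytes for one 256x256x3 image
print(per_batch / 1024 ** 3)          # GiB for a 5,000-image batch
print(per_file / 1024 ** 2)           # MiB handled by each of the 8 partitions
```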
  

### 6. Main Operation

The final for loops go through each patient partition, transform the images, extract the pixel features, and save the resulting PySpark DataFrame to the storage account.

In [0]:
for i, df in enumerate(train):
  extract_features(df, i, 'train', df_trainvalid)

for i, df in enumerate(valid):
  extract_features(df, i, 'valid', df_trainvalid)
  
for i, df in enumerate(test):
  extract_features(df, i, 'test', df_test)
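
Each call writes one Parquet folder, so the loops above produce 27 folders in total. As a sketch of how a training notebook might locate them, the hypothetical helper `batch_paths` below rebuilds the folder names from the naming pattern used in `extract_features`; it assumes every batch was written successfully.

```python
def batch_paths(dataset, n_batches, root="/mnt/datasets"):
    """Rebuild the output folders written by extract_features for one dataset."""
    return [root + "/df" + dataset + "256_ALL_batchNo" + str(i) for i in range(n_batches)]


# 14 train, 6 validation, and 7 test partitions, numbered from 0 by enumerate
train_paths = batch_paths("train", 14)
valid_paths = batch_paths("valid", 6)
test_paths = batch_paths("test", 7)

print(train_paths[0])
print(len(train_paths) + len(valid_paths) + len(test_paths))
```

In a Spark session, a list like `train_paths` could then be passed to `spark.read.parquet(*train_paths)` to load all training batches, or iterated one folder at a time if memory is tight.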