##### Script to build the experimental dataset

In [453]:
import pandas as pd
from pathlib import Path
import shutil
import numpy as np

##### Dataset Builder Class
##### - Filters the metadata from collected datasets
##### - Transfers images to the correct locations

In [513]:
class DatasetBuilder():
    """Build the experimental COVID-19 / NON-COVID image dataset.

    The builder filters the ``metadata.csv`` of a collected dataset down to
    the rows with an accepted finding, copies the matching images into the
    experimental COVID-19 / NON-COVID folders and keeps a combined metadata
    table (``self.df``) that is written to ``<experimental>/metadata.csv``
    after every transfer.
    """

    def __init__(self, experimental, experimental_covid, experimental_noncovid):
        """Store the output locations and create the empty metadata table.

        Args:
            experimental: ``Path`` of the experimental dataset root; the
                combined ``metadata.csv`` is written here.
            experimental_covid: ``Path`` receiving COVID-19 images.
            experimental_noncovid: ``Path`` receiving NON-COVID images.
        """
        # Map for the accepted findings; any finding not listed here maps to
        # NaN in Filter() and the row is then dropped.
        self.findings = {"COVID-19": "COVID-19",
                         "Pneumonia/Viral/COVID-19": "COVID-19",
                         "Normal": "NON-COVID",
                         "No Finding": "NON-COVID"}

        # Dataframe to store new metadata
        self.columns = ["patientid", "sex", "age", "finding", "modality", "filename"]
        self.df = pd.DataFrame(columns=self.columns)

        # Save locations for experimental images ie. COVID / NON-COVID
        self.exp = experimental
        self.exp_cv = experimental_covid
        self.exp_ncv = experimental_noncovid

    # Private method to update the main dataframe
    def __UpdateDF(self, *row):
        """Append one row (values in ``self.columns`` order) to ``self.df``."""
        self.df.loc[len(self.df)] = row

    @staticmethod
    def _copy_if_missing(source, target):
        """Copy ``source`` to ``target`` unless ``target`` already exists."""
        if target.exists():
            return
        # The save directory may not exist yet on a fresh checkout.
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy(source, target)

    def Filter(self, location):
        """Filter ``<location>/metadata.csv`` down to the usable rows.

        Args:
            location: ``Path`` of a dataset folder containing ``metadata.csv``.

        Returns:
            A ``(df, location)`` tuple. ``df`` keeps only the columns listed
            in ``self.columns`` that the metadata provides, plus a ``savedir``
            column holding the destination folder for each row.

        Raises:
            KeyError: if the metadata has no ``finding``, ``age``, ``sex`` or
                ``patientid`` column after normalisation.
        """
        df = pd.read_csv(location.joinpath("metadata.csv"))
        print(f"{'-'*50}\nFiltering {location.name}\nOld Shape: {df.shape}")

        # Normalise column names
        df.columns = df.columns.str.replace(" ", "").str.lower()

        # Filter rows: every filter whose column is present is applied, and a
        # dataset that has none of these columns is left unfiltered.
        row_filter = {"verifiedfinding": "Yes", "folder": "images"}
        for column, wanted in row_filter.items():
            if column in df.columns:
                df = df[df[column] == wanted]

        # Keep only the needed columns. The explicit copy makes the
        # assignments below act on an independent frame rather than on a
        # slice of the one read from disk.
        df = df[[col for col in df.columns if col in self.columns]].copy()
        df["finding"] = df["finding"].map(self.findings)

        # Assign the filled column back instead of filling in place through a
        # column selection, which does not update the frame under pandas
        # copy-on-write.
        df["age"] = df["age"].fillna("Unknown")
        df["sex"] = df["sex"].fillna("Unknown")

        # Rows still holding a missing value (e.g. an unaccepted finding) go.
        df = df.dropna()
        # Only the first row of each patient is kept.
        df = df.drop_duplicates(subset=["patientid"])
        df["savedir"] = np.where(df["finding"] == "COVID-19", self.exp_cv, self.exp_ncv)

        print(f"New Shape: {df.shape}\nFiltering Complete\n{'-'*50}\n")
        return df, location

    def MoveImages(self, filtered):
        """Copy images into the experimental folders and save the metadata.

        Args:
            filtered: a ``(df, location)`` pair as returned by ``Filter``.
                Passing ``None`` as ``df`` treats ``location`` as a plain
                folder of NORMAL chest X-ray images without metadata.
        """
        df, location = filtered[0], filtered[1]
        name = location.name
        print(f"{'-'*50}\nTransferring images from {name}")

        if df is not None:
            image_directory = location.joinpath("images")

            for _, row in df.iterrows():
                if name == "COVIDx-CT":
                    # This dataset has no filename column; a patient's images
                    # are located through the patient id in the file name.
                    for image in image_directory.glob(f"*{row['patientid']}_*"):
                        target = row["savedir"].joinpath(image.name)
                        self._copy_if_missing(image, target)
                        self.__UpdateDF(row["patientid"], row["sex"], row["age"],
                                        row["finding"], row["modality"], target)
                else:
                    # NOTE(review): a "PID_" prefix used to be written back to
                    # the frame here, but it never reached the file name or
                    # the saved metadata (iterrows yields copies). Confirm
                    # whether the prefix is wanted before adding it.
                    source = image_directory.joinpath(row["filename"])
                    target = row["savedir"].joinpath(f"{row['patientid']}_{row['filename']}")
                    self._copy_if_missing(source, target)
                    self.__UpdateDF(row["patientid"], row["sex"], row["age"],
                                    row["finding"], row["modality"], target)
        else:
            # Take NORMAL patients
            # A patient's unique ID is stored within the image stem as IM-0XXX eg. IM-0115-0001 (12 chars)
            # Patients can have more than one CXR image and the stem will reflect this as such: IM-0429-0001-000X (17 chars)
            # For simplicity, patients with more than one image are removed
            for image in location.glob("*"):
                if len(image.stem) != 12:
                    continue
                target = self.exp_ncv.joinpath(image.name)
                self._copy_if_missing(image, target)
                # Dropping the last five characters of the stem leaves the patient ID.
                self.__UpdateDF(image.stem[:-5], "Unknown", "Unknown", "NON-COVID", "X-ray", target)

        print(f"Transfer Complete\n{'-'*50}")
        self.__SaveDF()

    def __SaveDF(self):
        """Write ``self.df`` to ``<experimental>/metadata.csv``, replacing any old file."""
        self.exp.mkdir(parents=True, exist_ok=True)
        # to_csv truncates an existing file, so no explicit removal is needed.
        self.df.to_csv(self.exp.joinpath("metadata.csv"), index=False)

In [514]:
# Project root: the parent of the directory this is run from
ROOT = Path(".").resolve().parent

# Source datasets
DATASETS = Path("F:/Datasets/")
CV19_ICD = DATASETS / "COVID Image Collection Data"
CV19_xCT = DATASETS / "COVIDx-CT"
NORMAL = DATASETS / "Chest X-ray Pneumonia/train/NORMAL"

# Experimental dataset: output root and its two class folders
EXP_DATASET = ROOT / "dataset"
EXP_CV = EXP_DATASET / "COVID-19"
EXP_NONCV = EXP_DATASET / "NON-COVID"

In [515]:
# One builder shared by every transfer below, so the metadata accumulates
builder = DatasetBuilder(
    experimental=EXP_DATASET,
    experimental_covid=EXP_CV,
    experimental_noncovid=EXP_NONCV,
)

In [516]:
# Datasets that ship a metadata.csv are filtered first, then transferred
for metadata_source in (CV19_ICD, CV19_xCT):
    builder.MoveImages(builder.Filter(metadata_source))

# The NORMAL folder has no metadata, so None is passed in place of a dataframe
builder.MoveImages([None, NORMAL])

--------------------------------------------------
Filtering COVID Image Collection Data
Old Shape: (950, 30)
New Shape: (308, 7)
Filtering Complete
--------------------------------------------------

--------------------------------------------------
Transferring images from COVID Image Collection Data
Transfer Complete
--------------------------------------------------
--------------------------------------------------
Filtering COVIDx-CT
Old Shape: (3792, 10)
New Shape: (2201, 6)
Filtering Complete
--------------------------------------------------

--------------------------------------------------
Transferring images from COVIDx-CT
Transfer Complete
--------------------------------------------------
--------------------------------------------------
Transferring images from NORMAL
Transfer Complete
--------------------------------------------------
