In [37]:
import logging
from luigi import Task
from luigi.parameter import BoolParameter, IntParameter
from luigi.task import ExternalTask
import luigi
from csci_utils.luigi.dask.target import CSVTarget
from csci_utils.luigi.dask.target import ParquetTarget
from csci_utils.luigi.task import Requirement
from csci_utils.luigi.task import Requires
from csci_utils.luigi.task import TargetOutput
from luigi.contrib.s3 import S3Target
import pandas as pd
import pandas as pd
import numpy as np
from dask import dataframe as dd
from sklearn.metrics.pairwise import cosine_similarity, nan_euclidean_distances
from sklearn.preprocessing import LabelEncoder, normalize
import dask.array as da

In [30]:
def encode_objects_general(ddf, object_cols):
    LE = LabelEncoder()
    for object_col in object_cols:
        ddf[object_col] = da.from_array(
            LE.fit_transform(ddf[object_col].astype(str)))
    return ddf

def normalize_general(ddf,columns):
    result = ddf.copy()
    for feature_name in columns:
        max_value = ddf[feature_name].max()
        min_value = ddf[feature_name].min()
        result[feature_name] = 2*(ddf[feature_name] - min_value) / (max_value - min_value) - 1
    return result

def normalize_chex(ddf, object_cols):
    ddf = normalize_general(ddf,object_cols)
    ddf =  normalize_general(ddf,['Age'])
    return ddf

In [2]:
import os

In [3]:
import pandas as pd

In [4]:
import dask.dataframe as dd

In [5]:
class ChexpertDataframe(ExternalTask):

    s3_path = 's3://radio-star-csci-e-29/unzipped/'

    output = TargetOutput(
        file_pattern="",
        ext="train.csv",
        target_class=S3Target,
        path=s3_path
    )

In [6]:
os.path.relpath(('C:\\Users\\wmj\\PycharmProjects\\radio-star\\models\\Tasks\\', 'C:\\Users\\wmj\\PycharmProjects\\radio-star\\data\\processed\\')[-1])

'..\\data\\processed'

In [20]:
class ProcessChexpertDfToParquet(Task):
    requires = Requires()
    chexpertdf = Requirement(ChexpertDataframe)

    output = TargetOutput(
        target_class=ParquetTarget,
        path="../data/processed/",
        ext="",
        flag=False,
        storage_options=dict(requester_pays=True),
    )

    def run(self):
        pathCSV = self.input()["chexpertdf"].path
        ddf = dd.read_csv(pathCSV)
        self.output().write_dask(ddf, compression="gzip")


In [31]:
class NormalizeDF(Task):
    """The Dataframe is best normalized before similarity calculations are
    run on it."""


    requires = Requires()
    proc_chexpertdf = Requirement(ProcessChexpertDfToParquet)

    output = TargetOutput(
        target_class=ParquetTarget,
        path="../data/processed/",
        ext="",
        flag=False,
        storage_options=dict(requester_pays=True),
    )

    def run(self):
        ddf = self.input()["proc_chexpertdf"].read_dask()
        ddf_raw = ddf.copy()
        ddf = ddf.drop(columns=['Path'])
        object_cols = ddf.dtypes[(ddf.dtypes == object)].index.values

        ddf = encode_objects_general(ddf, object_cols)

        ddf = normalize_chex(ddf, object_cols)

        self.output().write_dask(ddf, compression="gzip")

In [59]:
class FindSimilar(Task):
    """The Dataframe is best normalized before similarity calculations are
    run on it."""

    requires = Requires()
    proc_chexpertdf = Requirement(ProcessChexpertDfToParquet)
    normalize_df = Requirement(NormalizeDF)
    comparator_index = IntParameter(default = 37959)
    n_images = IntParameter(default = 5)

    output = TargetOutput(
        target_class=ParquetTarget,
        path="../data/processed/",
        ext="",
        flag=False,
        storage_options=dict(requester_pays=True),
    )

    def run(self):
        ddf = self.input()["normalize_df"].read_dask()
        ddf_raw = self.input()["proc_chexpertdf"].read_dask()
        
        object_cols = ddf.dtypes[(ddf.dtypes == object)].index.values

        row_comparator_raw = ddf.loc[self.comparator_index]

        # This compensate for a bug in dask row equality calculations
        row_comparator_na = row_comparator_raw.isna().compute().iloc[0]

        similar_features_idx = (ddf.isna() == row_comparator_na).sum(
            1).compute().nlargest(n=100).index

        argsorted = nan_euclidean_distances(
            row_comparator_raw.compute().values.reshape(1, -1),
            ddf.loc[similar_features_idx.to_list()].compute().values).argsort()
        
        top_n = similar_features_idx[argsorted][0][self.n_images]
        
        top_n_close_images = ddf_raw.loc[top_n]

        self.output().write_dask(top_n_close_images, compression="gzip")
        

In [None]:
FindSimilar().run()

In [43]:
index_of_interest = 37959

In [55]:
ddf = FindSimilar().output().read_dask()

INFO:root:self.target_kwargs['path'] is ../data/processed/FindSimilar
INFO:root:BaseDaskTarget path is ../data/processed/FindSimilar/


unadjusted path is:  ../data/processed/FindSimilar
read_parquet_path is:  ../data/processed/FindSimilar\*.parquet


In [56]:
ddf.compute()

Unnamed: 0,Sex,Age,Frontal/Lateral,AP/PA,No Finding,Enlarged Cardiomediastinum,Cardiomegaly,Lung Opacity,Lung Lesion,Edema,Consolidation,Pneumonia,Atelectasis,Pneumothorax,Pleural Effusion,Pleural Other,Fracture,Support Devices
0,-1.0,0.511111,-1.0,-1.0,1.0,,,,,,,,,0.0,,,,1.0
1,-1.0,0.933333,-1.0,-1.0,,,-1.0,1.0,,-1.0,-1.0,,-1.0,,-1.0,,1.0,
2,-1.0,0.844444,-1.0,-1.0,,,,1.0,,,-1.0,,,,,,1.0,
3,-1.0,0.844444,1.0,0.0,,,,1.0,,,-1.0,,,,,,1.0,
4,0.0,-0.088889,-1.0,-1.0,,,,,,1.0,,,,0.0,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
223409,0.0,0.311111,-1.0,-1.0,,,,-1.0,,,,,-1.0,0.0,1.0,,,
223410,0.0,0.311111,-1.0,-1.0,,,,-1.0,,,,0.0,-1.0,,-1.0,,,
223411,-1.0,-1.000000,-1.0,-1.0,,,,,,-1.0,,,,,,,,
223412,-1.0,-1.000000,-1.0,-1.0,,,1.0,1.0,,,,-1.0,1.0,0.0,,,,0.0
