In [None]:
import os
import sys
import os.path as op
import PIL
from PIL import Image
from fastai.vision import *
from fastai.vision.all import *
from typing import Union, List
import pandas as pd
import shutil

import uvicorn
from fastapi import File, UploadFile, FastAPI, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse

from modules import (
    DRModel,
    DRClassifier,
    ImageQualityModel,
    FundusImageQualityClassifier,
    YoloCupDiscSegmentor,
    GlaucomaClassifier,
    EyeScreener,
)
from modules.utils.segmentation_utils import *

## allow loading truncated (incompletely written) image files
from PIL import ImageFile

# Let PIL load image files whose data is truncated instead of raising an error.
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Scratch directories: one for the image handed to the screener, one passed to
# the screener as `temp_save_dir`.
TEMPORARY_IMAGE_FOLDER = "temp_images"
YOLO_CUP_DISC_SEGMENTOR_TEMP_DIR = "temp_predicted_masks"

# Locations of the trained model weights, relative to the working directory.
image_quality_model_path = "trained_models/image_quality_resnet18_512d_512_18ep"
dr_model_path = "trained_models/dr_se_resnext50_32x4d_224_15ep"
cup_model_path = "trained_models/yolo-cup-15ep/weights/best.pt"
disc_model_path = "trained_models/yolo-disc-15ep/weights/best.pt"

In [None]:
# Build the screener used by `predict` from the weight paths configured above.
screener = EyeScreener(
    # Compute device, and the machine type (Nidek or Eidon).
    device=DEVICE,
    machine_type="Nidek",
    # Model weights; glaucoma_model_path is deliberately left as None.
    image_quality_model_path=image_quality_model_path,
    dr_model_path=dr_model_path,
    cup_model_path=cup_model_path,
    disc_model_path=disc_model_path,
    glaucoma_model_path=None,
    # Optional kwargs for the cup_disc_segmentor and their default values.
    # measure_mask_length_from_height = True, # If False, then from width.
    temp_save_dir=YOLO_CUP_DISC_SEGMENTOR_TEMP_DIR,  # Custom name for the temp dir.
    clear_temp_dir_after=False,
)

In [None]:
def predict(image):
    """Run the global ``screener`` on ``image`` and return its quality probabilities.

    The image is first written to ``TEMPORARY_IMAGE_FOLDER/temp_image.png``
    (overwriting any previous file) because the screener is called with a path.

    Args:
        image: Object exposing ``save(path)``, e.g. a PIL image.

    Returns:
        A one-element list holding a dict that maps "Image Quality: Good",
        "Image Quality: Acceptable" and "Image Quality: Poor" to the values
        found under ``["image_quality"]["probability"]`` in the screener output.
    """
    # The screener is given a file path, so persist the image to disk first.
    os.makedirs(TEMPORARY_IMAGE_FOLDER, exist_ok=True)
    scratch_path = op.join(TEMPORARY_IMAGE_FOLDER, "temp_image.png")
    image.save(scratch_path)

    screening = screener.predict(image_path=scratch_path)
    probabilities = screening["image_quality"]["probability"]

    # (display label, key in the screener output), in the order they are reported.
    grades = (("Good", "good"), ("Acceptable", "acceptable"), ("Poor", "poor"))
    image_quality_output = {
        f"Image Quality: {label}": probabilities[key] for label, key in grades
    }

    return [image_quality_output]

In [None]:
# Load the ground-truth labels, keeping only the columns this notebook uses.
# Incomplete rows are dropped *before* the path is reduced to its file name:
# os.path.basename raises TypeError when handed a missing value (NaN).
df = (
    pd.read_csv("../all_labels.csv")[["filename", "original_path", "image_quality"]]
    .dropna()
    .assign(original_path=lambda frame: frame["original_path"].map(os.path.basename))
)
df.head()

In [None]:
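# Commented-out helper: copies every dataset .jpg into all_images/, the folder
# the next cell reads from.
# os.makedirs("all_images", exist_ok=True)

# imgs = glob("../20220728-dr-dataset/*/*/*/*.jpg")

# for img in imgs:
#     shutil.copy(img, "all_images")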

In [None]:
from glob import glob

# Every .jpg directly inside all_images/.
imgs = glob("all_images/*.jpg")

# File names (already reduced to their basename) that have a label in `df`.
original_paths = df["original_path"].tolist()

In [None]:
from tqdm.auto import tqdm

# A set gives O(1) membership tests; scanning the list once per image is quadratic.
labelled_names = set(original_paths)
labelled_imgs = [img for img in imgs if os.path.basename(img) in labelled_names]

# Collect one predicted label per file name, then attach them to `df` in one go.
# Mapping afterwards guarantees the `pred_image_quality` column exists even when
# no image on disk matches a labelled row.
predicted_quality = {}
for img in tqdm(labelled_imgs, desc="Predicting image quality"):
    # The context manager releases the source file once `predict` has finished.
    with Image.open(img) as image:
        quality = predict(image)[0]

    # Keys look like "Image Quality: Good"; keep the most probable one as "GOOD".
    highest_quality = max(quality, key=quality.get)
    predicted_quality[os.path.basename(img)] = highest_quality.split(": ")[1].upper()

df["pred_image_quality"] = df["original_path"].map(predicted_quality)

# Rows without a prediction (no matching image on disk) are discarded before saving.
df = df.dropna()
df.to_csv("pred_img_quality.csv", index=False)

In [40]:
# Element-wise agreement between the labelled and the predicted image quality.
# NOTE(review): the boolean Series is wrapped in a one-element list - confirm
# this is intended rather than the bare mask.
dif = [df["image_quality"].eq(df["pred_image_quality"])]

In [43]:
# Rows whose predicted image quality differs from the labelled one.
new_df = df.loc[df["image_quality"].ne(df["pred_image_quality"])]

In [47]:
# Copy every image whose prediction disagrees with its label into one folder
# so the mismatches can be inspected side by side.
os.makedirs("dif_quality_pred_images", exist_ok=True)

for img in new_df["original_path"]:
    source_file = f"all_images/{img}"
    target_file = f"dif_quality_pred_images/{img}"
    shutil.copy(source_file, target_file)