# Medical Image - TFM
<h4>subtitle: Generación de una tubería distribuida para la extracción de características en imágenes médicas patológicas</h4>
license: European Union Public Licence (EUPL) v1.2

<table>
  <tr> <td> author name: </td> <td> Israel Llorens </td> </tr>
  <tr> <td> email: </td> <td> sanchezis@hotmail.com </td> </tr>
</table>

<h7>date: 2025/03/23</h7>

---

In [1]:
%load_ext autoreload
%autoreload

In [13]:
# Copyright (c) 2024 Israel Llorens
# Licensed under the EUPL-1.2  

__author__ = "Israel Llorens <sanchezis@hotmail.com>"
__copyright__ = "Copyright 2024, Israel Llorens"
__license__ = "EUPL-1.2"

import logging
import os

import pandas as pd
import glob
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

from tiatoolbox import logger
from tiatoolbox.models.architecture.unet import UNetModel
from tiatoolbox.models.engine.semantic_segmentor import (
    IOSegmentorConfig,
    SemanticSegmentor,
)
from tiatoolbox.utils.misc import download_data, imread
from tiatoolbox.utils.visualization import overlay_prediction_mask
from tiatoolbox.wsicore.wsireader import WSIReader

try:
    import pyspark
    from pyspark.sql.functions import col, isnan, when, count,to_date,year,month,expr,hour,dayofweek,lower,array_remove,collect_list,lit
    from pyspark.sql.functions import pandas_udf,split
    from pyspark.sql.types import ArrayType, DoubleType, StringType
    from pyspark.sql.types import StructField,StructType,StringType,DoubleType,FloatType,IntegerType, LongType
    import pyspark.sql.functions as F
except:
    pass


# Clear logger to use tiatoolbox.logger
import logging
import warnings

if logging.getLogger().hasHandlers():
    logging.getLogger().handlers.clear()

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import torch
from matplotlib import cm

from tiatoolbox import logger
from tiatoolbox.models.architecture.unet import UNetModel
from tiatoolbox.models.engine.semantic_segmentor import (
    IOSegmentorConfig,
    SemanticSegmentor,
)
from tiatoolbox.utils.misc import download_data, imread
from tiatoolbox.utils.visualization import overlay_prediction_mask
from tiatoolbox.wsicore.wsireader import WSIReader

from urllib import request
import certifi
import ssl
import os

import numpy as np
import histomicstk as htk
import skimage
import scipy as sp

import init
from digital_pathology.spark import spark
from digital_pathology.process.model_selection import ResNetModel


BINARY_FILES_SCHEMA = StructType(
    [
        # StructField("tile_prediction",  BinaryType()),
        StructField("bins", StringType()),
        StructField("out", StringType()),
        # StructField("content", ArrayType(BinaryType())),
    ]
)    



# model_file_name = os.path.join('/Users/illorens/Projects/source',  "tissue_mask_model.pth")
model_file_name = os.path.join('..', 'data',  "tissue_mask_model.pth")

spark.sparkContext.addFile(model_file_name)
# spark.sparkContext.addFile(model_file_name[:-4]+'.lock')

images_path = os.path.join('data', 'patient_extracts')

logging.info(f"******** MODEL: {model_file_name}   -----  {os.path.exists(model_file_name)}")

if not os.path.exists(model_file_name):
    download_data(
        "https://tiatoolbox.dcs.warwick.ac.uk//models/seg/fcn-tissue_mask.pth",
        model_file_name,
        overwrite=True
    )




@F.udf(returnType = BINARY_FILES_SCHEMA)
def extract_tumor(path, img_name, img_path, DEBUG=True):
    import logging
    import os
    import numpy as np
    from tiatoolbox.models.engine.semantic_segmentor import SemanticSegmentor

    context = ssl.create_default_context(cafile=certifi.where())
    https_handler = request.HTTPSHandler(context=context)
    opener = request.build_opener(https_handler)
    request.install_opener(opener)

    # Disable logging to avoid issues
    import logging
    logger = logging.getLogger()
    logger.setLevel(logging.WARNING)
    
    label_names_dict = {
        0: "Tumour",
        1: "Stroma",
        2: "Inflamatory",
        3: "Necrosis",
        4: "Others",
    }
    
    logging.warning(f"******** MODEL: {model_file_name}   -----  {os.path.exists(model_file_name)}")
    logging.warning(f"******** PATH: {path}   -----  {os.path.exists(path)}")
    logging.warning(f"******** NAME: {img_name}   ")
    logging.warning(f"******** IMG_PATH: {img_path}   -----  {os.path.exists(img_path)}")
    
    # Tile prediction
    
    out_location = os.path.join(img_path, f"sample_tile_results/{img_name}")
    logging.warning(out_location)
    
    import shutil
    #os.rmdir(f"sample_tile_results/{img[2]}")
    try:
        shutil.rmtree(out_location)
    except:
        pass
    
    
    bcc_segmentor = SemanticSegmentor(
        pretrained_model= "fcn_resnet50_unet-bcss", # Ensure this path is worker-accessible
        num_loader_workers=0,    # Avoid Multiprocessing in UDF CRUCIAL: Disable multiprocessing
        batch_size=4,
    )
    
    import tempfile
    with tempfile.TemporaryDirectory() as tmp_dirname:
        shutil.rmtree(tmp_dirname)

        output = bcc_segmentor.predict(
            [path],
            save_dir=tmp_dirname,
            mode="tile",
            resolution=1.0,
            units="baseline",
            patch_input_shape=[1024, 1024],
            patch_output_shape=[512, 512],
            stride_shape=[512, 512],
            # on_gpu=False,
            crash_on_exception=False,
        )
    
    tile_prediction_raw = output
    
    if len(tile_prediction_raw)>0:
    
        tile_prediction = np.argmax(
            tile_prediction_raw,
            axis=-1,
        ) 
        bins = np.bincount(tile_prediction.flatten())
        out = str( list( zip (label_names_dict.values(),  np.round( bins / np.sum(bins) * 100, 4)  ) ) )
        tile = imread(path)

        return [ 
                # Image.fromarray(tile_prediction), 
                str(bins),
                out,
                # [tile,] 
                ] 
    return [ 
                # Image.fromarray(tile_prediction), 
                None,
                None,
                # [tile,] 
                ] 


##################################### OUT

# Check installation across cluster
downloaded = spark.read.parquet(os.path.join('..', 'data', '1-download.parquet'))

downloaded.show(truncate=False)

result = downloaded\
                .select(
                        extract_tumor( 
                                    F.lit(images_path),
                                    F.col('filename') ,
                                    F.lit(images_path)
                                    ) 
                ) 

# downloaded.write.mode('overwrite').parquet(os.path.join('data', '3-nucleotids.parquet'))
result.limit(20).write.mode('overwrite').parquet(os.path.join('data', '4-nucleotids.parquet'))



25/03/24 12:38:34 WARN SparkContext: The path ../data/tissue_mask_model.pth has been added already. Overwriting of added paths is not supported in the current version.
INFO:root:******** MODEL: ../data/tissue_mask_model.pth   -----  True
25/03/24 12:38:34 INFO InMemoryFileIndex: It took 14 ms to list leaf files for 1 paths.
25/03/24 12:38:34 INFO SparkContext: Starting job: parquet at NativeMethodAccessorImpl.java:0
25/03/24 12:38:34 INFO DAGScheduler: Got job 30 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/03/24 12:38:34 INFO DAGScheduler: Final stage: ResultStage 30 (parquet at NativeMethodAccessorImpl.java:0)
25/03/24 12:38:34 INFO DAGScheduler: Parents of final stage: List()
25/03/24 12:38:34 INFO DAGScheduler: Missing parents: List()
25/03/24 12:38:34 INFO DAGScheduler: Submitting ResultStage 30 (MapPartitionsRDD[131] at parquet at NativeMethodAccessorImpl.java:0), which has no missing parents
25/03/24 12:38:34 INFO MemoryStore: Block broadcast_50 store

+-----------------------+------+---------------------------------------------------------------+----------------------+-----------+
|modificationTime       |length|filename                                                       |patient_key           |patient_id |
+-----------------------+------+---------------------------------------------------------------+----------------------+-----------+
|2025-03-23 03:14:40.053|131610|patient_008_node_0.tif_tile_108_x54272_y89088_score20020.7.png |patient_008_node_0.tif|patient_008|
|2025-03-23 02:12:16.767|128753|patient_004_node_1.tif_tile_218_x92160_y117760_score20655.0.png|patient_004_node_1.tif|patient_004|
|2025-03-23 02:00:03.117|128735|patient_000_node_3.tif_tile_98_x36864_y70656_score20863.6.png  |patient_000_node_3.tif|patient_000|
|2025-03-23 02:26:25.706|127061|patient_004_node_2.tif_tile_158_x32768_y108544_score24235.5.png|patient_004_node_2.tif|patient_004|
|2025-03-23 03:47:09.81 |125027|patient_010_node_4.tif_tile_112_x76800_y1044

  check_for_updates()

  check_for_updates()

  check_for_updates()

  check_for_updates()

  check_for_updates()

  check_for_updates()

  saved_state_dict = torch.load(pretrained_weights, map_location="cpu")

  saved_state_dict = torch.load(pretrained_weights, map_location="cpu")

  saved_state_dict = torch.load(pretrained_weights, map_location="cpu")

  saved_state_dict = torch.load(pretrained_weights, map_location="cpu")

  saved_state_dict = torch.load(pretrained_weights, map_location="cpu")

  saved_state_dict = torch.load(pretrained_weights, map_location="cpu")

|2025-03-24|12:38:43.336| [ERROR] Crashed on /private/var/folders/4l/tlk6snvn7h74kq99qnbyg3ch0000gn/T/tmpwhxgk80e/0
Traceback (most recent call last):
  File "/Users/illorens/Projects/source/Universidad/viu-MU-BD-and-DS-Image-Pathology/.venv/lib/python3.11/site-packages/tiatoolbox/models/engine/semantic_segmentor.py", line 1230, in _predict_wsi_handle_exception
    self._predict_one_wsi(wsi_idx, ioconfig, str(wsi_save_pa