In [0]:
# Configure Spark to authenticate against the ADLS Gen2 storage account that
# hosts the lakehouse. The call that used to live here was truncated to
# `spark.conf.set( =" )`, which is not valid Python, so the cell could not run.
#
# NOTE(review): the config key below assumes account-key authentication for
# the `lab5mri` storage account referenced by the bronze/silver paths in this
# notebook -- confirm this matches the original setup.
# NOTE(review): AZURE_STORAGE_ACCOUNT_KEY is a placeholder variable name; set
# it on the cluster (ideally backed by a secret) so the key is never stored in
# the notebook. A missing variable raises KeyError instead of silently
# configuring an empty credential.
import os

spark.conf.set(
    "fs.azure.account.key.lab5mri.dfs.core.windows.net",
    os.environ["AZURE_STORAGE_ACCOUNT_KEY"],
)


In [0]:
# If scikit-image is not installed, uncomment and run once:
#%pip install scikit-image

import time
import numpy as np
import pandas as pd

from pyspark.sql import Row
import pyspark.sql.functions as F

from skimage import io
from skimage.filters import gaussian, sobel, prewitt, hessian
from skimage.filters.rank import entropy as entropy_filter
from skimage.morphology import disk
from skimage.filters import gabor

# Current function names: greycomatrix/greycoprops were renamed to graycomatrix/graycoprops in scikit-image 0.19
from skimage.feature import graycomatrix, graycoprops


In [0]:
# Location of the raw (Bronze) image index in the lakehouse.
bronze_path = "abfss://lakehouse@lab5mri.dfs.core.windows.net/bronze/tumor_images_raw/"

# Read the index and keep only the identifier, label and storage path columns.
bronze_df = spark.read.parquet(bronze_path).select("image_id", "label", "image_path")

# Preview a few rows, then bring the whole index to the driver as a list of
# Rows so that later cells can iterate over it in plain Python.
bronze_df.show(n=5)
rows = bronze_df.collect()
print(f"Total images found in Bronze: {len(rows)}")


+----------+-----+--------------------+
|  image_id|label|          image_path|
+----------+-----+--------------------+
| 34 no.jpg|    0|abfss://lakehouse...|
|no 100.jpg|    0|abfss://lakehouse...|
| 30 no.jpg|    0|abfss://lakehouse...|
| 23 no.jpg|    0|abfss://lakehouse...|
| 38 no.jpg|    0|abfss://lakehouse...|
+----------+-----+--------------------+
only showing top 5 rows
Total images found in Bronze: 253


In [0]:
from skimage.feature import graycomatrix, graycoprops

# Pixel-pair offsets for the co-occurrence matrices: four directions
# (0, 45, 90 and 135 degrees) at a distance of one pixel.
ANGLES = [0, np.pi/4, np.pi/2, 3*np.pi/4]
DISTANCES = [1]
# Texture properties requested from graycoprops for every GLCM.
GLCM_PROPS = ["contrast", "dissimilarity", "homogeneity", "ASM", "energy", "correlation"]

def compute_glcm_features(image, prefix):
    """
    Compute GLCM texture statistics for a single 2-D image.

    The image is shifted and scaled to the 0-1 range, quantised to uint8
    (0-255), and a symmetric, normalised grey-level co-occurrence matrix is
    built for every (distance, angle) pair in DISTANCES x ANGLES. Each
    property in GLCM_PROPS is then summarised by its mean and standard
    deviation across all of those pairs.

    Parameters
    ----------
    image : array-like
        2-D array of numeric pixel values.
    prefix : str
        Prepended to every key of the returned dictionary.

    Returns
    -------
    dict
        Maps ``f"{prefix}_{prop}_mean"`` and ``f"{prefix}_{prop}_std"`` to
        Python floats, for each ``prop`` in GLCM_PROPS.

    Raises
    ------
    ValueError
        If `image` is not two-dimensional or contains no pixels.
    """
    arr = np.asarray(image, dtype=np.float32)

    # Fail early with a clear message instead of an opaque error from the
    # min/max reduction or from graycomatrix further down.
    if arr.ndim != 2:
        raise ValueError(
            f"compute_glcm_features expects a 2-D image, got {arr.ndim} dimension(s)"
        )
    if arr.size == 0:
        raise ValueError("compute_glcm_features received an empty image")

    # normalize 0-1 (a constant image stays all zeros)
    arr = arr - arr.min()
    peak = arr.max()
    if peak > 0:
        arr = arr / peak

    # convert back to uint8; graycomatrix is called with levels=256
    arr_uint8 = (arr * 255).astype(np.uint8)

    # compute GLCM
    glcm = graycomatrix(
        arr_uint8,
        distances=DISTANCES,
        angles=ANGLES,
        levels=256,
        symmetric=True,
        normed=True
    )

    feats = {}
    for prop in GLCM_PROPS:
        # graycoprops yields one value per (distance, angle) pair. Flatten so
        # that every configured distance contributes; with a single distance
        # this is the same as taking the first row.
        vals = np.asarray(graycoprops(glcm, prop)).ravel()
        feats[f"{prefix}_{prop}_mean"] = float(vals.mean())
        feats[f"{prefix}_{prop}_std"] = float(vals.std())
    return feats


In [0]:
from io import BytesIO
import imageio

def process_one_row(row):
    """
    Extract GLCM texture features for the image referenced by one Bronze row.

    The image file at ``row["image_path"]`` is read through Spark's
    ``binaryFile`` source, decoded to a grayscale float image, rescaled to
    roughly 0-1 and passed through several filters. GLCM statistics are then
    computed for the rescaled image and for every filtered version.

    Parameters
    ----------
    row : mapping
        Must provide the keys ``"image_id"``, ``"label"`` and ``"image_path"``.

    Returns
    -------
    dict or None
        A flat dictionary holding ``image_id``, ``label`` (as int) and the
        GLCM features keyed by filter prefix, or None if anything failed
        while loading or processing the image (the failure is printed).
    """
    image_id, label, path = row["image_id"], row["label"], row["image_path"]

    try:
        # Fetch the raw file content through Spark.
        binary_df = spark.read.format("binaryFile").load(path)
        img_bytes = binary_df.select("content").first().content

        # Decode the bytes as a grayscale float image.
        img = imageio.v2.imread(BytesIO(img_bytes), mode='F')

        # Rescale to 0-1; the small epsilon avoids dividing by zero on a
        # constant image.
        img = img.astype(np.float32)
        lowest, highest = img.min(), img.max()
        img = (img - lowest) / (highest - lowest + 1e-8)

        # Filtered views of the image, keyed by the prefix used for their
        # feature names. Insertion order fixes the column order of the result.
        quantised = (img * 255).astype(np.uint8)
        filtered_images = {
            "orig": img,
            "entropy": entropy_filter(quantised, disk(3)),
            "gaussian": gaussian(img, sigma=1),
            "sobel": sobel(img),
            "prewitt": prewitt(img),
            "hessian": hessian(img, mode="reflect"),
            # gabor returns a (real, imaginary) pair; only the real part is used.
            "gabor": gabor(img, frequency=0.2)[0],
        }

        features = {"image_id": image_id, "label": int(label)}
        for prefix, filtered in filtered_images.items():
            features.update(compute_glcm_features(filtered, prefix))

        return features

    except Exception as err:
        # Best-effort: report the failure and let the caller skip this image.
        print(f"❌ ERROR processing {image_id}")
        print(f"Reason: {err}")
        return None


In [0]:
import time

# Wall-clock timer around the whole extraction pass.
start_time = time.time()

# One feature dictionary per successfully processed image.
results = []
total_images = len(rows)

print("Starting safe feature extraction...\n")

for position, row in enumerate(rows, start=1):
    print(f"Processing image {position}/{total_images}: {row['image_id']}")
    feat = process_one_row(row)
    if feat is None:
        # process_one_row has already printed the failure; skip this image.
        continue
    results.append(feat)

end_time = time.time()
extraction_time_seconds = end_time - start_time

print("\n====================")
print("SAFE EXTRACTION DONE")
print("====================")
print(f"Images processed successfully: {len(results)}")
print(f"Total extraction time: {extraction_time_seconds:.2f} seconds")


Starting safe feature extraction...

Processing image 1/253: 34 no.jpg
Processing image 2/253: no 100.jpg
Processing image 3/253: 30 no.jpg
Processing image 4/253: 23 no.jpg
Processing image 5/253: 38 no.jpg
Processing image 6/253: 8 no.jpg
Processing image 7/253: 22 no.jpg
Processing image 8/253: 31 no.jpg
Processing image 9/253: 39 no.jpg
Processing image 10/253: 15 no.jpg
Processing image 11/253: 35 no.jpg
Processing image 12/253: 32 no.jpg
Processing image 13/253: 18 no.jpg
Processing image 14/253: 19 no.jpg
Processing image 15/253: 40 no.jpg
Processing image 16/253: 41 no.jpg
Processing image 17/253: 42 no.jpg
Processing image 18/253: 47 no.jpg
Processing image 19/253: 28 no.jpg
Processing image 20/253: 14 no.jpg
Processing image 21/253: no 1.jpg
Processing image 22/253: no 923.jpg
Processing image 23/253: no 6.jpg
Processing image 24/253: 25 no.jpg
Processing image 25/253: 5 no.jpg
Processing image 26/253: 29 no.jpg
Processing image 27/253: 45 no.jpg
Processing image 28/253: N19.

In [0]:
# Convert results to pandas DF
import pandas as pd

# Assemble one row per successfully processed image.
silver_pdf = pd.DataFrame(results)
print("Silver dataframe shape:", silver_pdf.shape)

# Stop here if extraction produced nothing: spark.createDataFrame has no
# schema to infer from an empty pandas frame and would fail with a much less
# helpful error, and an empty result must not replace an existing Silver table.
if silver_pdf.empty:
    raise ValueError(
        "No features were extracted; refusing to overwrite the Silver layer "
        "with an empty dataset."
    )

# Convert to Spark DataFrame
silver_df = spark.createDataFrame(silver_pdf)

# Destination of the Silver feature table; "overwrite" replaces any previous
# contents at this path.
silver_path = "abfss://lakehouse@lab5mri.dfs.core.windows.net/silver/tumor_features/"

silver_df.write.mode("overwrite").parquet(silver_path)

print("Silver layer saved to:", silver_path)


Silver dataframe shape: (253, 86)
Silver layer saved to: abfss://lakehouse@lab5mri.dfs.core.windows.net/silver/tumor_features/


In [0]:
import os

# Headline numbers for this run.
num_images = len(results)
# Every column except the two non-feature columns (image_id and label).
num_features = len(silver_df.columns) - 2

# Compute label for the report: the AZUREML_COMPUTE environment variable if
# set, otherwise a generic placeholder.
# NOTE(review): the captured output shows the placeholder, so this variable
# appears not to be set on this cluster -- confirm whether a real SKU lookup
# is wanted here.
compute_sku = os.environ.get("AZUREML_COMPUTE", "databricks_cluster")

print("=== METRICS ===")
print("Images:", num_images)
print("Features per image:", num_features)
print("Total extraction time (sec):", extraction_time_seconds)
print("Compute SKU:", compute_sku)


=== METRICS ===
Images: 253
Features per image: 84
Total extraction time (sec): 662.25750041008
Compute SKU: databricks_cluster
