In [1]:
# Required Imports
import numpy as np
import cv2
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used for preprocessing.
# The HDFS NameNode runs inside WSL; its address may change between WSL restarts,
# so it can be overridden with the HDFS_HOST environment variable. The default is
# the address this notebook was last run against.
import os

wsl_ip = os.environ.get("HDFS_HOST", "172.27.235.9")
spark = (
    SparkSession.builder
    .appName("Pneumonia X-ray Preprocessing")
    .master("local[*]")
    .config("spark.hadoop.fs.defaultFS", f"hdfs://{wsl_ip}:9000")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.memory", "2g")
    # NOTE(review): with master=local[*] tasks run inside the driver JVM, so this
    # executor setting likely has no effect; spark.driver.memory is what applies.
    .config("spark.executor.memory", "2g")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

# Expose the SparkContext under the conventional name `sc`. The pyspark shell
# predefines it, but a plain Jupyter kernel does not, so code using `sc` would
# otherwise raise NameError on a fresh kernel.
sc = spark.sparkContext

25/05/07 19:04:18 WARN Utils: Your hostname, DESKTOP-EK3V8PF resolves to a loopback address: 127.0.1.1; using 172.27.235.9 instead (on interface eth0)
25/05/07 19:04:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/07 19:04:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
from tqdm import tqdm
import os

# Make sure the destination for the .npy chunks exists before anything is saved;
# exist_ok lets the cell be re-run without error.
os.makedirs("preprocessed_npy", exist_ok=True)

# Driver-side accumulators, reset on every run of this cell.
all_images = []
all_labels = []

# Preprocessing function
def process_image(path_content):
    """Turn one ``(path, bytes)`` record from ``binaryFiles`` into a Spark ``Row``.

    The bytes are decoded as a single-channel (grayscale) image, resized to
    224x224, cast to float32 and divided by 255.0. The label is the name of the
    file's parent directory.

    Args:
        path_content: a ``(path, content)`` pair; ``content`` is the raw file bytes.

    Returns:
        ``Row(path=..., label=..., image=...)`` with ``image`` as a nested Python
        list. Returns ``None`` if OpenCV cannot decode the bytes or any step raises
        (the failure is printed); callers filter these out.
    """
    path, content = path_content
    try:
        raw_bytes = np.frombuffer(content, dtype=np.uint8)
        gray = cv2.imdecode(raw_bytes, cv2.IMREAD_GRAYSCALE)
        if gray is None:
            # imdecode reports undecodable data by returning None rather than raising.
            return None
        scaled = cv2.resize(gray, (224, 224)).astype(np.float32) / 255.0
        # e.g. ".../pneumonia_dataset/<class>/x.jpg" -> "<class>"
        parent_dir = path.split("/")[-2]
        return Row(path=path, label=parent_dir, image=scaled.tolist())
    except Exception as err:
        print(f"[SKIPPED] {path} due to {err}")
        return None

# Load and process images.
# Every JPEG one directory below the dataset root is read as a (path, bytes)
# pair. spark.sparkContext is used rather than a bare `sc`, which a plain
# Jupyter kernel does not define.
binary_rdd = spark.sparkContext.binaryFiles(f"hdfs://{wsl_ip}:9000/data/pneumonia_dataset/*/*.jpg")
# process_image returns None for undecodable/failed files; drop them here.
processed_rdd = binary_rdd.map(process_image).filter(lambda x: x is not None)
image_df = spark.createDataFrame(processed_rdd)

# Stream rows back to the driver via toLocalIterator instead of a single collect().
iterator = image_df.select("image", "label").toLocalIterator()

# NOTE(review): every image is still held in driver memory before chunking:
# roughly N x 224 x 224 x 4 bytes, i.e. ~1.8 GB for the 9,208 images of the last run.
for row in tqdm(iterator, desc="Collecting all data"):
    all_images.append(np.array(row["image"], dtype=np.float32))
    all_labels.append(row["label"])

# Fail loudly rather than writing an empty label file and a misleading summary.
assert all_images, "No images were decoded; check the HDFS glob and any [SKIPPED] messages."

# Step 2: Encode labels globally, fitting on the full list so every chunk shares
# one class -> integer mapping.
from sklearn.preprocessing import LabelEncoder
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(all_labels)
assert len(labels_encoded) == len(all_images)

# Optional: save classes (entry k is the class name encoded as integer k).
np.save("preprocessed_npy/label_classes.npy", label_encoder.classes_)

# Step 3: Save in chunks. Chunk k holds images/labels [k*chunk_size, (k+1)*chunk_size).
# NOTE(review): rows arrive in the order Spark reads the files, which looks to be
# grouped by class directory, so chunks are probably not class-balanced. Shuffle
# (with a fixed seed) before training.
chunk_size = 1000
chunk_starts = range(0, len(all_images), chunk_size)
for chunk_idx, start in enumerate(chunk_starts):
    chunk_imgs = np.stack(all_images[start:start + chunk_size])
    chunk_labels = labels_encoded[start:start + chunk_size]
    np.save(f"preprocessed_npy/images_chunk{chunk_idx}.npy", chunk_imgs)
    np.save(f"preprocessed_npy/labels_chunk{chunk_idx}.npy", chunk_labels)

# len(chunk_starts) == ceil(len(all_images) / chunk_size): exact even when the
# image count is a multiple of chunk_size.
print(f"✅ Saved {len(chunk_starts)} chunks.")


Collecting all data: 9208it [02:00, 76.53it/s]                      (0 + 1) / 1]


✅ Saved 10 chunks.


In [6]:
# Shut down the Spark session (and the SparkContext behind it) now that all
# chunks have been written to the local preprocessed_npy/ directory.
spark.stop()