In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os
import boto3
from datasets import load_dataset
import torchvision.transforms as T
import torchvision.transforms.functional as F
from PIL import Image
import io
from transformers import CLIPTokenizerFast
from torchvision.transforms import InterpolationMode


# Initialize a single Spark session and derive the SparkContext from it.
# Calling SparkContext(...) directly fails with "Cannot run multiple
# SparkContexts at once" when this cell is re-run, and a builder appName passed
# afterwards is not applied to the already-running context. Using getOrCreate()
# makes the cell safe to re-execute. The app name below is the one that was
# actually in effect before (the one given to the original SparkContext).
spark = SparkSession.builder \
    .appName("PySpark Image Processing with RDDs") \
    .getOrCreate()
# RDD entry point, shared with the session above.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/23 09:30:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the local image folder (an `image` column plus a `caption` column) as a
# single "train" split.
img_dataset = load_dataset(
    "imagefolder",
    split="train",
    data_dir="raw_data/harvard/paintings/",
)
img_dataset

Resolving data files:   0%|          | 0/94 [00:00<?, ?it/s]

Dataset({
    features: ['image', 'caption'],
    num_rows: 93
})

In [3]:
# Pair every image with its caption and distribute the pairs as an RDD.
# NOTE(review): the image objects themselves are serialized into the tasks,
# which is presumably what triggers the "task of very large size" warnings;
# consider distributing file paths/bytes and decoding on the executors.
rdd = sc.parallelize(
    [(image, caption) for image, caption in zip(img_dataset['image'], img_dataset['caption'])]
)

In [4]:
def resize_and_pad_then_resize(img, final_size=(512, 512), padding_mode='constant', fill=0):
    """
    Pad an image to a centered square, then resize it to ``final_size``.

    The shorter side is padded so the image becomes a
    ``max(width, height)`` square without changing its aspect ratio; the
    square is then resized with Lanczos interpolation.

    Args:
        img (PIL.Image): The image to pad and resize.
        final_size (tuple): The desired output size (height, width).
        padding_mode (str): Type of padding. Options include 'constant', 'edge', etc.
        fill (int, tuple): Pixel fill value for constant padding. Can be int or tuple.

    Returns:
        PIL.Image: The padded, then resized image.
    """
    width, height = img.size
    max_side = max(width, height)

    # Total padding needed on each axis to reach a square.
    extra_w = max_side - width
    extra_h = max_side - height

    # Split the padding between the two edges. When the amount is odd, the
    # extra pixel goes to the right/bottom edge so the result is exactly square.
    # Padding (extra // 2) on both sides would leave it one pixel short, and the
    # final resize would then slightly stretch the image.
    left, top = extra_w // 2, extra_h // 2
    right, bottom = extra_w - left, extra_h - top

    # torchvision's 4-element padding order is [left, top, right, bottom].
    img = F.pad(img, [left, top, right, bottom], padding_mode=padding_mode, fill=fill)

    # Final resize of the square to the desired output size.
    img = F.resize(img, final_size, interpolation=InterpolationMode.LANCZOS)
    return img


# Cache of loaded CLIP tokenizers, keyed by model name. `from_pretrained` is
# expensive and previously ran once per record. When this code is shipped to
# Spark executors the cache travels with the function, so on a worker it lives
# as long as that deserialized copy of the function (presumably one task). That
# is still far fewer loads than one per caption.
_CLIP_TOKENIZER_CACHE = {}


def _get_clip_tokenizer(name="openai/clip-vit-large-patch14"):
    """Return the CLIPTokenizerFast for `name`, loading it on first use."""
    tokenizer = _CLIP_TOKENIZER_CACHE.get(name)
    if tokenizer is None:
        tokenizer = CLIPTokenizerFast.from_pretrained(name)
        _CLIP_TOKENIZER_CACHE[name] = tokenizer
    return tokenizer


def preprocess_image(image, caption):
    """
    Turn one (image, caption) record into model inputs.

    Args:
        image (PIL.Image): Source image; converted to RGB before processing.
        caption: Caption text to tokenize (presumably a str).

    Returns:
        tuple: ``(processed_image, tokens)``. ``processed_image`` is a
        normalized float tensor of shape (3, 512, 512). ``tokens`` is a list
        of CLIP token ids, truncated to at most 77 tokens.
    """
    # Grayscale / palette / RGBA / CMYK inputs would give 1 or 4 channels,
    # which the 3-channel Normalize below cannot handle, so force RGB first.
    rgb_image = image.convert("RGB")

    transform_pipeline = T.Compose([
        T.Lambda(lambda img: resize_and_pad_then_resize(img, final_size=(512, 512), padding_mode='constant', fill=0)),
        T.ToTensor(),
        # NOTE(review): these are ImageNet statistics. CLIP-family models usually
        # expect different mean/std; confirm against the model consuming this data.
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    processed_image = transform_pipeline(rgb_image)

    # Tokenization: without return_tensors, encode() already returns a plain
    # list of ids.
    tokens = _get_clip_tokenizer().encode(caption, max_length=77, truncation=True)

    return (processed_image, tokens)

In [5]:
# Apply the preprocessing to every (image, caption) pair. This is a lazy
# transformation: nothing runs until an action such as take/collect is called.
processed_rdd = rdd.map(lambda pair: preprocess_image(*pair))

In [6]:
# Smoke-test the pipeline on a single record before collecting everything.
processed_rdd.take(num=1)

24/04/23 09:30:37 WARN TaskSetManager: Stage 0 contains a task of very large size (4964 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[(tensor([[[-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           ...,
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179]],
  
          [[-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           ...,
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357]],
  
          [[-1.8044, -1.8044, -1.8044,  ..., -1.8044, -1.8044, -1.8044],
           

In [7]:
# Bring every processed (image tensor, token ids) pair back to the driver.
# NOTE(review): each 3x512x512 float tensor is roughly 3 MB, so collecting is
# fine for this small folder but will not scale to large datasets.
train_set = processed_rdd.collect()

24/04/23 09:30:50 WARN TaskSetManager: Stage 1 contains a task of very large size (4964 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

24/04/22 21:28:13 WARN TaskSetManager: Stage 4 contains a task of very large size (4964 KiB). The maximum recommended task size is 1000 KiB.
24/04/22 21:28:18 WARN TaskSetManager: Stage 5 contains a task of very large size (6396 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------------+--------------------+
|processed_image|              tokens|
+---------------+--------------------+
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
|             {}|[49406, 4271, 308...|
+---------------+--------------------+


In [6]:
# NOTE(review): duplicate of the earlier smoke-test cell; its saved output comes
# from an earlier session. Consider deleting this cell.
processed_rdd.take(num=1)

24/04/22 20:41:42 WARN TaskSetManager: Stage 0 contains a task of very large size (4964 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

[(tensor([[[-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           ...,
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179],
           [-2.1179, -2.1179, -2.1179,  ..., -2.1179, -2.1179, -2.1179]],
  
          [[-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           ...,
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357],
           [-2.0357, -2.0357, -2.0357,  ..., -2.0357, -2.0357, -2.0357]],
  
          [[-1.8044, -1.8044, -1.8044,  ..., -1.8044, -1.8044, -1.8044],
           

In [8]:
import pandas as pd
import torch

# Split the collected records into two columns: image tensors converted to
# nested Python lists, and the token-id lists as-is.
# NOTE(review): tensor.tolist() boxes every pixel as a Python float (~786k per
# 3x512x512 image), which is very memory hungry; revisit if the data grows.
formatted_data = {
    'pixel_values': [image_tensor.tolist() for image_tensor, _ in train_set],
    'input_ids': [token_ids for _, token_ids in train_set],
}

# Build the DataFrame from the column dict.
df = pd.DataFrame(data=formatted_data)

In [10]:
from datasets import Dataset

# Wrap the pandas DataFrame as a Hugging Face Dataset.
dataset = Dataset.from_pandas(df=df)
dataset

Dataset({
    features: ['pixel_values', 'input_ids'],
    num_rows: 93
})