In [0]:
%run /Workspace/Users/yesmanki81@gmail.com/Databricks_CV_Anomaly_Detection/Databricks_Code/00_utils

In [0]:
# Check that the mount point exists: listing it raises if the path is not available.
# `mount_path` is the base location that the cells below build their paths from.
mount_path = "/mnt/vision-test"
dbutils.fs.ls(mount_path)

In [0]:
# Unmount the directory
# dbutils.fs.unmount(f"/mnt/{mount_name}")

# Process Images

In [0]:
# Convert the list of FileInfo objects to a Spark DataFrame
def create_file_into_df(source_dir:str):
    """List ``{mount_path}/{source_dir}`` and return one row per listed entry.

    Relies on the notebook-level ``mount_path``, ``dbutils`` and ``spark``.

    Args:
        source_dir: Directory name relative to ``mount_path``.

    Returns:
        A Spark DataFrame with string columns ``name`` and ``path``.
    """
    entries = dbutils.fs.ls(f"{mount_path}/{source_dir}")

    records = [(entry.name, entry.path) for entry in entries]

    # An explicit schema lets an empty directory produce an empty DataFrame;
    # without it Spark has no rows to infer the column types from and raises.
    # Columns are ordered (name, path) so frames built here line up
    # positionally with each other when they are unioned later.
    return spark.createDataFrame(records, schema="name string, path string")

# Index every entry under <mount_path>/images (file name and path) and preview the result.
file_info_df = create_file_into_df("images")
display(file_info_df)


In [0]:
# Load the label CSV and derive a `name` column that matches the image file names
from pyspark.sql.functions import concat, lit

# The first entry listed under <mount_path>/labels is taken as the label file.
label_files = dbutils.fs.ls(f"{mount_path}/labels")
label_path = label_files[0].path

label_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(label_path)
    .withColumnRenamed("image_name", "name")
)

# Append the ".jpg" extension to every name.
label_df = label_df.withColumn("name", concat(label_df["name"], lit(".jpg")))
display(label_df)

In [0]:
# Join the two DataFrames
# A left join on `name` keeps every listed file; files without a matching label row get nulls.
# NOTE(review): this rebinds `file_info_df`, so re-running the cell joins the labels a second time.
file_info_df = file_info_df.join(label_df, on="name", how="left")
display(file_info_df)

In [0]:
import os
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

# The rows live in a Spark DataFrame, so fetch a small sample to the driver before plotting.
def display_image(df, num_images:int=5):
    """Plot up to ``num_images`` images from ``df`` side by side.

    Args:
        df: Spark DataFrame whose rows expose ``path`` (a ``dbfs:/`` URI) and ``name``.
        num_images: Maximum number of rows to fetch and plot.
    """
    rows = df.take(num_images)
    if not rows:
        # Nothing to draw; skip creating an empty figure.
        return

    plt.figure(figsize=(10,10))
    for i, row in enumerate(rows):
        # Rewrite the dbfs:/ URI into the /dbfs/ path form so PIL can open it.
        img_path = row.path.replace('dbfs:/','/dbfs/')
        # Size the grid by the rows actually returned, which may be fewer than requested.
        plt.subplot(1, len(rows), i+1)
        # The context manager closes the file handle once the pixels have been drawn.
        with Image.open(img_path) as img:
            plt.imshow(img)
        plt.title(row.name)
        plt.axis("off")
    plt.show()

# Preview images from the joined DataFrame using the default `num_images` (5).
display_image(file_info_df)

In [0]:
# Try the crop + resize steps on a single image.
# take(1) fetches one row only; collecting the whole DataFrame just to stop after the
# first iteration would pull every row to the driver.
for row in file_info_df.take(1):
    img_path = row.path.replace('dbfs:/','/dbfs/')
    with Image.open(img_path) as src:
        width, height = src.size
        print(f"Image: {row.name}, size: {width} x {height}")
        new_size = min(width, height)

        # Centered square crop. PIL's box order is (left, upper, right, lower).
        # Integer offsets keep the crop exactly new_size x new_size; fractional
        # coordinates are rounded by PIL and can end up one pixel off square.
        left = (width - new_size) // 2
        upper = (height - new_size) // 2
        img = src.crop((left, upper, left + new_size, upper + new_size))
    print(f"{img.size = }")

    # Resize: 256 x 256
    img = img.resize((256, 256), Image.NEAREST)
    print(f"{img.size = }")
    plt.imshow(img)
    plt.show()

In [0]:
import io
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import BinaryType

# Edge length, in pixels, of the square images produced by resize_image_udf.
IMAGE_SIZE = 256

@pandas_udf(BinaryType())
def resize_image_udf(df_series: pd.Series) -> pd.Series:
    """Center-crop each image to a square, resize it and return JPEG bytes.

    Args:
        df_series: ``dbfs:/`` URIs of the source images.

    Returns:
        A Series holding the JPEG-encoded IMAGE_SIZE x IMAGE_SIZE image for each path.
    """
    def resize_image(path):
        """Resize one image and serialize it back as JPEG."""
        # Rewrite the dbfs:/ URI into the /dbfs/ path form so PIL can open it.
        img_path = path.replace('dbfs:/','/dbfs/')
        with Image.open(img_path) as src:
            width, height = src.size
            new_size = min(width, height)

            # Centered square crop. PIL's box order is (left, upper, right, lower).
            # Integer offsets keep the crop exactly new_size x new_size; fractional
            # coordinates are rounded by PIL and can end up one pixel off square.
            left = (width - new_size) // 2
            upper = (height - new_size) // 2
            img = src.crop((left, upper, left + new_size, upper + new_size))

        # Resize
        img = img.resize((IMAGE_SIZE,IMAGE_SIZE), Image.NEAREST)

        # Save back to jpg
        output = io.BytesIO()
        img.save(output, format="JPEG")
        return output.getvalue()

    return df_series.apply(resize_image)

# Column metadata that tags the binary column as JPEG content to enable the image preview
image_meta = {"spark.contentAnnotation": '{"mimeType": "image/jpeg"}'}

df = (
    file_info_df
    # Compute the resized JPEG bytes from each file path.
    .withColumn("image", resize_image_udf("path"))
    # Re-alias the column onto itself to attach the metadata.
    .withColumn("image", col("image").alias("image", metadata=image_meta))
)
display(df)

In [0]:
# Persist the resized images as Parquet; overwrite mode replaces any existing data at images_resized.
df.write.mode("overwrite").format('parquet').save(f"{mount_path}/images_resized")

In [0]:
from pyspark.sql.functions import regexp_replace, lit 

@pandas_udf(BinaryType())
def flip_image_horizontally_udf(df_series):
    """Mirror every encoded image in the series left-to-right and return JPEG bytes."""
    def flip_image(binary_content):
        """Decode one image, flip it horizontally and re-encode it as JPEG."""
        source = io.BytesIO(binary_content)
        mirrored = Image.open(source).transpose(Image.FLIP_LEFT_RIGHT)

        # Encode the mirrored image into an in-memory JPEG.
        buffer = io.BytesIO()
        mirrored.save(buffer, format="JPEG")
        return buffer.getvalue()

    return df_series.apply(flip_image)

# Horizontally flipped copy of every resized image.
df_flipped = (
    df.withColumn("image", flip_image_horizontally_udf("image").alias('image', metadata=image_meta))
    # Escape the dot and anchor at the end so only the ".jpg" extension is rewritten;
    # the bare pattern ".jpg" matches any character followed by "jpg" anywhere in the name.
    .withColumn("name", regexp_replace("name", r"\.jpg$", "_flipped.jpg"))
    # The flipped images exist only as bytes in this DataFrame, so no source path applies.
    .withColumn("path", lit("n/a"))
)

display(df_flipped)


In [0]:
# Append the flipped images to the images_resized Parquet dataset.
# NOTE(review): append mode adds these rows again each time this cell is re-run on its own.
df_flipped.write.mode("append").format('parquet').save(f"{mount_path}/images_resized")

In [0]:
# Read the images_resized Parquet dataset back and preview it.
new_df = spark.read.format("parquet").load(f"{mount_path}/images_resized")
display(new_df)

In [0]:
# Index the entries under noisy_images and tag every row with the constant label 'noisy'.
noisy_df = create_file_into_df("noisy_images").withColumn('label', lit('noisy'))

# Resize each image and attach the JPEG metadata to the resulting binary column.
resized_noisy_df = (
    noisy_df
    .withColumn("image", resize_image_udf("path"))
    .withColumn('image', col('image').alias('image', metadata=image_meta))
)
display(resized_noisy_df)

In [0]:
# Flipped copies of the noisy images. They are renamed and detached from the source
# path the same way `df_flipped` is, so a flipped row never shares its name and path
# with the unflipped row it was derived from.
flipped_resized_noisy_df = (
    resized_noisy_df.withColumn("image", flip_image_horizontally_udf("image").alias('image', metadata=image_meta))
    .withColumn("name", regexp_replace("name", r"\.jpg$", "_flipped.jpg"))
    .withColumn("path", lit("n/a"))
)
# Original and flipped noisy images stacked together for inspection.
final_noisy_df = resized_noisy_df.union(flipped_resized_noisy_df)
display(final_noisy_df)

In [0]:
from functools import reduce
# DataFrame is part of the public pyspark.sql namespace; pyspark.sql.functions only
# happens to expose it as a side effect of its own imports.
from pyspark.sql import DataFrame

# Stack the resized, flipped, noisy and flipped-noisy frames into one dataset.
# unionByName pairs columns by name, so a difference in column order between the
# frames raises or is resolved correctly instead of silently mixing columns.
final_df = reduce(DataFrame.unionByName, [df, df_flipped, resized_noisy_df, flipped_resized_noisy_df])
display(final_df)


In [0]:
# Persist the combined dataset as Parquet; overwrite mode replaces any existing data at images_final.
final_df.write.mode("overwrite").format('parquet').save(f"{mount_path}/images_final")

In [0]:
# Row count per label in the combined dataset.
final_df.groupBy('label').count().display()

# Image Augmentation (이미지 증강)

In [0]:
# List the top-level entries under the mount point.
dbutils.fs.ls(mount_path)

In [0]:
# Load the combined dataset from images_final.
# NOTE(review): this rebinds `df`, which earlier in the notebook held the resized images built from file_info_df.
df = spark.read.format("parquet").load(f"{mount_path}/images_final")
df.show()

In [0]:
# Rich preview of the loaded rows.
display(df)

In [0]:
# Row count per label in the loaded dataset.
display(df.groupBy('label').count())

Databricks visualization. Run in Databricks to view.

In [0]:
import io
import random
from PIL import Image
from pyspark.sql.functions import pandas_udf, col, regexp_replace, lit
from pyspark.sql.types import BinaryType

@pandas_udf(BinaryType())
def transpose_image_udf(df_series):
    def transpose_image(content):
        """Transpose image and serialize back as jpeg"""
        image = Image.open(io.BytesIO(content))
        transpose_types = ['horizontal', 'vertical', 'rotate_90', 'rotate_180', 'rotate_270', 'squash&skew']

        # Randomly selet a subset of transpose types to apply
        selected_transpose_types = random.sample(transpose_types, random.randint(1, len(transpose_types)))

        # selected_transpose_types = transpose_types[-1:]

        # squash & skew matrix
        width, height = image.size
        ss_matrix = (1, 0.3, -width * 0.15,
                     0.3, 1, - height * 0.15)

        for transpose_type in selected_transpose_types:
            match transpose_type:
                case 'horizontal':
                    image = image.transpose(Image.FLIP_LEFT_RIGHT)
                case 'vertical':
                    image = image.transpose(Image.FLIP_TOP_BOTTOM)
                case 'rotate_90':
                    image = image.transpose(Image.ROTATE_90)
                case 'rotate_180':
                    image = image.transpose(Image.ROTATE_180)
                case 'rotate_270':
                    image = image.transpose(Image.ROTATE_270)
                case 'squash&skew':
                    image = image.transform((width, height), Image.AFFINE, ss_matrix)
        
        # Save back as jpeg
        output = io.BytesIO()
        image.save(output, format="JPEG")
        return output.getvalue()
    
    return df_series.apply(transpose_image)

# Define the image metadata
image_meta = {
    "spark.contentAnnotation":'{"mimeType": "image/jpeg"}'
}

# Apply the UDF to transpose image randomly with multiple transpose types
noisy_df_transposed = (
    df.filter(col('label') == 'noisy')
    .withColumn("image", transpose_image_udf("image").alias('image', metadata=image_meta))
    # Escape the dot and anchor at the end so only the ".jpg" extension is rewritten;
    # the bare pattern '.jpg' matches any character followed by "jpg" anywhere in the name.
    .withColumn('name', regexp_replace(col('name'), r'\.jpg$', '_tr.jpg'))
    # The transformed images exist only as bytes in this DataFrame, so no source path applies.
    .withColumn('path', lit('n/a'))
)

# Show the untouched noisy rows next to their transformed counterparts.
display(df.filter(col('label') == 'noisy'))
display(noisy_df_transposed)



In [0]:
from functools import reduce
# DataFrame is part of the public pyspark.sql namespace; pyspark.sql.functions only
# happens to expose it as a side effect of its own imports.
from pyspark.sql import DataFrame

# Generate multiple df_transposed DataFrames
num_transposed_dfs = 5
transposed_dfs = []

for i in range(num_transposed_dfs):
    df_transposed = (
        df.filter(col('label') == 'noisy')
        .withColumn("image", transpose_image_udf("image").alias('image', metadata=image_meta))
        # Escaped, end-anchored pattern so only the ".jpg" extension is rewritten.
        # The pass index goes into the suffix so the copies produced by different
        # passes over the same source image do not all share one name.
        .withColumn('name', regexp_replace(col('name'), r'\.jpg$', f'_tr{i}.jpg'))
        .withColumn('path', lit('n/a'))
    )
    transposed_dfs.append(df_transposed)

# Stack the passes into a single DataFrame.
noisy_df_transposed = reduce(DataFrame.union, transposed_dfs)
display(noisy_df_transposed)




In [0]:
from PIL import ImageDraw
import numpy as np

@pandas_udf(BinaryType())
def add_salt_pepper_patches_udf(df_series):
    """Draw one irregular filled polygon on each image and return JPEG bytes.

    Args:
        df_series: Encoded image bytes, one image per element.

    Returns:
        A Series with the JPEG-encoded patched image for each input.
    """
    def add_salt_pepper_patches(content):
        """Add an irregular, polygonal noise patch to the image and serialize back as jpeg"""
        patch_pixels = 500  # target patch area, in pixels
        noise_value = 255   # fill intensity used for every channel

        image = Image.open(io.BytesIO(content))
        draw = ImageDraw.Draw(image)
        width, height = image.size

        # Radius of a circle whose area is patch_pixels (area = pi * r**2).
        r = int(np.sqrt(patch_pixels / np.pi))
        r = max(r, 5)  # the radius must be at least 5 px

        # Random center point for the noise patch, kept r pixels away from the borders.
        # max() keeps the range non-empty for images narrower or shorter than 2 * r,
        # where randint would otherwise raise.
        center_x = np.random.randint(r, max(r + 1, width - r))
        center_y = np.random.randint(r, max(r + 1, height - r))

        # 5-9 vertices: evenly spaced angles, each jittered within its own sector,
        # with a radius between 0.5 * r and 1.5 * r.
        num_points = np.random.randint(5,10)
        angles = np.linspace(0, 2*np.pi, num_points, endpoint=False)
        angles += np.random.uniform(0,2 * np.pi / num_points, size = num_points)
        radii = np.random.uniform(0.5 * r, r * 1.5, size=num_points)

        points = [
          (int(center_x + radius * np.cos(angle)),
           int(center_y + radius * np.sin(angle)))
          for angle, radius in zip(angles, radii)
        ]
        # RGB images need one value per channel; other modes get the bare intensity.
        fill_color = (noise_value,) * 3 if image.mode == 'RGB' else noise_value
        draw.polygon(points, fill=fill_color)

        output = io.BytesIO()
        image.save(output, format="JPEG")
        return output.getvalue()

    return df_series.apply(add_salt_pepper_patches)

# The UDF draws random numbers, so Spark must not assume that repeated
# invocations on the same input return the same bytes.
add_salt_pepper_patches_udf = add_salt_pepper_patches_udf.asNondeterministic()
  
# Patched variants of the rows labelled 'dog'.
# NOTE(review): the label column is left as 'dog' even though these rows are later
# unioned with the transformed 'noisy' rows; confirm that is intended.
noise_df_damaged = (
  df.filter(col('label') == 'dog')
  .withColumn('image', add_salt_pepper_patches_udf('image').alias('image', metadata=image_meta))
  # Escape the dot and anchor at the end so only the ".jpg" extension is rewritten;
  # the bare pattern '.jpg' matches any character followed by "jpg" anywhere in the name.
  .withColumn('name', regexp_replace(col('name'), r'\.jpg$', '_damaged.jpg'))
  .withColumn('path', lit('n/a'))
)

display(noise_df_damaged)

In [0]:
# NOTE(review): `display_image_content` is not defined in this notebook; presumably it is
# provided by the 00_utils notebook pulled in with %run. Confirm.
display_image_content(noise_df_damaged)

In [0]:
# Stack the transformed noisy rows and the patched rows, then persist them as Parquet.
# union() pairs columns by position, not by name; overwrite mode replaces any existing
# data at images_noisy_final.
noisy_df_final = noisy_df_transposed.union(noise_df_damaged)
noisy_df_final.write.mode('overwrite').format('parquet').save(f"{mount_path}/images_noisy_final")

In [0]:
# NOTE(review): `display_image_content` is not defined in this notebook; presumably it is
# provided by the 00_utils notebook pulled in with %run. Confirm.
display_image_content(noisy_df_final)