### Necessary packages to import

In [None]:
import os
import io
import boto3

import numpy as np
from PIL import Image

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.functions import array_to_vector
from pyspark.sql.functions import udf

import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input

### Initialize the spark session

In [None]:
# Start a Spark session (or reuse one that already exists) with a local
# master that uses all available cores.
spark = SparkSession.builder.master('local[*]').getOrCreate()

### Extraction of file paths of all images
- This step is necessary because the images have to be read/opened one at a time in order to feed them to a feature extraction model. This is discussed further in the following steps.
- *replace placeholders for* `YOUR_ACCESS_KEY` *and* `YOUR_SECRET_KEY` 

In [None]:
# S3 client used only for listing the bucket contents.
s3 = boto3.client(
    "s3",
    aws_access_key_id="YOUR_ACCESS_KEY",  # Enter your access key here
    aws_secret_access_key="YOUR_SECRET_KEY",  # Enter your secret key here
)

# Location of the images inside the S3 bucket
bucket = "bdcc2024-cpt5-finalproject"
prefix = "cats_550k"

# File extensions (compared in lower case) that count as images
IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".bmp", ".gif")

# A listing can span several pages of results; the paginator yields
# each page in turn.
paginator = s3.get_paginator("list_objects_v2")

# Collected "s3a://" paths, one per image object found
image_paths = []

for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    # A page has no "Contents" entry when it lists no objects
    for item in page.get("Contents", []):
        key = item["Key"]
        if not key.lower().endswith(IMAGE_EXTENSIONS):
            continue  # not an image file
        image_paths.append(f"s3a://{bucket}/{key}")

### Creation of a Feature Extraction Function
- A function is defined that fetches an image from the s3 bucket, converts it to an array, and passes it through a convolutional neural network to extract its features.
- The function should be registered as a Spark UDF using `udf` from `pyspark.sql.functions` so that it can be applied to a dataframe column.

In [None]:
# Build the feature extraction network. The team used VGG16 with
# ImageNet weights, without its top (classification) layers and with
# average pooling applied to the output.
model = VGG16(
    weights='imagenet',
    include_top=False,
    pooling='avg',
)

# Length of the feature vector produced by the VGG16 extractor; also the
# length of the all-zero placeholder returned when an image fails.
_FEATURE_DIM = 512

# Cache so the default model is built once and then reused across calls.
_MODEL_CACHE = {}


def _default_model():
    """Return the default VGG16 feature extractor, building it on first use."""
    if "vgg16" not in _MODEL_CACHE:
        _MODEL_CACHE["vgg16"] = VGG16(
            include_top=False, weights='imagenet', pooling='avg'
        )
    return _MODEL_CACHE["vgg16"]


def _load_image_batch(image_path):
    """Fetch one image from S3 and return it as a preprocessed batch of one."""
    # "s3a://bucket/some/key.jpg" -> ("bucket", "some/key.jpg")
    bucket_name, object_key = image_path.replace("s3a://", "").split("/", 1)

    # Fetch the raw image bytes using boto3
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
    image_bytes = response['Body'].read()

    # Decode directly from memory and normalise to a 150x150 RGB image
    image = Image.open(io.BytesIO(image_bytes))
    if image.mode != 'RGB':
        image = image.convert('RGB')
    image = image.resize((150, 150))

    # Convert to an array, add the leading batch axis, apply VGG16 preprocessing
    img_array = tf.keras.utils.img_to_array(image)
    img_array = np.expand_dims(img_array, axis=0)
    return preprocess_input(img_array)


def extract_features_from_path(image_path, model=None):
    """Extract a feature vector for the image stored at ``image_path``.

    Args:
        image_path: Location of the image in the form ``s3a://<bucket>/<key>``.
        model: Object whose ``predict`` method maps an image batch to
            features. Optional: when omitted, a VGG16 extractor is built on
            first use and reused afterwards. This is what makes the function
            callable with the path alone, which is how the Spark UDF invokes
            it (one column in, one argument).

    Returns:
        The extracted features as a flat list of floats. If anything goes
        wrong while fetching, decoding or predicting, the error is printed
        and a list of ``512`` zeros is returned instead, so a single bad
        image does not abort the whole job.
    """
    try:
        img_array = _load_image_batch(image_path)

        # Resolve the model only after the image loaded successfully, so a
        # bad path never triggers the (expensive) model construction.
        extractor = model if model is not None else _default_model()

        features = extractor.predict(img_array).flatten()
        return features.tolist()

    except Exception as e:
        # Log the error for debugging purposes
        print(f"Error processing image {image_path}: {e}")

        # Placeholder with the same length as a real feature vector
        return [0.0] * _FEATURE_DIM

# Wrap the extraction function as a Spark UDF; each call yields an
# array of doubles.
feature_array_type = ArrayType(DoubleType())
extract_features_udf = udf(
    extract_features_from_path,
    returnType=feature_array_type,
)



### Save extracted features as a spark dataframe for persistence.
- The saved files can be accessed by multiple members to work on in parallel.
- The team suggests that this step be done in batches in case unexpected crashes or errors occur. 
- It is highly advised to keep a tracker on the progress of saving the files created. In this procedure, the team printed string messages as well as created text files that can be used to trace the progress of the step.

In [None]:
# Define the chunk size to be saved at a time.
paths_per_file = 1_000

# Number of chunks needed to cover every path (ceiling division)
num_files = (len(image_paths) + paths_per_file - 1) // paths_per_file

# Destination of the extracted features in the s3 bucket
output_path = "s3a://bdcc2024-cpt5-finalproject/cats_550k_features-parquet"

# Trace back precaution.
# Checkpoint directory, relative to the current working directory
directory = os.path.join(os.getcwd(), "checkpoint")
os.makedirs(directory, exist_ok=True)

for i in range(num_files):
    # A checkpoint file marks a chunk that was already written. Skipping it
    # lets the cell be re-run after a crash without appending the same rows
    # twice. Delete the checkpoint directory to force a full re-run.
    file_path = os.path.join(directory, f"Done with Chunk{i+1}.txt")
    if os.path.exists(file_path):
        print(f"Skipping Chunk {i+1} (checkpoint found)")
        continue

    # create chunks of the image_paths
    chunk_paths = image_paths[i * paths_per_file : (i + 1) * paths_per_file]

    # One-column dataframe built from plain tuples and an explicit column name
    image_df = spark.createDataFrame(
        [(path,) for path in chunk_paths], ["image_path"]
    )
    features_df = image_df.select(
        "image_path",
        array_to_vector(extract_features_udf("image_path")).alias("features"),
    )

    # write to the s3 bucket
    features_df.write.mode("append").parquet(output_path)

    # create ".txt" file to serve as checkpoint, only after the write succeeded
    with open(file_path, "w") as f:
        f.write(f"Checkpoint for chunk {i+1}")

    # printing checkpoints
    print(f"Done with Chunk {i+1}")

#### Congratulations
You have now extracted the features of the images, which will be used for clustering.