# Homework Assignment: Building an Image Processing Pipeline with PySpark

## Objective
This assignment walks you through a simple PySpark pipeline that classifies images from the CIFAR-10 dataset with a machine learning model. Read the provided code segments and answer the questions to understand how the pipeline is constructed.

## Setup Instructions
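These instructions are intended for Linux (Debian or Ubuntu, since they use `apt`). Start by cloning the repository with git, then use the following steps to set up the environment:
```
sudo apt update
sudo apt upgrade
sudo apt install default-jdk
sudo apt install python3 python3-pip
pip3 install pyspark Pillow numpy jupyter
# Download and extract the Python version of CIFAR-10 (skip if the repository already includes it)
wget https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz
tar -xzf cifar-10-python.tar.gz
```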

## Resources
- [PySpark Documentation](https://spark.apache.org/docs/latest/api/python/index.html)
- [Machine Learning with PySpark MLlib](https://spark.apache.org/docs/latest/ml-guide.html)
- [CIFAR-10 Dataset](https://www.cs.toronto.edu/~kriz/cifar.html)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CIFAR-10 Image Processing with PySpark") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memoryOverhead", "512m") \
    .config("spark.sql.shuffle.partitions", "100") \
    .getOrCreate()
```

1. Why is it necessary to configure `spark.executor.memory` and `spark.driver.memory`? Include an explanation of what the executor and driver do.

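`spark.executor.memory` specifies the amount of heap memory for each executor process. Executors are the worker processes that run the tasks of a job and return results to the driver (or write them to storage). This setting controls how much data each executor can hold while processing, caching, and shuffling, which directly affects performance: if it is too low, tasks spill to disk or fail with out-of-memory errors; if it is too high, fewer executors fit on each node (or the request cannot be granted at all), leaving cluster resources unused. When Spark runs in local mode, as it typically does with a pip-installed PySpark and no cluster, tasks run inside the driver process, so `spark.driver.memory` is the limit that actually applies.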

`spark.driver.memory` sets the amount of memory for the driver, the process that runs your program and orchestrates the Spark application. The driver holds the application's metadata, builds the execution plan, schedules jobs and tasks, and negotiates resources with the cluster manager. It also holds any data brought back with actions such as `collect()`, as well as local data before it is distributed: in this notebook, the decoded CIFAR-10 batches live on the driver until `parallelize` ships them to the executors. Too little driver memory therefore causes out-of-memory failures even when the executors are adequately sized.
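The back-of-the-envelope sketch below puts these settings in context for this notebook. It assumes the five CIFAR-10 training batches (10,000 images each) and the 1,024-value grayscale `float64` feature vectors built later, and it ignores PNG encoding, Spark's per-row overhead, and Python object overhead, so the figures are rough sizes of the uncompressed pixel data, not measurements. The last figure reflects that, on cluster managers such as YARN and Kubernetes, the memory requested per executor is roughly `spark.executor.memory` plus `spark.executor.memoryOverhead`.

```python
MIB = 1024 ** 2

# Five CIFAR-10 training batches of 10,000 images each.
num_images = 5 * 10_000

# Raw RGB pixels as loaded from the batch files: 32 x 32 x 3 uint8 values, 1 byte each.
raw_bytes = num_images * 32 * 32 * 3

# Grayscale feature vectors built later: 32 x 32 values stored as float64, 8 bytes each.
feature_bytes = num_images * 32 * 32 * 8

# Settings from the SparkSession builder above.
executor_memory_mib = 4 * 1024   # spark.executor.memory = "4g"
overhead_mib = 512               # spark.executor.memoryOverhead = "512m"
executor_request_mib = executor_memory_mib + overhead_mib

print(f"raw RGB pixels:          {raw_bytes / MIB:.1f} MiB")
print(f"float64 feature vectors: {feature_bytes / MIB:.1f} MiB")
print(f"per-executor request:    {executor_request_mib} MiB")
```

The feature vectors are larger than the raw images because each 1-byte pixel becomes an 8-byte float, which is worth keeping in mind when sizing memory for the later stages.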

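```python
import pickle

def unpickle(file):
    """Load one CIFAR-10 batch file (a pickled Python dictionary)."""
    with open(file, 'rb') as fo:
        # encoding='bytes' is needed because the files were pickled with Python 2;
        # dictionary keys therefore come back as bytes, e.g. b'data' and b'labels'.
        batch = pickle.load(fo, encoding='bytes')
    return batch
```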

3. What is the purpose of serialization in distributed systems?

Serialization in distributed systems involves converting data structures or object states into a format that can be easily stored or transmitted and then reconstructed later. The primary purposes of serialization are:

- Data Exchange: Allows data to be efficiently transferred over the network between different components of a distributed system (like between nodes of a Spark cluster or from clients to servers).

- Persistence: Enables data to be written to and read back from storage media (files, databases, caches), so that state can outlive the process that created it.

- Performance: Compact binary formats, optionally combined with compression, reduce the number of bytes written to disk or sent over the network, which lowers I/O overhead. Serializing and deserializing also costs CPU time, so the choice of format affects performance.
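A minimal round trip with Python's standard `pickle` module shows the idea. The same module reads the CIFAR-10 batch files in `unpickle` above, and PySpark uses pickle-based serializers to send Python data and functions to its worker processes. The dictionary here is a made-up stand-in, not real CIFAR-10 data.

```python
import pickle

# A tiny, made-up stand-in for a CIFAR-10 batch dictionary (bytes keys, like the real files).
batch = {b'labels': [3, 8, 8], b'data': bytes(range(12))}

payload = pickle.dumps(batch)      # serialize: Python object -> bytes that can be stored or sent
restored = pickle.loads(payload)   # deserialize: bytes -> a new, equal Python object

print(type(payload).__name__, len(payload) > 0)
print(restored == batch, restored is batch)
```

The restored dictionary is equal to the original but is a separate object, which is exactly what a remote process receives.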

CIFAR-10 batch files store image data and labels in a format that is not immediately suitable for analysis with Spark: `b'data'` is a 10000 × 3072 `uint8` NumPy array (one flattened 32 × 32 RGB image per row), and `b'labels'` is a list of 10,000 integers in the range 0 to 9. You need to transform this data into a format that can be used to create a Spark DataFrame. Below is the starter function that loads a CIFAR-10 batch file into a list of tuples, which will be parallelized into an RDD and then converted into a DataFrame.

4. Add line-by-line comments to the provided code to explain the transformation process, focusing particularly on image reshaping and serialization.

```python
from PIL import Image
import io
import numpy as np

def load_cifar10_batch(file):
    """
    Loads a CIFAR-10 batch file and returns a list of tuples containing image data and labels.
    Args:
    - file (str): Path to the CIFAR-10 batch file.
    Returns:
    - list: A list of tuples, where each tuple contains (image_data, label).
    """
    batch = unpickle(file)
    data = batch[b'data']
    labels = batch[b'labels']
    images_and_labels = []

    # TODO: Comment the following code
    for i in range(len(data)):  # Iterate over each image in the batch (10,000 per file).
        image_array = data[i]  # Flat array of 3,072 uint8 values: 1,024 red, then 1,024 green, then 1,024 blue.
        image_array_reshaped = image_array.reshape(3, 32, 32).transpose(1, 2, 0)  # Reshape to (channel, height, width), then reorder the axes to (height, width, channel), the layout PIL expects.
        image = Image.fromarray(image_array_reshaped)  # Convert the NumPy array into a PIL Image object (RGB).

        img_byte_arr = io.BytesIO()  # Create an in-memory byte stream to hold the encoded image.
        image.save(img_byte_arr, format='PNG')  # Serialize the image into the stream as a lossless PNG.
        image_bytes = img_byte_arr.getvalue()  # Extract the encoded PNG bytes from the stream.

        images_and_labels.append((image_bytes, labels[i]))  # Append a tuple of the image bytes and the corresponding label.

    return images_and_labels  # List of (png_bytes, label) tuples, ready for parallelize().
```
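The reshape and transpose step is the easiest part to get wrong, so the NumPy-only sketch below checks it on a synthetic row (not real data) laid out the way the CIFAR-10 dataset page describes: the 1,024 red values of the 32 × 32 image first in row-major order, then the 1,024 green values, then the 1,024 blue values.

```python
import numpy as np

# Synthetic CIFAR-style row (not real data): every red value is 10, green 20, blue 30.
row = np.concatenate([np.full(1024, 10), np.full(1024, 20), np.full(1024, 30)]).astype(np.uint8)

# Mark the pixel at (y=5, x=7): its red value sits in the first plane, its blue value in the third.
row[0 * 1024 + 5 * 32 + 7] = 200
row[2 * 1024 + 5 * 32 + 7] = 250

channels_first = row.reshape(3, 32, 32)          # axes: (channel, height, width)
image_array = channels_first.transpose(1, 2, 0)  # axes: (height, width, channel), as PIL expects

print(channels_first.shape, image_array.shape)
print(image_array[5, 7], image_array[0, 0])
```

After the transpose, indexing `image_array[y, x]` yields that pixel's `[R, G, B]` triple, which is the height × width × channel layout `Image.fromarray` interprets as an RGB image.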

## Data Preparation

DataFrames provide a convenient and efficient way to handle structured data in Spark. You will now take the data loaded from the CIFAR-10 files, parallelize it using RDDs (Resilient Distributed Datasets), and then convert these RDDs into DataFrames. Because the CIFAR-10 training set is split across five batch files of 10,000 images each, this process must combine multiple batches into one dataset.

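```python
from pyspark.sql import Row

# Paths to the five CIFAR-10 training batches. This assumes the Python version of the
# dataset was extracted into ./cifar-10-batches-py; adjust the path if yours differs.
batch_files = [f"cifar-10-batches-py/data_batch_{i}" for i in range(1, 6)]

# Function to create a DataFrame from a single batch file
def create_dataframe_from_batch(file):
    images_and_labels = load_cifar10_batch(file)                     # decoded on the driver
    rdd = spark.sparkContext.parallelize(images_and_labels)          # distribute the list as an RDD
    row_rdd = rdd.map(lambda x: Row(image_data=x[0], label=x[1]))    # turn each tuple into a named Row
    df = spark.createDataFrame(row_rdd)                              # schema is inferred from the Rows
    return df

# Load and combine multiple batches
df = None
for batch_file in batch_files:
    batch_df = create_dataframe_from_batch(batch_file)
    if df is None:
        df = batch_df
    else:
        df = df.union(batch_df)
```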


1. What does the `parallelize` method do, and why is it important in Spark?

The `parallelize` method takes a local Python collection (such as a list) that lives in the driver and distributes it across the executors of a Spark cluster by splitting it into partitions. It returns a Resilient Distributed Dataset (RDD), Spark's fundamental distributed data structure, so that the partitions can be processed in parallel.

- Scalability: Once data is partitioned, Spark can spread the work across many cores and machines and scale horizontally by adding servers. Note that `parallelize` itself requires the whole collection to fit in the driver's memory first; datasets larger than that should be read directly from distributed storage (for example with `spark.read`), so that executors load their own partitions.
- Fault Tolerance: RDDs are resilient because they track the lineage of transformations applied to each dataset. If any part of the dataset is lost due to node failure, Spark can recompute just the lost partitions from lineage, minimizing the computation needed to recover the lost data.
- Performance: Parallel processing is central to Spark’s ability to handle big data. Tasks that would take a long time to execute on a single machine can be divided into smaller tasks and executed simultaneously across multiple machines, greatly reducing processing times.
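To make partitioning and lineage-based recovery concrete without a cluster, the plain-Python sketch below models the idea: a local list is cut into contiguous, near-equal slices (one per partition), each slice is processed independently, and a "lost" partition is rebuilt by re-applying the recorded function to its source slice. It is a conceptual model only, with hypothetical helper names and threads standing in for executors; it is not how PySpark is implemented, and PySpark's real partition boundaries may differ.

```python
from concurrent.futures import ThreadPoolExecutor

def slice_into_partitions(data, num_slices):
    """Split a local list into num_slices contiguous, near-equal chunks (hypothetical helper)."""
    n = len(data)
    return [data[(i * n) // num_slices:((i + 1) * n) // num_slices] for i in range(num_slices)]

def map_partitions(partitions, func):
    """Apply func to every element; each partition is handled as a separate task by a thread pool."""
    with ThreadPoolExecutor() as pool:
        return list(pool.map(lambda part: [func(x) for x in part], partitions))

def recompute_partition(source_partitions, func, index):
    """Lineage-style recovery: rebuild one lost partition from its source slice and func."""
    return [func(x) for x in source_partitions[index]]

def square(x):
    return x * x

source = slice_into_partitions(list(range(10)), 3)
squared = map_partitions(source, square)

squared[1] = None                                      # simulate losing partition 1
squared[1] = recompute_partition(source, square, 1)    # only that partition is recomputed

print(source)
print(squared)
```

The key point is that recovery needs only the source slice and the transformation, not a copy of every intermediate result.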

2. How does the `union` method help in combining data from different sources?

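The `union` method combines two RDDs or two DataFrames by appending the rows of one to the other. The result contains all rows from both inputs, including duplicates: it behaves like SQL's `UNION ALL`, so follow it with `distinct()` if you need set semantics. For DataFrames, both inputs must have the same number of columns, and columns are matched by position, not by name: the result takes the first DataFrame's column names, and each pair of columns must have compatible types (Spark widens types such as `int` and `long` where it can). Use `unionByName` to match columns by name instead.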

- Data Integration: union is essential for combining datasets from different sources, such as different data files, databases, or even data streams. It allows for easy and efficient data aggregation, essential in data analysis and machine learning scenarios where data comes from multiple sources.
- Simplicity and Efficiency: `union` merges datasets without join keys or matching logic, and it does not shuffle data; it simply concatenates the partitions of its inputs. This keeps it inexpensive even for large volumes of data that just need to be processed as a single dataset.
- Flexibility: It provides the ability to perform more complex data transformations and analyses by enabling the integration of diverse datasets, thereby enriching the data context.
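Positional matching is the subtle part of `DataFrame.union`. The plain-Python model below (hypothetical helpers and columns; rows are tuples) contrasts it with name-based matching, which is what `unionByName` does. Because both columns here are integers, Spark's `union` would accept the misaligned input without an error; the batch numbers would simply end up in the `label` column.

```python
def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Append rows_b to rows_a, pairing columns by position; the result keeps cols_a's names."""
    if len(cols_a) != len(cols_b):
        raise ValueError("union requires the same number of columns")
    return cols_a, rows_a + rows_b

def union_by_name(cols_a, rows_a, cols_b, rows_b):
    """Append rows_b to rows_a after reordering each of its rows to match cols_a by name."""
    if sorted(cols_a) != sorted(cols_b):
        raise ValueError("both inputs need the same column names")
    order = [cols_b.index(name) for name in cols_a]
    return cols_a, rows_a + [tuple(row[i] for i in order) for row in rows_b]

# Hypothetical integer columns; the second input lists them in the opposite order
# and contains a duplicate row (neither helper removes duplicates, just like union).
part_1 = (["label", "batch"], [(6, 1)])
part_2 = (["batch", "label"], [(2, 9), (2, 9)])

print(union_by_position(*part_1, *part_2))
print(union_by_name(*part_1, *part_2))
```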

In machine learning, features need to be numeric and typically normalized. The images in the CIFAR-10 dataset are in byte format and must be converted into a usable form for machine learning models. This task involves writing a UDF that converts the image byte data into a dense vector of normalized pixel values.

```python
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import BinaryType
from PIL import Image
import io
import numpy as np

# TODO: INSERT GRAYSCALE UDF HERE
def rgb_to_grayscale(image_bytes):
    """Decode PNG bytes, convert the image to 8-bit grayscale ('L' mode), and re-encode it as PNG."""
    image = Image.open(io.BytesIO(image_bytes))
    grayscale_image = image.convert('L')
    img_byte_arr = io.BytesIO()
    grayscale_image.save(img_byte_arr, format='PNG')
    return img_byte_arr.getvalue()

def convert_bytes_to_vector(image_bytes):
    """Decode PNG bytes into a dense vector of pixel values scaled to [0, 1]."""
    image = Image.open(io.BytesIO(image_bytes))
    array = np.array(image).flatten().astype(float) / 255.0  # Flatten and normalize pixel values
    return Vectors.dense(array)

grayscale_udf = udf(rgb_to_grayscale, BinaryType())
convert_udf = udf(convert_bytes_to_vector, VectorUDT())

# TODO apply grayscale UDF here
df = df.withColumn("grayscale_data", grayscale_udf("image_data"))

# Apply the conversion UDF: grayscale PNG bytes -> 1,024-element feature vector
df = df.withColumn("features", convert_udf("grayscale_data"))
```


3. Why is it necessary to normalize the pixel values in image processing?

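Normalizing pixel values to a common scale, typically 0 to 1 by dividing the 8-bit values by 255, keeps every feature in the same range regardless of the image source. For gradient-based models such as neural networks or logistic regression, features on a small, consistent scale make the optimization better conditioned, so training converges faster and more reliably. Normalization does not reduce memory use, however: converting `uint8` pixels to `float64` makes each value eight times larger. Tree-based models, such as the random forest used later, split on thresholds and are essentially insensitive to this kind of rescaling, so here it mainly keeps the features in a conventional form that any model can use.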
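The sketch below applies the same scaling as `convert_bytes_to_vector` to a random stand-in for one 32 × 32 grayscale image (not real data). It shows both the effect (values mapped onto 0.0 to 1.0) and the cost (the `float64` array needs eight times the memory of the `uint8` pixels).

```python
import numpy as np

rng = np.random.default_rng(0)
gray = rng.integers(0, 256, size=(32, 32)).astype(np.uint8)  # stand-in 32 x 32 grayscale image
gray[0, 0], gray[0, 1] = 0, 255                               # make sure both extremes appear

features = gray.flatten().astype(float) / 255.0  # same scaling as convert_bytes_to_vector

print(features.shape, features.min(), features.max())
print(gray.nbytes, features.nbytes)   # uint8 vs float64 storage for one image
```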

4. What are UDFs? What are the benefits of using UDFs in Spark?

User-Defined Functions (UDFs) let you wrap ordinary Python (or Scala/Java) functions so that Spark can apply them to DataFrame columns or call them from Spark SQL. Their main benefit is extensibility: logic that Spark has no built-in function for, such as decoding PNG bytes with PIL here, can run row by row on the executors, in parallel across partitions, and the same function can be reused across queries and applications. The trade-off is performance: Spark's optimizer cannot see inside a UDF, and a Python UDF requires each row to be serialized from the JVM to a Python worker process and back. Built-in functions (or vectorized pandas UDFs) are therefore usually faster when they can express the same logic.

Color images have three color channels (RGB), while grayscale images combine these channels into a single intensity value per pixel that represents a shade of gray. For CIFAR-10, this shrinks each image from 3,072 values (32 × 32 × 3) to 1,024, which simplifies the data and reduces the number of features the model has to handle, at the cost of discarding color information.
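Pillow's documentation describes `convert('L')` for RGB images as the ITU-R 601-2 luma transform, L = R × 299/1000 + G × 587/1000 + B × 114/1000. The NumPy sketch below applies that weighted sum to a few pure colors; `to_grayscale` is a hypothetical helper, and because Pillow uses its own internal arithmetic, this floating-point version is not guaranteed to match it bit for bit.

```python
import numpy as np

LUMA_WEIGHTS = np.array([299, 587, 114]) / 1000   # R, G, B weights of the ITU-R 601-2 luma transform

def to_grayscale(rgb):
    """Convert an (..., 3) uint8 RGB array to uint8 luma values with a weighted sum."""
    luma = rgb.astype(float) @ LUMA_WEIGHTS
    return np.clip(np.rint(luma), 0, 255).astype(np.uint8)

# Pure red, green, blue, white, and black pixels.
pixels = np.array([[255, 0, 0], [0, 255, 0], [0, 0, 255], [255, 255, 255], [0, 0, 0]], dtype=np.uint8)
print(to_grayscale(pixels))
```

Green contributes most to perceived brightness, so pure green maps to a much lighter gray than pure blue.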

5. Create a function called `rgb_to_grayscale` that takes `image_bytes` and returns the grayscale image as bytes, and a corresponding UDF called `grayscale_udf`. Hint: decode the bytes into an image, use PIL's `.convert()` method to convert it to grayscale, then encode it back to bytes.

## Machine Learning Pipeline

Here, we'll set up a machine learning pipeline using PySpark's MLlib. The pipeline contains a random forest classifier, which is fitted to the training data so that it can predict labels for new images from the patterns it has learned.

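```python
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Hold out 20% of the rows for testing; the seed makes the split repeatable for the same input.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# A random forest of 10 trees that reads the "features" vector column and the "label" column.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)

# A one-stage pipeline: preprocessing already happened in the UDFs above.
pipeline = Pipeline(stages=[rf])

model = pipeline.fit(train_df)
```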

1. What is the role of a RandomForest classifier in this context?

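The random forest classifier is the model that learns to predict an image's class (0 to 9) from its 1,024 normalized grayscale pixel values. It is an ensemble of decision trees: each tree is trained on a bootstrap sample of the training rows and considers a random subset of features at each split, and the forest combines the trees' predictions (the MLlib guide describes this as a majority vote for classification). Combining many decorrelated trees reduces overfitting compared with a single tree. In this notebook, the classifier is the only stage in the pipeline; it does not perform feature extraction, which the grayscale and vector-conversion UDFs have already done before the pipeline runs.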
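The toy sketch below illustrates the two ideas behind a random forest, bootstrap sampling and voting across trees, using one-split "stumps" that each look at a single randomly chosen feature. It is a simplified, binary-only illustration with hypothetical helper names and made-up data; Spark's trees are full decision trees that sample candidate features at each node (controlled by `featureSubsetStrategy`) and are trained in a distributed way.

```python
import random
from collections import Counter

def train_stump(rows, labels, feature):
    """Toy binary 'tree': one split on one feature, placed midway between the two class means."""
    classes = sorted(set(labels))
    if len(classes) == 1:                  # the bootstrap sample happened to contain one class
        return lambda x: classes[0]
    means = {c: sum(r[feature] for r, y in zip(rows, labels) if y == c) / labels.count(c)
             for c in classes}
    low_cls, high_cls = sorted(classes, key=means.get)
    threshold = (means[low_cls] + means[high_cls]) / 2
    return lambda x: high_cls if x[feature] > threshold else low_cls

def train_forest(rows, labels, num_trees=25, seed=42):
    """Train each stump on a bootstrap sample (drawn with replacement) and a random feature."""
    rng = random.Random(seed)
    trees = []
    for _ in range(num_trees):
        idx = [rng.randrange(len(rows)) for _ in rows]
        feature = rng.randrange(len(rows[0]))
        trees.append(train_stump([rows[i] for i in idx], [labels[i] for i in idx], feature))
    return trees

def predict(trees, x):
    """Majority vote over the trees' predictions."""
    return Counter(tree(x) for tree in trees).most_common(1)[0][0]

xs = [0.1, 0.2, 0.3, 0.7, 0.8, 0.9]
rows = [(x, 1 - x) for x in xs]      # two informative features per made-up example
labels = [0, 0, 0, 1, 1, 1]
forest = train_forest(rows, labels)

print(predict(forest, (0.15, 0.85)), predict(forest, (0.85, 0.15)))
```

An individual stump trained on an unlucky bootstrap sample can be wrong, but the vote across many stumps is much more stable, which is the intuition behind ensembling.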

2. How does a machine learning pipeline simplify the process of model training and prediction?

A machine learning pipeline in PySpark simplifies the process of model training and prediction by encapsulating all the steps involved in data preprocessing, model training, and predictions into a single, manageable workflow. This structured approach ensures that all steps are performed in a specific sequence, maintaining consistency across different runs and datasets. It automates repetitive tasks, reduces the likelihood of errors, and enhances the reproducibility of results. By using a pipeline, data scientists can easily experiment with different models and preprocessing techniques without rewriting substantial amounts of code, making the process of model development both efficient and scalable.
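The plain-Python sketch below imitates the Estimator/Transformer pattern behind `pyspark.ml.Pipeline` using hypothetical `Toy*` classes; it is not the pyspark.ml API. Calling `fit` runs the stages in order and returns a fitted model, and that fitted model then applies exactly the same steps, with parameters learned only from the training data, to new data. This is what keeps preprocessing consistent between training and prediction.

```python
class ToyMinMaxScaler:
    """Estimator: fit() learns each feature's min and max and returns a fitted model."""
    def fit(self, rows):
        columns = list(zip(*rows))
        return ToyMinMaxScalerModel([min(c) for c in columns], [max(c) for c in columns])

class ToyMinMaxScalerModel:
    """Transformer: rescales features with the min and max learned from the training rows."""
    def __init__(self, lows, highs):
        self.lows, self.highs = lows, highs

    def transform(self, rows):
        return [tuple((v - lo) / (hi - lo) if hi > lo else 0.0
                      for v, lo, hi in zip(row, self.lows, self.highs))
                for row in rows]

class ToyClip:
    """Stateless transformer: clips every value into [0, 1]; it has nothing to fit."""
    def transform(self, rows):
        return [tuple(min(max(v, 0.0), 1.0) for v in row) for row in rows]

class ToyPipeline:
    """Runs stages in order: estimators are fitted, and each stage's output feeds the next."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)

class ToyPipelineModel:
    """Applies the same fitted stages, in the same order, to new data."""
    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows

toy_model = ToyPipeline([ToyMinMaxScaler(), ToyClip()]).fit([(0, 10), (10, 20)])
print(toy_model.transform([(5, 15), (20, 30)]))   # scaled with the training min/max, then clipped
```

The second test row is rescaled with the training minimum and maximum (giving 2.0 before clipping), not with statistics computed from the test data.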

```python
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(model.transform(test_df))
print(f"Accuracy: {accuracy}")

# Stop the session when you are done; run your parameter experiments (question 3) before this line.
spark.stop()
```
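With `metricName="accuracy"`, the evaluator reports the fraction of test rows whose `prediction` equals `label`. The plain-Python sketch below computes the same quantity on made-up labels. Since CIFAR-10's ten classes are balanced, uniform random guessing would score about 0.1, a useful baseline when reading the printed accuracy.

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted class equals the true label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    return sum(y == p for y, p in zip(labels, predictions)) / len(labels)

print(accuracy([3, 8, 8, 0], [3, 8, 1, 0]))   # 3 of 4 predictions are correct
print(1 / 10)                                  # random-guessing baseline for 10 balanced classes
```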

3. Try varying the parameters and models to improve accuracy: for example, change the number of trees (`numTrees`) and the maximum depth (`maxDepth`), or use a different model entirely. Try at least three different scenarios and write a brief report on what seems to improve performance.
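One way to organize these experiments is sketched below: build every combination of settings, score each with a function you supply, and sort the results. `param_grid`, `run_experiments`, and `fake_score` are hypothetical helpers, and `fake_score` returns made-up numbers purely so the sketch runs on its own. The commented notebook version assumes hypothetical `train_part` and `validation_part` DataFrames split from `train_df`; choosing settings by test-set accuracy would make that accuracy optimistic. `pyspark.ml.tuning` also provides `ParamGridBuilder` with `TrainValidationSplit` or `CrossValidator` for the same job.

```python
from itertools import product

def param_grid(num_trees_values, max_depth_values):
    """Every combination of numTrees and maxDepth to try, as keyword-argument dicts."""
    return [{"numTrees": t, "maxDepth": d} for t, d in product(num_trees_values, max_depth_values)]

def run_experiments(grid, train_and_score):
    """Score each setting with a caller-supplied function; return (score, params) pairs, best first."""
    results = [(train_and_score(params), params) for params in grid]
    return sorted(results, key=lambda result: result[0], reverse=True)

# In the notebook (before spark.stop()), train_and_score might look like this sketch:
#
#     def train_and_score(params):
#         rf = RandomForestClassifier(featuresCol="features", labelCol="label", **params)
#         fitted = Pipeline(stages=[rf]).fit(train_part)
#         return evaluator.evaluate(fitted.transform(validation_part))

def fake_score(params):
    """Stand-in scorer for demonstration only; it is not a real accuracy."""
    return params["numTrees"] / 100 + params["maxDepth"] / 1000

for score, params in run_experiments(param_grid([10, 50], [5, 10]), fake_score):
    print(f"{score:.3f}  {params}")
```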