In [1]:
import os
import cv2
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

In [None]:
# Spin up a local Dask cluster with 2 workers, each running 2 threads,
# and keep a handle to the client connected to it.
client = Client(
    n_workers=2,
    threads_per_worker=2,
)

In [2]:
def euclidean_distance(vec1, vec2):
    """Return the Euclidean (L2) distance between two vectors.

    Args:
        vec1: Array-like of numbers.
        vec2: Array-like of numbers whose shape is broadcastable with ``vec1``.

    Returns:
        The norm of ``vec1 - vec2`` as computed by ``np.linalg.norm``.
    """
    first = np.asarray(vec1)
    second = np.asarray(vec2)

    # Promote to at least float64 before subtracting. Unsigned integer inputs
    # (e.g. uint8 pixel data) would otherwise wrap around on negative
    # differences and produce a wildly wrong distance.
    common_dtype = np.result_type(first, second, np.float64)
    difference = first.astype(common_dtype) - second.astype(common_dtype)

    return np.linalg.norm(difference)

In [31]:
def process_image(image_path, size=(128, 128)):
    """Load an image from disk, resize it and flatten it to a 1-D array.

    Args:
        image_path: Path of the image file, passed to ``cv2.imread``.
        size: Target size passed to ``cv2.resize`` as ``(width, height)``.
            Defaults to ``(128, 128)``.

    Returns:
        A 1-D NumPy array holding the resized image's pixel values.

    Raises:
        ValueError: If ``cv2.imread`` could not load ``image_path``.
    """
    image = cv2.imread(image_path)

    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than with an opaque error from
    # cv2.resize further down.
    if image is None:
        raise ValueError(f"Could not read image: {image_path!r}")

    # cv2.resize already returns an ndarray, so no extra np.array() copy is
    # needed before flattening.
    return cv2.resize(image, size).flatten()

In [None]:
def get_image_id(image_path):
    """Return the file name of ``image_path`` without its directory and final extension."""
    file_name = os.path.basename(image_path)
    stem, _extension = os.path.splitext(file_name)
    return stem

In [None]:
def process_partition(df):
    """Run ``process_image`` on every entry of the ``ImagePaths`` column.

    Args:
        df: DataFrame (one partition) containing an ``ImagePaths`` column.

    Returns:
        The Series produced by applying ``process_image`` element-wise.
    """
    image_paths = df['ImagePaths']
    flattened = image_paths.apply(process_image)
    return flattened

In [10]:
image_dir = './images'

# os.listdir returns entries in arbitrary order, so sort them to keep the
# row order (and therefore the partition contents) reproducible between runs.
# Sub-directories are skipped because they cannot be loaded as images.
image_paths = [
    path
    for path in (os.path.join(image_dir, name) for name in sorted(os.listdir(image_dir)))
    if os.path.isfile(path)
]

df = pd.DataFrame({'ImagePaths': image_paths})
ddf = dd.from_pandas(df, npartitions=10)

# Both derived columns are computed lazily, one partition at a time.
ddf['flattened_image'] = ddf.map_partitions(process_partition, meta=('flattened_image', 'object'))
ddf['ID'] = ddf.map_partitions(
    # `part` (not `df`) so the lambda does not shadow the pandas frame above.
    lambda part: part['ImagePaths'].apply(get_image_id),
    meta=('ID', 'str'),
)

# The source paths are no longer needed once the ID has been derived.
ddf = ddf.drop(labels=['ImagePaths'], axis=1)

In [None]:
output_dask_dir = './flattened_images'

# CSV cells holding NumPy arrays are written via str(), and NumPy abbreviates
# arrays with more than 1000 elements to "[a b c ... x y z]", which would
# silently drop almost all pixel values. Serialise each array explicitly as a
# single space-separated string instead; a cell can be parsed back with
# np.array(cell.split(), dtype=...).
ddf_serialized = ddf.assign(
    flattened_image=ddf['flattened_image'].map(
        lambda pixels: ' '.join(map(str, pixels.tolist())),
        meta=('flattened_image', 'object'),
    )
)

# One CSV file is written per partition; '*' is replaced by the partition number.
ddf_serialized.to_csv(f'{output_dask_dir}/ddf*.csv', index=False)