In [None]:
!pip install datasets dask[distributed] dask[dataframe] --quiet

In [None]:
#Load Libraries
import io
import json
import os
from urllib.parse import urlparse

import pandas as pd
import requests
import torch
from PIL import Image
from scipy.spatial.distance import cosine
from transformers import CLIPProcessor, CLIPModel

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
#Load Dataset - Health and Household
from datasets import load_dataset

# Subsets of interest: raw_meta_Health_and_Household, raw_meta_Office_Products, raw_meta_Pet_Supplies
dataset_hh = load_dataset(
    "McAuley-Lab/Amazon-Reviews-2023",
    "raw_meta_Health_and_Household",
    split="full",
    download_mode="reuse_cache_if_exists",
    trust_remote_code=True,
)
dataset_hh

Generating full split: 100%|██████████| 797563/797563 [02:53<00:00, 4589.25 examples/s]


Dataset({
    features: ['main_category', 'title', 'average_rating', 'rating_number', 'features', 'description', 'price', 'images', 'videos', 'store', 'categories', 'details', 'parent_asin', 'bought_together', 'subtitle', 'author'],
    num_rows: 797563
})

In [None]:
# Add Product Metadata Category
def add_raw_meta_category_health(example):
    """Tag one record with the metadata subset it was loaded from.

    The record is modified in place (a ``raw_meta_category`` key set to
    ``'Health_and_Household'``) and the same object is returned, which is the
    shape ``Dataset.map`` expects from a per-example function.
    """
    example.update(raw_meta_category='Health_and_Household')
    return example

dataset_hh = dataset_hh.map(function=add_raw_meta_category_health)  # adds the raw_meta_category column

Map: 100%|██████████| 797563/797563 [01:56<00:00, 6828.47 examples/s]


In [None]:
# Filter Dataset
def has_complete_metadata(example):
    """Return True for products with usable text and at least one image.

    A product is kept when:
      * ``title`` is present and has more than one word,
      * ``description`` and ``features`` are present and non-empty,
      * ``images['large']`` and ``images['thumb']`` are both present,
        non-empty and contain no ``None`` entries.

    The ``None`` checks run before anything is called on a value, so a missing
    title or image list rejects the product instead of raising.
    """
    title = example['title']
    if title is None or len(title.split()) <= 1:
        return False

    for field in ('description', 'features'):
        value = example[field]
        if value is None or len(value) == 0:
            return False

    images = example['images']
    if images is None:
        return False
    for size in ('large', 'thumb'):
        urls = images[size]
        if urls is None or len(urls) == 0 or None in urls:
            return False

    return True


dataset_hh = dataset_hh.filter(has_complete_metadata)
dataset_hh

Filter: 100%|██████████| 797563/797563 [01:06<00:00, 12063.83 examples/s]


Dataset({
    features: ['main_category', 'title', 'average_rating', 'rating_number', 'features', 'description', 'price', 'images', 'videos', 'store', 'categories', 'details', 'parent_asin', 'bought_together', 'subtitle', 'author', 'raw_meta_category'],
    num_rows: 415120
})

In [None]:
# Convert the Description variable from a list of paragraphs to a single string
# (Features is intentionally left as a list)
def list_to_str(example):
    """Collapse ``example['description']`` into one ``', '``-joined string.

    Idempotent: a description that is already a string (e.g. when this map is
    re-run on an already-converted dataset) is left unchanged instead of being
    split into comma-separated characters. A missing (``None``) description
    becomes ``''``. Returns the same (mutated) ``example``.
    """
    description = example['description']
    if description is None:
        example['description'] = ''
    elif not isinstance(description, str):
        example['description'] = ', '.join(description)
    return example


In [None]:
# Join each product's description paragraphs into a single string
dataset_hh = dataset_hh.map(function=list_to_str)
dataset_hh

Map: 100%|██████████| 415120/415120 [01:29<00:00, 4660.76 examples/s]


Dataset({
    features: ['main_category', 'title', 'average_rating', 'rating_number', 'features', 'description', 'price', 'images', 'videos', 'store', 'categories', 'details', 'parent_asin', 'bought_together', 'subtitle', 'author', 'raw_meta_category'],
    num_rows: 415120
})

In [None]:
# Download Image from URL
def download_image(url, timeout=20):
    """Download an image and return it as an RGB ``PIL.Image``.

    Parameters
    ----------
    url : str or None
        Image URL. ``None``, NaN or ``''`` means "no image".
    timeout : float, optional
        Seconds to wait for the server (connect and read) before giving up.
        Without a timeout a single stalled request can block a worker forever.

    Returns
    -------
    PIL.Image.Image or None
        ``None`` when ``url`` is missing or empty. For a URL without a scheme,
        a non-200 response, or any download/decoding error, a message is
        printed and a blank 200x200 white RGB placeholder is returned.
    """
    if pd.isna(url) or url == '':
        return None
    parsed_url = urlparse(url)
    if not parsed_url.scheme:
        print(f"Skipping invalid URL: {url}")
        return Image.new('RGB', (200, 200), color='white')  # Return Blank White Image

    try:
        # The context manager releases the connection on every path. Reading
        # ``response.content`` applies any Content-Encoding, unlike the raw
        # socket stream.
        with requests.get(url, timeout=timeout) as response:
            if response.status_code != 200:
                print(f"Skipping URL with status code {response.status_code}: {url}")
                return Image.new('RGB', (200, 200), color='white')  # Generate White Image
            return Image.open(io.BytesIO(response.content)).convert("RGB")
    except Exception as e:
        print(f"Error downloading image from {url}: {e}")
        return Image.new('RGB', (200, 200), color='white')  # Generate White Image

In [None]:
# Instantiate the CLIP processor and model, then move the model to the available device
CLIP_CHECKPOINT = "openai/clip-vit-base-patch16"

device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)
model = CLIPModel.from_pretrained(CLIP_CHECKPOINT).to(device)
processor = CLIPProcessor.from_pretrained(CLIP_CHECKPOINT)

cpu


  return self.fget.__get__(instance, owner)()


In [None]:
# Global accumulators shared by the helpers in this cell.
# NOTE(review): nothing in the cells shown here appends to ``results_list``;
# each batch is written to Parquet directly by the processing loop.
count_of_products = 0
results_list = []  # List to store results temporarily


def save_results(results=None,
                 file_path='/content/drive/MyDrive/ParquetFiles-Embeddings/embeddings_results.parquet'):
    """Write buffered result records to a Parquet file.

    Parameters
    ----------
    results : list of dict, optional
        Records to save. Defaults to the module-level ``results_list``.
    file_path : str, optional
        Destination ``.parquet`` path. Its parent directory is created if it
        does not exist yet.

    Returns
    -------
    str or None
        ``file_path`` when something was written, ``None`` when there was
        nothing to save (no file or directory is created in that case).
    """
    if results is None:
        results = results_list
    if not results:
        return None

    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    df = pd.DataFrame(results)
    df.to_parquet(file_path, index=False, engine='pyarrow', compression='snappy')
    print(f"Results saved to {file_path}.")
    return file_path


def _as_text(value):
    """Normalise a text field to one string.

    ``None``/NaN become ``''``, strings are returned unchanged, and any other
    iterable (list, tuple, or the NumPy arrays produced by
    ``Dataset.to_pandas``) is joined with single spaces.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, float):
        return "" if value != value else str(value)
    return " ".join(str(part) for part in value)


def process_row(row):
    """Compute CLIP text and image embeddings for one product row.

    Parameters
    ----------
    row : mapping-like (e.g. the pandas Series passed per row by the partition helper)
        Must provide ``parent_asin``, ``title``, ``description`` and
        ``first_large_image``. ``features`` is optional and is appended to the
        text when present.

    Returns
    -------
    dict or None
        Keys ``parent_asin``, ``title``, ``description``, ``image_url``,
        ``tokenized_text``, ``tokenized_image`` and ``similarity``.

        The two embeddings are plain lists of floats, so Parquet can store them
        as numeric lists. ``similarity`` is the CLIP text/image cosine
        similarity as a float. All three are ``None`` when ``download_image``
        returned ``None`` (no URL).

        ``None`` is returned for the whole row when processing fails; the error
        is printed.
    """
    try:
        asin = row['parent_asin']
        title = _as_text(row['title'])
        description = _as_text(row['description'])
        # Optional: the batch loop does not necessarily select this column.
        features = _as_text(row.get('features'))
        image_urls = row['first_large_image']

        image = download_image(image_urls)  # PIL image, or None when there is no URL

        if image is not None:
            text = " ".join(part for part in (title, description, features) if part)

            # Tokenize text (truncation=True cuts it to the tokenizer's max length) and move to device
            text_inputs = processor(text=text, return_tensors="pt", padding=True, truncation=True)
            text_inputs = {k: v.to(device) for k, v in text_inputs.items()}

            # Preprocess image and move to device
            image_inputs = processor(images=image, return_tensors="pt")
            image_inputs = {k: v.to(device) for k, v in image_inputs.items()}

            with torch.no_grad():
                text_features = model.get_text_features(**text_inputs)
                image_features = model.get_image_features(**image_inputs)

            # One text and one image were encoded, so take row 0 and move it to the CPU.
            # scipy's ``cosine`` needs 1-D NumPy-compatible vectors, not batched or CUDA tensors.
            text_vector = text_features[0].cpu().numpy()
            image_vector = image_features[0].cpu().numpy()
            similarity = float(1 - cosine(text_vector, image_vector))

            # Store plain float lists instead of torch tensors, which otherwise
            # end up serialized as their string repr.
            text_embeds = text_vector.tolist()
            image_embeds = image_vector.tolist()
        else:
            text_embeds = None
            image_embeds = None
            similarity = None

        return {
            'parent_asin': asin,
            'title': title,
            'description': description,
            'image_url': image_urls,
            'tokenized_text': text_embeds,
            'tokenized_image': image_embeds,
            'similarity': similarity,
        }

    except Exception as e:
        print(f"Error processing row: {e}")
        return None

# Final save at the end of processing
def finalize_processing():
    """Flush whatever is still buffered in ``results_list`` to disk.

    Delegates to ``save_results()`` with its default arguments. It is meant
    to be called once, after the last row has been processed, and returns
    nothing.
    """
    save_results()
    return None

# Usage: once processing of the (Dask) DataFrame has finished, call
# finalize_processing() to save any leftover buffered results.


def partition_function(partition, row_fn=None):
    """Embed every row of one Dask partition and return a result frame.

    Parameters
    ----------
    partition : pandas.DataFrame
        One partition of the batch frame. Each row is passed to ``row_fn``.
    row_fn : callable, optional
        Per-row function returning a result dict or ``None``. Defaults to
        ``process_row``. It is exposed mainly so this logic can be tested
        without loading CLIP.

    Returns
    -------
    pandas.DataFrame
        Columns exactly as declared in the Dask ``meta``, indexed by the
        partition's index.

        Rows for which ``row_fn`` returned ``None`` are dropped, rather than
        making ``pd.DataFrame`` fail on a mix of dicts and ``None``. An empty
        partition (or one where every row failed) yields an empty frame with
        the same columns. ``similarity`` is cast to float (missing -> NaN).
    """
    if row_fn is None:
        row_fn = process_row

    columns = ['parent_asin', 'title', 'description', 'image_url',
               'tokenized_text', 'tokenized_image', 'similarity']

    records = []
    index = []
    for idx, row in partition.iterrows():
        record = row_fn(row)
        if record is not None:
            records.append(record)
            index.append(idx)

    result = pd.DataFrame(records, index=index, columns=columns)
    result['similarity'] = result['similarity'].astype(float)
    return result

In [None]:
import dask.dataframe as dd
from dask.distributed import Client
from tqdm import tqdm


import dask

# Optionally start a local distributed cluster first, e.g.
#   client = Client(n_workers=2)
#   print(client)
# Without a Client, Dask uses its default local scheduler.

# Keep object-dtype result columns (the embedding lists) as Python objects
# rather than letting Dask convert object columns to pyarrow strings.
# NOTE(review): suspected cause of embeddings being stored as strings in
# earlier runs; confirm against the installed Dask version.
dask.config.set({"dataframe.convert-string": False})

OUTPUT_DIR = 'Health_Household_Embeddings'
BATCH_SIZE = 100000   # rows converted to pandas per batch
N_PARTITIONS = 25     # Dask partitions per batch; adjust to available workers
SKIP_EXISTING = True  # resume support: skip batches whose output file exists (set False to recompute)

RESULT_META = {'parent_asin': 'str', 'title': 'str', 'description': 'object',
               'image_url': 'str', 'tokenized_text': 'object',
               'tokenized_image': 'object', 'similarity': 'float'}


def get_first_large_image(images):
    """Return the first URL in ``images['large']``, or None when there is none."""
    if not isinstance(images, dict):
        return None
    large = images.get('large')
    if large is None or len(large) == 0:  # len() works for lists and NumPy arrays
        return None
    return large[0]


os.makedirs(OUTPUT_DIR, exist_ok=True)
num_batches = -(-len(dataset_hh) // BATCH_SIZE)  # ceiling division, used as tqdm's total

# Convert to Pandas in batches so the full dataset never sits in memory at once
batches = dataset_hh.to_pandas(batch_size=BATCH_SIZE, batched=True)
for batch_idx, df in enumerate(tqdm(batches, total=num_batches), start=1):
    output_path = os.path.join(OUTPUT_DIR, f'tokenized_results_{batch_idx}.parquet')
    if SKIP_EXISTING and os.path.exists(output_path):
        print(f"Batch {batch_idx} already completed!")
        continue

    print('Processing batch:', batch_idx)

    df['first_large_image'] = df['images'].apply(get_first_large_image)
    df = df[['parent_asin', 'title', 'description', 'first_large_image']]

    df_dask = dd.from_pandas(df, npartitions=N_PARTITIONS)
    tokenized_results = df_dask.map_partitions(partition_function, meta=RESULT_META).compute()

    count_of_products = len(tokenized_results)
    tokenized_results.to_parquet(output_path, index=False, engine='pyarrow')
    print(f"Processed {count_of_products} products")
    count_of_products = 0

    # torch.cuda.empty_cache()  # uncomment when running on GPU and memory is tight


Perhaps you already have a cluster running?
Hosting the HTTP server on port 36556 instead


<Client: 'tcp://127.0.0.1:37958' processes=2 threads=64, memory=80.00 GiB>


0it [00:00, ?it/s]

Processing batch: 1


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
1it [12:53, 773.60s/it]

Processed 10000 products
Processing batch: 2


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
1it [17:06, 1026.37s/it]


KeyboardInterrupt: 

Process Dask Worker process (from Nanny):
2024-10-22 14:11:15,320 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/koppolu.s/.conda/envs/pytorch_env/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/koppolu.s/.conda/envs/pytorch_env/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/koppolu.s/.conda/envs/pytorch_env/lib/python3.10/site-packages/distributed/process.py", line 202, in _run
    target(*args, **kwargs)
  File "/home/koppolu.s/.conda/envs/pytorch_env/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/koppolu.s/.conda/envs/pytorch_env/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/koppolu.s/.conda/envs/pyt

In [None]:
# The embedding tensors end up stored in the Parquet files as strings
# (their printed form, e.g. "tensor([[...]])").
# The function below converts those strings back into NumPy arrays; they can be
# turned back into tensors when storing them in Pinecone.
# Here `df` is a DataFrame read from one of those Parquet files with
# pd.read_parquet(..., engine="pyarrow").


import pandas as pd
import torch
import ast
from tqdm import tqdm


def _parse_tensor_string(value):
    """Turn one stored embedding value into a float32 NumPy array.

    Accepted inputs:
      * the printed form of a torch tensor, e.g. ``"tensor([[0.1, 0.2]])"``,
        including reprs with trailing arguments such as ``device='cuda:0'``;
      * an already-numeric list / array (converted to float32);
      * ``None`` / NaN for rows without an embedding (returned as ``None``).

    The nesting of the stored value (e.g. a leading dimension of 1) is kept.
    Raises ``ValueError`` if a string contains no bracketed list.
    """
    import re

    if value is None or (isinstance(value, float) and value != value):
        return None
    if not isinstance(value, str):
        return torch.as_tensor(value, dtype=torch.float32).numpy()

    # Everything from the first '[' to the last ']' is the nested number list;
    # this ignores the "tensor(" prefix and any ", device=..." suffix.
    match = re.search(r"\[.*\]", value, flags=re.DOTALL)
    if match is None:
        raise ValueError(f"Cannot parse tensor string: {value[:60]!r}")
    return torch.tensor(ast.literal_eval(match.group(0))).numpy()


def convert_str_to_tensor(df, show_progress=True):
    """Parse the stored embedding columns of ``df`` back into NumPy arrays.

    Reads ``tokenized_text`` and ``tokenized_image`` and adds
    ``text_embeddings`` and ``image_embeddings`` columns. Each holds float32
    arrays, or ``None`` where the stored value is missing. Rows are processed
    by position, so any index works, not just a 0..n-1 RangeIndex.

    ``df`` is modified in place and also returned. Set ``show_progress=False``
    to skip the tqdm progress bar.
    """
    pairs = zip(df['tokenized_text'], df['tokenized_image'])
    if show_progress:
        pairs = tqdm(pairs, total=len(df))

    text_embeddings_list = []
    image_embeddings_list = []
    for text_value, image_value in pairs:
        # NumPy arrays (not tensors) are stored because DataFrame cells hold them natively
        text_embeddings_list.append(_parse_tensor_string(text_value))
        image_embeddings_list.append(_parse_tensor_string(image_value))

    df['text_embeddings'] = text_embeddings_list
    df['image_embeddings'] = image_embeddings_list
    return df


