<a href="https://colab.research.google.com/github/pkhlingam09/Fashion-Search-AI/blob/main/Fashion_Search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
## Dataset Link (Kaggle)
# https://www.kaggle.com/datasets/djagatiya/myntra-fashion-product-dataset

In [None]:
#############################################################################################################################################################################################################################
###################################################################################################   MYNTRA ASSIGNMENT   ###################################################################################################

In [None]:
# Root folder on the mounted Google Drive. The dataset CSV and api_key.txt are
# read from here, and the persistent Chroma index is stored under it.
file_path = "/content/drive/MyDrive/Fashion_Search_AI/"

In [None]:
# `files` is used by read_dataset() for the kaggle.json upload; mounting Drive
# makes the folder named by `file_path` reachable.
from google.colab import drive, files
drive.mount('/content/drive')

In [None]:
def read_dataset():
  !pip install -q kaggle chromadb
  files.upload()
  !mkdir -p ~/.kaggle
  !cp kaggle.json ~/.kaggle/
  !chmod 600 ~/.kaggle/kaggle.json
  !cat ~/.kaggle/kaggle.json
  !kaggle datasets download "djagatiya/myntra-fashion-product-dataset" -p file_path
  !unzip file_path+"myntra-fashion-product-dataset.zip" -d file_path

In [None]:
!pip install -q chromadb

In [None]:
import os
import pathlib

import string
import re
import ast
import json
import numpy as np
import pandas as pd

import chromadb
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction

from openai import OpenAI
from sentence_transformers import CrossEncoder, util

In [None]:
# Chat model used by fashion_converse() and moderation model used by chat_moderator().
gpt_model = "gpt-4o-mini"
moderator = "omni-moderation-latest"

In [None]:
# Load the raw product catalogue from the Drive folder.
df = pd.read_csv(file_path + "Fashion Dataset v2.csv")

In [None]:
# Preview the first rows of the raw data.
df.head(5)

In [None]:
# (rows, columns) of the raw data.
df.shape

In [None]:
# Column dtypes and non-null counts.
df.info()

In [None]:
# Number of missing values per column.
df.isna().sum()

In [None]:
# Percentage of missing values per column, rounded to 3 decimals.
round(100 * df.isna().sum()/df.shape[0], 3)

In [None]:
# Same check as the earlier df.shape cell; df has not been modified in between.
df.shape

In [None]:
# Same preview as the earlier df.head(5) cell; df has not been modified in between.
df.head(5)

In [None]:
class clean_datas:
  """Normalise the raw product frame in place so it can be embedded.

  `cleandata` is the entry point. It lower-cases the categorical columns,
  strips markup from `description`, turns the `p_attributes` dictionaries into
  short "key is value" sentences, builds a `metadata` dict per row and finally
  folds the product name and attributes into `description`.
  All methods mutate the frame they are given and return None.
  """

  # Attribute keys dropped from `p_attributes` before it is turned into text.
  _PATTR_KEYS_TO_REMOVE = ("body shape id", "body or garment size")
  # Attribute values treated as "missing" and dropped.
  _PATTR_EMPTY_VALUES = ("NA", "None", "")

  def cleandata(self, df):
    """Run every cleaning step on `df` in place.

    Expects the columns products, colour, brand, price, name, description and
    p_attributes to be present.
    """
    df["products"] = df["products"].str.lower()
    df["colour"] = df["colour"].str.lower()
    df["brand"] = df["brand"].str.lower()
    self.clean_description(df)
    self.clean_p_attributes(df)
    self.merge_cols(df)

  ## clean description column
  def clean_description(self, df):
    """Lower-case `description` and replace HTML/entity noise with commas."""
    htmltags = re.compile("(<.*?>)|&nbsp;|&amp;|[;|]+")
    puncts = re.compile(r"[ \(\)]+")
    s_quote = re.compile(r"[',]{2,}|(, ,)+")
    # \b keeps "size small" / "size medium" / "size large" intact; without it
    # the text became "size s, mall" and so on.
    size_token = re.compile(r"size ([sml])\b")
    spaces = re.compile(r" {2,}")
    trailing_comma = re.compile(" *,$")

    def _clean(text):
      text = text.lower()
      text = htmltags.sub(", ", text)           # tags/entities -> separator
      text = puncts.sub(" ", text)              # runs of spaces and parentheses -> one space
      text = s_quote.sub("", text)              # leftover quote/comma clusters
      text = size_token.sub(r"size \1, ", text) # separate the size letter from what follows
      text = spaces.sub(" ", text).strip()
      return trailing_comma.sub("", text)

    df["description"] = df["description"].apply(_clean)

  ## Clean p_attributes column
  def clean_p_attributes(self, df):
    """Convert each `p_attributes` value into "key is value. key is value" text.

    Values may be dict literals stored as strings (as read from the CSV) or
    already-parsed dicts; the check is made per row.
    """
    htmltags = re.compile(r"(<.*?>)?(\\r|\\n)")
    # Applied in this order to the str() form of the filtered dictionary.
    rewrites = ((":", " is "), (",", "."), ("[{}']", ""), (" +", " "))

    def _to_dict(raw):
      if isinstance(raw, dict):
        return raw
      return ast.literal_eval(htmltags.sub(" ", raw))

    def _to_text(attrs):
      kept = {}
      for key, val in attrs.items():
        key, val = str(key).lower(), str(val)
        ## Remove Chosen keys from dictionaries
        if key in self._PATTR_KEYS_TO_REMOVE or val in self._PATTR_EMPTY_VALUES:
          continue
        kept[key] = val.lower()
      ## Convert to String and Replace
      text = str(kept)
      for pattern, replacement in rewrites:
        text = re.sub(pattern, replacement, text)
      return text.lower()

    df["p_attributes"] = [_to_text(_to_dict(raw)) for raw in df["p_attributes"]]

  ## Merge columns Product name, Products, price with description column
  def merge_cols(self, df):
    """Build the `metadata` column and prepend/append name and attributes to `description`."""
    df["metadata"] = df.apply(lambda x: {"products": x["products"], "colour": x["colour"], "brand": x["brand"], "price": x["price"]}, axis=1)
    # Vectorised concatenation: same text as the former row-by-row loop, but it
    # does not depend on the index being 0..n-1.
    df["description"] = (
      "Product name is " + df["name"].astype(str) + "."
      + df["description"]
      + df["p_attributes"].astype(str)
    )


In [None]:
# Cleans `df` in place (columns are overwritten and `metadata` is added).
# Re-load the CSV before running this cell a second time: the cleaned
# p_attributes text is no longer a dict literal.
cleanData = clean_datas()
cleanData.cleandata(df)

In [None]:
## Word count of every merged description (whitespace-separated tokens)
df["desc_len"] = [len(text.split()) for text in df["description"]]

In [None]:
# p_id is later passed to Chroma as the document id, so it is stored as text.
df['p_id'] = df['p_id'].astype("str")

In [None]:
# Columns already folded into `description` / `metadata`, or not needed for search.
unused_columns = ["ratingCount", "avg_rating", "name", "p_attributes", "products", "price", "colour", "brand", "img"]
myntra = df.drop(columns=unused_columns)

In [None]:
# Preview the reduced frame.
myntra.head(5)

In [None]:
# Keep only the three columns add_to_collection() reads: id, document text, metadata.
myntra = myntra[["p_id", "description", "metadata"]]

In [None]:
# Preview the final frame that is sent to the vector store.
myntra.head(5)

In [None]:
# ____________________________________________________________________________________________  EDA and Data Cleaning Complete  ____________________________________________________________________________________________

In [None]:
## Create Embeddings using ChromaDB

In [None]:
# Embedding model used for the Chroma collections.
embed_model = "text-embedding-3-large"
# Re-ranking model loaded by crossencode_user_query().
cross_encode_model = "cross-encoder/ms-marco-MiniLM-L-6-v2"
# Text file holding the OpenAI API key.
api_file = file_path + "api_key.txt"
# Maximum distance at which a cached query counts as a hit in user_queries().
threshold = 0.15

In [None]:
class OpenAI_Embeddings:
  """Chroma client plus OpenAI embedding function for one storage location.

  `folder_name` selects the backing store: "chroma_db" is a persistent client
  rooted at the notebook-level `file_path`, "cache_db" is an in-memory client.
  """

  # Number of documents sent to Chroma per add() call.
  # presumably chosen to keep each embedding request small - TODO confirm
  BATCH_SIZE = 1000

  def __init__(self, model, api_path, folder_name):
    """
    Args:
      model: name of the OpenAI embedding model.
      api_path: path of a text file containing the OpenAI API key.
      folder_name: "chroma_db" (persistent) or "cache_db" (in-memory).

    Raises:
      ValueError: for any other `folder_name`; previously this left
        `self.client` undefined and failed later with an AttributeError.
    """
    if folder_name == "chroma_db":
      self.client = chromadb.PersistentClient(path= file_path + folder_name)
    elif folder_name == "cache_db":
      self.client = chromadb.Client()
    else:
      raise ValueError(f'folder_name must be "chroma_db" or "cache_db", got {folder_name!r}')
    self.model = model
    self.api_path = api_path
    self.embedding_function = None
    self.myntra_collections = None
    self.create_my_embedding()

  def get_api(self):
    """Return the API key stored in `self.api_path`, without surrounding whitespace."""
    with open(self.api_path, "r") as fptr:
      # A trailing newline in the key file would otherwise become part of the key.
      return fptr.read().strip()

  def create_my_embedding(self):
    """Create the OpenAI embedding function used by every collection of this client."""
    self.embedding_function = OpenAIEmbeddingFunction(
                                                        api_key = self.get_api(),
                                                        model_name = self.model
                                                     )

  def create_get_collection(self, name):
    """Create or load the collection `name`, remember it on the instance and return it."""
    ## Create or Load Chroma Collection
    self.myntra_collections = self.client.get_or_create_collection(
                                                            name = name,
                                                            embedding_function = self.embedding_function
                                                        )
    return self.myntra_collections

  def add_to_collection(self, data, collections):
    """Add every row of `data` to `collections` in batches and return the collection.

    Args:
      data: frame with the columns description, metadata and p_id.
      collections: Chroma collection (anything exposing `add`).
    """
    docs_list = data["description"].to_list()
    meta_list = data['metadata'].to_list()
    id_list = data['p_id'].to_list()
    ## Add data to Chroma Collection
    for start in range(0, len(docs_list), self.BATCH_SIZE):
      stop = start + self.BATCH_SIZE
      collections.add(
                        documents = docs_list[start:stop],
                        metadatas = meta_list[start:stop],
                        ids = id_list[start:stop]
                     )
    return collections

In [None]:
class Chroma_Search(OpenAI_Embeddings):
  """OpenAI_Embeddings with a helper for text similarity queries."""

  def __init__(self, model, api_path, folder_name):
    super().__init__(model, api_path, folder_name)

  def text_query(self, collections, query, w_clause=None, w_doc_clause=None, n_results=10):
    """Query `collections` with one text and return Chroma's result dictionary.

    Args:
      collections: Chroma collection to search.
      query: query text; it is embedded by the collection's embedding function.
      w_clause: optional metadata filter passed as `where`.
      w_doc_clause: optional document filter passed as `where_document`.
      n_results: maximum number of matches to return (default 10, the value
        that used to be hard-coded).
    """
    query_results = collections.query(
                                          query_texts = [query],
                                          n_results = n_results,
                                          where = w_clause,
                                          where_document = w_doc_clause
                                     )
    return query_results

In [None]:
## This embedding model is to create embeddings and query
myntra_obj = Chroma_Search(embed_model, api_file, "chroma_db")
myntra_collection = myntra_obj.create_get_collection("myntra")

# Decide by the collection's own size rather than by the chroma_db directory:
# the persistent client has already been constructed for that path by this
# point, so a directory check no longer tells a fresh index from a populated
# one and the catalogue would never be added on a first run.
if myntra_collection.count() == 0:
  myntra_obj.add_to_collection(myntra, myntra_collection)

myntra_collection.peek()

In [None]:
## In-memory Chroma client holding the five query-cache collections
collection_names = [f"cache_{slot}" for slot in range(1, 6)]
cache_obj = Chroma_Search(embed_model, api_file, "cache_db")

cache_collections = []
for name in collection_names:
  cache_collections += [cache_obj.create_get_collection(name)]

In [None]:
# Inspect the first cache collection.
cache_collections[0].peek()

In [None]:
## Prompt Generation using OpenAI

In [None]:
# OpenAI client shared by fashion_converse() and chat_moderator(); the key is
# read from the same file the embedding function uses.
chat_client = OpenAI(api_key=myntra_obj.get_api())

In [None]:
def fashion_converse(conversation):
    """Send `conversation` to the chat model and return the first reply message.

    The request always advertises the tools from tools_shop_assist() and lets
    the model decide whether to call one.
    """
    request = dict(
        model=gpt_model,
        messages=conversation,
        temperature=0.4,
        max_tokens=300,
        tools=tools_shop_assist(),
        tool_choice="auto",
    )
    completion = chat_client.chat.completions.create(**request)
    first_choice = completion.choices[0]
    return first_choice.message

def chat_moderator(msg):
    """Return the `flagged` verdict of the moderation model for `msg`."""
    verdicts = chat_client.moderations.create(model=moderator, input=msg).results
    return verdicts[0].flagged

In [None]:
def create_queries(inp_query_dict):
  """Turn the tool-call arguments into a query text and a Chroma `where` filter.

  Args:
    inp_query_dict: dict with "inp_query" (text), "attr_dict" (string form of a
      dict with any of the keys products, colour, brand, price) and optionally
      "addn_info" (text).

  Returns:
    (query, where). `where` is None when no criterion is set, the bare
    condition when exactly one is set, and {"$and": [...]} otherwise, so that
    `$and`/`$or` are only ever built from two or more conditions.
  """
  query = f'{inp_query_dict["inp_query"]}. {inp_query_dict.get("addn_info", "")}'
  attr_dict_ = ast.literal_eval(inp_query_dict["attr_dict"])
  if not isinstance(attr_dict_, dict):
    attr_dict_ = {}

  query_list = []
  for key, value in attr_dict_.items():
    if key == "price":
      ceiling = _price_ceiling(value)
      if ceiling is not None:
        query_list.append({"price": {"$lte": ceiling}})
    elif key in ("products", "colour", "brand"):
      # Only `products` also tries the comma-joined form of all requested values.
      clause = _match_any(key, value, include_joined=(key == "products"))
      if clause is not None:
        query_list.append(clause)

  if not query_list:
    where = None
  elif len(query_list) == 1:
    where = query_list[0]
  else:
    where = {"$and": query_list}
  return query, where


def _match_any(key, value, include_joined=False):
  """Return a condition matching `key` against any requested value, or None if there is none."""
  if isinstance(value, str):
    candidates = [value]
  elif isinstance(value, (list, tuple)):
    candidates = [item for item in value if item is not None]
  else:
    candidates = []
  # The catalogue metadata is stored lower-cased, so compare in lower case.
  candidates = [str(item).strip().lower() for item in candidates]
  candidates = [item for item in candidates if item]

  if not candidates:
    return None
  if len(candidates) == 1:
    return {key: {"$eq": candidates[0]}}
  alternatives = [{key: {"$eq": item}} for item in candidates]
  if include_joined:
    alternatives.insert(0, {key: {"$eq": ", ".join(candidates)}})
  return {"$or": alternatives}


def _price_ceiling(value):
  """Return the largest parseable amount in `value` as float, or None if nothing parses."""
  raw_values = value if isinstance(value, (list, tuple)) else [value]
  amounts = []
  for raw in raw_values:
    try:
      amounts.append(float(str(raw).replace(",", "")))
    except ValueError:
      # Not a number (e.g. "", None, free text): leave the price unconstrained.
      continue
  return max(amounts) if amounts else None

In [None]:
ind_track = 0
def user_queries(query, attr_dict, w_doc_clause):
  """Answer a search from the query cache when possible, else from the catalogue.

  A cache entry is one previously seen query text; its metadata holds the
  stringified search results under the keys ids<i>, documents<i>,
  distances<i>, metadatas<i>, plus `filter_key`, the serialised filters the
  search was run with. An entry is reused only when its filters are identical
  and its distance to `query` is at most `threshold`.

  Args:
    query: text to search for.
    attr_dict: Chroma `where` filter for the catalogue, or None.
    w_doc_clause: Chroma `where_document` filter for the catalogue, or None.

  Returns:
    DataFrame with the columns IDs, Documents, Distances, Metadatas, or None
    when the catalogue has no match. Rows served from the cache carry the
    stored string form of every value.
  """
  global ind_track

  # Cached entries do not carry the catalogue metadata keys, so the catalogue
  # filter cannot be applied to them; match on the serialised filters instead.
  filter_key = json.dumps({"where": attr_dict, "where_document": w_doc_clause}, sort_keys=True, default=str)
  cache_where = {"filter_key": {"$eq": filter_key}}

  cached_payloads = []
  for cache_collection in cache_collections:
    if not cache_collection.count():
      continue
    hit = cache_obj.text_query(cache_collection, query, cache_where, None)
    if hit["distances"][0] and hit["distances"][0][0] <= threshold:
      cached_payloads.append(hit["metadatas"][0][0])

  if cached_payloads:
    rows = []
    for payload in cached_payloads:
      # Rebuild rows by index so the result does not depend on metadata key order.
      n_rows = sum(1 for key in payload if key.startswith("ids"))
      for i in range(n_rows):
        rows.append({
          "IDs": payload.get(f"ids{i}"),
          "Documents": payload.get(f"documents{i}"),
          "Distances": payload.get(f"distances{i}"),
          "Metadatas": payload.get(f"metadatas{i}"),
        })
    # Create a DataFrame
    results_df = pd.DataFrame(rows, columns=["IDs", "Documents", "Distances", "Metadatas"])
    return results_df.drop_duplicates(subset="IDs")

  results = myntra_obj.text_query(myntra_collection, query, attr_dict, w_doc_clause)
  # The result dict always has keys, so emptiness is decided by the id list.
  if not results["ids"] or not results["ids"][0]:
    return None

  cache_payload = {"filter_key": filter_key}
  for field in ("ids", "documents", "metadatas", "distances"):
    values = results.get(field)
    if values is None:
      continue
    for i, value in enumerate(values[0]):
      cache_payload[f"{field}{i}"] = str(value)

  ## Add new query to collection: first empty slot, otherwise rotate through all slots
  target = next((collection for collection in cache_collections if not collection.count()), None)
  if target is None:
    target = cache_collections[ind_track]
    ind_track = (ind_track + 1) % len(cache_collections)
  # upsert: the same query/filter pair may already be stored in this slot.
  target.upsert(
                  documents = [query],
                  ids = [f"{query}::{filter_key}"],
                  metadatas = [cache_payload]
               )

  ## Create Result database
  result_dict = {'Metadatas': results['metadatas'][0], 'Documents': results['documents'][0], 'Distances': results['distances'][0], "IDs":results["ids"][0]}
  results_df = pd.DataFrame.from_dict(result_dict)
  return results_df

In [None]:
## Cross-Encoders
# Loaded models by name, so the weights are loaded once per kernel and not once per query.
_CROSS_ENCODER_CACHE = {}

def crossencode_user_query(df, query):
  """Re-rank search results against `query` with the cross-encoder.

  Args:
    df: frame with the columns Documents and Metadatas, or None.
    query: the user query text.

  Returns:
    The Metadatas and Documents columns ordered by descending cross-encoder
    score. The input frame is not modified. None or an empty frame yields an
    empty frame with those two columns.
  """
  if df is None or len(df) == 0:
    return pd.DataFrame(columns=["Metadatas", "Documents"])

  if cross_encode_model not in _CROSS_ENCODER_CACHE:
    _CROSS_ENCODER_CACHE[cross_encode_model] = CrossEncoder(cross_encode_model)
  cross_encoder = _CROSS_ENCODER_CACHE[cross_encode_model]

  scores = cross_encoder.predict([[query, response] for response in df['Documents']])
  ranked = df.assign(cross_encoder_scores=scores).sort_values(by='cross_encoder_scores', ascending=False)
  return ranked[["Metadatas", "Documents"]]

In [None]:
def format_query_data(inp_query_dict):
  """Normalise the values of the "attr_dict" argument and store it back as a string.

  - a one-element list becomes that element;
  - a blank string becomes [];
  - a comma-separated string becomes a list of its trimmed parts (a single
    remaining part stays a string). `price` is exempt so that "1,500" is not
    split at its thousands separator.

  The string branches used to sit inside the list branch and were never
  reached. `inp_query_dict` is updated in place and returned.
  """
  attr_dict_ = ast.literal_eval(inp_query_dict["attr_dict"])

  for key, val in list(attr_dict_.items()):
    if isinstance(val, list):
      if len(val) == 1:
        attr_dict_[key] = val[0]
    elif isinstance(val, str):
      if not val.strip():
        attr_dict_[key] = []
      elif "," in val and key != "price":
        parts = [part.strip() for part in val.split(",")]
        parts = [part for part in parts if part]
        if len(parts) > 1:
          attr_dict_[key] = parts
        else:
          attr_dict_[key] = parts[0] if parts else []
  inp_query_dict["attr_dict"] = str(attr_dict_)
  return inp_query_dict

In [None]:
def list_products(inp_query_dict):
  """Tool entry point: search the catalogue for the model-supplied arguments.

  Args:
    inp_query_dict: dict with the keys inp_query, attr_dict and addn_info as
      described in tools_shop_assist().

  Returns:
    DataFrame with the columns Metadatas and Documents, best match first;
    empty when the search finds nothing.
  """
  inp_query_dict = format_query_data(inp_query_dict)
  query, inp_dict = create_queries(inp_query_dict)
  df = user_queries(query, inp_dict, None)
  # user_queries() returns None when there is no match; re-ranking needs rows.
  if df is None or df.empty:
    return pd.DataFrame(columns=["Metadatas", "Documents"])
  return crossencode_user_query(df, query)

In [None]:
## Belongs in Tools_Calls Class

def tools_shop_assist():
        """Return the tool schema advertised to the chat model.

        One function tool, `list_products`, with three required string
        arguments. The `attr_dict` description names the keys that
        create_queries() actually reads (products, colour, brand, price).
        """
        criteria = [
                        {
                            "type": "function",
                            "function": {
                                "name": "list_products",
                                "description": "Function takes input string and additional criteria as input and returns the best matching products with their metadata and descriptions.",
                                "strict": False,
                                "parameters": {
                                    "type": "object",
                                    "properties": {
                                        "inp_query": {
                                            "type": "string",
                                            "description": "string indicating input given by user eg. find all the skirts in pink.",
                                        },
                                        "attr_dict": {
                                            "type": "string",
                                            "description": "string form of a dictionary with the keys 'products', 'colour', 'brand' and 'price' holding the criteria collected from the user; use a list for multiple values and [] when there is no preference. eg. {'products': 'skirts', 'colour': ['pink', 'red'], 'brand': [], 'price': 1500}.",
                                        },
                                        "addn_info": {
                                            "type": "string",
                                            "description": "string additional requirements given by the user eg. size should be small and closure will be zipper",
                                        },
                                    },
                                    "required": ["inp_query", "attr_dict", "addn_info"],
                                    "additionalProperties": False
                                },
                            },
                        }
                    ]

        return criteria


In [None]:
def initial_conversation():
    """Build the opening message list for a shopping conversation.

    Returns:
        Two messages: the system prompt describing the assistant's task, and a
        user message asking it to greet the customer and start collecting the
        criteria in `core_ask`.
    """
    delimiters = "#####"
    # The criteria the assistant must collect; these are the metadata keys the search filters on.
    core_ask = ["products", "colour", "brand", "price"]

    chat_prompt = f"""
                      You are an expert Fashion Digital Marketer, Fashion Merchandiser, Stylist and E-commerce Consultant. Your job is to assist customers find the apparel they want to buy through polite and professional conversation.
                      {delimiters}
                      Enquire customer about each of the criteria in the list {core_ask}. If the response is not clear, rephrase the question and ask again.
                      Values for all the criteria have to be extracted from customer, missing any of the criteria will be penalised.
                      ''' Showing the summary and internal information will be penalised. '''
                      (If the values for any criteria contains 'and' or any punctuations except '-', split them.)
                      (If the values for any criteria contains 'no' or 'none' or 'any', replace them with empty list such as '[]'.)
                      (If there are multiple values for a criteria, save the values as list of values. Not saving them as list of values will be penalised.)
                      Once the data is collected from customer, convert the criteria and the answers received for the criteria into a dictionary format.
                      ''' Showing the summary and internal information will be penalised. '''
                      Once all the data is collected, ask the customer (if he wants to add any other information. if yes allow customer to add details and save it as a string). else proceed.
                      Call function list_products() with arguments as the complete input string by customer and additional criteria and answers stored in dictionary earlier.
                      list_products() function returns a dataframe, which consists of one product in every row. Take the details of the product from all columns in each row and display the product name, products available and total price in rupees for each product.
                      {delimiters}
                      Examples of how to display the final output:
                      Product            Products Available            Price
                      xyzlb              a, b, c                       Rs.54
                      mnopkqhgy          a                             Rs.2710
                      qwp                a, b, c, d                    Rs.89433
                      {delimiters}
                   """
    chat_prompt = [{"role": "system", "content": chat_prompt},
                   {"role": "user", "content": f"Start conversation with a polite welcome and enquiring about the criteria in the list {core_ask} one at a time"}]

    return chat_prompt


In [None]:
def start_conversations():
  """Run the interactive shopping chat on stdin/stdout until the user types "exit".

  Each user message is moderated first. When the model asks for tool calls,
  the assistant message is stored once, every call is answered with its own
  tool message, and the model is asked again; the final text reply is printed
  and kept in the history.
  """
  max_tool_rounds = 3  # upper bound on consecutive tool-call rounds per user turn

  converse = initial_conversation()
  chat_resp = fashion_converse(converse)
  converse.append({"role": "assistant", "content": str(chat_resp.content)})
  print(chat_resp.content)

  while True:
    user_input = input().lower().strip()
    if user_input == "exit":
      break

    if chat_moderator(user_input):
      print("Sorry, this message has been flagged. Cannot be accepted.")
      continue ## Skip everything and get back to input()

    converse.append({"role": "user", "content": user_input})
    chat_resp = fashion_converse(converse)

    for _ in range(max_tool_rounds):
      if not chat_resp.tool_calls:
        break
      ## Append Function Calling Request (once, before the tool results that answer it)
      converse.append(chat_resp)
      for tool_call in chat_resp.tool_calls:
        converse.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "name": tool_call.function.name,
                        "content": _run_tool_call(tool_call)
                        })
      chat_resp = fashion_converse(converse)

    # Keep the reply in the history so later turns can refer to it.
    converse.append({"role": "assistant", "content": str(chat_resp.content)})
    print(chat_resp.content)


def _run_tool_call(tool_call):
  """Execute one model-requested tool call and return its result as text."""
  if tool_call.function.name != "list_products":
    return f"Unknown tool: {tool_call.function.name}"
  try:
    args = json.loads(tool_call.function.arguments)
  except json.JSONDecodeError as err:
    # Report malformed (e.g. truncated) arguments back to the model.
    return f"Could not parse tool arguments: {err}"
  tool_response = list_products(args)
  if isinstance(tool_response, pd.DataFrame):
    # str(DataFrame) shortens long cells to the display width; JSON keeps the
    # full document text and metadata.
    return tool_response.to_json(orient="records")
  return str(tool_response)


In [None]:
# Start the interactive chat; type "exit" to stop.
start_conversations()