In [832]:
import json
import pandas as pd
import os
import sqlite3
from sklearn.metrics.pairwise import cosine_similarity

from enum import Enum
from typing import List
from pydantic import BaseModel, Field

#!pip install openai -U
from openai import OpenAI
key=os.environ.get('OPENAI_API_KEY')
client = OpenAI(api_key=key)
model = "gpt-4o-2024-08-06"

In [564]:
# Embedding brand in emotions space: Get association scores between an input and list of emotions

Characteristic = Enum('Characteristic', dict([(emotion, emotion) for emotion in emotions_ls]))

class EmotionalAssociationScore(BaseModel):
    emotion: Characteristic
    score: float

class EmotionalAssociationScores(BaseModel):
    associations: List[EmotionalAssociationScore] = Field(description="A list of emotions and associated scores")

def emotional_association_scores(
        thing, 
        model,
        emotions
    ):
    
    prompt = f"Assign emotional association scores between {0} and {len(emotions)} for the provided thing. "\
    "Assign a score for each of the following emotions. Briefly, explain the reason behind the association score."\
    "Ensure the scores reflect the association strength for the specified thing. "\
    "Thing: "\
    f"{thing}"
            
    completion = client.beta.chat.completions.parse(
        model = model,
        messages=[
            {"role": "system", "content": "Be a helpful assistant."},
            {"role": "user", "content": prompt}
        ],
        response_format=EmotionalAssociationScores,
    )
    #output returns in the defined pydantic style
    output = completion.choices[0].message.parsed
    return thing, output.json()

In [None]:
#not using this for the moment
# #Embedding brands in emotions space: 
# # tried nested prompt but decided to go with one prompt and a list comprehension
# emotions= emotions_ls
# associations_brands = [emotional_association_scores(thing, model, emotions) for thing in brands_ls[:3]]


In [None]:
def get_df(thing, model, emotions):
    gpt = emotional_association_scores(thing, model, emotions)
    data = list(json.loads(gpt[1]).values())[0]
    df = pd.DataFrame(data)
    df.rename(columns = {'score': gpt[0]}, inplace=True)
    df.set_index('emotion', inplace=True)
    return df

def get_dfs(things_ls, model, emotions):
    merged_df = pd.DataFrame()
    for thing in things_ls:
        new_df = get_df(thing, model, emotions)
        if merged_df.empty:
            merged_df = new_df
        else:
            merged_df = pd.merge(merged_df, new_df, left_index=True, right_index=True, how='outer')
    return merged_df


things_ls = brands_ls
dfs = get_dfs(things_ls, model, emotions)
# Drop columns with NaN values
dfs_cleaned = dfs.dropna(axis=1)

dfs_cleaned 

In [934]:
# Set pandas to display all rows and columns
pd.set_option('display.max_rows', None)  # Show all rows
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.width', None)  # Adjust display width to prevent column cutting
pd.set_option('display.max_colwidth', None)  # Show full content in columns
# dfs.isna().sum()


In [None]:
# #Confirmed no need to l2 norm vectors for sklearn's cosine similarity:
# # Define your original vectors
# A = np.array([[2, 3]])
# B = np.array([[5, 4]])

# # Calculate cosine similarity without normalization
# cosine_sim_without_norm = cosine_similarity(A, B)

# # L2 normalize the vectors
# A_normalized = A / np.linalg.norm(A)
# B_normalized = B / np.linalg.norm(B)

# # Calculate cosine similarity with normalization
# cosine_sim_with_norm = cosine_similarity(A_normalized, B_normalized)

# # Print the outputs
# print("Cosine Similarity without normalization:")
# print(cosine_sim_without_norm[0][0])  # Output from unnormalized vectors

# print("\nCosine Similarity with normalization:")
# print(cosine_sim_with_norm[0][0])      # Output from normalized vectors
# cosine_sim_without_norm[0][0]==cosine_sim_with_norm[0][0]

In [None]:

def get_similarity(df, dfs):
    similarities = dict()

    # Reshape Series to 2D array (required by cosine_similarity)
    s1 = df.values.reshape(1, -1)

    for col in list(dfs.columns):
        # Reshape
        s2= dfs[col].values.reshape(1, -1)

        cosine_sim = cosine_similarity(s1, s2)
        similarities[col]= cosine_sim[0][0]

    sorted_dict = dict(sorted(similarities.items(), key=lambda item: item[1], reverse = True))

    # Get the top 3 (highest similarity)
    top_3 = list(dict(islice(sorted_dict.items(), 3)).keys())

    return top_3

get_similarity(df, dfs_cleaned)


In [24]:
#The cosine similarity ranges from -1 to 1, where:
#1 indicates identical vectors (i.e., vectors point in the same direction).
#0 indicates orthogonality (i.e., vectors are at a 90-degree angle to each other, no similarity).
#-1 indicates opposite directions (i.e., vectors point in exactly opposite directions).
#represents similarity between feature vectors, quantifying similarity between two vectors based on their direction, 
# irrespective of their magnitude.

#embeddings happen in a much smaller space of emotions as oppossed to ordinary, more common embeddings in a large space as more commonly done with openai api (read)

In [None]:
# a method
#embedding dimension is emotions
#talk about options
#get the brands, go through 50 emotins at a time
#cosine: normalize first: l2 norm = 1
#give instructions on readme on where key goes 
#first have everything in pandas df, then think about database
# one module or package w 1 .py 
#adaptors that take in pydantic datatypes and will make into sql
#argparse


In [None]:
# #to check emotion redundancy by looking at example groups 
#[i for i in list(json.loads(emotions).values())[0] if i in ['Joy', 'Happiness', 'Shame', 'Embarrassment', 'Envy', 'Jealousy' , 'Hate', 'disgust', 'hatered', 'Resentment']]

In [None]:
#test0

#Retrieve emotions from datbase or through openAI API
# if os.path.exists('emotions.json'):
#     with open('emotions.json', 'r') as f:
#         emotions_json = json.load(f)
# else:
#     emotions_json = get_emotions(model, api_key)

#test sqlite
# with sqlite3.connect(os.path.abspath('database.db')) as conn:
#     # Write the DataFrame to the database
#     df.to_sql('mytable', conn, if_exists='replace', index=False)
#     #cursor = conn.cursor()
#     #cursor.execute('SELECT SQLITE_VERSION()')
#     #data = cursor.fetchone()
#     #print('SQLite version:', data)

# query = "SELECT * FROM mytable"
# with sqlite3.connect(os.path.abspath('database.db')) as conn:
#     df_test= pd.read_sql_query(query, conn)


In [902]:
#test
# Get human emotions using the Pydantic model for the API response

class EmotionsResponse(BaseModel):
    #None as default if value not provided
    Emotions: List[str] = Field(None, description="List of non-redundant human emotions.") 

def get_emotions(model: str, key: str) -> List[str]:
    """Gets a list of 50 unique and non-redundant human emotions using the specified gpt model."""
    client = OpenAI(api_key=key)

    system_prompt = "Find 50 different, exclusive and unique human emotions. "\
    "For example, pick joy or happiness, pick Shame or Embarrassment, pick Envy or Jealousy, "\
    "pick Hate or disgust or hatered or Resentment. "\

    user_prompt = "Select 50 different and unique human emotions."

    try:
        completion = client.beta.chat.completions.parse(
            model= model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            response_format=EmotionsResponse
        )

        #output in the defined pydantic style
        output = completion.choices[0].message.parsed
        return output.json()
    
    except Exception as e:
        print(f"An error occurred while trying to get emotions: {e}")
        return json.dumps({})


#Get 100 best selling American clothing brands
class BrandResponse(BaseModel):
    name: str = Field(description="Brand name as a string.")
    brand_info: str = Field(description="Brand information as a string.")
class BrandsResponse(BaseModel):
    brands: List[BrandResponse] = Field(description="A list of brand names and information.")

def get_brands(model: str, key: str) -> List[str]:
    """Get 5 best selling American clothing brands using the specified gpt model. Provide a brief information about each brand."""
    client = OpenAI(api_key=key)
    try:
        #Call the API to get the completion
        completion = client.beta.chat.completions.parse(
            model= model,
            messages=[
                {"role": "system", "content": "Find 5 non-redundant best selling American clothing brands."},
                {"role": "user", "content": "Give me 5 best selling American clothing brands and a brief information about each brand."}
            ],
            response_format=BrandsResponse
        )
        #output in the defined pydantic style
        output = completion.choices[0].message.parsed

        return output.json()
    
    except Exception as e:
        print(f"An error occurred while trying to get brands: {e}")
        return json.dumps({})


# Embedding and getting association scores between an input and emotions

def get_scores(thing, emotions, model, key):

    Characteristic = Enum('Characteristic', dict([(emotion, emotion) for emotion in emotions]))

    class EmotionalAssociationScore(BaseModel):
        emotion: Characteristic
        score: float

    class EmotionalAssociationScores(BaseModel):
        associations: List[EmotionalAssociationScore] = Field(description="List of dictionaries with 'emotion' and 'score' as keys for each dictionary")
        explanation: str = Field(description="String explaining the association scores.")
    
    client = OpenAI(api_key=key)

    prompt = f"Assign emotional association scores, with each score reflecting the association strength between "\
    f"{thing} and each of the given {emotions}. Each score should be between 0 "\
    f"and {len(emotions)}. Briefly explain the reason behind the association scores."\
            
    completion = client.beta.chat.completions.parse(
        model = model,
        messages=[
            {"role": "system", "content": "Be a helpful assistant."},
            {"role": "user", "content": prompt}
        ],
        response_format=EmotionalAssociationScores,
    )
    #output in the defined pydantic style
    output = completion.choices[0].message.parsed
    return output.json()

def get_emotions_df(model, key):
    gpt = get_emotions(model, key)
    ls = list(json.loads(gpt).values())[0]
    df = pd.DataFrame(ls, columns = ['emotion'])
    df['emotion_id'] = df.index
    df = df[['emotion_id','emotion']]
    return df

def get_scores_thing(thing, emotions_df, model, key):
    emotions = list(emotions_df['emotion'].values)
    gpt = get_scores(thing, emotions, model, key)
    scoresinfo = json.loads(gpt)['explanation']

    df = pd.DataFrame(json.loads(gpt)['associations'])
    df.rename(columns = {'score': thing}, inplace=True)
    #merge to get emotion ids
    df = pd.merge(df, emotions_df, on ='emotion', how ='inner')
    df.drop('emotion', axis = 1, inplace=True)
    df= df[['emotion_id',f'{thing}']]
    return ({thing: scoresinfo}, df)

def get_scores_things(things, emotions_df, model, key):
    scoreinfos = []
    merged_df = pd.DataFrame()
    for thing in things:
        scoreinfo, new_df = get_scores_thing(thing, emotions_df, model, key)
        scoreinfos.append(scoreinfo)
        if merged_df.empty:
            merged_df = new_df
        else:
            merged_df = pd.merge(merged_df, new_df, on='emotion_id', how='inner')
    return (scoreinfos, merged_df)

def get_brands_scores(emotions_df, model, key):
    gpt = get_brands(model, key)

    brands_df = pd.DataFrame(list(json.loads(gpt).values())[0])
    brands_df.reset_index(inplace= True)
    brands_df.rename({'index':'brand_id'}, axis = 1, inplace = True)
    scoreinfos, scores_brands= get_scores_things(brands_df['name'], emotions_df, model, key)
    scoreinfos_df= pd.DataFrame([(k,v) for data in scoreinfos for k,v in data.items()], columns = ['name', 'scores_info'])
    brands_df = pd.merge(brands_df, scoreinfos_df, how = 'left', on ='name' )
    brands_df['gpt'] = model
    brands_df = brands_df[['brand_id','name','brand_info', 'scores_info','gpt']]

    scores_brands = pd.melt(scores_brands, id_vars='emotion_id', value_vars =list(scores_brands.columns))
    scores_brands.rename(columns = {'variable':'name','value':'score'}, inplace=True)
    scores_brands = pd.merge(scores_brands, brands_df[['brand_id','name']], on ='name', how ='inner')
    scores_brands.drop('name', axis= 1, inplace=True)
    scores_brands = scores_brands[['emotion_id','brand_id','score']]
    
    return (brands_df, scores_brands)

def get_similarity(brands_df, scores_thing, scores_brands, number):
    similarities = dict()

    scores_thing.sort_values(by='emotion_id',inplace=True)
    scores_thing.set_index('emotion_id', inplace=True)
    # Reshape Series to 2D array (required by cosine_similarity)
    s1 = scores_thing.values.reshape(1, -1)

    brand_ids = list(scores_brands['brand_id'].unique())
    for brand_id in brand_ids:
        df_brand = scores_brands.loc[scores_brands['brand_id']==brand_id]
        df_brand = df_brand.sort_values(by='emotion_id')
        df_brand= df_brand.set_index('emotion_id')
        s2= df_brand['score'].values.reshape(1, -1)
        cosine_sim = cosine_similarity(s1, s2)
        similarities[brand_id]= cosine_sim[0][0]
    #replace brand id with name 
    name_id = dict(zip(brands_df['name'], brands_df['brand_id']))
    similarities = {k: similarities[v] for k, v in name_id.items() if v in similarities}
    sorted_s = sorted(similarities.items(), key=lambda item: item[1], reverse = True)
    recommendations = list(dict(sorted_s[:number]).keys())
    return recommendations

In [935]:
'Fear of God'
with sqlite3.connect(os.path.abspath('database.db')) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name ='brands';")
    query = "SELECT * FROM 'brands'" 
    df = pd.read_sql_query(query, conn)


In [937]:
df.loc[df['name']=='Fear of God']['brand_info']

98    Founded in 2013 by Jerry Lorenzo, known for luxury streetwear and casual essentials.
Name: brand_info, dtype: object

In [929]:
#test
#check the databse tables, drop tables and check again: testing check_emotions_exists and check_brands_exists
with sqlite3.connect(os.path.abspath('database.db')) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    print("Tables in the database:", tables)
    for table in tables:
        query = f'SELECT * FROM {table[0]}' 
        table = pd.read_sql_query(query, conn)
        print(table.head())
        print(table.shape)
        
# # Drop all tables and check again the above works
# with sqlite3.connect(os.path.abspath('database.db')) as conn:
#     cursor = conn.cursor()
#     cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
#     tables = cursor.fetchall()
#     print("Tables in the database:", tables)
#     for table in tables:
#         cursor.execute(f"DROP TABLE IF EXISTS {table[0]};")
#         print(f"Table {table[0]} dropped")
#     conn.commit()

#drop one table
# with sqlite3.connect(os.path.abspath('database.db')) as conn:
#     cursor = conn.cursor()
#     cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name in ('emotions');")
#     tables = cursor.fetchall()
#     for table in tables:
#         cursor.execute(f"DROP TABLE IF EXISTS {table[0]};")
#         print(f"Table {table[0]} dropped")
#     conn.commit()

Tables in the database: [('emotions',), ('brands',), ('association_scores',)]
   emotion_id emotion
0           0     Joy
1           1   Shame
2           2    Envy
3           3    Hate
4           4   Pride
(49, 2)
   brand_id            name  \
0         0            Nike   
1         1    Ralph Lauren   
2         2          Levi's   
3         3    Calvin Klein   
4         4  Tommy Hilfiger   

                                          brand_info  \
0  Established in 1964, Nike is a leading sportsw...   
1  Founded in 1967, Ralph Lauren offers a range o...   
2  Originating in 1853, Levi's is famous for its ...   
3  Launched in 1968, Calvin Klein is renowned for...   
4  Since 1985, Tommy Hilfiger has been a key play...   

                                         scores_info                gpt  
0  Nike, as a leading global sports brand, is str...  gpt-4o-2024-08-06  
1  The brand Ralph Lauren is often associated wit...  gpt-4o-2024-08-06  
2  Levi's is a well-known and truste

In [923]:
#test
# def check_emotions_exists(mytable, model, key):
#     with sqlite3.connect(os.path.abspath('database.db')) as conn:
#         cursor = conn.cursor()
#         cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (mytable,))
#         if cursor.fetchone() is not None:
#             print(f'Reading {mytable} from database...')
#             query = f'SELECT * FROM {mytable}' 
#             df = pd.read_sql_query(query, conn)
#         else:
#             print(f"{mytable} dataset doesn't exist so generating one...")
#             df = get_emotions_df(model, key)
#             df.to_sql('emotions', conn, if_exists = 'replace', index=False)
#     return df


def check_data_exists(thing, number, update_brand_list, model, key):
    with sqlite3.connect(os.path.abspath('database.db')) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name in ('emotions', 'brands', 'association_scores')")
        tables = cursor.fetchall()

        if len(tables) == 3:
            print('Reading emotions from database...')
            emotions_df = pd.read_sql_query("SELECT * FROM 'emotions'", conn) 

            if update_brand_list == 'no':
                print('Reading brands from database...')
                brands_df = pd.read_sql_query("SELECT * FROM 'brands'" , conn)
                scores_brands = pd.read_sql_query("SELECT * FROM 'association_scores'" , conn)
            else:
                print('Generating brands data...')
                brands_df, scores_brands = get_brands_scores(emotions_df.iloc[:3], model, key)
                brands_df.to_sql('brands', conn, if_exists = 'replace', index=False)
                scores_brands.to_sql('association_scores', conn, if_exists = 'replace', index=False)
        else:
            print("The data that I need is not in the 'database.db' so need to get it fresh...")
            emotions_df = get_emotions_df(model, key)
            brands_df, scores_brands = get_brands_scores(emotions_df.iloc[:3], model, key)
            emotions_df.to_sql('emotions', conn, if_exists = 'replace', index=False)
            brands_df.to_sql('brands', conn, if_exists = 'replace', index=False)
            scores_brands.to_sql('association_scores', conn, if_exists = 'replace', index=False)
        
        _,scores_thing = get_scores_thing(thing, emotions_df.iloc[:3], model, key)
        result = get_similarity(brands_df, scores_thing, scores_brands, number)
    
    print(f'Given your favorite book, {thing}, here is the brand(s) I recommend: {result}')

    return result

# Example usage
mytable = 'emotions'
update_brand_list = 'yes'
thing = 'summer'
number = 2


In [921]:
check_data_exists(thing, number, update_brand_list, model, key)

Reading emotions from database...
Generating brands data...
Given your favorite book, summer, here is the brand(s) I recommend: ['Nike', 'Under Armour']


['Nike', 'Under Armour']

In [None]:
def check_data_exists(model, api_key, db_name, update_brand_list):
    with sqlite3.connect(os.path.abspath(db_name)) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name in ('emotions', 'brands', 'association_scores')")
        tables = cursor.fetchall()
        if len(tables) == 3:
            query = "SELECT * FROM 'emotions'"
            emotions_df = pd.read_sql_query(query, conn) 

            if update_brand_list == 'no':
                print('Reading from database...')
                query = "SELECT * FROM 'association_scores'" 
                scores_df = pd.read_sql_query(query, conn)
            else:
                print('Generating brands data...')
                brands, scores_df = get_brands_scores(model, api_key, emotions_df['emotion'][:3])
                brands.to_sql('brands', conn, if_exists = 'replace', index=False)
                scores_df.to_sql('association_scores', conn, if_exists = 'replace', index=False)

        else:
            print("Brands data doesn't exist so generating...")
            emotions_df = get_emotions_df(model, api_key)
            emotions_df.to_sql('emotions', conn, if_exists = 'replace', index=False)

            brands, scores_df = get_brands_scores(model, api_key, emotions_df['emotion'][:3])
            brands.to_sql('brands', conn, if_exists = 'replace', index=False)
            scores_df.to_sql('association_scores', conn, if_exists = 'replace', index=False)

    return (emotions_df, scores_df)




# Example usage
model = "gpt-4o-2024-08-06"
api_key=os.environ.get('OPENAI_API_KEY')
db_name = 'database.db'
update_brand_list = 'no'
number = 3
thing ='summer'



In [486]:
with sqlite3.connect(os.path.abspath(db_name)) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    table_names = [table[0] for table in tables]
    print("Tables in the database:", table_names)

Tables in the database: ['emotions', 'brands', 'association_scores']


In [None]:
df = get_df(thing, model, emotions, api_key)
df_cleaned = df.dropna(axis=1)

dfs = get_dfs(things, model, emotions, api_key)
dfs_cleaned = dfs.dropna(axis=1)


result = get_similarity(df_cleaned, dfs_cleaned, number)
result

In [454]:
with sqlite3.connect(os.path.abspath(db_name)) as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name ='brands';")
    query = "SELECT * FROM 'brands'" 
    df = pd.read_sql_query(query, conn)
df

Unnamed: 0,id,brand,info,scores_info,gpt
0,0,Nike,"Founded in 1964, Nike is a multinational corpo...","Nike, as a popular and successful brand, evoke...",gpt-4o-2024-08-06
1,1,Levi's,"Founded in 1853, Levi's is renowned for its de...",Levi's is a well-established brand known for i...,gpt-4o-2024-08-06
2,2,Ralph Lauren,"Established in 1967, Ralph Lauren is a luxury ...","Ralph Lauren, as a renowned fashion brand, is ...",gpt-4o-2024-08-06
3,3,Under Armour,"Founded in 1996, Under Armour is a leading bra...",Under Armour is a well-regarded athletic wear ...,gpt-4o-2024-08-06
4,4,Calvin Klein,"Launched in 1968, Calvin Klein is an iconic fa...","Calvin Klein, being a renowned fashion brand, ...",gpt-4o-2024-08-06
