In [None]:
import os
import pandas as pd
import tensorflow as tf
from urllib.parse import urlparse
from pathlib import Path

# Define paths
csv_file_path = 'A2_Data.csv' 
destination_folder = 'downloaded_images'

# Ensure destination folder exists
Path(destination_folder).mkdir(parents=True, exist_ok=True)

def fetch_and_store_image(image_url, output_path):
    try:
        # Derive a filename by parsing the image URL
        filename = os.path.basename(urlparse(image_url).path)
        # Define the full path for saving the image
        full_path = os.path.join(output_path, filename)
        # Download and save the image using TensorFlow's utility
        image_path = tf.keras.utils.get_file(fname=full_path, origin=image_url, cache_dir=destination_folder)
        print(f"Image saved at: {image_path}")
    except Exception as error:
        print(f"Failed to download {image_url} due to {error}")


def process_csv_and_download_images(csv_path, output_folder):
    # Load the dataset
    dataset = pd.read_csv(csv_path)

    # Iterate through the dataset rows
    for _, row in dataset.iterrows():
        product_id = row['number']
        image_urls = eval(row['Image'])  # Converts string representation of list to list

        # Process each image URL
        for url in image_urls:
            # Construct a unique output path for each image
            output_path = os.path.join(output_folder, str(product_id))
            Path(output_path).mkdir(parents=True, exist_ok=True)
            # Download and save the image
            fetch_and_store_image(url, output_path)

process_csv_and_download_images(csv_file_path, destination_folder)


In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.layers import RandomFlip, RandomRotation, RandomContrast
from pathlib import Path
import pickle

# Define the model for feature extraction
def get_feature_extraction_model():
    base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
    base_model.trainable = False  # Freeze the base model
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(224, 224, 3)),
        RandomFlip("horizontal"),  # Random horizontal flip
        RandomRotation(0.1),       # Random rotation
        RandomContrast(0.1),       # Random contrast
        base_model,
        tf.keras.layers.GlobalAveragePooling2D()  # Add pooling layer to flatten the output
    ])
    return model

# Function to preprocess and augment an image
def preprocess_and_augment_image(img_path):
    img = load_img(img_path, target_size=(224, 224))  # Resize images
    img_array = img_to_array(img)
    img_array = preprocess_input(img_array)  # Apply ResNet50 preprocessing which includes normalization
    img_array = np.expand_dims(img_array, axis=0)  # Add batch dimension
    return img_array

# Process all images in a folder and save their extracted features
def process_images_and_save_features(input_dir, output_dir, model):
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    for img_file in os.listdir(input_dir):
        if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
            img_path = os.path.join(input_dir, img_file)
            img_array = preprocess_and_augment_image(img_path)
            features = model.predict(img_array)
            
            # Save the features
            feature_file = os.path.splitext(img_file)[0] + '.pkl'
            with open(os.path.join(output_dir, feature_file), 'wb') as f:
                pickle.dump(features, f)
                
            print(f"Features extracted and saved for {img_file}")

# Main execution
if __name__ == "__main__":
    # Specify directories
    input_dir = 'images' 
    output_dir = 'image_features' 
    
    # Initialize the feature extraction model
    model = get_feature_extraction_model()
    
    # Process images and save features
    process_images_and_save_features(input_dir, output_dir, model)


In [4]:
import spacy
import pandas as pd
import numpy as np
import os
from collections import defaultdict
from math import log
import pickle

# Initialize spaCy for English language
nlp = spacy.load("en_core_web_sm")

# Function to preprocess text data
def preprocess_text(text):
    # Check if text is not a string (e.g., NaN or float)
    if not isinstance(text, str):
        return []  # Return an empty list of tokens for non-string inputs
    
    doc = nlp(text.lower())
    tokens = [token.lemma_ for token in doc if not token.is_punct and not token.is_stop]
    return tokens


# Function to calculate TF (Term Frequency)
def calculate_tf(tokens):
    tf = defaultdict(int)
    for token in tokens:
        tf[token] += 1
    return tf

# Function to calculate IDF (Inverse Document Frequency)
def calculate_idf(docs):
    idf = defaultdict(lambda: 0)
    total_docs = len(docs)
    for doc in docs:
        for token in set(doc):
            idf[token] += 1
    for token, val in idf.items():
        idf[token] = log(total_docs / float(val))
    return idf

# Function to calculate TF-IDF
def calculate_tfidf(docs):
    tfidf_scores = []
    idf = calculate_idf(docs)
    for doc in docs:
        tf = calculate_tf(doc)
        tfidf = {token: tf_val * idf[token] for token, tf_val in tf.items()}
        tfidf_scores.append(tfidf)
    return tfidf_scores

# Function to process all reviews and save their TF-IDF scores
def process_reviews_and_save_tfidf(csv_file_path, output_dir):
    # Load dataset
    df = pd.read_csv(csv_file_path)
    
    # Preprocess text reviews
    preprocessed_reviews = [preprocess_text(text) for text in df['Review Text']]
    
    # Calculate TF-IDF scores
    tfidf_scores = calculate_tfidf(preprocessed_reviews)
    
    # Ensure output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    # Save TF-IDF scores with corresponding product IDs as filenames
    for index, scores in enumerate(tfidf_scores):
        product_id = df.iloc[index]['number']
        output_file_path = os.path.join(output_dir, f"{product_id}_text_tfidf.pkl")
        with open(output_file_path, 'wb') as f:
            pickle.dump(scores, f)
            
    print("All reviews processed and TF-IDF scores saved.")

csv_file_path = 'A2_Data.csv'  
output_dir = 'text_features' 

# Process reviews and save their TF-IDF scores
process_reviews_and_save_tfidf(csv_file_path, output_dir)


All reviews processed and TF-IDF scores saved.


In [18]:
import os
import shutil

# Path to the folder containing the image features
image_folder_path = 'image_features'
text_folder_path = 'text_features'
text_folder2_path = 'text_features2'

# Initialize an empty set to store unique IDs
unique_ids = set()

# Iterate over the files in the image features folder
for filename in os.listdir(image_folder_path):
    if os.path.isfile(os.path.join(image_folder_path, filename)):
        # Split the filename by '_' and get the first part as the ID
        file_id = filename.split('_')[0]
        unique_ids.add(file_id)

# Create a new folder to store selected text features
os.makedirs(text_folder2_path, exist_ok=True)

# Iterate over the files in the text features folder
for filename in os.listdir(text_folder_path):
    if os.path.isfile(os.path.join(text_folder_path, filename)):
        # Split the filename by '_' and get the first part as the ID
        file_id = filename.split('_')[0]
        # Check if the ID is in the final IDs set
        if file_id in unique_ids:
            # Copy the file to the new folder
            shutil.copy(os.path.join(text_folder_path, filename), os.path.join(text_folder2_path, filename))


In [21]:
len(unique_ids)

994

In [26]:
import csv
final_ids=unique_ids
# Define the filename for the initial CSV file
initial_csv_filename = 'A2_data.csv'

# Define the filename for the final CSV file
final_csv_filename = 'A2_data_filtered.csv'

# Open the initial CSV file for reading and the final CSV file for writing
with open(initial_csv_filename, 'r', newline='') as initial_file, \
        open(final_csv_filename, 'w', newline='') as final_file:
    
    # Create CSV reader and writer objects
    csv_reader = csv.DictReader(initial_file)
    csv_writer = csv.DictWriter(final_file, fieldnames=csv_reader.fieldnames)
    
    # Write the header row to the final CSV file
    csv_writer.writeheader()
    
    # Iterate over each row in the initial CSV file
    for row in csv_reader:
        # Check if the number in the row is present in the final_ids set
        if row['number'] in final_ids:
            # Write the row to the final CSV file
            csv_writer.writerow(row)


In [None]:
import os
import pandas as pd
import numpy as np
import pickle
from pathlib import Path

# Function to calculate cosine similarity for arrays
def cosine_similarity_vec(v1, v2):
    dot_product = np.dot(v1, v2)
    norm_a = np.linalg.norm(v1)
    norm_b = np.linalg.norm(v2)
    return dot_product / (norm_a * norm_b) if norm_a and norm_b else 0

# Function to calculate cosine similarity for dictionaries (text features)
def cosine_similarity_dict(d1, d2):
    words = set(d1.keys()).intersection(set(d2.keys()))
    dot_product = sum(d1[word] * d2[word] for word in words)
    norm_d1 = np.sqrt(sum(v * v for v in d1.values()))
    norm_d2 = np.sqrt(sum(v * v for v in d2.values()))
    return dot_product / (norm_d1 * norm_d2) if norm_d1 and norm_d2 else 0

# Load features from a pickle file
def load_features_from_pickle(file_path):
    with open(file_path, 'rb') as file:
        return pickle.load(file)

def process_and_save_metrics(csv_file_path, image_features_dir, text_features_dir, output_dir):
    df = pd.read_csv(csv_file_path)
    Path(output_dir).mkdir(exist_ok=True)
    
    for index, row in df.iterrows():
        product_id = row['number']
        image_urls = eval(row['Image'])

        # Load text features for the product ID
        text_feature_path = Path(text_features_dir) / f"{product_id}_text_tfidf.pkl"
        text_features = pickle.load(open(text_feature_path, 'rb'))

        # Load all other text and image features
        all_text_features = {file.stem.split('_')[0]: pickle.load(open(file, 'rb'))
                             for file in Path(text_features_dir).glob('*_text_tfidf.pkl') if file.stem != f"{product_id}_text_tfidf"}
        all_image_features = {file.stem.split('_')[0]: np.array(pickle.load(open(file, 'rb'))[0])
                              for file in Path(image_features_dir).glob('*.pkl') if not file.stem.startswith(f"{product_id}_")}

        # Process each image URL for the product ID
        for img_idx, img_url in enumerate(image_urls):
            image_feature_path = Path(image_features_dir) / f"{product_id}_{img_idx}.pkl"
            image_features = np.array(pickle.load(open(image_feature_path, 'rb'))[0])  # Assuming the feature is the first element

            # Find similar IDs based on text and image features
            similar_by_text = [(other_id, cosine_similarity_dict(text_features, all_text_features[other_id]))
                               for other_id in all_text_features]
            similar_by_image = [(other_id, cosine_similarity_vec(image_features, all_image_features[other_id]))
                                for other_id in all_image_features]

            # Sort and get top 3 similar IDs
            similar_by_text.sort(key=lambda x: x[1], reverse=True)
            similar_by_image.sort(key=lambda x: x[1], reverse=True)
            top_similar_by_text = similar_by_text[:3]
            top_similar_by_image = similar_by_image[:3]

            # Save cosine similarity scores for both text and image
            metrics = {
                'text_based': [(other_id, 
                                cosine_similarity_vec(image_features, all_image_features[other_id]),
                                text_similarity
                                )
                               for other_id, text_similarity in top_similar_by_text],
                'image_based': [(other_id, 
                                 image_similarity, 
                                 cosine_similarity_dict(text_features, all_text_features[other_id]))
                                for other_id, image_similarity in top_similar_by_image]
            }

            # Save to pickle file
            output_filename = f"metrics_{product_id}_{img_idx}.pkl"
            output_filepath = Path(output_dir) / output_filename
            with open(output_filepath, 'wb') as f:
                pickle.dump(metrics, f)
            print(f"Metrics saved for product ID {product_id} image index {img_idx} to {output_filename}")


# Run the main process
if __name__ == "__main__":
    csv_file_path = 'A2_Data_filtered.csv'  
    image_features_dir = 'image_features' 
    text_features_dir = 'text_features2'  
    output_dir = 'output_metrics'  

    process_and_save_metrics(csv_file_path, image_features_dir, text_features_dir, output_dir)


In [1]:
import os
import pandas as pd
import pickle
from pathlib import Path
import numpy as np

def match_input_with_dataset(csv_file_path, input_image_url, input_review_text):
    df = pd.read_csv(csv_file_path)
    for _, row in df.iterrows():
        if input_review_text == row['Review Text'] and input_image_url in eval(row['Image']):
            return row['number'], eval(row['Image']).index(input_image_url)
    raise ValueError("No matching entry found in the dataset.")

def load_specific_metrics(output_metrics_dir, product_id, image_index):
    metrics_path = Path(output_metrics_dir) / f"metrics_{product_id}_{image_index}.pkl"
    with open(metrics_path, 'rb') as f:
        metrics = pickle.load(f)
    return metrics

def get_urls_and_reviews(csv_file_path, id):
    df = pd.read_csv(csv_file_path)
    row = df.loc[df['number'] == int(id)].iloc[0]
    return eval(row['Image']), row['Review Text']

def calculate_composite_score(image_cosine, text_cosine):
    return (image_cosine + text_cosine) / 2

def display_formatted_results(metrics, type, csv_file_path):
    print(f"—--------------------------------------------------------------------------------------------\nUSING {type.upper()} RETRIEVAL")
    for idx, (id, image_cosine, text_cosine) in enumerate(metrics, start=1):
        composite_score = calculate_composite_score(image_cosine, text_cosine)
        urls, review = get_urls_and_reviews(csv_file_path, id)
        print(f"{idx}) Image URL: {urls}\nReview: {review}\nCosine similarity of images: {image_cosine:.4f}\nCosine similarity of text: {text_cosine:.4f}\nComposite similarity score: {composite_score:.4f}")

def get_combined_top_pairs(image_based_metrics, text_based_metrics):
    # Combine metrics from both retrieval types and calculate composite score for each
    combined_metrics = [(id, image_cosine, text_cosine, calculate_composite_score(image_cosine, text_cosine)) for id, image_cosine, text_cosine in image_based_metrics + text_based_metrics]
    # Sort by composite score
    sorted_by_composite_score = sorted(combined_metrics, key=lambda x: x[3], reverse=True)
    # Return top 3 pairs with all details
    return sorted_by_composite_score[:3]

def main(input_image_url, input_review_text, csv_file_path, output_metrics_dir):
    product_id, image_index = match_input_with_dataset(csv_file_path, input_image_url, input_review_text)
    metrics = load_specific_metrics(output_metrics_dir, product_id, image_index)

    # Adjust to compute and display results correctly with composite scores
    image_based_with_composite = [(id, img_cos, txt_cos) for id, img_cos, txt_cos in metrics['image_based']]
    text_based_with_composite = [(id, img_cos, txt_cos) for id, img_cos, txt_cos in metrics['text_based']]

    display_formatted_results(image_based_with_composite, 'image', csv_file_path)
    display_formatted_results(text_based_with_composite, 'text', csv_file_path)

    # Calculate top 3 combined pairs across both systems
    top_combined_pairs = get_combined_top_pairs(image_based_with_composite, text_based_with_composite)

    # Display top 3 combined pairs
    print("\n—--------------------------------------------------------------------------------------------")
    print("TOP 3 COMBINED PAIRS BASED ON COMPOSITE SIMILARITY SCORE")
    for idx, (id, image_cosine, text_cosine, composite_score) in enumerate(top_combined_pairs, start=1):
        urls, review = get_urls_and_reviews(csv_file_path, id)
        print(f"{idx}) Image URL: {urls}\nReview: {review}\nCosine similarity of images: {image_cosine:.4f}\nCosine similarity of text: {text_cosine:.4f}\nComposite similarity score: {composite_score:.4f}")

if __name__ == "__main__":
    input_image_url = "https://images-na.ssl-images-amazon.com/images/I/71bztfqdg+L._SY88.jpg"
    input_review_text = "I have been using Fender locking tuners for about five years on various strats and teles. Definitely helps with tuning stability and way faster to restring if there is a break."
    csv_file_path = "A2_Data_filtered.csv"
    output_metrics_dir = "output_metrics"
    main(input_image_url, input_review_text, csv_file_path, output_metrics_dir)


—--------------------------------------------------------------------------------------------
USING IMAGE RETRIEVAL
1) Image URL: ['https://images-na.ssl-images-amazon.com/images/I/719-SDMiOoL._SY88.jpg']
Review: These locking tuners look great and keep tune.  Good quality materials and construction.  Excellent upgrade to any guitar.  I had to drill additions holes for installation.  If your neck already comes with pre-drilled holes, then they should drop right in, otherwise you will need to buy a guitar tuner pin drill jig, also available from Amazon.
Cosine similarity of images: 0.7491
Cosine similarity of text: 0.1401
Composite similarity score: 0.4446
2) Image URL: ['https://images-na.ssl-images-amazon.com/images/I/711kGbkdzEL._SY88.jpg']
Review: Had to drill into my headstock. Needs 2 holes per tree because of the mounting peg. Use a ruler and a 1/16 drillbit and you'll be fine. I recommend installing with the strings on so you can set them properly.
Cosine similarity of images: 0