In [None]:
from pathlib import Path

import pandas as pd
import pytesseract
from PIL import Image

from src.data_processing import download_images
from src.feature_extraction import extract_dimensions, entity_unit_map, allowed_units, unit_abbreviation_map, irregular_plurals

In [None]:
# Read the test split and order its rows by group_id so that rows belonging to
# the same group sit next to each other.
test_data = pd.read_csv('../dataset/test.csv').sort_values('group_id')
print(f"Loaded data with {len(test_data)} rows.")

In [None]:
import os

# Folder the images are downloaded into; later cells read this name as well.
download_folder = '../images'
os.makedirs(download_folder, exist_ok=True)  # no-op when the folder is already there

# Fetch every image referenced by the test rows into that folder.
image_links = test_data['image_link'].tolist()
download_images(image_links, download_folder)

In [None]:
# Cell 4: Extract text from images
def extract_text_from_image(image_path):
    """
    Extracts text from an image file using Tesseract OCR.

    Args:
        image_path: Location of the image file, passed straight to
            ``Image.open``.

    Returns:
        The OCR output with leading/trailing whitespace stripped, or ``None``
        when opening the image or running OCR raises (the error is printed
        instead of propagated so one bad image does not abort a whole run).
    """
    try:
        # The context manager closes the underlying file as soon as OCR is
        # done; without it each call leaves a file handle open.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        # NOTE(review): '\\n' is the two-character sequence backslash + 'n',
        # not a newline, so real line breaks inside the OCR text are kept.
        # Left as-is because downstream parsing may rely on it -- confirm
        # whether '\n' was intended.
        return text.replace('\\n', '').strip()
    except OSError as e:
        print(f"Error opening image {image_path}: {e}. Image might be truncated or corrupted.")
        return None
    except Exception as e:
        print(f"Error extracting text from {image_path}: {e}")
        return None
    
    
print("Extracting text from images...")
# Map each image URL to the file it was downloaded to (same file name inside
# download_folder), then OCR each of those files.
local_paths = test_data['image_link'].map(
    lambda link: f'{download_folder}/{Path(link).name}'
)
test_data['extracted_text'] = local_paths.map(extract_text_from_image)
print("Text extraction complete.")
print(test_data[['image_link', 'extracted_text']].head())

In [None]:
# Cell 5: Extract dimensions
# Walks the rows in their current order (sorted by group_id when the data was
# loaded) and pools the dimensions found in consecutive rows of the same
# group. Each row's prediction is the pooled dimensions whose unit is allowed
# for that row's entity, joined as "number unit, number unit"; rows with no
# matching dimension get None.
dimensions = []   # dimensions pooled so far for the current group
group_id = None   # group of the previous row; None until the first row is seen
predictions = []  # one entry per row, in iteration order

# Reuse the OCR text computed earlier when the column exists; OCR is by far
# the slowest step and running it again gives the same text.
has_cached_text = 'extracted_text' in test_data.columns

for _, row in test_data.iterrows():
    if has_cached_text:
        cached_text = row['extracted_text']
        # A failed OCR is stored as a missing value; normalise it to None.
        extracted_text = cached_text if isinstance(cached_text, str) else None
    else:
        image_path = f'../images/{Path(row["image_link"]).name}'
        extracted_text = extract_text_from_image(image_path)

    # No text (failed OCR or empty result) means no dimensions for this row.
    found = list(extract_dimensions(extracted_text) or []) if extracted_text else []

    if row['group_id'] == group_id:
        # Same group as the previous row: add only dimensions not seen yet.
        for item in found:
            if item not in dimensions:
                dimensions.append(item)
    else:
        # New group: start a fresh pool. The group is detected purely by
        # comparing with the previous row -- the index label says nothing
        # about row position once the frame has been sorted.
        dimensions = found
        group_id = row['group_id']

    a_units = entity_unit_map[row['entity_name']]

    # Keep only the pooled dimensions expressed in a unit valid for this entity.
    matching = [item for item in dimensions if item[1] in a_units]
    if matching:
        predictions.append(', '.join(f'{number} {unit}' for number, unit in matching))
    else:
        predictions.append(None)

# Single positional assignment: the list is in the same order as the rows.
test_data['prediction'] = predictions

In [None]:
# Write the full frame to CSV: every column and every row, including rows
# whose prediction is missing.
test_data.to_csv('../dataset/test_out.csv', index=False)

# For further use in the notebook keep only the rows that have a prediction.
# NOTE(review): this filtered frame is not written back to disk -- confirm
# that saving before filtering is intended.
output_data = test_data.dropna(subset=['prediction'])

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error, f1_score
import joblib
import re

# Read the training table and keep only rows that have both a prediction
# string and OCR text.
df = pd.read_csv('../dataset/modified_train.csv')
df = df.dropna(subset=['prediction', 'extracted_text'])

# Function to extract numerical value from string
def extract_number(value):
    """
    Return the first unsigned number found in ``value`` as a float.

    ``value`` is converted with ``str()`` before searching, so non-string
    inputs are accepted. The number is a run of digits with an optional
    ``.digits`` fraction. Returns ``np.nan`` when ``value`` is missing
    (per ``pd.isna``) or contains no digits.
    """
    if pd.isna(value):
        return np.nan

    found = re.search(r'(\d+(\.\d+)?)', str(value))
    if found is None:
        return np.nan
    return float(found.group(1))

# Turn the two categorical identifiers into integer codes. The fitted
# encoders stay in module scope because they are needed again to encode
# inputs at prediction time.
le_group = LabelEncoder()
df['group_id'] = le_group.fit_transform(df['group_id'])

le_entity = LabelEncoder()
df['entity_name'] = le_entity.fit_transform(df['entity_name'])

# Regression target: the first number that appears in the prediction string.
df['prediction_numeric'] = df['prediction'].apply(extract_number)

# Rows whose prediction contains no number cannot be used for training.
df = df.dropna(subset=['prediction_numeric'])

# Regression target
y = df['prediction_numeric']

# Split the rows BEFORE fitting the vectorizer. Fitting TF-IDF on all rows
# lets the hold-out text influence the vocabulary and IDF weights, which
# leaks information into training and makes the evaluation optimistic.
# With the same test_size/random_state the same rows end up in each split as
# when splitting an already-built feature matrix of the same length.
train_rows, test_rows, y_train, y_test = train_test_split(
    df, y, test_size=0.2, random_state=42
)

# Learn the TF-IDF vocabulary from the training text only, then apply it
# unchanged to the hold-out text.
tfidf = TfidfVectorizer(max_features=1000, stop_words='english')
train_text = tfidf.fit_transform(train_rows['extracted_text'])
test_text = tfidf.transform(test_rows['extracted_text'])

# Feature layout: [group_id code, entity_name code, tf-idf columns...]
id_columns = ['group_id', 'entity_name']
X_train = np.hstack((train_rows[id_columns].values, train_text.toarray()))
X_test = np.hstack((test_rows[id_columns].values, test_text.toarray()))

# Train the model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

# Score the hold-out split with the standard regression metrics.
y_pred = model.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_pred)

# Scale-free variants: MAE relative to the mean absolute target, and RMSE
# relative to the spread (max - min) of the hold-out targets.
mean_abs_target = np.mean(np.abs(y_test))
relative_accuracy = 1 - (mae / mean_abs_target)
target_range = np.max(y_test) - np.min(y_test)
nrmse = rmse / target_range

# Classification-style view of the regression output: label a value 1 when it
# is above the median of the true targets, 0 otherwise, then compare labels.
threshold = np.median(y_test)
y_test_binary = (y_test > threshold).astype(int)
y_pred_binary = (y_pred > threshold).astype(int)
f1 = f1_score(y_test_binary, y_pred_binary)
accuracy = np.mean(y_test_binary == y_pred_binary)

print(f"Mean Squared Error: {mse}")
print(f"Root Mean Squared Error: {rmse}")
print(f"Mean Absolute Error: {mae}")
print(f"R-squared Score: {r2}")
print(f"Actual Accuracy: {accuracy:.2%}")
print(f"Normalized RMSE: {nrmse:.2%}")
print(f"F1 Score: {f1:.4f}")

# Persist the fitted model together with the encoders and vectorizer it needs
# at prediction time, each to its own file in the working directory.
for artifact, filename in (
    (model, 'product_attribute_model.joblib'),
    (le_group, 'label_encoder_group.joblib'),
    (le_entity, 'label_encoder_entity.joblib'),
    (tfidf, 'tfidf_vectorizer.joblib'),
):
    joblib.dump(artifact, filename)

# Function to make predictions on new data
def predict_attribute(group_id, entity_name, extracted_text):
    """
    Predict the numeric attribute value for a single item.

    The model, both label encoders and the TF-IDF vectorizer are loaded from
    their ``.joblib`` files in the working directory on every call.

    Args:
        group_id: Raw group id; encoded with the saved group encoder.
        entity_name: Raw entity name; encoded with the saved entity encoder.
        extracted_text: OCR text of the item's image; vectorised with the
            saved TF-IDF vectorizer.

    Returns:
        The model's prediction for this one item (first element of the
        prediction array).
    """
    regressor = joblib.load('product_attribute_model.joblib')
    group_encoder = joblib.load('label_encoder_group.joblib')
    entity_encoder = joblib.load('label_encoder_entity.joblib')
    vectorizer = joblib.load('tfidf_vectorizer.joblib')

    # Encode each input the same way the training rows were encoded.
    group_code = group_encoder.transform([group_id])[0]
    entity_code = entity_encoder.transform([entity_name])[0]
    text_features = vectorizer.transform([extracted_text]).toarray()

    # Single-row feature matrix: [group code, entity code, tf-idf columns...]
    id_features = np.array([[group_code, entity_code]])
    features = np.hstack((id_features, text_features))

    return regressor.predict(features)[0]



In [None]:
# Print a heading followed by the first rows of the processed test frame.
print("\nFinal data preview:", test_data.head(), sep="\n")