# Setup and imports

In [11]:
import warnings
# Suppress every Python warning for the rest of the session. This keeps cell
# output quiet, but it also hides deprecation notices emitted by the libraries
# imported below.
warnings.filterwarnings('ignore')

import os
import pandas as pd
from PIL import Image, UnidentifiedImageError
from tqdm import tqdm
import torch
import pytesseract
import cv2
from torchvision import models, transforms
# from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import pickle

from src.utils import download_images
from src.constants import entity_unit_map, allowed_units

# Shared TF-IDF vectorizer read by combine_features_and_text. It is created
# unfitted here and must be fitted (see the "Training Data Loading" cell)
# before transform() is called on it.
vectorizer = TfidfVectorizer()

# Helper Functions 

## Feature Extraction 

In [12]:
# Load torchvision's ResNet-50 with pretrained weights.
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in
# favour of the `weights=` argument — confirm against the installed version.
resnet = models.resnet50(pretrained=True)
# Rebuild the network from all of its child modules except the last one, so it
# stops before its final layer and can be used as a feature extractor.
resnet = torch.nn.Sequential(*list(resnet.children())[:-1])
# Switch to evaluation mode; the model is only used for inference below.
resnet.eval()

def preprocess_for_resnet(image_path):
    """Load an image from disk and turn it into a normalized ResNet input batch.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A tensor of shape (1, 3, 224, 224) ready to be passed to the network,
        or None when the file is missing, truncated, or not a decodable image.
    """
    try:
        # The context manager releases the file handle as soon as the pixels
        # are decoded; convert() returns an independent, fully loaded copy, so
        # the image stays usable after the file is closed.
        with Image.open(image_path) as img:
            rgb_img = img.convert('RGB')

        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            # Per-channel mean/std applied to the RGB tensor.
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        # unsqueeze(0) adds the leading batch dimension the model expects.
        return transform(rgb_img).unsqueeze(0)
    except (OSError, UnidentifiedImageError):
        # Missing, corrupt or non-image files are reported to the caller as None.
        return None

def extract_features(image_path):
    """Run one image through the truncated ResNet and return its feature vector.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        A NumPy array holding the network output with singleton dimensions
        squeezed out, or None when the image could not be preprocessed.
    """
    batch = preprocess_for_resnet(image_path)
    if batch is not None:
        # Gradients are not needed for feature extraction.
        with torch.no_grad():
            embedding = resnet(batch)
        return embedding.squeeze().numpy()
    return None

## Text Extraction

In [13]:
def extract_text_with_ocr(image_path):
    """Read the text printed on an image using Tesseract OCR.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The recognised text with surrounding whitespace stripped, or an empty
        string when the image cannot be read or OCR fails.
    """
    try:
        image = cv2.imread(image_path)
        # cv2.imread reports a missing/unreadable file by returning None rather
        # than raising, so check explicitly instead of relying on cvtColor
        # failing on a None input.
        if image is None:
            print(f"Error extracting text from {image_path}: image could not be read")
            return ""
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        extracted_text = pytesseract.image_to_string(gray_image)
        return extracted_text.strip()
    except Exception as e:
        # Best-effort by design: an OCR failure must not abort the whole run.
        print(f"Error extracting text from {image_path}: {e}")
        return ""

## Combining Features and Text

In [14]:
def combine_features_and_text(features, text):
    """Append the TF-IDF encoding of ``text`` to an image feature vector.

    Args:
        features: 1-D sequence of image features.
        text: Text to encode with the module-level ``vectorizer``.

    Returns:
        A 1-D NumPy array: ``features`` followed by the flattened TF-IDF row.
    """
    tfidf_row = vectorizer.transform([text]).toarray().ravel()
    return np.concatenate((features, tfidf_row))

## Model Training 

In [15]:
# NOTE(review): these module-level lists are not read anywhere in the visible
# notebook; train_model below binds its own local X and y. Confirm they are
# still needed.
X = []  
y = []  

def train_model(npz_file_path, test_size=0.2, random_state=42):
    """Train a random-forest classifier on features stored in an ``.npz`` archive.

    The archive must contain the arrays ``X_train`` (features) and ``y_train``
    (labels). Part of the data is held out to print accuracy and weighted F1.

    Args:
        npz_file_path: Path of the ``.npz`` archive to load.
        test_size: Fraction of the samples held out for evaluation.
        random_state: Seed used for both the train/test split and the forest,
            so repeated runs print the same scores.

    Returns:
        The fitted ``RandomForestClassifier``.
    """
    # np.load keeps the archive open until it is closed; read both arrays and
    # release the file handle right away.
    with np.load(npz_file_path) as data:
        X = np.asarray(data['X_train'])
        y = np.asarray(data['y_train'])

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    model = RandomForestClassifier(n_estimators=100, random_state=random_state)
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    # Score the predictions already computed instead of letting model.score()
    # run a second prediction pass over X_test.
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred, average='weighted')

    print(f"Model accuracy: {accuracy}")
    print(f"Model F1 score: {f1}")

    return model

## Check Image Downloads 

In [16]:
def check_images_already_downloaded(image_links, image_folder):
    """Report which image links have no matching file in ``image_folder``.

    A link counts as downloaded when a file named after the link's basename
    exists inside ``image_folder``.

    Args:
        image_links: Iterable of image URLs/paths.
        image_folder: Directory where downloaded images are stored.

    Returns:
        The links, in input order, whose file is not present.
    """
    print("Checking if images are already downloaded...")
    missing_images = [
        link
        for link in tqdm(image_links, desc="Checking images")
        if not os.path.exists(os.path.join(image_folder, os.path.basename(link)))
    ]
    print(f"Check completed. {len(missing_images)} missing images found.")
    return missing_images

## Prediction Function 

In [17]:
# model_filename = 'model.pkl'
# with open(model_filename, 'rb') as file:
#     model = pickle.load(file)

def _prediction_unit(prediction):
    """Return the unit part of a '<number> <unit>' prediction, else the whole text."""
    text = str(prediction).strip()
    parts = text.split(maxsplit=1)
    if len(parts) == 2:
        try:
            float(parts[0])
        except ValueError:
            # First token is not numeric, so treat the whole text as the unit.
            return text
        return parts[1]
    return text


def predictor(image_link, category_id, entity_name, model, image_folder):
    """Predict the entity value for one image and validate its unit.

    Args:
        image_link: URL of the image; its basename is used as the local filename.
        category_id: Accepted for interface compatibility; not used here.
        entity_name: Entity being predicted; used to look up the allowed units
            in ``entity_unit_map``.
        model: Fitted estimator exposing ``predict``.
        image_folder: Directory where the image is (or will be) stored.

    Returns:
        The model's prediction, or None when the image cannot be processed or
        the predicted unit is not allowed for ``entity_name``.
    """
    image_filename = os.path.basename(image_link)
    image_path = os.path.join(image_folder, image_filename)

    print(f"Processing image link: {image_link}")
    print(f"Image will be saved at: {image_path}")

    if not os.path.exists(image_path):
        print(f"Image not found locally. Downloading image: {image_filename}")
        download_images([image_link], image_folder, allow_multiprocessing=False)
        print(f"Image downloaded: {image_filename}")

    print(f"Extracting features from image: {image_filename}")
    features = extract_features(image_path)

    if features is None:
        print(f"Skipping prediction due to incomplete or corrupt image: {image_filename}")
        return None

    print(f"Making prediction for entity: {entity_name}")
    # NOTE(review): only the image features are passed to the model, while the
    # training cell builds image+text vectors via combine_features_and_text —
    # confirm the model was fitted on inputs of this width.
    prediction = model.predict([features])[0]

    if entity_name in entity_unit_map:
        entity_units = entity_unit_map[entity_name]
        # Accept either a bare unit or a "<number> <unit>" value whose unit is
        # allowed; comparing the whole value string against the unit set would
        # reject every prediction of the second form.
        if prediction not in entity_units and _prediction_unit(prediction) not in entity_units:
            print(f"Prediction {prediction} is not in the allowed units for entity {entity_name}.")
            return None
    else:
        print(f"Entity name {entity_name} not found in the entity-unit map.")

    print(f"Prediction completed: {prediction}")
    return prediction

# Main Processing Code

## Training Data Loading

In [86]:
# Locations of the CSV files and of the image caches for the train/test splits.
DATASET_FOLDER = './dataset/'
IMAGE_DOWNLOAD_FOLDER_TRAIN = './downloaded_images/'
IMAGE_DOWNLOAD_FOLDER_TEST = './downloaded_images_test/'

print(f"Loading training dataset from folder: {DATASET_FOLDER}")
train_data = pd.read_csv(os.path.join(DATASET_FOLDER, 'train.csv'))
print("Training dataset loaded.")

# Fit the shared TF-IDF vectorizer; missing values are replaced by empty strings.
# NOTE(review): the vocabulary is learned from the entity_value labels, while
# combine_features_and_text applies the vectorizer to OCR text — confirm this
# is intended.
print("Fitting the vectorizer on training text data...")
text_data = train_data['entity_value'].fillna('') 
vectorizer.fit(text_data)
print("Vectorizer fitted.")

# Only links whose file is not yet in the training image folder are downloaded.
missing_train_images = check_images_already_downloaded(train_data['image_link'].tolist(), IMAGE_DOWNLOAD_FOLDER_TRAIN)

if missing_train_images:
    print(f"Found {len(missing_train_images)} images to download for training.")
    download_images(missing_train_images, IMAGE_DOWNLOAD_FOLDER_TRAIN)
    print("Image download process for training completed.")
else:
    print("All training images are already downloaded.")

Loading training dataset from folder: ./dataset/
Training dataset loaded.
Fitting the vectorizer on training text data...
Vectorizer fitted.
Checking if images are already downloaded...


Checking images: 100%|██████████| 263859/263859 [00:28<00:00, 9290.73it/s] 

Check completed. 0 missing images found.
All training images are already downloaded.





## Feature Extraction

In [None]:
# NOTE(review): this repeats the extract_text_with_ocr definition from the
# "Text Extraction" section; the later definition is the one in effect once
# this cell has run.
def extract_text_with_ocr(image_path):
    """Read the text printed on an image using Tesseract OCR.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The recognised text with surrounding whitespace stripped, or an empty
        string when the image cannot be read or OCR fails.
    """
    try:
        image = cv2.imread(image_path)
        # cv2.imread reports a missing/unreadable file by returning None rather
        # than raising, so check explicitly instead of relying on cvtColor
        # failing on a None input.
        if image is None:
            print(f"Error extracting text from {image_path}: image could not be read")
            return ""
        gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        extracted_text = pytesseract.image_to_string(gray_image)
        return extracted_text.strip()
    except Exception as e:
        # Best-effort by design: an OCR failure must not abort the whole run.
        print(f"Error extracting text from {image_path}: {e}")
        return ""

# The shared `vectorizer` is created in the setup cell and fitted in the
# "Training Data Loading" cell. It is deliberately NOT re-created here: a fresh
# TfidfVectorizer is unfitted, so the transform() call below would raise
# NotFittedError.

def combine_features_and_text(features, text):
    """Append the TF-IDF encoding of ``text`` to an image feature vector.

    Args:
        features: 1-D sequence of image features.
        text: Text to encode with the module-level, already fitted ``vectorizer``.

    Returns:
        A 1-D NumPy array: ``features`` followed by the flattened TF-IDF row.
    """
    text_features = vectorizer.transform([text]).toarray()
    combined_features = np.concatenate((features, text_features.flatten()))
    return combined_features

# Build the training matrix: one combined (image features + TF-IDF of OCR text)
# vector per readable image, labelled with the row's entity_value.
print("Starting feature extraction for training images...")

# Only the first rows of the training set are processed in this cell.
num_images_to_process = 20
limited_train_data = train_data.head(num_images_to_process)

# NOTE(review): train_model is later called with "combined_input.npz"; nothing
# in this cell writes X_train / y_train to that file — confirm where the
# archive is produced.
X_train = []
y_train = []

# Bookkeeping for the summary printed after the loop.
skipped_images = []
successful_images = []

with tqdm(total=limited_train_data.shape[0], desc='Extracting Features', unit='image', ncols=100) as pbar:
    for index, row in limited_train_data.iterrows():
        image_link = row['image_link']
        entity_value = row['entity_value']

        # Images are stored locally under the basename of their link.
        image_filename = os.path.basename(image_link)
        image_path = os.path.join(IMAGE_DOWNLOAD_FOLDER_TRAIN, image_filename)

        features = extract_features(image_path)

        # extract_features returns None for images it cannot open; those rows
        # are skipped and do not contribute a training sample.
        if features is not None:
            extracted_text = extract_text_with_ocr(image_path)
            combined_input = combine_features_and_text(features, extracted_text)

            X_train.append(combined_input)
            y_train.append(entity_value)
            successful_images.append(image_filename)
        else:
            skipped_images.append(image_filename)

        pbar.update(1)

print("Feature extraction for training completed.")
print(f"Total images processed: {len(limited_train_data)}")
print(f"Successfully extracted features from {len(successful_images)} images.")
print(f"Skipped {len(skipped_images)} images.")

if skipped_images:
    print("Skipped images:")
    for img in skipped_images:
        print(f" - {img}")

# if X_train:
#     sample_features = np.array(X_train)
#     print("Sample of extracted features:")
#     print(sample_features[:5])  # Print the first 5 feature vectors
#     print("Feature vector shape:", sample_features[0].shape)
# else:
#     print("No features were extracted.")

## Model Training

In [20]:
# Fit the classifier from the arrays stored in "combined_input.npz".
# NOTE(review): the archive is read from the working directory and is not
# written by any visible cell — confirm it is up to date before training.
print("Training model on extracted features...")
model = train_model("combined_input.npz")
print("Model training completed.")

Training model on extracted features...
Model accuracy: 0.105
Model F1 score: 0.08382142857142856
Model training completed.


In [None]:
# Persist the fitted model with pickle so it can be reloaded without retraining.
# Only unpickle this file from a trusted location: pickle.load can execute
# arbitrary code.
model_filename = 'model.pkl'
with open(model_filename, 'wb') as file:
    pickle.dump(model, file)

print(f"Model saved to {model_filename}")

## Model Testing

In [None]:
# Predict on the sample test split, score the predictions and export them.
print(f"Loading test dataset from folder: {DATASET_FOLDER}")
test_data = pd.read_csv(os.path.join(DATASET_FOLDER, 'sample_test.csv'))
print("Test dataset loaded.")

# Only links whose file is not yet in the test image folder are downloaded.
missing_test_images = check_images_already_downloaded(test_data['image_link'].tolist(), IMAGE_DOWNLOAD_FOLDER_TEST)

if missing_test_images:
    print(f"Found {len(missing_test_images)} images to download for testing.")
    download_images(missing_test_images, IMAGE_DOWNLOAD_FOLDER_TEST)
    print("Image download process for testing completed.")
else:
    print("All test images are already downloaded.")

print("Starting predictions for test data...")
test_predictions = test_data.copy()
test_predictions['prediction'] = test_predictions.apply(
    lambda row: predictor(row['image_link'], row['group_id'], row['entity_name'], model, IMAGE_DOWNLOAD_FOLDER_TEST), axis=1
)

# predictor returns None for unreadable images and disallowed units; drop those rows.
test_predictions = test_predictions[test_predictions['prediction'].notnull()]
print("Predictions for test data completed.")

# The labels collected for training in this notebook are entity_value, so
# predictions are scored against that column. entity_name is an input to the
# predictor, not the ground truth, and comparing against it is meaningless.
if test_predictions.empty:
    print("No valid predictions were produced; skipping metric computation.")
elif 'entity_value' not in test_predictions.columns:
    print("Column 'entity_value' not found in the test data; skipping metric computation.")
else:
    # Cast both sides to str so mixed value types compare consistently.
    y_true = test_predictions['entity_value'].astype(str)
    y_pred = test_predictions['prediction'].astype(str)

    accuracy = accuracy_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred, average='weighted')

    print(f"Model accuracy: {accuracy}")
    print(f"Model F1 score: {f1}")

output_filename = os.path.join(DATASET_FOLDER, 'test_out.csv')
print(f"Saving predictions to {output_filename}...")
test_predictions[['index', 'prediction']].to_csv(output_filename, index=False)
print(f"Predictions saved to {output_filename}.")