In [None]:
!pip install optuna

In [None]:
! pip install transformers

In [None]:
# Build a Hugging Face `pipeline` object for the 'sentiment-analysis' task.
# NOTE(review): `classifier` is not referenced anywhere else in the visible
# code — confirm this cell is still needed.
from transformers import pipeline
classifier = pipeline('sentiment-analysis')

In [None]:
pip install pandas numpy torch torchvision transformers scikit-learn opencv-python pytesseract tqdm optuna joblib


In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import cv2
import pytesseract
from tqdm import tqdm
import re
from torchvision import transforms
import optuna

# Mapping of entities to measurement units.
# Keys are entity names; each value is the set of unit strings (full names and
# abbreviations) listed for that entity.
# NOTE: the dict is keyed by ENTITY name, not by unit, so looking a unit string
# up with `unit_mapping.get(unit)` will not find it.
unit_mapping = {
    'width': {'centimetre', 'inch', 'cm', 'mm', 'm'},
    'height': {'centimetre', 'inch', 'cm', 'mm', 'm'},
    'depth': {'centimetre', 'inch', 'cm', 'mm', 'm'},
    'item_weight': {'gram', 'kilogram', 'ounce', 'pound', 'g', 'kg', 'lb', 'oz'},
    'item_volume': {'millilitre', 'litre', 'ml', 'l'}
}

# Loading data from a CSV file
def load_csv_data(file_path, sample_size=1000):
    """Load a CSV file and return a reproducible random sample of its rows.

    Args:
        file_path: Path (or any source accepted by ``pd.read_csv``) of the CSV.
        sample_size: Maximum number of rows to sample. A file with fewer rows
            is returned in full (in shuffled order) instead of raising.

    Returns:
        A DataFrame with at most ``sample_size`` rows and a fresh 0..n-1 index.
    """
    data_frame = pd.read_csv(file_path)
    # DataFrame.sample raises ValueError when n exceeds the number of rows
    # (sampling without replacement), so clamp the request to what exists.
    n_rows = min(sample_size, len(data_frame))
    return data_frame.sample(n=n_rows, random_state=42).reset_index(drop=True)

# Clean extracted entity_value by parsing out numbers and units
def clean_value(value):
    """Normalise a raw entity value into the form ``"<float> <unit>"``.

    Leading characters that are neither digits nor ``.`` are stripped. The
    first whitespace-separated token is then parsed as a float and the
    remaining tokens are kept, space-joined, as the unit.

    Args:
        value: Raw value of any type; it is converted with ``str`` first.

    Returns:
        The normalised string, or ``None`` when there is no unit part or the
        leading token is not a valid number.
    """
    text = re.sub(r'^[^\d.]+', '', str(value))
    parts = text.split()
    if len(parts) < 2:
        return None
    try:
        num_value = float(parts[0])
    except ValueError:
        # Only a failed number parse means "invalid entry"; anything else
        # (e.g. KeyboardInterrupt) must not be swallowed.
        return None
    unit = ' '.join(parts[1:])
    return f"{num_value} {unit}"

# Pre-processing the dataset to clean and filter valid entries
def clean_dataset(df):
    """Return a cleaned copy of ``df`` keeping only rows with a valid value.

    Every ``entity_value`` is normalised with ``clean_value``; rows for which
    that yields ``None`` are dropped.

    Args:
        df: DataFrame containing an ``entity_value`` column. It is not
            modified.

    Returns:
        A new, independent DataFrame (original index labels are kept).
    """
    # assign() builds a new frame, so the caller's DataFrame stays untouched.
    cleaned = df.assign(entity_value=df['entity_value'].apply(clean_value))
    # The explicit copy detaches the result from the filtered frame so that
    # later column assignments do not raise SettingWithCopyWarning.
    return cleaned.dropna(subset=['entity_value']).copy()

# Text extraction using OCR
def process_image(image_url):
    """Download an image and return its model-ready tensor plus OCR text.

    The image is downloaded to a private temporary file, converted to RGB,
    resized to 224x224, turned into a tensor and normalised with the fixed
    per-channel mean/std below. OCR runs on the image as loaded from disk
    (before colour conversion and resizing).

    Args:
        image_url: URL of the image to download.

    Returns:
        ``(tensor, text)`` on success, or ``(None, "")`` when the download,
        decoding or OCR fails. Failures are printed, not raised.
    """
    import tempfile

    try:
        # A per-call temporary directory means concurrent calls cannot
        # overwrite each other's download, and the file is removed as soon as
        # it has been decoded.
        with tempfile.TemporaryDirectory() as tmp_dir:
            img_file = os.path.join(tmp_dir, 'temp_image.jpg')
            torch.hub.download_url_to_file(image_url, img_file)
            raw_img = cv2.imread(img_file)  # read once; reused for OCR below
        if raw_img is None:
            print(f"Unable to load image from {image_url}")
            return None, ""
        img = cv2.cvtColor(raw_img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (224, 224))
        img = transforms.ToTensor()(img)
        img = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])(img)
        # cvtColor/resize return new arrays, so raw_img still holds exactly
        # what cv2.imread produced.
        extracted_text = pytesseract.image_to_string(raw_img)
        return img, extracted_text
    except Exception as e:
        # Deliberate best-effort: one bad image must not abort a whole epoch.
        print(f"Error during image processing for {image_url}: {e}")
        return None, ""

# Custom Dataset class for loading entity extraction data
class EntityDataset(Dataset):
    """Training dataset yielding image, OCR tokens and value/unit targets.

    Each row of ``df`` must provide ``image_link``, ``entity_value``,
    ``group_id`` and ``entity_name``. The last two are converted with
    ``torch.tensor(..., dtype=torch.long)``, so they are presumably already
    integer-encoded — verify against the caller.
    """

    def __init__(self, df, tokenizer, max_seq_length=128):
        """Store the frame, the tokenizer and the fixed token sequence length."""
        self.data = df
        self.tokenizer = tokenizer
        self.max_seq_length = max_seq_length

    def __len__(self):
        """Return the number of rows in the underlying frame."""
        return len(self.data)

    @staticmethod
    def _split_value_and_unit(entity_value):
        """Split ``"<number> <unit...>"`` into ``(float, unit_string)``.

        Returns ``(0.0, '')`` when there are fewer than two tokens, and a
        value of ``0.0`` (unit kept) when the first token is not a number.
        """
        parts = str(entity_value).strip().split()
        if len(parts) < 2:
            return 0.0, ''
        try:
            # float() rather than str.isdigit(): isdigit() is False for
            # decimals such as "10.0", which zeroed every decimal target.
            value = float(parts[0])
        except ValueError:
            value = 0.0
        return value, ' '.join(parts[1:])

    def __getitem__(self, idx):
        """Build one sample dict for row ``idx``.

        Returns a dict with keys ``image``, ``input_ids``, ``attention_mask``,
        ``group_id``, ``entity_name``, ``value`` (float tensor) and ``unit``
        (plain string).
        """
        row = self.data.iloc[idx]
        img, text = process_image(row['image_link'])

        if img is None:
            # Keep the batch collatable when the image could not be loaded.
            img = torch.zeros((3, 224, 224))

        encoded_text = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_seq_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        value, unit = self._split_value_and_unit(row['entity_value'])

        return {
            'image': img,
            'input_ids': encoded_text['input_ids'].flatten(),
            'attention_mask': encoded_text['attention_mask'].flatten(),
            'group_id': torch.tensor(row['group_id'], dtype=torch.long),
            'entity_name': torch.tensor(row['entity_name'], dtype=torch.long),
            'value': torch.tensor(value, dtype=torch.float),
            'unit': unit
        }
    
## Hybrid Model Architecture
# Neural network model combining BERT and ResNet
class EntityModel(nn.Module):
    """Hybrid network fusing image, text, group and entity features.

    A ResNet-50 (final layer replaced by a 128-unit projection) encodes the
    image, BERT encodes the text, and two 16-dimensional embeddings encode the
    group and entity ids. The concatenated features pass through two ReLU
    layers and feed two heads: a scalar value and unit-class logits.
    """

    def __init__(self, group_count, entity_count, unit_count):
        """Create the encoders, the embeddings and the two output heads.

        Args:
            group_count: Number of rows in the group embedding table.
            entity_count: Number of rows in the entity embedding table.
            unit_count: Number of unit classes produced by the unit head.
        """
        super().__init__()

        # Text encoder.
        self.bert = BertModel.from_pretrained('bert-base-uncased')

        # Image encoder; its classifier is swapped for a 128-d projection.
        self.resnet = torch.hub.load('pytorch/vision:v0.10.0', 'resnet50', pretrained=True)
        self.resnet.fc = nn.Linear(2048, 128)

        # Categorical inputs.
        self.group_embed = nn.Embedding(group_count, 16)
        self.entity_embed = nn.Embedding(entity_count, 16)

        # Fusion trunk: image (128) + text (768) + group (16) + entity (16).
        fused_width = 128 + 768 + 16 + 16
        self.fc1 = nn.Linear(fused_width, 256)
        self.fc2 = nn.Linear(256, 128)

        # Output heads.
        self.value_out = nn.Linear(128, 1)
        self.unit_out = nn.Linear(128, unit_count)

    def forward(self, img, input_ids, attention_mask, group, entity):
        """Return ``(value_prediction, unit_prediction)`` for a batch."""
        visual = self.resnet(img)
        # The hidden state of the first token summarises the text.
        hidden_states = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
        ).last_hidden_state
        textual = hidden_states[:, 0, :]
        group_vec = self.group_embed(group)
        entity_vec = self.entity_embed(entity)

        fused = torch.cat((visual, textual, group_vec, entity_vec), dim=1)
        hidden = torch.relu(self.fc1(fused))
        hidden = torch.relu(self.fc2(hidden))
        return self.value_out(hidden), self.unit_out(hidden)

# function for hyperparameter tuning using Optuna
def tune_hyperparameters(trial):
    """Optuna objective: train briefly and return the mean validation loss.

    Reads the module-level names ``num_group_ids``, ``num_entity_names``,
    ``num_units``, ``train_data``, ``val_data`` and ``unit_encoder``, which
    must exist before the study is run.

    Args:
        trial: The ``optuna`` trial used to sample the learning rate and
            batch size and to report intermediate results.

    Returns:
        Mean per-batch validation loss (value MSE + unit cross-entropy) after
        the last completed epoch.

    Raises:
        optuna.TrialPruned: When the pruner decides to stop the trial early.
    """
    # suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64])

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = EntityModel(num_group_ids, num_entity_names, num_units).to(device)

    loss_fn_value = nn.MSELoss()
    loss_fn_unit = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size)

    # LabelEncoder gives class index i to classes_[i]. `unit_mapping` is keyed
    # by entity name, so using it here labelled every unit as class 0.
    unit_to_index = {str(name): idx for idx, name in enumerate(unit_encoder.classes_)}

    def batch_loss(batch):
        """Move one batch to the device and return its combined loss."""
        img = batch['image'].to(device)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        group = batch['group_id'].to(device)
        entity = batch['entity_name'].to(device)
        value = batch['value'].to(device)
        # Units the encoder does not know fall back to index 0, as before.
        # NOTE(review): that conflates them with class 0 — confirm acceptable.
        unit = torch.tensor(
            [unit_to_index.get(u, 0) for u in batch['unit']],
            dtype=torch.long,
            device=device,
        )

        value_pred, unit_pred = model(img, input_ids, attention_mask, group, entity)
        # squeeze(-1) keeps the batch dimension even for a batch of one.
        loss_value = loss_fn_value(value_pred.squeeze(-1), value)
        loss_unit = loss_fn_unit(unit_pred, unit)
        return loss_value + loss_unit

    epochs = 3
    validation_loss = float('inf')

    for epoch in range(epochs):
        model.train()
        for batch in train_loader:
            optimizer.zero_grad()
            loss = batch_loss(batch)
            loss.backward()
            optimizer.step()

        # Validation loop
        model.eval()
        summed_loss = 0.0
        with torch.no_grad():
            for batch in val_loader:
                summed_loss += batch_loss(batch).item()

        # Average per batch: a plain sum shrinks as the tuned batch size grows
        # (fewer batches), which would bias the study toward large batches.
        validation_loss = summed_loss / max(len(val_loader), 1)

        trial.report(validation_loss, epoch)
        if trial.should_prune():
            raise optuna.TrialPruned()

    return validation_loss

# Main execution of code
if __name__ == "__main__":
    # End-to-end training script: load and clean the data, tune the learning
    # rate and batch size with Optuna, train the final model, then save the
    # weights and the fitted label encoders for the inference cell.

    data_df = load_csv_data('train.csv', sample_size=1000)
    data_df = clean_dataset(data_df)

    # Tokenizer setup
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    # Label encoding for categorical features
    group_encoder = LabelEncoder()
    entity_encoder = LabelEncoder()
    data_df['group_id'] = group_encoder.fit_transform(data_df['group_id'])
    data_df['entity_name'] = entity_encoder.fit_transform(data_df['entity_name'])
    unit_encoder = LabelEncoder()
    units = sorted({unit for unit_set in unit_mapping.values() for unit in unit_set})
    unit_encoder.fit(units)
    # LabelEncoder has no dict-style .get(); build the unit -> index lookup
    # from classes_ (classes_[i] is encoded as i).
    unit_to_index = {str(name): idx for idx, name in enumerate(unit_encoder.classes_)}

    num_group_ids = len(group_encoder.classes_)
    num_entity_names = len(entity_encoder.classes_)
    num_units = len(unit_encoder.classes_)

    train_df, val_df = train_test_split(data_df, test_size=0.2, random_state=42)
    train_data = EntityDataset(train_df, tokenizer)
    val_data = EntityDataset(val_df, tokenizer)

    # Optimizing model using Optuna
    study = optuna.create_study(direction='minimize')
    study.optimize(tune_hyperparameters, n_trials=10)

    best_params = study.best_trial.params
    print("Best hyperparameters found:", best_params)

    # Training final model with best hyperparameters
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = EntityModel(num_group_ids, num_entity_names, num_units).to(device)

    criterion_value = nn.MSELoss()
    criterion_unit = nn.CrossEntropyLoss()
    # The study stores the rate under 'learning_rate', the name it was
    # suggested with in tune_hyperparameters.
    optimizer = optim.Adam(model.parameters(), lr=best_params['learning_rate'])

    train_loader = DataLoader(train_data, batch_size=best_params['batch_size'], shuffle=True)

    num_epochs = 500

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            img = batch['image'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            group = batch['group_id'].to(device)
            entity = batch['entity_name'].to(device)
            value = batch['value'].to(device)
            # Units unknown to the encoder fall back to index 0.
            unit = torch.tensor(
                [unit_to_index.get(u, 0) for u in batch['unit']],
                dtype=torch.long,
            ).to(device)

            optimizer.zero_grad()
            value_pred, unit_pred = model(img, input_ids, attention_mask, group, entity)
            # squeeze(-1) keeps the batch dimension even for a batch of one.
            loss_value = criterion_value(value_pred.squeeze(-1), value)
            loss_unit = criterion_unit(unit_pred, unit)
            loss = loss_value + loss_unit
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")

    # Save the model
    torch.save(model.state_dict(), 'entity_extraction_model.pth')
    print("Model saved as 'entity_extraction_model.pth'")

    # Save encoders and other necessary information
    import joblib
    joblib.dump(group_encoder, 'group_encoder.joblib')
    joblib.dump(entity_encoder, 'entity_encoder.joblib')
    joblib.dump(unit_encoder, 'unit_encoder.joblib')
    print("Encoders saved")

    print("Training completed. You can now use the saved model for inference on test.csv.")


In [None]:
import torch
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer
import joblib
from tqdm import tqdm
import cv2
import pytesseract
from torchvision import transforms

# Custom dataset class for processing test data
class CustomTestDataset(Dataset):
    """Inference dataset yielding image, OCR tokens and categorical ids.

    Each row of ``dataframe`` must provide ``image_link``, ``group_id`` and
    ``entity_name``. The last two are converted with
    ``torch.tensor(..., dtype=torch.long)``, so they are presumably already
    integer-encoded — verify against the caller.
    """

    def __init__(self, dataframe, tokenizer, max_seq_len=128):
        """Store the frame, the tokenizer and the fixed token sequence length."""
        self.dataframe = dataframe
        self.tokenizer = tokenizer
        self.max_seq_len = max_seq_len

    def __len__(self):
        """Return the number of rows in the underlying frame."""
        return len(self.dataframe)

    @staticmethod
    def _image_or_blank(image_tensor):
        """Return ``image_tensor``, or an all-zero 3x224x224 tensor if None.

        The default DataLoader collate function cannot batch ``None``, so a
        failed image load must be replaced by a tensor of the usual shape.
        """
        if image_tensor is None:
            return torch.zeros((3, 224, 224))
        return image_tensor

    def __getitem__(self, index):
        """Build one sample dict for row ``index``.

        Returns a dict with keys ``image``, ``input_ids``, ``attention_mask``,
        ``group_id`` and ``entity_name``.
        """
        row = self.dataframe.iloc[index]
        image_tensor, extracted_text = process_image(row['image_link'])
        image_tensor = self._image_or_blank(image_tensor)

        # Tokenize text data using BERT tokenizer
        encoded = self.tokenizer.encode_plus(
            extracted_text,
            add_special_tokens=True,
            max_length=self.max_seq_len,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        return {
            'image': image_tensor,
            'input_ids': encoded['input_ids'].flatten(),
            'attention_mask': encoded['attention_mask'].flatten(),
            'group_id': torch.tensor(row['group_id'], dtype=torch.long),
            'entity_name': torch.tensor(row['entity_name'], dtype=torch.long)
        }

# Encode labels, assigning -1 to any label the encoder has not seen
def handle_unknown_labels(encoder, label_list):
    """Encode each label with ``encoder``, using -1 for unknown labels.

    Args:
        encoder: Object whose ``transform([label])`` returns a one-element
            sequence and raises ``ValueError`` for a label it does not know.
        label_list: Iterable of raw labels.

    Returns:
        A list with one encoded value per input label, in input order.
    """
    def encode_or_flag(label):
        try:
            return encoder.transform([label])[0]
        except ValueError:
            return -1  # Assign -1 for unseen labels

    return [encode_or_flag(label) for label in label_list]

# Load the saved encoders first: the model's embedding tables and unit head
# are sized from them, so this cell does not depend on variables left over
# from the training cell.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load encoders using joblib
group_encoder = joblib.load('group_encoder.joblib')
entity_encoder = joblib.load('entity_encoder.joblib')
unit_encoder = joblib.load('unit_encoder.joblib')
# LabelEncoder encodes classes_[i] as i (it has no dict-style .items()), so
# enumerating classes_ yields the index -> unit lookup.
unit_decoder = dict(enumerate(unit_encoder.classes_))

# Rebuild the network with the same sizes used in training and restore weights
model = EntityModel(
    len(group_encoder.classes_),
    len(entity_encoder.classes_),
    len(unit_encoder.classes_),
)
model.load_state_dict(torch.load('entity_extraction_model.pth', map_location=device))
model.to(device)
model.eval()

# Load test dataset
test_data = pd.read_csv('test.csv')
# Sample at most 10 rows; clamping avoids a ValueError on smaller files.
test_data = test_data.sample(n=min(10, len(test_data)), random_state=42).reset_index(drop=True)

# Handle unknown group_id and entity_name in test data
test_data['group_id'] = handle_unknown_labels(group_encoder, test_data['group_id'])
test_data['entity_name'] = handle_unknown_labels(entity_encoder, test_data['entity_name'])

# Prepare the tokenizer and dataset
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
test_dataset = CustomTestDataset(test_data, tokenizer)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

# Prediction function
def generate_predictions(model, dataloader, device):
    """Run ``model`` over ``dataloader`` and collect ``(value, unit)`` pairs.

    Reads the module-level ``unit_decoder`` mapping (class index -> unit).

    Args:
        model: Callable returning ``(value_prediction, unit_logits)`` for
            ``(images, input_ids, attention_mask, groups, entities)``.
        dataloader: Iterable of batches with the keys ``image``,
            ``input_ids``, ``attention_mask``, ``group_id``, ``entity_name``.
        device: Device the batch tensors are moved to.

    Returns:
        A list with one ``(predicted_value, unit)`` tuple per sample, in
        dataloader order.
    """
    results = []
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Generating predictions"):
            # Moving batch data to appropriate device
            images = batch['image'].to(device)
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            groups = batch['group_id'].to(device)
            entities = batch['entity_name'].to(device)

            # Handle unknown labels by setting them to zero index
            # (an embedding lookup cannot take a negative index).
            groups[groups == -1] = 0
            entities[entities == -1] = 0

            # Model predictions for value and unit
            predicted_value, predicted_unit = model(images, input_ids, attention_mask, groups, entities)

            # reshape(-1) always gives a 1-D result. A bare squeeze() turns a
            # final batch of one sample into a 0-d array, which zip() cannot
            # iterate.
            values = predicted_value.reshape(-1).cpu().numpy()
            unit_ids = torch.argmax(predicted_unit, dim=1).cpu().numpy()

            results.extend(
                (value, unit_decoder[unit_id])
                for value, unit_id in zip(values, unit_ids)
            )

    return results

# Run inference over the prepared test loader
prediction_results = generate_predictions(model, test_loader, device)

# Attach the predicted value and unit of every sample to the test dataframe
test_data['predicted_value'] = [value for value, _ in prediction_results]
test_data['predicted_unit'] = [unit for _, unit in prediction_results]

# Print a short report per sample, followed by a blank line
print("\nTest Sample Predictions:")
for idx, row in test_data.iterrows():
    report_lines = [
        f"Sample {idx + 1}:",
        f"  Group ID: {row['group_id']} (Unknown if -1)",
        f"  Entity Name: {row['entity_name']} (Unknown if -1)",
        f"  Predicted Value: {row['predicted_value']:.2f} {row['predicted_unit']}",
    ]
    print("\n".join(report_lines) + "\n")

# Persist the dataframe, predictions included, as CSV
test_data.to_csv('test_out.csv', index=False)
print("Predictions have been saved to 'test_out.csv'")
