# My section

In [None]:
import pandas as pd
import os
from utils import download_images
# Load the train/test tables. The paths are absolute; adjust them if the
# dataset lives somewhere other than /content/dataset.
train = pd.read_csv('/content/dataset/train.csv')
test = pd.read_csv('/content/dataset/test.csv')

In [None]:
import urllib.parse
# Work on a small slice of the training table to keep download/OCR time short.
# .copy() detaches the slice from `train`, so columns added to it later do not
# trigger pandas' SettingWithCopyWarning.
temp_train = train.head(500).copy() #changing number of rows to train with
image_dir = 'train_images'
download_images(temp_train['image_link'], image_dir)

# Rename every downloaded image to "<row index>.jpg" so files can be matched
# back to their label row.
# Assumes download_images stores each file inside `image_dir` under the
# basename of its URL -- TODO confirm against utils.download_images.
df = temp_train
for i,row in df.iterrows():
  image_link = row['image_link']
  parsed_url = urllib.parse.urlparse(image_link)
  filename = os.path.basename(parsed_url.path)
  # Both paths must be inside the download folder; bare filenames would be
  # resolved against the current working directory and never be found.
  source_path = os.path.join(image_dir, filename)
  new_filename = os.path.join(image_dir, f"{i}.jpg")
  if os.path.exists(source_path):
    os.rename(source_path, new_filename)

100%|██████████| 500/500 [00:02<00:00, 203.42it/s]


In [None]:
import cv2
import numpy as np
import os
from tqdm import tqdm

def preprocess_image(image_path, target_size=(224, 224)):
    """Load an image and return it resized, in RGB order, scaled to [0, 1].

    Args:
        image_path: Path of the image file to read.
        target_size: Size tuple handed to ``cv2.resize`` (OpenCV interprets
            it as ``(width, height)``).

    Returns:
        A float32 array holding the resized RGB image with values in [0, 1].

    Raises:
        ValueError: If OpenCV cannot read or decode ``image_path``.
    """
    img = cv2.imread(image_path)
    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than with an opaque cvtColor error.
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, target_size)
    img = img.astype(np.float32) / 255.0
    return img

def preprocess_dataset(image_dir, output_dir, target_size=(224, 224)):
    """Preprocess every image in ``image_dir`` and write it to ``output_dir``.

    Files whose name ends in .jpg, .jpeg or .png are run through
    ``preprocess_image`` and saved under the same file name; everything else
    in the directory is ignored. ``output_dir`` is created when missing.

    Args:
        image_dir: Directory containing the source images.
        output_dir: Directory receiving the preprocessed images.
        target_size: Forwarded to ``preprocess_image``.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    os.makedirs(output_dir, exist_ok=True)

    for entry in tqdm(os.listdir(image_dir)):
        if not entry.endswith(image_suffixes):
            continue
        source_path = os.path.join(image_dir, entry)
        target_path = os.path.join(output_dir, entry)
        rgb_float = preprocess_image(source_path, target_size)
        # Back to 8-bit BGR, which is what cv2.imwrite expects.
        rgb_bytes = (rgb_float * 255).astype(np.uint8)
        bgr_bytes = cv2.cvtColor(rgb_bytes, cv2.COLOR_RGB2BGR)
        cv2.imwrite(target_path, bgr_bytes)

# Preprocess the downloaded training images: 'train_images' is read and the
# results are written to 'preprocessed_images1'. Only the training images are
# processed here.
preprocess_dataset('train_images', 'preprocessed_images1')

print("Image preprocessing complete!")

In [None]:
import pandas as pd
import re
from constants import entity_unit_map  # Import the entity-to-unit mapping

def preprocess_labels(df, unit_map=None):
    """Parse ``entity_value`` strings into value / unit / normalized-unit columns.

    The input frame is not modified; a copy with three extra columns is
    returned:

    * ``value``           -- leading number of ``entity_value`` as a float
    * ``unit``            -- lower-cased text following the number (may span
                             several words, e.g. "fluid ounce")
    * ``normalized_unit`` -- the unit if it is allowed for the row's
                             ``entity_name``, otherwise ``None``

    Rows whose ``entity_value`` does not start with a number get missing
    values in all three columns. Rows with unrecognised units are kept;
    filter on ``normalized_unit`` downstream if they must be removed.

    Args:
        df: Frame with ``entity_value`` and ``entity_name`` columns.
        unit_map: Mapping ``entity_name -> collection of allowed units``.
            Defaults to the module-level ``entity_unit_map``.

    Returns:
        A new DataFrame with the added columns.
    """
    if unit_map is None:
        unit_map = entity_unit_map

    # number, optional whitespace, then one or more whitespace-separated
    # alphabetic words (so multi-word units are captured in full).
    value_pattern = re.compile(r'(\d+(?:\.\d+)?)\s*([^\W\d_]+(?:\s+[^\W\d_]+)*)')

    def extract_value_and_unit(entity_value):
        match = value_pattern.match(str(entity_value))
        if match is None:
            return None, None
        value, unit = match.groups()
        # Collapse internal whitespace so "fluid   ounce" == "fluid ounce".
        return float(value), ' '.join(unit.lower().split())

    def normalize_unit(unit, entity_name):
        if unit is None:
            return None
        # Fetch the allowed units for the given entity
        allowed_units_for_entity = unit_map.get(entity_name, set())
        # Try the full text first, then progressively shorter prefixes, so
        # trailing words after a valid unit do not hide it.
        words = unit.split()
        for end in range(len(words), 0, -1):
            candidate = ' '.join(words[:end])
            if candidate in allowed_units_for_entity:
                return candidate
        # You can add unit conversion logic here if needed
        return None

    # Work on a copy so the caller's frame is left untouched.
    df = df.copy()

    # Extract value and unit. Building plain lists (rather than unpacking
    # zip(*...)) also works when the frame has no rows.
    parsed = [extract_value_and_unit(raw) for raw in df['entity_value']]
    values = [value for value, _ in parsed]
    units = [unit for _, unit in parsed]
    df['value'] = values
    df['unit'] = units

    # Normalize units based on entity names
    df['normalized_unit'] = [
        normalize_unit(unit, entity_name)
        for unit, entity_name in zip(units, df['entity_name'])
    ]

    return df

# Use the in-memory training subset. Note that train_df is just another name
# for temp_train (no copy is made here).
train_df = temp_train
# Alternative: read the full table from disk instead.
# train_df = pd.read_csv('dataset/train.csv')

# Preprocess the labels (adds the value / unit / normalized_unit columns)
preprocessed_train_df = preprocess_labels(train_df)
print(preprocessed_train_df.shape)
# Save the preprocessed data; index=False means the row index is not written,
# so rows are identified only by position when the file is read back.
preprocessed_train_df.to_csv('train_labels.csv', index=False)

print("Label preprocessing complete!")

## Model development

In [None]:
# Install the Tesseract OCR engine (system package) and its Python wrapper,
# both required by the OCR cell below.
!apt-get update
!apt-get install -y tesseract-ocr
!pip install pytesseract

In [None]:
import cv2
import pytesseract
import pandas as pd
import os
from tqdm import tqdm

def perform_ocr(image_path):
    """Run Tesseract OCR on an image and return the recognised text.

    The image is converted to grayscale and binarised with Otsu's threshold
    before being handed to ``pytesseract.image_to_string``.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The text string produced by Tesseract.

    Raises:
        ValueError: If OpenCV cannot read or decode ``image_path``.
    """
    img = cv2.imread(image_path)
    # cv2.imread returns None on failure; report the path instead of letting
    # cvtColor fail with an unrelated-looking assertion error.
    if img is None:
        raise ValueError(f"Could not read image: {image_path}")
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # cv2.threshold returns (threshold_value, image); only the image is needed.
    threshold = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)[1]
    text = pytesseract.image_to_string(threshold)
    return text

def extract_ocr_features(image_dir, limit=10000):
    """OCR the images in ``image_dir`` and return their text keyed by image id.

    Only files ending in .jpg, .jpeg or .png are considered, in the order
    ``os.listdir`` yields them, and processing stops once ``limit`` images
    have been handled. The image id is the file name without its extension.

    Args:
        image_dir: Directory containing the images.
        limit: Maximum number of images to process.

    Returns:
        dict mapping image id -> OCR text.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    candidates = []
    for entry in os.listdir(image_dir):
        if entry.endswith(image_suffixes):
            candidates.append(entry)

    text_by_id = {}
    for position, entry in enumerate(tqdm(candidates)):
        if position >= limit:
            break
        stem, _extension = os.path.splitext(entry)
        text_by_id[stem] = perform_ocr(os.path.join(image_dir, entry))

    return text_by_id

# Extract OCR features for the training images (at most 10,000 of them).
# The equivalent test-set call is kept below but disabled.
train_ocr_features = extract_ocr_features('preprocessed_images1', limit=10000)
# test_ocr_features = extract_ocr_features('preprocessed_images2', limit=10000)

# Save OCR features: one row per image id (the CSV index), one 'ocr_text' column.
pd.DataFrame.from_dict(train_ocr_features, orient='index', columns=['ocr_text']).to_csv('train_ocr_features.csv')
# pd.DataFrame.from_dict(test_ocr_features, orient='index', columns=['ocr_text']).to_csv('test_ocr_features.csv')

print("OCR feature extraction complete!")

100%|██████████| 494/494 [02:13<00:00,  3.71it/s]

OCR feature extraction complete!





In [None]:
import torch
from torchvision import models, transforms
from PIL import Image
import pandas as pd
import os
from tqdm import tqdm

# Pretrained ResNet-50 used as a fixed feature extractor.
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour
# of the `weights=` argument -- confirm against the installed version.
model=models.resnet50(pretrained=True)
# Keep every child module except the last one, so the network outputs
# features rather than class scores.
model = torch.nn.Sequential(*list(model.children())[:-1])
# Inference mode: disables dropout / batch-norm statistic updates.
model.eval()

# Input pipeline applied to each PIL image before it is fed to the network.
# NOTE(review): the images in 'preprocessed_images1' were already resized to
# 224x224 upstream, so Resize(256) + CenterCrop(224) enlarges and then crops
# them -- confirm this is intended.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    # Per-channel mean / std used for normalisation.
    transforms.Normalize(mean=[0.485,0.456,0.406],std=[0.229,0.224,0.225]),
])

def extract_cnn_features(image_path):
    """Return the CNN feature array for a single image.

    The image is opened with PIL, converted to RGB, run through the
    module-level ``transform`` and ``model`` with gradients disabled, and the
    result is squeezed and converted to a NumPy array.

    Args:
        image_path: Path of the image file to read.

    Returns:
        NumPy array with the model output, singleton dimensions removed.
    """
    rgb_image = Image.open(image_path).convert('RGB')
    # The model expects a batch dimension, so add one of size 1 in front.
    single_batch = transform(rgb_image).unsqueeze(0)
    with torch.no_grad():
        activations = model(single_batch)
    flattened = activations.squeeze()
    return flattened.numpy()

def extract_cnn_features_batch(image_dir):
    """Compute CNN features for every image in ``image_dir``.

    Files ending in .jpg, .jpeg or .png are processed with
    ``extract_cnn_features``; other directory entries are skipped. The image
    id is the file name without its extension.

    Args:
        image_dir: Directory containing the images.

    Returns:
        dict mapping image id -> feature array.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    features_by_id = {}
    for entry in tqdm(os.listdir(image_dir)):
        if not entry.endswith(image_suffixes):
            continue
        full_path = os.path.join(image_dir, entry)
        stem, _extension = os.path.splitext(entry)
        features_by_id[stem] = extract_cnn_features(full_path)
    return features_by_id

# Compute CNN features for all preprocessed training images.
train_cnn_features = extract_cnn_features_batch('preprocessed_images1')

# Persist them: one row per image id (the CSV index), one column per feature.
pd.DataFrame.from_dict(train_cnn_features,orient='index').to_csv('train_cnn_features.csv')

print('cnn extraction coplete')

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 126MB/s]
100%|██████████| 494/494 [01:55<00:00,  4.27it/s]


cnn extraction coplete


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

In [None]:
class ProductDataset(Dataset):
    """Dataset pairing OCR token ids, CNN features and class labels.

    All three inputs are converted to tensors once, at construction time:
    OCR features and labels as ``torch.long``, CNN features as
    ``torch.float``. Each item is a dict with the keys ``'ocr'``, ``'cnn'``
    and ``'label'``.
    """

    def __init__(self, ocr_features, cnn_features, labels):
        integer_kind = torch.long
        real_kind = torch.float
        # Integer ids for the text branch, real-valued features for the image
        # branch, integer class indices for the targets.
        self.ocr_features = torch.tensor(ocr_features, dtype=integer_kind)
        self.cnn_features = torch.tensor(cnn_features, dtype=real_kind)
        self.labels = torch.tensor(labels, dtype=integer_kind)

    def __len__(self):
        # One sample per label.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = dict(
            ocr=self.ocr_features[idx],
            cnn=self.cnn_features[idx],
            label=self.labels[idx],
        )
        return sample


In [None]:
class HybridModel(nn.Module):
    """Two-branch classifier over OCR token ids and CNN image features.

    * Text branch: embedding -> LSTM; the LSTM output at the last time step
      is used as the text representation.
    * Image branch: a single linear layer projecting the CNN features to
      ``hidden_dim``.
    * Head: the two representations are concatenated and passed through
      Linear -> ReLU -> Linear to produce ``num_classes`` scores.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: LSTM hidden size and width of the projected CNN features.
        cnn_feature_dim: Size of the incoming CNN feature vector.
        num_classes: Number of output scores.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, cnn_feature_dim, num_classes):
        super().__init__()

        # Text pathway
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)

        # Image pathway
        self.fc_cnn = nn.Linear(cnn_feature_dim, hidden_dim)

        # Fusion head: input is the text and image representations side by side
        fused_width = hidden_dim * 2
        self.fc_combined = nn.Sequential(
            nn.Linear(fused_width, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_classes),
        )

    def forward(self, ocr, cnn):
        """Return class scores for a batch of (token ids, CNN features)."""
        token_vectors = self.embedding(ocr)
        per_step_states = self.lstm(token_vectors)[0]
        # Representation of the sequence = LSTM output at the final position.
        text_repr = per_step_states[:, -1, :]

        image_repr = self.fc_cnn(cnn)

        fused = torch.cat((text_repr, image_repr), dim=1)
        return self.fc_combined(fused)

In [None]:
def safe_read_csv(file_path, index_col=None):
    """
    Read a CSV file, failing early when it is missing or has zero bytes.

    Args:
    - file_path (str): Path to the CSV file.
    - index_col (int or str, optional): Column to use as the index. Defaults to None.

    Returns:
    - pd.DataFrame: The parsed contents of the file.

    Raises:
    - FileNotFoundError: If nothing exists at file_path.
    - ValueError: If the file exists but its size is 0 bytes.
    """
    path_is_missing = not os.path.exists(file_path)
    if path_is_missing:
        raise FileNotFoundError(f"File {file_path} does not exist.")

    byte_count = os.path.getsize(file_path)
    if byte_count == 0:
        raise ValueError(f"File {file_path} is empty.")

    frame = pd.read_csv(file_path, index_col=index_col)
    return frame

# File paths
ocr_features_path = 'train_ocr_features.csv'
cnn_features_path = 'train_cnn_features.csv'
labels_path = 'train_labels.csv'

# Load features and labels.
# Every handler prints its message and then re-raises: the statements below
# need all three frames, so continuing after a failed load would only surface
# later as a NameError or silently reuse stale variables from an earlier run.
try:
    train_ocr = safe_read_csv(ocr_features_path, index_col=0)
    train_cnn = safe_read_csv(cnn_features_path, index_col=0)
    train_labels = safe_read_csv(labels_path)
    print("Files loaded successfully!")
except pd.errors.EmptyDataError:
    # Must come before ValueError: EmptyDataError is a ValueError subclass and
    # would otherwise never reach this branch.
    print("One or more files are empty.")
    raise
except (FileNotFoundError, ValueError) as e:
    print(f"Error: {e}")
    raise
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    raise

# Only the last bare expression of a cell is displayed.
train_ocr.shape
train_labels.shape
# train_cnn.shape

Files loaded successfully!


(480, 7)

In [None]:
def safe_read_csv(file_path, index_col=None):
    """
    Read a CSV file, failing early when it is missing or has zero bytes.

    Args:
    - file_path (str): Path to the CSV file.
    - index_col (int or str, optional): Column to use as the index. Defaults to None.

    Returns:
    - pd.DataFrame: The parsed contents of the file.

    Raises:
    - FileNotFoundError: If nothing exists at file_path.
    - ValueError: If the file exists but its size is 0 bytes.
    """
    path_is_missing = not os.path.exists(file_path)
    if path_is_missing:
        raise FileNotFoundError(f"File {file_path} does not exist.")

    byte_count = os.path.getsize(file_path)
    if byte_count == 0:
        raise ValueError(f"File {file_path} is empty.")

    frame = pd.read_csv(file_path, index_col=index_col)
    return frame

# File paths
ocr_features_path = 'train_ocr_features.csv'
cnn_features_path = 'train_cnn_features.csv'
labels_path = 'train_labels.csv'

# Load features and labels.
# Every handler prints its message and then re-raises: the rest of this cell
# needs all three frames, so continuing after a failed load would only surface
# later as a NameError or silently reuse stale variables from an earlier run.
try:
    train_ocr = safe_read_csv(ocr_features_path, index_col=0)
    train_cnn = safe_read_csv(cnn_features_path, index_col=0)
    train_labels = safe_read_csv(labels_path)
    print("Files loaded successfully!")
except pd.errors.EmptyDataError:
    # Must come before ValueError: EmptyDataError is a ValueError subclass and
    # would otherwise never reach this branch.
    print("One or more files are empty.")
    raise
except (FileNotFoundError, ValueError) as e:
    print(f"Error: {e}")
    raise
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    raise

import urllib.parse
import zlib

#initialise model hyper-parameters (defined first because the OCR encoder
#below must agree with the embedding size)
vocab_size = 10000      # rows in the embedding table; id 0 is reserved for padding
embedding_dim = 100
hidden_dim = 128
max_ocr_tokens = 64     # every OCR text is truncated / padded to this many tokens


def encode_ocr_text(text):
    """Convert raw OCR text into exactly ``max_ocr_tokens`` integer token ids.

    Words are lower-cased, split on whitespace and hashed with CRC32 into the
    range [1, vocab_size - 1]; id 0 is padding. Padding goes on the LEFT
    because the model reads the LSTM output at the last time step, which
    should be a real token whenever the text has one. Non-string input (for
    example the NaN pandas produces for an empty CSV cell) yields an
    all-padding sequence.
    """
    words = text.lower().split() if isinstance(text, str) else []
    token_ids = [
        1 + zlib.crc32(word.encode('utf-8')) % (vocab_size - 1)
        for word in words[:max_ocr_tokens]
    ]
    return [0] * (max_ocr_tokens - len(token_ids)) + token_ids


# One feature row per image id (the CSV index of both feature files). Ids are
# compared as strings so numeric and textual ids are handled the same way.
features = pd.concat([train_ocr, train_cnn], axis=1, join='inner')
features.index = features.index.astype(str)
feature_ids = set(features.index)


def find_image_id(row_label, image_link):
    """Return the feature id belonging to a label row, or None if there is none.

    Images are named either after the label row index (when the download cell
    renamed them to "<index>.jpg") or after the basename of their URL; both
    candidates are tried, in that order.
    """
    if str(row_label) in feature_ids:
        return str(row_label)
    url_path = urllib.parse.urlparse(str(image_link)).path
    stem = os.path.splitext(os.path.basename(url_path))[0]
    return stem if stem in feature_ids else None


# Keep only label rows that have features, and line the features up with them
# row by row so features and targets cannot get out of step.
image_ids = [
    find_image_id(row_label, image_link)
    for row_label, image_link in zip(train_labels.index, train_labels['image_link'])
]
labelled = train_labels.assign(image_id=image_ids).dropna(subset=['image_id'])
if labelled.empty:
    raise ValueError("No label row could be matched to an extracted image feature.")
samples = features.loc[labelled['image_id'].tolist()]

le = LabelEncoder()
encoded_values = pd.Series(
    le.fit_transform(labelled['value']),
    index=labelled.index,
    name='encoded_value',
)

#split data
# NOTE: from here on `train_labels` is the encoded target Series of the
# training split, no longer the label DataFrame loaded above.
train_data,val_data,train_labels,val_labels = train_test_split(
    samples,
    encoded_values,
    test_size = 0.2,
    random_state=42
)


def build_dataset(frame, targets):
    """Assemble a ProductDataset from a feature frame and its target Series."""
    token_ids = [encode_ocr_text(text) for text in frame['ocr_text']]
    cnn_matrix = frame.drop('ocr_text', axis=1).values
    return ProductDataset(token_ids, cnn_matrix, targets.values)


train_dataset = build_dataset(train_data, train_labels)
val_dataset = build_dataset(val_data, val_labels)
train_loader = DataLoader(train_dataset,batch_size=32,shuffle=True)
val_loader = DataLoader(val_dataset,batch_size=32)

#initialise model
cnn_features_dim = train_cnn.shape[1]
num_classes = len(le.classes_)

# NOTE(review): this rebinds the global name `model`, which the CNN feature
# extraction cell also uses for the ResNet backbone; re-running that cell
# after this one would pick up the wrong network.
model = HybridModel(vocab_size, embedding_dim, hidden_dim, cnn_features_dim,num_classes)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# Train for a fixed number of epochs, validating and reporting after each one.
num_epochs = 10
for epoch in range(num_epochs):
  # ---- training pass ----
  model.train()
  train_loss = 0.0
  for batch in train_loader:
    optimizer.zero_grad()
    outputs = model(batch['ocr'],batch['cnn'])
    loss = criterion(outputs,batch['label'])
    loss.backward()
    optimizer.step()
    # Accumulate here: after the validation pass `loss` holds a validation
    # value, so it cannot be used to report the training loss.
    train_loss += loss.item()

  # ---- validation pass ----
  model.eval()
  val_loss = 0.0
  correct = 0
  total = 0
  with torch.no_grad():
    for batch in val_loader:
      outputs = model(batch['ocr'],batch['cnn'])
      loss = criterion(outputs, batch['label'])
      val_loss += loss.item()
      _,predicted = outputs.max(1)
      total += batch['label'].size(0)
      correct += predicted.eq(batch['label']).sum().item()

  # Report once per epoch (inside the loop). Losses are means over batches;
  # max(..., 1) avoids a division by zero when a loader is empty.
  print(f'Epoch {epoch+1}/{num_epochs}, '
        f'Train Loss: {train_loss/max(len(train_loader), 1):.4f}, '
        f'Val Loss: {val_loss/max(len(val_loader), 1):.4f}, '
        f'Val Accuracy: {100.*correct/max(total, 1):.2f}%')

# Persist only the learned parameters (state_dict), not the whole module.
torch.save(model.state_dict(),'model.pth')
print('model training complete and saved')