In [5]:
import os
import torch
import tensorflow as tf
import numpy as np
from transformers import AutoTokenizer, AutoModelForTokenClassification
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import preprocess_input, ResNet50
from tensorflow.keras import layers, models

# Turn off Keras' safe-mode deserialization for this session so that the saved
# image classifier, which apparently contains a Lambda layer, can be loaded.
# NOTE(review): with safe mode off, code embedded in a model file can run when
# the file is loaded -- only load model files that come from a trusted source.
import keras
keras.config.enable_unsafe_deserialization()

In [6]:
# The project root is taken to be one level above the notebook's working directory
base_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
print("Current working directory:", os.getcwd())
print("Base directory:", base_dir)

# Model artefacts the rest of the notebook loads from disk
models_dir = os.path.join(base_dir, "models")
paths_to_check = {
    "NER model directory": os.path.join(models_dir, "ner_model", "final"),
    "Image classifier model": os.path.join(models_dir, "animal_classifier_final.keras"),
}

# Report, without failing, whether each artefact is present
for label, location in paths_to_check.items():
    status = "✓ EXISTS" if os.path.exists(location) else "✗ MISSING"
    print(f"{label}: {status} at {location}")

Current working directory: C:\Users\mar4u\Documents\DS-Test-2025\task2\scripts
Base directory: C:\Users\mar4u\Documents\DS-Test-2025\task2
NER model directory: ✓ EXISTS at C:\Users\mar4u\Documents\DS-Test-2025\task2\models\ner_model\final
Image classifier model: ✓ EXISTS at C:\Users\mar4u\Documents\DS-Test-2025\task2\models\animal_classifier_final.keras


In [7]:
class AnimalPipeline:
    """Check whether the animal named in a sentence is the animal shown in an image.

    A token-classification (NER) model picks the animal word out of the text,
    a ResNet50-based classifier labels the image, and ``process`` compares the
    two results.

    Args:
        base_dir: Project root that contains the ``models`` directory. When
            omitted, the parent of the current working directory is used.
    """

    # Label id the NER model is expected to emit for ANIMAL tokens.
    ANIMAL_LABEL_ID = 1
    # Tokenizer markers that must never be reported as (part of) an animal name.
    SPECIAL_TOKENS = ("[CLS]", "[SEP]", "[PAD]")
    # Spatial size the image classifier is built for.
    IMAGE_SIZE = (224, 224)

    def __init__(self, base_dir=None):
        if base_dir is None:
            base_dir = os.path.join(os.getcwd(), "..")
        self.BASE_DIR = os.path.abspath(base_dir)

        # Load NER model
        self.ner_model_path = os.path.join(self.BASE_DIR, "models", "ner_model", "final")
        self.tokenizer = AutoTokenizer.from_pretrained(self.ner_model_path)
        self.ner_model = AutoModelForTokenClassification.from_pretrained(self.ner_model_path)

        # Class names for the image classifier; the index of a name is the
        # index of the corresponding softmax output.
        self.class_names = ['butterfly', 'cat', 'chicken', 'cow', 'dog', 'elephant', 'horse', 'sheep', 'spider', 'squirrel']
        self.image_model = self.create_and_load_image_model()

    def create_and_load_image_model(self):
        """Rebuild the classifier architecture and copy in the trained weights.

        The saved model is loaded, its first layer (a Lambda layer) is skipped,
        and the weights of every remaining layer are copied positionally into a
        freshly built model.

        Returns:
            The Keras model. If the model file is missing, a warning is printed
            and the model is returned with ImageNet weights in the base and an
            untrained classification head.

        Raises:
            ValueError: If the saved model (minus its first layer) does not
                have the same number of layers as the rebuilt architecture.
        """
        # Create model architecture using ResNet50 as base
        base_model = ResNet50(
            input_shape=self.IMAGE_SIZE + (3,),
            include_top=False,
            weights="imagenet"
        )
        base_model.trainable = False

        model = models.Sequential([
            base_model,
            layers.GlobalAveragePooling2D(),
            layers.BatchNormalization(),
            layers.Dense(512, activation='relu'),
            layers.BatchNormalization(),
            layers.Dropout(0.5),
            layers.Dense(256, activation='relu'),
            layers.Dropout(0.3),
            layers.Dense(len(self.class_names), activation='softmax')
        ])

        # Load trained weights
        model_path = os.path.join(self.BASE_DIR, "models", "animal_classifier_final.keras")
        if os.path.exists(model_path):
            old_model = tf.keras.models.load_model(model_path)
            # Copy weights, skipping the Lambda layer
            trained_layers = old_model.layers[1:]
            # zip() would silently stop at the shorter sequence and leave
            # layers without their trained weights, so check the pairing first.
            if len(trained_layers) != len(model.layers):
                raise ValueError(
                    f"Saved model at {model_path} has {len(trained_layers)} layers after "
                    f"the first one, but the rebuilt architecture has {len(model.layers)}"
                )
            for new_layer, old_layer in zip(model.layers, trained_layers):
                new_layer.set_weights(old_layer.get_weights())
        else:
            print(f"Warning: Model file not found at {model_path}")

        return model

    def extract_animal_from_text(self, text):
        """Return the first animal word mentioned in ``text``, or None.

        Word pieces are merged back into whole words before labels are
        inspected: a word counts as an animal if any of its pieces carries the
        ANIMAL label, and the complete word is returned. This avoids returning
        fragments such as "ele" or "phant" when only part of a word is
        labelled. Special tokens are ignored and act as word boundaries.

        Args:
            text: Input sentence (truncated to 128 tokens).

        Returns:
            The lower-cased animal word, or None if no word is labelled ANIMAL.
        """
        # Tokenize input text
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=128)

        # Get one predicted label id per token
        with torch.no_grad():
            outputs = self.ner_model(**inputs)
        label_ids = torch.argmax(outputs.logits, dim=2)[0].tolist()

        tokens = self.tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])

        # Each entry is [list of pieces, whether any piece is labelled ANIMAL]
        words = []
        open_word = None
        for token, label_id in zip(tokens, label_ids):
            if token in self.SPECIAL_TOKENS:
                # Never glue a following "##" piece onto a special token
                open_word = None
                continue

            is_animal = label_id == self.ANIMAL_LABEL_ID
            is_continuation = token.startswith("##")

            if is_continuation and open_word is not None:
                open_word[0].append(token[2:])
                open_word[1] = open_word[1] or is_animal
                continue

            # New word (or a continuation piece with nothing to attach to)
            open_word = [[token[2:] if is_continuation else token], is_animal]
            words.append(open_word)

        for pieces, has_animal in words:
            word = "".join(pieces)
            if has_animal and word:
                return word.lower()
        return None

    def classify_image(self, image_path):
        """Classify the image at ``image_path``.

        Args:
            image_path: Path of the image file to load.

        Returns:
            tuple: (predicted class name, confidence in percent as a float).
        """
        # Load and preprocess image
        img = image.load_img(image_path, target_size=self.IMAGE_SIZE)
        img_array = image.img_to_array(img)
        img_array = np.expand_dims(img_array, axis=0)
        img_array = preprocess_input(img_array)

        # Get prediction for the single image in the batch
        probabilities = self.image_model.predict(img_array, verbose=0)[0]
        best_index = int(np.argmax(probabilities))
        predicted_class = self.class_names[best_index]
        # Plain float so the value formats and serialises without NumPy
        confidence = float(probabilities[best_index]) * 100

        return predicted_class, confidence

    def process(self, text, image_path):
        """
        Process text and image to determine if they match.

        Args:
            text: Sentence that should mention an animal.
            image_path: Path of the image to classify.

        Returns:
            tuple: (boolean match result, dict with detailed information).
            The dict always has the keys "text_animal", "image_animal",
            "confidence" and "match"; when no animal is found in the text it
            also has "error" and the image is not classified.
        """
        # Extract animal from text
        text_animal = self.extract_animal_from_text(text)
        if not text_animal:
            return False, {
                "error": "No animal mentioned in text",
                "text_animal": None,
                "image_animal": None,
                "confidence": 0,
                "match": False
            }

        # Classify image
        image_animal, confidence = self.classify_image(image_path)

        # Check if animals match
        match = text_animal.lower() == image_animal.lower()

        return match, {
            "text_animal": text_animal,
            "image_animal": image_animal,
            "confidence": confidence,
            "match": match
        }

In [10]:
# Build the pipeline once; constructing it loads the NER model and the image classifier
pipeline = AnimalPipeline()

# (sentence, image path) pairs to evaluate; paths are relative to the working directory
sample_pairs = (
    ("There is a cat in the picture.", "../test_img1.jpg"),
    ("I can see a dog.", "../test_img2.jpg"),
    ("Look at this elephant.", "../test_img3.jpg"),
    ("Here is the butterfly.", "../test_img4.jpg"),
)
test_cases = [
    {"text": sentence, "image_path": img_path}
    for sentence, img_path in sample_pairs
]

# Run every case; a failure in one case is reported and does not stop the others
for case in test_cases:
    sentence = case["text"]
    img_path = case["image_path"]
    print(f"\nTesting with text: {sentence}")
    try:
        is_match, info = pipeline.process(sentence, img_path)
        print(f"Text mentions: {info['text_animal']}")
        print(f"Image shows: {info['image_animal']} (confidence: {info['confidence']:.2f}%)")
        print(f"Match: {is_match}")
    except Exception as err:
        print(f"Error occurred with {img_path}: {err!s}")


Testing with text: There is a cat in the picture.
Text mentions: cat
Image shows: dog (confidence: 99.89%)
Match: False

Testing with text: I can see a dog.
Text mentions: dog
Image shows: chicken (confidence: 100.00%)
Match: False

Testing with text: Look at this elephant.
Text mentions: elephant
Image shows: cat (confidence: 44.92%)
Match: False

Testing with text: Here is the butterfly.
Text mentions: butterfly
Image shows: butterfly (confidence: 100.00%)
Match: True
