In [39]:
import tensorflow as tf
import pandas as pd
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D

class AnnotationAnalyzer:
    def __init__(self, annotation_path, image_dir, min_samples_per_class=2):
        self.annotation_path = annotation_path
        self.image_dir = image_dir
        self.min_samples_per_class = min_samples_per_class
        self.df = None
        self.load_data()

     def load_data(self):
        """Load and preprocess annotation data"""
        
        with open(self.annotation_path, 'r') as f:
            annotations = json.load(f)
            
        
        records = []
        for filename, data in annotations.items():
            record = {
                'filename': filename,
                'timestamp': datetime.fromisoformat(data['timestamp']),
                'description': data['description']
            }
            
           
            if data.get('classifications'):
                record['top_class'] = data['classifications'][0]['label']
                record['confidence'] = data['classifications'][0]['confidence']
            
           
            if data.get('features'):
                record['edge_density'] = data['features']['edge_density']
                record['num_contours'] = data['features']['num_contours']
                for color, value in data['features']['avg_color'].items():
                    record[f'avg_{color}'] = value
            
            records.append(record)
        
        self.df = pd.DataFrame(records)
        
        
        class_counts = self.df['top_class'].value_counts()
        valid_classes = class_counts[class_counts >= self.min_samples_per_class].index
        self.df = self.df[self.df['top_class'].isin(valid_classes)]
        
        print(f"Loaded {len(self.df)} annotations after filtering")
        print(f"Number of unique classes: {len(valid_classes)}")
    
    def build_model(self, num_classes):
        """Build and compile the model with ResNet50 base"""
        base_model = ResNet50(
            weights='imagenet',
            include_top=False,
            input_shape=(224, 224, 3)
        )
        
        for layer in base_model.layers:
            layer.trainable = False
        
        model = tf.keras.Sequential([
            base_model,
            GlobalAveragePooling2D(),
            Dense(1024, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01)),
            tf.keras.layers.Dropout(0.5),
            Dense(num_classes, activation='softmax')
        ])
        
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        
        return model

    def prepare_training_data(self, target_size=(224, 224)):
        """Prepare data for model training"""
        self.analyze_class_distribution()
        
        le = LabelEncoder()
        self.df['label_encoded'] = le.fit_transform(self.df['top_class'])
        
        train_df, val_df = train_test_split(
            self.df, 
            test_size=0.2, 
            stratify=self.df['label_encoded'] if len(self.df['top_class'].unique()) > 1 else None,
            random_state=42
        )
        
        print(f"\nTraining set size: {len(train_df)}")
        print(f"Validation set size: {len(val_df)}")
        
        train_generator = self.create_data_generator(train_df, target_size, augment=True)
        val_generator = self.create_data_generator(val_df, target_size, augment=False)
        
        return train_generator, val_generator, le, len(train_df), len(val_df)
    
    def create_data_generator(self, dataframe, target_size, augment=True):
        """Create a data generator that yields batches of images and labels"""
        def generate_batches(batch_size=32):
            num_samples = len(dataframe)
            indices = np.arange(num_samples)
            
            while True:
                np.random.shuffle(indices)
                
                for start_idx in range(0, num_samples, batch_size):
                    end_idx = min(start_idx + batch_size, num_samples)
                    batch_indices = indices[start_idx:end_idx]
                    
                    batch_df = dataframe.iloc[batch_indices]
                    
                    images = []
                    labels = []
                    
                    for _, row in batch_df.iterrows():
                        try:
                            img_path = os.path.join(self.image_dir, row['filename'])
                            img = cv2.imread(img_path)
                            
                            if img is None:
                                print(f"Warning: Skipping unreadable image {row['filename']}")
                                continue
                                
                            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                            img = cv2.resize(img, target_size)
                            img = img.astype(np.float32) / 255.0
                            
                            if augment:
                                # Add augmentation if needed
                                pass
                            
                            images.append(img)
                            labels.append(row['label_encoded'])
                            
                        except Exception as e:
                            print(f"Error processing {row['filename']}: {str(e)}")
                            continue
                    
                    if images:  # Only yield if we have valid images
                        yield np.array(images), np.array(labels)
        
        return generate_batches
    
    def train_model(self, epochs=10, batch_size=32):
        """Train the model"""
        train_gen, val_gen, label_encoder, train_size, val_size = self.prepare_training_data()
        num_classes = len(label_encoder.classes_)
        
        print(f"\nTraining model with {num_classes} classes")
        
        model = self.build_model(num_classes)
        
        steps_per_epoch = max(1, train_size // batch_size)
        validation_steps = max(1, val_size // batch_size)
        
        callbacks = [
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=3,
                restore_best_weights=True
            ),
            tf.keras.callbacks.ModelCheckpoint(
                'best_model.keras',
                monitor='val_loss',
                save_best_only=True
            )
        ]
        
        # Now we call the generator without passing size parameters
        history = model.fit(
            train_gen(batch_size),
            validation_data=val_gen(batch_size),
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            validation_steps=validation_steps,
            callbacks=callbacks,
            verbose=1
        )
        
        self.plot_training_history(history)
        return model, label_encoder, history

    def plot_training_history(self, history):
        """Plot training history"""
        plt.figure(figsize=(12, 4))
        
        plt.subplot(1, 2, 1)
        plt.plot(history.history['loss'], label='Training Loss')
        plt.plot(history.history['val_loss'], label='Validation Loss')
        plt.title('Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        
        plt.subplot(1, 2, 2)
        plt.plot(history.history['accuracy'], label='Training Accuracy')
        plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
        plt.title('Model Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        
        plt.tight_layout()
        plt.savefig('training_history.png')
        plt.close()

# Usage example
if __name__ == "__main__":
    annotation_path = r"C:\Users\eKasi_SWT_COM00862\Desktop\Health_Care_Project\automated_annotations.json"
    image_dir = r"C:\Users\eKasi_SWT_COM00862\Desktop\Health_Care_Project\doctor"
    
    analyzer = AnnotationAnalyzer(annotation_path, image_dir, min_samples_per_class=2)
    model, label_encoder, history = analyzer.train_model(epochs=10)
    
    # Save the model with proper extension
    save_dir = r"C:\Users\eKasi_SWT_COM00862\Desktop\Health_Care_Project"
    model.save(os.path.join(save_dir, 'trained_model.keras'))
    
    # Save the label encoder
    with open(os.path.join(save_dir, 'label_encoder.pkl'), 'wb') as f:
        pickle.dump(label_encoder, f)

IndentationError: unindent does not match any outer indentation level (<string>, line 20)