In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from PIL import Image
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np


ModuleNotFoundError: No module named 'torch'

In [None]:
class EmotionLandmarkNet(nn.Module):
    """1-D depthwise-separable CNN classifier over a flattened 3-channel input.

    Each sample is reshaped to ``(3, input_size // 3)`` and passed through a
    strided stem convolution, three separable-convolution stages (each ending
    in max-pooling), dropout and a final linear layer producing
    ``num_classes`` logits.

    Args:
        input_size: Number of elements per sample (must be divisible by 3).
        num_classes: Number of output logits.
        base_channels: Output channels of the stem convolution.
        expansion_factor: Channel multiplier applied on top of
            ``base_channels`` inside the three stages.
    """

    def __init__(self, input_size, num_classes, base_channels=32, expansion_factor=2):
        super(EmotionLandmarkNet, self).__init__()
        self.base_channels = base_channels
        self.expansion_factor = expansion_factor

        def conv_block(in_channels, out_channels, kernel_size=3, stride=1, padding=1):
            # Conv -> BatchNorm -> SiLU; bias is redundant in front of BatchNorm.
            return nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding, bias=False),
                nn.BatchNorm1d(out_channels),
                nn.SiLU(inplace=True)
            )

        def separable_conv(in_channels, out_channels, stride=1):
            # Depthwise (groups=in_channels) 3-tap conv followed by a 1x1 pointwise conv.
            return nn.Sequential(
                nn.Conv1d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False),
                nn.BatchNorm1d(in_channels),
                nn.SiLU(inplace=True),
                nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
                nn.BatchNorm1d(out_channels)
            )

        def stage(in_channels, out_channels, stride):
            # separable conv -> SiLU -> dropout -> separable conv -> SiLU -> 2x max-pool
            return nn.Sequential(
                separable_conv(in_channels, out_channels, stride=stride),
                nn.SiLU(inplace=True),
                nn.Dropout(0.3),
                separable_conv(out_channels, out_channels, stride=1),
                nn.SiLU(inplace=True),
                nn.MaxPool1d(kernel_size=2, stride=2)
            )

        width1 = self.base_channels * self.expansion_factor
        width2 = self.base_channels * 2 * self.expansion_factor
        width3 = self.base_channels * 4 * self.expansion_factor

        # Attribute names and sub-module order are kept stable so that
        # previously saved state dicts continue to load.
        self.prep = conv_block(3, self.base_channels, kernel_size=5, stride=2, padding=2)
        self.layer1 = stage(self.base_channels, width1, stride=1)
        self.layer2 = stage(width1, width2, stride=2)
        self.layer3 = stage(width2, width3, stride=2)

        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(0.6)

        # Probe the convolutional stack once to learn the flattened feature
        # size for the classifier. The probe runs in eval mode so that it does
        # not update BatchNorm running statistics (and cannot fail on a
        # single-sample batch); the previous training flag is restored after.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                probe = torch.zeros(1, input_size).view(1, 3, -1)
                features = self.layer3(self.layer2(self.layer1(self.prep(probe))))
        finally:
            self.train(was_training)

        self._flattened_size = features.numel()
        print(f"Calculated flattened size: {self._flattened_size}")
        self.fc = nn.Linear(self._flattened_size, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for ``x``.

        ``x`` may have any trailing shape; it is reshaped to ``(batch, 3, -1)``.
        """
        # reshape (rather than view) also accepts non-contiguous inputs.
        x = x.reshape(x.size(0), 3, -1)

        x = self.prep(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.flatten(x)
        x = self.dropout(x)
        x = self.fc(x)
        return x

In [None]:
# --- Data Loading and Preprocessing ---
class WoundDataset(Dataset):
    """Image dataset laid out as ``root_dir/<class_name>/<image file>``.

    Only sub-directories of ``root_dir`` are treated as classes; class indices
    follow the sorted directory names. Files are collected in sorted order so
    that index-based splits are reproducible across machines.

    Args:
        root_dir: Directory containing one sub-directory per class.
        transform: Optional callable applied to each loaded RGB ``PIL`` image.
    """

    # Accepted image suffixes; compared case-insensitively.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform

        # Stray files in root_dir (e.g. .DS_Store) must not become classes,
        # otherwise every label index after them is shifted.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {c: i for i, c in enumerate(self.classes)}
        self.image_paths = []
        self.labels = []

        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            # os.listdir order is OS-dependent; sort for a deterministic index.
            for filename in sorted(os.listdir(class_dir)):
                if filename.lower().endswith(self.IMAGE_EXTENSIONS):
                    self.image_paths.append(os.path.join(class_dir, filename))
                    self.labels.append(self.class_to_idx[class_name])

    def __len__(self):
        """Number of image files discovered under ``root_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label, file_basename)`` for sample ``idx``."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        # convert() yields an independent in-memory image, so the file handle
        # can be released immediately.
        with Image.open(img_path) as source:
            image = source.convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label, os.path.basename(img_path)

In [None]:
# --- Hyperparameters ---
# Number of elements per sample handed to the model constructor
# (3 channels x 224 x 224, matching the Resize below).
input_size = 224 * 224 * 3 # Example: flatten image of 224x224x3
# Number of output logits of the classifier.
num_classes = 2
# Samples per batch for both the training and validation DataLoaders.
batch_size = 32
# Learning rate passed to the Adam optimizer.
learning_rate = 0.001
# Number of passes over the training set.
num_epochs = 20
# Fraction of the dataset held out for validation (test_size of the split).
validation_split = 0.2

# --- Data Transformations ---
# Resize to 224x224, convert to a tensor, then normalize per channel.
# The mean/std values match the widely used ImageNet channel statistics.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
# --- Load Dataset and Split ---
# Root folder expected to contain one sub-directory per class.
wound_data_dir = 'wound_data'
dataset = WoundDataset(root_dir=wound_data_dir, transform=transform)
# Split sample indices (not the data itself) into train/validation parts.
# stratify keeps the label proportions equal in both parts; random_state
# fixes the shuffle so the split is repeatable.
train_idx, val_idx = train_test_split(list(range(len(dataset))), test_size=validation_split, stratify=dataset.labels, random_state=42)

# Subset views share the underlying dataset (and therefore its transform).
train_dataset = torch.utils.data.Subset(dataset, train_idx)
val_dataset = torch.utils.data.Subset(dataset, val_idx)

# Only the training loader shuffles; validation order is left as-is.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
# --- Initialize Model, Loss, and Optimizer ---
# Build the classifier for flattened inputs of `input_size` elements.
model = EmotionLandmarkNet(input_size=input_size, num_classes=num_classes)
# Cross-entropy over raw logits; labels are integer class indices.
criterion = nn.CrossEntropyLoss()
# Adam with L2 regularisation applied through weight_decay.
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.0001) # Added weight decay

In [None]:
# Set the device
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Moves the model parameters to `device`; as the last expression of the cell
# its return value (the module) is also displayed.
model.to(device)

In [None]:
# --- Training Loop ---

# Per-epoch history, consumed by the plotting cell.
train_losses = []
val_losses = []
train_accuracies = []
val_accuracies = []

for epoch in range(num_epochs):
    # Training mode: enables dropout and batch-statistics BatchNorm.
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0
    progress_bar = tqdm(enumerate(train_loader), total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}")
    # Each batch is (images, labels, filenames); filenames are not needed here.
    for i, (inputs, labels, _) in progress_bar:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size to
        # accumulate a per-sample sum (the last batch may be smaller).
        running_loss += loss.item() * inputs.size(0)
        # Index of the largest logit is the predicted class.
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels).sum().item()
        total_samples += labels.size(0)

        # Running averages over the samples seen so far in this epoch.
        train_loss = running_loss / total_samples
        train_accuracy = correct_predictions / total_samples
        progress_bar.set_postfix(loss=train_loss, accuracy=train_accuracy)

    # After the last batch the running averages cover the whole epoch.
    # NOTE(review): train_loss/train_accuracy are only bound inside the batch
    # loop, so an empty train_loader would fail here.
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # --- Validation Loop ---
    # Eval mode: disables dropout and uses BatchNorm running statistics.
    model.eval()
    val_running_loss = 0.0
    val_correct_predictions = 0
    val_total_samples = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for inputs, labels, _ in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs, 1)
            val_correct_predictions += (predicted == labels).sum().item()
            val_total_samples += labels.size(0)

    # Per-sample averages over the full validation set.
    val_loss = val_running_loss / val_total_samples
    val_accuracy = val_correct_predictions / val_total_samples
    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)

    print(f"Epoch {epoch+1} - Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

In [None]:
# --- Plotting Training and Validation Metrics ---
epochs = range(1, num_epochs + 1)

plt.figure(figsize=(12, 5))

# One panel per metric: (subplot position, metric name, training curve, validation curve).
metric_panels = (
    (1, 'Accuracy', train_accuracies, val_accuracies),
    (2, 'Loss', train_losses, val_losses),
)
for panel_position, metric_name, train_curve, val_curve in metric_panels:
    plt.subplot(1, 2, panel_position)
    # Blue = training, red = validation; one marker per epoch.
    plt.plot(epochs, train_curve, 'bo-', label=f'Training {metric_name.lower()}')
    plt.plot(epochs, val_curve, 'ro-', label=f'Validation {metric_name.lower()}')
    plt.title(f'Training and Validation {metric_name}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_name)
    plt.legend()

plt.tight_layout()
plt.show()

In [None]:
# --- After the training loop ---

# Define the path where you want to save the model
# Path the trained weights are written to.
model_save_path = 'wound_classification_model_v2.pth'

# Save only the state dictionary (parameters and buffers) of the trained model.
torch.save(model.state_dict(), model_save_path)

print(f"\nModel saved successfully to: {model_save_path}")

# --- Later, to load the model ---
# Re-create the architecture with the same constructor arguments, then copy
# the saved tensors into it.
loaded_model = EmotionLandmarkNet(input_size=input_size, num_classes=num_classes)

# map_location="cpu" lets a checkpoint written from a GPU be read on a
# machine without CUDA; load_state_dict copies the values into the (CPU)
# parameters of loaded_model.
loaded_model.load_state_dict(torch.load(model_save_path, map_location="cpu"))

# Set the model to evaluation mode if you intend to use it for inference
loaded_model.eval()

In [None]:
# --- Prediction Function ---
def predict_image(image_path, model, transform, class_names, device):
    """Classify one image file.

    Args:
        image_path: Path of the image to open.
        model: Network returning one row of logits per input sample.
        transform: Callable turning the RGB PIL image into a tensor.
        class_names: Sequence mapping class index to class name.
        device: Device the input tensor is moved to.

    Returns:
        ``(confidence, predicted_class_name, name_prefix)`` where
        ``confidence`` is the highest softmax probability and ``name_prefix``
        is the part of the file name (without extension) before the first
        underscore.
    """
    rgb_image = Image.open(image_path).convert('RGB')
    # Add the batch dimension expected by the model.
    batch = transform(rgb_image).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        logits = model(batch)
        class_probs = torch.softmax(logits, dim=1)
        top_prob, top_index = torch.max(class_probs, 1)
        top_name = class_names[top_index.item()]

    stem, _extension = os.path.splitext(os.path.basename(image_path))
    name_prefix = stem.split('_')[0]
    return top_prob.item(), top_name, name_prefix

In [None]:
from langchain import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Pinecone
import pinecone
from pinecone import Pinecone
from langchain.document_loaders import PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain.llms import CTransformers
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate

# SECURITY: API keys must not be stored in the notebook. The key is read from
# the environment instead; any key that was previously committed here should
# be treated as compromised and rotated in the Pinecone console.
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
if not PINECONE_API_KEY:
    raise RuntimeError(
        "PINECONE_API_KEY is not set. Export it in the environment "
        "(e.g. `export PINECONE_API_KEY=...`) before running this notebook."
    )

# Extract data from the PDF
def load_pdf(data):
    """Load the PDF files matching ``*.pdf`` in directory ``data``.

    Returns whatever ``DirectoryLoader.load()`` produces for those files,
    using ``PyPDFLoader`` for each one.
    """
    pdf_loader = DirectoryLoader(data, glob="*.pdf", loader_cls=PyPDFLoader)
    return pdf_loader.load()

# Documents extracted from the PDFs under data/.
extracted_data = load_pdf("data/")

# Create text chunks
def text_split(extracted_data):
    """Split the loaded documents into chunks.

    Uses a ``RecursiveCharacterTextSplitter`` with ``chunk_size=500`` and
    ``chunk_overlap=20`` and returns the resulting list of chunks.
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
    return splitter.split_documents(extracted_data)

# Chunked documents that are later embedded and indexed.
text_chunks = text_split(extracted_data)
print("length of my chunk:", len(text_chunks))

# Download embedding model
def download_hugging_face_embeddings():
    """Return a ``HuggingFaceEmbeddings`` wrapper for all-MiniLM-L6-v2."""
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

import sentence_transformers
# Embedding model shared by index population and retrieval.
embeddings = download_hugging_face_embeddings()
from pinecone import Pinecone, ServerlessSpec

# Pinecone client authenticated with the API key defined earlier in this cell.
pc = Pinecone(api_key=PINECONE_API_KEY)

# Name of the index that stores the document embeddings.
index_name = "medical-assistant"

# Create the index only when it does not exist yet: describe_index raising
# NotFoundException is used as the "missing" signal.
try:
    pc.describe_index(index_name)
    print(f"Index '{index_name}' already exists.")
except pinecone.exceptions.NotFoundException:
    pc.create_index(
        name=index_name,
        # presumably the output size of the all-MiniLM-L6-v2 embeddings — TODO confirm
        dimension=384,
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
    print(f"Index '{index_name}' created successfully.")

import os
# langchain_pinecone reads the key from the environment.
os.environ["PINECONE_API_KEY"] = PINECONE_API_KEY
from langchain_pinecone import PineconeVectorStore

# An index that exists is not necessarily populated: attaching to a freshly
# created index succeeds but leaves it empty, so the retriever would return
# nothing. Check the stored vector count and upload the chunks when it is 0.
stored_vectors = pc.Index(index_name).describe_index_stats().total_vector_count

docsearch = None
if stored_vectors > 0:
    try:
        docsearch = PineconeVectorStore.from_existing_index(
            index_name=index_name,
            embedding=embeddings
        )
        print(f"Successfully loaded existing index '{index_name}'.")
    except Exception as e:
        # Best-effort fallback kept from the original flow.
        print(f"Error loading existing index: {e}. Creating new index.")
else:
    print(f"Index '{index_name}' contains no vectors yet.")

if docsearch is None:
    # Embeds every chunk and upserts it into the index.
    docsearch = PineconeVectorStore.from_documents(
        documents=text_chunks,
        index_name=index_name,
        embedding=embeddings
    )
    print(f"New index '{index_name}' created and populated.")

# Similarity search returning the 2 closest chunks per query.
retriever = docsearch.as_retriever(search_type="similarity", search_kwargs={"k": 2})
# Smoke test of the retriever.
retrieved_docs = retriever.invoke("What is Acne?")
print("Retrieved documents:", retrieved_docs)

In [None]:
# Testing the model

# Load your local LLM model
def load_local_llm(model_path, max_new_tokens=256, temperature=0.0):
    """Load a local LLaMA-type model through ``CTransformers``.

    Args:
        model_path: Path of the model weights file.
        max_new_tokens: Value of the ``max_new_tokens`` generation setting.
        temperature: Value of the ``temperature`` generation setting.

    Returns:
        The configured ``CTransformers`` LLM instance.
    """
    return CTransformers(
        model=model_path,
        model_type="llama",
        config={'max_new_tokens': max_new_tokens, 'temperature': temperature}
    )

llm_model_path = "model/llama-2-7b-chat.ggmlv3.q8_0.bin"
llm = load_local_llm(llm_model_path)

# Create the RAG chain
# Adjacent string literals are concatenated verbatim, so every fragment that
# ends a sentence needs its own trailing space (otherwise the prompt reads
# "tasks.Use ..." / "clearly.If ...").
system_prompt = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer "
    "the question. If the user mentions that the situation is critical or an emergency, "
    "list out remedies in simple steps that the user should take to deal with the emergency. "
    "Otherwise, provide a normal answer. In all cases, format the answer neatly and clearly. "
    "If you don't know the answer, say that you don't know."
    "\n\n"
    "{context}"
)

# {context} is filled with the retrieved documents, {input} with the user query.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{input}"),
    ]
)
questions_answer_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(retriever, questions_answer_chain)

# Invoke the RAG chain
query = "What is Acne?"
response = rag_chain.invoke({"input": query})

print("RAG Response:", response["answer"])

In [None]:
# Load your local LLM model
# NOTE(review): this redefines load_local_llm from the previous cell with
# different generation settings; the later definition wins once this cell runs.
def load_local_llm(model_path, max_new_tokens=500, temperature=0.2):
    """Load a local LLaMA-type model through ``CTransformers``.

    Args:
        model_path: Path of the model weights file.
        max_new_tokens: Value of the ``max_new_tokens`` generation setting.
        temperature: Value of the ``temperature`` generation setting.

    Returns:
        The configured ``CTransformers`` LLM instance.
    """
    return CTransformers(
        model=model_path,
        model_type="llama",
        config={'max_new_tokens': max_new_tokens, 'temperature': temperature}
    )

llm_model_path = "model/llama-2-7b-chat.ggmlv3.q8_0.bin"
llm = load_local_llm(llm_model_path)

# Create the RAG chain
# Adjacent string literals are concatenated verbatim, so every fragment that
# ends a sentence needs its own trailing space (otherwise the prompt reads
# "tasks.Use ..." / "clearly.If ...").
system_prompt = (
    "You are an assistant for question-answering tasks. "
    "Use the following pieces of retrieved context to answer "
    "the question. If the user mentions that the situation is critical or an emergency, "
    "list out remedies in simple steps that the user should take to deal with the emergency. "
    "Otherwise, provide a normal answer. In all cases, format the answer neatly and clearly. "
    "If you don't know the answer, say that you don't know."
    "\n\n"
    "{context}"
)

# {context} is filled with the retrieved documents, {input} with the user query.
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("human", "{input}"),
    ]
)
questions_answer_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(retriever, questions_answer_chain)

In [None]:
# step1a: Text to Speech-TTS-model (with gTTS)

import os
from gtts import gTTS
import subprocess
import platform

def text_to_speech_with_gtts(input_text, output_filepath):
    """Synthesize ``input_text`` with gTTS, save it, and play it back.

    Args:
        input_text: English text to speak.
        output_filepath: Where the audio written by gTTS is saved.

    Playback is best-effort: any failure to start or run the player is
    printed instead of raised, so the saved file is still available.
    """
    import shutil

    language="en"

    audioobj=gTTS(
        text=input_text,
        lang=language,
        slow=False
    )
    audioobj.save(output_filepath)

    # ffplay: no video window, quit when playback ends.
    ffplay_command = ['ffplay', '-nodisp', '-autoexit', output_filepath]
    quiet = {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}

    os_name = platform.system()
    try:
        # check=True turns a failing player (non-zero exit) into an exception
        # that is reported below instead of being silently ignored.
        if os_name == "Darwin":
            subprocess.run(['afplay', output_filepath], check=True)
        elif os_name == "Windows":
            subprocess.run(ffplay_command, check=True, **quiet)
        elif os_name == "Linux":
            # gTTS writes MP3, which aplay cannot decode (it only handles PCM
            # formats such as WAV), so use an MP3-capable player instead.
            if shutil.which('mpg123'):
                subprocess.run(['mpg123', '-q', output_filepath], check=True)
            else:
                subprocess.run(ffplay_command, check=True, **quiet)
        else:
            raise OSError("Unsupported operating system")
    except Exception as e:
        print(f"An error occured while trying to play the audio: {e}")

# Sample sentence for a manual playback check; the call below is kept
# commented out so that running the cell does not produce audio.
input_text="Hi this is AI with Anurag, autoplay testing"
# text_to_speech_with_gtts(input_text=input_text, output_filepath="gtts_testing_autoplay.mp3")

In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import os
import re
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Assume you have your EmotionLandmarkNet class defined in this script

# Load your trained model (replace with your actual parameters)
input_size = 224 * 224 * 3
num_classes = 2
model = EmotionLandmarkNet(input_size=input_size, num_classes=num_classes)
# NOTE(review): the training cell saves its weights to
# 'wound_classification_model_v2.pth'; confirm that this older file name is
# really the checkpoint intended for inference.
model_path = 'wound_classification_model.pth'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    # map_location makes GPU-saved checkpoints loadable on CPU-only machines.
    model.load_state_dict(torch.load(model_path, map_location=device))
except FileNotFoundError:
    print(f"Error: Model not found at {model_path}")
    # Re-raise instead of calling exit(): in a notebook exit() asks the kernel
    # to shut down, whereas an exception simply stops this cell and prevents
    # inference with untrained weights.
    raise
model.eval()
model.to(device)
print("Model loaded successfully.")

# Define your class names
# The order must match the label indices used during training
# (presumably the sorted class folder names — TODO confirm).
class_names = ['critical', 'non_critical']

# Define the image transformations
# Same preprocessing pipeline as the one used for training.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def clean_filename(filename):
    """Return ``filename`` with every character other than ASCII letters
    and underscores removed (digits, spaces, punctuation, ...)."""
    disallowed_chars = r'[^a-zA-Z_]'
    return re.sub(disallowed_chars, '', filename)

def predict_single_image(image_path, confidence_threshold=0.7):
    """Classify one image with the globally loaded model.

    Args:
        image_path: Path of the image file to classify.
        confidence_threshold: Softmax probability below which a
            low-confidence warning is printed.

    Returns:
        ``(confidence, class_name, cleaned_filename, image)`` where
        ``confidence`` is the softmax probability of the predicted (most
        probable) class and ``image`` is the RGB PIL image. On any error the
        reason is printed and ``(None, None, None, None)`` is returned.
    """
    try:
        # convert() returns an independent in-memory copy, so the file handle
        # can be closed right away.
        with Image.open(image_path) as source_image:
            image = source_image.convert('RGB')
        image_tensor = transform(image).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(image_tensor)
            probabilities = torch.softmax(output, dim=1)
            confidence, predicted_class = torch.max(probabilities, 1)

        confidence_score = confidence.item()
        # Always report the most probable class. Low confidence is surfaced as
        # a warning; it must not switch the answer to a class the model
        # considers less likely.
        predicted_class_name = class_names[predicted_class.item()]
        if confidence_score < confidence_threshold:
            print(
                f"Warning: low-confidence prediction "
                f"({confidence_score:.2f} < {confidence_threshold:.2f}); "
                f"treat the result with caution."
            )

        filename_base = os.path.splitext(os.path.basename(image_path))[0]
        cleaned_filename = clean_filename(filename_base)
        return confidence_score, predicted_class_name, cleaned_filename, image
    except FileNotFoundError:
        print(f"Error: Image not found at {image_path}")
        return None, None, None, None
    except Exception as e:
        print(f"Error processing image: {e}")
        return None, None, None, None

In [None]:
if __name__ == "__main__":
    query = ""

    while True:
        print("\nOptions:")
        print("1: Image-based input")
        print("2: Text-based input")
        print("3: Both image and text-based input")
        print("4: Exit")

        try:
            choice = int(input("Select an option (1-4): "))
        except ValueError:
            print("Invalid input. Please enter a number between 1 and 4.")
            continue

        match choice:
            case 1:  # Image-based input
                image_path = input("Enter the full path to the image file you want to predict: ")

                if not os.path.exists(image_path):
                    print(f"Error: Image file not found at {image_path}")
                elif not image_path.lower().endswith(('.jpeg', '.jpg', '.png')):
                    print("Error: Please enter a valid JPEG, JPG, or PNG image file.")
                else:
                    confidence, predicted_class, filename_base, image = predict_single_image(image_path)
                    if confidence is not None:
                        print("\n--- Prediction Result ---")
                        print(f"Filename: {filename_base}")
                        print(f"Predicted Class: {predicted_class}")
                        print(f"Confidence: {confidence:.4f}")

                        # Display the image using Matplotlib
                        plt.figure()
                        plt.imshow(image)
                        plt.title(f"Filename: {filename_base}\nPredicted: {predicted_class} (Conf: {confidence:.2f})")
                        plt.axis('off')  # Turn off axis numbers and ticks
                        plt.show()

                        query = f"{predicted_class}, {filename_base}"

            case 2:  # Text-based input
                query = input("Ask a question about the medical documents (or type 'exit' to quit): ")

            case 3:  # Both image and text-based input
                image_path = input("Enter the full path to the image file you want to predict: ")

                if not os.path.exists(image_path):
                    print(f"Error: Image file not found at {image_path}")
                elif not image_path.lower().endswith(('.jpeg', '.jpg', '.png')):
                    print("Error: Please enter a valid JPEG, JPG, or PNG image file.")
                else:
                    confidence, predicted_class, filename_base, image = predict_single_image(image_path)
                    if confidence is not None:
                        print("\n--- Prediction Result ---")
                        print(f"Filename: {filename_base}")
                        print(f"Predicted Class: {predicted_class}")
                        print(f"Confidence: {confidence:.4f}")

                        # Display the image using Matplotlib
                        plt.figure()
                        plt.imshow(image)
                        plt.title(f"Filename: {filename_base}\nPredicted: {predicted_class} (Conf: {confidence:.2f})")
                        plt.axis('off')  # Turn off axis numbers and ticks
                        plt.show()

                text_query = input("Ask a question about the medical documents (or type 'exit' to quit): ")
                query = f"{predicted_class}, {filename_base}, {text_query}"

            case 4:  # Exit
                print("Exiting the question-answering session.")
                break

            case _:  # Default case for invalid inputs
                print("Invalid selection. Please select a valid option.")

        if query:
            response = rag_chain.invoke({"input": query})
            print("RAG Response:", response["answer"])
        
        text_to_speech_with_gtts(input_text=response["answer"], output_filepath="gtts_output_file.mp3")