# In [None]:
import csv
import torch
import nltk
import string
import numpy as np
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from transformers import RobertaTokenizer, RobertaModel
from torch.utils.data import TensorDataset, DataLoader
from torch.nn import Linear, CrossEntropyLoss
from sklearn.model_selection import train_test_split
import pickle
from torch.nn import Linear
from flask import Flask, render_template, request
import random


# Fetch the NLTK resources requested by this script: the 'punkt' tokenizer
# data and the 'stopwords' corpus.
for nltk_resource in ('punkt', 'stopwords'):
    nltk.download(nltk_resource)

# Shared NLP components: a Porter stemmer and the English stopword set for
# text cleaning, plus the pretrained 'roberta-base' tokenizer and encoder.
stemmer = PorterStemmer()
stop_words = set(stopwords.words('english'))
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
model = RobertaModel.from_pretrained('roberta-base')


def remove_punctuation(text):
    """Return *text* with every character in ``string.punctuation`` deleted."""
    # Mapping a character to None tells str.translate to drop it.
    deletion_table = str.maketrans(dict.fromkeys(string.punctuation))
    return text.translate(deletion_table)


def Tokenization(sentence):
    """Split *sentence* into tokens with ``nltk.word_tokenize``."""
    tokens = nltk.word_tokenize(sentence)
    return tokens


def Stem(word):
    """Lowercase *word* and return its stem from the module-level ``stemmer``."""
    lowered = word.lower()
    return stemmer.stem(lowered)


def RemoveStopwords(tokenized_sentence):
    """Return the tokens of *tokenized_sentence* that are not stopwords.

    Membership is tested against the module-level ``stop_words`` set. NLTK's
    English stopword list is lowercase, so each token is lowercased for the
    comparison only; otherwise capitalized stopwords such as "The" or "What"
    would slip through. Tokens that are kept retain their original casing.
    """
    return [
        word for word in tokenized_sentence
        if word.lower() not in stop_words
    ]


# Load your dataset from CSV
# Column 0 holds one or more ';'-separated question patterns and column 1
# holds the tag for that row. `questions` and `labels` are kept parallel:
# one label is recorded for EVERY pattern, so the two lists always have the
# same length and can be paired sample-by-sample later on.
questions = []
labels = []

# newline='' is what the csv module documents for files passed to csv.reader,
# so that newlines embedded in quoted fields are handled correctly.
with open('dataset_done_done_new.csv', 'r', newline='') as file:
    reader = csv.reader(file)
    next(reader, None)  # Skip the header row (tolerates an empty file)
    for row in reader:
        if len(row) < 2:
            # Blank or malformed line: no pattern/tag pair to read.
            continue
        tag = row[1]
        for pattern in row[0].split(';'):
            pattern = pattern.strip()
            if not pattern:
                # A trailing or doubled ';' yields an empty pattern; skip it.
                continue
            questions.append(pattern)
            labels.append(tag)





# Text preprocessing
# Each question has its punctuation removed, is tokenized, filtered for
# stopwords, and is finally re-joined into a single space-separated string.
cleaned_questions = [
    ' '.join(RemoveStopwords(Tokenization(remove_punctuation(raw_question))))
    for raw_question in questions
]


# Every embedding needs exactly one label; fail early with a clear message
# instead of a shape error further down.
if len(questions) != len(labels):
    raise ValueError(
        f"questions ({len(questions)}) and labels ({len(labels)}) "
        "must have the same length"
    )

# Encode questions
# NOTE(review): the raw `questions` are encoded here, not `cleaned_questions`.
# predict_class also tokenizes the raw user sentence, so training and
# inference currently see the same kind of text -- confirm this is intended.
encoded = tokenizer(questions, padding=True, truncation=True, return_tensors='pt')

# Get the embedding at sequence position 0 for every question
with torch.no_grad():
    outputs = model(**encoded)
    cls_embeddings = outputs.last_hidden_state[:, 0, :]

# Prepare labels
# The distinct tags are sorted before numbering so that each tag receives the
# same index on every run. Iterating a bare set of strings has no guaranteed
# order, which would make saved classifier weights disagree with a
# label_dict rebuilt in another process.
label_dict = {tag: index for index, tag in enumerate(sorted(set(labels)))}
y = torch.tensor([label_dict[label] for label in labels])

# Hold out 30% of the samples, then divide that hold-out 40/60 into the
# validation and test sets. Both splits use a fixed random_state.
train_data, holdout_data, train_labels, holdout_labels = train_test_split(
    cls_embeddings, y, test_size=0.3, random_state=42
)
val_data, test_data, val_labels, test_labels = train_test_split(
    holdout_data, holdout_labels, test_size=0.6, random_state=42
)

# Pair each split's embeddings with its labels
train_dataset = TensorDataset(train_data, train_labels)
val_dataset = TensorDataset(val_data, val_labels)
test_dataset = TensorDataset(test_data, test_labels)

# All three loaders share the same batching options
loader_options = {'batch_size': 16, 'shuffle': True}
train_dataloader = DataLoader(train_dataset, **loader_options)
val_dataloader = DataLoader(val_dataset, **loader_options)
test_dataloader = DataLoader(test_dataset, **loader_options)

# A single linear layer maps an embedding to one logit per distinct tag
n_classes = len(set(labels))
embedding_dim = cls_embeddings.shape[1]
classifier = Linear(embedding_dim, n_classes)

# Loss, optimizer and learning-rate schedule for the linear layer
criterion = CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(), lr=0.01)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)


# Save labels and questions using pickle
# The question texts go to texts.pkl and the tags go to labels.pkl; the Flask
# section of this file reloads labels.pkl into `labels`. The `with` blocks
# make sure both files are flushed and closed.
with open('texts.pkl', 'wb') as texts_file:
    pickle.dump(questions, texts_file)
with open('labels.pkl', 'wb') as labels_file:
    pickle.dump(labels, labels_file)

# Training loop
# Each epoch runs one optimization pass over train_dataloader, then measures
# accuracy on val_dataloader without gradient tracking, and prints a summary.
#
# NOTE(review): `scheduler` is created earlier in this file but is never
# stepped, so the learning rate keeps its initial value for every epoch.
# Adding scheduler.step() here would change training behavior (the configured
# decay is a factor of 0.1 every 5 epochs across 120 epochs), so it is left
# out on purpose -- confirm the intended schedule before enabling it.
for epoch in range(120):
    total_loss = 0
    correct = 0
    total = 0

    # Training phase
    classifier.train()
    for inputs, targets in train_dataloader:
        optimizer.zero_grad()
        outputs = classifier(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        # Index of the largest logit per row is the predicted class.
        predicted = outputs.argmax(dim=1)
        correct += (predicted == targets).sum().item()
        total += targets.size(0)

    # Validation phase
    classifier.eval()
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for inputs, targets in val_dataloader:
            outputs = classifier(inputs)
            predicted = outputs.argmax(dim=1)
            val_correct += (predicted == targets).sum().item()
            val_total += targets.size(0)

    train_accuracy = correct / total
    val_accuracy = val_correct / val_total
    print(
        f'Epoch {epoch + 1}, Loss: {total_loss / len(train_dataloader):.4f}, Train Accuracy: {train_accuracy:.4f}, Validation Accuracy: {val_accuracy:.4f}')

# Evaluate on test set
# Count how many held-out samples the classifier labels correctly; gradients
# are not needed for this pass.
classifier.eval()
test_correct, test_total = 0, 0
with torch.no_grad():
    for batch_inputs, batch_targets in test_dataloader:
        batch_predictions = classifier(batch_inputs).argmax(dim=1)
        test_correct += (batch_predictions == batch_targets).sum().item()
        test_total += batch_targets.size(0)

test_accuracy = test_correct / test_total
print(f'Test Accuracy: {test_accuracy*100:.2f}%')

# Persist the linear layer's parameters for later reloading
torch.save(classifier.state_dict(), 'roberta_classifier.h5')


#flask code
app = Flask(__name__)
app.static_folder = 'static'

# Load the contents of labels.pkl into `labels`
with open('labels.pkl', 'rb') as labels_file:
    labels = pickle.load(labels_file)


# Restore the saved classifier weights when the checkpoint file is available.
# Failures are reported on stdout and the script carries on with whatever
# `classifier` is already in memory.
model_file = 'roberta_classifier.h5'
map_location = 'cuda:0' if torch.cuda.is_available() else 'cpu'

try:
    state_dict = torch.load(model_file, map_location=map_location)

    # If the checkpoint's weight matrix has a different shape than the current
    # layer, rebuild the layer with the checkpoint's number of output rows.
    saved_weight = state_dict.get('weight')
    if saved_weight is not None and saved_weight.shape != classifier.weight.shape:
        print(f"Adjusting the classifier output size from {classifier.out_features} to {state_dict['weight'].shape[0]}")
        classifier = Linear(model.config.hidden_size, saved_weight.shape[0])

    classifier.load_state_dict(state_dict)
    classifier.eval()
    print("Model loaded successfully!")
except FileNotFoundError:
    print(f"Error: Model file '{model_file}' not found.")
except Exception as e:
    print("Error loading model:", e)


def predict_class(sentence, threshold=0.5):
    """Classify *sentence* and return the predicted tag.

    The sentence is tokenized with the module-level ``tokenizer``, embedded
    with ``model`` (the vector at sequence position 0 is used), and scored by
    ``classifier``. The tag whose index in ``label_dict`` matches the most
    probable class is returned.

    Args:
        sentence: The user's message as a single string.
        threshold: Minimum softmax probability the top class must reach.

    Returns:
        The predicted tag, or a fixed apology message when the input is
        empty, the confidence is below *threshold*, or the predicted index
        has no entry in ``label_dict``.
    """
    fallback = "I am Sorry There Is No Answer For This Question 😢 💔"

    # Nothing to classify: avoid handing None/blank text to the tokenizer.
    if sentence is None or not str(sentence).strip():
        return fallback

    # Process the input sentence
    encoded_input = tokenizer(sentence, padding=True, truncation=True, return_tensors='pt')

    # Embed and score in one no-grad section; no gradients are needed here.
    with torch.no_grad():
        outputs = model(**encoded_input)
        cls_embedding = outputs.last_hidden_state[:, 0, :]
        logits = classifier(cls_embedding)
        probabilities = torch.softmax(logits, dim=1)
        confidence, predicted_class = torch.max(probabilities, dim=1)

    if confidence.item() < threshold:
        return fallback

    # Convert the one-element tensor to a plain int before comparing it with
    # the integer indices stored in label_dict.
    predicted_index = predicted_class.item()
    for tag, index in label_dict.items():
        if index == predicted_index:
            return tag

    # The classifier produced an index label_dict does not know (for example
    # after loading a checkpoint with a different number of outputs).
    return fallback




@app.route("/")
def home():
    """Render the ``index.html`` template for the root URL."""
    page = render_template("index.html")
    return page


@app.route("/get")
def get_bot_response():
    """Return the predicted tag for the ``msg`` query parameter.

    Responds with HTTP 400 and a short message when ``msg`` is missing or
    blank, instead of passing ``None`` on to ``predict_class``.
    """
    userText = request.args.get('msg', '').strip()
    if not userText:
        return "Please enter a message.", 400
    return predict_class(userText)

# Start the development server only when this file is executed directly.
# The guard must test the module attribute `__name__`; a bare `name` is
# undefined and raises NameError.
# NOTE(review): debug=True enables Flask's interactive debugger and reloader;
# do not use it for a publicly reachable deployment.
if __name__ == "__main__":
    app.run(debug=True)