In [None]:
pip install sentence_transformers

Collecting sentence_transformers
  Downloading sentence_transformers-3.2.0-py3-none-any.whl.metadata (10 kB)
Downloading sentence_transformers-3.2.0-py3-none-any.whl (255 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m255.2/255.2 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: sentence_transformers
Successfully installed sentence_transformers-3.2.0


In [None]:
# Mount Google Drive: later cells read the Q&A dataset and the customer CSV
# from /content/drive/MyDrive/chatbot/.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install flask_ngrok

Collecting flask_ngrok
  Downloading flask_ngrok-0.0.25-py3-none-any.whl.metadata (1.8 kB)
Downloading flask_ngrok-0.0.25-py3-none-any.whl (3.1 kB)
Installing collected packages: flask_ngrok
Successfully installed flask_ngrok-0.0.25


In [None]:
from flask import Flask, render_template, request, jsonify
from flask_ngrok import run_with_ngrok
import json
import torch
from sentence_transformers import SentenceTransformer, util
from sklearn.cluster import KMeans
import pandas as pd
import re
from collections import defaultdict

  from tqdm.autonotebook import tqdm, trange


In [None]:
import json
import os
import re
from collections import defaultdict
import torch
from sentence_transformers import SentenceTransformer, util
from sklearn.cluster import KMeans
import pandas as pd

def clean_text(text):
    """Normalise whitespace and full-width punctuation in a string.

    Every run of whitespace becomes a single space, leading/trailing
    whitespace is dropped, and the full-width comma and period are replaced
    by their ASCII counterparts.

    Args:
        text: The string to normalise.

    Returns:
        The normalised string.
    """
    normalised = re.sub(r'\s+', ' ', text).strip()
    # Simple punctuation mapping; extend the pairs if more rules are needed.
    for wide_mark, ascii_mark in (('，', ','), ('。', '.')):
        normalised = normalised.replace(wide_mark, ascii_mark)
    return normalised

def are_similar(text1, text2, threshold=0.8):
    """Decide whether two texts share enough words to be treated as duplicates.

    Both texts are lower-cased and split on whitespace into sets of words.
    The score is the number of shared words divided by the size of the larger
    set; the texts are similar when the score is strictly above ``threshold``.

    Args:
        text1: First text.
        text2: Second text.
        threshold: Exclusive lower bound for the overlap ratio (default 0.8).

    Returns:
        bool: True when the overlap ratio exceeds ``threshold``. False when
        neither text contains any words (this case used to raise
        ZeroDivisionError).
    """
    words1 = set(text1.lower().split())
    words2 = set(text2.lower().split())

    larger_size = max(len(words1), len(words2))
    if larger_size == 0:
        # Nothing to compare: avoid dividing by zero.
        return False

    return len(words1 & words2) / larger_size > threshold

# Define input file paths
input_files = ['/content/drive/MyDrive/chatbot/merged_qa_dataset.json']

# Define output file path
output_file = '/content/cleaned_data'

# Initialize merged data list and question dictionary
merged_data = []
question_dict = defaultdict(list)

# Collect (answer, category) pairs per cleaned question from every input file
for file_path in input_files:
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    # A file may hold a list of records or one single record; handle both the same way.
    if isinstance(data, list):
        records = data
    elif isinstance(data, dict):
        records = [data]
    else:
        records = []

    for item in records:
        # `or ''` also covers keys that are present but null in the JSON,
        # which would otherwise crash clean_text() / .strip().
        question = clean_text(item.get('question') or '')
        answer = clean_text(item.get('answer') or '')
        category = (item.get('category') or '').strip()

        if question and answer:
            question_dict[question].append((answer, category))

# Merge similar questions: each question joins the first already-kept question
# it is similar to, otherwise it starts a new entry.
final_data = []
for question, qa_pairs in question_dict.items():
    match = next(
        (existing for existing in final_data if are_similar(question, existing['question'])),
        None,
    )
    if match is not None:
        match['answers'].extend(qa_pairs)
    else:
        # Copy the list so later extend() calls do not mutate question_dict.
        final_data.append({
            'question': question,
            'answers': list(qa_pairs)
        })

# Reassign IDs and format output
for index, item in enumerate(final_data, start=1):
    # dict.fromkeys() de-duplicates while keeping first-seen order. Joining a
    # set() instead would order the strings differently from one kernel run to
    # the next, because string hashing is randomised per process.
    unique_answers = dict.fromkeys(answer for answer, _ in item['answers'])
    unique_categories = dict.fromkeys(
        category for _, category in item['answers'] if category
    )
    merged_data.append({
        'id': index,
        'question': item['question'],
        'answer': '; '.join(unique_answers),
        'category': '; '.join(unique_categories)
    })

# Write the merged data to a new file
with open(output_file, 'w', encoding='utf-8') as outfile:
    json.dump(merged_data, outfile, ensure_ascii=False, indent=2)

print(f"Merging complete, processed {len(merged_data)} unique entries. Output file: {output_file}")


Merging complete, processed 174 unique entries. Output file: /content/cleaned_data


In [None]:
# Load data
with open('/content/cleaned_data', 'r', encoding='utf-8') as file:
    qa_data = json.load(file)

# Load customer information, indexed by account number for direct lookups
customer_info = (
    pd.read_csv('/content/drive/MyDrive/chatbot/dummy_data (1).csv')
    .set_index('Account Number')
)

# Extract questions and answers (parallel lists: answers[i] belongs to questions[i])
questions = [item['question'] for item in qa_data]
answers = [item['answer'] for item in qa_data]

# Load pre-trained sentence-embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Encode all questions
question_embeddings = model.encode(questions, convert_to_tensor=True)

# Use K-means for simple intent clustering.
# random_state and n_init are fixed so the cluster numbering behind the
# "Intent_<n>" labels is the same on every run; without a seed the labels
# change each time the cell is executed.
num_clusters = 5
kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
kmeans.fit(question_embeddings.cpu().numpy())

class QAAgent:
    """Answers questions from the FAQ embeddings and the customer table.

    The methods read module-level objects created earlier in the notebook:
    ``model``, ``kmeans``, ``question_embeddings``, ``answers`` and
    ``customer_info``.
    """

    def get_intent(self, user_question):
        """Return the K-means cluster of the question as ``"Intent_<n>"``."""
        user_embedding = model.encode(user_question, convert_to_tensor=True)
        # predict() needs a 2-D array, hence the reshape to a single row.
        intent = kmeans.predict(user_embedding.cpu().numpy().reshape(1, -1))[0]
        return f"Intent_{intent}"

    def get_answer(self, user_question, conversation_history):
        """Return the stored answer of the most similar FAQ question.

        Args:
            user_question: The user's question text.
            conversation_history: Accepted for interface compatibility; it is
                not used when selecting the answer.

        Returns:
            The formatted stored answer when the best cosine similarity is
            above 0.7, otherwise a fallback message with a booking link.
        """
        user_embedding = model.encode(user_question, convert_to_tensor=True)
        cos_scores = util.cos_sim(user_embedding, question_embeddings)[0]
        top_result = torch.topk(cos_scores, k=1)

        most_similar_index = top_result.indices[0].item()
        most_similar_score = top_result.values[0].item()

        if most_similar_score > 0.7:
            return self.format_answer(answers[most_similar_index])

        return ("I'm sorry, I couldn't find a specific answer to your question. "
                "Please book an appointment to speak with a renewal specialist at Manulife Bank. "
                "<a href='https://outlook.office365.com/book/MLB_Renewals@MFC.onmicrosoft.com/' target='_blank'>Click here</a> to schedule an appointment.")

    def question_similarity(self, q1, q2):
        """Return the Jaccard similarity of the two questions' word sets.

        Words are the lower-cased, whitespace-separated tokens. Returns 0.0
        when neither question contains any words (this case used to raise
        ZeroDivisionError).
        """
        words1 = set(q1.lower().split())
        words2 = set(q2.lower().split())
        union = words1 | words2
        if not union:
            return 0.0
        return len(words1 & words2) / len(union)

    def format_answer(self, answer):
        """Render a numbered-step answer as a dash-prefixed list.

        Step markers are digits followed by a period, e.g. ``"2."``. The
        ``(?!\\d)`` lookahead keeps decimal numbers such as ``3.5%`` intact;
        without it the text would be split in the middle of the number.

        Args:
            answer: The stored answer text.

        Returns:
            A string with one ``"- step"`` entry per step when more than one
            step is found, otherwise the answer with any leading step number
            removed.
        """
        # Remove numbering at the beginning of lines
        answer = re.sub(r'^\d+\.(?!\d)\s*', '', answer, flags=re.MULTILINE)

        # Split the answer into separate steps (on step markers or newlines)
        steps = re.split(r'\s*\d+\.(?!\d)\s*|\n', answer)

        # Remove empty steps and strip whitespace
        steps = [step.strip() for step in steps if step.strip()]

        # If the answer contains multiple steps, format it as a list
        if len(steps) > 1:
            return "\n".join(f"- {step}\n" for step in steps)

        # If it's not a list of steps, keep it as is
        return answer

    def get_personal_answer(self, user_question, customer_id):
        """Answer an account-specific question from ``customer_info``.

        Args:
            user_question: The user's question; matched by keyword.
            customer_id: Account number, as an int or a numeric string.

        Returns:
            A sentence with the requested value, or an apology when the
            account number is invalid or unknown or no keyword matches.
        """
        try:
            customer_id = int(customer_id)
        except (TypeError, ValueError):
            # TypeError covers None (no account number stored yet).
            return "I'm sorry, there was an error processing your account number."

        if customer_id not in customer_info.index:
            return "I'm sorry, I couldn't find your personal information."

        info = customer_info.loc[customer_id]
        question_lower = user_question.lower()

        # Keyword checks are ordered; the first match wins.
        if "balance" in question_lower:
            return f"Your current account balance is ${info['Current Balance']}. Your sub-account balance is ${info['Sub_acc_Current_Balance']}."
        elif "loan amount" in question_lower:
            return f"Your original loan amount was ${info['Original Loan Amount']}."
        elif "maturity date" in question_lower:
            return f"Your loan maturity date is {info['Maturity Date']}."
        elif "name" in question_lower:
            return f"Your name is {info['First Name']} {info['Last Name']}."
        elif "payment frequency" in question_lower:
            return f"Your main account payment frequency is {info['Payment Frequency']}. Your sub-account payment frequency is {info['Sub_acc_Payment_Frequency']}."
        elif "prepayment" in question_lower:
            return f"Your prepayment amount is ${info['Prepayment']}."
        elif "interest type" in question_lower:
            return f"Your sub-account interest type is {info['Sub_acc_Interest_Type']}."
        else:
            return "I'm sorry, I can't answer this specific personal question. Please ask about your balance, loan amount, maturity date, name, payment frequency, prepayment, or interest type."

class VerificationAgent:
    """Checks a claimed identity against the module-level ``customer_info`` table."""

    def verify_identity(self, account_number, first_name, last_name):
        """Compare an account number and name with the stored customer record.

        Names are compared case-insensitively after trimming whitespace.

        Args:
            account_number: Account number as an int or numeric string.
            first_name: First name supplied by the user.
            last_name: Last name supplied by the user.

        Returns:
            tuple[bool, str]: Whether verification passed, plus a message
            describing the outcome.
        """
        try:
            account_key = int(account_number)
        except ValueError:
            return False, "Invalid account number format."

        if account_key not in customer_info.index:
            return False, "Account number not found."

        record = customer_info.loc[account_key]
        first_matches = first_name.lower().strip() == str(record['First Name']).lower().strip()
        # Last name is only compared once the first name has matched.
        if first_matches and last_name.lower().strip() == str(record['Last Name']).lower().strip():
            return True, "Identity verification successful."

        return False, "Identity verification failed. Please try again."

class ChatbotManager:
    """Routes each user message through identity verification and the QA agent.

    Holds the state of a single conversation: the verification progress
    (``verification_stage`` 0-3), the account number and first name collected
    so far, whether the user is verified, and the last few intents.
    """

    def __init__(self):
        self.qa_agent = QAAgent()
        self.verification_agent = VerificationAgent()
        self.conversation_history = []
        self.current_account_number = None
        self.current_first_name = None
        self.is_verified = False
        # 0 = idle, 1 = awaiting account number, 2 = awaiting first name,
        # 3 = awaiting last name.
        self.verification_stage = 0
        self.personal_phrases = [
            "my current balance", "my loan amount", "my maturity date",
            "my full name", "my payment frequency", "my prepayment amount",
            "my interest rate"
        ]
        self.general_phrases = [
            "prepayment privilege", "interest type", "payment options"
        ]

    def is_personal_question(self, user_input):
        """Heuristically decide whether the message concerns the user's own account.

        Checks run in order and the first decisive one wins:
        1. a known personal phrase -> personal;
        2. a known general phrase -> not personal;
        3. "my"/"mine" directly followed by a financial term -> personal;
        4. a first-person pronoun as a whole word and no question word
           -> personal.

        Check 4 compares whole words. It previously tested ``"my " in
        user_input.split()``, which can never be true because split() tokens
        carry no trailing space, so that rule never fired.

        Args:
            user_input: The raw user message.

        Returns:
            bool: True when the message is classified as personal.
        """
        user_input_lower = user_input.lower()

        if any(phrase in user_input_lower for phrase in self.personal_phrases):
            return True

        if any(phrase in user_input_lower for phrase in self.general_phrases):
            return False

        financial_terms = ["prepayment", "balance", "loan", "interest", "payment"]
        if any(f"my {term}" in user_input_lower or f"mine {term}" in user_input_lower for term in financial_terms):
            return True

        # Alphabetic tokens only, so "me?" or "I'm" still yield "me" / "i",
        # and "show" is not mistaken for the question word "how".
        tokens = set(re.findall(r"[a-z]+", user_input_lower))
        first_person_pronouns = {"my", "i", "me", "mine"}
        question_words = {"how", "what", "when", "where", "why", "which"}
        if tokens & first_person_pronouns and not tokens & question_words:
            return True

        return False

    def process_input(self, user_input):
        """Handle one user message and return the bot's reply.

        While the user is unverified, a personal question starts a
        three-step verification dialogue (account number, first name, last
        name); each step returns its prompt directly. Otherwise the message
        is answered by the QA agent: a personal answer for verified users
        asking personal questions, a general answer in all other cases. The
        intent of each answered message is kept in ``conversation_history``
        (last three only).

        Args:
            user_input: The raw user message.

        Returns:
            str: A verification prompt/result, or the answer prefixed with
            ``"Manulife Kleo: "``.
        """
        is_personal_question = self.is_personal_question(user_input)

        if is_personal_question or self.verification_stage > 0:
            if not self.is_verified:
                if self.verification_stage == 0:
                    self.verification_stage = 1
                    return "Please verify your identity before we proceed further. Enter your account number:"
                elif self.verification_stage == 1:
                    self.current_account_number = user_input
                    self.verification_stage = 2
                    return "Please enter your first name for identity verification."
                elif self.verification_stage == 2:
                    self.current_first_name = user_input
                    self.verification_stage = 3
                    return "Please enter your last name for identity verification."
                elif self.verification_stage == 3:
                    is_verified, message = self.verification_agent.verify_identity(
                        self.current_account_number, self.current_first_name, user_input)
                    if is_verified:
                        self.is_verified = True
                        self.verification_stage = 0
                        return "Identity verification successful. You can now ask personal questions."
                    else:
                        # Discard the collected details so a retry starts clean.
                        self.current_account_number = None
                        self.current_first_name = None
                        self.verification_stage = 0
                        return "Identity verification failed. Please start the verification process again."

        # Intent is only needed for answered messages, so it is computed after
        # the verification steps have returned (avoids encoding account
        # numbers and names).
        intent = self.qa_agent.get_intent(user_input)

        if self.is_verified and is_personal_question:
            answer = self.qa_agent.get_personal_answer(user_input, self.current_account_number)
        else:
            answer = self.qa_agent.get_answer(user_input, self.conversation_history)

        response = f"Manulife Kleo: {answer}"

        self.conversation_history.append(intent)
        if len(self.conversation_history) > 3:
            self.conversation_history.pop(0)

        return response

# Chatbot main loop (console version): reads messages until the user types 'exit'.
print("Hi! I'm Manulife Kleo, your virtual Renewal assistant. How can I help you today? (Type 'exit' to end the conversation).")
chatbot_manager = ChatbotManager()

while True:
    try:
        # Strip so that "exit " or " exit" still ends the conversation.
        user_input = input("\nYou: ").strip()
    except (EOFError, KeyboardInterrupt):
        # Interrupting the cell or closing stdin ends the chat without a traceback.
        print("\nManulife Kleo: Thank you for using the chatbot. Goodbye!")
        break

    if not user_input:
        # Nothing typed; ask again instead of sending an empty message.
        continue

    if user_input.lower() == 'exit':
        print("\nManulife Kleo: Thank you for using the chatbot. Goodbye!")
        break

    response = chatbot_manager.process_input(user_input)
    print(f"\n{response}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]



1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Hi! I'm Manulife Kleo, your virtual Renewal assistant. How can I help you today? (Type 'exit' to end the conversation).


In [None]:
!mkdir -p static/js static/css templates
!mv script.js static/js/
!mv style.css static/css/
!mv index.html templates/

In [None]:
import json
import os
import re
from collections import defaultdict
import torch
from sentence_transformers import SentenceTransformer, util
from sklearn.cluster import KMeans
import pandas as pd
from flask import Flask, render_template, request, jsonify
from google.colab import output
import threading
import time

In [None]:
# Web front end: one ChatbotManager instance backs the Flask app below.
chatbot_manager = ChatbotManager()
app = Flask(__name__)


@app.route('/')
def index():
    """Serve the chat page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page

@app.route('/chat', methods=['POST'])
def chat():
    """Answer one chat message posted as JSON ``{"message": "<text>"}``.

    Returns:
        JSON ``{"response": "<reply>"}`` on success, or JSON
        ``{"error": "<reason>"}`` with status 400 when the body is not JSON
        or has no non-empty string ``message``.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so every bad request gets the same explicit 400 below.
    payload = request.get_json(silent=True)
    message = payload.get('message') if isinstance(payload, dict) else None
    if not isinstance(message, str) or not message.strip():
        return jsonify({'error': "Request body must be JSON with a non-empty string 'message'."}), 400

    # NOTE(review): chatbot_manager is a single module-level object, so its
    # verification state is shared by every client of this endpoint; once one
    # user verifies, other clients are treated as verified too. Use one
    # manager per session before exposing this beyond a single-user demo.
    response = chatbot_manager.process_input(message)
    return jsonify({'response': response})

@app.route('/init', methods=['GET'])
def init_chat():
    """Return the assistant's greeting as JSON under the ``response`` key."""
    return jsonify({
        'response': "Hi! I'm Manulife Kleo, your virtual Renewal assistant. How can I help you today?"
    })
def run_flask():
    """Start the Flask server on port 8000."""
    app.run(port=8000)


# Run the Flask app in a background (daemon) thread
server_thread = threading.Thread(target=run_flask, daemon=True)
server_thread.start()

# Short pause before opening the window (presumably to let the server start).
time.sleep(3)

# Display a clickable URL
output.serve_kernel_port_as_window(8000)