<a href="https://colab.research.google.com/github/ishitas2365/IITISOC_Mock_Interviewer/blob/main/InterBot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get update && apt-get install -y ffmpeg && pip install pydub gradio SpeechRecognition gTTS huggingface_hub ipython

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.83)] [                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
0% [2 InRelease 15.6 kB/128 kB 12%] [Connecting to security.ubuntu.com (185.125                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:6 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drive

In [None]:
import gradio as gr
import cv2
import requests
import os
import tempfile
from collections import Counter
from queue import Queue
import threading
from pydub import AudioSegment
import speech_recognition as sr
from gtts import gTTS
from huggingface_hub import InferenceClient
import time
from IPython.display import Audio, display

# — Face‐expression model setup —
API_URL = "https://api-inference.huggingface.co/models/trpakov/vit-face-expression"
# Prefer a token supplied through the HF_TOKEN environment variable so a real
# credential never has to be committed with the notebook. The inline placeholder
# is kept only as a fallback for when the variable is unset or empty.
HEADERS = {
    "Authorization": "Bearer " + (os.environ.get("HF_TOKEN") or "************************"),
    "Content-Type": "application/octet-stream",
}

def query_expression(frame_path):
    """Send one image file to the expression-classification endpoint.

    Args:
        frame_path: Path of the image file whose raw bytes are uploaded.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        RuntimeError: If the JSON body is a dict carrying an ``"error"`` key.
    """
    with open(frame_path, "rb") as image_file:
        payload = image_file.read()

    response = requests.post(API_URL, headers=HEADERS, data=payload, timeout=30)
    response.raise_for_status()

    result = response.json()
    # The endpoint can report a failure inside a JSON object body.
    reported_error = isinstance(result, dict) and "error" in result
    if reported_error:
        raise RuntimeError(result["error"])
    return result

def predict_expression(video_path):
    """Return the most frequent top-scoring expression label in a video.

    One frame out of every ``int(fps)`` frames (at least every frame when the
    reported FPS is below 1) is written to a temporary JPEG and classified
    with ``query_expression``. The highest-scoring label of each classified
    frame is tallied and the most common label wins.

    Args:
        video_path: Path of the video to analyse. ``None`` or an empty string
            (e.g. no video was supplied) is reported as unopenable instead of
            being handed to OpenCV.

    Returns:
        The winning label, ``"Could not open video."`` when the video cannot
        be opened, or ``"No faces detected."`` when no frame produced a label.
    """
    if not video_path:
        return "Could not open video."

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return "Could not open video."
        fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
        skip = max(int(fps), 1)
        counts = Counter()
        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if idx % skip == 0:
                # Reserve a path and close the handle first so OpenCV can
                # write to it on every platform.
                tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
                tmp.close()
                try:
                    cv2.imwrite(tmp.name, frame)
                    out = query_expression(tmp.name)
                    if isinstance(out, list) and out:
                        best = max(out, key=lambda x: x.get("score", 0))
                        counts[best.get("label")] += 1
                except Exception as exc:
                    # Best effort: one failed frame must not abort the whole
                    # analysis, but the failure should be visible in the logs.
                    print(f"Expression query failed for frame {idx}: {exc}")
                finally:
                    os.remove(tmp.name)
            idx += 1
    finally:
        # Always free the capture handle, even if something above raised.
        cap.release()

    if not counts:
        return "No faces detected."
    return counts.most_common(1)[0][0]

# LLM setup
# Prefer a token supplied through the HF_TOKEN environment variable so a real
# credential never has to be committed with the notebook. The inline placeholder
# is kept only as a fallback for when the variable is unset or empty.
client = InferenceClient(
    "meta-llama/Meta-Llama-3-8B-Instruct",
    token=os.environ.get("HF_TOKEN") or "****************************",
)

def get_system_prompt(topic):
    """Build the system prompt that casts the model as an interviewer on *topic*."""
    prompt_parts = (
        f"You are an AI mock interviewer with expertise in {topic}. ",
        "Your job is to conduct a realistic and challenging mock interview for the candidate. ",
        "Your questions should evaluate the candidate's knowledge, problem-solving skills, "
        "and ability to apply concepts in practical scenarios. ",
        "Ensure the interview is thorough, engaging, and covers various aspects of the role "
        "without repeating questions.",
    )
    return "".join(prompt_parts)

def generate_question(topic, position, difficulty, previous_questions=None, max_new_tokens=500, question_count=1):
    """Ask the LLM for the next interview question.

    The kind of question depends on ``question_count``: the first asks about
    the candidate's future plans in ``topic``, the second about ``position``,
    and every later one is a ``difficulty``-level knowledge question on
    ``topic`` that must not repeat ``previous_questions``.

    Args:
        topic: Interview domain, also used in the system prompt.
        position: Role the candidate is interviewing for.
        difficulty: Difficulty label inserted verbatim into the prompt.
        previous_questions: Questions already asked; ``None`` means none.
        max_new_tokens: Token budget passed to the chat completion.
        question_count: 1-based index of the question being generated.

    Returns:
        The generated question with surrounding whitespace stripped.
    """
    # ``None`` replaces a mutable ``[]`` default, which would be shared across calls.
    asked = previous_questions or []

    if question_count == 1:
        request = f"Generate only 1 short question on the future plans of the user in the domain of {topic}."
    elif question_count == 2:
        request = f"Generate only 1 relevant and concise question about the position of {position}. Don't provide its answer."
    else:
        request = f"Generate only 1 relevant and concise {difficulty} level question on {topic}. Never provide its answer. Don't generate multiple choice questions. Test the knowledge of the user on {topic}. Make sure the question is not already in the list of previous questions: {', '.join(asked)}."

    messages = [
        {"role": "system", "content": get_system_prompt(topic)},
        {"role": "user", "content": request},
    ]
    response = client.chat_completion(messages=messages, max_tokens=max_new_tokens, stream=True)
    # A streamed delta may carry no content; treat that as an empty piece.
    return "".join(chunk.choices[0].delta.content or "" for chunk in response).strip()

def verify_answer(topic, position, user_answer, question):
    """Ask the LLM whether *user_answer* is right or wrong for *question*.

    ``topic`` and ``position`` are accepted but not used in the prompt.

    Returns:
        The model's verdict text with surrounding whitespace stripped.
    """
    grading_prompt = f"Tell the user if the answer {user_answer} is right or wrong in relevance to the question {question}. Tell right or wrong only. Add a 1 line explanation. Don't provide the answer to {question}."
    stream = client.chat_completion(
        messages=[{"role": "user", "content": grading_prompt}],
        max_tokens=100,
        stream=True,
    )
    # A streamed delta may carry no content; treat that as an empty piece.
    pieces = [chunk.choices[0].delta.content or "" for chunk in stream]
    return "".join(pieces).strip()

def generate_hint(topic, user_answer, question):
    """Ask the LLM for a hint to *question* in the context of *topic*.

    ``user_answer`` is accepted but not used in the prompt.

    Returns:
        The hint text with surrounding whitespace stripped.
    """
    hint_prompt = f"Generate a hint for the question {question} in context to {topic}. Don't provide the answer."
    stream = client.chat_completion(
        messages=[{"role": "user", "content": hint_prompt}],
        max_tokens=100,
        stream=True,
    )
    # A streamed delta may carry no content; treat that as an empty piece.
    pieces = [chunk.choices[0].delta.content or "" for chunk in stream]
    return "".join(pieces).strip()

def get_correct_answer(topic, position, question):
    """Ask the LLM for the correct answer to *question* in the context of *topic*.

    ``position`` is accepted but not used in the prompt.

    Returns:
        The answer text with surrounding whitespace stripped.
    """
    answer_prompt = f"Provide the correct answer to {question} in context to {topic}."
    stream = client.chat_completion(
        messages=[{"role": "user", "content": answer_prompt}],
        max_tokens=500,
        stream=True,
    )
    # A streamed delta may carry no content; treat that as an empty piece.
    pieces = [chunk.choices[0].delta.content or "" for chunk in stream]
    return "".join(pieces).strip()

def generate_feedback(correct_answers, total_answers, conversation_history):
    """Ask the LLM for end-of-interview feedback.

    Args:
        correct_answers: Number of answers judged correct.
        total_answers: Number of answers counted; values below 1 are treated
            as 1 when computing the accuracy ratio.
        conversation_history: Dicts with ``"role"`` and ``"content"`` keys,
            rendered one per line into the prompt.

    Returns:
        The feedback text with surrounding whitespace stripped.
    """
    accuracy = correct_answers / max(total_answers, 1)
    history_str = "\n".join(
        f"{turn['role']}: {turn['content']}" for turn in conversation_history
    )

    prompt_sections = [
        "Based on the following conversation history of a mock interview, provide constructive feedback for the user. ",
        "Include the user's strengths and weaknesses, and give advice on how to improve and refine their skills for a real interview.\n\n",
        "Overall, assess the candidate's performance in terms of clarity, technical knowledge, problem-solving abilities, and communication skills. Highlight areas of improvement and suggest actionable advice to enhance interview readiness.",
        "Provide detailed feedback, mentioning specific instances from the conversation history to support your assessment.\n\n",
        f"Conversation History:\n{history_str}\n\n",
        f"Overall accuracy of the user was {accuracy:.2f}. Specify it and comment on it.",
    ]
    feedback_prompt = "".join(prompt_sections)

    # Non-streaming call: the whole reply arrives in a single response object.
    response = client.chat_completion(
        messages=[{"role": "user", "content": feedback_prompt}],
        max_tokens=600,
        stream=False,
    )
    return response.choices[0].message['content'].strip()

def speak(text):
    """Synthesize *text* to an MP3 file with gTTS and return the file's path.

    Every call writes to its own temporary file. A single fixed file name
    would be overwritten by the next call while an earlier utterance is still
    waiting in the playback queue, so the wrong audio would be played.

    The file is not deleted here; it must outlive this call so it can be
    played later.

    Args:
        text: The text to convert to speech.

    Returns:
        Path of the newly written ``.mp3`` file.
    """
    tts = gTTS(text)
    # Reserve a unique path, then let gTTS write to it after the handle is closed.
    with tempfile.NamedTemporaryFile(prefix="interbot_", suffix=".mp3", delete=False) as tmp:
        filename = tmp.name
    tts.save(filename)
    return filename

# speak outside and enqueue filename
def play_audio_file(filename):
    """Hand *filename* to the playback queue; returns without waiting for playback."""
    audio_queue.put(item=filename)

# queue worker plays the mp3 file
def audio_worker():
    """Play queued audio files one at a time until a ``None`` sentinel arrives.

    For each file name taken from ``audio_queue`` the audio is displayed with
    autoplay, then the worker sleeps for the clip's duration plus half a
    second so consecutive clips do not overlap.

    A failure on one file is reported and skipped rather than ending the
    thread, since a dead worker would silence all later audio. Every item,
    including the sentinel, is acknowledged with ``task_done()`` so that
    ``audio_queue.join()`` cannot block forever.
    """
    while True:
        filename = audio_queue.get()
        try:
            if filename is None:
                break
            display(Audio(filename, autoplay=True))
            audio = AudioSegment.from_file(filename)
            # len() of an AudioSegment is in milliseconds.
            duration = len(audio) / 1000.0
            time.sleep(duration + 0.5)
        except Exception as exc:
            print(f"Audio playback failed for {filename!r}: {exc}")
        finally:
            # Runs on the sentinel ``break`` path as well.
            audio_queue.task_done()


# Queue of audio file names awaiting playback: `play_audio_file` puts names on
# it and `audio_worker` takes them off (a None item tells the worker to stop).
audio_queue = Queue()

# Start the single background playback thread once, when this cell runs.
# It is created as a daemon thread and must be started only after
# `audio_queue` exists, because the worker reads from it immediately.
audio_thread = threading.Thread(target=audio_worker, daemon=True)
audio_thread.start()

def recognize_speech(audio_file):
    """Transcribe an audio file to text with the Google recognizer.

    Args:
        audio_file: Path of the recording; may be ``None`` or empty.

    Returns:
        The transcript, ``"No valid audio file provided."`` when the path is
        missing or does not exist, or ``"Sorry, I did not understand the
        audio."`` when the recognizer raises ``UnknownValueError``.
    """
    has_file = bool(audio_file) and os.path.exists(audio_file)
    if not has_file:
        return "No valid audio file provided."

    engine = sr.Recognizer()
    with sr.AudioFile(audio_file) as recording_source:
        recording = engine.record(recording_source)

    try:
        return engine.recognize_google(recording)
    except sr.UnknownValueError:
        return "Sorry, I did not understand the audio."


def gradio_interface(topic, position, difficulty):
    """Build and launch the Gradio mock-interview app.

    Generates and speaks the first question, then serves a UI where the
    candidate uploads/records an audio answer (and optionally a video) and
    submits it. Each submission is transcribed, judged by the LLM, and
    answered with feedback, a hint, or the next question. Saying
    "finish interview" produces overall feedback plus the most common facial
    expression found in the video.

    Args:
        topic: Interview domain.
        position: Role the candidate is interviewing for.
        difficulty: Difficulty label used when generating questions.
    """
    question = generate_question(topic, position, difficulty, previous_questions=[], question_count=1)
    previous_questions = [question]
    question_count = 1
    correct_answers = 0
    total_answers = 0
    hint_given = False
    conversation_history = []
    play_audio_file(speak(question))
    conversation_history.append({"role": "interviewer", "content": question})

    def say(text):
        """Synthesize *text* and queue it for playback."""
        play_audio_file(speak(text))

    def next_question(audio_file, video_file, question, hint_given, question_count, previous_questions,
                      correct_answers, total_answers, conversation_history, last_answer):
        """Process one submitted answer and return the updated UI/state values.

        The returned tuple matches the ``outputs`` list wired to the submit
        button: displayed question, question state, feedback text, hint flag,
        question count, previous questions, correct count, total count,
        conversation history, and last answer.
        """

        def state(shown_question, feedback_text, answer):
            """Assemble the output tuple from the current local state."""
            return (shown_question, shown_question, feedback_text, hint_given, question_count,
                    previous_questions, correct_answers, total_answers, conversation_history, answer)

        def ask_next():
            """Generate, record, and speak the question for the current count."""
            new_question = generate_question(topic, position, difficulty, previous_questions,
                                             question_count=question_count)
            previous_questions.append(new_question)
            say(new_question)
            conversation_history.append({"role": "interviewer", "content": new_question})
            return new_question

        # Nothing submitted: leave everything as it is.
        if not audio_file:
            return state(question, "", last_answer)

        user_answer = recognize_speech(audio_file)
        conversation_history.append({"role": "candidate", "content": user_answer})

        # After a hint, an unchanged answer is not re-graded.
        if hint_given and user_answer.strip().lower() == last_answer.strip().lower():
            prompt = "Please try answering again based on the hint before we move on."
            say(prompt)
            return state(question, prompt, last_answer)

        if user_answer.lower() == 'finish interview':
            feedback = generate_feedback(correct_answers, total_answers, conversation_history)
            try:
                expr = predict_expression(video_file)
                feedback += f"\n\nMost common facial expression: {expr}"
            except Exception as e:
                feedback += f"\n\n(Expression error: {e})"
            # Synthesize once, after the expression line has been appended.
            say(feedback)
            hint_given = False
            return state(question, f"Interview finished!\n\n{feedback}", last_answer)

        # The first question is about future plans and is not graded.
        if question_count == 1:
            response = "It's great to hear about your innovative ideas. I wish you all the best for your future projects. Let's proceed with the interview."
            say(response)
            conversation_history.append({"role": "interviewer", "content": response})
            question_count += 1
            return state(ask_next(), response, user_answer)

        # Both the first attempt and the post-hint attempt are graded the same way.
        verification = verify_answer(topic, position, user_answer, question)
        feedback = verification + "\n"
        say(feedback)
        conversation_history.append({"role": "interviewer", "content": verification})
        answered_right = "right" in verification.lower()

        if not hint_given:
            # First attempt: this is the only place the question is counted.
            total_answers += 1
            if answered_right:
                correct_answers += 1
                question_count += 1
                return state(ask_next(), feedback, user_answer)

            hint = generate_hint(topic, user_answer, question)
            feedback += hint + "\n"
            say(hint)
            conversation_history.append({"role": "interviewer", "content": hint})
            hint_given = True
            return state(question, feedback, user_answer)

        # Second attempt after a hint: reveal the answer if still wrong, then move on.
        if answered_right:
            correct_answers += 1
        else:
            correct_answer = get_correct_answer(topic, position, question)
            feedback += f"Correct answer: {correct_answer}\n"
            say(f"Correct answer: {correct_answer}")
            conversation_history.append({"role": "interviewer", "content": f"Correct answer: {correct_answer}"})

        hint_given = False
        question_count += 1
        return state(ask_next(), feedback, user_answer)

    def transcribe_audio(audio_file):
        """Return the transcript of *audio_file* for the preview textbox."""
        return recognize_speech(audio_file)

    with gr.Blocks(theme=gr.themes.Default()) as demo:
        gr.Markdown(f"### Mock Interview on {topic} for {position} position at {difficulty} difficulty level")
        question_output = gr.Textbox(label="Question", interactive=False, value=question)
        audio_input = gr.Audio(label="Your Answer", type="filepath")
        video_input = gr.Video(label="Your Video")
        transcribed_output = gr.Textbox(label="Transcribed Answer", interactive=False)
        transcribe_button = gr.Button("Transcribe Audio")
        submit_button = gr.Button("Submit Answer")
        feedback_output = gr.Textbox(label="Interviewer’s Response", interactive=False)

        # Interview state carried between submissions.
        last_answer_state = gr.State("")
        question_state = gr.State(question)
        hint_given_state = gr.State(hint_given)
        question_count_state = gr.State(question_count)
        previous_questions_state = gr.State(previous_questions)
        correct_answers_state = gr.State(correct_answers)
        total_answers_state = gr.State(total_answers)
        conversation_history_state = gr.State(conversation_history)

        transcribe_button.click(fn=transcribe_audio, inputs=audio_input, outputs=transcribed_output)

        submit_button.click(
            fn=next_question,
            inputs=[audio_input, video_input, question_state, hint_given_state, question_count_state, previous_questions_state,
                    correct_answers_state, total_answers_state, conversation_history_state, last_answer_state],
            outputs=[question_output, question_state, feedback_output, hint_given_state, question_count_state, previous_questions_state,
                     correct_answers_state, total_answers_state, conversation_history_state, last_answer_state]
        )

    demo.launch(debug=True)




In [None]:
if __name__ == "__main__":
    # Gather the interview parameters interactively, in this fixed order.
    name = input("Enter your name: ")
    domain = input("Enter the topic/domain: ")
    position = input("Enter the position: ")
    difficulty = input("Enter difficulty level (easy, medium, hard): ")

    # Assemble the spoken/printed welcome message from its three clauses.
    greeting = " ".join([
        f"Hello {name}. I am InterBot, a smart multimodal interview simulator.",
        f"I will test your knowledge on {domain} at {difficulty} difficulty",
        f"for the {position} position to help you prepare better for your interview.",
    ])
    print(greeting)

    # Queue the greeting audio before the interview UI starts.
    filename = speak(greeting)
    play_audio_file(filename)

    gradio_interface(domain, position, difficulty)


Enter your name: ishita
Enter the topic/domain: machine learning
Enter the position: ML engineer
Enter difficulty level (easy, medium, hard): easy
Hello ishita. I am InterBot, a smart multimodal interview simulator. I will test your knowledge on machine learning at easy difficulty for the ML engineer position to help you prepare better for your interview.


Access to the secret `HF_TOKEN` has not been granted on this notebook.
You will not be requested again.
Please restart the session if you want to be prompted again.


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://f435307634518e604b.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/dist-packages/starlette/applications.py", line 112, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/dist-packages/starlette/middleware/errors.py", line 187, in __call__
    raise exc
  File "/usr/local/lib/python3.11/dist-packages/starlette/middleware/errors.py",

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://f435307634518e604b.gradio.live
