In [2]:
def get_head_pose(landmarks, image_shape):
    """Estimate head pose from face landmarks with a perspective-n-point solve.

    Six landmarks (nose tip, chin, outer eye corners, mouth corners) are paired
    with a fixed generic 3D face model and handed to ``cv2.solvePnP``.

    Args:
        landmarks: Indexable collection whose items expose ``.x`` and ``.y``.
            The values are scaled by the image width/height here, so they are
            presumably normalized coordinates (the caller passes MediaPipe
            face-mesh landmarks) -- TODO confirm.
        image_shape: Shape of the image; index 0 is used as the height and
            index 1 as the width.

    Returns:
        ``(rotation_vector, translation_vector)`` as produced by
        ``cv2.solvePnP``. The solver's success flag is not checked.
    """
    img_h, img_w = image_shape[0], image_shape[1]

    # Generic 3D reference face; one row per tracked landmark.
    model_points = np.array([
        (0.0, 0.0, 0.0),             # nose tip
        (0.0, -330.0, -65.0),        # chin
        (-225.0, 170.0, -135.0),     # left eye left corner
        (225.0, 170.0, -135.0),      # right eye right corner
        (-150.0, -150.0, -125.0),    # left mouth corner
        (150.0, -150.0, -125.0)      # right mouth corner
    ])

    # Face-mesh landmark indices, listed in the same order as model_points.
    landmark_ids = (1, 152, 226, 446, 57, 287)
    image_points = np.array(
        [(landmarks[idx].x * img_w, landmarks[idx].y * img_h) for idx in landmark_ids],
        dtype="double",
    )

    # Pinhole camera approximation: principal point at the image centre and a
    # focal length derived from half the image width and a 30 degree half-angle.
    center_x, center_y = img_w / 2, img_h / 2
    focal_length = center_x / np.tan(60 / 2 * np.pi / 180)
    camera_matrix = np.array(
        [[focal_length, 0, center_x],
         [0, focal_length, center_y],
         [0, 0, 1]],
        dtype="double",
    )

    # No lens distortion is modelled.
    dist_coeffs = np.zeros((4, 1))

    _, rotation_vector, translation_vector = cv2.solvePnP(
        model_points,
        image_points,
        camera_matrix,
        dist_coeffs,
        flags=cv2.SOLVEPNP_ITERATIVE,
    )
    return rotation_vector, translation_vector

In [3]:
def rotation_vector_to_euler_angles(rotation_vector):
    """Convert a rotation vector into (pitch, yaw, roll) Euler angles.

    The vector is expanded to a rotation matrix with ``cv2.Rodrigues``, padded
    with a zero translation column to form a 3x4 projection matrix, and then
    decomposed with ``cv2.decomposeProjectionMatrix``; element 6 of that
    result holds the Euler angles in degrees.

    Args:
        rotation_vector: Rotation vector, e.g. the one returned by
            ``cv2.solvePnP``.

    Returns:
        ``(pitch, yaw, roll)``: the three rows of the Euler-angle output, in
        that order (each row is returned as-is, not converted to a float).
    """
    rotation_matrix = cv2.Rodrigues(rotation_vector)[0]
    # decomposeProjectionMatrix expects 3x4 input, so add a zero column.
    zero_translation = np.zeros((3, 1))
    projection_matrix = np.concatenate((rotation_matrix, zero_translation), axis=1)
    decomposition = cv2.decomposeProjectionMatrix(projection_matrix)
    pitch, yaw, roll = decomposition[6]

    return pitch, yaw, roll

In [4]:
def is_facing_camera(pitch, yaw, pitch_threshold=160, yaw_threshold=25):
    """Decide whether the head pose counts as looking at the camera.

    Args:
        pitch: Pitch angle; its magnitude must be strictly greater than
            ``pitch_threshold``.
        yaw: Yaw angle; its magnitude must be strictly less than
            ``yaw_threshold``.
        pitch_threshold: Lower bound (exclusive) on ``abs(pitch)``.
        yaw_threshold: Upper bound (exclusive) on ``abs(yaw)``.

    Returns:
        ``True`` when both conditions hold, otherwise ``False``.
    """
    # bool() keeps the result a plain Python bool even for NumPy inputs.
    return bool(abs(pitch) > pitch_threshold and abs(yaw) < yaw_threshold)

In [5]:
def update_history(emotion, engaged, history_length=5):
    """Record the latest emotion/engagement sample in the global histories.

    Appends to the module-level lists ``emotion_history`` and
    ``engagement_history``. Once ``emotion_history`` holds more than
    ``history_length`` entries, the oldest entry is dropped from both lists.

    NOTE: a later cell in this notebook defines ``update_history`` again with
    ``history_length=3``; whichever definition was executed last is the one
    in effect.

    Args:
        emotion: Emotion value to append.
        engaged: Engagement value to append.
        history_length: Maximum number of samples to keep.
    """
    emotion_history.append(emotion)
    engagement_history.append(engaged)
    if len(emotion_history) <= history_length:
        return
    # Over capacity: discard the oldest sample from each list.
    del emotion_history[0]
    del engagement_history[0]

In [14]:
def ask_chatgpt(question):
    """
    Send ``question`` to the OpenAI chat completions API and return the reply.

    The request uses the "gpt-4o" model with a fixed system prompt followed by
    ``question`` as the single user message.

    Args:
        question: Text of the user message.

    Returns:
        The message content of the first choice in the API response.
    """
    # The backslash continuations are inside the string literal, so the leading
    # spaces of every continued line are part of the prompt text that is sent.
    system_prompt = "You are talking to the user with text to speech and vice versa\
            and you will be given the users facial expression where\
            {0: angry, 1: disgust, 2: fear, 3: happy, 4: neutral, 5: sad, 6: surprised}\
            and if the user is looking at the camera.\
            You will also be given the conversation log to keep track of the conversation.\
            Information in bracket is information given by the system.\
            Focus on the information outside of the brackets first which is information from the user.\
            Use this information to act like a friend.\
            Ocasionally take initiative and ask a question about the user.\
            Only talk about the user's feeling if their feeling is not neutral"
    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": question},
    ]
    completion = openai.chat.completions.create(model="gpt-4o", messages=chat_messages)
    first_choice = completion.choices[0]
    return first_choice.message.content

In [7]:
def process_speech(data):
    """Transcribe raw audio bytes with the Google Cloud Speech client.

    The audio is submitted as LINEAR16 at ``RATE`` Hz with language "en-US"
    through the module-level ``speech_client``.

    Args:
        data: Audio content passed to ``speech.RecognitionAudio``.

    Returns:
        The transcript of the first alternative of the first result, or
        ``None`` when the response contains no results.
    """
    recognition_audio = speech.RecognitionAudio(content=data)
    recognition_config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code="en-US",
    )
    recognition = speech_client.recognize(config=recognition_config, audio=recognition_audio)
    if not recognition.results:
        return None
    best_result = recognition.results[0]
    return best_result.alternatives[0].transcript


In [8]:
def generate_chatgpt_prompt(text, emotion, is_engaged, conv_log):
    """
    Build the user message for ChatGPT: the spoken text followed by a
    parenthesized block of system-supplied context.

    Args:
        text: What the user said.
        emotion: Emotion history; inserted via its string representation
            (the caller passes a list of emotion labels).
        is_engaged: Engagement history; inserted via its string representation
            (the caller passes a list of booleans).
        conv_log: Iterable of strings making up the conversation log; joined
            with newlines.

    Returns:
        The assembled prompt string.
    """
    # Joined up front: backslashes are not allowed inside f-string expressions
    # on older Python versions.
    conversation = '\n'.join(conv_log)

    # The emotion list and the conversation-log clause are separated by ", ";
    # previously they were glued together ("...[4, 3]the conversation log...").
    return (
        f"{text} (User engagement is shown by the following list in chronological order {is_engaged}"
        f", and the user's feeling are shown by the following list in chronological order {emotion}"
        f", the conversation log is: {conversation})"
    )

In [9]:
def update_history(emotion, engaged, history_length=3):
    """Append the newest sample to the global histories and cap their length.

    Works on the module-level lists ``emotion_history`` and
    ``engagement_history``. When ``emotion_history`` grows beyond
    ``history_length`` entries, the oldest entry is removed from both lists.

    NOTE: this redefines the ``update_history`` from an earlier cell of this
    notebook (which used ``history_length=5``) and shadows it once executed.

    Args:
        emotion: Emotion value to append.
        engaged: Engagement value to append.
        history_length: Maximum number of samples to keep.
    """
    emotion_history.append(emotion)
    engagement_history.append(engaged)
    over_capacity = len(emotion_history) > history_length
    if over_capacity:
        # Drop the oldest sample from each history, emotion first.
        for history in (emotion_history, engagement_history):
            history.pop(0)

In [10]:
def text_to_speech_and_play(text):
    """Synthesize ``text`` with Google Cloud Text-to-Speech and play it aloud.

    The request asks for an "en-US" voice with neutral SSML gender and MP3
    output; the returned bytes are decoded with pydub and played back.

    Args:
        text: The text to speak.
    """
    tts_response = text_to_speech_client.synthesize_speech(
        # What to say.
        input=texttospeech.SynthesisInput(text=text),
        # Which voice to use.
        voice=texttospeech.VoiceSelectionParams(
            language_code='en-US',
            ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL,
        ),
        # Which audio format to return.
        audio_config=texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.MP3,
        ),
    )

    # Wrap the binary MP3 payload in a file-like object so pydub can decode it.
    mp3_buffer = io.BytesIO(tts_response.audio_content)
    spoken_audio = AudioSegment.from_file(mp3_buffer, format="mp3")

    play(spoken_audio)

In [11]:
from tensorflow.keras.models import load_model
# Load the saved Keras model from 'model_all.keras' (path relative to the
# working directory). The main loop later calls model.predict on 48x48
# grayscale face crops and uses the argmax of the output as the emotion label.
model = load_model('model_all.keras')

In [17]:
import cv2
import numpy as np
import mediapipe as mp
from google.oauth2 import service_account
from google.cloud import speech
import pyaudio
import webrtcvad
from collections import deque
import wave
import io
from google.cloud import texttospeech
from google.oauth2 import service_account
from pydub import AudioSegment
from pydub.playback import play
import openai

import os

# Provide the path to your service account key file
credentials = service_account.Credentials.from_service_account_file(
    'decoded-bulwark-421920-aa7f61c1c1ad.json'
)

# Set your OpenAI API key. Prefer the OPENAI_API_KEY environment variable so the
# secret never has to be pasted into the notebook; the original placeholder is
# kept as the fallback when the variable is not set.
openai.api_key = os.environ.get('OPENAI_API_KEY', 'key')

# Initialize the Text-to-Speech client
text_to_speech_client  = texttospeech.TextToSpeechClient(credentials=credentials)

# Rolling state shared with update_history() and the main loop
emotion_history = []
engagement_history = []
conv_log = []

# Setup for audio and speech recognition
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = int(RATE * 0.03)  # 30 ms chunk size (480 samples at 16 kHz)
audio_interface = pyaudio.PyAudio()
stream = audio_interface.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
vad = webrtcvad.Vad(1)
# Pass the credentials to the speech client
speech_client  = speech.SpeechClient(credentials=credentials)
# Number of most-recent non-speech frames kept and prepended to the next utterance
SILENT_FRAMES_BUFFER_SIZE = 30

# MediaPipe setup for face detection and mesh
frame_skip = 5  # Only every 5th captured frame is analysed
frame_count = 0
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils
face_detection = mp_face_detection.FaceDetection(min_detection_confidence=0.5)
face_mesh = mp.solutions.face_mesh.FaceMesh(min_detection_confidence=0.5, min_tracking_confidence=0.5)
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)

# Emotion dictionary and video capture setup

emotions_dict = {0: "angry", 1: "disgust", 2: "fear", 3: "happy", 4: "neutral", 5: "sad", 6: "surprised"}
# Capture device index 1 is used here; adjust it if your webcam is on another index.
cap = cv2.VideoCapture(1)
if not cap.isOpened():
    raise IOError("Cannot open webcam")

# Main capture loop: listens for speech, answers through ChatGPT + TTS, and
# tracks the user's facial emotion and head-pose engagement from the webcam.
# Runs until 'q' is pressed in the preview window, the camera stops delivering
# frames, or the kernel is interrupted; devices are released in `finally`.
try:
    silent_frames = deque(maxlen=SILENT_FRAMES_BUFFER_SIZE)   # Most recent non-speech frames
    audio_frames = []      # Frames of the utterance currently being captured
    detected_text = ""
    emotion_label = 4      # Start as "neutral" until a face has been classified
    engagement = True

    while True:

        # Audio processing for speech detection
        audio_frame = stream.read(CHUNK, exception_on_overflow=False)
        is_speech = vad.is_speech(audio_frame, RATE)

        if is_speech:
            # Prepend the buffered non-speech frames to the utterance.
            if silent_frames:
                audio_frames.extend(silent_frames)
                silent_frames.clear()
            audio_frames.append(audio_frame)
        else:
            silent_frames.append(audio_frame)
            if len(audio_frames) > 0:
                # First non-speech frame after speech: treat the utterance as finished.
                detected_text = process_speech(b''.join(audio_frames))
                # process_speech returns None when nothing was recognized.
                if detected_text:
                    print(f"Recognized text: {detected_text}")
                    prompt = generate_chatgpt_prompt(detected_text, emotion_history, engagement_history, conv_log)
                    response = ask_chatgpt(prompt)
                    print(response)
                    text_to_speech_and_play(response)
                    # Entries are added in (user, assistant) pairs; drop the oldest pair when the log is long.
                    if len(conv_log) > 4:
                        conv_log.pop(0)
                        conv_log.pop(0)
                    conv_log.append(f"Recognized text: {detected_text}")
                    conv_log.append(f"Response text: {response}")
                audio_frames = []

        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        if frame_count % frame_skip != 0:
            continue  # Skip this frame
        # Video processing for face detection
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        detection_results = face_detection.process(frame_rgb)
        mesh_results = face_mesh.process(frame_rgb)
        frame = cv2.resize(frame, (640, 480))

        # Handling face detection
        if detection_results.detections:
            for detection in detection_results.detections:
                bboxC = detection.location_data.relative_bounding_box
                ih, iw, ic = frame.shape
                x, y, w, h = int(bboxC.xmin * iw), int(bboxC.ymin * ih), int(bboxC.width * iw), int(bboxC.height * ih)
                # Boxes starting outside the frame are ignored rather than clipped.
                face_roi = frame[y:y+h, x:x+w] if x >= 0 and y >= 0 else None
                if face_roi is not None and face_roi.size > 0:
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

                    # Emotion detection
                    gray_frame = cv2.cvtColor(face_roi, cv2.COLOR_BGR2GRAY)
                    resized_frame = cv2.resize(gray_frame, (48, 48))
                    normalized_frame = resized_frame.astype('float32') / 255.0
                    expanded_frame = np.expand_dims(np.expand_dims(normalized_frame, axis=0), axis=-1)
                    prediction = model.predict(expanded_frame, verbose=0)
                    # Cast to a plain int: the label is stored in emotion_history and
                    # stringified into the ChatGPT prompt, where a NumPy scalar would
                    # show up as e.g. "np.int64(3)" on NumPy >= 2.
                    emotion_label = int(np.argmax(prediction))

        # Handling face mesh
        if mesh_results.multi_face_landmarks:
            for face_landmarks in mesh_results.multi_face_landmarks:
                # mp_drawing.draw_landmarks(
                #     image=frame,
                #     landmark_list=face_landmarks,
                #     connections=mp.solutions.face_mesh.FACEMESH_CONTOURS,
                #     landmark_drawing_spec=drawing_spec,
                #     connection_drawing_spec=drawing_spec)
                # get rotation vector
                rotation_vector, translation_vector = get_head_pose(face_landmarks.landmark, frame.shape)
                # get pitch and yaw
                pitch, yaw, _ = rotation_vector_to_euler_angles(rotation_vector)
                # check if user is facing the camera
                engagement = is_facing_camera(pitch, yaw)

        update_history(emotion_label, engagement)
        cv2.putText(frame, str(engagement), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f'Emotion: {emotions_dict[emotion_label]}', (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.imshow('Frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except KeyboardInterrupt:
    print("Stream stopped")

finally:
    # Release the camera, the preview window and the audio device.
    cap.release()
    cv2.destroyAllWindows()
    stream.stop_stream()
    stream.close()
    audio_interface.terminate()

Recognized text: the frames like really bad cuz like um
Sounds like you had a tough time with something, but I noticed you're still trying to stay positive. What happened that made the frames turn out so bad?
Recognized text: brings bad because if I do like Max frame rate or like high frame rate
Oh, I see. So adjusting the Max frame rate or high frame rate didn't really work out as you expected. What were you trying to do—playing a game or working on some sort of project?
Stream stopped
