In [1]:
!pip install openai==0.28



In [2]:
!pip install requests



In [3]:
!pip install moviepy



In [4]:
!pip install SpeechRecognition



In [5]:
!pip install gTTs



In [6]:
import os
import shutil

import openai
import requests

# Read the OpenAI key from the OPENAI_API_KEY environment variable so the secret
# is never stored in the notebook. The key is None when the variable is unset.
openai.api_key = os.environ.get("OPENAI_API_KEY")
def ask_gpt(messages, max_tokens=1000, temperature=0.7, stop=None):
    """
    Send a conversation history to OpenAI's GPT-4o chat model and return its reply.

    Parameters:
    - messages (list): The conversation history handed to the model.
    - max_tokens (int): Upper bound on the number of tokens in the reply.
                        Default is 1000.
    - temperature (float): Sampling randomness; lower values give more deterministic
                           output, higher values give more varied output.
                           Default is 0.7.
    - stop (str or None): Optional token(s) at which generation should stop.
                          Default is None.

    Returns:
    - str: The content of the first returned choice, with leading and trailing
           whitespace removed.
    """
    request_options = {
        "model": "gpt-4o",
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "stop": stop,
    }
    completion = openai.ChatCompletion.create(**request_options)

    # Only the first choice is used.
    first_choice = completion.choices[0]
    reply_text = first_choice.message['content']
    return reply_text.strip()

In [7]:
def create_talk(input_text, image_url, timeout=30):
    """
    Ask the D-ID API to create a talking-avatar video and return the new talk's ID.

    Parameters:
    input_text (str): The text to be spoken by the avatar in the video.
    image_url (str): The URL of the image to be used as the avatar's appearance.
    timeout (float): Seconds to wait for the D-ID server before giving up.
                     Default is 30.

    Returns:
    str: The ID of the created talk, or None if the request fails or the
         response carries no 'id'.

    The payload uses a Microsoft voice with the "Cheerful" style and a fixed
    webhook URL. The authorization header value is read from the
    DID_AUTHORIZATION environment variable (the complete header value); it is
    sent empty when the variable is unset. This function only creates the talk:
    pass the returned ID to fetch_video to obtain the rendered video's URL.

    Example:
        talk_id = create_talk("Hello, world!", "https://example.com/avatar.jpg")
        if talk_id:
            print("Talk ID:", talk_id)
        else:
            print("Failed to create talk.")
    """
    url = "https://api.d-id.com/talks"

    payload = {
        "script": {
            "type": "text",
            "input": input_text,
            "provider": {
                "type": "microsoft",
                "voice_config": {
                    "style": "Cheerful"
                }
            }
        },
        "source_url": image_url,
        "webhook": "https://eodlfa6goy5ov0o.m.pipedream.net"
    }

    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        # Keep the credential out of the notebook: it comes from the environment.
        "authorization": os.environ.get("DID_AUTHORIZATION", "")
    }

    try:
        # Without a timeout a stalled connection would block the caller indefinitely.
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()  # Raise an error for bad HTTP status codes
        response_data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")  # Log request exceptions
        return None
    except ValueError as e:
        # A body that is not valid JSON may surface as a plain ValueError.
        print(f"Invalid JSON in API response: {e}")
        return None

    if isinstance(response_data, dict) and 'id' in response_data:
        return response_data['id']

    print(f"Error in API response: {response_data}")  # Log API response for debugging
    return None

In [8]:
import time

def fetch_video(id, delay=7, timeout=30):
    """
    Look up a D-ID talk by its ID and return the URL of the rendered video.

    Args:
      id (str): The talk id whose video result is to be fetched.
      delay (float): Seconds to sleep before the single status request.
        Default is 7.
      timeout (float): Seconds to wait for the D-ID server before giving up.
        Default is 30.

    Returns:
      str or None: The value of the response's 'result_url' key when the request
      returns status 200. This is None when the key is absent (presumably while
      the video is still rendering; confirm against the D-ID API). For any other
      status code the string "Error: Unable to fetch video." is returned.

    Details:
      - Sends one GET request to https://api.d-id.com/talks/<id> with headers that
        accept JSON and carry the authorization value read from the
        DID_AUTHORIZATION environment variable (empty when unset).
      - Exceptions raised by the request itself are not caught here.
    """
    url = "https://api.d-id.com/talks/" + id
    headers = {
        "accept": "application/json",
        # Keep the credential out of the notebook: it comes from the environment.
        "authorization": os.environ.get("DID_AUTHORIZATION", "")
    }

    # Wait before the single status check; presumably this gives the render time to finish.
    time.sleep(delay)

    # Without a timeout a stalled connection would block the caller indefinitely.
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        return "Error: Unable to fetch video."

    response_json = response.json()
    print(response_json)
    return response_json.get("result_url")

In [9]:
from moviepy.editor import VideoFileClip
import subprocess

def video_to_audio(input_video, output_audio):
    """
    Extracts the audio track of a video file into an audio file using ffmpeg.

    The ffmpeg executable is resolved in this order:
      1. the FFMPEG_PATH environment variable, when set;
      2. the original hard-coded Windows install location, when that file exists;
      3. an "ffmpeg" executable found on PATH;
      4. the hard-coded location again, so a missing ffmpeg is reported through
         the FileNotFoundError branch below.

    Args:
        input_video (str): The path to the input video file.
        output_audio (str): The desired output audio file name.

    Returns:
        str: The path to the output audio file. It is returned even when ffmpeg
        is missing or fails; those failures are only printed.
    """
    legacy_ffmpeg_path = r'C:\Users\gaeta\Downloads\ffmpeg-7.0.1-essentials_build\ffmpeg-7.0.1-essentials_build\bin\ffmpeg.exe'

    ffmpeg_path = os.environ.get("FFMPEG_PATH")
    if not ffmpeg_path:
        if os.path.isfile(legacy_ffmpeg_path):
            ffmpeg_path = legacy_ffmpeg_path
        else:
            ffmpeg_path = shutil.which("ffmpeg") or legacy_ffmpeg_path

    # Define the command to extract audio using ffmpeg
    command = [
        ffmpeg_path,
        '-y',
        '-i', input_video,
        '-acodec', 'pcm_s16le',
        '-q:a', '0',
        '-map', 'a',
        output_audio
    ]

    # Run the command; failures are logged rather than raised.
    try:
        result = subprocess.run(command, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
        print(result.stdout)
        print(result.stderr)
    except FileNotFoundError as e:
        print(f"FileNotFoundError: {e}")
    except subprocess.CalledProcessError as e:
        print(f"CalledProcessError: {e}")
        print(f"stderr: {e.stderr}")

    return output_audio

In [10]:
import speech_recognition as sr

def transcribe_audio(audio_file_path):
    """
    Transcribes an audio file with the SpeechRecognition library's Google recognizer.

    Parameters:
    - audio_file_path (str): Path to the audio file to be transcribed.

    Returns:
    - str: The transcribed text. When recognition fails no exception is raised;
           a message is returned instead:
           "Could not understand audio" for sr.UnknownValueError, or
           "Error with the service: <details>" for sr.RequestError.

    """
    speech_recognizer = sr.Recognizer()

    # Read the whole file into memory as audio data
    with sr.AudioFile(audio_file_path) as audio_source:
        recorded_audio = speech_recognizer.record(audio_source)

    # Recognition errors are converted into the returned message
    try:
        transcript = speech_recognizer.recognize_google(recorded_audio)
    except sr.UnknownValueError:
        transcript = "Could not understand audio"
    except sr.RequestError as service_error:
        transcript = f"Error with the service: {service_error}"

    return transcript

def audio_to_text(audio_clip):
    """
    Transcribe an audio file, turning any exception into an error message.

    Parameters:
    - audio_clip (str): File path to the audio clip.

    Returns:
    - str: Whatever transcribe_audio returns for the clip, or
           "Error during transcription: <details>" if it raises.

    """
    try:
        return transcribe_audio(audio_clip)
    except Exception as transcription_error:
        return "Error during transcription: " + str(transcription_error)

In [11]:
from gtts import gTTS

def text_to_speech(text, filename):
    """
    Synthesizes English speech for the given text with gTTS and writes it to a file.

    Args:
    - text (str): The text to convert to audio.
    - filename (str): Where to save the audio file.

    Returns:
    - str: The same filename that was passed in.
    """
    gTTS(text=text, lang='en').save(filename)
    return filename

In [14]:
# Import necessary libraries
from flask import Flask, render_template, request, jsonify, send_file
from threading import Thread
import webbrowser

# Initialize the Flask app.
# The template directory can be overridden with the LLM_AVATAR_TEMPLATE_FOLDER
# environment variable; the original absolute path stays as the default so an
# existing setup keeps working unchanged.
app = Flask(
    __name__,
    template_folder=os.environ.get(
        "LLM_AVATAR_TEMPLATE_FOLDER",
        "/Users/gaeta/OneDrive/Documents/College/Summer24/LLM_Video_Avatar/templates",
    ),
)

# Landing route: renders the clinical training page
@app.route("/", methods=["GET", "POST"])
def home():
    """Render clinical_training.html for GET and POST requests to the site root."""
    landing_template = "clinical_training.html"
    return render_template(landing_template)

# Endpoint to handle requests to GPT-4o
@app.route("/ask-gpt", methods=["POST"])
def ask_gpt_endpoint():
    """
    Forward a chat history to ask_gpt and return the model's reply as JSON.

    Expects a JSON object with a "messages" key holding a non-empty list of
    dicts, each containing 'role' and 'content'.

    Returns:
    - 200 {"response": <reply>} on success.
    - 400 {"error": ...} when the body is missing, is not a JSON object, or the
      messages are missing, empty or malformed.
    - 500 {"error": "Internal server error"} when ask_gpt raises.
    """
    # silent=True turns a malformed or non-JSON body into None, so it is reported
    # as a 400 below instead of escaping as an exception.
    request_data = request.get_json(silent=True)
    if not isinstance(request_data, dict) or not request_data:
        return jsonify({"error": "No JSON data provided"}), 400

    # Validate the structure of the messages; an empty history is rejected here
    # rather than being sent to the model.
    messages = request_data.get("messages", [])
    messages_valid = (
        isinstance(messages, list)
        and len(messages) > 0
        and all(isinstance(m, dict) and 'role' in m and 'content' in m for m in messages)
    )
    if not messages_valid:
        print("Invalid messages format:", messages)  # Log invalid format
        return jsonify({"error": "Invalid messages format"}), 400

    try:
        # Call ask_gpt function
        response = ask_gpt(messages)
        print("Generated response:", response)  # Log generated response
        return jsonify({"response": response})
    except Exception as e:
        print("Error in processing request:", str(e))  # Log any other errors
        return jsonify({"error": "Internal server error"}), 500

# Endpoint that stores an uploaded video and extracts its audio track
@app.route("/save-video", methods=["POST"])
def save_video():
    """
    Save the uploaded 'video' file as uploaded_video.webm and convert it to audio.wav.

    Returns a plain-text 400 response when the 'video' part is missing or has an
    empty filename; otherwise JSON {"audio_file": <value returned by video_to_audio>}.
    """
    video_path = 'uploaded_video.webm'
    audio_path = 'audio.wav'

    if 'video' not in request.files:
        return "No video file part", 400

    upload = request.files['video']
    if upload.filename == '':
        return "No selected file", 400

    if upload:
        upload.save(video_path)

    return jsonify({"audio_file": video_to_audio(video_path, audio_path)})

# Endpoint that returns the audio extracted from the uploaded video
@app.route("/get-audio-from-video", methods=["GET"])
def get_audio_from_video():
    """Send audio.wav as an attachment."""
    return send_file('audio.wav', as_attachment=True)

# Endpoint that transcribes an uploaded audio file
@app.route("/audio-to-text", methods=["POST"])
def audio_to_text_endpoint():
    """
    Save the uploaded 'audio' file as audio_for_transcribing.wav and transcribe it.

    Returns JSON {"transcription": <text>} on success, a JSON 400 error when the
    'audio' part is missing or has an empty filename, and a JSON 500 error when
    transcription raises.
    """
    if 'audio' not in request.files:
        return jsonify({"error": "No audio file part"}), 400

    upload = request.files['audio']
    if upload.filename == '':
        return jsonify({"error": "No selected file"}), 400

    saved_path = 'audio_for_transcribing.wav'
    upload.save(saved_path)

    try:
        result = {"transcription": audio_to_text(saved_path)}
        return jsonify(result)
    except Exception as transcription_error:
        return jsonify({"error": str(transcription_error)}), 500

# Endpoint to handle requests to convert text to speech
@app.route("/text-to-speech", methods=["POST"])
def text_to_speech_endpoint():
    """
    Convert the 'text' field of the JSON body to speech saved as response.mp3.

    Returns:
    - 200 {"audio_file": "response.mp3"} on success.
    - 400 {"error": "Text not provided."} when the body is missing, is not a
      JSON object, or has no non-empty 'text'.
    - 500 {"error": "Internal server error"} when the conversion raises.
    """
    # silent=True yields None for a missing or malformed body instead of raising.
    request_data = request.get_json(silent=True)
    print("Received request data", request_data)

    text = request_data.get('text') if isinstance(request_data, dict) else None

    # Report bad input as a 400 JSON error, like the other endpoints, instead of
    # raising ValueError and producing a 500.
    if not text:
        return jsonify({"error": "Text not provided."}), 400

    filename = "response.mp3"

    try:
        # Call function for text-to-speech conversion
        audio_file = text_to_speech(text, filename)
    except Exception as e:
        print("Error in processing request:", str(e))  # Log any other errors
        return jsonify({"error": "Internal server error"}), 500

    return jsonify({"audio_file": audio_file})

# Endpoint that returns the synthesized speech file
@app.route("/get-audio", methods=["GET"])
def get_audio():
    """Send response.mp3 (the fixed file name used by the text-to-speech endpoint) as an attachment."""
    return send_file("response.mp3", as_attachment=True)

# Endpoint to handle requests to create D-ID video
@app.route("/create-talk", methods=["POST"])
def create_talk_endpoint():
    """
    Create a D-ID talk for the 'text' field of the JSON body, using a fixed avatar image.

    Returns:
    - 200 {"talk_id": <id>} on success.
    - 400 {"error": ...} when the body is missing, is not a JSON object, or has
      no non-empty 'text'.
    - 500 {"error": ...} when create_talk returns no ID or an exception is raised.
    """
    image_url = "https://photos.psychologytoday.com/1f25d4d9-4598-439e-8069-7a12ffa9012b/2/320x400.jpeg"
    try:
        # silent=True yields None for a missing or malformed body instead of raising.
        request_data = request.get_json(silent=True)
        if not isinstance(request_data, dict) or not request_data:
            return jsonify({"error": "No JSON data provided"}), 400

        # Reject a missing script here rather than sending it to D-ID and
        # reporting the upstream failure as a server error.
        text = request_data.get('text')
        if not text:
            return jsonify({"error": "Text not provided."}), 400

        # Call create_talk function
        talk_id = create_talk(text, image_url)
        if talk_id:
            print("Generated response:", talk_id)  # Log generated response
            return jsonify({"talk_id": talk_id})
        else:
            return jsonify({"error": "Failed to create talk"}), 500

    except Exception as e:
        print("Error in processing request:", str(e))  # Log any other errors
        return jsonify({"error": "Internal server error"}), 500

# Endpoint to serve the generated video file
@app.route("/fetch-video", methods=["GET"])
def fetch_video_endpoint():
    """
    Return the video URL for the talk named by the 'id' query parameter.

    Returns:
    - 200 {"video_url": <url>} when fetch_video returns a URL.
    - 400 {"error": ...} when 'id' is missing.
    - 503 {"error": "Video is not ready yet"} when fetch_video returns None.
    - 500 {"error": ...} when fetch_video returns its error message or an
      exception is raised.
    """
    try:
        # Get the 'id' parameter from the query string
        talk_id = request.args.get('id')
        if not talk_id:
            return jsonify({"error": "id parameter is required"}), 400

        # Call fetch_video function
        video_url = fetch_video(talk_id)

        # fetch_video returns None when the response has no result_url; handle it
        # explicitly instead of letting a membership test on None raise TypeError.
        if video_url is None:
            return jsonify({"error": "Video is not ready yet"}), 503

        # Match only the error sentinel, which starts with "Error", so a valid
        # URL that merely contains that word is not rejected.
        if isinstance(video_url, str) and video_url.startswith("Error"):
            return jsonify({"error": video_url}), 500

        return jsonify({"video_url": video_url})
    except Exception as e:
        print("Error in processing request:", str(e))  # Log any other errors
        return jsonify({"error": "Internal server error"}), 500

# Page route for the office training scenario
@app.route("/office-training", methods=["GET"])
def office_training():
    """Render office_training.html."""
    page_template = "office_training.html"
    return render_template(page_template)

# Page route for the clinical training scenario
@app.route("/clinical-training", methods=["GET"])
def clinical_training():
    """Render clinical_training.html."""
    page_template = "clinical_training.html"
    return render_template(page_template)

# Starts the Flask development server
def run_app():
    """Run the Flask app on port 5000 with debug mode on and the auto-reloader off."""
    server_options = {
        "port": 5000,
        "debug": True,
        "use_reloader": False,
    }
    app.run(**server_options)

# Run the Flask server on a separate thread; run_app is the thread's target,
# so this cell continues to the next statement after starting it.
thread = Thread(target=run_app)
thread.start()

# Open the app in the default browser. The port matches the one passed to app.run in run_app.
# NOTE(review): this runs immediately after the thread is started, without waiting
# for the server to begin listening; confirm the first page load is reliable.
webbrowser.open("http://localhost:5000")


 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit


True

127.0.0.1 - - [19/Aug/2024 19:09:14] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [19/Aug/2024 19:09:18] "POST /save-video HTTP/1.1" 200 -



ffmpeg version 7.0.1-essentials_build-www.gyan.dev Copyright (c) 2000-2024 the FFmpeg developers
  built with gcc 13.2.0 (Rev5, Built by MSYS2 project)
  configuration: --enable-gpl --enable-version3 --enable-static --disable-w32threads --disable-autodetect --enable-fontconfig --enable-iconv --enable-gnutls --enable-libxml2 --enable-gmp --enable-bzlib --enable-lzma --enable-zlib --enable-libsrt --enable-libssh --enable-libzmq --enable-avisynth --enable-sdl2 --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxvid --enable-libaom --enable-libopenjpeg --enable-libvpx --enable-mediafoundation --enable-libass --enable-libfreetype --enable-libfribidi --enable-libharfbuzz --enable-libvidstab --enable-libvmaf --enable-libzimg --enable-amf --enable-cuda-llvm --enable-cuvid --enable-dxva2 --enable-d3d11va --enable-d3d12va --enable-ffnvcodec --enable-libvpl --enable-nvdec --enable-nvenc --enable-vaapi --enable-libgme --enable-libopenmpt --enable-libopencore-amrwb --enable-libmp3lame 

127.0.0.1 - - [19/Aug/2024 19:09:19] "GET /get-audio-from-video HTTP/1.1" 200 -
127.0.0.1 - - [19/Aug/2024 19:09:21] "POST /audio-to-text HTTP/1.1" 200 -
127.0.0.1 - - [19/Aug/2024 19:09:21] "POST /ask-gpt HTTP/1.1" 200 -


Generated response: I understand you might be busy, but could you please open YouTube for me here in the office? I really need to watch a video I found about my condition.


127.0.0.1 - - [19/Aug/2024 19:09:22] "POST /create-talk HTTP/1.1" 500 -


Request failed: 402 Client Error: Payment Required for url: https://api.d-id.com/talks
