<a href="https://colab.research.google.com/github/lykskai/HodgkinAvatar/blob/main/llama3_70b.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 0.1) Prerequisites: Install the required packages.

In [57]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install langchain langchain_community faiss-cpu sentence-transformers openai groq numpy pypdf edge-tts



# 0.2) Prerequisites: Import libraries

In [58]:
from google.colab import drive
from google.colab import userdata
import os
import shutil
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI


# 1) Mount Google Drive and Define Path

In [59]:
from google.colab import drive
import os

# Attach Google Drive to the Colab filesystem.
drive.mount("/content/drive")

# Project locations on Drive: the source documents and the saved FAISS index
# both live under the same project folder.
GDRIVE_PATH = "/content/drive/MyDrive/BIOIN401"
TEXT_FOLDER = os.path.join(GDRIVE_PATH, "dorothy_science_text")
FAISS_DB_PATH = os.path.join(GDRIVE_PATH, "faiss_index")

# Create both folders if they are missing, then echo the resolved paths.
for _required_dir in (TEXT_FOLDER, FAISS_DB_PATH):
    os.makedirs(_required_dir, exist_ok=True)
del _required_dir

print(f"Text folder: {TEXT_FOLDER}", f"FAISS storage: {FAISS_DB_PATH}", sep="\n")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Text folder: /content/drive/MyDrive/BIOIN401/dorothy_science_text
FAISS storage: /content/drive/MyDrive/BIOIN401/faiss_index


# 2) Load and Process Scientific Texts into FAISS
Note: this code only needs to be run when new articles are added to Drive.

In [60]:
# Sentence-embedding model shared by the notebook: it is used both when the
# FAISS index is built and when the saved index is loaded for querying, so the
# two must keep using the same model.
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
import shutil

def process_and_store_files():
    """Rebuild the FAISS index from the documents currently in TEXT_FOLDER.

    Every ``.pdf`` and ``.txt`` file in ``TEXT_FOLDER`` (extension matched
    case-insensitively) is loaded, split into 500-character chunks with a
    50-character overlap, filtered, embedded with ``embedding_model`` and
    written to ``FAISS_DB_PATH``. The index is rebuilt from scratch so that
    chunks belonging to files removed from the folder disappear from it too.

    The previous index is only replaced after the new one has been built
    successfully. If no usable chunk is found, the existing index is left
    untouched, because an index cannot be created from an empty document list.

    Returns:
        None. Progress and the outcome are reported with ``print``.
    """
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    docs = []

    # Step 1: Load and chunk every supported file. Sorting makes the order of
    # documents in the index independent of the filesystem's listing order.
    for file in sorted(os.listdir(TEXT_FOLDER)):
        file_path = os.path.join(TEXT_FOLDER, file)
        lowered_name = file.lower()

        if lowered_name.endswith(".pdf"):
            loader = PyPDFLoader(file_path)
        elif lowered_name.endswith(".txt"):
            loader = TextLoader(file_path)
        else:
            print(f"Skipping unsupported file: {file}")
            continue

        split_docs = text_splitter.split_documents(loader.load())

        # Keep only chunks with real content: longer than 100 characters and
        # not made up solely of digits.
        docs.extend(
            doc
            for doc in split_docs
            if len(doc.page_content) > 100 and not doc.page_content.strip().isdigit()
        )

    if not docs:
        print(f"No indexable content found in {TEXT_FOLDER}; existing FAISS index left untouched.")
        return

    # Step 2: Build the new index in memory first, so that a failure while
    # embedding still leaves the previous index on disk.
    vector_db = FAISS.from_documents(docs, embedding_model)

    # Step 3: Replace the old index on disk with the freshly built one.
    if os.path.exists(FAISS_DB_PATH):
        shutil.rmtree(FAISS_DB_PATH)
    os.makedirs(FAISS_DB_PATH, exist_ok=True)
    vector_db.save_local(FAISS_DB_PATH)
    print(f"FAISS database rebuilt and saved at {FAISS_DB_PATH}")


# 3) Query FAISS & Ensure Dorothy Hodgkin's Persona

In [61]:

def query_rag_system(query):
    """Answer ``query`` in Dorothy Hodgkin's voice, using the FAISS index for context.

    The single best-matching chunk is retrieved from the saved index. If it
    passes the content filter it is embedded in the system prompt as context;
    otherwise the model is instructed to say it does not know. The prompt is
    sent to the ``llama3-70b-8192`` model through Groq's OpenAI-compatible
    endpoint.

    Args:
        query: The user's question.

    Returns:
        The model's reply with surrounding whitespace removed.
    """
    # allow_dangerous_deserialization opts in to loading the pickled parts of
    # the saved index; only load an index that this notebook wrote itself.
    index = FAISS.load_local(FAISS_DB_PATH, embedding_model, allow_dangerous_deserialization=True)
    retriever = index.as_retriever(search_kwargs={"k": 1})

    # The OpenAI chat client is pointed at Groq's base URL, authenticated with
    # the "Groq" secret stored in Colab.
    groq_llm = ChatOpenAI(
        model_name="llama3-70b-8192",
        openai_api_key=userdata.get("Groq"),
        openai_api_base="https://api.groq.com/openai/v1",
    )

    # Keep only hits with real content: longer than 100 characters and not
    # made up solely of digits.
    usable_passages = []
    for hit in retriever.invoke(query):
        passage = hit.page_content
        if len(passage) <= 100 or passage.strip().isdigit():
            continue
        usable_passages.append(passage)

    if usable_passages:
        context = "\n\n".join(usable_passages)

        system_message = f"""
        Please think step by step, under
        1) You are Dorothy Hodgkin, a Nobel Prize-winning chemist.
        2) Explain concepts with scientific precision but in an accessible way.
        3) Talk naturally, like a friendly British lady
        4) Answer the question based on the context: {context}
        5) Knowledge past July 24, 1994 will be deemed as you "viewing from above" as you passed this day.
        6) Keep responses concise (around 2 sentences).

        """
    else:
        system_message = f"""
        Please think step by step, under
        1) You are Dorothy Hodgkin, a Nobel Prize-winning chemist.
        2) Explain concepts with scientific precision but in an accessible way.
        3) Talk naturally, like a friendly British lady
        4) You don't have context. Say 'I don't know'.

        """

    # One system turn carrying the persona/context, one user turn with the question.
    reply = groq_llm.invoke(
        [
            {"role": "system", "content": system_message},
            {"role": "user", "content": query},
        ]
    )
    return reply.content.strip()

# 4) TTS

In [62]:
# %pip makes the magic explicit and installs into the running kernel's environment.
# NOTE(review): pydub needs an ffmpeg executable for MP3 decoding; confirm that the
# PyPI package named `ffmpeg` is what is intended here rather than the system binary.
%pip install pydub ffmpeg



In [73]:
import asyncio
import os
import time
import edge_tts
import nest_asyncio
from pydub import AudioSegment
from IPython.display import Audio, display

# Patch asyncio so an event loop can be re-entered: Jupyter/Colab already run
# one, and the TTS function in this cell has to wait on an Edge TTS coroutine.
nest_asyncio.apply()

# Create the local (non-Drive) results folder if it is missing.
# NOTE(review): no other code visible in this notebook reads or writes this
# path — confirm it is still needed.
os.makedirs("/content/Wav2Lip/results", exist_ok=True)

# Function to convert text to speech and save only the backup
def text_to_speech(text, backup_file="/content/drive/MyDrive/Wav2Lip/DorothyVids/output.wav"):
    """Synthesise ``text`` with Edge TTS and save the result as a WAV file.

    The speech is generated with the ``en-GB-SoniaNeural`` voice into a
    temporary MP3, which is then re-encoded to WAV at ``backup_file`` with
    pydub. The time taken is printed.

    Args:
        text: The text to speak. Blank text is rejected without calling the
            TTS service.
        backup_file: Destination path of the WAV file. Its parent directory is
            created if necessary.

    Returns:
        ``backup_file`` on success, or ``None`` when the text is blank or when
        synthesis/conversion fails (the error is printed, not raised).
    """
    if not text or not text.strip():
        print("[TTS Error] No text to synthesise.")
        return None

    temp_mp3 = "/content/sample_data/temp_audio.mp3"
    backup_dir = os.path.dirname(backup_file)
    if backup_dir:  # a bare filename has no directory component to create
        os.makedirs(backup_dir, exist_ok=True)

    start_time = time.time()

    try:
        os.makedirs(os.path.dirname(temp_mp3), exist_ok=True)
        communicate = edge_tts.Communicate(text, "en-GB-SoniaNeural")
        # Communicate.save() is a coroutine, and `await` is a syntax error in a
        # plain `def`. This function is called synchronously, so the coroutine
        # is driven to completion here instead. nest_asyncio.apply(), called
        # earlier in this cell, lets asyncio.run() work even though the
        # notebook already has an event loop running.
        asyncio.run(communicate.save(temp_mp3))
        audio = AudioSegment.from_mp3(temp_mp3)
        audio.export(backup_file, format="wav")
    except Exception as e:
        print(f"[TTS Error] {str(e)}")
        return None

    elapsed_time = time.time() - start_time
    print(f"[TTS] Backup audio saved to: {backup_file} (Time taken: {elapsed_time:.2f} seconds)")

    return backup_file


# 5) Sync


In [76]:
import asyncio
import os
import requests
import nest_asyncio
from google.colab import userdata
import datetime
import uuid
import time
import json
# Apply nest_asyncio to handle event loop issues in Jupyter/Colab
nest_asyncio.apply()

# Ensure the local results directory exists
os.makedirs("/content/Wav2Lip/results", exist_ok=True)

# Sync API key from Colab Secrets: looked up once, stripped of stray whitespace,
# and normalised to None when the stored value is empty.
# NOTE(review): userdata.get may raise instead of returning None when the
# secret does not exist or access is not granted — confirm and handle if needed.
_sync_api_key_raw = userdata.get("SYNC_API_KEY2")
SYNC_API_KEY2 = _sync_api_key_raw.strip() if _sync_api_key_raw else None

# Function to save the output url's video to drive
def save_video_to_drive(output_url):
    """Download the generated video from ``output_url`` and store it on Google Drive.

    The file is streamed to disk in 8 KiB chunks and overwrites any previous
    video at the fixed destination path.

    Args:
        output_url: Download URL of the generated video.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    save_path = "/content/drive/MyDrive/Wav2Lip/results/alchemist_video.mp4"

    # open() does not create missing parent folders, and the folder created at
    # the top of this cell is the local /content one, not this Drive folder.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)

    # The context manager releases the streamed connection even if the
    # download or the write fails; the (connect, read) timeout keeps a stalled
    # server from hanging the notebook.
    with requests.get(output_url, stream=True, timeout=(10, 60)) as response:
        response.raise_for_status()  # Ensure the download was successful

        with open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

    print(f"Video successfully downloaded and saved to {save_path}")

# Function to poll Sync API for completion
def poll_for_completion(headers, request_id, poll_interval=5, timeout=None):
    """Poll the Sync API until the generation job reaches a final state.

    Args:
        headers: HTTP headers sent with every status request (carries the API key).
        request_id: Job id returned when the generation was submitted.
        poll_interval: Seconds to wait between two status checks.
        timeout: Optional overall limit in seconds. ``None`` keeps polling
            until the job reaches a final state.

    Returns:
        The output video URL once the job is ``COMPLETED``. ``None`` if the
        job ended unsuccessfully, the response could not be interpreted, or
        ``timeout`` elapsed; the reason is printed.
    """
    poll_url = f"https://api.sync.so/v2/generate/{request_id}"
    print(f"[Sync API] Polling for completion at {poll_url}")

    # Statuses after which the job will never complete, so polling must stop.
    # NOTE(review): names assumed from the Sync API status enum — confirm the
    # exact spellings against the API reference.
    failed_statuses = {"FAILED", "REJECTED", "CANCELED", "CANCELLED", "TIMED_OUT"}

    start_time = time.time()

    while True:
        response = requests.get(poll_url, headers=headers, timeout=30)
        try:
            response_poll_dict = response.json()
        except ValueError:
            # Body was not JSON (e.g. an HTML error page).
            print(f"[Sync API] Error: {response.text}")
            return None
        print("The entire dict, polling", response_poll_dict)

        status = response_poll_dict.get("status") if isinstance(response_poll_dict, dict) else None

        if status == "COMPLETED":
            output_url = response_poll_dict.get("outputUrl")
            if not output_url:
                print(f"[Sync API] Error: job completed without an output URL: {response.text}")
                return None
            print("[Sync API] Video is ready:", output_url)
            print("video generation: ", time.time() - start_time)
            return output_url

        # A payload without a status (e.g. an error object) or a terminal
        # failure status will not turn into a video; stop instead of looping forever.
        if status is None or status in failed_statuses:
            print(f"[Sync API] Error: {response.text}")
            return None

        if timeout is not None and time.time() - start_time >= timeout:
            print(f"[Sync API] Error: gave up after {timeout} seconds (last status: {status})")
            return None

        # Any other status (e.g. PENDING, PROCESSING) means the job is still running.
        print(f"[Sync API] Status: {status}; checking again in {poll_interval} seconds")
        time.sleep(poll_interval)  # Wait before polling again



# Function to call Sync API with correct parameters
def generate_sync_video(video_url, audio_url):
    """Submit a lip-sync job to the Sync API, wait for it, and save the result.

    Args:
        video_url: URL of the source video handed to the API.
        audio_url: URL of the speech audio the video should be synced to.

    Returns:
        The URL of the generated video, or ``None`` if the API key is missing,
        the API rejected the request, or the job did not complete. The reason
        is printed in each case.
    """
    sync_api_url = "https://api.sync.so/v2/generate"

    if not SYNC_API_KEY2:
        print("[Sync API] Error: API Key is missing.")
        return None

    headers = {"x-api-key": SYNC_API_KEY2}

    payload = {
        "model": "lipsync-1.9.0-beta",  # the higher qual version
        "input": [
            {"type": "video", "url": video_url},
            {"type": "audio", "url": audio_url},
        ],
        "options": {
            "pads": [0, 5, 0, 0],
            "speedup": 1,
            "output_format": "mp4",
            "sync_mode": "bounce",
            "fps": 25,
            "output_resolution": [1280, 720],
            "active_speaker": True,
        },
    }

    response = requests.post(sync_api_url, json=payload, headers=headers, timeout=30)

    try:
        response_dict = response.json()
    except ValueError:
        response_dict = None
    if not isinstance(response_dict, dict):
        print(f"[Sync API] Error: unexpected response: {response.text}")
        return None

    # A "statusCode" field is treated as the marker of a rejected request, as
    # the original handling did; accepted jobs carry "id" and "status" instead.
    if "statusCode" in response_dict:
        print("status code:", response_dict["statusCode"])
        print(f"[Sync API] Error: {response.text}")
        return None

    response_ID = response_dict.get("id")
    if not response_ID:
        print(f"[Sync API] Error: response has no job id: {response.text}")
        return None

    print("ID:", response_ID)
    print("Status:", response_dict.get("status"))

    output_url = poll_for_completion(headers, response_ID)
    if not output_url:
        # Nothing to download; do not hand an empty URL to the saver.
        print("[Sync API] Error: video generation did not complete.")
        return None

    # save to drive
    save_video_to_drive(output_url)

    print("The entire response_dict, VALID", response_dict)

    return output_url

# Function to run the LLM -> Sync API loop
def chat_loop(
    video_url="https://drive.google.com/uc?id=1MYUBFrzfIwLHrPRmNwUfPuNBakUC6GiO",
    audio_url="https://drive.google.com/uc?id=16D0en1rejmFtWA6H89H8P0gtAtJuxqAe",
    audio_file="/content/drive/MyDrive/Wav2Lip/DorothyVids/output.wav",
):
    """Interactive loop: question -> LLM answer -> speech -> lip-synced video.

    Reads questions from ``input()`` until the user types ``exit``. Each
    answer is printed, converted to speech, and sent to the Sync API together
    with the base video.

    Args:
        video_url: URL of the base video passed to the Sync API.
        audio_url: URL from which the Sync API fetches the speech audio.
            NOTE(review): presumably the shared link of ``audio_file`` on
            Drive — confirm the two stay in step.
        audio_file: Path where the synthesised speech is written each turn.
    """
    print("Welcome to the chat! Type 'exit' to quit.")
    while True:
        user_input = input("You: ").strip()
        if user_input.lower() == "exit":
            print("Exiting chat. Goodbye!")
            break
        if not user_input:
            # Nothing was asked; skip the LLM/TTS/video round trip.
            continue

        # Generate response using query_rag_system
        llm_response = query_rag_system(user_input)
        print("LLM:", llm_response)

        # Get the audio. This updates it in drive, which our SYNC logic pulls from.
        # If synthesis failed, the file on Drive still holds the previous
        # answer, so generating a video now would lip-sync the wrong audio.
        if text_to_speech(llm_response, backup_file=audio_file) is None:
            print("[Loop] Skipping video generation because speech synthesis failed.")
            print("[Loop] Awaiting next input...\n")
            continue

        # Provide video and audio URLs for Sync API
        output_url = generate_sync_video(video_url, audio_url)
        if output_url:
            print(f"[Loop] Video Ready: {output_url}")
        print("[Loop] Awaiting next input...\n")

# Start the interactive chat. The call blocks on input() and only returns once
# the user types 'exit' (or the cell is interrupted).
if __name__ == "__main__":
    chat_loop()

Welcome to the chat! Type 'exit' to quit.
You: hi
LLM: Dearie, it's so lovely to meet you! I'm Dorothy Hodgkin, and I'm thrilled to be attending this meeting of the International Union of Crystallography - the perfect opportunity to catch up with my dear Chinese friends and stay abreast of the latest developments in macromolecular structure.
ID: 4be5ed8b-0951-4dab-b6b4-a0f5e9df20b7
Status: PENDING
[Sync API] Polling for completion at https://api.sync.so/v2/generate/4be5ed8b-0951-4dab-b6b4-a0f5e9df20b7
The entire dict, polling {'id': '4be5ed8b-0951-4dab-b6b4-a0f5e9df20b7', 'createdAt': '2025-03-19T04:33:43.903Z', 'status': 'PROCESSING', 'model': 'sync-1.9.0-beta', 'input': [{'url': 'https://drive.google.com/uc?id=1MYUBFrzfIwLHrPRmNwUfPuNBakUC6GiO', 'type': 'video'}, {'url': 'https://drive.google.com/uc?id=16D0en1rejmFtWA6H89H8P0gtAtJuxqAe', 'type': 'audio'}], 'webhookUrl': None, 'options': {'fps': 25, 'pads': [0, 5, 0, 0], 'speedup': 1, 'sync_mode': 'bounce', 'output_format': 'mp4', 'ac

KeyboardInterrupt: Interrupted by user