In [11]:
IMAGE_PROMPT = """
YOU ARE AN EXPERT IN FILM AND MEDIA RECOGNITION, TASKED WITH IDENTIFYING MOVIES, SERIES, AND DRAMAS BASED ON USER {INPUT}. YOUR TASK IS TO:
1. IDENTIFY THE CORRECT MEDIA TITLE FROM THE USER IMAGE.THE IMAGE MAY CONTAINS A SINGLE STAR OF MEDIA OR THE MULTIPLE STARS OF MEDIA OR ONE IMGAGE OF SCENE IN THE MEDIE.
2. RETURN A DETAILED RESPONSE IN JSON FORMAT INCLUDING TITLE, GENRE, CAST, DIRECTOR, RELEASE DATE, IMDb RATING, IMDb VOTES, IMDb URL, POSTER URL, AND RUN TIME.
3. PROVIDE A LIST OF RECOMMENDED SIMILAR MEDIA BASED ON THE MEDIA'S GENRE AND OTHER CHARACTERISTICS.

### INSTRUCTIONS:
- TRANSLATE THE DIALOGUE INTO ENGLISH IF IT IS NOT IN ENGLISH.
- PROCESS THE USER INPUT TO IDENTIFY THE MEDIA TITLE AND ENSURE IT IS ACCURATE.
- RETRIEVE RELEVANT INFORMATION FROM TRUSTED SOURCES LIKE IMDb.
- PROVIDE A LIST OF 10 SIMILAR MEDIA, WITH EACH RECOMMENDATION INCLUDING A TITLE AND IMDb URL.
- RETURN THE FINAL OUTPUT AS A JSON OBJECT.

### CHAIN OF THOUGHT:

1. **Understanding the User Input:**
   - Extract the media title from the user's dialogue. For example, if the user says "I really liked that series about a teacher who starts making drugs," infer the title as "Breaking Bad."

2. **Fetching Media Information:**
   - Search for the correct media entry on IMDb.
   - Extract the following details:
     - Title
     - Genre
     - Cast (main actors)
     - Director(s)
     - Release Date
     - IMDb Rating
     - Number of IMDb Votes
     - IMDb URL
     - Poster URL
     - Run Time
   - Ensure the information is accurate and matches the media's official IMDb entry.

3. **Generating Similar Media Recommendations:**
   - BASED ON the genre, cast, or director of the recognized media, recommend at least ten (10) similar titles.
   - FOR EACH RECOMMENDATION, PROVIDE:
     - The correct title
     - A valid IMDb URL (ensure the URL matches the title and directs to IMDb)
     - DO NOT generate random recommendations or invalid URLs.

### OUTPUT FORMAT:
The final output should be formatted as a JSON object, following this structure:
```json
{
    "title": "Name of Movie/Series/Drama",
    "genre": "Name of Genre",
    "cast": ["Main Cast"],
    "director": ["Name of Directors"],
    "releaseDate": "Date of Release",
    "imdb": "IMDb Rating",
    "num_votes": "Number of IMDb Votes",
    "imdb_url": "IMDb URL",
    "poster": "Poster URL From IMDb Original Website",
    "run_time": "Time of Runtime",
    "recommendations": [
        {"Title of Sequel or Similar Media 1": "IMDb URL"},
        {"Title of Sequel or Similar Media 2": "IMDb URL"},
        {"Title of Sequel or Similar Media 3": "IMDb URL"}
    ]
}
### WHAT NOT TO DO:
DO NOT PROVIDE INCOMPLETE OR INCORRECT MEDIA DETAILS.
DO NOT RETURN RECOMMENDATIONS UNRELATED TO THE GENRE OR CONTEXT OF THE MEDIA.
DO NOT INVENT OR FABRICATE MEDIA DETAILS IF THEY CANNOT BE VERIFIED THROUGH TRUSTED SOURCES LIKE IMDb.
DO NOT IGNORE THE SPECIFIED JSON FORMAT OR RETURN PARTIAL OUTPUT.
DO NOT RETURN RECOMMENDATIONS WITHOUT PROVIDING CORRECT IMDb LINKS.
###ERROR HANDLING:
IF THE MEDIA TITLE IS UNCLEAR OR THE MODEL CANNOT FIND A MATCH, RETURN AN ERROR MESSAGE IN THE FOLLOWING FORMAT:

{
    "error": "Unable to find media matching the description. Please provide more details or try a different query."
}

### Final Thoughts:
- This prompt ensures that the model will consistently retrieve media details, genre, and cast while recommending similar media.
- The **Chain of Thought** explicitly breaks down the task into smaller, manageable steps, improving accuracy and clarity in the results.
- Following the format provided will guarantee that the response is always structured in a way that's easy to parse for further application development.

Note:Return the response in this exact JSON format only. Do not include any additional text before or after the JSON response.
"""

In [14]:

import speech_recognition as sr
import pathlib
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
import httpx
import base64
import os
from dotenv import load_dotenv
load_dotenv()
llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", max_tokens=4000)


def get_image_llm_response(llm, media_data: str, media_type: str):
    try:
        message = HumanMessage(
            content=[
                {"type": "text", "text": IMAGE_PROMPT},
                {
                    "type": media_type,
                    media_type: {"url": f"data:{media_type};base64,{media_data}"},
                },
            ],
        )
        response = llm.invoke([message])

        # Parse and clean up the response
        response = response.content.strip().replace(
            "json", "").replace("```", "").replace('\n', '')
        # Optionally parse as JSON if needed
        # response = ast.literal_eval(response)

        return response
    # except ChatGoogleGenerativeAIError as e:
    #     # st.error("You are running out of credits. Check you qouta")
    #     st.error(f"Error: {e}")
    #     raise e
    except Exception as e:
        print(f"Error: {e}")
        raise e


def read_audio(audio_path):
    # Check if file exists
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f"The file at {audio_path} does not exist.")

    try:
        with open(audio_path, "rb") as audio_file:
            audio_data = base64.b64encode(audio_file.read()).decode("utf-8")
            return audio_data
    except Exception as e:
        raise e


def read_video(video_path):
    # Check if file exists
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"The file at {video_path} does not exist.")

    try:
        with open(video_path, "rb") as video_file:
            video_data = base64.b64encode(video_file.read()).decode("utf-8")
            return video_data
    except Exception as e:
        raise e

# You will need to use an API or library for transcribing audio to text.
# For example, using Google Speech-to-Text API:


def transcribe_audio_to_text(audio_path):
    recognizer = sr.Recognizer()
    with sr.AudioFile(audio_path) as source:
        audio = recognizer.record(source)
        try:
            text = recognizer.recognize_google(audio)
            return text
        except sr.UnknownValueError:
            return "Audio could not be transcribed."
        except sr.RequestError:
            return "API request failed."


# Use this transcribed text with your LLM
audio_path = str(pathlib.Path("Recording (11).mp3"))
audio_text = transcribe_audio_to_text(audio_path)
# audio_response = get_image_llm_response(llm, audio_text, "text/plain")

# print("Audio Transcription Response:", audio_response)


# video_path = "video.mp4"

# # Get base64 encoded data for audio and video files
# # audio_data = read_audio(audio_path)
# video_data = read_video(video_path)

# # Now you can pass the base64 data to your LLM
# audio_response = get_image_llm_response(llm, video_data, "audio/mpeg")
# # video_response = get_image_llm_response(llm, video_data, "video/mp4")

# print("Audio response:", audio_response)
# # print("Video response:", video_response)

ValueError: Audio file could not be read as PCM WAV, AIFF/AIFF-C, or Native FLAC; check if file is corrupted or in another format

In [15]:
%pip install pydub

Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1
Note: you may need to restart the kernel to use updated packages.


In [15]:
import streamlit as st
import speech_recognition as sr
from pydub import AudioSegment
import pathlib
import os


def convert_mp3_to_wav(mp3_path):
    """Convert mp3 file to wav format."""
    # Ensure mp3_path is a string (if pathlib.Path object is passed, convert it to a string)
    if isinstance(mp3_path, pathlib.Path):
        mp3_path = str(mp3_path)

    try:
        audio = AudioSegment.from_mp3(mp3_path)
        wav_path = mp3_path.replace(".mp3", ".wav")
        audio.export(wav_path, format="wav")
        return wav_path
    except Exception as e:
        return f"Error converting mp3 to wav: {e}"


def transcribe_audio_to_text(audio_path):
    """Transcribe a WAV audio file to text."""
    recognizer = sr.Recognizer()

    try:
        with sr.AudioFile(audio_path) as source:
            audio = recognizer.record(source)
            # Try to transcribe the audio using Google Web Speech API
            try:
                text = recognizer.recognize_google(audio)
                return text
            except sr.UnknownValueError:
                return "Audio could not be transcribed."
            except sr.RequestError:
                return "API request failed."
    except FileNotFoundError:
        return "Audio file not found."
    except Exception as e:
        return f"Error: {e}"


# file = st.file_uploader("Upload your audio file")
# Example usage:
mp3_path = pathlib.Path("Recording (11).mp3")

# # Convert MP3 to WAV
wav_path = convert_mp3_to_wav(mp3_path)

if os.path.exists(wav_path):
    # Transcribe the WAV file
    audio_text = transcribe_audio_to_text(wav_path)
    print("Transcription:", audio_text)
else:
    print(wav_path)  # Error message if conversion failed

Transcription: Mujhe recording chahie I want to get recording Mujhe recording aur Sara Kuchh chahie


In [17]:

import pathlib
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
import httpx
import base64
import os
from dotenv import load_dotenv
load_dotenv()
# os.environ["GOOGLE_API_KEY"] = "AIzaSyCueuiotXQkFYMFOomScrTbi7rv7VgiVKc"
llm = ChatGoogleGenerativeAI(model="gemini-1.5-pro", max_tokens=4000)
# Replace the URL image with a local image file
# image_path = "data/combined_frames_image.jpg"
image_path = r"data/images/downloaded.png"
pathlib.Path(image_path).touch()
# Read the local image and encode it to base64
with open(image_path, "rb") as image_file:
    image_data = base64.b64encode(image_file.read()).decode("utf-8")
message = HumanMessage(
    content=[
        {"type": "text", "text": IMAGE_PROMPT},
        {
            "type": "image_url",
            "image_url": {"url": f"data:image/jpeg;base64,{image_data}"},
        },
    ],
)

# Invoke the model
ai_msg = llm.invoke([message])
print(ai_msg.content)

FileNotFoundError: [Errno 2] No such file or directory: 'data\\images\\downloaded.png'

In [4]:
import requests


def download_image(url, save_as):
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    try:
        response = requests.get(url, headers=HEADERS, stream=True)
        response.raise_for_status()  # Raises an error on a bad status
        with open(save_as, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        print(f"Image downloaded successfully: {save_as}")
    except requests.RequestException as e:
        print(f"Failed to download {url}: {e}")


image_url1 = "https://static.hbo.com/content/dam/hbodata/series/game-of-thrones/character/s5/arya-stark-1920.jpg?w=1200"
# image_url2 = "https://upload.wikimedia.org/wikipedia/en/thumb/4/4e/Arya_Stark-Maisie_Williams.jpg/1200px-Arya_Stark-Maisie_Williams.jpg"
image_url2 = ""
# image_url2 = "https://static.wikia.nocookie.net/gameofthrones/images/b/be/AryaShipIronThrone.PNG/revision/latest?cb=20190520174300"

save_as1 = "arya_stark_hbo.jpg"
save_as2 = "arya_stark.jpg"

download_image(image_url1, save_as1)
download_image(image_url2, save_as2)

Image downloaded successfully: arya_stark_hbo.jpg
Failed to download https://upload.wikimedia.org/wikipedia/en/thumb/4/4e/Arya_Stark-Maisie_Williams.jpg/1200px-Arya_Stark-Maisie_Williams.jpg: 404 Client Error: Not Found for url: https://upload.wikimedia.org/wikipedia/en/thumb/4/4e/Arya_Stark-Maisie_Williams.jpg/1200px-Arya_Stark-Maisie_Williams.jpg


In [7]:
import requests


def download_image(url, save_as):
    HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    try:
        response = requests.get(url, headers=HEADERS, stream=True)
        response.raise_for_status()  # Raises an error on a bad status
        with open(save_as, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        print(f"Image downloaded successfully: {save_as}")
    except requests.RequestException as e:
        print(f"Failed to download {url}: {e}")


image_url1 = "https://static.hbo.com/content/dam/hbodata/series/game-of-thrones/character/s5/arya-stark-1920.jpg?w=1200"
image_url2 = "https://upload.wikimedia.org/wikipedia/en/3/39/Arya_Stark-Maisie_Williams.jpg"

save_as1 = "arya_stark_hbo.jpg"
save_as2 = "arya_stark.jpg"

download_image(image_url1, save_as1)
download_image(image_url2, save_as2)

Image downloaded successfully: arya_stark_hbo.jpg
Image downloaded successfully: arya_stark.jpg
