In [None]:
! python --version

In [None]:
# %%capture
! pip install opencv-python
! pip install SpeechRecognition
! pip install --upgrade pip
! pip install instructor
! pip install pydantic
! pip show instructor
! pip install anthropic
! pip install python-dotenv
! pip install pandas
! pip install scikit-learn
! pip install boto3

In [None]:
import time
import speech_recognition as sr
import cv2
import os
import json

# Input video file path
video_path = "./keeptrack-house-video-with-audio-horizontal-720p.mov"
# Location of the ffmpeg binary used to cut audio segments out of the video
ffmpeg_path = "/usr/local/bin/ffmpeg"

# Output directory for saving frames
output_dir = 'frames'
os.makedirs(output_dir, exist_ok=True)

# Open the video file
cap = cv2.VideoCapture(video_path)
# VideoCapture does not raise on a bad path; without this check the rest of the
# cell would run, capture nothing and still report success.
if not cap.isOpened():
    raise OSError(f'Could not open video file: {video_path}')

# Seconds between two captured frames
x_interval = 3
# Filled by capture_frames_and_transcriptions with one record per captured frame
json_results = []

# Function to capture frames and transcriptions every x seconds
def capture_frames_and_transcriptions(video_capture, video_path, output_directory, interval=3):
    """Save one frame and one transcribed audio clip per `interval` seconds of video.

    For every sampled position the function writes a PNG frame and a WAV audio
    segment of `interval` seconds into `output_directory`, sends the audio to
    Google Speech Recognition, and appends a record to the module-level
    `json_results` list. When the loop is done, `json_results` is written to
    `json_results.json` inside `output_directory`.

    Args:
        video_capture: An opened `cv2.VideoCapture` for the video.
        video_path: Path of the same video file; passed to ffmpeg for audio extraction.
        output_directory: Directory that receives the frames, audio clips and JSON file.
        interval: Seconds between two sampled frames, and length of each audio clip.

    Returns:
        None. Results are delivered through `json_results` and the files on disk.

    Notes:
        Relies on the module-level names `ffmpeg_path` and `json_results`.
    """
    # Local import so the function does not depend on the notebook's import cell.
    import subprocess

    # Initialize the recognizer
    recognizer = sr.Recognizer()

    # Get the total number of frames and the frame rate of the video
    total_frames = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = video_capture.get(cv2.CAP_PROP_FPS)
    if fps <= 0 or total_frames <= 0:
        # Nothing to sample; also avoids dividing by a zero frame rate below.
        print(f'No readable frames in {video_path}; nothing captured.')
        return

    # Interval expressed in frames. Never below 1, otherwise the loop would not advance.
    frame_interval = max(1, int(fps * interval))

    os.makedirs(output_directory, exist_ok=True)

    for frame_number in range(0, total_frames, frame_interval):
        # True division keeps the real position for non-integer frame rates
        # (floor division would label frame 89 of a 29.97 fps video as 2.00 s).
        timestamp = frame_number / fps
        label = f'{timestamp:.2f}'

        # Set the position of the video to the current frame number and read it
        video_capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        success, frame = video_capture.read()

        if not success:
            print(f'Failed to capture frame at {label} seconds')
            continue

        # Save the frame as a PNG file
        output_file = os.path.join(output_directory, f'frame_at_{label}_seconds.png')
        cv2.imwrite(output_file, frame)
        print(f'Successfully saved frame at {label} seconds to {output_file}')

        # Extract audio segment. An argument list (no shell) keeps paths with
        # spaces intact; -y overwrites clips left over from an earlier run and
        # -nostdin stops ffmpeg from waiting for keyboard input.
        audio_output_file = os.path.join(output_directory, f'audio_at_{label}_seconds.wav')
        command = [
            ffmpeg_path, '-nostdin', '-y', '-loglevel', 'error',
            '-i', video_path,
            '-ss', f'{timestamp:.3f}', '-t', str(interval),
            '-q:a', '0', '-map', 'a',
            audio_output_file,
        ]
        try:
            extracted = subprocess.run(command, check=False).returncode == 0
        except OSError as error:
            print(f'Could not run ffmpeg at {ffmpeg_path}: {error}')
            extracted = False

        # Reset for every frame so a failed recognition never reuses the
        # transcription of the previous segment.
        transcription = None
        if extracted and os.path.exists(audio_output_file):
            # Recognize the speech in the audio segment
            with sr.AudioFile(audio_output_file) as source:
                audio = recognizer.record(source)
            try:
                transcription = recognizer.recognize_google(audio)
                print(f'Transcription at {label} seconds: {transcription}')
            except sr.UnknownValueError:
                print(f'Could not understand audio at {label} seconds')
            except sr.RequestError as e:
                print(f'Could not request results from Google Speech Recognition service; {e}')
        else:
            print(f'Could not extract audio at {label} seconds')

        # Save the results in json_results
        json_results.append({
            'timestamp': round(timestamp, 2),
            'frame': output_file,
            'audio': audio_output_file,
            'transcription': transcription
        })

    # Write the collected results once, next to the frames they describe
    json_output_file = os.path.join(output_directory, 'json_results.json')
    with open(json_output_file, 'w') as f:
        json.dump(json_results, f, indent=4)


start_time = time.time()
try:
    # Capture frames and transcriptions every x_interval seconds
    capture_frames_and_transcriptions(cap, video_path, output_dir, interval=x_interval)
    end_time = time.time()
finally:
    # Release the video capture object, even if the capture run raised
    cap.release()

print(f'Finished capturing frames and transcriptions every {x_interval} seconds in {end_time - start_time:.2f} seconds.')

In [25]:
# AWS Bedrock Claude price per 1,000 tokens for input and output tokens.
# Each row lists the prices in the order given by _PRICE_COLUMNS.
_PRICE_COLUMNS = ("input", "output", "input_batch", "output_batch")
_PRICE_ROWS = {
    "haiku": (0.00025, 0.00125, 0.000125, 0.000625),
    "sonnet": (0.003, 0.015, 0.0015, 0.0075),
    "opus": (0.015, 0.075, 0.0075, 0.0375),
}

# Expand the compact table into {model: {price_kind: usd_per_1k_tokens}}
model_price_list = {
    model_key: dict(zip(_PRICE_COLUMNS, prices))
    for model_key, prices in _PRICE_ROWS.items()
}

def calclulate_usage_and_cost(completion, price_list, image_frame=None):
    """
    Calculate and print the token usage and USD cost of one completion.
    Pricing reference (Claude 3): https://aws.amazon.com/bedrock/pricing/

    Args:
        completion: Object exposing `model` (str) and `usage.input_tokens` /
            `usage.output_tokens` (numbers).
        price_list: Mapping of model family name to a dict with "input" and
            "output" prices per 1,000 tokens. A family matches when its name
            occurs in the lower-cased `completion.model`; the first match in
            the mapping's order wins.
        image_frame: Optional label stored in the result to identify the call.

    Returns:
        A dict with the keys "image_frame", "input_tokens", "output_tokens",
        "model" and "total_cost" (rounded to 6 decimals), or None when no
        family in `price_list` matches the model name.
    """
    model_name = completion.model.lower()
    # Match against the price list itself so any family it prices is recognised
    model = next((family for family in price_list if family in model_name), None)

    if model is None:
        print(f"Unknown model: {completion.model}, cannot calculate cost.")
        return None

    usd_cost_per_k_input_tokens = price_list[model]["input"]
    usd_cost_per_k_output_tokens = price_list[model]["output"]

    input_tokens = completion.usage.input_tokens
    output_tokens = completion.usage.output_tokens

    cost_input_tokens = (input_tokens * usd_cost_per_k_input_tokens) / 1000
    cost_output_tokens = (output_tokens * usd_cost_per_k_output_tokens) / 1000
    # Computed once so the printed total and the returned total cannot diverge
    total_cost = round(
        (input_tokens * usd_cost_per_k_input_tokens + output_tokens * usd_cost_per_k_output_tokens) / 1000,
        6,
    )

    print(f"Input tokens  : {input_tokens}, cost: USD {round(cost_input_tokens, 6)}")
    print(f"Output tokens : {output_tokens}, cost: USD {round(cost_output_tokens, 6)}")
    print(f"LLM Model     : {completion.model}")
    print(f"Total cost    : USD {total_cost} for this call.\n")

    return {
        "image_frame": image_frame,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "model": completion.model,
        "total_cost": total_cost,
    }

# # Test cases for the calclulate_usage_and_cost function
# class Completion:
#     def __init__(self, model, input_tokens, output_tokens):
#         self.model = model
#         self.usage = self.Usage(input_tokens, output_tokens)
    
#     class Usage:
#         def __init__(self, input_tokens, output_tokens):
#             self.input_tokens = input_tokens
#             self.output_tokens = output_tokens

# # Test case 1: Model haiku
# completion_haiku = Completion(model="haiku-123", input_tokens=1000, output_tokens=2000)
# calclulate_usage_and_cost(completion_haiku, model_price_list)

# # Test case 2: Model sonnet
# completion_sonnet = Completion(model="sonnet-456", input_tokens=1000, output_tokens=2000)
# calclulate_usage_and_cost(completion_sonnet, model_price_list)

# # Test case 3: Model opus
# completion_opus = Completion(model="opus-234", input_tokens=1000, output_tokens=2000)
# calclulate_usage_and_cost(completion_opus, model_price_list)

# # Test case 4: Unknown model
# completion_unknown = Completion(model="unknown", input_tokens=1000, output_tokens=2000)
# calclulate_usage_and_cost(completion_unknown, model_price_list)

In [None]:
import base64
from IPython.display import Image, display

def mermaid_graph(graph, scale=2):
    """Render a Mermaid diagram in the notebook through the mermaid.ink image service.

    The diagram source is base64-encoded into the image URL, and the URL is
    displayed as an IPython `Image`.

    Args:
        graph: Mermaid diagram source text.
        scale: Accepted for backward compatibility; it is not used when
            building the URL.
    """
    # UTF-8 so labels with non-ASCII characters do not raise UnicodeEncodeError.
    graphbytes = graph.encode("utf-8")
    # URL-safe alphabet: standard base64 can emit "/" and "+", which are not
    # safe inside a URL path segment.
    base64_string = base64.urlsafe_b64encode(graphbytes).decode("ascii")
    display(
        Image(
            url=f"https://mermaid.ink/img/{base64_string}"
        )
    )

# Draw the three-step extraction pipeline described by the diagram source
# below: each step runs a prompt, validates the CSV output, and loops through
# a fixer model while validation fails.
mermaid_graph("""
graph LR;
    A[Start] --> B[Target Video - 165,000 tokens]

    subgraph "Step 1: Initial Extraction"
        B --> C1[Gemini Model Initial Prompt - 10,000 tokens]
        C1 --> D1{CSV Validation}
        D1 -->|Valid| E1[Output 1]
        D1 -->|Invalid| FX1[Gemini CSV Fixer Model]
        FX1 --> D1
    end

    subgraph "Step 2: Expand Extraction"
        B & E1 --> C2[Gemini Model Secondary Prompt - 2000-5000 tokens]
        C2 --> D2{CSV Validation}
        D2 -->|Valid| E2[Output 2]
        D2 -->|Invalid| FX2[Gemini CSV Fixer Model]
        FX2 --> D2
    end

    subgraph "Step 3: Finalize Extraction"
        B & E1 & E2 --> C3[Gemini Model Final Prompt - 2000-5000 tokens]
        C3 --> D3{CSV Validation}
        D3 -->|Valid| E3[Final Output]
        D3 -->|Invalid| FX3[Gemini CSV Fixer Model]
        FX3 --> D3
    end

    E3 --> F[Final Results]

    %% Styling
    style C1 fill:#b3d9ff,stroke:#333,stroke-width:1px
    style C2 fill:#b3d9ff,stroke:#333,stroke-width:1px
    style C3 fill:#b3d9ff,stroke:#333,stroke-width:1px
    style D1 fill:#f9f,stroke:#333,stroke-width:1px
    style D2 fill:#f9f,stroke:#333,stroke-width:1px
    style D3 fill:#f9f,stroke:#333,stroke-width:1px
    style FX1 fill:#ccffcc,stroke:#333,stroke-width:1px
    style FX2 fill:#ccffcc,stroke:#333,stroke-width:1px
    style FX3 fill:#ccffcc,stroke:#333,stroke-width:1px
""")

In [16]:
import os
import anthropic
import instructor
from dotenv import load_dotenv
import pandas as pd
import shutil

# Load variables from the local .env file so the os.getenv lookups below can see them
load_dotenv(verbose=True, dotenv_path=".env")

# Model selection: keep exactly one MODEL_ID line active.
# MODEL_ID = "anthropic.claude-3-haiku-20240307-v1:0"     # Claude 3 Haiku
MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0"    # Claude 3 Sonnet

# Choosing a model: start with the cheapest option (Haiku). Depending on the
# task, the smallest model may be sufficient, so expect some trial and error.
# The smallest model sometimes has difficulty following certain instructions.
# For example, when asked to summarise a body of text within a minimum of 100
# and a maximum of 500 characters, it will sometimes produce a summary outside
# those limits. In such cases, try a larger model.

# Bedrock client configured from the AWS_* environment variables
client = anthropic.AnthropicBedrock(
    aws_region=os.getenv("AWS_DEFAULT_REGION"),
    aws_access_key=os.getenv("AWS_ACCESS_KEY_ID"),
    aws_secret_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    aws_session_token=os.getenv("AWS_SESSION_TOKEN"),
)

# Wrap the Bedrock client with instructor, fixing the model and token limit for every call
instructor_client = instructor.from_anthropic(client, model=MODEL_ID, max_tokens=1000)

In [17]:
from pydantic import BaseModel

# Define your desired output structure
# Minimal two-field model used to check that structured output works end to end.
class UserInfo(BaseModel):
    name: str
    age: int

# Ask the instructor client to fill a UserInfo from a one-sentence prompt;
# the result is used below as an object with `name` and `age` attributes.
user_info = instructor_client.chat.completions.create(
    response_model=UserInfo,
    messages=[{"role": "user", "content": "John Doe is 30 years old."}]
    )

# The `#>` lines show the values expected for the prompt above.
print(user_info.name)
#> John Doe
print(user_info.age)
#> 30

John Doe
30


In [26]:
from typing import List
from pydantic import BaseModel, Field
from instructor.retry import InstructorRetryException
import warnings
import base64
import json

# Suppress every Python warning from this point on, not only those raised by this cell.
warnings.filterwarnings("ignore")

# System prompt sent as `system=` with every request made by HomeCatalogger.catalog.
# NOTE(review): the "every 3 seconds" wording is hard-coded here and does not
# follow the capture interval configured elsewhere in the notebook.
# NOTE(review): the prompt mentions voice transcriptions, but the request built
# in HomeCatalogger.catalog only attaches the image and its path.
SYSTEM_PROMPT = """
You are an expert insurance agent who excels at cataloging all the items in a home based on images and voice transcriptions. The images and transcriptions
are extracted from a video every 3 seconds.  The purpose of this task is to automate the process of cataloging items for contents insurance. Do not
include parts of the home itself, only items that would be covered by insurance. It is importatnt to count the number of items accurately and mention the count,
color, size, and brand if visible, in the item description. Include everything visible, including items in the foreground and background.

For each room in the house, identify and list all the items you can find in this image.
"""
# Return the list in JSON format as:
# {
#     "catalog_items": [
#         {"item_number": 1, "item_name": "sofa", "item_description": "red three seater sofa"},
#         {"item_number": 2, "item_name": "LCD TV", "item_description": "Samsung 55 inch smart TV"},
#         {"item_number": 3, "item_name": "Dishwasher", "item_description": "Bosch 12 place setting dishwasher"}
#     ]
# }


# One cataloged household item. All four fields are required (`Field(...)`).
# The notebook later overwrites `item_no` with a single running index across
# all frames, so the value assigned here is only provisional.
class CatalogItem(BaseModel):
    """
    Response model for a catalog item.
    """
    item_no: int = Field(..., description="The sequential number of the catalog item.")
    item_name: str = Field(..., description="A name for this catalog item.")
    item_desc: str = Field(..., description="A short description of this catalog item.")
    room_loc: str = Field(..., description="The room location in the house of this catalog item.")

# Container passed as `response_model` in HomeCatalogger.catalog; its `items`
# list is what the cataloging loop extends `catalog_items` with.
class CatalogItems(BaseModel):
    """
    Response model for a list of catalog items.
    """
    items: List[CatalogItem] = Field(..., description="A list of catalog items in a house.")

class HomeCatalogger:
    """
    Catalog the items visible in a single image frame of a home.

    Each call sends one base64-encoded PNG frame, together with its path as
    text, to the wrapped instructor client and asks for a `CatalogItems`
    response.
    """

    def __init__(self, client):
        """Store the instructor-wrapped client used for every request."""
        self.client = client

    def catalog(self, image_path: str) -> "Optional[Tuple[CatalogItems, dict]]":
        """
        Catalog the items in one image frame.

        Args:
            image_path: Path of the PNG frame to send.

        Returns:
            A `(catalog, cost)` tuple, where `catalog` is the parsed
            `CatalogItems` and `cost` is the result of
            `calclulate_usage_and_cost` for the call (a usage dict, or None
            when the model is not in the price list). Returns None when the
            request fails with `InstructorRetryException`; callers must check
            for this before unpacking.
        """
        try:
            with open(image_path, "rb") as image_file:
                image_data = base64.b64encode(image_file.read()).decode("utf-8")

            image_block = {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": image_data
                }
            }
            text_block = {
                "type": "text",
                "text": f"This is the image frame: {image_path}"
            }

            catalog, completion = self.client.chat.completions.create_with_completion(
                messages=[{"role": "user", "content": [image_block, text_block]}],
                system=SYSTEM_PROMPT,
                response_model=CatalogItems,
                temperature=0.0,
                max_retries=1,
            )

            # Only the file name is kept as the label of the cost record
            cost = calclulate_usage_and_cost(completion, model_price_list, os.path.basename(image_path))
            return catalog, cost

        except InstructorRetryException as e:
            # Report the failure and signal it to the caller with an explicit None
            print(e)
            print(f"Retries attempted: {e.n_attempts}")
            print(e.last_completion)
            return None


# Initialize the HomeCatalogger with the instructor client
home_catalogger = HomeCatalogger(instructor_client)

# Load the JSON results
with open("./frames/json_results.json", "r") as file:
    json_results = json.load(file)

# Extract the first MAX_FRAMES_TO_CATALOG image frames
MAX_FRAMES_TO_CATALOG = 30
image_frames = [item["frame"] for item in json_results[:MAX_FRAMES_TO_CATALOG]]

catalog_items = []
llm_costs = []
# Iterate through the image frames and pass each image to the catalogger
for frame in image_frames:
    print(f"Cataloging items in image: {frame}")
    result = home_catalogger.catalog(frame)
    # catalog() yields no usable result when the request fails; skip the frame
    # instead of crashing while unpacking.
    if not result or result[0] is None:
        print(f"Skipping {frame}: no catalog was returned.")
        continue
    catalog, cost = result
    catalog_items.extend(catalog.items)
    # The cost is None when the model is missing from the price list; keeping
    # it out of llm_costs lets the totals be summed safely later.
    if cost is not None:
        llm_costs.append(cost)

# # Path to an example image
# output_dir = "frames" # re-declaring here... you can do it once at the top
# example_image_path = os.path.join(output_dir, "frame_at_63.00_seconds.png")

# # Catalog the items in the example image
# catalog_items = home_catalogger.catalog(example_image_path)

# Print the cataloged items
print(catalog_items)
print(llm_costs)

Cataloging items in image: frames/frame_at_0.00_seconds.png
Input tokens  : 1939, cost: USD 0.005817
Output tokens : 90, cost: USD 0.00135
LLM Model     : claude-3-sonnet-20240229
Total cost    : USD 0.007167 for this call.

Cataloging items in image: frames/frame_at_3.00_seconds.png
Input tokens  : 1939, cost: USD 0.005817
Output tokens : 227, cost: USD 0.003405
LLM Model     : claude-3-sonnet-20240229
Total cost    : USD 0.009222 for this call.

Cataloging items in image: frames/frame_at_6.00_seconds.png
Input tokens  : 1939, cost: USD 0.005817
Output tokens : 446, cost: USD 0.00669
LLM Model     : claude-3-sonnet-20240229
Total cost    : USD 0.012507 for this call.

Cataloging items in image: frames/frame_at_9.00_seconds.png
Input tokens  : 1939, cost: USD 0.005817
Output tokens : 505, cost: USD 0.007575
LLM Model     : claude-3-sonnet-20240229
Total cost    : USD 0.013392 for this call.

Cataloging items in image: frames/frame_at_12.00_seconds.png
Input tokens  : 1939, cost: USD 0.

In [27]:
# Fix item numbers to be sequential across all frames (each response numbers
# its own items independently)
for idx, item in enumerate(catalog_items, start=1):
    item.item_no = idx

for item in catalog_items:
    # print(f"Frame {item.}, Item {item.item_no}: {item.item_name} - {item.item_desc}")
    print(item)

# A cost entry is None when the cost could not be calculated for a call;
# leave those out so the sums below do not fail.
recorded_costs = [cost for cost in llm_costs if cost is not None]

total_input_tokens = sum(cost['input_tokens'] for cost in recorded_costs)
total_output_tokens = sum(cost['output_tokens'] for cost in recorded_costs)
total_cost_usd = sum(cost['total_cost'] for cost in recorded_costs)

print()
print(f"Total input tokens: {total_input_tokens}")
print(f"Total output tokens: {total_output_tokens}")
print(f"Total cost in USD: {total_cost_usd:.6f}")

item_no=1 item_name='Sofa' item_desc='1 tan colored fabric sofa visible in the hallway' room_loc='Hallway'
item_no=2 item_name='Sofa' item_desc='A large beige sectional sofa with brown and tan throw pillows, appears to be made of fabric' room_loc='Living room'
item_no=3 item_name='Ceiling fan' item_desc='A ceiling mounted fan with multiple blades, likely metal' room_loc='Living room'
item_no=4 item_name='Light fixtures' item_desc='Multiple recessed light fixtures in the ceiling, likely LED or halogen bulbs' room_loc='Living room'
item_no=5 item_name='Sofa' item_desc='A large white fabric sofa, seating 3-4 people' room_loc='Living room'
item_no=6 item_name='Armchairs' item_desc='Two green upholstered armchairs' room_loc='Living room'
item_no=7 item_name='Coffee table' item_desc='A rectangular wooden coffee table with a glass top' room_loc='Living room'
item_no=8 item_name='TV stand' item_desc='A low wooden TV stand or media console' room_loc='Living room'
item_no=9 item_name='Artwork' i

In [28]:
import json

# Save catalog_items to catalog_items_first_pass.json
# model_dump() is the pydantic v2 replacement for the deprecated dict(); the
# deprecation warning is hidden in this notebook because warnings are filtered.
with open('catalog_items_first_pass.json', 'w') as f:
    json.dump([item.model_dump() for item in catalog_items], f, indent=4)

# Save llm_costs to llm_costs_first_pass.json
with open('llm_costs_first_pass.json', 'w') as f:
    json.dump(llm_costs, f, indent=4)

print('Saved catalog_items and llm_costs to JSON files.')

Saved catalog_items and llm_costs to JSON files.
