In [8]:
# TwelveLabs

In [43]:
from grp import struct_group
from typing import List
from dotenv import dotenv_values
from twelvelabs import TwelveLabs
from twelvelabs.types import VideoSegment
from twelvelabs.embed import TasksStatusResponse
from pinecone import Pinecone, ServerlessSpec

# Load API keys and other settings from the local .env file into a dict
config = dotenv_values(".env")

# 1. Initialize the client
client = TwelveLabs(api_key=config["TWELVELABS_API_KEY"])

# 2. Upload a video
# The create() call sits inside the with-block so the file handle is still
# open while the SDK reads it.
with open("pushup.MOV", "rb") as video_file:
    task = client.embed.tasks.create(
        model_name="marengo3.0",
        video_file=video_file, # upload from the local file system
        # Optional tuning parameters, currently disabled:
        # video_clip_length=5,
        # video_start_offset_sec=30,
        # video_end_offset_sec=60,
        # video_embedding_scope=["clip", "video"]
    )
print(f"Created video embedding task: id={task.id}")

# 3. Monitor the status
def on_task_update(task: TasksStatusResponse):
    """Print the current status of an embedding task.

    Passed as the ``callback`` of ``client.embed.tasks.wait_for_done``.
    """
    current_status = task.status
    print(f"  Status={current_status}")

# Block until the task is done, reporting progress through on_task_update
status = client.embed.tasks.wait_for_done(sleep_interval=5, task_id=task.id, callback=on_task_update)
print(f"Embedding done: {status.status}")

# Stop here when the task did not finish successfully instead of trying to
# retrieve embeddings that do not exist (same rule as the Pegasus indexing path).
if status.status != "ready":
    raise RuntimeError(f"Embedding failed with status {status.status}")

# 4. Retrieve the embeddings
# Note: `task` is rebound from the creation response to the retrieval response.
task = client.embed.tasks.retrieve(
    task_id=task.id,
    embedding_option=["visual", "audio", "transcription"]
)

# 5. Process the results
def print_segments(segments: List[VideoSegment], max_elements: int = 5):
    """Print a short summary of every embedding segment.

    For each segment two lines are printed: its scope, option and time range,
    then the first ``max_elements`` embedding values with the total vector length.

    Args:
        segments: Segments to summarise.
        max_elements: Number of leading embedding values shown per segment.
    """
    for segment in segments:
        # A segment without an embedding vector is reported as empty rather
        # than raising TypeError on the slice / len() below.
        values = segment.float_ or []
        print(f"  embedding_scope={segment.embedding_scope} embedding_option={segment.embedding_option} start_offset_sec={segment.start_offset_sec} end_offset_sec={segment.end_offset_sec}")
        preview = ", ".join(str(x) for x in values[:max_elements])
        print(
            f"  embeddings: [{preview}...] (total: {len(values)} values)"
        )


# Fail fast when the response carries no embeddings. Later cells read
# `segments`; leaving it unbound (or stale from an earlier kernel run) would
# only surface further down as a NameError or as silently wrong data.
if task.video_embedding is None or task.video_embedding.segments is None:
    raise RuntimeError("No video embeddings were returned for this task")

segments = task.video_embedding.segments
metadata = task.video_embedding.metadata

print_segments(segments)



Created video embedding task: id=69221298269c6f62024a0575
  Status=processing
  Status=ready
Embedding done: ready
  embedding_scope=clip embedding_option=audio start_offset_sec=0.0 end_offset_sec=6.0
  embeddings: [0.067871094, -0.032958984, -0.11230469, 0.040039062, 0.02722168...] (total: 512 values)
  embedding_scope=clip embedding_option=audio start_offset_sec=6.0 end_offset_sec=12.0
  embeddings: [0.040039062, -0.076171875, -0.046142578, 0.018676758, 0.053466797...] (total: 512 values)
  embedding_scope=clip embedding_option=audio start_offset_sec=12.0 end_offset_sec=19.5
  embeddings: [0.06591797, -0.048583984, -0.083984375, 0.045898438, 0.037597656...] (total: 512 values)
  embedding_scope=clip embedding_option=visual start_offset_sec=0.0 end_offset_sec=6.0
  embeddings: [0.026489258, 0.026855469, -0.0138549805, 0.078125, -0.0022735596...] (total: 512 values)
  embedding_scope=clip embedding_option=visual start_offset_sec=6.0 end_offset_sec=12.0
  embeddings: [0.0064086914, 0.05

In [46]:
# Inspect the first segment; its repr includes the complete embedding vector
segments[0]

VideoSegment(float_=[0.067871094, -0.032958984, -0.11230469, 0.040039062, 0.02722168, 0.10449219, -0.01171875, 0.03564453, 0.07421875, 0.02331543, 0.064941406, 0.030395508, -0.07861328, 0.07470703, -0.02331543, 0.022827148, -0.037597656, 0.03100586, 0.044189453, 0.034179688, 0.020141602, 0.05053711, 0.016845703, -0.019897461, 0.020141602, 0.04345703, 0.03564453, 0.06542969, 0.008178711, 0.007293701, -0.059570312, -0.100097656, 0.103515625, -0.01928711, 0.010864258, 0.025512695, -0.033691406, 0.024536133, 0.010192871, -0.0005226135, 0.07861328, 0.024414062, 0.00982666, 0.08154297, 0.0008239746, 0.049804688, 0.00982666, -0.03100586, 0.013000488, 0.038330078, -0.08935547, -0.01940918, 0.02709961, -0.033203125, 0.03515625, 0.07128906, 0.043701172, 0.03125, 5.2452087e-06, 0.016723633, -0.011962891, -0.033935547, -0.028808594, 0.044921875, -0.04321289, 0.0030517578, 0.072265625, 0.026489258, 0.0008583069, 0.014587402, -0.02709961, 0.092285156, -0.024291992, 0.10205078, -0.011108398, -0.05004

In [2]:
# Load to Pinecone

In [53]:
# Pinecone client, authenticated with the key loaded from .env
pc = Pinecone(api_key=config["PINECONE_API_KEY"])

# Handle to the Pinecone index served at this host URL; the upsert and query
# cells below go through `index`.
index = pc.Index(host="https://gymogul-videos-kuv1rfi.svc.aped-4627-b74a.pinecone.io")

In [14]:
# Build one Pinecone record (id, values, metadata) per embedding segment

video_id = "pushup"

vectors = [
    {
        "id": f"{video_id}-{position}",
        "values": segment.float_,
        "metadata": {
            "video_id": video_id,
            "start": segment.start_offset_sec,
            "end": segment.end_offset_sec,
        },
    }
    for position, segment in enumerate(segments)
]


In [16]:
# Send the prepared records to the Pinecone index

index.upsert(vectors=vectors)

{'upserted_count': 8}

In [69]:
from typing import List

from twelvelabs import TwelveLabs
from twelvelabs.types import BaseSegment

# Embed the question text with the same model name ("marengo3.0") that was
# used for the video embedding task
res = client.embed.create(
    model_name="marengo3.0",
    text="What is the video about? What does the man say?",
)




In [70]:
# Embedding values of the first text segment, used as the query vector below.
# NOTE(review): assumes the response has a text embedding with at least one segment — confirm.
question_vectors = res.text_embedding.segments[0].float_

In [71]:
# Query the index with the question embedding: return the 3 best matches
# together with their metadata, but without the stored vector values.
index.query(
    namespace="__default__",
    vector=question_vectors,
    top_k=3,
    include_metadata=True,
    include_values=False
)

{'matches': [{'id': 'pushup:7',
              'metadata': {'end': 19.5,
                           'start': 12.0,
                           'transcript': 'The man holds the plank position for '
                                         'several seconds before pushing off '
                                         'the ground with his hands. He '
                                         'returns to a standing position, '
                                         'completing the exercise.',
                           'video_id': 'pushup'},
              'score': 0.483910918,
              'values': []},
             {'id': 'pushup:6',
              'metadata': {'end': 6.0,
                           'start': 0.0,
                           'transcript': 'A man in black shorts and a black '
                                         't-shirt stands with his feet '
                                         'together, then bends down and places '
                                         'his ha

In [41]:
# To delete the Pinecone vectors stored under the ids "pushup-0" to "pushup-7", uncomment:

# for count in range(0,8):
#     print(count)
#     index.delete(ids=f"pushup-{count}", namespace="__default__")

0
1
2
3
4
5
6
7


In [42]:
# Create a Pegasus enabled index

from twelvelabs import TwelveLabs
from twelvelabs.indexes import IndexesCreateRequestModelsItem
from twelvelabs.tasks import TasksRetrieveResponse

# Initialize the client
client = TwelveLabs(api_key=config["TWELVELABS_API_KEY"])

# Bound to `pegasus_index`, not `index`: `index` is the Pinecone index handle
# that the upsert/query cells and ingest_video_to_pinecone() call into.
# Rebinding it here would break those calls when the notebook runs top to bottom.
pegasus_index = client.indexes.create(
    index_name="pegasus-index",
    models=[
        IndexesCreateRequestModelsItem(
            model_name="pegasus1.2", model_options=["visual", "audio"]
        )
    ]
)
print(f"Created index: id={pegasus_index.id}")

# task = client.tasks.create(
#     index_id=pegasus_index.id,
#     video_url="<YOUR_VIDEO_URL>" # Or use video_file to upload a file from the local file system
#     )
# print(f"Created task: id={task.id}")
#
# def on_task_update(task: TasksRetrieveResponse):
#     print(f"  Status={task.status}")
#
# task = client.tasks.wait_for_done(task_id=task.id, callback=on_task_update)
# if task.status != "ready":
#     raise RuntimeError(f"Indexing failed with status {task.status}")
# print(
#     f"Upload complete. The unique identifier of your video is {task.video_id}.")
#
# gist = client.gist(video_id=task.video_id,types=["title", "topic", "hashtag"])
# print(f"Title={gist.title}\nTopics={gist.topics}\nHashtags={gist.hashtags}")


Created index: id=69220af621611291fd429500


In [61]:
import json

def get_pegasus_transcript(client: TwelveLabs, pegasus_index_id: str, video_path: str):
    """Upload a video to a Pegasus-enabled index and return a time-coded transcript.

    Args:
        client: TwelveLabs client used for the upload and the analyze call.
        pegasus_index_id: ID of the index the video is uploaded to.
        video_path: Path of the local video file.

    Returns:
        The "segments" list parsed from the model's JSON answer. The schema sent
        with the request asks for dicts with "start_sec", "end_sec" and "text".

    Raises:
        RuntimeError: If indexing does not end with status "ready", or if the
            analyze call returns no data.
        json.JSONDecodeError: If the returned text is not valid JSON.
        KeyError: If the parsed answer has no "segments" entry.
    """

    # 1) Upload the video to the Pegasus-enabled index
    with open(video_path, "rb") as video_file:
        task = client.tasks.create(
            index_id=pegasus_index_id,
            video_file=video_file
        )

    task = client.tasks.wait_for_done(task_id=task.id)

    if task.status != "ready":
        raise RuntimeError(f"Pegasus indexing failed or not ready. Status={task.status}")

    video_id = task.video_id  # this is what analyze() uses

    # 2) Ask Pegasus for a rich, time-stamped transcript in JSON
    # We'll use open-ended analyze() with a JSON schema so it returns structured text.
    schema = {
        "type": "object",
        "properties": {
            "segments": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "start_sec": {"type": "number"},
                        "end_sec": {"type": "number"},
                        "text": {"type": "string"},
                    },
                    "required": ["start_sec", "end_sec", "text"],
                },
            }
        },
        "required": ["segments"],
    }

    prompt = (
        "Create a detailed transcript of this video. "
        "Split the transcript into short consecutive segments. "
        "For each segment, return JSON with: start_sec, end_sec, text. "
        "Use the original language, keep sentences complete, and ensure timestamps "
        "are monotonically increasing and aligned with the video timeline."
    )

    res = client.analyze(
        video_id=video_id,
        prompt=prompt,
        temperature=0.1,
        response_format={"type": "json_schema", "json_schema": schema},
        max_tokens=4000,
    )

    # 3) Parse the answer; res.data is expected to hold the JSON document as a string.
    # A missing payload gets an explicit error instead of the opaque TypeError
    # that json.loads(None) would raise.
    if res.data is None:
        raise RuntimeError(f"Pegasus analyze() returned no data for video {video_id}")

    parsed = json.loads(res.data)
    return parsed["segments"]  # list of {start_sec, end_sec, text}

In [50]:
def transcript_for_segment(transcript_segments, start, end):
    """Concatenate transcript text that overlaps [start, end].

    An entry overlaps when it ends after ``start`` and begins before ``end``;
    entries that only touch a boundary are excluded. Entries may arrive in any
    order: the result is always assembled in timeline order, so unsorted
    model output no longer drops text.

    Args:
        transcript_segments: Iterable of dicts with "start_sec", "end_sec"
            and "text" keys.
        start: Window start in seconds.
        end: Window end in seconds.

    Returns:
        The overlapping texts joined with single spaces and stripped, or ""
        when nothing overlaps.
    """
    overlapping = [
        entry
        for entry in transcript_segments
        if entry["end_sec"] > start and entry["start_sec"] < end
    ]
    # list.sort() is stable, so input that is already ordered keeps its order.
    overlapping.sort(key=lambda entry: entry["start_sec"])
    return " ".join(entry["text"] for entry in overlapping).strip()

In [64]:
from pinecone import Pinecone

def ingest_video_to_pinecone(
    client: TwelveLabs,
    pegasus_index_id: str,
    video_path: str,
    video_id: str,   # your own ID (filename, UUID, etc.)
    embedding_segments=None,
    pinecone_index=None,
):
    """Attach transcript snippets to embedding segments and upsert them to Pinecone.

    This function does not embed the video itself. It pairs already computed
    embedding segments with a transcript generated from ``video_path``, so the
    caller must make sure both describe the same video.

    Args:
        client: TwelveLabs client used to generate the transcript.
        pegasus_index_id: ID of the Pegasus-enabled index the video is uploaded to.
        video_path: Path of the local video file to transcribe.
        video_id: Caller-chosen identifier, used in vector ids and metadata.
        embedding_segments: Segments exposing ``float_``, ``start_offset_sec``
            and ``end_offset_sec``. Defaults to the notebook-level ``segments``.
        pinecone_index: Object whose ``upsert(vectors=...)`` receives the
            records. Defaults to the notebook-level ``index``.

    Returns:
        The number of vectors built (0 when there were no segments, in which
        case nothing is upserted).
    """
    # Resolve the notebook-level fallbacks up front so a missing or wrong
    # global fails before any transcript is requested.
    if embedding_segments is None:
        embedding_segments = segments
    if pinecone_index is None:
        pinecone_index = index

    # 1) Pegasus transcript (time-coded)
    transcript_segments = get_pegasus_transcript(client, pegasus_index_id, video_path)

    # 2) Build vectors with per-segment transcript
    vectors = []
    for idx, seg in enumerate(embedding_segments):
        start = seg.start_offset_sec
        end = seg.end_offset_sec
        snippet = transcript_for_segment(transcript_segments, start, end)

        vectors.append({
            "id": f"{video_id}:{idx}",
            "values": seg.float_,
            "metadata": {
                "video_id": video_id,
                "start": start,
                "end": end,
                "transcript": snippet,
            },
        })

    # 3) Upsert to Pinecone (skipped when there is nothing to send)
    if vectors:
        pinecone_index.upsert(vectors=vectors)
    return len(vectors)

In [65]:
# Ingest the push-up video; the hard-coded index ID is the one printed when the
# Pegasus index was created ("Created index: id=...").
ingest_video_to_pinecone(client=client, pegasus_index_id="69220af621611291fd429500", video_path="pushup.MOV", video_id=video_id)

8