## Imports

In [1]:
import os
import json
from pathlib import Path
import yaml
import json
import os
import time
from google import genai
from google.genai import types
from dotenv import load_dotenv
from pathlib import Path
from tqdm import tqdm
import random
from helix.client import Query, Client
from helix.instance import Instance
from helix.types import Payload
from collections import defaultdict

load_dotenv()

True

## Configure input and output folders

In [2]:
# Markdown sources are read from the sibling docs folder; everything this notebook
# produces is written to a processed_docs folder in the working directory.
DOCS_DIR = Path("..") / "docs"
OUTPUT_DIR = Path("processed_docs")

# Create the output folder up front; a pre-existing folder is fine.
OUTPUT_DIR.mkdir(exist_ok=True)

In [3]:
# Parse the MkDocs configuration; its "nav" entry lists every page and its section.
# The encoding is given explicitly so the result does not depend on the platform's
# default locale encoding (every other file in this notebook is opened as UTF-8 too).
with open("../mkdocs.yml", "r", encoding="utf-8") as f:
    mkdocs_config = yaml.safe_load(f)

mkdocs_config

{'INHERIT': './docs/_global/mkdocs.yml',
 'site_name': 'Premiere Pro Scripting Guide',
 'site_url': 'https://ppro-scripting.docsforadobe.dev/',
 'repo_url': 'https://github.com/docsforadobe/premiere-scripting-guide/',
 'repo_name': 'premiere-scripting-guide',
 'nav': [{'Home': 'index.md'},
  {'Introduction': [{'Overview': 'introduction/extendscript-overview.md'},
    {'Changelog': 'introduction/changelog.md'},
    {'How to Execute ExtendScript in Premiere Pro': 'introduction/how-to-execute-scripts.md'}]},
  {'Application': [{'Application object': 'application/application.md'}]},
  {'General': [{'Anywhere object': 'general/anywhere.md'},
    {'Encoder object': 'general/encoder.md'},
    {'Marker object': 'general/marker.md'},
    {'Metadata object': 'general/metadata.md'},
    {'Production object': 'general/production.md'},
    {'Project object': 'general/project.md'},
    {'ProjectManager object': 'general/projectmanager.md'},
    {'Properties object': 'general/properties.md'},
    {'S

In [8]:
# Drop the first two nav entries ("Home" and "Introduction"): they are narrative
# pages, not API reference. A missing or empty "nav" falls back to an empty list,
# which (unlike a dict) can be sliced.
nav = (mkdocs_config.get("nav") or [])[2:]
nav

[{'Application': [{'Application object': 'application/application.md'}]},
 {'General': [{'Anywhere object': 'general/anywhere.md'},
   {'Encoder object': 'general/encoder.md'},
   {'Marker object': 'general/marker.md'},
   {'Metadata object': 'general/metadata.md'},
   {'Production object': 'general/production.md'},
   {'Project object': 'general/project.md'},
   {'ProjectManager object': 'general/projectmanager.md'},
   {'Properties object': 'general/properties.md'},
   {'SourceMonitor object': 'general/sourcemonitor.md'}]},
 {'Item': [{'ProjectItem object': 'item/projectitem.md'},
   {'TrackItem object': 'item/trackitem.md'}]},
 {'Sequence': [{'Component object': 'sequence/component.md'},
   {'ComponentParam object': 'sequence/componentparam.md'},
   {'Sequence object': 'sequence/sequence.md'},
   {'Track object': 'sequence/track.md'}]},
 {'Other': [{'AudioChannelMapping object': 'other/audiochannelmapping.md'},
   {'Time object': 'other/time.md'}]},
 {'Collection': [{'Collection obj

## Chunk MD files

In [26]:
import re

# chunk by separators
def chunk_by_separators(content):
    """Split markdown text into chunks at horizontal-rule lines.

    The text is cut at every occurrence of a line containing only three dashes
    (with a newline on both sides). Each piece is stripped of surrounding
    whitespace, pieces that end up empty are discarded, and every piece except
    the very first one in the document gets the rule put back in front of it.

    Args:
        content: Full markdown text of one file.

    Returns:
        List of chunk strings in document order.
    """
    stripped_pieces = (piece.strip() for piece in re.split(r'\n---\n', content))

    # The position is taken before filtering, so a piece keeps its "first in the
    # document" status even when empty pieces around it are dropped.
    return [
        text if position == 0 else "---\n\n" + text
        for position, text in enumerate(stripped_pieces)
        if text
    ]


# determine which chunking method to use
def smart_api_chunking(content, file_path):
    """Pick a chunking strategy for one markdown file and return its chunks.

    Files containing at least three horizontal-rule separators are split at
    those separators. Files with fewer separators are handed to chonkie's
    SentenceChunker (chunk_size=1024, chunk_overlap=0) instead.

    Args:
        content: Full markdown text of the file.
        file_path: Path of the file; accepted for the caller's convenience but
            not used by this function.

    Returns:
        List of chunk strings.
    """
    separator_total = sum(1 for _ in re.finditer(r'\n---\n', content))

    if separator_total < 3:
        # Too few rules to rely on: fall back to chonkie's sentence-based chunker.
        # Imported lazily so chonkie is only needed when this path is taken.
        from chonkie import SentenceChunker
        sentence_chunker = SentenceChunker(chunk_size=1024, chunk_overlap=0)
        return [piece.text for piece in sentence_chunker.chunk(content)]

    return chunk_by_separators(content)

In [None]:
def extract_embeddable_content_and_code(chunk_text, chunk_id=0, section=""):
    """Separate a chunk into embeddable prose, fenced code blocks and metadata.

    Args:
        chunk_text: Raw markdown of one chunk.
        chunk_id: Position of the chunk within its file (passed on to the title lookup).
        section: Section name from the nav, used as a title fallback.

    Returns:
        A tuple (embeddable_text, code_blocks, metadata) where
        - embeddable_text is the chunk without fenced code blocks and pilcrow
          markers, with blank-line runs collapsed and outer whitespace stripped;
        - code_blocks is the list of fenced code blocks found in the chunk;
        - metadata is a dict with the keys 'title', 'signatures',
          'has_code_examples', 'has_inline_code' and 'section_context'.
    """
    fence_regex = r'```[\s\S]*?```'

    # Fenced blocks are collected first, then cut out of the text to embed.
    code_blocks = list(re.findall(fence_regex, chunk_text))
    prose = re.sub(fence_regex, '', chunk_text)

    # Drop pilcrow anchors and squeeze any run of blank lines to a single one.
    prose = re.sub(r'¶', '', prose)
    prose = re.sub(r'\n\s*\n', '\n\n', prose).strip()

    metadata = {
        'title': extract_better_title(chunk_text, chunk_id, section),
        # inline-code spans that look like a call, e.g. `obj.method(arg)`
        'signatures': re.findall(r'`[^`\n]+\([^)]*\)`', prose),
        'has_code_examples': bool(code_blocks),
        'has_inline_code': re.search(r'`[^`\n]+`', prose) is not None,
        # enclosing "Methods" / "Attributes" / ... heading, if the chunk has one
        'section_context': extract_section_context(chunk_text),
    }

    return prose, code_blocks, metadata

def extract_better_title(chunk_text, chunk_id, section):
    """Choose the most specific usable title for a chunk.

    Lookup order:
      1. For the first chunk of a file (chunk_id == 0): its first level-1 heading.
      2. The first level-3 heading that is not a boilerplate name.
      3. The first level-2 heading that is not a boilerplate name.
      4. The first level-1 heading.
      5. The first inline-code call signature.
      6. The nav section name, or "Unknown" when that is empty.

    Args:
        chunk_text: Raw markdown of the chunk.
        chunk_id: Position of the chunk within its file.
        section: Section name from the nav.

    Returns:
        The chosen title string.
    """
    if chunk_id == 0:
        top_heading = re.search(r'^#\s+(.+)', chunk_text, re.MULTILINE)
        if top_heading:
            return top_heading.group(1).strip()

    # Heading patterns from most to least specific, each with the heading names
    # that are too generic to serve as a title at that level.
    heading_rules = (
        (r'^###\s+(.+)', ('description', 'parameters', 'returns', 'type')),
        (r'^##\s+(.+)', ('description', 'parameters', 'returns', 'type', 'attributes', 'methods')),
        (r'^#\s+(.+)', ()),
    )
    for heading_regex, boilerplate in heading_rules:
        for raw_heading in re.findall(heading_regex, chunk_text, re.MULTILINE):
            candidate = raw_heading.strip()
            if candidate.lower() not in boilerplate:
                return candidate

    call_signature = re.search(r'`([^`\n]+\([^)]*\))`', chunk_text)
    if call_signature:
        return call_signature.group(1)

    # final fallback
    if section:
        return section
    return "Unknown"

def extract_section_context(chunk_text):
    """Return the API section a chunk opens, if it contains such a heading.

    Looks for the first level-2 heading whose text starts with one of
    Attributes, Methods, Properties, Events or Constants.

    Args:
        chunk_text: Raw markdown of the chunk.

    Returns:
        The matched section word, or None when no such heading is present.
    """
    heading = re.search(
        r'^##\s+(Attributes|Methods|Properties|Events|Constants)',
        chunk_text,
        re.MULTILINE,
    )
    return heading.group(1) if heading else None

In [None]:
def extract_md_files(nav_item, parent_section=""):
    """Collect every markdown page referenced by a (possibly nested) nav entry.

    The nav structure is walked recursively, so pages are found at any depth:
    a dict maps a title to either a page path or a list of child entries, a
    list holds child entries, and a bare string is a page listed without its
    own title. Strings that do not end in ".md" (for example external links)
    are ignored, as is any value of another type.

    Args:
        nav_item: A dict, list or string taken from the MkDocs nav.
        parent_section: Title of the enclosing section; used as the section
            name for pages that are listed as bare strings.

    Returns:
        List of {"path": ..., "section": ...} dicts in nav order. "section" is
        the page's own title when it has one, otherwise the enclosing title.
    """
    md_files = []

    if isinstance(nav_item, str):
        if nav_item.endswith(".md"):
            md_files.append({
                "path": nav_item,
                "section": parent_section
            })
    elif isinstance(nav_item, dict):
        # Each key is the title for whatever it maps to, at any nesting depth.
        for section, content in nav_item.items():
            md_files.extend(extract_md_files(content, section))
    elif isinstance(nav_item, list):
        for item in nav_item:
            md_files.extend(extract_md_files(item, parent_section))

    return md_files

# flatten the nav into a single list of {"path", "section"} records
all_md_files = [
    md_record
    for nav_entry in nav
    for md_record in extract_md_files(nav_entry)
]

# list what was found so the selection can be checked by eye
print("Extracted markdown files:")
for md_file in all_md_files:
    print(f"  {md_file['section']}: {md_file['path']}")

Extracted markdown files:
  Application object: application/application.md
  Anywhere object: general/anywhere.md
  Encoder object: general/encoder.md
  Marker object: general/marker.md
  Metadata object: general/metadata.md
  Production object: general/production.md
  Project object: general/project.md
  ProjectManager object: general/projectmanager.md
  Properties object: general/properties.md
  SourceMonitor object: general/sourcemonitor.md
  ProjectItem object: item/projectitem.md
  TrackItem object: item/trackitem.md
  Component object: sequence/component.md
  ComponentParam object: sequence/componentparam.md
  Sequence object: sequence/sequence.md
  Track object: sequence/track.md
  AudioChannelMapping object: other/audiochannelmapping.md
  Time object: other/time.md
  Collection object: collection/collection.md
  ComponentCollection object: collection/componentcollection.md
  MarkerCollection object: collection/markercollection.md
  ProjectCollection object: collection/projectco

In [None]:
# Turn every markdown file from the nav into chunk records ready for embedding.
processed_docs = []

for md_file in all_md_files:
    section = md_file["section"]
    file_path = DOCS_DIR / md_file["path"]

    # Guard clauses: pages that are missing on disk and index pages are not processed.
    if not file_path.exists():
        print(f"Warning: File {file_path} not found, skipping.")
        continue
    if file_path.name == "index.md":
        print(f"Skipping index.md file: {file_path}")
        continue

    content = file_path.read_text(encoding="utf-8")
    chunks = smart_api_chunking(content, file_path)
    chunk_total = len(chunks)

    for i, chunk_text in enumerate(chunks):
        embeddable_text, code_blocks, metadata = extract_embeddable_content_and_code(
            chunk_text, chunk_id=i, section=section
        )
        title = metadata['title']
        context = metadata['section_context']

        # Append the enclosing API section ("Methods", "Attributes", ...) to the
        # title, unless the title is just the nav section name itself.
        if context and title != section:
            final_title = f"{title} ({context})"
        else:
            final_title = title

        processed_docs.append({
            "title": final_title,
            "content": embeddable_text,    # prose only; this is what gets embedded
            "full_content": chunk_text,    # untouched chunk, code blocks included
            "code_blocks": code_blocks,
            "signatures": metadata['signatures'],
            "has_code_examples": metadata['has_code_examples'],
            "path": str(file_path),
            "section": section,
            "category": file_path.parent.name,
            "section_context": context,
            "chunk_id": i,
            "total_chunks": chunk_total
        })

print(f"\nTotal processed documents: {len(processed_docs)}")

# Persist the records so the embedding stage can start from disk.
with open(OUTPUT_DIR / "processed_docs.json", "w", encoding="utf-8") as f:
    json.dump(processed_docs, f, indent=2, ensure_ascii=False)


Total processed documents: 348


In [35]:
# Preview only the first few records; displaying the whole list would flood the
# notebook output with hundreds of full-text chunks.
processed_docs[:3]

[{'title': 'Application object',
  'content': '# Application object\n\n`app`\n\n#### Description\n\nProvides access to objects and application settings within Premiere Pro.\n\nThe single global object is always available by its name, `app`.',
  'full_content': '# Application object\n\n`app`\n\n#### Description\n\nProvides access to objects and application settings within Premiere Pro.\n\nThe single global object is always available by its name, `app`.',
  'code_blocks': [],
  'signatures': [],
  'has_code_examples': False,
  'path': '../docs/application/application.md',
  'section': 'Application object',
  'category': 'application',
  'section_context': None,
  'chunk_id': 0,
  'total_chunks': 39},
 {'title': 'app.anywhere (Attributes)',
  'content': '---\n\n## Attributes\n\n### app.anywhere\n\n`app.anywhere`\n\n#### Description\n\nAn [Anywhere object](../general/anywhere.md), providing access to available Anywhere servers. Only available when running in Anywhere configuration (discont

## Generate Gemini Embeddings For Chunks

In [None]:
# Files used by the embedding stage, all inside OUTPUT_DIR:
#   INPUT_FILE      - chunk records written by the chunking stage
#   OUTPUT_FILE     - the same records with an embedding attached
#   CHECKPOINT_FILE - partial results saved while embedding is in progress
INPUT_FILE = OUTPUT_DIR / "processed_docs.json"
OUTPUT_FILE = OUTPUT_DIR / "embedded_docs.json"
CHECKPOINT_FILE = OUTPUT_DIR / "embedding_checkpoint.json"

# Re-read .env with override=True so edits made to it since the first
# load_dotenv() call are picked up.
load_dotenv(override=True)

api_key = os.getenv("GOOGLE_API_KEY")
if not api_key:
    raise ValueError("GOOGLE_API_KEY environment variable not set")

# Gemini client used for all document embeddings below.
client = genai.Client(api_key=api_key)

In [38]:
# Start from the records on disk so this stage can run without re-chunking.
processed_docs = json.loads(INPUT_FILE.read_text(encoding="utf-8"))

print(f"{len(processed_docs)} processed documents")

348 processed documents


In [41]:
# Restore partial results from an earlier, interrupted embedding run (if any).
# processed_indices holds the positions (into processed_docs) that already have
# an embedding, as recorded in each restored record's "original_index".
embedded_docs = []
processed_indices = set()
if CHECKPOINT_FILE.exists():
    try:
        with open(CHECKPOINT_FILE, "r", encoding="utf-8") as f:
            embedded_docs = json.load(f)
    except json.JSONDecodeError:
        # The checkpoint is unreadable: move it aside for inspection and start
        # from scratch. replace() also works when an older backup already exists.
        backup_file = CHECKPOINT_FILE.with_suffix('.json.bak')
        CHECKPOINT_FILE.replace(backup_file)
        embedded_docs = []
        print(f"checkpoint was not valid JSON; moved it to {backup_file} and starting from scratch")
    processed_indices = {doc["original_index"] for doc in embedded_docs if "original_index" in doc}

In [42]:
# Gemini rate-limits requests, so documents are embedded in batches with a
# pause after each batch.
BATCH_SIZE = 145   # documents per batch
BATCH_PAUSE = 60   # seconds to wait between batches
total_docs = len(processed_docs)

In [43]:
def _save_json_via_temp_file(data, target):
    """Write ``data`` as JSON to ``target`` by way of a sibling ``.tmp`` file.

    The JSON is written to the temporary file first and then moved over the
    target, so an interrupted write does not leave a truncated target behind.
    """
    temp_file = target.with_suffix('.tmp')
    with open(temp_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False)
    temp_file.replace(target)


MAX_RETRIES = 3
num_batches = (len(processed_docs) + BATCH_SIZE - 1) // BATCH_SIZE

for batch_start in range(0, len(processed_docs), BATCH_SIZE):
    batch_end = min(batch_start + BATCH_SIZE, len(processed_docs))
    current_batch = processed_docs[batch_start:batch_end]
    batch_number = batch_start // BATCH_SIZE + 1
    api_called = False

    print(f"\nbatch {batch_number}/{num_batches}")
    print(f"documents {batch_start} to {batch_end-1}")

    #process each document in the current batch
    for idx, doc in enumerate(tqdm(current_batch, desc=f"batch {batch_number}")):
        doc_index = batch_start + idx

        # Skip documents that already have an embedding (restored from the
        # checkpoint or produced by an earlier run of this cell); embedding them
        # again would append duplicates to embedded_docs.
        if doc_index in processed_indices:
            continue

        api_called = True
        success = False
        retry_count = 0

        #retry if rate limit is hit
        while not success and retry_count < MAX_RETRIES:
            try:
                #generate the embedding
                result = client.models.embed_content(
                    model="models/text-embedding-004",
                    contents=doc["content"],
                    config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
                )
                #store a copy so the records in processed_docs stay untouched
                doc_copy = doc.copy()
                doc_copy["embedding"] = result.embeddings[0].values
                doc_copy["original_index"] = doc_index
                embedded_docs.append(doc_copy)
                processed_indices.add(doc_index)
                success = True

                #save the checkpoint every 10 embedded documents
                if len(embedded_docs) % 10 == 0:
                    _save_json_via_temp_file(embedded_docs, CHECKPOINT_FILE)

            except Exception as e:
                error_message = str(e)
                retry_count += 1
                if "RESOURCE_EXHAUSTED" in error_message or "429" in error_message:
                    # exponential back-off with jitter for rate-limit errors
                    sleep_time = (2 ** retry_count) + random.uniform(0, 1)
                    print(f"rate limit hit, retrying in {sleep_time:.1f} seconds")
                    time.sleep(sleep_time)
                else:
                    print(f"error generating embedding for document {doc_index}: {error_message}")
                    time.sleep(1)

        if not success:
            print(f"failed to generate embedding for document {doc_index} after {MAX_RETRIES} retries")

    # checkpoint at the end of every batch
    _save_json_via_temp_file(embedded_docs, CHECKPOINT_FILE)

    # Only wait out the rate limit when this batch actually sent requests and
    # another batch follows.
    if api_called and batch_end < len(processed_docs):
        print(f"\nbatch complete, waiting {BATCH_PAUSE} seconds before starting next batch...")
        for remaining in range(BATCH_PAUSE, 0, -1):
            print(f"Next batch starting in {remaining} seconds...", end="\r")
            time.sleep(1)
        print("\nstarting next batch...")


_save_json_via_temp_file(embedded_docs, OUTPUT_FILE)

print(f"\ngenerated embeddings for {len(embedded_docs)}/{total_docs} documents")
print(f"saved to: {OUTPUT_FILE}")


  print(f"\generated embeddings for {len(embedded_docs)}/{total_docs} documents")



batch 1/3
documents 0 to 144


batch 1: 100%|██████████| 145/145 [00:30<00:00,  4.73it/s]



batch complete, waiting 60 seconds before starting next batch...
Next batch starting in 1 seconds....
starting next batch...

batch 2/3
documents 145 to 289


batch 2:  14%|█▍        | 20/145 [00:03<00:24,  5.10it/s]

rate limit hit, retrying in 2.0 seconds


batch 2:  56%|█████▌    | 81/145 [00:17<00:11,  5.76it/s]

rate limit hit, retrying in 2.8 seconds


batch 2: 100%|██████████| 145/145 [00:32<00:00,  4.41it/s]



batch complete, waiting 60 seconds before starting next batch...
Next batch starting in 1 seconds....
starting next batch...

batch 3/3
documents 290 to 347


batch 3: 100%|██████████| 58/58 [00:11<00:00,  4.99it/s]


\generated embeddings for 348/348 documents
saved to: processed_docs/embedded_docs.json


## Setup HelixDB

In [None]:
# Client used for all HelixDB queries below (presumably a locally running instance,
# given local=True).
# NOTE(review): port=6969 is passed here, but the recorded cell output reports the
# instance at 'http://0.0.0.0:6970' — confirm which port is intended.
db = Client(local=True, port=6969)

[32m[HELIX][0m Helix instance found at 'http://0.0.0.0:6970'


## Python SDK

In [45]:
class load_docs_rag(Query):
    """Query object that carries the chapter structure to be loaded into HelixDB.

    NOTE(review): the class name presumably has to match the HelixDB query name
    (the recorded output shows a request to '/load_docs_rag'), so it is kept
    in snake_case — confirm against the helix SDK.
    """

    def __init__(self, chapters):
        super().__init__()
        # list of chapter dicts, sent as-is under the "chapters" key
        self.chapters = chapters

    def query(self):
        """Return the request payload: a one-element list holding the chapters."""
        payload = {"chapters": self.chapters}
        return [payload]

    def response(self, response):
        """Hand the server response back unchanged."""
        return response
    
class get_chapter_content(Query):
    """Query object that asks HelixDB for one chapter, identified by its id.

    NOTE(review): the class name presumably mirrors a HelixDB query of the same
    name — confirm against the helix SDK and the deployed queries.
    """

    def __init__(self, chapter_id):
        super().__init__()
        # id of the chapter to fetch, sent under the "chapter_id" key
        self.chapter_id = chapter_id

    def query(self):
        """Return the request payload: a one-element list holding the chapter id."""
        payload = {"chapter_id": self.chapter_id}
        return [payload]

    def response(self, response):
        """Hand the server response back unchanged."""
        return response

class search_docs_rag(Query):
    """Query object for a vector search over the loaded documentation chunks.

    NOTE(review): the class name presumably has to match the HelixDB query name
    (the recorded output shows a request to '/search_docs_rag'), so it is kept
    in snake_case — confirm against the helix SDK.
    """

    def __init__(self, query_vector, k=5):
        super().__init__()
        # embedding of the question, sent under the "query" key
        self.query_vector = query_vector
        # number of results requested, sent under the "k" key
        self.k = k

    def query(self):
        """Return the request payload: a one-element list with the vector and k."""
        payload = {"query": self.query_vector, "k": self.k}
        return [payload]

    def response(self, response):
        """Hand the server response back unchanged."""
        return response

In [None]:
def organize_all_data_for_helix(docs):
    """Group embedded chunk records into the chapter structure sent to HelixDB.

    Records are grouped first by 'category' and then by 'title', both in order
    of first appearance. Every category becomes one chapter whose id is its
    position in that order; every title within it becomes one subchapter that
    holds the chunks sharing that title.

    Args:
        docs: Iterable of dicts with the keys 'category', 'title', 'content'
            and 'embedding'.

    Returns:
        A tuple (helix_chapters, category_names):
        - helix_chapters: list of {'id', 'subchapters'} dicts, where each
          subchapter is {'title', 'content', 'chunks'} and each chunk is
          {'chunk', 'vector'};
        - category_names: list of category names, index-aligned with
          helix_chapters.
    """
    # category -> title -> list of chunk payloads (plain dicts keep insertion order)
    grouped = {}
    for record in docs:
        category = record['category']
        title = record['title']
        chunks_for_title = grouped.setdefault(category, {}).setdefault(title, [])
        chunks_for_title.append({
            'chunk': record['content'],
            'vector': record['embedding']
        })

    helix_chapters = [
        {
            'id': chapter_idx,
            'subchapters': [
                {
                    'title': title,
                    'content': f"Premiere Pro {category} documentation for {title}",
                    'chunks': chunks
                }
                for title, chunks in titles.items()
            ]
        }
        for chapter_idx, (category, titles) in enumerate(grouped.items())
    ]

    return helix_chapters, list(grouped)

In [47]:
# The embeddings file is written as UTF-8 with ensure_ascii=False, so it is read
# back with the same explicit encoding instead of the platform default.
with open('processed_docs/embedded_docs.json', 'r', encoding='utf-8') as f:
    embedded_docs = json.load(f)

all_helix_data, category_list = organize_all_data_for_helix(embedded_docs)

# Per-chapter summary. Subchapters are groups of chunks sharing a title (not
# source files), so they are reported as subchapters.
total_chunks = 0
for chapter, category_name in zip(all_helix_data, category_list):
    chapter_chunks = sum(len(sub['chunks']) for sub in chapter['subchapters'])
    total_chunks += chapter_chunks
    print(f"  Chapter {chapter['id']} ({category_name}): {len(chapter['subchapters'])} subchapters, {chapter_chunks} chunks")

print(f"\n{len(all_helix_data)} chapters, {total_chunks} chunks across all categories")

# NOTE(review): nothing here clears previously loaded data, so re-running this
# cell presumably loads the same chapters again — confirm how HelixDB handles that.
try:
    load_all_query = load_docs_rag(all_helix_data)
    result = db.query(load_all_query)
    print(f"✅ HelixDB full load result: {result}")

except Exception as e:
    # Report the failure with its traceback but let the notebook keep running.
    print(f"❌ Error loading all data: {e}")
    import traceback
    traceback.print_exc()

  Chapter 0 (application): 39 files, 39 chunks
  Chapter 1 (general): 117 files, 121 chunks
  Chapter 2 (item): 78 files, 78 chunks
  Chapter 3 (sequence): 86 files, 86 chunks
  Chapter 4 (other): 9 files, 9 chunks
  Chapter 5 (collection): 15 files, 15 chunks

6 chapters, 348 chunks across all categories


[32m[HELIX][0m Querying 'http://0.0.0.0:6970/load_docs_rag': 100%|██████████| 1/1 [00:00<00:00,  1.15it/s]

✅ HelixDB full load result: [{'Success': 'Success'}]





In [48]:
#search the adobe premiere pro docs
def search_ae_docs(user_question, top_k=5):
    """Embed a question, run a vector search in HelixDB and print the best matches.

    Despite the "ae" in its name, this searches the Premiere Pro scripting docs
    loaded earlier in this notebook.

    Args:
        user_question: Natural-language question to search for.
        top_k: Maximum number of results to print.

    Returns:
        None. Results are printed; if embedding or searching fails, the error
        and its traceback are printed instead of being raised.

    Raises:
        ValueError: If GOOGLE_API_KEY is not set.
    """

    def _first_if_list(value):
        # Fields may come back wrapped in a list; use the first element then.
        if isinstance(value, list) and len(value) > 0:
            return value[0]
        return value

    # Reload environment variables and recreate client to ensure fresh API key
    load_dotenv(override=True)
    api_key = os.environ.get("GOOGLE_API_KEY")
    if not api_key:
        raise ValueError("GOOGLE_API_KEY environment variable not set")

    search_client = genai.Client(api_key=api_key)

    try:
        #embed the user question
        embed_response = search_client.models.embed_content(
            model="models/text-embedding-004",
            contents=user_question,
            config=types.EmbedContentConfig(task_type="QUESTION_ANSWERING")
        )
        query_embedding = embed_response.embeddings[0].values

        #search the docs: request at least 10 candidates (as before), but never
        #fewer than top_k, so that a top_k above 10 is not silently capped
        search_query = search_docs_rag(query_embedding, k=max(10, top_k))
        search_results = db.query(search_query)

        top_results = []
        if search_results and search_results[0]:
            all_results = search_results[0].get('embedding_edges') or []
            top_results = all_results[:top_k]

        if not top_results:
            print("No results found")
            return

        #print the results
        for rank, hit in enumerate(top_results, start=1):
            chunk = _first_if_list(hit.get('chunk', 'No chunk content'))
            subchapter_title = _first_if_list(hit.get('subchapter_title', 'Unknown file'))

            print(f"Result {rank}: {subchapter_title}")
            print(f"Content: {chunk}")
            print("─" * 80)

    except Exception as e:
        print(f"Error searching: {e}")
        import traceback
        traceback.print_exc()

# try the search end-to-end with a sample question
search_ae_docs(user_question="how do i move the inPoint of the track item", top_k=5)

[32m[HELIX][0m Querying 'http://0.0.0.0:6970/search_docs_rag': 100%|██████████| 1/1 [00:00<00:00, 107.72it/s]

Result 1: TrackItem.move()
Content: ---

### TrackItem.move()

`app.project.sequences[index].audioTracks[index].clips[index].move(newInPoint)`

`app.project.sequences[index].videoTracks[index].clips[index].move(newInPoint)`

#### Description

Moves the inPoint of the track item to a new time, by shifting it by a number of seconds.

#### Parameters

|  Parameter   |              Type               |                                          Description                                          |
| ------------ | ------------------------------- | --------------------------------------------------------------------------------------------- |
| `newInPoint` | [Time object](../other/time.md) | A Time object that represent the amount of time, in seconds, to shift the track item's start. |

#### Returns

Returns `0` if successful.
────────────────────────────────────────────────────────────────────────────────
Result 2: TrackItem.inPoint
Content: ---

### TrackItem.inPoint

`app.project.sequenc




In [49]:
# Remove the API key variable from the notebook namespace once it is no longer
# needed. Written as a statement on purpose: an expression here would be echoed
# as the cell's output.
try:
    del api_key
except NameError:
    # the variable was never defined (or was already removed) - nothing to do
    pass