In [15]:
# Securely prompt for OpenAI API key (won't be saved in the notebook file)
import getpass
import os
# Strip surrounding whitespace: pasted keys often carry a trailing space or newline,
# and a whitespace-only entry must not be stored as if it were a real key.
key = getpass.getpass("Enter your OpenAI API key (input hidden): ").strip()
if key:
    # Stored only in this kernel's environment; nothing is written to disk here.
    os.environ["OPENAI_API_KEY"] = key
    print("OPENAI_API_KEY set in this kernel (not saved to disk).")
else:
    print("No key entered; OPENAI_API_KEY not set.")

OPENAI_API_KEY set in this kernel (not saved to disk).


In [16]:
import pandas as pd
import openai
import numpy as np
from sklearn.decomposition import PCA
import umap
import hdbscan
import plotly.express as px

In [17]:
# Resilient embeddings helper
import time
import random
import hashlib
import json
import os

In [18]:
# Load the gesture list from a semicolon-delimited CSV.
df = pd.read_csv("complete_list.csv", delimiter=';')
# One definition string per row, in row order; missing definitions become ''
# so that the list stays aligned with df's rows.
# NOTE(review): these empty strings are later passed to the embedding request as-is —
# TODO confirm the embedding backend accepts empty input.
definitions = df['gesture_definition'].fillna('').tolist()

In [19]:
# OpenAI client import (works with new and older SDKs)
import os
from openai import OpenAI
# Import the SDK exception classes that the retry logic matches on.
# openai>=1.0 (the SDK that provides `OpenAI`) exposes them at the package top level;
# the legacy <1.0 SDK kept them in `openai.error`. Trying the top-level location first
# ensures real SDK errors are matched instead of silently using the placeholders.
try:
    from openai import RateLimitError, OpenAIError  # openai>=1.0
except ImportError:
    try:
        from openai.error import RateLimitError, OpenAIError  # legacy openai<1.0
    except ImportError:
        # Fallback placeholders so the `except` clauses below still reference valid
        # classes; real SDK errors will then be handled by the generic branch.
        class RateLimitError(Exception):
            pass

        class OpenAIError(Exception):
            pass

# --- CONFIG ---
# Tunable settings for the embedding helpers defined below.
CACHE_FILE = "embeddings_cache.json"   # JSON file mapping sha256(text) -> embedding
BATCH_SIZE = 8                          # texts handled between cache saves / throttle pauses (each text is still requested individually)
SLEEP_BETWEEN_BATCHES = 0.5             # seconds to pause between batches
MAX_RETRIES = 6                         # maximum attempts per text in get_embedding_with_retry
MODEL_NAME = "text-embedding-ada-002"   # embedding model passed to the API; change per your account

# Load the embedding cache from disk, or start with an empty one.
# An unreadable or malformed cache file is reported and ignored rather than aborting
# the notebook; the cache is only an optimization.
embedding_cache = {}
if os.path.exists(CACHE_FILE):
    try:
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            loaded_cache = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        print(f"Could not read {CACHE_FILE} ({e}); starting with an empty cache.")
    else:
        # The helpers index the cache by key and assign into it, so it must be a dict.
        if isinstance(loaded_cache, dict):
            embedding_cache = loaded_cache
        else:
            print(f"{CACHE_FILE} does not contain a JSON object; starting with an empty cache.")

def _cache_key(text):
    # deterministic small key for caching
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def save_cache():
    """Write ``embedding_cache`` to ``CACHE_FILE`` as JSON.

    The data is written to a sibling ``.tmp`` file first and then moved over the
    real cache file with ``os.replace``.
    """
    tmp_path = CACHE_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as handle:
        json.dump(embedding_cache, handle)
    os.replace(tmp_path, CACHE_FILE)

def create_client_from_env():
    """Build and return an ``OpenAI`` client without passing credentials explicitly.

    No ``api_key`` argument is given, so the SDK is relied on to pick the key up
    from the environment (OPENAI_API_KEY); adapt this if you set the key explicitly.
    """
    client = OpenAI()
    return client

def _is_insufficient_quota(exc):
    # Inspect exception text/response to detect insufficient quota; return True if quota problem
    try:
        txt = str(exc).lower()
        if "insufficient_quota" in txt or "quota" in txt:
            return True
        # some SDK responses include a response dict with details
        if hasattr(exc, 'response') and exc.response is not None:
            try:
                # if response has a .data or .text attribute, convert to str
                resp_txt = str(exc.response).lower()
                if "insufficient_quota" in resp_txt or "quota" in resp_txt:
                    return True
            except Exception:
                pass
    except Exception:
        pass
    return False


def get_embedding_with_retry(client, text, model=MODEL_NAME, max_retries=MAX_RETRIES):
    """Request the embedding for one text, retrying failures with exponential backoff.

    Args:
        client: Object exposing ``embeddings.create(input=..., model=...)``.
        text: The string to embed.
        model: Embedding model name passed to the API.
        max_retries: Maximum number of attempts.

    Returns:
        The ``embedding`` attribute of the first item in the response's ``data``.

    Raises:
        RateLimitError: Immediately, when the error looks like a quota problem.
        OpenAIError: Immediately, for any other SDK-level error (never retried).
        Exception: Immediately, for any other error that looks like a quota problem.
        RuntimeError: When every attempt failed; chained from the last error.
    """
    last_exc = None
    for attempt in range(max_retries):
        # No point sleeping (up to ~2**attempt seconds) when no attempt will follow.
        is_last_attempt = attempt == max_retries - 1
        try:
            resp = client.embeddings.create(input=[text], model=model)
            return resp.data[0].embedding
        except RateLimitError as e:
            # Quota exhaustion will not recover by waiting: surface it so the caller can fall back.
            if _is_insufficient_quota(e):
                raise
            last_exc = e
            if is_last_attempt:
                print(f"RateLimitError (attempt {attempt+1}/{max_retries}). Giving up.")
                break
            wait = (2 ** attempt) + random.random()
            print(f"RateLimitError (attempt {attempt+1}/{max_retries}). Retrying in {wait:.1f}s...")
            time.sleep(wait)
        except OpenAIError as e:
            # Explicit SDK-level error (quota-related or not): report and let the caller decide.
            print("OpenAI API error:", e)
            raise
        except Exception as e:
            # Generic fallback for SDK differences (inspects the message text).
            last_exc = e
            if _is_insufficient_quota(e):
                raise
            if is_last_attempt:
                print("Unexpected error calling embeddings (giving up):", e)
                break
            # Treat other errors as possibly transient.
            print("Unexpected error calling embeddings (will retry):", e)
            time.sleep((2 ** attempt) + random.random())
    raise RuntimeError("Max retries exceeded") from last_exc

def batch_get_embeddings(texts, client=None, batch_size=BATCH_SIZE, throttle=SLEEP_BETWEEN_BATCHES):
    """Return one embedding per entry of ``texts``, in order, using the embedding cache.

    Texts whose key is already in ``embedding_cache`` are served from it; the others
    are requested one at a time through ``get_embedding_with_retry`` (for clearer
    per-text error handling) and added to the cache.

    Args:
        texts: Sequence of strings to embed.
        client: Client passed to ``get_embedding_with_retry``. When None, one is
            created with ``create_client_from_env()`` the first time a request is
            actually needed, so fully cached runs need no client at all.
        batch_size: Number of texts processed between cache saves / throttle pauses.
        throttle: Seconds to pause after a batch that made requests, when more
            batches remain.

    Returns:
        A list with the same length and order as ``texts``.

    Raises:
        Whatever ``get_embedding_with_retry`` raises. Embeddings fetched in the
        current batch before the failure are saved to the cache file first.
    """
    embeddings = []
    total = len(texts)
    for start in range(0, total, batch_size):
        batch = texts[start:start + batch_size]
        batch_emb = []
        fetched_any = False
        try:
            for text in batch:
                key = _cache_key(text)
                # Checking the cache per text means a text repeated within the batch
                # is requested only once.
                if key not in embedding_cache:
                    if client is None:
                        client = create_client_from_env()
                    embedding_cache[key] = get_embedding_with_retry(client, text)
                    fetched_any = True
                batch_emb.append(embedding_cache[key])
        finally:
            # Persist progress even when a later request in this batch failed.
            if fetched_any:
                save_cache()
        embeddings.extend(batch_emb)
        # Only pace real requests: skip the pause for cached batches and after the last batch.
        if fetched_any and start + batch_size < total:
            time.sleep(throttle)
    return embeddings

# Optional local fallback (sentence-transformers):
def local_fallback_embeddings(texts, model_name="all-MiniLM-L6-v2"):
    """Embed ``texts`` locally with a sentence-transformers model.

    Args:
        texts: Texts handed to the model's ``encode`` method.
        model_name: Name of the SentenceTransformer model to load.

    Returns:
        The result of ``encode(...)`` converted with ``.tolist()``.

    Raises:
        RuntimeError: If ``sentence_transformers`` cannot be imported.
    """
    try:
        from sentence_transformers import SentenceTransformer as _SentenceEncoder
    except Exception as import_error:
        raise RuntimeError("sentence-transformers not installed. Install it with: pip install sentence-transformers") from import_error
    encoder = _SentenceEncoder(model_name)
    vectors = encoder.encode(texts, show_progress_bar=False)
    return vectors.tolist()


# Note: the client is intentionally not created in this cell; the caller creates it and passes it to batch_get_embeddings.

In [20]:
# NOTE(review): this cell contains notebook-cell JSON pasted as code. As Python it is
# just a dict literal, so running it only echoes the dict; the statements inside
# "source" are plain strings and are never executed here.
{
"cell_type":"code",
"metadata":{"language":"python"},
"source":[
"client = create_client_from_env()",
"try:",
" embeddings = batch_get_embeddings(definitions, client=client)",
"except Exception as e:",
" print('API embeddings failed (likely quota). Falling back to local model:', e)",
" embeddings = local_fallback_embeddings(definitions)"
]
}

{'cell_type': 'code',
 'metadata': {'language': 'python'},
 'source': ['client = create_client_from_env()',
  'try:',
  ' embeddings = batch_get_embeddings(definitions, client=client)',
  'except Exception as e:',
  " print('API embeddings failed (likely quota). Falling back to local model:', e)",
  ' embeddings = local_fallback_embeddings(definitions)']}

In [21]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.preprocessing import normalize

def local_fallback_embeddings(texts, n_components=384, random_state=42):
    """Generate embeddings locally using TF-IDF followed by truncated SVD.

    Note: this definition replaces any earlier ``local_fallback_embeddings`` in the
    kernel. The default of 384 components is NOT the size of OpenAI
    ``text-embedding-ada-002`` vectors, so these embeddings must not be mixed with
    API embeddings in the same matrix.

    Args:
        texts: A single string, a single dict with a ``'text'`` key, or an iterable
            whose items are strings or dicts with a ``'text'`` key.
        n_components: Desired output dimensionality. It is capped at the size of
            the fitted TF-IDF vocabulary, because TruncatedSVD rejects more
            components than features.
        random_state: Seed for the SVD solver so repeated runs give the same result.

    Returns:
        The array returned by ``TruncatedSVD.fit_transform``: one row per text.
    """
    # Normalize the accepted input shapes to a flat list of strings.
    if isinstance(texts, str):
        texts = [texts]
    elif isinstance(texts, dict):
        texts = [texts['text']]
    else:
        texts = [item['text'] if isinstance(item, dict) else item for item in texts]

    # Sparse TF-IDF representation (English stop words removed).
    vectorizer = TfidfVectorizer(stop_words='english')
    tfidf_matrix = vectorizer.fit_transform(texts)

    # Small corpora can have fewer features than the requested dimensionality.
    n_features = tfidf_matrix.shape[1]
    effective_components = max(1, min(n_components, n_features))

    svd = TruncatedSVD(n_components=effective_components, random_state=random_state)
    embeddings = svd.fit_transform(tfidf_matrix)

    return embeddings

In [22]:
# Install required packages
import sys
!pip install --no-deps torch==2.1.1
!pip install --no-deps sentence-transformers==2.2.2
!pip install transformers tqdm numpy scikit-learn

[31mERROR: Could not find a version that satisfies the requirement torch==2.1.1 (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for torch==2.1.1[0m[31m


In [23]:
# Generate embeddings via the API helper, falling back to the local model on any failure.
client = None
try:
    # Create the client inside the try: when no API key is configured the client
    # constructor itself can fail, and that case should also use the local fallback.
    client = create_client_from_env()
    embeddings = batch_get_embeddings(definitions, client=client)
except Exception as e:
    print('API embeddings failed. Falling back to local model:', e)
    embeddings = local_fallback_embeddings(definitions)

In [24]:
# Dimensionality reduction: project the embeddings to 2 components (used as the
# x/y plot coordinates later). random_state is fixed so the layout is repeatable;
# as the logged message notes, setting a seed makes UMAP run with n_jobs=1.
reducer = umap.UMAP(n_components=2, random_state=42)
embedding_2d = reducer.fit_transform(embeddings)


n_jobs value 1 overridden to 1 by setting random_state. Use no seed for parallelism.



In [25]:

import hdbscan

# Normalize the full-dimensional embeddings (not the 2-D projection) before clustering.
X_norm = normalize(embeddings)

# Clustering: HDBSCAN on the normalized embeddings; `labels` holds one cluster id per row.
clusterer = hdbscan.HDBSCAN(min_cluster_size=2, min_samples=1, metric='euclidean')
labels = clusterer.fit_predict(X_norm)


# Alternative kept for reference (not executed): cluster on the 2-D UMAP projection instead.
# X_norm = normalize(embedding_2d)   # normalize to unit length


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.


'force_all_finite' was renamed to 'ensure_all_finite' in 1.6 and will be removed in 1.8.



In [26]:
# Visualization: one point per gesture at its 2-D coordinates, colored by cluster label.
viz_df = pd.DataFrame(
    dict(
        gesture_name=df['gesture_name'],
        x=embedding_2d[:, 0],
        y=embedding_2d[:, 1],
        cluster=labels,
    )
)

# Cluster ids are cast to strings before being used as the color argument.
fig = px.scatter(
    viz_df,
    x='x',
    y='y',
    color=viz_df['cluster'].astype(str),
    hover_data=['gesture_name'],
)
fig.show()