In [1]:
import os
import re
from click import prompt
import pandas as pd
import ace_lib as ace
import nest_asyncio
import asyncio
from google import genai  # Using Google AI studio
from dotenv import load_dotenv
import json
import time
import random


# JSON file (relative to the working directory) that persists LLM feedback between runs.
FEEDBACK_FILE = "feedback_memory.json"

def load_feedback_memory():
    """Load the persisted feedback memory from ``FEEDBACK_FILE``.

    Returns:
        dict: a mapping that always has list-valued ``"mistakes"`` and
        ``"corrections"`` keys. An empty memory is returned when the file does
        not exist, cannot be parsed, or does not contain a JSON object. Keys
        that are missing (or are not lists) are reset to empty lists so callers
        can append to them without further checks.
    """
    if not os.path.exists(FEEDBACK_FILE):
        return {"mistakes": [], "corrections": []}

    try:
        with open(FEEDBACK_FILE, "r", encoding="utf-8") as f:
            memory = json.load(f)
    except ValueError as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        # A damaged memory file should not abort the whole pipeline.
        print(f"Warning: could not parse {FEEDBACK_FILE} ({e}); using empty feedback memory.")
        return {"mistakes": [], "corrections": []}

    if not isinstance(memory, dict):
        print(f"Warning: {FEEDBACK_FILE} does not contain a JSON object; using empty feedback memory.")
        return {"mistakes": [], "corrections": []}

    # Repair partially written / hand-edited files so callers can rely on both keys.
    for key in ("mistakes", "corrections"):
        if not isinstance(memory.get(key), list):
            memory[key] = []
    return memory

def save_feedback_memory(memory):
    """Write ``memory`` to ``FEEDBACK_FILE`` as JSON indented by 4 spaces.

    Args:
        memory: JSON-serializable object (the feedback dict).

    Raises:
        TypeError / ValueError: if ``memory`` cannot be serialized. In that
            case the existing file is left untouched.
    """
    # Serialize before opening: opening in "w" mode truncates the file, so a
    # serialization failure after the open would wipe the stored memory.
    payload = json.dumps(memory, indent=4)
    with open(FEEDBACK_FILE, "w", encoding="utf-8") as f:
        f.write(payload)

def record_feedback(problem, fix):
    """Append one mistake/correction pair to the persisted feedback memory.

    Args:
        problem: text stored under the ``"mistakes"`` list.
        fix: text stored under the ``"corrections"`` list.
    """
    store = load_feedback_memory()
    # Both lists grow in lockstep so entries can be paired by position later.
    for key, entry in (("mistakes", problem), ("corrections", fix)):
        store[key].append(entry)
    save_feedback_memory(store)

def build_feedback_context():
    """Render the stored feedback as a text section for an LLM prompt.

    Returns:
        str: an empty string when no mistakes are stored; otherwise a block
        with one bullet per (mistake, correction) pair, paired by position.
    """
    store = load_feedback_memory()
    mistakes = store["mistakes"]
    if not mistakes:
        return ""

    bullet_lines = []
    # zip() stops at the shorter list, so unpaired trailing entries are ignored.
    for mistake, correction in zip(mistakes, store["corrections"]):
        bullet_lines.append(
            f"- Previously, you made this mistake: '{mistake}' → Corrected as: '{correction}'"
        )
    summary = "\n".join(bullet_lines)
    return f"\nImportant: Avoid repeating previous mistakes.\n{summary}\n"


# Small helper to handle rate limits / transient errors when calling Brain API
def get_with_retries(session, url, params=None, max_retries=5, backoff_base=1.0):
    """Perform session.get with retries on exceptions, HTTP 429 and 5xx responses.

    Between attempts the function sleeps for
    ``backoff_base * 2**attempt`` seconds plus up to one second of random
    jitter. No sleep is performed after the final attempt: the failure is
    raised immediately.

    Args:
        session: requests-like session with .get
        url: URL to fetch
        params: optional params dict (only passed to .get when not None)
        max_retries: total number of attempts
        backoff_base: base seconds for exponential backoff

    Returns:
        Response object of the first attempt whose status is neither 429 nor 5xx.

    Raises:
        The last exception raised by ``session.get``, or ``Exception("HTTP <code>")``
        when the last attempt returned a retryable status, or
        ``Exception("Failed to get response")`` when no attempt was made
        (``max_retries <= 0``).
    """
    last_exc = None
    for attempt in range(max_retries):
        try:
            if params is not None:
                r = session.get(url, params=params)
            else:
                r = session.get(url)
        except Exception as e:
            last_exc = e
            failure = f"Request exception: {e}."
        else:
            retryable = r.status_code == 429 or (500 <= r.status_code < 600)
            if not retryable:
                # Success (or a non-retryable status the caller should inspect)
                return r
            last_exc = Exception(f"HTTP {r.status_code}")
            failure = f"HTTP {r.status_code} received from {url}."

        if attempt == max_retries - 1:
            # Nothing left to retry: do not waste a backoff sleep before raising.
            print(f"{failure} Giving up after {max_retries} attempt(s).")
            break

        wait = backoff_base * (2 ** attempt) + random.random()
        print(f"{failure} Retrying in {wait:.1f}s (attempt {attempt+1}/{max_retries})")
        time.sleep(wait)

    # All retries failed
    if last_exc is not None:
        raise last_exc
    raise Exception("Failed to get response")


# NOTE(review): presumably applied so asyncio.run() can be used from inside the
# notebook's already-running event loop — confirm against nest_asyncio docs.
nest_asyncio.apply()
# NOTE(review): not referenced anywhere else in the visible code.
_llm_instance = None


# Load API key from .env and initialize Gemini client (new SDK)
load_dotenv()
GENAI_API_KEY = os.getenv("GENAI_API_KEY")
# Only warns: execution continues and the client below is still constructed
# with whatever value (possibly None) was read.
if not GENAI_API_KEY:
    print("⚠️ Warning: GENAI_API_KEY not set in environment. Please set it in .env or the environment.")
# Model name can be overridden through the GEMINI_MODEL environment variable.
GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash-lite")
# Module-level client shared by call_llm().
client = genai.Client(api_key=GENAI_API_KEY)

async def call_llm(prompt):
    """Send ``prompt`` to the configured Gemini model and return its text reply.

    The synchronous ``client.models.generate_content`` call is executed via
    ``asyncio.to_thread`` so the coroutine does not block the event loop.

    Args:
        prompt: user prompt text; it is appended to a fixed instruction prefix.

    Returns:
        str | None: the stripped response text, or ``None`` when the response
        carries no text or any exception occurs (the exception is printed,
        not raised).
    """
    try:
        # Report the model actually used; it is configurable via GEMINI_MODEL.
        print(f"→ Sending prompt to Gemini model '{GEMINI_MODEL}'...")

        # Run the sync model call in a separate thread to stay async-safe
        response = await asyncio.to_thread(
            client.models.generate_content,
            model=GEMINI_MODEL,
            contents=(f"You are a Quantitative Finance expert. While avoiding same combination of operators, help users explain alphas clearly. {prompt}"
            )
        )

        # Prefer the convenience accessor, but guard against it being None/empty
        # so the candidates fallback below is actually reachable.
        text = getattr(response, "text", None)
        if text:
            return text.strip()

        # Fallback: first non-empty text part of any candidate.
        for candidate in getattr(response, "candidates", None) or []:
            content = getattr(candidate, "content", None)
            for part in getattr(content, "parts", None) or []:
                part_text = getattr(part, "text", None)
                if part_text:
                    return part_text.strip()

        print("⚠️ No valid response content.")
        return None

    except Exception as e:
        # Best-effort by design: callers treat None as "no response".
        print(f"Error calling Gemini: {e}")
        return None

# Generate English Description for Alpha
async def generate_alpha_description(alpha_id, brain_session):
    """Fetch an alpha and ask the LLM for a plain-English description of it.

    Steps: fetch the alpha's expression and settings, split the expression
    into operator names and candidate data-field names, collect descriptions
    for exactly those components, and send them with the expression to
    ``call_llm``.

    Args:
        alpha_id: identifier inserted into the ``/alphas/{alpha_id}`` URL.
        brain_session: session object passed to ``get_with_retries`` and ``ace``.

    Returns:
        str | None: the description text; ``None`` if fetching or context
        building raised; the literal ``"Error: LLM returned no response."``
        if the LLM produced nothing.
    """
    try:
        # 1. Fetch alpha details (use get_with_retries to handle rate limiting)
        url = f"https://api.worldquantbrain.com/alphas/{alpha_id}"
        response = get_with_retries(brain_session, url)
        response.raise_for_status()
        details = response.json()
        alpha_expression = details.get('regular', {}).get('code', '')
        alpha_settings = details.get('settings', {})
        print("\n[Success] Alpha details and settings fetched.")

        # 2. Fetch the operator catalogue
        operators = ace.get_operators(brain_session)
        operator_id_column = 'name'
        all_operators_names = set(operators[operator_id_column])

        # 3. Parse the alpha expression to find the operators and data fields it uses
        print("\nParsing alpha expression to indentify components...")
        tokens = set(re.findall(r'[a-zA-Z0-9_.]+', alpha_expression))
        operators_used = sorted(tokens & all_operators_names)
        potential_data_fields = tokens - all_operators_names
        # Heuristic filter: drops numeric literals and any token of 4 characters
        # or fewer. NOTE(review): this also drops genuinely short field names.
        data_fields_used = sorted(
            f for f in potential_data_fields if not f[0].isdigit() and len(f) > 4
        )
        print(f" > operators identified: {operators_used}")
        print(f" > data fields identified: {data_fields_used}")

        # 4. Fetch context for the identified components
        # Operators context: only the operators that appear in the expression
        operators_context = "[]"
        if operators_used:
            operators_df = operators[operators[operator_id_column].isin(operators_used)]
            operators_context = (
                operators_df[[operator_id_column, 'description', 'definition']]
                # One entry per operator name even if the catalogue repeats a name.
                .drop_duplicates(subset=operator_id_column)
                .to_json(orient='records')
            )

        # Data fields context
        data_field_context_list = []
        if data_fields_used:
            print(f"\nFetching descriptions for data fields...")
            for field in data_fields_used:
                print(f" > searching for '{field}'...")
                # Use alpha's specific settings for an accurate search
                field_df = ace.get_datafields(
                    s=brain_session,
                    search=field,
                    region=alpha_settings.get('region', 'USA'),
                    universe=alpha_settings.get('universe', 'TOP3000'),
                    delay=alpha_settings.get('delay', 1),
                    data_type='ALL'  # Search across all data types
                )
                if not field_df.empty:
                    # Keep only the row whose id equals the token exactly
                    exact_match = field_df[field_df['id'] == field]
                    if not exact_match.empty:
                        data_field_context_list.append(exact_match)
                    else:
                        print(f"    -Warning: Found potential match for '{field}', but no exact match.")
                else:
                    print(f"    -Warning: No datafield found for '{field}' with current alpha settings.")

        data_field_context = "[]"
        if data_field_context_list:
            final_df = pd.concat(data_field_context_list, ignore_index=True)
            data_field_context = final_df[['id', 'description']].to_json(orient='records')

    except Exception as e:
        print(f"Error generating alpha description: {e}")
        return None

    # Build the prompt from the components actually used by this alpha (not the
    # whole operator catalogue), keeping the prompt small and on-topic.
    prompt = f"""
    Describe the following alpha in plain English.
    Alpha: {alpha_expression}
    Here are the components in JSON:
    "operators": {operators_context},
    "data_fields": {data_field_context}
    """
    description = await call_llm(prompt)
    if description:
        return description.strip()
    else:
        print("⚠️ LLM returned no response.")
        return "Error: LLM returned no response."


# Generate new Alphas based on generated description
async def generate_new_alphas(
    alpha_description,
    brain_session,
    num_alphas=5,
    dataset_ids=('analyst10', 'analyst14'),
    max_operators=30,
    max_data_fields=50,
):
    """Ask the LLM for new alpha expressions derived from a description.

    Args:
        alpha_description: plain-English description used as the seed.
        brain_session: session object passed to ``ace``.
        num_alphas: number of expressions requested from the LLM.
        dataset_ids: dataset ids whose data fields are offered to the LLM
            (fetched for region USA, universe TOP3000, delay 1).
        max_operators: cap on operator records included in the prompt.
        max_data_fields: cap on data-field records included in the prompt.

    Returns:
        list[dict]: records with at least an ``"expression"`` key, de-duplicated
        by operator signature. Empty list when the description is missing, the
        LLM returns nothing, or nothing usable could be parsed.

    Side effects:
        Calls ``record_feedback`` for each dropped duplicate.
    """
    if not alpha_description:
        # Without a description the prompt would literally say 'None'.
        print("⚠️ No alpha description provided; skipping alpha generation.")
        return []

    # Fetch operator and data context
    operators = ace.get_operators(brain_session)
    data_fields = pd.concat(
        [
            ace.get_datafields(
                brain_session,
                region='USA',
                universe='TOP3000',
                delay=1,
                dataset_id=dataset_id,
                data_type='ALL'
            )
            for dataset_id in dataset_ids
        ],
        ignore_index=True
    )

    def truncate_json_records(df, cols, max_records):
        """Return ``df[cols]`` as JSON records, sampling down to ``max_records`` rows."""
        if len(df) > max_records:
            print(f"⚠️ Truncating {len(df)} records to {max_records} to reduce prompt size.")
            # Fixed seed keeps the sampled subset stable between runs.
            df = df.sample(max_records, random_state=42)
        return df[cols].to_json(orient='records')

    def normalize_alpha_records(parsed):
        """Coerce parsed JSON into a list of dicts that each have a str 'expression'."""
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list):
            return []
        records = []
        for i, item in enumerate(parsed):
            if isinstance(item, str) and item.strip():
                records.append({"alpha_name": f"Alpha_{i+1}", "expression": item.strip()})
            elif isinstance(item, dict) and isinstance(item.get("expression"), str):
                records.append(item)
        return records

    operators_json = truncate_json_records(
        operators[operators['scope'] == 'REGULAR'],
        ['name', 'description', 'definition'],
        max_operators
    )

    data_fields_json = truncate_json_records(
        data_fields,
        ['id', 'description', 'category', 'type'],
        max_data_fields
    )

    feedback_context = build_feedback_context()

    # Build the prompt from the truncated JSON so the size caps take effect.
    prompt = f"""
    Based on the following description: '{alpha_description}', generate {num_alphas} new alpha expressions using the provided operators and data.
    {feedback_context}
    Operators: {operators_json}, data {data_fields_json} where id is data field name
    Important: You can use type=MATRIX field by itself, as input to Arithmetic, 
    Cross Sectional, Time Series operators, With Logical and Transformational operators, As group in Group operators, with bucket().
    You can’t use type=VECTOR field by itself. You only can use type=VECTOR field as input to Vector operator. Then you can treat it as a MATRIX field.
    Always wrap type=VECTOR data in category=Vector operator.
    You can’t use type=GROUP field by itself. You need to use it as “group” parameter in Group operator.

    Provide only {num_alphas} alpha expressions, they should not be the same.
    """

    # Call the LLM
    response = await call_llm(prompt)
    if not response:
        print("⚠️ LLM returned no response while generating new alphas.")
        return []

    # Try to parse as JSON first; fall back to one expression per non-empty line.
    new_alphas_json = []
    try:
        # Remove Markdown fences like ```json or ```
        clean_response = re.sub(
            r"^```(?:json)?|```$", "", response.strip(), flags=re.MULTILINE
        ).strip()

        # Valid JSON may still have an unexpected shape (bare strings, a single
        # object, ...): normalize so the de-duplication loop can index safely.
        new_alphas_json = normalize_alpha_records(json.loads(clean_response))
        if new_alphas_json:
            print("✅ Successfully parsed new alphas as JSON.")

    except json.JSONDecodeError:
        print("⚠️ Response is not valid JSON. Attempting fallback parsing.")
        clean_lines = [
            line.strip()
            for line in response.split("\n")
            if line.strip() and not line.strip().startswith("```")
        ]
        new_alphas_json = [
            {"alpha_name": f"Alpha_{i+1}", "expression": line}
            for i, line in enumerate(clean_lines)
        ]

    # Remove duplicates by operator signature and record feedback
    if new_alphas_json:
        def operator_signature(expr):
            """Sorted set of lowercase/underscore words in ``expr``, joined by '_'."""
            ops = re.findall(r"[a-z_]+", expr)
            return "_".join(sorted(set(ops)))

        unique_alphas = []
        seen_sigs = set()

        for alpha in new_alphas_json:
            sig = operator_signature(alpha["expression"])
            if sig in seen_sigs:
                record_feedback(
                    problem=f"Repeated operator combination: {sig}",
                    fix="Enforce stronger variation across economic themes and operator types."
                )
                continue
            seen_sigs.add(sig)
            unique_alphas.append(alpha)

        new_alphas_json = unique_alphas
    else:
        print("⚠️ No valid alphas generated after parsing.")

    return new_alphas_json

# Start Brain session
# Module-level session shared by main() and the simulation cells below.
# NOTE(review): the captured cell output suggests this call may block on an
# interactive biometrics authentication prompt — confirm before running unattended.
brain_session = ace.start_session()

async def main(alpha_ids=None):
    """Describe each parent alpha, then generate new alphas from the description.

    Args:
        alpha_ids: iterable of parent alpha ids to process. Defaults to the
            single id ``"g2gngXg"`` when omitted.

    For every id the description and the generated alphas are printed. If the
    description step fails (returns ``None`` or the
    ``"Error: LLM returned no response."`` sentinel), generation is skipped for
    that id instead of prompting the LLM with a meaningless description.
    """
    if alpha_ids is None:
        alpha_ids = ["g2gngXg"]                                      #Add Parent Alpha IDs

    # Sentinel string returned by generate_alpha_description when the LLM is silent.
    llm_error_sentinel = "Error: LLM returned no response."

    for alpha_id in alpha_ids:
        print(f"Processing Alpha ID: {alpha_id}", flush=True)

        # Step 1: Generate English description of the alpha
        alpha_description = await generate_alpha_description(alpha_id, brain_session)
        if not alpha_description or alpha_description == llm_error_sentinel:
            print(f"⚠️ No usable description for {alpha_id}; skipping alpha generation.", flush=True)
            continue
        print(f"\nAlpha Description:\n{alpha_description}", flush=True)
        await asyncio.sleep(0.1)

        # Step 2: Generate new alphas based on the description
        new_alphas = await generate_new_alphas(alpha_description, brain_session)

        print(f"\nNew Alphas:\n{new_alphas}",flush=True)
        await asyncio.sleep(0.1)




Complete biometrics authentication and press any key to continue: 
https://api.worldquantbrain.com/authentication/persona?inquiry=inq_mVj4rK1bYJEctqsPGh45H5YBETFa



In [None]:
# Run the describe-then-generate pipeline defined in main().
# NOTE(review): calling asyncio.run() from a notebook cell presumably relies on
# the nest_asyncio.apply() call in the first cell — confirm.
asyncio.run(main())

In [None]:
# Expressions to simulate, consumed by the alpha_list cell below.
# NOTE(review): "Alpha Expressions" looks like a placeholder to be replaced by
# real expression strings (e.g. those printed by main()) — confirm.
expression_list = ["Alpha Expressions"]

In [None]:
# Keyword arguments forwarded unchanged to ace.generate_alpha for every expression.
simulation_settings = {
    "region": "USA",
    "universe": "TOP3000",
    "test_period": "P2Y",
}

# One generated alpha payload per entry of expression_list, in the same order.
alpha_list = [
    ace.generate_alpha(expression, **simulation_settings)
    for expression in expression_list
]

In [None]:
# Submit the generated alphas for simulation using the shared Brain session.
# alpha_list[:] passes a shallow copy, so the callee cannot reorder or shrink
# the notebook-level list itself.
result = ace.simulate_alpha_list_multi(brain_session, alpha_list[:])