In [3]:
#!/usr/bin/env python3
"""
bulk_gtt.py

Bulk create / modify / delete single-leg GTTs on Zerodha Kite.

Enhancement:
- Automatically loads API key, API secret, and redirect URL from environment variables.
- Only prompts for any missing credentials.

Environment variables:
  KITE_API_KEY
  KITE_API_SECRET
  KITE_REDIRECT_URL

Usage:
  export KITE_API_KEY="your_api_key"
  export KITE_API_SECRET="your_api_secret"
  export KITE_REDIRECT_URL="https://your-app.com/redirect"
  python3 bulk_gtt.py
"""

import csv
import json
import os
import sys
import time
import webbrowser
import requests
from urllib.parse import urlparse, parse_qs
from kiteconnect import KiteConnect

# ---------- Config ----------
# Input CSV read by main(); one GTT action per row.
CSV_PATH = "bulk-gtt-orders.csv"
# Output CSV written by main() with one result line per processed row.
REPORT_PATH = "report.csv"
# Pause (in seconds) inserted by process_rows() after each handled row.
THROTTLE_SECONDS = 1.0
# Value sent in the "X-Kite-Version" request header.
KITE_API_VERSION_HEADER = "3"
# Base URL that the GTT endpoint paths are appended to.
KITE_API_BASE = "https://api.kite.trade"
# ----------------------------


def get_credentials():
    """Load the Kite credentials from the environment, prompting for missing ones.

    Each value is read from its environment variable (``KITE_API_KEY``,
    ``KITE_API_SECRET``, ``KITE_REDIRECT_URL``) and stripped of surrounding
    whitespace. A variable that is unset, empty, or whitespace-only counts as
    missing and is requested interactively via ``input()`` instead, in the
    order key, secret, redirect URL.

    Returns:
        tuple: ``(api_key, api_secret, redirect_url)`` as stripped strings.
    """

    def _from_env_or_prompt(var_name, prompt):
        # Strip the environment value too: a stray space or tab from a shell
        # profile would otherwise be sent to the API as part of the credential.
        value = (os.environ.get(var_name) or "").strip()
        if not value:
            value = input(prompt).strip()
        return value

    api_key = _from_env_or_prompt("KITE_API_KEY", "Enter your Kite API Key: ")
    api_secret = _from_env_or_prompt(
        "KITE_API_SECRET", "Enter your Kite API Secret: "
    )
    redirect_url = _from_env_or_prompt(
        "KITE_REDIRECT_URL", "Enter your Redirect URL: "
    )

    return api_key, api_secret, redirect_url


def oauth_and_get_tokens(api_key, api_secret, redirect_url):
    """
    Run the semi-automatic login flow and return an authenticated client.

    The Kite login page is opened in the default browser; the user then pastes
    the URL they were redirected to, from which the ``request_token`` (or
    ``requestToken``) query parameter is extracted and exchanged for an access
    token. ``redirect_url`` is accepted but not used by this function.

    Returns ``(kite, access_token)``. Exits the process via ``sys.exit`` when
    the token is absent from the pasted URL or the exchange fails.
    """
    kite = KiteConnect(api_key=api_key)
    login_url = kite.login_url()

    print("\nOpening Kite login URL in your browser...")
    print(
        "After logging in, copy the FULL redirect URL from your browser and paste it here.\n"
    )
    webbrowser.open(login_url)

    pasted_url = input("Paste the redirect URL here: ").strip()
    query_params = parse_qs(urlparse(pasted_url).query)

    token_values = query_params.get("request_token")
    if not token_values:
        token_values = query_params.get("requestToken")
    if not token_values:
        sys.exit("‚ùå Could not find 'request_token' in the pasted URL.")

    # parse_qs yields a list per parameter; the first occurrence is the token.
    request_token = token_values[0]

    try:
        session = kite.generate_session(request_token, api_secret=api_secret)
        access_token = session["access_token"]
        kite.set_access_token(access_token)
        print("‚úÖ Access token generated successfully.\n")
    except Exception as e:
        sys.exit(f"‚ùå Failed to generate access token: {e}")

    return kite, access_token


def build_headers(api_key, access_token):
    """Return the HTTP header dict used for the Kite GTT REST calls.

    The dict carries the API version header, the ``token <key>:<access_token>``
    authorization value, and a form-encoded content type.
    """
    authorization = f"token {api_key}:{access_token}"
    headers = {}
    headers["X-Kite-Version"] = KITE_API_VERSION_HEADER
    headers["Authorization"] = authorization
    headers["Content-Type"] = "application/x-www-form-urlencoded"
    return headers


def read_csv_rows(csv_path):
    """Read the CSV at ``csv_path`` into a list of whitespace-trimmed row dicts.

    The file is decoded as ``utf-8-sig`` so that a leading UTF-8 byte-order
    mark (commonly added by spreadsheet exports) is dropped instead of being
    glued onto the first column name; files without a BOM decode the same as
    plain UTF-8. Both header names and string cell values are stripped.
    Non-string entries produced by ``csv.DictReader`` (``None`` for cells
    missing from a short row, or the overflow list stored under the ``None``
    key for a long row) are passed through unchanged.

    Args:
        csv_path: Path of the CSV file; its first line is the header.

    Returns:
        list[dict]: One dict per data row, keyed by the trimmed header names.
    """

    def _clean(value):
        return value.strip() if isinstance(value, str) else value

    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        return [
            {_clean(key): _clean(value) for key, value in record.items()}
            for record in csv.DictReader(f)
        ]


def write_report(report_path, report_rows):
    """Write ``report_rows`` to ``report_path`` as CSV and announce the location.

    Column names come from the keys of the first row; when there are no rows a
    fixed default header is written so the file still has a header line.
    """
    if report_rows:
        columns = list(report_rows[0].keys())
    else:
        columns = ["row_index", "action", "status", "message", "trigger_id"]

    with open(report_path, "w", newline="", encoding="utf-8") as out_file:
        report_writer = csv.DictWriter(out_file, fieldnames=columns)
        report_writer.writeheader()
        for entry in report_rows:
            report_writer.writerow(entry)

    print(f"\nüìÑ Report written to: {report_path}\n")


def find_triggers_by_instrument(headers, exchange, tradingsymbol, transaction_type):
    """Return the GTT triggers whose condition matches the given instrument.

    Fetches the trigger list from ``/gtt/triggers`` and keeps entries whose
    condition has the same ``exchange`` and ``tradingsymbol``. When
    ``transaction_type`` is truthy, a trigger is kept only if at least one of
    its orders has that transaction type. Returns an empty list when the
    response status is not ``"success"``; HTTP errors raise via
    ``raise_for_status()``.
    """
    response = requests.get(
        f"{KITE_API_BASE}/gtt/triggers", headers=headers, timeout=15
    )
    response.raise_for_status()
    body = response.json()
    if body.get("status") != "success":
        return []

    def _is_match(trigger):
        condition = trigger.get("condition", {})
        if condition.get("exchange") != exchange:
            return False
        if condition.get("tradingsymbol") != tradingsymbol:
            return False
        if not transaction_type:
            # No side filter requested: the instrument match is enough.
            return True
        legs = trigger.get("orders", []) or []
        return any(leg.get("transaction_type") == transaction_type for leg in legs)

    all_triggers = body.get("data", []) or []
    return [trigger for trigger in all_triggers if _is_match(trigger)]


def _build_single_gtt_payload(row):
    """Build the form payload shared by the create and modify GTT requests.

    Args:
        row: Mapping for one CSV row. Requires ``exchange``,
            ``tradingsymbol``, ``trigger_price``, ``transaction_type``,
            ``quantity`` and ``order_price``. ``last_price``, ``order_type``
            and ``product`` are optional.

    Returns:
        dict: ``type`` set to ``"single"`` plus JSON-encoded ``condition`` and
        ``orders`` strings.

    Raises:
        KeyError: A required field is absent from ``row``.
        ValueError: A numeric field cannot be parsed.
    """
    exchange = row["exchange"]
    tradingsymbol = row["tradingsymbol"]
    last_price_raw = row.get("last_price")

    condition = {
        "exchange": exchange,
        "tradingsymbol": tradingsymbol,
        "trigger_values": [float(row["trigger_price"])],
        "last_price": float(last_price_raw) if last_price_raw else 0,
    }
    order = {
        "exchange": exchange,
        "tradingsymbol": tradingsymbol,
        "transaction_type": row["transaction_type"],
        # Parse via float first so a value such as "10.0" is accepted.
        "quantity": int(float(row["quantity"])),
        # Use `or` rather than a .get() default: a CSV column that exists but
        # is left blank yields "" (or None for a short row), which a .get()
        # default would not replace.
        "order_type": row.get("order_type") or "LIMIT",
        "product": row.get("product") or "CNC",
        "price": float(row["order_price"]),
    }
    return {
        "type": "single",
        "condition": json.dumps(condition),
        "orders": json.dumps([order]),
    }


def create_single_gtt(headers, row):
    """Create a single-leg GTT from ``row`` and return the HTTP response.

    POSTs the payload built from ``row`` to ``/gtt/triggers``. Blank
    ``order_type`` / ``product`` cells fall back to ``LIMIT`` / ``CNC``.

    Raises:
        KeyError / ValueError: ``row`` is missing a required field or holds an
            unparsable number (raised before any request is sent).
    """
    url = f"{KITE_API_BASE}/gtt/triggers"
    payload = _build_single_gtt_payload(row)
    return requests.post(url, data=payload, headers=headers, timeout=15)


def modify_gtt_by_id(headers, trigger_id, row):
    """Modify the existing GTT ``trigger_id`` using ``row``; return the response.

    PUTs the same payload shape used for creation to
    ``/gtt/triggers/<trigger_id>``. Blank ``order_type`` / ``product`` cells
    fall back to ``LIMIT`` / ``CNC``.

    Raises:
        KeyError / ValueError: ``row`` is missing a required field or holds an
            unparsable number (raised before any request is sent).
    """
    url = f"{KITE_API_BASE}/gtt/triggers/{trigger_id}"
    payload = _build_single_gtt_payload(row)
    return requests.put(url, data=payload, headers=headers, timeout=15)


def delete_gtt_by_id(headers, trigger_id):
    """Send a DELETE for the GTT trigger ``trigger_id`` and return the response."""
    endpoint = KITE_API_BASE + f"/gtt/triggers/{trigger_id}"
    return requests.delete(endpoint, timeout=15, headers=headers)


def _report_entry(row_index, action, status, message, trigger_id=""):
    """Build one report line with the column order used by the report CSV."""
    return {
        "row_index": row_index,
        "action": action,
        "status": status,
        "message": message,
        "trigger_id": trigger_id,
    }


def _resolve_trigger_id(headers, row, verb):
    """Return the trigger id a modify/delete row refers to.

    Uses the row's ``gtt_id`` when present; otherwise looks the trigger up by
    ``exchange`` / ``tradingsymbol`` (and ``transaction_type`` when given).
    ``verb`` is only used in error messages.

    Raises:
        ValueError: No id was given and the lookup fields are missing, or the
            lookup found no matching trigger.
    """
    gtt_id = str(row.get("gtt_id") or "").strip()
    if gtt_id:
        return gtt_id

    exchange = row.get("exchange")
    tradingsymbol = row.get("tradingsymbol")
    if not exchange or not tradingsymbol:
        raise ValueError(
            f"Row needs 'gtt_id' or both 'exchange' and 'tradingsymbol' to {verb} a GTT."
        )

    matches = find_triggers_by_instrument(
        headers, exchange, tradingsymbol, row.get("transaction_type")
    )
    if not matches:
        raise ValueError(f"No matching GTT found to {verb}.")
    # NOTE: when several triggers match, the first one in the API response is
    # used; supply gtt_id in the CSV to target a specific trigger.
    return matches[0]["id"]


def process_rows(kite, api_key, access_token, rows):
    """Execute the action of every CSV row and collect a per-row report.

    Supported actions (case-insensitive) are ``create``, ``modify`` and
    ``delete``; any other value is reported as ``skipped``. All per-row work,
    including reading the row's fields, happens inside the error handler so a
    malformed row (missing column, blank cell from a short row, bad number) is
    reported as ``error`` and the remaining rows are still processed.

    Args:
        kite: Authenticated client; accepted for interface compatibility but
            not used here (requests are sent directly with ``requests``).
        api_key: Kite API key, used to build the authorization header.
        access_token: Access token, used to build the authorization header.
        rows: Iterable of row mappings as produced by ``read_csv_rows``.

    Returns:
        list[dict]: One entry per row with the keys ``row_index`` (1-based),
        ``action``, ``status``, ``message`` and ``trigger_id``.
    """
    headers = build_headers(api_key, access_token)
    report = []

    for idx, row in enumerate(rows, start=1):
        action = str(row.get("action") or "").strip().lower()

        try:
            if action == "create":
                r = create_single_gtt(headers, row)
            elif action == "modify":
                gtt_id = _resolve_trigger_id(headers, row, "modify")
                r = modify_gtt_by_id(headers, gtt_id, row)
            elif action == "delete":
                gtt_id = _resolve_trigger_id(headers, row, "delete")
                r = delete_gtt_by_id(headers, gtt_id)
            else:
                # Nothing was sent to the API, so no throttling pause is needed.
                report.append(_report_entry(idx, action, "skipped", "Unknown action"))
                continue

            data = r.json()
            status = "success" if data.get("status") == "success" else "failed"
            # On success the useful part is the "data" object; otherwise keep
            # the whole response body so the error details reach the report.
            msg = data.get("data", data)
            trigger_id = (
                (msg.get("trigger_id") or msg.get("id"))
                if isinstance(msg, dict)
                else ""
            )
            report.append(_report_entry(idx, action, status, msg, trigger_id))
        except KeyError as e:
            # str(KeyError) is just the quoted key; say what it means.
            report.append(
                _report_entry(idx, action, "error", f"Missing required field: {e}")
            )
        except Exception as e:
            # Deliberately broad: this is the per-row boundary, and one bad row
            # or failed request must not abort the rest of the batch.
            report.append(_report_entry(idx, action, "error", str(e)))

        time.sleep(THROTTLE_SECONDS)

    return report


def main():
    """Run the bulk GTT workflow end to end.

    Verifies that the input CSV exists, obtains credentials, performs the
    interactive login, processes every CSV row and writes the report file.
    Exits via ``sys.exit`` when the CSV is missing.
    """
    print("üîÅ Bulk GTT Automation Script for Zerodha Kite\n")

    # Fail fast: check for the input file before asking the user to go through
    # the interactive browser login, which would be wasted if the CSV is absent.
    if not os.path.exists(CSV_PATH):
        sys.exit(f"‚ùå CSV file not found: {CSV_PATH}")

    api_key, api_secret, redirect_url = get_credentials()
    kite, access_token = oauth_and_get_tokens(api_key, api_secret, redirect_url)

    rows = read_csv_rows(CSV_PATH)
    print(f"Processing {len(rows)} rows from {CSV_PATH}...\n")
    report = process_rows(kite, api_key, access_token, rows)
    write_report(REPORT_PATH, report)
    print("‚úÖ All done.\n")


# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

üîÅ Bulk GTT Automation Script for Zerodha Kite


Opening Kite login URL in your browser...
After logging in, copy the FULL redirect URL from your browser and paste it here.

‚úÖ Access token generated successfully.

Processing 1 rows from bulk-gtt-orders.csv...


üìÑ Report written to: report.csv

‚úÖ All done.

