<a href="https://colab.research.google.com/github/student-Asmi/App/blob/main/assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install lark pandas numpy




In [2]:

import re

def normalize_text(text):
    return re.sub(r"\s+", " ", text.lower()).strip()

def extract_numbers(text):
    text = text.lower()
    match = re.findall(r"\d+", text)
    nums = [int(n) for n in match] if match else []

    if "million" in text:
        nums = [nums[0] * 1_000_000]
    if "thousand" in text:
        nums = [nums[0] * 1000]

    return nums

def convert_indicator(text):
    if "moving average" in text or "ma" in text:
        nums = extract_numbers(text)
        if nums:
            return f"SMA(close,{nums[0]})"

    if "rsi" in text:
        nums = extract_numbers(text)
        period = nums[0] if nums else 14
        return f"RSI(close,{period})"

    return None

def parse_condition(sentence):
    sentence = normalize_text(sentence)

    left = None
    op = None
    right = None

    if "close" in sentence or "price" in sentence:
        left = "close"
    elif "volume" in sentence:
        left = "volume"

    if "above" in sentence:
        op = ">"
    elif "below" in sentence:
        op = "<"

    right_indicator = convert_indicator(sentence)
    if right_indicator:
        right = right_indicator

    nums = extract_numbers(sentence)
    if nums and not right:
        right = nums[0]

    if left and op and right:
        return f"{left} {op} {right}"

    return None


def nl_to_dsl(nl_text):
    text = normalize_text(nl_text)
    parts = re.split(r" and | , ", text)

    rules = []
    for p in parts:
        cond = parse_condition(p)
        if cond:
            rules.append(cond)

    mode = "ENTRY"
    if "exit" in text or "sell" in text:
        mode = "EXIT"
    elif "buy" in text or "enter" in text:
        mode = "ENTRY"

    rule_text = " AND ".join(rules)
    return f"{mode}: {rule_text}"


In [3]:
import re

def normalize_text(text):
    return re.sub(r"\s+", " ", text.lower()).strip()

def extract_numbers(text):
    text = text.lower()
    match = re.findall(r"\d+", text)
    nums = [int(n) for n in match] if match else []
    if "million" in text and nums:
        nums = [nums[0] * 1_000_000]
    if "thousand" in text and nums:
        nums = [nums[0] * 1000]
    return nums


def parse_one_condition(sentence):
    """
    Ek condition ko JSON me convert karo.
    - "close price is above the 20-day moving average"
    - "volume is above 1 million"
    - "rsi(14) is below 30"
    """
    rule = parse_cross_condition(sentence)
    if rule:
     return rule

    s = normalize_text(sentence)

    left = None
    op = None
    right = None

    # 1) OPERATOR detect karo
    if "above" in s or "greater than" in s:
        op = ">"
    elif "below" in s or "less than" in s:
        op = "<"
    elif "equal" in s:
        op = "=="

    # 2) Special case: RSI
    if "rsi" in s:
        # numbers: e.g. "rsi(14) is below 30" -> [14, 30]
        nums = re.findall(r"\d+", s)
        period = int(nums[0]) if len(nums) >= 1 else 14
        # threshold last number hoga (e.g. 30)
        threshold = int(nums[-1]) if len(nums) >= 2 else 30

        left = f"rsi(close,{period})"
        right = threshold

        return {
            "left": left,
            "operator": op,
            "right": right
        }

    # 3) LEFT side: close / volume (non-RSI case)
    if "close" in s or "price" in s:
        left = "close"
    elif "volume" in s:
        left = "volume"

    # 4) RIGHT side KE LIYE:
    #    (a) moving average
    if "moving average" in s or "ma" in s:
        nums = extract_numbers(s)
        period = nums[0] if nums else 20
        right = f"sma({left},{period})"

    #    (b) plain number (e.g. "1 million")
    if right is None:
        nums = extract_numbers(s)
        if nums:
            right = nums[0]

    if left and op and right is not None:
        return {"left": left, "operator": op, "right": right}
    else:
        return None


In [4]:
def nl_to_json_rules(text):
    t = normalize_text(text)

    is_entry = any(word in t for word in ["buy", "enter", "trigger entry"])
    is_exit = any(word in t for word in ["exit", "sell"])

    entry_rules = []
    exit_rules = []

    parts = re.split(r"\band\b", t)

    for part in parts:
        rule = parse_one_condition(part)
        if rule:
            if is_exit:
                exit_rules.append(rule)
            else:
                entry_rules.append(rule)

    return {
        "entry": entry_rules,
        "exit": exit_rules
    }


In [5]:
import re

def normalize_text(text):
    return re.sub(r"\s+", " ", text.lower()).strip()

def extract_numbers(text):
    text = text.lower()
    match = re.findall(r"\d+", text)
    nums = [int(n) for n in match] if match else []
    if "million" in text and nums:
        nums = [nums[0] * 1_000_000]
    if "thousand" in text and nums:
        nums = [nums[0] * 1000]
    return nums

def parse_cross_condition(sentence):
    s = normalize_text(sentence)

    # check if sentence contains "crosses above"
    if "crosses above" in s or "cross above" in s:
        operator = "crosses_above"

        # LEFT side → always "close" when someone says price
        left = "close"

        # RIGHT side: yesterday's high, yesterday high
        if "yesterday" in s and "high" in s:
            right = "high[1]"
        elif "yesterday" in s and "low" in s:
            right = "low[1]"
        else:
            right = None

        if right:
            return {
                "left": left,
                "operator": operator,
                "right": right
            }

    return None

def parse_one_condition(sentence):
    """
    Ek condition ko JSON me convert karo.
    - "close price is above the 20-day moving average"
    - "volume is above 1 million"
    - "rsi(14) is below 30"
    """
    rule = parse_cross_condition(sentence)
    if rule:
     return rule

    s = normalize_text(sentence)

    left = None
    op = None
    right = None

    # 1) OPERATOR detect karo
    if "above" in s or "greater than" in s:
        op = ">"
    elif "below" in s or "less than" in s:
        op = "<"
    elif "equal" in s:
        op = "=="

    # 2) Special case: RSI
    if "rsi" in s:
        # numbers: e.g. "rsi(14) is below 30" -> [14, 30]
        nums = re.findall(r"\d+", s)
        period = int(nums[0]) if len(nums) >= 1 else 14
        # threshold last number hoga (e.g. 30)
        threshold = int(nums[-1]) if len(nums) >= 2 else 30

        left = f"rsi(close,{period})"
        right = threshold

        return {
            "left": left,
            "operator": op,
            "right": right
        }

    # 3) LEFT side: close / volume (non-RSI case)
    if "close" in s or "price" in s:
        left = "close"
    elif "volume" in s:
        left = "volume"

    # 4) RIGHT side KE LIYE:
    #    (a) moving average
    if "moving average" in s or "ma" in s:
        nums = extract_numbers(s)
        period = nums[0] if nums else 20
        right = f"sma({left},{period})"

    #    (b) plain number (e.g. "1 million")
    if right is None:
        nums = extract_numbers(s)
        if nums:
            right = nums[0]

    if left and op and right is not None:
        return {"left": left, "operator": op, "right": right}
    else:
        return None

def nl_to_json_rules(text):
    t = normalize_text(text)

    is_entry = any(word in t for word in ["buy", "enter", "trigger entry"])
    is_exit = any(word in t for word in ["exit", "sell"])

    entry_rules = []
    exit_rules = []

    parts = re.split(r"\band\b", t)

    for part in parts:
        rule = parse_one_condition(part)
        if rule:
            if is_exit:
                exit_rules.append(rule)
            else:
                entry_rules.append(rule)

    return {
        "entry": entry_rules,
        "exit": exit_rules
    }

nl_to_json_rules("Exit when RSI(14) is below 30.")

{'entry': [],
 'exit': [{'left': 'rsi(close,14)', 'operator': '<', 'right': 30}]}

In [6]:
nl_to_json_rules("Buy when the close price is above the 20-day moving average and volume is above 1 million.")

{'entry': [{'left': 'close', 'operator': '>', 'right': 'sma(close,20)'},
  {'left': 'volume', 'operator': '>', 'right': 1000000}],
 'exit': []}

In [7]:
def parse_cross_condition(sentence):
    s = normalize_text(sentence)

    # check if sentence contains "crosses above"
    if "crosses above" in s or "cross above" in s:
        operator = "crosses_above"

        # LEFT side → always "close" when someone says price
        left = "close"

        # RIGHT side: yesterday's high, yesterday high
        if "yesterday" in s and "high" in s:
            right = "high[1]"
        elif "yesterday" in s and "low" in s:
            right = "low[1]"
        else:
            right = None

        if right:
            return {
                "left": left,
                "operator": operator,
                "right": right
            }

    return None


In [8]:
nl_to_json_rules("Enter when price crosses above yesterday’s high.")


{'entry': [{'left': 'close', 'operator': 'crosses_above', 'right': 'high[1]'}],
 'exit': []}

In [9]:
nl_to_json_rules("Trigger entry when volume increases by more than 30 percent compared to last week.")


{'entry': [], 'exit': []}

In [10]:
def parse_percent_last_week(sentence):
    s = normalize_text(sentence)

    # Check percent increase pattern
    if "percent" in s and ("increase" in s or "increases" in s):
        # extract percent number
        nums = re.findall(r"\d+", s)
        percent = int(nums[0]) if nums else None

        if percent is None:
            return None

        # detect left side (volume)
        if "volume" in s:
            left = "volume"
        else:
            return None

        # detect "last week" -> 7 day lookback
        if "last week" in s:
            base = "volume[7]"
        else:
            return None

        multiplier = 1 + percent / 100

        right = f"{base} * {multiplier}"

        return {
            "left": left,
            "operator": ">",
            "right": right
        }

    return None


In [11]:
nl_to_json_rules("Trigger entry when volume increases by more than 30 percent compared to last week.")


{'entry': [], 'exit': []}

In [12]:
def parse_one_condition(sentence):
    s = normalize_text(sentence)

    # 1) Percent + last week rule
    rule = parse_percent_last_week(sentence)
    if rule:
        return rule

    # 2) Cross above rule
    rule = parse_cross_condition(sentence)
    if rule:
        return rule

    # 3) RSI rule
    if "rsi" in s:
        nums = re.findall(r"\d+", s)
        period = int(nums[0]) if len(nums) >= 1 else 14
        threshold = int(nums[-1]) if len(nums) >= 2 else 30

        # operator detection
        if "below" in s:
            op = "<"
        elif "above" in s:
            op = ">"
        else:
            op = None

        return {
            "left": f"rsi(close,{period})",
            "operator": op,
            "right": threshold
        }

    # 4) Basic comparisons (close, volume, SMA)
    left = None
    op = None
    right = None

    if "close" in s or "price" in s:
        left = "close"
    elif "volume" in s:
        left = "volume"

    if "above" in s:
        op = ">"
    elif "below" in s:
        op = "<"
    elif "equal" in s:
        op = "=="

    # moving average
    if "moving average" in s or "ma" in s:
        nums = extract_numbers(s)
        period = nums[0] if nums else 20
        right = f"sma({left},{period})"

    # plain numeric
    if right is None:
        nums = extract_numbers(s)
        if nums:
            right = nums[0]

    if left and op and right is not None:
        return {"left": left, "operator": op, "right": right}

    return None


In [13]:
nl_to_json_rules("Trigger entry when volume increases by more than 30 percent compared to last week.")


{'entry': [{'left': 'volume', 'operator': '>', 'right': 'volume[7] * 1.3'}],
 'exit': []}

In [14]:
#DSL Parser + AST Construction


In [15]:
!pip install lark




In [16]:
from lark import Lark, Transformer, v_args

# -------------------------
# DSL GRAMMAR
# -------------------------
dsl_grammar = r"""
    start: entry exit?

    entry: "ENTRY:" expr
    exit: "EXIT:" expr

    ?expr: expr "AND" expr   -> and_op
         | expr "OR" expr    -> or_op
         | condition
         | "(" expr ")"

    ?condition: comparison
              | cross_above
              | cross_below

    comparison: series OP value

    cross_above: series "crosses_above" series
    cross_below: series "crosses_below" series

    series: CNAME ("[" NUMBER "]")?

    indicator: CNAME "(" series "," NUMBER ")"

    value: NUMBER
         | indicator
         | series

    OP: ">" | "<" | ">=" | "<=" | "=="

    %import common.CNAME
    %import common.NUMBER
    %import common.WS
    %ignore WS
"""

# Create parser
dsl_parser = Lark(dsl_grammar, start="start")


In [17]:
tree = dsl_parser.parse("ENTRY: close crosses_above high[1] AND volume > 1000000")
print(tree.pretty())


start
  entry
    and_op
      cross_above
        series	close
        series
          high
          1
      comparison
        series	volume
        >
        value	1000000



In [18]:
# ats builder

In [19]:
class DSLtoAST(Transformer):

    def entry(self, items):
        return ("entry", items[0])

    def exit(self, items):
        return ("exit", items[0])

    def and_op(self, items):
        return {
            "type": "and",
            "left": items[0],
            "right": items[1]
        }

    def or_op(self, items):
        return {
            "type": "or",
            "left": items[0],
            "right": items[1]
        }

    def comparison(self, items):
        return {
            "type": "comparison",
            "left": items[0],
            "operator": items[1].value,
            "right": items[2]
        }

    def cross_above(self, items):
        return {
            "type": "cross",
            "direction": "above",
            "left": items[0],
            "right": items[1]
        }

    def cross_below(self, items):
        return {
            "type": "cross",
            "direction": "below",
            "left": items[0],
            "right": items[1]
        }

    def series(self, items):
        if len(items) == 2:
            return f"{items[0]}[{items[1]}]"
        return items[0].value

    def indicator(self, items):
        return {
            "type": "indicator",
            "name": items[0].value.lower(),
            "series": items[1],
            "period": int(items[2])
        }

    def value(self, items):
        if len(items) == 1:
            return items[0]
        return items


In [20]:
dsl = "ENTRY: close crosses_above high[1] AND volume > 1000000"
tree = dsl_parser.parse(dsl)
ast_transformer = DSLtoAST()
ast = ast_transformer.transform(tree)

ast


Tree(Token('RULE', 'start'), [('entry', {'type': 'and', 'left': {'type': 'cross', 'direction': 'above', 'left': 'close', 'right': 'high[1]'}, 'right': {'type': 'comparison', 'left': 'volume', 'operator': '>', 'right': Token('NUMBER', '1000000')}})])

In [21]:
def build_final_ast(dsl_text):
    tree = dsl_parser.parse(dsl_text)
    ast_transformer = DSLtoAST()
    transformed = ast_transformer.transform(tree)

    final_ast = {"entry": [], "exit": []}

    # transformed is a Tree: children contain ('entry', ruleAST) / ('exit', ruleAST)
    for item in transformed.children:
        section, ast = item
        if section == "entry":
            final_ast["entry"].append(ast)
        elif section == "exit":
            final_ast["exit"].append(ast)

    return final_ast


In [22]:
dsl = "ENTRY: close crosses_above high[1] AND volume > 1000000"
print(build_final_ast(dsl))


{'entry': [{'type': 'and', 'left': {'type': 'cross', 'direction': 'above', 'left': 'close', 'right': 'high[1]'}, 'right': {'type': 'comparison', 'left': 'volume', 'operator': '>', 'right': Token('NUMBER', '1000000')}}], 'exit': []}


In [23]:
#AST → Python Code Generator

In [24]:
{
 "type": "and",
 "left": {
     "type": "cross",
     "direction": "above",
     "left": "close",
     "right": "high[1]"
 },
 "right": {
     "type": "comparison",
     "left": "volume",
     "operator": ">",
     "right": 1000000
 }
}


{'type': 'and',
 'left': {'type': 'cross',
  'direction': 'above',
  'left': 'close',
  'right': 'high[1]'},
 'right': {'type': 'comparison',
  'left': 'volume',
  'operator': '>',
  'right': 1000000}}

In [25]:
#Comparison Nodes

In [26]:
{
 "type": "comparison",
 "left": "close",
 "operator": ">",
 "right": 1000000
}


{'type': 'comparison', 'left': 'close', 'operator': '>', 'right': 1000000}

In [27]:
#Indicator Nodes

In [28]:
{
 "type": "indicator",
 "name": "sma",
 "series": "close",
 "period": 20
}


{'type': 'indicator', 'name': 'sma', 'series': 'close', 'period': 20}

In [29]:
#Cross events

In [30]:
dsl_code = "ENTRY: close crosses_above high[1]"
tree = dsl_parser.parse(dsl_code)
print(tree.pretty())

start
  entry
    cross_above
      series	close
      series
        high
        1



In [31]:
#Final Code Generator Implementation

In [32]:
from lark.lexer import Token

def generate_python_expr(node):
    """Convert AST node into pandas expression string"""

    # ---------------------------------------------------
    # 1. COMPARISON NODE
    # ---------------------------------------------------
    if node["type"] == "comparison":
        left = node["left"]
        op = node["operator"]
        right = node["right"]

        # convert NUMBER tokens → int
        if isinstance(right, Token) and right.type == "NUMBER":
            right = int(right.value)

        # ---- LEFT SIDE PROCESSING ----
        if isinstance(left, dict) and left.get("type") == "indicator":
            left_expr = generate_python_expr(left) # Recursively call for indicator
        elif isinstance(left, str): # Assume it's a series name (e.g., "close", "volume")
            # This handles series with lag like 'volume[7]'
            if "[" in left:
                series, lag = left.split("[")
                lag = lag.replace("]", "")
                left_expr = f"df['{series}'].shift({lag})"
            else:
                left_expr = f"df['{left}']"
        else:
            raise ValueError("Unsupported LHS type for comparison:", left)

        # ---- RIGHT SIDE PROCESSING ----
        if isinstance(right, (int, float)):
            right_expr = str(right)
        elif isinstance(right, str):
            # e.g. volume[7] or 'high' or 'close' (if it were in RHS)
            if "[" in right:
                series, lag = right.split("[")
                lag = lag.replace("]", "")
                right_expr = f"df['{series}'].shift({lag})"
            else:
                right_expr = f"df['{right}']"
        elif isinstance(right, dict) and right.get("type") == "indicator":
            right_expr = generate_python_expr(right)
        else:
            raise ValueError("Unsupported RHS type for comparison:", right)

        return f"({left_expr} {op} {right_expr})"

    # ---------------------------------------------------
    # 2. INDICATOR NODE
    # ---------------------------------------------------
    if node["type"] == "indicator":
        name = node["name"].upper()     # SMA / RSI
        series = node["series"]
        period = node["period"]
        # Ensure series is properly formatted if it's not already a string (e.g., if it's a sub-indicator, though not in current grammar)
        if isinstance(series, dict) and series.get("type") == "indicator":
            series_expr = generate_python_expr(series)
        elif isinstance(series, str):
            # Handle potential lag in the series definition within an indicator, if applicable
            if "[" in series:
                col, lag = series.split("[")
                lag = lag.replace("]", "")
                series_expr = f"df['{col}'].shift({lag})"
            else:
                series_expr = f"df['{series}']"
        else:
            # Default to direct df access if it's a simple column name
            series_expr = f"df['{series}']"

        return f"{name}({series_expr}, {period})"


    # ---------------------------------------------------
    # 3. CROSS EVENTS
    # ---------------------------------------------------
    if node["type"] == "cross":
        left = node["left"]
        right = node["right"]

        # Recursive call for left and right if they are indicators (though not expected by current grammar)
        left_now_expr = generate_python_expr(left) if isinstance(left, dict) else f"df['{left}']"
        left_prev_expr = generate_python_expr(left) if isinstance(left, dict) else f"df['{left}'].shift(1)"

        if "[" in right:
            series_name, lag = right.split("[")
            lag = int(lag.replace("]", ""))
            right_now_expr = f"df['{series_name}'].shift({lag})"
            right_prev_expr = f"df['{series_name}'].shift({lag + 1})"
        else:
            right_now_expr = generate_python_expr(right) if isinstance(right, dict) else f"df['{right}']"
            right_prev_expr = generate_python_expr(right) if isinstance(right, dict) else f"df['{right}'].shift(1)"

        if node["direction"] == "above":
            return f"((({left_prev_expr}) <= ({right_prev_expr})) & (({left_now_expr}) > ({right_now_expr})))"

        if node["direction"] == "below":
            return f"((({left_prev_expr}) >= ({right_prev_expr})) & (({left_now_expr}) < ({right_now_expr})))"

    # ---------------------------------------------------
    # 4. LOGICAL OPERATORS
    # ---------------------------------------------------
    if node["type"] == "and":
        left_expr = generate_python_expr(node["left"])
        right_expr = generate_python_expr(node["right"])
        return f"({left_expr} & {right_expr})"

    if node["type"] == "or":
        left_expr = generate_python_expr(node["left"])
        right_expr = generate_python_expr(node["right"])
        return f"({left_expr} | {right_expr})"

    # ---------------------------------------------------
    # UNKNOWN NODE TYPE
    # ---------------------------------------------------
    raise ValueError("Unknown AST node type:", node)

In [33]:
#Function to convert full AST to Python functions

In [34]:
def ast_to_python_code(final_ast):
    entry_expr = ""
    exit_expr = ""

    if final_ast["entry"]:
        entry_expr = generate_python_expr(final_ast["entry"][0])
    if final_ast["exit"]:
        exit_expr = generate_python_expr(final_ast["exit"][0])

    code = f"""
def run_strategy(df):
    import pandas as pd

    signals = pd.DataFrame(index=df.index)
    signals['entry'] = {entry_expr}
    signals['exit'] = {exit_expr}

    return signals
"""
    return code


In [35]:
dsl = "ENTRY: close crosses_above high[1] AND volume > 1000000"
ast = build_final_ast(dsl)

code = ast_to_python_code(ast)
print(code)



def run_strategy(df):
    import pandas as pd

    signals = pd.DataFrame(index=df.index)
    signals['entry'] = ((((df['close'].shift(1)) <= (df['high'].shift(2))) & ((df['close']) > (df['high'].shift(1)))) & (df['volume'] > 1000000))
    signals['exit'] = 

    return signals



In [36]:
#Part 5: Backtest Engine

In [37]:
import pandas as pd
import numpy as np

def backtest_signals(df, signals, initial_capital=100000.0, slippage=0.0, commission=0.0):
    """
    Run a simple backtest using boolean entry/exit signals.

    Args:
        df (pd.DataFrame): OHLCV dataframe with columns ['open','high','low','close','volume'].
        signals (pd.DataFrame): DataFrame with boolean columns 'entry' and 'exit' indexed same as df.
        initial_capital (float): starting cash.
        slippage (float): per-share slippage to apply on entry/exit prices (absolute).
        commission (float): fixed commission per trade.
    Returns:
        dict: {
            'trades': list of trade dicts,
            'equity': pd.Series (daily equity curve),
            'final_capital': float,
            'total_return_pct': float,
            'max_drawdown_pct': float,
            'num_trades': int
        }
    """
    # sanity checks
    assert 'entry' in signals.columns and 'exit' in signals.columns, "signals must have 'entry' and 'exit' columns"
    assert df.shape[0] == signals.shape[0], "df and signals must have the same number of rows and aligned index"
    # align indices
    signals = signals.reindex(df.index)

    cash = float(initial_capital)
    position = 0.0      # number of shares currently held (0 means flat)
    entry_price = None  # price we entered
    entry_index = None

    trades = []
    equity_list = []
    equity_index = []

    # track last known equity for days before first trade
    # iterate rows by index position so we can look at next row for fills
    idxs = list(df.index)
    n = len(idxs)

    for i, idx in enumerate(idxs):
        row = df.loc[idx]
        # default today's market value of any open position (use close price to mark-to-market)
        close_price = float(row['close'])

        # 1) If currently flat, check for entry signal at this row
        if position == 0:
            if signals.loc[idx, 'entry']:
                # determine fill price: next open if exists else current close
                if i+1 < n:
                    fill_idx = idxs[i+1]
                    fill_price = float(df.loc[fill_idx, 'open'])
                else:
                    fill_price = close_price

                # apply slippage on entry (assume slippage increases buy price)
                buy_price = fill_price + slippage

                # calculate number of shares (allow fractional)
                shares = cash / buy_price if buy_price > 0 else 0.0
                if shares > 0:
                    position = shares
                    entry_price = buy_price
                    entry_index = idx

                    # deduct commission
                    cash -= commission

                    # Note: we keep full cash invested (cash becomes 0), but to avoid tiny rounding issues:
                    cash = max(0.0, cash)

                    trades.append({
                        'entry_index': idx,
                        'entry_fill_index': fill_idx if i+1 < n else idx,
                        'entry_price': buy_price,
                        'exit_index': None,
                        'exit_fill_index': None,
                        'exit_price': None,
                        'shares': shares,
                        'pnl': None,
                        'return_pct': None
                    })

        # 2) If in position, check exit signal
        elif position > 0:
            if signals.loc[idx, 'exit']:
                # determine exit fill price (next open if exists else current close)
                if i+1 < n:
                    fill_idx = idxs[i+1]
                    fill_price = float(df.loc[fill_idx, 'open'])
                else:
                    fill_price = close_price

                # apply slippage (assume slippage increases sell price negatively, i.e. lowers proceeds)
                sell_price = fill_price - slippage

                # compute proceeds
                proceeds = position * sell_price

                # compute trade PnL = proceeds - cost
                cost = position * entry_price
                pnl = proceeds - cost - commission  # subtract commission at exit
                return_pct = (pnl / cost) if cost != 0 else 0.0

                # update cash
                cash += proceeds
                # ensure cash is float
                cash = float(cash)

                # record in last trade dict
                last_trade = trades[-1]
                last_trade['exit_index'] = idx
                last_trade['exit_fill_index'] = fill_idx if i+1 < n else idx
                last_trade['exit_price'] = sell_price
                last_trade['pnl'] = pnl
                last_trade['return_pct'] = return_pct

                # reset position
                position = 0.0
                entry_price = None
                entry_index = None

        # daily equity mark-to-market: cash + position * close
        mtm = cash + position * close_price
        equity_list.append(mtm)
        equity_index.append(idx)

    # after loop, if still in position, close at last close price (mark-to-market but also treat as exit)
    if position > 0:
        last_idx = idxs[-1]
        last_close = float(df.loc[last_idx, 'close'])
        sell_price = last_close - slippage
        proceeds = position * sell_price
        cost = trades[-1]['shares'] * trades[-1]['entry_price']
        pnl = proceeds - cost - commission
        return_pct = (pnl / cost) if cost != 0 else 0.0

        # finalize last trade
        last_trade = trades[-1]
        last_trade['exit_index'] = last_idx
        last_trade['exit_fill_index'] = last_idx
        last_trade['exit_price'] = sell_price
        last_trade['pnl'] = pnl
        last_trade['return_pct'] = return_pct

        cash += proceeds
        position = 0.0
        equity_list[-1] = cash  # final equity equals cash after forced close

    equity = pd.Series(equity_list, index=equity_index)

    # metrics
    final_capital = float(equity.iloc[-1]) if len(equity) else float(initial_capital)
    total_return_pct = ((final_capital - initial_capital) / initial_capital) * 100.0

    # max drawdown calculation
    roll_max = equity.cummax()
    drawdown = (equity - roll_max) / roll_max
    max_drawdown_pct = float(drawdown.min() * 100.0) if not drawdown.empty else 0.0

    num_trades = len(trades)
    # format trades nicely (round floats)
    for t in trades:
        # convert indices to str for JSON friendliness if they are timestamps
        t['entry_index'] = str(t['entry_index'])
        t['exit_index'] = str(t['exit_index']) if t['exit_index'] is not None else None
        t['pnl'] = float(t['pnl']) if t['pnl'] is not None else None
        t['return_pct'] = float(t['return_pct']) * 100.0 if t['return_pct'] is not None else None
        t['entry_price'] = float(t['entry_price']) if t['entry_price'] is not None else None
        t['exit_price'] = float(t['exit_price']) if t['exit_price'] is not None else None
        t['shares'] = float(t.get('shares', 0.0))

    results = {
        'trades': trades,
        'equity': equity,
        'final_capital': final_capital,
        'total_return_pct': total_return_pct,
        'max_drawdown_pct': max_drawdown_pct,
        'num_trades': num_trades
    }

    return results


In [38]:
# defining df
import pandas as pd
import numpy as np

np.random.seed(0)

dates = pd.date_range("2023-01-01", periods=100)
prices = np.linspace(100, 150, 100) + np.random.normal(0, 2, 100)

df = pd.DataFrame({
    "open": prices + np.random.normal(0, 1, 100),
    "high": prices + np.random.normal(1, 1, 100),
    "low": prices - np.random.normal(1, 1, 100),
    "close": prices,
    "volume": np.random.randint(10000, 20000, size=100)
}, index=dates)


In [39]:
#defining signals
signals = pd.DataFrame(index=df.index)
signals["entry"] = df["close"] > df["close"].shift(1)   # dummy example
signals["exit"] = df["close"] < df["close"].shift(1)    # dummy example


In [40]:
# df: your OHLCV DataFrame
# signals: DataFrame with columns ['entry','exit'] (booleans), same index as df

out = backtest_signals(df, signals, initial_capital=100000, slippage=0.0, commission=0.0)

print("Total Return (pct):", out['total_return_pct'])
print("Max Drawdown (pct):", out['max_drawdown_pct'])
print("Number of trades:", out['num_trades'])
print("Trades log:")
for t in out['trades']:
    print(t)


Total Return (pct): 3979569165673.9907
Max Drawdown (pct): -2.140356181356539
Number of trades: 35
Trades log:
{'entry_index': '2023-01-03 00:00:00', 'entry_fill_index': Timestamp('2023-01-04 00:00:00'), 'entry_price': 106.96633462171245, 'exit_index': '2023-01-05 00:00:00', 'exit_fill_index': Timestamp('2023-01-06 00:00:00'), 'exit_price': 102.514317951149, 'shares': 934.873578249185, 'pnl': -4162.07275523468, 'return_pct': -4.1620727552346795}
{'entry_index': '2023-01-07 00:00:00', 'entry_fill_index': Timestamp('2023-01-08 00:00:00'), 'entry_price': 102.48518430731738, 'exit_index': '2023-01-08 00:00:00', 'exit_fill_index': Timestamp('2023-01-09 00:00:00'), 'exit_price': 105.75690836329731, 'shares': 1910.890130787252, 'pnl': 6251.905209231307, 'return_pct': 3.192387346613125}
{'entry_index': '2023-01-09 00:00:00', 'entry_fill_index': Timestamp('2023-01-10 00:00:00'), 'entry_price': 106.84716634076572, 'exit_index': '2023-01-11 00:00:00', 'exit_fill_index': Timestamp('2023-01-12 00:0

In [41]:
# Part 6 – Final Integration / Demo

In [42]:
# demo.py

import pandas as pd
from lark import Lark
# ------- IMPORT YOUR OWN MODULES HERE -------
# from nl_parser import nl_to_json_rules         # Part 1
# from dsl_parser import dsl_parser              # Part 3
# from ast_builder import DSLtoAST, build_final_ast
# from code_generator import ast_to_python_code, generate_python_expr
# from backtester import backtest_signals        # Part 5

# BUT since your code is in the notebook, we directly call functions:
# (Assuming all previous Parts are executed in cells above)

# ---------------------------------------------
# STEP 1 → Natural Language → DSL
# ---------------------------------------------

def json_to_dsl(json_rules):
    """Convert NL → JSON rules into DSL string."""
    entry_rules = []
    exit_rules = []

    # Entry
    for rule in json_rules["entry"]:
        left = rule["left"]
        op = rule["operator"]
        right = rule["right"]
        entry_rules.append(f"{left} {op} {right}")

    # Exit
    for rule in json_rules["exit"]:
        left = rule["left"]
        op = rule["operator"]
        right = rule["right"]
        exit_rules.append(f"{left} {op} {right}")

    dsl = ""

    if entry_rules:
        dsl += "ENTRY: " + " AND ".join(entry_rules) + "\n"

    if exit_rules:
        dsl += "EXIT: " + " AND ".join(exit_rules)

    return dsl.strip()


# ---------------------------------------------
# STEP 2 → DSL → AST
# ---------------------------------------------

def parse_dsl_to_ast(dsl_text):
    tree = dsl_parser.parse(dsl_text)
    transformer = DSLtoAST()
    parsed = transformer.transform(tree)
    final_ast = {"entry": [], "exit": []}

    for node in parsed.children:
        section, ast = node
        final_ast[section].append(ast)

    return final_ast


# ---------------------------------------------
# STEP 3 → AST → Python expression
# ---------------------------------------------

def ast_to_signals(df, ast):
    signals = pd.DataFrame(index=df.index)
    signals["entry"] = False
    signals["exit"] = False

    if ast["entry"]:
        entry_expr = generate_python_expr(ast["entry"][0])
        signals["entry"] = eval(entry_expr)

    if ast["exit"]:
        exit_expr = generate_python_expr(ast["exit"][0])
        signals["exit"] = eval(exit_expr)

    signals = signals.fillna(False)
    return signals


# ---------------------------------------------
# FINAL END-TO-END FUNCTION
# ---------------------------------------------

def run_pipeline(entry_nl, exit_nl, df):
    print("\n========================")
    print("1. NL → JSON")
    print("========================")
    entry_json = nl_to_json_rules(entry_nl)
    exit_json = nl_to_json_rules(exit_nl)

    json_rules = {
        "entry": entry_json["entry"],
        "exit": exit_json["exit"]
    }

    print(json_rules)

    print("\n========================")
    print("2. JSON → DSL")
    print("========================")
    dsl = json_to_dsl(json_rules)
    print(dsl)

    print("\n========================")
    print("3. DSL → AST")
    print("========================")
    ast = parse_dsl_to_ast(dsl)
    print(ast)

    print("\n========================")
    print("4. AST → Python Code → Signals")
    print("========================")
    signals = ast_to_signals(df, ast)
    print(signals.head())

    print("\n========================")
    print("5. Running Backtest")
    print("========================")
    result = backtest_signals(df, signals)

    print("\nFinal Results:")
    print("Total Return (%) =", result["total_return_pct"])
    print("Max Drawdown (%) =", result["max_drawdown_pct"])
    print("Number of Trades =", result["num_trades"])
    print("\nTrades Log:")
    for t in result["trades"]:
        print(t)

    return result



In [43]:
dsl_grammar = r"""
    start: entry exit?

    entry: "ENTRY:" expr
    exit: "EXIT:" expr

    ?expr: expr "AND" expr   -> and_op
         | expr "OR" expr    -> or_op
         | condition
         | "(" expr ")"

    ?condition: comparison
              | cross_above
              | cross_below

    comparison: operand OP operand

    ?operand: indicator
            | series
            | NUMBER

    cross_above: series "crosses_above" series
    cross_below: series "crosses_below" series

    series: CNAME ("[" NUMBER "]")?

    indicator: CNAME "(" operand "," NUMBER ")"

    OP: ">" | "<" | ">=" | "<=" | "=="

    %import common.CNAME
    %import common.NUMBER
    %import common.WS
    %ignore WS
"""


In [44]:
dsl_parser = Lark(dsl_grammar, start="start", parser="lalr")


In [45]:
import pandas as pd
import numpy as np

def SMA(series, period):
    """Simple Moving Average"""
    return series.rolling(window=period).mean()

def RSI(series, period):
    """Relative Strength Index"""
    delta = series.diff()

    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    avg_gain = gain.rolling(period).mean()
    avg_loss = loss.rolling(period).mean()

    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))

    return rsi


In [47]:
entry_rule = "Buy when the close price is above the 20-day moving average"
exit_rule  = "Exit when RSI(14) is below 30"

result = run_pipeline(entry_rule, exit_rule, df)


1. NL → JSON
{'entry': [{'left': 'close', 'operator': '>', 'right': 'sma(close,20)'}], 'exit': [{'left': 'rsi(close,14)', 'operator': '<', 'right': 30}]}

2. JSON → DSL
ENTRY: close > sma(close,20)
EXIT: rsi(close,14) < 30

3. DSL → AST
{'entry': [{'type': 'comparison', 'left': 'close', 'operator': '>', 'right': {'type': 'indicator', 'name': 'sma', 'series': 'close', 'period': 20}}], 'exit': [{'type': 'comparison', 'left': {'type': 'indicator', 'name': 'rsi', 'series': 'close', 'period': 14}, 'operator': '<', 'right': Token('NUMBER', '30')}]}

4. AST → Python Code → Signals
            entry   exit
2023-01-01  False  False
2023-01-02  False  False
2023-01-03  False  False
2023-01-04  False  False
2023-01-05  False  False

5. Running Backtest

Final Results:
Total Return (%) = 143.11653691695906
Max Drawdown (%) = -3.1271098515427282
Number of Trades = 1

Trades Log:
{'entry_index': '2023-01-20 00:00:00', 'entry_fill_index': Timestamp('2023-01-21 00:00:00'), 'entry_price': 105.37145600

In [48]:
%%writefile nl_parser.py
import re

# ---------------------------------------------
# BASIC HELPERS
# ---------------------------------------------
def normalize_text(text):
    return re.sub(r"\s+", " ", text.lower()).strip()

def extract_numbers(text):
    text = text.lower()
    nums = re.findall(r"\d+", text)
    nums = [int(n) for n in nums] if nums else []

    if "million" in text and nums:
        nums = [nums[0] * 1_000_000]
    if "thousand" in text and nums:
        nums = [nums[0] * 1000]

    return nums


# ---------------------------------------------
# CROSS CONDITION (price crosses above yesterday’s high)
# ---------------------------------------------
def parse_cross_condition(sentence):
    s = normalize_text(sentence)

    if "crosses above" in s or "cross above" in s:
        left = "close"

        if "yesterday" in s and "high" in s:
            right = "high[1]"
        elif "yesterday" in s and "low" in s:
            right = "low[1]"
        else:
            right = None

        if right:
            return {"left": left, "operator": "crosses_above", "right": right}

    return None


# ---------------------------------------------
# PERCENT INCREASE COMPARED TO LAST WEEK
# "volume increases by more than 30 percent compared to last week"
# ---------------------------------------------
def parse_percent_last_week(sentence):
    s = normalize_text(sentence)

    if "percent" in s and ("increase" in s or "increases" in s):
        nums = re.findall(r"\d+", s)
        if not nums:
            return None

        percent = int(nums[0])

        if "volume" not in s:
            return None

        if "last week" in s:
            base = "volume[7]"
        else:
            return None

        multiplier = 1 + (percent / 100)
        right = f"{base} * {multiplier}"

        return {"left": "volume", "operator": ">", "right": right}

    return None


# ---------------------------------------------
# MAIN CONDITION HANDLER
# ---------------------------------------------
def parse_one_condition(sentence):
    s = normalize_text(sentence)

    # 1) Percent change rule
    rule = parse_percent_last_week(sentence)
    if rule:
        return rule

    # 2) Cross condition
    rule = parse_cross_condition(sentence)
    if rule:
        return rule

    # 3) RSI rule
    if "rsi" in s:
        nums = re.findall(r"\d+", s)
        period = int(nums[0]) if nums else 14
        threshold = int(nums[-1]) if len(nums) >= 2 else 30

        op = "<" if "below" in s else ">" if "above" in s else None

        return {"left": f"rsi(close,{period})", "operator": op, "right": threshold}

    # 4) Basic comparisons (close, volume, SMA)
    left = None
    op = None
    right = None

    if "close" in s or "price" in s:
        left = "close"
    elif "volume" in s:
        left = "volume"

    if "above" in s:
        op = ">"
    elif "below" in s:
        op = "<"
    elif "equal" in s:
        op = "=="

    if "moving average" in s or "ma" in s:
        nums = extract_numbers(s)
        period = nums[0] if nums else 20
        right = f"sma({left},{period})"

    if right is None:
        nums = extract_numbers(s)
        if nums:
            right = nums[0]

    if left and op and right is not None:
        return {"left": left, "operator": op, "right": right}

    return None


# ---------------------------------------------
# ENTRY POINT: NL → JSON RULES
# ---------------------------------------------
def nl_to_json_rules(text):
    t = normalize_text(text)

    is_entry = any(x in t for x in ["buy", "enter", "trigger entry"])
    is_exit = any(x in t for x in ["exit", "sell"])

    entry_rules = []
    exit_rules = []

    parts = re.split(r"\band\b", t)

    for part in parts:
        rule = parse_one_condition(part)
        if rule:
            if is_exit:
                exit_rules.append(rule)
            else:
                entry_rules.append(rule)

    return {"entry": entry_rules, "exit": exit_rules}


Writing nl_parser.py


In [49]:
%%writefile dsl_parser.py
from lark import Lark

dsl_grammar = r"""
    start: entry exit?

    entry: "ENTRY:" expr
    exit: "EXIT:" expr

    ?expr: expr "AND" expr   -> and_op
         | expr "OR" expr    -> or_op
         | condition
         | "(" expr ")"

    ?condition: comparison
              | cross_above
              | cross_below

    comparison: operand OP operand

    ?operand: indicator
            | series
            | NUMBER

    cross_above: series "crosses_above" series
    cross_below: series "crosses_below" series

    series: CNAME ("[" NUMBER "]")?

    indicator: CNAME "(" operand "," NUMBER ")"

    OP: ">" | "<" | ">=" | "<=" | "=="

    %import common.CNAME
    %import common.NUMBER
    %import common.WS
    %ignore WS
"""

# IMPORTANT: use LALR, not Earley
dsl_parser = Lark(dsl_grammar, start="start", parser="lalr")


Writing dsl_parser.py


In [50]:
%%writefile ast_builder.py
from lark import Transformer

class DSLtoAST(Transformer):

    def entry(self, items):
        return ("entry", items[0])

    def exit(self, items):
        return ("exit", items[0])

    def and_op(self, items):
        return {
            "type": "and",
            "left": items[0],
            "right": items[1]
        }

    def or_op(self, items):
        return {
            "type": "or",
            "left": items[0],
            "right": items[1]
        }

    def comparison(self, items):
        return {
            "type": "comparison",
            "left": items[0],
            "operator": items[1].value,
            "right": items[2]
        }

    def cross_above(self, items):
        return {
            "type": "cross",
            "direction": "above",
            "left": items[0],
            "right": items[1]
        }

    def cross_below(self, items):
        return {
            "type": "cross",
            "direction": "below",
            "left": items[0],
            "right": items[1]
        }

    def series(self, items):
        if len(items) == 2:
            return f"{items[0]}[{items[1]}]"
        return items[0].value

    def indicator(self, items):
        return {
            "type": "indicator",
            "name": items[0].value.lower(),
            "series": items[1],
            "period": int(items[2])
        }


def build_final_ast(tree):
    """
    Returns: {"entry":[...], "exit":[...]}
    """
    final_ast = {"entry": [], "exit": []}

    for section, ast in tree:
        if section == "entry":
            final_ast["entry"].append(ast)
        elif section == "exit":
            final_ast["exit"].append(ast)

    return final_ast


Writing ast_builder.py


In [51]:
%%writefile code_generator.py
from lark.lexer import Token

# ============================================================
# Convert AST Node → Pandas Expression String
# ============================================================
def generate_python_expr(node):
    """Convert AST node into a valid pandas-evaluable expression string."""

    # ---------------------------------------------------
    # 1. COMPARISON NODE
    # ---------------------------------------------------
    if node["type"] == "comparison":
        left = node["left"]
        op = node["operator"]
        right = node["right"]

        # Token → int
        if isinstance(right, Token) and right.type == "NUMBER":
            right = int(right.value)

        # LEFT SIDE
        if isinstance(left, dict) and left.get("type") == "indicator":
            left_expr = generate_python_expr(left)

        elif isinstance(left, str):
            if "[" in left:        # e.g. volume[7]
                col, lag = left.split("[")
                lag = lag.replace("]", "")
                left_expr = f"df['{col}'].shift({lag})"
            else:
                left_expr = f"df['{left}']"

        else:
            raise ValueError("Unsupported left operand:", left)

        # RIGHT SIDE
        if isinstance(right, (int, float)):
            right_expr = str(right)

        elif isinstance(right, str):
            if "[" in right:        # e.g. high[1]
                col, lag = right.split("[")
                lag = lag.replace("]", "")
                right_expr = f"df['{col}'].shift({lag})"
            else:
                right_expr = f"df['{right}']"

        elif isinstance(right, dict) and right.get("type") == "indicator":
            right_expr = generate_python_expr(right)

        else:
            raise ValueError("Unsupported right operand:", right)

        return f"({left_expr} {op} {right_expr})"

    # ---------------------------------------------------
    # 2. INDICATOR NODE (SMA, RSI)
    # ---------------------------------------------------
    if node["type"] == "indicator":
        name = node["name"].upper()
        series = node["series"]
        period = node["period"]

        if isinstance(series, dict):   # nested indicator
            series_expr = generate_python_expr(series)

        elif "[" in series:            # e.g. close[5]
            col, lag = series.split("[")
            lag = lag.replace("]", "")
            series_expr = f"df['{col}'].shift({lag})"

        else:
            series_expr = f"df['{series}']"

        return f"{name}({series_expr}, {period})"

    # ---------------------------------------------------
    # 3. CROSS EVENTS (crosses_above / crosses_below)
    # ---------------------------------------------------
    if node["type"] == "cross":
        left = node["left"]
        right = node["right"]

        # Left side
        if isinstance(left, dict):
            left_now = generate_python_expr(left)
            left_prev = f"({left_now}).shift(1)"
        else:
            left_now = f"df['{left}']"
            left_prev = f"df['{left}'].shift(1)"

        # Right side
        if "[" in right:           # high[1]
            col, lag = right.split("[")
            lag = int(lag.replace("]", ""))
            right_now = f"df['{col}'].shift({lag})"
            right_prev = f"df['{col}'].shift({lag+1})"

        else:
            right_now = f"df['{right}']"
            right_prev = f"df['{right}'].shift(1)"

        # CROSS ABOVE
        if node["direction"] == "above":
            return (
                f"((({left_prev}) <= ({right_prev})) "
                f"& (({left_now}) > ({right_now})))"
            )

        # CROSS BELOW
        if node["direction"] == "below":
            return (
                f"((({left_prev}) >= ({right_prev})) "
                f"& (({left_now}) < ({right_now})))"
            )

    # ---------------------------------------------------
    # 4. LOGICAL OPERATORS
    # ---------------------------------------------------
    if node["type"] == "and":
        return f"({generate_python_expr(node['left'])} & {generate_python_expr(node['right'])})"

    if node["type"] == "or":
        return f"({generate_python_expr(node['left'])} | {generate_python_expr(node['right'])})"

    # ---------------------------------------------------
    # UNKNOWN NODE
    # ---------------------------------------------------
    raise ValueError("Unknown AST node:", node)


# ============================================================
# Convert Full AST → Python Function Code
# ============================================================
def ast_to_python_code(final_ast):
    entry_expr = ""
    exit_expr = ""

    if final_ast["entry"]:
        entry_expr = generate_python_expr(final_ast["entry"][0])
    if final_ast["exit"]:
        exit_expr = generate_python_expr(final_ast["exit"][0])

    code = f"""
def run_strategy(df):
    import pandas as pd

    signals = pd.DataFrame(index=df.index)
    signals['entry'] = {entry_expr}
    signals['exit'] = {exit_expr}

    return signals
"""
    return code


Writing code_generator.py


In [52]:
%%writefile indicators.py
import pandas as pd

# -----------------------------------------------------------
# Simple Moving Average
# -----------------------------------------------------------
def SMA(series, period):
    """
    Calculate Simple Moving Average.
    series : pandas.Series (df['close'])
    period : int
    """
    return series.rolling(window=period).mean()


# -----------------------------------------------------------
# Relative Strength Index (RSI)
# -----------------------------------------------------------
def RSI(series, period=14):
    """
    Compute RSI using Wilder's smoothing.
    """
    delta = series.diff()

    gain = delta.where(delta > 0, 0.0)
    loss = -delta.where(delta < 0, 0.0)

    avg_gain = gain.rolling(period).mean()
    avg_loss = loss.rolling(period).mean()

    rs = avg_gain / avg_loss

    rsi = 100 - (100 / (1 + rs))
    return rsi


Writing indicators.py


In [53]:
%%writefile backtest.py
import pandas as pd
import numpy as np

def backtest_signals(df, signals, initial_capital=100000.0, slippage=0.0, commission=0.0):
    """
    Simple backtesting engine that trades based on ENTRY and EXIT signals.

    Args:
        df (DataFrame): OHLCV data with columns ['open','high','low','close','volume']
        signals (DataFrame): Boolean DataFrame with columns ['entry','exit']
        initial_capital (float): Starting cash
        slippage (float): Per-share slippage
        commission (float): Fixed commission per trade

    Returns:
        dict with:
        - trades (list)
        - equity (Series)
        - final_capital
        - total_return_pct
        - max_drawdown_pct
        - num_trades
    """

    assert 'entry' in signals.columns, "signals must include 'entry'"
    assert 'exit' in signals.columns, "signals must include 'exit'"
    assert len(df) == len(signals), "df and signals must have same length"

    signals = signals.reindex(df.index)

    cash = float(initial_capital)
    position = 0.0  # number of shares (fractional allowed)
    entry_price = None

    trades = []
    equity_values = []
    equity_index = []

    idxs = list(df.index)
    n = len(idxs)

    for i, idx in enumerate(idxs):
        row = df.loc[idx]
        close_price = float(row["close"])

        # =============================================================
        # ENTRY
        # =============================================================
        if position == 0 and signals.loc[idx, "entry"]:
            if i + 1 < n:
                fill_idx = idxs[i+1]
                fill_price = float(df.loc[fill_idx]["open"])
            else:
                fill_idx = idx
                fill_price = close_price

            buy_price = fill_price + slippage
            shares = cash / buy_price if buy_price > 0 else 0

            if shares > 0:
                position = shares
                entry_price = buy_price
                cash -= commission  # commission on entry

                trades.append({
                    "entry_index": str(idx),
                    "entry_fill_index": str(fill_idx),
                    "entry_price": float(buy_price),
                    "exit_index": None,
                    "exit_fill_index": None,
                    "exit_price": None,
                    "shares": float(shares),
                    "pnl": None,
                    "return_pct": None
                })

        # =============================================================
        # EXIT
        # =============================================================
        elif position > 0 and signals.loc[idx, "exit"]:
            if i + 1 < n:
                fill_idx = idxs[i+1]
                fill_price = float(df.loc[fill_idx]["open"])
            else:
                fill_idx = idx
                fill_price = close_price

            sell_price = fill_price - slippage

            proceeds = position * sell_price
            cost = position * entry_price

            pnl = proceeds - cost - commission
            return_pct = pnl / cost if cost != 0 else 0

            last_trade = trades[-1]
            last_trade["exit_index"] = str(idx)
            last_trade["exit_fill_index"] = str(fill_idx)
            last_trade["exit_price"] = float(sell_price)
            last_trade["pnl"] = float(pnl)
            last_trade["return_pct"] = float(return_pct) * 100.0

            cash += proceeds
            position = 0
            entry_price = None

        # =============================================================
        # DAILY MARK TO MARKET
        # =============================================================
        mtm_equity = cash + position * close_price
        equity_values.append(mtm_equity)
        equity_index.append(idx)

    # =============================================================
    # FORCE CLOSE at last price if still in position
    # =============================================================
    if position > 0:
        last_idx = idxs[-1]
        last_close = float(df.loc[last_idx, "close"])

        sell_price = last_close - slippage
        proceeds = position * sell_price
        cost = trades[-1]["shares"] * trades[-1]["entry_price"]

        pnl = proceeds - cost - commission
        return_pct = pnl / cost if cost != 0 else 0

        last_trade = trades[-1]
        last_trade["exit_index"] = str(last_idx)
        last_trade["exit_fill_index"] = str(last_idx)
        last_trade["exit_price"] = float(sell_price)
        last_trade["pnl"] = float(pnl)
        last_trade["return_pct"] = float(return_pct) * 100.0

        cash += proceeds
        position = 0

        equity_values[-1] = cash

    # =============================================================
    # EQUITY CURVE & METRICS
    # =============================================================
    equity = pd.Series(equity_values, index=equity_index)

    final_capital = float(equity.iloc[-1])
    total_return_pct = ((final_capital - initial_capital) / initial_capital) * 100.0

    roll_max = equity.cummax()
    drawdown = (equity - roll_max) / roll_max
    max_drawdown_pct = float(drawdown.min() * 100.0)

    results = {
        "trades": trades,
        "equity": equity,
        "final_capital": final_capital,
        "total_return_pct": total_return_pct,
        "max_drawdown_pct": max_drawdown_pct,
        "num_trades": len(trades)
    }

    return results


Writing backtest.py


In [54]:
%%writefile demo.py
import pandas as pd

# -----------------------------------------
# IMPORTING MODULES FROM PROJECT FILES
# -----------------------------------------
from nl_parser import nl_to_json_rules
from dsl_parser import dsl_parser
from ast_builder import DSLtoAST, build_final_ast
from code_generator import generate_python_expr
from indicators import SMA, RSI
from backtest import backtest_signals


# ---------------------------------------------------
# JSON → DSL (Simple converter)
# ---------------------------------------------------
def json_to_dsl(json_rules):
    entry_rules = []
    exit_rules = []

    for rule in json_rules["entry"]:
        entry_rules.append(f"{rule['left']} {rule['operator']} {rule['right']}")

    for rule in json_rules["exit"]:
        exit_rules.append(f"{rule['left']} {rule['operator']} {rule['right']}")

    dsl = ""
    if entry_rules:
        dsl += "ENTRY: " + " AND ".join(entry_rules) + "\n"
    if exit_rules:
        dsl += "EXIT: " + " AND ".join(exit_rules)

    return dsl.strip()


# ---------------------------------------------------
# DSL → AST
# ---------------------------------------------------
def parse_dsl_to_ast(dsl_text):
    tree = dsl_parser.parse(dsl_text)
    transformer = DSLtoAST()
    parsed = transformer.transform(tree)

    final_ast = {"entry": [], "exit": []}

    for node in parsed:
        section, ast = node
        final_ast[section].append(ast)

    return final_ast


# ---------------------------------------------------
# AST → Signals (eval python expressions)
# ---------------------------------------------------
def ast_to_signals(df, ast):
    signals = pd.DataFrame(index=df.index)
    signals["entry"] = False
    signals["exit"] = False

    if ast["entry"]:
        expr = generate_python_expr(ast["entry"][0])
        signals["entry"] = eval(expr)

    if ast["exit"]:
        expr = generate_python_expr(ast["exit"][0])
        signals["exit"] = eval(expr)

    return signals.fillna(False)


# ---------------------------------------------------
# END-TO-END PIPELINE
# ---------------------------------------------------
def run_pipeline(entry_nl, exit_nl, df):
    print("\n========================")
    print("1. NL → JSON")
    print("========================")
    entry_json = nl_to_json_rules(entry_nl)
    exit_json = nl_to_json_rules(exit_nl)

    combined_json = {
        "entry": entry_json["entry"],
        "exit": exit_json["exit"]
    }
    print(combined_json)

    print("\n========================")
    print("2. JSON → DSL")
    print("========================")
    dsl = json_to_dsl(combined_json)
    print(dsl)

    print("\n========================")
    print("3. DSL → AST")
    print("========================")
    ast = parse_dsl_to_ast(dsl)
    print(ast)

    print("\n========================")
    print("4. AST → Signals")
    print("========================")
    signals = ast_to_signals(df, ast)
    print(signals.head())

    print("\n========================")
    print("5. Backtest")
    print("========================")
    result = backtest_signals(df, signals)

    print("\nFinal Backtest Result")
    print("Total Return (%) =", result["total_return_pct"])
    print("Max Drawdown (%) =", result["max_drawdown_pct"])
    print("Number of Trades =", result["num_trades"])

    print("\nTrades Log:")
    for t in result["trades"]:
        print(t)

    return result


# ---------------------------------------------------
# SAMPLE HOW TO RUN (Uncomment to test)
# ---------------------------------------------------
"""
df = pd.read_csv("ohlcv_sample.csv")

entry_rule = "Buy when the close price is above the 20-day moving average"
exit_rule  = "Exit when RSI(14) is below 30"

run_pipeline(entry_rule, exit_rule, df)
"""


Writing demo.py


In [55]:
!zip -r project.zip /



zip error: Nothing to do! (try: zip -r project.zip . -i /content/my_project_folder)
