# OptCache Report Builder

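Этот скрипт загружает данные оптимизации из CSV-файлов, отбирает по каждому файлу топ-5 прогонов, прошедших фильтр, и выводит их отсортированными по убыванию profit и custom_fitness и по возрастанию просадки (reldrawdownpercnt_e).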

In [400]:

# Report configuration shared by the loader and the report builders.
#
# 'fields' maps a column name to its rule:
#   'enabled'      - when False, BuildReportDF does not filter on this column.
#   'range'        - (min, max), both bounds inclusive, applied by BuildReportDF.
#   'color_ranges' - bands used by BuildReportRowsMD to pick a cell background;
#                    a band matches when lower <= value < upper, and the first
#                    matching band wins. Bands should therefore be contiguous.
# 'max_runs_count' - number of rows BuildReportDF keeps per optimization file.
# 'sort_by' / 'sort_dir' - passed to DataFrame.sort_values as `by` / `ascending`
#                    (True = ascending), so both lists must have equal length.
filter_rule = {
    'fields': {
        'profit': {
            'enabled': True,
            'range': (0.01, float('inf')),
            'color_ranges': []
        },
        'trades_per_30_days': {
            'enabled': True,
            'range': (0.5, float('inf')),
            'color_ranges': []
        },
        'profit_per_30_days_percent': {
            'enabled': False,
            'range': (0.005, float('inf')),
            'color_ranges': [
                {'range': (-float('inf'), 0.010), 'color': 'pink'},
                {'range': (0.010, 0.020), 'color': 'moccasin'},
                {'range': (0.020, float('inf')), 'color': 'green'}
            ]
        },
        # NOTE(review): BuildReportDF computes win_rate as a fraction
        # (profittrades / trades) and the MD builder does not colorize this
        # column, so the 50.0 threshold below currently has no effect - confirm
        # the intended unit before enabling it.
        'win_rate': {
            'enabled': False,
            'range': (0.0, float('inf')),
            'color_ranges': [
                {'range': (50.0, float('inf')), 'color': 'lightgreen'}
            ]
        },
        'custom_fitness': {
            'enabled': True,
            'range': (0.75, float('inf')),
            'color_ranges': [
                {'range': (0.00, 0.75), 'color': 'pink'},
                # Starts at 0.75 (was 0.76) so that values in [0.75, 0.76),
                # including the filter minimum itself, are not left uncolored.
                {'range': (0.75, 0.90), 'color': 'moccasin'},
                {'range': (0.90, float('inf')), 'color': 'lightgreen'}
            ]
        },
        'reldrawdownpercnt_e': {
            'enabled': True,
            'range': (0, 15),
            'color_ranges': [
                {'range': (0.00, 5.0),  'color': 'lightgreen'},
                {'range': (5.00, 15.0), 'color': 'moccasin'},
                {'range': (15.0, float('inf')), 'color': 'pink'}
            ]
        },
    },
    'max_runs_count': 5,
    'sort_by': ['profit', 'custom_fitness', 'reldrawdownpercnt_e'],
    'sort_dir': [False, False, True]
}

In [401]:
import pandas as pd
import numpy as np
from pathlib import Path
import re

In [None]:
def LoadHeaderFromFileToDF(optcache_filename: str) -> pd.DataFrame:
    """Read the ``<optcache_filename>.Header.csv`` companion file.

    The file is parsed as a ';'-separated, UTF-16 encoded CSV. Two columns are
    added to the result: ``days`` (whole days between ``date_from`` and
    ``date_to``) and ``filename`` (the ``optcache_filename`` argument as given).

    Returns:
        The header DataFrame, or an empty DataFrame (after printing a message)
        when the header file does not exist.
    """
    header_path = Path(f'{optcache_filename}.Header.csv')

    if header_path.exists():
        frame = pd.read_csv(header_path, sep=';', encoding='utf-16')
        period_end = pd.to_datetime(frame['date_to'])
        period_start = pd.to_datetime(frame['date_from'])
        frame['days'] = (period_end - period_start).dt.days
        frame['filename'] = optcache_filename
        return frame

    print(f"File {header_path} do not exist")
    return pd.DataFrame()

def ClassifySymbol(symbol: str) -> str:
    """Return the asset category of a trading symbol.

    The symbol is upper-cased and checked, in this order, against:
    exact commodity names, crypto ticker prefixes, exact index names and
    six-letter currency pairs. The first rule that matches decides the result;
    anything else is reported as 'Other'.

    NOTE(review): index names are compared for exact equality, so a symbol such
    as 'SP500' is not recognised as an index - confirm whether prefix or
    substring matching was intended.
    """
    name = symbol.upper()

    # Commodities: exact names only.
    known_commodities = frozenset((
        'GOLD', 'SILVER', 'COPPER', 'ALUMINIUM', 'ALUMINUM', 'PALLADIUM', 'PLATINUM',
        'BRENT', 'WTI', 'CRUDE', 'NGAS', 'NATGAS',
        'WHEAT', 'CORN', 'SOYBEAN', 'SUGAR', 'COFFEE', 'COCOA', 'COTTON',
        'XAUUSD', 'XAGUSD', 'XTIUSD',
    ))
    if name in known_commodities:
        return 'Commodities'

    # Crypto: the symbol starts with a known coin ticker (BTCUSD, ETHUSD, ...).
    crypto_tickers = (
        'BTC', 'ETH', 'LTC', 'XRP', 'BCH', 'ADA', 'DOT', 'LINK', 'XLM',
        'DOGE', 'MATIC', 'SOL', 'AVAX',
    )
    if name.startswith(crypto_tickers):
        return 'Crypto'

    # Indices: exact names only.
    index_names = (
        'SP', 'NAS', 'DOW', 'DAX', 'FTSE', 'NIKKEI', 'CHINA50', 'RUSSELL',
        'STOXX50', 'DXY', 'HK50', 'UK100', 'US500', 'USTEC',
    )
    if name in index_names:
        return 'Indices'

    # Forex: exactly six letters made of two known currency codes.
    currencies = ('USD', 'EUR', 'GBP', 'JPY', 'AUD', 'NZD', 'CAD', 'CHF', 'CNY', 'HKD', 'SGD')
    looks_like_pair = len(name) == 6 and name.isalpha()
    if looks_like_pair and name[:3] in currencies and name[3:] in currencies:
        return 'Forex'

    return 'Other'

def GroupSymbolsByCategory(symbols: list) -> dict:
    """Bucket symbols by the category ClassifySymbol assigns to them.

    Returns a dict mapping category name to an alphabetically sorted list of
    symbols. Categories keep a fixed order (Commodities, Indices, Crypto,
    Stocks, Forex, Other) and categories without symbols are omitted.
    """
    category_order = ('Commodities', 'Indices', 'Crypto', 'Stocks', 'Forex', 'Other')
    buckets = {name: [] for name in category_order}

    for sym in symbols:
        buckets[ClassifySymbol(sym)].append(sym)

    grouped = {}
    for name in category_order:
        members = buckets[name]
        if members:
            grouped[name] = sorted(members)
    return grouped

def BuildHeaderReport(header_df: pd.DataFrame, data_df: pd.DataFrame) -> list[str]:
    """Build the markdown lines that introduce the report.

    Args:
        header_df: Header DataFrame; only its first row is used. It must provide
            the columns expert_name, server, trade_deposit, trade_leverage,
            date_from, date_to and days.
        data_df: Combined run data. Its 'symbol' column supplies the symbol
            list; a frame without that column (for example an empty frame)
            is treated as "no symbols" instead of raising KeyError.

    Returns:
        A list of markdown lines: a title, the bot/server/deposit/leverage/
        interval lines and one line per non-empty symbol category. An empty
        list is returned when header_df is empty.
    """
    if header_df.empty:
        return []

    row = header_df.iloc[0]

    # Format dates as YYYY-MM-DD
    date_from = pd.to_datetime(row["date_from"]).strftime("%Y-%m-%d")
    date_to = pd.to_datetime(row["date_to"]).strftime("%Y-%m-%d")

    # An empty combined frame has no 'symbol' column at all.
    if 'symbol' in data_df.columns:
        symbols = list(map(str, pd.unique(data_df['symbol'])))
    else:
        symbols = []
    grouped_symbols = GroupSymbolsByCategory(symbols) if symbols else {}

    report = [
        f'## Optimization Results for {row["expert_name"]} / {len(symbols)} sym',
        f'**Bot:** {row["expert_name"]}',
        f'**Server:** {row["server"]}',
        f'**Initial Deposit:** {row["trade_deposit"]}',
        f'**Leverage:** {row["trade_leverage"]}',
        f'**Time Interval:** [{date_from}; {date_to}) - {row["days"]} days'
    ]

    # Add symbols grouped by category
    report.extend(
        f'**{category}:** {", ".join(syms)}'
        for category, syms in grouped_symbols.items()
    )

    return report

def BuildReportDF(optcache_filename: str, filter: dict) -> pd.DataFrame:
    """Load one optimization data file, enrich it, filter it and sort it.

    Reads ``<optcache_filename>.Header.csv`` (via LoadHeaderFromFileToDF) and
    ``<optcache_filename>.Data.csv`` (';'-separated, UTF-16). Derived metrics
    (win_rate, per-30/365-day profit and trade rates) and header fields from the
    first header row are added as columns, NaN values are replaced with 0, and
    the rows are then filtered, sorted and truncated according to ``filter``.

    Args:
        optcache_filename: Base path of the optimization cache; the
            '.Header.csv' / '.Data.csv' suffixes are appended to it.
        filter: Rule dict with optional keys 'fields' (per-column rules with
            'enabled' and an inclusive 'range'), 'sort_by', 'sort_dir' and
            'max_runs_count'.

    Returns:
        The resulting rows, or an empty DataFrame when either input file is
        missing.
    """
    header_df = LoadHeaderFromFileToDF(optcache_filename)
    if header_df.empty:
        return pd.DataFrame()

    data_file = Path(f'{optcache_filename}.Data.csv')
    if not data_file.exists():
        print(f"File {data_file} do not exist")
        return pd.DataFrame()

    # Length of the tested interval in days, computed by LoadHeaderFromFileToDF.
    days = header_df['days'].iloc[0]

    # add fields to data
    data_df = pd.read_csv(data_file, sep=';', encoding='utf-16')
    profit_per_day = data_df['profit'] / days
    data_df['win_rate'] = data_df['profittrades'] / data_df['trades']
    data_df['profit_per_30_days'] = profit_per_day * 30
    data_df['profit_per_365_days'] = profit_per_day * 365
    data_df['profit_per_30_days_percent'] = profit_per_day * 30 / data_df['initial_deposit']
    data_df['profit_per_365_days_percent'] = profit_per_day * 365 / data_df['initial_deposit']
    data_df['trades_per_30_days'] = data_df['trades'] / days * 30

    # Copy run-wide header values onto every data row.
    data_df['filename'] = optcache_filename
    for column in ('symbol', 'expert_name', 'date_from', 'date_to'):
        data_df[column] = header_df[column].iloc[0]
    data_df['days'] = days
    data_df['months'] = days // 30
    for column in ('period', 'trade_deposit', 'trade_currency',
                   'trade_leverage', 'server', 'ticks_mode'):
        data_df[column] = header_df[column].iloc[0]

    data_df = data_df.fillna(0)

    # filter data: keep rows inside every enabled inclusive range
    report_data = data_df.copy()
    for column, rule in filter.get('fields', {}).items():
        if not rule.get('enabled', True) or not rule.get('range'):
            continue
        min_val, max_val = rule['range']
        in_range = (report_data[column] >= min_val) & (report_data[column] <= max_val)
        report_data = report_data[in_range]

    # Sort using the rules passed in (previously the global `filter_rule` was
    # read here, silently ignoring the `filter` argument).
    sort_by = filter.get('sort_by', [])
    if sort_by:
        report_data = report_data.sort_values(sort_by, ascending=filter.get('sort_dir', True))

    if 'max_runs_count' in filter:
        report_data = report_data.head(filter.get('max_runs_count'))

    return report_data

def CreateSetFile(optcache_filename: str, pass_number: int, output_dir: str = None, record_num: int = 1, total_records: int = 1) -> str:
    """
    Create a .set file from optimization data for a specific pass.

    The parameter columns are taken to be every column that follows
    'avgconloosers' in the data file; when that column is absent, columns whose
    name starts with 'Inp_' are used instead. Each parameter is written as a
    ``name=value`` line after a three-line comment header.

    Args:
        optcache_filename: Path to the .opt file
        pass_number: Pass number to extract parameters from
        output_dir: Output directory for .set file (default: same as opt file)
        record_num: Record number in the report (for filename prefix)
        total_records: Total number of records (to determine prefix width)

    Returns:
        Path to created .set file or empty string if failed
    """

    # Load header to get expert name and symbol
    header_df = LoadHeaderFromFileToDF(optcache_filename)
    if header_df.empty:
        print(f"Cannot load header from {optcache_filename}")
        return ""

    expert_name = header_df['expert_name'].iloc[0]
    symbol = header_df['symbol'].iloc[0]

    # Load data to find the specific pass
    data_file = Path(f'{optcache_filename}.Data.csv')
    if not data_file.exists():
        print(f"File {data_file} does not exist")
        return ""

    data_df = pd.read_csv(data_file, sep=';', encoding='utf-16')

    # Find the pass
    pass_data = data_df[data_df['Pass'] == pass_number]
    if pass_data.empty:
        print(f"Pass {pass_number} not found in {data_file}")
        return ""

    # Parameter columns: everything after the last standard field, or the
    # 'Inp_'-prefixed columns when that field is not present.
    all_columns = list(data_df.columns)
    if 'avgconloosers' in all_columns:
        param_columns = all_columns[all_columns.index('avgconloosers') + 1:]
    else:
        param_columns = [col for col in all_columns if col.startswith('Inp_')]

    if not param_columns:
        print("No input parameters found (columns after 'avgconloosers')")
        return ""

    # Create .set file content
    set_content = [
        "; saved automatically on generation",
        "; this file contains input parameters for testing/optimizing expert advisor",
        ";"
    ]

    # Add each parameter. The value is read from its own column rather than
    # from a row Series: extracting a whole row of an all-numeric frame upcasts
    # every value to float, which would turn integer parameters into "5.0".
    for param in param_columns:
        value = pass_data[param].iloc[0]
        if isinstance(value, (int, np.integer)):
            set_content.append(f"{param}={int(value)}")
        else:
            set_content.append(f"{param}={value}")

    # Determine output path
    if output_dir is None:
        output_dir = Path(optcache_filename).parent
    else:
        output_dir = Path(output_dir)

    output_dir.mkdir(parents=True, exist_ok=True)

    # Zero-pad the record number to the width of the largest record number
    prefix_width = len(str(total_records))
    num_prefix = str(record_num).zfill(prefix_width)
    set_filename = output_dir / f"{num_prefix}_{expert_name}.{symbol}.{pass_number}.set"

    # Write .set file
    try:
        with open(set_filename, 'w', encoding='utf-8') as f:
            f.write('\n'.join(set_content))
    except OSError as e:
        print(f"Error writing .set file: {e}")
        return ""
    return str(set_filename)

In [403]:
def BuildReportRowsMD(header_df: pd.DataFrame, data_df: pd.DataFrame, filter: dict) -> list:
    """Build the markdown table of the report.

    Columns marked optional are shown only when the column exists in
    ``data_df`` and holds more than one distinct value. Rows are sorted by
    ``filter['sort_by']`` / ``filter['sort_dir']`` and numbered from 1.

    Args:
        header_df: Not used by this function; kept for interface compatibility.
        data_df: Run data, one row per optimization pass.
        filter: Rule dict; 'fields'[name]['color_ranges'] drives cell coloring,
            'sort_by' / 'sort_dir' drive row order.

    Returns:
        List of markdown lines: header row, separator row, then one line per
        run. For an empty ``data_df`` only the header and separator rows are
        returned (sorting a frame without columns would raise KeyError).
    """
    field_rules = filter.get('fields', {})

    def colorize(field_name, value, text):
        """Wrap text in a colored span when value falls in a [lower, upper) band."""
        color_ranges = field_rules.get(field_name, {}).get('color_ranges') or []
        for cr in color_ranges:
            lower, upper = cr['range']
            if lower <= value < upper:
                return f'<span style="background-color:{cr["color"]};">{text}</span>'
        return text

    # Define all columns in desired order with their properties
    # 'optional': True means column will be shown only if values vary
    all_columns = [
        {'field': '#', 'title': '#', 'optional': False},
        {'field': 'symbol', 'title': 'Symbol', 'optional': False},
        {'field': 'expert_name', 'title': 'Expert', 'optional': True},
        {'field': 'period', 'title': 'TF', 'optional': True},
        {'field': 'server', 'title': 'Server', 'optional': True},
        {'field': 'trade_deposit', 'title': 'Deposit', 'optional': True},
        {'field': 'ticks_mode', 'title': 'Ticks', 'optional': True},
        {'field': 'pass_num', 'title': 'Pass', 'optional': False},
        {'field': 'profit', 'title': 'Profit', 'optional': False},
        {'field': 'profit_per_30_days_percent', 'title': "<span title='Profit per 30 days, %'>30d,%</span>", 'optional': False},
        {'field': 'win_rate', 'title': "<span title='Win Rate'>WR</span>", 'optional': False},
        {'field': 'custom_fitness', 'title': "<span title='Custom Criterion'>CC</span>", 'optional': False},
        {'field': 'trades', 'title': 'Trades', 'optional': False},
        {'field': 'trades_per_30_days', 'title': "<span title='Trades per 30 days'>30d</span>", 'optional': False},
        {'field': 'reldrawdownpercnt_e', 'title': "<span title='Equity Drawdown, %'>DD</span>", 'optional': False},
        {'field': 'profit_factor', 'title': "<span title='Profit Factor'>PF</span>", 'optional': False},
        {'field': 'recovery_factor', 'title': "<span title='Recovery Factor'>RF</span>", 'optional': False},
        {'field': 'sharpe_ratio', 'title': 'Sharpe', 'optional': False},
        {'field': 'rank', 'title': 'Rank', 'optional': False},
        {'field': 'comment', 'title': 'Comment', 'optional': False},
    ]

    # Mandatory columns always; optional ones only when their values vary
    columns_to_show = [
        col for col in all_columns
        if not col['optional']
        or (col['field'] in data_df.columns and data_df[col['field']].nunique() > 1)
    ]

    # Header row, separator row and the per-row template share the column list
    data_report = [
        '| ' + ' | '.join(col['title'] for col in columns_to_show) + ' |',
        '| ' + ' | '.join('-' for _ in columns_to_show) + ' |'
    ]
    row_template = '| ' + ' | '.join('{' + col['field'] + '}' for col in columns_to_show) + ' |'

    # Nothing to list: return the bare table header instead of failing on sort
    if data_df.empty:
        return data_report

    sort_by = filter.get('sort_by', [])
    if sort_by:
        data_df = data_df.sort_values(sort_by, ascending=filter.get('sort_dir', True))

    for i, (_, data_row) in enumerate(data_df.iterrows(), 1):
        row_data = {
            '#': int(i),
            'symbol': f"<span title='{data_row['filename']}'>[{data_row['symbol']}]({data_row['filename']})</span>",
            'expert_name': str(data_row['expert_name']),
            'period': str(data_row['period']),
            'server': str(data_row['server']),
            'trade_deposit': f"{data_row['trade_deposit']:.0f}",
            'ticks_mode': str(data_row['ticks_mode']),
            'pass_num': int(data_row['Pass']),
            # Thousands are separated with a space instead of a comma
            'profit': colorize('profit', data_row['profit'], f"{data_row['profit']:,.0f}".replace(',', ' ')),
            'profit_per_30_days_percent': colorize('profit_per_30_days_percent', data_row['profit_per_30_days_percent'],
                                                round(data_row['profit_per_30_days_percent'] * 100, 1)),
            'win_rate': round(data_row['win_rate'] * 100, 1),
            'custom_fitness': colorize('custom_fitness', data_row['custom_fitness'], round(data_row['custom_fitness'], 2)),
            'trades': int(data_row['trades']),
            'trades_per_30_days': round(data_row['trades_per_30_days'], 1),
            'reldrawdownpercnt_e': colorize('reldrawdownpercnt_e', data_row['reldrawdownpercnt_e'],
                                        round(data_row['reldrawdownpercnt_e'], 2)),
            'profit_factor': round(data_row['profit_factor'], 2),
            'recovery_factor': round(data_row['recovery_factor'], 2),
            'sharpe_ratio': round(data_row['sharpe_ratio'], 2),
            # Placeholders to be filled in by hand in the generated report
            'rank': '☆☆☆☆☆',
            'comment': ''
        }

        data_report.append(row_template.format(**row_data))

    return data_report

def BuildReportRowsCSV(data_df: pd.DataFrame, header_df: pd.DataFrame, filter: dict) -> list:
    """Build the ';'-separated CSV report lines.

    Args:
        data_df: Run data, one row per optimization pass.
        header_df: Not used by this function; kept for interface compatibility.
        filter: Rule dict; 'sort_by' / 'sort_dir' define the row order.

    Returns:
        List of lines: the column header followed by one line per run, numbered
        from 1 in sorted order. For an empty ``data_df`` only the column header
        is returned (sorting a frame without columns would raise KeyError).
    """
    data_report_csv = ["num;filename;symbol;period;date_from;date_to;deposit;leverage;server;ticks_mode;pass_num;profit;profit_per_30_days_percent;profit_per_365_days_percent;win_rate;custom_fitness;trades;trades_per_30_days;reldrawdownpercnt_e;profit_factor;recovery_factor;sharpe_ratio"]
    row_template_csv = "{i};{filename};{symbol};{period};{date_from};{date_to};{deposit};{leverage};{server};{ticks_mode};{pass_num};{profit};{profit_per_30_days_percent};{profit_per_365_days_percent};{win_rate};{custom_fitness};{trades};{trades_per_30_days};{reldrawdownpercnt_e};{profit_factor};{recovery_factor};{sharpe_ratio}"

    if data_df.empty:
        return data_report_csv

    sort_by = filter.get('sort_by', [])
    if sort_by:
        data_df = data_df.sort_values(sort_by, ascending=filter.get('sort_dir', True))

    for i, (_, data_row) in enumerate(data_df.iterrows(), 1):
        data_report_csv.append(row_template_csv.format(
            i=int(i),
            filename=data_row['filename'],
            symbol=data_row['symbol'],
            period=data_row['period'],
            date_from=data_row['date_from'],
            date_to=data_row['date_to'],
            deposit=data_row['trade_deposit'],
            leverage=data_row['trade_leverage'],
            server=data_row['server'],
            ticks_mode=data_row['ticks_mode'],

            pass_num=int(data_row['Pass']),
            profit=data_row['profit'],
            # Fractions are reported as percentages with one decimal
            profit_per_30_days_percent=round(data_row['profit_per_30_days_percent'] * 100, 1),
            profit_per_365_days_percent=round(data_row['profit_per_365_days_percent'] * 100, 1),
            win_rate=round(data_row['win_rate'] * 100, 1),
            custom_fitness=round(data_row['custom_fitness'], 2),
            trades=int(data_row['trades']),
            trades_per_30_days=round(data_row['trades_per_30_days'], 1),
            reldrawdownpercnt_e=round(data_row['reldrawdownpercnt_e'], 2),
            profit_factor=round(data_row['profit_factor'], 2),
            recovery_factor=round(data_row['recovery_factor'], 2),
            sharpe_ratio=round(data_row['sharpe_ratio'], 2)
        ))

    return data_report_csv

## 2. Сортировка и вывод топ 5 результатов

In [404]:
# Driver cell: collect the filtered runs of every .opt file found in `dir`,
# write the markdown report and create one .set file per reported run.
#
# NOTE: `dir` shadows the builtin of the same name; the name is kept because
# other notebook cells may rely on it.
dir = Path('csv')
set_output_dir = Path('set_files')

report_md = []
report_cvs = ["num;filename;symbol;period;date_from;date_to;deposit;leverage;server;ticks_mode;pass_num;profit;profit_per_30_days_percent;profit_per_365_days_percent;win_rate;custom_fitness;trades;trades_per_30_days;reldrawdownpercnt_e;profit_factor;recovery_factor;sharpe_ratio"]

opt_files = sorted(dir.glob("*.opt"))

# build combined dataframe from all .opt files (single concat instead of
# growing the frame inside the loop)
opt_frames = []
for opt_file in opt_files:
    opt_df = BuildReportDF(opt_file, filter_rule)
    if not opt_df.empty:
        opt_frames.append(opt_df)
full_df = pd.concat(opt_frames, ignore_index=True) if opt_frames else pd.DataFrame()

# load header from the first .opt file (there may be none)
header_df = LoadHeaderFromFileToDF(opt_files[0]) if opt_files else pd.DataFrame()

if not opt_files:
    print(f"No .opt files found in {dir}")
elif full_df.empty:
    # Without rows there is no 'symbol' column to report on and nothing to sort
    print("No optimization runs passed the filter; report was not generated")
else:
    # Get report parameters for filename
    expert_name = header_df['expert_name'].iloc[0] if not header_df.empty else 'Unknown'
    server = header_df['server'].iloc[0] if not header_df.empty else 'Unknown'
    symbols_count = full_df['symbol'].nunique()

    # Generate report filename
    report_filename = f"{expert_name}_{server}_{symbols_count}sym.md"

    # build MD-report
    report_header = BuildHeaderReport(header_df, full_df)
    report_data = BuildReportRowsMD(header_df, full_df, filter_rule)
    report_md = report_header + [''] + report_data

    # Save MD report
    with open(report_filename, "w", encoding="utf-8") as f:
        f.writelines(line + "\n" for line in report_md)

    print(f"MD report saved: {report_filename}")

    # Create .set files for all records in the report
    set_output_dir.mkdir(parents=True, exist_ok=True)

    # Sort full_df the same way as in the report
    sorted_df = full_df.sort_values(filter_rule.get('sort_by', []), ascending=filter_rule.get("sort_dir", []))

    total_records = len(sorted_df)
    print(f"\nCreating .set files in {set_output_dir}/...")
    created_count = 0

    # record_num follows the row position in the sorted report (1-based), so a
    # failed file does not shift the numbering of the files that follow it.
    for record_num, (_, row) in enumerate(sorted_df.iterrows(), 1):
        set_file = CreateSetFile(
            row['filename'],
            int(row['Pass']),
            output_dir=str(set_output_dir),
            record_num=record_num,
            total_records=total_records
        )
        if set_file:
            created_count += 1

    print(f"Created {created_count} .set files")

# report_csv = BuildReportRowsCSV(full_df, header_df, filter_rule)
# with open("optcache_report.csv", "w", encoding="utf-8") as f:
#     for line in report_csv:
#         f.write(line + "\n")

MD report saved: DSRBR-MT5-Bot_Tickmill-Live_21sym.md

Creating .set files in set_files/...
Created 80 .set files
Created 80 .set files
