In [None]:
!pip install pandas plotly dash

In [1]:
import os
import pandas as pd
import plotly.express as px
from datetime import datetime, timedelta
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objects as go
import re

# Time-based strategy

The time-based strategy is a simple strategy: it buys each token shortly after launch, holds it for a fixed period of time, and then sells it.

Adjust the variables below to configure the backtest.

In [31]:
# Rug farm name; also used as the folder name that holds this farm's data
FARM_NAME = "98x"
# Fraction of the launch close at or below which a token counts as rugged.
# get_drop_time flags the first candle whose close <= launch close * PRICE_DROP_THRESHOLD,
# so 0.4 means "price has fallen to 40% of launch", i.e. a 60% drop.
# NOTE(review): if a 40% drop was intended, the comparison should use (1 - threshold) — confirm.
PRICE_DROP_THRESHOLD = 0.4
# Holding period in seconds
HOLDING_PERIOD = 6 * 60
# Expected delay in seconds for a buy order to be executed
BUY_DELAY = 10
# Expected delay in seconds for a sell order to be executed
SELL_DELAY = 10

def get_drop_time(df, initial_price, threshold=None):
    """Return the `Time` of the first candle whose close hit the rug level.

    A candle counts as a rug when ``close <= initial_price * threshold``.

    Args:
        df: Price frame with at least ``Time`` and ``close`` columns, in
            chronological order.
        initial_price: Reference (launch) price the drop is measured against.
        threshold: Fraction of ``initial_price`` that marks a rug. Defaults to
            the module-level ``PRICE_DROP_THRESHOLD`` when omitted.

    Returns:
        The ``Time`` value of the first matching row, or ``None`` when the
        price never reaches the rug level (including for an empty frame).
    """
    if threshold is None:
        threshold = PRICE_DROP_THRESHOLD

    # Vectorized comparison instead of a Python-level row loop.
    closes = df['close'].astype(float)
    hit = (closes <= initial_price * threshold).to_numpy()
    if not hit.any():
        return None

    # argmax on a boolean array gives the position of the first True;
    # positional access keeps this independent of the frame's index labels.
    return df['Time'].iloc[int(hit.argmax())]

def has_been_rugged(data):
    """Tell whether the rug happened before the strategy would have sold.

    The sell time is the first row's ``Time`` plus ``BUY_DELAY``,
    ``HOLDING_PERIOD`` and ``SELL_DELAY`` seconds. The token counts as rugged
    when the drop time reported by ``get_drop_time`` is earlier than that.

    Args:
        data: Price frame with a datetime ``Time`` column and a ``close``
            column; the first row is treated as the token launch.

    Returns:
        ``True`` if the price dropped to the rug level before the sell time,
        ``False`` otherwise (including when it never dropped or the frame is
        empty).
    """
    if data.empty:
        return False

    # Positional access: label 0 may be gone after rows were dropped/filtered.
    launch_close = data['close'].iloc[0]
    drop_time = get_drop_time(data, launch_close)
    if drop_time is None:
        # No drop at all -> not rugged (comparing a Timestamp to None would raise).
        return False

    launch_time = data['Time'].iloc[0]
    buy_time = launch_time + pd.Timedelta(seconds=BUY_DELAY)
    sell_time = buy_time + pd.Timedelta(seconds=HOLDING_PERIOD + SELL_DELAY)

    return bool(sell_time > drop_time)


### Calculate backtesting results

In [32]:
# Function to process a single CSV file


def process_file(file_path, holding_period, buy_delay, sell_delay):
    """Backtest the time-based strategy on a single token's price CSV.

    The first valid row is treated as the token launch. The bot buys at the
    first candle at or after ``launch + buy_delay`` and sells at the first
    candle at or after ``buy + holding_period + sell_delay``. If the price hit
    the rug level before that sell time, the last close in the file is used as
    the sell price instead.

    Args:
        file_path: Path to a CSV with ``Time`` (unix seconds), ``open`` and
            ``close`` columns.
        holding_period: Seconds to hold the position.
        buy_delay: Seconds between launch and the buy being executed.
        sell_delay: Seconds between the sell decision and its execution.

    Returns:
        A dict with ``token``, ``buy_price``, ``sell_price``,
        ``return_percentage`` and ``rugged``; or ``None`` (after printing the
        reason) when the file has no valid rows or no candle at the buy/sell
        time.
    """
    data = pd.read_csv(file_path)

    # Coerce unparsable values to NaT/NaN, then drop every incomplete row.
    data['Time'] = pd.to_datetime(data['Time'], unit='s', origin='unix', errors='coerce')
    data['open'] = pd.to_numeric(data['open'], errors='coerce')
    data['close'] = pd.to_numeric(data['close'], errors='coerce')
    data = data.dropna().reset_index(drop=True)

    if data.empty:
        print(f"No valid rows in {file_path}")
        return None

    # Assume first row is token launch time
    launch_time = data['Time'].iloc[0]

    # Calculate buy and sell times
    buy_time = launch_time + timedelta(seconds=buy_delay)
    sell_time = buy_time + timedelta(seconds=holding_period + sell_delay)

    # Determine if bot got rugged, using this call's own timing arguments.
    # A token that never reaches the rug level has no drop time and is not rugged.
    drop_time = get_drop_time(data, data['close'].iloc[0])
    rugged = drop_time is not None and bool(sell_time > drop_time)

    # Buy at the first candle at or after the buy time
    closes_after_buy = data.loc[data['Time'] >= buy_time, 'close']
    if closes_after_buy.empty:
        print(f"Buy time not available for {file_path}")
        return None
    buy_price = closes_after_buy.iloc[0]

    if rugged:
        # Rugged before we could exit: we are left holding until the last candle
        sell_price = data['close'].iloc[-1]
    else:
        closes_after_sell = data.loc[data['Time'] >= sell_time, 'close']
        if closes_after_sell.empty:
            print(f"Sell time not available for {file_path}")
            return None
        sell_price = closes_after_sell.iloc[0]

    return_percentage = ((sell_price - buy_price) / buy_price) * 100

    return {
        'token': os.path.splitext(os.path.basename(file_path))[0],
        'buy_price': buy_price,
        'sell_price': sell_price,
        'return_percentage': return_percentage,
        'rugged': rugged,
    }

# Function to process all files in a folder
def backtest_strategy(folder_path, holding_period, buy_delay, sell_delay):
    """Run ``process_file`` on every ``.csv`` file in a folder.

    Files are processed in sorted name order so the result rows (and the CSV
    written from them) are reproducible; ``os.listdir`` alone returns names in
    arbitrary order.

    Args:
        folder_path: Directory containing one price CSV per token.
        holding_period: Seconds to hold each position.
        buy_delay: Seconds between launch and the buy being executed.
        sell_delay: Seconds between the sell decision and its execution.

    Returns:
        A DataFrame with one row per successfully processed file; empty when
        nothing could be processed.
    """
    results = []

    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith('.csv'):
            continue
        file_path = os.path.join(folder_path, file_name)
        try:
            result = process_file(file_path, holding_period, buy_delay, sell_delay)
        except Exception as e:
            # Best effort: one bad file must not abort the whole backtest.
            print(f"Error processing {file_name}: {e}")
            continue
        if result:
            results.append(result)

    return pd.DataFrame(results)


# Resolve the input and output folders for the selected farm
# (fall back to the generic "output" tree when no farm name is set).
if FARM_NAME:
    price_data_folder = FARM_NAME + "/price_data"
    output_folder = FARM_NAME
else:
    price_data_folder = "output/price_data"
    output_folder = "output"

# Run the backtest over every token CSV of the farm
results_df = backtest_strategy(price_data_folder, HOLDING_PERIOD, BUY_DELAY, SELL_DELAY)

# Persist the per-token results next to the farm's data
os.makedirs(output_folder, exist_ok=True)
results_file = os.path.join(output_folder, "backtest_results.csv")
results_df.to_csv(results_file, index=False)

# Aggregate metrics across all tokens
if results_df.empty:
    total_return = 0
    times_rugged = 0
    print("No valid data to process.")
else:
    total_return = results_df['return_percentage'].sum()
    if 'rugged' in results_df.columns:
        times_rugged = results_df['rugged'].sum()
    else:
        times_rugged = 0

    print(f"Total return percentage: {total_return:.2f}%")
    print(f"Times rugged: {times_rugged}")

# --------------------------------
# Append results to a Markdown file
# --------------------------------

md_file_path = os.path.join(output_folder, "backtest_results.md")

# 1) Determine the "Backtest #"
#    Scan the existing report (if any) for "## Backtest N" headings and use the
#    highest N + 1, so a hand-edited or reordered file still gets a fresh number.
content = ""
if os.path.exists(md_file_path):
    with open(md_file_path, "r", encoding="utf-8") as f:
        content = f.read()

# Anchor at line start so "### Backtest 3" or prose mentioning a backtest
# is not mistaken for a report heading.
previous_numbers = [
    int(number)
    for number in re.findall(r'^## Backtest (\d+)', content, flags=re.MULTILINE)
]
backtest_number = max(previous_numbers, default=0) + 1

# 2) Format the Markdown block
#    Adjust or add/remove parameters as you like:
markdown_block = f"""## Backtest {backtest_number}

### Parameters
- Price drop threshold: {PRICE_DROP_THRESHOLD}
- Holding period: {HOLDING_PERIOD/60} minutes
- Buy delay: {BUY_DELAY} seconds
- Sell delay: {SELL_DELAY} seconds

### Results
Times rugged: {times_rugged}

Return: {total_return:.2f}%

---

"""

# 3) Append to the Markdown file
with open(md_file_path, "a", encoding="utf-8") as md_file:
    # Only a brand-new (or blank) report gets the title; a file that already
    # has content but no backtest headings must not receive a second title.
    if not content.strip():
        md_file.write("# Backtesting results\n\n")
    md_file.write(markdown_block)

print(f"Results appended to {md_file_path}")

Total return percentage: -911.84%
Times rugged: 34
Results appended to 98x/backtest_results.md


### Visualize backtesting
Here you can view each token's candlestick chart, with the buy and sell times marked.

In [33]:
# Define the folder containing the CSV files
price_data_folder = FARM_NAME + "/price_data" if FARM_NAME else "output/price_data"


# Read and process all CSV files in the folder.
# Each frame is trimmed to the window that matters for the chart and stored
# under the file name without its extension.
all_data = {}
for file_name in sorted(os.listdir(price_data_folder)):  # sorted -> stable dropdown order
    if not file_name.endswith(".csv"):
        continue
    file_path = os.path.join(price_data_folder, file_name)

    # Read the CSV file
    data = pd.read_csv(file_path)

    # A file without rows or without the required columns cannot be charted
    if data.empty or 'close' not in data.columns or 'Time' not in data.columns:
        print(f"Skipping {file_name}: no usable 'Time'/'close' data")
        continue

    # Cut the data to the holding period ('Time' is still numeric here, so the
    # delays in seconds are added to it directly).
    padding = 60  # extra seconds kept after the cut-off
    holding_end = data["Time"].iloc[0] + BUY_DELAY + HOLDING_PERIOD
    drop_time = get_drop_time(data, data['close'].iloc[0])
    if drop_time is not None and drop_time > holding_end:
        # The drop came after the holding period: keep the chart up to the drop
        cutoff = drop_time + padding
    else:
        # Dropped during the holding period, or never dropped at all
        cutoff = holding_end + SELL_DELAY + padding

    # .copy() so the column assignment below writes to an independent frame
    data = data[data['Time'] < cutoff].copy()
    data['Time'] = pd.to_datetime(data['Time'], unit='s', origin='unix', errors='coerce', utc=True)

    # Store the data with the file name (without extension) as the key
    all_data[os.path.splitext(file_name)[0]] = data

# Create a Dash app
app = dash.Dash(__name__)
app.title = "Interactive Candlestick Charts"


def _dropdown_option(name):
    """Build the dropdown entry for one token: red "(rugged)" label or green plain label."""
    if has_been_rugged(all_data[name]):
        text, colour = name + " (rugged)", "red"
    else:
        text, colour = name, "green"
    return {"label": html.Span(text, style={"color": colour}), "value": name}


# Layout of the app: a token selector above the candlestick chart
app.layout = html.Div([
    dcc.Dropdown(
        id="file-dropdown",
        options=[_dropdown_option(name) for name in all_data],
        multi=False,
        placeholder="Select a file",
    ),
    dcc.Graph(id="candlestick-graph")
])


# Callback to update the graph based on the selected file
def _close_at_or_after(data, when):
    """Return the close of the first candle at or after `when`, else the last candle's close."""
    reached = (data['Time'] >= when).to_numpy()
    position = int(reached.argmax()) if reached.any() else len(data) - 1
    return data['close'].iloc[position]


@app.callback(
    Output("candlestick-graph", "figure"),
    [Input("file-dropdown", "value")]
)
def update_graph(selected_file):
    """Build the candlestick figure for the token picked in the dropdown.

    The figure shows the token's candles, dashed vertical markers at the buy
    and sell times, and a title with the profit or loss between the two.

    Args:
        selected_file: Key into ``all_data`` chosen in the dropdown, or a
            falsy value when nothing is selected.

    Returns:
        A ``plotly.graph_objects.Figure``; a placeholder figure with only a
        title when nothing is selected or the token has no rows.
    """
    if not selected_file:
        return go.Figure(layout={"title": "Select a file to display"})

    data = all_data[selected_file]
    if data.empty:
        return go.Figure(layout={"title": f"No price data for {selected_file}"})

    # Calculate buy and sell times
    launch_time = data['Time'].iloc[0]
    buy_time = launch_time + pd.Timedelta(seconds=BUY_DELAY)
    sell_time = buy_time + pd.Timedelta(seconds=HOLDING_PERIOD + SELL_DELAY)

    # The chart data may end before the planned sell time; sell at the last candle then
    last_time = data['Time'].iloc[-1]
    if sell_time > last_time:
        sell_time = last_time

    # Look prices up at the first candle at or after each time. Candles are not
    # guaranteed to fall exactly on launch + delay, so an exact-equality match
    # can come back empty.
    buy_price = _close_at_or_after(data, buy_time)
    sell_price = _close_at_or_after(data, sell_time)

    # Calculate profit percentage
    profit_percentage = ((sell_price - buy_price) / buy_price) * 100

    # Determine text and color based on profit/loss
    if profit_percentage > 0:
        subtitle_text = f"Profit: +{profit_percentage:.2f}%"
        subtitle_color = "green"
    else:
        subtitle_text = f"Loss: {profit_percentage:.2f}%"
        subtitle_color = "red"

    figure = go.Figure(data=[go.Candlestick(
        x=data["Time"],
        open=data["open"],
        high=data["high"],
        low=data["low"],
        close=data["close"],
        name="Candlesticks"
    )])

    # Add vertical lines for buy and sell times using add_shape
    # (yref="paper" makes each line span the full plot height)
    figure.add_shape(type="line",x0=buy_time,y0=0,x1=buy_time,y1=1,xref="x",yref="paper",line=dict(color="green", width=2, dash="dash"),name="Buy Time")
    figure.add_shape(type="line",x0=sell_time,y0=0,x1=sell_time,y1=1,xref="x",yref="paper",line=dict(color="red", width=2, dash="dash"),name="Sell Time")

    figure.add_annotation(x=buy_time, y=1, xref="x", yref="paper",text="Buy Time", showarrow=True, arrowhead=2, ax=0, ay=-20)
    figure.add_annotation(x=sell_time, y=1, xref="x", yref="paper",text="Sell Time", showarrow=True, arrowhead=2, ax=0, ay=-20)

    figure.update_layout(
         title={
            "text": f"Candlestick Chart - <a href='https://gmgn.ai/sol/token/{selected_file}'>{selected_file}</a><br><span style='color:{subtitle_color};'>{subtitle_text}</span>",
            "y": 0.95,
            "x": 0.5,
            "xanchor": "center",
            "yanchor": "top"
        },
        xaxis_title="Time",
        yaxis_title="Price",
        # Initial view: the trade window with 10 seconds of margin on each side
        xaxis=dict(
            range=[
                data["Time"].iloc[0] - pd.Timedelta(seconds=10),
                data["Time"].iloc[0] + pd.Timedelta(seconds=HOLDING_PERIOD + BUY_DELAY + SELL_DELAY + 10)
            ]
        )
    )

    return figure

# Run the app
if __name__ == "__main__":
    # Dash 3 removed `run_server` in favour of `run`; this notebook installs
    # Dash unpinned, so prefer `run` and fall back only on older releases.
    if hasattr(app, "run"):
        app.run(debug=True)
    else:
        app.run_server(debug=True)