In [1]:
# Installing necessary modules.
%pip install -U pandas
%pip install -U scikit-learn==1.5.2
%pip install -U numpy
%pip install -U matplotlib
%pip install xgboost==2.0.3
%pip install pandas==2.2.1
%pip install joblib==1.3.2
%pip install --upgrade scikit-learn xgboost


Collecting pandas
  Using cached pandas-2.2.3-cp313-cp313-macosx_11_0_arm64.whl.metadata (89 kB)
Using cached pandas-2.2.3-cp313-cp313-macosx_11_0_arm64.whl (11.3 MB)
[0mInstalling collected packages: pandas
  Attempting uninstall: pandas
[0m    Found existing installation: pandas 2.2.1
    Uninstalling pandas-2.2.1:
      Successfully uninstalled pandas-2.2.1
[0mSuccessfully installed pandas-2.2.3
[0mNote: you may need to restart the kernel to use updated packages.
[0mCollecting scikit-learn==1.5.2
  Using cached scikit_learn-1.5.2-cp313-cp313-macosx_12_0_arm64.whl.metadata (13 kB)
Using cached scikit_learn-1.5.2-cp313-cp313-macosx_12_0_arm64.whl (11.0 MB)
[0mInstalling collected packages: scikit-learn
  Attempting uninstall: scikit-learn
[0m    Found existing installation: scikit-learn 1.6.1
    Uninstalling scikit-learn-1.6.1:
      Successfully uninstalled scikit-learn-1.6.1
[0mSuccessfully installed scikit-learn-1.5.2
[0mNote: you may need to restart the kernel to use upd

In [1]:
import numpy as np
import pandas as pd
import joblib
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [2]:
file_path = ("preProcessedTokens.json")

def load_data(file_path):
    return pd.read_json(file_path)

data = load_data(file_path)
data.head()

Unnamed: 0,address,decimals,lastTradeUnixTime,liquidity,logoURI,mc,name,symbol,v24hChangePercent,v24hUSD,Risk,Volatility,holders_count
0,CxBaBF4XJtn9HDzSiNg2sLq8C34VADKzbk3DNH2Lufug,9,,72.989548,https://img.fotofolio.xyz/?url=https%3A%2F%2Fg...,,lola,lola,,0.0,Danger,72.018149,11
1,HmNHpgKvmwfLBkkCWPhLp2ofDCVJpm3PpkQ7W4KHsW8c,6,,0.0,https://img.fotofolio.xyz/?url=https%3A%2F%2Fb...,,ElonDog,ELONDOG,,0.0,Danger,,1
2,2hKYEfZ8ND4GgWevchMvm84NU8AY5Y6uJLAteo1YoSTt,9,,225.685972,https://img.fotofolio.xyz/?url=https%3A%2F%2Fb...,,pepe on solana,pepecoin,,0.0,Danger,,11
3,9xLGTb8yGxsAB6bdCjrx4aaW1NDewnh3KH8jyZhLE7PV,9,,225.685972,https://img.fotofolio.xyz/?url=https%3A%2F%2Fd...,,SatoshiVM,SAVM,,0.0,Danger,,32
4,C1TsgQHTm1ojNu339xaQw8MANFGrKArvEDKQiNaEj5ja,9,,0.0,https://img.fotofolio.xyz/?url=https%3A%2F%2Fs...,,KITTEN SOL,$KITTEN,,0.0,Danger,,1


In [4]:
def preprocess_data(df):
    df = df.drop(['address', 'lastTradeUnixTime', 'mc'], axis=1)
    X = df.drop('Risk', axis=1)
    y = df['Risk'].map({'Danger': 1, 'Warning': 1, 'Good': 0}).astype(int)
    return train_test_split(X, y, test_size=0.4, random_state=42)

In [5]:
def build_preprocessor(X_train):
    numeric_features = ['decimals', 'liquidity', 'v24hChangePercent', 'v24hUSD', 'Volatility', 'holders_count']
    categorical_features = ['logoURI', 'name', 'symbol']

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())
    ])
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ],
        remainder='passthrough'
        )

    return preprocessor

In [6]:
def train_model(X_train, y_train, preprocessor):
    model = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', xgb.XGBClassifier(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42))
    ])
    model.fit(X_train, y_train)
    return model

In [7]:
def evaluate_model(model, X_test, y_test):
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    classification_report_result = classification_report(y_test, y_pred)
    conf_matrix = confusion_matrix(y_test, y_pred)

    print(f'Model Accuracy: {accuracy}')
    print('Classification Report:\n', classification_report_result)
    print("Confusion Matrix:\n", conf_matrix)

In [8]:
def main():
    file_path = 'preProcessedTokens.json'  # Update this path
    df = load_data(file_path)
    X_train, X_test, y_train, y_test = preprocess_data(df)
    print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

    preprocessor = build_preprocessor(X_train)
    model = train_model(X_train, y_train, preprocessor)
    evaluate_model(model, X_test, y_test)

    # Save model and preprocessor
    joblib.dump(model, "predictModel.pkl")
    joblib.dump(preprocessor, "mainPreprocessor.pkl")

    # Example for a single item prediction
    single_item_corrected = {
    "decimals": 6,
    "liquidity": 62215.15524335994,
    "logoURI": "https://img.fotofolio.xyz/?url=https%3A%2F%2Fbafkreifhqihaiwyo4g2aogdu4qyfqftkxy3aq4xxbhoxdkbkufrobsnjwm.ipfs.nftstorage.link",
    "name": "SBF",
    "symbol": "SBF",
    "v24hChangePercent": -49.17844813082829,
    "v24hUSD": 18220.724466666383,
    "Volatility": 76.06539722778419,
    "holders_count": 0
}

    # Convert to DataFrame
    single_item_df = pd.DataFrame(single_item_corrected, index=[0])
    prediction = model.predict(single_item_df)  # Predict
    print(f'Prediction for the single item: {prediction}')


if __name__ == "__main__":
    main()

(1163, 9) (776, 9) (1163,) (776,)


NameError: name 'xgb' is not defined

In [None]:
%pip install requests
%pip install python-dotenv

In [None]:
import requests
import time
import joblib
import os
import dotenv

In [None]:
dotenv.load_dotenv()
vybe_key = os.environ.get('VYBE_KEY')
print(vybe_key) # verify your key has been properly loaded.

In [None]:
def get_token_price_history_with_retry(time_start, time_end, token_id, max_retries=3):
    url = f"https://api.vybenetwork.xyz/price/{token_id}/token-quote-ohlcv"

    headers = {
        "Content-Type": "application/json",
        'X-API-KEY': vybe_key
    }

    params = {
        "stride": "1 hour",
        "time_end": time_end,
        "time_start": time_start
    }

    backoff_time = 1  # Initial backoff time in seconds

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, params=params)

            if response.status_code == 200 or response.status_code == 204:
                # Request was successful, you can handle the response here
                return response.json()
            elif response.status_code == 429:
                print(f"Received 429 - Too Many Requests. Retrying in {backoff_time} seconds for {token_id}.")
                time.sleep(backoff_time)
                backoff_time *= 2  # You can adjust this multiplier based on your needs
            else:
                # Handle the error
                print(f"Error: {response.status_code} - {response.text} for {token_id}. Retrying...")
                time.sleep(backoff_time)
                backoff_time *= 2  # You can adjust this multiplier based on your needs
        except requests.exceptions.RequestException as e:
            # Handle request exception
            print(f"Request exception: {e}, {token_id}")
            return f"Request exception: {e}"

        # Increment backoff time for next retry
        backoff_time *= 2
        if attempt < max_retries - 1:
            # Only sleep if there are more retries remaining
            time.sleep(2)

    print(f"Maximum retries ({max_retries}) reached for {token_id}.")
    return None  # Or handle the failure in a different way as per your requirements

In [None]:
def calculate_volatility(result):
    DEFAULT_VOLATILITY_SCORE = None

    if 'data' in result:
        token_data = result['data']

        # Extract relevant columns and create DataFrame
        columns = ['timeBucketStart', 'open', 'high', 'low', 'close', 'count']
        data = pd.DataFrame(token_data, columns=columns)

        # Convert timeBucketStart to datetime and set it as index
        data['timeBucketStart'] = pd.to_datetime(data['timeBucketStart'], unit='s')
        data = data.set_index('timeBucketStart')

        # Convert numerical columns to float
        numerical_cols = ['open', 'high', 'low', 'close']
        data[numerical_cols] = data[numerical_cols].astype(float)

        # Calculate daily returns
        data['Daily_Returns'] = data['close'].pct_change()

        # Calculate volatility (standard deviation of daily returns)
        volatility = np.std(data['Daily_Returns'])

        # Normalize volatility to a scale of 1-100
        min_volatility = np.min(data['Daily_Returns'])
        max_volatility = np.max(data['Daily_Returns'])

        # Check if the denominator is close to zero
        if np.isclose(max_volatility, min_volatility):
            print("Denominator is close to zero. Setting volatility score to default value.")
            return DEFAULT_VOLATILITY_SCORE
        else:
            # Perform the division only if the denominator is not close to zero
            volatility_score = ((volatility - min_volatility) / (max_volatility - min_volatility)) * 100

        return volatility_score

    else:
        return DEFAULT_VOLATILITY_SCORE

In [None]:
def calculate_v24hChangePercent(token_data):
    if 'data' in token_data and len(token_data['data']) >= 2:
        first_close = float(token_data['data'][0]['close'])
        last_close = float(token_data['data'][-1]['close'])
        v24hChangePercent = ((last_close - first_close) / first_close) * 100
        return v24hChangePercent
    return None

def get_token_details(token_id, max_retries=3):

    url = f"https://api.vybenetwork.xyz/token/{token_id}"

    headers = {
        "Content-Type": "application/json",
        'X-API-KEY': vybe_key  # Assuming vybe_key is defined elsewhere in your code
    }

    backoff_time = 1  # Initial backoff time in seconds

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers)

            if response.status_code == 200 or response.status_code == 204:
                # Request was successful, you can handle the response here
                return response.json()
            elif response.status_code == 429:
                print(f"Received 429 - Too Many Requests. Retrying in {backoff_time} seconds for {token_id}.")
                time.sleep(backoff_time)
                backoff_time *= 2  # You can adjust this multiplier based on your needs
            else:
                # Handle the error
                print(f"Error: {response.status_code} - {response.text} for {token_id}. Retrying...")
                time.sleep(backoff_time)
                backoff_time *= 2  # You can adjust this multiplier based on your needs
        except requests.exceptions.RequestException as e:
            # Handle request exception
            print(f"Request exception: {e}, {token_id}")
            return f"Request exception: {e}"

        # Increment backoff time for next retry
        backoff_time *= 2

        if attempt < max_retries - 1:
            # Only sleep if there are more retries remaining
            time.sleep(2)
    print(f"Maximum retries ({max_retries}) reached for {token_id}.")
    return None  # Or handle the failure in a different way as per your requirements            

In [None]:
def calculate_liquidity(token_data):
    if 'marketCap' in token_data and 'tokenAmountVolume' in token_data:
        market_cap = token_data['marketCap']
        token_volume = token_data['tokenAmountVolume']
        if token_volume is not None and token_volume > 0:
            liquidity = market_cap / token_volume
            return liquidity
    return 0

In [None]:
def get_number_of_holders(token_id, interval='day'):

    url = f"https://api.vybenetwork.xyz/token/{token_id}/holders-ts"

    headers = {
        "Content-Type": "application/json",
        'X-API-KEY': vybe_key
    }
    params = {
        "interval": interval,
        "time_end": 'null',
        "time_start": 'null'
    }

    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
        return data['data'][-1]['nHolders']
    else:
        print(f"Failed to fetch data: {response}")
        return None

In [None]:
async def fetchDataFunc(token_id):
    token_data = get_token_details(token_id)
    time_start = int(time.time()) - (24 * 60 * 60)
    time_end = int(time.time())
    token_OHLCV_data =  get_token_price_history_with_retry(time_start, time_end, token_id)
    v24hChangePercent = calculate_v24hChangePercent(token_OHLCV_data)
    liquidity = calculate_liquidity(token_data)
    volatility_score = calculate_volatility(token_OHLCV_data)
    holder_count =  get_number_of_holders(token_id)
    v24hUSD = 0
    if token_data['usdValueVolume'] is not None:
        v24hUSD = token_data['usdValueVolume']
    input_data = {
        "decimals": token_data['decimal'],
        "liquidity":liquidity,
        "logoURI":1,
        "name": 1,
        "symbol": 1,
        "v24hChangePercent": v24hChangePercent,
        "v24hUSD": v24hUSD,
        "Volatility": volatility_score,
        "holders_count": holder_count
        }
    return input_data

In [None]:
async def predict_token_risk(input_data):
    """Predicts the risk of a token based on the input parameters.

    Args:
        input_data (dict): Input data containing token address.

    Returns:
        int: 0 is safe and 1 is dangerous
    """
    try:
        model = joblib.load("predictModel.pkl")
        token_id = input_data['token_address']
        as_dict = await fetchDataFunc(token_id)
        single_item_df = pd.DataFrame(as_dict, index=[0])
        prediction = model.predict(single_item_df)  # Predict
        single_prediction = prediction[0]  # Extract single element
        return int(single_prediction)  # Convert prediction to int
    except Exception as e:
        # Handle exceptions appropriately
        raise RuntimeError(f"An error occurred: {str(e)}")

In [None]:
async def main():
    while True:
        # Get user input for token address
        token_address = input("Enter token address (or 'q' to exit): ").strip()

        # Check if user wants to quit
        if token_address.lower() == 'q':
            print("Exiting...")
            break

        # Create input data dictionary
        input_data = {"token_address": token_address}

        # Call predict_token_risk_async function
        try:
            risk_level = await predict_token_risk(input_data)
            if risk_level == 0:
              print("Risk Level: Safe: ", risk_level)
            elif risk_level == 1:
              print("Risk Level: Danger: ", risk_level)
        except Exception as e:
            print("Error occurred:", e)

# Run the async main function in the event loop
await main()

In [None]:
import numpy as np
import pandas as pd
import xgboost as xgb
import joblib
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score, classification_report

# Load dataset
file_path = "merged_pool_data4.csv"  # Ensure this file is available
data_raw = pd.read_csv(file_path, index_col="pool_id")

# Drop unnecessary columns
drop_cols = ['logindex', 'weth', 'low', 'high', 'close', 'open']
X = data_raw.drop(columns=['rugpull'] + [col for col in data_raw.columns if any(dc in col for dc in drop_cols)])
y = data_raw['rugpull']

# Split dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=5)

# Handle missing values using imputation
imputer = SimpleImputer(strategy='mean')
X_train_imputed = imputer.fit_transform(X_train)
X_test_imputed = imputer.transform(X_test)

# Standardize data
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_imputed)
X_test_scaled = scaler.transform(X_test_imputed)

# Train XGBoost model
model = xgb.XGBClassifier(n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42)
model.fit(X_train_scaled, y_train)

# Evaluate model
y_pred = model.predict(X_test_scaled)
print("Model Evaluation:")
print(classification_report(y_test, y_pred))

# Save model & preprocessing pipeline
joblib.dump(model, "backend/api/services/xgboost_rugpull.pkl")
joblib.dump(imputer, "backend/api/services/imputer.pkl")
joblib.dump(scaler, "backend/api/services/scaler.pkl")

print("✅ Model training complete. Saved model and preprocessor.")
