<!-- Banner Image -->
<img src="https://github.com/trustless-engineering/egeria-lite/blob/main/docs/egeria-lite.png?raw=true" width="100%">

<!-- Links -->
<center>
  <a href="https://www.trustless.engineering/" style="color: #06b6d4;">Website</a> •
  <a href="https://discord.com/invite/pmbc4NjctV" style="color: #06b6d4;">Discord</a>
</center>

# Egeria Lite
## A simple, customizable, predictive-forecasting model for Solana DeFi tokens

📊 In this notebook, you will compile, train, and run predictions against your own version of our Egeria token risk model.

🎥 Don't forget to check out our accompanying video walk-through for a step-by-step guide!

🌐 We'll build this using Vybe Network APIs, but you can augment the model to include any variables that you find personally risky. Then, we'll dive into training our model using the powerful XGBoost algorithm.

🔍 Throughout this notebook, we'll explore the ins and outs of XGBoost and how it can help us tackle the classification problem at hand.

📝 Help us enhance this tutorial even further! Share your feedback and thoughts on our [Discord Server](https://discord.com/invite/pmbc4NjctV) or directly on [X](https://x.com/trustlesseng). Your input is invaluable in making this tutorial the best it can be! 🚀

## Setup

Make sure you have all the necessary packages pre-installed. Run the following cell to do this:

In [None]:
# Install the necessary modules (pandas and scikit-learn are pinned below, so they are installed only once)
%pip install -U numpy
%pip install -U matplotlib
%pip install xgboost==2.0.3
%pip install pandas==2.2.1
%pip install scikit-learn==1.4.1.post1
%pip install joblib==1.3.2


Now let's import the various modules into our working environment.

In [None]:
#importing the necessary libraries
import numpy as np
import pandas as pd
import joblib
import xgboost as xgb
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Let's Build Your Dataset! 📊
Before we dive into training our model, we need to create a robust training dataset. This dataset will be in the form of a .json file and will contain essential variables sourced from various Vybe token data APIs.
Here's an example of how an entry in your dataset should look:

```
{
    "address": "CxBaBF4XJtn9HDzSiNg2sLq8C34VADKzbk3DNH2Lufug",
    "decimals": 9,
    "lastTradeUnixTime": null,
    "liquidity": 72.98954780564692,
    "logoURI": "https://img.fotofolio.xyz/?url=https%3A%2F%2Fgateway.irys.xyz%2FtLep8ljpMHgiybkmGKTuYlWe5MsJxHuKpYE58d7fzyQ",
    "mc": null,
    "name": "lola",
    "symbol": "lola",
    "v24hChangePercent": null,
    "v24hUSD": 0,
    "Risk": "Danger",
    "Volatility": 72.01814851023744,
    "holders_count": 1
}
```

Remember, the quality of your data directly impacts the performance and accuracy of your model, so let's strive for excellence in data quality! Our base dataset has roughly 2,000 entries. We recommend at least 1,600 entries with a decent spread of tokens labeled safe and dangerous. 🚀
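
Before training, it can help to confirm that every entry carries the expected fields and that the labels are reasonably balanced. The sketch below is one possible check using only the standard library. `REQUIRED_KEYS` and `summarize_dataset` are hypothetical names introduced here, and the key list is simply copied from the example entry above.

```python
from collections import Counter

# Keys copied from the example entry above; adjust them to match your own dataset.
REQUIRED_KEYS = {
    "address", "decimals", "lastTradeUnixTime", "liquidity", "logoURI", "mc",
    "name", "symbol", "v24hChangePercent", "v24hUSD", "Risk", "Volatility",
    "holders_count",
}

def summarize_dataset(records):
    """Count Risk labels and list entries that are missing required keys."""
    label_counts = Counter(record.get("Risk") for record in records)
    incomplete = []
    for index, record in enumerate(records):
        missing = sorted(REQUIRED_KEYS - set(record))
        if missing:
            incomplete.append((index, missing))
    return label_counts, incomplete

# Tiny in-memory sample; in practice, load the list of entries from your .json file.
complete_entry = {key: None for key in REQUIRED_KEYS}
sample = [
    {**complete_entry, "Risk": "Danger"},
    {**complete_entry, "Risk": "Good"},
    {"address": "abc", "Risk": "Good"},  # deliberately incomplete
]
label_counts, incomplete = summarize_dataset(sample)
print(label_counts)
print(incomplete)
```

If one label dominates the counts, consider collecting more examples of the other class before training.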

# Let's Load Your Training Data! 🍽
Now that we have our dataset ready, it's time to load it into our model. We'll be using this data to train our machine learning model to accurately classify tokens based on their risk levels.

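Make sure your dataset is in the proper format, containing the essential variables: address, decimals, liquidity, logoURI, name, symbol, v24hChangePercent, v24hUSD, Risk, Volatility, and holders_count. The preprocessing step below also expects the lastTradeUnixTime and mc columns to be present, because it drops them by name.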

Once we load the data, we'll be ready to embark on our machine learning journey and unlock valuable insights from the cryptocurrency market! 🚀


In [None]:
file_path = "preProcessedTokens.json"

def load_data(file_path):
    return pd.read_json(file_path)

data = load_data(file_path)
data.head()

# Preprocess Your Training Data! 🛠️
Before we proceed with training our model, it's crucial to preprocess our training data to ensure it's in optimal shape. This involves dropping irrelevant columns, encoding categorical variables, and handling missing values using appropriate strategies.

We'll meticulously clean and prepare our data, leaving no stone unturned to maximize the effectiveness of our machine learning model.

Once our data is preprocessed and primed for training, we'll be one step closer to unveiling valuable insights and making informed decisions in the cryptocurrency market! 💼 Let's get started!

# Let's Split and Preprocess Your Data! 📊
To ensure the reliability of our model, we'll first split our preprocessed data into training and testing sets. This allows us to train our model on one subset of data and evaluate its performance on another subset.

Then, we'll construct a preprocessing pipeline to scale numeric features and encode categorical features. This ensures that our data is standardized and ready for our machine learning algorithm.

Finally, we'll train an XGBoost classifier using the preprocessed data. XGBoost is a powerful algorithm known for its accuracy and efficiency, making it an ideal choice for our token risk assessment model.

With our data split, preprocessed, and model trained, we're well-equipped to unlock valuable insights and make informed decisions in the dynamic cryptocurrency market! 💡 Let's dive in and get started! 🚀
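
The split step also turns the text labels into numbers. As a minimal sketch of that idea, assuming the three labels that `preprocess_data` maps in the next cell (`Danger`, `Warning`, `Good`), the hypothetical helper below converts labels to 0/1 and raises a readable error for any label it does not recognize:

```python
# Same mapping as the notebook: Danger and Warning count as risky (1), Good as safe (0).
RISK_TO_TARGET = {"Danger": 1, "Warning": 1, "Good": 0}

def encode_risk_labels(labels):
    """Return 0/1 targets, raising a clear error for labels outside the mapping."""
    unknown = sorted({label for label in labels if label not in RISK_TO_TARGET}, key=str)
    if unknown:
        raise ValueError(f"Unrecognized Risk labels: {unknown}")
    return [RISK_TO_TARGET[label] for label in labels]

targets = encode_risk_labels(["Danger", "Good", "Warning", "Good"])
print(targets)  # [1, 0, 1, 0]
```

Running a check like this on the raw `Risk` column before splitting makes labeling mistakes easier to spot than a failed integer conversion later on.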

In [None]:
def preprocess_data(df):
    df = df.drop(['address', 'lastTradeUnixTime', 'mc'], axis=1)
    X = df.drop('Risk', axis=1)
    y = df['Risk'].map({'Danger': 1, 'Warning': 1, 'Good': 0}).astype(int)
    return train_test_split(X, y, test_size=0.4, random_state=42)

In [None]:
def build_preprocessor(X_train):
    numeric_features = ['decimals', 'liquidity', 'v24hChangePercent', 'v24hUSD', 'Volatility', 'holders_count']
    categorical_features = ['logoURI', 'name', 'symbol']

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())
    ])
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ],
        remainder='passthrough'
        )

    return preprocessor

# Train Your ML Model! 🚀
It's time to train your machine learning model and unleash its predictive prowess!

By training your model, you'll enable it to learn from the data and identify patterns that help classify tokens based on their risk levels.

With each iteration of training, your model becomes more adept at making accurate predictions, empowering you to navigate the Solana DeFi token market with confidence.

In [None]:
def train_model(X_train, y_train, preprocessor):
    model = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', xgb.XGBClassifier(n_estimators=100, learning_rate=0.1, max_depth=3, random_state=42))
    ])
    model.fit(X_train, y_train)
    return model

## Voila! Your Raw ML Model Is Ready! 🎉

Congratulations on reaching this milestone! 🚀 With your raw ML model in hand, you now have the power to predict the risk associated with a token in the cryptocurrency market.

Harnessing the capabilities of machine learning, you can now make informed investment decisions, mitigating risks and maximizing potential profits.

But remember, this is just the beginning of your journey. Continuously refine and improve your model, stay updated with market trends, and never stop learning.

Here's to your success in navigating the exciting world of cryptocurrency with confidence and precision! 🌟

# Let's Analyze Your Model's Performance! 😵
To evaluate the effectiveness of your classification model, we'll inspect its performance using a confusion matrix. This matrix shows how well your model predicts the classes of a set of test data where the true values are known. In this notebook the positive class is 1 (Danger or Warning) and the negative class is 0 (Good). Here's what the confusion matrix looks like:


```
                   Predicted Class         
            |   Positive   |   Negative   |
Actual  ---------------------------------------
Positive  |   TP         |       FN         |
Negative  |   FP         |       TN         |

```

- TP (True Positive): Model correctly predicts the positive class.
- FP (False Positive): Model predicts the positive class, but the actual class is negative.
- FN (False Negative): Model predicts the negative class, but the actual class is positive.
- TN (True Negative): Model correctly predicts the negative class.

Note that scikit-learn's `confusion_matrix` orders rows and columns by sorted label, so for labels 0 and 1 the printed array reads `[[TN, FP], [FN, TP]]`.

By examining these values, we'll gain valuable insights into how well your model performs in classifying tokens based on their risk levels. Let's dive in and analyze the results! 📊
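
To make the four counts concrete, here is a small worked example in plain Python. It is only a sketch: `confusion_counts` is a hypothetical helper, and the label lists are made up for illustration.

```python
def confusion_counts(y_true, y_pred, positive=1):
    """Count TP, FP, FN, TN for binary labels."""
    tp = fp = fn = tn = 0
    for actual, predicted in zip(y_true, y_pred):
        if predicted == positive:
            if actual == positive:
                tp += 1  # predicted risky, actually risky
            else:
                fp += 1  # predicted risky, actually safe
        else:
            if actual == positive:
                fn += 1  # predicted safe, actually risky
            else:
                tn += 1  # predicted safe, actually safe
    return tp, fp, fn, tn

# Made-up labels: 1 = risky, 0 = safe
y_true = [1, 1, 1, 0, 0, 0, 0, 1]
y_pred = [1, 0, 1, 0, 1, 0, 0, 1]

tp, fp, fn, tn = confusion_counts(y_true, y_pred)
precision = tp / (tp + fp)          # share of "risky" predictions that were right
recall = tp / (tp + fn)             # share of risky tokens that were caught
accuracy = (tp + tn) / len(y_true)  # share of all predictions that were right
print(tp, fp, fn, tn, precision, recall, accuracy)
```

For a risk model, false negatives (risky tokens predicted as safe) are usually the costlier mistake, so recall on the positive class deserves as much attention as overall accuracy.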


In [None]:
def evaluate_model(model, X_test, y_test):
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    classification_report_result = classification_report(y_test, y_pred)
    conf_matrix = confusion_matrix(y_test, y_pred)

    print(f'Model Accuracy: {accuracy}')
    print('Classification Report:\n', classification_report_result)
    print("Confusion Matrix:\n", conf_matrix)

In [None]:
def main():
    file_path = 'preProcessedTokens.json'  # Update this path
    df = load_data(file_path)
    X_train, X_test, y_train, y_test = preprocess_data(df)
    print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

    preprocessor = build_preprocessor(X_train)
    model = train_model(X_train, y_train, preprocessor)
    evaluate_model(model, X_test, y_test)

    # Save model and preprocessor
    joblib.dump(model, "predictModel.pkl")
    joblib.dump(preprocessor, "mainPreprocessor.pkl")

    # Example for a single item prediction
    single_item_corrected = {
        "decimals": 6,
        "liquidity": 62215.15524335994,
        "logoURI": "https://img.fotofolio.xyz/?url=https%3A%2F%2Fbafkreifhqihaiwyo4g2aogdu4qyfqftkxy3aq4xxbhoxdkbkufrobsnjwm.ipfs.nftstorage.link",
        "name": "SBF",
        "symbol": "SBF",
        "v24hChangePercent": -49.17844813082829,
        "v24hUSD": 18220.724466666383,
        "Volatility": 76.06539722778419,
        "holders_count": 0
    }

    # Convert to DataFrame
    single_item_df = pd.DataFrame(single_item_corrected, index=[0])
    prediction = model.predict(single_item_df)  # Predict
    print(f'Prediction for the single item: {prediction}')


if __name__ == "__main__":
    main()

# Save Your Model! 🔽
Now that your model is trained and ready to go, it's crucial to save it for seamless integration with your application.

By saving your model, you can deploy it effortlessly and leverage its predictive power in real-world scenarios.

Let's ensure your hard work doesn't go to waste by saving your model and paving the way for its successful implementation! 🚀

# 🚀 Data Fetching Expedition Begins!

In this section, we embark on a thrilling journey to retrieve and whip into shape the data essential for predicting token behavior! 🌟 We're talking historical price data, token specifics, and holder count - the very backbone of our prediction! 📈

We're not just fetching data; we're summoning it with flair! Utilizing the mighty Vybe Network as our data oracle, we dive into the digital depths to extract the insights that fuel our predictions! 💡

But hold onto your hats, space cadets! To unlock this cosmic treasure trove of data, you'll need the legendary Vybe Network API key. 🗝️ Ready to blast off into the data cosmos? Let's do this! 🚀

First, let's install and import the required libraries for this section.


In [None]:
%pip install requests
%pip install python-dotenv

In [None]:
import requests
import time
import joblib
import os
import dotenv

# Let's set up the environment variables.
Just copy the .env.example file to a .env file and replace ADD_YOUR_VYBE_API_KEY_HERE with your actual Vybe key. 🚀
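
When checking that the key loaded, it is safer not to echo the whole secret into the notebook output. The sketch below shows one way to confirm a variable is set while revealing only its last four characters. `describe_key` is a hypothetical helper, and `DEMO_KEY` is a placeholder variable used so the example does not touch your real `VYBE_KEY`.

```python
import os

def describe_key(name="VYBE_KEY"):
    """Report whether an environment variable is set, showing only its last 4 characters."""
    value = os.environ.get(name)
    if not value:
        return f"{name} is not set"
    return f"{name} is set (ends with ...{value[-4:]})"

# Placeholder variable and value, for demonstration only.
os.environ["DEMO_KEY"] = "example-key-1234"
print(describe_key("DEMO_KEY"))
```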

In [None]:
dotenv.load_dotenv()
vybe_key = os.environ.get('VYBE_KEY')
print("Vybe key loaded:", bool(vybe_key))  # verify the key was loaded without printing the secret itself

# 📈 Ascending the Price History Peaks

Our first stop: price history! With the get_token_price_history_with_retry function, we brave the digital currents to extract token quotes and OHLCV data. It's a journey fraught with challenges, but with each retry, we inch closer to the summit of knowledge.
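
The retry idea itself is independent of the Vybe API. As a rough sketch of exponential backoff, assuming a call that returns `None` on failure, the hypothetical helper below waits 1 second, then 2, then 4, and so on between attempts. The `sleep` parameter is injectable so the demonstration runs instantly.

```python
import time

def retry_with_backoff(call, max_retries=3, initial_delay=1.0, sleep=time.sleep):
    """Call `call()` until it returns a non-None value or the retries run out.

    The wait doubles after every failed attempt: 1s, 2s, 4s, ...
    """
    delay = initial_delay
    for attempt in range(max_retries):
        result = call()
        if result is not None:
            return result
        if attempt < max_retries - 1:
            # Only wait if another attempt is coming
            sleep(delay)
            delay *= 2
    return None

# Demonstration with a fake call that fails twice, then succeeds.
attempts = []
waits = []

def flaky_call():
    attempts.append(1)
    return {"data": []} if len(attempts) == 3 else None

result = retry_with_backoff(flaky_call, sleep=waits.append)
print(result, waits)
```

Doubling the delay gives a rate-limited API progressively more room to recover, while the retry cap keeps a permanently failing request from blocking the notebook.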

In [None]:
def get_token_price_history_with_retry(time_start, time_end, token_id, max_retries=3):
    url = f"https://api.vybenetwork.xyz/price/{token_id}/token-quote-ohlcv"

    headers = {
        "Content-Type": "application/json",
        'X-API-KEY': vybe_key
    }

    params = {
        "stride": "1 hour",
        "time_end": time_end,
        "time_start": time_start
    }

    backoff_time = 1  # Initial backoff time in seconds

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, params=params)
        except requests.exceptions.RequestException as e:
            # Network-level failure: report it and stop retrying
            print(f"Request exception: {e}, {token_id}")
            return None

        if response.status_code == 200:
            # Request was successful: return the parsed JSON body
            return response.json()
        if response.status_code == 204:
            # Success with no content: there is no body to parse
            return None
        if response.status_code == 429:
            print(f"Received 429 - Too Many Requests for {token_id}.")
        else:
            print(f"Error: {response.status_code} - {response.text} for {token_id}.")

        if attempt < max_retries - 1:
            # Wait before the next attempt, doubling the delay each time
            print(f"Retrying in {backoff_time} seconds...")
            time.sleep(backoff_time)
            backoff_time *= 2  # You can adjust this multiplier based on your needs

    print(f"Maximum retries ({max_retries}) reached for {token_id}.")
    return None  # Callers should treat None as "no data available"

# 💥 Unleashing Volatility Insights

Next, we delve into volatility with the calculate_volatility function. Through the magic of standard deviation applied to the returns between consecutive price buckets (hourly, given the `1 hour` stride requested above), we gain insights into market fluctuations. It's not just about numbers; it's about understanding the ebb and flow of token dynamics.
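
As a worked example of the arithmetic, the sketch below computes the same kind of score on four made-up closing prices using only the standard library. It assumes positive prices and uses the population standard deviation (`statistics.pstdev`), which is an assumption of this sketch. `volatility_score` is a hypothetical name, not the notebook's function.

```python
from statistics import pstdev

def volatility_score(closes):
    """Std. dev. of period-over-period returns, rescaled against the return range, times 100."""
    returns = [(curr - prev) / prev for prev, curr in zip(closes, closes[1:])]
    if not returns:
        return None  # fewer than two prices: no returns to measure
    low, high = min(returns), max(returns)
    if high == low:
        return None  # flat return series: the rescaling is undefined
    return (pstdev(returns) - low) / (high - low) * 100

# Made-up closes giving returns of roughly +10%, -10%, +10%
closes = [100.0, 110.0, 99.0, 108.9]
score = volatility_score(closes)
print(round(score, 2))
```

Because the standard deviation is rescaled against the minimum and maximum return rather than against a fixed range, the result is a relative score for this price window and is not strictly bounded between 0 and 100.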

In [None]:
def calculate_volatility(result):
    DEFAULT_VOLATILITY_SCORE = None

    if isinstance(result, dict) and 'data' in result:
        token_data = result['data']

        # Extract relevant columns and create DataFrame
        columns = ['timeBucketStart', 'open', 'high', 'low', 'close', 'count']
        data = pd.DataFrame(token_data, columns=columns)

        # Convert timeBucketStart to datetime and set it as index
        data['timeBucketStart'] = pd.to_datetime(data['timeBucketStart'], unit='s')
        data = data.set_index('timeBucketStart')

        # Convert numerical columns to float
        numerical_cols = ['open', 'high', 'low', 'close']
        data[numerical_cols] = data[numerical_cols].astype(float)

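        # Calculate returns between consecutive buckets (hourly for a "1 hour" stride)
        data['Daily_Returns'] = data['close'].pct_change()

        # Calculate volatility (standard deviation of those returns)
        volatility = np.std(data['Daily_Returns'])

        # Rescale volatility against the observed range of returns, times 100
        # (a relative score; not strictly bounded to 0-100)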
        min_volatility = np.min(data['Daily_Returns'])
        max_volatility = np.max(data['Daily_Returns'])

        # Check if the denominator is close to zero
        if np.isclose(max_volatility, min_volatility):
            print("Denominator is close to zero. Setting volatility score to default value.")
            return DEFAULT_VOLATILITY_SCORE
        else:
            # Perform the division only if the denominator is not close to zero
            volatility_score = ((volatility - min_volatility) / (max_volatility - min_volatility)) * 100

        return volatility_score

    else:
        return DEFAULT_VOLATILITY_SCORE

# 🚀 Embarking on a Token Details Expedition

Our quest continues as we uncover token details with the get_token_details function. Armed with determination, we navigate through the digital landscape, overcoming obstacles to reveal the inner workings of tokens.



In [None]:
def calculate_v24hChangePercent(token_data):
    # Need a dict response with at least two price points
    rows = token_data.get('data') if isinstance(token_data, dict) else None
    if rows and len(rows) >= 2:
        first_close = float(rows[0]['close'])
        last_close = float(rows[-1]['close'])
        if first_close == 0:
            return None  # Avoid division by zero
        v24hChangePercent = ((last_close - first_close) / first_close) * 100
        return v24hChangePercent
    return None

def get_token_details(token_id, max_retries=3):

    url = f"https://api.vybenetwork.xyz/token/{token_id}"

    headers = {
        "Content-Type": "application/json",
        'X-API-KEY': vybe_key  # Assuming vybe_key is defined elsewhere in your code
    }

    backoff_time = 1  # Initial backoff time in seconds

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers)
        except requests.exceptions.RequestException as e:
            # Network-level failure: report it and stop retrying
            print(f"Request exception: {e}, {token_id}")
            return None

        if response.status_code == 200:
            # Request was successful: return the parsed JSON body
            return response.json()
        if response.status_code == 204:
            # Success with no content: there is no body to parse
            return None
        if response.status_code == 429:
            print(f"Received 429 - Too Many Requests for {token_id}.")
        else:
            print(f"Error: {response.status_code} - {response.text} for {token_id}.")

        if attempt < max_retries - 1:
            # Wait before the next attempt, doubling the delay each time
            print(f"Retrying in {backoff_time} seconds...")
            time.sleep(backoff_time)
            backoff_time *= 2  # You can adjust this multiplier based on your needs

    print(f"Maximum retries ({max_retries}) reached for {token_id}.")
    return None  # Callers should treat None as "no data available"

# 🌊 Surfing the Waves of Liquidity

Dive into the depths of liquidity with the calculate_liquidity function. As we ride the waves of market cap and token volume, we gain a deeper understanding of liquidity's role in the token ecosystem.

In [None]:
def calculate_liquidity(token_data):
    # Liquidity here is market cap divided by token volume; 0 when either value is unavailable
    if not isinstance(token_data, dict):
        return 0
    market_cap = token_data.get('marketCap')
    token_volume = token_data.get('tokenAmountVolume')
    if market_cap is not None and token_volume is not None and token_volume > 0:
        return market_cap / token_volume
    return 0

# 👥 Exploring Holder Count Mysteries

Last but not least, we explore holder counts with the get_number_of_holders function. It's a journey through API calls and response codes, uncovering the strength of token communities one holder at a time.
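
Response payloads are not always populated, so it is worth reading the latest value defensively. The sketch below assumes only the payload shape that `get_number_of_holders` reads (a `data` list whose items carry `nHolders`). `latest_holder_count` is a hypothetical helper and the numbers are made up.

```python
def latest_holder_count(payload):
    """Return the most recent nHolders value, or None if the payload has no usable rows."""
    rows = payload.get("data") if isinstance(payload, dict) else None
    if not rows:
        return None
    return rows[-1].get("nHolders")

# Made-up payload mirroring the fields read by get_number_of_holders.
example_payload = {"data": [{"nHolders": 120}, {"nHolders": 135}]}
print(latest_holder_count(example_payload))  # 135
print(latest_holder_count({"data": []}))     # None
```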

In [None]:
def get_number_of_holders(token_id, interval='day'):

    url = f"https://api.vybenetwork.xyz/token/{token_id}/holders-ts"

    headers = {
        "Content-Type": "application/json",
        'X-API-KEY': vybe_key
    }
    params = {
        "interval": interval,
        "time_end": 'null',
        "time_start": 'null'
    }

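    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
        rows = data.get('data') or []
        if not rows:
            # A successful response can still contain no holder history
            print(f"No holder data returned for {token_id}")
            return None
        return rows[-1]['nHolders']
    else:
        print(f"Failed to fetch data: {response.status_code} - {response.text}")
        return None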

# 🚀 Embark on a Data Fetch Odyssey

Join us on a data-fetching odyssey with the fetchDataFunc function. From token details to price history, from volatility scores to holder counts, we gather the essence of tokens, shaping them into a beacon of knowledge to guide us through the digital cosmos.
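
The dictionary returned by this step must carry the same feature names the model was trained on. As a small sanity check, assuming the feature list used by `build_preprocessor` earlier in the notebook, the hypothetical helper below reports any missing or unexpected keys before the data reaches the model:

```python
# Feature names taken from build_preprocessor (numeric and categorical lists).
EXPECTED_FEATURES = [
    "decimals", "liquidity", "logoURI", "name", "symbol",
    "v24hChangePercent", "v24hUSD", "Volatility", "holders_count",
]

def check_features(input_data):
    """Return (missing, unexpected) feature names for a single prediction input."""
    missing = [name for name in EXPECTED_FEATURES if name not in input_data]
    unexpected = [name for name in input_data if name not in EXPECTED_FEATURES]
    return missing, unexpected

# Made-up input with one feature left out and one stray key.
candidate = {name: 0 for name in EXPECTED_FEATURES if name != "Volatility"}
candidate["address"] = "ExampleTokenAddress"
missing, unexpected = check_features(candidate)
print(missing, unexpected)
```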

In [None]:
async def fetchDataFunc(token_id):
    token_data = get_token_details(token_id)
    if not isinstance(token_data, dict):
        # Without token details there is nothing to build features from
        raise ValueError(f"Could not fetch token details for {token_id}")

    # Request the last 24 hours of price history
    time_end = int(time.time())
    time_start = time_end - (24 * 60 * 60)
    token_OHLCV_data = get_token_price_history_with_retry(time_start, time_end, token_id)

    v24hChangePercent = calculate_v24hChangePercent(token_OHLCV_data)
    liquidity = calculate_liquidity(token_data)
    volatility_score = calculate_volatility(token_OHLCV_data)
    holder_count = get_number_of_holders(token_id)

    v24hUSD = 0
    if token_data.get('usdValueVolume') is not None:
        v24hUSD = token_data['usdValueVolume']

    input_data = {
        "decimals": token_data['decimal'],
        "liquidity": liquidity,
        # logoURI, name, and symbol are not fetched here; placeholder values are used
        "logoURI": 1,
        "name": 1,
        "symbol": 1,
        "v24hChangePercent": v24hChangePercent,
        "v24hUSD": v24hUSD,
        "Volatility": volatility_score,
        "holders_count": holder_count
    }
    return input_data

# 🔮 Predict Function Unveiled

In this segment, we demystify the process of prediction. 🌟 Here, we simply showcase how our saved model, snugly nestled within a pkl file, comes to life. 📦 With a straightforward invocation of the predict method, we harness the power of our input data to yield a binary classification. 💫

It's a peek behind the curtain, revealing the simplicity that underpins the magic of prediction. So relax and enjoy the gentle unveiling of this fundamental step in our data journey! 🔍✨

In [None]:
async def predict_token_risk(input_data):
    """Predicts the risk of a token based on the input parameters.

    Args:
        input_data (dict): Input data containing token address.

    Returns:
        int: 0 is safe and 1 is dangerous
    """
    try:
        model = joblib.load("predictModel.pkl")
        token_id = input_data['token_address']
        as_dict = await fetchDataFunc(token_id)
        single_item_df = pd.DataFrame(as_dict, index=[0])
        prediction = model.predict(single_item_df)  # Predict
        single_prediction = prediction[0]  # Extract single element
        return int(single_prediction)  # Convert prediction to int
    except Exception as e:
        # Re-raise with context while keeping the original traceback
        raise RuntimeError(f"An error occurred: {str(e)}") from e


# 🔍 Token Inquiry: A Data Adventure

Welcome to the heart of our data journey! 🌟 Here, in this interactive segment, we invite you to embark on an exploration of token safety. 💼

Simply run the following cell, and it will beckon you to provide a token address. Once entered, our trusty data-fetching code springs into action, gathering all the relevant data features needed to assess the safety of your chosen token. 📊

With bated breath, you'll soon discover whether your token is deemed safe or harbors any danger. It's a journey of discovery and insight, unfolding with each inquiry. So, without further ado, let's dive in and explore the safety of tokens together! 🚀🔍

In [None]:
async def main():
    while True:
        # Get user input for token address
        token_address = input("Enter token address (or 'q' to exit): ").strip()

        # Check if user wants to quit
        if token_address.lower() == 'q':
            print("Exiting...")
            break

        # Create input data dictionary
        input_data = {"token_address": token_address}

        # Call the predict_token_risk function
        try:
            risk_level = await predict_token_risk(input_data)
            if risk_level == 0:
                print("Risk Level: Safe: ", risk_level)
            elif risk_level == 1:
                print("Risk Level: Danger: ", risk_level)
        except Exception as e:
            print("Error occurred:", e)

# Run the async main function in the event loop
await main()
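
Top-level `await` works in a notebook because Jupyter already runs an event loop. In a plain Python script there is no running loop, so the coroutine is typically started with `asyncio.run` instead. The sketch below shows that pattern with stand-ins: `predict_stub` and `run_once` are hypothetical and simply take the place of `predict_token_risk` and the interactive loop.

```python
import asyncio

async def predict_stub(token_address):
    """Stand-in for predict_token_risk; always reports 0 (safe)."""
    await asyncio.sleep(0)  # yield control once, as a real async call would
    return 0

async def run_once(token_address):
    """Map the numeric prediction to the labels used in this notebook."""
    risk_level = await predict_stub(token_address)
    return "Safe" if risk_level == 0 else "Danger"

if __name__ == "__main__":
    # In a script, asyncio.run creates the event loop and runs the coroutine to completion.
    print(asyncio.run(run_once("ExampleTokenAddress")))
```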
