In [1]:
import pandas as pd
import sys

sys.path.append("../..")



### 🔹 Set the base directory so we can find things

- All is loaded from the "../../artifacts"


---

In [2]:
import os

os.environ["BASE_DIR"] = "../../artifacts"

# Verify it's set
print(os.environ.get("BASE_DIR"))

../../artifacts


In [3]:
def validate_ohlcv(record):
    """
    Validates whether the input record follows the required OHLCV format.
    Ensures correct data structure, types, and valid `Date` formats.

    Args:
        record (list or dict): A single OHLCV record.

    Returns:
        bool: True if the record is valid, otherwise raises an error.
    """
    expected_keys = ["Date", "Open", "High", "Low", "Close", "Volume"]

    if isinstance(record, dict):
        # Ensure all required keys exist
        if not all(key in record for key in expected_keys):
            raise ValueError(
                f"Invalid OHLCV record: Missing required keys. Expected: {expected_keys}. Received: {list(record.keys())}"
            )

        # Validate `Date` format
        try:
            pd.to_datetime(record["Date"])  # Converts to datetime to check validity
        except Exception:
            raise ValueError(f"Invalid `Date` format: {record['Date']}")

        # Ensure numerical fields are valid
        if not all(
            isinstance(record[key], (int, float)) for key in expected_keys[1:]
        ):  # Exclude `Date`
            raise ValueError(
                f"Invalid OHLCV record: Values must be int/float for OHLCV fields. Received: {record}"
            )

    elif isinstance(record, list):
        # Ensure list has exactly 6 elements (Date + OHLCV)
        if len(record) != 6:
            raise ValueError(
                f"Invalid OHLCV record: Expected 6 elements (Date + OHLCV), got {len(record)}"
            )

        # Validate `Date` format
        try:
            pd.to_datetime(record[0])  # Converts to datetime to check validity
        except Exception:
            raise ValueError(f"Invalid `Date` format: {record[0]}")

        # Ensure OHLCV values are numeric
        if not all(isinstance(value, (int, float)) for value in record[1:]):
            raise ValueError(
                f"Invalid OHLCV record: Values must be int/float for OHLCV fields. Received: {record}"
            )

    else:
        raise TypeError("Invalid input type. Record must be a dictionary or list.")

    return True  # If no errors, return True (valid record)

In [4]:
# Example Usage
valid_dict = {
    "Date": "2025-02-06 22:00:00-05:00",  # Valid timezone-aware format
    "Open": 1.03612,
    "High": 1.03858,
    "Low": 1.0354,
    "Close": 1.03737,
    "Volume": 55878,
}
valid_list = ["2025-02-06 22:00:00", 1.03612, 1.03858, 1.0354, 1.03737, 55878]
invalid_dict = {"Open": 1.03612, "High": 1.03858, "Low": 1.0354}  # Missing keys
invalid_list = [
    "Invalid Date",
    1.03612,
    1.03858,
    1.0354,
    1.03737,
    55878,
]  # Invalid date

# Assertions for valid cases
assert validate_ohlcv(valid_dict) == True  # ✅ Should pass
assert validate_ohlcv(valid_list) == True  # ✅ Should pass

# Assertions for invalid cases (expected to fail)
try:
    assert validate_ohlcv(invalid_dict) == True  # ❌ Should raise ValueError
except ValueError as e:
    print(f"ValueError caught: {e}")

try:
    assert validate_ohlcv(invalid_list) == True  # ❌ Should raise ValueError
except ValueError as e:
    print(f"ValueError caught: {e}")

ValueError caught: Invalid OHLCV record: Missing required keys. Expected: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']. Received: ['Open', 'High', 'Low']
ValueError caught: Invalid `Date` format: Invalid Date


In [5]:
def validate_ohlcv_batch(records):
    """
    Validates a batch of OHLCV records using assertions.

    Args:
        records (list): A list of OHLCV records (each record is a list or dict).

    Returns:
        list: A list of booleans indicating validity of each record.
    """
    results = []

    for i, record in enumerate(records):
        try:
            assert validate_ohlcv(record) == True  # Assert validity
            results.append(True)  # Valid record
        except ValueError as e:
            print(f"AssertionError: Record {i} failed validation: {e}")
            results.append(False)  # Invalid record

    return results




In [6]:
# Example Usage
valid_records = [
    [
        "2025-02-06 22:00:00",
        1.03612,
        1.03858,
        1.0354,
        1.03737,
        55878,
    ],  # Standard datetime format
    ["2025-02-07", 1.03812, 1.03758, 1.0384, 1.03537, 69878],  # Date-only format
    {
        "Date": "2025-02-06 22:00:00-05:00",
        "Open": 1.03612,
        "High": 1.03858,
        "Low": 1.0354,
        "Close": 1.03737,
        "Volume": 55878,
    },  # ISO 8601 format
    [
        "2025-02-06T22:00:00-05:00",
        1.03812,
        1.03758,
        1.0384,
        1.03537,
        69878,
    ],  #
    [
        "2025-02-06T22:00:00",
        1.03812,
        1.03758,
        1.0384,
        1.03537,
        69878,
    ],  #
]

invalid_records = [
    ["Invalid Date", 1.03612, 1.03858, 1.0354, 1.03737, 55878],  # Invalid date format
    [1.03612, 1.03858, 1.0354, 1.03737],  # Missing volume
    {
        "Date": "2025-02-06 22:00:00-05:00",
        "Open": 1.03612,
        "High": 1.03858,
        "Low": 1.0354,
    },  # Missing Close & Volume
]

# Assertions for batch validation
assert validate_ohlcv_batch(valid_records) == [True, True, True, True, True]  # ✅ Should pass

try:
    assert validate_ohlcv_batch(invalid_records) == [True, True, True]  # ❌ Should fail
except AssertionError as e:
    print(f"AssertionError caught: {e}")

AssertionError: Record 0 failed validation: Invalid `Date` format: Invalid Date
AssertionError: Record 1 failed validation: Invalid OHLCV record: Expected 6 elements (Date + OHLCV), got 4
AssertionError: Record 2 failed validation: Invalid OHLCV record: Missing required keys. Expected: ['Date', 'Open', 'High', 'Low', 'Close', 'Volume']. Received: ['Date', 'Open', 'High', 'Low']
AssertionError caught: 


### 🔹 Step 1: Load the Test Dataset
- Ensure the dataset contains the same features used during training.

In [7]:
# Load test dataset
test_data_path = "../data/Predict_EUR_USD_D.csv"  # Adjust path as needed
df_analyze = pd.read_csv(test_data_path)

# Display sample data
print(df_analyze.tail(1))
print(df_analyze.shape)

      Unnamed: 0                       time  volume    mid_o    mid_h   mid_l  \
1846        1846  2025-02-11 22:00:00-05:00   55878  1.03612  1.03858  1.0354   

        mid_c    bid_o   bid_h    bid_l    bid_c    ask_o    ask_h    ask_l  \
1846  1.03737  1.03604  1.0385  1.03532  1.03729  1.03621  1.03865  1.03548   

        ask_c  
1846  1.03745  
(1847, 15)


### 🔹 Step 2: Extract lookback

- Now, grab the data we are interest in for our lookback analysis


---

In [8]:
lookback = 100

# Select the last lookback number of rows from the dataset
df_lookback = df_analyze.tail(lookback).copy()

# Convert the 'time' column to datetime format for proper analysis
df_lookback["time"] = pd.to_datetime(df_lookback["time"])

print(df_lookback.tail(1))
print(df_lookback.shape)

      Unnamed: 0                      time  volume    mid_o    mid_h   mid_l  \
1846        1846 2025-02-11 22:00:00-05:00   55878  1.03612  1.03858  1.0354   

        mid_c    bid_o   bid_h    bid_l    bid_c    ask_o    ask_h    ask_l  \
1846  1.03737  1.03604  1.0385  1.03532  1.03729  1.03621  1.03865  1.03548   

        ask_c  
1846  1.03745  
(100, 15)


### 🔹 Step 2: Reformat into DOHLCV
- Manipulate the dataset into our alternative formats

In [9]:
def reformat_to_dohlcv(df: pd.DataFrame):
    """
    Converts a dataset from mid_* format to DOHLCV format.

    Args:
        df (pd.DataFrame): Raw dataset with `time`, `volume`, `mid_*`, `bid_*`, `ask_*`.

    Returns:
        pd.DataFrame: Reformatted dataset in `Date, Open, High, Low, Close, Volume` format.
    """
    try:
        # Convert 'time' to 'Date' and ensure correct datetime format
        if "time" in df.columns:
            df = df.rename(columns={"time": "Date"})
            df["Date"] = pd.to_datetime(
                df["Date"]
            )  # Ensures proper datetime conversion

        # Drop unnecessary bid/ask price columns
        columns_to_drop = [
            "bid_o",
            "bid_h",
            "bid_l",
            "bid_c",
            "ask_o",
            "ask_h",
            "ask_l",
            "ask_c",
        ]
        df.drop(
            columns=[col for col in columns_to_drop if col in df.columns],
            errors="ignore",
            inplace=True,
        )

        # Rename mid-price columns to standard OHLCV format
        rename_mapping = {
            "mid_o": "Open",
            "mid_h": "High",
            "mid_l": "Low",
            "mid_c": "Close",
            "volume": "Volume",
        }
        df = df.rename(columns=rename_mapping)

        # Ensure correct column order for DOHLCV
        dohlcv_columns = ["Date", "Open", "High", "Low", "Close", "Volume"]
        df = df[dohlcv_columns]

        print("Dataset successfully reformatted to DOHLCV format.")
        return df

    except Exception as e:
        print(f"Error during reformatting: {e}")
        return None

In [10]:
df_dohlcv = reformat_to_dohlcv(df_lookback)

Dataset successfully reformatted to DOHLCV format.


In [11]:
print(df_dohlcv.tail(1))
print(df_dohlcv.shape)

                          Date     Open     High     Low    Close  Volume
1846 2025-02-11 22:00:00-05:00  1.03612  1.03858  1.0354  1.03737   55878
(100, 6)


In [12]:
df_dohlcv.to_csv("../data/lookback_dohlcv.csv", index=False)
print("✅ File saved successfully.")

✅ File saved successfully.


### 🔹 Step 3: Run it through data ingestion 

- Compute all the features 


---

In [13]:
from src.services.data_ingestion_service import DataIngestionService

ingestion_service = DataIngestionService()
# Apply preprocessing (renaming, dropping unnecessary columns)
df_cleaned = ingestion_service.preprocess_data(df_dohlcv)

[ 2025-02-15 04:29:23,495 ] INFO [src.services.data_ingestion_service:67] - Dropped bid and ask price columns.
[ 2025-02-15 04:29:23,495 ] INFO [src.services.data_ingestion_service:80] - Renamed mid-price columns to OHLC format.
[ 2025-02-15 04:29:23,496 ] INFO [src.services.data_ingestion_service:93] - Set index to Date with unique timestamps.
[ 2025-02-15 04:29:23,497 ] INFO [src.services.data_ingestion_service:95] - Processing leavitt data.
[ 2025-02-15 04:29:23,507 ] INFO [src.services.data_ingestion_service:97] - Finished leavitt data.
[ 2025-02-15 04:29:23,507 ] INFO [src.services.data_ingestion_service:99] - Processing indicators.
[ 2025-02-15 04:29:23,510 ] INFO [src.services.data_ingestion_service:101] - Finished indicators.
[ 2025-02-15 04:29:23,510 ] INFO [src.services.data_ingestion_service:103] - Processing target.
[ 2025-02-15 04:29:23,511 ] INFO [src.services.data_ingestion_service:105] - Finished target.
[ 2025-02-15 04:29:23,513 ] INFO [src.services.data_ingestion_serv

In [14]:
print(df_cleaned.shape)
print(df_cleaned.describe())
print(df_cleaned.columns)

(78, 29)
            Open       High        Low      Close         Volume       AHMA  \
count  78.000000  78.000000  78.000000  78.000000      78.000000  78.000000   
mean    1.050034   1.053723   1.045359   1.049511  158979.858974   1.049304   
std     0.017334   0.017146   0.017282   0.017086   48031.345766   0.016844   
min     1.024340   1.025010   1.017790   1.024340   55878.000000   1.025912   
25%     1.037985   1.042588   1.034203   1.037497  132557.750000   1.037979   
50%     1.048200   1.052290   1.041620   1.047060  160770.500000   1.046549   
75%     1.056715   1.059658   1.052647   1.056607  178745.500000   1.053758   
max     1.092970   1.093740   1.087260   1.092970  398838.000000   1.088000   

       Leavitt_Projection  Leavitt_Convolution   LC_Slope  LC_Intercept  ...  \
count           78.000000            78.000000  78.000000     78.000000  ...   
mean             1.048633             1.048046  -0.000582      1.049792  ...   
std              0.017553             0

#### What is the last record??

- It should be the record that we will be predicting

In [15]:
df_cleaned.tail(1)

Unnamed: 0_level_0,Open,High,Low,Close,Volume,AHMA,Leavitt_Projection,Leavitt_Convolution,LC_Slope,LC_Intercept,...,Returns_T-10,Momentum_T-10,Returns_T-21,Momentum_T-21,Hour,Day_Of_Week,Month,Year,ATR,Movement_Class
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2025-02-11 22:00:00-05:00,1.03612,1.03858,1.0354,1.03737,55878,1.034488,1.032559,1.032416,-1.3e-05,1.032455,...,-0.001026,0.002232,0.006326,-0.00512,22,1,2,2025,0.007814,1


### 🔹 Step 4: Create an instance of out predict pipeline

- Do it an crack on


---

In [16]:
from src.pipeline.predict_pipeline import PredictPipeline

# Initialize the prediction pipeline
predict_pipeline = PredictPipeline()


[ 2025-02-15 04:29:23,544 ] INFO [src.pipeline.predict_pipeline:27] - Loading model and preprocessor.
[ 2025-02-15 04:29:23,605 ] INFO [src.pipeline.predict_pipeline:33] - Model and preprocessor loaded successfully.


### 🔹 Step 5: Format the records 

- Lets try a format and predict something.  Just to say we can


---

In [17]:
dohlcv_list = df_dohlcv.values.tolist()
print(dohlcv_list)

[[Timestamp('2024-09-23 21:00:00-0500', tz='UTC-05:00'), 1.11108, 1.1181, 1.11035, 1.11797, 89839], [Timestamp('2024-09-24 21:00:00-0500', tz='UTC-05:00'), 1.11797, 1.12142, 1.11216, 1.1133, 92771], [Timestamp('2024-09-25 21:00:00-0500', tz='UTC-05:00'), 1.1133, 1.11894, 1.11258, 1.11766, 106658], [Timestamp('2024-09-26 21:00:00-0500', tz='UTC-05:00'), 1.11766, 1.12031, 1.11248, 1.1164, 118013], [Timestamp('2024-09-29 21:00:00-0500', tz='UTC-05:00'), 1.1164, 1.1209, 1.11137, 1.11348, 110504], [Timestamp('2024-09-30 21:00:00-0500', tz='UTC-05:00'), 1.11348, 1.11442, 1.1046, 1.10678, 128195], [Timestamp('2024-10-01 21:00:00-0500', tz='UTC-05:00'), 1.10678, 1.10829, 1.10327, 1.10452, 103826], [Timestamp('2024-10-02 21:00:00-0500', tz='UTC-05:00'), 1.10452, 1.10496, 1.10084, 1.10298, 113597], [Timestamp('2024-10-03 21:00:00-0500', tz='UTC-05:00'), 1.10298, 1.10396, 1.09514, 1.0975, 145178], [Timestamp('2024-10-06 21:00:00-0500', tz='UTC-05:00'), 1.0975, 1.0987, 1.09542, 1.09748, 145985], [

### 🔹 Step 6: Format into a DataFrame DOHLCV

- Preprocessing happens before entering the pipeline (data is already DOHLCV).
- The pipeline focuses only on feature computation, scaling, and inference.
- No redundant checks inside the pipeline, leading to faster and more efficient predictions.

---

In [18]:
_ = validate_ohlcv_batch(dohlcv_list)

In [19]:

if len(dohlcv_list) < 100:
    raise ValueError(
        "Insufficient data: At least 100 records are required for prediction."
    )

# Define column names
columns = ["Date", "Open", "High", "Low", "Close", "Volume"]

# Convert list to DataFrame
df_dohlcv = pd.DataFrame(dohlcv_list, columns=columns)

In [20]:
# Now lets predict 
result = predict_pipeline.predict(df_dohlcv)
print(result.predictions.shape, result.confidence.shape)

[ 2025-02-15 04:29:23,617 ] INFO [src.pipeline.predict_pipeline:49] - Starting prediction.
[ 2025-02-15 04:29:23,617 ] INFO [src.pipeline.predict_pipeline:51] - Computing necessary features.
[ 2025-02-15 04:29:23,618 ] INFO [src.services.data_ingestion_service:67] - Dropped bid and ask price columns.
[ 2025-02-15 04:29:23,618 ] INFO [src.services.data_ingestion_service:80] - Renamed mid-price columns to OHLC format.
[ 2025-02-15 04:29:23,619 ] INFO [src.services.data_ingestion_service:93] - Set index to Date with unique timestamps.
[ 2025-02-15 04:29:23,620 ] INFO [src.services.data_ingestion_service:95] - Processing leavitt data.
[ 2025-02-15 04:29:23,629 ] INFO [src.services.data_ingestion_service:97] - Finished leavitt data.
[ 2025-02-15 04:29:23,630 ] INFO [src.services.data_ingestion_service:99] - Processing indicators.
[ 2025-02-15 04:29:23,632 ] INFO [src.services.data_ingestion_service:101] - Finished indicators.
[ 2025-02-15 04:29:23,632 ] INFO [src.services.data_ingestion_ser

### 🔹 Step 7: Append the resuls

- Get the prediction counts
- Convert the named tuple results to a dataframe
- Concat the two together

---

- The number of predictions will be less than the number of records passed in because of the windowing.

In [21]:
prediction_count = result.predictions.shape[0]
print(df_dohlcv.shape[0], " vs ", prediction_count)

100  vs  78


In [22]:
# Select the last N rows from df_lookback where N = number of predictions
df_dohlcv_subset = df_dohlcv.tail(prediction_count).reset_index(drop=True)
# Convert namedtuple to DataFrame
df_predictions = pd.DataFrame(result._asdict())  
# Merge datasets
df_dohlcv_subset = pd.concat([df_dohlcv_subset, df_predictions], axis=1)

In [23]:
df_dohlcv_subset.tail()

Unnamed: 0,Date,Open,High,Low,Close,Volume,predictions,confidence
73,2025-02-05 22:00:00-05:00,1.04028,1.0406,1.03526,1.0383,136702,0,0.580402
74,2025-02-06 22:00:00-05:00,1.0383,1.04134,1.03052,1.0327,183108,0,0.947176
75,2025-02-09 22:00:00-05:00,1.0327,1.03366,1.02838,1.0307,112919,1,0.603804
76,2025-02-10 22:00:00-05:00,1.0307,1.03816,1.02922,1.03612,119270,2,0.910548
77,2025-02-11 22:00:00-05:00,1.03612,1.03858,1.0354,1.03737,55878,1,0.73984


In [24]:
df_dohlcv_subset.to_csv("../data/dohlcv_with_results.csv", index=False)
print("✅ File saved successfully.")

✅ File saved successfully.
