# Vertex AI Trading Pipeline for BTC-USDT

This notebook outlines the steps to refactor the existing BTC-USDT trading pipeline to leverage Vertex AI services. 
We will define Kubeflow Pipeline (KFP) components for each stage of the ML workflow and orchestrate them using Vertex AI Pipelines.

## 1. Setup and Configuration

In [3]:
import os
import sys
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from kfp import dsl
from kfp import compiler  # kfp.v2 is a deprecated alias in KFP 2.x
from kfp.dsl import component, pipeline, Input, Output, Dataset, Model, Metrics, Artifact

# --- GCP Configuration ---
# Please fill in these values
PROJECT_ID = "jc-financial-459605"  # Replace with your Project ID
REGION = "us-east4"      # Replace with your GCP Region (e.g., 'us-central1')
BUCKET_NAME = "jc-financial-459605-raw-data-bucket" # Replace with your GCS Bucket name (e.g., 'my-trading-pipeline-bucket')

# --- Pipeline Configuration ---
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root/btc_usdt"
PIPELINE_NAME = "btc-usdt-trading-pipeline"

# --- Environment Setup ---
# Authenticate (only needed in Google Colab; when running locally, use
# `gcloud auth application-default login` instead)
# from google.colab import auth
# auth.authenticate_user()

# Initialize Vertex AI SDK (staging_bucket is expected as a gs:// URI)
aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=f"gs://{BUCKET_NAME}")

# Add project root to Python path to import custom modules
PROJECT_ROOT = os.getcwd()  # Assumes the notebook runs from the project root
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

print(f"Project ID: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"GCS Bucket: {BUCKET_NAME}")
print(f"Pipeline Root: {PIPELINE_ROOT}")
print(f"Python Path includes: {PROJECT_ROOT}")

Project ID: jc-financial-459605
Region: us-east4
GCS Bucket: jc-financial-459605-raw-data-bucket
Pipeline Root: gs://jc-financial-459605-raw-data-bucket/pipeline_root/btc_usdt
Python Path includes: /workspaces/btc_usdt


### 1.1. (Optional) Initial Data Upload to GCS
If your raw data (`1m_btcusdt_raw.parquet`) is not already in GCS, you can upload it using the cell below. 
Ideally, your `fetch_data.py` script should be modified to write directly to GCS.
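
The upload helper in the next cell takes a bucket name and a blob name separately, while most paths in this notebook are full `gs://` URIs. As a minimal sketch, a small helper can convert one form into the other. `split_gcs_uri` is a hypothetical name introduced here for illustration; it is not part of `google-cloud-storage`.

```python
from typing import Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split 'gs://bucket/path/to/blob' into ('bucket', 'path/to/blob')."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a GCS URI: {uri!r}")
    # partition() splits on the first '/', leaving the rest of the path intact
    bucket, _, blob = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"GCS URI has no bucket: {uri!r}")
    return bucket, blob
```

The returned pair can be passed as the `bucket_name` and `destination_blob_name` arguments of `upload_to_gcs`.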

In [None]:
from google.cloud import storage

def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        f"File {source_file_name} uploaded to gs://{bucket_name}/{destination_blob_name}."
    )

# Example: Upload raw data if it's local
LOCAL_RAW_DATA_PATH = 'data/1m_btcusdt_raw.parquet' # Adjust if your local path is different
GCS_RAW_DATA_PATH = 'data/raw/1m_btcusdt_raw.parquet' # Desired path in GCS

if os.path.exists(LOCAL_RAW_DATA_PATH):
    # upload_to_gcs(BUCKET_NAME, LOCAL_RAW_DATA_PATH, GCS_RAW_DATA_PATH)
    print(f"To upload, uncomment and run: upload_to_gcs({BUCKET_NAME!r}, {LOCAL_RAW_DATA_PATH!r}, {GCS_RAW_DATA_PATH!r})")
else:
    print(f"Local raw data file not found at {LOCAL_RAW_DATA_PATH}. Ensure it's available or already in GCS at gs://{BUCKET_NAME}/{GCS_RAW_DATA_PATH}")

## 2. Define KFP Components
We will now define KFP components for each step of the pipeline. These components will encapsulate the logic from your existing Python scripts.

### 2.1. Data Ingestion and Preparation Component
This component will be responsible for fetching the latest data (if applicable) and preparing it for feature engineering. It should output a GCS path to the prepared data.
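
On Vertex AI Pipelines, an artifact's `uri` is a `gs://` address, while its `path` is the same object seen through the Cloud Storage FUSE mount at `/gcs/`. The sketch below illustrates that mapping, so it is clear which form a given library call needs. The function name is hypothetical, and the mapping is assumed to match the behavior of your installed KFP version.

```python
GCS_PREFIX = "gs://"
FUSE_PREFIX = "/gcs/"


def gcs_uri_to_local_path(uri: str) -> str:
    """Map 'gs://bucket/obj' to '/gcs/bucket/obj' (the FUSE view of the object)."""
    if not uri.startswith(GCS_PREFIX):
        raise ValueError(f"Not a GCS URI: {uri!r}")
    return FUSE_PREFIX + uri[len(GCS_PREFIX):]


def local_path_to_gcs_uri(path: str) -> str:
    """Inverse mapping: '/gcs/bucket/obj' back to 'gs://bucket/obj'."""
    if not path.startswith(FUSE_PREFIX):
        raise ValueError(f"Not a GCS FUSE path: {path!r}")
    return GCS_PREFIX + path[len(FUSE_PREFIX):]
```

Libraries that only understand local files (for example `joblib`) can work with the `/gcs/` form, whereas the `gs://` form is what the storage client and the Vertex AI console expect.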

In [None]:
@component(
    base_image="python:3.9", # Choose an appropriate base image
    packages_to_install=["google-cloud-storage", "pandas", "pyarrow", "gcsfs"] # gcsfs lets pandas read gs:// URIs; add other necessary packages
)
def data_ingestion_component(
    project_id: str,
    bucket_name: str,
    gcs_raw_data_uri: str, # Input: gs://bucket/path/to/raw_data.parquet
    output_prepared_data_uri: Output[Dataset] # Output: gs://bucket/path/to/prepared_data.parquet
):
    import pandas as pd
    from google.cloud import storage
    import os

    # This is a placeholder. You'll need to adapt your fetch_data.py logic here.
    # For now, let's assume raw data is already in GCS and we just copy it or do minimal prep.
    
    # Example: Read from gcs_raw_data_uri, process, and write to output_prepared_data_uri.path
    df = pd.read_parquet(gcs_raw_data_uri)
    
    # Perform any initial preparation if needed
    # df['timestamp'] = pd.to_datetime(df['timestamp'])
    
    # KFP prepares the output location for Output[Dataset]. The artifact's .path is its
    # local (GCS FUSE) view and .uri is its gs:// address.
    df.to_parquet(output_prepared_data_uri.path, index=False)
    output_prepared_data_uri.metadata["gcs_path"] = output_prepared_data_uri.uri
    print(f"Prepared data saved to: {output_prepared_data_uri.uri}")

### 2.2. Feature Engineering Component
This component will take the prepared data, compute features, and save the enriched dataset to GCS.

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-storage", "pandas", "pyarrow", "numpy", "technicalindicators"] # Add your project's specific dependencies for compute_features.py
    # If your project has many local modules, consider building a custom container image.
)
def feature_engineering_component(
    project_id: str,
    bucket_name: str,
    prepared_data: Input[Dataset],
    output_enriched_data_uri: Output[Dataset] # Output: gs://bucket/path/to/enriched_data.parquet
):
    import pandas as pd
    import os
    # You will need to import or include your feature engineering logic here.
    # from btc_usdt_pipeline.features.compute_features import FeatureEngineer # Example
    
    print(f"Prepared data URI: {prepared_data.uri}")
    # Read through the local (GCS FUSE) path so pandas does not need gcsfs here
    df = pd.read_parquet(prepared_data.path)
    
    # Placeholder for actual feature engineering logic from compute_features.py
    # This is highly dependent on your existing 'compute_features.py' script.
    # You might need to adapt it to be callable as a function.
    # Example:
    # feature_engineer = FeatureEngineer(df.copy())
    # enriched_df = feature_engineer.compute_all_features()
    
    # For now, let's assume a simple transformation or pass-through
    enriched_df = df.copy() # Replace with actual feature computation
    enriched_df['example_feature'] = 1.0

    enriched_df.to_parquet(output_enriched_data_uri.path, index=False)
    output_enriched_data_uri.metadata["gcs_path"] = output_enriched_data_uri.uri
    print(f"Enriched data saved to: {output_enriched_data_uri.uri}")

### 2.3. Model Training Component (Custom Training Job)
This component launches a Vertex AI Custom Training Job. Your `train.py` script must either be packaged in a Docker container (if it has complex dependencies or needs Python versions or libraries that `packages_to_install` cannot easily provide) or be adapted to run in a standard Vertex AI training container.

In [None]:
# This component definition is more complex as it typically involves creating a training script 
# and potentially a Dockerfile if you're using a custom container.

# Step 1: Create your training script (e.g., 'training_script.py')
# This script should:
#   - Accept command-line arguments for hyperparameters, data paths (GCS URIs from previous steps).
#   - Load data from GCS.
#   - Train the model.
#   - Save the trained model to a GCS path (provided by Vertex AI, e.g., AIP_MODEL_DIR).
#   - Optionally, save metrics.

TRAINING_SCRIPT_CONTENT = '''
import argparse
import pandas as pd
import os
from sklearn.ensemble import RandomForestClassifier # Example model
import joblib

parser = argparse.ArgumentParser()
parser.add_argument('--enriched_data_path', type=str, required=True)
parser.add_argument('--n_estimators', type=int, default=100)
parser.add_argument('--target_column', type=str, default='target_move') # Example target
args = parser.parse_args()

print(f"Loading data from: {args.enriched_data_path}")
df = pd.read_parquet(args.enriched_data_path)

# Simple data prep for example
df = df.dropna(subset=[args.target_column]) # Ensure target is not NaN
features = [col for col in df.columns if col not in [args.target_column, 'open_time', 'close_time']] # Adjust features
X = df[features].fillna(0) # Simple NaN fill, improve this
y = df[args.target_column]

if X.empty or y.empty:
    print("No data to train after preprocessing.")
    # Create a dummy model file if required by Vertex AI
    # model_filename = 'dummy_model.joblib'
    # joblib.dump({}, os.path.join(os.environ.get('AIP_MODEL_DIR', '.'), model_filename))
    # print(f"Dummy model saved to {os.path.join(os.environ.get('AIP_MODEL_DIR', '.'), model_filename)}")
    raise SystemExit(1)  # exit() is only guaranteed in interactive sessions

print(f"Training RandomForestClassifier with n_estimators={args.n_estimators}")
model = RandomForestClassifier(n_estimators=args.n_estimators, random_state=42)
model.fit(X, y)

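model_filename = 'model.joblib'
# AIP_MODEL_DIR is set by Vertex AI Training to a gs:// URI. joblib cannot write to
# gs:// directly, so map it to the Cloud Storage FUSE mount (/gcs/...) first.
model_dir = os.environ.get('AIP_MODEL_DIR', '.')
if model_dir.startswith('gs://'):
    model_dir = '/gcs/' + model_dir[len('gs://'):]
os.makedirs(model_dir, exist_ok=True)
model_save_path = os.path.join(model_dir, model_filename)
joblib.dump(model, model_save_path)
print(f"Model saved to {model_save_path}")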
'''

with open("training_script.py", "w") as f:
    f.write(TRAINING_SCRIPT_CONTENT)

print("Created training_script.py. You'll need to upload this to GCS or include in a custom container.")

# Upload training script to GCS (if not using a custom container with the script baked in)
GCS_TRAINING_SCRIPT_PATH = f"gs://{BUCKET_NAME}/training_scripts/trading_pipeline/training_script.py"
try:
    upload_to_gcs(BUCKET_NAME, "training_script.py", GCS_TRAINING_SCRIPT_PATH.replace(f"gs://{BUCKET_NAME}/", ""))
except Exception as e:
    print(f"Error uploading training script: {e}. Ensure GCS bucket is configured and accessible.")

### 2.4. Model Evaluation/Backtesting Component
This component will take the trained model and the enriched data (or a specific test set) to perform backtesting and generate evaluation metrics.
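
Since `backtest.py` is not shown in this notebook, the following is only a library-free sketch of the kind of summary a backtest might produce. It assumes signals of `+1` (long), `-1` (short), or `0` (flat) and per-bar returns expressed as fractions; `backtest_summary` and its output keys are hypothetical names, and fees and slippage are ignored.

```python
from typing import Dict, Sequence


def backtest_summary(signals: Sequence[int], returns: Sequence[float]) -> Dict[str, float]:
    """Summarize a signal series against realized per-bar returns."""
    if len(signals) != len(returns):
        raise ValueError("signals and returns must have the same length")

    # Only bars with a non-zero signal count as trades
    trades = [(s, r) for s, r in zip(signals, returns) if s != 0]
    hits = sum(1 for s, r in trades if s * r > 0)

    # Compound the strategy return bar by bar (flat bars leave equity unchanged)
    equity = 1.0
    for s, r in zip(signals, returns):
        equity *= 1.0 + s * r

    return {
        "n_trades": float(len(trades)),
        "hit_rate": hits / len(trades) if trades else 0.0,
        "total_return": equity - 1.0,
    }
```

A dictionary of this shape could be logged through `evaluation_metrics.log_metric` in the same way as the classification metrics in the component below.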

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-storage", "google-cloud-aiplatform", "pandas", "pyarrow", "scikit-learn", "joblib"] # Add packages for backtest.py
)
def model_evaluation_component(
    project_id: str,
    bucket_name: str,
    enriched_data: Input[Dataset], # Or a dedicated test dataset
    trained_model: Input[Model], # Model from the training step
    gcs_metrics_path: str, # GCS path to save metrics JSON
    evaluation_metrics: Output[Metrics],
    model_performance: Output[Artifact] # Could be a visualization or detailed report
):
    import pandas as pd
    import joblib
    import json
    import os
    from sklearn.metrics import classification_report # Example metrics
    from google.cloud import storage

    print(f"Model URI: {trained_model.uri}")
    # Model URI from custom training job is a GCS directory. Model file is inside.
    # We need to find the actual model file (e.g., model.joblib)
    model_directory = trained_model.uri.replace('gs://', '') # Remove gs:// prefix for GCS client
    storage_client = storage.Client()
    bucket_name_model = model_directory.split('/')[0]
    model_path_prefix = '/'.join(model_directory.split('/')[1:])
    
    model_blob_path = None
    blobs = storage_client.list_blobs(bucket_name_model, prefix=model_path_prefix)
    for blob in blobs:
        if blob.name.endswith('model.joblib'): # Or your specific model file name
            model_blob_path = f"gs://{bucket_name_model}/{blob.name}"
            break
    
    if not model_blob_path:
        raise FileNotFoundError(f"Model file (e.g., model.joblib) not found in {trained_model.uri}")

    print(f"Actual model file path: {model_blob_path}")
    # Download the model to a local temp file to load it
    local_model_file = 'downloaded_model.joblib'
    blob = storage.Blob.from_string(model_blob_path, client=storage_client)
    blob.download_to_filename(local_model_file)
    model = joblib.load(local_model_file)

    print(f"Enriched data URI: {enriched_data.uri}")
    # Read through the local (GCS FUSE) path so pandas does not need gcsfs here
    df = pd.read_parquet(enriched_data.path)

    # Placeholder for backtesting logic from backtest.py
    # This is highly dependent on your 'backtest.py' and 'model_metrics.py'
    # Example using scikit-learn metrics:
    target_column = 'target_move' # Example target
    df = df.dropna(subset=[target_column])
    features = [col for col in df.columns if col not in [target_column, 'open_time', 'close_time']]
    X_test = df[features].fillna(0)
    y_test = df[target_column]

    if X_test.empty or y_test.empty:
        print("No data for evaluation.")
        metrics = {'accuracy': 0.0, 'precision': 0.0, 'recall': 0.0, 'f1_score': 0.0}
    else:
        y_pred = model.predict(X_test)
        report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)
        metrics = {
            'accuracy': report.get('accuracy', 0.0),
            'precision': report.get('weighted avg', {}).get('precision', 0.0),
            'recall': report.get('weighted avg', {}).get('recall', 0.0),
            'f1_score': report.get('weighted avg', {}).get('f1-score', 0.0)
        }

    print(f"Evaluation Metrics: {metrics}")
    evaluation_metrics.log_metric("accuracy", metrics['accuracy'])
    evaluation_metrics.log_metric("f1_score", metrics['f1_score'])

    # Save metrics to GCS
    # The gcs_metrics_path should be like 'gs://bucket/path/to/metrics.json'
    # KFP Output[Metrics] also stores them, but saving a separate JSON can be useful.
    # Ensure the path is a blob path, not a directory
    if not gcs_metrics_path.startswith("gs://"):
        actual_gcs_metrics_path = f"gs://{bucket_name}/{gcs_metrics_path.lstrip('/')}"
    else:
        actual_gcs_metrics_path = gcs_metrics_path
        
    bucket_name_metrics = actual_gcs_metrics_path.split('/')[2]
    blob_name_metrics = '/'.join(actual_gcs_metrics_path.split('/')[3:])
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name_metrics)
    blob = bucket.blob(blob_name_metrics)
    blob.upload_from_string(
        data=json.dumps(metrics, indent=4),
        content_type='application/json'
    )
    print(f"Metrics saved to {actual_gcs_metrics_path}")

    # Create a simple artifact for model performance (e.g., a markdown file)
    with open(model_performance.path, "w") as f:
        f.write("# Model Performance Report\n")
        f.write(f"Accuracy: {metrics['accuracy']:.4f}\n")
        f.write(f"F1-score: {metrics['f1_score']:.4f}\n")
        f.write(f"Full metrics available at: {actual_gcs_metrics_path}")
    model_performance.metadata["description"] = "Basic model performance report."

## 3. Define the Vertex AI Pipeline

In [None]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    description="An E2E pipeline for BTC-USDT trading: data ingestion, feature engineering, training, and evaluation.",
    pipeline_root=PIPELINE_ROOT,
)
def trading_pipeline(
    project_id: str = PROJECT_ID,
    bucket_name: str = BUCKET_NAME,
    gcs_raw_data_uri: str = f"gs://{BUCKET_NAME}/data/raw/1m_btcusdt_raw.parquet", # Default path
    gcs_prepared_data_base_path: str = f"gs://{BUCKET_NAME}/data/prepared/",
    gcs_enriched_data_base_path: str = f"gs://{BUCKET_NAME}/data/enriched/",
    gcs_metrics_base_path: str = f"gs://{BUCKET_NAME}/evaluation_metrics/",
    training_script_gcs_uri: str = GCS_TRAINING_SCRIPT_PATH,
    training_container_uri: str = "us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.0-23:latest", # Pre-built container for scikit-learn
    model_display_name: str = "btc-usdt-trading-model",
    n_estimators_train: int = 100 # Hyperparameter for training
):
    # Data Ingestion
    data_ingestion_task = data_ingestion_component(
        project_id=project_id,
        bucket_name=bucket_name,
        gcs_raw_data_uri=gcs_raw_data_uri
        # KFP assigns the location of output_prepared_data_uri automatically, under the pipeline root
    ).set_display_name("Data Ingestion")

    # Feature Engineering
    feature_engineering_task = feature_engineering_component(
        project_id=project_id,
        bucket_name=bucket_name,
        prepared_data=data_ingestion_task.outputs["output_prepared_data_uri"]
    ).set_display_name("Feature Engineering")

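    # Model Training (using CustomTrainingJobOp for more control or a custom component that wraps it)
    # For simplicity, we'll use a pre-built container and pass the script.
    # The model output will be a GCS URI to the directory containing the model artifacts.
    # This requires the training_script.py to be accessible, e.g., in GCS.
    # NOTE: script_path, container_uri, requirements, and model_serving_container_image_uri
    # are parameters of the SDK class aiplatform.CustomTrainingJob. The v1 CustomTrainingJobOp
    # component is configured through worker_pool_specs instead, so verify the call below
    # against your installed google-cloud-pipeline-components version.
    from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp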

    # Define where the model artifacts will be stored by the training job
    # Vertex AI will create a unique subdirectory here for each training job run
    model_output_directory = f"gs://{bucket_name}/trained_models/{PIPELINE_NAME}/"

    custom_train_job_task = CustomTrainingJobOp(
        project=project_id,
        display_name="btc-usdt-custom-training",
        script_path=training_script_gcs_uri, # GCS path to your training script
        container_uri=training_container_uri, # Or your custom container
        requirements=[], # Add if your script needs packages not in the container
        model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-23:latest", # For deploying scikit-learn models
        # Pass arguments to your training script
        args=[
            f"--enriched_data_path={feature_engineering_task.outputs['output_enriched_data_uri'].uri}",
            f"--n_estimators={n_estimators_train}",
            # Add other args your training script expects
        ],
        # Define the GCS directory where the model should be saved by the training script (AIP_MODEL_DIR)
        base_output_directory=model_output_directory,
        # Ensure this task runs after feature engineering
        # KFP infers this from data passing, but explicit dependency can be set if needed
    ).set_display_name("Model Training")
    # custom_train_job_task.after(feature_engineering_task) # Usually not needed if outputs are passed

    # Model Evaluation
    # The model artifact from CustomTrainingJobOp is a GCS path to the directory.
    # The evaluation component needs to know how to load the model from this directory.
    evaluation_task = model_evaluation_component(
        project_id=project_id,
        bucket_name=bucket_name,
        enriched_data=feature_engineering_task.outputs["output_enriched_data_uri"],
        trained_model=custom_train_job_task.outputs["model"], # This output refers to the GCS directory
        gcs_metrics_path=f"{gcs_metrics_base_path}{PIPELINE_NAME}/{dsl.PIPELINE_JOB_ID_PLACEHOLDER}/evaluation_metrics.json"
    ).set_display_name("Model Evaluation")

    # (Optional) Model Upload and Deployment
    # This section can be added if you want to automatically register and deploy the model.
    # For now, we'll focus on training and evaluation.

    # Example: Upload model to Vertex AI Model Registry
    # model_upload_op = ModelUploadOp(
    #     project=project_id,
    #     display_name=model_display_name,
    #     artifact_uri=custom_train_job_task.outputs["model"].uri, # GCS path to model directory
    #     serving_container_image_uri=training_container_uri, # Or a specific prediction container
    #     # depends_on evaluation_task passing some threshold
    # )
    # model_upload_op.after(evaluation_task)

    # Example: Create Endpoint and Deploy Model
    # endpoint_create_op = EndpointCreateOp(
    #     project=project_id,
    #     display_name=f"{model_display_name}-endpoint"
    # )
    # endpoint_create_op.after(model_upload_op)
    # 
    # model_deploy_op = ModelDeployOp(
    #     project=project_id,
    #     endpoint=endpoint_create_op.outputs["endpoint"],
    #     model=model_upload_op.outputs["model"],
    #     deployed_model_display_name=model_display_name,
    #     dedicated_resources_machine_type="n1-standard-2", # Choose machine type
    #     dedicated_resources_min_replica_count=1,
    #     dedicated_resources_max_replica_count=1,
    # )
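
The commented-out upload step above notes that registration should depend on the evaluation passing some threshold. A minimal sketch of such a check follows. The function name and threshold values are hypothetical, and how the result is wired into the pipeline (for example through a conditional block) is left open.

```python
from typing import Mapping


def passes_deployment_gate(metrics: Mapping[str, float], thresholds: Mapping[str, float]) -> bool:
    """Return True only if every thresholded metric is present and meets its minimum."""
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        # A missing metric fails the gate instead of being silently ignored
        if value is None or value < minimum:
            return False
    return True


# Hypothetical thresholds; choose values that fit your own backtests
EXAMPLE_THRESHOLDS = {"accuracy": 0.55, "f1_score": 0.55}
```

Treating a missing metric as a failure is a deliberate choice here: it keeps a renamed or unlogged metric from letting a model through unchecked.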

## 4. Compile and Run the Pipeline

In [None]:
PIPELINE_JSON_SPEC_PATH = "btc_usdt_trading_pipeline.json"

compiler.Compiler().compile(
    pipeline_func=trading_pipeline,
    package_path=PIPELINE_JSON_SPEC_PATH,
)

print(f"Pipeline compiled to {PIPELINE_JSON_SPEC_PATH}")

In [None]:
# Run the pipeline

try:
    pipeline_job = aiplatform.PipelineJob(
        display_name=PIPELINE_NAME,
        template_path=PIPELINE_JSON_SPEC_PATH,
        pipeline_root=PIPELINE_ROOT, # Redundant if already in pipeline decorator, but good for clarity
        # Pass pipeline parameters if they differ from defaults
        parameter_values={
            'project_id': PROJECT_ID,
            'bucket_name': BUCKET_NAME,
            # 'gcs_raw_data_uri': 'gs://my-other-bucket/data/raw/1m_btcusdt_raw.parquet', # Example override
            'n_estimators_train': 150 # Example override of a training hyperparameter
        },
        enable_caching=True # Set to False to disable caching for a run
    )

    pipeline_job.submit()
    print(f"Pipeline job submitted. View in Vertex AI Pipelines: {pipeline_job._dashboard_uri()}")
except Exception as e:
    print(f"Error submitting pipeline job: {e}")
    print("Please ensure your PROJECT_ID, REGION, and BUCKET_NAME are correctly set, ")
    print("and that the GCS bucket exists and the Vertex AI API is enabled.")

## 5. Next Steps and Refinements

1.  **Implement Component Logic:** Replace placeholder logic in `data_ingestion_component`, `feature_engineering_component`, and `model_evaluation_component` with actual code adapted from your existing Python scripts (`fetch_data.py`, `compute_features.py`, `backtest.py`, `model_metrics.py`).
2.  **Refine Training Script:** Adapt your `train.py` to be the `training_script.py` used by the `CustomTrainingJobOp`. Ensure it handles GCS paths for input data and model output (using `AIP_MODEL_DIR`).
3.  **Custom Containers (If Needed):** If your components or training script have complex dependencies that cannot easily be installed via `packages_to_install` or `requirements` in `CustomTrainingJobOp`, build custom Docker container images and push them to Artifact Registry (Container Registry is deprecated). Then update the `base_image` for components or the `container_uri` for the training job.
4.  **Configuration Management:** Integrate your `ConfigManager` or a similar approach for handling configurations (e.g., feature lists, model parameters) within the components, potentially by passing config file paths (in GCS) or specific config values as parameters.
5.  **Error Handling and Logging:** Add robust error handling and logging within each component.
6.  **Model Registry and Deployment:** Implement the commented-out sections for `ModelUploadOp`, `EndpointCreateOp`, and `ModelDeployOp` if you want to automate model registration and deployment to a Vertex AI Endpoint.
7.  **Hyperparameter Tuning:** Vertex AI offers `HyperparameterTuningJob`, which can be integrated into your pipeline before the main training step.
8.  **Monitoring:** Set up Vertex AI Model Monitoring for deployed models.
9.  **Testing:** Develop tests for your KFP components.
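
As one possible starting point for item 5, the sketch below wraps a function in retry logic with logging, using only the standard library. The decorator name, logger name, and defaults are assumptions made for illustration; inside a KFP component the decorator would need to be defined within the component function body, because components are serialized on their own.

```python
import functools
import logging
import time

logger = logging.getLogger("btc_usdt_pipeline")  # hypothetical logger name


def with_retries(max_attempts: int = 3, delay_seconds: float = 1.0, exceptions=(Exception,)):
    """Retry the wrapped function on the given exceptions, logging each failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    logger.warning(
                        "%s failed (attempt %d/%d): %s",
                        func.__name__, attempt, max_attempts, exc,
                    )
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the original error
                    time.sleep(delay_seconds)
        return wrapper
    return decorator
```

Re-raising after the last attempt matters in a pipeline: a component that swallows its error would be marked as succeeded and could feed empty data to the next step.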

## Vertex AI Project Constants and GCS Path Setup
Define all essential constants for your Vertex AI project and GCS artifact locations.

In [None]:
# --- Vertex AI Project Constants ---
PROJECT_ID = "jc-financial-459605"
REGION = "us-east4"
GCS_BUCKET_URI = "gs://jc-financial-459605-raw-data-bucket"

# --- GCS Artifact Paths ---
CONFIG_GCS_PATH = f"{GCS_BUCKET_URI}/configs"
RAW_DATA_GCS_PATH = f"{GCS_BUCKET_URI}/data/raw"
PROCESSED_DATA_GCS_PATH = f"{GCS_BUCKET_URI}/data/processed"
MODEL_GCS_PATH = f"{GCS_BUCKET_URI}/models"
PIPELINE_ROOT_GCS_PATH = f"{GCS_BUCKET_URI}/pipeline_root"  # For Vertex AI Pipelines artifacts

print(f"PROJECT_ID: {PROJECT_ID}")
print(f"REGION: {REGION}")
print(f"GCS_BUCKET_URI: {GCS_BUCKET_URI}")
print(f"CONFIG_GCS_PATH: {CONFIG_GCS_PATH}")
print(f"RAW_DATA_GCS_PATH: {RAW_DATA_GCS_PATH}")
print(f"PROCESSED_DATA_GCS_PATH: {PROCESSED_DATA_GCS_PATH}")
print(f"MODEL_GCS_PATH: {MODEL_GCS_PATH}")
print(f"PIPELINE_ROOT_GCS_PATH: {PIPELINE_ROOT_GCS_PATH}")

## Vertex AI SDK Initialization
Initialize the Vertex AI SDK with your project, region, and GCS bucket.

In [None]:
from google.cloud import aiplatform

aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=GCS_BUCKET_URI  # You may use PIPELINE_ROOT_GCS_PATH for more specific staging
)
print(f"Vertex AI SDK initialized. Project: {PROJECT_ID}, Region: {REGION}, Staging Bucket: {aiplatform.initializer.global_config.staging_bucket}")

## Google Cloud Storage Client Initialization
Set up the GCS client for future storage operations.

In [None]:
from google.cloud import storage

storage_client = storage.Client(project=PROJECT_ID)
print(f"Google Cloud Storage client initialized for project {PROJECT_ID}.")

## Configuration Management from GCS
Define a ConfigManager class to load YAML configuration files from GCS and access their values.

In [None]:
import yaml
from google.cloud import storage

class ConfigManager:
    def __init__(self, project_id: str):
        self.project_id = project_id
        self.storage_client = storage.Client(project=self.project_id)
        self.configs = {}

    def _download_config_from_gcs(self, gcs_uri: str) -> dict:
        """Downloads a YAML file from GCS and parses it."""
        try:
            bucket_name = gcs_uri.split('/')[2]
            blob_name = '/'.join(gcs_uri.split('/')[3:])

            bucket = self.storage_client.bucket(bucket_name)
            blob = bucket.blob(blob_name)

            config_string = blob.download_as_text()
            config_dict = yaml.safe_load(config_string)
            return config_dict
        except Exception as e:
            print(f"Error downloading or parsing config from {gcs_uri}: {e}")
            return {}

    def load_config(self, config_name: str, gcs_path: str) -> None:
        """Loads a specific configuration file from GCS."""
        print(f"Loading {config_name} from {gcs_path}...")
        self.configs[config_name] = self._download_config_from_gcs(gcs_path)
        if self.configs[config_name]:
            print(f"{config_name} loaded successfully.")
        else:
            print(f"Failed to load {config_name}.")

    def get(self, config_name: str, key: str = None, default=None):
        """Gets a specific key from a loaded configuration or the entire config."""
        config_data = self.configs.get(config_name)
        if config_data is None:
            print(f"Configuration '{config_name}' not loaded.")
            return default
        if key:
            # Simple key access, can be expanded for nested keys
            return config_data.get(key, default)
        return config_data

# Example of initializing the ConfigManager
config_manager = ConfigManager(project_id=PROJECT_ID)
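
The `get` method above notes that key access could be expanded for nested keys. One way to do that, sketched here under the assumption that nested settings are addressed with dot-separated keys such as `"model.params.n_estimators"`, is a small standalone helper. `get_nested` is a hypothetical name and is not part of the `ConfigManager` class as written.

```python
from collections.abc import Mapping
from typing import Any


def get_nested(config: Mapping, dotted_key: str, default: Any = None) -> Any:
    """Walk a nested mapping along a dot-separated key, returning default if any part is missing."""
    node: Any = config
    for part in dotted_key.split("."):
        # Stop as soon as we hit a non-mapping or a missing key
        if not isinstance(node, Mapping) or part not in node:
            return default
        node = node[part]
    return node
```

For a loaded configuration, this could be called as `get_nested(config_manager.get("app_config", default={}), "model.params.n_estimators")`.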

### Example: Load and Inspect app_config.yaml from GCS

In [None]:
# Example usage: Load app_config.yaml from GCS and print sample values
app_config_path = f"{CONFIG_GCS_PATH}/app_config.yaml"
# ConfigManager takes a project ID, so reuse the instance created above
config_manager.load_config("app_config", app_config_path)
app_config = config_manager.get("app_config", default={})

print("Sample config values from app_config.yaml:")
for k, v in list(app_config.items())[:5]:
    print(f"{k}: {v}")

## Integrate Binance Data Fetch KFP Component and Compile Pipeline
This section loads the custom KFP component for fetching Binance kline data, defines the pipeline, and compiles it to a JSON spec.
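
The component's source is not shown in this notebook, so the following is only a sketch of one way such a fetch could page through a date range. It assumes the Binance klines endpoint takes `startTime` and `endTime` in epoch milliseconds and returns at most 1000 candles per request, and that `end_date` is exclusive. `kline_windows` is a hypothetical helper, not a function from the component.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

MAX_KLINES_PER_REQUEST = 1000  # assumed per-request cap for the klines endpoint


def kline_windows(
    start_date: str,
    end_date: str,
    interval_minutes: int = 1,
    limit: int = MAX_KLINES_PER_REQUEST,
) -> List[Tuple[int, int]]:
    """Split [start_date, end_date) into (startTime, endTime) pairs in epoch milliseconds."""
    start = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    end = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    if end <= start:
        raise ValueError("end_date must be after start_date")

    step = timedelta(minutes=interval_minutes * limit)  # time span covered by one request
    windows = []
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        # Subtract 1 ms so adjacent windows do not share a candle
        windows.append((int(cursor.timestamp() * 1000), int(window_end.timestamp() * 1000) - 1))
        cursor = window_end
    return windows
```

With the pipeline defaults (`1m` candles over one day), this yields two windows: one of 1000 minutes and one of the remaining 440.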

In [1]:
# 1. Load the component
import sys
component_dir = "/workspaces/btc_usdt/btc_usdt/btc_usdt_pipeline/components"
if component_dir not in sys.path:
    sys.path.append(component_dir)

from fetch_binance_data_component import fetch_binance_klines_component

In [4]:
# 2. Define the pipeline
import kfp
from kfp import compiler  # kfp.v2 is a deprecated alias in KFP 2.x

# Ensure PIPELINE_ROOT is defined in a previous cell
@kfp.dsl.pipeline(
    name="btc-usdt-data-ingestion-pipeline",
    description="A pipeline to fetch historical BTC/USDT data from Binance.",
    pipeline_root=PIPELINE_ROOT,
)
def btc_usdt_data_ingestion_pipeline(
    symbol: str = "BTCUSDT",
    interval: str = "1m",
    start_date: str = "2023-01-01",
    end_date: str = "2023-01-02",
    api_url: str = "https://api.binance.com/api/v3/klines"
):
    fetch_data_task = fetch_binance_klines_component(
        symbol=symbol,
        interval=interval,
        start_date_str=start_date,
        end_date_str=end_date,
        api_url=api_url,
    )
    # The output of fetch_data_task (fetch_data_task.outputs['output_data'])
    # can be passed to subsequent components if needed.

In [5]:
# 3. Compile the pipeline
PIPELINE_JSON_PACKAGE_PATH = "/workspaces/btc_usdt/btc_usdt_data_ingestion_pipeline.json"

compiler.Compiler().compile(
    pipeline_func=btc_usdt_data_ingestion_pipeline,
    package_path=PIPELINE_JSON_PACKAGE_PATH,
)

print(f"Pipeline compiled to: {PIPELINE_JSON_PACKAGE_PATH}")

Pipeline compiled to: /workspaces/btc_usdt/btc_usdt_data_ingestion_pipeline.json


In [10]:
from google.cloud import aiplatform

# Ensure PROJECT_ID, REGION, and PIPELINE_ROOT are defined and available
# from previous cells in your notebook.

# Define a display name for your pipeline run
pipeline_display_name = "btc-usdt-data-ingestion-run-1" # You can change this for each run

# Define any parameters you want to pass to your pipeline.
# These will override the default values set in your pipeline definition.
# For example, if you want to run it for a different date range or symbol:
pipeline_parameters = {
    'symbol': 'BTCUSDT',
    'interval': '1m',
    'start_date': '2023-03-01',
    'end_date': '2023-03-05',
    'api_url': 'https://api.binance.com/api/v3/klines' # Or your preferred Binance API endpoint
}
# If you want to use the default parameters defined in the pipeline,
# you can pass an empty dictionary: pipeline_parameters = {}

job = aiplatform.PipelineJob(
    display_name=pipeline_display_name,
    template_path="/workspaces/btc_usdt/btc_usdt_data_ingestion_pipeline.json",
    pipeline_root=PIPELINE_ROOT,  # This should be the same PIPELINE_ROOT used during compilation
    parameter_values=pipeline_parameters,
    project=PROJECT_ID,
    location=REGION,
    # enable_caching=True # Optional: Set to True to enable caching for pipeline steps
)

print("Submitting pipeline job...")
job.submit()

print(f"Pipeline job '{job.display_name}' submitted. State: {job.state}")
print(f"View the pipeline run in the Vertex AI console: {job._dashboard_uri()}")
# You can also use job.wait() if you want the notebook to block until the pipeline completes.
# job.wait()
# print("Pipeline job finished.")


Submitting pipeline job...
Creating PipelineJob


PipelineJob created. Resource name: projects/798634104503/locations/us-east4/pipelineJobs/btc-usdt-data-ingestion-pipeline-20250516164507
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/798634104503/locations/us-east4/pipelineJobs/btc-usdt-data-ingestion-pipeline-20250516164507')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-east4/pipelines/runs/btc-usdt-data-ingestion-pipeline-20250516164507?project=798634104503
Pipeline job 'btc-usdt-data-ingestion-run-1' submitted. State: 2
View the pipeline run in the Vertex AI console: https://console.cloud.google.com/vert

# (Optional) Submitting the pipeline job
# You can submit and run the pipeline using aiplatform.PipelineJob in a later step.
# For now, just compiling is sufficient.

In [9]:
from kfp import dsl
from kfp import compiler
from google.cloud import aiplatform

# 1. Define a very simple component
@dsl.component(base_image="python:3.9-slim")
def hello_world_component(text: str) -> str:
    print(text)
    return text

# 2. Define a minimal pipeline
@dsl.pipeline(
    name="minimal-hello-world-pipeline",
    description="A very simple pipeline to test basic functionality.",
    pipeline_root=PIPELINE_ROOT, # PIPELINE_ROOT should be defined from previous cells
)
def minimal_pipeline(
    greeting: str = "Hello, Vertex AI Pipelines!"
):
    hello_task = hello_world_component(text=greeting)

# 3. Compile the minimal pipeline
MINIMAL_PIPELINE_JSON_PACKAGE_PATH = "/workspaces/btc_usdt/minimal_hello_world_pipeline.json"

compiler.Compiler().compile(
    pipeline_func=minimal_pipeline,
    package_path=MINIMAL_PIPELINE_JSON_PACKAGE_PATH,
)

print(f"Minimal pipeline compiled to: {MINIMAL_PIPELINE_JSON_PACKAGE_PATH}")

# 4. Submit and run the minimal pipeline
minimal_pipeline_display_name = "minimal-hello-world-run-1"

minimal_job = aiplatform.PipelineJob(
    display_name=minimal_pipeline_display_name,
    template_path=MINIMAL_PIPELINE_JSON_PACKAGE_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'greeting': 'Testing minimal execution'
    },
    project=PROJECT_ID, # PROJECT_ID should be defined from previous cells
    location=REGION,    # REGION should be defined from previous cells
)

print("Submitting minimal pipeline job...")
minimal_job.submit()

print(f"Minimal pipeline job '{minimal_job.display_name}' submitted. State: {minimal_job.state}")
print(f"View the minimal pipeline run in the Vertex AI console: {minimal_job._dashboard_uri()}")



Minimal pipeline compiled to: /workspaces/btc_usdt/minimal_hello_world_pipeline.json
Submitting minimal pipeline job...
Creating PipelineJob
PipelineJob created. Resource name: projects/798634104503/locations/us-east4/pipelineJobs/minimal-hello-world-pipeline-20250516164135
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/798634104503/locations/us-east4/pipelineJobs/minimal-hello-world-pipeline-20250516164135')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-east4/pipelines/runs/minimal-hello-world-pipeline-20250516164135?project=798634104503