# Training ML Model on SageMaker with Minute Bar Derived from FinSpace

After generating minute bar or daily bar from FinSpace kdb+, you can train an ML model and develop a trading strategy. This example demonstrates how to load historical minute bar data and train an XGBoost model easily on SageMaker. The procedure is as follows:

##### 1. Load Data from S3 and Prepare Data

Load the data from S3, and prepare the data with the target variable as the first column. We will use OHLCV, VWAP, TWAP, SMA5, SMA20, OBC, VROC as the features, and the return of coming bar as the target.

##### 2. Divide Data into Training and Validation Sets

Divide the data into training and validation datasets.

##### 3. Train SageMaker Built-in XGBoost Model

Pass the training and validation data to train the SageMaker built-in XGBoost model.

##### 4. Deploy the Model

Deploy the trained model for inference or further evaluation.

## Preparation

In [12]:
!pip install ta



## Import and prepare training data

In [13]:
import sagemaker

# Set up SageMaker session
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

from sagemaker import get_execution_role
role = get_execution_role()
prefix = 'xgboost-hsbc'
role

'arn:aws:iam::383386985941:role/service-role/AmazonSageMaker-ExecutionRole-20240909T091637'

In [14]:
import boto3
import pandas as pd
from io import StringIO

# Set up S3 client
s3 = boto3.client('s3')

# Specify your bucket and file details
bucket_name = 'ai-doc-383386985941'
file_key = 'minutebars.csv'  

# Read the file from S3
obj = s3.get_object(Bucket=bucket_name, Key=file_key)
data = obj['Body'].read().decode('utf-8')

# Convert to DataFrame and only get HSBA.L
df = pd.read_csv(StringIO(data))
df = df[(df['volume'] != 0) & (df['close'] != 0) & (df['twap'] != 0)]


import numpy as np
import ta

# Set bucket as index
df.set_index('bucket', inplace=True)

# Feature engineering
df['returns'] = df['close'].pct_change()
df['SMA_5'] = ta.trend.sma_indicator(df['close'], window=5)
df['SMA_20'] = ta.trend.sma_indicator(df['close'], window=20)
df['OBV'] = ta.volume.on_balance_volume(df['close'], df['volume'])
df['VROC'] = ta.volume.volume_price_trend(df['close'], df['volume'])
df

Unnamed: 0_level_0,ticker,trades,open,high,low,close,volume,vwap,twap,returns,SMA_5,SMA_20,OBV,VROC
bucket,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
2024-07-01 07:00:00,AAL.L,46,2401.0,2431.5,2401.0,2423.0,72859,2402.990063,2425.252569,,,,72859,
2024-07-01 07:01:00,AAL.L,128,2425.0,2429.5,2420.0,2425.5,12212,2425.881387,2425.991103,0.001032,,,85071,1.260008e+01
2024-07-01 07:02:00,AAL.L,19,2427.5,2427.5,2416.5,2416.5,1546,2418.197930,2423.425674,-0.003711,,,83525,6.863533e+00
2024-07-01 07:03:00,AAL.L,83,2418.0,2418.5,2410.5,2416.0,8590,2414.251397,2415.928137,-0.000207,,,74935,5.086169e+00
2024-07-01 07:04:00,AAL.L,90,2419.0,2419.0,2412.0,2412.5,10772,2415.266153,2415.349853,-0.001449,2418.7,,64163,-1.051896e+01
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2024-07-31 15:27:00,ULVR.L,23,4770.0,4772.0,4770.0,4772.0,6003,4770.818591,4770.872757,0.000419,4769.8,4763.95,250233770,5.489619e+06
2024-07-31 15:28:00,ULVR.L,44,4772.0,4772.0,4771.0,4771.0,12895,4771.427065,4771.326630,-0.000210,4770.2,4764.65,250220875,5.489616e+06
2024-07-31 15:29:00,ULVR.L,47,4771.0,4772.0,4770.0,4771.0,12730,4770.804399,4770.806618,0.000000,4770.6,4765.40,250233605,5.489616e+06
2024-07-31 15:35:00,ULVR.L,4,4776.0,4776.0,4776.0,4776.0,2109582,4776.000000,4776.000000,0.001048,4772.0,4766.35,252343187,5.491827e+06


In [15]:
# Drop NaN values
df.dropna(inplace=True)

# Create target variable (predicting next bar's return)
df['expected_return'] = df['returns'].shift(-1)
return_threshold = 0.0001
df['target'] = np.where(df['expected_return'] > return_threshold, 1, 0)

# Prepare features and target, 'target' to be the first position
desired_cols = ['target', 'open', 'high', 'low', 'close', 'volume', 'vwap', 'twap', 'SMA_5', 'SMA_20', 'OBV', 'VROC']
df = df[desired_cols]
df

Unnamed: 0_level_0,target,open,high,low,close,volume,vwap,twap,SMA_5,SMA_20,OBV,VROC
bucket,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2024-07-01 07:19:00,1,2442.5,2442.5,2433.0,2436.5,8103,2437.016537,2439.501375,2440.9,2426.450,73171,-1.364295e+01
2024-07-01 07:20:00,0,2437.5,2439.0,2437.5,2438.0,1032,2438.259205,2437.511347,2439.6,2427.200,74203,-1.300761e+01
2024-07-01 07:21:00,0,2438.0,2438.0,2431.5,2436.5,5575,2435.326099,2434.813581,2438.6,2427.750,68628,-1.643768e+01
2024-07-01 07:22:00,0,2437.0,2437.0,2433.5,2435.0,3198,2434.611945,2435.147308,2437.6,2428.675,65430,-1.840648e+01
2024-07-01 07:23:00,0,2435.0,2435.0,2421.5,2422.0,21356,2427.273296,2428.005198,2433.6,2428.975,44074,-1.324221e+02
...,...,...,...,...,...,...,...,...,...,...,...,...
2024-07-31 15:26:00,1,4770.0,4770.0,4770.0,4770.0,4291,4770.000000,4770.000000,4769.0,4763.250,250227767,5.489616e+06
2024-07-31 15:27:00,0,4770.0,4772.0,4770.0,4772.0,6003,4770.818591,4770.872757,4769.8,4763.950,250233770,5.489619e+06
2024-07-31 15:28:00,0,4772.0,4772.0,4771.0,4771.0,12895,4771.427065,4771.326630,4770.2,4764.650,250220875,5.489616e+06
2024-07-31 15:29:00,1,4771.0,4772.0,4770.0,4771.0,12730,4770.804399,4770.806618,4770.6,4765.400,250233605,5.489616e+06


In [16]:
# Function to save DataFrame to temp file and upload to S3
def save_and_upload(data, bucket, s3key, filename):
    data.to_csv(filename, header=False, index=False)
    boto3.Session().resource("s3").Bucket(bucket).Object(s3key).upload_file(filename)

In [17]:
# Split your data into train and validation sets
split_point = int(len(df) * 0.8)
train_data = df.iloc[:split_point]
validation_data = df.iloc[split_point:]

# Save and upload train data
train_data_key = f"{prefix}/train/train.csv"
save_and_upload(train_data, bucket,train_data_key , 'train.csv')
train_data_path = f"s3://{bucket}/{train_data_key}"
print(f"training data: {train_data_path}")

# Save and upload validation data
validation_data_key = f"{prefix}/validation/validation.csv"
save_and_upload(validation_data, bucket, validation_data_key, 'validation.csv')
validation_data_path = f"s3://{bucket}/{validation_data_key}"
print(f"validation data: {validation_data_path}")

training data: s3://sagemaker-us-east-1-383386985941/xgboost-hsbc/train/train.csv
validation data: s3://sagemaker-us-east-1-383386985941/xgboost-hsbc/validation/validation.csv


## Parallel Model Training with 20 Instances

In [20]:
from sagemaker.inputs import TrainingInput

container = sagemaker.image_uris.retrieve("xgboost", sagemaker_session.boto_region_name, "1.7-1")
display(container)

# Set up the XGBoost estimator
xgb_estimator = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=20,
    instance_type="ml.m5.xlarge",
    output_path=f"s3://{bucket}/{prefix}/output",
    sagemaker_session=sagemaker_session
)

xgb_estimator.set_hyperparameters(
    max_depth=6,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    verbosity=0,
    objective="binary:logistic",
    num_round=1000,
)
                            
# Define the input data configuration
train_input = TrainingInput(s3_data=train_data_path, content_type="text/csv")
validation_input = TrainingInput(s3_data=validation_data_path, content_type="text/csv")

# Train the model
xgb_estimator.fit({"train": train_input, "validation": validation_input})

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


'683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.7-1'

INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2024-09-12-01-45-15-380


2024-09-12 01:45:18 Starting - Starting the training job...
2024-09-12 01:45:33 Starting - Preparing the instances for training......
2024-09-12 01:46:48 Downloading - Downloading input data......
2024-09-12 01:47:23 Downloading - Downloading the training image...
2024-09-12 01:48:05 Training - Training image download completed. Training in progress.[33m[2024-09-12 01:48:10.014 ip-10-0-104-228.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[33m[2024-09-12 01:48:10.038 ip-10-0-104-228.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[33m[2024-09-12:01:48:10:INFO] Imported framework sagemaker_xgboost_container.training[0m
[33m[2024-09-12:01:48:10:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[33mReturning the value itself[0m
[33m[2024-09-12:01:48:10:INFO] No GPUs detected (normal if no gpus installed)[0m
[33m[2024-09-12:01:48:10:INFO] Running XGBoost Sagemaker in algorithm mode[0m


## Deploy and invoke the model 

In [10]:
predictor = xgb_estimator.deploy(initial_instance_count=1, instance_type="ml.m5.large")

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2024-09-12-01-26-43-605
INFO:sagemaker:Creating endpoint-config with name sagemaker-xgboost-2024-09-12-01-26-43-605
INFO:sagemaker:Creating endpoint with name sagemaker-xgboost-2024-09-12-01-26-43-605


-------!

In [25]:
def generate_signal(predictor, o, h, l, c, v, vwap, twap, sma5, sma20, obv, vroc):
    # Create a numpy array with the input data
    
    input_data = pd.DataFrame({
        'open': [o],
        'high': [h],
        'low': [l],
        'close': [c],
        'volume': [v],
        'vwap': [vwap],
        'twap': [twap],
        'SMA_5': [sma5],
        'SMA_20': [sma20],
        'OBV': [obv],
        'VROC': [vroc]
    })
    input_df = pd.DataFrame(input_data)
    serializer = sagemaker.serializers.CSVSerializer(content_type="text/csv")
    data = serializer.serialize(input_df)
    
    # Get the prediction from the deployed model
    prediction = predictor.predict(data, initial_args={'ContentType': 'text/csv'})
    predictions = prediction.decode("utf-8")
    predictions = np.fromstring(predictions[1:], sep=",")
    
    return predictions[0]


In [26]:
# test inference for sample data
up_singal = generate_signal(predictor, 704.3,704.6,704.3,704.3,32667,704.422338,704.421387,703.98,702.995,28071850,42606.534711)
print(f"predicted up percentage: {up_singal}")
    
up_singal = generate_signal(predictor, 703.7,704.0,703.7,704.0,20568,703.855100,703.879121,703.74,702.670,27987999,42584.723347)
print(f"predicted up percentage: {up_singal}")

predicted up percentage: 0.36411571502685547
predicted up percentage: 0.38617801666259766
