In [1]:
#import libraries

import pandas as pd
import numpy as np
import boto3

In [2]:
#import data

s3 = boto3.client('s3')

bucket = 'flood-prediction-master-dataset'
key = 'final-dataset-1-hr/final_data_1_hr.csv'

obj = s3.get_object(Bucket= bucket,Key= key)

dataset_1hr = pd.read_csv(obj['Body'])

In [3]:
#drop existing index column

dataset_1hr.drop(['Unnamed: 0'],axis=1,inplace=True)

#setting datetime datatype from string. Default is string when reading from csv

dataset_1hr['timerecorded'] = pd.to_datetime(dataset_1hr['timerecorded'])

#rearranging columns

dataset_1hr = dataset_1hr[['timerecorded','river','rain','temperature','wind_direction','wind_speed','source']]

#splitting GAN and sensor data

gan_file = dataset_1hr.loc[dataset_1hr['source']=='GAN']
sensor_file = dataset_1hr.loc[dataset_1hr['source']=='SENSOR']

In [4]:
#makes the GAN datetime go ahead by 1 month. June - July sensor data. June to august is summer. Hence GAN 1 month ahead.

gan_file['timerecorded']  = gan_file['timerecorded'] + pd.DateOffset(months=1) 

#merging both files and resetting index

dataset_1hr = sensor_file.append(gan_file)
dataset_1hr.reset_index(drop=True, inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  app.launch_new_instance()


In [5]:
dataset_1hr

Unnamed: 0,timerecorded,river,rain,temperature,wind_direction,wind_speed,source
0,2020-06-12 00:00:00,5.702,0.0,11.67,6.0,5.282,SENSOR
1,2020-06-12 01:00:00,7.077,0.0,12.08,7.5,4.806,SENSOR
2,2020-06-12 02:00:00,7.881,0.0,12.35,8.3,3.607,SENSOR
3,2020-06-12 03:00:00,8.036,0.0,12.39,216.8,2.554,SENSOR
4,2020-06-12 04:00:00,7.920,0.0,12.30,10.5,4.524,SENSOR
...,...,...,...,...,...,...,...
1379,2020-08-11 10:00:00,4.028,0.0,16.96,264.3,5.898,GAN
1380,2020-08-11 11:00:00,5.234,0.0,17.22,263.3,5.929,GAN
1381,2020-08-11 12:00:00,6.189,0.0,17.50,266.8,5.007,GAN
1382,2020-08-11 13:00:00,6.743,0.2,17.44,268.0,5.341,GAN


In [6]:
# adding the datetime column value as a feature. River level being time dependent, datetime column value is saved as
# continuous columns.

dataset_1hr['dayofweek'] = dataset_1hr['timerecorded'].dt.dayofweek
dataset_1hr['hour'] = dataset_1hr['timerecorded'].dt.hour
dataset_1hr['minute'] = dataset_1hr['timerecorded'].dt.minute
dataset_1hr['month'] = dataset_1hr['timerecorded'].dt.month
dataset_1hr['year'] = dataset_1hr['timerecorded'].dt.year
dataset_1hr['dayofmonth'] = dataset_1hr['timerecorded'].dt.day
dataset_1hr['dayofyear'] = dataset_1hr['timerecorded'].dt.dayofyear

In [7]:
dataset_1hr.shape

(1384, 14)

In [8]:
train_dataset = dataset_1hr[1315:] 
test_dataset = dataset_1hr[:1315]  

In [9]:
train_dataset.tail()

Unnamed: 0,timerecorded,river,rain,temperature,wind_direction,wind_speed,source,dayofweek,hour,minute,month,year,dayofmonth,dayofyear
1379,2020-08-11 10:00:00,4.028,0.0,16.96,264.3,5.898,GAN,1,10,0,8,2020,11,224
1380,2020-08-11 11:00:00,5.234,0.0,17.22,263.3,5.929,GAN,1,11,0,8,2020,11,224
1381,2020-08-11 12:00:00,6.189,0.0,17.5,266.8,5.007,GAN,1,12,0,8,2020,11,224
1382,2020-08-11 13:00:00,6.743,0.2,17.44,268.0,5.341,GAN,1,13,0,8,2020,11,224
1383,2020-08-11 14:00:00,7.441,0.0,18.21,268.0,6.231,GAN,1,14,0,8,2020,11,224


In [10]:
test_dataset.head()

Unnamed: 0,timerecorded,river,rain,temperature,wind_direction,wind_speed,source,dayofweek,hour,minute,month,year,dayofmonth,dayofyear
0,2020-06-12 00:00:00,5.702,0.0,11.67,6.0,5.282,SENSOR,4,0,0,6,2020,12,164
1,2020-06-12 01:00:00,7.077,0.0,12.08,7.5,4.806,SENSOR,4,1,0,6,2020,12,164
2,2020-06-12 02:00:00,7.881,0.0,12.35,8.3,3.607,SENSOR,4,2,0,6,2020,12,164
3,2020-06-12 03:00:00,8.036,0.0,12.39,216.8,2.554,SENSOR,4,3,0,6,2020,12,164
4,2020-06-12 04:00:00,7.92,0.0,12.3,10.5,4.524,SENSOR,4,4,0,6,2020,12,164


In [11]:
# removing dependent columns from test dataset. timerecorded is not required for prediction but for further processes.

y_test = test_dataset[['timerecorded','river']]
test_dataset.drop(['timerecorded','river'],axis=1,inplace=True)

# converting training and testing datasets into csv files

train_dataset.to_csv("train_dataset.csv")
test_dataset.to_csv("test_dataset.csv")

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


In [12]:
import datetime
import tarfile

import boto3 # AWS SDK for python. Provides low-level access to AWS services
from sagemaker import get_execution_role
import sagemaker

m_boto3 = boto3.client('sagemaker') 

sess = sagemaker.Session()

region = sess.boto_session.region_name

bucket = 'flood-prediction-master-dataset'  #  Bucket to store and retrieve data

print('Using bucket ' + bucket)

Using bucket flood-prediction-master-dataset


In [13]:
# send data to S3. SageMaker will take training data from s3
trainpath = sess.upload_data(
    path='train_dataset.csv', bucket=bucket,
    key_prefix='predictions-1-hr')

testpath = sess.upload_data(
    path='test_dataset.csv', bucket=bucket,
    key_prefix='predictions-1-hr')

In [14]:
%%writefile rftimeseries1hr.py

#doing by scripting

import argparse
import os

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
import joblib

def model_fn(model_dir):
    clf = joblib.load(os.path.join(model_dir, "rfmodel.joblib"))
    return clf

if __name__ =='__main__':

    print('extracting arguments')
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
        
    parser.add_argument('--n-estimators', type=int, default=1500)
    parser.add_argument('--max-leaf-nodes', type=int, default=15)

    # Data, model, and output directories
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--test', type=str, default=os.environ.get('SM_CHANNEL_TEST'))
    parser.add_argument('--train-file', type=str, default='train_dataset.csv')
    parser.add_argument('--test-file', type=str, default='test_dataset.csv')
    
    args, _ = parser.parse_known_args()
    

    print('reading data')
    train_df = pd.read_csv(os.path.join(args.train, args.train_file))
    test_df = pd.read_csv(os.path.join(args.test, args.test_file))

    print('building training and testing datasets')
    
    #since only one file is accepted as a script parameter, the predictors and target are segregated here
    y_train = train_df['river']
    
    # remove unrequired columns
    train_df.drop(['river'],axis=1,inplace=True)
    train_df.drop(['Unnamed: 0'],axis=1,inplace=True)
    train_df.drop(['timerecorded'],axis=1,inplace=True)
    train_df.drop(['source'],axis=1,inplace=True)
    
    X_train = train_df
    
    test_df.drop(['Unnamed: 0'],axis=1,inplace=True)
    test_df.drop(['source'],axis=1,inplace=True)
    
    X_test = test_df
    
    
    # train
    print('training model')
    model = RandomForestRegressor(
        n_estimators=args.n_estimators,
        max_leaf_nodes =args.max_leaf_nodes,
        n_jobs=-1)
    
    model.fit(X_train,y_train)
     
    # persist model
    
    path = os.path.join(args.model_dir, "rfmodel.joblib")
    joblib.dump(model, path)
    print('model persisted at ' + path)
    
    # predicting value. It will not predict from the below code when deployed to AWS ML EC2.
    # but is required so that it can have a code when predict is called. It analyses the no of test parameters and its dtypes 
    # The print in this script are shown in CloudWatch.
    
    print('validating model')
    predictions = model.predict(X_test)

Overwriting rftimeseries1hr.py


In [15]:
# use of Estimator from the SageMaker Python SDK. stating the script and hyperparameters

from sagemaker.sklearn.estimator import SKLearn

sklearn_estimator = SKLearn(
    entry_point='rftimeseries1hr.py',
    role = get_execution_role(),
    train_instance_count=1,
    train_instance_type='ml.m4.xlarge',
    framework_version='0.23-1',
    base_job_name='randomforest-1-hr',
    hyperparameters = {
                        'n-estimators': 2000,
                        'max-leaf-nodes': 20
                       })

In [16]:
# launch training job, with asynchronous call

sklearn_estimator.fit({'train':trainpath, 'test': testpath}, wait=False)

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


In [17]:
# after training the model is created which is used for prediction. Here the model is generated. The path is displayed.

sklearn_estimator.latest_training_job.wait(logs='None')
artifact = m_boto3.describe_training_job(
    TrainingJobName=sklearn_estimator.latest_training_job.name)['ModelArtifacts']['S3ModelArtifacts']

print('Model artifact persisted at ' + artifact)


2020-08-16 13:43:34 Starting - Starting the training job
2020-08-16 13:43:35 Starting - Launching requested ML instances...................
2020-08-16 13:45:16 Starting - Preparing the instances for training..........
2020-08-16 13:46:15 Downloading - Downloading input data......
2020-08-16 13:46:50 Training - Downloading the training image.........
2020-08-16 13:47:40 Training - Training image download completed. Training in progress..
2020-08-16 13:47:51 Uploading - Uploading generated training model.
2020-08-16 13:47:58 Completed - Training job completed
Model artifact persisted at s3://sagemaker-us-east-1-884654660367/randomforest-1-hr-2020-08-16-13-43-33-455/output/model.tar.gz


In [18]:
# An EC2 model is deployed based on the script and model

predictor = sklearn_estimator.deploy(instance_type='ml.m4.xlarge',initial_instance_count=1)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


-----------------!

In [19]:
# removing unrequired columns

test_dataset.drop(['source'],axis=1,inplace=True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  errors=errors,


In [22]:
# "outcome" contains ML predictions. rf_final has the datetime value for each prediction taken from y_test.
# "rf_final" is then provided prediction values saved as a column. Also source column states the algorithm name.
# By just counting the number of predictions above flood level, a better algorithm can be decided, 
# but the datetime column will help analyze the delay between two algorithms.  

outcome = pd.DataFrame(predictor.predict(test_dataset))
outcome.rename(columns={0:"river"},inplace=True)


rf_final = y_test['timerecorded'].to_frame()
rf_final.reset_index(drop=True,inplace=True)
rf_final['river'] = outcome['river'].astype(float)
rf_final['source'] = 'RF'

# saving as a csv file locally
rf_final.to_csv("random_forest_predictions_1_hr.csv")

# saving file to s3
sess.upload_data(
    path='random_forest_predictions_1_hr.csv', bucket=bucket,
    key_prefix='predictions-1-hr')

print("Success!")

Success!
