In [None]:
import pandas as pd
import numpy as np

import tensorflow as tf
from tensorflow.keras import layers

# Show NumPy floats with 3 decimals and without scientific notation.
np.set_printoptions(precision=3, suppress=True)
# NOTE(review): the TensorFlow seed below is commented out, so training runs are
# presumably not repeatable from one kernel restart to the next - re-enable it
# if reproducible results are needed.
#tf.random.set_seed(1)

def get_feat_and_labels(filename, as_tensor = True):
    """Read a shot CSV and build the feature table and label column.

    The CSV must provide the columns ``OppX``, ``OppY``, ``DistanceToGoal``,
    ``DistanceToOpp`` and ``Success``. Two columns are derived before the
    selection: ``Angle`` (absolute value of ``arctan2(OppY, OppX)``) and
    ``OppY`` replaced by its absolute value.

    Args:
        filename: Path (or anything ``pd.read_csv`` accepts) of the CSV file.
        as_tensor: When True, features and labels are returned as float32
            TensorFlow tensors; otherwise as a DataFrame and a Series.

    Returns:
        A 3-tuple ``(features, labels, data)`` where ``features`` holds the
        columns Angle, DistanceToGoal, DistanceToOpp, OppX, OppY (in that
        order), ``labels`` is the ``Success`` column, and ``data`` is a copy
        of the full processed frame (features plus ``Success``).
    """
    raw = pd.read_csv(filename)

    raw["Angle"] = np.abs(np.arctan2(raw["OppY"], raw["OppX"]))
    raw["OppY"] = np.abs(raw["OppY"])

    # Take an explicit copy of the selected columns: assigning into the bare
    # selection is a chained assignment (SettingWithCopyWarning).
    data = raw[["Angle",  "DistanceToGoal", "DistanceToOpp",  "OppX", "OppY","Success"]].copy()

    # Convert textual YES/NO labels to 1/0. Newer pandas versions may read text
    # as a dedicated string dtype rather than `object`, so check for both.
    labels = data["Success"]
    if labels.dtype == object or pd.api.types.is_string_dtype(labels.dtype):
        data["Success"] = (labels == "YES")*1

    if as_tensor:
        return tf.convert_to_tensor(data.drop(columns="Success"),dtype=tf.float32), \
                tf.convert_to_tensor(data["Success"],dtype=tf.float32),\
                data.copy()
    else:
        return data.drop(columns="Success"), data["Success"], data.copy()


In [None]:
"""Code used to create training data"""
    
from sklearn.model_selection import train_test_split

def printCount(data):
    """Print the number of entries labelled 0 ("no") and 1 ("yes").

    A ``pd.DataFrame`` is filtered on its ``'Success'`` column; any other
    input is treated as the label container itself and filtered by its own
    values.
    """
    label_values = data['Success'] if type(data) == pd.DataFrame else data

    # Count both classes before printing anything.
    count_no = data[label_values == 0].shape[0]
    count_yes = data[label_values == 1].shape[0]

    print('success no:', count_no)
    print('success yes:', count_yes)
    
def get_data(names):
    """Load several shot CSVs and stack them into one DataFrame.

    For every file the path and its per-class counts are printed; after all
    files are loaded the class counts of the combined frame are printed.

    Args:
        names: Non-empty sequence of CSV paths understood by
            ``get_feat_and_labels``.

    Returns:
        The row-wise concatenation of the processed frames (features plus
        ``Success``); the original row indices of each file are kept.

    Raises:
        ValueError: If ``names`` is empty.
    """
    if len(names) == 0:
        raise ValueError("get_data needs at least one CSV path")

    frames = []
    for name in names:
        _, _, frame = get_feat_and_labels(name, as_tensor=False)
        print(name)
        printCount(frame)
        frames.append(frame)

    # Concatenate once at the end instead of re-copying the growing frame on
    # every iteration.
    original_data = pd.concat(frames, axis=0)

    printCount(original_data)
    return original_data

# Source files that were merged to create the stratified train/validation
# split. The list is only read when `simple_load` is False, but it has to be
# defined: as commented-out code the else branch failed with a NameError.
# Forward slashes avoid the invalid "\M" / "\C" escape sequences and resolve
# to the same files on Windows.
data_names = [
    "data/MotionTrainingData/CanShootStillLocal3x.csv",  # still

    "data/MotionTrainingData/CanShootTolLocal2x.csv",  # motion
    "data/MotionTrainingData/CanShootTolLocal3x.csv",  # motion
    "data/MotionTrainingData/CanShootTolLocal4x.csv",  # motion

    "data/MixedData/Testfeb8num3/CanShoot2MotionTol2x.csv",  # motion
    "data/MixedData/Testfeb8num3/CanShoot2MotionTol3x.csv",  # motion
    "data/MixedData/Testfeb8num3/CanShoot2MotionTol4x.csv",  # motion

    "data/MixedData/Testfeb8num3/CanShoot2plus75tolerance.csv",  # still
    "data/MixedData/Testfeb8num3/CanShoot2Tol3x.csv",  # still
    "data/MixedData/Testfeb8num3/CanShoot2Tol4x.csv",  # still
    "data/MixedData/Testfeb8num3/CanShoot2Tolerance2x.csv",  # still

    "data/MixedData/Originals/CanShoot2Behavior.csv",  # motion

    "data/MixedData/Originals/CanShoot2Bottom.csv",  # still
    "data/MixedData/Originals/CanShoot2Upper.csv",  # still
    "data/MixedData/Originals/CanShoot2Center.csv",  # still
    "data/MixedData/Originals/CanShoot2Mid.csv",  # still
]

# True: load the already-split TrainFinal.csv / ValidFinal.csv files.
# False: rebuild the split from the raw files listed in `data_names`.
simple_load = True

if simple_load:
    can_shoot_features, can_shoot_labels, _ = get_feat_and_labels("TrainFinal.csv" )
    cs_feat_valid, cs_labels_valid, _ = get_feat_and_labels("ValidFinal.csv" )

else: #This is how the stratified train/valid split was created.
    data_og = get_data(data_names)

    # Stratify on the label so both splits keep the same success/failure ratio;
    # the fixed random_state makes the split repeatable.
    can_shoot_features, cs_feat_valid, can_shoot_labels, cs_labels_valid = train_test_split(
        data_og.drop(columns="Success"),
        data_og["Success"],
        test_size=0.33,
        random_state=3,
        stratify=data_og["Success"],
    )

    printCount(can_shoot_labels)
    printCount(cs_labels_valid)

    can_shoot_features = tf.convert_to_tensor(can_shoot_features, dtype=tf.float32)
    can_shoot_labels = tf.convert_to_tensor(can_shoot_labels, dtype=tf.float32)

    cs_feat_valid = tf.convert_to_tensor(cs_feat_valid, dtype=tf.float32)
    cs_labels_valid = tf.convert_to_tensor(cs_labels_valid, dtype=tf.float32)

In [None]:
"""Write dataframe back to csv after processing data"""
def writeToCSV(features, labels, name):
    """Write features and labels to a single CSV file.

    ``labels`` gets a trailing axis and is appended to ``features`` as the
    last column. The ``Success`` column is stored as a boolean
    (``value == 1``) and no index column is written.

    Args:
        features: 2-D tensor with the five feature columns, in the order
            Angle, DistanceToGoal, DistanceToOpp, OppX, OppY.
        labels: 1-D tensor of labels, one per feature row.
        name: Destination path of the CSV file.
    """
    column_names = ["Angle",  "DistanceToGoal", "DistanceToOpp",  "OppX", "OppY","Success"]

    label_column = tf.expand_dims(labels, axis=1)
    table = tf.concat([features, label_column], axis=1).numpy()

    frame = pd.DataFrame(table, columns=column_names)
    frame["Success"] = frame["Success"]==1
    frame.to_csv(name, index=False)
    
# writeToCSV(can_shoot_features, can_shoot_labels, "TrainFinal.csv" )
# writeToCSV(cs_feat_valid, cs_labels_valid, "ValidFinal.csv" )

# Train Model loop

In [None]:
# Drop any graph/layer state left over from a previous run of this cell.
tf.keras.backend.clear_session()

num_feats = can_shoot_features.shape[1]

# Small fully connected binary classifier: batch-norm -> 6 relu units ->
# batch-norm -> 3 swish units -> single sigmoid output.
model = tf.keras.models.Sequential([
  tf.keras.layers.BatchNormalization(input_dim=num_feats),
  tf.keras.layers.Dense(6, activation='relu'),

  tf.keras.layers.BatchNormalization(),
  tf.keras.layers.Dense(3, activation='swish'),

  tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(
    optimizer=tf.keras.optimizers.Adam(0.05),
    # The last layer already applies a sigmoid, so the loss receives probabilities.
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
    metrics=[tf.keras.metrics.BinaryAccuracy(), tf.keras.metrics.Recall(), tf.keras.metrics.Precision()],
)

model.summary()

# Take the callback from tf.keras, the same package the model is built with.
# Importing it from the standalone `keras` package can pull in a different
# Keras version than the one bundled with TensorFlow. The bare name is kept
# for any code that still refers to it.
ReduceLROnPlateau = tf.keras.callbacks.ReduceLROnPlateau
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                              patience=75, min_lr=0.001)

# Phase 1: fixed-length warm-up without early stopping.
# NOTE(review): 64 epochs is shorter than the callback's patience of 75, so the
# learning rate presumably cannot be reduced during this phase - confirm intent.
model.fit(
    can_shoot_features,
    can_shoot_labels,
    batch_size=16,
    epochs = 64,
    validation_data= (cs_feat_valid, cs_labels_valid ),
    callbacks=[reduce_lr]
)

# Phase 2: continue training, stop once val_loss has not improved for 50
# epochs and roll back to the best weights seen in this phase.
early_stop= tf.keras.callbacks.EarlyStopping(
    monitor='val_loss', min_delta=0, patience=50, verbose=0,
    mode='min', baseline=None, restore_best_weights=True
)

model.fit(
    can_shoot_features,
    can_shoot_labels,
    batch_size=16,
    epochs = 512,
    validation_data= (cs_feat_valid, cs_labels_valid ),
    callbacks=[reduce_lr, early_stop]
)



# Summary of Trained Model

In [None]:
def getAcc_Rec_Prec(conf_mat):
    """Derive accuracy, recall and precision from a 2x2 confusion matrix.

    Row 1 is treated as the actual positives and column 1 as the predicted
    positives, so ``conf_mat[1, 1]`` is the true-positive count.

    Args:
        conf_mat: Array indexable as ``conf_mat[i, j]``.

    Returns:
        Tuple ``(accuracy, recall, precision)``.
    """
    true_neg = conf_mat[0,0]
    true_pos = conf_mat[1,1]

    actual_pos = sum(conf_mat[1,:])
    predicted_pos = sum(conf_mat[:,1])

    return (
        (true_neg + true_pos)/np.sum(conf_mat),
        true_pos/actual_pos,
        true_pos/predicted_pos,
    )

def getPosNeg(prediction, actual):
    """Count true/false positives and negatives of rounded predictions.

    Each prediction is rounded with ``np.round``; a rounded value of 1 counts
    as a positive prediction, anything else as a negative one. A prediction is
    correct when its rounded value equals the matching entry of ``actual``.

    Args:
        prediction: Indexable sequence of scores.
        actual: Indexable sequence of true labels, at least as long as
            ``prediction``.

    Returns:
        Tuple ``(tp, fp, tn, fn)``.
    """
    tally = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for idx in range(len(prediction)):
        rounded = np.round(prediction[idx])
        if rounded == 1:
            outcome = "tp" if rounded == actual[idx] else "fp"
        else:
            outcome = "tn" if rounded == actual[idx] else "fn"
        tally[outcome] += 1
    return tally["tp"], tally["fp"], tally["tn"], tally["fn"]
            

In [None]:
# Evaluate the current `model` on the training set. `results` is expected to
# hold the loss followed by the metrics given to compile (binary accuracy,
# recall, precision), i.e. more than the two values the label suggests.
results = model.evaluate(can_shoot_features, can_shoot_labels)
print("Final train_ loss, train_acc:", results)

# Same evaluation on the validation set; `results` is overwritten.
results = model.evaluate(cs_feat_valid, cs_labels_valid)
print("Final val_ loss, val_acc:", results)

# Predicted scores for the validation set (sigmoid output of the model).
out=model.predict(cs_feat_valid)
# out = tf.squeeze(out)

# Round the scores to 0/1 and tabulate them against the true labels.
conf_mat = tf.math.confusion_matrix(cs_labels_valid, np.round(out))
print(conf_mat)

# Logistic Regression Results

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix

# Logistic-regression baseline fitted on the same features as the network.
lr_model = LogisticRegression(solver='liblinear') #,random_state=0)

#Build logistic regressor on Training Set
lr_model.fit(can_shoot_features, can_shoot_labels)

#Output results on Validation set
output_lr = lr_model.predict(cs_feat_valid)

# Confusion matrix of true validation labels versus baseline predictions.
lr_conf_mat = confusion_matrix(cs_labels_valid, output_lr)

lr_acc, lr_rec, lr_prec = getAcc_Rec_Prec(lr_conf_mat)

print("LogisticRegression_Accuracy: ",lr_acc)
print("LogisticRegression_Recall: ", lr_rec)
print("LogisticRegression_Precision: ", lr_prec)
print(lr_conf_mat)

# Save model if wanted. Specify folder directory

In [None]:
# Set to True to export the current `model` as a TensorFlow SavedModel.
save = False
if(save):
    write_name='Feb13StratifiedModel'
    model.save(write_name, save_format='tf')
    import subprocess
    #Prints out information needed on C++ side
    # Pass the command as an argument list: a single command string without
    # shell=True is treated as the program name on POSIX and fails there.
    cli_output = subprocess.check_output([
        "saved_model_cli", "show",
        "--dir", write_name,
        "--tag_set", "serve",
        "--signature_def", "serving_default",
    ])
    # check_output only returns the text; print it so it is actually shown.
    print(cli_output.decode(errors="replace"))


# Test Results

In [None]:
# When True, `model` is replaced by the SavedModel stored in
# 'Feb13StratifiedModel/'. The cells below then evaluate that saved model,
# not the one trained earlier in this notebook.
load_trained_model = True
if(load_trained_model):
    model = tf.keras.models.load_model('Feb13StratifiedModel/')
    
    model.summary()

In [None]:
# Held-out test files. Forward slashes replace the original backslash paths,
# which contained the invalid escape sequence "\C"; they resolve to the same
# files on Windows and also work on other platforms.
test_names = ["data/Testing/CanShoot2MotionTol2x.csv",
              "data/Testing/CanShoot2MotionTol3x.csv",
              "data/Testing/CanShoot2MotionTol4x.csv",
              "data/Testing/CanShootStillTol2x.csv"
             ]

# Load every file once as (features, labels) pairs.
loaded_tests = [get_feat_and_labels(name, as_tensor=False)[:2] for name in test_names]

# Kept from the original cell: features/labels of the first test file only.
data_test, data_label = loaded_tests[0]

# Stack all files with a single concat each instead of growing the frames
# inside a loop.
test_feat = pd.concat([feat for feat, _ in loaded_tests], axis = 0)
test_label = pd.concat([lab for _, lab in loaded_tests], axis = 0)

test_feat = tf.convert_to_tensor(test_feat, dtype=tf.float32)
test_label = tf.convert_to_tensor(test_label, dtype=tf.float32)

# Loss and compiled metrics of the current `model` on the test set.
results = model.evaluate(test_feat, test_label)

# These are test-set numbers; the original message labelled them as "train".
print("Final test_ loss, test_acc:", results)

test_out = model.predict(test_feat)

# Round the predicted scores to 0/1 and tabulate them against the true labels.
test_conf = tf.math.confusion_matrix(test_label, np.round(test_out))

print(test_conf)

# Logistic Regression models performance on the Test Data

In [None]:

# Apply the logistic-regression baseline fitted earlier (`lr_model`) to the
# test set; `output_lr` and `lr_conf_mat` from the validation cell are overwritten.
print("\n_____________________________\n|Logistic Regression Results|")
output_lr = lr_model.predict(test_feat)

lr_conf_mat = confusion_matrix(test_label, output_lr)
print(lr_conf_mat)

lr_acc, lr_rec, lr_prec = getAcc_Rec_Prec(lr_conf_mat)

print("LogisticRegression_Accuracy: ",lr_acc)
print("LogisticRegression_Recall: ", lr_rec)
print("LogisticRegression_Precision: ", lr_prec)

In [None]:
# Raise NumPy's print precision so the coefficients are shown with all digits.
# This changes the global print options, so any array printed after this cell
# is affected as well.
np.set_printoptions(precision=100, suppress=True)

# First row of the fitted coefficient matrix - presumably one weight per
# feature column, in the order the features were passed to fit.
fun = lr_model.coef_[0]
print('Coefficients: ',fun)
print("intercept: ", lr_model.intercept_)
