import os, inspect, datetime, uuid, json, glob, random, logging, sys, copy, time
from sqlalchemy import Column, Integer, String, ForeignKey, Table, create_engine, MetaData, Date, DateTime, Float, Boolean, and_, or_, asc, desc
from sqlalchemy.orm import relationship, backref, scoped_session, sessionmaker, relation
from sqlalchemy.ext.declarative import declarative_base
from src.common.shellprinting import *
from src.logger.logger import Logger
class PyCore:
def __init__(self, core_manifest=str(os.getenv("ENV_CONFIGS_DIR", "/opt/work/configs") + "/jupyter.json")):
# the json config for this core:
self.m_core_file = core_manifest
self.m_db_json = {}
self.m_debug = False
# needs to be in the /opt/work/configs/jupyter.json => Core.Envs JSON Dict
self.m_env = str(os.getenv("ENV_DEPLOYMENT_TYPE", "Local"))
self.m_log = None # Eventually this will have syslog with: Logger(self.m_name, "/dev/log", logging.DEBUG)
self.m_core_json = {}
self.m_dbs = {} # dictionary where the keys are logical names for the underlying db session (applications)
self.m_db_apps_json = {}
self.m_db_app_system = None
self.m_rds = {} # dictionary where the keys are logical names for the underlying redis connections (applications)
self.m_rd_apps_json = {}
self.m_rd_app_system = None
self.m_slack_node_name = "jupyter"
self.m_slack_node = {
"BotName" : str(os.getenv("ENV_SLACK_BOTNAME", "bugbot")),
"ChannelName" : str(os.getenv("ENV_SLACK_CHANNEL", "debugging")),
"NotifyUser" : str(os.getenv("ENV_SLACK_NOTIFY_USER", "channel")),
"Token" : str(os.getenv("ENV_SLACK_TOKEN", "xoxb-51351043345-Lzwmto5IMVb8UK36MghZYMEi")),
"EnvName" : str(os.getenv("ENV_SLACK_ENVNAME", "dev-jupyter"))
}
self.m_slack_bot = str(self.m_slack_node["BotName"])
self.m_slack_enabled = os.getenv("ENV_SLACK_ENABLED", "1") == "1"
# load the core
self.load_json()
# Assign the name out of the json config
self.m_name = str(self.m_core_json["Core"]["Name"])
self.m_locale_dict = {}
self.m_locale_abb_dict = {}
self.load_db_apps()
self.load_redis_apps()
self.m_last_start_time = None
self.m_last_end_time = None
self.m_last_check_time = None
self.m_colors = {
"red" : "#E74C3C",
"feldspar" : "#D19275",
"orange" : "#FF7D40",
"pink" : "#FFCCCC",
"green" : "#2ECC71",
"blue" : "#3498db",
"black" : "#111111",
"copper" : "#EDC393",
"brown" : "#6B4226",
"lightgreen" : "#C0FF3E",
"darkgreen" : "#385E0F",
"maroon" : "#800000",
"gray" : "#8B8989",
"gold" : "#FFCC11",
"yellow" : "#FFE600",
"volumetop" : "#385E0F",
"volume" : "#ADFF2F",
"high" : "#CC1100",
"low" : "#164E71",
"open" : "#608DC0",
"close" : "#99CC32",
"white" : "#FFFFFF"
}
self.m_last_color_idx = -1
self.m_color_rotation = []
self.m_is_notebook = False
# end of __init__
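# Usage sketch (hypothetical; assumes the jupyter.json manifest and the
# database/redis connector configs it points at exist on disk):
#
#   core = PyCore()                  # or PyCore("/path/to/jupyter.json")
#   core.enable_debug()
#   dbs = core.get_dbs()             # logical name => database connector
#   rds = core.get_rds()             # logical name => redis connector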
def lg(self, msg, level=6):
# log it to syslog
if self.m_log:
# concat them
full_msg = self.m_name + ": " + msg
self.m_log.log(full_msg, level)
else:
if not self.m_is_notebook:
lg(str(self.m_name) + ": " + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + " " + str(msg), level)
if self.m_debug:
lg(str(self.m_name) + ": " + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + " " + str(msg), level)
return None
# end of lg
#####################################################################################################
#
# Initialize Resources
#
#####################################################################################################
def init_resources(self):
self.load_json()
self.load_db_apps()
self.load_redis_apps()
# end of init_resources
def load_json(self):
if len(self.m_core_json) == 0:
self.m_core_json = json.loads(open(self.m_core_file).read())
self.m_db_apps_json = json.loads(open(self.m_core_json["Core"]["Envs"][self.m_env]["Database"]).read())
self.m_rd_apps_json = json.loads(open(self.m_core_json["Core"]["Envs"][self.m_env]["Redis"]).read())
# end of load_json
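# Minimal sketch of the expected jupyter.json layout (the name and file paths
# are illustrative; only the keys referenced by this class are shown):
#
#   {
#     "Core" : {
#       "Name" : "SomeCoreName",
#       "Envs" : {
#         "Local" : {
#           "Database" : "/opt/work/configs/db.json",
#           "Redis"    : "/opt/work/configs/redis.json"
#         }
#       }
#     }
#   }
#
# The Database file is expected to hold a "Database Applications" section and
# the Redis file a "Redis Applications" section (see load_db_apps and
# load_redis_apps below).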
def load_db_apps(self):
self.load_json()
if self.m_debug:
self.lg("Looking up DB Connectivity File", 6)
# end of if debug
# Load them if there are apps
if len(self.m_db_apps_json["Database Applications"]) > 0:
# Add the source base path
self.load_src_path("ENV_PYTHON_SRC_DIR", "/opt/work/src")
from connectors.database.database_application_system import DatabaseApplicationSystem
self.m_db_app_system = DatabaseApplicationSystem(None, False)
self.m_dbs = self.m_db_app_system.return_connectors(self.m_db_apps_json["Database Applications"])
self.m_db_app_system.connect_all()
# end of if there are Database apps to load
# end of load_db_apps
def load_redis_apps(self):
self.load_json()
if self.m_debug:
self.lg("Looking up RA Connectivity File", 6)
# end of if debug
# Load them if there are apps
if len(self.m_rd_apps_json["Redis Applications"]) > 0:
# Add the source base path
self.load_src_path("ENV_PYTHON_SRC_DIR", "/opt/work/src")
from connectors.redis.redis_application_system import RedisApplicationSystem
self.m_rd_app_system = RedisApplicationSystem(self.m_log, False)
self.m_rds = self.m_rd_app_system.return_connectors(self.m_rd_apps_json["Redis Applications"])
self.m_rd_app_system.connect_all()
# end of if there are Redis apps to load
# end of load_redis_apps
def load_thirdparty_dir(self, dir_name, debug=False):
load_dir = "/opt/work/src/thirdparty/" + str(dir_name)
if debug:
print load_dir
self.load_src_path("ENV_THIRD_PARTY_SOURCE_DIR", load_dir)
# end of load_thirdparty_dir
def load_src_path(self, env_key, default_path):
import sys
add_path = os.getenv(env_key, default_path)
found_path = False
for path in sys.path:
if path == add_path:
found_path = True
break
# end of iterating through path
if not found_path:
sys.path.insert(0, add_path)
# end of need to add path
# end of load_src_path
def enable_debug(self):
self.m_debug = True
# end of enable_debug
def disable_debug(self):
self.m_debug = False
# end of disable_debug
def is_notebook(self):
self.m_is_notebook = True
# end of is_notebook
#####################################################################################################
#
# AWS Methods
#
#####################################################################################################
def running_on_aws(self, debug=False):
return os.path.exists("/opt/aws/bin/ec2-metadata")
# end of running_on_aws
def aws_get_keys(self, debug=False):
record = {
"Key" : os.getenv("ENV_AWS_KEY"),
"Secret" : os.getenv("ENV_AWS_SECRET")
}
return record
# end of aws_get_keys
def aws_get_instance_from_metadata(self, debug=False):
results = self.build_def_hash("Display Error", "Failed to Get EC2 Instance from MetaData", {
"Name" : "",
"InstanceID" : "",
"InstanceType" : "",
"ImageID" : "",
"Running" : "",
"ImageName" : "",
"ExternalIP" : "",
"InternalIP" : ""
})
try:
import os, boto
if os.path.exists("/tmp/publicip"):
os.system("rm -f /tmp/publicip >> /dev/null")
cur_ip = ""
if self.running_on_aws():
os.system("/opt/aws/bin/ec2-metadata | grep public-ipv4 | awk '{print $NF}' > /tmp/publicip")
cur_ip = open("/tmp/publicip").read().strip().lstrip()
else:
cur_ip = "54.188.188.188"
if cur_ip == "":
self.lg("ERROR: Invalid IP(" + str(cur_ip) + ")", 0)
else:
self.lg("Looking for IP(" + str(cur_ip) + ")", 6)
aws_creds = self.aws_get_keys()
ec2 = boto.connect_ec2(aws_creds["Key"], aws_creds["Secret"])
filters = {"ip-address": cur_ip}
inst_list = ec2.get_only_instances(filters=filters)
if len(inst_list) > 0:
instance = inst_list[0]
if str(instance.ip_address) == str(cur_ip):
#self.lg("Res(" + str(reservation.id) + ") Instance(" + str(iidx) + ") ID(" + str(instance) + ")", 6)
tag_name = ""
if "Name" in instance.tags:
tag_name = str(instance.tags["Name"])
ami_results = ec2.get_all_images(image_ids=[str(instance.image_id)])
ami_name = ""
if len(ami_results) > 0:
if str(ami_results[0].name) != "":
ami_name= str(ami_results[0].name)
results["Record"] = {
"Name" : str(tag_name),
"InstanceID" : str(instance.id),
"InstanceType" : str(instance.instance_type),
"ImageID" : str(instance.image_id),
"Running" : str(instance.state),
"ImageName" : str(ami_name),
"ExternalIP" : str(instance.ip_address),
"InternalIP" : str(instance.private_ip_address)
}
if not self.running_on_aws():
results["Record"]["Name"] = "DSWorker-1"
# end if the ip matches
# end of get all instances
# end of if there's an ip or not
os.system("rm -f /tmp/publicip >> /dev/null")
results["Status"] = "SUCCESS"
results["Error"] = ""
except Exception,e:
err_msg = "Failed to Get Instance from MetaData with Ex(" + str(e) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", "Failed to Get Instance From MetaData", {
"Name" : "",
"InstanceID" : "",
"InstanceType" : "",
"ImageID" : "",
"Running" : "",
"ImageName" : "",
"ExternalIP" : "",
"InternalIP" : ""
})
results["Status"] = "FAILED"
results["Error"] = ""
# end of try/ex
return results
# end of aws_get_instance_from_metadata
def aws_get_instance_from_name(self, cur_name, debug=False):
results = self.build_def_hash("Display Error", "Failed to Get EC2 Instance by Name", {
"Name" : "",
"InstanceID" : "",
"InstanceType" : "",
"ImageID" : "",
"Running" : "",
"ImageName" : "",
"ExternalIP" : "",
"InternalIP" : ""
})
try:
import os, boto
if cur_name == "":
self.lg("ERROR: Invalid Name(" + str(cur_name) + ")", 0)
else:
self.lg("Looking for Name(" + str(cur_name) + ")", 6)
aws_creds = self.aws_get_keys()
ec2 = boto.connect_ec2(aws_creds["Key"], aws_creds["Secret"])
filters = {"tag:Name" : cur_name}
inst_list = ec2.get_only_instances(filters=filters)
if len(inst_list) > 0:
instance = inst_list[0]
if str(instance.ip_address) != "":
tag_name = ""
if "Name" in instance.tags:
tag_name = str(instance.tags["Name"])
if str(tag_name).lower().strip().lstrip() == str(cur_name).lower().strip().lstrip():
ami_results = ec2.get_all_images(image_ids=[str(instance.image_id)])
ami_name = ""
if len(ami_results) > 0:
if str(ami_results[0].name) != "":
ami_name= str(ami_results[0].name)
results["Record"] = {
"Name" : str(tag_name),
"InstanceID" : str(instance.id),
"InstanceType" : str(instance.instance_type),
"ImageID" : str(instance.image_id),
"Running" : str(instance.state),
"ImageName" : str(ami_name),
"ExternalIP" : str(instance.ip_address),
"InternalIP" : str(instance.private_ip_address)
}
# end if the ip matches
# end of get all instances
# end of if there's an ip or not
results["Status"] = "SUCCESS"
results["Error"] = ""
except Exception,e:
err_msg = "Failed to Get Instance from Name with Ex(" + str(e) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", "Failed to Get Instance From Name", {
"Name" : "",
"InstanceID" : "",
"InstanceType" : "",
"ImageID" : "",
"Running" : "",
"ImageName" : "",
"ExternalIP" : "",
"InternalIP" : ""
})
results["Status"] = "FAILED"
results["Error"] = ""
# end of try/ex
return results
# end of aws_get_instance_from_name
def aws_get_instances(self, debug=False):
results = self.build_def_hash("Display Error", "Failed to Get EC2 Instances", { "Instances" : [] })
try:
import os, boto
aws_creds = self.aws_get_keys()
ec2 = boto.connect_ec2(aws_creds["Key"], aws_creds["Secret"])
reservations = ec2.get_all_instances()
for ridx, reservation in enumerate(reservations):
for iidx, instance in enumerate(reservation.instances):
#self.lg("Res(" + str(reservation.id) + ") Instance(" + str(iidx) + ") ID(" + str(instance) + ")", 6)
tag_name = ""
if "Name" in instance.tags:
tag_name = str(instance.tags["Name"])
ami_results = ec2.get_all_images(image_ids=[str(instance.image_id)])
ami_name = ""
if len(ami_results) > 0:
if str(ami_results[0].name) != "":
ami_name= str(ami_results[0].name)
results["Record"]["Instances"].append({
"Name" : str(tag_name),
"InstanceID" : str(instance.id),
"InstanceType" : str(instance.instance_type),
"ImageID" : str(instance.image_id),
"Running" : str(instance.state),
"ImageName" : str(ami_name),
"ExternalIP" : str(instance.ip_address),
"InternalIP" : str(instance.private_ip_address)
})
# end of get all instances
# end of get all reservations
results["Status"] = "SUCCESS"
results["Error"] = ""
except Exception,e:
err_msg = "Failed to Get All Instances with Ex(" + str(e) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", "Failed to Get EC2 Instances", {})
results["Status"] = "FAILED"
results["Error"] = ""
# end of try/ex
return results
# end of aws_get_instances
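# Usage sketch (hypothetical; requires boto and ENV_AWS_KEY/ENV_AWS_SECRET in
# the environment):
#
#   inst_results = core.aws_get_instances()
#   if inst_results["Status"] == "SUCCESS":
#       for inst in inst_results["Record"]["Instances"]:
#           print inst["Name"] + " " + inst["ExternalIP"]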
#####################################################################################################
#
# General Util Methods
#
#####################################################################################################
def get_dbs(self):
return self.m_dbs
# end of get_dbs
def get_rds(self):
return self.m_rds
# end of get_rds
def db_convert_utc_date_to_str(self, db_cur_date):
return_str = ""
if db_cur_date != None:
convert_date = db_cur_date - datetime.timedelta(hours=4)
return_str = str(convert_date.strftime('%m/%d/%Y %H:%M:%S'))
return return_str
# end of db_convert_utc_date_to_str
def db_convert_date_to_str(self, db_cur_date, optional_format="%m/%d/%Y %H:%M:%S"):
return_str = ""
if db_cur_date != None:
return_str = str(db_cur_date.strftime(optional_format))
return return_str
# end of db_convert_date_to_str
def convert_utc_date_to_str(self, cur_date, optional_format="%m/%d/%Y %H:%M:%S"):
return_str = ""
if cur_date != None:
convert_date = cur_date - datetime.timedelta(hours=4)
return_str = str(convert_date.strftime('%m/%d/%Y %H:%M:%S'))
return return_str
# end of convert_utc_date_to_str
def convert_date_string_to_date(self, date_str, optional_format="%Y-%m-%dT%H:%M:%S.%fZ"):
date_to_return = None
try:
import datetime
date_to_return = datetime.datetime.strptime(str(date_str), optional_format)
except Exception,f:
self.lg("ERROR: Failed Converting Date(" + str(date_str) + ") with Format(" + str(optional_format) + ")", 0)
# end of tries to read this string as a valid date...
return date_to_return
# end of convert_date_string_to_date
def build_unique_key(self, length=-1):
return str(str(uuid.uuid4()).replace("-", "")[0:length])
# end of build_unique_key
def ppj(self, json_data):
return str(json.dumps(json_data, sort_keys=True, indent=4, separators=(',', ': ')))
# end of ppj
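# Example with illustrative values:
#
#   uid = core.build_unique_key(8)    # first 8 chars of a hyphen-less uuid4
#   print core.ppj({"ID" : uid})      # pretty-printed JSON with sorted keys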
def convert_utc_date_string_to_est_datetime(self, utc_date_str, optional_format="%Y-%m-%dT%H:%M:%S.%fZ"):
utc_time = datetime.datetime.strptime(utc_date_str, optional_format)
utc_adjusted = utc_time.replace(tzinfo=self.m_utc_tz_zone)
offset_est_local_time = utc_adjusted.astimezone(self.m_est_tz_zone)
est_local_time = datetime.datetime(
year = offset_est_local_time.year,
month = offset_est_local_time.month,
day = offset_est_local_time.day,
hour = offset_est_local_time.hour,
minute = offset_est_local_time.minute,
second = offset_est_local_time.second)
return est_local_time
# end of convert_utc_date_string_to_est_datetime
def create_random_float(self, original, min_value, max_value):
new_random_float= float(original) + float(random.uniform(min_value, max_value))
return new_random_float
# end of create_random_float
def create_random_int(self, original, min_value, max_value):
new_random_int = random.uniform(min_value, max_value)
return original + int(int(original) * float(new_random_int))
# end of create_random_int
def get_random_number_in_range(self, min_int, max_int):
import random
return random.randint(min_int, max_int)
# end of get_random_number_in_range
def get_percent_done(self, progress, total):
return "%0.2f" % float(float(progress)/float(total)*100.00)
# end of get_percent_done
def to_float_str(self, cur_float):
return str("%0.2f" % float(cur_float))
# end of to_float_str
def to_upper(self, cur_str):
return str(cur_str).upper().strip().lstrip()
# end of to_upper
def width_percent_done(self, idx, total):
percent_str = "Percent(" + str(self.get_percent_done(idx, total)) + ")"
if len(percent_str) < 14:
percent_str += " "
return percent_str
# end of width_percent_done
def timer_start(self):
self.m_last_start_time = datetime.datetime.now()
# end of timer_start
def timer_end(self):
self.m_last_end_time = datetime.datetime.now()
# end of timer_end
def how_long(self):
if self.m_last_start_time == None:
self.lg("Forgot to start the timer with: self.timer_start()", 0)
return "-1"
# In case I forget to stop the timer just set it to whenever the how_long was invoked
elif self.m_last_end_time == None:
self.timer_end()
return self.to_float_str(float((self.m_last_end_time - self.m_last_start_time).total_seconds()))
# end of how_long
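# Timer usage sketch (do_work is a hypothetical workload):
#
#   core.timer_start()
#   do_work()
#   core.timer_end()
#   print "Took " + core.how_long() + " seconds"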
def log_msg_to_unique_file(self, msg, path_to_file="/tmp/errors_to_look_at_"):
log_file = str(path_to_file) + str(uuid.uuid4()).replace("-", "") + ".log"
with open(log_file, "w") as output_file:
output_file.write(str(msg))
return log_file
# end of log_msg_to_unique_file
def write_json_to_file(self, output_file, record_json):
try:
temp_output = "/tmp/tmpfile_" + datetime.datetime.now().strftime("%y-%m-%d-%H-%M-%S") + "_"
temp_file = self.log_msg_to_unique_file(self.ppj(record_json), temp_output)
os.system("mv " + str(temp_file) + " " + str(output_file))
if os.path.exists(output_file):
return output_file
else:
return "FAILED_TO_CREATE_FILE"
except Exception,k:
print "ERROR: Failed to write file with ppj with Ex(" + str(k) + ")"
return "FILE_DOES_NOT_EXIST"
# end of try/ex
return output_file
# end of write_json_to_file
def build_def_hash(self, start_status="FAILED", start_error="", record={}):
return { "Status" : str(start_status), "Error" : str(start_error), "Record" : record }
# end of build_def_hash
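# Most methods in this class return this {"Status", "Error", "Record"}
# envelope. Illustrative check of a result:
#
#   results = core.build_def_hash("SUCCESS", "", {"Answer" : 42})
#   if results["Status"] == "SUCCESS":
#       record = results["Record"]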
def handle_display_error(self, err_msg, def_rec={}, debug=False):
results = {}
results["Status"] = "Display Error"
results["Error"] = str(err_msg)
results["Record"] = def_rec
self.lg("ERROR: " + str(err_msg), 0)
if debug:
lg("ERROR: " + str(err_msg), 0)
return results
# end of handle_display_error
def handle_general_error(self, status, err_msg, def_rec={}, debug=False):
results = {}
results["Status"] = str(status)
results["Error"] = str(err_msg)
results["Record"] = def_rec
if debug:
self.lg("ERROR: " + str(err_msg), 0)
return results
# end of handle_general_error
def handle_exception(self, status, err_msg, ex, debug=False):
results = self.build_def_hash(status, err_msg, {
"Exception" : "Failed to Process"
})
try:
if self.m_slack_enabled:
self.handle_send_slack_internal_ex(ex, debug)
results = self.build_def_hash("SUCCESS", "", {
"Exception" : ""
})
else:
import traceback, sys
exc_type, exc_obj, exc_tb = sys.exc_info()
ex_error = self.get_exception_error_message(ex, exc_type, exc_obj, exc_tb, False, debug)
results = self.build_def_hash("SUCCESS", "", {
"Exception" : str(ex_error)
})
# end of branching if slack supported or not
except Exception, e:
err_msg = "Failed to post message to slack with Ex(" + str(e) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", str(err_msg), {
"Exception" : "Failed Processing with Exception"
})
results["Status"] = "FAILED"
# end of try/ex
return results
# end of handle_exception
def get_exception_error_message(self, ex, exc_type, exc_obj, exc_tb, slack_formatted=False, debug=False):
error_log_msg = "FAILED to process exception"
try:
path_to_file = str(exc_tb.tb_frame.f_code.co_filename)
last_line = int(exc_tb.tb_lineno)
gh_line_number = int(last_line) - 1
file_name = str(os.path.split(exc_tb.tb_frame.f_code.co_filename)[1])
path_to_file = str(exc_tb.tb_frame.f_code.co_filename)
ex_details_msg = str(ex)
if ex_details_msg != "":
error_log_msg = "Error on Line: \x1b[31m" + str(last_line) + "\x1b[0m Code: \n\t\x1b[31m" + str(ex_details_msg) + "\x1b[0m\n"
if slack_formatted:
self.lg("" + str(error_log_msg), 0)
error_log_msg = " Error on Line: *" + str(last_line) + "* Code: \n```" + str(ex_details_msg) + "``` \n"
else:
error_log_msg = "Error on Line Number: " + str(last_line)
except Exception,k:
error_log_msg = "Failed to process exception with(" + str(k) + ")"
self.lg("ERROR: " + str(error_log_msg), 0)
# end of trying to process the exception
return error_log_msg
# end of get_exception_error_message
#####################################################################################################
#
# Redis Application Methods
#
#####################################################################################################
def get_message_no_block(self, redis_app, input_key):
results = {}
results["Status"] = "FAILED"
results["Error"] = ""
results["Record"] = None
try:
results["Record"] = redis_app.get_message(False, input_key)
results["Status"] = "SUCCESS"
results["Error"] = ""
except ValueError, e:
err_msg = "RA - ValueError to Address(" + redis_app.get_address() + ") ValueEx(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Record"] = None
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
except Exception, e:
test_for_redis_connectivity = str(e)
if "Connection refused." in test_for_redis_connectivity:
err_msg = "RA - Connection REFUSED to Address(" + redis_app.get_address() + ") Key(" + input_key + ") ConEx(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Record"] = None
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
else:
err_msg = "RA - EX Address(" + redis_app.get_address() + ") Key(" + input_key + ") General Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Record"] = None
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
# end of try/ex
return results
# end of get_message_no_block
def get_message_with_block(self, redis_app, input_key):
results = {}
results["Status"] = "FAILED"
results["Error"] = ""
results["Record"] = None
try:
results["Record"] = redis_app.get_message(True, input_key)
results["Status"] = "SUCCESS"
results["Error"] = ""
except ValueError, e:
err_msg = "RA - ValueError to Address(" + redis_app.get_address() + ") ValueEx(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Record"] = None
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
except Exception, e:
test_for_redis_connectivity = str(e)
if "Connection refused." in test_for_redis_connectivity:
err_msg = "RA - Connection REFUSED to Address(" + redis_app.get_address() + ") Key(" + input_key + ") ConEx(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Record"] = None
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
else:
err_msg = "RA - EX Address(" + redis_app.get_address() + ") Key(" + input_key + ") General Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Record"] = None
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
# end of try/ex
return results
# end of get_message_with_block
def publish_message_to_key(self, message, dest_key, redis_app, debug=False):
results = {}
results["Status"] = "FAILED"
results["Error"] = ""
results["Record"] = {}
try:
if debug:
self.lg("Publish(" + str(dest_key) + ") RA(" + str(redis_app.get_address()) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
redis_app.put_into_key(dest_key, message)
results["Record"] = {
"Address" : str(redis_app.get_address()),
"Key" : dest_key
}
results["Status"] = "SUCCESS"
results["Error"] = ""
if debug:
self.lg("Publish Done", 6)
except ValueError, e:
err_msg = "Connection to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Non-Pickle Formatted Message Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
except Exception, e:
test_for_redis_connectivity = str(e)
if "Connection refused." in test_for_redis_connectivity:
err_msg = "Connection REFUSED to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
else:
err_msg = "Publish Failed to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
# end of try/ex
if debug:
self.lg("Base Done Publish Result Key", 6)
return results
# end of publish_message_to_key
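# Usage sketch (the "CACHE" logical name and "SOME_KEY" are assumptions; real
# names come from the "Redis Applications" config loaded in load_redis_apps):
#
#   rds = core.get_rds()
#   redis_app = rds["CACHE"]
#   core.publish_message_to_key({"Task" : "Hello"}, "SOME_KEY", redis_app)
#   msg_results = core.get_message_no_block(redis_app, "SOME_KEY")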
def add_records_to_cache_in_redis(self, json_record, dest_key, redis_app, debug=False):
results = {}
results["Status"] = "FAILED"
results["Error"] = ""
results["Record"] = {}
try:
if debug:
self.lg("Add Records to Cache(" + str(dest_key) + ") RA(" + str(redis_app.get_address()) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
redis_app.put_into_key(dest_key, json_record)
results["Record"] = {
"Address" : str(redis_app.get_address()),
"Key" : dest_key
}
results["Status"] = "SUCCESS"
results["Error"] = ""
if debug:
self.lg("Add Records to Cache Done", 6)
except ValueError, e:
err_msg = "Connection to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Non-Pickle Formatted Message Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
except Exception, k:
test_for_redis_connectivity = str(k)
if "Connection refused." in test_for_redis_connectivity:
err_msg = "Add Records - Connection REFUSED to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(k) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
else:
err_msg = "Add Records - Cache Failed to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(k) +")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.handle_exception("FAILED", err_msg, k)
# end of try/ex
if debug:
self.lg("Add to Cache Done", 6)
return results
# end of add_records_to_cache_in_redis
def purge_and_cache_records_in_redis(self, redis_app, dest_key, message, debug=False):
results = {}
results["Status"] = "FAILED"
results["Error"] = ""
results["Record"] = {}
try:
if debug:
self.lg("Purge and Cache(" + str(dest_key) + ") RA(" + str(redis_app.get_address()) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
found_msg = True
while found_msg != None:
found_msg = redis_app.m_rw.get(False, 0.01, dest_key)
# end of purging this cache
redis_app.put_into_key(dest_key, message)
results["Record"] = {
"Address" : str(redis_app.get_address()),
"Key" : dest_key
}
results["Status"] = "SUCCESS"
results["Error"] = ""
if debug:
self.lg("Purge and Cache Done", 6)
except ValueError, e:
err_msg = "Connection to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Non-Pickle Formatted Message Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
except Exception, e:
test_for_redis_connectivity = str(e)
if "Connection refused." in test_for_redis_connectivity:
err_msg = "Connection REFUSED to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
else:
err_msg = "Purge and Cache Failed to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
# end of try/ex
if debug:
self.lg("Purge and Cache Done", 6)
return results
# end of purge_and_cache_records_in_redis
def get_all_records_from_cache_in_redis(self, redis_app, dest_key, debug=False):
results = {}
results["Status"] = "FAILED"
results["Error"] = ""
results["Record"] = {
"Cache" : []
}
try:
if debug:
self.lg("Get All From Cache(" + str(dest_key) + ") RA(" + str(redis_app.get_address()) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
found_msg = True
while found_msg != None:
found_msg = redis_app.m_rw.get(False, 0.01, dest_key)
if found_msg:
results["Record"]["Cache"].append(found_msg)
# end of purging this cache
results["Status"] = "SUCCESS"
results["Error"] = ""
if debug:
self.lg("Get All(" + str(len(results["Record"]["Cache"])) + ") From Cache Done", 6)
except ValueError, e:
err_msg = "Get All From Cache - Connection to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Non-Pickle Formatted Message Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
except Exception, e:
test_for_redis_connectivity = str(e)
if "Connection refused." in test_for_redis_connectivity:
err_msg = "Get All From Cache - Connection REFUSED to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
else:
err_msg = "Get All From Cache - Failed to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
# end of try/ex
if debug:
self.lg("Get All From Cache Done", 6)
return results
# end of get_all_records_from_cache_in_redis
def get_a_message_from_redis(self, redis_app, dest_key, debug=False):
results = {}
results["Status"] = "FAILED"
results["Error"] = ""
results["Record"] = {}
try:
if debug:
self.lg("Get A Msg(" + str(dest_key) + ") RA(" + str(redis_app.get_address()) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
found_msg = True
while found_msg != None:
found_msg = redis_app.m_rw.get(False, 0.01, dest_key)
if found_msg:
results["Record"] = found_msg
results["Status"] = "SUCCESS"
results["Error"] = ""
return results
else:
results["Record"] = {}
results["Status"] = "No Record"
results["Error"] = ""
return results
# end of purging this cache
if debug:
self.lg("Get Msg(" + str(len(results["Record"])) + ") Done", 6)
except ValueError, e:
err_msg = "Get A Msg - Connection to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Non-Pickle Formatted Message Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
except Exception, e:
test_for_redis_connectivity = str(e)
if "Connection refused." in test_for_redis_connectivity:
err_msg = "Get A Msg - Connection REFUSED to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
else:
err_msg = "Get A Msg - Failed to Address(" + str(redis_app.get_address()) + ") Results Key(" + str(dest_key) + ") Received a Ex(" + str(e) +")"
self.lg("ERROR: " + str(err_msg), 0)
results["Status"] = "FAILED"
results["Error"] = str(err_msg)
results["Record"] = {}
# end of try/ex
if debug:
self.lg("Get a Msg Done", 6)
return results
# end of get_a_message_from_redis
def blocking_get_cache_from_redis(self, redis_app, cache_key, timeout_in_seconds=0, debug=False):
cache_results = self.build_def_hash("No Results", "No Results", {})
try:
if debug:
self.lg("Blocking Getting Cache(" + str(cache_key) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
test_cache = {}
# timeout_in_seconds = 0 means it will be forever
test_cache = redis_app.m_rw.get(True, timeout_in_seconds, cache_key)
if test_cache != None:
if len(test_cache) > 0:
cache_results = self.build_def_hash("SUCCESS", "", test_cache)
else:
cache_results = self.build_def_hash("SUCCESS", "", {})
else:
cache_results = self.build_def_hash("No Results", "No Results", {})
if debug:
self.lg("Cache(" + str(cache_key) + ") Results(" + str(cache_results)[0:100] + ")", 6)
except Exception,k:
err_msg = "Failed to Blocking Get Cache(" + str(cache_key) + ") Results with Error(" + str(cache_results["Error"]) + ") Exception(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
cache_results = self.handle_exception("FAILED", err_msg, k)
# end of try/ex
return cache_results
# end of blocking_get_cache_from_redis
def get_cache_from_redis(self, redis_app, cache_key, ra_blocking=True, debug=False):
cache_results = self.build_def_hash("No Results", "No Results", {})
try:
if debug:
self.lg("Getting Cache(" + str(cache_key) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
test_cache = {}
if ra_blocking:
test_cache = redis_app.get_cached_data_for_key(cache_key)
# for now there is no blocking call - this prevents deadlocked jobs
else:
test_cache = redis_app.get_cached_data_for_key(cache_key)
if test_cache["Status"] == "SUCCESS":
if test_cache["Value"]:
cache_results = self.build_def_hash("SUCCESS", "", test_cache["Value"])
else:
cache_results = self.build_def_hash("SUCCESS", "", {})
else:
if "Error" in test_cache:
cache_results = self.build_def_hash("No Results", str(test_cache["Error"]), {})
else:
cache_results = self.build_def_hash("No Results", "No Data in Cache Key(" + str(cache_key) + ")", {})
if debug:
self.lg("Cache(" + str(cache_key) + ") Results(" + str(cache_results)[0:100] + ")", 6)
except Exception,k:
err_msg = "Failed to get Cache(" + str(cache_key) + ") Results with Error(" + str(cache_results["Error"]) + ") Exception(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
cache_results = self.handle_exception("FAILED", err_msg, k)
# end of try/ex
return cache_results
# end of get_cache_from_redis
def ra_get_cache_from_redis(self, ra_loc, rds, ra_blocking=True, debug=False):
cache_results = self.build_def_hash("No Results", "No Results", {})
try:
split_arr = ra_loc.split(":")
ra_name = str(split_arr[0])
cache_key = str(split_arr[1])
if ra_name not in rds:
err_msg = "Missing RA App Name(" + str(ra_name) + ") from RDS"
return self.handle_display_error(err_msg, {}, True)
redis_app = rds[ra_name]
if debug:
self.lg("Getting Cache(" + str(cache_key) + ")", 6)
if redis_app.m_rw == None:
redis_app.connect()
test_cache = {}
if ra_blocking:
test_cache = redis_app.get_cached_data_for_key(cache_key)
# for now there is no blocking call - this prevents deadlocked jobs
else:
test_cache = redis_app.get_cached_data_for_key(cache_key)
if test_cache["Status"] == "SUCCESS":
if test_cache["Value"]:
cache_results = self.build_def_hash("SUCCESS", "", test_cache["Value"])
else:
cache_results = self.build_def_hash("SUCCESS", "", {})
else:
if "Error" in test_cache:
cache_results = self.build_def_hash("No Results", str(test_cache["Error"]), {})
else:
cache_results = self.build_def_hash("No Results", "No Data in Cache Key(" + str(cache_key) + ")", {})
if debug:
self.lg("Cache(" + str(cache_key) + ") Results(" + str(cache_results)[0:100] + ")", 6)
except Exception,k:
err_msg = "Failed to get Cache(" + str(cache_key) + ") Results with Error(" + str(cache_results["Error"]) + ") Exception(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
cache_results = self.handle_exception("FAILED", err_msg, k)
# end of try/ex
return cache_results
# end of ra_get_cache_from_redis
def safe_get_cache_from_redis(self, rd_app, cache_key, num_retries=1000, blocking=False, debug=False):
cache_results = self.build_def_hash("No Results", "No Results", {})
try:
if rd_app != None and cache_key != "":
if rd_app.m_rw == None:
rd_app.connect()
if debug:
self.lg("SC(" + str(rd_app.get_address()) + ") Cache(" + str(cache_key) + ") Retries(" + str(num_retries) + ")", 5)
import datetime
from time import time, sleep
waiting = True
while waiting:
cache_records = self.get_cache_from_redis(rd_app, cache_key, blocking, debug)
if debug:
self.lg("Cache(" + str(cache_key) + ") Records(" + str(cache_records) + ")", 6)
if len(cache_records["Record"]) > 0:
if debug:
self.lg("Found Hits", 6)
cache_results = {}
cache_results["Status"] = "SUCCESS"
cache_results["Error"] = ""
cache_results["Record"] = cache_records["Record"]
if debug:
self.lg("Returning(" + str(cache_results["Record"]).replace("\n", "")[0:128] + ")", 5)
return cache_results
elif "Status" in cache_records and cache_records["Status"] == "SUCCESS":
if debug:
self.lg("Success Cache Record", 6)
self.lg("Cache(" + str(cache_key) + ") Records(" + str(len(cache_records)) + ")", 6)
cache_results = self.build_def_hash("SUCCESS", "", cache_records)
return cache_results
else:
if debug:
self.lg("Counting Down", 6)
num_retries -= 1
if num_retries < 0:
self.lg("ERROR: SAFE - Max retry for Cache(" + str(cache_key) + ")", 0)
waiting = False
else:
self.lg("Sleeping", 6)
sleep(0.001)
# end of while waiting for a cache restore
if num_retries < 0 and cache_results["Status"] != "SUCCESS":
err_msg = "Safe - Failed to find Cache in Key(" + str(cache_key) + ") Records"
self.lg("ERROR: " + str(err_msg), 0)
cache_results = self.build_def_hash("Display Error", err_msg, {})
return cache_results
# end of valid safe cache reader
else:
err_msg = "Safe - Invalid Attempt to read cache RA(" + str(rd_app) + ") Key(" + str(cache_key) + ")"
self.lg("ERROR: " + str(err_msg), 0)
cache_results = self.build_def_hash("Display Error", err_msg, {})
return cache_results
# end of invalid cache read request
except Exception,k:
err_msg = "Failed to Safe Get from Cache(" + str(cache_key) + ") Results with Error(" + str(cache_results["Error"]) + ") Exception(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
cache_results = self.handle_exception("FAILED", err_msg, k)
# end of try/ex
return cache_results
# end of safe_get_cache_from_redis
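# Usage sketch for a retried cache read (the key and app name are hypothetical):
#
#   cache_results = core.safe_get_cache_from_redis(rds["CACHE"], "LATEST_MODEL", num_retries=100)
#   if cache_results["Status"] == "SUCCESS":
#       cached_record = cache_results["Record"]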
#####################################################################################################
#
# Core Slack Integration
#
#####################################################################################################
def post_message_to_slack(self, channel, message, username="algobot", debug=False):
results = self.build_def_hash("Display Error", "Not Run", {
})
try:
import slackclient
if debug:
self.lg("Posting to Slack(" + str(channel) + ") Message(" + str(message)[0:10] + ")", 6)
slack = slackclient.SlackClient(self.m_slack_node["Token"])
slack.api_call("chat.postMessage", channel=channel, text=message, username=username, as_user=True)
if debug:
self.lg("Done Posting to Slack(" + str(channel) + ")", 6)
except Exception, e:
err_msg = "Failed to post message to slack with Ex(" + str(e) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", str(err_msg), {
})
results["Status"] = "FAILED"
# end of try/ex
return results
# end of post_message_to_slack
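# Usage sketch (requires the slackclient package and a valid ENV_SLACK_TOKEN):
#
#   core.post_message_to_slack("#" + core.m_slack_node["ChannelName"],
#                              "deploy finished", core.m_slack_bot)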
def handle_send_slack_internal_ex(self, ex, debug=False):
try:
if self.m_slack_enabled:
import traceback, sys
exc_type, exc_obj, exc_tb = sys.exc_info()
header_str = datetime.datetime.now().strftime("%m-%d-%Y %H:%M:%S") + " @" + str(self.m_slack_node["NotifyUser"]) + " `" + str(self.m_slack_node["EnvName"]) + "` *" + str(ex) + "*\n"
ex_error = self.get_exception_error_message(ex, exc_type, exc_obj, exc_tb, True, debug)
send_slack_msg = header_str + str(ex_error) + "\n"
self.post_message_to_slack("#" + str(self.m_slack_node["ChannelName"]), send_slack_msg, self.m_slack_bot, debug)
except Exception,k:
if debug:
self.lg("ERROR: Failed to Send Slack Error with Ex(" + str(k) + ")", 0)
return None
# end of handle_send_slack_internal_ex
#####################################################################################################
#
# Pandas and Matplotlib Helper Methods
#
#####################################################################################################
def pd_convert_df_dates_to_list(self, df_list, date_format_str="%Y-%m-%d"):
date_series = df_list["Date"].apply(lambda x: x.strftime(date_format_str))
return date_series.tolist()
# end of pd_convert_df_dates_to_list
def pd_json_to_df(self, data_json, sorted_by_key="Date", in_ascending=True):
import pandas as pd
new_df = pd.read_json(data_json).sort_values(by=sorted_by_key, ascending=in_ascending)
return new_df
# end of pd_json_to_df
def pd_empty_df(self, debug=False):
import pandas as pd
return pd.DataFrame({'Timestamp' : [ 0 ]})
# end of pd_empty_df
def pd_get_color(self, color_key="blue"):
hex_plot_color = self.m_colors["blue"]
test_key = str(color_key).lower().strip().lstrip()
if test_key in self.m_colors:
hex_plot_color = str(self.m_colors[test_key])
return hex_plot_color
# end of pd_get_color
def pd_add_footnote(self, fig):
footnote_text = str(os.getenv("ENV_PLOT_FOOTNOTE", "Your Footnote"))
fig.text(0.99, 0.01, footnote_text, ha="right", va="bottom", fontsize=8, color="#888888")
# end of pd_add_footnote
def pd_print(self, df, mask=None):
if mask:
print df[mask]
else:
print df
# end of pd_print
def pd_change_xtick_freq(self, total_ticks, ax, debug=False):
# Change the xaxis frequency
if total_ticks > 30:
n = 10
ticks = ax.xaxis.get_ticklocs()
ticklabels = [l.get_text() for l in ax.xaxis.get_ticklabels()]
ax.xaxis.set_ticks(ticks[::n])
ax.xaxis.set_ticklabels(ticklabels[::n])
# end of changing the frequency
# end of pd_change_xtick_freq
def pd_set_date_ticks_by_number(self, total_ticks, ax, debug=False):
ticks = ax.xaxis.get_ticklocs()
ticklabels = [l.get_text() for l in ax.xaxis.get_ticklabels()]
ax.xaxis.set_ticks(ticks[::total_ticks])
ax.xaxis.set_ticklabels(ticklabels[::total_ticks])
# end of pd_set_date_ticks_by_number
def pd_show_with_entities(self, x_label, y_label, title_msg, ax, fig, plt, legend_list=[], show_plot=True, debug=False):
plt.xlabel(x_label)
plt.ylabel(y_label)
ax.set_title(title_msg)
if len(legend_list) == 0:
ax.legend(loc="best", prop={"size":"medium"})
else:
ax.legend(legend_list, loc="best", prop={"size": "medium"})
self.pd_add_footnote(fig)
plt.tight_layout()
if show_plot:
plt.show()
else:
plt.plot()
# end of pd_show_with_entities
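# Typical plotting flow with these helpers (sketch; assumes matplotlib is
# imported as plt and df is a pandas DataFrame indexed by date labels):
#
#   fig, ax = plt.subplots()
#   df.plot(ax=ax)
#   core.pd_set_date_ticks_by_number(10, ax)
#   core.pd_show_with_entities("Date", "Value", "My Plot", ax, fig, plt)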
def get_colors(self):
return self.m_colors
# end of get_colors
def pd_get_color_from_id(self, color_int):
if len(self.m_color_rotation) == 0:
add_in_order = [
"blue",
"green",
"red",
"gold",
"brown",
"lightgreen",
"black",
"darkgreen",
"copper",
"maroon",
"orange"
]
self.m_color_rotation = []
for ck in add_in_order:
self.m_color_rotation.append(self.m_colors[ck])
# end of list
# if color rotation
hex_color = self.m_color_rotation[0]
if color_int == -1:
if self.m_last_color_idx == -1:
self.m_last_color_idx = 0
else:
self.m_last_color_idx += 1
if self.m_last_color_idx >= len(self.m_color_rotation):
self.m_last_color_idx = 0
return self.m_color_rotation[self.m_last_color_idx]
else:
target_idx = 0
max_colors = len(self.m_color_rotation)
if color_int > 0 and color_int > max_colors:
target_idx = int(color_int % max_colors)
else:
target_idx = color_int
for idx,ck in enumerate(self.m_color_rotation):
if idx == int(target_idx):
self.m_last_color_idx = idx
return self.m_color_rotation[idx]
# end of for loop
self.m_last_color_idx = 0
return hex_color
# end of pd_get_color_from_id
def plt_feature_importance(self, record, plot_max_features, max_valid_features, plot_title):
import pandas as pd
import matplotlib.pyplot as plt
width = 15.0
height = 10.0
fig, ax = plt.subplots(figsize=(width, height))
plt.title(plot_title)
max_bars = plot_max_features
plt_display_names = []
max_range = max_valid_features
if max_range > max_bars:
max_range = max_bars
plot_df = pd.DataFrame(record["Rankings"][0:max_bars])
bar_colors = []
for idx in range(max_range):
hex_color = self.pd_get_color_from_id(idx)
bar_colors.append(hex_color)
# end of trained features
total_rows = float(len(plot_df.index))
bar_width = 0.6
bar_offset = float(bar_width)/2.0
largest_imp = 0.0
for ridx, pd_row in plot_df.iterrows():
plt.bar(ridx-bar_offset, float(pd_row["Importance"]), bar_width, color=bar_colors[ridx])
plt_display_names.append(pd_row["DisplayName"].replace(" ", "\n"))
if float(pd_row["Importance"]) > largest_imp:
largest_imp = float(pd_row["Importance"])
# end of all rows
plt.xticks(range(max_range), plt_display_names)
plt.xlim([-1, max_range])
if largest_imp < 20:
plt.ylim([0, 20.0])
self.pd_add_footnote(fig)
self.pd_show_with_entities("Ranked Features", "Importance", plot_title, ax, fig, plt, plt_display_names, True, False)
return None
# end of plt_feature_importance
#####################################################################################################
#
# S3 Helper Methods
#
#####################################################################################################
def s3_create_id(self, dataset_name, date_str, debug=False):
id_name = "DS_" + str(dataset_name) + "_" + str(date_str) + "_" + str(self.build_unique_key())
return id_name
# end of s3_create_id
def s3_create_new_bucket(self, bucketname, bucket_location="sa-east-1", debug=False):
record = {
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
cur_keys = self.aws_get_keys(debug)
import boto
import boto.s3
conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
bucket = conn_s3.create_bucket(bucketname, location=bucket_location)
if bucket:
self.lg("Created Bucket(" + str(bucketname) + ")", 6)
results = self.build_def_hash("SUCCESS", "", {})
else:
results = self.build_def_hash("Display Error", "Failed to Create Bucket(" + str(bucketname) + ")", {})
except Exception,k:
status = "FAILED"
err_msg = "Unable to Create new S3 Bucket(" + str(bucketname) + ") with Ex(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", err_msg, {})
# end of try/ex
return results
# end of s3_create_new_bucket
def s3_upload_json_dataset(self, dataset_name, date_str, data_json, rds, dbs, bucketname="dataset-new", filename="", compress_json=True, delete_when_done=True, debug=False):
record = {
"File" : "",
"Size" : "0"
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
if filename == "":
record["File"] = self.ml_create_s3_id(dataset_name, date_str, debug) + ".json"
else:
record["File"] = filename
cur_keys = self.aws_get_keys(debug)
cur_filename = "/tmp/" + record["File"]
file_path = cur_filename
if compress_json:
temp_output = "/tmp/tmpfile_" + datetime.datetime.now().strftime("%y-%m-%d-%H-%M-%S") + "_"
temp_file = self.log_msg_to_unique_file(data_json, temp_output)
os.system("mv " + str(temp_file) + " " + str(cur_filename))
else:
file_path = self.write_json_to_file(cur_filename, data_json)
import boto, math
import boto.s3
conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
bucket = conn_s3.get_bucket(bucketname, validate=False)
cur_filename = os.path.basename(file_path)
k = bucket.new_key(cur_filename)
mp = bucket.initiate_multipart_upload(cur_filename)
source_size = os.stat(file_path).st_size
bytes_per_chunk = 5000*1024*1024
chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
for i in range(chunks_count):
offset = i * bytes_per_chunk
remaining_bytes = source_size - offset
bytes = min([bytes_per_chunk, remaining_bytes])
part_num = i + 1
self.lg("S3 Uploading(" + str(cur_filename) + ") part " + str(part_num) + " of " + str(chunks_count), 6)
with open(file_path, 'r') as fp:
fp.seek(offset)
mp.upload_part_from_file(fp=fp, part_num=part_num, size=bytes)
# end of writing multipart files
record["Size"] = str(source_size)
if len(mp.get_all_parts()) == chunks_count:
mp.complete_upload()
results = self.build_def_hash("SUCCESS", "", record)
else:
mp.cancel_upload()
results = self.build_def_hash("Display Error", "Failed to Upload", record)
if os.path.exists(cur_filename):
os.system("rm -f " + str(cur_filename))
if delete_when_done:
if os.path.exists(file_path):
os.system("rm -f " + str(file_path))
except Exception,k:
status = "FAILED"
err_msg = "Unable to Upload JSON Data to S3 with Ex(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", err_msg, {})
# end of try/ex
return results
# end of s3_upload_json_dataset
def s3_upload_json_record(self, req, debug=False):
record = {
"Size" : "0"
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
cur_filename = "/tmp/uploadfile_" + datetime.datetime.now().strftime("%y-%m-%d-%H-%M-%S") + "_"
s3_bucket = req["S3Loc"].split(":")[0]
s3_key = req["S3Loc"].split(":")[1]
cur_keys = self.aws_get_keys(debug)
file_path = cur_filename
if "Compress" in req:
if bool(req["Compress"]):
temp_output = "/tmp/tmpfile_" + datetime.datetime.now().strftime("%y-%m-%d-%H-%M-%S") + "_"
temp_file = self.log_msg_to_unique_file(req["JSON"], temp_output)
os.system("mv " + str(temp_file) + " " + str(cur_filename))
else:
file_path = self.write_json_to_file(cur_filename, req["JSON"])
import boto, math
import boto.s3
conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
bucket = conn_s3.get_bucket(s3_bucket, validate=False)
cur_filename = os.path.basename(file_path)
k = bucket.new_key(s3_key)
mp = bucket.initiate_multipart_upload(s3_key.split("/")[-1])
source_size = os.stat(file_path).st_size
bytes_per_chunk = 5000*1024*1024
chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
for i in range(chunks_count):
offset = i * bytes_per_chunk
remaining_bytes = source_size - offset
bytes = min([bytes_per_chunk, remaining_bytes])
part_num = i + 1
self.lg("S3 Uploading(" + str(cur_filename) + ") part " + str(part_num) + " of " + str(chunks_count), 6)
with open(file_path, 'r') as fp:
fp.seek(offset)
mp.upload_part_from_file(fp=fp, part_num=part_num, size=bytes)
# end of writing multipart files
record["Size"] = str(source_size)
if len(mp.get_all_parts()) == chunks_count:
mp.complete_upload()
results = self.build_def_hash("SUCCESS", "", record)
else:
mp.cancel_upload()
results = self.build_def_hash("Display Error", "Failed to Upload", record)
if os.path.exists(cur_filename):
os.system("rm -f " + str(cur_filename))
except Exception,k:
status = "FAILED"
err_msg = "Unable to Upload JSON Record to S3 with Ex(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", err_msg, {})
# end of try/ex
return results
# end of s3_upload_json_record
def s3_upload_csv_dataset(self, cur_csv_file, rds, dbs, bucketname="dataset-csv-new", filename="", delete_when_done=True, debug=False):
record = {
"File" : "",
"Size" : "0"
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
cur_keys = self.aws_get_keys(debug)
import boto, math
import boto.s3
conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
bucket = conn_s3.get_bucket(bucketname, validate=False)
cur_filename = os.path.basename(cur_csv_file)
k = bucket.new_key(filename)
mp = bucket.initiate_multipart_upload(filename)
source_size = os.stat(cur_csv_file).st_size
bytes_per_chunk = 5000*1024*1024
chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
for i in range(chunks_count):
offset = i * bytes_per_chunk
remaining_bytes = source_size - offset
bytes = min([bytes_per_chunk, remaining_bytes])
part_num = i + 1
self.lg("S3 Uploading(" + str(cur_filename) + ") part " + str(part_num) + " of " + str(chunks_count), 6)
with open(cur_csv_file, 'r') as fp:
fp.seek(offset)
mp.upload_part_from_file(fp=fp, part_num=part_num, size=bytes)
# end of writing multipart files
record["Size"] = str(source_size)
if len(mp.get_all_parts()) == chunks_count:
mp.complete_upload()
results = self.build_def_hash("SUCCESS", "", record)
else:
mp.cancel_upload()
results = self.build_def_hash("Display Error", "Failed to Upload", record)
if delete_when_done:
if os.path.exists(cur_filename):
os.system("rm -f " + str(cur_filename))
if os.path.exists(cur_csv_file):
os.system("rm -f " + str(cur_csv_file))
except Exception,k:
status = "FAILED"
err_msg = "Unable to Upload CSV Data to S3 with Ex(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", err_msg, {})
# end of try/ex
return results
# end of s3_upload_csv_dataset
def s3_upload_file(self, req, debug=False):
record = {
"File" : "",
"Size" : "0"
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
cur_keys = self.aws_get_keys(debug)
import boto, math
import boto.s3
"""
req = {
"SaveToFile" : <path to file>,
"S3Loc" : <bucket>:<key>,
"DeleteAfter" : boolean
}
"""
s3_split = str(req["S3Loc"]).split(":")
s3_bucket = str(s3_split[0])
s3_key = str(s3_split[1])
savepath = str(req["SaveToFile"])
delete_after = False
if "DeleteAfter" in req:
delete_after= bool(req["DeleteAfter"])
conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
bucket = conn_s3.get_bucket(s3_bucket, validate=False)
filename = os.path.basename(savepath)
k = bucket.new_key(s3_key)
mp = bucket.initiate_multipart_upload(filename)
source_size = os.stat(savepath).st_size
bytes_per_chunk = 5000*1024*1024
chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
for i in range(chunks_count):
offset = i * bytes_per_chunk
remaining_bytes = source_size - offset
bytes = min([bytes_per_chunk, remaining_bytes])
part_num = i + 1
self.lg("S3 Uploading(" + str(filename) + ") part " + str(part_num) + " of " + str(chunks_count), 6)
with open(savepath, 'r') as fp:
fp.seek(offset)
mp.upload_part_from_file(fp=fp, part_num=part_num, size=bytes)
# end of writing multipart files
record["Size"] = str(source_size)
if len(mp.get_all_parts()) == chunks_count:
mp.complete_upload()
results = self.build_def_hash("SUCCESS", "", record)
else:
mp.cancel_upload()
results = self.build_def_hash("Display Error", "Failed to Upload", record)
if delete_after:
if os.path.exists(savepath):
os.system("rm -f " + str(savepath))
except Exception,k:
status = "FAILED"
err_msg = "Unable to Upload File(" + str(json.dumps(req)) + ") to S3 with Ex(" + str(k) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", err_msg, {})
# end of try/ex
return results
# end of s3_upload_file
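# Usage sketch for the req dict documented above (bucket, key and path are
# illustrative):
#
#   upload_req = {
#       "SaveToFile"  : "/tmp/report.json",
#       "S3Loc"       : "my-bucket:reports/report.json",
#       "DeleteAfter" : True
#   }
#   upload_results = core.s3_upload_file(upload_req)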
def s3_calculate_bucket_size(self, bucket_name, debug=False):
record = {
"Size" : 0,
"SizeMB" : 0.0,
"SizeGB" : 0.0,
"Files" : 0
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
import boto, math
import boto.s3
cur_keys = self.aws_get_keys(debug)
conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
bucket = conn_s3.get_bucket(bucket_name, validate=False)
total_bytes = 0
for key in bucket:
record["Size"] += int(key.size)
record["Files"] += 1
# end for all keys
record["SizeMB"] = float(self.to_float_str(float(float(record["Size"]) / 1024.0 / 1024.0)))
record["SizeGB"] = float(self.to_float_str(float(float(record["SizeMB"]) / 1024.0)))
results = self.build_def_hash("SUCCESS", "", record)
except Exception,w:
self.lg("Failed to Process S3 Bucket(" + str(bucket_name) + ") Size Ex(" + str(w) + ")", 0)
results = self.build_def_hash("Display Error", "Not Run", record)
return results
# end of s3_calculate_bucket_size
def s3_download_file(self, s3_bucket, s3_key, rds, dbs, debug=False):
record = {
"Contents" : ""
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
if s3_bucket and s3_key:
record["Contents"] = s3_key.get_contents_as_string()
results = self.build_def_hash("SUCCESS", "", record)
else:
err_msg = "Missing valid S3 Bucket and Key"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", err_msg, record)
# end of if/else
except Exception,w:
err_msg = "Failed to Download File from S3 with Ex(" + str(w) + ")"
self.lg("ERROR: " + str(err_msg), 0)
results = self.build_def_hash("Display Error", err_msg, record)
return results
# end of s3_download_file
def s3_download_and_store_file(self, s3_loc, local_file, rds, dbs, debug):
record = {
"Contents" : "",
"File" : ""
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
import boto, math
import boto.s3
s3_split = s3_loc.split(":")
s3_bucket_name = s3_split[0]
s3_key_name = s3_split[1]
cur_keys = self.aws_get_keys(debug)
conn_s3 = boto.connect_s3(cur_keys["Key"], cur_keys["Secret"])
s3_bucket = conn_s3.get_bucket(s3_bucket_name, validate=False)
s3_key = s3_bucket.get_key(s3_key_name, validate=False)
self.lg("Downloading S3Loc(" + str(s3_bucket_name) + ":" + str(s3_key_name) + ")", 6)
key_results = self.s3_download_file(s3_bucket, s3_key, rds, dbs, debug)
self.lg("Done Downloading S3Loc(" + str(s3_bucket_name) + ":" + str(s3_key_name) + ") Writing to File(" + str(local_file) + ") Bytes(" + str(len(str(key_results["Record"]["Contents"]))) + ")", 6)
if len(key_results["Record"]["Contents"]) > 0:
with open(local_file, "w") as output_file:
output_file.write(str(key_results["Record"]["Contents"]))
# end of writing the contents
else:
self.lg(" - No data in S3 file", 6)
if os.path.exists(local_file) == False:
err_msg = "Failed to Download S3Loc(" + str(s3_bucket_name) + ":" + str(s3_key_name) + ")"
return self.handle_display_error(err_msg, record, True)
else:
self.lg("Created Local File(" + str(local_file) + ")", 6)
record["Contents"] = key_results["Record"]["Contents"]
record["File"] = local_file
results = self.build_def_hash("SUCCESS", "", record)
# end of if created local file from s3 location
except Exception,k:
err_msg = "Failed to download S3Loc(" + str(s3_loc) + ") with Ex(" + str(k) + ")"
return self.handle_display_error(err_msg, record, True)
# end of try/ex
return results
# end of s3_download_and_store_file
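# Usage sketch (illustrative, not part of the original source): pull a "bucket:key"
# object down to a local file. The S3 location and destination path are placeholders,
# and the rds/dbs arguments are the core's usual connection dictionaries.
#
#   dl_results = core.s3_download_and_store_file("example-bucket:example-dataset.cache",
#                                                "/tmp/example-dataset.cache",
#                                                core.m_rds, core.m_dbs, False)
#   if dl_results["Status"] == "SUCCESS":
#       lg("Stored File(" + str(dl_results["Record"]["File"]) + ")", 6)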
#####################################################################################################
#
# Machine Learning Helper Methods
#
#####################################################################################################
def ml_get_adjusted_current_time_for_deployment(self, debug=False):
# non-local deployments can apply an hour offset here (currently zero hours)
cur_time = datetime.datetime.now() - datetime.timedelta(hours=0)
if self.m_env.lower().lstrip().strip() == "local":
cur_time = datetime.datetime.now()
return cur_time
# end of ml_get_adjusted_current_time_for_deployment
def ml_convert_models_to_objects(self, req, rds, dbs, debug=False):
record = {
"Errors" : [],
"Models" : []
}
results = self.build_def_hash("Display Error", "Failed to Get Convert Models to Objects", record )
try:
models = []
errors = []
self.lg("Converting Models(" + str(len(req["Models"])) + ") to Objects", 6)
for idx,node in enumerate(req["Models"]):
model_class_name = node["Type"]
model_version = node["Version"]
model_target = node["Target"]
model_feature_names = node["FeatureNames"]
model_obj = node["Model"]
model_id = node["ID"]
self.lg("Model(" + str(idx) + ") Type(" + str(model_class_name) + ") Target(" + str(model_target) + ") Version(" + str(model_version) + ") ID(" + str(model_id) + ")", 6)
new_model_node = {
"Type" : model_class_name,
"Version" : model_version,
"Target" : model_target,
"FeatureNames" : model_feature_names,
"Model" : model_obj,
"ID" : model_id
}
models.append(new_model_node)
# end of for all models
record["Errors"] = errors
record["Models"] = models
self.lg("Converted(" + str(len(record["Models"])) + ") Errors(" + str(len(record["Errors"])) + ")", 6)
results = self.build_def_hash("SUCCESS", "", record )
except Exception,k:
err_msg = "Failed to Convert Models into Objects with Ex(" + str(k) + ")"
return self.handle_display_error(err_msg, record, True)
# end of try/ex
return results
# end of ml_convert_models_to_objects
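# Usage sketch (illustrative, not part of the original source): normalize a list of
# model nodes into the Type/Version/Target/FeatureNames/Model/ID layout used by the
# cache helpers below. The request dict is a hypothetical example.
#
#   convert_req = { "Models" : analysis_rec["Record"]["Models"] }
#   convert_results = core.ml_convert_models_to_objects(convert_req, core.m_rds, core.m_dbs, False)
#   models = convert_results["Record"]["Models"]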
def ml_load_model_file_into_cache(self, req, rds, dbs, debug=False):
record = {
}
results = self.build_def_hash("Display Error", "Failed to Load Model file into Cache", record )
try:
import pickle, cPickle, zlib
ra_name = "CACHE"
ra_key = ""
dataset_name = ""
dataset_name = self.to_upper(str(req["DSName"]))
s3_bucket = str(req["S3Loc"].split(":")[0])
s3_key = str(req["S3Loc"].split(":")[1])
model_file = str(req["ModelFile"])
tracking_type = "UseTargetColAndUnits"
if "TrackingType" in req:
tracking_type = str(req["TrackingType"])
lg("Loading DSName(" + str(dataset_name) + ") ModelFile(" + str(model_file) + ")", 6)
if os.path.exists(model_file) == False:
err_msg = "Failed to Find ModelFile(" + str(model_file) + ")"
return self.handle_display_error(err_msg, record, True)
model_stream = cPickle.loads(zlib.decompress(open(model_file, "rb").read()))
if len(model_stream) == 0:
err_msg = "Decompressed ModelFile(" + str(model_file) + ") is Empty"
return self.handle_display_error(err_msg, record, True)
self.lg("Model File Decompressed Records(" + str(len(model_stream)) + ")", 6)
if debug:
for k in model_stream:
print k
# serialization debugging
if len(model_stream) > 0:
self.lg("Creating Analysis Dataset", 6)
analysis_dataset = {
"DSName" : str(dataset_name),
"CreationDate" : str(datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")),
"PredictionsDF" : None,
"AccuracyResults" : {},
"Models" : [],
"Manifest" : {},
"MLAlgo" : {},
"Tracking" : {},
"TrackingType" : str(tracking_type),
"Version" : 1
}
if "CreationDate" in model_stream:
analysis_dataset["CreationDate"] = model_stream["CreationDate"]
if "Models" in model_stream:
analysis_dataset["Models"] = model_stream["Models"]
if "CacheVersion" in model_stream:
analysis_dataset["Version"] = model_stream["CacheVersion"]
if "Manifest" in model_stream:
analysis_dataset["Manifest"] = model_stream["Manifest"]
if "PredictionsDF" in model_stream:
analysis_dataset["PredictionsDF"] = model_stream["PredictionsDF"]
if "Accuracy" in model_stream:
analysis_dataset["AccuracyResults"] = model_stream["Accuracy"]
if "MLAlgo" in model_stream:
analysis_dataset["MLAlgo"] = model_stream["MLAlgo"]
if "Tracking" in model_stream:
analysis_dataset["Tracking"] = model_stream["Tracking"]
tracking_data = self.ml_build_tracking_id(analysis_dataset, rds, dbs, debug)
if "Tracking" in model_stream:
tracking_data.update(model_stream["Tracking"])
tracking_id = str(tracking_data["TrackingID"])
ra_name = tracking_data["RLoc"].split(":")[0]
ra_key = tracking_data["RLoc"].split(":")[1]
analysis_dataset["Tracking"] = tracking_data
cache_req = {
"Name" : ra_name,
"Key" : ra_key,
"TrackingID" : tracking_id,
"Analysis" : analysis_dataset,
"ValidateDates" : False
}
cache_results = self.ml_cache_analysis_and_models(cache_req, rds, dbs, debug)
# end of valid stream
results = self.build_def_hash("SUCCESS", "", record )
except Exception,k:
err_msg = "Failed to Load Model File into Cache with Ex(" + str(k) + ")"
return self.handle_display_error(err_msg, record, True)
# end of try/ex
return results
# end of ml_load_model_file_into_cache
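# Usage sketch (illustrative, not part of the original source): seed the redis model
# cache from a previously exported, zlib-compressed cPickle model file. The dataset
# name, S3 location, and path are hypothetical placeholders.
#
#   load_req = {
#                   "DSName" : "example-ds",
#                   "S3Loc" : "example-bucket:example-ds-models.cache",
#                   "ModelFile" : "/opt/work/data/dst/example-ds-models.cache",
#                   "TrackingType" : "UseTargetColAndUnits"
#               }
#   load_results = core.ml_load_model_file_into_cache(load_req, core.m_rds, core.m_dbs, False)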
def ml_build_tracking_id(self, req, rds, dbs, debug=False):
tracking_node = {
"TrackingID" : "",
"Start" : datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"End" : "",
"TrackingVersion" : 1,
"TrackingType" : "",
"TrackingName" : "",
"RLoc" : "",
"Data" : {}
}
tracking_version = 1
tracking_type = "UseTargetColAndUnits"
tracking_id = ""
tracking_name = tracking_id
ra_name = "CACHE"
ra_key = ""
if "TrackingType" in req:
tracking_type = str(req["TrackingType"])
if str(tracking_type) == "UseTargetColAndUnits":
if "Tracking" in req:
tracking_id = "_MD_" + str(req["Tracking"]["TrackingName"])
tracking_name = "_MD_" + str(req["Tracking"]["TrackingName"])
ra_key = "_MODELS_" + str(req["Tracking"]["TrackingName"]) + "_LATEST"
else:
if "TrackingName" in req:
tracking_id = "_MD_" + str(req["TrackingName"])
tracking_name = "_MD_" + str(req["TrackingName"])
ra_key = "_MODELS_" + str(req["TrackingName"]) + "_LATEST"
if "MLAlgo" in req: # tracking nodes should support passing the parent ml json api through
data_node = {}
if len(req["MLAlgo"]) > 0:
for key in req:
data_node[key] = req[key]
# allow for pruning here
tracking_node["Data"] = data_node
# end of passing the org algo through
if tracking_name == "":
lg("", 6)
lg("Debugging help:", 6)
for k in req:
lg(" Key(" + str(k) + ")", 0)
lg("", 6)
raise Exception("Failed ml_build_tracking_id - Please confirm the TrackingName is set correctly")
if tracking_id == "":
lg("", 6)
lg("Debugging help:", 6)
for k in req:
lg(" Key(" + str(k) + ")", 0)
lg("", 6)
raise Exception("Failed ml_build_tracking_id - Please confirm the TrackingID is set correctly")
tracking_node["TrackingID"] = tracking_id
tracking_node["TrackingVersion"] = tracking_version
tracking_node["TrackingType"] = tracking_type
tracking_node["TrackingName"] = tracking_name
tracking_node["RLoc"] = str(ra_name) + ":" + str(ra_key)
return tracking_node
# end of ml_build_tracking_id
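# Usage sketch (illustrative, not part of the original source): build the tracking
# node that decides the redis RLoc ("CACHE:_MODELS_<name>_LATEST") for a cached
# analysis. The tracking name is a placeholder.
#
#   tracking_req = {
#                   "TrackingType" : "UseTargetColAndUnits",
#                   "TrackingName" : "example-ds"
#               }
#   tracking_node = core.ml_build_tracking_id(tracking_req, core.m_rds, core.m_dbs, False)
#   lg("TrackingID(" + str(tracking_node["TrackingID"]) + ") RLoc(" + str(tracking_node["RLoc"]) + ")", 6)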
def ml_upload_cached_dataset_to_s3(self, req, rds, dbs, debug=False):
record = {
"Errors" : [],
"Models" : []
}
results = self.build_def_hash("Display Error", "Failed to Upload Cached Dataset to S3", record )
try:
import pickle, cPickle, zlib
compress_data = True
dataset_name = self.to_upper(str(req["DSName"]))
s3_bucket = str(req["S3Loc"].split(":")[0])
s3_key = str(req["S3Loc"].split(":")[1])
delete_after = False
if "DeleteAfter" in req:
delete_after = bool(req["DeleteAfter"])
image_base_dir = str(os.getenv("ENV_DATA_SRC_DIR", "/opt/work/data/dst"))
if "SaveDir" in req:
image_base_dir = str(req["SaveDir"])
if not os.path.exists(image_base_dir):
err_msg = "Unable to export because the SaveDir(" + str(image_base_dir) + ") does not exist. Please create it and retry."
return self.handle_display_error(err_msg, record, True)
tmp_file = image_base_dir + "/" + s3_key
ra_name = "CACHE"
ra_key = ""
if "RLoc" in req:
ra_name = str(req["RLoc"]).split(":")[0]
ra_key = str(req["RLoc"]).split(":")[1]
model_req = {
"DSName" : dataset_name,
"TrackingID" : "",
"RAName" : ra_name,
"ReturnPickle" : True
}
self.lg("Getting Latest Analysis and Models for DSName(" + str(dataset_name) + ")", 6)
analysis_rec = self.ml_get_models_for_request(model_req, rds, dbs, debug)
if len(analysis_rec["Record"]) > 0:
if "CreationDate" in analysis_rec["Record"]:
lg("Found DSName(" + str(dataset_name) + ") Analysis Created on Date(" + str(analysis_rec["Record"]["CreationDate"]) + ") Creating File(" + str(tmp_file) + ")", 6)
# write and re-read in binary mode since the payload is a zlib-compressed cPickle
with open(tmp_file, "wb") as output_file:
output_file.write(zlib.compress(cPickle.dumps(analysis_rec["Record"])))
lg("Validating Serialization", 6)
validate_bytes = open(tmp_file, "rb").read()
decompressed_obj = cPickle.loads(zlib.decompress(validate_bytes))
lg("Checking Model Counts", 6)
if len(decompressed_obj["Models"]) != len(analysis_rec["Record"]["Models"]):
err_msg = "Analysis Dataset found Incorrect Number of Models(" + str(len(decompressed_obj["Models"])) + ") != (" + str(len(analysis_rec["Record"]["Models"])) + ") Please confirm the cache is up to date"
return self.handle_display_error(err_msg, record, True)
else:
lg("Decompression Validation Passed - Models(" + str(len(decompressed_obj["Models"])) + ") == (" + str(len(analysis_rec["Record"]["Models"])) + ")", 5)
lg("Done Creating File(" + str(tmp_file) + ")", 6)
s3_loc = str(s3_bucket) + ":" + str(s3_key)
lg("Uploading to DSName(" + str(dataset_name) + ") Analysis to S3(" + str(s3_loc) + ")", 6)
req = {
"SaveToFile" : tmp_file,
"S3Loc" : s3_loc,
"DeleteAfter" : delete_after
}
upload_results = self.s3_upload_file(req, debug)
if upload_results["Status"] != "SUCCESS":
lg("ERROR: Failed to upload S3 DSName(" + str(dataset_name) + ") new S3 File(" + str(s3_key) + ") Bucket(" + str(s3_bucket) + ") Compressed(" + str(compress_data) + ") Error(" + str(upload_results["Error"]) + ")", 0)
else:
lg("Uploaded DSName(" + str(dataset_name) + ") S3(" + str(s3_bucket) + ":" + str(s3_key) + ")", 5)
# end of uploading Dataset Name snapshot
lg("Done Uploading to S3(" + str(s3_bucket) + ":" + str(s3_key) + ")", 6)
else:
err_msg = "Analysis Dataset is missing CreationDate"
return self.handle_display_error(err_msg, record, True)
else:
err_msg = "Failed to find Model Cache for DSName(" + str(dataset_name) + ")"
return self.handle_display_error(err_msg, record, True)
results = self.build_def_hash("SUCCESS", "", record )
except Exception,k:
err_msg = "Failed to Upload Cached Analysis Dataset to S3 with Ex(" + str(k) + ")"
return self.handle_display_error(err_msg, record, True)
# end of try/ex
return results
# end of ml_upload_cached_dataset_to_s3
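# Usage sketch (illustrative, not part of the original source): snapshot the cached
# analysis + models for a dataset to S3 as a zlib-compressed cPickle file. The names,
# bucket, and directory below are placeholders.
#
#   upload_req = {
#                   "DSName" : "example-ds",
#                   "S3Loc" : "example-bucket:example-ds-models.cache",
#                   "SaveDir" : "/opt/work/data/dst",
#                   "DeleteAfter" : False
#               }
#   upload_results = core.ml_upload_cached_dataset_to_s3(upload_req, core.m_rds, core.m_dbs, False)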
def ml_get_single_model_from_cache(self, cached_model_node, rds, dbs, debug=False):
import zlib
import pickle
import cPickle
model_node = {
"Type" : "",
"Version" : "",
"Target" : "",
"FeatureNames" : "",
"Model" : "",
"ID" : -1
}
lg("Getting Single Model", 6)
model_ra_name = cached_model_node["RAName"]
model_cache_key = cached_model_node["RAKey"]
model_cache_suffix_key = ""
tracking_data = {}
algo_data = {}
tracking_id = -1
tracking_type = ""
tracking_version = 1
if "AlgoTracking" in cached_model_node:
tracking_data = cached_model_node["AlgoTracking"]
tracking_id = tracking_data["TrackingID"]
tracking_type = tracking_data["TrackingType"]
tracking_version = tracking_data["TrackingVersion"]
tracking_rloc = str(tracking_data["RLoc"]).split(":")
algo_data = tracking_data["Data"]
if "ModelRAKeySuffix" in cached_model_node:
model_cache_suffix_key = str(cached_model_node["ModelRAKeySuffix"])
if tracking_type == "UseTargetColAndUnits":
model_ra_name = str(tracking_rloc[0])
model_cache_key = str(tracking_id) + "_" + str(model_cache_suffix_key)
lg("Getting Model RLoc(" + str(model_ra_name) + ":" + str(model_cache_key) + ")", 6)
compressed_model_rec = self.get_cache_from_redis(rds[model_ra_name], model_cache_key, False, False)
if debug:
lg("Checking compressed records(" + str(len(compressed_model_rec["Record"])) + ")", 6)
if len(compressed_model_rec["Record"]) > 0:
model_class_name = compressed_model_rec["Record"]["Type"]
model_version = compressed_model_rec["Record"]["Version"]
model_id = compressed_model_rec["Record"]["ID"]
model_target = compressed_model_rec["Record"]["Target"]
model_feature_names = compressed_model_rec["Record"]["FeatureNames"]
model_obj = cPickle.loads(zlib.decompress(compressed_model_rec["Record"]["CompressedModel"]))
model_node = {
"Type" : model_class_name,
"Version" : model_version,
"Target" : model_target,
"FeatureNames" : model_feature_names,
"Model" : model_obj,
"MLAlgo" : algo_data,
"Tracking" : tracking_data,
"ModelRAKeySuffix" : model_cache_suffix_key,
"ID" : model_id
}
lg("Found Model(" + str(model_node["ID"]) + ") Type(" + str(model_node["Type"]) + ") Target(" + str(model_target) + ") FeatureNames(" + str(len(model_feature_names)) + ")", 6)
# end of found
return model_node
# end of ml_get_single_model_from_cache
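# Usage sketch (illustrative, not part of the original source): rebuild one model
# object from its compressed cache entry using a manifest node like the ones stored
# by ml_cache_analysis_and_models.
#
#   cached_model_node = manifest_cache_rec["Record"]["Manifest"]["Models"][0]
#   model_node = core.ml_get_single_model_from_cache(cached_model_node, core.m_rds, core.m_dbs, False)
#   if model_node["ID"] != -1:
#       lg("Loaded Model Type(" + str(model_node["Type"]) + ") Target(" + str(model_node["Target"]) + ")", 6)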
def ml_get_models_for_request(self, req, rds, dbs, debug=False):
record = {
}
results = self.build_def_hash("Display Error", "Failed to Get Models for Request", record )
try:
import zlib
import pickle
import cPickle
dataset_name = req["DSName"]
track_id = req["TrackingID"]
ra_name = req["RAName"]
ret_pickle_obj = False
if "ReturnPickle" in req:
ret_pickle_obj = True
cache_key = "_MODELS_" + str(dataset_name) + "_LATEST"
manifest_cache_rec = self.get_cache_from_redis(rds[ra_name], cache_key, False, False)
if len(manifest_cache_rec["Record"]) > 0:
# Take out of cache with: cPickle.loads(zlib.decompress(cPickle.loads( cache_record )["Pickle"]))
cache_records = {}
cache_records["CompressedSize"] = manifest_cache_rec["Record"]["CompressedSize"]
cache_records["DecompressedSize"] = manifest_cache_rec["Record"]["DecompressedSize"]
cache_records["CreationDate"] = manifest_cache_rec["Record"]["CreationDate"]
cache_records["Compression"] = manifest_cache_rec["Record"]["Compression"]
cache_records["CacheVersion"] = manifest_cache_rec["Record"]["CacheVersion"]
cache_records["Manifest"] = manifest_cache_rec["Record"]["Manifest"]
cache_records["Tracking"] = manifest_cache_rec["Record"]["Tracking"]
cache_records["Analysis"] = {}
self.lg("Decompressing Cached Record Size(" + str(cache_records["CompressedSize"]) + ") DecompressedSize(" + str(cache_records["DecompressedSize"]) + ")", 6)
cache_records["Models"] = []
lg("Decompressing Analysis Dataset", 6)
for manifest_key in manifest_cache_rec["Record"]["Manifest"]:
if str(manifest_key).lower() != "models" \
and str(manifest_key).lower() != "date":
ra_name = manifest_cache_rec["Record"]["Manifest"][manifest_key][0]["RAName"]
ra_cache_key = manifest_cache_rec["Record"]["Manifest"][manifest_key][0]["RAKey"]
lg("Finding ManifestKey(" + str(manifest_key) + ") Records in RLoc(" + str(ra_name) + ":" + str(ra_cache_key) + ")", 6)
current_comp_cache = self.get_cache_from_redis(rds[ra_name], ra_cache_key, False, False)
if len(current_comp_cache["Record"]) > 0:
lg("Decompressing Key(" + str(manifest_key) + ")", 6)
cache_records[manifest_key] = cPickle.loads(zlib.decompress(current_comp_cache["Record"]["Pickle"]))
lg("Done Decompressing Key(" + str(manifest_key) + ")", 6)
else:
lg("ERROR: Failing Finding ManifestKey(" + str(manifest_key) + ") Records in RLoc(" + str(ra_name) + ":" + str(ra_cache_key) + ")", 0)
# end of for all manifest
lg("Done Decompressing Analysis Dataset", 6)
self.lg("Decompressing Models(" + str(len(manifest_cache_rec["Record"]["Manifest"]["Models"])) + ")", 6)
for cached_model_node in manifest_cache_rec["Record"]["Manifest"]["Models"]:
model_node = self.ml_get_single_model_from_cache(cached_model_node, rds, dbs, debug)
if model_node["ID"] != -1:
cache_records["Models"].append(model_node)
else:
lg("ERROR: Failed to find a Model in Cache RLoc(" + str(ra_name) + ":" + str(model_node) + ")", 0)
# end of pruning model objs
if len(cache_records["PredictionsDF"].index) > 0:
lg("Sorting Predictions", 6)
cache_records["PredictionsDF"].sort_values(by="Date", ascending=True, inplace=True)
lg("Done Decompressing Models(" + str(len(cache_records["Models"])) + ")", 6)
results = self.build_def_hash("SUCCESS", "", cache_records )
else:
err_msg = "Failed to Get Models in RLoc(" + str(ra_name) + ":" + str(cache_key) + ") Req(" + str(json.dumps(req)) + ")"
return self.handle_display_error(err_msg, record, True)
except Exception,k:
err_msg = "Failed to Get Models for Req(" + str(json.dumps(req)) + ") with Ex(" + str(k) + ")"
return self.handle_display_error(err_msg, record, True)
# end of try/ex
return results
# end of ml_get_models_for_request
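# Usage sketch (illustrative, not part of the original source): pull the latest
# cached analysis dataset and its decompressed models for a dataset name. The
# dataset name is a placeholder.
#
#   model_req = {
#                   "DSName" : "EXAMPLE-DS",
#                   "TrackingID" : "",
#                   "RAName" : "CACHE",
#                   "ReturnPickle" : True
#               }
#   analysis_rec = core.ml_get_models_for_request(model_req, core.m_rds, core.m_dbs, False)
#   if len(analysis_rec["Record"]) > 0:
#       lg("Models(" + str(len(analysis_rec["Record"]["Models"])) + ")", 6)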
def ml_cache_analysis_and_models(self, req, rds, dbs, debug):
record = {
"Locs" : []
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
import zlib
import pickle
import cPickle
self.lg("Caching Analysis and Models(" + str(len(req["Analysis"]["Models"])) + ")", 6)
tracking_node = req["Analysis"]["Tracking"]
tracking_type = tracking_node["TrackingType"]
tracking_id = tracking_node["TrackingID"]
tracking_version = tracking_node["TrackingVersion"]
tracking_data = tracking_node["Data"]
tracking_rloc = str(tracking_node["RLoc"]).split(":")
ra_name = str(tracking_rloc[0])
cache_key = str(tracking_rloc[1])
run_date_validation = True
if "ValidateDates" in req:
run_date_validation = bool(req["ValidateDates"])
cache_record = {
"Date" : str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")),
"Models" : [],
"Accuracy" : [],
"PredictionsDF" : []
}
accuracy_cache_key = tracking_id + "_" + "Accuracy"
predictionsdf_cache_key = tracking_id + "_" + "PredictionsDF"
accuracy_manifest = [
{
"RAName" : ra_name,
"RAKey" : accuracy_cache_key
}
]
predictions_df_manifest = [
{
"RAName" : ra_name,
"RAKey" : predictionsdf_cache_key
}
]
if "Manifest" in req["Analysis"]:
manifest_node = req["Analysis"]["Manifest"]
accuracy_manifest = manifest_node["Accuracy"]
predictions_df_manifest = manifest_node["PredictionsDF"]
# end of passing the manifests from a file or previous cache back in for a quick data refresh
lg("Compressing Models(" + str(len(req["Analysis"]["Models"])) + ")", 6)
compressed_models = []
num_done = 0
num_left = len(req["Analysis"]["Models"])
total_model_size = 0
compressed_model_size = 0
decompressed_model_size = 0
for m in req["Analysis"]["Models"]:
model_class_name = str(m["Model"].__class__.__name__)
lg("Compressing Model(" + str(num_done) + "/" + str(num_left) + ") Type(" + str(model_class_name) + ")", 6)
model_version = "1" # should use git commit hash for deserializing on correct code
pickled_model = cPickle.dumps(m["Model"])
compress_model = zlib.compress(pickled_model)
compressed_model_size += len(compress_model)
decompressed_model_size += len(pickled_model)
model_cache_rec = {
"Name" : ra_name,
"Key" : m["ID"],
"ID" : m["ID"],
"Target" : m["Target"],
"FeatureNames" : m["FeatureNames"],
"AlgoTracking" : tracking_node,
"ModelRAKeySuffix" : m["Target"],
"Type" : model_class_name,
"Version" : model_version,
"CompressedModel" : compress_model,
"CompressedSize" : len(compress_model),
"DecompressedSize" : len(pickled_model)
}
lg("Done Compressing(" + str(num_done) + "/" + str(num_left) + ") Type(" + str(model_class_name) + ") Size(" + str(model_cache_rec["CompressedSize"]) + ") Decompressed(" + str(model_cache_rec["DecompressedSize"]) + ")", 6)
total_model_size += int(model_cache_rec["CompressedSize"])
model_cache_key = str(req["TrackingID"]) + "_" + str(num_done) # Allow these to stomp to prevent overrunning the memory
if tracking_type == "UseTargetColAndUnits":
model_cache_key = tracking_id + "_" + str(model_cache_rec["ModelRAKeySuffix"])
lg("Caching Model(" + str(num_done) + ") ID(" + str(m["ID"]) + ") RLoc(" + str(ra_name) + ":" + str(model_cache_key) + ")", 6)
self.purge_and_cache_records_in_redis(rds[ra_name],
model_cache_key,
model_cache_rec,
False)
lg("Done Caching Model(" + str(num_done) + ") ID(" + str(m["ID"]) + ") RLoc(" + str(ra_name) + ":" + str(cache_key) + ")", 6)
new_manifest_node = {
"Type" : model_class_name,
"Version" : model_version,
"RAName" : ra_name,
"RAKey" : model_cache_key,
"Target" : m["Target"],
"AlgoTracking" : tracking_node,
"ModelRAKeySuffix" : str(model_cache_rec["ModelRAKeySuffix"]),
"FeatureNames" : m["FeatureNames"],
"ID" : str(m["ID"])
}
cache_record["Models"].append(new_manifest_node)
num_done += 1
# end for all models to compress
self.lg("Preprocessing Accuracy Results", 6)
no_models = {}
for key_name in req["Analysis"]["AccuracyResults"]:
cur_node = {}
for acc_res_key in req["Analysis"]["AccuracyResults"][key_name]:
# Ignore these as they just increase the cache size
if "model" != str(acc_res_key).lower() \
and "sourcedf" != str(acc_res_key).lower() \
and "predictions" != str(acc_res_key).lower() \
and "predictionsdf" != str(acc_res_key).lower() \
and "confusionmatrix" != str(acc_res_key).lower():
self.lg("Adding Acc(" + str(key_name) + "-" + str(acc_res_key) + ")", 6)
cur_node[acc_res_key] = req["Analysis"]["AccuracyResults"][key_name][acc_res_key]
# end of for each sub key name
no_models[key_name] = cur_node
# end of pruning model objs
cache_record["Accuracy"] = accuracy_manifest
cache_record["PredictionsDF"] = predictions_df_manifest
lg("Caching AccuracyResults RLoc(" + str(ra_name) + ":" + str(accuracy_cache_key) + ")", 6)
self.purge_and_cache_records_in_redis(rds[ra_name],
accuracy_cache_key,
{ "Pickle" : zlib.compress(cPickle.dumps(no_models)) },
False)
lg("Done Caching AccuracyResults", 6)
lg("Caching PredictionsDF RLoc(" + str(ra_name) + ":" + str(predictionsdf_cache_key) + ")", 6)
self.purge_and_cache_records_in_redis(rds[ra_name],
predictionsdf_cache_key,
{ "Pickle" : zlib.compress(cPickle.dumps(req["Analysis"]["PredictionsDF"])) },
False)
lg("Done Caching PredictionsDF", 6)
total_compressed_size = 0.0
total_compressed_size += float(compressed_model_size)
self.lg("Building Compressed Cache Record", 6)
creation_date = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
compressed_cache_record = {
"CreationDate" : creation_date,
"Compression" : "zlib",
"CacheVersion" : 1,
"CompressedSize" : compressed_model_size,
"DecompressedSize" : decompressed_model_size,
"Tracking" : tracking_node,
"Manifest" : cache_record
}
self.purge_and_cache_records_in_redis(rds[ra_name],
cache_key,
compressed_cache_record,
False)
self.lg("Validating Cache Works", 6)
# Take out of cache with: cPickle.loads(zlib.decompress(cPickle.loads( cache_record )["Pickle"]))
validate_req = {
"DSName" : str(req["Analysis"]["DSName"]),
"TrackingID" : "",
"RAName" : "CACHE"
}
validate_records = self.ml_get_models_for_request(validate_req, rds, dbs, debug)
if len(validate_records["Record"]) > 0:
if validate_records["Record"]["CreationDate"] == creation_date:
if len(validate_records["Record"]["Models"]) == len(cache_record["Models"]):
self.lg("Validated Compressed Cache Records with Models(" + str(len(validate_records["Record"]["Models"])) + ")", 6)
if debug:
lg("Validated Compressed Cache Records with Models(" + str(len(validate_records["Record"]["Models"])) + ")", 5)
print validate_records["Record"]["Models"][0]["Model"]
else:
err_msg = "Failed to validate Compressed Cache Record Models(" + str(len(validate_records["Record"]["Models"])) + ") != Models(" + str(len(cache_record["Models"])) + ")"
return self.handle_display_error(err_msg, record, True)
else:
if run_date_validation:
err_msg = "Failed to validate Compressed Cache Record Date(" + str(validate_records["Record"]["CreationDate"]) + ") != CreationDate(" + str(creation_date) + ")"
return self.handle_display_error(err_msg, record, True)
# end of validation of compressed cache
self.lg("Done Caching Analysis and Models(" + str(len(cache_record["Models"])) + ") RLoc(" + str(ra_name) + ":" + str(cache_key) + ")", 6)
results = self.build_def_hash("SUCCESS", "", record)
except Exception,k:
err_msg = "Unable to Cache Analysis and Models with Ex(" + str(k) + ")"
return self.handle_display_error(err_msg, record, True)
# end of try/ex
return results
# end of ml_cache_analysis_and_models
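# Usage sketch (illustrative, not part of the original source): cache an in-memory
# analysis dataset (models, accuracy results, predictions dataframe) back into redis,
# mirroring the cache_req built in ml_load_model_file_into_cache above. Names are placeholders.
#
#   cache_req = {
#                   "Name" : "CACHE",
#                   "Key" : "_MODELS_example-ds_LATEST",
#                   "TrackingID" : "_MD_example-ds",
#                   "Analysis" : analysis_dataset,
#                   "ValidateDates" : False
#               }
#   cache_results = core.ml_cache_analysis_and_models(cache_req, core.m_rds, core.m_dbs, False)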
def ml_cache_model(self, req, rds, dbs, debug):
record = {
"RALoc" : "",
"Pickle" : "",
"Type" : "",
"Version" : "1"
}
results = self.build_def_hash("Display Error", "Not Run", record)
try:
import pickle
ra_name = req["Name"]
ra_key = req["Key"]
model_type = req["Type"]
version = req["Version"]
self.lg("Pickling", 6)
pickle_str = pickle.dumps(req["Model"])
record["RLoc"] = str(ra_name) + ":" + str(ra_key)
self.lg("Storing Pickle Len(" + str(len(pickle_str)) + ") in RLoc(" + str(record["RLoc"]) + ")", 6)
cache_record = {
"Pickle" : str(pickle_str),
"Type" : str(model_type),
"Version" : version
}
self.purge_and_cache_records_in_redis(rds[ra_name],
ra_key,
cache_record,
False)
self.lg("Done", 6)
record.update(cache_record)
results = self.build_def_hash("SUCCESS", "", record)
except Exception,k:
err_msg = "Unable to Cache Model with Ex(" + str(k) + ")"
return self.handle_display_error(err_msg, record, True)
# end of try/ex
return results
# end of ml_cache_model
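# Usage sketch (illustrative, not part of the original source): pickle and cache a
# single model under an explicit redis application name + key. The key, type name,
# and model object are hypothetical placeholders.
#
#   cache_model_req = {
#                   "Name" : "CACHE",
#                   "Key" : "_MD_example-ds_example-target",
#                   "Type" : "ExampleClassifier",
#                   "Version" : "1",
#                   "Model" : trained_model_obj
#               }
#   cache_results = core.ml_cache_model(cache_model_req, core.m_rds, core.m_dbs, False)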
def ml_encode_target_column(self, df, target_column, result_column, debug=False):
import pandas as pd
success = False
if result_column in df:
lg("Already Have Column(" + str(result_column) + ")", 6)
return (success, df, [])
# Add column to df with integers for the target.
# make a copy of the df
df_mod = df.copy()
# find the uniques in the target column
targets = df_mod[target_column].unique()
# convert to a dict of names to int values
map_to_int = {name: n for n, name in enumerate(targets)}
# replace the result column with the correct integer values
df_mod[result_column] = df_mod[target_column].replace(map_to_int)
success = True
return (success, df_mod, targets)
# end of ml_encode_target_column
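# Usage sketch (illustrative, not part of the original source): map a string label
# column to integer codes in a new result column. The dataframe and column names
# are hypothetical.
#
#   import pandas as pd
#   df = pd.DataFrame({ "Label" : [ "buy", "sell", "hold", "buy" ] })
#   success, df_encoded, label_values = core.ml_encode_target_column(df, "Label", "LabelCode", False)
#   if success:
#       lg("Encoded Targets(" + str(len(label_values)) + ")", 6)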
def ml_is_valid_classifier_request(self, req):
status = "Failed Validation"
err_msg = ""
record = {}
if "TargetColumnName" not in req:
err_msg = "Classifier Error - Missing required 'TargetColumnName' parameter. Please set it to the name of the column you want the machine to learn."
# end of TargetColumnName
if "TrainFeatureNames" not in req:
err_msg = "Classifier Error - Missing required 'TrainFeatureNames' parameter. Please set it to a list of feature names you want to be trained in the model."
# end of TargetColumnName
if "FeaturesTrain" not in req:
err_msg = "Classifier Error - Missing required 'FeaturesTrain' parameter. Please set it to the features_train result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of FeaturesTrain
if "FeaturesTest" not in req:
err_msg = "Classifier Error - Missing required 'FeaturesTest' parameter. Please set it to the features_test result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of FeaturesTest
if "TargetTrain" not in req:
err_msg = "Classifier Error - Missing required 'TargetTrain' parameter. Please set it to the target_train result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of TargetTrain
if "TargetTest" not in req:
err_msg = "Classifier Error - Missing required 'TargetTest' parameter. Please set it to the target_test result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of TargetTest
if "MLType" not in req:
err_msg = "Classifier Error - Missing required 'MLType' parameter. Please set it to a name."
# end of MLType
if "MLAlgo" not in req:
err_msg = "Classifier Error - Missing required 'MLAlgo' parameter. Please set it to a dictionary for building the Machine Learning Algorithm."
# end of MLAlgo
if "Name" not in req["MLAlgo"]:
err_msg = "Classifier Error - Missing required 'Name' parameter under 'MLAlgo'. Please set it to a name for the algorithm object."
# end of MLAlgo.Name
if "Steps" not in req["MLAlgo"]:
err_msg = "Classifier Error - Missing required 'Steps' parameter under 'MLAlgo'. Please set it to a dictionary for the algorithm to run."
# end of MLAlgo.Steps
if err_msg == "":
status = "SUCCESS"
return self.build_def_hash(status, err_msg, record)
# end of ml_is_valid_classifier_request
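# Usage sketch (illustrative, not part of the original source): the validators above
# expect the train/test splits from train_test_split plus an MLAlgo definition.
# Everything below is a hypothetical example request; the sklearn import path depends
# on the installed sklearn version.
#
#   # from sklearn.cross_validation import train_test_split
#   features_train, features_test, target_train, target_test = train_test_split(
#       train_df[feature_names], train_df[target_col], test_size=test_ratio)
#   classifier_req = {
#                   "MLType" : "Classifier",
#                   "TargetColumnName" : target_col,
#                   "TrainFeatureNames" : feature_names,
#                   "FeaturesTrain" : features_train,
#                   "FeaturesTest" : features_test,
#                   "TargetTrain" : target_train,
#                   "TargetTest" : target_test,
#                   "MLAlgo" : { "Name" : "example-algo", "Steps" : {} }
#               }
#   validation = core.ml_is_valid_classifier_request(classifier_req)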
def ml_is_valid_regression_request(self, req):
status = "Failed Validation"
err_msg = ""
record = {}
if "TargetColumnName" not in req:
err_msg = "Regressor Error - Missing required 'TargetColumnName' parameter. Please set it to the name of the column you want the machine to learn."
# end of TargetColumnName
if "TrainFeatureNames" not in req:
err_msg = "Regressor Error - Missing required 'TrainFeatureNames' parameter. Please set it to a list of feature names you want to be trained in the model."
# end of TargetColumnName
if "FeaturesTrain" not in req:
err_msg = "Regressor Error - Missing required 'FeaturesTrain' parameter. Please set it to the features_train result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of FeaturesTrain
if "FeaturesTest" not in req:
err_msg = "Regressor Error - Missing required 'FeaturesTest' parameter. Please set it to the features_test result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of FeaturesTest
if "TargetTrain" not in req:
err_msg = "Regressor Error - Missing required 'TargetTrain' parameter. Please set it to the target_train result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of TargetTrain
if "TargetTest" not in req:
err_msg = "Regressor Error - Missing required 'TargetTest' parameter. Please set it to the target_test result from: features_train, features_test, target_train, target_test = train_test_split(features, train_df[target_col], test_size=test_ratio)."
# end of TargetTest
if "MLType" not in req:
err_msg = "Regressor Error - Missing required 'MLType' parameter. Please set it to a name."
# end of MLType
if "MLAlgo" not in req:
err_msg = "Regressor Error - Missing required 'MLAlgo' parameter. Please set it to a dictionary for building the Machine Learning Algorithm."
# end of MLAlgo
if "Name" not in req["MLAlgo"]:
err_msg = "Regressor Error - Missing required 'Name' parameter under 'MLAlgo'. Please set it to a name for the algorithm object."
# end of MLAlgo.Name
if "Steps" not in req["MLAlgo"]:
err_msg = "Regressor Error - Missing required 'Steps' parameter under 'MLAlgo'. Please set it to a dictionary for the algorithm to run."
# end of MLAlgo.Steps
if err_msg == "":
status = "SUCCESS"
return self.build_def_hash(status, err_msg, record)
# end of ml_is_valid_regression_request
def ml_is_valid_train_and_test_set(self, ml_request):
import pandas as pd
import numpy as np
err_msg = ""
if len(ml_request["FeaturesTrain"].values) != len(ml_request["TargetTrain"].values):
err_msg = "Confirm the variables are correct: FeaturesTrain(" + str(len(ml_request["FeaturesTrain"].values)) + ") has a different row count than TargetTrain(" + str(len(ml_request["TargetTrain"])) + ")"
if len(ml_request["FeaturesTest"].values) != len(ml_request["TargetTest"].values):
err_msg = "Confirm the variables are correct: FeaturesTest(" + str(len(ml_request["FeaturesTest"].values)) + ") has a different row count than TargetTest(" + str(len(ml_request["TargetTest"])) + ")"
if len(ml_request["FeaturesTest"].values) == 0:
err_msg = "No Data to Process for: FeaturesTest(" + str(len(ml_request["FeaturesTest"].values)) + ")"
if len(ml_request["TargetTest"].values) == 0:
err_msg = "No Data to Process for: FeaturesTest(" + str(len(ml_request["FeaturesTest"].values)) + ")"
if len(ml_request["TrainFeatureNames"]) == 0:
err_msg = "No Data to Process for: TrainFeatureNames(" + str(len(ml_request["TrainFeatureNames"])) + ")"
return err_msg
# end of ml_is_valid_train_and_test_set
def ml_build_req_node(self, req):
ml_type = str(req["MLType"])
ml_algo_name = str(req["MLAlgo"]["Name"])
target_column_name = str(req["TargetColumnName"])
target_column_values = req["TargetColumnValues"]
ignore_features = req["IgnoreFeatures"]
ml_dataset_name = ""
algo_meta_data = {}
train_api_node = {}
cross_val_api_node = {}