In [9]:

import base64
import math
import os, os.path
import shutil
import subprocess
import sys
import time

#amazon aws
import boto3
import numpy as np
import pandas as pd

import setup_analysis as sa

##################################
###                            ###
###    START:INITIALIZATION    ###
###                            ###
##################################

#############################
#    INITIALIZATION FILE    #
#############################

# Read run settings from the analysis initialization dictionary, one key at a
# time and in a fixed order.
# NOTE(review): the recorded output of this cell is
# "AttributeError: module 'setup_analysis' has no attribute 'dict_init'", so
# setup_analysis must define dict_init before this section can run.
_dict_init = sa.dict_init

# run identification / tagging
ptn = _dict_init["ptn"]
tag_name = _dict_init["tag_name"]        # tag applied to model runs

# EC2 launch parameters
n_instances = _dict_init["n_instances"]  # how many instances to launch
ami = _dict_init["ami"]
subnet = _dict_init["subnet"]
vpc = _dict_init["vpc"]
security_group = _dict_init["security_group"]
type_instance = _dict_init["instance_type"]
iam_role = _dict_init["iam_role"]

# S3 locations
s3_bucket = _dict_init["s3_bucket"]
s3_key_upload = _dict_init["s3_upload_key"]
s3_key_temporary = _dict_init["s3_temporary_key"]
s3_key_model = _dict_init["s3_storage_key"]

# run stamp (analysis_run_id), kept as text because it is concatenated into S3 keys
session_stamp = str(_dict_init["analysis_run_id"])


# When True, the tarball build and every S3 upload below are skipped; only the
# local files are generated.
gen_tiu_q = False

###   LOAD SOME TABLES

df_attribute_primary_id = pd.read_csv(sa.fp_csv_attribute_primary_id)
df_experiment_primary_ids = pd.read_csv(sa.fp_csv_experiment_primary_ids)
df_model_input_database = pd.read_csv(sa.fp_csv_model_input_database)

# distinct primary ids listed in the experiment table
primaries_to_run = list(set(df_experiment_primary_ids[sa.field_primary_key]))

# reduce for speed: keep only rows for those ids, ordered by id, year, month
df_model_input_database = (
    df_model_input_database
    .loc[lambda df: df[sa.field_primary_key].isin(primaries_to_run)]
    .sort_values(by = [sa.field_primary_key, "year", "month"])
    .reset_index(drop = True)
)

###   PATH INITIALIZATIONS

# Local scratch directory for the per-instance CSV exports. It is the same
# directory (sa.dir_tmp) that receives the session ini file later on, and it is
# wiped and recreated so files from an earlier run cannot leak into this one.
dir_tmp_export = sa.dir_tmp
if os.path.exists(dir_tmp_export):
    shutil.rmtree(dir_tmp_export)
os.makedirs(dir_tmp_export, exist_ok = True)

# Resolve the experimental design file path and stop if it is missing.
# NOTE(review): `dir_model` is not defined anywhere in this notebook export; it
# must be set by an earlier cell (presumably the model root directory — TODO confirm).
path_exp_design = sa.fp_csv_model_input_database.replace("$$$dir_model$$$", dir_model)
if not os.path.exists(path_exp_design):
    print("Experimental design file " + str(path_exp_design) + " not found. Exiting...")
    # non-zero status so a scripted run registers the failure
    sys.exit(1)



#############################
#    INITIALIZE AWS INFO    #
#############################

# One explicit session ("default" profile) backs every AWS handle below, so
# resources and clients always use the same credentials.
b3s = boto3.Session(profile_name = "default")
ec2 = b3s.resource("ec2")
s3 = b3s.resource("s3")
# Low-level clients come from the same session. boto3.client(...) would use
# boto3's module-level default session instead, which can resolve a different
# profile (e.g. through AWS_PROFILE) than the resources above.
ec2client = b3s.client("ec2")
s3client = b3s.client("s3")
# launched instances; starts empty
instanceList = []

###   CHECK S3

# Confirm the target bucket is visible to these credentials before doing any work.
allBuckets = s3client.list_buckets()
# names of all buckets returned by list_buckets
ab = [bucket["Name"] for bucket in allBuckets["Buckets"]]
bucketsQ = s3_bucket in ab
if not bucketsQ:
    print("Error: bucket " + str(s3_bucket) + " not found in s3.")
    # non-zero status so a scripted run registers the failure
    sys.exit(1)


def _announce_prefix_status(response, prefix, removal_lead):
    """Report whether an S3 key prefix already holds objects in s3_bucket.

    response: the s3client.list_objects result for `prefix`.
    prefix: the key prefix that was listed.
    removal_lead: opening words of the removal notice. The removal itself is
        disabled for testing, so only the notice is printed.
    """
    if response.get("Contents") is None:
        print(prefix + " not found in bucket s3://" + s3_bucket + ". It will be created.")
        return
    print(removal_lead + prefix + " from " + s3_bucket + ".")


# temporary key: look for existing objects ahead of time
s3_key_exists = s3client.list_objects(Bucket = s3_bucket, Prefix = s3_key_temporary)
bucket_check = s3.Bucket(s3_bucket)
_announce_prefix_status(s3_key_exists, s3_key_temporary, "(TEMP DISABLE FOR TESTING) Removing ")

# model output key for this run, stamped with the session id
s3_key_model_stamped = s3_key_model + "/" + session_stamp
s3_key_exists2 = s3client.list_objects(Bucket = s3_bucket, Prefix = s3_key_model_stamped)
bucket_check2 = s3.Bucket(s3_bucket)
_announce_prefix_status(s3_key_exists2, s3_key_model_stamped, "(TEMPORARILY DISABLED FOR TESTING) Removing ")



###################################
###                             ###
###    UPLOAD REQUIRED DATA     ###
###                             ###
###################################


if not gen_tiu_q:

    ###   FIRST, CREATE INTEGRATED MODEL TARBALL

    # The model is staged in a directory next to sa.dir_proj, archived there as
    # model_upload.tar.gz, and the archive is uploaded to the temporary S3 key.
    import subprocess

    dirname_upload = "model_upload"
    fn_mod_compressed = f"{dirname_upload}.tar.gz"
    dir_proj_parent = os.path.dirname(sa.dir_proj)
    fp_mod_compressed = os.path.join(dir_proj_parent, fn_mod_compressed)

    # COPY OVER INTEGRATED MODEL

    # staging directory that gets archived
    fp_tmp = os.path.join(dir_proj_parent, dirname_upload)
    # Remove a staging directory left behind by an interrupted run; otherwise
    # rsync would merge stale files into this upload.
    if os.path.exists(fp_tmp):
        shutil.rmtree(fp_tmp)
    t0_copy = time.time()
    print("Copying " + sa.dir_proj + " to  " + fp_tmp + "...\n")
    # Paths kept out of the upload. A list defined by an earlier cell is
    # extended (deduplicated, first-seen order kept); on a fresh kernel the
    # extra entries default to none instead of raising NameError.
    # NOTE(review): `dir_model` is not defined in this notebook export; it must
    # come from an earlier cell (presumably the model root — TODO confirm).
    vec_exclude_files_from_copy = list(dict.fromkeys(
        list(globals().get("vec_exclude_files_from_copy", [])) + [
            sa.dir_out,
            sa.fp_csv_experiment_primary_ids,
            sa.fp_csv_model_input_database,
            sa.fp_csv_model_output_database
        ]
    ))
    # exclude patterns are written relative to dir_model
    str_repl = dir_model + os.path.sep
    comm_cp = ["rsync", "-a"]
    for fp in vec_exclude_files_from_copy:
        comm_cp += ["--exclude", fp.replace(str_repl, "")]
    comm_cp += [dir_model, fp_tmp]
    # Argument list (no shell), so paths need no quoting; check=True stops the
    # run rather than packaging and uploading an incomplete copy.
    cp_res = subprocess.run(comm_cp, check = True).returncode
    # create an empty "out" directory in the staged copy
    os.makedirs(os.path.join(fp_tmp, "out"), exist_ok = True)
    print("Copy complete. Time elapsed " + str(time.time() - t0_copy) + " seconds.")

    # replace any previous archive
    if os.path.exists(fp_mod_compressed):
        os.remove(fp_mod_compressed)
    print("Compressing " + dirname_upload + " to  " + fp_mod_compressed + "...\n\n")
    # IMPORTANT: COPYFILE_DISABLE=1 *MUST* be set to eliminate erroneous files in
    # the extraction (presumably macOS "._*" metadata files). `-C` archives
    # dirname_upload relative to dir_proj_parent without changing this
    # process's working directory, so nothing needs restoring if tar fails.
    subprocess.run(
        ["tar", "-cvzf", fp_mod_compressed, "-C", dir_proj_parent, dirname_upload],
        check = True,
        env = dict(os.environ, COPYFILE_DISABLE = "1")
    )
    # remove temporary copy
    shutil.rmtree(fp_tmp)

    ###   THEN, UPLOAD TO THE TEMPORARY S3 KEY

    ##  MODEL TARBALL

    print("Uploading " + fp_mod_compressed + " to " + s3_key_temporary + "...\n")
    s3_object_mod = s3_key_temporary + "/" + fn_mod_compressed
    s3client.upload_file(fp_mod_compressed, s3_bucket, s3_object_mod)

    ##  AWS INITIALIZATION FILE

    print("Uploading " + sa.fp_ini_aws + " to " + s3_key_temporary + "...\n")
    s3_object_init_aws = s3_key_temporary + "/initialize_aws.ini"
    s3client.upload_file(sa.fp_ini_aws, s3_bucket, s3_object_init_aws)



##############################################
#    START:EXPERIMENTAL DESIGN SUBSETTING    #
##############################################

# experimental design, reduced to the primary key column
exp_design = pd.read_csv(sa.fp_csv_experiment_primary_ids)
exp_design = pd.DataFrame(exp_design[sa.field_primary_key])
# sorted, distinct primary ids to split across instances
primaries_all = sorted(set(exp_design[sa.field_primary_key]))
# number of futures
n_primaries = len(primaries_all)

# Keep attribute rows for those ids only, sorted by id. exp_design holds only
# the primary key column at this point, so the filter and the sort use
# sa.field_primary_key; a hard-coded column such as "Master.ID" does not exist
# in exp_design unless it happens to be the key column's name.
df_attribute_primary_id = (
    df_attribute_primary_id[df_attribute_primary_id[sa.field_primary_key].isin(primaries_all)]
    .sort_values(by = [sa.field_primary_key])
)




##############################################
###                                        ###
###    BUILD AND UPLOAD INSTANCE FILES     ###
###                                        ###
##############################################

# Book-keeping containers for the instance-file build; all start empty.
# session ini files to upload / instance id -> session id
dict_session_inis, dict_inst_to_session = {}, {}
# per-instance (primary id, instance id) frames / per-instance design subsets
df_instance_indexing, designFilesOut = [], []
# index assigned to the next instance file
inst_id = 0

##  BUILD SESSION INITIALIZATION TEMPLATE BY SPLIT

print("Starting build of session initialization files...\n")

with open(sa.fp_ini_session, 'r') as file_template_session_ini:
    template_session_ini = file_template_session_ini.readlines()
# template file name
fn_ini_session_upload = "initialize_session_upload.ini"
# Settings forced for the uploaded session file. Any template line that starts
# with one of these keys is replaced by "key:\tvalue".
dict_repls = {
    "cloud_run_q": "True",
    "build_experimental_design_q": "False"
}
# adjusted file
file_session_ini = []
for line in template_session_ini:
    for key, val in dict_repls.items():
        if line.startswith(str(key)):
            line = str(key) + ":\t" + str(val) + "\n"
    file_session_ini.append(line)
# write to output (make sure the scratch directory exists first)
os.makedirs(sa.dir_tmp, exist_ok = True)
fp_si = os.path.join(sa.dir_tmp, fn_ini_session_upload)
with open(fp_si, "w") as file_si_out:
    file_si_out.writelines(file_session_ini)

##  ADD TO S3

# Upload only after the `with` block above has closed (and so flushed) the
# file; the S3 object is named after the ini file.
if not gen_tiu_q:
    temp_key = s3_key_temporary + "/" + fn_ini_session_upload
    print("Uploading file " + fn_ini_session_upload + " to s3://" + s3_bucket + "/" + temp_key + "\n")
    s3.meta.client.upload_file(fp_si, s3_bucket, temp_key)

print("Session initialization file generation complete.\n\n")



################################################################
#    START DIVISION OF EXPERIMENTAL DESIGN AND UPLOAD TO S3    #
################################################################

print("Chopping experimental design...\n")

# Spread the n_primaries scenarios as evenly as possible over the requested
# instances: each instance gets n_base_scenarios_per_inst scenarios and the
# first n_extra_scenarios instances get one more. The remainder is taken
# modulo the number of instances, so exactly min(n_instances, n_primaries)
# chunks are produced, and there is no division by zero when there are fewer
# scenarios than instances.
n_instances_int = int(n_instances)
if n_instances_int < 1:
    raise ValueError("n_instances must be at least 1; got " + str(n_instances))
n_base_scenarios_per_inst, n_extra_scenarios = divmod(n_primaries, n_instances_int)

# number of instances that have received an extra scenario so far
nelAdded = 0
# scenarios not yet assigned to an instance (a copy, so primaries_all is untouched)
l_task = list(primaries_all)

while len(l_task) > 0:

    ##  GET SCENARIOS

    # the first n_extra_scenarios instances take one additional scenario
    q = 1 if nelAdded < n_extra_scenarios else 0
    nelAdded += q
    n_scenarios_extract = n_base_scenarios_per_inst + q
    scenarios_cur = l_task[0:n_scenarios_extract]

    ## SUBSET AND INDEX

    # model input rows for this instance, primary key column only
    temp = df_model_input_database.loc[
        df_model_input_database[sa.field_primary_key].isin(scenarios_cur),
        [sa.field_primary_key]
    ]
    # (primary id, instance id) pairs for the master index
    dfo = pd.DataFrame(
        [[x, inst_id] for x in sorted(set(scenarios_cur))],
        columns = [sa.field_primary_key, "instance_id"]
    )
    df_instance_indexing.append(dfo)

    ## EXPORT TO CSV

    fnOut = "instance_" + str(inst_id) + ".csv"
    tempPathOut = os.path.join(dir_tmp_export, fnOut)
    temp.to_csv(tempPathOut, header = True, index = False)

    ##  S3 UPLOADS

    if not gen_tiu_q:
        temp_key = s3_key_temporary + "/" + fnOut
        print("Uploading file " + fnOut + " to s3://" + s3_bucket + "/" + temp_key)
        s3.meta.client.upload_file(tempPathOut, s3_bucket, temp_key)
    designFilesOut.append(temp)
    inst_id += 1
    # drop the scenarios just assigned
    l_task = l_task[n_scenarios_extract:]

# master scenario -> instance index (empty when there were no scenarios)
if len(df_instance_indexing) > 0:
    df_instance_indexing = pd.concat(df_instance_indexing, axis = 0)
else:
    df_instance_indexing = pd.DataFrame(columns = [sa.field_primary_key, "instance_id"])
# force every index column to integer
for field in list(df_instance_indexing.columns):
    df_instance_indexing[field] = np.array(df_instance_indexing[field]).astype(int)
# export indexed csv
df_instance_indexing.to_csv(os.path.join(dir_tmp_export, "scenario_instance_index.csv"), index = False)

print("Instance data upload complete.")




AttributeError: module 'setup_analysis' has no attribute 'dict_init'

In [10]:
import setup_analysis as sa

In [11]:
import importlib

In [12]:
# Re-execute setup_analysis so edits made to setup_analysis.py since the import
# are picked up without restarting the kernel.
# NOTE(review): leftover debugging cell; remove once the first cell runs cleanly.
importlib.reload(sa)

<module 'setup_analysis' from '/Users/jsyme/Documents/PRGS/Classes/2021/RDM Tutorial/tutorial2021/code/crdm_project/python/setup_analysis.py'>

In [13]:
# Inspect the initialization dictionary that the first cell depends on; the
# output recorded for this cell shows the module still has no `dict_init`.
# NOTE(review): leftover debugging cell; remove once dict_init is defined.
sa.dict_init

AttributeError: module 'setup_analysis' has no attribute 'dict_init'