Skip to content

Commit

Permalink
[#27] Added configuration to support time violation.
Browse files Browse the repository at this point in the history
Configuration parameters are metadata_key_for_storage_tiering_time_violation and set_metadata_storage_tiering_time_violation
Update readme with configuration settings to set the update time on data objects for storage tiering compatability when using direct mode.
  • Loading branch information
JustinKyleJames committed May 9, 2019
1 parent a79a6ce commit f22b567
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 37 deletions.
4 changes: 4 additions & 0 deletions README.md
Expand Up @@ -112,6 +112,10 @@ This will create an executable called lustre_irods_connector and a configuration
- The lustre_root_path needs to be in the register_map and must be the last entry in this map.
- The entries must be ordered from more specific to less specific. For example, "/mnt/dir1" should appear in the map before "/mnt"
- thread_{n}_connection_paramters - irods_host and irods_port that thread n connects to. If this is not defined the local iRODS environment (iinit) is used.
- set_metadata_for_storage_tiering_time_violation (optional) - If set to "true" sets the metadata for update time on data objects to be compatible with the storage tiering plugin when using time violation policy.
- metadata_key_for_storage_tiering_time_violation (optional) - The metdata key used for the update time metdata on data objects. The default is "irods::access_time". This should be set to the same value that is configured in storage tiering.

Note that the last two settings are only valid when irods_api_update_type is "direct". When policy is used the metadata is set by the rules as defined in the storage tiering policy.

9. Add the irods user on the MDS server with the same user ID and group ID as exists on the iRODS server. Here is an example entry in /etc/passwd.

Expand Down
71 changes: 69 additions & 2 deletions irods_lustre_plugin/src/irods_lustre_operations.cpp
Expand Up @@ -388,7 +388,8 @@ void handle_create(const std::vector<std::pair<std::string, std::string> >& regi
void handle_batch_create(const std::vector<std::pair<std::string, std::string> >& register_map, const int64_t& resource_id,
const std::string& resource_name, const std::vector<std::string>& fidstr_list, const std::vector<std::string>& lustre_path_list,
const std::vector<std::string>& object_name_list, const std::vector<std::string>& parent_fidstr_list,
const std::vector<int64_t>& file_size_list, const int64_t& maximum_records_per_sql_command, rsComm_t* _comm, icatSessionStruct *icss, const rodsLong_t& user_id) {
const std::vector<int64_t>& file_size_list, const int64_t& maximum_records_per_sql_command, rsComm_t* _comm, icatSessionStruct *icss,
const rodsLong_t& user_id, bool set_metadata_for_storage_tiering_time_violation, const std::string& metadata_key_for_storage_tiering_time_violation) {

size_t insert_count = fidstr_list.size();
int status;
Expand All @@ -407,7 +408,12 @@ void handle_batch_create(const std::vector<std::pair<std::string, std::string> >
std::vector<rodsLong_t> data_obj_sequences;
std::vector<rodsLong_t> metadata_sequences;
cmlGetNSeqVals(icss, insert_count, data_obj_sequences);
cmlGetNSeqVals(icss, insert_count, metadata_sequences);

if (set_metadata_for_storage_tiering_time_violation) {
cmlGetNSeqVals(icss, insert_count+1, metadata_sequences);
} else {
cmlGetNSeqVals(icss, insert_count, metadata_sequences);
}

// insert into R_DATA_MAIN

Expand Down Expand Up @@ -492,6 +498,35 @@ void handle_batch_create(const std::vector<std::pair<std::string, std::string> >
}
#endif

// if we are setting the access time metadata for storage tiering
if (set_metadata_for_storage_tiering_time_violation) {
// Insert access time into R_META_MAIN

time_t now = time(NULL);

insert_sql = "insert into R_META_MAIN (meta_id, meta_attr_name, meta_attr_value) values (" +
std::to_string(metadata_sequences[insert_count]) + ", '" +
metadata_key_for_storage_tiering_time_violation + "', '" +
std::to_string(now) + "')";

cllBindVarCount = 0;
status = cmlExecuteNoAnswerSql(insert_sql.c_str(), icss);
if (status != 0) {
rodsLog(LOG_ERROR, "Error inserting metadata into R_META_MAIN for %s. Error is %i. SQL is %s.",
metadata_key_for_storage_tiering_time_violation.c_str(), status, insert_sql.c_str());
return;
}

#if !defined(COCKROACHDB_ICAT)
status = cmlExecuteNoAnswerSql("commit", icss);
if (status != 0) {
rodsLog(LOG_ERROR, "Error committing insert into R_META_MAIN. Error is %i", status);
return;
}
#endif

} // set_metadata_for_storage_tiering_time_violation


// Insert into R_OBJT_METMAP

Expand Down Expand Up @@ -520,6 +555,38 @@ void handle_batch_create(const std::vector<std::pair<std::string, std::string> >
}
#endif


// if we are setting the access time metadata for storage tiering
if (set_metadata_for_storage_tiering_time_violation) {

insert_sql = "insert into R_OBJT_METAMAP (object_id, meta_id) values ";

for (size_t i = 0; i < insert_count; ++i) {
insert_sql += "(" + std::to_string(data_obj_sequences[i]) + ", " + std::to_string(metadata_sequences[insert_count]) + ")";

if (i < insert_count - 1) {
insert_sql += ", ";
}
}

cllBindVarCount = 0;
status = cmlExecuteNoAnswerSql(insert_sql.c_str(), icss);
if (status != 0) {
rodsLog(LOG_ERROR, "Error performing batch insert into R_OBJT_METAMAP. Error is %i. SQL is %s.", status, insert_sql.c_str());
return;
}

#if !defined(COCKROACHDB_ICAT)
status = cmlExecuteNoAnswerSql("commit", icss);
if (status != 0) {
rodsLog(LOG_ERROR, "Error committing insert into R_OBJT_METAMAP for storage tiering time metadata. Error is %i", status);
return;
}
#endif

} // set_metadata_for_storage_tiering_time_violation


// insert user ownership
//insert into R_OBJT_ACCESS (object_id, user_id, access_type_id) values (?, ?, 1200)
insert_sql = "insert into R_OBJT_ACCESS (object_id, user_id, access_type_id) values ";
Expand Down
3 changes: 2 additions & 1 deletion irods_lustre_plugin/src/irods_lustre_operations.hpp
Expand Up @@ -51,7 +51,8 @@ void handle_create(const std::vector<std::pair<std::string, std::string> >& regi
void handle_batch_create(const std::vector<std::pair<std::string, std::string> >& register_map, const int64_t& resource_id,
const std::string& resource_name, const std::vector<std::string>& fidstr_list, const std::vector<std::string>& lustre_path_list,
const std::vector<std::string>& object_name_list, const std::vector<std::string>& parent_fidstr_list,
const std::vector<int64_t>& file_size_list, const int64_t& maximum_records_per_sql_command, rsComm_t* _comm, icatSessionStruct *icss, const rodsLong_t& user_id);
const std::vector<int64_t>& file_size_list, const int64_t& maximum_records_per_sql_command, rsComm_t* _comm, icatSessionStruct *icss, const rodsLong_t& user_id,
bool set_metadata_for_storage_tiering_time_violation, const std::string& metadata_key_for_storage_tiering_time_violation);

void handle_mkdir(const std::vector<std::pair<std::string, std::string> >& register_map, const int64_t& resource_id,
const std::string& resource_name, const std::string& fidstr, const std::string& lustre_path, const std::string& object_name,
Expand Down
7 changes: 6 additions & 1 deletion irods_lustre_plugin/src/libirods-lustre-api.cpp
Expand Up @@ -207,6 +207,10 @@ int rs_handle_lustre_records( rsComm_t* _comm, irodsLustreApiInp_t* _inp, irodsL
int64_t resource_id = changeMap.getResourceId();
std::string resource_name(changeMap.getResourceName().cStr());
int64_t maximum_records_per_sql_command = changeMap.getMaximumRecordsPerSqlCommand();
bool set_metadata_for_storage_tiering_time_violation = changeMap.getSetMetadataForStorageTieringTimeViolation();
std::string metadata_key_for_storage_tiering_time_violation = changeMap.getMetadataKeyForStorageTieringTimeViolation();

rodsLog(LOG_NOTICE, "metadata_key_for_storage_tiering_time_violation=%s", metadata_key_for_storage_tiering_time_violation.c_str());

// for batched file inserts
std::vector<std::string> fidstr_list_for_create;
Expand Down Expand Up @@ -286,7 +290,8 @@ int rs_handle_lustre_records( rsComm_t* _comm, irodsLustreApiInp_t* _inp, irodsL
if (fidstr_list_for_create.size() > 0) {
handle_batch_create(register_map, resource_id, resource_name,
fidstr_list_for_create, lustre_path_list, object_name_list, parent_fidstr_list, file_size_list,
maximum_records_per_sql_command, _comm, icss, user_id);
maximum_records_per_sql_command, _comm, icss, user_id, set_metadata_for_storage_tiering_time_violation,
metadata_key_for_storage_tiering_time_violation);
}
}

Expand Down
2 changes: 2 additions & 0 deletions lustre_irods_connector/src/change_table.capnp
Expand Up @@ -42,6 +42,8 @@ struct ChangeMap {
irodsApiUpdateType @4 :Text;
resourceName @5 :Text;
maximumRecordsPerSqlCommand @6 :Int64;
setMetadataForStorageTieringTimeViolation @7 :Bool;
metadataKeyForStorageTieringTimeViolation @8 :Text;
}


30 changes: 27 additions & 3 deletions lustre_irods_connector/src/config.cpp
Expand Up @@ -14,11 +14,17 @@
FILE *dbgstream = stdout;
int log_level = LOG_INFO;

int read_key_from_map(const json_map& config_map, const std::string &key, std::string& value) {
int read_key_from_map(const json_map& config_map, const std::string &key, std::string& value, bool required = true) {
auto entry = config_map.find(key);
if (entry == config_map.end()) {
LOG(LOG_ERR, "Could not find key %s in configuration\n", key.c_str());
return lustre_irods::CONFIGURATION_ERROR;
if (required) {
LOG(LOG_ERR, "Could not find key %s in configuration\n", key.c_str());
return lustre_irods::CONFIGURATION_ERROR;
} else {
// return error here just indicates the caller should
// set a default value
return lustre_irods::CONFIGURATION_ERROR;
}
}
std::stringstream tmp;
tmp << entry->second;
Expand Down Expand Up @@ -78,6 +84,7 @@ int read_config_file(const std::string& filename, lustre_irods_connector_cfg_t *
std::string maximum_records_per_sql_command_str;
std::string maximum_records_to_receive_from_lustre_changelog_str;
std::string message_receive_timeout_msec_str;
std::string time_violation_setting_str;

try {
json_map config_map{ json_file{ filename.c_str() } };
Expand Down Expand Up @@ -160,6 +167,23 @@ int read_config_file(const std::string& filename, lustre_irods_connector_cfg_t *
return lustre_irods::CONFIGURATION_ERROR;
}

if (0 != read_key_from_map(config_map, "set_metadata_for_storage_tiering_time_violation", time_violation_setting_str, false)) {
config_struct->set_metadata_for_storage_tiering_time_violation = false;
} else {
std::transform(time_violation_setting_str.begin(), time_violation_setting_str.end(), time_violation_setting_str.begin(), ::tolower);
if (time_violation_setting_str == "true") {
config_struct->set_metadata_for_storage_tiering_time_violation = true;
} else {
config_struct->set_metadata_for_storage_tiering_time_violation = false;
}
}


if (0 != read_key_from_map(config_map, "metadata_key_for_storage_tiering_time_violation", config_struct->metadata_key_for_storage_tiering_time_violation, false)) {
config_struct->metadata_key_for_storage_tiering_time_violation = "irods::access_time";
}
LOG(LOG_INFO, "set metadata_key_for_storage_tiering_time_violation=%s\n", config_struct->metadata_key_for_storage_tiering_time_violation.c_str());

// read register_map
try {
auto &register_map_array(config_map.get<json_array>("register_map"));
Expand Down
6 changes: 6 additions & 0 deletions lustre_irods_connector/src/config.hpp
Expand Up @@ -30,6 +30,12 @@ typedef struct lustre_irods_connector_cfg {
unsigned int maximum_records_per_update_to_irods;
unsigned int maximum_records_to_receive_from_lustre_changelog;
unsigned int message_receive_timeout_msec;

// optional parameters for using storage tiering time violation
bool set_metadata_for_storage_tiering_time_violation;
std::string metadata_key_for_storage_tiering_time_violation;


std::map<int, irods_connection_cfg_t> irods_connection_list;

// map the lustre path to irods path
Expand Down
2 changes: 2 additions & 0 deletions lustre_irods_connector/src/lustre_change_table.cpp
Expand Up @@ -547,6 +547,8 @@ int write_change_table_to_capnproto_buf(const lustre_irods_connector_cfg_t *conf
changeMap.setUpdateStatus("PENDING");
changeMap.setIrodsApiUpdateType(config_struct_ptr->irods_api_update_type);
changeMap.setMaximumRecordsPerSqlCommand(config_struct_ptr->maximum_records_per_sql_command);
changeMap.setSetMetadataForStorageTieringTimeViolation(config_struct_ptr->set_metadata_for_storage_tiering_time_violation);
changeMap.setMetadataKeyForStorageTieringTimeViolation(config_struct_ptr->metadata_key_for_storage_tiering_time_violation);

// build the register map
capnp::List<RegisterMapEntry>::Builder reg_map = changeMap.initRegisterMap(config_struct_ptr->register_map.size());
Expand Down
54 changes: 24 additions & 30 deletions lustre_irods_connector/src/lustre_irods_connector_config.json
@@ -1,54 +1,48 @@
{
"mdtname": "test-MDT0000",
"mdtname": "lustre01-MDT0000",
"changelog_reader": "cl1",
"lustre_root_path": "/cfs/test",
"irods_resource_name": "lustre-test",
"lustre_root_path": "/lustreResc/lustre01",
"irods_resource_name": "lustreResc",
"irods_api_update_type": "direct",
"log_level": "LOG_INFO",
"log_level": "LOG_DBG",
"changelog_poll_interval_seconds": 1,
"irods_client_connect_failure_retry_seconds": 30,
"irods_client_broadcast_address": "ipc:///irods_client_broadcast_events",
"changelog_reader_broadcast_address": "ipc:///changelog_reader_broadcast_events",
"changelog_reader_push_work_address": "ipc:///changelog_reader_push_work_events",
"result_accumulator_push_address": "ipc:///result_accumulator_push_address",
"irods_updater_thread_count": 6,
"maximum_records_per_update_to_irods": 1000,
"maximum_records_per_sql_command": 500,
"maximum_records_to_receive_from_lustre_changelog": 10000,
"irods_updater_thread_count": 5,
"maximum_records_per_update_to_irods": 200,
"maximum_records_per_sql_command": 1,
"maximum_records_to_receive_from_lustre_changelog": 500,
"message_receive_timeout_msec": 2000,

"set_metadata_for_storage_tiering_time_violation": "true",
"metadata_key_for_storage_tiering_time_violation": "irods::access_time",

"register_map": [
{
"lustre_path": "/cfs/test/scratch",
"irods_register_path": "/pdc.kth.se/scratch"
"lustre_path": "/lustreResc/lustre01/home",
"irods_register_path": "/tempZone/home"
},
{
"lustre_path": "/lustreResc/lustre01/a",
"irods_register_path": "/tempZone/a"
},
{
"lustre_path": "/cfs/test",
"irods_register_path": "/pdc.kth.se/lustre_root"
"lustre_path": "/lustreResc/lustre01",
"irods_register_path": "/tempZone/lustre01"
}
],


"thread_1_connection_parameters": {
"irods_host": "ridgehead.pdc.kth.se",
"irods_host": "localhost",
"irods_port": "1247"
},
"thread_2_connection_parameters": {
"irods_host": "ridgehead.pdc.kth.se",
"irods_port": "1247"
}
"thread_3_connection_parameters": {
"irods_host": "ridgehead.pdc.kth.se",
"irods_port": "1247"
}
"thread_4_connection_parameters": {
"irods_host": "ridgehead.pdc.kth.se",
"irods_port": "1247"
}
"thread_5_connection_parameters": {
"irods_host": "ridgehead.pdc.kth.se",
"irods_port": "1247"
}
"thread_6_connection_parameters": {
"irods_host": "ridgehead.pdc.kth.se",
"irods_host": "localhost",
"irods_port": "1247"
}
}

0 comments on commit f22b567

Please sign in to comment.