12 changes: 6 additions & 6 deletions .circleci/config.yml
@@ -50,9 +50,9 @@ builddeploy_steps: &builddeploy_steps
 # # consumer deployment
 # rm -rf buildenvvar
 # ./unsetenv.sh
-# ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-consumer-deployvar
-# source buildenvvar
-# ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}
+./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-consumer-deployvar
+source buildenvvar
+./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}
 # # without kafka dynamodb
 # rm -rf buildenvvar
 # ./unsetenv.sh
@@ -62,9 +62,9 @@ builddeploy_steps: &builddeploy_steps
 # # reconciler deployment
 # rm -rf buildenvvar
 # ./unsetenv.sh
+./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar
+source buildenvvar
+./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}
-# ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar
-# source buildenvvar
-# ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME}


jobs:
5 changes: 5 additions & 0 deletions config/default.js
@@ -9,6 +9,11 @@ module.exports = {
 },
 RETRY_COUNTER: 3,
 KAFKA_REPOST_COUNT: 5,
+KAFKA_URL: process.env.KAFKA_URL,
+KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'ifx-pg-consumer',
+KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null,
+KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ?
+  process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null,
 topic_error: {
   NAME: 'db.ifxpgmigrate.error',
   PARTITION: 0,
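A note on the new cert handling: with a string pattern, JavaScript's String.prototype.replace substitutes only the first occurrence, so a multi-line PEM value stored with literal \n escapes is only partially unescaped by the code above. A minimal sketch of the difference (the sample value is hypothetical; a global regex is one way to cover every occurrence):

```js
// Hypothetical env value with several literal "\n" escapes, the way
// Heroku-style config vars often store PEM material:
const raw = '-----BEGIN CERTIFICATE-----\\nMIIB...\\n-----END CERTIFICATE-----';

console.log(raw.replace('\\n', '\n'));   // string pattern: only the FIRST "\n" is restored
console.log(raw.replace(/\\n/g, '\n'));  // global regex: every "\n" is restored
```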
32 changes: 23 additions & 9 deletions informix_auditing/audit_util.c
@@ -67,6 +67,7 @@ mi_string *do_castl(MI_CONNECTION *conn, MI_DATUM *datum,
 DPRINTF("logger",95,("-- typeName=%s --",srcType));
 printf("-- typeName=%s --",srcType);
 if ((strcmp("blob", srcType) == 0) || (strcmp("clob", srcType) == 0) || (strcmp("text", srcType) == 0) || (strcmp("byte", srcType) == 0)) {
+printf("skipping data read\n");
 return("unsupportedtype");
 }
 else{
Expand Down Expand Up @@ -111,8 +112,8 @@ mi_string *do_castl(MI_CONNECTION *conn, MI_DATUM *datum,
tdesc = mi_type_typedesc(conn, typeid);
precision = mi_type_precision(tdesc);

printf("rputine read initiated \n");
printf("rputine read initiated %ld\n",collen);
//printf("rputine read initiated \n");
//printf("rputine read initiated %ld\n",collen);
new_datum = mi_routine_exec(conn, fn, &ret, datum, collen, precision, fp);
printf("routine read completed \n");
pbuf = mi_lvarchar_to_string(new_datum);
Expand Down Expand Up @@ -158,7 +159,7 @@ mi_string *doInsertCN()
//fixname(pdbname);
sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname);
posi = strlen(buffer);
printf("\"TABLENAME\": \"%s\", ", tabname);
printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-INSERT-%s\" \n",pdbname,tabname,cdatetime);
sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", tabname);
posi = strlen(buffer);
sprintf(&buffer[posi], "\"OPERATION\": \"INSERT\", ");
@@ -182,7 +183,9 @@ DPRINTF("logger", 90, ("insert: colname: (0x%x) [%s]", pcolname, pcolname));
 sprintf(&buffer[posi], ", ");
 posi = strlen(buffer);
 }
-sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, escapecharjson(pcast));
+char *bufdatval = escapecharjson(pcast);
+sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, bufdatval);
+free(bufdatval);
 if (strcmp("unsupportedtype", pcast) == 0) {
 strcpy(uniquedatatype, "true");
 }
@@ -200,6 +203,7 @@ DPRINTF("logger", 90, ("insert: colname: (0x%x) [%s]", pcolname, pcolname));
 } else {
 sprintf(&buffer[posi], "}, \n \"uniquedatatype\" : \"false\" \n }");
 }
+printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-INSERT-%s-Completed\" \n",pdbname,tabname,cdatetime);
 free(cdatetime);
 return(buffer);
 }
@@ -310,6 +314,7 @@ mi_string *doDeleteCN()
 sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname);
 posi = strlen(buffer);
 sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname);
+printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-DELETE-%s\" \n", pdbname,ptabname,cdatetime);
 posi = strlen(buffer);
 sprintf(&buffer[posi], "\"OPERATION\": \"DELETE\", ");
 posi = strlen(buffer);
@@ -336,7 +341,9 @@ DPRINTF("logger", 90, ("delete: colname: (0x%x) [%s]", pcolname, pcolname));

 //pcast = escapecharjson(pcast);
 //printf("%s",pcast);
-sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, escapecharjson(pcast));
+char *bufdatdelval = escapecharjson(pcast);
+sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, bufdatdelval);
+free(bufdatdelval);
 if (strcmp("unsupportedtype", pcast) == 0) {
 strcpy(uniquedatatype, "true");
 }
@@ -355,6 +362,7 @@ DPRINTF("logger", 90, ("delete: colname: (0x%x) [%s]", pcolname, pcolname));
 } else {
 sprintf(&buffer[posi], "}, \n \"uniquedatatype\" : \"false\" \n }");
 }
+printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-DELETE-%s-Completed\" \n ", pdbname,ptabname,cdatetime);
 free(cdatetime);
 return(buffer);
 }
@@ -406,7 +414,8 @@ mi_string *doUpdateCN()
 //fixname(pdbname);
 sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname);
 posi = strlen(buffer);
-sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname);
+sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname);
+printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-UPDATE-%s\" \n", pdbname,ptabname,cdatetime);
 posi = strlen(buffer);
 sprintf(&buffer[posi], "\"OPERATION\": \"UPDATE\", ");
 posi = strlen(buffer);
@@ -448,7 +457,11 @@ mi_string *doUpdateCN()
 sprintf(&buffer[pbufLen], ", ");
 pbufLen = strlen(buffer);
 }
-sprintf(&buffer[pbufLen], "\"%s\" : { \"old\" : \"%s\", \"new\" : \"%s\" }", poldcolname, escapecharjson(pcast), escapecharjson(pcast2));
+char *bufdatoldval = escapecharjson(pcast);
+char *bufdatnewval = escapecharjson(pcast2);
+sprintf(&buffer[pbufLen], "\"%s\" : { \"old\" : \"%s\", \"new\" : \"%s\" }", poldcolname, bufdatoldval, bufdatnewval);
+free(bufdatoldval);
+free(bufdatnewval);
 if (strcmp("unsupportedtype", pcast2) == 0) {
 strcpy(uniquedatatype, "true");
 }
@@ -464,6 +477,7 @@ mi_string *doUpdateCN()
 sprintf(&buffer[pbufLen], "}, \n \"uniquedatatype\" : \"false\" \n }");
 }
 DPRINTF("logger", 90, ("Exiting doUpdateCN()"));
+printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-UPDATE-%s-Completed\" \n ", pdbname,ptabname,cdatetime);
 free(cdatetime);
 return(buffer);
 }
@@ -579,7 +593,7 @@ char * escapecharjson( char *jsonvalue_org)
 escjsonvalue = (char *)calloc(10000, sizeof(char));
 for (jsonvalue_copy = jsonvalue_org; *jsonvalue_copy != '\0'; jsonvalue_copy++) {

-printf("%c:%d\n", *jsonvalue_copy,*jsonvalue_copy);
+//printf("%c:%d\n", *jsonvalue_copy,*jsonvalue_copy);
 if (*jsonvalue_copy == '"') {
 posi = strlen(escjsonvalue);
 sprintf(&escjsonvalue[posi], "%s","\\\"") ;
@@ -616,6 +630,6 @@ char * escapecharjson( char *jsonvalue_org)
 }
 //p=NULL;
 jsonvalue_copy=NULL;
-printf("%s", escjsonvalue);
+//printf("%s", escjsonvalue);
 return(escjsonvalue);
 }
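For context, each trigger handler above assembles one JSON audit event in buffer. A reconstructed sketch of the UPDATE event shape, taken from the sprintf format strings in this file (the key that wraps the column map is opened outside this diff, so `data` below is a placeholder name, not the real field):

```js
// Reconstructed sample of an UPDATE event from doUpdateCN(); not captured output.
const event = {
  SCHEMANAME: 'mydb',        // database name
  TABLENAME: 'project',      // triggering table
  OPERATION: 'UPDATE',       // INSERT | DELETE | UPDATE
  data: {                    // placeholder key: the real wrapper name is outside this diff
    name: { old: 'alpha', new: 'beta' }
  },
  uniquedatatype: 'false'    // 'true' when any column cast returned "unsupportedtype"
                             // (blob/clob/text/byte are skipped by do_castl)
};
```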
6 changes: 3 additions & 3 deletions informix_auditing/auditing2.c
@@ -60,7 +60,7 @@ void do_auditing2( mi_lvarchar *sessionusername, MI_FPARAM *fp)
 mi_string buffer[32], *pdata;

 DPRINTF("logger", 80, ("connected user %s", mi_lvarchar_to_string(sessionusername)));
-printf("operating user %s welcome test \n",mi_lvarchar_to_string(sessionusername));
+printf("USER Triggered: %s\n",mi_lvarchar_to_string(sessionusername));
 if (strcmp(mi_lvarchar_to_string(sessionusername), "ifxsyncuser") == 0)
 {
 printf("automated user. skipping trigger\n");
@@ -72,14 +72,14 @@ void do_auditing2( mi_lvarchar *sessionusername, MI_FPARAM *fp)
 if (trigger_operation & MI_TRIGGER_NOT_IN_EVENT) {
 /* not in a trigger! generate an exception */
 mi_db_error_raise(NULL, MI_EXCEPTION,
-"do_auditing1() can only be called within a trigger!", NULL);
+"do_auditing2() can only be called within a trigger!", NULL);
 return;
 }
 /* Make sure this is in a FOR EACH type of trigger */
 if (0 == (trigger_operation & MI_TRIGGER_FOREACH_EVENT) ) {
 /* not in a for each trigger! generate an exception */
 mi_db_error_raise(NULL, MI_EXCEPTION,
-"do_auditing1() must be in a FOR EACH trigger operation", NULL);
+"do_auditing2() must be in a FOR EACH trigger operation", NULL);
 return;
 }
 /* keep only the SQL operation */
2 changes: 1 addition & 1 deletion src/common/app_log.js
@@ -197,7 +197,7 @@ async function consumerpg_failure_log(payload, postgreErr) {
 CONSUMER_FAILURE_LOG: postgreErr
 }).then(log => console.log('Added Error in Consumer Log Table'))
 .catch(err => console.log(err))
-console.log(`error-sync: consumer failed to update : "${postgreErr}"`)
+console.log(`error-sync: consumer failed to update :` + JSON.stringify(postgreErr))
 //audit table update
 }
 // CONSUMER_PAYLOAD: { type: DataTypes.JSON, allowNull: false },
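The logging change trades one kind of detail for another: interpolating an Error into a template literal invokes its toString() (name and message only), while JSON.stringify serializes its enumerable properties, which for node-postgres errors include fields such as code and detail, though message and stack are non-enumerable and drop out. A sketch with a fabricated error:

```js
// Fabricated stand-in for a node-postgres error: an Error with extra enumerable fields.
const postgreErr = Object.assign(new Error('duplicate key value'), {
  code: '23505',
  detail: 'Key (id)=(1) already exists.'
});

console.log(`consumer failed to update : "${postgreErr}"`);
// -> consumer failed to update : "Error: duplicate key value"

console.log('consumer failed to update :' + JSON.stringify(postgreErr));
// -> {"code":"23505","detail":"Key (id)=(1) already exists."}  (message/stack omitted)
```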
11 changes: 10 additions & 1 deletion src/consumer.js
@@ -3,7 +3,16 @@ const Promise = require('bluebird');
 const config = require('config');
 const logger = require('./common/logger');
 const healthcheck = require('topcoder-healthcheck-dropin');
-const consumer = new Kafka.GroupConsumer();
+const options = {
+  groupId: config.KAFKA_GROUP_ID,
+  connectionString: config.KAFKA_URL,
+  ssl: {
+    cert: config.KAFKA_CLIENT_CERT,
+    key: config.KAFKA_CLIENT_CERT_KEY
+  }
+};
+
+const consumer = new Kafka.GroupConsumer(options);
 const {
 create_consumer_app_log,
 consumerpg_success_log,
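With the options block above, the consumer still needs a subscription strategy at init time. A minimal sketch following no-kafka's documented GroupConsumer.init({subscriptions, handler}) API; the handler body is illustrative, and the topic name is borrowed from config/default.js rather than from this file:

```js
// Illustrative wiring, not part of this PR. Promise here is bluebird,
// which this file already imports.
const strategies = [{
  subscriptions: ['db.ifxpgmigrate.error'],  // topic name taken from config/default.js
  handler: (messageSet, topic, partition) =>
    Promise.each(messageSet, (m) => {
      console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
      // commit so the group does not re-consume this offset after a rebalance
      return consumer.commitOffset({ topic, partition, offset: m.offset });
    })
}];

consumer.init(strategies);
```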