From 366d96065993381db1289aaa795b81bf22bf4630 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Wed, 22 Apr 2020 13:33:35 +0530 Subject: [PATCH 1/6] Memory leak suspected --- .circleci/config.yml | 6 +++--- informix_auditing/audit_util.c | 27 ++++++++++++++++++++------- informix_auditing/auditing2.c | 6 +++--- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 20cdc6d..02aaf2d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -62,9 +62,9 @@ builddeploy_steps: &builddeploy_steps # # reconciler deployment # rm -rf buildenvvar # ./unsetenv.sh - ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar - source buildenvvar - ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME} + # ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar + # source buildenvvar + # ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME} jobs: diff --git a/informix_auditing/audit_util.c b/informix_auditing/audit_util.c index e4af24b..c7de2aa 100644 --- a/informix_auditing/audit_util.c +++ b/informix_auditing/audit_util.c @@ -158,7 +158,7 @@ mi_string *doInsertCN() //fixname(pdbname); sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname); posi = strlen(buffer); - printf("\"TABLENAME\": \"%s\", ", tabname); + printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-INSERT-%s\" \n",pdbname,tabname,cdatetime); sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", tabname); posi = strlen(buffer); sprintf(&buffer[posi], "\"OPERATION\": \"INSERT\", "); @@ -182,7 +182,9 @@ DPRINTF("logger", 90, ("insert: colname: (0x%x) [%s]", pcolname, pcolname)); sprintf(&buffer[posi], ", "); posi = strlen(buffer); } - sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, escapecharjson(pcast)); + char *bufdatval = escapecharjson(pcast); + sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, bufdatval); + free(bufdatval); if (strcmp("unsupportedtype", pcast) == 0) { strcpy(uniquedatatype, "true"); } @@ -200,6 +202,7 @@ DPRINTF("logger", 90, ("insert: colname: (0x%x) [%s]", pcolname, pcolname)); } else { sprintf(&buffer[posi], "}, \n \"uniquedatatype\" : \"false\" \n }"); } + printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-INSERT-%s-Completed\" \n",pdbname,tabname,cdatetime); free(cdatetime); return(buffer); } @@ -310,6 +313,7 @@ mi_string *doDeleteCN() sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname); posi = strlen(buffer); sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname); + printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-DELETE-%s\" \n", pdbname,ptabname,cdatetime); posi = strlen(buffer); sprintf(&buffer[posi], "\"OPERATION\": \"DELETE\", "); posi = strlen(buffer); @@ -336,7 +340,9 @@ DPRINTF("logger", 90, ("delete: colname: (0x%x) [%s]", pcolname, pcolname)); //pcast = escapecharjson(pcast); //printf("%s",pcast); - sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, escapecharjson(pcast)); + char *bufdatdelval = escapecharjson(pcast); + sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, bufdatdelval); + free(bufdatdelval) if (strcmp("unsupportedtype", pcast) == 0) { strcpy(uniquedatatype, "true"); } @@ -355,6 +361,7 @@ DPRINTF("logger", 90, ("delete: colname: (0x%x) [%s]", pcolname, pcolname)); } else { sprintf(&buffer[posi], "}, \n \"uniquedatatype\" : \"false\" \n }"); } + printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-DELETE-%s-Completed\" \n ", pdbname,ptabname,cdatetime); free(cdatetime); return(buffer); } @@ -406,7 +413,8 @@ mi_string *doUpdateCN() //fixname(pdbname); sprintf(&buffer[posi], "\"SCHEMANAME\": \"%s\", ", pdbname); posi = strlen(buffer); - sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname); + sprintf(&buffer[posi], "\"TABLENAME\": \"%s\", ", ptabname); + printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-UPDATE-%s\" \n", pdbname,ptabname,cdatetime); posi = strlen(buffer); sprintf(&buffer[posi], "\"OPERATION\": \"UPDATE\", "); posi = strlen(buffer); @@ -448,7 +456,11 @@ mi_string *doUpdateCN() sprintf(&buffer[pbufLen], ", "); pbufLen = strlen(buffer); } - sprintf(&buffer[pbufLen], "\"%s\" : { \"old\" : \"%s\", \"new\" : \"%s\" }", poldcolname, escapecharjson(pcast), escapecharjson(pcast2)); + char *bufdatoldval = escapecharjson(pcast); + char *bufdatnewval = escapecharjson(pcast2); + sprintf(&buffer[pbufLen], "\"%s\" : { \"old\" : \"%s\", \"new\" : \"%s\" }", poldcolname, bufdatoldval, bufdatnewval); + free(bufdatoldval); + free(bufdatnewval); if (strcmp("unsupportedtype", pcast2) == 0) { strcpy(uniquedatatype, "true"); } @@ -464,6 +476,7 @@ mi_string *doUpdateCN() sprintf(&buffer[pbufLen], "}, \n \"uniquedatatype\" : \"false\" \n }"); } DPRINTF("logger", 90, ("Exiting doUpdateCN()")); + printf("\"DBNAME-TABLENAME-operation-TIME\": \"%s-%s-UPDATE-%s-Completed\" \n ", pdbname,ptabname,cdatetime); free(cdatetime); return(buffer); } @@ -579,7 +592,7 @@ char * escapecharjson( char *jsonvalue_org) escjsonvalue = (char *)calloc(10000, sizeof(char)); for (jsonvalue_copy = jsonvalue_org; *jsonvalue_copy != '\0'; jsonvalue_copy++) { - printf("%c:%d\n", *jsonvalue_copy,*jsonvalue_copy); + //printf("%c:%d\n", *jsonvalue_copy,*jsonvalue_copy); if (*jsonvalue_copy == '"') { posi = strlen(escjsonvalue); sprintf(&escjsonvalue[posi], "%s","\\\"") ; @@ -616,6 +629,6 @@ char * escapecharjson( char *jsonvalue_org) } //p=NULL; jsonvalue_copy=NULL; - printf("%s", escjsonvalue); + //printf("%s", escjsonvalue); return(escjsonvalue); } diff --git a/informix_auditing/auditing2.c b/informix_auditing/auditing2.c index 79c5f62..e86fa63 100644 --- a/informix_auditing/auditing2.c +++ b/informix_auditing/auditing2.c @@ -60,7 +60,7 @@ void do_auditing2( mi_lvarchar *sessionusername, MI_FPARAM *fp) mi_string buffer[32], *pdata; DPRINTF("logger", 80, ("connected user %s", mi_lvarchar_to_string(sessionusername))); - printf("operating user %s welcome test \n",mi_lvarchar_to_string(sessionusername)); + printf("USER Triggered: %s\n",mi_lvarchar_to_string(sessionusername)); if (strcmp(mi_lvarchar_to_string(sessionusername), "ifxsyncuser") == 0) { printf("automated user. skipping trigger\n"); @@ -72,14 +72,14 @@ void do_auditing2( mi_lvarchar *sessionusername, MI_FPARAM *fp) if (trigger_operation & MI_TRIGGER_NOT_IN_EVENT) { /* not in a trigger! generate an exception */ mi_db_error_raise(NULL, MI_EXCEPTION, - "do_auditing1() can only be called within a trigger!", NULL); + "do_auditing2() can only be called within a trigger!", NULL); return; } /* Make sure this is in a FOR EACH type of trigger */ if (0 == (trigger_operation & MI_TRIGGER_FOREACH_EVENT) ) { /* not in a for each trigger! generate an exception */ mi_db_error_raise(NULL, MI_EXCEPTION, - "do_auditing1() must be in a FOR EACH trigger operation", NULL); + "do_auditing2() must be in a FOR EACH trigger operation", NULL); return; } /* keep only the SQL operation */ From 8f5dcfbe349f0d0ef0111ddf6aaa6284a26ded2b Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Wed, 22 Apr 2020 13:48:38 +0530 Subject: [PATCH 2/6] Memory leak suspected compile error --- informix_auditing/audit_util.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/informix_auditing/audit_util.c b/informix_auditing/audit_util.c index c7de2aa..50c52cd 100644 --- a/informix_auditing/audit_util.c +++ b/informix_auditing/audit_util.c @@ -342,7 +342,7 @@ DPRINTF("logger", 90, ("delete: colname: (0x%x) [%s]", pcolname, pcolname)); //printf("%s",pcast); char *bufdatdelval = escapecharjson(pcast); sprintf(&buffer[posi], "\"%s\" : \"%s\"", pcolname, bufdatdelval); - free(bufdatdelval) + free(bufdatdelval); if (strcmp("unsupportedtype", pcast) == 0) { strcpy(uniquedatatype, "true"); } From d06dbaf576213aaf1247cf9c8fe0eb8f29bd7220 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Wed, 22 Apr 2020 14:26:25 +0530 Subject: [PATCH 3/6] Memory leak suspected log compile error --- informix_auditing/audit_util.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/informix_auditing/audit_util.c b/informix_auditing/audit_util.c index 50c52cd..8a24bd4 100644 --- a/informix_auditing/audit_util.c +++ b/informix_auditing/audit_util.c @@ -67,6 +67,7 @@ mi_string *do_castl(MI_CONNECTION *conn, MI_DATUM *datum, DPRINTF("logger",95,("-- typeName=%s --",srcType)); printf("-- typeName=%s --",srcType); if ((strcmp("blob", srcType) == 0) || (strcmp("clob", srcType) == 0) || (strcmp("text", srcType) == 0) || (strcmp("byte", srcType) == 0)) { + printf("skiping data read\n"); return("unsupportedtype"); } else{ @@ -111,8 +112,8 @@ mi_string *do_castl(MI_CONNECTION *conn, MI_DATUM *datum, tdesc = mi_type_typedesc(conn, typeid); precision = mi_type_precision(tdesc); - printf("rputine read initiated \n"); - printf("rputine read initiated %ld\n",collen); + //printf("rputine read initiated \n"); + //printf("rputine read initiated %ld\n",collen); new_datum = mi_routine_exec(conn, fn, &ret, datum, collen, precision, fp); printf("routine read completed \n"); pbuf = mi_lvarchar_to_string(new_datum); From 67bc40062a64dedcf1eb6e9c5d2fa89eb673346a Mon Sep 17 00:00:00 2001 From: Sachin Maheshwari Date: Thu, 23 Apr 2020 13:03:30 +0530 Subject: [PATCH 4/6] giving group id to group consumer --- config/default.js | 5 +++++ src/consumer.js | 11 ++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/config/default.js b/config/default.js index fdc74ba..0c6d957 100644 --- a/config/default.js +++ b/config/default.js @@ -9,6 +9,11 @@ module.exports = { }, RETRY_COUNTER: 3, KAFKA_REPOST_COUNT: 5, + KAFKA_URL: process.env.KAFKA_URL, + KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'ifx-pg-consumer', + KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT ? process.env.KAFKA_CLIENT_CERT.replace('\\n', '\n') : null, + KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY ? + process.env.KAFKA_CLIENT_CERT_KEY.replace('\\n', '\n') : null, topic_error: { NAME: 'db.ifxpgmigrate.error', PARTITION: 0, diff --git a/src/consumer.js b/src/consumer.js index d39332c..6a2daf5 100644 --- a/src/consumer.js +++ b/src/consumer.js @@ -3,7 +3,16 @@ const Promise = require('bluebird'); const config = require('config'); const logger = require('./common/logger'); const healthcheck = require('topcoder-healthcheck-dropin'); -const consumer = new Kafka.GroupConsumer(); +const options = { + groupId: config.KAFKA_GROUP_ID, + connectionString: config.KAFKA_URL, + ssl: { + cert: config.KAFKA_CLIENT_CERT, + key: config.KAFKA_CLIENT_CERT_KEY + } +}; + +const consumer = new Kafka.GroupConsumer(options); const { create_consumer_app_log, consumerpg_success_log, From 3fd4ff81274bca18fff3522555c1100ba4643724 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Thu, 23 Apr 2020 13:12:09 +0530 Subject: [PATCH 5/6] Update config.yml --- .circleci/config.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 02aaf2d..30ba72c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -50,9 +50,9 @@ builddeploy_steps: &builddeploy_steps # # consumer deployment # rm -rf buildenvvar # ./unsetenv.sh - # ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-consumer-deployvar - # source buildenvvar - # ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME} + ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-consumer-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME} # # without kafka dynamodb # rm -rf buildenvvar # ./unsetenv.sh From 909065bce119e9a8cfd094b03c43839287c86359 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Thu, 23 Apr 2020 17:21:14 +0530 Subject: [PATCH 6/6] reduce the log --- src/common/app_log.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/app_log.js b/src/common/app_log.js index 7f94e48..308ecfb 100644 --- a/src/common/app_log.js +++ b/src/common/app_log.js @@ -197,7 +197,7 @@ async function consumerpg_failure_log(payload, postgreErr) { CONSUMER_FAILURE_LOG: postgreErr }).then(log => console.log('Added Error in Consumer Log Table')) .catch(err => console.log(err)) - console.log(`error-sync: consumer failed to update : "${postgreErr}"`) + console.log(`error-sync: consumer failed to update :` + JSON.stringify(postgreErr)) //audit table update } // CONSUMER_PAYLOAD: { type: DataTypes.JSON, allowNull: false },