Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: taosdump support rename database during importing #703

Merged
merged 18 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
taosbenchmark-3.2.3
taosdump-2.5.3
2.5.3
taosdump-2.5.4
2.5.4
240 changes: 220 additions & 20 deletions src/taosdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ static struct argp_option options[] = {
"Server host from which to dump data. Default is localhost.", 0},
{"user", 'u', "USER", 0,
"User name used to connect to server. Default is root.", 0},
{"password", 'p', 0, 0,
{"password", 'p', 0, 0,
"User password to connect to server. Default is taosdata.", 0},
{"port", 'P', "PORT", 0, "Port to connect", 0},
// input/output file
Expand Down Expand Up @@ -494,12 +494,21 @@ static struct argp_option options[] = {
#endif
{"debug", 'g', 0, 0, "Print debug info.", 15},
{"dot-replace", 'Q', 0, 0, "Repalce dot character with underline character in the table name.", 10},
{"rename", 'W', "RENAME-LIST", 0, "Rename database name with new name during importing data. RENAME-LIST: \"db1=newDB1|db2=newDB2\" means rename db1 to newDB1 and rename db2 to newDB2", 10},
{0}
};

#define HUMAN_TIME_LEN 28
#define DUMP_DIR_LEN (MAX_DIR_LEN - (TSDB_DB_NAME_LEN + 10))

// rename db
struct SRenameDB;
typedef struct SRenameDB {
char* old;
char* new;
void* next;
}SRenameDB;

/* Used by main to communicate with parse_opt. */
typedef struct arguments {
// connection option
Expand Down Expand Up @@ -545,9 +554,7 @@ typedef struct arguments {
bool verbose_print;
bool performance_print;
bool dotReplace;

int dumpDbCount;

#ifdef WEBSOCKET
bool restful;
bool cloud;
Expand All @@ -557,6 +564,10 @@ typedef struct arguments {
int cloudPort;
char cloudHost[MAX_HOSTNAME_LEN];
#endif

// put rename db string
char * renameBuf;
SRenameDB * renameHead;
} SArguments;

static resultStatistics g_resultStatistics = {0};
Expand Down Expand Up @@ -611,6 +622,7 @@ struct arguments g_args = {
false, // performance_print
false, // dotRepalce
0, // dumpDbCount

#ifdef WEBSOCKET
false, // restful
false, // cloud
Expand All @@ -620,6 +632,9 @@ struct arguments g_args = {
0, // cloudPort
{0}, // cloudHost
#endif // WEBSOCKET

NULL, // renameBuf
NULL // renameHead
};


Expand Down Expand Up @@ -781,6 +796,158 @@ int64_t getEndTime(int precision) {
return end_time;
}

SRenameDB* newNode(char* first, SRenameDB* prev) {
SRenameDB* node = (SRenameDB*) malloc(sizeof(SRenameDB));
memset(node, 0, sizeof(SRenameDB));
node->old = first;
// link to list
if(prev) {
prev->next = node;
}

return node;
}

void setRenameDbs(char* arg) {
if (arg == NULL) return ;
// malloc new
int len = strlen(arg);
if(len <= 2) {
return ;
}
len += 1; // include \0

// malloc
char* p = malloc(len);
int j = 0; // j is p pos
for (int i = 0; i < len; i++) {
if (arg[i] == ' ') {
// do nothing
} else if (arg[i] == '=' || arg[i] == '|') {
// set zero
p[j++] = 0;
} else {
// copy
p[j++] = arg[i];
}
}

// splite
SRenameDB* node = newNode(p, NULL);
g_args.renameHead = node;
for (int k = 0; k < j; k++) {
if(p[k] == 0 && k + 1 != j && k > 0) {
// string end and not last end
char* name = &p[k] + 1;
if (node->new == NULL) {
node->new = name;
} else {
node = newNode(name, node);
}
}
}

// end
g_args.renameBuf = p;
}

// find newName
char* findNewName(char* oldName) {
SRenameDB* node = g_args.renameHead;
while(node) {
if (strcmp(node->old, oldName) == 0) {
return node->new;
}
node = (SRenameDB* )node->next;
}
return NULL;
}

bool replaceCopy(char *des, char *src) {
size_t len = strlen(src);
bool replace = false;
for (size_t i = 0; i <= len; i++) {
if (src[i] == '.') {
des[i] = '_';
replace = true;
} else {
des[i] = src[i];
}
}

return replace;
}

// repalce old name with new
char * replaceNewName(char* cmd, int len) {
// database name left char and right char
int nLeftSql = len;
char left = cmd[len];
char right = '.';
if(left == '`') {
right = left;
nLeftSql += 1;
}

// get old database name
char oldName[TSDB_DB_NAME_LEN];
char* s = &cmd[nLeftSql];
char* e = strchr(s, right);
char* e1 = strchr(s, ' ');
if(e == NULL && e1 == NULL) {
return NULL;
} else if(e == NULL && e1) {
e = e1;
} else if(e && e1 ) {
if (e > e1) {
e = e1;
}
}

int oldLen = e - s;
if(oldLen + 1 > TSDB_DB_NAME_LEN) {
return NULL;
}
memcpy(oldName, s, oldLen);
oldName[oldLen] = 0;

// macth new database
char* newName = findNewName(oldName);
if(newName == NULL){
return NULL;
}

// malloc new buff put new sql with new name
int newLen = strlen(cmd) + (strlen(newName) - oldLen) + 1;
char* newCmd = (char *)malloc(newLen);
memset(newCmd, 0, newLen);

// copy left + newName + right from cmd
memcpy(newCmd, cmd, nLeftSql); // left sql
strcat(newCmd, newName); // newName
strcat(newCmd, e); // right sql

return newCmd;
}

// if have database name rename, return new sql with new database name
// retrn value need call free() to free memory
char * afterRenameSql(char *cmd) {
// match pattern
const char* CREATE_DB = "CREATE DATABASE IF NOT EXISTS ";
const char* CREATE_TB = "CREATE TABLE IF NOT EXISTS ";

const char* pres[] = {CREATE_DB, CREATE_TB};
for (int i = 0; i < sizeof(pres); i++ ) {
int len = strlen(pres[i]);
if (strncmp(cmd, pres[i], len) == 0) {
// found
return replaceNewName(cmd, len);
}
}
return NULL;
}

/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
Expand Down Expand Up @@ -967,6 +1134,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
}
state->next = state->argc;
break;
case 'W':
setRenameDbs(arg);
break;

default:
return ARGP_ERR_UNKNOWN;
Expand Down Expand Up @@ -1805,21 +1975,6 @@ static int getDumpDbCount() {
return count;
}

bool replaceCopy(char *des, char *src) {
size_t len = strlen(src);
bool replace = false;
for (size_t i = 0; i <= len; i++) {
if (src[i] == '.') {
des[i] = '_';
replace = true;
} else {
des[i] = src[i];
}
}

return replace;
}

static int dumpCreateMTableClause(
const char* dbName,
const char *stable,
Expand Down Expand Up @@ -6007,6 +6162,13 @@ static int64_t dumpInAvroNtbImpl(
__func__, __LINE__);
continue;
}

char* newBuf = afterRenameSql(buf);
if(newBuf) {
infoPrint(" rename database name for create normal table sql: \n old=%s\n new=%s\n", buf, newBuf);
buf = newBuf;
}

#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
WS_RES *ws_res = ws_query_timeout(taos, buf, g_args.ws_timeout);
Expand All @@ -6026,6 +6188,9 @@ static int64_t dumpInAvroNtbImpl(
} else {
#endif
TAOS_RES *res = taos_query(taos, buf);
if(newBuf) {
free(newBuf);
}
int code = taos_errno(res);
if (0 != code) {
errorPrint("%s() LN%d,"
Expand Down Expand Up @@ -7320,6 +7485,13 @@ static int64_t dumpInOneAvroFile(
}

const char *namespace = avro_schema_namespace((const avro_schema_t)schema);
if(g_args.renameHead) {
char* newDbName = findNewName((char *)namespace);
if(newDbName) {
infoPrint(" ------- rename DB Name %s to %s ------\n", namespace, newDbName);
namespace = newDbName;
}
}
debugPrint("%s() LN%d, Namespace: %s\n",
__func__, __LINE__, namespace);

Expand Down Expand Up @@ -10083,14 +10255,24 @@ static int64_t dumpInOneDebugFile(
}

int ret;
char *newSql = NULL;

if(g_args.renameHead) {
// have rename database options
newSql = afterRenameSql(cmd);
}

debugPrint("%s() LN%d, cmd: %s\n", __func__, __LINE__, cmd);
#ifdef WEBSOCKET
if (g_args.cloud || g_args.restful) {
ret = queryDbImplWS(taos, cmd);
ret = queryDbImplWS(taos, newSql?newSql:cmd);
} else {
#endif
ret = queryDbImplNative(taos, cmd);
ret = queryDbImplNative(taos, newSql?newSql:cmd);
if(newSql) {
free(newSql);
}

#ifdef WEBSOCKET
}
#endif
Expand Down Expand Up @@ -13127,6 +13309,24 @@ int main(int argc, char *argv[]) {
} else {
ret = dumpEntry();
}

// free buf
if (g_args.renameBuf) {
free(g_args.renameBuf);
g_args.renameBuf = NULL;
}

// free node
SRenameDB* node = g_args.renameHead;
g_args.renameHead = NULL;
while(node) {
SRenameDB* next = (SRenameDB*)node->next;
free(node);
node = next;
}



return ret;
}

6 changes: 3 additions & 3 deletions tests/taosdump/native/taosdumpDbNtb.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,21 @@ def run(self):
tdSql.execute("drop database db")
# sys.exit(1)

os.system("%s -i %s -T 1" % (binPath, self.tmpdir))
os.system("%s -i %s -T 1 -W db=newdb" % (binPath, self.tmpdir))

tdSql.query("show databases")
dbresult = tdSql.queryResult

found = False
for i in range(len(dbresult)):
print("Found db: %s" % dbresult[i][0])
if dbresult[i][0] == "db":
if dbresult[i][0] == "newdb":
found = True
break

assert found == True

tdSql.execute("use db")
tdSql.execute("use newdb")
tdSql.query("show stables")
tdSql.checkRows(1)
tdSql.checkData(0, 0, "st")
Expand Down
Loading
Loading