Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(driver): add committed assignment API for jdbc #22544

Merged
merged 2 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong);

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong);

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring,
jint, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqCommitAsync
Expand All @@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
jobject);

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong,
jobject);

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong,
jstring, jint, jlong, jobject);

/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqUnsubscribeImp
Expand Down Expand Up @@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
jstring, jobject);

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring,
jint);

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint);

#ifdef __cplusplus
}
#endif
Expand Down
108 changes: 107 additions & 1 deletion source/client/src/clientTmqConnector.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
TAOS_RES *res = (TAOS_RES *)jres;
return tmq_commit_sync(tmq, res);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniError("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

return tmq_commit_sync(tmq, NULL);
}

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code));
}

(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return code;
}

// deprecated
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
Expand Down Expand Up @@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy
tmq_commit_async(tmq, res, consumer_callback, offset);
}

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;

offset = (*env)->NewGlobalRef(env, offset);
tmq_commit_async(tmq, NULL, consumer_callback, offset);
}

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset,
jobject callback) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

callback = (*env)->NewGlobalRef(env, callback);
tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback);
}

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
Expand Down Expand Up @@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);

if (res != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
tmq_err2str(res));
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
tmq_free_assignment(pAssign);
return (jint)res;
}
Expand All @@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
tmq_free_assignment(pAssign);
return JNI_SUCCESS;
}

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}

const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

int64_t offset = tmq_committed(tmq, topicName, vgId);

if (offset < JNI_SUCCESS && offset != -2147467247) {
jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName,
vgId, offset, tmq_err2str(offset));
}

(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}

const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

int64_t offset = tmq_position(tmq, topicName, vgId);

if (offset < JNI_SUCCESS) {
jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId,
offset, tmq_err2str(offset));
}

(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}