Skip to content

Commit

Permalink
Merge pull request #22545 from taosdata/feat/3.0/TS-3701
Browse files Browse the repository at this point in the history
feat(driver): add committed assignment API for jdbc
  • Loading branch information
gccgdb1234 committed Aug 24, 2023
2 parents 0860670 + a3d4d0c commit d6019c1
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 1 deletion.
15 changes: 15 additions & 0 deletions source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSubscriptionIm
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNIEnv *, jobject, jlong, jlong);

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *, jobject, jlong);

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *, jobject, jlong, jstring,
jint, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqCommitAsync
Expand All @@ -102,6 +106,12 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong,
jobject);

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *, jobject, jlong,
jobject);

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *, jobject, jlong,
jstring, jint, jlong, jobject);

/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqUnsubscribeImp
Expand Down Expand Up @@ -179,6 +189,11 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
jstring, jobject);

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *, jobject, jlong, jstring,
jint);

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *, jobject, jlong, jstring, jint);

#ifdef __cplusplus
}
#endif
Expand Down
108 changes: 107 additions & 1 deletion source/client/src/clientTmqConnector.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,39 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI
TAOS_RES *res = (TAOS_RES *)jres;
return tmq_commit_sync(tmq, res);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAllSync(JNIEnv *env, jobject jobj, jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniError("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

return tmq_commit_sync(tmq, NULL);
}

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitOffsetSyncImp(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

int code = tmq_commit_offset_sync(tmq, topicName, vgId, offset);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, tmq commit offset error, code:%d, msg:%s", jobj, code, tmq_err2str(code));
}

(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return code;
}

// deprecated
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
Expand Down Expand Up @@ -319,6 +352,27 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsy
tmq_commit_async(tmq, res, consumer_callback, offset);
}

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAllAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;

offset = (*env)->NewGlobalRef(env, offset);
tmq_commit_async(tmq, NULL, consumer_callback, offset);
}

JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitOffsetAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jint vgId, jlong offset,
jobject callback) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

callback = (*env)->NewGlobalRef(env, callback);
tmq_commit_offset_async(tmq, topicName, vgId, offset, consumer_callback, callback);
}

JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj,
jlong jtmq) {
tmq_t *tmq = (tmq_t *)jtmq;
Expand Down Expand Up @@ -497,9 +551,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);

if (res != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
tmq_err2str(res));
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
tmq_free_assignment(pAssign);
return (jint)res;
}
Expand All @@ -518,3 +572,55 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssign
tmq_free_assignment(pAssign);
return JNI_SUCCESS;
}

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommittedImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}

const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

int64_t offset = tmq_committed(tmq, topicName, vgId);

if (offset < JNI_SUCCESS && offset != -2147467247) {
jniError("jobj:%p, tmq get committed offset error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName,
vgId, offset, tmq_err2str(offset));
}

(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}

JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqPositionImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint vgId) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}

if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}

const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);

int64_t offset = tmq_position(tmq, topicName, vgId);

if (offset < JNI_SUCCESS) {
jniError("jobj:%p, tmq get position error, topic:%s, vgId:%d, code:0x%" PRIx64 ", msg:%s", jobj, topicName, vgId,
offset, tmq_err2str(offset));
}

(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jlong)offset;
}

0 comments on commit d6019c1

Please sign in to comment.