// NOTE(review): this file appears to have been whitespace-collapsed. Each of
// its three physical lines starts with, or runs into, a `//` comment, so as
// stored only `delStream = true;` and `return 0; }` are live code and the file
// cannot compile. The original line breaks must be restored from a clean copy.
//
// NOTE(review): text also looks stripped wherever a `<` was followed by an
// identifier: the six `#include` directives carry no header names, the first
// prompt reads `"<>subject;`, and the pull-mode ack loop jumps from `(i`
// straight to `Config->Name`. What sat in those gaps (the rest of the pull
// loop, the async/sync receive paths, the start of the final stats block) is
// not recoverable from this text -- do not reconstruct it by guesswork.
//
// Intent, as far as the surviving text shows: connect to
// nats://localhost:42342, obtain a JetStream context, look up stream
// "foostream" and add it as a memory stream when js_GetStreamInfo returns
// NATS_NOT_FOUND, read a subject and a subscribe mode from stdin (1 = pull,
// 2 = async, anything else = sync), subscribe, then print stream info, delete
// the stream if this run created it, and destroy the NATS objects.
//
// Items to confirm once the source is restored:
//  - cfg.Subjects points at the 3-entry `subj` array but cfg.SubjectsLen is 1.
//  - the statuses of natsOptions_Create and natsOptions_SetURL are overwritten
//    by the next assignment to `s` before being checked.
//  - `so` is initialised and populated, yet none of the three visible
//    subscribe calls passes it.
//  - `subj` comes from new[]; no delete[] is visible in the surviving text.
//  - `char * stream` / `char * durable` bind string literals to non-const
//    char*, which ISO C++11 and later reject.
//  - outputFile opens a hard-coded absolute path and is not used anywhere in
//    the surviving text.
//  - the `while(1)` around the fetch loop has no visible exit; its remainder
//    is in the lost span.
// Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #include #include #include #include #include std::ofstream outputFile("C:/Users/l0185/Desktop/nats-client-prattice/bin/Debug/subscribe.txt"); void onMsg(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { printf("Received msg: %s - %.*s\n", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg)); natsMsg_Destroy(msg); } int main() { natsConnection *conn = NULL; natsStatistics *stats = NULL; natsOptions *opts = NULL; natsSubscription *sub = NULL; natsMsg *msg = NULL; jsCtx *js = NULL; jsOptions jsOpts; jsSubOptions so; natsStatus s; bool delStream = false; bool pull = false; bool async = false; char * stream = "foostream"; char * durable = "foodurable"; const char ** subj=new const char *[3]; subj[0]="foo1"; subj[1]="foo2"; subj[2]="foo3"; s=natsOptions_Create(&opts); s=natsOptions_SetURL(opts,"nats://localhost:42342"); s = natsConnection_Connect(&conn, opts); if (s == NATS_OK) s = jsOptions_Init(&jsOpts); if (s == NATS_OK) s = jsSubOptions_Init(&so); if (s == NATS_OK) { so.Stream = stream; so.Consumer = durable; } if (s == NATS_OK) s = natsConnection_JetStream(&js, conn, &jsOpts); if (s == NATS_OK) { jsStreamInfo *si = NULL; // First check if the stream already exists. s = js_GetStreamInfo(&si, js, stream, NULL, nullptr); if (s == NATS_NOT_FOUND) { jsStreamConfig cfg; // Since we are the one creating this stream, we can delete at the end. 
delStream = true; // Initialize the configuration structure. jsStreamConfig_Init(&cfg); cfg.Name = stream; // Set the subject cfg.Subjects = subj; cfg.SubjectsLen = 1; // Make it a memory stream. cfg.Storage = js_MemoryStorage; // Add the stream, s = js_AddStream(&si, js, &cfg, NULL, nullptr); } if (s == NATS_OK) { printf("Stream %s has %" PRIu64 " messages (%" PRIu64 " bytes)\n", si->Config->Name, si->State.Msgs, si->State.Bytes); // Need to destroy the returned stream object. jsStreamInfo_Destroy(si); } } std::string subject; std::cout<<"input subject do you want to subscribe:"<>subject; std::cout<<"input subscribe method:"<<"\n" <<"1.pull"<<"\n" <<"2.async"<<"\n" <<"3.sync"<<"\n"; int opt; std::cin>>opt; switch (opt) { case 1: { pull=true; } break; case 2: { async = true; } break; default: break; } if (s == NATS_OK) { if (pull) s = js_PullSubscribe(&sub, js, subject.c_str(), nullptr, &jsOpts,nullptr, nullptr); else if (async) s = js_Subscribe(&sub, js, subject.c_str(), onMsg, NULL, &jsOpts, nullptr, nullptr); else s = js_SubscribeSync(&sub, js, subject.c_str(), &jsOpts, nullptr, nullptr); } if ((s == NATS_OK) && pull) { natsMsgList list; int i; int total = 20; while(1){ for (int count = 0; (s == NATS_OK) && (count < total); ) { s = natsSubscription_Fetch(&list, sub, 5, 5000, nullptr); if (s != NATS_OK) break; count += (int64_t) list.Count; for (i=0; (s == NATS_OK) && (iConfig->Name, si->State.Msgs, si->State.Bytes); jsStreamInfo_Destroy(si); } if (delStream) { printf("\nDeleting stream %s: ", stream); s = js_DeleteStream(js, stream, NULL, nullptr); if (s == NATS_OK) printf("OK!"); printf("\n"); } } else { printf("Error: %u - %s ", s, natsStatus_GetText(s)); nats_PrintLastErrorStack(stderr); } // Destroy all our objects to avoid report of memory leak jsCtx_Destroy(js); natsStatistics_Destroy(stats); natsSubscription_Destroy(sub); natsConnection_Destroy(conn); natsOptions_Destroy(opts); // To silence reports of memory still in used with valgrind nats_Close(); 
return 0; }