-
Notifications
You must be signed in to change notification settings - Fork 161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
undefined err
in node_transport.ts
#441
Comments
@niteshkumarniranjan how is that even possible? return Promise.resolve();
} catch (err) {
const { code } = err;
const perr = code === "ECONNREFUSED"
? NatsError.errorForCode(ErrorCode.ConnectionRefused, err)
: err;
if (this.socket) {
this.socket.destroy();
}
throw perr;
} The code is clearly catching an exception so the only thing that could happen is for |
Are you building your own version of the library? The build steps are very peculiar, and the error you have is from typescript. |
Maybe, the error is from https://github.com/nats-io/nats.deno It's mentioned in the doc that the nats.js library shares client functionality with NATS.deno. I didn't know where to open the issue. And the error happens in k8s environment when the NATS pod isn't ready and my client is trying to connect. |
Just to add a bit of context, I am running my code using import * as nats from "nats"
import rTracer from "cls-rtracer"
const sc = nats.JSONCodec()
function encode<T>(obj: { data: T; traceId: unknown }) {
return sc.encode(obj)
}
function decode<T>(str: Uint8Array) {
return sc.decode(str) as { data: T; traceId: unknown }
}
export interface IBus {
subscribe<T>(
name: string,
callback: (arg: {
obj: { data: T; traceId: unknown }
m: nats.JsMsg
}) => void
): Promise<void>
publish<T>(name: string, data: T): Promise<void>
}
export default function createNATSTransport({
servers,
durableName,
logger,
}: {
servers: string | string[]
durableName: string
logger: any
}) {
let isConnected = false
let nc: nats.NatsConnection,
jsm: nats.JetStreamManager,
js: nats.JetStreamClient
async function connect() {
nc = await nats.connect({ servers, name: durableName, timeout: 30 * 1000 })
jsm = await nc.jetstreamManager()
js = await nc.jetstream()
logger.info({
msg: `connected to nats`,
url: servers,
})
isConnected = true
}
async function drain() {
await nc.drain()
}
const bus: IBus = {
async subscribe<T>(
name: string,
callback: (arg: {
obj: { data: T; traceId: unknown }
m: nats.JsMsg
}) => void
) {
if (!isConnected) {
throw new Error("NATS is not connected")
}
const streams = await jsm.streams.list().next()
const [stream, ...extra] = name.split(".")
const streamExists =
streams.findIndex((x) => x.config.name === stream) > -1
if (!streamExists) {
await jsm.streams.add({ name: stream, subjects: [`${stream}.*`] })
}
const inbox = nats.createInbox()
const opts = nats.consumerOpts({
ack_policy: nats.AckPolicy.Explicit,
deliver_policy: nats.DeliverPolicy.Last,
deliver_subject: inbox,
flow_control: true,
durable_name: durableName,
})
const sub = await js.subscribe(name, { ...opts, queue: durableName })
;(async () => {
for await (const m of sub) {
const obj = decode<T>(m.data)
logger.info({
msg: "bus event recieved",
event: {
name,
...obj,
},
})
callback({
obj,
m: m,
})
}
})()
},
async publish<T>(name: string, data: T) {
if (!isConnected) {
throw new Error("NATS is not connected")
}
const streams = await jsm.streams.list().next()
const [stream, ...extra] = name.split(".")
const streamExists =
streams.findIndex((x) => x.config.name === stream) > -1
if (!streamExists) {
await jsm.streams.add({ name: stream, subjects: [`${stream}.*`] })
}
const wrappedData = {
data,
traceId: rTracer.id(),
}
js.publish(name, encode<T>(wrappedData))
logger.info({
msg: `bus event published`,
event: {
name,
},
})
},
}
const transport = {
reply<RequestData, ResponseData>(
subject: string,
callback: (obj: {
data: RequestData
traceId: unknown
}) => ResponseData | Promise<ResponseData>
) {
if (!isConnected) {
throw new Error("NATS is not connected")
}
const sub = nc.subscribe("x" + subject, { queue: durableName })
;(async () => {
for await (const m of sub) {
const obj = decode<RequestData>(m.data)
logger.info({
msg: "nats transport request recieved",
event: {
name: subject,
...obj,
},
})
const result = await callback(obj)
const encodedData = encode<ResponseData>({
data: result,
traceId: obj.traceId,
})
if (m.respond(encodedData)) {
logger.info({
msg: "nats transport reply",
traceId: obj.traceId,
event: {
name: subject,
},
})
}
}
})()
},
async request<RequestData, ResponseData>(
subject: string,
data: RequestData
) {
if (!isConnected) {
throw new Error("NATS is not connected")
}
const traceId = rTracer.id()
const response = await nc.request(
"x" + subject,
encode<RequestData>({ data, traceId })
)
return decode<ResponseData>(response.data)
},
}
return {
connect,
bus,
transport,
drain,
}
}
|
@niteshkumarniranjan please don't run with ts-node, the code has to be processed by the build in order to generate javascript - there's some source manipulation etc that happens during the build process which ts-node is not able to do. Please use the released npm bundle. |
@aricart I am using the published npm version only. |
I am at a loss on how a catch(err) block can be involved, but the next line says that err is undefined.... On the destructure, the only thing that can happen is that |
@aricart I'll try to do some debugging and will get back if I find something. |
on all socket close events, it is assumed that the error variable is defined Line 144 in 9d53eda
Line 240 in 9d53eda
Line 103 in 9d53eda
Might be a dumb question but is it possible that the socket connection is being closed without any errors? |
even if that was true, the issue is that the catch block is invoked, how is it possible for a |
Can you modify the code and simply put a console.log on the err |
I mean you can reject a promise without any error const a = (x) => {
return new Promise((resolve, reject) => {
if (x) resolve(a)
else reject()
})
}
async function main() {
try {
await a()
} catch (err) {
const {
code
} = err
console.log(code)
}
}
main() |
I am not sure if I can modify the code for a module inside node_modules inside a docker image? |
Yes the reject simply means that the promise failed, but no error. But the code you are pointing out, is a catch... |
you would have to modify the npm package before you copy it to the docker image |
yes! Line 80 in 9d53eda
here the only awaits that are being done are And you are using |
Let me try. |
I am going to post a version for you to use |
Yup! it is |
something changed in node |
14.17.4 |
are you in slack? |
Nope! |
you are saying when this happens the nats-server is not ready or kubernetes node is not yet running right? |
I am also using istio sidecar with all my pods. |
are you using any library that is looking or inspecting any network traffic? |
inside node.js? no |
one more, can you try with the current version of node - I will do a dev release for an npm bundle that treats this as a connection refused. But would like to know if current node version is affected. |
A/C to the node.js doc for when the close event is emitted it also provides if was because of an error. |
weird how we have not seen this earlier. |
I checked with the latest node.js, still has the same error. |
I don't know how NATS Server behaves, |
ok I just published a bundle - can you |
okay! testing |
|
ok, so now, in your code provide the |
…close, triggering a catch(err) where the error is undefined - this fix simply aliases that as a connection refused error.
Since I am running on k8s, my pod just restarts in a couple of seconds and it connects successfully since NATS is up and running by then. |
there will be a release of nats clients for javascript next week - waiting on a server release, this will be included it in it. |
@niteshkumarniranjan thank you for helping out on this! |
happy to help! |
The text was updated successfully, but these errors were encountered: