forward.portForward - Uncaught Error: not opened #187

@proAlexandr

Hi, I ran into an issue with the forward.portForward method.

In my app, I use it like this:

import * as net from 'net'
import * as k8s from '@kubernetes/client-node'

const kc = new k8s.KubeConfig()
kc.loadFromString(cluster.config) // here cluster.config is a string with valid yaml config to real cluster

const forward = new k8s.PortForward(kc)
const server = net.createServer(function (socket) {
  forward.portForward(service.namespace, service.pod, [8081], socket, socket, socket)
})
server.listen(8081, '127.0.0.1')

where service.namespace and service.pod are non-empty strings.

Next, I open http://localhost:8081 in a browser and see the website hosted on the Kubernetes pod; everything is OK.
But sometimes, when I reload the page in my browser, it loads forever. If I reload again, it may hang again or load successfully. I figured out why this happens. In short, the WebSocket created by forward.portForward ends up in the CLOSING or CLOSED state, and the handler in WebSocketHandler.handleStandardInput ignores this case and lets an exception be thrown.
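The failure mode described above can be sketched in isolation. This is only an illustration, not the library's code: `frame` and `safeSend` are hypothetical names, and the socket is any object with a `readyState` and a `send` method. The frame layout (one stream-number byte followed by the payload) mirrors what the patch posted in this issue builds. The point is that checking `readyState` before `send` turns the uncaught "not opened" error into a result the caller can act on.

```javascript
// Numeric readyState value for an open socket, as defined by the WebSocket standard.
const OPEN = 1;

// Build a frame: one byte with the stream number, followed by the payload.
function frame(streamNum, data) {
  const payload = Buffer.isBuffer(data) ? data : Buffer.from(data);
  const buff = Buffer.alloc(payload.length + 1);
  buff.writeInt8(streamNum, 0);
  payload.copy(buff, 1);
  return buff;
}

// Hypothetical helper: send only when the socket is open.
// Returns true if the data was handed to the socket, false otherwise,
// so the caller can decide whether to reconnect or drop the data.
function safeSend(ws, streamNum, data) {
  if (ws.readyState !== OPEN) {
    return false;
  }
  ws.send(frame(streamNum, data));
  return true;
}
```

A caller that gets `false` back could reconnect and retry instead of letting the exception escape from the `data` handler.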

Details:

  1. Go to http://localhost:8081 for the first time; everything is OK.
  2. Reload http://localhost:8081 and get infinite loading.
  3. See an error in the console.

     screenshot 2019-01-19 at 12 35 19 copy

  4. Next, I added a `console.log` to `WebSocketHandler.handleStandardInput`.

     screenshot 2019-01-19 at 12 35 41

  5. Got:

     screenshot 2019-01-19 at 12 35 19

Next, I tried to fix it in my app to make port forwarding more stable. I wrote a patch (posting it here in case it's useful):

import querystring from 'querystring'
import { WebSocketHandler } from '@kubernetes/client-node/dist/web-socket-handler'
import WebSocket from 'isomorphic-ws'

WebSocketHandler.restartableHandleStandardInput = async function (createWS, stdin, streamNum = 0) {
  const tryLimit = 3;
  let queue = Promise.resolve()
  let ws = await createWS()

  async function processData(data) {
    const buff = Buffer.alloc(data.length + 1);

    buff.writeInt8(streamNum, 0);
    if (data instanceof Buffer) {
      data.copy(buff, 1);
    } else {
      buff.write(data, 1);
    }

    let i = 0;
    for (; i < tryLimit; ++i) {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(buff);
        break;
      } else {
        ws = await createWS()
      }
    }

    if (i >= tryLimit) {
      throw new Error("can't send data to ws")
    }
  }

  stdin.on('data', (data) => {
    // Pass a function to then() so chunks are processed one at a time, in order
    queue = queue.then(() => processData(data))
  })
  stdin.on('end', () => {
    ws.close();
  });
}

export function patchForward (forward) {
  forward.portForward = async function (namespace, podName, targetPorts, output, err, input) {
    if (targetPorts.length === 0) {
      throw new Error('You must provide at least one port to forward to.')
    }
    if (targetPorts.length > 1) {
      throw(new Error('Only one port is currently supported for port-forward'))
    }
    const query = {
      ports: targetPorts[0]
    }
    const queryStr = querystring.stringify(query)
    const needsToReadPortNumber = []
    targetPorts.forEach((value, index) => {
      needsToReadPortNumber[index * 2] = true
      needsToReadPortNumber[index * 2 + 1] = true
    })
    const path = `/api/v1/namespaces/${namespace}/pods/${podName}/portforward?${queryStr}`
    const createWebSocket = async () => {
      return await this.handler.connect(path, null, (streamNum, buff) => {
        if (streamNum >= targetPorts.length * 2) {
          return !this.disconnectOnErr
        }
        // First two bytes of each stream are the port number
        if (needsToReadPortNumber[streamNum]) {
          buff = buff.slice(2)
          needsToReadPortNumber[streamNum] = false
        }
        if (streamNum % 2 === 1) {
          if (err) {
            err.write(buff)
          }
        } else {
          output.write(buff)
        }
        return true
      })
    }

    await WebSocketHandler.restartableHandleStandardInput(createWebSocket, input, 0)
  }
}

Use it like this:

const forward = new k8s.PortForward(kc)
patchForward(forward)

The idea of the patch is to recreate the WebSocket if the existing socket is closed.
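The reconnect idea can be sketched on its own, without the Kubernetes client. This is a minimal sketch under stated assumptions, not the patch itself: `makeReconnectingSender` is a hypothetical helper, `createWS` is any async factory that resolves to an object with `readyState` and `send`, and `tryLimit` bounds the number of attempts as in the patch.

```javascript
// Numeric readyState value for an open socket, as defined by the WebSocket standard.
const OPEN = 1;

// Hypothetical helper: returns a send(buff) function that recreates the
// socket via createWS() whenever the current one is missing or not open.
function makeReconnectingSender(createWS, tryLimit = 3) {
  let ws = null;
  let queue = Promise.resolve();

  async function sendOnce(buff) {
    for (let attempt = 0; attempt < tryLimit; attempt++) {
      if (ws === null || ws.readyState !== OPEN) {
        ws = await createWS();
      }
      if (ws.readyState === OPEN) {
        ws.send(buff);
        return;
      }
    }
    throw new Error("can't send data to ws");
  }

  // Chain sends so chunks go out in the order they arrived.
  return function send(buff) {
    const result = queue.then(() => sendOnce(buff));
    // Keep the queue usable after a failure; the caller still sees
    // the rejection through the returned promise.
    queue = result.catch(() => {});
    return result;
  };
}
```

Each call returns a promise, so a `data` handler can attach a `catch` and close the client socket when every attempt fails, rather than leaving the browser waiting.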

After this patch was applied, the error disappeared.
That's all. Thanks for reading. Please tell me what you think about this problem and the patch. Should I create a PR with this patch, or is there a better way to fix it?
