
label_selector causes watch timeouts #990

@TekTimmy

Description


What happened (please include outputs or screenshots):
I want to use the "new" Kubernetes Executor of the Apache Airflow project and ran into this problem. The Airflow Kubernetes Executor watches all pods matching a specific label_selector, but the watch always fails with a timeout error. The error does not occur when I do not specify a label_selector.

timeout                                   Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/urllib3/response.py in _error_catcher(self)
    424             try:
--> 425                 yield
    426 

/usr/local/lib/python3.6/dist-packages/urllib3/response.py in read_chunked(self, amt, decode_content)
    751             while True:
--> 752                 self._update_chunk_length()
    753                 if self.chunk_left == 0:

/usr/local/lib/python3.6/dist-packages/urllib3/response.py in _update_chunk_length(self)
    681             return
--> 682         line = self._fp.fp.readline()
    683         line = line.split(b";", 1)[0]

/usr/lib/python3.6/socket.py in readinto(self, b)
    585             try:
--> 586                 return self._sock.recv_into(b)
    587             except timeout:

/usr/lib/python3.6/ssl.py in recv_into(self, buffer, nbytes, flags)
   1011                   self.__class__)
-> 1012             return self.read(nbytes, buffer)
   1013         else:

/usr/lib/python3.6/ssl.py in read(self, len, buffer)
    873         try:
--> 874             return self._sslobj.read(len, buffer)
    875         except SSLError as x:

/usr/lib/python3.6/ssl.py in read(self, len, buffer)
    630         if buffer is not None:
--> 631             v = self._sslobj.read(len, buffer)
    632         else:

timeout: The read operation timed out

During handling of the above exception, another exception occurred:

ReadTimeoutError                          Traceback (most recent call last)
<ipython-input-1-7b49fb45407e> in <module>
      7 watch = kubernetes.watch.Watch()
      8 kwargs = {'label_selector': 'airflow-worker=b1dbda58-6a84-4ecd-9b31-73ce13fd3f4d', 'resource_version': '0', '_request_timeout': (6, 6)}
----> 9 for event in watch.stream(v1.list_namespaced_pod, 'default', **kwargs):
     10    print("got one")
     11 

/usr/local/lib/python3.6/dist-packages/kubernetes/watch/watch.py in stream(self, func, *args, **kwargs)
    142             resp = func(*args, **kwargs)
    143             try:
--> 144                 for line in iter_resp_lines(resp):
    145                     yield self.unmarshal_event(line, return_type)
    146                     if self._stop:

/usr/local/lib/python3.6/dist-packages/kubernetes/watch/watch.py in iter_resp_lines(resp)
     46 def iter_resp_lines(resp):
     47     prev = ""
---> 48     for seg in resp.read_chunked(decode_content=False):
     49         if isinstance(seg, bytes):
     50             seg = seg.decode('utf8')

/usr/local/lib/python3.6/dist-packages/urllib3/response.py in read_chunked(self, amt, decode_content)
    779             # We read everything; close the "file".
    780             if self._original_response:
--> 781                 self._original_response.close()
    782 
    783     def geturl(self):

/usr/lib/python3.6/contextlib.py in __exit__(self, type, value, traceback)
     97                 value = type()
     98             try:
---> 99                 self.gen.throw(type, value, traceback)
    100             except StopIteration as exc:
    101                 # Suppress StopIteration *unless* it's the same exception that

/usr/local/lib/python3.6/dist-packages/urllib3/response.py in _error_catcher(self)
    428                 # FIXME: Ideally we'd like to include the url in the ReadTimeoutError but
    429                 # there is yet no clean way to get at it from this context.
--> 430                 raise ReadTimeoutError(self._pool, None, "Read timed out.")
    431 
    432             except BaseSSLError as e:

ReadTimeoutError: HTTPSConnectionPool(host='10.96.0.1', port=443): Read timed out.
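The bottom frames of the traceback are a plain client-side socket read timeout: urllib3 is reading the chunked watch response, no bytes arrive within the 6-second read timeout set by `_request_timeout`, and the resulting `socket.timeout` is re-raised as `ReadTimeoutError`. The stdlib-only sketch below reproduces just that mechanism against a local server that accepts a connection and never writes to it. It does not involve Kubernetes at all; the function name `idle_read_times_out` and the 0.2 s timeout are illustrative. If a label selector matches no pods (the repro pod below carries no labels), the API server may simply have nothing to send, leaving the watch stream idle in the same way. That is an assumption, not something the traceback proves.

```python
import socket
import threading


def idle_read_times_out(read_timeout=0.2):
    """Return True if reading from a connection whose peer never writes raises a timeout.

    Hypothetical helper: it mimics a watch stream that receives no events
    before the client-side read timeout expires.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # any free local port
    server.listen(1)
    port = server.getsockname()[1]
    accepted = []

    def accept_and_stay_silent():
        conn, _ = server.accept()
        accepted.append(conn)  # keep the connection open but never send anything

    t = threading.Thread(target=accept_and_stay_silent, daemon=True)
    t.start()
    try:
        # The timeout passed here applies to later reads, like the read half of _request_timeout.
        with socket.create_connection(("127.0.0.1", port), timeout=read_timeout) as client:
            try:
                client.recv(1)  # blocks: the server never writes
            except socket.timeout:
                return True
            return False
    finally:
        t.join(timeout=1)
        for conn in accepted:
            conn.close()
        server.close()
```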

What you expected to happen:
The watcher does not time out and delivers events correctly.

How to reproduce it (as minimally and precisely as possible):
Kubernetes YAML with a service account, role binding, and a pod to run the Python code in:

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: admin
subjects:
  - kind: ServiceAccount
    name: airflow
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow
---
apiVersion: v1
kind: Pod
metadata:
  name: python
spec:
  serviceAccountName: airflow
  containers:
    - name: python
      image: python:3.6
      command: ["tail", "-f", "/dev/null"]

Attach to the pod and set up:

kubectl exec -ti python -- bash
pip3 install kubernetes ipython
ipython

Python code:

import kubernetes
import kubernetes.config
import kubernetes.client
import kubernetes.watch

# Authenticate with the pod's service account (runs inside the cluster).
kubernetes.config.load_incluster_config()
v1 = kubernetes.client.CoreV1Api()
watch = kubernetes.watch.Watch()
# _request_timeout is the client-side (connect, read) timeout in seconds.
kwargs = {'label_selector': 'airflow-worker=b1dbda58-6a84-4ecd-9b31-73ce13fd3f4d', 'resource_version': '0', '_request_timeout': (6, 6)}
# Watch pods in the 'default' namespace that match the label selector.
for event in watch.stream(v1.list_namespaced_pod, 'default', **kwargs):
    print("got one")
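A possible workaround, sketched under assumptions and not a confirmed fix: besides the client-side `_request_timeout`, `list_namespaced_pod` accepts a server-side `timeout_seconds`, which asks the API server to close the watch itself. If the server-side limit is shorter than the client read timeout, a quiet stream should end cleanly instead of tripping `ReadTimeoutError`. The helper name `build_watch_kwargs`, its defaults, and the 5-second margin are hypothetical choices for illustration.

```python
def build_watch_kwargs(label_selector, resource_version="0",
                       connect_timeout=6, read_timeout=60, server_timeout=None):
    """Build keyword arguments for watch.stream(v1.list_namespaced_pod, namespace, **kwargs).

    Assumption: the API server should end the watch (timeout_seconds) before the
    client-side read timeout (second element of _request_timeout) can fire.
    """
    if server_timeout is None:
        # Hypothetical default: leave a small margin below the client read timeout.
        server_timeout = max(1, read_timeout - 5)
    if server_timeout >= read_timeout:
        raise ValueError("server_timeout must be shorter than read_timeout")
    return {
        "label_selector": label_selector,
        "resource_version": resource_version,
        "timeout_seconds": server_timeout,                    # server side: API server closes the watch
        "_request_timeout": (connect_timeout, read_timeout),  # client side: (connect, read) in seconds
    }
```

With the repro above this would be `watch.stream(v1.list_namespaced_pod, 'default', **build_watch_kwargs('airflow-worker=...'))`. In the client versions I have looked at, `Watch.stream` returns instead of reconnecting once `timeout_seconds` is passed, so a long-running watcher would wrap it in an outer loop; reusing `resource_version='0'` on every pass replays existing pods as `ADDED` events.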

Anything else we need to know?:

Environment:

  • Kubernetes version (kubectl version): 1.16 (tried EKS & Minikube)
  • OS (e.g., MacOS 10.13.6): Docker images python:3.6 (based on Debian 10) and ubuntu:18.04
  • Python version (python --version): 3.5, 3.6, and 3.7
  • Python client version (pip list | grep kubernetes): 5 through 10 (tried them all)

Labels: kind/bug