
ConnectionResetError: [Errno 104] Connection reset by peer #86

Closed
lqsantos opened this issue Feb 23, 2023 · 5 comments
Labels: configuration issue, external bug (the bug is from a supporting or related system)

Comments

@lqsantos

lqsantos commented Feb 23, 2023

Describe the bug
I'm getting this error in our DAGs running KubernetesJobOperator.

To Reproduce
Steps to reproduce the behavior:

  1. Create an Airflow DAG with the following code:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow.utils.dates import days_ago

dag_name = 'logs-job-operator'

default_args = {"owner": "leandro", "start_date": days_ago(2), "retries": 0}

with DAG(
    dag_name,
    default_args=default_args,
    description='dag_anomaly',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['ml'],
) as dag:

    start = EmptyOperator(
        task_id="start",
        dag=dag
    )
    complete = EmptyOperator(
        task_id="complete",
        dag=dag
    )


    manifest_job = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": "job-operator-example",
            "namespace": "default"
        },
        "spec": {
            "completions": 10,
            "parallelism": 10,
            "backoffLimit": 10,
            "template": {
                "spec": {
                    "nodeSelector": {
                        "agentpool": "userpool"
                    },
                    "containers": [
                        {
                            "name": "job-operator-example",
                            "image": "sikwan/random-json-logger:latest"
                        }
                    ],
                    "restartPolicy": "OnFailure"
                }
            }
        }
    }


    k8sJobOperator = KubernetesJobOperator(task_id="test-job-success", body=manifest_job, dag=dag)

    start >> k8sJobOperator >> complete
  2. Run the DAG
  3. In our environment, the error occurs after about 5 minutes of execution

Expected behavior
No error raised during job execution.

Environment
Airflow is deployed on Azure Kubernetes Service (AKS) with the KubernetesExecutor, which spawns the worker pods in the same AKS cluster.
Kubernetes Version 1.24.9
Airflow Version 2.4.3
airflow_kubernetes_job_operator-2.0.12

Log

[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-q8rwm]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "DEBUG", "message": "first loop completed."}
[2023-02-23, 14:40:56 UTC] {queries.py:79} INFO - [anomaly/pods/test-job-success-job-operator-example-of4vaqie-vvn6v]: {"@timestamp": "2023-02-23T14:40:56+0000", "level": "ERROR", "message": "something happened in this execution."}
[2023-02-23, 14:40:56 UTC] {client.py:485} ERROR - Traceback (most recent call last):

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 443, in _error_catcher
    yield

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 815, in read_chunked
    self._update_chunk_length()

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 745, in _update_chunk_length
    line = self._fp.fp.readline()

  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)

  File "/usr/local/lib/python3.7/ssl.py", line 1071, in recv_into
    return self.read(nbytes, buffer)

  File "/usr/local/lib/python3.7/ssl.py", line 929, in read
    return self._sslobj.read(len, buffer)

ConnectionResetError: [Errno 104] Connection reset by peer


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _execute_query
    self.query_loop(client)

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 346, in query_loop
    raise ex

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/queries.py", line 339, in query_loop
    return super().query_loop(client)

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 390, in query_loop
    raise ex

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 348, in query_loop
    for line in self._read_response_stream_lines(response):

  File "/home/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 183, in _read_response_stream_lines
    for chunk in response.stream(decode_content=False):

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 623, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 844, in read_chunked
    self._original_response.close()

  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)

  File "/home/airflow/.local/lib/python3.7/site-packages/urllib3/response.py", line 460, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)

urllib3.exceptions.ProtocolError: ("Connection broken: ConnectionResetError(104, 'Connection reset by peer')", ConnectionResetError(104, 'Connection reset by peer'))

Complete log:
dag_id=logs-job-operator_run_id=manual__2023-02-23T14_31_39.933191+00_00_task_id=test-job-success_attempt=1.log

@LamaAni
Owner

LamaAni commented Feb 23, 2023

Hi, this looks like a kind of timeout.

Note that the restart policy should be Never (see the manifest sketch after this comment).

What is your system? Also, this seems like a timeout, which is known to happen on Amazon Kubernetes. See the other open conversation.

Can you run this locally with Docker Desktop Kubernetes or some other local cluster?
Can you try this with 1 completion? And with one that fails?
Can you try running the example in the repo? Or the test in the repo?

I can only get to this mid next week. Apologies about that.
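
For reference, a minimal variant of the manifest from the report above with those changes applied, assuming everything else stays the same (the backoffLimit of 0 is just an assumption here, so that Airflow rather than the Job owns retries):

manifest_job = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "job-operator-example", "namespace": "default"},
    "spec": {
        "completions": 1,   # single completion, as suggested above
        "parallelism": 1,
        "backoffLimit": 0,  # assumption: let Airflow handle retries instead of the Job
        "template": {
            "spec": {
                "nodeSelector": {"agentpool": "userpool"},
                "containers": [
                    {
                        "name": "job-operator-example",
                        "image": "sikwan/random-json-logger:latest",
                    }
                ],
                "restartPolicy": "Never",  # as suggested; the operator forces this anyway
            }
        },
    },
}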

@lqsantos
Author

Hi @LamaAni,
Thanks for the reply.

I've changed the restartPolicy to Never and the problem keeps happening.
I've also tried running with 1 completion and it doesn't work either.

Sorry, but I'm not sure what you mean by running the example in the repo; I'm new to this. Could you explain how to do it?
In the meantime I'll set up and test locally as you suggested.

Thanks

@LamaAni
Owner

LamaAni commented Feb 27, 2023

Can you please test the example here:

from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "tester",
    "start_date": days_ago(2),
    "retries": 0,
}

dag = DAG(
    "job-operator-simple-test",
    default_args=default_args,
    description="Test base job operator",
    schedule_interval=None,
)

KubernetesJobOperator(
    task_id="very-simple-job",
    dag=dag,
    image="ubuntu",
    command=[
        "bash",
        "-c",
        "echo start; sleep 5; echo end",
    ],
)

The above should make a very fast image run - check whether the timeout is the issue. If that passes, just increase the sleep time in there and you will get a test of the timeout limit on the cluster (see the sketch after this comment).

I forced the restart policy to Never in the code - but the YAML should be correct anyway - this is because Airflow should control the restarts. Completions should work, but in this case I would set that in Airflow rather than in the Job... it would give you more proper logs.

The error sometimes comes from a timeout on the open connection with the Kubernetes cluster (a connection timeout forced by the server) - hence testing locally may shed light on the issue.
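
A rough sketch of that probing idea, reusing only the constructor arguments from the example above (the DAG id, sleep durations, and task ids are illustrative): one task per sleep duration, so the duration at which the reset starts appearing brackets the cluster's idle timeout.

from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow.utils.dates import days_ago

dag = DAG(
    "job-operator-timeout-probe",
    default_args={"owner": "tester", "start_date": days_ago(2), "retries": 0},
    description="Probe the idle timeout on streamed log connections",
    schedule_interval=None,
)

# One independent task per sleep duration (seconds).
for sleep_seconds in (5, 60, 300, 600):
    KubernetesJobOperator(
        task_id=f"very-simple-job-{sleep_seconds}s",
        dag=dag,
        image="ubuntu",
        command=["bash", "-c", f"echo start; sleep {sleep_seconds}; echo end"],
    )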

@lqsantos
Author

lqsantos commented Mar 1, 2023

I've just tested your proposed DAG: the task ended successfully when the sleep value was 5 seconds, but after increasing it to 300 seconds the task ends with Connection reset by peer.
I think the issue is related to a known issue on AKS:
https://learn.microsoft.com/en-us/answers/questions/467642/no-response-from-api-for-long-running-jobs-in-aci

Since this problem is critical to the delivery of a project, we had to create our own custom operator to submit Kubernetes jobs. During its creation and testing we ran into a similar problem when we tried to stream-read the logs generated by Kubernetes using the official Python library for k8s.
We got around it by abandoning the log-streaming strategy and making periodic requests instead (sketched below).
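
For anyone hitting the same wall, a minimal sketch of that periodic-polling approach with the official kubernetes Python client; the in-cluster credentials, pod name, namespace, and poll interval are all illustrative, not our actual operator code:

import time
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() when running outside the cluster
v1 = client.CoreV1Api()

POLL_INTERVAL = 30  # seconds, illustrative

def poll_pod_logs(pod_name: str, namespace: str = "default") -> None:
    """Fetch logs in periodic chunks instead of holding one streaming connection open."""
    while True:
        # Ask only for the lines produced since the previous poll (with a small overlap).
        logs = v1.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            since_seconds=POLL_INTERVAL + 5,
            timestamps=True,
        )
        for line in logs.splitlines():
            print(line)
        # Stop polling once the pod has finished.
        pod = v1.read_namespaced_pod(name=pod_name, namespace=namespace)
        if pod.status.phase in ("Succeeded", "Failed"):
            break
        time.sleep(POLL_INTERVAL)

The overlap means a line can show up twice around poll boundaries; the timestamps returned by timestamps=True make it easy to deduplicate.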

@LamaAni
Owner

LamaAni commented Mar 1, 2023

Yeah, this issue was already mentioned here: #54

I need to fix that but haven't had the time. If you can add a reconnect methodology then I will definitely accept a PR (a rough sketch of the pattern follows this comment). If I get some time I'll fix that up, but currently it is an open issue.

Feel free to close this issue when you are done.
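
Not the operator's actual code, just a sketch of the general reconnect pattern such a PR could follow, using the official kubernetes client (pod name, namespace, and back-off are illustrative): stream the logs, catch the broken-connection errors from the traceback above, and reopen the stream asking only for output produced since the drop.

import time
import urllib3
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() outside the cluster
v1 = client.CoreV1Api()

def stream_logs_with_reconnect(pod_name: str, namespace: str = "default") -> None:
    """Follow pod logs, reopening the stream whenever the server resets the connection."""
    since_seconds = None          # full log on the first attempt
    last_read = time.time()
    while True:
        kwargs = dict(
            name=pod_name,
            namespace=namespace,
            follow=True,
            timestamps=True,
            _preload_content=False,  # return the raw urllib3 response so we can iterate it
        )
        if since_seconds is not None:
            kwargs["since_seconds"] = since_seconds
        try:
            resp = v1.read_namespaced_pod_log(**kwargs)
            for chunk in resp.stream():
                last_read = time.time()
                print(chunk.decode("utf-8", errors="replace"), end="")
            break  # the stream closed normally, i.e. the pod finished
        except (urllib3.exceptions.ProtocolError, ConnectionResetError):
            # Errno 104 / "Connection broken": reconnect and ask only for recent output.
            since_seconds = max(1, int(time.time() - last_read))
            time.sleep(2)

As with the polling sketch above, duplicate lines around a reconnect can be filtered on the timestamps.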

LamaAni added the external bug label Mar 1, 2023
LamaAni closed this as completed Mar 2, 2023