Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bigquery retry #3238

Merged
merged 20 commits into from
May 3, 2023
6 changes: 3 additions & 3 deletions luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def is_error_5xx(err):
wait=wait_exponential(multiplier=1, min=1, max=10),
stop=stop_after_attempt(3),
reraise=True,
after=lambda x: x.args[0].__initialise_client()
after=lambda x: x.args[0]._initialise_client()
)


Expand Down Expand Up @@ -152,9 +152,9 @@ def __init__(self, oauth_credentials=None, descriptor='', http_=None):
self.descriptor = descriptor
self.http_ = http_

self.__initialise_client()
self._initialise_client()

def __initialise_client(self):
def _initialise_client(self):
authenticate_kwargs = gcp.get_authenticate_kwargs(self.oauth_credentials, self.http_)

if self.descriptor:
Expand Down
32 changes: 31 additions & 1 deletion test/contrib/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import mock
import pytest
from mock.mock import MagicMock

from luigi.contrib import bigquery
from luigi.contrib.bigquery import BigQueryLoadTask, BigQueryTarget, BQDataset, \
BigQueryRunQueryTask, BigQueryExtractTask
BigQueryRunQueryTask, BigQueryExtractTask, BigQueryClient
from luigi.contrib.gcs import GCSTarget


Expand Down Expand Up @@ -147,3 +149,31 @@ def output(self):
}
}
run_job.assert_called_with('proj', expected_body, dataset=BQDataset('proj', 'ds', None))


class BigQueryClientTest(unittest.TestCase):

def test_retry_succeeds_on_second_attempt(self):
try:
from googleapiclient import errors
except ImportError:
raise unittest.SkipTest('Unable to load googleapiclient module')
client = MagicMock(spec=BigQueryClient)
attempts = 0

@bigquery.bq_retry
def fail_once(bq_client):
nonlocal attempts
attempts += 1
if attempts == 1:
raise errors.HttpError(
resp=MagicMock(status=500),
content=b'{"error": {"message": "stub"}',
)
else:
return MagicMock(status=200)

response = fail_once(client)
client._initialise_client.assert_called_once()
self.assertEqual(attempts, 2)
self.assertEqual(response.status, 200)