From 79df2b5eeaf2809015e5fa74bafbe563dc7587b2 Mon Sep 17 00:00:00 2001
From: pirsquare
Date: Sat, 6 Jun 2015 23:37:00 +0800
Subject: [PATCH 1/4] Join query results returning multiple pages

---
 bigquery/client.py            | 52 ++++++++++----------
 bigquery/tests/test_client.py | 89 +++++++++++++++++++++++++++++++----
 2 files changed, 105 insertions(+), 36 deletions(-)

diff --git a/bigquery/client.py b/bigquery/client.py
index 2e27c6e..ec464af 100644
--- a/bigquery/client.py
+++ b/bigquery/client.py
@@ -121,7 +121,6 @@ def __init__(self, bq_service, project_id, swallow_results=True):
         self.cache = {}
 
     def _submit_query_job(self, query_data):
-
         """
         Submit a query job to BigQuery.
 
         This is similar to BigQueryClient.query, but gives the user
@@ -172,7 +171,6 @@ def _submit_query_job(self, query_data):
         return job_id, [self._transform_row(row, schema) for row in rows]
 
     def _insert_job(self, body_object):
-
         """
         Submit a job to BigQuery
 
         Direct proxy to the insert() method of the offical BigQuery
@@ -243,9 +241,7 @@ def get_query_schema(self, job_id):
             A list of dictionaries that represent the schema.
         """
 
-        job_collection = self.bigquery.jobs()
-        query_reply = self._get_query_results(
-            job_collection, self.project_id, job_id, offset=0, limit=0)
+        query_reply = self.get_query_results(job_id, offset=0, limit=0)
 
         if not query_reply['jobComplete']:
             logging.warning('BigQuery job %s not complete' % job_id)
@@ -289,38 +285,41 @@ def check_job(self, job_id):
             included in the query table if it has completed.
         """
 
-        job_collection = self.bigquery.jobs()
-        query_reply = self._get_query_results(
-            job_collection, self.project_id, job_id, offset=0, limit=0)
+        query_reply = self.get_query_results(job_id, offset=0, limit=0)
 
         return (query_reply.get('jobComplete', False),
                 int(query_reply.get('totalRows', 0)))
 
-    def get_query_rows(self, job_id, offset=None, limit=None):
+    def get_query_rows(self, job_id, offset=None, limit=None, timeout=0):
         """Retrieve a list of rows from a query table by job id.
-
         Args:
             job_id: The job id that references a BigQuery query.
             offset: The offset of the rows to pull from BigQuery.
             limit: The number of rows to retrieve from a query table.
-
+            timeout: Timeout in seconds.
         Returns:
             A list of dictionaries that represent table rows.
         """
-        job_collection = self.bigquery.jobs()
-        query_reply = self._get_query_results(
-            job_collection, self.project_id, job_id, offset=offset,
-            limit=limit)
-
+        # Get query results
+        query_reply = self.get_query_results(job_id, offset=offset,
+                                             limit=limit, timeout=timeout)
         if not query_reply['jobComplete']:
             logging.warning('BigQuery job %s not complete' % job_id)
             raise UnfinishedQueryException()
 
-        schema = query_reply['schema']['fields']
+        schema = query_reply["schema"]["fields"]
         rows = query_reply.get('rows', [])
-
-        return [self._transform_row(row, schema) for row in rows]
+        page_token = query_reply.get("pageToken")
+        records = [self._transform_row(row, schema) for row in rows]
+
+        # Append to records if there are multiple pages for query results
+        while page_token:
+            query_reply = self.get_query_results(
+                job_id, offset=offset, limit=limit, page_token=page_token,
+                timeout=timeout)
+            page_token = query_reply.get("pageToken")
+            rows = query_reply.get('rows', [])
+            records += [self._transform_row(row, schema) for row in rows]
+        return records
 
     def check_table(self, dataset, table):
         """Check to see if a table exists.
@@ -1039,27 +1038,26 @@ def _in_range(self, start_time, end_time, time):
             time <= start_time <= time + ONE_MONTH or \
             time <= end_time <= time + ONE_MONTH
 
-    def _get_query_results(self, job_collection, project_id, job_id,
-                           offset=None, limit=None):
+    def get_query_results(self, job_id, offset=None, limit=None,
+                          page_token=None, timeout=0):
         """Execute the query job indicated by the given job id.
-
         Args:
-            job_collection: The collection the job belongs to.
-            project_id: The project id of the table.
             job_id: The job id of the query to check.
             offset: The index the result set should start at.
             limit: The maximum number of results to retrieve.
-
+            page_token: Page token, returned by a previous call, to request
+                the next page of results.
+            timeout: Timeout in seconds.
         Returns:
             The query reply.
         """
+        job_collection = self.bigquery.jobs()
         return job_collection.getQueryResults(
-            projectId=project_id,
+            projectId=self.project_id,
             jobId=job_id,
             startIndex=offset,
             maxResults=limit,
-            timeoutMs=0).execute()
+            pageToken=page_token,
+            timeoutMs=timeout * 1000).execute()
 
     def _transform_row(self, row, schema):
         """Apply the given schema to the given BigQuery data row.

diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py
index 30a8178..b09cba4 100644
--- a/bigquery/tests/test_client.py
+++ b/bigquery/tests/test_client.py
@@ -358,7 +358,6 @@ def test_get_response(self):
         """Ensure that the query is executed and the query reply is returned.
         """
-        project_id = 'foo'
         job_id = 'bar'
 
         mock_query_job = mock.Mock()
@@ -368,14 +367,15 @@ def test_get_response(self):
         offset = 5
         limit = 10
+        page_token = "token"
+        timeout = 1
 
-        actual = self.client._get_query_results(self.mock_job_collection,
-                                                project_id, job_id,
-                                                offset, limit)
+        actual = self.client.get_query_results(job_id, offset, limit,
+                                               page_token, timeout)
 
         self.mock_job_collection.getQueryResults.assert_called_once_with(
-            timeoutMs=0, projectId=project_id, jobId=job_id,
-            startIndex=offset, maxResults=limit)
+            projectId=self.project_id, jobId=job_id, startIndex=offset,
+            maxResults=limit, pageToken=page_token, timeoutMs=1000)
+        mock_query_job.execute.assert_called_once()
 
         self.assertEquals(actual, mock_query_reply)
@@ -458,7 +458,7 @@ def test_transform_row_with_nested_repeated(self):
         self.assertEquals(actual, expected)
 
 
-@mock.patch('bigquery.client.BigQueryClient._get_query_results')
+@mock.patch('bigquery.client.BigQueryClient.get_query_results')
 class TestCheckJob(unittest.TestCase):
 
     def setUp(self):
@@ -1175,7 +1175,7 @@ def test_not_inside_range(self):
     }
 
 
-@mock.patch('bigquery.client.BigQueryClient._get_query_results')
+@mock.patch('bigquery.client.BigQueryClient.get_query_results')
 class TestGetQuerySchema(unittest.TestCase):
 
     def test_query_complete(self, get_query_mock):
@@ -1251,7 +1251,7 @@ def test_table_does_not_exist(self):
         self.mock_tables.get.return_value.execute.assert_called_once_with()
 
 
-@mock.patch('bigquery.client.BigQueryClient._get_query_results')
+@mock.patch('bigquery.client.BigQueryClient.get_query_results')
 class TestGetQueryRows(unittest.TestCase):
 
     def test_query_complete(self, get_query_mock):
@@ -1281,6 +1281,77 @@ def test_query_complete(self, get_query_mock):
                          {'foo': 'abc', 'spider': 'xyz'}]
         self.assertEquals(result_rows, expected_rows)
 
+    def test_query_complete_with_page_token(self, get_query_mock):
+        """Ensure that get_query_rows works with page token."""
+        from bigquery.client import BigQueryClient
+
+        page_one_resp = {
+            "jobComplete": True,
+            "kind": "bigquery#getQueryResultsResponse",
+            "pageToken": "TOKEN_TO_PAGE_2",
+            "schema": {
+                "fields": [{
+                    "name": "first_name",
+                    "type": "STRING",
+                }, {
+                    "name": "last_name",
+                    "type": "STRING",
+                }]
+            },
+            "rows": [{
+                "f": [{
+                    "v": "foo",
+                }, {
+                    "v": "bar"
+                }]
+            }, {
+                "f": [{
+                    "v": "abc",
+                }, {
+                    "v": "xyz"
+                }]
+            }],
+            "totalRows": "4"
+        }
+
+        page_two_resp = {
+            "jobComplete": True,
+            "kind": "bigquery#getQueryResultsResponse",
+            "schema": {
+                "fields": [{
+                    "name": "first_name",
+                    "type": "STRING",
+                }, {
+                    "name": "last_name",
+                    "type": "STRING",
+                }]
+            },
+            "rows": [{
+                "f": [{
+                    "v": "the",
+                }, {
+                    "v": "beatles"
+                }]
+            }, {
+                "f": [{
+                    "v": "monty",
+                }, {
+                    "v": "python"
+                }]
+            }],
+            "totalRows": "4"
+        }
+
+        bq = BigQueryClient(mock.Mock(), 'project')
+        get_query_mock.side_effect = [page_one_resp, page_two_resp]
+        result_rows = bq.get_query_rows(job_id=123, offset=0, limit=0)
+
+        expected_rows = [{'first_name': 'foo', 'last_name': 'bar'},
+                         {'first_name': 'abc', 'last_name': 'xyz'},
+                         {'first_name': 'the', 'last_name': 'beatles'},
+                         {'first_name': 'monty', 'last_name': 'python'}]
+        self.assertEquals(result_rows, expected_rows)
+
     def test_query_incomplete(self, get_query_mock):
         """Ensure that get_query_rows handles scenarios where the query
         is not finished.
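A note on the paging model this patch adopts: `jobs.getQueryResults` returns at most one page of rows per call, plus a `pageToken` whenever more rows remain, so `get_query_rows` now loops until the token disappears. Below is a minimal sketch of the caller's view, assuming an already-authorized `service` object and assuming (as the `_submit_query_job` docstring above suggests) that `query` returns the job id together with any rows already available; the project id and query text are illustrative only:

```python
from bigquery.client import BigQueryClient

# `service` is assumed to be an authorized BigQuery service object built
# elsewhere (e.g. via the google-api-python-client discovery module).
client = BigQueryClient(service, 'my-project')

# Submit a query and hold on to its job id.
job_id, _first_rows = client.query(
    'SELECT first_name, last_name FROM [mydataset.people]')

# Wait up to 60 seconds for completion, then collect every row:
# get_query_rows follows each pageToken internally and returns one flat
# list of dicts, raising UnfinishedQueryException if the job is still
# running.
rows = client.get_query_rows(job_id, timeout=60)
```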
From 606330e5c3161aa4d1db522a7e65cbc95bce5e64 Mon Sep 17 00:00:00 2001
From: pirsquare
Date: Sat, 6 Jun 2015 23:38:26 +0800
Subject: [PATCH 2/4] Add dataset helper methods

---
 bigquery/client.py | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/bigquery/client.py b/bigquery/client.py
index ec464af..89f02a1 100644
--- a/bigquery/client.py
+++ b/bigquery/client.py
@@ -321,6 +321,33 @@ def get_query_rows(self, job_id, offset=None, limit=None, timeout=0):
             records += [self._transform_row(row, schema) for row in rows]
         return records
 
+    def check_dataset(self, dataset_id):
+        """Check to see if a dataset exists.
+        Args:
+            dataset_id: dataset unique id
+        Returns:
+            bool indicating if the dataset exists.
+        """
+        dataset = self.get_dataset(dataset_id)
+        return bool(dataset)
+
+    def get_dataset(self, dataset_id):
+        """
+        Retrieve a dataset if it exists, otherwise return an empty dict.
+        Args:
+            dataset_id: dataset unique id
+        Returns:
+            dictionary containing the dataset object if it exists, otherwise
+            an empty dictionary
+        """
+        try:
+            dataset = self.bigquery.datasets().get(
+                projectId=self.project_id, datasetId=dataset_id).execute()
+        except HttpError:
+            dataset = {}
+
+        return dataset
+
     def check_table(self, dataset, table):
         """Check to see if a table exists.
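Both helpers are deliberately thin: `get_dataset` swallows the `HttpError` raised by `datasets().get` for a missing dataset (the patch adds no import, so `HttpError` is assumed to already be imported in client.py), and `check_dataset` just truth-tests the result. A short usage sketch, reusing the `client` from the previous example:

```python
if client.check_dataset('mydataset'):
    # get_dataset returns the raw dataset resource, e.g. a dict with
    # keys like 'id' ('my-project:mydataset') and 'datasetReference'.
    dataset = client.get_dataset('mydataset')
    print(dataset['id'])
else:
    print('mydataset does not exist')
```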
"TOKEN_TO_PAGE_2", + "schema": { + "fields": [{ + "name": "first_name", + "type": "STRING", + }, { + "name": "last_name", + "type": "STRING", + }] + }, + "rows": [{ + "f": [{ + "v": "foo", + }, { + "v": "bar" + }] + }, { + "f": [{ + "v": "abc", + }, { + "v": "xyz" + }] + }], + "totalRows": "4" + } + + page_two_resp = { + "jobComplete": True, + "kind": "bigquery#getQueryResultsResponse", + "schema": { + "fields": [{ + "name": "first_name", + "type": "STRING", + }, { + "name": "last_name", + "type": "STRING", + }] + }, + "rows": [{ + "f": [{ + "v": "the", + }, { + "v": "beatles" + }] + }, { + "f": [{ + "v": "monty", + }, { + "v": "python" + }] + }], + "totalRows": "4" + } + + bq = BigQueryClient(mock.Mock(), 'project') + get_query_mock.side_effect = [page_one_resp, page_two_resp] + result_rows = bq.get_query_rows(job_id=123, offset=0, limit=0) + + expected_rows = [{'first_name': 'foo', 'last_name': 'bar'}, + {'first_name': 'abc', 'last_name': 'xyz'}, + {'first_name': 'the', 'last_name': 'beatles'}, + {'first_name': 'monty', 'last_name': 'python'}] + self.assertEquals(result_rows, expected_rows) + def test_query_incomplete(self, get_query_mock): """Ensure that get_query_rows handles scenarios where the query is not finished. From 606330e5c3161aa4d1db522a7e65cbc95bce5e64 Mon Sep 17 00:00:00 2001 From: pirsquare Date: Sat, 6 Jun 2015 23:38:26 +0800 Subject: [PATCH 2/4] Add dataset helper methods --- bigquery/client.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/bigquery/client.py b/bigquery/client.py index ec464af..89f02a1 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -321,6 +321,33 @@ def get_query_rows(self, job_id, offset=None, limit=None, timeout=0): records += [self._transform_row(row, schema) for row in rows] return records + def check_dataset(self, dataset_id): + """Check to see if a dataset exists. + Args: + dataset: dataset unique id + Returns: + bool indicating if the table exists. + """ + dataset = self.get_dataset(dataset_id) + return bool(dataset) + + def get_dataset(self, dataset_id): + """ + Retrieve a dataset if it exists, otherwise return an empty dict. + Args: + dataset: dataset unique id + Returns: + dictionary containing the dataset object if it exists, otherwise + an empty dictionary + """ + try: + dataset = self.bigquery.datasets().get( + projectId=self.project_id, datasetId=dataset_id).execute() + except HttpError: + dataset = {} + + return dataset + def check_table(self, dataset, table): """Check to see if a table exists. From 1dfbf6a119fc5227e73d8f9686f4c7bb9ddca55c Mon Sep 17 00:00:00 2001 From: pirsquare Date: Sat, 6 Jun 2015 23:43:10 +0800 Subject: [PATCH 3/4] Add example on checking dataset --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 0359d37..62f2556 100644 --- a/README.md +++ b/README.md @@ -225,6 +225,9 @@ client.update_dataset('mydataset', friendly_name="mon Dataset") # description is # Patch dataset client.patch_dataset('mydataset', friendly_name="mon Dataset") # friendly_name changed; description is preserved + +# Check if dataset exists. 
From ca408f84c5d327812032c605f950c1f89724912c Mon Sep 17 00:00:00 2001
From: pirsquare
Date: Sun, 7 Jun 2015 05:19:39 +0800
Subject: [PATCH 4/4] Update docstrings for `get_query_rows` and
 `get_query_results`

---
 bigquery/client.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/bigquery/client.py b/bigquery/client.py
index 89f02a1..331940f 100644
--- a/bigquery/client.py
+++ b/bigquery/client.py
@@ -292,6 +292,10 @@ def check_job(self, job_id):
 
     def get_query_rows(self, job_id, offset=None, limit=None, timeout=0):
         """Retrieve a list of rows from a query table by job id.
+        This method will append results from multiple pages together. If you
+        want to page through the results manually, you can use the
+        `get_query_results` method directly.
+
         Args:
             job_id: The job id that references a BigQuery query.
             offset: The offset of the rows to pull from BigQuery.
@@ -1066,7 +1070,9 @@ def _in_range(self, start_time, end_time, time):
             time <= end_time <= time + ONE_MONTH
 
     def get_query_results(self, job_id, offset=None, limit=None,
                           page_token=None, timeout=0):
-        """Execute the query job indicated by the given job id.
+        """Execute the query job indicated by the given job id. This maps
+        directly to https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults
+
         Args:
             job_id: The job id of the query to check.
             offset: The index the result set should start at.
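For completeness, manual paging per the updated docstring looks roughly like the sketch below. Note that `_transform_row` is a private helper, so real callers would more likely consume the raw `rows`/`schema` structures; this simply mirrors what `get_query_rows` does internally:

```python
records = []
page_token = None
while True:
    # Fetch one page of results; page_token is None on the first call.
    reply = client.get_query_results('some-job-id', limit=500,
                                     page_token=page_token, timeout=10)
    schema = reply['schema']['fields']
    rows = reply.get('rows', [])
    records += [client._transform_row(row, schema) for row in rows]
    page_token = reply.get('pageToken')
    if not page_token:
        break
```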