3 changes: 3 additions & 0 deletions README.md
@@ -225,6 +225,9 @@ client.update_dataset('mydataset', friendly_name="mon Dataset") # description is

# Patch dataset
client.patch_dataset('mydataset', friendly_name="mon Dataset") # friendly_name changed; description is preserved

# Check if dataset exists.
exists = client.check_dataset('mydataset')
```

# Creating a schema from a sample record
81 changes: 56 additions & 25 deletions bigquery/client.py
@@ -121,7 +121,6 @@ def __init__(self, bq_service, project_id, swallow_results=True):
self.cache = {}

def _submit_query_job(self, query_data):

""" Submit a query job to BigQuery.

This is similar to BigQueryClient.query, but gives the user
@@ -172,7 +171,6 @@ def _submit_query_job(self, query_data):
return job_id, [self._transform_row(row, schema) for row in rows]

def _insert_job(self, body_object):

""" Submit a job to BigQuery

Direct proxy to the insert() method of the official BigQuery
@@ -243,9 +241,7 @@ def get_query_schema(self, job_id):
A list of dictionaries that represent the schema.
"""

job_collection = self.bigquery.jobs()
query_reply = self._get_query_results(
job_collection, self.project_id, job_id, offset=0, limit=0)
query_reply = self.get_query_results(job_id, offset=0, limit=0)

if not query_reply['jobComplete']:
logging.warning('BigQuery job %s not complete' % job_id)
@@ -289,38 +285,72 @@ def check_job(self, job_id):
included in the query table if it has completed.
"""

job_collection = self.bigquery.jobs()
query_reply = self._get_query_results(
job_collection, self.project_id, job_id, offset=0, limit=0)
query_reply = self.get_query_results(job_id, offset=0, limit=0)

return (query_reply.get('jobComplete', False),
int(query_reply.get('totalRows', 0)))

def get_query_rows(self, job_id, offset=None, limit=None):
def get_query_rows(self, job_id, offset=None, limit=None, timeout=0):
"""Retrieve a list of rows from a query table by job id.
This method will append results from multiple pages together, which can
be expensive for large result sets. If you want to page through results
manually, use the `get_query_results` method directly.

Args:
job_id: The job id that references a BigQuery query.
offset: The offset of the rows to pull from BigQuery.
limit: The number of rows to retrieve from a query table.

timeout: Timeout in seconds.
Returns:
A list of dictionaries that represent table rows.
"""

job_collection = self.bigquery.jobs()
query_reply = self._get_query_results(
job_collection, self.project_id, job_id, offset=offset,
limit=limit)

# Get query results
query_reply = self.get_query_results(job_id, offset=offset, limit=limit, timeout=timeout)
if not query_reply['jobComplete']:
logging.warning('BigQuery job %s not complete' % job_id)
raise UnfinishedQueryException()

schema = query_reply['schema']['fields']
schema = query_reply["schema"]["fields"]
rows = query_reply.get('rows', [])
page_token = query_reply.get("pageToken")
records = [self._transform_row(row, schema) for row in rows]

# Append to records if there are multiple pages for query results
while page_token:
Owner comment:
We might want to add a note in the docstring of this method that it will append each page of results, which is potentially a very expensive call. If users want to manually page through results, they can use get_query_results. I think in general, we should probably make the docstrings of get_query_rows and get_query_results more detailed since it might not be obvious what the difference between the two is.

query_reply = self.get_query_results(job_id, offset=offset, limit=limit,
page_token=page_token, timeout=timeout)
page_token = query_reply.get("pageToken")
rows = query_reply.get('rows', [])
records += [self._transform_row(row, schema) for row in rows]
return records
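Following the owner's comment above, the practical difference between the two calls is worth spelling out. A minimal sketch, assuming a constructed `client` and an already-submitted query job; the `job_abc123` job id is hypothetical:

```python
# get_query_rows fetches and appends every page for you. Convenient,
# but potentially many sequential API calls on a large result set.
all_rows = client.get_query_rows('job_abc123', timeout=10)

# get_query_results returns one raw API page per call; the caller owns
# the page token and can stop early at any point.
reply = client.get_query_results('job_abc123', limit=1000, timeout=10)
token = reply.get('pageToken')
while token:
    reply = client.get_query_results('job_abc123', limit=1000,
                                     page_token=token, timeout=10)
    token = reply.get('pageToken')
```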

def check_dataset(self, dataset_id):
"""Check to see if a dataset exists.
Args:
dataset_id: dataset unique id
Returns:
bool indicating if the dataset exists.
"""
dataset = self.get_dataset(dataset_id)
return bool(dataset)

return [self._transform_row(row, schema) for row in rows]
def get_dataset(self, dataset_id):
"""
Retrieve a dataset if it exists, otherwise return an empty dict.
Args:
dataset_id: dataset unique id
Returns:
dictionary containing the dataset object if it exists, otherwise
an empty dictionary
"""
try:
dataset = self.bigquery.datasets().get(
projectId=self.project_id, datasetId=dataset_id).execute()
except HttpError:
dataset = {}

return dataset
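A short usage sketch of the two new dataset helpers; `'mydataset'` is a placeholder id and the client construction is elided:

```python
# get_dataset swallows the HttpError for a missing dataset and returns
# {}, so no try/except is needed at the call site.
dataset = client.get_dataset('mydataset')

# check_dataset reduces the same lookup to a boolean.
if client.check_dataset('mydataset'):
    print(dataset.get('id'))
```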

def check_table(self, dataset, table):
"""Check to see if a table exists.
@@ -1039,27 +1069,28 @@ def _in_range(self, start_time, end_time, time):
time <= start_time <= time + ONE_MONTH or \
time <= end_time <= time + ONE_MONTH

def _get_query_results(self, job_collection, project_id, job_id,
offset=None, limit=None):
"""Execute the query job indicated by the given job id.
def get_query_results(self, job_id, offset=None, limit=None, page_token=None, timeout=0):
"""Execute the query job indicated by the given job id. This is direct mapping to
bigquery api https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults

Args:
job_collection: The collection the job belongs to.
project_id: The project id of the table.
job_id: The job id of the query to check.
offset: The index the result set should start at.
limit: The maximum number of results to retrieve.

page_token: Page token, returned by a previous call, to request the next page of results.
timeout: Timeout in seconds.
Returns:
The query reply.
"""

job_collection = self.bigquery.jobs()
return job_collection.getQueryResults(
projectId=project_id,
projectId=self.project_id,
jobId=job_id,
startIndex=offset,
maxResults=limit,
timeoutMs=0).execute()
pageToken=page_token,
timeoutMs=timeout * 1000).execute()

def _transform_row(self, row, schema):
"""Apply the given schema to the given BigQuery data row.
89 changes: 80 additions & 9 deletions bigquery/tests/test_client.py
@@ -358,7 +358,6 @@ def test_get_response(self):
"""Ensure that the query is executed and the query reply is returned.
"""

project_id = 'foo'
job_id = 'bar'

mock_query_job = mock.Mock()
@@ -368,14 +367,15 @@

offset = 5
limit = 10
page_token = "token"
timeout = 1

actual = self.client._get_query_results(self.mock_job_collection,
project_id, job_id,
offset, limit)
actual = self.client.get_query_results(job_id, offset, limit, page_token, timeout)

self.mock_job_collection.getQueryResults.assert_called_once_with(
timeoutMs=0, projectId=project_id, jobId=job_id,
startIndex=offset, maxResults=limit)
projectId=self.project_id, jobId=job_id, startIndex=offset,
maxResults=limit, pageToken=page_token, timeoutMs=1000)

mock_query_job.execute.assert_called_once()
self.assertEquals(actual, mock_query_reply)

Expand Down Expand Up @@ -458,7 +458,7 @@ def test_transform_row_with_nested_repeated(self):
self.assertEquals(actual, expected)


@mock.patch('bigquery.client.BigQueryClient._get_query_results')
@mock.patch('bigquery.client.BigQueryClient.get_query_results')
class TestCheckJob(unittest.TestCase):

def setUp(self):
@@ -1175,7 +1175,7 @@ def test_not_inside_range(self):
}


@mock.patch('bigquery.client.BigQueryClient._get_query_results')
@mock.patch('bigquery.client.BigQueryClient.get_query_results')
class TestGetQuerySchema(unittest.TestCase):

def test_query_complete(self, get_query_mock):
@@ -1251,7 +1251,7 @@ def test_table_does_not_exist(self):
self.mock_tables.get.return_value.execute.assert_called_once_with()


@mock.patch('bigquery.client.BigQueryClient._get_query_results')
@mock.patch('bigquery.client.BigQueryClient.get_query_results')
class TestGetQueryRows(unittest.TestCase):

def test_query_complete(self, get_query_mock):
@@ -1281,6 +1281,77 @@ def test_query_complete(self, get_query_mock):
{'foo': 'abc', 'spider': 'xyz'}]
self.assertEquals(result_rows, expected_rows)

def test_query_complete_with_page_token(self, get_query_mock):
"""Ensure that get_query_rows works with page token."""
from bigquery.client import BigQueryClient

page_one_resp = {
"jobComplete": True,
"kind": "bigquery#getQueryResultsResponse",
"pageToken": "TOKEN_TO_PAGE_2",
"schema": {
"fields": [{
"name": "first_name",
"type": "STRING",
}, {
"name": "last_name",
"type": "STRING",
}]
},
"rows": [{
"f": [{
"v": "foo",
}, {
"v": "bar"
}]
}, {
"f": [{
"v": "abc",
}, {
"v": "xyz"
}]
}],
"totalRows": "4"
}

page_two_resp = {
"jobComplete": True,
"kind": "bigquery#getQueryResultsResponse",
"schema": {
"fields": [{
"name": "first_name",
"type": "STRING",
}, {
"name": "last_name",
"type": "STRING",
}]
},
"rows": [{
"f": [{
"v": "the",
}, {
"v": "beatles"
}]
}, {
"f": [{
"v": "monty",
}, {
"v": "python"
}]
}],
"totalRows": "4"
}

bq = BigQueryClient(mock.Mock(), 'project')
get_query_mock.side_effect = [page_one_resp, page_two_resp]
result_rows = bq.get_query_rows(job_id=123, offset=0, limit=0)

expected_rows = [{'first_name': 'foo', 'last_name': 'bar'},
{'first_name': 'abc', 'last_name': 'xyz'},
{'first_name': 'the', 'last_name': 'beatles'},
{'first_name': 'monty', 'last_name': 'python'}]
self.assertEquals(result_rows, expected_rows)

def test_query_incomplete(self, get_query_mock):
"""Ensure that get_query_rows handles scenarios where the query is not
finished.