From bcf17c6b2f3939f52cacd73ebfec159cf3633029 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Sat, 9 Aug 2014 00:01:17 -0700 Subject: [PATCH 01/12] WIP - working on cloud storage import --- bigquery/client.py | 85 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 3 deletions(-) diff --git a/bigquery/client.py b/bigquery/client.py index d94b6b8..90db74d 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -1,6 +1,7 @@ import calendar from collections import defaultdict from datetime import datetime +from time import sleep import json from apiclient.discovery import build @@ -11,9 +12,20 @@ from bigquery.errors import UnfinishedQueryException from bigquery.schema_builder import schema_from_record + BIGQUERY_SCOPE = 'https://www.googleapis.com/auth/bigquery' BIGQUERY_SCOPE_READ_ONLY = 'https://www.googleapis.com/auth/bigquery.readonly' +JOB_DISPOSITION_CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' +JOB_DISPOSITION_CREATE_NEVER = 'CREATE_NEVER' +JOB_SOURCE_FORMAT_JSON = 'json' +JOB_SOURCE_FORMAT_CSV = 'csv' +JOB_DISPOSITION_WRITE_TRUNCATE = 'WRITE_TRUNCATE' +JOB_DISPOSITION_WRITE_APPEND = 'WRITE_APPEND' +JOB_DISPOSITION_WRITE_EMPTY = 'WRITE_EMPTY' +JOB_ENCODING_UTF_8 = 'UTF-8' +JOB_ENCODING_ISO_8859_1 = 'ISO-8859-1' + def get_client(project_id, credentials=None, service_account=None, private_key=None, readonly=True): @@ -67,11 +79,11 @@ def _get_bq_service(credentials=None, service_account=None, private_key=None, def _credentials(): """Import and return SignedJwtAssertionCredentials class""" from oauth2client.client import SignedJwtAssertionCredentials + return SignedJwtAssertionCredentials class BigQueryClient(object): - def __init__(self, bq_service, project_id): self.bigquery = bq_service self.project_id = project_id @@ -309,6 +321,72 @@ def get_tables(self, dataset_id, app_id, start_time, end_time): return self._filter_tables_by_time(app_tables, start_time, end_time) + def import_data_from_uris(self, + job, + source_uris, + dataset, + table, + schema, + wait=False, + allow_jagged_rows=True, + create_disposition=JOB_DISPOSITION_CREATE_IF_NEEDED, + allow_quoted_newlines=True, + ignore_unknown_values=True, + field_delimiter='\t', + max_bad_records=None, + quote='"', + skip_leading_rows=0, + source_format=JOB_SOURCE_FORMAT_JSON, + write_disposition=JOB_DISPOSITION_WRITE_EMPTY, + encoding=JOB_ENCODING_UTF_8): + + source_uris = source_uris if isinstance(source_uris, list) else [source_uris] + body = { + "kind": "bigquery#job", + "jobReference": { + "projectId": self.project_id, + "jobId": job + }, + "configuration": { + "load": { + "sourceUris": source_uris, + "schema": schema, + "destinationTable": { + "projectId": self.project_id, + "datasetId": dataset, + "tableId": table + }, + "createDisposition": create_disposition, + "writeDisposition": write_disposition, + "fieldDelimiter": field_delimiter, + "skipLeadingRows": skip_leading_rows, + "encoding": encoding, + "quote": quote, + "maxBadRecords": max_bad_records, + "allowQuotedNewlines": allow_quoted_newlines, + "sourceFormat": source_format, + "allowJaggedRows": allow_jagged_rows, + "ignoreUnknownValues": ignore_unknown_values + } + } + } + job_resource = self.bigquery.jobs() \ + .insert(projectId=self.project_id, body=body) \ + .execute() + + return job_resource + + def wait_for_job(self, job_resource): + status = job_resource['state']['status'].lower() + job_id = job_resource['jobReference']['jobId'] + while status not in ('done', 'failed'): + sleep(5) + try: + job_resource = 
self.bigquery.jobs.get(self.project_id, job_id) + status = job_resource['state']['status'].lower() + except Exception, e: + logger("Failed to get job status for job: {0}: {1}".format(job_id, e)) + def push_rows(self, dataset, table, rows, insert_id_key=None): """Upload rows to BigQuery table. @@ -471,8 +549,8 @@ def _in_range(self, start_time, end_time, time): ONE_MONTH = 2764800 # 32 days return start_time <= time <= end_time or \ - time <= start_time <= time + ONE_MONTH or \ - time <= end_time <= time + ONE_MONTH + time <= start_time <= time + ONE_MONTH or \ + time <= end_time <= time + ONE_MONTH def _get_query_results(self, job_collection, project_id, job_id, offset=None, limit=None): @@ -740,3 +818,4 @@ def schema_from_record(cls, record): { rfield: [ { x: 1}, {x: "a string"} ] } # undefined! """ return schema_from_record(record) + From 9000b1ab37e4847d8e69be85ee84f99d9b198ab6 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Sat, 9 Aug 2014 11:14:17 -0700 Subject: [PATCH 02/12] Add import_data_from_uris and wait_for_job The import_data_from_uris method imports data from Google Cloud Storage. The wait_for_job method takes a job resource or jobId and waits until the job completes, polling with a user-specified interval and timing out after a different user-specified interval. --- README.md | 14 +++ bigquery/client.py | 113 +++++++++++++------ bigquery/tests/test_client.py | 206 +++++++++++++++++++++++++++------- 3 files changed, 258 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index a533ea9..da274b5 100644 --- a/README.md +++ b/README.md @@ -139,6 +139,20 @@ rows = [ inserted = client.push_rows('dataset', 'table', rows, 'id') ``` +# Import data from Google cloud storage +```python +schema = [ {"name": "username", "type": "string", "mode": "nullable"} ] +job = client.import_data_from_uris('myjob', + ['gs://mybucket/mydata.json'], + 'dataset', + 'table', + schema, + source_format=JOB_SOURCE_FORMAT_JSON) + +job = client.wait_for_job(job, timeout=60) # or client.wait_for_job('myjob') +print(job) +``` + # Managing Datasets The client provides an API for listing, creating, deleting, updating and patching datasets. 
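As a reading aid for this commit (not part of the patch itself), the README example above can be fleshed out slightly: `import_data_from_uris` signals a failed job insert by returning `None`, so a caller would normally check for that before waiting. The bucket, dataset and table names below are hypothetical, `client` is assumed to come from `get_client()`, and `JOB_SOURCE_FORMAT_JSON` is the module-level constant defined in `bigquery.client` at this point in the series:

```python
# Illustrative sketch only -- names and URIs are placeholders.
schema = [{"name": "username", "type": "string", "mode": "nullable"}]

job = client.import_data_from_uris('myjob',
                                   ['gs://mybucket/mydata.json'],
                                   'dataset',
                                   'table',
                                   schema,
                                   source_format=JOB_SOURCE_FORMAT_JSON)

if job is None:
    # at this stage the method swallows the insert error and
    # signals failure by returning None
    print('failed to start the load job')
else:
    # poll every 5 seconds, give up after two minutes
    job = client.wait_for_job(job, interval=5, timeout=120)
    print(job)
```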
diff --git a/bigquery/client.py b/bigquery/client.py index 90db74d..31eedf9 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -1,7 +1,7 @@ import calendar from collections import defaultdict from datetime import datetime -from time import sleep +from time import sleep, time import json from apiclient.discovery import build @@ -321,26 +321,50 @@ def get_tables(self, dataset_id, app_id, start_time, end_time): return self._filter_tables_by_time(app_tables, start_time, end_time) - def import_data_from_uris(self, - job, - source_uris, - dataset, - table, - schema, - wait=False, - allow_jagged_rows=True, - create_disposition=JOB_DISPOSITION_CREATE_IF_NEEDED, - allow_quoted_newlines=True, - ignore_unknown_values=True, - field_delimiter='\t', - max_bad_records=None, - quote='"', - skip_leading_rows=0, - source_format=JOB_SOURCE_FORMAT_JSON, - write_disposition=JOB_DISPOSITION_WRITE_EMPTY, - encoding=JOB_ENCODING_UTF_8): - - source_uris = source_uris if isinstance(source_uris, list) else [source_uris] + def import_data_from_uris( + self, + job, + source_uris, + dataset, + table, + schema, + allow_jagged_rows=True, + create_disposition=JOB_DISPOSITION_CREATE_IF_NEEDED, + allow_quoted_newlines=True, + ignore_unknown_values=True, + field_delimiter='\t', + max_bad_records=None, + quote='"', + skip_leading_rows=0, + source_format=JOB_SOURCE_FORMAT_JSON, + write_disposition=JOB_DISPOSITION_WRITE_EMPTY, + encoding=JOB_ENCODING_UTF_8): + """ + Imports data into a BigQuery table from cloud storage. + Args: + job: required string identifying the job + source_uris: required string or list of strings representing + the uris on cloud storage of the form: + gs://bucket/filename + dataset: required string id of the dataset + table: required string id of the table + schema: required list representing the bigquery schema + allow_jagged_rows: optional boolean default True + create_disposition: optional string default 'CREATE_IF_NEEDED' + allow_quoted_newlines: optional boolean default True + ignore_unknown_values: optional boolean default True + field_delimiter: optional string default ',' for csv files + max_bad_records: optional boolean default None + quote: optional string '"' + skip_leading_rows: optional int default 0 + source_format: optional string default 'JSON' + write_disposition: optional string default 'WRITE_EMPTY' + encoding: optional string default 'utf-8' + Returns: + dict, a BigQuery job resource or None on failure + """ + source_uris = source_uris if isinstance(source_uris, list) \ + else [source_uris] body = { "kind": "bigquery#job", "jobReference": { @@ -370,22 +394,40 @@ def import_data_from_uris(self, } } } - job_resource = self.bigquery.jobs() \ - .insert(projectId=self.project_id, body=body) \ - .execute() - - return job_resource + try: + job_resource = self.bigquery.jobs() \ + .insert(projectId=self.project_id, body=body) \ + .execute() + return job_resource + except Exception, e: + logger.error("Failed while starting uri import job: {0}" + .format(e)) + return None - def wait_for_job(self, job_resource): - status = job_resource['state']['status'].lower() + def wait_for_job(self, job_resource, interval=5, timeout=None): + """ + Waits until the job indicated by job_resource is done or has failed + Args: + job_resource: dict, a BigQuery job resource + interval: optional float polling interval in seconds, default = 5 + timeout: optional float timeout in seconds, default = None + Returns: + dict, final state of the job_resource, as described here: + 
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#get + Raises: + standard exceptions on http / auth failures (you must retry) + """ + complete = job_resource.get('jobComplete') job_id = job_resource['jobReference']['jobId'] - while status not in ('done', 'failed'): - sleep(5) - try: - job_resource = self.bigquery.jobs.get(self.project_id, job_id) - status = job_resource['state']['status'].lower() - except Exception, e: - logger("Failed to get job status for job: {0}: {1}".format(job_id, e)) + start_time = time() + elapsed_time = 0 + while not (complete or (timeout is not None and elapsed_time > timeout)): + sleep(interval) + job_resource = self.bigquery.jobs().get(self.project_id, job_id) + complete = job_resource.get('jobComplete') + elapsed_time = time() - start_time + + return job_resource def push_rows(self, dataset, table, rows, insert_id_key=None): """Upload rows to BigQuery table. @@ -818,4 +860,3 @@ def schema_from_record(cls, record): { rfield: [ { x: 1}, {x: "a string"} ] } # undefined! """ return schema_from_record(record) - diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index 6808eb1..8c240c5 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -6,7 +6,6 @@ class TestGetClient(unittest.TestCase): - def setUp(self): client._bq_client = None @@ -84,7 +83,6 @@ def test_initialize_read_write(self, mock_build, mock_return_cred): class TestQuery(unittest.TestCase): - def setUp(self): client._bq_client = None @@ -169,7 +167,7 @@ def test_query_timeout(self): self.mock_job_collection.query.assert_called_once_with( projectId=self.project_id, body={'query': self.query, 'timeoutMs': timeout * 1000, 'dryRun': - False, 'maxResults': None} + False, 'maxResults': None} ) self.assertEquals(job_id, 'spiderman') self.assertEquals(results, []) @@ -249,7 +247,6 @@ def test_query_with_results(self): class TestGetQueryResults(unittest.TestCase): - def setUp(self): client._bq_client = None @@ -289,7 +286,6 @@ def test_get_response(self): class TestTransformRow(unittest.TestCase): - def setUp(self): client._bq_client = None @@ -368,7 +364,6 @@ def test_transform_row_with_nested_repeated(self): @mock.patch('bigquery.client.BigQueryClient._get_query_results') class TestCheckJob(unittest.TestCase): - def setUp(self): client._bq_client = None self.project_id = 'project' @@ -408,8 +403,156 @@ def test_query_complete(self, mock_exec): self.assertEquals(total_rows, 2) -class TestFilterTablesByTime(unittest.TestCase): +class TestWaitForJob(unittest.TestCase): + def setUp(self): + client._bq_client = None + self.project_id = 'project' + self.api_mock = mock.Mock() + self.client = client.BigQueryClient(self.api_mock, self.project_id) + + def test_detects_completion(self): + """ Ensure we can detect completed jobs""" + + return_values = [{'jobComplete': False, + 'jobReference': {'jobId': "testJob"}}, + {'jobComplete': True, + 'jobReference': {'jobId': "testJob"}}] + + def side_effect(*args, **kwargs): + return return_values.pop(0) + + self.api_mock.jobs().get.side_effect = side_effect + self.client.wait_for_job({'jobComplete': False, + 'jobReference': {'jobId': "testJob"}}, + interval=.01, + timeout=None) + + self.assertEqual(self.api_mock.jobs().get.call_count, 2) + + def test_respects_timeout(self): + """Ensure that the wait can be timed out""" + incomplete_job = {'jobComplete': False, + 'jobReference': {'jobId': "testJob"}} + + self.api_mock.jobs().get.return_value = incomplete_job + + 
job_resource = self.client.wait_for_job(incomplete_job, + interval=.1, + timeout=.25) + + self.assertEqual(self.api_mock.jobs().get.call_count, 3) + self.assertFalse(job_resource['jobComplete']) + + def test_respects_zero_timeout(self): + """Ensure that the wait times out even if timeout is zero""" + incomplete_job = {'jobComplete': False, + 'jobReference': {'jobId': "testJob"}} + + self.api_mock.jobs().get.return_value = incomplete_job + + job_resource = self.client.wait_for_job(incomplete_job, + interval=.01, + timeout=0) + + self.assertEqual(self.api_mock.jobs().get.call_count, 1) + self.assertFalse(job_resource['jobComplete']) + + +class TestImportDataFromURIs(unittest.TestCase): + def setUp(self): + self.body = { + "kind": "bigquery#job", + "jobReference": { + "projectId": "project", + "jobId": "job" + }, + "configuration": { + "load": { + "sourceUris": ["sourceuri"], + "schema": ["schema"], + "destinationTable": { + "projectId": "project", + "datasetId": "dataset", + "tableId": "table" + }, + "createDisposition": "a", + "writeDisposition": "b", + "fieldDelimiter": "c", + "skipLeadingRows": "d", + "encoding": "e", + "quote": "f", + "maxBadRecords": "g", + "allowQuotedNewlines": "h", + "sourceFormat": "i", + "allowJaggedRows": "j", + "ignoreUnknownValues": "k" + } + } + } + + def test_job_body_constructed_correctly(self): + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris("job", ["sourceuri"], "dataset", "table", ["schema"], + create_disposition="a", + write_disposition="b", + field_delimiter="c", + skip_leading_rows="d", + encoding="e", + quote="f", + max_bad_records="g", + allow_quoted_newlines="h", + source_format="i", + allow_jagged_rows="j", + ignore_unknown_values="k") + + mock_api.jobs().insert.assert_called_once_with(projectId="project", + body=self.body) + mock_api.jobs().insert(projectId="project", body=self.body) \ + .execute.called_once_with() + + def test_accepts_single_source_uri(self): + """Ensure that a source_uri accepts a non-list""" + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris("job", + "sourceuri", # not a list! + "dataset", + "table", + ["schema"], + create_disposition="a", + write_disposition="b", + field_delimiter="c", + skip_leading_rows="d", + encoding="e", + quote="f", + max_bad_records="g", + allow_quoted_newlines="h", + source_format="i", + allow_jagged_rows="j", + ignore_unknown_values="k") + + mock_api.jobs().insert.assert_called_once_with(projectId="project", + body=self.body) + mock_api.jobs().insert(projectId="project", body=self.body) \ + .execute.called_once_with() + + def test_swallows_exception(self): + """Ensure that exceptions are handled""" + mock_api = mock.Mock() + mock_api.jobs().insert.side_effect = Exception + + bq = client.BigQueryClient(mock_api, "project") + result = bq.import_data_from_uris("job", + "sourceuri", # not a list! 
+ "dataset", + "table", + ["schema"]) + self.assertEqual(result, None) + + +class TestFilterTablesByTime(unittest.TestCase): def test_empty_tables(self): """Ensure we can handle filtering an empty dictionary""" @@ -425,15 +568,15 @@ def test_multi_inside_range(self): bq = client.BigQueryClient(None, 'project') tables = bq._filter_tables_by_time({ - 'Spider-Man': 1370002001, - 'Daenerys Targaryen': 1370001999, - 'Gordon Freeman': 1369999999, - 'William Shatner': 1370001000, - 'Heavy Weapons Guy': 0 - }, 1370002000, 1370000000) + 'Spider-Man': 1370002001, + 'Daenerys Targaryen': 1370001999, + 'Gordon Freeman': 1369999999, + 'William Shatner': 1370001000, + 'Heavy Weapons Guy': 0 + }, 1370002000, 1370000000) self.assertEqual([ - 'Daenerys Targaryen', 'William Shatner', 'Gordon Freeman'], tables) + 'Daenerys Targaryen', 'William Shatner', 'Gordon Freeman'], tables) def test_not_inside_range(self): """Ensure we can correctly filter several application ids outside the @@ -443,11 +586,11 @@ def test_not_inside_range(self): bq = client.BigQueryClient(None, 'project') tables = bq._filter_tables_by_time({ - 'John Snow': 9001, - 'Adam West': 100000000000000, - 'Glados': -1, - 'Potato': 0, - }, 1370002000, 1370000000) + 'John Snow': 9001, + 'Adam West': 100000000000000, + 'Glados': -1, + 'Potato': 0, + }, 1370002000, 1370000000) self.assertEqual([], tables) @@ -530,7 +673,6 @@ def test_not_inside_range(self): @mock.patch('bigquery.client.BigQueryClient._get_query_results') class TestGetQuerySchema(unittest.TestCase): - def test_query_complete(self, get_query_mock): """Ensure that get_query_schema works when a query is complete.""" from bigquery.client import BigQueryClient @@ -564,7 +706,6 @@ def test_query_incomplete(self, get_query_mock): class TestGetTableSchema(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_tables = mock.Mock() @@ -608,7 +749,6 @@ def test_table_does_not_exist(self): @mock.patch('bigquery.client.BigQueryClient._get_query_results') class TestGetQueryRows(unittest.TestCase): - def test_query_complete(self, get_query_mock): """Ensure that get_query_rows works when a query is complete.""" from bigquery.client import BigQueryClient @@ -664,7 +804,6 @@ def test_query_incomplete(self, get_query_mock): class TestCheckTable(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_tables = mock.Mock() @@ -705,7 +844,6 @@ def test_table_does_exist(self): class TestCreateTable(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_tables = mock.Mock() @@ -758,7 +896,6 @@ def test_table_create_success(self): class TestDeleteTable(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_tables = mock.Mock() @@ -799,7 +936,6 @@ def test_delete_table_success(self): class TestParseTableListReponse(unittest.TestCase): - def test_full_parse(self): """Ensures we can parse a full list response.""" @@ -892,7 +1028,6 @@ def test_incorrect_table_formats(self): class TestPushRows(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_table_data = mock.Mock() @@ -981,7 +1116,6 @@ def test_push_success(self): class TestGetAllTables(unittest.TestCase): - def test_get_tables(self): """Ensure get_all_tables fetches table names from BigQuery.""" @@ -1011,7 +1145,6 @@ def test_get_tables(self): class TestGetTables(unittest.TestCase): - def test_get_tables(self): """Ensure tables falling in the time window are returned.""" @@ -1057,7 +1190,6 @@ def 
test_get_tables_from_datetimes(self): # Dataset tests # class TestCreateDataset(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_datasets = mock.Mock() @@ -1092,7 +1224,7 @@ def test_dataset_create_failed(self): self.mock_datasets.insert.assert_called_once_with( projectId=self.project, body=self.body) - self.mock_datasets.insert.return_value.execute.\ + self.mock_datasets.insert.return_value.execute. \ assert_called_once_with() def test_dataset_create_success(self): @@ -1110,12 +1242,11 @@ def test_dataset_create_success(self): self.mock_datasets.insert.assert_called_once_with( projectId=self.project, body=self.body) - self.mock_datasets.insert.return_value.execute.\ + self.mock_datasets.insert.return_value.execute. \ assert_called_once_with() class TestDeleteDataset(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_datasets = mock.Mock() @@ -1153,7 +1284,7 @@ def test_delete_datasets_success(self): self.mock_datasets.delete.assert_called_once_with( projectId=self.project, datasetId=self.dataset) - self.mock_datasets.delete.return_value.execute.\ + self.mock_datasets.delete.return_value.execute. \ assert_called_once_with() @@ -1227,7 +1358,6 @@ def test_delete_datasets_success(self): class TestGetDatasets(unittest.TestCase): - def test_get_datasets(self): """Ensure datasets are returned.""" @@ -1247,7 +1377,6 @@ def test_get_datasets(self): class TestUpdateDataset(unittest.TestCase): - def setUp(self): self.mock_bq_service = mock.Mock() self.mock_datasets = mock.Mock() @@ -1282,7 +1411,7 @@ def test_dataset_update_failed(self): self.mock_datasets.update.assert_called_once_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_datasets.update.return_value.execute.\ + self.mock_datasets.update.return_value.execute. \ assert_called_once_with() def test_dataset_update_success(self): @@ -1300,6 +1429,5 @@ def test_dataset_update_success(self): self.mock_datasets.update.assert_called_once_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_datasets.update.return_value.execute.\ + self.mock_datasets.update.return_value.execute. \ assert_called_once_with() - From dd1597c6897c909daf1727d0fd89b97098ad0d9a Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Sat, 9 Aug 2014 11:22:46 -0700 Subject: [PATCH 03/12] Add wait_for_job test Ensure that wait_for_job accepts a jobId. 
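As an illustrative aside (not part of the commit), the two call forms the method now supports look like this; `client` is a BigQueryClient and the job id is a placeholder:

```python
# 1) pass the job resource (dict) returned by import_data_from_uris
job = client.wait_for_job(job_resource, interval=5, timeout=60)

# 2) or pass the bare jobId string; the method then starts polling
#    immediately, since it cannot know the job's current state
job = client.wait_for_job('myjob', interval=5, timeout=60)
```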
--- bigquery/client.py | 14 ++++++++++---- bigquery/tests/test_client.py | 20 +++++++++++++++++++- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/bigquery/client.py b/bigquery/client.py index 31eedf9..1d6b7e8 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -404,11 +404,11 @@ def import_data_from_uris( .format(e)) return None - def wait_for_job(self, job_resource, interval=5, timeout=None): + def wait_for_job(self, job, interval=5, timeout=None): """ Waits until the job indicated by job_resource is done or has failed Args: - job_resource: dict, a BigQuery job resource + job: dict, representing a BigQuery job resource or jobId interval: optional float polling interval in seconds, default = 5 timeout: optional float timeout in seconds, default = None Returns: @@ -417,8 +417,14 @@ def wait_for_job(self, job_resource, interval=5, timeout=None): Raises: standard exceptions on http / auth failures (you must retry) """ - complete = job_resource.get('jobComplete') - job_id = job_resource['jobReference']['jobId'] + if isinstance(job,dict): # job is a job resource + complete = job.get('jobComplete') + job_id = job['jobReference']['jobId'] + else: # job is the jobId + complete = False + job_id = job + job_resource = None + start_time = time() elapsed_time = 0 while not (complete or (timeout is not None and elapsed_time > timeout)): diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index 8c240c5..ab7a92c 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -411,7 +411,7 @@ def setUp(self): self.client = client.BigQueryClient(self.api_mock, self.project_id) def test_detects_completion(self): - """ Ensure we can detect completed jobs""" + """Ensure we can detect completed jobs""" return_values = [{'jobComplete': False, 'jobReference': {'jobId': "testJob"}}, @@ -430,6 +430,24 @@ def side_effect(*args, **kwargs): self.assertEqual(self.api_mock.jobs().get.call_count, 2) + def test_accepts_job_id(self): + """Ensure that a jobId argument is accepted""" + return_values = [{'jobComplete': False, + 'jobReference': {'jobId': "testJob"}}, + {'jobComplete': True, + 'jobReference': {'jobId': "testJob"}}] + + def side_effect(*args, **kwargs): + return return_values.pop(0) + + self.api_mock.jobs().get.side_effect = side_effect + + self.client.wait_for_job('jobId', + interval=.01, + timeout=None) + + self.assertEqual(self.api_mock.jobs().get.call_count, 2) + def test_respects_timeout(self): """Ensure that the wait can be timed out""" incomplete_job = {'jobComplete': False, From b0e9c387f2c7cb3390f6034327997517517a6c62 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Sat, 9 Aug 2014 11:25:05 -0700 Subject: [PATCH 04/12] Fix flake8 issues --- bigquery/__init__.py | 3 +-- bigquery/query_builder.py | 1 - bigquery/schema_builder.py | 1 - bigquery/tests/test_client.py | 1 - bigquery/tests/test_query_builder.py | 20 +++++++------------- 5 files changed, 8 insertions(+), 18 deletions(-) diff --git a/bigquery/__init__.py b/bigquery/__init__.py index b8ad97e..82d7ad1 100644 --- a/bigquery/__init__.py +++ b/bigquery/__init__.py @@ -2,5 +2,4 @@ logging.basicConfig() logger = logging.getLogger('bigquery') -logger.setLevel(logging.DEBUG) - +logger.setLevel(logging.DEBUG) \ No newline at end of file diff --git a/bigquery/query_builder.py b/bigquery/query_builder.py index 60ec81d..baa8e18 100644 --- a/bigquery/query_builder.py +++ b/bigquery/query_builder.py @@ -252,4 +252,3 @@ def _render_groupings(fields): return "" return "GROUP BY " + ", 
".join(fields) - diff --git a/bigquery/schema_builder.py b/bigquery/schema_builder.py index 9446e45..4316ca1 100644 --- a/bigquery/schema_builder.py +++ b/bigquery/schema_builder.py @@ -102,4 +102,3 @@ def bigquery_type(o, timestamp_parser=default_timestamp_parser): else: raise Exception( "Invalid object for BigQuery schema: {0} ({1}".format(o, t)) - diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index 6808eb1..29111c6 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -1302,4 +1302,3 @@ def test_dataset_update_success(self): self.mock_datasets.update.return_value.execute.\ assert_called_once_with() - diff --git a/bigquery/tests/test_query_builder.py b/bigquery/tests/test_query_builder.py index 4f209f7..01212f0 100644 --- a/bigquery/tests/test_query_builder.py +++ b/bigquery/tests/test_query_builder.py @@ -2,7 +2,6 @@ class TestRenderSelect(unittest.TestCase): - def test_multiple_selects(self): """Ensure that render select can handle multiple selects.""" from bigquery.query_builder import _render_select @@ -30,7 +29,7 @@ def test_casting(self): result = _render_select({ 'start_time': {'alias': 'TimeStamp', 'format': - 'SEC_TO_MICRO-INTEGER-FORMAT_UTC_USEC'} + 'SEC_TO_MICRO-INTEGER-FORMAT_UTC_USEC'} }) self.assertEqual( @@ -47,7 +46,6 @@ def test_no_selects(self): class TestRenderSources(unittest.TestCase): - def test_multi_tables(self): """Ensure that render sources can handle multiple sources.""" from bigquery.query_builder import _render_sources @@ -75,7 +73,6 @@ def test_no_dataset(self): class TestRenderConditions(unittest.TestCase): - def test_single_condition(self): """Ensure that render conditions can handle a single condition.""" from bigquery.query_builder \ @@ -154,7 +151,6 @@ def test_multiple_comparators(self): class TestRenderOrder(unittest.TestCase): - def test_order(self): """Ensure that render order can work under expected conditions.""" from bigquery.query_builder import _render_order @@ -173,7 +169,6 @@ def test_no_order(self): class TestGroupings(unittest.TestCase): - def test_mutliple_fields(self): """Ensure that render grouping works with multiple fields.""" from bigquery.query_builder \ @@ -194,7 +189,6 @@ def test_no_fields(self): class TestRenderQuery(unittest.TestCase): - def test_full_query(self): """Ensure that all the render query arguments work together.""" from bigquery.query_builder import render_query @@ -278,7 +272,7 @@ def test_incorrect_conditions(self): }, conditions=[ {'asdfasdfasdf': 'start_time', 'ffd': 1371566954, 'comparator': - '<=', 'type': 'INTEGER'}, + '<=', 'type': 'INTEGER'}, {'field': 'start_time', 'value': {'value': 1371556954, 'negate': False}, 'compoorattor': '>=', 'type': 'INTEGER'} @@ -526,7 +520,7 @@ def test_formatting(self): tables=['2013_06_appspot_1'], select={ 'start_time': {'alias': 'timestamp', 'format': - 'INTEGER-FORMAT_UTC_USEC'}, + 'INTEGER-FORMAT_UTC_USEC'}, 'status': {'alias': 'status'}, 'resource': {'alias': 'url'} }, @@ -561,10 +555,10 @@ def test_formatting_duplicate_columns(self): tables=['2013_06_appspot_1'], select={ 'start_time': [{'alias': 'timestamp', 'format': - 'INTEGER-FORMAT_UTC_USEC'}, + 'INTEGER-FORMAT_UTC_USEC'}, {'alias': 'day', 'format': - 'SEC_TO_MICRO-INTEGER-FORMAT_UTC_USEC-LEFT:10'} - ], + 'SEC_TO_MICRO-INTEGER-FORMAT_UTC_USEC-LEFT:10'} + ], 'status': {'alias': 'status'}, 'resource': {'alias': 'url'} }, @@ -601,7 +595,7 @@ def test_sec_to_micro_formatting(self): tables=['2013_06_appspot_1'], select={ 'start_time': {'alias': 'timestamp', 
'format': - 'SEC_TO_MICRO-INTEGER-SEC_TO_TIMESTAMP'}, + 'SEC_TO_MICRO-INTEGER-SEC_TO_TIMESTAMP'}, 'status': {'alias': 'status'}, 'resource': {'alias': 'url'} }, From 1be1e0aa16c7a7d3c05994aefd3bf63409d32210 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Sat, 9 Aug 2014 22:54:57 -0700 Subject: [PATCH 05/12] Test return type from BigQueryClient.wait_for_job is a dict --- bigquery/tests/test_client.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index ab7a92c..407cdf9 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -423,12 +423,13 @@ def side_effect(*args, **kwargs): self.api_mock.jobs().get.side_effect = side_effect - self.client.wait_for_job({'jobComplete': False, - 'jobReference': {'jobId': "testJob"}}, - interval=.01, - timeout=None) + job_resource = self.client.wait_for_job({'jobComplete': False, + 'jobReference': {'jobId': "testJob"}}, + interval=.01, + timeout=None) self.assertEqual(self.api_mock.jobs().get.call_count, 2) + self.assertIsInstance(job_resource, dict) def test_accepts_job_id(self): """Ensure that a jobId argument is accepted""" @@ -455,9 +456,9 @@ def test_respects_timeout(self): self.api_mock.jobs().get.return_value = incomplete_job - job_resource = self.client.wait_for_job(incomplete_job, - interval=.1, - timeout=.25) + final_state, job_resource = self.client.wait_for_job(incomplete_job, + interval=.1, + timeout=.25) self.assertEqual(self.api_mock.jobs().get.call_count, 3) self.assertFalse(job_resource['jobComplete']) From c4873c05792482ad516f1f509841cd769590b8f0 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Sun, 10 Aug 2014 17:30:26 -0700 Subject: [PATCH 06/12] Log errors when .create_dataset fails --- bigquery/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bigquery/client.py b/bigquery/client.py index d94b6b8..0ba1d46 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -594,8 +594,8 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None, datasets.insert(projectId=self.project_id, body=dataset_data).execute() return True - except: - logger.error('Cannot create dataset %s' % dataset_id) + except Exception,e: + logger.error('Cannot create dataset %s, %s' % (dataset_id, e)) return False def get_datasets(self): From 6d8170b666d647fbb3d824b0d691c9c9180f6565 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Tue, 12 Aug 2014 13:27:12 -0700 Subject: [PATCH 07/12] Fix source_format constants used by import_data_from_uris --- bigquery/client.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/bigquery/client.py b/bigquery/client.py index 4726ee0..899d2ad 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -18,8 +18,9 @@ JOB_DISPOSITION_CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' JOB_DISPOSITION_CREATE_NEVER = 'CREATE_NEVER' -JOB_SOURCE_FORMAT_JSON = 'json' -JOB_SOURCE_FORMAT_CSV = 'csv' +JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON' +JOB_SOURCE_FORMAT_DATASTORE_BACKUP = 'DATASTORE_BACKUP' +JOB_SOURCE_FORMAT_CSV = 'CSV' JOB_DISPOSITION_WRITE_TRUNCATE = 'WRITE_TRUNCATE' JOB_DISPOSITION_WRITE_APPEND = 'WRITE_APPEND' JOB_DISPOSITION_WRITE_EMPTY = 'WRITE_EMPTY' @@ -336,7 +337,7 @@ def import_data_from_uris( max_bad_records=None, quote='"', skip_leading_rows=0, - source_format=JOB_SOURCE_FORMAT_JSON, + source_format=JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON, 
write_disposition=JOB_DISPOSITION_WRITE_EMPTY, encoding=JOB_ENCODING_UTF_8): """ @@ -357,7 +358,7 @@ def import_data_from_uris( max_bad_records: optional boolean default None quote: optional string '"' skip_leading_rows: optional int default 0 - source_format: optional string default 'JSON' + source_format: optional string default JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON write_disposition: optional string default 'WRITE_EMPTY' encoding: optional string default 'utf-8' Returns: @@ -367,9 +368,10 @@ def import_data_from_uris( else [source_uris] body = { "kind": "bigquery#job", + "projectId": self.project_id, "jobReference": { "projectId": self.project_id, - "jobId": job + "jobId": "{0}-{1}".format(job, int(time())) }, "configuration": { "load": { @@ -382,18 +384,20 @@ def import_data_from_uris( }, "createDisposition": create_disposition, "writeDisposition": write_disposition, - "fieldDelimiter": field_delimiter, - "skipLeadingRows": skip_leading_rows, "encoding": encoding, - "quote": quote, "maxBadRecords": max_bad_records, - "allowQuotedNewlines": allow_quoted_newlines, "sourceFormat": source_format, - "allowJaggedRows": allow_jagged_rows, "ignoreUnknownValues": ignore_unknown_values } } } + if source_format == JOB_SOURCE_FORMAT_CSV: + body['fieldDelimiter'] = field_delimiter + body['allowJaggedRows'] = allow_jagged_rows + body['allowQuotedNewlines'] = allow_quoted_newlines + body['quote'] = quote + body['skipLeadingRows'] = skip_leading_rows + try: job_resource = self.bigquery.jobs() \ .insert(projectId=self.project_id, body=body) \ @@ -417,10 +421,10 @@ def wait_for_job(self, job, interval=5, timeout=None): Raises: standard exceptions on http / auth failures (you must retry) """ - if isinstance(job,dict): # job is a job resource + if isinstance(job, dict): # job is a job resource complete = job.get('jobComplete') job_id = job['jobReference']['jobId'] - else: # job is the jobId + else: # job is the jobId complete = False job_id = job job_resource = None @@ -429,8 +433,8 @@ def wait_for_job(self, job, interval=5, timeout=None): elapsed_time = 0 while not (complete or (timeout is not None and elapsed_time > timeout)): sleep(interval) - job_resource = self.bigquery.jobs().get(self.project_id, job_id) - complete = job_resource.get('jobComplete') + job_resource = self.bigquery.jobs().get(projectId=self.project_id, jobId=job_id).execute() + complete = job_resource.get('status').get('state') == u'DONE' elapsed_time = time() - start_time return job_resource @@ -720,7 +724,7 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None, datasets.insert(projectId=self.project_id, body=dataset_data).execute() return True - except Exception,e: + except Exception, e: logger.error('Cannot create dataset %s, %s' % (dataset_id, e)) return False From 6dbdfae69dd0b0d373cf79070ac8507884d663c3 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Tue, 19 Aug 2014 08:34:27 -0700 Subject: [PATCH 08/12] Fix bug: BigQuery load jobs now work Got rid of the default values in import_data_from_uri. If values are not provided, they are not inserted into the load configuration. Also fixed the structure of the load configuration JSON. Still need to write tests. 
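For readers tracing the change, the reworked method now hands `jobs().insert()` a body roughly shaped like the sketch below; the values are hypothetical, and optional keys appear only when the caller supplies the corresponding argument:

```python
body = {
    "jobReference": {
        "projectId": "my-project",
        # generated as "<dataset>-<table>-<sha256 digest>" when no
        # job name is supplied
        "jobId": "dataset-table-5f4dcc3b..."
    },
    "configuration": {
        "load": {
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "dataset",
                "tableId": "table"
            },
            "sourceUris": ["gs://mybucket/mydata.json"],
            # present only because the caller passed them:
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "writeDisposition": "WRITE_EMPTY"
            # CSV-only keys (fieldDelimiter, quote, skipLeadingRows,
            # allowJaggedRows, allowQuotedNewlines) are added only
            # when sourceFormat is CSV
        }
    }
}
```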
--- bigquery/client.py | 126 +++++++++++++++++++++++++++++++-------------- 1 file changed, 86 insertions(+), 40 deletions(-) diff --git a/bigquery/client.py b/bigquery/client.py index 899d2ad..174d485 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -2,6 +2,7 @@ from collections import defaultdict from datetime import datetime from time import sleep, time +from hashlib import sha256 import json from apiclient.discovery import build @@ -324,32 +325,33 @@ def get_tables(self, dataset_id, app_id, start_time, end_time): def import_data_from_uris( self, - job, source_uris, dataset, table, - schema, - allow_jagged_rows=True, - create_disposition=JOB_DISPOSITION_CREATE_IF_NEEDED, - allow_quoted_newlines=True, - ignore_unknown_values=True, - field_delimiter='\t', + job=None, + schema=None, + allow_jagged_rows=None, + create_disposition=None, + allow_quoted_newlines=None, + ignore_unknown_values=None, + field_delimiter=None, max_bad_records=None, - quote='"', - skip_leading_rows=0, - source_format=JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON, - write_disposition=JOB_DISPOSITION_WRITE_EMPTY, - encoding=JOB_ENCODING_UTF_8): + quote=None, + skip_leading_rows=None, + source_format=None, + write_disposition=None, + encoding=None): """ Imports data into a BigQuery table from cloud storage. Args: - job: required string identifying the job source_uris: required string or list of strings representing the uris on cloud storage of the form: gs://bucket/filename dataset: required string id of the dataset table: required string id of the table - schema: required list representing the bigquery schema + job: optional string identifying the job (a unique jobid is automatically + generated if not provided) + schema: optional list representing the bigquery schema allow_jagged_rows: optional boolean default True create_disposition: optional string default 'CREATE_IF_NEEDED' allow_quoted_newlines: optional boolean default True @@ -366,39 +368,80 @@ def import_data_from_uris( """ source_uris = source_uris if isinstance(source_uris, list) \ else [source_uris] - body = { - "kind": "bigquery#job", - "projectId": self.project_id, - "jobReference": { + + configuration = { + "destinationTable": { "projectId": self.project_id, - "jobId": "{0}-{1}".format(job, int(time())) + "tableId": table, + "datasetId": dataset }, + "sourceUris": source_uris, + } + + if max_bad_records: + configuration['maxBadRecords'] = max_bad_records + + if ignore_unknown_values: + configuration['ignoreUnknownValues'] = ignore_unknown_values + + if create_disposition: + configuration['createDisposition'] = create_disposition + + if write_disposition: + configuration['writeDisposition'] = write_disposition + + if encoding: + configuration['encoding'] = encoding + + if schema: + configuration['schema'] = schema + + if source_format: + configuration['sourceFormat'] = source_format + + if not job: + hex = sha256(":".join(source_uris) + str(time())).hexdigest() + job = "{dataset}-{table}-{digest}".format( + dataset=dataset, + table=table, + digest=hex + ) + + if source_format == JOB_SOURCE_FORMAT_CSV: + if field_delimiter: + configuration['fieldDelimiter'] = field_delimiter + + if allow_jagged_rows: + configuration['allowJaggedRows'] = allow_jagged_rows + + if allow_quoted_newlines: + configuration['allowQuotedNewlines'] = allow_quoted_newlines + + if quote: + configuration['quote'] = quote + + if skip_leading_rows: + configuration['skipLeadingRows'] = skip_leading_rows + + elif field_delimiter or allow_jagged_rows or allow_quoted_newlines or quote or 
skip_leading_rows: + non_null_values = dict((k,v) for k,v in dict(field_delimiter=field_delimiter,allow_jagged_rows=allow_jagged_rows, + allow_quoted_newlines=allow_quoted_newlines,skip_leading_rows=skip_leading_rows, + quote=quote) if v) + raise Exception("Parameters field_delimiter, allow_jagged_rows, allow_quoted_newlines, quote \ + and skip_leading_rows are only allowed when source_format=JOB_SOURCE_FORMAT_CSV: %s" % non_null_values) + + body = { "configuration": { - "load": { - "sourceUris": source_uris, - "schema": schema, - "destinationTable": { - "projectId": self.project_id, - "datasetId": dataset, - "tableId": table - }, - "createDisposition": create_disposition, - "writeDisposition": write_disposition, - "encoding": encoding, - "maxBadRecords": max_bad_records, - "sourceFormat": source_format, - "ignoreUnknownValues": ignore_unknown_values - } + 'load': configuration + }, + "jobReference": { + "projectId": self.project_id, + "jobId": job } } - if source_format == JOB_SOURCE_FORMAT_CSV: - body['fieldDelimiter'] = field_delimiter - body['allowJaggedRows'] = allow_jagged_rows - body['allowQuotedNewlines'] = allow_quoted_newlines - body['quote'] = quote - body['skipLeadingRows'] = skip_leading_rows try: + logger.info("Creating load job %s" % body) job_resource = self.bigquery.jobs() \ .insert(projectId=self.project_id, body=body) \ .execute() @@ -434,6 +477,9 @@ def wait_for_job(self, job, interval=5, timeout=None): while not (complete or (timeout is not None and elapsed_time > timeout)): sleep(interval) job_resource = self.bigquery.jobs().get(projectId=self.project_id, jobId=job_id).execute() + error = job_resource.get('error') + if error: + raise Exception("{message} ({code}). Errors: {errors}", **error) complete = job_resource.get('status').get('state') == u'DONE' elapsed_time = time() - start_time From a4c45c22b4ddf93e1a5efcaf11ae321b1c6e0298 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Tue, 26 Aug 2014 00:21:05 -0700 Subject: [PATCH 09/12] Update tests to be consistent with BigQuery results Previous commits fixed the client after testing directly with BigQuery. This commit fixes the tests to reflect the actual return values from BigQuery, and adds tests for parameters to import_data_from_uris, some of which are not permitted when source_format is not "CSV". 
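An illustrative example of the behaviour these new tests pin down (names and URIs are placeholders, not part of the patch): supplying a CSV-only option together with a non-CSV source format raises an Exception rather than building an invalid load configuration.

```python
try:
    client.import_data_from_uris(['gs://mybucket/mydata.json'],
                                 'dataset',
                                 'table',
                                 schema=[{"name": "username",
                                          "type": "string",
                                          "mode": "nullable"}],
                                 source_format='NEWLINE_DELIMITED_JSON',
                                 field_delimiter=',')  # CSV-only option
except Exception as e:
    print(e)  # explains which CSV-only parameters were rejected
```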
--- bigquery/client.py | 14 ++- bigquery/tests/test_client.py | 201 ++++++++++++++++++++++++---------- 2 files changed, 155 insertions(+), 60 deletions(-) diff --git a/bigquery/client.py b/bigquery/client.py index 174d485..c2bb3ec 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -328,8 +328,8 @@ def import_data_from_uris( source_uris, dataset, table, - job=None, schema=None, + job=None, allow_jagged_rows=None, create_disposition=None, allow_quoted_newlines=None, @@ -424,9 +424,12 @@ def import_data_from_uris( configuration['skipLeadingRows'] = skip_leading_rows elif field_delimiter or allow_jagged_rows or allow_quoted_newlines or quote or skip_leading_rows: - non_null_values = dict((k,v) for k,v in dict(field_delimiter=field_delimiter,allow_jagged_rows=allow_jagged_rows, - allow_quoted_newlines=allow_quoted_newlines,skip_leading_rows=skip_leading_rows, - quote=quote) if v) + non_null_values = dict((k, v) for k, v in dict(field_delimiter=field_delimiter, + allow_jagged_rows=allow_jagged_rows, + allow_quoted_newlines=allow_quoted_newlines, + skip_leading_rows=skip_leading_rows, + quote=quote).items() + if v) raise Exception("Parameters field_delimiter, allow_jagged_rows, allow_quoted_newlines, quote \ and skip_leading_rows are only allowed when source_format=JOB_SOURCE_FORMAT_CSV: %s" % non_null_values) @@ -476,7 +479,8 @@ def wait_for_job(self, job, interval=5, timeout=None): elapsed_time = 0 while not (complete or (timeout is not None and elapsed_time > timeout)): sleep(interval) - job_resource = self.bigquery.jobs().get(projectId=self.project_id, jobId=job_id).execute() + request = self.bigquery.jobs().get(projectId=self.project_id, jobId=job_id) + job_resource = request.execute() error = job_resource.get('error') if error: raise Exception("{message} ({code}). 
Errors: {errors}", **error) diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index 407cdf9..fca8488 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -1,6 +1,7 @@ import unittest import mock +from nose.tools import raises from bigquery import client @@ -413,75 +414,93 @@ def setUp(self): def test_detects_completion(self): """Ensure we can detect completed jobs""" - return_values = [{'jobComplete': False, + return_values = [{'status': {'state': u'RUNNING'}, 'jobReference': {'jobId': "testJob"}}, - {'jobComplete': True, + {'status': {'state': u'DONE'}, 'jobReference': {'jobId': "testJob"}}] def side_effect(*args, **kwargs): return return_values.pop(0) - self.api_mock.jobs().get.side_effect = side_effect + self.api_mock.jobs().get().execute.side_effect = side_effect job_resource = self.client.wait_for_job({'jobComplete': False, 'jobReference': {'jobId': "testJob"}}, interval=.01, timeout=None) - self.assertEqual(self.api_mock.jobs().get.call_count, 2) + self.assertEqual(self.api_mock.jobs().get().execute.call_count, 2) self.assertIsInstance(job_resource, dict) def test_accepts_job_id(self): """Ensure that a jobId argument is accepted""" - return_values = [{'jobComplete': False, + return_values = [{'status': {'state': u'RUNNING'}, 'jobReference': {'jobId': "testJob"}}, - {'jobComplete': True, + {'status': {'state': u'DONE'}, 'jobReference': {'jobId': "testJob"}}] def side_effect(*args, **kwargs): return return_values.pop(0) - self.api_mock.jobs().get.side_effect = side_effect + self.api_mock.jobs().get('status').execute.side_effect = side_effect self.client.wait_for_job('jobId', interval=.01, timeout=None) - self.assertEqual(self.api_mock.jobs().get.call_count, 2) + self.assertEqual(self.api_mock.jobs().get().execute.call_count, 2) def test_respects_timeout(self): """Ensure that the wait can be timed out""" - incomplete_job = {'jobComplete': False, + incomplete_job = {'status': {'state': u'RUNNING'}, 'jobReference': {'jobId': "testJob"}} - self.api_mock.jobs().get.return_value = incomplete_job + self.api_mock.jobs().get().execute.return_value = incomplete_job - final_state, job_resource = self.client.wait_for_job(incomplete_job, - interval=.1, - timeout=.25) + job_resource = self.client.wait_for_job(incomplete_job, + interval=.1, + timeout=.25) - self.assertEqual(self.api_mock.jobs().get.call_count, 3) - self.assertFalse(job_resource['jobComplete']) + self.assertEqual(self.api_mock.jobs().get().execute.call_count, 3) + self.assertEqual(job_resource['status']['state'], u'RUNNING') def test_respects_zero_timeout(self): """Ensure that the wait times out even if timeout is zero""" - incomplete_job = {'jobComplete': False, + incomplete_job = {'status': {'state': u'RUNNING'}, 'jobReference': {'jobId': "testJob"}} - self.api_mock.jobs().get.return_value = incomplete_job + self.api_mock.jobs().get().execute.return_value = incomplete_job job_resource = self.client.wait_for_job(incomplete_job, interval=.01, timeout=0) - self.assertEqual(self.api_mock.jobs().get.call_count, 1) - self.assertFalse(job_resource['jobComplete']) + self.assertEqual(self.api_mock.jobs().get().execute.call_count, 1) + self.assertEqual(job_resource['status']['state'], u'RUNNING') class TestImportDataFromURIs(unittest.TestCase): def setUp(self): - self.body = { - "kind": "bigquery#job", + pass + + def test_csv_job_body_constructed_correctly(self): + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris(["sourceuri"], "dataset", 
"table", ["schema"], + job="job", + create_disposition="a", + write_disposition="b", + field_delimiter="c", + skip_leading_rows="d", + encoding="e", + quote="f", + max_bad_records="g", + allow_quoted_newlines="h", + source_format="CSV", + allow_jagged_rows="j", + ignore_unknown_values="k") + + body = { "jobReference": { "projectId": "project", "jobId": "job" @@ -503,58 +522,130 @@ def setUp(self): "quote": "f", "maxBadRecords": "g", "allowQuotedNewlines": "h", - "sourceFormat": "i", + "sourceFormat": "CSV", "allowJaggedRows": "j", "ignoreUnknownValues": "k" } } } - def test_job_body_constructed_correctly(self): + mock_api.jobs().insert.assert_called_once_with(projectId="project", + body=body) + mock_api.jobs().insert(projectId="project", body=body) \ + .execute.called_once_with() + + def test_json_job_body_constructed_correctly(self): mock_api = mock.Mock() bq = client.BigQueryClient(mock_api, "project") - bq.import_data_from_uris("job", ["sourceuri"], "dataset", "table", ["schema"], - create_disposition="a", - write_disposition="b", - field_delimiter="c", - skip_leading_rows="d", - encoding="e", - quote="f", - max_bad_records="g", - allow_quoted_newlines="h", - source_format="i", - allow_jagged_rows="j", - ignore_unknown_values="k") + bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], + job="job", + source_format="JSON") + + body = { + "jobReference": { + "projectId": "project", + "jobId": "job" + }, + "configuration": { + "load": { + "sourceUris": ["sourceuri"], + "schema": ["schema"], + "destinationTable": { + "projectId": "project", + "datasetId": "dataset", + "tableId": "table" + }, + "sourceFormat": "JSON" + } + } + } mock_api.jobs().insert.assert_called_once_with(projectId="project", - body=self.body) - mock_api.jobs().insert(projectId="project", body=self.body) \ + body=body) + mock_api.jobs().insert(projectId="project", body=body) \ .execute.called_once_with() + @raises(Exception) + def test_field_delimiter_exception_if_not_csv(self): + """Raise exception if source_format is not csv, but csv-only parameter is set""" + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], + job="job", + source_format="JSON", + field_delimiter=",") + + @raises(Exception) + def test_allow_jagged_rows_exception_if_not_csv(self): + """Raise exception if source_format is not csv, but csv-only parameter is set""" + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], + job="job", + source_format="JSON", + allow_jagged_rows=True) + + @raises(Exception) + def test_allow_quoted_newlines_exception_if_not_csv(self): + """Raise exception if source_format is not csv, but csv-only parameter is set""" + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], + job="job", + source_format="JSON", + allow_quoted_newlines=True) + + @raises(Exception) + def test_quote_exception_if_not_csv(self): + """Raise exception if source_format is not csv, but csv-only parameter is set""" + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], + job="job", + source_format="JSON", + quote="'") + + @raises(Exception) + def test_skip_leading_rows_exception_if_not_csv(self): + """Raise exception if source_format is not csv, but csv-only parameter is 
set""" + mock_api = mock.Mock() + bq = client.BigQueryClient(mock_api, "project") + bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], + "job", + source_format="JSON", + skip_leading_rows=10) + def test_accepts_single_source_uri(self): """Ensure that a source_uri accepts a non-list""" mock_api = mock.Mock() bq = client.BigQueryClient(mock_api, "project") - bq.import_data_from_uris("job", - "sourceuri", # not a list! + bq.import_data_from_uris("sourceuri", # not a list! "dataset", "table", - ["schema"], - create_disposition="a", - write_disposition="b", - field_delimiter="c", - skip_leading_rows="d", - encoding="e", - quote="f", - max_bad_records="g", - allow_quoted_newlines="h", - source_format="i", - allow_jagged_rows="j", - ignore_unknown_values="k") + schema=["schema"], + job="job") + + body = { + "jobReference": { + "projectId": "project", + "jobId": "job" + }, + "configuration": { + "load": { + "sourceUris": ["sourceuri"], + "schema": ["schema"], + "destinationTable": { + "projectId": "project", + "datasetId": "dataset", + "tableId": "table" + } + } + } + } mock_api.jobs().insert.assert_called_once_with(projectId="project", - body=self.body) - mock_api.jobs().insert(projectId="project", body=self.body) \ + body=body) + mock_api.jobs().insert(projectId="project", body=body) \ .execute.called_once_with() def test_swallows_exception(self): @@ -563,8 +654,8 @@ def test_swallows_exception(self): mock_api.jobs().insert.side_effect = Exception bq = client.BigQueryClient(mock_api, "project") - result = bq.import_data_from_uris("job", - "sourceuri", # not a list! + result = bq.import_data_from_uris("sourceuri", # not a list! + "job", "dataset", "table", ["schema"]) From 0b56bc8b68152f911e2b61a51efc21324789e314 Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Tue, 26 Aug 2014 00:21:59 -0700 Subject: [PATCH 10/12] Fix import_data_from_uris documentation --- README.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index da274b5..762917d 100644 --- a/README.md +++ b/README.md @@ -142,14 +142,13 @@ inserted = client.push_rows('dataset', 'table', rows, 'id') # Import data from Google cloud storage ```python schema = [ {"name": "username", "type": "string", "mode": "nullable"} ] -job = client.import_data_from_uris('myjob', - ['gs://mybucket/mydata.json'], +job = client.import_data_from_uris( ['gs://mybucket/mydata.json'], 'dataset', 'table', schema, source_format=JOB_SOURCE_FORMAT_JSON) -job = client.wait_for_job(job, timeout=60) # or client.wait_for_job('myjob') +job = client.wait_for_job(job, timeout=60) print(job) ``` From 4c8243625752f796c8b528cb461b4e530548f66b Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Tue, 26 Aug 2014 00:36:06 -0700 Subject: [PATCH 11/12] Fix flake8 violations --- bigquery/__init__.py | 2 +- bigquery/client.py | 38 ++++++++++++++++++++++------------- bigquery/tests/test_client.py | 23 +++++++++++---------- 3 files changed, 37 insertions(+), 26 deletions(-) diff --git a/bigquery/__init__.py b/bigquery/__init__.py index 82d7ad1..8102877 100644 --- a/bigquery/__init__.py +++ b/bigquery/__init__.py @@ -2,4 +2,4 @@ logging.basicConfig() logger = logging.getLogger('bigquery') -logger.setLevel(logging.DEBUG) \ No newline at end of file +logger.setLevel(logging.DEBUG) diff --git a/bigquery/client.py b/bigquery/client.py index c2bb3ec..6aa25d2 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -349,8 +349,8 @@ def import_data_from_uris( gs://bucket/filename dataset: required string id of the 
dataset table: required string id of the table - job: optional string identifying the job (a unique jobid is automatically - generated if not provided) + job: optional string identifying the job (a unique jobid + is automatically generated if not provided) schema: optional list representing the bigquery schema allow_jagged_rows: optional boolean default True create_disposition: optional string default 'CREATE_IF_NEEDED' @@ -360,7 +360,8 @@ def import_data_from_uris( max_bad_records: optional boolean default None quote: optional string '"' skip_leading_rows: optional int default 0 - source_format: optional string default JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON + source_format: optional string + default JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON write_disposition: optional string default 'WRITE_EMPTY' encoding: optional string default 'utf-8' Returns: @@ -423,15 +424,21 @@ def import_data_from_uris( if skip_leading_rows: configuration['skipLeadingRows'] = skip_leading_rows - elif field_delimiter or allow_jagged_rows or allow_quoted_newlines or quote or skip_leading_rows: - non_null_values = dict((k, v) for k, v in dict(field_delimiter=field_delimiter, - allow_jagged_rows=allow_jagged_rows, - allow_quoted_newlines=allow_quoted_newlines, - skip_leading_rows=skip_leading_rows, - quote=quote).items() + elif field_delimiter or allow_jagged_rows \ + or allow_quoted_newlines or quote or skip_leading_rows: + all_values = dict(field_delimiter=field_delimiter, + allow_jagged_rows=allow_jagged_rows, + allow_quoted_newlines=allow_quoted_newlines, + skip_leading_rows=skip_leading_rows, + quote=quote) + non_null_values = dict((k, v) for k, v + in all_values.items() if v) - raise Exception("Parameters field_delimiter, allow_jagged_rows, allow_quoted_newlines, quote \ - and skip_leading_rows are only allowed when source_format=JOB_SOURCE_FORMAT_CSV: %s" % non_null_values) + raise Exception("Parameters field_delimiter, allow_jagged_rows, " + "allow_quoted_newlines, quote and " + "skip_leading_rows are only allowed when " + "source_format=JOB_SOURCE_FORMAT_CSV: %s" + % non_null_values) body = { "configuration": { @@ -477,13 +484,16 @@ def wait_for_job(self, job, interval=5, timeout=None): start_time = time() elapsed_time = 0 - while not (complete or (timeout is not None and elapsed_time > timeout)): + while not (complete + or (timeout is not None and elapsed_time > timeout)): sleep(interval) - request = self.bigquery.jobs().get(projectId=self.project_id, jobId=job_id) + request = self.bigquery.jobs().get(projectId=self.project_id, + jobId=job_id) job_resource = request.execute() error = job_resource.get('error') if error: - raise Exception("{message} ({code}). Errors: {errors}", **error) + raise Exception("{message} ({code}). 
Errors: {errors}", + **error) complete = job_resource.get('status').get('state') == u'DONE' elapsed_time = time() - start_time diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index fca8488..86546e9 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -167,8 +167,8 @@ def test_query_timeout(self): self.mock_job_collection.query.assert_called_once_with( projectId=self.project_id, - body={'query': self.query, 'timeoutMs': timeout * 1000, 'dryRun': - False, 'maxResults': None} + body={'query': self.query, 'timeoutMs': timeout * 1000, + 'dryRun': False, 'maxResults': None} ) self.assertEquals(job_id, 'spiderman') self.assertEquals(results, []) @@ -424,10 +424,11 @@ def side_effect(*args, **kwargs): self.api_mock.jobs().get().execute.side_effect = side_effect - job_resource = self.client.wait_for_job({'jobComplete': False, - 'jobReference': {'jobId': "testJob"}}, - interval=.01, - timeout=None) + job_resource = self.client.wait_for_job( + {'jobComplete': False, + 'jobReference': {'jobId': "testJob"}}, + interval=.01, + timeout=None) self.assertEqual(self.api_mock.jobs().get().execute.call_count, 2) self.assertIsInstance(job_resource, dict) @@ -567,7 +568,7 @@ def test_json_job_body_constructed_correctly(self): @raises(Exception) def test_field_delimiter_exception_if_not_csv(self): - """Raise exception if source_format is not csv, but csv-only parameter is set""" + """Raise exception if csv-only parameter is set inappropriately""" mock_api = mock.Mock() bq = client.BigQueryClient(mock_api, "project") bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], @@ -577,7 +578,7 @@ def test_field_delimiter_exception_if_not_csv(self): @raises(Exception) def test_allow_jagged_rows_exception_if_not_csv(self): - """Raise exception if source_format is not csv, but csv-only parameter is set""" + """Raise exception if csv-only parameter is set inappropriately""" mock_api = mock.Mock() bq = client.BigQueryClient(mock_api, "project") bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], @@ -587,7 +588,7 @@ def test_allow_jagged_rows_exception_if_not_csv(self): @raises(Exception) def test_allow_quoted_newlines_exception_if_not_csv(self): - """Raise exception if source_format is not csv, but csv-only parameter is set""" + """Raise exception if csv-only parameter is set inappropriately""" mock_api = mock.Mock() bq = client.BigQueryClient(mock_api, "project") bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], @@ -597,7 +598,7 @@ def test_allow_quoted_newlines_exception_if_not_csv(self): @raises(Exception) def test_quote_exception_if_not_csv(self): - """Raise exception if source_format is not csv, but csv-only parameter is set""" + """Raise exception if csv-only parameter is set inappropriately""" mock_api = mock.Mock() bq = client.BigQueryClient(mock_api, "project") bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], @@ -607,7 +608,7 @@ def test_quote_exception_if_not_csv(self): @raises(Exception) def test_skip_leading_rows_exception_if_not_csv(self): - """Raise exception if source_format is not csv, but csv-only parameter is set""" + """Raise exception if csv-only parameter is set inappropriately""" mock_api = mock.Mock() bq = client.BigQueryClient(mock_api, "project") bq.import_data_from_uris(["sourceuri"], "dataset", "table", ["schema"], From 2fc643aebb2a494720ee3421bb1b0f7d7fe1258d Mon Sep 17 00:00:00 2001 From: Aneil Mallavarapu Date: Tue, 26 Aug 2014 10:07:43 -0700 Subject: [PATCH 
12/12] Fix import_data_from_uris docstring and parameter order Optional parameters are not injected into the job configuration, default values are given by BigQuery. The docstring now correctly explains this. Also shortened some constants. --- bigquery/client.py | 53 +++++++++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/bigquery/client.py b/bigquery/client.py index 6aa25d2..7a1d8f3 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -17,14 +17,14 @@ BIGQUERY_SCOPE = 'https://www.googleapis.com/auth/bigquery' BIGQUERY_SCOPE_READ_ONLY = 'https://www.googleapis.com/auth/bigquery.readonly' -JOB_DISPOSITION_CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' -JOB_DISPOSITION_CREATE_NEVER = 'CREATE_NEVER' +JOB_CREATE_IF_NEEDED = 'CREATE_IF_NEEDED' +JOB_CREATE_NEVER = 'CREATE_NEVER' JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON' JOB_SOURCE_FORMAT_DATASTORE_BACKUP = 'DATASTORE_BACKUP' JOB_SOURCE_FORMAT_CSV = 'CSV' -JOB_DISPOSITION_WRITE_TRUNCATE = 'WRITE_TRUNCATE' -JOB_DISPOSITION_WRITE_APPEND = 'WRITE_APPEND' -JOB_DISPOSITION_WRITE_EMPTY = 'WRITE_EMPTY' +JOB_WRITE_TRUNCATE = 'WRITE_TRUNCATE' +JOB_WRITE_APPEND = 'WRITE_APPEND' +JOB_WRITE_EMPTY = 'WRITE_EMPTY' JOB_ENCODING_UTF_8 = 'UTF-8' JOB_ENCODING_ISO_8859_1 = 'ISO-8859-1' @@ -330,17 +330,18 @@ def import_data_from_uris( table, schema=None, job=None, - allow_jagged_rows=None, + source_format=None, create_disposition=None, - allow_quoted_newlines=None, + write_disposition=None, + encoding=None, ignore_unknown_values=None, - field_delimiter=None, max_bad_records=None, + allow_jagged_rows=None, + allow_quoted_newlines=None, + field_delimiter=None, quote=None, skip_leading_rows=None, - source_format=None, - write_disposition=None, - encoding=None): + ): """ Imports data into a BigQuery table from cloud storage. Args: @@ -352,18 +353,26 @@ def import_data_from_uris( job: optional string identifying the job (a unique jobid is automatically generated if not provided) schema: optional list representing the bigquery schema - allow_jagged_rows: optional boolean default True - create_disposition: optional string default 'CREATE_IF_NEEDED' - allow_quoted_newlines: optional boolean default True - ignore_unknown_values: optional boolean default True - field_delimiter: optional string default ',' for csv files - max_bad_records: optional boolean default None - quote: optional string '"' - skip_leading_rows: optional int default 0 source_format: optional string - default JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON - write_disposition: optional string default 'WRITE_EMPTY' - encoding: optional string default 'utf-8' + (one of the JOB_SOURCE_FORMAT_* constants) + create_disposition: optional string + (one of the JOB_CREATE_* constants) + write_disposition: optional string + (one of the JOB_WRITE_* constants) + encoding: optional string default + (one of the JOB_ENCODING_* constants) + ignore_unknown_values: optional boolean + max_bad_records: optional boolean + allow_jagged_rows: optional boolean for csv only + allow_quoted_newlines: optional boolean for csv only + field_delimiter: optional string for csv only + quote: optional string the quote character for csv only + skip_leading_rows: optional int for csv only + + Optional arguments with value None are determined by + BigQuery as described: + https://developers.google.com/bigquery/docs/reference/v2/jobs + Returns: dict, a BigQuery job resource or None on failure """
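To close out the series, a hedged sketch of how a CSV load looks after all twelve patches, using the shortened constant names introduced in this final commit; the project id, bucket, credential values and table names are placeholders, not values from the patches:

```python
from bigquery.client import (get_client, JOB_SOURCE_FORMAT_CSV,
                             JOB_CREATE_IF_NEEDED, JOB_WRITE_TRUNCATE)

client = get_client('my-project', service_account='...',
                    private_key='...', readonly=False)

schema = [{"name": "username", "type": "string", "mode": "nullable"}]

job = client.import_data_from_uris(['gs://mybucket/mydata.csv'],
                                   'dataset',
                                   'table',
                                   schema=schema,
                                   source_format=JOB_SOURCE_FORMAT_CSV,
                                   create_disposition=JOB_CREATE_IF_NEEDED,
                                   write_disposition=JOB_WRITE_TRUNCATE,
                                   skip_leading_rows=1,
                                   field_delimiter=',')

if job is not None:
    job = client.wait_for_job(job, interval=5, timeout=300)
    print(job['status']['state'])  # u'DONE' once the load has finished
```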