From 2b83dfd9734de78d8d652140383efc0192875065 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Fri, 21 Jul 2017 14:41:13 -0700 Subject: [PATCH] Switched to google-resumable-media in BigQuery. (#3555) * Switched to google-resumable-media in BigQuery. * Upgrading google-resumable-media dependency to 0.2.1. --- bigquery/google/cloud/bigquery/table.py | 432 ++++++++---- bigquery/nox.py | 33 +- bigquery/setup.py | 3 + bigquery/tests/unit/test_table.py | 872 ++++++++++++++---------- storage/google/cloud/storage/blob.py | 1 + storage/setup.py | 2 +- storage/tests/unit/test_blob.py | 8 +- 7 files changed, 867 insertions(+), 484 deletions(-) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index 7e21e35d1fb09..f7752bb8fc364 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -15,22 +15,21 @@ """Define API Datasets.""" import datetime -import json import os import httplib2 import six +import google.auth.transport.requests +from google import resumable_media +from google.resumable_media.requests import MultipartUpload +from google.resumable_media.requests import ResumableUpload + from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _millis_from_datetime from google.cloud.exceptions import NotFound from google.cloud.exceptions import make_exception from google.cloud.iterator import HTTPIterator -from google.cloud.streaming.exceptions import HttpError -from google.cloud.streaming.http_wrapper import Request -from google.cloud.streaming.http_wrapper import make_api_request -from google.cloud.streaming.transfer import RESUMABLE_UPLOAD -from google.cloud.streaming.transfer import Upload from google.cloud.bigquery.schema import SchemaField from google.cloud.bigquery._helpers import _item_to_row from google.cloud.bigquery._helpers import _rows_page_start @@ -39,6 +38,17 @@ _TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'table.reload()'" _MARKER = object() +_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB +_BASE_UPLOAD_TEMPLATE = ( + u'https://www.googleapis.com/upload/bigquery/v2/projects/' + u'{project}/jobs?uploadType=') +_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'multipart' +_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'resumable' +_GENERIC_CONTENT_TYPE = u'*/*' +_READ_LESS_THAN_SIZE = ( + 'Size {:d} was specified but the file-like object only had ' + '{:d} bytes remaining.') +_DEFAULT_NUM_RETRIES = 6 class Table(object): @@ -815,15 +825,177 @@ def insert_data(self, return errors - @staticmethod - def _check_response_error(request, http_response): - """Helper for :meth:`upload_from_file`.""" - info = http_response.info - status = int(info['status']) - if not 200 <= status < 300: - faux_response = httplib2.Response({'status': status}) - raise make_exception(faux_response, http_response.content, - error_info=request.url) + def _make_transport(self, client): + """Make an authenticated transport with a client's credentials. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :rtype transport: + :class:`~google.auth.transport.requests.AuthorizedSession` + :returns: The transport (with credentials) that will + make authenticated requests. + """ + # Create a ``requests`` transport with the client's credentials. 
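
For orientation, ``AuthorizedSession`` behaves like a ``requests.Session`` that attaches the credentials' token to every outgoing call. A minimal standalone sketch (the endpoint below is illustrative only)::

    import google.auth
    import google.auth.transport.requests

    # Pick up application default credentials; any google-auth credentials
    # object works the same way.
    credentials, _ = google.auth.default()
    session = google.auth.transport.requests.AuthorizedSession(credentials)
    # The session refreshes and attaches the access token before sending.
    response = session.request(
        'GET',
        'https://www.googleapis.com/bigquery/v2/projects/my-project/datasets')
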
+ transport = google.auth.transport.requests.AuthorizedSession( + client._credentials) + return transport + + def _initiate_resumable_upload(self, client, stream, + metadata, num_retries): + """Initiate a resumable upload. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: tuple + :returns: + Pair of + + * The :class:`~google.resumable_media.requests.ResumableUpload` + that was created + * The ``transport`` used to initiate the upload. + """ + chunk_size = _DEFAULT_CHUNKSIZE + transport = self._make_transport(client) + headers = _get_upload_headers(client._connection.USER_AGENT) + upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project) + upload = ResumableUpload(upload_url, chunk_size, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + + upload.initiate( + transport, stream, metadata, _GENERIC_CONTENT_TYPE, + stream_final=False) + + return upload, transport + + def _do_resumable_upload(self, client, stream, metadata, num_retries): + """Perform a resumable upload. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the final chunk + is uploaded. + """ + upload, transport = self._initiate_resumable_upload( + client, stream, metadata, num_retries) + + while not upload.finished: + response = upload.transmit_next_chunk(transport) + + return response + + def _do_multipart_upload(self, client, stream, metadata, + size, num_retries): + """Perform a multipart upload. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the multipart + upload request. + :raises: :exc:`ValueError` if the ``stream`` has fewer than ``size`` + bytes remaining. 
+ """ + data = stream.read(size) + if len(data) < size: + msg = _READ_LESS_THAN_SIZE.format(size, len(data)) + raise ValueError(msg) + + transport = self._make_transport(client) + headers = _get_upload_headers(client._connection.USER_AGENT) + + upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project) + upload = MultipartUpload(upload_url, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + + response = upload.transmit( + transport, data, metadata, _GENERIC_CONTENT_TYPE) + + return response + + def _do_upload(self, client, stream, metadata, size, num_retries): + """Determine an upload strategy and then perform the upload. + + If ``size`` is :data:`None`, then a resumable upload will be used, + otherwise the content and the metadata will be uploaded + in a single multipart upload request. + + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: The client to use. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: dict + :returns: The parsed JSON from the "200 OK" response. This will be the + **only** response in the multipart case and it will be the + **final** response in the resumable case. + """ + if size is None: + response = self._do_resumable_upload( + client, stream, metadata, num_retries) + else: + response = self._do_multipart_upload( + client, stream, metadata, size, num_retries) + + return response.json() # pylint: disable=too-many-arguments,too-many-locals def upload_from_file(self, @@ -831,7 +1003,7 @@ def upload_from_file(self, source_format, rewind=False, size=None, - num_retries=6, + num_retries=_DEFAULT_NUM_RETRIES, allow_jagged_rows=None, allow_quoted_newlines=None, create_disposition=None, @@ -846,10 +1018,6 @@ def upload_from_file(self, job_name=None): """Upload the contents of this table from a file-like object. - The content type of the upload will either be - - The value passed in to the function (if any) - - ``text/csv``. - :type file_obj: file :param file_obj: A file handle opened in binary mode for reading. @@ -860,7 +1028,7 @@ def upload_from_file(self, :type rewind: bool :param rewind: If True, seek to the beginning of the file handle before - writing the file to Cloud Storage. + writing the file. :type size: int :param size: The number of bytes to read from the file handle. @@ -911,16 +1079,16 @@ def upload_from_file(self, :param write_disposition: job configuration option; see :meth:`google.cloud.bigquery.job.LoadJob`. - :type client: :class:`~google.cloud.storage.client.Client` or - ``NoneType`` - :param client: Optional. The client to use. If not passed, falls back - to the ``client`` stored on the current dataset. + :type client: :class:`~google.cloud.bigquery.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the current table. :type job_name: str :param job_name: Optional. The id of the job. Generated if not explicitly passed in. 
- :rtype: :class:`google.cloud.bigquery.jobs.LoadTableFromStorageJob` + :rtype: :class:`~google.cloud.bigquery.jobs.LoadTableFromStorageJob` + :returns: the job instance used to load the data (e.g., for querying status). Note that the job is already started: do not call ``job.begin()``. @@ -929,54 +1097,10 @@ def upload_from_file(self, a file opened in text mode. """ client = self._require_client(client) - connection = client._connection - content_type = 'application/octet-stream' - - # Rewind the file if desired. - if rewind: - file_obj.seek(0, os.SEEK_SET) - - mode = getattr(file_obj, 'mode', None) - - if mode is not None and mode not in ('rb', 'r+b', 'rb+'): - raise ValueError( - "Cannot upload files opened in text mode: use " - "open(filename, mode='rb') or open(filename, mode='r+b')") - - # Get the basic stats about the file. - total_bytes = size - if total_bytes is None: - if hasattr(file_obj, 'fileno'): - total_bytes = os.fstat(file_obj.fileno()).st_size - else: - raise ValueError('total bytes could not be determined. Please ' - 'pass an explicit size.') - headers = { - 'Accept': 'application/json', - 'Accept-Encoding': 'gzip, deflate', - 'User-Agent': connection.USER_AGENT, - 'content-type': 'application/json', - } - - metadata = { - 'configuration': { - 'load': { - 'sourceFormat': source_format, - 'destinationTable': { - 'projectId': self._dataset.project, - 'datasetId': self._dataset.name, - 'tableId': self.name, - } - } - } - } - - if len(self._schema) > 0: - load_config = metadata['configuration']['load'] - load_config['schema'] = { - 'fields': _build_schema_resource(self._schema) - } - + _maybe_rewind(file_obj, rewind=rewind) + _check_mode(file_obj) + metadata = _get_upload_metadata( + source_format, self._schema, self._dataset, self.name) _configure_job_metadata(metadata, allow_jagged_rows, allow_quoted_newlines, create_disposition, encoding, field_delimiter, @@ -984,47 +1108,12 @@ def upload_from_file(self, quote_character, skip_leading_rows, write_disposition, job_name) - upload = Upload(file_obj, content_type, total_bytes, - auto_transfer=False) - - url_builder = _UrlBuilder() - upload_config = _UploadConfig() - - # Base URL may change once we know simple vs. resumable. - base_url = connection.API_BASE_URL + '/upload' - path = '/projects/%s/jobs' % (self._dataset.project,) - upload_url = connection.build_api_url(api_base_url=base_url, path=path) - - # Use apitools 'Upload' facility. 
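
For contrast with the apitools facility being removed here, the replacement flow from ``google-resumable-media`` (the same calls used by the ``_initiate_resumable_upload`` and ``_do_resumable_upload`` helpers added above) looks roughly like the following standalone sketch; the URL, metadata, and stream contents are placeholders::

    import io

    import google.auth
    import google.auth.transport.requests
    from google.resumable_media.requests import ResumableUpload

    credentials, project = google.auth.default()
    transport = google.auth.transport.requests.AuthorizedSession(credentials)
    upload_url = (
        'https://www.googleapis.com/upload/bigquery/v2/projects/'
        '{}/jobs?uploadType=resumable'.format(project))
    metadata = {
        'configuration': {
            'load': {
                'sourceFormat': 'CSV',
                'destinationTable': {
                    'projectId': project,
                    'datasetId': 'my_dataset',  # placeholder
                    'tableId': 'my_table',      # placeholder
                },
            },
        },
    }
    stream = io.BytesIO(b'full_name,age\nPhred Phlyntstone,32\n')

    upload = ResumableUpload(upload_url, 1024 * 1024, headers={})
    # The initiate request sends the job configuration and gets back a
    # session URL for the upload.
    upload.initiate(transport, stream, metadata, '*/*', stream_final=False)
    while not upload.finished:
        # Each call sends at most one chunk; the final response describes
        # the created load job.
        response = upload.transmit_next_chunk(transport)
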
- request = Request(upload_url, 'POST', headers, - body=json.dumps(metadata)) - - upload.configure_request(upload_config, request, url_builder) - query_params = url_builder.query_params - base_url = connection.API_BASE_URL + '/upload' - request.url = connection.build_api_url(api_base_url=base_url, - path=path, - query_params=query_params) try: - upload.initialize_upload(request, connection.http) - except HttpError as err_response: - faux_response = httplib2.Response(err_response.response) - raise make_exception(faux_response, err_response.content, - error_info=request.url) - - if upload.strategy == RESUMABLE_UPLOAD: - http_response = upload.stream_file(use_chunks=True) - else: - http_response = make_api_request(connection.http, request, - retries=num_retries) - - self._check_response_error(request, http_response) - - response_content = http_response.content - if not isinstance(response_content, - six.string_types): # pragma: NO COVER Python3 - response_content = response_content.decode('utf-8') - return client.job_from_resource(json.loads(response_content)) + created_json = self._do_upload( + client, file_obj, metadata, size, num_retries) + return client.job_from_resource(created_json) + except resumable_media.InvalidResponse as exc: + _raise_from_invalid_response(exc) # pylint: enable=too-many-arguments,too-many-locals @@ -1122,20 +1211,109 @@ def _build_schema_resource(fields): info['fields'] = _build_schema_resource(field.fields) infos.append(info) return infos +# pylint: enable=unused-argument + +def _maybe_rewind(stream, rewind=False): + """Rewind the stream if desired. -class _UploadConfig(object): - """Faux message FBO apitools' 'configure_request'.""" - accept = ['*/*'] - max_size = None - resumable_multipart = True - resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - simple_multipart = True - simple_path = u'/upload/bigquery/v2/projects/{project}/jobs' + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + :type rewind: bool + :param rewind: Indicates if we should seek to the beginning of the stream. + """ + if rewind: + stream.seek(0, os.SEEK_SET) -class _UrlBuilder(object): - """Faux builder FBO apitools' 'configure_request'""" - def __init__(self): - self.query_params = {} - self._relative_path = '' + +def _check_mode(stream): + """Check that a stream was opened in read-binary mode. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :raises: :exc:`ValueError` if the ``stream.mode`` is a valid attribute + and is not among ``rb``, ``r+b`` or ``rb+``. + """ + mode = getattr(stream, 'mode', None) + + if mode is not None and mode not in ('rb', 'r+b', 'rb+'): + raise ValueError( + "Cannot upload files opened in text mode: use " + "open(filename, mode='rb') or open(filename, mode='r+b')") + + +def _get_upload_headers(user_agent): + """Get the headers for an upload request. + + :type user_agent: str + :param user_agent: The user-agent for requests. + + :rtype: dict + :returns: The headers to be used for the request. + """ + return { + 'Accept': 'application/json', + 'Accept-Encoding': 'gzip, deflate', + 'User-Agent': user_agent, + 'content-type': 'application/json', + } + + +def _get_upload_metadata(source_format, schema, dataset, name): + """Get base metadata for creating a table. + + :type source_format: str + :param source_format: one of 'CSV' or 'NEWLINE_DELIMITED_JSON'. + job configuration option. + + :type schema: list + :param schema: List of :class:`SchemaField` associated with a table. 
+ + :type dataset: :class:`~google.cloud.bigquery.dataset.Dataset` + :param dataset: A dataset which contains a table. + + :type name: str + :param name: The name of the table. + + :rtype: dict + :returns: The metadata dictionary. + """ + load_config = { + 'sourceFormat': source_format, + 'destinationTable': { + 'projectId': dataset.project, + 'datasetId': dataset.name, + 'tableId': name, + }, + } + if schema: + load_config['schema'] = { + 'fields': _build_schema_resource(schema), + } + + return { + 'configuration': { + 'load': load_config, + }, + } + + +def _raise_from_invalid_response(error, error_info=None): + """Re-wrap and raise an ``InvalidResponse`` exception. + + :type error: :exc:`google.resumable_media.InvalidResponse` + :param error: A caught exception from the ``google-resumable-media`` + library. + + :type error_info: str + :param error_info: (Optional) Extra information about the failed request. + + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` corresponding + to the failed status code + """ + response = error.response + faux_response = httplib2.Response({'status': response.status_code}) + raise make_exception(faux_response, response.content, + error_info=error_info, use_json=False) diff --git a/bigquery/nox.py b/bigquery/nox.py index 19a8f5761701f..9899654431595 100644 --- a/bigquery/nox.py +++ b/bigquery/nox.py @@ -19,7 +19,9 @@ import nox -LOCAL_DEPS = ('../core/',) +LOCAL_DEPS = ( + os.path.join('..', 'core'), +) @nox.session @@ -38,10 +40,17 @@ def unit_tests(session, python_version): session.install('-e', '.') # Run py.test against the unit tests. - session.run('py.test', '--quiet', - '--cov=google.cloud.bigquery', '--cov=tests.unit', '--cov-append', - '--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97', - 'tests/unit', + session.run( + 'py.test', + '--quiet', + '--cov=google.cloud.bigquery', + '--cov=tests.unit', + '--cov-append', + '--cov-config=.coveragerc', + '--cov-report=', + '--cov-fail-under=97', + os.path.join('tests', 'unit'), + *session.posargs ) @@ -63,11 +72,19 @@ def system_tests(session, python_version): # Install all test dependencies, then install this package into the # virutalenv's dist-packages. session.install('mock', 'pytest', *LOCAL_DEPS) - session.install('../storage/', '../test_utils/') + session.install( + os.path.join('..', 'storage'), + os.path.join('..', 'test_utils'), + ) session.install('.') # Run py.test against the system tests. 
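
The ``*session.posargs`` forwarding added throughout this file lets anything after ``--`` on the nox command line reach py.test, e.g. something like ``nox -s unit_tests -- -k upload``. A trimmed sketch of the pattern, not tied to this repo::

    import os

    import nox


    @nox.session
    def unit_tests(session):
        session.install('mock', 'pytest')
        session.install('-e', '.')
        # Arguments after ``--`` show up in ``session.posargs`` and are
        # forwarded to py.test unchanged.
        session.run(
            'py.test',
            '--quiet',
            os.path.join('tests', 'unit'),
            *session.posargs
        )
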
- session.run('py.test', '--quiet', 'tests/system.py') + session.run( + 'py.test', + '--quiet', + os.path.join('tests', 'system.py'), + *session.posargs + ) @nox.session @@ -81,7 +98,7 @@ def lint(session): session.install('flake8', 'pylint', 'gcp-devrel-py-tools', *LOCAL_DEPS) session.install('.') - session.run('flake8', 'google/cloud/bigquery') + session.run('flake8', os.path.join('google', 'cloud', 'bigquery')) session.run('flake8', 'tests') session.run( 'gcp-devrel-py-tools', 'run-pylint', diff --git a/bigquery/setup.py b/bigquery/setup.py index 6d61064c88bad..eeb2d90549d8b 100644 --- a/bigquery/setup.py +++ b/bigquery/setup.py @@ -52,6 +52,9 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.25.0, < 0.26dev', + 'google-auth >= 1.0.0', + 'google-resumable-media >= 0.2.1', + 'requests >= 2.0.0', ] setup( diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index f535e87996288..502c0495f9c9d 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,8 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import email +import io +import json import unittest +import mock +from six.moves import http_client +import pytest + class _SchemaBase(object): @@ -31,7 +38,8 @@ def _verifySchema(self, schema, resource): class TestTable(unittest.TestCase, _SchemaBase): - PROJECT = 'project' + + PROJECT = 'prahj-ekt' DS_NAME = 'dataset-name' TABLE_NAME = 'table-name' @@ -1553,312 +1561,476 @@ def _row_data(row): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['data'], SENT) - def test_upload_from_file_text_mode_file_failure(self): + @mock.patch('google.auth.transport.requests.AuthorizedSession') + def test__make_transport(self, session_factory): + client = mock.Mock(spec=[u'_credentials']) + table = self._make_one(self.TABLE_NAME, None) + transport = table._make_transport(client) - class TextModeFile(object): - mode = 'r' + self.assertIs(transport, session_factory.return_value) + session_factory.assert_called_once_with(client._credentials) - conn = _Connection() - client = _Client(project=self.PROJECT, connection=conn) + @staticmethod + def _mock_requests_response(status_code, headers, content=b''): + return mock.Mock( + content=content, headers=headers, status_code=status_code, + spec=['content', 'headers', 'status_code']) + + def _mock_transport(self, status_code, headers, content=b''): + fake_transport = mock.Mock(spec=['request']) + fake_response = self._mock_requests_response( + status_code, headers, content=content) + fake_transport.request.return_value = fake_response + return fake_transport + + def _initiate_resumable_upload_helper(self, num_retries=None): + from google.resumable_media.requests import ResumableUpload + from google.cloud.bigquery.table import _DEFAULT_CHUNKSIZE + from google.cloud.bigquery.table import _GENERIC_CONTENT_TYPE + from google.cloud.bigquery.table import _get_upload_headers + from google.cloud.bigquery.table import _get_upload_metadata + + connection = _Connection() + client = _Client(self.PROJECT, connection=connection) dataset = _Dataset(client) - file_obj = TextModeFile() - table = self._make_one(self.TABLE_NAME, dataset=dataset) - with self.assertRaises(ValueError): - table.upload_from_file(file_obj, 'CSV', size=1234) + table = self._make_one(self.TABLE_NAME, dataset) - def test_upload_from_file_binary_mode_no_failure(self): - self._upload_from_file_helper(input_file_mode='r+b') + # Create mocks to be checked for doing transport. 
+ resumable_url = 'http://test.invalid?upload_id=hey-you' + response_headers = {'location': resumable_url} + fake_transport = self._mock_transport( + http_client.OK, response_headers) + table._make_transport = mock.Mock( + return_value=fake_transport, spec=[]) + + # Create some mock arguments and call the method under test. + data = b'goodbye gudbi gootbee' + stream = io.BytesIO(data) + metadata = _get_upload_metadata( + 'CSV', table._schema, table._dataset, table.name) + upload, transport = table._initiate_resumable_upload( + client, stream, metadata, num_retries) + + # Check the returned values. + self.assertIsInstance(upload, ResumableUpload) + upload_url = ( + 'https://www.googleapis.com/upload/bigquery/v2/projects/' + + self.PROJECT + + '/jobs?uploadType=resumable') + self.assertEqual(upload.upload_url, upload_url) + expected_headers = _get_upload_headers(connection.USER_AGENT) + self.assertEqual(upload._headers, expected_headers) + self.assertFalse(upload.finished) + self.assertEqual(upload._chunk_size, _DEFAULT_CHUNKSIZE) + self.assertIs(upload._stream, stream) + self.assertIsNone(upload._total_bytes) + self.assertEqual(upload._content_type, _GENERIC_CONTENT_TYPE) + self.assertEqual(upload.resumable_url, resumable_url) + + retry_strategy = upload._retry_strategy + self.assertEqual(retry_strategy.max_sleep, 64.0) + if num_retries is None: + self.assertEqual(retry_strategy.max_cumulative_retry, 600.0) + self.assertIsNone(retry_strategy.max_retries) + else: + self.assertIsNone(retry_strategy.max_cumulative_retry) + self.assertEqual(retry_strategy.max_retries, num_retries) + self.assertIs(transport, fake_transport) + # Make sure we never read from the stream. + self.assertEqual(stream.tell(), 0) + + # Check the mocks. + table._make_transport.assert_called_once_with(client) + request_headers = expected_headers.copy() + request_headers['x-upload-content-type'] = _GENERIC_CONTENT_TYPE + fake_transport.request.assert_called_once_with( + 'POST', + upload_url, + data=json.dumps(metadata).encode('utf-8'), + headers=request_headers, + ) - def test_upload_from_file_size_failure(self): - conn = _Connection() - client = _Client(project=self.PROJECT, connection=conn) - dataset = _Dataset(client) - file_obj = object() - table = self._make_one(self.TABLE_NAME, dataset=dataset) - with self.assertRaises(ValueError): - table.upload_from_file(file_obj, 'CSV', size=None) + def test__initiate_resumable_upload(self): + self._initiate_resumable_upload_helper() - def test_upload_from_file_multipart_w_400(self): - import csv - import datetime - from six.moves.http_client import BAD_REQUEST - from google.cloud._testing import _NamedTemporaryFile - from google.cloud._helpers import UTC - from google.cloud.exceptions import BadRequest + def test__initiate_resumable_upload_with_retry(self): + self._initiate_resumable_upload_helper(num_retries=11) - WHEN_TS = 1437767599.006 - WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - tzinfo=UTC) - response = {'status': BAD_REQUEST} - conn = _Connection( - (response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) + def _do_multipart_upload_success_helper( + self, get_boundary, num_retries=None): + from google.cloud.bigquery.table import _get_upload_headers + from google.cloud.bigquery.table import _get_upload_metadata + + connection = _Connection() + client = _Client(self.PROJECT, connection=connection) dataset = _Dataset(client) - table = self._make_one(self.TABLE_NAME, dataset=dataset) + table = self._make_one(self.TABLE_NAME, dataset) - 
with _NamedTemporaryFile() as temp: - with open(temp.name, 'w') as file_obj: - writer = csv.writer(file_obj) - writer.writerow(('full_name', 'age', 'joined')) - writer.writerow(('Phred Phlyntstone', 32, WHEN)) + # Create mocks to be checked for doing transport. + fake_transport = self._mock_transport(http_client.OK, {}) + table._make_transport = mock.Mock(return_value=fake_transport, spec=[]) + + # Create some mock arguments. + data = b'Bzzzz-zap \x00\x01\xf4' + stream = io.BytesIO(data) + metadata = _get_upload_metadata( + 'CSV', table._schema, table._dataset, table.name) + size = len(data) + response = table._do_multipart_upload( + client, stream, metadata, size, num_retries) + + # Check the mocks and the returned value. + self.assertIs(response, fake_transport.request.return_value) + self.assertEqual(stream.tell(), size) + table._make_transport.assert_called_once_with(client) + get_boundary.assert_called_once_with() + + upload_url = ( + 'https://www.googleapis.com/upload/bigquery/v2/projects/' + + self.PROJECT + + '/jobs?uploadType=multipart') + payload = ( + b'--==0==\r\n' + + b'content-type: application/json; charset=UTF-8\r\n\r\n' + + json.dumps(metadata).encode('utf-8') + b'\r\n' + + b'--==0==\r\n' + + b'content-type: */*\r\n\r\n' + + data + b'\r\n' + + b'--==0==--') + headers = _get_upload_headers(connection.USER_AGENT) + headers['content-type'] = b'multipart/related; boundary="==0=="' + fake_transport.request.assert_called_once_with( + 'POST', + upload_url, + data=payload, + headers=headers, + ) - with open(temp.name, 'rb') as file_obj: - with self.assertRaises(BadRequest): - table.upload_from_file( - file_obj, 'CSV', rewind=True) + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload(self, get_boundary): + self._do_multipart_upload_success_helper(get_boundary) - def _upload_from_file_helper(self, **kw): - import csv - import datetime - from six.moves.http_client import OK - from google.cloud._helpers import UTC - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.bigquery.table import SchemaField + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload_with_retry(self, get_boundary): + self._do_multipart_upload_success_helper(get_boundary, num_retries=8) - WHEN_TS = 1437767599.006 - WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - tzinfo=UTC) - PATH = 'projects/%s/jobs' % (self.PROJECT,) - response = {'status': OK} - conn = _Connection( - (response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) - expected_job = object() - if 'client' in kw: - kw['client']._job = expected_job - else: - client._job = expected_job - input_file_mode = kw.pop('input_file_mode', 'rb') - dataset = _Dataset(client) - full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') - age = SchemaField('age', 'INTEGER', mode='REQUIRED') - joined = SchemaField('joined', 'TIMESTAMP', mode='NULLABLE') - table = self._make_one(self.TABLE_NAME, dataset=dataset, - schema=[full_name, age, joined]) - ROWS = [ - ('Phred Phlyntstone', 32, WHEN), - ('Bharney Rhubble', 33, WHEN + datetime.timedelta(seconds=1)), - ('Wylma Phlyntstone', 29, WHEN + datetime.timedelta(seconds=2)), - ('Bhettye Rhubble', 27, None), - ] - with _NamedTemporaryFile() as temp: - with open(temp.name, 'w') as file_obj: - writer = csv.writer(file_obj) - writer.writerow(('full_name', 'age', 'joined')) - writer.writerows(ROWS) - - with open(temp.name, input_file_mode) 
as file_obj: - BODY = file_obj.read() - explicit_size = kw.pop('_explicit_size', False) - if explicit_size: - kw['size'] = len(BODY) - job = table.upload_from_file( - file_obj, 'CSV', rewind=True, **kw) - - self.assertIs(job, expected_job) - return conn.http._requested, PATH, BODY - - def test_upload_from_file_w_bound_client_multipart(self): - import json - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._helpers import _to_bytes - - requested, PATH, BODY = self._upload_from_file_helper() - parse_chunk = _email_chunk_parser() - - self.assertEqual(len(requested), 1) - req = requested[0] - self.assertEqual(req['method'], 'POST') - uri = req['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/%s' % PATH) - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'multipart'}) - - ctype, boundary = [x.strip() - for x in req['headers']['content-type'].split(';')] - self.assertEqual(ctype, 'multipart/related') - self.assertTrue(boundary.startswith('boundary="==')) - self.assertTrue(boundary.endswith('=="')) - - divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) - chunks = req['body'].split(divider)[1:-1] # discard prolog / epilog - self.assertEqual(len(chunks), 2) - - text_msg = parse_chunk(chunks[0].strip()) - self.assertEqual(dict(text_msg._headers), - {'Content-Type': 'application/json', - 'MIME-Version': '1.0'}) - metadata = json.loads(text_msg._payload) - load_config = metadata['configuration']['load'] - DESTINATION_TABLE = { - 'projectId': self.PROJECT, - 'datasetId': self.DS_NAME, - 'tableId': self.TABLE_NAME, - } - self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - self.assertEqual(load_config['sourceFormat'], 'CSV') - - app_msg = parse_chunk(chunks[1].strip()) - self.assertEqual(dict(app_msg._headers), - {'Content-Type': 'application/octet-stream', - 'Content-Transfer-Encoding': 'binary', - 'MIME-Version': '1.0'}) - body = BODY.decode('ascii').rstrip() - body_lines = [line.strip() for line in body.splitlines()] - payload_lines = app_msg._payload.rstrip().splitlines() - self.assertEqual(payload_lines, body_lines) - - def test_upload_from_file_resumable_with_400(self): - import csv - import datetime - import mock - from six.moves.http_client import BAD_REQUEST - from google.cloud.exceptions import BadRequest - from google.cloud._helpers import UTC - from google.cloud._testing import _NamedTemporaryFile +class TestTableUpload(object): + # NOTE: This is a "partner" to `TestTable` meant to test some of the + # "upload" portions of `Table`. It also uses `pytest`-style tests + # rather than `unittest`-style. 
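
For readers coming from ``unittest`` only: pytest-style tests rely on bare ``assert`` statements and ``pytest.raises`` instead of ``TestCase`` helper methods. A tiny self-contained illustration, separate from this suite::

    import pytest


    class TestExample(object):

        def test_equality(self):
            # Plain asserts replace ``self.assertEqual`` and friends.
            assert 1 + 1 == 2

        def test_exception(self):
            # ``pytest.raises`` replaces ``self.assertRaises``.
            with pytest.raises(ZeroDivisionError):
                1 / 0
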
- WHEN_TS = 1437767599.006 - WHEN = datetime.datetime.utcfromtimestamp(WHEN_TS).replace( - tzinfo=UTC) - initial_response = {'status': BAD_REQUEST} - conn = _Connection( - (initial_response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) + @staticmethod + def _make_table(): + from google.cloud.bigquery import _http + from google.cloud.bigquery import client + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table - class _UploadConfig(object): - accept = ['*/*'] - max_size = None - resumable_multipart = True - resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - simple_multipart = True - simple_path = u'' # force resumable - dataset = _Dataset(client) - table = self._make_one(self.TABLE_NAME, dataset=dataset) + connection = mock.create_autospec(_http.Connection, instance=True) + client = mock.create_autospec(client.Client, instance=True) + client._connection = connection + client._credentials = mock.sentinel.credentials + client.project = 'project_id' - with mock.patch('google.cloud.bigquery.table._UploadConfig', - new=_UploadConfig): - with _NamedTemporaryFile() as temp: - with open(temp.name, 'w') as file_obj: - writer = csv.writer(file_obj) - writer.writerow(('full_name', 'age', 'joined')) - writer.writerow(('Phred Phlyntstone', 32, WHEN)) - - with open(temp.name, 'rb') as file_obj: - with self.assertRaises(BadRequest): - table.upload_from_file( - file_obj, 'CSV', rewind=True) - - # pylint: disable=too-many-statements - def test_upload_from_file_w_explicit_client_resumable(self): - import json - import mock - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - - UPLOAD_PATH = 'https://example.com/upload/test' - initial_response = {'status': OK, 'location': UPLOAD_PATH} - upload_response = {'status': OK} - conn = _Connection( - (initial_response, b'{}'), - (upload_response, b'{}'), - ) - client = _Client(project=self.PROJECT, connection=conn) + dataset = dataset.Dataset('test_dataset', client) + table = table.Table('test_table', dataset) - class _UploadConfig(object): - accept = ['*/*'] - max_size = None - resumable_multipart = True - resumable_path = u'/upload/bigquery/v2/projects/{project}/jobs' - simple_multipart = True - simple_path = u'' # force resumable - - with mock.patch('google.cloud.bigquery.table._UploadConfig', - new=_UploadConfig): - orig_requested, PATH, BODY = self._upload_from_file_helper( - allow_jagged_rows=False, - allow_quoted_newlines=False, - create_disposition='CREATE_IF_NEEDED', - encoding='utf8', - field_delimiter=',', - ignore_unknown_values=False, - max_bad_records=0, - quote_character='"', - skip_leading_rows=1, - write_disposition='WRITE_APPEND', - client=client, - _explicit_size=True) - - self.assertEqual(len(orig_requested), 0) - - requested = conn.http._requested - self.assertEqual(len(requested), 2) - req = requested[0] - self.assertEqual(req['method'], 'POST') - uri = req['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/%s' % PATH) - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'resumable'}) - - self.assertEqual(req['headers']['content-type'], 'application/json') - metadata = json.loads(req['body']) - load_config = metadata['configuration']['load'] - DESTINATION_TABLE = { - 'projectId': self.PROJECT, - 'datasetId': self.DS_NAME, - 'tableId': self.TABLE_NAME, + return table + + @staticmethod + 
def _make_response(status_code, content='', headers={}): + """Make a mock HTTP response.""" + import requests + response = mock.create_autospec(requests.Response, instance=True) + response.content = content.encode('utf-8') + response.headers = headers + response.status_code = status_code + return response + + @classmethod + def _make_do_upload_patch(cls, table, method, side_effect=None): + """Patches the low-level upload helpers.""" + if side_effect is None: + side_effect = [cls._make_response( + http_client.OK, + json.dumps({}), + {'Content-Type': 'application/json'})] + return mock.patch.object( + table, method, side_effect=side_effect, autospec=True) + + EXPECTED_CONFIGURATION = { + 'configuration': { + 'load': { + 'sourceFormat': 'CSV', + 'destinationTable': { + 'projectId': 'project_id', + 'datasetId': 'test_dataset', + 'tableId': 'test_table' + } + } + } + } + + @staticmethod + def _make_file_obj(): + return io.BytesIO(b'hello, is it me you\'re looking for?') + + # High-level tests + + def test_upload_from_file_resumable(self): + import google.cloud.bigquery.table + + table = self._make_table() + file_obj = self._make_file_obj() + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file(file_obj, source_format='CSV') + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) + + def test_upload_file_resumable_metadata(self): + table = self._make_table() + file_obj = self._make_file_obj() + + config_args = { + 'source_format': 'CSV', + 'allow_jagged_rows': False, + 'allow_quoted_newlines': False, + 'create_disposition': 'CREATE_IF_NEEDED', + 'encoding': 'utf8', + 'field_delimiter': ',', + 'ignore_unknown_values': False, + 'max_bad_records': 0, + 'quote_character': '"', + 'skip_leading_rows': 1, + 'write_disposition': 'WRITE_APPEND', + 'job_name': 'oddjob' } - self.assertEqual(load_config['destinationTable'], DESTINATION_TABLE) - self.assertEqual(load_config['sourceFormat'], 'CSV') - self.assertEqual(load_config['allowJaggedRows'], False) - self.assertEqual(load_config['allowQuotedNewlines'], False) - self.assertEqual(load_config['createDisposition'], 'CREATE_IF_NEEDED') - self.assertEqual(load_config['encoding'], 'utf8') - self.assertEqual(load_config['fieldDelimiter'], ',') - self.assertEqual(load_config['ignoreUnknownValues'], False) - self.assertEqual(load_config['maxBadRecords'], 0) - self.assertEqual(load_config['quote'], '"') - self.assertEqual(load_config['skipLeadingRows'], 1) - self.assertEqual(load_config['writeDisposition'], 'WRITE_APPEND') - - req = requested[1] - self.assertEqual(req['method'], 'PUT') - self.assertEqual(req['uri'], UPLOAD_PATH) - headers = req['headers'] - length = len(BODY) - self.assertEqual(headers['Content-Type'], 'application/octet-stream') - self.assertEqual(headers['Content-Range'], - 'bytes 0-%d/%d' % (length - 1, length)) - self.assertEqual(headers['content-length'], '%d' % (length,)) - self.assertEqual(req['body'], BODY) - # pylint: enable=too-many-statements - - def test_upload_from_file_w_jobid(self): - import json - from google.cloud._helpers import _to_bytes - - requested, PATH, BODY = self._upload_from_file_helper(job_name='foo') - parse_chunk = _email_chunk_parser() - req = requested[0] - ctype, boundary = [x.strip() - for x in req['headers']['content-type'].split(';')] - divider = b'--' + _to_bytes(boundary[len('boundary="'):-1]) - chunks = 
req['body'].split(divider)[1:-1] # discard prolog / epilog - text_msg = parse_chunk(chunks[0].strip()) - metadata = json.loads(text_msg._payload) - load_config = metadata['configuration']['load'] - self.assertEqual(load_config['jobReference'], {'jobId': 'foo'}) + + expected_config = { + 'configuration': { + 'load': { + 'sourceFormat': config_args['source_format'], + 'destinationTable': { + 'projectId': table._dataset._client.project, + 'datasetId': table.dataset_name, + 'tableId': table.name + }, + 'allowJaggedRows': config_args['allow_jagged_rows'], + 'allowQuotedNewlines': + config_args['allow_quoted_newlines'], + 'createDisposition': config_args['create_disposition'], + 'encoding': config_args['encoding'], + 'fieldDelimiter': config_args['field_delimiter'], + 'ignoreUnknownValues': + config_args['ignore_unknown_values'], + 'maxBadRecords': config_args['max_bad_records'], + 'quote': config_args['quote_character'], + 'skipLeadingRows': config_args['skip_leading_rows'], + 'writeDisposition': config_args['write_disposition'], + 'jobReference': {'jobId': config_args['job_name']} + } + } + } + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, **config_args) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + expected_config, + mock.ANY) + + def test_upload_from_file_multipart(self): + import google.cloud.bigquery.table + + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_size = 10 + + do_upload_patch = self._make_do_upload_patch( + table, '_do_multipart_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, source_format='CSV', size=file_obj_size) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + file_obj_size, + google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) + + def test_upload_from_file_with_retries(self): + table = self._make_table() + file_obj = self._make_file_obj() + num_retries = 20 + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload') + with do_upload_patch as do_upload: + table.upload_from_file( + file_obj, source_format='CSV', num_retries=num_retries) + + do_upload.assert_called_once_with( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + num_retries) + + def test_upload_from_file_with_rewind(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj.seek(2) + + with self._make_do_upload_patch(table, '_do_resumable_upload'): + table.upload_from_file( + file_obj, source_format='CSV', rewind=True) + + assert file_obj.tell() == 0 + + def test_upload_from_file_failure(self): + from google.resumable_media import InvalidResponse + from google.cloud import exceptions + + table = self._make_table() + file_obj = self._make_file_obj() + + response = self._make_response( + content='Someone is already in this spot.', + status_code=http_client.CONFLICT) + + do_upload_patch = self._make_do_upload_patch( + table, '_do_resumable_upload', + side_effect=InvalidResponse(response)) + + with do_upload_patch, pytest.raises(exceptions.Conflict) as exc_info: + table.upload_from_file( + file_obj, source_format='CSV', rewind=True) + + assert exc_info.value.message == response.content.decode('utf-8') + assert exc_info.value.errors == [] + + def test_upload_from_file_bad_mode(self): + table = self._make_table() + file_obj = mock.Mock(spec=['mode']) + file_obj.mode = 'x' + + with 
pytest.raises(ValueError): + table.upload_from_file( + file_obj, source_format='CSV',) + + # Low-level tests + + @classmethod + def _make_resumable_upload_responses(cls, size): + """Make a series of responses for a successful resumable upload.""" + from google import resumable_media + + resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' + initial_response = cls._make_response( + http_client.OK, '', {'location': resumable_url}) + data_response = cls._make_response( + resumable_media.PERMANENT_REDIRECT, + '', {'range': 'bytes=0-{:d}'.format(size - 1)}) + final_response = cls._make_response( + http_client.OK, + json.dumps({'size': size}), + {'Content-Type': 'application/json'}) + return [initial_response, data_response, final_response] + + @staticmethod + def _make_transport_patch(table, responses=None): + """Patch a table's _make_transport method to return given responses.""" + import google.auth.transport.requests + + transport = mock.create_autospec( + google.auth.transport.requests.AuthorizedSession, instance=True) + transport.request.side_effect = responses + return mock.patch.object( + table, '_make_transport', return_value=transport, autospec=True) + + def test__do_resumable_upload(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + responses = self._make_resumable_upload_responses(file_obj_len) + + with self._make_transport_patch(table, responses) as transport: + result = table._do_resumable_upload( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + None) + + content = result.content.decode('utf-8') + assert json.loads(content) == {'size': file_obj_len} + + # Verify that configuration data was passed in with the initial + # request. + transport.return_value.request.assert_any_call( + 'POST', + mock.ANY, + data=json.dumps(self.EXPECTED_CONFIGURATION).encode('utf-8'), + headers=mock.ANY) + + def test__do_multipart_upload(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + responses = [self._make_response(http_client.OK)] + + with self._make_transport_patch(table, responses) as transport: + table._do_multipart_upload( + table._dataset._client, + file_obj, + self.EXPECTED_CONFIGURATION, + file_obj_len, + None) + + # Verify that configuration data was passed in with the initial + # request. + request_args = transport.return_value.request.mock_calls[0][2] + request_data = request_args['data'].decode('utf-8') + request_headers = request_args['headers'] + + request_content = email.message_from_string( + 'Content-Type: {}\r\n{}'.format( + request_headers['content-type'].decode('utf-8'), + request_data)) + + # There should be two payloads: the configuration and the binary daya. 
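
Prepending the ``content-type`` header before calling ``email.message_from_string`` (as done just above) is what lets the standard-library parser split the multipart body. A minimal standalone sketch with a made-up boundary and made-up parts::

    import email
    import json

    raw = (
        'Content-Type: multipart/related; boundary="==0=="\r\n'
        '\r\n'
        '--==0==\r\n'
        'content-type: application/json; charset=UTF-8\r\n'
        '\r\n'
        '{"configuration": {"load": {"sourceFormat": "CSV"}}}\r\n'
        '--==0==\r\n'
        'content-type: */*\r\n'
        '\r\n'
        'raw bytes go here\r\n'
        '--==0==--'
    )
    parsed = email.message_from_string(raw)
    config_part, data_part = parsed.get_payload()
    configuration = json.loads(config_part.get_payload())
    data = data_part.get_payload()
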
+ configuration_data = request_content.get_payload(0).get_payload() + binary_data = request_content.get_payload(1).get_payload() + + assert json.loads(configuration_data) == self.EXPECTED_CONFIGURATION + assert binary_data.encode('utf-8') == file_obj.getvalue() + + def test__do_multipart_upload_wrong_size(self): + table = self._make_table() + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + + with pytest.raises(ValueError): + table._do_multipart_upload( + table._dataset._client, + file_obj, + {}, + file_obj_len+1, + None) class Test_parse_schema_resource(unittest.TestCase, _SchemaBase): @@ -1974,6 +2146,70 @@ def test_w_subfields(self): 'mode': 'REQUIRED'}]}) +class Test__get_upload_metadata(unittest.TestCase): + + @staticmethod + def _call_fut(source_format, schema, dataset, name): + from google.cloud.bigquery.table import _get_upload_metadata + + return _get_upload_metadata(source_format, schema, dataset, name) + + def test_empty_schema(self): + source_format = 'AVRO' + dataset = mock.Mock(project='prediction', spec=['name', 'project']) + dataset.name = 'market' # mock.Mock() treats `name` specially. + table_name = 'chairs' + metadata = self._call_fut(source_format, [], dataset, table_name) + + expected = { + 'configuration': { + 'load': { + 'sourceFormat': source_format, + 'destinationTable': { + 'projectId': dataset.project, + 'datasetId': dataset.name, + 'tableId': table_name, + }, + }, + }, + } + self.assertEqual(metadata, expected) + + def test_with_schema(self): + from google.cloud.bigquery.table import SchemaField + + source_format = 'CSV' + full_name = SchemaField('full_name', 'STRING', mode='REQUIRED') + dataset = mock.Mock(project='blind', spec=['name', 'project']) + dataset.name = 'movie' # mock.Mock() treats `name` specially. + table_name = 'teebull-neem' + metadata = self._call_fut( + source_format, [full_name], dataset, table_name) + + expected = { + 'configuration': { + 'load': { + 'sourceFormat': source_format, + 'destinationTable': { + 'projectId': dataset.project, + 'datasetId': dataset.name, + 'tableId': table_name, + }, + 'schema': { + 'fields': [ + { + 'name': full_name.name, + 'type': full_name.field_type, + 'mode': full_name.mode, + }, + ], + }, + }, + }, + } + self.assertEqual(metadata, expected) + + class _Client(object): _query_results = () @@ -1982,9 +2218,6 @@ def __init__(self, project='project', connection=None): self.project = project self._connection = connection - def job_from_resource(self, resource): # pylint: disable=unused-argument - return self._job - def run_sync_query(self, query): return _Query(query, self) @@ -2016,37 +2249,14 @@ def project(self): return self._client.project -class _Responder(object): - - def __init__(self, *responses): - self._responses = responses[:] - self._requested = [] - - def _respond(self, **kw): - self._requested.append(kw) - response, self._responses = self._responses[0], self._responses[1:] - return response - - -class _HTTP(_Responder): - - connections = {} # For google-apitools debugging. 
- - def request(self, uri, method, headers, body, **kw): - if hasattr(body, 'read'): - body = body.read() - return self._respond(uri=uri, method=method, headers=headers, - body=body, **kw) - - -class _Connection(_Responder): +class _Connection(object): API_BASE_URL = 'http://example.com' USER_AGENT = 'testing 1.2.3' def __init__(self, *responses): - super(_Connection, self).__init__(*responses) - self.http = _HTTP(*responses) + self._responses = responses[:] + self._requested = [] def api_request(self, **kw): from google.cloud.exceptions import NotFound @@ -2059,29 +2269,3 @@ def api_request(self, **kw): raise NotFound('miss') else: return response - - def build_api_url(self, path, query_params=None, - api_base_url=API_BASE_URL): - from six.moves.urllib.parse import urlencode - from six.moves.urllib.parse import urlsplit - from six.moves.urllib.parse import urlunsplit - - # Mimic the build_api_url interface. - qs = urlencode(query_params or {}) - scheme, netloc, _, _, _ = urlsplit(api_base_url) - return urlunsplit((scheme, netloc, path, qs, '')) - - -def _email_chunk_parser(): - import six - - if six.PY3: # pragma: NO COVER Python3 - from email.parser import BytesParser - - parser = BytesParser() - return parser.parsebytes - else: - from email.parser import Parser - - parser = Parser() - return parser.parsestr diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 7d967a3e4901d..d03d1364cf400 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -368,6 +368,7 @@ def _make_transport(self, client): :type client: :class:`~google.cloud.storage.client.Client` :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the blob's bucket. + :rtype transport: :class:`~google.auth.transport.requests.AuthorizedSession` :returns: The transport (with credentials) that will diff --git a/storage/setup.py b/storage/setup.py index d18624f3c13d2..8d11055fac77c 100644 --- a/storage/setup.py +++ b/storage/setup.py @@ -53,7 +53,7 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.25.0, < 0.26dev', 'google-auth >= 1.0.0', - 'google-resumable-media >= 0.1.1', + 'google-resumable-media >= 0.2.1', 'requests >= 2.0.0', ] diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py index 250a05bd28f41..e2227adbd94ae 100644 --- a/storage/tests/unit/test_blob.py +++ b/storage/tests/unit/test_blob.py @@ -775,7 +775,7 @@ def _do_multipart_success(self, mock_get_boundary, size=None, blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) # Create some mock arguments. - client = mock.sentinel.mock + client = mock.sentinel.client data = b'data here hear hier' stream = io.BytesIO(data) content_type = u'application/xml' @@ -865,7 +865,7 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None, blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) # Create some mock arguments and call the method under test. - client = mock.sentinel.mock + client = mock.sentinel.client data = b'hello hallo halo hi-low' stream = io.BytesIO(data) content_type = u'text/plain' @@ -1033,7 +1033,7 @@ def _do_resumable_helper(self, use_size=False, num_retries=None): blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) # Create some mock arguments and call the method under test. 
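
These ``mock.sentinel`` renames are purely cosmetic: ``mock.sentinel`` hands out a unique, stable placeholder object per attribute name, so ``mock.sentinel.client`` simply reads better than ``mock.sentinel.mock``. A two-line illustration::

    import mock

    # Each named sentinel is created once and compared by identity.
    assert mock.sentinel.client is mock.sentinel.client
    assert mock.sentinel.client is not mock.sentinel.mock
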
- client = mock.sentinel.mock + client = mock.sentinel.client stream = io.BytesIO(data) content_type = u'text/html' response = blob._do_resumable_upload( @@ -1271,7 +1271,7 @@ def _create_resumable_upload_session_helper(self, origin=None, # Create some mock arguments and call the method under test. content_type = u'text/plain' size = 10000 - client = mock.sentinel.mock + client = mock.sentinel.client new_url = blob.create_resumable_upload_session( content_type=content_type, size=size, origin=origin, client=client)
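
Taken together, the user-facing entry point touched by this change is ``Table.upload_from_file``. A rough usage sketch against the 0.26-era surface (project, dataset, table, and file names are placeholders)::

    from google.cloud.bigquery.client import Client
    from google.cloud.bigquery.dataset import Dataset
    from google.cloud.bigquery.schema import SchemaField
    from google.cloud.bigquery.table import Table

    client = Client(project='my-project')
    dataset = Dataset('my_dataset', client)
    table = Table('my_table', dataset,
                  schema=[SchemaField('full_name', 'STRING'),
                          SchemaField('age', 'INTEGER')])

    with open('rows.csv', 'rb') as csv_file:  # must be opened in binary mode
        # Without ``size`` the resumable path is used; pass ``size`` to send
        # a single multipart request instead.
        job = table.upload_from_file(csv_file, source_format='CSV')
    # The returned job is already started; poll it rather than calling
    # ``job.begin()``.
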