diff --git a/instana/__init__.py b/instana/__init__.py index f484a23f..394f1f8e 100644 --- a/instana/__init__.py +++ b/instana/__init__.py @@ -137,6 +137,9 @@ def boot_agent(): else: from .instrumentation import mysqlclient + if sys.version_info[0] >= 3: + from .instrumentation.google.cloud import storage + from .instrumentation.celery import hooks from .instrumentation import cassandra_inst diff --git a/instana/instrumentation/google/cloud/collectors.py b/instana/instrumentation/google/cloud/collectors.py new file mode 100644 index 00000000..a6d4a9e3 --- /dev/null +++ b/instana/instrumentation/google/cloud/collectors.py @@ -0,0 +1,317 @@ +import re + +try: + # Python 3 + from urllib.parse import unquote +except ImportError: + # Python 2 + from urllib import unquote + +# _storage_api defines a conversion of Google Storage JSON API requests into span tags as follows: +# request_method -> path_matcher -> collector +# +# * request method - the HTTP method used to make an API request (GET, POST, etc.) +# * path_matcher - either a string or a regex applied to the API request path (string values match first). +# * collector - a lambda returning a dict of span from API request query string. +# parameters and request body data. If a regex is used as a path matcher, the match result +# will be provided as a third argument. 
+# +# The API documentation can be found at https://cloud.google.com/storage/docs/json_api +_storage_api = { + 'GET': { + ##################### + # Bucket operations # + ##################### + '/b': lambda params, data: { + 'gcs.op': 'buckets.list', + 'gcs.projectId': params.get('project', None) + }, + re.compile('^/b/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'buckets.get', + 'gcs.bucket': unquote(match.group('bucket')), + }, + re.compile('^/b/(?P[^/]+)/iam$'): lambda params, data, match: { + 'gcs.op': 'buckets.getIamPolicy', + 'gcs.bucket': unquote(match.group('bucket')), + }, + re.compile('^/b/(?P[^/]+)/iam/testPermissions$'): lambda params, data, match: { + 'gcs.op': 'buckets.testIamPermissions', + 'gcs.bucket': unquote(match.group('bucket')), + }, + + ########################## + # Object/blob operations # + ########################## + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': params.get('alt', 'json') == 'media' and 'objects.get' or 'objects.attrs', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + }, + re.compile('^/b/(?P[^/]+)/o$'): lambda params, data, match: { + 'gcs.op': 'objects.list', + 'gcs.bucket': unquote(match.group('bucket')) + }, + + ################################## + # Default object ACLs operations # + ################################## + re.compile('^/b/(?P[^/]+)/defaultObjectAcl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'defaultAcls.get', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.entity': unquote(match.group('entity')) + }, + re.compile('^/b/(?P[^/]+)/defaultObjectAcl$'): lambda params, data, match: { + 'gcs.op': 'defaultAcls.list', + 'gcs.bucket': unquote(match.group('bucket')), + }, + + ######################### + # Object ACL operations # + ######################### + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/acl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objectAcls.get', + 'gcs.bucket': 
unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + 'gcs.entity': unquote(match.group('entity')) + }, + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/acl$'): lambda params, data, match: { + 'gcs.op': 'objectAcls.list', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')) + }, + + ######################## + # HMAC keys operations # + ######################## + re.compile('^/projects/(?P[^/]+)/hmacKeys$'): lambda params, data, match: { + 'gcs.op': 'hmacKeys.list', + 'gcs.projectId': unquote(match.group('project')) + }, + re.compile('^/projects/(?P[^/]+)/hmacKeys/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'hmacKeys.get', + 'gcs.projectId': unquote(match.group('project')), + 'gcs.accessId': unquote(match.group('accessId')) + }, + + ############################## + # Service account operations # + ############################## + re.compile('^/projects/(?P[^/]+)/serviceAccount$'): lambda params, data, match: { + 'gcs.op': 'serviceAccount.get', + 'gcs.projectId': unquote(match.group('project')) + } + }, + 'POST': { + ##################### + # Bucket operations # + ##################### + '/b': lambda params, data: { + 'gcs.op': 'buckets.insert', + 'gcs.projectId': params.get('project', None), + 'gcs.bucket': data.get('name', None), + }, + re.compile('^/b/(?P[^/]+)/lockRetentionPolicy$'): lambda params, data, match: { + 'gcs.op': 'buckets.lockRetentionPolicy', + 'gcs.bucket': unquote(match.group('bucket')), + }, + + ########################## + # Object/blob operations # + ########################## + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/compose$'): lambda params, data, match: { + 'gcs.op': 'objects.compose', + 'gcs.destinationBucket': unquote(match.group('bucket')), + 'gcs.destinationObject': unquote(match.group('object')), + 'gcs.sourceObjects': ','.join( + ['%s/%s' % (unquote(match.group('bucket')), o['name']) for o in data.get('sourceObjects', []) if 'name' in o] + ) + }, + 
re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/copyTo/b/(?P[^/]+)/o/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objects.copy', + 'gcs.destinationBucket': unquote(match.group('destBucket')), + 'gcs.destinationObject': unquote(match.group('destObject')), + 'gcs.sourceBucket': unquote(match.group('srcBucket')), + 'gcs.sourceObject': unquote(match.group('srcObject')), + }, + re.compile('^/b/(?P[^/]+)/o$'): lambda params, data, match: { + 'gcs.op': 'objects.insert', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': params.get('name', data.get('name', None)), + }, + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/rewriteTo/b/(?P[^/]+)/o/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objects.rewrite', + 'gcs.destinationBucket': unquote(match.group('destBucket')), + 'gcs.destinationObject': unquote(match.group('destObject')), + 'gcs.sourceBucket': unquote(match.group('srcBucket')), + 'gcs.sourceObject': unquote(match.group('srcObject')), + }, + + ###################### + # Channel operations # + ###################### + '/channels/stop': lambda params, data: { + 'gcs.op': 'channels.stop', + 'gcs.entity': data.get('id', None) + }, + + ################################## + # Default object ACLs operations # + ################################## + re.compile('^/b/(?P[^/]+)/defaultObjectAcl$'): lambda params, data, match: { + 'gcs.op': 'defaultAcls.insert', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.entity': data.get('entity', None) + }, + + ######################### + # Object ACL operations # + ######################### + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/acl$'): lambda params, data, match: { + 'gcs.op': 'objectAcls.insert', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + 'gcs.entity': data.get('entity', None) + }, + + ######################## + # HMAC keys operations # + ######################## + re.compile('^/projects/(?P[^/]+)/hmacKeys$'): lambda params, data, match: { + 'gcs.op': 
'hmacKeys.create', + 'gcs.projectId': unquote(match.group('project')) + } + }, + 'PATCH': { + ##################### + # Bucket operations # + ##################### + re.compile('^/b/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'buckets.patch', + 'gcs.bucket': unquote(match.group('bucket')), + }, + + ########################## + # Object/blob operations # + ########################## + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objects.patch', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + }, + + ################################## + # Default object ACLs operations # + ################################## + re.compile('^/b/(?P[^/]+)/defaultObjectAcl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'defaultAcls.patch', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.entity': unquote(match.group('entity')) + }, + + ######################### + # Object ACL operations # + ######################### + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/acl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objectAcls.patch', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + 'gcs.entity': unquote(match.group('entity')) + } + }, + 'PUT': { + ##################### + # Bucket operations # + ##################### + re.compile('^/b/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'buckets.update', + 'gcs.bucket': unquote(match.group('bucket')), + }, + re.compile('^/b/(?P[^/]+)/iam$'): lambda params, data, match: { + 'gcs.op': 'buckets.setIamPolicy', + 'gcs.bucket': unquote(match.group('bucket')), + }, + + ########################## + # Object/blob operations # + ########################## + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objects.update', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + }, + + 
################################## + # Default object ACLs operations # + ################################## + re.compile('^/b/(?P[^/]+)/defaultObjectAcl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'defaultAcls.update', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.entity': unquote(match.group('entity')) + }, + + ######################### + # Object ACL operations # + ######################### + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/acl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objectAcls.update', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + 'gcs.entity': unquote(match.group('entity')) + }, + + ######################## + # HMAC keys operations # + ######################## + re.compile('^/projects/(?P[^/]+)/hmacKeys/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'hmacKeys.update', + 'gcs.projectId': unquote(match.group('project')), + 'gcs.accessId': unquote(match.group('accessId')) + } + }, + 'DELETE': { + ##################### + # Bucket operations # + ##################### + re.compile('^/b/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'buckets.delete', + 'gcs.bucket': unquote(match.group('bucket')), + }, + + ########################## + # Object/blob operations # + ########################## + re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objects.delete', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + }, + + ################################## + # Default object ACLs operations # + ################################## + re.compile('^/b/(?P[^/]+)/defaultObjectAcl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'defaultAcls.delete', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.entity': unquote(match.group('entity')) + }, + + ######################### + # Object ACL operations # + ######################### + 
re.compile('^/b/(?P[^/]+)/o/(?P[^/]+)/acl/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'objectAcls.delete', + 'gcs.bucket': unquote(match.group('bucket')), + 'gcs.object': unquote(match.group('object')), + 'gcs.entity': unquote(match.group('entity')) + }, + + ######################## + # HMAC keys operations # + ######################## + re.compile('^/projects/(?P[^/]+)/hmacKeys/(?P[^/]+)$'): lambda params, data, match: { + 'gcs.op': 'hmacKeys.delete', + 'gcs.projectId': unquote(match.group('project')), + 'gcs.accessId': unquote(match.group('accessId')) + } + } +} diff --git a/instana/instrumentation/google/cloud/storage.py b/instana/instrumentation/google/cloud/storage.py new file mode 100644 index 00000000..f3748893 --- /dev/null +++ b/instana/instrumentation/google/cloud/storage.py @@ -0,0 +1,155 @@ +from __future__ import absolute_import + +import wrapt +import re + +from ....log import logger +from ....singletons import tracer +from .collectors import _storage_api + +try: + from google.cloud import storage + + logger.debug('Instrumenting google-cloud-storage') + + def _collect_tags(api_request): + """ + Extract span tags from Google Cloud Storage API request. Returns None if the request is not + supported. 
+ + :param: dict + :return: dict or None + """ + method, path = api_request.get('method', None), api_request.get('path', None) + + if method not in _storage_api: + return + + try: + params = api_request.get('query_params', {}) + data = api_request.get('data', {}) + + if path in _storage_api[method]: + # check is any of string keys matches the path exactly + return _storage_api[method][path](params, data) + else: + # look for a regex that matches the string + for (matcher, collect) in _storage_api[method].items(): + if not isinstance(matcher, re.Pattern): + continue + + m = matcher.match(path) + if m is None: + continue + + return collect(params, data, m) + except Exception: + logger.debug("instana.instrumentation.google.cloud.storage._collect_tags: ", exc_info=True) + + def execute_with_instana(wrapped, instance, args, kwargs): + # batch requests are traced with finish_batch_with_instana() + if isinstance(instance, storage.Batch): + return wrapped(*args, **kwargs) + + parent_span = tracer.active_span + + # return early if we're not tracing + if parent_span is None: + return wrapped(*args, **kwargs) + + tags = _collect_tags(kwargs) + + # don't trace if the call is not instrumented + if tags is None: + logger.debug('uninstrumented Google Cloud Storage API request: %s' % kwargs) + return wrapped(*args, **kwargs) + + with tracer.start_active_span('gcs', child_of=parent_span) as scope: + for (k, v) in tags.items(): + scope.span.set_tag(k, v) + + try: + kv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return kv + + def download_with_instana(wrapped, instance, args, kwargs): + parent_span = tracer.active_span + + # return early if we're not tracing + if parent_span is None: + return wrapped(*args, **kwargs) + + with tracer.start_active_span('gcs', child_of=parent_span) as scope: + scope.span.set_tag('gcs.op', 'objects.get') + scope.span.set_tag('gcs.bucket', instance.bucket.name) + scope.span.set_tag('gcs.object', 
instance.name) + + start = len(args) > 4 and args[4] or kwargs.get('start', None) + if start is None: + start = '' + + end = len(args) > 5 and args[5] or kwargs.get('end', None) + if end is None: + end = '' + + if start != '' or end != '': + scope.span.set_tag('gcs.range', '-'.join((start, end))) + + try: + kv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return kv + + def upload_with_instana(wrapped, instance, args, kwargs): + parent_span = tracer.active_span + + # return early if we're not tracing + if parent_span is None: + return wrapped(*args, **kwargs) + + with tracer.start_active_span('gcs', child_of=parent_span) as scope: + scope.span.set_tag('gcs.op', 'objects.insert') + scope.span.set_tag('gcs.bucket', instance.bucket.name) + scope.span.set_tag('gcs.object', instance.name) + + try: + kv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return kv + + def finish_batch_with_instana(wrapped, instance, args, kwargs): + parent_span = tracer.active_span + + # return early if we're not tracing + if parent_span is None: + return wrapped(*args, **kwargs) + + with tracer.start_active_span('gcs', child_of=parent_span) as scope: + scope.span.set_tag('gcs.op', 'batch') + scope.span.set_tag('gcs.projectId', instance._client.project) + scope.span.set_tag('gcs.numberOfOperations', len(instance._requests)) + + try: + kv = wrapped(*args, **kwargs) + except Exception as e: + scope.span.log_exception(e) + raise + else: + return kv + + wrapt.wrap_function_wrapper('google.cloud.storage._http', 'Connection.api_request', execute_with_instana) + wrapt.wrap_function_wrapper('google.cloud.storage.blob', 'Blob._do_download', download_with_instana) + wrapt.wrap_function_wrapper('google.cloud.storage.blob', 'Blob._do_upload', upload_with_instana) + wrapt.wrap_function_wrapper('google.cloud.storage.batch', 'Batch.finish', finish_batch_with_instana) +except ImportError: + pass diff 
--git a/instana/recorder.py b/instana/recorder.py index 437c1862..3034ae12 100644 --- a/instana/recorder.py +++ b/instana/recorder.py @@ -18,7 +18,7 @@ class StanRecorder(object): THREAD_NAME = "Instana Span Reporting" REGISTERED_SPANS = ("aiohttp-client", "aiohttp-server", "aws.lambda.entry", "cassandra", - "celery-client", "celery-worker", "couchbase", "django", "log", + "celery-client", "celery-worker", "couchbase", "django", "gcs", "log", "memcache", "mongo", "mysql", "postgres", "pymongo", "rabbitmq", "redis", "render", "rpc-client", "rpc-server", "sqlalchemy", "soap", "tornado-client", "tornado-server", "urllib3", "wsgi") diff --git a/instana/span.py b/instana/span.py index 2b5ac4cf..96d38881 100644 --- a/instana/span.py +++ b/instana/span.py @@ -232,7 +232,7 @@ class RegisteredSpan(BaseSpan): EXIT_SPANS = ("aiohttp-client", "cassandra", "celery-client", "couchbase", "log", "memcache", "mongo", "mysql", "postgres", "rabbitmq", "redis", "rpc-client", "sqlalchemy", - "soap", "tornado-client", "urllib3", "pymongo") + "soap", "tornado-client", "urllib3", "pymongo", "gcs") ENTRY_SPANS = ("aiohttp-server", "aws.lambda.entry", "celery-worker", "django", "wsgi", "rabbitmq", "rpc-server", "tornado-server") @@ -420,6 +420,21 @@ def _populate_exit_span_data(self, span): self.data["mongo"]["json"] = span.tags.pop('json', None) self.data["mongo"]["error"] = span.tags.pop('error', None) + elif span.operation_name == "gcs": + self.data["gcs"]["op"] = span.tags.pop('gcs.op') + self.data["gcs"]["bucket"] = span.tags.pop('gcs.bucket', None) + self.data["gcs"]["object"] = span.tags.pop('gcs.object', None) + self.data["gcs"]["entity"] = span.tags.pop('gcs.entity', None) + self.data["gcs"]["range"] = span.tags.pop('gcs.range', None) + self.data["gcs"]["sourceBucket"] = span.tags.pop('gcs.sourceBucket', None) + self.data["gcs"]["sourceObject"] = span.tags.pop('gcs.sourceObject', None) + self.data["gcs"]["sourceObjects"] = span.tags.pop('gcs.sourceObjects', None) + 
self.data["gcs"]["destinationBucket"] = span.tags.pop('gcs.destinationBucket', None) + self.data["gcs"]["destinationObject"] = span.tags.pop('gcs.destinationObject', None) + self.data["gcs"]["numberOfOperations"] = span.tags.pop('gcs.numberOfOperations', None) + self.data["gcs"]["projectId"] = span.tags.pop('gcs.projectId', None) + self.data["gcs"]["accessId"] = span.tags.pop('gcs.accessId', None) + elif span.operation_name == "log": # use last special key values for l in span.logs: diff --git a/setup.py b/setup.py index 3b80905d..78fece24 100644 --- a/setup.py +++ b/setup.py @@ -98,6 +98,7 @@ def check_setuptools(): 'nose>=1.0', 'flask>=0.12.2', 'grpcio>=1.18.0', + 'google-cloud-storage>=1.24.0;python_version>="3.5"', 'lxml>=3.4', 'mock>=2.0.0', 'mysqlclient>=1.3.14;python_version>="3.5"', diff --git a/tests/clients/test_google-cloud-storage.py b/tests/clients/test_google-cloud-storage.py new file mode 100644 index 00000000..c3ec3243 --- /dev/null +++ b/tests/clients/test_google-cloud-storage.py @@ -0,0 +1,940 @@ +from __future__ import absolute_import + +import sys +import unittest +import pytest +import json +import requests +import io + +from instana.singletons import tracer +from ..test_utils import _TraceContextMixin + +from mock import patch, Mock +from six.moves import http_client + +if sys.version_info[0] >= 3: + from google.cloud import storage + from google.api_core import iam + +@pytest.mark.skipif(sys.version_info[0] < 3, reason="google-cloud-storage has dropped support for Python 2") +class TestGoogleCloudStorage(unittest.TestCase, _TraceContextMixin): + def setUp(self): + self.recorder = tracer.recorder + self.recorder.clear_spans() + + @patch('requests.Session.request') + def test_buckets_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#buckets", "items": []}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + 
buckets = client.list_buckets() + self.assertEqual(0, self.recorder.queue_size(), msg='span has been created before the actual request') + + # trigger the iterator + for b in buckets: + pass + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.list', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + + @patch('requests.Session.request') + def test_buckets_insert(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.create_bucket('test bucket') + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.insert', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_buckets_get(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.get_bucket('test bucket') + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + 
gcs_span = spans[0] + test_span = spans[1] + + self.assertEqual(test_span.t, gcs_span.t) + self.assertEqual(test_span.s, gcs_span.p) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.get', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_buckets_patch(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').patch() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.patch', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_buckets_update(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').update() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.update', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + 
@patch('requests.Session.request') + def test_buckets_get_iam_policy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#policy"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').get_iam_policy() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.getIamPolicy', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_buckets_set_iam_policy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#policy"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').set_iam_policy(iam.Policy()) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.setIamPolicy', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_buckets_test_iam_permissions(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#testIamPermissionsResponse"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with 
tracer.start_active_span('test'): + client.bucket('test bucket').test_iam_permissions('test-permission') + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.testIamPermissions', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_buckets_lock_retention_policy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#bucket", "metageneration": 1, "retentionPolicy": {"isLocked": False}}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + bucket = client.bucket('test bucket') + bucket.reload() + + with tracer.start_active_span('test'): + bucket.lock_retention_policy() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.lockRetentionPolicy', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_buckets_delete(self, mock_requests): + mock_requests.return_value = self._mock_response() + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').delete() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + 
self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('buckets.delete', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_objects_compose(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('dest object').compose([ + storage.blob.Blob('object 1', 'test bucket'), + storage.blob.Blob('object 2', 'test bucket') + ]) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.compose', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["destinationBucket"]) + self.assertEqual('dest object', gcs_span.data["gcs"]["destinationObject"]) + self.assertEqual('test bucket/object 1,test bucket/object 2', gcs_span.data["gcs"]["sourceObjects"]) + + @patch('requests.Session.request') + def test_objects_copy(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + bucket = client.bucket('src bucket') + + with tracer.start_active_span('test'): + bucket.copy_blob( + bucket.blob('src object'), + client.bucket('dest bucket'), + new_name='dest object' + ) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + 
gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.copy', gcs_span.data["gcs"]["op"]) + self.assertEqual('dest bucket', gcs_span.data["gcs"]["destinationBucket"]) + self.assertEqual('dest object', gcs_span.data["gcs"]["destinationObject"]) + self.assertEqual('src bucket', gcs_span.data["gcs"]["sourceBucket"]) + self.assertEqual('src object', gcs_span.data["gcs"]["sourceObject"]) + + @patch('requests.Session.request') + def test_objects_delete(self, mock_requests): + mock_requests.return_value = self._mock_response() + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('test object').delete() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.delete', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + self.assertEqual('test object', gcs_span.data["gcs"]["object"]) + + @patch('requests.Session.request') + def test_objects_attrs(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('test object').exists() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + 
self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.attrs', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + self.assertEqual('test object', gcs_span.data["gcs"]["object"]) + + @patch('requests.Session.request') + def test_objects_get(self, mock_requests): + mock_requests.return_value = self._mock_response( + content=b'CONTENT', + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('test object').download_to_file( + io.BytesIO(), + raw_download=True + ) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.get', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + self.assertEqual('test object', gcs_span.data["gcs"]["object"]) + + @patch('requests.Session.request') + def test_objects_insert(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('test object').upload_from_string('CONTENT') + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.insert', 
gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + self.assertEqual('test object', gcs_span.data["gcs"]["object"]) + + @patch('requests.Session.request') + def test_objects_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + blobs = client.bucket('test bucket').list_blobs() + self.assertEqual(0, self.recorder.queue_size(), msg='span has been created before the actual request') + + for b in blobs: pass + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.list', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_objects_patch(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('test object').patch() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.patch', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + self.assertEqual('test object', 
gcs_span.data["gcs"]["object"]) + + @patch('requests.Session.request') + def test_objects_rewrite(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#rewriteResponse", "totalBytesRewritten": 0, "objectSize": 0, "done": True, "resource": {}}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('dest bucket').blob('dest object').rewrite( + client.bucket('src bucket').blob('src object') + ) + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.rewrite', gcs_span.data["gcs"]["op"]) + self.assertEqual('dest bucket', gcs_span.data["gcs"]["destinationBucket"]) + self.assertEqual('dest object', gcs_span.data["gcs"]["destinationObject"]) + self.assertEqual('src bucket', gcs_span.data["gcs"]["sourceBucket"]) + self.assertEqual('src object', gcs_span.data["gcs"]["sourceObject"]) + + @patch('requests.Session.request') + def test_objects_update(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#object"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('test object').update() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objects.update', gcs_span.data["gcs"]["op"]) + 
self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + self.assertEqual('test object', gcs_span.data["gcs"]["object"]) + + @patch('requests.Session.request') + def test_default_acls_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#objectAccessControls", "items": []}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').default_object_acl.get_entities() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('defaultAcls.list', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + + @patch('requests.Session.request') + def test_object_acls_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#objectAccessControls", "items": []}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.bucket('test bucket').blob('test object').acl.get_entities() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('objectAcls.list', gcs_span.data["gcs"]["op"]) + self.assertEqual('test bucket', gcs_span.data["gcs"]["bucket"]) + self.assertEqual('test object', gcs_span.data["gcs"]["object"]) + + @patch('requests.Session.request') + def 
test_object_hmac_keys_create(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.create_hmac_key('test@example.com') + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('hmacKeys.create', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + + @patch('requests.Session.request') + def test_object_hmac_keys_delete(self, mock_requests): + mock_requests.return_value = self._mock_response() + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + key = storage.hmac_key.HMACKeyMetadata(client, access_id='test key') + key.state = storage.hmac_key.HMACKeyMetadata.INACTIVE_STATE + key.delete() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('hmacKeys.delete', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + self.assertEqual('test key', gcs_span.data["gcs"]["accessId"]) + + @patch('requests.Session.request') + def test_object_hmac_keys_get(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, + status_code=http_client.OK + ) + + client 
= self._client(project='test-project') + + with tracer.start_active_span('test'): + storage.hmac_key.HMACKeyMetadata(client, access_id='test key').exists() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('hmacKeys.get', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + self.assertEqual('test key', gcs_span.data["gcs"]["accessId"]) + + @patch('requests.Session.request') + def test_object_hmac_keys_list(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKeysMetadata", "items": []}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + keys = client.list_hmac_keys() + self.assertEqual(0, self.recorder.queue_size(), msg='span has been created before the actual request') + + for k in keys: pass + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('hmacKeys.list', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + + @patch('requests.Session.request') + def test_object_hmac_keys_update(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"kind": "storage#hmacKey", "metadata": {}, "secret": ""}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + 
storage.hmac_key.HMACKeyMetadata(client, access_id='test key').update() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('hmacKeys.update', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + self.assertEqual('test key', gcs_span.data["gcs"]["accessId"]) + + @patch('requests.Session.request') + def test_service_account_get(self, mock_requests): + mock_requests.return_value = self._mock_response( + json_content={"email_address": "test@example.com", "kind": "storage#serviceAccount"}, + status_code=http_client.OK + ) + + client = self._client(project='test-project') + + with tracer.start_active_span('test'): + client.get_service_account_email() + + spans = self.recorder.queued_spans() + + self.assertEqual(2, len(spans)) + self.assertIsNone(tracer.active_span) + + gcs_span = spans[0] + test_span = spans[1] + + self.assertTraceContextPropagated(test_span, gcs_span) + + self.assertEqual('gcs',gcs_span.n) + self.assertEqual(2, gcs_span.k) + self.assertIsNone(gcs_span.ec) + + self.assertEqual('serviceAccount.get', gcs_span.data["gcs"]["op"]) + self.assertEqual('test-project', gcs_span.data["gcs"]["projectId"]) + + @patch('requests.Session.request') + def test_batch_operation(self, mock_requests): + mock_requests.return_value = self._mock_response( + _TWO_PART_BATCH_RESPONSE, + status_code=http_client.OK, + headers={"content-type": 'multipart/mixed; boundary="DEADBEEF="'} + ) + + client = self._client(project='test-project') + bucket = client.bucket('test-bucket') + + with tracer.start_active_span('test'): + with client.batch(): + for obj in ['obj1', 'obj2']: + bucket.delete_blob(obj) + + spans = self.recorder.queued_spans() + 
+ + self.assertEqual(2, len(spans)) + + def _client(self, *args, **kwargs): + # override the HTTP client to bypass the authorization + kwargs['_http'] = kwargs.get('_http', requests.Session()) + + return storage.Client(*args, **kwargs) + + def _mock_response(self, content=b'', status_code=http_client.NO_CONTENT, json_content=None, headers=None): + resp = Mock() + resp.status_code = status_code + resp.headers = headers if headers is not None else {} + resp.content = content + resp.__enter__ = Mock(return_value=resp) + resp.__exit__ = Mock() + + if json_content is not None: + if resp.content == b'': + resp.content = json.dumps(json_content) + + resp.json = Mock(return_value=json_content) + + return resp + +_TWO_PART_BATCH_RESPONSE = b"""\ +--DEADBEEF= +Content-Type: application/json +Content-ID: + +HTTP/1.1 204 No Content + +Content-Type: application/json; charset=UTF-8 +Content-Length: 0 + +--DEADBEEF= +Content-Type: application/json +Content-ID: + +HTTP/1.1 204 No Content + +Content-Type: application/json; charset=UTF-8 +Content-Length: 0 + +--DEADBEEF=-- +""" diff --git a/tests/test_utils.py b/tests/test_utils.py index 6c5539cd..1f3b61ec 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -22,3 +22,8 @@ def test_validate_url(): assert(validate_url("http:boligrafo") is False) assert(validate_url(None) is False) +class _TraceContextMixin: + def assertTraceContextPropagated(self, parent_span, child_span): + self.assertEqual(parent_span.t, child_span.t) + self.assertEqual(parent_span.s, child_span.p) + self.assertNotEqual(parent_span.s, child_span.s)