diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 5822a23aa9..3e32b6b6f3 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -95,7 +95,6 @@ GraphSON3Serializer) from cassandra.datastax.graph.query import _request_timeout_key, _GraphSONContextRowFactory from cassandra.datastax import cloud as dscloud -from cassandra.scylla.cloud import CloudConfiguration from cassandra.application_info import ApplicationInfoBase try: @@ -1238,23 +1237,7 @@ def __init__(self, self.connection_class = connection_class if scylla_cloud is not None: - if contact_points is not _NOT_SET or ssl_context or ssl_options: - raise ValueError("contact_points, ssl_context, and ssl_options " - "cannot be specified with a scylla cloud configuration") - if shard_aware_options and not shard_aware_options.disable_shardaware_port: - raise ValueError("shard_aware_options.disable_shardaware_port=False " - "cannot be specified with a scylla cloud configuration") - uses_twisted = TwistedConnection and issubclass(self.connection_class, TwistedConnection) - uses_eventlet = EventletConnection and issubclass(self.connection_class, EventletConnection) - - scylla_cloud_config = CloudConfiguration.create(scylla_cloud, pyopenssl=uses_twisted or uses_eventlet, - endpoint_factory=endpoint_factory) - ssl_context = scylla_cloud_config.ssl_context - endpoint_factory = scylla_cloud_config.endpoint_factory - contact_points = scylla_cloud_config.contact_points - ssl_options = scylla_cloud_config.ssl_options - auth_provider = scylla_cloud_config.auth_provider - shard_aware_options = ShardAwareOptions(shard_aware_options, disable_shardaware_port=True) + raise NotImplementedError("scylla_cloud was removed and not supported anymore") if cloud is not None: self.cloud = cloud diff --git a/cassandra/scylla/cloud.py b/cassandra/scylla/cloud.py deleted file mode 100644 index c3298b199a..0000000000 --- a/cassandra/scylla/cloud.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright ScyllaDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import ssl -import tempfile -import base64 -from ssl import SSLContext -from contextlib import contextmanager -from itertools import islice - -import yaml - -from cassandra.connection import SniEndPointFactory -from cassandra.auth import AuthProvider, PlainTextAuthProvider - - -@contextmanager -def file_or_memory(path=None, data=None): - # since we can't read keys/cert from memory yet - # see https://github.com/python/cpython/pull/2449 which isn't accepted and PEP-543 that was withdrawn - # so we use temporary file to load the key - if data: - with tempfile.NamedTemporaryFile(mode="wb") as f: - d = base64.b64decode(data) - f.write(d) - if not d.endswith(b"\n"): - f.write(b"\n") - - f.flush() - yield f.name - - if path: - yield path - - -def nth(iterable, n, default=None): - "Returns the nth item or a default value" - return next(islice(iterable, n, None), default) - - -class CloudConfiguration: - endpoint_factory: SniEndPointFactory - contact_points: list - auth_provider: AuthProvider = None - ssl_options: dict - ssl_context: SSLContext - skip_tls_verify: bool - - def __init__(self, configuration_file, pyopenssl=False, endpoint_factory=None): - cloud_config = yaml.safe_load(open(configuration_file)) - - self.current_context = cloud_config['contexts'][cloud_config['currentContext']] - self.data_centers = cloud_config['datacenters'] - self.current_data_center = self.data_centers[self.current_context['datacenterName']] - self.auth_info = cloud_config['authInfos'][self.current_context['authInfoName']] - self.ssl_options = {} - self.skip_tls_verify = self.current_data_center.get('insecureSkipTlsVerify', False) - self.ssl_context = self.create_pyopenssl_context() if pyopenssl else self.create_ssl_context() - - proxy_address, port, node_domain = self.get_server(self.current_data_center) - - if not endpoint_factory: - endpoint_factory = SniEndPointFactory(proxy_address, port=int(port), node_domain=node_domain) - else: - assert isinstance(endpoint_factory, SniEndPointFactory) - self.endpoint_factory = endpoint_factory - - username, password = self.auth_info.get('username'), self.auth_info.get('password') - if username and password: - self.auth_provider = PlainTextAuthProvider(username, password) - - @property - def contact_points(self): - _contact_points = [] - for data_center in self.data_centers.values(): - _, _, node_domain = self.get_server(data_center) - _contact_points.append(self.endpoint_factory.create_from_sni(node_domain)) - return _contact_points - - def get_server(self, data_center): - address = data_center.get('server') - address = address.split(":") - port = nth(address, 1, default=9142) - address = nth(address, 0) - node_domain = data_center.get('nodeDomain') - assert address and port and node_domain, "server or nodeDomain are missing" - return address, port, node_domain - - def create_ssl_context(self): - ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT) - ssl_context.verify_mode = ssl.CERT_NONE if self.skip_tls_verify else ssl.CERT_REQUIRED - for data_center in self.data_centers.values(): - with file_or_memory(path=data_center.get('certificateAuthorityPath'), - data=data_center.get('certificateAuthorityData')) as cafile: - ssl_context.load_verify_locations(cadata=open(cafile).read()) - with file_or_memory(path=self.auth_info.get('clientCertificatePath'), - data=self.auth_info.get('clientCertificateData')) as certfile, \ - file_or_memory(path=self.auth_info.get('clientKeyPath'), data=self.auth_info.get('clientKeyData')) as keyfile: - ssl_context.load_cert_chain(keyfile=keyfile, - certfile=certfile) - - return ssl_context - - def create_pyopenssl_context(self): - try: - from OpenSSL import SSL - except ImportError as e: - raise ImportError( - "PyOpenSSL must be installed to connect to scylla-cloud with the Eventlet or Twisted event loops") \ - .with_traceback(e.__traceback__) - ssl_context = SSL.Context(SSL.TLS_METHOD) - ssl_context.set_verify(SSL.VERIFY_PEER, callback=lambda _1, _2, _3, _4, ok: True if self.skip_tls_verify else ok) - for data_center in self.data_centers.values(): - with file_or_memory(path=data_center.get('certificateAuthorityPath'), - data=data_center.get('certificateAuthorityData')) as cafile: - ssl_context.load_verify_locations(cafile) - with file_or_memory(path=self.auth_info.get('clientCertificatePath'), - data=self.auth_info.get('clientCertificateData')) as certfile, \ - file_or_memory(path=self.auth_info.get('clientKeyPath'), data=self.auth_info.get('clientKeyData')) as keyfile: - ssl_context.use_privatekey_file(keyfile) - ssl_context.use_certificate_file(certfile) - - return ssl_context - - @classmethod - def create(cls, configuration_file, pyopenssl=False, endpoint_factory=None): - return cls(configuration_file, pyopenssl=pyopenssl, endpoint_factory=endpoint_factory) diff --git a/tests/integration/standard/test_scylla_cloud.py b/tests/integration/standard/test_scylla_cloud.py deleted file mode 100644 index 5679d959bb..0000000000 --- a/tests/integration/standard/test_scylla_cloud.py +++ /dev/null @@ -1,129 +0,0 @@ -import json -import logging -import os.path -from unittest import TestCase -from ccmlib.utils.ssl_utils import generate_ssl_stores -from ccmlib.utils.sni_proxy import refresh_certs, start_sni_proxy, create_cloud_config, NodeInfo - -from cassandra import DependencyException -from cassandra.policies import TokenAwarePolicy, RoundRobinPolicy, ConstantReconnectionPolicy -from tests.integration import use_cluster, PROTOCOL_VERSION -from cassandra.cluster import Cluster, TwistedConnection - - -supported_connection_classes = [TwistedConnection] - -try: - from cassandra.io.libevreactor import LibevConnection - supported_connection_classes += [LibevConnection] -except DependencyException: - pass - - -try: - from cassandra.io.asyncorereactor import AsyncoreConnection - supported_connection_classes += [AsyncoreConnection] -except DependencyException: - pass - -#from cassandra.io.geventreactor import GeventConnection -#from cassandra.io.eventletreactor import EventletConnection -#from cassandra.io.asyncioreactor import AsyncioConnection - -# need to run them with specific configuration like `gevent.monkey.patch_all()` or under async functions -# unsupported_connection_classes = [GeventConnection, AsyncioConnection, EventletConnection] -LOGGER = logging.getLogger(__name__) - - -def get_cluster_info(cluster, port=9142): - session = Cluster( - contact_points=list(map(lambda node: node.address(), cluster.nodelist())), protocol_version=PROTOCOL_VERSION, - load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), - reconnection_policy=ConstantReconnectionPolicy(5) - ).connect() - - nodes_info = [] - - for row in session.execute('select host_id, broadcast_address, data_center from system.local'): - if row[0] and row[1]: - nodes_info.append(NodeInfo(address=row[1], - port=port, - host_id=row[0], - data_center=row[2])) - - for row in session.execute('select host_id, broadcast_address, data_center from system.local'): - nodes_info.append(NodeInfo(address=row[1], - port=port, - host_id=row[0], - data_center=row[2])) - - return nodes_info - - -class ScyllaCloudConfigTests(TestCase): - def start_cluster_with_proxy(self): - ccm_cluster = self.ccm_cluster - generate_ssl_stores(ccm_cluster.get_path()) - ssl_port = 9142 - sni_port = 443 - ccm_cluster.set_configuration_options(dict( - client_encryption_options= - dict(require_client_auth=True, - truststore=os.path.join(ccm_cluster.get_path(), 'ccm_node.cer'), - certificate=os.path.join(ccm_cluster.get_path(), 'ccm_node.pem'), - keyfile=os.path.join(ccm_cluster.get_path(), 'ccm_node.key'), - enabled=True), - native_transport_port_ssl=ssl_port)) - - ccm_cluster._update_config() - - ccm_cluster.start(wait_for_binary_proto=True, wait_other_notice=True) - - nodes_info = get_cluster_info(ccm_cluster, port=ssl_port) - refresh_certs(ccm_cluster, nodes_info) - - docker_id, listen_address, listen_port = \ - start_sni_proxy(ccm_cluster.get_path(), nodes_info=nodes_info, listen_port=sni_port) - ccm_cluster.sni_proxy_docker_ids = [docker_id] - ccm_cluster.sni_proxy_listen_port = listen_port - ccm_cluster._update_config() - - config_data_yaml, config_path_yaml = create_cloud_config(ccm_cluster.get_path(), - port=listen_port, address=listen_address, - nodes_info=nodes_info) - return config_data_yaml, config_path_yaml - - def test_1_node_cluster(self): - self.ccm_cluster = use_cluster("sni_proxy", [1], start=False) - config_data_yaml, config_path_yaml = self.start_cluster_with_proxy() - - for config in [config_path_yaml, config_data_yaml]: - for connection_class in supported_connection_classes: - logging.warning('testing with class: %s', connection_class.__name__) - cluster = Cluster(scylla_cloud=config, connection_class=connection_class) - try: - with cluster.connect() as session: - res = session.execute("SELECT * FROM system.local WHERE key='local'") - assert res.all() - - assert len(cluster.metadata._hosts) == 1 - assert len(cluster.metadata._host_id_by_endpoint) == 1 - finally: - cluster.shutdown() - - def test_3_node_cluster(self): - self.ccm_cluster = use_cluster("sni_proxy", [3], start=False) - config_data_yaml, config_path_yaml = self.start_cluster_with_proxy() - - for config in [config_path_yaml, config_data_yaml]: - for connection_class in supported_connection_classes: - logging.warning('testing with class: %s', connection_class.__name__) - cluster = Cluster(scylla_cloud=config, connection_class=connection_class) - try: - with cluster.connect() as session: - res = session.execute("SELECT * FROM system.local WHERE key='local'") - assert res.all() - assert len(cluster.metadata._hosts) == 3 - assert len(cluster.metadata._host_id_by_endpoint) == 3 - finally: - cluster.shutdown()