From 2798840d035aec5623df6a8e88212cde27ccfd82 Mon Sep 17 00:00:00 2001
From: Yannis Mantzouratos
Date: Wed, 28 Aug 2019 14:51:29 -0700
Subject: [PATCH 1/2] FIX support multiple sparkly sessions in tests

Allow a developer to employ a variety of sparkly sessions during testing,
each of which might have different package requirements.

To achieve this:
1. explicitly shut down the gateway and the underlying JVM before starting
   a new session.
2. make sure SparklySession cleans up the environment, so that the next
   session has access to the exact same env and not to the modified
   PYSPARK_SUBMIT_ARGS.

Before, we were just stopping the spark session / context, which doesn't
have any effect on the gateway / JVM; as a result, new jars were not being
installed.
---
 CHANGELOG.md                      |   4 ++
 sparkly/__init__.py               |   2 +-
 sparkly/session.py                |   5 ++
 sparkly/testing.py                |  31 +++++++++
 tests/integration/test_testing.py | 101 ++++++++++++++++++++++++++++++
 tests/unit/test_session.py        |  14 +++++
 6 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 88a00b3..dcc1b33 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.8.1
+* Fix support for using multiple sparkly sessions during tests
+* SparklySession does not persist modifications to os.environ
+
 ## 2.8.0
 * Extend `SparklyCatalog` to work with database properties:
   - `spark.catalog_ext.set_database_property`

diff --git a/sparkly/__init__.py b/sparkly/__init__.py
index e292edb..c2143bd 100644
--- a/sparkly/__init__.py
+++ b/sparkly/__init__.py
@@ -19,4 +19,4 @@
 assert SparklySession

-__version__ = '2.8.0'
+__version__ = '2.8.1'

diff --git a/sparkly/session.py b/sparkly/session.py
index 3530b30..cdabfa5 100755
--- a/sparkly/session.py
+++ b/sparkly/session.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 #

+from copy import deepcopy
 import os
 import signal
 import sys
@@ -77,8 +78,10 @@ class MySession(sparkly.SparklySession):
     repositories = []

     _instantiated_session = None
+    _original_environment = None

     def __init__(self, additional_options=None):
+        SparklySession._original_environment = deepcopy(os.environ)
         os.environ['PYSPARK_PYTHON'] = sys.executable

         submit_args = [
@@ -138,6 +141,8 @@ def stop(cls):
         if SparklySession._instantiated_session is not None:
             SparkSession.stop(SparklySession._instantiated_session)
             SparklySession._instantiated_session = None
+            os.environ = SparklySession._original_environment
+            SparklySession._original_environment = None

     @property
     def builder(self):

diff --git a/sparkly/testing.py b/sparkly/testing.py
index 02d8063..8a8db17 100644
--- a/sparkly/testing.py
+++ b/sparkly/testing.py
@@ -27,12 +27,14 @@
 import os
 import pprint
 import shutil
+import signal
 import sys
 import tempfile
 from unittest import TestCase
 from unittest.util import safe_repr
 import warnings

+from pyspark.context import SparkContext
 from pyspark.sql import types as T
 import six
@@ -74,6 +76,32 @@
 _test_session_cache = None


+def _ensure_gateway_is_down():
+    # Apparently, the gateway and underlying JVM stay alive between different
+    # invocations of SparkContext, even when the context is explicitly stopped.
+    # This makes it impossible to have multiple SparklySessions for testing,
+    # with different JAR requirements etc; once the first one initializes the
+    # gateway / JVM, the other ones just re-use the existing gateway. So we have
+    # to kill it explicitly here.
+    if not SparkContext._gateway:
+        return
+
+    jvm_pid = int(
+        # Get the still active JVM
+        SparkContext._gateway.jvm
+        # Extract its process name (pid@hostname)
+        .java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
+        # And keep the pid (yeah, unfortunately there's no easier way to
+        # get it in Java 8...)
+        .split('@')[0]
+    )
+    SparkContext._gateway.shutdown()
+    SparkContext._gateway = None
+    os.kill(jvm_pid, signal.SIGKILL)
+    os.environ.pop('PYSPARK_GATEWAY_PORT', None)
+    os.environ.pop('PYSPARK_GATEWAY_SECRET', None)
+
+
 class SparklyTest(TestCase):
     """Base test for spark script tests.
@@ -122,6 +150,7 @@ def _init_session(cls):
             logger.info('Found a global session, stopping it %r', _test_session_cache)
             _test_session_cache.stop()
             _test_session_cache = None
+            _ensure_gateway_is_down()

         cls.spark = cls.setup_session()
@@ -151,6 +180,7 @@ def setUpClass(cls):
     @classmethod
     def tearDownClass(cls):
         cls.spark.stop()
+        _ensure_gateway_is_down()
         super(SparklyTest, cls).tearDownClass()

         for fixture in cls.class_fixtures:
@@ -505,6 +535,7 @@ def _init_session(cls):
         if _test_session_cache:
             logger.info('Stopping the previous global session %r', _test_session_cache)
             _test_session_cache.stop()
+            _ensure_gateway_is_down()

         logger.info('Starting the new global session for %r', cls.session)
         spark = _test_session_cache = cls.setup_session()

diff --git a/tests/integration/test_testing.py b/tests/integration/test_testing.py
index e363273..e86875d 100644
--- a/tests/integration/test_testing.py
+++ b/tests/integration/test_testing.py
@@ -18,11 +18,13 @@
 import pickle
 import unittest

+from sparkly.session import SparklySession
 from sparkly.testing import (
     CassandraFixture,
     ElasticFixture,
     MysqlFixture,
     SparklyGlobalSessionTest,
+    SparklyTest,
     KafkaFixture,
     KafkaWatcher)
 from sparkly.utils import absolute_path
@@ -226,3 +228,102 @@ def write_data(self, df, host, topic, port):
             producer.send(topic, key=row.key, value=row.value)
         producer.flush()
         return len(rows)
+
+
+class TestSwitchingBetweenTestSessions(unittest.TestCase):
+    # Test whether a user can switch between different sessions
+    # during tests
+
+    def test_switch_session_between_sparkly_tests(self):
+        # Define a test session with an ES6 dependency
+        class SessionA(SparklySession):
+            packages = [
+                'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
+            ]
+
+            repositories = [
+                'http://packages.confluent.io/maven/',
+            ]
+
+        class TestSessionA(SparklyTest):
+            session = SessionA
+
+        # Define a test session with an ES7 dependency
+        class SessionB(SparklySession):
+            packages = [
+                'org.elasticsearch:elasticsearch-spark-20_2.11:7.3.1',
+            ]
+
+            repositories = [
+                'http://packages.confluent.io/maven/',
+            ]
+
+        class TestSessionB(SparklyTest):
+            session = SessionB
+
+        # Make sure that when the ES6 session is set up, the underlying
+        # spark session contains the appropriate jars
+        TestSessionA.setUpClass()
+        expected_jars = [
+            'file:///root/.ivy2/jars/org.elasticsearch_elasticsearch-spark-20_2.11-6.5.4.jar',
+        ]
+        installed_jars = list(TestSessionA.spark._jsc.jars())
+        self.assertEqual(installed_jars, expected_jars)
+        TestSessionA.tearDownClass()
+
+        # And now make sure that when the ES7 session is set up, the underlying
+        # spark session contains the appropriate jars as well
+        TestSessionB.setUpClass()
+        expected_jars = [
+            'file:///root/.ivy2/jars/org.elasticsearch_elasticsearch-spark-20_2.11-7.3.1.jar',
+        ]
+        installed_jars = list(TestSessionB.spark._jsc.jars())
+        self.assertEqual(installed_jars, expected_jars)
+        TestSessionB.tearDownClass()
+
+    def test_switch_global_session_between_sparkly_tests(self):
+        # Define a test session with an ES6 dependency
+        class SessionA(SparklySession):
+            packages = [
+                'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
+            ]
+
+            repositories = [
+                'http://packages.confluent.io/maven/',
+            ]
+
+        class TestSessionA(SparklyGlobalSessionTest):
+            session = SessionA
+
+        # Define a test session with an ES7 dependency
+        class SessionB(SparklySession):
+            packages = [
+                'org.elasticsearch:elasticsearch-spark-20_2.11:7.3.1',
+            ]
+
+            repositories = [
+                'http://packages.confluent.io/maven/',
+            ]
+
+        class TestSessionB(SparklyGlobalSessionTest):
+            session = SessionB
+
+        # Make sure that when the ES6 session is set up, the underlying
+        # spark session contains the appropriate jars
+        TestSessionA.setUpClass()
+        expected_jars = [
+            'file:///root/.ivy2/jars/org.elasticsearch_elasticsearch-spark-20_2.11-6.5.4.jar',
+        ]
+        installed_jars = list(TestSessionA.spark._jsc.jars())
+        self.assertEqual(installed_jars, expected_jars)
+        TestSessionA.tearDownClass()
+
+        # And now make sure that when the ES7 session is set up, the underlying
+        # spark session contains the appropriate jars as well
+        TestSessionB.setUpClass()
+        expected_jars = [
+            'file:///root/.ivy2/jars/org.elasticsearch_elasticsearch-spark-20_2.11-7.3.1.jar',
+        ]
+        installed_jars = list(TestSessionB.spark._jsc.jars())
+        self.assertEqual(installed_jars, expected_jars)
+        TestSessionB.tearDownClass()

diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py
index 5c14639..9aafcf3 100644
--- a/tests/unit/test_session.py
+++ b/tests/unit/test_session.py
@@ -263,3 +263,17 @@ class _Session(SparklySession):
         original_session = _Session()
         retrieved_session = SparklySession.get_or_create()
         self.assertEqual(id(retrieved_session), id(original_session))
+
+    @mock.patch('sparkly.session.os')
+    @mock.patch('sparkly.session.SparkSession')
+    def test_stop_restores_the_environment(self, spark_session_mock, os_mock):
+        os_mock.environ = {
+            'PYSPARK_SUBMIT_ARGS': '--conf "my.conf.here=5g" --and-other-properties',
+        }
+
+        SparklySession()
+        SparklySession.stop()
+
+        self.assertEqual(os_mock.environ, {
+            'PYSPARK_SUBMIT_ARGS': '--conf "my.conf.here=5g" --and-other-properties',
+        })

From c45334d5f068dbab5fd8d2434028b72358e83c96 Mon Sep 17 00:00:00 2001
From: Xiaoyi Mao
Date: Fri, 30 Aug 2019 14:52:14 -1000
Subject: [PATCH 2/2] IMP support elasticsearch 7 by making type optional.
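
Elasticsearch 7 removed mapping types, so the type segment is now optional
in the reader, the writer, ElasticFixture and elastic:// URLs; when it is
omitted, the resource path sent to the connector is just the index. For
illustration (hosts and index names below are examples only):

    # ES 6.x and below: the mapping type is still required
    df.write_ext.elastic(host='es6.host', es_index='my_index', es_type='my_type')

    # ES 7+: omit the type, or drop the type segment from the URL
    df.write_ext.elastic(host='es7.host', es_index='my_index')
    df.write_ext.by_url('elastic://es7.host/my_index?parallelism=3')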
---
 CHANGELOG.md                                  |  1 +
 docker-compose.yml                            | 31 ++++++-
 sparkly/reader.py                             | 10 ++-
 sparkly/testing.py                            |  4 +-
 sparkly/writer.py                             | 13 +--
 tests/integration/base.py                     | 12 ++-
 .../resources/test_fixtures/data_for_es7.json |  2 +
 .../resources/test_read/elastic7_setup.json   |  6 ++
 .../resources/test_write/elastic7_setup.json  |  2 +
 tests/integration/test_reader.py              | 84 +++++++++++++------
 tests/integration/test_testing.py             | 38 +++++++--
 tests/integration/test_writer.py              | 49 +++++++++--
 tests/unit/test_reader.py                     | 21 ++++-
 tests/unit/test_writer.py                     | 17 +++-
 14 files changed, 236 insertions(+), 54 deletions(-)
 create mode 100644 tests/integration/resources/test_fixtures/data_for_es7.json
 create mode 100644 tests/integration/resources/test_read/elastic7_setup.json
 create mode 100644 tests/integration/resources/test_write/elastic7_setup.json

diff --git a/CHANGELOG.md b/CHANGELOG.md
index dcc1b33..98c6aeb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## 2.8.1
 * Fix support for using multiple sparkly sessions during tests
 * SparklySession does not persist modifications to os.environ
+* Support ElasticSearch 7 by making type optional.

 ## 2.8.0
 * Extend `SparklyCatalog` to work with database properties:

diff --git a/docker-compose.yml b/docker-compose.yml
index 1ffbcdc..038cbb2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -21,7 +21,9 @@ services:
     depends_on:
       cassandra.docker:
         condition: service_healthy
-      elastic.docker:
+      elastic6.docker:
+        condition: service_healthy
+      elastic7.docker:
         condition: service_healthy
       kafka.docker:
         condition: service_healthy
@@ -37,7 +39,9 @@ services:
     depends_on:
       cassandra.docker:
         condition: service_healthy
-      elastic.docker:
+      elastic6.docker:
+        condition: service_healthy
+      elastic7.docker:
         condition: service_healthy
       kafka.docker:
         condition: service_healthy
@@ -56,10 +60,29 @@ services:
     healthcheck:
       test: ps ax | grep cassandra

-  elastic.docker:
+  elastic6.docker:
     image: docker.elastic.co/elasticsearch/elasticsearch:6.5.4
+    environment:
+      - xpack.security.enabled=false
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+      - discovery.type=single-node
+    healthcheck:
+      test: "curl -f http://localhost:9200/_cat/health | grep green"
+      interval: 5s
+      timeout: 5s
+      retries: 20
+
+  elastic7.docker:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.0
+    environment:
+      - xpack.security.enabled=false
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+      - discovery.type=single-node
     healthcheck:
-      test: ps ax | grep elastic
+      test: "curl -f http://localhost:9200/_cat/health | grep green"
+      interval: 5s
+      timeout: 5s
+      retries: 20

   mysql.docker:
     image: mysql:5.7

diff --git a/sparkly/reader.py b/sparkly/reader.py
index b5db9a2..550a71a 100644
--- a/sparkly/reader.py
+++ b/sparkly/reader.py
@@ -149,7 +149,7 @@ def elastic(self, host, es_index, es_type, query='', fields=None, port=None,
         Args:
             host (str): Elastic server host.
             es_index (str): Elastic index.
-            es_type (str): Elastic type.
+            es_type (str|None): Elastic type. Deprecated in Elasticsearch 7; required for versions below 7.
             query (str): Pre-filter es documents, e.g. '?q=views:>10'.
             fields (list[str]|None): Select only specified fields.
             port (int|None): Elastic server port.
@@ -164,7 +164,7 @@ def elastic(self, host, es_index, es_type, query='', fields=None, port=None,
         assert self._spark.has_package('org.elasticsearch:elasticsearch-spark')

         reader_options = {
-            'path': '{}/{}'.format(es_index, es_type),
+            'path': '{}/{}'.format(es_index, es_type) if es_type else es_index,
             'format': 'org.elasticsearch.spark.sql',
             'es.nodes': host,
             'es.query': query,
@@ -330,10 +330,12 @@ def _resolve_elastic(self, parsed_url, parsed_qs):
         if 'fields' in parsed_qs:
             kwargs['fields'] = parsed_qs.pop('fields').split(',')

+        path_segments = parsed_url.path.split('/')
+
         return self.elastic(
             host=parsed_url.netloc,
-            es_index=parsed_url.path.split('/')[1],
-            es_type=parsed_url.path.split('/')[2],
+            es_index=path_segments[1],
+            es_type=path_segments[2] if len(path_segments) > 2 else None,
             port=parsed_url.port,
             parallelism=parsed_qs.pop('parallelism', None),
             options=parsed_qs,

diff --git a/sparkly/testing.py b/sparkly/testing.py
index 8a8db17..ba923c4 100644
--- a/sparkly/testing.py
+++ b/sparkly/testing.py
@@ -656,7 +656,7 @@ class ElasticFixture(Fixture):
         ...
     """

-    def __init__(self, host, es_index, es_type, mapping=None, data=None, port=None):
+    def __init__(self, host, es_index, es_type=None, mapping=None, data=None, port=None):
         self.host = host
         self.port = port or 9200
         self.es_index = es_index
@@ -680,7 +680,7 @@ def setup_data(self):
             )
             self._request(
                 'PUT',
-                '/{}/_mapping/{}'.format(self.es_index, self.es_type),
+                '/{}/_mapping/{}'.format(self.es_index, self.es_type or ''),
                 self.read_file(self.mapping),
             )

diff --git a/sparkly/writer.py b/sparkly/writer.py
index 61205ed..e187a56 100644
--- a/sparkly/writer.py
+++ b/sparkly/writer.py
@@ -81,6 +81,7 @@ def by_url(self, url):
             elastic://localhost:9200/my_index/my_type?parallelism=3&mode=overwrite
                 &es.write.operation=upsert
+            Note: type is deprecated in Elasticsearch 7.

         Is an equivalent for::

@@ -154,14 +155,14 @@ def cassandra(self, host, keyspace, table, consistency=None, port=None, mode=None,

         return self._basic_write(writer_options, options, parallelism, mode)

-    def elastic(self, host, es_index, es_type, port=None, mode=None,
+    def elastic(self, host, es_index, es_type=None, port=None, mode=None,
                 parallelism=None, options=None):
         """Write a dataframe into an ElasticSearch index.

         Args:
             host (str): Elastic server host.
             es_index (str): Elastic index.
-            es_type (str): Elastic type.
+            es_type (str|None): Elastic type. Deprecated in Elasticsearch 7; required for versions below 7.
             port (int|None): Elastic server port.
             mode (str|None): Spark save mode,
                 http://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes
@@ -173,7 +174,7 @@ def elastic(self, host, es_index, es_type=None, port=None, mode=None,
         assert self._spark.has_package('org.elasticsearch:elasticsearch-spark')

         writer_options = {
-            'path': '{}/{}'.format(es_index, es_type),
+            'path': '{}/{}'.format(es_index, es_type) if es_type else es_index,
             'format': 'org.elasticsearch.spark.sql',
             'es.nodes': host,
         }
@@ -512,10 +513,12 @@ def _resolve_csv(self, parsed_url, parsed_qs):
         )

     def _resolve_elastic(self, parsed_url, parsed_qs):
+        path_segments = parsed_url.path.split('/')
+
         return self.elastic(
             host=parsed_url.netloc,
-            es_index=parsed_url.path.split('/')[1],
-            es_type=parsed_url.path.split('/')[2],
+            es_index=path_segments[1],
+            es_type=path_segments[2] if len(path_segments) > 2 else None,
             port=parsed_url.port,
             mode=parsed_qs.pop('mode', None),
             parallelism=parsed_qs.pop('parallelism', None),

diff --git a/tests/integration/base.py b/tests/integration/base.py
index cb71d25..07e6a44 100644
--- a/tests/integration/base.py
+++ b/tests/integration/base.py
@@ -25,7 +25,7 @@ class SparklyTestSession(SparklySession):
     packages = [
         'datastax:spark-cassandra-connector:2.4.0-s_2.11',
-        'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
+        'org.elasticsearch:elasticsearch-spark-20_2.11:7.3.0',
         'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0',
         'mysql:mysql-connector-java:6.0.6',
         'io.confluent:kafka-avro-serializer:3.0.1',
@@ -43,3 +43,13 @@ class SparklyTestSession(SparklySession):
         'collect_max': 'brickhouse.udf.collect.CollectMaxUDAF',
         'length_of_text': (lambda text: len(text), StringType())
     }
+
+
+class SparklyTestSessionWithES6(SparklySession):
+    packages = [
+        'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4',
+    ]
+
+    repositories = [
+        'http://packages.confluent.io/maven/',
+    ]

diff --git a/tests/integration/resources/test_fixtures/data_for_es7.json b/tests/integration/resources/test_fixtures/data_for_es7.json
new file mode 100644
index 0000000..cac385d
--- /dev/null
+++ b/tests/integration/resources/test_fixtures/data_for_es7.json
@@ -0,0 +1,2 @@
+{ "index" : { "_index" : "sparkly_test_fixture", "_id": "1" } }
+{ "name" : "John", "age": 56}

diff --git a/tests/integration/resources/test_read/elastic7_setup.json b/tests/integration/resources/test_read/elastic7_setup.json
new file mode 100644
index 0000000..07707aa
--- /dev/null
+++ b/tests/integration/resources/test_read/elastic7_setup.json
@@ -0,0 +1,6 @@
+{ "index" : { "_index" : "sparkly_test", "_id": "1" } }
+{ "name" : "John2", "topics": [1, 2, 3, 4, 5], "age": 56, "demo": { "age_30": 20, "age_10": 50 } }
+{ "index" : { "_index" : "sparkly_test", "_id": "2" } }
+{ "name" : "Smith3", "topics": [1, 4, 5], "age": 31, "demo": { "age_30": 110, "age_10": 50 } }
+{ "index" : { "_index" : "sparkly_test", "_id": "3" } }
+{ "name" : "Smith4", "topics": [4, 5], "age": 12, "demo": { "age_30": 20, "age_10": 1 } }

diff --git a/tests/integration/resources/test_write/elastic7_setup.json b/tests/integration/resources/test_write/elastic7_setup.json
new file mode 100644
index 0000000..e0bb51f
--- /dev/null
+++ b/tests/integration/resources/test_write/elastic7_setup.json
@@ -0,0 +1,2 @@
+{ "index" : { "_index" : "sparkly_test", "_id": "1111" } }
+{ "uid": "1111", "title": "xxxx", "views": 1111}

diff --git a/tests/integration/test_reader.py b/tests/integration/test_reader.py
index c76a028..3c5efa6 100644
--- a/tests/integration/test_reader.py
+++ b/tests/integration/test_reader.py
@@ -19,13 +19,17 @@
 from sparkly.exceptions import InvalidArgumentError
 from sparkly.testing import (
     SparklyGlobalSessionTest,
+    SparklyTest,
     CassandraFixture,
     MysqlFixture,
     ElasticFixture,
     KafkaFixture,
 )
 from sparkly.utils import absolute_path, kafka_get_topics_offsets
-from tests.integration.base import SparklyTestSession
+from tests.integration.base import (
+    SparklyTestSession,
+    SparklyTestSessionWithES6,
+)


 class SparklyReaderCassandraTest(SparklyGlobalSessionTest):
@@ -67,12 +71,34 @@ def test_read(self):
         ])


-class SparklyReaderElasticTest(SparklyGlobalSessionTest):
-    session = SparklyTestSession
+ELASTIC_TEST_DATA = [
+    {
+        'name': 'Smith3',
+        'topics': [1, 4, 5],
+        'age': 31,
+        'demo': {
+            'age_30': 110,
+            'age_10': 50,
+        }
+    },
+    {
+        'name': 'Smith4',
+        'topics': [4, 5],
+        'age': 12,
+        'demo': {
+            'age_30': 20,
+            'age_10': 1,
+        }
+    }
+]
+
+
+class SparklyReaderElastic6Test(SparklyTest):
+    session = SparklyTestSessionWithES6

     fixtures = [
         ElasticFixture(
-            'elastic.docker',
+            'elastic6.docker',
             'sparkly_test',
             'test',
             None,
@@ -82,7 +108,7 @@ def test_elastic(self):
         df = self.spark.read_ext.elastic(
-            host='elastic.docker',
+            host='elastic6.docker',
             port=9200,
             es_index='sparkly_test',
             es_type='test',
             query='?q=name:*Smith*',
             options={
                 'es.read.field.as.array.include': 'topics',
                 'es.read.metadata': 'false',
             },
         )

-        self.assertDataFrameEqual(df, [
-            {
-                'name': 'Smith3',
-                'topics': [1, 4, 5],
-                'age': 31,
-                'demo': {
-                    'age_30': 110,
-                    'age_10': 50,
-                }
-            },
-            {
-                'name': 'Smith4',
-                'topics': [4, 5],
-                'age': 12,
-                'demo': {
-                    'age_30': 20,
-                    'age_10': 1,
-                }
-            }
-        ])
+        self.assertDataFrameEqual(df, ELASTIC_TEST_DATA)
+
+
+class SparklyReaderElastic7Test(SparklyGlobalSessionTest):
+    session = SparklyTestSession
+
+    fixtures = [
+        ElasticFixture(
+            'elastic7.docker',
+            'sparkly_test',
+            None,
+            None,
+            absolute_path(__file__, 'resources', 'test_read', 'elastic7_setup.json'),
+        )
+    ]
+
+    def test_elastic(self):
+        df = self.spark.read_ext.elastic(
+            host='elastic7.docker',
+            port=9200,
+            es_index='sparkly_test',
+            es_type=None,
+            query='?q=name:*Smith*',
+            options={
+                'es.read.field.as.array.include': 'topics',
+                'es.read.metadata': 'false',
+            },
+        )
+
+        self.assertDataFrameEqual(df, ELASTIC_TEST_DATA)


 class SparklyReaderMySQLTest(SparklyGlobalSessionTest):

diff --git a/tests/integration/test_testing.py b/tests/integration/test_testing.py
index e86875d..66b1d95 100644
--- a/tests/integration/test_testing.py
+++ b/tests/integration/test_testing.py
@@ -28,7 +28,11 @@
     KafkaFixture,
     KafkaWatcher)
 from sparkly.utils import absolute_path
-from tests.integration.base import SparklyTestSession
+from tests.integration.base import (
+    SparklyTestSession,
+    SparklyTestSessionWithES6,
+)
+

 try:
     from kafka import KafkaConsumer, KafkaProducer
@@ -119,13 +123,13 @@ def test_mysql_fixture(self):
         ])


-class TestElasticFixture(SparklyGlobalSessionTest):
+class TestElastic6Fixture(SparklyTest):

-    session = SparklyTestSession
+    session = SparklyTestSessionWithES6

     class_fixtures = [
         ElasticFixture(
-            'elastic.docker',
+            'elastic6.docker',
             'sparkly_test_fixture',
             'test',
             absolute_path(__file__, 'resources', 'test_fixtures', 'mapping.json'),
         )
     ]

     def test_elastic_fixture(self):
-        df = self.spark.read_ext.by_url('elastic://elastic.docker/sparkly_test_fixture/test?'
-                                        'es.read.metadata=false')
+        df = self.spark.read_ext.by_url(
+            'elastic://elastic6.docker/sparkly_test_fixture/test?es.read.metadata=false'
+        )
         self.assertDataFrameEqual(df, [
             {'name': 'John', 'age': 56},
         ])


+class TestElastic7Fixture(SparklyGlobalSessionTest):
+
+    session = SparklyTestSession
+
+    class_fixtures = [
+        ElasticFixture(
+            'elastic7.docker',
+            'sparkly_test_fixture',
+            None,
+            absolute_path(__file__, 'resources', 'test_fixtures', 'mapping.json'),
+            absolute_path(__file__, 'resources', 'test_fixtures', 'data_for_es7.json'),
+        )
+    ]
+
+    def test_elastic_fixture(self):
+        df = self.spark.read_ext.by_url(
+            'elastic://elastic7.docker/sparkly_test_fixture?es.read.metadata=false'
+        )
+        self.assertDataFrameEqual(df, [{'name': 'John', 'age': 56}])
+
+
 class TestKafkaFixture(SparklyGlobalSessionTest):
     session = SparklyTestSession

diff --git a/tests/integration/test_writer.py b/tests/integration/test_writer.py
index b50bfed..10dd6d4 100644
--- a/tests/integration/test_writer.py
+++ b/tests/integration/test_writer.py
@@ -31,12 +31,16 @@
 from sparkly.utils import absolute_path
 from sparkly.testing import (
+    SparklyTest,
     SparklyGlobalSessionTest,
     CassandraFixture,
     ElasticFixture,
     MysqlFixture,
 )
-from tests.integration.base import SparklyTestSession
+from tests.integration.base import (
+    SparklyTestSession,
+    SparklyTestSessionWithES6,
+)

 try:
     from kafka import KafkaConsumer
@@ -115,12 +119,12 @@ def test_write_cassandra(self):
         self.assertDataFrameEqual(written_df, TEST_DATA)


-class TestWriteElastic(SparklyGlobalSessionTest):
-    session = SparklyTestSession
+class TestWriteElastic6(SparklyTest):
+    session = SparklyTestSessionWithES6

     fixtures = [
         ElasticFixture(
-            'elastic.docker',
+            'elastic6.docker',
             'sparkly_test',
             'test',
             None,
@@ -132,7 +136,7 @@ def test_write_elastic(self):
         df = self.spark.createDataFrame(TEST_DATA)

         df.write_ext.elastic(
-            host='elastic.docker',
+            host='elastic6.docker',
             port=9200,
             es_index='sparkly_test',
             es_type='test_writer',
             mode='overwrite',
             options={
                 'es.mapping.id': 'uid',
             },
         )

         df = self.spark.read_ext.by_url(
-            'elastic://elastic.docker/sparkly_test/test_writer?es.read.metadata=false'
+            'elastic://elastic6.docker/sparkly_test/test_writer?es.read.metadata=false'
         )
         self.assertDataFrameEqual(df, TEST_DATA)
+
+
+class TestWriteElastic7(SparklyGlobalSessionTest):
+    session = SparklyTestSession
+
+    fixtures = [
+        ElasticFixture(
+            'elastic7.docker',
+            'sparkly_test',
+            None,
+            None,
+            absolute_path(__file__, 'resources', 'test_write', 'elastic7_setup.json'),
+        ),
+    ]
+
+    def test_write_elastic(self):
+        df = self.spark.createDataFrame(TEST_DATA)
+
+        df.write_ext.elastic(
+            host='elastic7.docker',
+            port=9200,
+            es_index='sparkly_test',
+            es_type=None,
+            mode='overwrite',
+            options={
+                'es.mapping.id': 'uid',
+            },
+        )
+
+        df = self.spark.read_ext.by_url(
+            'elastic://elastic7.docker/sparkly_test?es.read.metadata=false',
+        )
+        self.assertDataFrameEqual(df, TEST_DATA)

diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py
index f54e833..60bd371 100644
--- a/tests/unit/test_reader.py
+++ b/tests/unit/test_reader.py
@@ -77,7 +77,7 @@ def test_csv_on_local_file_system(self):
             header='false',
         )

-    def test_elastic(self):
+    def test_elastic_on_or_before_6(self):
         self.read_ext.elastic = mock.Mock(return_value=self.fake_df)

         df = self.read_ext.by_url('elastic://es_host/test_index/test_type?'
@@ -96,6 +96,25 @@ def test_elastic(self):
             options={'es.input.json': 'true'},
         )

+    def test_elastic_on_and_after_7(self):
+        self.read_ext.elastic = mock.Mock(return_value=self.fake_df)
+
+        df = self.read_ext.by_url('elastic://es_host/test_index?'
+                                  'q=name:*Johnny*&fields=name,surname&'
+                                  'es.input.json=true&parallelism=4')
+
+        self.assertEqual(df, self.fake_df)
+        self.read_ext.elastic.assert_called_with(
+            host='es_host',
+            es_index='test_index',
+            es_type=None,
+            query='?q=name:*Johnny*',
+            fields=['name', 'surname'],
+            port=None,
+            parallelism=4,
+            options={'es.input.json': 'true'},
+        )
+
     def test_cassandra(self):
         self.read_ext.cassandra = mock.Mock(return_value=self.fake_df)

diff --git a/tests/unit/test_writer.py b/tests/unit/test_writer.py
index 6634397..f871643 100644
--- a/tests/unit/test_writer.py
+++ b/tests/unit/test_writer.py
@@ -95,7 +95,7 @@ def test_cassandra_custom_port(self):
             options={},
         )

-    def test_elastic(self):
+    def test_elastic_on_or_before_6(self):
         self.write_ext.elastic = mock.Mock()

         self.write_ext.by_url('elastic://host/index/type?parallelism=15')

         self.write_ext.elastic.assert_called_once_with(
             host='host',
             es_index='index',
             es_type='type',
             port=None,
             mode=None,
             parallelism=15,
             options={},
         )

+    def test_elastic_on_and_after_7(self):
+        self.write_ext.elastic = mock.Mock()
+
+        self.write_ext.by_url('elastic://host/index?parallelism=15')
+
+        self.write_ext.elastic.assert_called_once_with(
+            host='host',
+            es_index='index',
+            es_type=None,
+            port=None,
+            mode=None,
+            parallelism=15,
+            options={},
+        )
+
     def test_mysql(self):
         self.write_ext.mysql = mock.Mock()
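
Taken together, the two patches enable test suites that mix sessions with
conflicting jar requirements. A condensed sketch of the resulting pattern
(package versions and docker hostnames mirror the integration tests above;
the class names and the count assertion are illustrative only):

    from sparkly import SparklySession
    from sparkly.testing import SparklyTest


    class ES6Session(SparklySession):
        # Each session declares its own spark packages / jars.
        packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:6.5.4']


    class ES7Session(SparklySession):
        packages = ['org.elasticsearch:elasticsearch-spark-20_2.11:7.3.1']


    class ES6ReadTest(SparklyTest):
        session = ES6Session

        def test_read(self):
            # ES 6 still requires a mapping type.
            df = self.spark.read_ext.elastic(
                host='elastic6.docker', port=9200,
                es_index='sparkly_test', es_type='test')
            # Assumes the index has been populated by a fixture.
            self.assertGreater(df.count(), 0)


    class ES7ReadTest(SparklyTest):
        session = ES7Session

        def test_read(self):
            # ES 7 dropped mapping types: pass es_type=None and the
            # resource path becomes just the index (patch 2).
            df = self.spark.read_ext.elastic(
                host='elastic7.docker', port=9200,
                es_index='sparkly_test', es_type=None)
            self.assertGreater(df.count(), 0)

Because tearDownClass() now shuts down the py4j gateway and kills the JVM
(patch 1), the second test class starts a fresh JVM and loads its own jars
instead of silently reusing the first session's.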