From dddcefd1d5dd8e1c2dea8c1f305e64509073db4a Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Tue, 22 Nov 2016 15:37:33 -0500 Subject: [PATCH] Add the capability for analysts to define their own tasks, or SQL scripts, which can be run, sequentially, against a target HP Vertica database, using a constrained set of credentials and schema. We've created two new tasks: RunVerticaSqlScriptTask and RunVerticaSqlScriptsTask. The former allows a user to run a single script against an HP Vertica database, which will run in a configured schema. The latter will read a configuration file that specifies many scripts to be run, scheduling them such that they run sequentially, one after the other, and only continue running so long as the previous script did not encounter an error. This allows us to have a separate repository where analysts can commit SQL scripts to run against the data warehouse to generate their own summary tables, etc, without needing to write Luigi scripts and worry about scheduling. 
--- AUTHORS | 2 +- Makefile | 2 +- config/devstack.cfg | 4 + config/test.cfg | 4 + .../load_internal_reporting_user_activity.py | 3 +- edx/analytics/tasks/load_warehouse.py | 3 +- edx/analytics/tasks/run_vertica_sql_script.py | 149 ++++++++++++++++++ .../tasks/run_vertica_sql_scripts.py | 97 ++++++++++++ .../fixtures/sql_scripts/empty_config.yaml | 0 .../fixtures/sql_scripts/four_scripts.yaml | 9 ++ .../fixtures/sql_scripts/nonsense_config.yaml | 1 + .../fixtures/sql_scripts/single_script.yaml | 4 + .../fixtures/sql_scripts/two_scripts.yaml | 5 + .../tests/test_run_vertica_sql_script.py | 92 +++++++++++ .../tests/test_run_vertica_sql_scripts.py | 102 ++++++++++++ edx/analytics/tasks/util/vertica_target.py | 39 ++++- edx/analytics/tasks/vertica_load.py | 3 +- setup.cfg | 3 +- 18 files changed, 514 insertions(+), 8 deletions(-) create mode 100644 edx/analytics/tasks/run_vertica_sql_script.py create mode 100644 edx/analytics/tasks/run_vertica_sql_scripts.py create mode 100644 edx/analytics/tasks/tests/fixtures/sql_scripts/empty_config.yaml create mode 100644 edx/analytics/tasks/tests/fixtures/sql_scripts/four_scripts.yaml create mode 100644 edx/analytics/tasks/tests/fixtures/sql_scripts/nonsense_config.yaml create mode 100644 edx/analytics/tasks/tests/fixtures/sql_scripts/single_script.yaml create mode 100644 edx/analytics/tasks/tests/fixtures/sql_scripts/two_scripts.yaml create mode 100644 edx/analytics/tasks/tests/test_run_vertica_sql_script.py create mode 100644 edx/analytics/tasks/tests/test_run_vertica_sql_scripts.py diff --git a/AUTHORS b/AUTHORS index 7bfc3473e6..18a5c44b1c 100644 --- a/AUTHORS +++ b/AUTHORS @@ -12,4 +12,4 @@ Dmitry Viskov Sanford Student Jillian Vogel Dennis Jen - +Toby Lawrence diff --git a/Makefile b/Makefile index c4be04b705..39a23bf20b 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ test-requirements: requirements test-local: # TODO: when we have better coverage, modify this to actually fail when coverage is too low. 
rm -rf .coverage - LUIGI_CONFIG_PATH='config/test.cfg' python -m coverage run --rcfile=./.coveragerc -m nose --with-xunit --xunit-file=unittests.xml -A 'not acceptance' + LUIGI_CONFIG_PATH='config/test.cfg' python -m coverage run --rcfile=./.coveragerc -m nose --with-xunit --xunit-file=unittests.xml -A 'not acceptance' -s test: test-requirements develop test-local diff --git a/config/devstack.cfg b/config/devstack.cfg index 1330ff3e9c..16e64c05d2 100644 --- a/config/devstack.cfg +++ b/config/devstack.cfg @@ -87,3 +87,7 @@ number_of_shards = 5 [ccx] enabled = false + +[run-vertica-sql-script] +schema = testing +read_timeout = 5 diff --git a/config/test.cfg b/config/test.cfg index 5798bafb1b..b953949686 100644 --- a/config/test.cfg +++ b/config/test.cfg @@ -134,3 +134,7 @@ api_root_url = http://example.com/api/v1/ [ccx] enabled = false + +[run-vertica-sql-script] +schema = testing +read_timeout = 5 diff --git a/edx/analytics/tasks/load_internal_reporting_user_activity.py b/edx/analytics/tasks/load_internal_reporting_user_activity.py index 8d80b01810..d209d7cc7e 100644 --- a/edx/analytics/tasks/load_internal_reporting_user_activity.py +++ b/edx/analytics/tasks/load_internal_reporting_user_activity.py @@ -10,10 +10,11 @@ from edx.analytics.tasks.pathutil import PathSetTask from edx.analytics.tasks.url import ExternalURL, url_path_join from edx.analytics.tasks.user_activity import UserActivityTableTask -from edx.analytics.tasks.vertica_load import VerticaCopyTask, VerticaCopyTaskMixin, CredentialFileVerticaTarget +from edx.analytics.tasks.vertica_load import VerticaCopyTask, VerticaCopyTaskMixin from edx.analytics.tasks.database_imports import ImportAuthUserTask from edx.analytics.tasks.util.hive import HiveTableFromQueryTask, WarehouseMixin, HivePartition from edx.analytics.tasks.util.weekly_interval import WeeklyIntervalMixin +from edx.analytics.tasks.util.vertica_target import CredentialFileVerticaTarget from edx.analytics.tasks.user_activity import 
CourseActivityWeeklyTask log = logging.getLogger(__name__) diff --git a/edx/analytics/tasks/load_warehouse.py b/edx/analytics/tasks/load_warehouse.py index d134be63c9..14b5fc8890 100644 --- a/edx/analytics/tasks/load_warehouse.py +++ b/edx/analytics/tasks/load_warehouse.py @@ -12,8 +12,9 @@ from edx.analytics.tasks.load_internal_reporting_user_course import LoadUserCourseSummary from edx.analytics.tasks.load_internal_reporting_user import LoadInternalReportingUserToWarehouse from edx.analytics.tasks.course_catalog import DailyLoadSubjectsToVerticaTask -from edx.analytics.tasks.vertica_load import VerticaCopyTaskMixin, CredentialFileVerticaTarget +from edx.analytics.tasks.vertica_load import VerticaCopyTaskMixin +from edx.analytics.tasks.util.vertica_target import CredentialFileVerticaTarget from edx.analytics.tasks.util.hive import WarehouseMixin from edx.analytics.tasks.url import ExternalURL diff --git a/edx/analytics/tasks/run_vertica_sql_script.py b/edx/analytics/tasks/run_vertica_sql_script.py new file mode 100644 index 0000000000..341fb6556f --- /dev/null +++ b/edx/analytics/tasks/run_vertica_sql_script.py @@ -0,0 +1,149 @@ +""" +Support for running a SQL script against an HP Vertica database. +""" +import datetime +import logging + +import luigi +import luigi.configuration +from edx.analytics.tasks.url import ExternalURL +from edx.analytics.tasks.util.vertica_target import VerticaTarget, CredentialFileVerticaTarget + +log = logging.getLogger(__name__) + +try: + import vertica_python + vertica_client_available = True # pylint: disable-msg=C0103 +except ImportError: + log.warn('Unable to import Vertica client libraries') + # On hadoop slave nodes we don't have Vertica client libraries installed so it is pointless to ship this package to + # them, instead just fail noisily if we attempt to use these libraries. 
+ vertica_client_available = False # pylint: disable-msg=C0103 + + +class BaseVerticaSqlScriptTaskMixin(object): + """ + Parameters for running a SQL script against an HP Vertica database. + """ + date = luigi.DateParameter( + default=datetime.datetime.utcnow().date(), + description='Default is today, UTC.', + ) + schema = luigi.Parameter( + config_path={'section': 'run-vertica-sql-script', 'name': 'schema'}, + description='Name of the schema to which to write.', + ) + credentials = luigi.Parameter( + config_path={'section': 'run-vertica-sql-script', 'name': 'credentials'}, + description='Path to the external access credentials file.', + ) + read_timeout = luigi.IntParameter( + config_path={'section': 'run-vertica-sql-script', 'name': 'read_timeout'}, + description='Timeout in seconds for reading from a Vertica database connection.' + ) + marker_schema = luigi.Parameter( + default=None, + description='Name of the schema to which to write the marker table. marker_schema would ' + 'default to the schema value if the value here is None.' + ) + + def update_id(self): + """ + Unique string identifying this task run, based on the input parameters. + """ + return str(self) + + +class RunVerticaSqlScriptTaskMixin(BaseVerticaSqlScriptTaskMixin): + """ + Parameters required to run a single SQL script against an HP Vertica database. + """ + source_script = luigi.Parameter( + description='Path to the source script to execute.' + ) + script_name = luigi.Parameter( + description='Unique identifier for the purposes of tracking whether or not this ' + 'script ran successfully i.e. the table created by this script, or the ticket related to it.' + ) + + +class RunVerticaSqlScriptTask(RunVerticaSqlScriptTaskMixin, luigi.Task): + """ + A task for running a SQL script against an HP Vertica database. + """ + required_tasks = None + output_target = None + depends_on = None + + def add_dependency(self, dependency): + """ + Adds a custom dependency/requirement for this task. 
+ + Note: this currently *sets* a single, custom dependency. You cannot add multiple dependencies to this task. + The last dependency to be added is the only one that will stick. It will, however, not be the only dependency, + as this task has a "base" set of dependencies. + """ + self.depends_on = dependency + + def requires(self): + if self.required_tasks is None: + self.required_tasks = { + 'credentials': ExternalURL(url=self.credentials), + 'source_script': ExternalURL(url=self.source_script), + } + + if self.depends_on is not None: + self.required_tasks['depends_on'] = self.depends_on + + return self.required_tasks + + def output(self): + """ + Returns a VerticaTarget representing the inserted dataset. + """ + if self.output_target is None: + self.output_target = CredentialFileVerticaTarget( + credentials_target=self.input()['credentials'], + table=self.script_name, + schema=self.schema, + update_id=self.update_id(), + read_timeout=self.read_timeout, + marker_schema=self.marker_schema, + ) + + return self.output_target + + def run(self): + """ + Runs the given SQL script against the Vertica target. + """ + # Make sure we can connect to Vertica. + self.check_vertica_availability() + connection = self.output().connect() + + try: + # Set up our connection to point to the specified schema so that scripts can have unqualified + # table references and not necessarily need to know or care about where they're running. + connection.cursor().execute('SET SEARCH_PATH = {schema};'.format(schema=self.schema)) + + with self.input()['source_script'].open('r') as script_file: + # Read in our script and execute it. + script_body = script_file.read() + connection.cursor().execute(script_body) + + # If we're here, nothing blew up, so mark as complete. 
+ self.output().touch(connection) + + connection.commit() + log.debug("Committed transaction.") + except Exception as exc: + log.exception("Rolled back the transaction; exception raised: %s", str(exc)) + connection.rollback() + raise + finally: + connection.close() + + def check_vertica_availability(self): + """Call to ensure fast failure if this machine doesn't have the Vertica client library available.""" + if not vertica_client_available: + raise ImportError('Vertica client library not available') diff --git a/edx/analytics/tasks/run_vertica_sql_scripts.py b/edx/analytics/tasks/run_vertica_sql_scripts.py new file mode 100644 index 0000000000..c44504b72a --- /dev/null +++ b/edx/analytics/tasks/run_vertica_sql_scripts.py @@ -0,0 +1,97 @@ +""" +Support for running multiple SQL scripts against an HP Vertica database in a deterministic fashion. +""" +import yaml +import datetime +import logging +from os import path + +import luigi +import luigi.configuration +from edx.analytics.tasks.url import ExternalURL +from edx.analytics.tasks.run_vertica_sql_script import BaseVerticaSqlScriptTaskMixin, RunVerticaSqlScriptTask + + +log = logging.getLogger(__name__) + + +class RunVerticaSqlScriptsTaskMixin(BaseVerticaSqlScriptTaskMixin): + """ + Parameters for running multiple SQL scripts against an HP Vertica database in a deterministic fashion. + """ + script_configuration = luigi.Parameter( + description='Path to the configuration file that specifies which scripts to run.' + ) + script_root = luigi.Parameter( + default='', + description='Root directory from which the script locations in the configuration ' + 'are referenced from.' + ) + + +class RunVerticaSqlScriptsTask(RunVerticaSqlScriptsTaskMixin, luigi.WrapperTask): + """ + A wrapper task for running multiple SQL scripts against an HP Vertica database in a deterministic fashion. + + We use a YAML file that defines a list of scripts to run. We run the scripts in the order they are defined. 
+ By using RunVerticaSqlScriptTask, each script stores its own marker, thus allowing us to idempotently run this + task until all required tasks (aka our scripts) have successfully run for a given date. + """ + downstream_task = None + + def requires(self): + if self.downstream_task is None: + self.downstream_task = self.get_downstream_task() + + if self.downstream_task is not None: + yield self.downstream_task + + def validate_script_entry(self, script): + # It has to be a dictionary. + if not isinstance(script, dict): + return False + + # It needs to have a name and a script location. + for attr in ['name', 'location']: + if attr not in script: + return False + + return True + + def get_downstream_task(self): + # If no downstream task has been set, load our configuration and generate our tasks and dependency chain. + if self.downstream_task is None: + script_conf_target = ExternalURL(url=self.script_configuration).output() + with script_conf_target.open('r') as script_conf_file: + config = yaml.safe_load(script_conf_file) + if config is not None and isinstance(config, dict): + previous_task = None + + scripts = config.get('scripts', []) + + # Iterate over the list of scripts in the configuration file, in the order they are defined. As each task + # is created, the previously-created task is attached as its dependency (via add_dependency), chaining the + # tasks together so that they are scheduled sequentially, one after another, and in the intended + # order: from the top of the file, downwards.
+ for script in scripts: + if not self.validate_script_entry(script): + log.warn("encountered invalid script entry!") + continue + + new_task = RunVerticaSqlScriptTask( + credentials=self.credentials, schema=self.schema, marker_schema=self.marker_schema, + date=self.date, read_timeout=self.read_timeout, source_script=path.join(self.script_root, script['location']), + script_name=script.get('name')) + + # If we previously configured a task, set it as a dependency of this one, so it runs prior to. + if previous_task is not None: + new_task.add_dependency(previous_task) + + # Mark this as the previously-created task. + previous_task = new_task + + self.downstream_task = previous_task + + # If a downstream task has been set, yield it, triggering Luigi to schedule our scripts. + if self.downstream_task is not None: + yield self.downstream_task diff --git a/edx/analytics/tasks/tests/fixtures/sql_scripts/empty_config.yaml b/edx/analytics/tasks/tests/fixtures/sql_scripts/empty_config.yaml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/edx/analytics/tasks/tests/fixtures/sql_scripts/four_scripts.yaml b/edx/analytics/tasks/tests/fixtures/sql_scripts/four_scripts.yaml new file mode 100644 index 0000000000..49289c0189 --- /dev/null +++ b/edx/analytics/tasks/tests/fixtures/sql_scripts/four_scripts.yaml @@ -0,0 +1,9 @@ +scripts: + - name: script one + location: script_one.sql + - name: script two + location: script_two.sql + - name: script three + location: script_three.sql + - name: script four + location: script_four.sql diff --git a/edx/analytics/tasks/tests/fixtures/sql_scripts/nonsense_config.yaml b/edx/analytics/tasks/tests/fixtures/sql_scripts/nonsense_config.yaml new file mode 100644 index 0000000000..fdac7c26b9 --- /dev/null +++ b/edx/analytics/tasks/tests/fixtures/sql_scripts/nonsense_config.yaml @@ -0,0 +1 @@ +asdasdaikubdnaiksudb diff --git a/edx/analytics/tasks/tests/fixtures/sql_scripts/single_script.yaml 
b/edx/analytics/tasks/tests/fixtures/sql_scripts/single_script.yaml new file mode 100644 index 0000000000..56952a2e36 --- /dev/null +++ b/edx/analytics/tasks/tests/fixtures/sql_scripts/single_script.yaml @@ -0,0 +1,4 @@ +scripts: + - name: Simple Testing Script + script_id: my_lil_test_script + location: foobar.sql diff --git a/edx/analytics/tasks/tests/fixtures/sql_scripts/two_scripts.yaml b/edx/analytics/tasks/tests/fixtures/sql_scripts/two_scripts.yaml new file mode 100644 index 0000000000..2df2d6b204 --- /dev/null +++ b/edx/analytics/tasks/tests/fixtures/sql_scripts/two_scripts.yaml @@ -0,0 +1,5 @@ +scripts: + - name: script one + location: script_one.sql + - name: script two + location: script_two.sql diff --git a/edx/analytics/tasks/tests/test_run_vertica_sql_script.py b/edx/analytics/tasks/tests/test_run_vertica_sql_script.py new file mode 100644 index 0000000000..64236a1460 --- /dev/null +++ b/edx/analytics/tasks/tests/test_run_vertica_sql_script.py @@ -0,0 +1,92 @@ +""" +Ensure we can write to Vertica data sources. +""" +from __future__ import absolute_import + +import textwrap + +import luigi +import luigi.task + +from mock import call +from mock import MagicMock +from mock import patch +from mock import sentinel + +from edx.analytics.tasks.run_vertica_sql_script import RunVerticaSqlScriptTask +from edx.analytics.tasks.tests import unittest +from edx.analytics.tasks.tests.target import FakeTarget +from edx.analytics.tasks.tests.config import with_luigi_config + + +class RunVerticaSqlScriptTaskTest(unittest.TestCase): + """ + Ensure we can connect to and write data to Vertica data sources. + """ + + def setUp(self): + patcher = patch('edx.analytics.tasks.run_vertica_sql_script.vertica_python.vertica') + self.mock_vertica_connector = patcher.start() + self.addCleanup(patcher.stop) + + def create_task(self, credentials=None, source_script=None): + """ + Emulate execution of a generic RunVerticaSqlScriptTask. 
+ """ + # Make sure to flush the instance cache so we create a new task object. + luigi.task.Register.clear_instance_cache() + task = RunVerticaSqlScriptTask( + credentials=sentinel.ignored, + script_name='my simple script', + source_script=sentinel.ignored, + ) + + if not credentials: + credentials = '''\ + { + "host": "db.example.com", + "port": 5433, + "user": "exampleuser", + "password": "example password" + }''' + + # This SQL doesn't actually run, but I've used real SQL to provide context. :) + source = ''' + DELETE TABLE my_schema.my_table; + CREATE TABLE my_schema.my_table AS SELECT foo, bar, baz FROM my_schema.another_table; + ''' + + fake_input = { + 'credentials': FakeTarget(value=textwrap.dedent(credentials)), + 'source_script': FakeTarget(value=textwrap.dedent(source)) + } + + fake_output = MagicMock(return_value=self.mock_vertica_connector) + self.mock_vertica_connector.marker_schema = "name_of_marker_schema" + self.mock_vertica_connector.marker_table = "name_of_marker_table" + + task.input = MagicMock(return_value=fake_input) + task.output = fake_output + return task + + def test_run_with_default_credentials(self): + self.create_task(credentials='{}').run() + + def test_run(self): + self.create_task().run() + mock_conn = self.mock_vertica_connector.connect() + self.assertTrue(mock_conn.cursor().execute.called) + self.assertFalse(mock_conn.rollback.called) + self.assertTrue(mock_conn.commit.called) + self.assertTrue(mock_conn.close.called) + + def test_run_with_failure(self): + task = self.create_task() + task.output().touch = MagicMock(side_effect=Exception("Failed to update marker")) + with self.assertRaises(Exception): + task.run() + mock_conn = self.mock_vertica_connector.connect() + self.assertTrue(mock_conn.cursor().execute.called) + self.assertTrue(mock_conn.rollback.called) + self.assertFalse(mock_conn.commit.called) + self.assertTrue(mock_conn.close.called) diff --git a/edx/analytics/tasks/tests/test_run_vertica_sql_scripts.py 
b/edx/analytics/tasks/tests/test_run_vertica_sql_scripts.py new file mode 100644 index 0000000000..f28d7c9a60 --- /dev/null +++ b/edx/analytics/tasks/tests/test_run_vertica_sql_scripts.py @@ -0,0 +1,102 @@ +""" +Ensure we can write to Vertica data sources. +""" +from __future__ import absolute_import + +import textwrap +from os import path + +import luigi +import luigi.task + +from mock import sentinel + +from edx.analytics.tasks.run_vertica_sql_scripts import RunVerticaSqlScriptsTask +from edx.analytics.tasks.tests import unittest + + +class RunVerticaSqlScriptsTaskTest(unittest.TestCase): + """ + Ensure we can run SQL scripts that read and write data to Vertica data sources. + """ + + def create_task(self, credentials='', script_config=''): + """ + Emulate execution of a generic RunVerticaSqlScriptsTask. + """ + # Make sure to flush the instance cache so we create a new task object. + luigi.task.Register.clear_instance_cache() + task = RunVerticaSqlScriptsTask( + credentials=sentinel.ignored, + script_configuration=script_config, + script_root='', + ) + + return task + + def get_configured_task_chain_for_config(self, config_path): + """ + Creates the task based on a given YAML configuration file. + """ + return self.create_task(script_config=path.join('edx/analytics/tasks/tests/fixtures/sql_scripts', config_path)).get_downstream_task() + + def get_tasks_in_chain(self, chain): + # Enumerate the chain, revealing whether or not anything was actually generated. + chain_items = list(chain) + + items = [] + if len(chain_items) == 0: + return items + + # If we're returning more than one item from the requirements, that means we're risking concurrent + # task execution, which breaks our desire to serially execute tasks one after the other. + self.assertEqual(len(chain_items), 1) + + # Pull out the chain of dependent tasks. 
+ current_item = chain_items[0] + while current_item is not None: + items.append(current_item) + current_item = current_item.depends_on + + return list(reversed(items)) + + def test_run_with_empty_configuration(self): + # It's ... valid YAML, but there just won't be anything there. No generated tasks. + chain = self.get_configured_task_chain_for_config('empty_config.yaml') + tasks = self.get_tasks_in_chain(chain) + + self.assertEqual(len(tasks), 0) + + def test_run_with_jacked_up_configuration(self): + # Again, this is more like "weird, unexpected content that is still valid." No generated tasks. + chain = self.get_configured_task_chain_for_config('nonsense_config.yaml') + tasks = self.get_tasks_in_chain(chain) + + self.assertEqual(len(tasks), 0) + + def test_run_with_single_script(self): + # A real configuration. Should be a single generated task. + chain = self.get_configured_task_chain_for_config('single_script.yaml') + tasks = self.get_tasks_in_chain(chain) + + self.assertEqual(len(tasks), 1) + + def test_run_with_two_scripts(self): + # A real configuration. Should be two generated tasks: script_two, then script_one. + chain = self.get_configured_task_chain_for_config('two_scripts.yaml') + tasks = self.get_tasks_in_chain(chain) + + self.assertEqual(len(tasks), 2) + self.assertEqual(tasks[0].script_name, "script one") + self.assertEqual(tasks[1].script_name, "script two") + + def test_run_with_four_scripts(self): + # A real configuration. Should be four generated tasks: script_four, then script_three, then script_two, then script_one. 
+ chain = self.get_configured_task_chain_for_config('four_scripts.yaml') + tasks = self.get_tasks_in_chain(chain) + + self.assertEqual(len(tasks), 4) + self.assertEqual(tasks[0].script_name, "script one") + self.assertEqual(tasks[1].script_name, "script two") + self.assertEqual(tasks[2].script_name, "script three") + self.assertEqual(tasks[3].script_name, "script four") diff --git a/edx/analytics/tasks/util/vertica_target.py b/edx/analytics/tasks/util/vertica_target.py index 0576e22de7..89a56d29f6 100644 --- a/edx/analytics/tasks/util/vertica_target.py +++ b/edx/analytics/tasks/util/vertica_target.py @@ -1,6 +1,6 @@ """luigi target for writing data into an HP Vertica database""" import logging - +import json import luigi logger = logging.getLogger('luigi-interface') # pylint: disable-msg=C0103 @@ -148,3 +148,40 @@ def create_marker_schema(self, connection): """ query = "CREATE SCHEMA IF NOT EXISTS {marker_schema}".format(marker_schema=self.marker_schema) connection.cursor().execute(query) + +class CredentialFileVerticaTarget(VerticaTarget): + """ + Represents a table in Vertica, is complete when the update_id is the same as a previous successful execution. + + Arguments: + + credentials_target (luigi.Target): A target that can be read to retrieve the hostname, port and user credentials + that will be used to connect to the database. + database_name (str): The name of the database that the table exists in. Note this database need not exist. + schema (str): The name of the schema in which the table being modified lies. + table (str): The name of the table in the schema that is being modified. + update_id (str): A unique identifier for this update to the table. Subsequent updates with identical update_id + values will not be executed. 
+ """ + + def __init__(self, credentials_target, schema, table, update_id, read_timeout=None, marker_schema=None): + with credentials_target.open('r') as credentials_file: + cred = json.load(credentials_file) + super(CredentialFileVerticaTarget, self).__init__( + # Annoying, but the port must be passed in with the host string... + host="{host}:{port}".format(host=cred.get('host'), port=cred.get('port', 5433)), + user=cred.get('username'), + password=cred.get('password'), + schema=schema, + table=table, + update_id=update_id, + read_timeout=read_timeout, + marker_schema=marker_schema, + ) + + def exists(self, connection=None): + # The parent class fails if the database does not exist. This override tolerates that error. + try: + return super(CredentialFileVerticaTarget, self).exists(connection=connection) + except vertica_python.errors.ProgrammingError: + return False diff --git a/edx/analytics/tasks/vertica_load.py b/edx/analytics/tasks/vertica_load.py index 8d9957bcdb..416fd0a0ca 100644 --- a/edx/analytics/tasks/vertica_load.py +++ b/edx/analytics/tasks/vertica_load.py @@ -2,7 +2,6 @@ Support for loading data into an HP Vertica database. 
""" from collections import namedtuple -import json import logging import luigi @@ -10,7 +9,7 @@ from edx.analytics.tasks.url import ExternalURL from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin -from edx.analytics.tasks.util.vertica_target import VerticaTarget +from edx.analytics.tasks.util.vertica_target import VerticaTarget, CredentialFileVerticaTarget log = logging.getLogger(__name__) diff --git a/setup.cfg b/setup.cfg index c26cc44901..a7a8082ec0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -70,7 +70,8 @@ edx.analytics.tasks = push_to_vertica_lms_courseware_link_clicked = edx.analytics.tasks.lms_courseware_link_clicked:PushToVerticaLMSCoursewareLinkClickedTask load-course-catalog = edx.analytics.tasks.load_internal_reporting_course_catalog:PullCourseCatalogAPIData load-internal-database = edx.analytics.tasks.load_internal_reporting_database:ImportMysqlToVerticaTask - + run-vertica-sql-script = edx.analytics.tasks.run_vertica_sql_script:RunVerticaSqlScriptTask + run-vertica-sql-scripts = edx.analytics.tasks.run_vertica_sql_scripts:RunVerticaSqlScriptsTask mapreduce.engine = hadoop = edx.analytics.tasks.mapreduce:MapReduceJobRunner