From 63695efcb36a69d95b88c94a0ed213abbbc23859 Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 12:21:50 -0800 Subject: [PATCH 1/8] Remove mozetl and mozdatabricks --- README.md | 28 --- bin/mozetl_runner.py | 32 ---- dags/mozaggregator_release.py | 25 +-- dags/parquet_export.py | 1 - plugins/moz_databricks.py | 260 -------------------------- plugins/mozetl.py | 33 ---- tests/__init__.py | 0 tests/test_moz_databricks_operator.py | 209 --------------------- tests/test_mozetl.py | 23 --- 9 files changed, 4 insertions(+), 607 deletions(-) delete mode 100644 bin/mozetl_runner.py delete mode 100644 plugins/moz_databricks.py delete mode 100644 plugins/mozetl.py delete mode 100644 tests/__init__.py delete mode 100644 tests/test_moz_databricks_operator.py delete mode 100644 tests/test_mozetl.py diff --git a/README.md b/README.md index f6f8c010e..90ca3bcb0 100644 --- a/README.md +++ b/README.md @@ -39,16 +39,6 @@ An Airflow container can be built with make build ``` -### Export Credentials - -For now, DAGs that use the Databricks operator won't parse until the following environment variables are set (see issue #501): - -``` -AWS_SECRET_ACCESS_KEY -AWS_ACCESS_KEY_ID -DB_TOKEN -``` - ### Migrate Database Airflow database migration is no longer a separate step for dev but is run by the web container if necessary on first run. That means, however, that you should run the web container (and the database container, of course) and wait for the database migrations to complete before running individual test commands per below. The easiest way to do this is to run `make up` and let it run until the migrations complete. @@ -109,24 +99,6 @@ sed -i "s/10001/$(id -u)/g" Dockerfile.dev ``` -### Testing Databricks Jobs - -To run a job running on Databricks, run `make up` in the background. Follow -[this guide on generating a -token](https://docs.databricks.com/api/latest/authentication.html#generate-a-token) -and save this to a secure location. Export the token to a an environment -variable: - -```bash -export DB_TOKEN= -``` - -Finally, run the testing command using docker-compose directly: - -```bash -docker-compose exec web airflow test example spark 20180101 -``` - ### Testing GKE Jobs (including BigQuery-etl changes) For now, follow the steps outlined here to create a service account: https://bugzilla.mozilla.org/show_bug.cgi?id=1553559#c1. diff --git a/bin/mozetl_runner.py b/bin/mozetl_runner.py deleted file mode 100644 index 4df98ff77..000000000 --- a/bin/mozetl_runner.py +++ /dev/null @@ -1,32 +0,0 @@ -"""A mozetl runner script for the MozDatabricksRunSubmit operator. - -A copy of this file may be found in `telemetry-airflow/bin` - -This file is used as an argument in the SparkPythonTask in the Databricks -api.[0] The library is assumed to be installed on all of the machines in the -cluster. Arguments are passed to the script through `MOZETL_`-prefixed -environment variables. 
- -This script is deployed to `s3://telemetry-airflow/steps/mozetl_runner.py`.[1] - -[0]: https://docs.databricks.com/api/latest/jobs.html#sparkpythontask -[1]: https://bugzilla.mozilla.org/show_bug.cgi?id=1484331 -""" - -from os import environ -from pprint import pformat -from mozetl import cli - -print( - pformat({ - k: v for k, v in environ.items() - if k.startswith("MOZETL") - }) -) - -try: - cli.entry_point(auto_envvar_prefix="MOZETL") -except SystemExit: - # avoid calling sys.exit() in databricks - # http://click.palletsprojects.com/en/7.x/api/?highlight=auto_envvar_prefix#click.BaseCommand.main - pass diff --git a/dags/mozaggregator_release.py b/dags/mozaggregator_release.py index 55d11f49a..8d1808f7d 100644 --- a/dags/mozaggregator_release.py +++ b/dags/mozaggregator_release.py @@ -1,7 +1,5 @@ from airflow import DAG from datetime import datetime, timedelta -from airflow.operators.moz_databricks import MozDatabricksSubmitRunOperator -from utils.mozetl import mozetl_envvar default_args = { "owner": "frank@mozilla.com", @@ -20,26 +18,11 @@ schedule_interval="@daily", ) -release_telemetry_aggregate_view = MozDatabricksSubmitRunOperator( +# See mozaggregator_prerelease and mozaggregator_mobile for functional +# implementations using dataproc operator. This is not implemented due to the +# migration to GCP and https://bugzilla.mozilla.org/show_bug.cgi?id=1517018 +release_telemetry_aggregate_view = DummyOperator( task_id="release_telemetry_aggregate_view", job_name="Release Telemetry Aggregate View", - release_label="6.1.x-scala2.11", - instance_count=40, - execution_timeout=timedelta(hours=12), - env=mozetl_envvar( - "aggregator", - { - "date": "{{ ds_nodash }}", - "channels": "release", - "credentials-bucket": "telemetry-spark-emr-2", - "credentials-prefix": "aggregator_database_envvars.json", - "num-partitions": 40 * 32, - }, - dev_options={"credentials-prefix": "aggregator_dev_database_envvars.json"}, - other={ - "MOZETL_GIT_PATH": "https://github.com/mozilla/python_mozaggregator.git", - "MOZETL_EXTERNAL_MODULE": "mozaggregator", - }, - ), dag=dag, ) diff --git a/dags/parquet_export.py b/dags/parquet_export.py index 78e6c38c1..3939c4544 100644 --- a/dags/parquet_export.py +++ b/dags/parquet_export.py @@ -11,7 +11,6 @@ moz_dataproc_jar_runner, get_dataproc_parameters, ) -from utils.mozetl import mozetl_envvar from utils.tbv import tbv_envvar from utils.gcp import ( bigquery_etl_query, diff --git a/plugins/moz_databricks.py b/plugins/moz_databricks.py deleted file mode 100644 index 43e1d6572..000000000 --- a/plugins/moz_databricks.py +++ /dev/null @@ -1,260 +0,0 @@ -import boto3 -import botocore - -from os import environ -from pprint import pformat - -from airflow.plugins_manager import AirflowPlugin -from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator -from mozetl import generate_runner - - -class MozDatabricksSubmitRunOperator(DatabricksSubmitRunOperator): - """Execute a Spark job on Databricks.""" - - template_fields = ('json',) - region = environ['AWS_REGION'] - spark_bucket = environ['SPARK_BUCKET'] - private_output_bucket = environ['PRIVATE_OUTPUT_BUCKET'] - public_output_bucket = environ['PUBLIC_OUTPUT_BUCKET'] - deploy_environment = environ['DEPLOY_ENVIRONMENT'] - deploy_tag = environ['DEPLOY_TAG'] - artifacts_bucket = environ['ARTIFACTS_BUCKET'] - - # constants - mozilla_slug = 'mozilla' - telemetry_batch_view_slug = 'telemetry-batch-view' - - def __init__(self, job_name, env, instance_count, - dev_instance_count=1, - max_instance_count=None, 
- dev_max_instance_count=3, - enable_autoscale=False, - on_demand_instances=1, - spot_bid_price_percent=75, - disable_on_dev=False, - release_label='4.3.x-scala2.11', - iam_role=environ["DATABRICKS_DEFAULT_IAM"], - instance_type=environ['EMR_INSTANCE_TYPE'], - driver_instance_type=None, - owner="", - uri=None, - output_visibility=None, - ebs_volume_count=None, - ebs_volume_size=None, - python_version=3, - pypi_libs=None, - *args, **kwargs): - """ - Generate parameters for running a job through the Databricks run-submit - api. This is designed to be backwards compatible with EMRSparkOperator. - - See: https://docs.databricks.com/api/latest/jobs.html#runs-submit - - :param job_name: Name of the job - :param env: Parameters via mozetl and tbv envvar wrappers - :param instance_count: The number of instances to use in production - :param dev_instance_count: The number of instances to use in development - :param max_instance_count: Max number of instances during autoscaling - :param dev_max_instance_count: Max number of instances during - autoscaling in dev - :param on_demand_instances: Minimum number of on-demand instances. All - other instances will first be requested from the spot market, - with on-demand being a backup. - :param spot_bid_price_percent: Percent of max price to bid in spot market - :param enable_autoscale: Enable autoscaling for the job - :param disable_on_dev: Turn the job into a no-op if run in development - :param release_label: Databricks Runtime versions, - run `databricks clusters spark-versions` for possible values. - :param iam_role: An Amazon Resource Name (ARN) specifying an iam role - :param instance_type: An EC2 instance type (worker nodes and driver, if not specified) - :param driver_instance_type: Driver node instance type. - If not set, the same type is used across driver and worker nodes. - :param owner: The e-mail address of the user owning the job. - :param uri: argument from EMRSparkOperator for compatibility - :param output_visibility: argument from EMRSparkOperator for compatibility - :param ebs_volume_count: number of ebs volumes to attach to each node - :param ebs_volume_size: size of ebs volumes attached to each node - :param python_version: the default python runtime on the cluster (python 3.5.2) - See https://docs.databricks.com/release-notes/runtime/4.3.html#system-environment - for more details. - :param pypi_libs: PyPI libraries to install. 
ex: "['pylib1==0.1', 'pylib2==3.1']" - :param kwargs: Keyword arguments to pass to DatabricksSubmitRunOperator - """ - if python_version not in (2, 3): - raise ValueError("Only Python versions 2 or 3 allowed") - elif python_version == 3: - env["PYSPARK_PYTHON"] = "/databricks/python3/bin/python3" - - if enable_autoscale: - if not max_instance_count: - raise ValueError("`max_instance_count` should be set when " - "`enable_autoscale` is enabled.") - if (max_instance_count < instance_count or - dev_max_instance_count < dev_instance_count): - raise ValueError("The max instance count should be greater " - "than the instance count.") - - is_dev = self.deploy_environment == 'dev' - self.is_dev = is_dev - self.disable_on_dev = disable_on_dev - self.job_name = job_name - self.env = env - - jar_task = None - python_task = None - libraries = [] - - aws_attributes = { - "availability": "SPOT_WITH_FALLBACK", - "spot_bid_price_percent": spot_bid_price_percent, - "first_on_demand": on_demand_instances, - "instance_profile_arn": iam_role - } - - if bool(ebs_volume_size) ^ bool(ebs_volume_count): - raise ValueError("`ebs_volume_count` and `ebs_volume_size` " - "must be set together.") - - if ebs_volume_count is not None: - aws_attributes["ebs_volume_count"] = ebs_volume_count - - if ebs_volume_size is not None: - aws_attributes["ebs_volume_size"] = ebs_volume_size - - # Create the cluster configuration - new_cluster = { - "spark_version": release_label, - "node_type_id": instance_type, - "aws_attributes": aws_attributes, - "spark_env_vars": env, - "custom_tags": { - "Owner": owner, - "Application": "databricks", - "Source": "Airflow", - "Job": job_name, - } - } - - if driver_instance_type is not None: - new_cluster["driver_node_type_id"] = driver_instance_type - - min_workers = dev_instance_count if is_dev else instance_count - max_workers = dev_max_instance_count if is_dev else max_instance_count - - if enable_autoscale: - new_cluster["autoscale"] = { - "min_workers": min_workers, - "max_workers": max_workers, - } - else: - new_cluster["num_workers"] = min_workers - - # Parse the environment variables to bootstrap the tbv/mozetl workflow - if env.get("TBV_CLASS"): - opts = [ - ["--{}".format(key[4:].replace("_", "-")), value] - for key, value in env.items() - if key.startswith("TBV_") and key != "TBV_CLASS" - ] - - formatted_opts = [v for opt in opts for v in opt if v] - - jar_task = { - "main_class_name": env["TBV_CLASS"], - # Reconstruct TBV parameters from the environment, scallop does - # not support reading arguments in this form - "parameters": formatted_opts - } - - # Currently the artifacts are fetched via HTTP. Databricks - # expects either dbfs:// or s3:// for resources. - artifact_path = env.get("ARTIFACT_URL").split("amazonaws.com/")[-1] - artifact_path_s3 = "s3://{}".format(artifact_path) - libraries.append({'jar': artifact_path_s3}) - - elif env.get("MOZETL_COMMAND"): - # create a runner if it doesn't exist - bucket = "telemetry-test-bucket" if is_dev else "telemetry-airflow" - prefix = "steps" - - module_name = env.get("MOZETL_EXTERNAL_MODULE", "mozetl") - runner_name = "{}_runner.py".format(module_name) - - # options are read directly from the environment via Click - python_task = { - "python_file": "s3://{}/{}/{}".format(bucket, prefix, runner_name), - "parameters": [env["MOZETL_COMMAND"]] - } - - # Proper pip dependencies in Databricks is only supported via pypi. - # Dependencies for source/binary distributions need to be added - # manually. 
- path = env.get( - "MOZETL_GIT_PATH", "https://github.com/mozilla/python_mozetl.git" - ) - branch = env.get("MOZETL_GIT_BRANCH", "master") - - libraries.append( - { - "pypi": { - "package": "git+{path}@{branch}".format( - path=path, branch=branch - ) - } - } - ) - - if pypi_libs is not None and len(pypi_libs) > 0: - libraries.extend([{"pypi": {"package": lib}} for lib in pypi_libs]) - - else: - raise ValueError("Missing options for running tbv or mozetl tasks") - - json = { - "run_name": job_name, - "new_cluster": new_cluster, - "spark_jar_task": jar_task, - "spark_python_task": python_task, - "libraries": libraries - } - json = {k: v for k, v in json.items() if v} - super(MozDatabricksSubmitRunOperator, self).__init__( - json=json, - databricks_retry_limit=20, - databricks_retry_delay=30, - **kwargs - ) - - def execute(self, context): - self.log.info("Running {} with parameters:\n{}" - .format(self.job_name, pformat(self.json))) - - if self.disable_on_dev: - self.log.info("Skipping {} in the development environment" - .format(self.job_name)) - return - - # Create a runner if it doesn't exist only at execution time of the job. - if self.env.get("MOZETL_COMMAND"): - s3 = boto3.resource("s3") - bucket = "telemetry-test-bucket" if self.is_dev else "telemetry-airflow" - prefix = "steps" - - module_name = self.env.get("MOZETL_EXTERNAL_MODULE", "mozetl") - runner_name = "{}_runner.py".format(module_name) - - try: - s3.Object(bucket, "{}/{}".format(prefix, runner_name)).load() - except botocore.exceptions.ClientError as e: - if e.response["Error"]["Code"] == "404": - generate_runner(module_name, bucket, prefix) - else: - raise e - - super(MozDatabricksSubmitRunOperator, self).execute(context) - - -class MozDatabricksPlugin(AirflowPlugin): - name = 'moz_databricks' - operators = [MozDatabricksSubmitRunOperator] diff --git a/plugins/mozetl.py b/plugins/mozetl.py deleted file mode 100644 index 1074d8cea..000000000 --- a/plugins/mozetl.py +++ /dev/null @@ -1,33 +0,0 @@ -import boto3 -import logging -from textwrap import dedent - - -def generate_runner(module_name, bucket, prefix): - """Generate a runner for the current module to be run in Databricks. - - See https://github.com/mozilla/python_mozetl/blob/master/bin/mozetl-databricks.py for a - standalone implementation. - """ - logging.info( - "Writing new runner to {}/{} for {}".format(bucket, prefix, module_name) - ) - - runner_data = """ - # This runner has been auto-generated from mozilla/telemetry-airflow/plugins/moz_databricks.py. - # Any changes made to the runner file may be over-written on subsequent runs. - from {module} import cli - - try: - cli.entry_point(auto_envvar_prefix="MOZETL") - except SystemExit: - # avoid calling sys.exit() in databricks - # http://click.palletsprojects.com/en/7.x/api/?highlight=auto_envvar_prefix#click.BaseCommand.main - pass - """.format( - module=module_name - ) - - s3 = boto3.resource("s3") - runner_object = s3.Object(bucket, "{}/{}_runner.py".format(prefix, module_name)) - runner_object.put(Body=dedent(runner_data)) diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/test_moz_databricks_operator.py b/tests/test_moz_databricks_operator.py deleted file mode 100644 index f9989b2fe..000000000 --- a/tests/test_moz_databricks_operator.py +++ /dev/null @@ -1,209 +0,0 @@ -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. 
If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. - -import boto3 -import pytest -from moto import mock_s3 -from plugins.moz_databricks import MozDatabricksSubmitRunOperator - -# The environment variables required by the MozDatabricks operator must be available -# at module import because the `os.environ` is accessed in the class scope. These -# variables are used by Airflow to template variables. Monkeypatch occurs after import, -# so the variables are defined in `tox.ini` instead. - - -@pytest.fixture() -def client(): - """Create a moto generated fixture for s3. Using this fixture will put the function - under test in the same scope as the @mock_s3 decorator. See - https://github.com/spulec/moto/issues/620. - """ - mock_s3().start() - client = boto3.resource("s3") - client.create_bucket(Bucket="telemetry-test-bucket") - client.create_bucket(Bucket="telemetry-airflow") - yield client - mock_s3().stop() - - -@pytest.fixture() -def mock_hook(mocker): - mock_hook = mocker.patch("airflow.contrib.operators.databricks_operator.DatabricksHook") - mock_hook_instance = mock_hook.return_value - mock_hook_instance.submit_run.return_value = 1 - return mock_hook_instance - - -def test_missing_tbv_or_mozetl_env(mock_hook): - with pytest.raises(ValueError): - MozDatabricksSubmitRunOperator( - job_name="test_databricks", env={}, instance_count=1 - ) - - -def test_mozetl_success(mock_hook, client): - client.Object("telemetry-test-bucket", "steps/mozetl_runner.py").put( - Body="raise NotImplementedError" - ) - operator = MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"MOZETL_COMMAND": "test"}, - instance_count=1, - ) - operator.execute(None) - mock_hook.submit_run.assert_called_once() - - # https://docs.python.org/3.3/library/unittest.mock.html#unittest.mock.Mock.call_args - # call_args is a tuple where the first element is a list of elements. The first element - # in `submit_run` is the constructed json blob. 
- json = mock_hook.submit_run.call_args[0][0] - assert json.get("spark_python_task") is not None - - -def test_tbv_success(mock_hook): - operator = MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"TBV_CLASS": "test", "ARTIFACT_URL": "https://test.amazonaws.com/test"}, - instance_count=1, - ) - operator.execute(None) - mock_hook.submit_run.assert_called_once() - - json = mock_hook.submit_run.call_args[0][0] - assert json.get("spark_jar_task") is not None - - -def test_default_python_version(mock_hook, client): - # run with default - operator = MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"MOZETL_COMMAND": "test"}, - instance_count=1, - ) - operator.execute(None) - mock_hook.submit_run.assert_called_once() - - json = mock_hook.submit_run.call_args[0][0] - assert ( - json["new_cluster"]["spark_env_vars"]["PYSPARK_PYTHON"] - == "/databricks/python3/bin/python3" - ) - - # run with python 2 specifically - operator = MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"MOZETL_COMMAND": "test"}, - python_version=2, - instance_count=1, - ) - operator.execute(None) - - json = mock_hook.submit_run.call_args[0][0] - assert json["new_cluster"]["spark_env_vars"].get("PYSPARK_PYTHON") is None - - with pytest.raises(ValueError): - MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"MOZETL_COMMAND": "test"}, - python_version=4, - instance_count=1, - ).execute(None) - - -def test_set_mozetl_path_and_branch(mock_hook, client): - def mocked_run_submit_args(env): - MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env=env, - instance_count=1, - ).execute(None) - return mock_hook.submit_run.call_args[0][0] - - json = mocked_run_submit_args( - { - "MOZETL_COMMAND": "test", - "MOZETL_GIT_PATH": "https://custom.com/repo.git", - "MOZETL_GIT_BRANCH": "dev", - } - ) - assert ( - json["libraries"][0]["pypi"]["package"] == "git+https://custom.com/repo.git@dev" - ) - - json = mocked_run_submit_args( - {"MOZETL_COMMAND": "test", "MOZETL_GIT_BRANCH": "dev"} - ) - assert ( - json["libraries"][0]["pypi"]["package"] - == "git+https://github.com/mozilla/python_mozetl.git@dev" - ) - - -def test_mozetl_skips_generates_runner_if_exists(mocker, client): - client.Object("telemetry-test-bucket", "steps/mozetl_runner.py").put( - Body="raise NotImplementedError" - ) - mock_hook = mocker.patch("airflow.contrib.operators.databricks_operator.DatabricksHook") - mock_runner = mocker.patch("plugins.moz_databricks.generate_runner") - - operator = MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"MOZETL_COMMAND": "test"}, - instance_count=1, - ) - operator.execute(None) - assert mock_hook.called - assert mock_runner.assert_not_called - assert ( - operator.json["spark_python_task"]["python_file"] - == "s3://telemetry-test-bucket/steps/mozetl_runner.py" - ) - - -def test_mozetl_generates_runner_if_not_exists(mocker, client): - mock_hook = mocker.patch("airflow.contrib.operators.databricks_operator.DatabricksHook") - mock_runner = mocker.patch("plugins.moz_databricks.generate_runner") - - operator = MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"MOZETL_COMMAND": "test"}, - instance_count=1, - ) - operator.execute(None) - assert mock_hook.called - assert mock_runner.called - assert ( - 
operator.json["spark_python_task"]["python_file"] - == "s3://telemetry-test-bucket/steps/mozetl_runner.py" - ) - - -def test_mozetl_generates_runner_for_external_module(mocker, client): - client.Object("telemetry-test-bucket", "steps/mozetl_runner.py").put( - Body="raise NotImplementedError" - ) - mock_hook = mocker.patch("airflow.contrib.operators.databricks_operator.DatabricksHook") - mock_runner = mocker.patch("plugins.moz_databricks.generate_runner") - - operator = MozDatabricksSubmitRunOperator( - task_id="test_databricks", - job_name="test_databricks", - env={"MOZETL_COMMAND": "test", "MOZETL_EXTERNAL_MODULE": "custom_module"}, - instance_count=1, - ) - operator.execute(None) - assert mock_hook.called - assert mock_runner.called - assert ( - operator.json["spark_python_task"]["python_file"] - == "s3://telemetry-test-bucket/steps/custom_module_runner.py" - ) diff --git a/tests/test_mozetl.py b/tests/test_mozetl.py deleted file mode 100644 index 5454cd438..000000000 --- a/tests/test_mozetl.py +++ /dev/null @@ -1,23 +0,0 @@ -import boto3 -import pytest -from moto import mock_s3 -from plugins.mozetl import generate_runner - - -@mock_s3 -@pytest.mark.parametrize("module_name", ["mozetl", "custom"]) -def test_generate_runner(module_name): - bucket = "test-bucket" - prefix = "test-prefix" - conn = boto3.resource("s3") - conn.create_bucket(Bucket=bucket) - - generate_runner(module_name, bucket, prefix) - - body = ( - conn.Object(bucket, "{}/{}_runner.py".format(prefix, module_name)) - .get()["Body"] - .read() - .decode("utf-8") - ) - assert body.split("\n")[3] == "from {} import cli".format(module_name) From 5b6262e8ae8983ef64747d8c3aaab893ec9c79e3 Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 12:24:05 -0800 Subject: [PATCH 2/8] Remove moz_emr --- plugins/moz_emr/__init__.py | 3 - plugins/moz_emr/emr_add_steps_operator.py | 17 --- ...te_job_flow_selective_template_operator.py | 25 ---- plugins/moz_emr/moz_emr_cluster_sensors.py | 40 ------- plugins/moz_emr/moz_emr_mixin.py | 113 ------------------ plugins/moz_emr_plugin.py | 12 -- 6 files changed, 210 deletions(-) delete mode 100644 plugins/moz_emr/__init__.py delete mode 100644 plugins/moz_emr/emr_add_steps_operator.py delete mode 100644 plugins/moz_emr/emr_create_job_flow_selective_template_operator.py delete mode 100644 plugins/moz_emr/moz_emr_cluster_sensors.py delete mode 100644 plugins/moz_emr/moz_emr_mixin.py delete mode 100644 plugins/moz_emr_plugin.py diff --git a/plugins/moz_emr/__init__.py b/plugins/moz_emr/__init__.py deleted file mode 100644 index 92ecbc740..000000000 --- a/plugins/moz_emr/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from moz_emr.emr_add_steps_operator import EmrAddStepsOperator -from moz_emr.emr_create_job_flow_selective_template_operator import EmrCreateJobFlowSelectiveTemplateOperator -from moz_emr.moz_emr_cluster_sensors import MozEmrClusterStartSensor, MozEmrClusterEndSensor diff --git a/plugins/moz_emr/emr_add_steps_operator.py b/plugins/moz_emr/emr_add_steps_operator.py deleted file mode 100644 index fe8b2a004..000000000 --- a/plugins/moz_emr/emr_add_steps_operator.py +++ /dev/null @@ -1,17 +0,0 @@ -from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator - -from moz_emr.moz_emr_mixin import MozEmrMixin - - -class EmrAddStepsOperator(MozEmrMixin, EmrAddStepsOperator): - """ - We need templated steps so we can pass in date macros, etc - """ - template_fields = ['job_flow_id', 'steps'] - - """ - Override so we only return one step_id - """ - def execute(self, 
context): - step_ids = super(EmrAddStepsOperator, self).execute(context) - return step_ids[0] diff --git a/plugins/moz_emr/emr_create_job_flow_selective_template_operator.py b/plugins/moz_emr/emr_create_job_flow_selective_template_operator.py deleted file mode 100644 index c96d31d47..000000000 --- a/plugins/moz_emr/emr_create_job_flow_selective_template_operator.py +++ /dev/null @@ -1,25 +0,0 @@ -from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator -from airflow.utils import apply_defaults - -from moz_emr.moz_emr_mixin import MozEmrMixin - - -class EmrCreateJobFlowSelectiveTemplateOperator(MozEmrMixin, EmrCreateJobFlowOperator): - """ - Unfortunately, the task templater currently throws an exception if a field contains non-strings, - so we have to separate the fields we want to template from the rest of them - WARNING: currently, this implementation only supports separating top-level keys since the - templated dictionary will *overwrite* any duplicate top-level keys. - """ - template_fields = ['templated_job_flow_overrides'] - - @apply_defaults - def __init__(self, - templated_job_flow_overrides=None, - *args, **kwargs): - super(EmrCreateJobFlowSelectiveTemplateOperator, self).__init__(*args, **kwargs) - self.templated_job_flow_overrides = templated_job_flow_overrides - - def execute(self, context): - self.job_flow_overrides.update(self.templated_job_flow_overrides) - return super(EmrCreateJobFlowSelectiveTemplateOperator, self).execute(context) diff --git a/plugins/moz_emr/moz_emr_cluster_sensors.py b/plugins/moz_emr/moz_emr_cluster_sensors.py deleted file mode 100644 index 7f5076463..000000000 --- a/plugins/moz_emr/moz_emr_cluster_sensors.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging - -from airflow.exceptions import AirflowException -from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor - - -class MozEmrClusterBaseSensor(EmrJobFlowSensor): - # Overriding with the code in current airflow master to allow multiple failure states - def poke(self, context): - response = self.get_emr_response() - - if not response['ResponseMetadata']['HTTPStatusCode'] == 200: - logging.info('Bad HTTP response: %s' % response) - return False - - state = self.state_from_response(response) - logging.info('Job flow currently %s' % state) - - if state in self.NON_TERMINAL_STATES: - return False - - if state in self.FAILED_STATE: - raise AirflowException('EMR job failed') - - return True - - -# This sensor is meant to be used for spinning up a cluster which will be used for several steps -# The sensor only returns true when it's in "waiting" state, meaning it's ready for another step to -# be added -# Note: 'TERMINATED' is a fatal state since the purpose is to have a cluster ready for additional -# steps in the DAG -class MozEmrClusterStartSensor(MozEmrClusterBaseSensor): - NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'TERMINATING'] - FAILED_STATE = ['TERMINATED_WITH_ERRORS', 'TERMINATED'] - - -class MozEmrClusterEndSensor(MozEmrClusterBaseSensor): - NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING'] - FAILED_STATE = ['TERMINATED_WITH_ERRORS'] diff --git a/plugins/moz_emr/moz_emr_mixin.py b/plugins/moz_emr/moz_emr_mixin.py deleted file mode 100644 index 7ec0c1dba..000000000 --- a/plugins/moz_emr/moz_emr_mixin.py +++ /dev/null @@ -1,113 +0,0 @@ -from os import environ -import requests - - -class MozEmrMixin: - DEFAULT_EMR_RELEASE = 'emr-5.13.0' - - template_fields = ('environment',) - region = 
environ['AWS_REGION'] - key_name = environ['EMR_KEY_NAME'] - flow_role = environ['EMR_FLOW_ROLE'] - service_role = environ['EMR_SERVICE_ROLE'] - instance_type = environ['EMR_INSTANCE_TYPE'] - spark_bucket = environ['SPARK_BUCKET'] - airflow_bucket = environ['AIRFLOW_BUCKET'] - private_output_bucket = environ['PRIVATE_OUTPUT_BUCKET'] - public_output_bucket = environ['PUBLIC_OUTPUT_BUCKET'] - - @staticmethod - def get_jobflow_args(owner, instance_count, job_name="Default Job Name", - release_label=DEFAULT_EMR_RELEASE, keep_alive=False): - - config_url = ( - 'https://s3-{}.amazonaws.com/{}/configuration/configuration.json' - .format(MozEmrMixin.region, MozEmrMixin.spark_bucket) - ) - - return { - "Name": job_name, - "ReleaseLabel": release_label, - "JobFlowRole": MozEmrMixin.flow_role, - "ServiceRole": MozEmrMixin.service_role, - "Applications": [{'Name': 'Spark'}, {'Name': 'Hive'}], - "VisibleToAllUsers": True, - "Configurations": requests.get(config_url).json(), - "LogUri": ( - 's3://{}/logs/{}/{}/' - .format(MozEmrMixin.airflow_bucket, - owner, - job_name) - ), - "Instances": { - 'MasterInstanceType': MozEmrMixin.instance_type, - 'SlaveInstanceType': MozEmrMixin.instance_type, - 'InstanceCount': instance_count, - 'Ec2KeyName': MozEmrMixin.key_name, - 'KeepJobFlowAliveWhenNoSteps': keep_alive, - }, - "BootstrapActions": [{ - 'Name': 'telemetry-bootstrap', - 'ScriptBootstrapAction': { - 'Path': ( - 's3://{}/bootstrap/telemetry.sh' - .format(MozEmrMixin.spark_bucket) - ) - } - }], - "Tags": [ - {'Key': 'Owner', 'Value': owner}, - {'Key': 'Application', - 'Value': 'telemetry-analysis-worker-instance'}, - {'Key': 'Source', - 'Value': 'Airflow'}, - {'Key': 'Job', - 'Value': job_name}, - ] - } - - @staticmethod - def get_step_args(job_name, - owner, - uri, - env=None, - output_visibility='private', - arguments='', - action_on_failure='TERMINATE_JOB_FLOW'): - if output_visibility == 'public': - data_bucket = MozEmrMixin.public_output_bucket - elif output_visibility == 'private': - data_bucket = MozEmrMixin.private_output_bucket - - if env is not None: - environment = ' '.join(['{}={}'.format(k, v) - for k, v in env.items()]) - - jar_url = ( - 's3://{}.elasticmapreduce/libs/script-runner/script-runner.jar' - .format(MozEmrMixin.region) - ) - - args = [ - 's3://{}/steps/airflow.sh'.format( - MozEmrMixin.airflow_bucket - ), - '--job-name', job_name, - '--user', owner, - '--uri', uri, - '--data-bucket', data_bucket, - '--environment', environment - ] - # Empty quotes will be parsed as literals in the shell, so avoid - # passing in arguments unless they are actually needed. See issue #189. 
- if arguments: - args += ['--arguments', '"{}"'.format(arguments)] - - return [{ - 'Name': job_name, - 'ActionOnFailure': action_on_failure, - 'HadoopJarStep': { - 'Jar': jar_url, - 'Args': args - } - }] diff --git a/plugins/moz_emr_plugin.py b/plugins/moz_emr_plugin.py deleted file mode 100644 index a9c93609e..000000000 --- a/plugins/moz_emr_plugin.py +++ /dev/null @@ -1,12 +0,0 @@ -from airflow.plugins_manager import AirflowPlugin - -from moz_emr import EmrAddStepsOperator, EmrCreateJobFlowSelectiveTemplateOperator, \ - MozEmrClusterStartSensor, MozEmrClusterEndSensor - - -class MozEmr(AirflowPlugin): - name = "moz_emr" - operators = [EmrAddStepsOperator, - EmrCreateJobFlowSelectiveTemplateOperator, - MozEmrClusterStartSensor, - MozEmrClusterEndSensor] From f9718230c77fcd3eb35062984acbe2139b21c374 Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 13:14:57 -0800 Subject: [PATCH 3/8] Remove emr_spark_operator --- dags/example.py | 36 --- dags/operators/emr_spark_operator.py | 316 --------------------------- dags/parquet_export.py | 1 - dags/utils/mozetl.py | 31 --- dags/utils/tbv.py | 54 ----- 5 files changed, 438 deletions(-) delete mode 100644 dags/example.py delete mode 100644 dags/operators/emr_spark_operator.py delete mode 100644 dags/utils/mozetl.py delete mode 100644 dags/utils/tbv.py diff --git a/dags/example.py b/dags/example.py deleted file mode 100644 index 8381ff1ff..000000000 --- a/dags/example.py +++ /dev/null @@ -1,36 +0,0 @@ -from airflow import DAG -from airflow.operators import BashOperator -from datetime import datetime, timedelta -from operators.emr_spark_operator import EMRSparkOperator - - -default_args = { - 'owner': 'example@mozilla.com', - 'depends_on_past': False, - 'start_date': datetime(2099, 5, 31), - 'email': ['example@mozilla.com'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 3, - 'retry_delay': timedelta(minutes=10), -} - -dag = DAG('example', default_args=default_args, schedule_interval='@daily') - -spark = EMRSparkOperator( - task_id = "spark", - job_name = "Spark Example Job", - instance_count = 1, - execution_timeout=timedelta(hours=4), - env = {"date": "{{ ds_nodash }}"}, - uri = "https://raw.githubusercontent.com/mozilla/telemetry-airflow/master/examples/spark/example_date.ipynb", - dag = dag) - -bash = EMRSparkOperator( - task_id = "bash", - job_name = "Bash Example Job", - instance_count = 1, - execution_timeout=timedelta(hours=4), - env = {"date": "{{ ds_nodash }}"}, - uri = "https://raw.githubusercontent.com/mozilla/telemetry-airflow/master/examples/spark/example_date.sh", - dag = dag) diff --git a/dags/operators/emr_spark_operator.py b/dags/operators/emr_spark_operator.py deleted file mode 100644 index 7a1c559ec..000000000 --- a/dags/operators/emr_spark_operator.py +++ /dev/null @@ -1,316 +0,0 @@ -import logging -import time -from os import environ - -import boto3 -from io import BytesIO -from gzip import GzipFile -from urllib.parse import urlparse -import requests -from airflow.models import BaseOperator -from airflow.utils.decorators import apply_defaults -from airflow.exceptions import AirflowException -from retrying import retry - - -class EMRSparkOperator(BaseOperator): - """ - Execute a Spark job on EMR. - - :param job_name: The name of the job. - :type job_name: string - - :param owner: The e-mail address of the user owning the job. - :type owner: string - - :param uri: The URI of the job to run, which can be either a Jupyter - notebook or a JAR file. 
- :type uri: string - - :param instance_count: The number of workers the cluster should have. - :type instance_count: int - - :param env: If env is not None, it must be a mapping that defines the - environment variables for the new process (templated). - :type env: string - """ - template_fields = ('environment',) - region = environ['AWS_REGION'] - key_name = environ['EMR_KEY_NAME'] - flow_role = environ['EMR_FLOW_ROLE'] - service_role = environ['EMR_SERVICE_ROLE'] - instance_type = environ['EMR_INSTANCE_TYPE'] - spark_bucket = environ['SPARK_BUCKET'] - airflow_bucket = environ['AIRFLOW_BUCKET'] - private_output_bucket = environ['PRIVATE_OUTPUT_BUCKET'] - public_output_bucket = environ['PUBLIC_OUTPUT_BUCKET'] - deploy_environment = environ['DEPLOY_ENVIRONMENT'] - deploy_tag = environ['DEPLOY_TAG'] - artifacts_bucket = environ['ARTIFACTS_BUCKET'] - - # constants - mozilla_slug = 'mozilla' - telemetry_batch_view_slug = 'telemetry-batch-view' - - def __del__(self): - self.on_kill() - - def post_execute(self, context): - self.on_kill() - - def on_kill(self): - if self.job_flow_id is None: - return - - client = boto3.client('emr', region_name=EMRSparkOperator.region) - result = client.describe_cluster(ClusterId=self.job_flow_id) - status = result['Cluster']['Status']['State'] - - if status != 'TERMINATED_WITH_ERRORS' and status != 'TERMINATED': - logging.warn('Terminating Spark job {}'.format(self.job_name)) - client.terminate_job_flows(JobFlowIds=[self.job_flow_id]) - - @staticmethod - def _format_envvar(env=None): - # use a default value if an environment dictionary isn't supplied - return ' '.join(['{}={}'.format(k, v) for k, v in (env or {}).items()]) - - @apply_defaults - def __init__(self, job_name, owner, uri, instance_count, - dev_instance_count=1, disable_on_dev=False, - release_label='emr-5.13.0', output_visibility='private', - env=None, arguments='', bootstrap_args=(), - *args, **kwargs): - """ - Create an operator for launching EMR clusters. - - :param job_name: Job name as seen in the EMR console - :param owner: Email address(es) for notifications - :param uri: Remote path to the executable application as per airflow.sh - :param instance_count: The number of instances to use in production - :param dev_instance_count: The number of instances to use in development - :param disable_on_dev: Turn the job into a no-op if run in development - :param release_label: The EMR release label (i.e. 
'emr-5.13.0') - :param output_visibility: 'public' or 'private', specifying the default - output bucket for data - :param env: A dictionary of environment variables to pass during runtime - :param dev_env: Additional environment variables to pass in development - :param arguments: Passed to `airflow.sh` - :param bootstrap_args: An iterable of arguments passed to `telemetry.sh` - """ - is_dev = self.deploy_environment == 'dev' - - super(EMRSparkOperator, self).__init__(*args, **kwargs) - self.job_name = job_name - if self.deploy_environment != 'prod': - username = environ.get("DEV_USERNAME") - if username: - prefix = "{}-{}".format(self.deploy_environment, username) - else: - prefix = self.deploy_environment - self.job_name = "{}: {}".format(prefix, self.job_name) - - self.owner = owner - self.uri = uri - self.release_label = release_label - self.arguments = arguments - self.bootstrap_args = bootstrap_args - self.environment = self._format_envvar(env) - self.job_flow_id = None - self.instance_count = dev_instance_count if is_dev else instance_count - self.disable_on_dev = is_dev and disable_on_dev - - if output_visibility == 'public': - self.data_bucket = EMRSparkOperator.public_output_bucket - elif output_visibility == 'private': - self.data_bucket = EMRSparkOperator.private_output_bucket - else: - raise AirflowException( - '{} visibility is not supported!'.format(output_visibility)) - - def execute(self, context): - if self.disable_on_dev: - logging.info( - "Skipping {} in the development environment" - .format(self.job_name) - ) - return - - jar_url = ( - 's3://{}.elasticmapreduce/libs/script-runner/script-runner.jar' - .format(EMRSparkOperator.region) - ) - - args = [ - 's3://{}/steps/airflow.sh'.format( - EMRSparkOperator.airflow_bucket - ), - '--job-name', self.job_name, - '--user', self.owner, - '--uri', self.uri, - '--data-bucket', self.data_bucket, - '--environment', self.environment - ] - # Empty quotes will be parsed as literals in the shell, so avoid - # passing in arguments unless they are actually needed. See issue #189. 
- if self.arguments: - args += ['--arguments', '"{}"'.format(self.arguments)] - - self.steps = [{ - 'Name': 'RunJobStep', - 'ActionOnFailure': 'TERMINATE_JOB_FLOW', - 'HadoopJarStep': { - 'Jar': jar_url, - 'Args': args - } - }] - - if environ.get("AWS_ACCESS_KEY_ID", None) == "": - del(environ["AWS_ACCESS_KEY_ID"]) - - if environ.get("AWS_SECRET_ACCESS_KEY", None) == "": - del(environ["AWS_SECRET_ACCESS_KEY"]) - - config_url = ( - 'https://s3-{}.amazonaws.com/{}/configuration/configuration.json' - .format(EMRSparkOperator.region, EMRSparkOperator.spark_bucket) - ) - client = boto3.client('emr', region_name=EMRSparkOperator.region) - response = client.run_job_flow( - Name=self.job_name, - ReleaseLabel=self.release_label, - JobFlowRole=EMRSparkOperator.flow_role, - ServiceRole=EMRSparkOperator.service_role, - Applications=[{'Name': 'Spark'}, {'Name': 'Hive'}], - VisibleToAllUsers=True, - Configurations=requests.get(config_url).json(), - LogUri=self._log_uri(), - Instances={ - 'MasterInstanceType': EMRSparkOperator.instance_type, - 'SlaveInstanceType': EMRSparkOperator.instance_type, - 'InstanceCount': self.instance_count, - 'Ec2KeyName': EMRSparkOperator.key_name - }, - BootstrapActions=[{ - 'Name': 'telemetry-bootstrap', - 'ScriptBootstrapAction': { - 'Path': ( - 's3://{}/bootstrap/telemetry.sh' - .format(EMRSparkOperator.spark_bucket) - ), - 'Args': self.bootstrap_args, - } - }], - Tags=[ - {'Key': 'Owner', 'Value': self.owner}, - {'Key': 'Application', - 'Value': 'telemetry-analysis-worker-instance'}, - {'Key': 'Environment', - 'Value': EMRSparkOperator.deploy_environment}, - {'Key': 'app', - 'Value': 'airflow-emr-spark'}, - {'Key': 'env', - 'Value': EMRSparkOperator.deploy_environment}, - {'Key': 'realm', - 'Value': EMRSparkOperator.deploy_environment}, - {'Key': 'program_name', - 'Value': 'data'}, - {'Key': 'program_code', - 'Value': 'data'}, - {'Key': 'cost_center', - 'Value': '5650'}, - ], - Steps=self.steps - ) - - self.job_flow_id = response['JobFlowId'] - logging.info( - 'Running Spark Job {} with JobFlow ID {}' - .format(self.job_name, self.job_flow_id) - ) - log_url = ( - 'https://console.aws.amazon.com/s3/home' - '?region={}#&bucket={}&prefix=logs/{}/{}/{}' - .format(EMRSparkOperator.region, - EMRSparkOperator.airflow_bucket, - self.owner, - self.job_name, - self.job_flow_id) - ) - logging.info('Logs will be available at: {}'.format(log_url)) - - while True: - # wait 2^i seconds between each retry up to 5m, stop after 30m - @retry(wait_exponential_multiplier=1000, - wait_exponential_max=300000, - stop_max_delay=1800000) - def describe_cluster(): - return client.describe_cluster(ClusterId=self.job_flow_id) - - result = describe_cluster() - status = result['Cluster']["Status"]["State"] - - if status == 'TERMINATED_WITH_ERRORS': - reason_code = result['Cluster']['Status']['StateChangeReason']['Code'] - reason_message = result['Cluster']['Status']['StateChangeReason']['Message'] - step_logs = self.get_failed_step_logs() - spark_log_location = self.get_spark_log_location() - - raise AirflowException( - u'Spark job {} terminated with errors: {} - {}
'
-                    'Cluster Stderr: {}'
-                    'Cluster Stdout: {}
' - 'Spark Driver Log Location: {}' - .format(self.job_name, reason_code, reason_message, - step_logs['stderr'], step_logs['stdout'], spark_log_location) - .encode('ascii', 'replace') - ) - elif status == 'TERMINATED': - break - elif status == 'WAITING': - raise AirflowException( - 'Spark job {} is waiting'.format(self.job_name) - ) - - logging.info( - "Spark Job '{}' status' is {}".format(self.job_name, status) - ) - time.sleep(300) - - def _log_uri(self): - return ('s3://{}/logs/{}/{}/' - .format(EMRSparkOperator.airflow_bucket, - self.owner, - self.job_name)) - - def get_failed_step_logs(self): - logs = {'stdout': '', 'stderr': ''} - - try: - emr_client = boto3.client('emr', region_name=EMRSparkOperator.region) - response = emr_client.list_steps(ClusterId=self.job_flow_id, StepStates=['FAILED']) - step_file_loc = urlparse(response['Steps'][0]['Status']['FailureDetails']['LogFile']) - bucket = step_file_loc.netloc - base_key = step_file_loc.path.lstrip('/') - s3 = boto3.client('s3') - - for logfile in ['stdout', 'stderr']: - try: - obj = s3.get_object(Bucket=bucket, Key=base_key + logfile + '.gz') - bytestream = BytesIO(obj['Body'].read()) - logs[logfile] = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8') - except Exception as e: - logging.warn("Getting {} failed with {}".format(logfile, e)) - except Exception as e: - logging.warn("Getting step logs failed with {}".format(e)) - return logs - - def get_spark_log_location(self): - try: - emr_client = boto3.client('emr', region_name=EMRSparkOperator.region) - response = emr_client.list_instances(ClusterId=self.job_flow_id, InstanceGroupTypes=['MASTER']) - master = response['Instances'][0]['Ec2InstanceId'] - return '{}{}/node/{}/applications/spark/spark.log.gz'.format(self._log_uri(), self.job_flow_id, master) - except Exception as e: - logging.warn("Exception while getting spark log location: {}".format(e)) - return '' diff --git a/dags/parquet_export.py b/dags/parquet_export.py index 3939c4544..d37fe190f 100644 --- a/dags/parquet_export.py +++ b/dags/parquet_export.py @@ -11,7 +11,6 @@ moz_dataproc_jar_runner, get_dataproc_parameters, ) -from utils.tbv import tbv_envvar from utils.gcp import ( bigquery_etl_query, bigquery_etl_copy_deduplicate, diff --git a/dags/utils/mozetl.py b/dags/utils/mozetl.py deleted file mode 100644 index 208f0b82d..000000000 --- a/dags/utils/mozetl.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Utility functions for launching mozetl jobs""" -from operators.emr_spark_operator import EMRSparkOperator - - -def mozetl_envvar(command, options, dev_options={}, other={}): - """Set up environment variables for mozetl jobs. - - The command line interface can read options from the environment instead of usage - flags, through a library called Click. All environment variables must be prefixed - by the command and subcommand names. For example, a job registered with mozetl with - the name `example` taking a `--date` option can use `MOZETL_EXAMPLE_DATE` instead. 
- - :command string: name of the command registered with python_mozel - :options dict: environment variables to prefix - :dev_options dict: variables to use when in the development environment - :other dict: environment variables to pass through - - :returns: a dictionary that contains properly prefixed command and options - """ - - if EMRSparkOperator.deploy_environment == 'dev': - options.update(dev_options) - - prefixed_options = { - "MOZETL_{}_{}".format(command.upper(), key.upper().replace("-", "_")): value - for key, value in options.items() - } - prefixed_options["MOZETL_COMMAND"] = command - prefixed_options.update(other) - - return prefixed_options diff --git a/dags/utils/tbv.py b/dags/utils/tbv.py deleted file mode 100644 index 4802344a7..000000000 --- a/dags/utils/tbv.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Utility functions for launching telemetry-batch-view jobs""" - -from operators.emr_spark_operator import EMRSparkOperator -from utils.deploy import get_artifact_url - - -def tbv_envvar(klass, options, dev_options={}, branch=None, tag=None, other={}, - artifact_url=None): - """Set up environment variables for telemetry-batch-view jobs. - - The command line interface can read options from the environment. All - environment variables must be prefixed by `TBV_`. For example, a class in - telemetry-batch-view taking a `--date` option can use `TBV_DATE` instead. - There is a limitation that spaces cannot be in environment variables, so - ValueError is thrown if spaces are found outside templating brackets. - - :klass string: name of the class in telemetry-batch-view - :options dict: environment variables to prefix - :dev_options dict: variables to use when in the development environment - :branch string: the branch to run the job from, incompatible with tag - :tag string: the tag to run the job from, incompatible with branch - :other dict: environment variables to pass through - :artifact_url string: Location of pre-built binaries - - :returns: a dictionary that contains properly prefixed class and options - """ - if artifact_url is None: - slug = "{{ task.__class__.telemetry_batch_view_slug }}" - url = get_artifact_url(slug, branch=branch, tag=tag) - else: - url = artifact_url - - if EMRSparkOperator.deploy_environment == 'dev': - options.update(dev_options) - - prefixed_options = { - "TBV_{}".format(key.replace("-", "_")): value - for key, value in options.items() - } - - if klass is not None: - prefixed_options["TBV_CLASS"] = klass - else: - assert other.get("DO_SUBMIT", "True") == "False", "To submit there must be a class name" - - prefixed_options["ARTIFACT_URL"] = url - prefixed_options.update(other) - - # raise ValueError if spaces found in non-templated envvar values - for item in prefixed_options.values(): - if "{{" not in item and " " in item: - raise ValueError("env cannot contain spaces: '{}'".format(item)) - - return prefixed_options From 484057f5e1d125a4dc21adecc134194fef9cdf33 Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 13:21:53 -0800 Subject: [PATCH 4/8] Remove tox.ini and unused docker variables --- Dockerfile | 14 +------------- Dockerfile.dev | 14 +------------- README.md | 11 +---------- docker-compose.yml | 1 - tox.ini | 32 -------------------------------- 5 files changed, 3 insertions(+), 69 deletions(-) delete mode 100644 tox.ini diff --git a/Dockerfile b/Dockerfile index 7090fb2c7..ddbfda1b5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,22 +42,10 @@ USER 10001 ENV PYTHONUNBUFFERED=1 \ PORT=8000 - # AWS_REGION= \ # 
AWS_ACCESS_KEY_ID= \ # AWS_SECRET_ACCESS_KEY= \ - # SPARK_BUCKET= \ - # AIRFLOW_BUCKET= \ - # PRIVATE_OUTPUT_BUCKET= \ - # PUBLIC_OUTPUT_BUCKET= \ - # EMR_KEY_NAME= \ - # EMR_FLOW_ROLE= \ - # EMR_SERVICE_ROLE= \ - # EMR_INSTANCE_TYPE= \ # DEPLOY_ENVIRONMENT = \ - # DEPLOY_TAG = \ - # ARTIFACTS_BUCKET = \ - # DATABRICKS_DEFAULT_IAM \ - + # DEPLOY_TAG = ENV AIRFLOW_HOME=/app \ AIRFLOW_EMAIL_BACKEND="airflow.utils.email.send_email_smtp" diff --git a/Dockerfile.dev b/Dockerfile.dev index 9543cf81a..6847526c8 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -37,22 +37,10 @@ COPY ./config/ /app USER 10001 ENV PYTHONUNBUFFERED=1 \ - AWS_REGION=us-west-2 \ - SPARK_BUCKET=telemetry-spark-emr-2 \ - AIRFLOW_BUCKET=telemetry-airflow \ - PRIVATE_OUTPUT_BUCKET=telemetry-test-bucket \ - PUBLIC_OUTPUT_BUCKET=telemetry-test-bucket \ - EMR_KEY_NAME=20161025-dataops-dev \ - EMR_FLOW_ROLE=telemetry-spark-cloudformation-TelemetrySparkInstanceProfile-1SATUBVEXG7E3 \ - EMR_SERVICE_ROLE=EMR_DevRole \ - EMR_INSTANCE_TYPE=c3.4xlarge \ PORT=8000 \ DEPLOY_ENVIRONMENT=dev \ DEVELOPMENT=1 \ - DEPLOY_TAG=master \ - ARTIFACTS_BUCKET=net-mozaws-data-us-west-2-ops-ci-artifacts \ - DATABRICKS_DEFAULT_IAM=arn:aws:iam::144996185633:instance-profile/databricks-ec2 - # DEV_USERNAME= + DEPLOY_TAG=master # AWS_ACCESS_KEY_ID= # AWS_SECRET_ACCESS_KEY= diff --git a/README.md b/README.md index 90ca3bcb0..6c9de5dbf 100644 --- a/README.md +++ b/README.md @@ -145,14 +145,6 @@ variables: - `AWS_ACCESS_KEY_ID` -- The AWS access key ID to spin up the Spark clusters - `AWS_SECRET_ACCESS_KEY` -- The AWS secret access key -- `SPARK_BUCKET` -- The AWS S3 bucket where Spark related files are stored, - e.g. `telemetry-spark-emr-2` -- `AIRFLOW_BUCKET` -- The AWS S3 bucket where airflow specific files are stored, - e.g. `telemetry-airflow` -- `PUBLIC_OUTPUT_BUCKET` -- The AWS S3 bucket where public job results are - stored in, e.g. `telemetry-public-analysis-2` -- `PRIVATE_OUTPUT_BUCKET` -- The AWS S3 bucket where private job results are - stored in, e.g. `telemetry-parquet` - `AIRFLOW_DATABASE_URL` -- The connection URI for the Airflow database, e.g. `mysql://username:password@hostname:port/database` - `AIRFLOW_BROKER_URL` -- The connection URI for the Airflow worker queue, e.g. @@ -179,8 +171,7 @@ variables: `master` or `tags`. You can specify the tag or travis build exactly as well, e.g. `master/42.1` or `tags/v2.2.1`. Not specifying the exact tag or build will use the latest from that branch, or the latest tag. -- `ARTIFACTS_BUCKET` -- The s3 bucket where the build artifacts can be found, e.g. 
- `net-mozaws-data-us-west-2-ops-ci-artifacts` + Also, please set diff --git a/docker-compose.yml b/docker-compose.yml index 3edee76a1..d7fe699a9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -32,7 +32,6 @@ services: environment: - AWS_SECRET_ACCESS_KEY - AWS_ACCESS_KEY_ID - - DB_TOKEN - GOOGLE_APPLICATION_CREDENTIALS=/app/.credentials web: diff --git a/tox.ini b/tox.ini deleted file mode 100644 index 94d32ec95..000000000 --- a/tox.ini +++ /dev/null @@ -1,32 +0,0 @@ -[tox] -envlist = py37 -skipsdist = True - -[testenv] -deps= - pytest - pytest-mock - moto == 1.3.13 - -r{toxinidir}/requirements.txt -setenv = - PYTHONPATH = {toxinidir}/plugins:{toxinidir}/dags - AIRFLOW__CORE__SQL_ALCHEMY_CONN = sqlite:///{toxworkdir}/airflow.db - AIRFLOW__CORE__UNIT_TEST_MODE = True - AWS_ACCESS_KEY_ID=test - AWS_SECRET_ACCESS_KEY=test - # required by test_moz_databricks_operator - AWS_REGION = test - SPARK_BUCKET = test - PRIVATE_OUTPUT_BUCKET = test - PUBLIC_OUTPUT_BUCKET = test - DEPLOY_ENVIRONMENT = dev - DEPLOY_TAG = dev - ARTIFACTS_BUCKET = test - DATABRICKS_DEFAULT_IAM = test - EMR_INSTANCE_TYPE = test - AIRFLOW__CORE__FERNET_KEY="0000000000000000000000000000000000000000000=" - SLUGIFY_USES_TEXT_UNIDECODE = yes - -commands= - airflow resetdb -y - python -m pytest {posargs} From fa84c59da7b9261a4bd9cfa0048493c680256503 Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 13:22:35 -0800 Subject: [PATCH 5/8] Remove email on schema change operator --- .../operators/email_schema_change_operator.py | 72 ------------------- dags/parquet_export.py | 1 - 2 files changed, 73 deletions(-) delete mode 100644 dags/operators/email_schema_change_operator.py diff --git a/dags/operators/email_schema_change_operator.py b/dags/operators/email_schema_change_operator.py deleted file mode 100644 index e351108c3..000000000 --- a/dags/operators/email_schema_change_operator.py +++ /dev/null @@ -1,72 +0,0 @@ -import boto3 -import logging -import re -from airflow.exceptions import AirflowException -from airflow.operators.email_operator import EmailOperator -from airflow.utils.decorators import apply_defaults -from difflib import unified_diff -from os import environ - - -class EmailSchemaChangeOperator(EmailOperator): - """ - Execute a Spark job on EMR and send email on schema changes. - - :param key_prefix: The s3 prefix within bucket containing schemas. - :type key_prefix: string - - :param bucket: The name of the s3 bucket containing schemas. - :type bucket: string - - :param latest_schema_key: The s3 key after key_prefix for the latest schema. - :type latest_schema_key: string - - :param previous_schema_key: The s3 key after key_prefix for the previous schema. - :type previous_schema_key: string - - And all EmailOperator params. The diff will be appended to html_content. - """ - template_fields = EmailOperator.template_fields + ( - 'bucket', - 'key_prefix', - 'latest_schema_key', - 'previous_schema_key', - ) - - # s3 buckets from env - private_output_bucket = environ['PRIVATE_OUTPUT_BUCKET'] - public_output_bucket = environ['PUBLIC_OUTPUT_BUCKET'] - - @apply_defaults - def __init__(self, key_prefix, bucket='{{ task.__class__.private_output_bucket }}', - latest_schema_key='{{ ds_nodash }}', - previous_schema_key='{{ yesterday_ds_nodash }}', - html_content='schema diff between {{ yesterday_ds }} and {{ ds }}:
', - subject='Airflow: Schema change between {{ yesterday_ds }} and {{ ds }}: {{ ti }}', - *args, **kwargs): - super(EmailSchemaChangeOperator, self).__init__( - subject=subject, html_content=html_content, *args, **kwargs) - self.bucket = bucket - self.key_prefix = key_prefix - self.latest_schema_key = latest_schema_key - self.previous_schema_key = previous_schema_key - - def execute(self, context): - client = boto3.client('s3') - try: - previous = client.get_object( - Key=self.key_prefix + self.previous_schema_key, - Bucket=self.bucket)['Body'].read().splitlines(True) - except client.exceptions.NoSuchKey: - logging.info('previous_schema not found, skipping change detection') - return # if there's no previous schema, don't try to compare - latest = client.get_object( - Key=self.key_prefix + self.latest_schema_key, - Bucket=self.bucket)['Body'].read().splitlines(True) - diff = list(unified_diff(previous, latest)) - if diff: - logging.info('schema change detected, sending email') - self.html_content += ''.join(diff).replace('\n', '
') - super(EmailSchemaChangeOperator, self).execute(context) - else: - logging.info('no schema change detected') diff --git a/dags/parquet_export.py b/dags/parquet_export.py index d37fe190f..7cec316b9 100644 --- a/dags/parquet_export.py +++ b/dags/parquet_export.py @@ -5,7 +5,6 @@ from airflow.operators.moz_databricks import MozDatabricksSubmitRunOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.sensors import ExternalTaskSensor -from operators.email_schema_change_operator import EmailSchemaChangeOperator from utils.dataproc import ( moz_dataproc_pyspark_runner, moz_dataproc_jar_runner, From 1079c82f0aabd23726f14ccddc895c6a1a79ae47 Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 13:23:39 -0800 Subject: [PATCH 6/8] Add utility script for exporting AWS credentials into env --- bin/export_aws_credentials | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 bin/export_aws_credentials diff --git a/bin/export_aws_credentials b/bin/export_aws_credentials new file mode 100644 index 000000000..12490a843 --- /dev/null +++ b/bin/export_aws_credentials @@ -0,0 +1,8 @@ +#!/usr/bin/env bash +# Exports credentials from an aws credentials file into the environment so it +# may be passed into the airflow container. +# Usage: `source bin/export_aws_credentials` + +keys=$(grep default -A2 < ~/.aws/credentials | tail -n2 | awk '{print $3}') +export AWS_ACCESS_KEY_ID=$(echo $keys | head -n1) +export AWS_SECRET_ACCESS_KEY=$(echo $keys | tail -n1) From 67326078a91de7be75b58ca2e8347caa1172043c Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 13:59:42 -0800 Subject: [PATCH 7/8] Fix broken imports --- dags/mozaggregator_release.py | 2 ++ dags/parquet_export.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dags/mozaggregator_release.py b/dags/mozaggregator_release.py index 8d1808f7d..991645f46 100644 --- a/dags/mozaggregator_release.py +++ b/dags/mozaggregator_release.py @@ -1,6 +1,8 @@ from airflow import DAG from datetime import datetime, timedelta +from airflow.operators.dummy_operator import DummyOperator + default_args = { "owner": "frank@mozilla.com", "depends_on_past": True, diff --git a/dags/parquet_export.py b/dags/parquet_export.py index 7cec316b9..54ce6afda 100644 --- a/dags/parquet_export.py +++ b/dags/parquet_export.py @@ -2,7 +2,6 @@ from datetime import datetime, timedelta from airflow.contrib.hooks.aws_hook import AwsHook from airflow.executors import get_default_executor -from airflow.operators.moz_databricks import MozDatabricksSubmitRunOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.sensors import ExternalTaskSensor from utils.dataproc import ( From 6200120d129aa83fb9c78697637870f95bfe68f7 Mon Sep 17 00:00:00 2001 From: Anthony Miyaguchi Date: Fri, 29 Jan 2021 13:59:58 -0800 Subject: [PATCH 8/8] Fix docker-compose.yml file with invalid values --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index d7fe699a9..65099344b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,7 +51,7 @@ services: extends: service: app restart: always - mem_limit: 4294967296 + mem_limit: "4294967296" ports: - "8793:8793" links: @@ -60,7 +60,7 @@ services: command: worker scheduler: - mem_limit: 4294967296 + mem_limit: "4294967296" extends: service: app restart: always