Provide automatic URL tracking for Spark applications (#2669)
- renamed `track_url_in_stderr` to `stream_for_searching_tracking_url`, in line with the latest changes in external_program.py
- added tests for Spark URL tracking in cluster and client modes
GoodDok authored and Tarrasch committed Mar 28, 2019
1 parent 44f89a2 commit 62fbaf2
Showing 2 changed files with 40 additions and 4 deletions.
luigi/contrib/spark.py (2 changes: 1 addition & 1 deletion)
@@ -54,7 +54,7 @@ class SparkSubmitTask(ExternalProgramTask):
     # Only log stderr if spark fails (since stderr is normally quite verbose)
     always_log_stderr = False
     # Spark applications write its logs into stderr
-    track_url_in_stderr = True
+    stream_for_searching_tracking_url = 'stderr'
 
     def run(self):
         if self.deploy_mode == "cluster":
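Note: the renamed attribute follows the generalized contract in external_program.py, where a task declares which output stream should be scanned for a tracking URL. A minimal sketch of that contract, for orientation only (a hypothetical excerpt, not the actual luigi implementation; the helper name scan_line_for_tracking_url and its wiring are illustrative assumptions):

    import re

    # Illustrative sketch: ExternalProgramTask reads
    # `stream_for_searching_tracking_url` ('stdout' or 'stderr') to decide
    # which stream to scan, applies the task's `tracking_url_pattern` regex
    # to each log line, and reports any match via `set_tracking_url`.
    def scan_line_for_tracking_url(task, stream_name, line):
        if stream_name != task.stream_for_searching_tracking_url:
            return
        pattern = task.tracking_url_pattern  # regex with the URL in group 1
        if pattern is None:
            return
        match = re.search(pattern, line)
        if match:
            task.set_tracking_url(match.group(1))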
test/contrib/spark_test.py (42 changes: 39 additions & 3 deletions)
@@ -26,7 +26,10 @@
 from helpers import with_config, temporary_unloaded_module
 from luigi.contrib.external_program import ExternalProgramRunError
 from luigi.contrib.spark import SparkSubmitTask, PySparkTask
-from mock import patch, call, MagicMock
+from mock import mock, patch, call, MagicMock
+from functools import partial
+from multiprocessing import Value
+from subprocess import Popen
 
 from nose.plugins.attrib import attr
 
@@ -47,7 +50,6 @@ def setup_run_process(proc):


 class TestSparkSubmitTask(SparkSubmitTask):
-    deploy_mode = "client"
     name = "AppName"
     entry_class = "org.test.MyClass"
     jars = ["jars/my.jar"]
@@ -99,7 +101,7 @@ def main(self, sc, *args):
 class SparkSubmitTaskTest(unittest.TestCase):
     ss = 'ss-stub'
 
-    @with_config({'spark': {'spark-submit': ss, 'master': "yarn-client", 'hadoop-conf-dir': 'path'}})
+    @with_config({'spark': {'spark-submit': ss, 'master': "yarn-client", 'hadoop-conf-dir': 'path', 'deploy-mode': 'client'}})
     @patch('luigi.contrib.external_program.subprocess.Popen')
     def test_run(self, proc):
         setup_run_process(proc)
@@ -187,6 +189,40 @@ def interrupt():
             pass
         proc.return_value.kill.check_called()
 
+    @with_config({'spark': {'deploy-mode': 'client'}})
+    def test_tracking_url_is_found_in_stderr_client_mode(self):
+        test_val = Value('i', 0)
+
+        def fake_set_tracking_url(val, url):
+            if url == "http://10.66.76.155:4040":
+                val.value += 1
+
+        def Popen_wrap(args, **kwargs):
+            return Popen('>&2 echo "INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.66.76.155:4040"', shell=True, **kwargs)
+
+        task = TestSparkSubmitTask()
+        with mock.patch('luigi.contrib.external_program.subprocess.Popen', wraps=Popen_wrap):
+            with mock.patch.object(task, 'set_tracking_url', new=partial(fake_set_tracking_url, test_val)):
+                task.run()
+        self.assertEqual(test_val.value, 1)
+
+    @with_config({'spark': {'deploy-mode': 'cluster'}})
+    def test_tracking_url_is_found_in_stderr_cluster_mode(self):
+        test_val = Value('i', 0)
+
+        def fake_set_tracking_url(val, url):
+            if url == "https://127.0.0.1:4040":
+                val.value += 1
+
+        def Popen_wrap(args, **kwargs):
+            return Popen('>&2 echo "tracking URL: https://127.0.0.1:4040"', shell=True, **kwargs)
+
+        task = TestSparkSubmitTask()
+        with mock.patch('luigi.contrib.external_program.subprocess.Popen', wraps=Popen_wrap):
+            with mock.patch.object(task, 'set_tracking_url', new=partial(fake_set_tracking_url, test_val)):
+                task.run()
+        self.assertEqual(test_val.value, 1)
+
 
 @attr('apache')
 class PySparkTaskTest(unittest.TestCase):
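The new tests stub subprocess.Popen with a shell command that echoes a Spark-like log line to stderr, so the URL-scanning machinery gets realistic input without launching Spark. A standalone, runnable illustration of that trick (not part of the commit; assumes a POSIX shell):

    from subprocess import PIPE, Popen

    # `>&2 echo ...` redirects echo's output to stderr, mimicking where a
    # real spark-submit writes its logs.
    proc = Popen('>&2 echo "tracking URL: https://127.0.0.1:4040"',
                 shell=True, stderr=PIPE, universal_newlines=True)
    _, stderr = proc.communicate()
    assert "tracking URL: https://127.0.0.1:4040" in stderr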
