Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions sling/sling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,34 +492,40 @@ def cli(*args, return_output=False):
return 0


def _exec_cmd(cmd, stdin=None, stdout=PIPE, stderr=STDOUT, env:dict=None):
lines = []

env = env or {}
for k,v in os.environ.items():
env[k] = env.get(k, v)

env['SLING_PACKAGE'] = 'python'
for pkg in ['dagster', 'airflow', 'temporal', 'orkes']:
if is_package(pkg):
env['SLING_PACKAGE'] = pkg

with Popen(cmd, shell=True, env=env, stdin=stdin, stdout=stdout, stderr=stderr) as proc:
if stdout and stdout != STDOUT and proc.stdout:
for line in proc.stdout:
line = str(line.strip(), 'utf-8', errors='replace')
yield line
def _exec_cmd(
cmd: str, stdin=None, stdout=PIPE, stderr=STDOUT, env: dict = None
):
lines: list[str] = []

proc.wait()

if stderr and stderr != STDOUT and proc.stderr:
lines = '\n'.join(list(proc.stderr))

if proc.returncode != 0:
if len(lines) > 0:
raise Exception(f'Sling command failed:\n{lines}')
raise Exception(f'Sling command failed')
env = env or {}
for k, v in os.environ.items():
env[k] = env.get(k, v)

env["SLING_PACKAGE"] = "python"
for pkg in ["dagster", "airflow", "temporal", "orkes"]:
if is_package(pkg):
env["SLING_PACKAGE"] = pkg

with Popen(
cmd, shell=True, env=env, stdin=stdin, stdout=stdout, stderr=stderr
) as proc:
if stdout and stdout != STDOUT and proc.stdout:
for line in proc.stdout:
line = str(line.strip(), "utf-8", errors="replace")
lines.append(line)
yield line

proc.wait()

if stderr and stderr != STDOUT and proc.stderr:
lines.extend(
str(line.strip(), "utf-8", errors="replace") for line in proc.stderr
)

if proc.returncode != 0:
if len(lines) > 0:
raise Exception("Sling command failed:\n" + "\n".join(lines))
raise Exception("Sling command failed")


class SlingError(Exception):
Expand Down
17 changes: 17 additions & 0 deletions sling/tests/test_sling_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,23 @@ def test_stream_arrow_from_polars_input(self, temp_dir, sample_data):
assert abs(actual - expected) < 0.001, f"Salary mismatch at index {i}: {actual} != {expected}"


class TestExecCmd:
"""Tests for the internal command execution helper."""

def test_exec_cmd_includes_stdout_on_error(self):
"""When a subprocess prints an error to STDOUT and exits non-zero
the helper should include that output in the raised exception.
"""
from sling import _exec_cmd

cmd = "bash -c 'echo fatal: invalid stream; exit 1'"

with pytest.raises(Exception) as excinfo:
list(_exec_cmd(cmd))

assert 'fatal:' in str(excinfo.value)


if __name__ == "__main__":
# Run tests if executed directly
pytest.main([__file__, "-v"])