Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 60 additions & 17 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "lazy-ecs"
version = "0.7.1"
version = "0.7.2"
description = "A CLI tool for working with AWS services"
readme = "README.md"
authors = [
Expand Down Expand Up @@ -55,28 +55,71 @@ target-version = "py311"
line-length = 120

[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"ANN", # flake8-annotations - require type annotations
"N", # naming conventions
"SIM", # simplify
"RET", # return values
"ARG", # unused arguments
"PTH", # pathlib
"RUF", # ruff-specific rules
select = ["ALL"]
ignore = [
# Formatter conflicts
"COM812", # Trailing comma - conflicts with formatter
"ISC001", # Implicit string concatenation - conflicts with formatter

# Docstrings - too verbose for this project style
"D100",
"D101",
"D102",
"D103",
"D104",
"D105",
"D107",
"D203",
"D213", # Incompatible docstring rules
"D401", # First line should be imperative - we prefer descriptive

# Line length - we use 120, not 88
"E501", # Handle with line-length = 120 instead

# Not applicable for CLI tool
"TID252", # Relative imports are fine in our structure
"TC001",
"TC002",
"TC003", # Type checking imports - not worth the complexity

# Too restrictive
"FBT001", # Boolean positional args are OK
"S110", # Try/except pass - sometimes needed
"SLF001", # Private member access - needed for testing
"PLC0415", # Import at top - sometimes needed for dynamic imports
"BLE001", # Blind except - acceptable for top-level error handling
"PLR2004", # Magic values - small numbers in comparisons are clear
"DTZ006",
"DTZ901", # Datetime timezone - not critical for this tool
"PERF401", # List comprehensions - readability over micro-optimizations
"TRY300",
"TRY301", # Try/except else blocks - not needed

# Complexity - will address selectively
"C901", # Function complexity
"PLR0911", # Too many return statements
"PLR0912", # Too many branches
"PLR0913", # Too many arguments
"PLR0915", # Too many statements
]
ignore = []

[tool.ruff.lint.per-file-ignores]
"tests/**" = [
"ANN001", # Missing type annotation for function argument (test fixtures/mocks)
"ANN201", # Missing return type annotation for public function (test fixtures)
"S101", # Use of assert - that's the point of tests!
"PLR0913", # Too many arguments in test functions
"PLR2004", # Magic values in tests are fine for readability
"DTZ001", # Datetime without timezone - test data doesn't need it
"PT019", # Fixture without value - sometimes needed for side effects
"PGH003", # Type ignore without code - acceptable in tests
]
"scripts/**" = [
"T201", # Print statements - needed for script output
"PLC0415", # Import in function - needed for screenshot generation
"DTZ001", # Datetime without timezone - mock data is fine
"PGH003", # Type ignore without code - will fix if needed
"EXE001", # Shebang on non-executable - handled by uv run
]

[tool.pyrefly]
Expand Down
6 changes: 3 additions & 3 deletions scripts/generate_screenshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def generate_task_comparison_screenshot() -> None:
"environment": {"ENV": "staging", "DEBUG": "true"},
"cpu": 256,
"memory": 512,
}
},
],
"taskCpu": "256",
"taskMemory": "512",
Expand All @@ -91,7 +91,7 @@ def generate_task_comparison_screenshot() -> None:
"environment": {"ENV": "production", "LOG_LEVEL": "info"},
"cpu": 256,
"memory": 512,
}
},
],
"taskCpu": "512",
"taskMemory": "1024",
Expand Down Expand Up @@ -186,7 +186,7 @@ def generate_task_failure_screenshot() -> None:
"reason": "OutOfMemoryError: Java heap space",
"health_status": None,
"last_status": "STOPPED",
}
},
],
},
{
Expand Down
108 changes: 58 additions & 50 deletions src/lazy_ecs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,58 +46,46 @@ def main() -> None:

def _create_aws_client(profile_name: str | None) -> "ECSClient":
"""Create optimized AWS ECS client with connection pooling."""
# Optimized configuration for better performance
config = Config(
max_pool_connections=5, # Increase from default 1, but keep reasonable for CLI
retries={
"max_attempts": 2, # Reduce from default 3 for faster failure
"mode": "adaptive",
},
max_pool_connections=5,
retries={"max_attempts": 2, "mode": "adaptive"},
)

if profile_name:
session = boto3.Session(profile_name=profile_name)
return session.client("ecs", config=config)
return boto3.client("ecs", config=config)
session = boto3.Session(profile_name=profile_name) if profile_name else boto3
return session.client("ecs", config=config)


def _create_logs_client(profile_name: str | None) -> "CloudWatchLogsClient":
"""Create optimized CloudWatch Logs client with connection pooling."""
config = Config(
max_pool_connections=5, # Same config as ECS client
max_pool_connections=5,
retries={"max_attempts": 2, "mode": "adaptive"},
)

if profile_name:
session = boto3.Session(profile_name=profile_name)
return session.client("logs", config=config)
return boto3.client("logs", config=config)
session = boto3.Session(profile_name=profile_name) if profile_name else boto3
return session.client("logs", config=config)


def _create_sts_client(profile_name: str | None) -> "STSClient":
"""Create optimized STS client with connection pooling."""
config = Config(
max_pool_connections=5, # Same config as ECS client
max_pool_connections=5,
retries={"max_attempts": 2, "mode": "adaptive"},
)

if profile_name:
session = boto3.Session(profile_name=profile_name)
return session.client("sts", config=config)
return boto3.client("sts", config=config)
session = boto3.Session(profile_name=profile_name) if profile_name else boto3
return session.client("sts", config=config)


def _create_cloudwatch_client(profile_name: str | None) -> "CloudWatchClient":
"""Create optimized CloudWatch client with connection pooling."""
config = Config(
max_pool_connections=5, # Same config as ECS client
max_pool_connections=5,
retries={"max_attempts": 2, "mode": "adaptive"},
)

if profile_name:
session = boto3.Session(profile_name=profile_name)
return session.client("cloudwatch", config=config)
return boto3.client("cloudwatch", config=config)
session = boto3.Session(profile_name=profile_name) if profile_name else boto3
return session.client("cloudwatch", config=config)


def _navigate_clusters(navigator: ECSNavigator, ecs_service: ECSService) -> None:
Expand Down Expand Up @@ -168,41 +156,61 @@ def _navigate_services(navigator: ECSNavigator, ecs_service: ECSService, cluster
# Continue the loop to show the menu again


_CONTAINER_ACTIONS = {
"tail_logs": lambda nav, cluster, task_arn, container: nav.show_container_logs_live_tail(
cluster,
task_arn,
container,
),
"show_env": lambda nav, cluster, task_arn, container: nav.show_container_environment_variables(
cluster,
task_arn,
container,
),
"show_secrets": lambda nav, cluster, task_arn, container: nav.show_container_secrets(cluster, task_arn, container),
"show_ports": lambda nav, cluster, task_arn, container: nav.show_container_port_mappings(
cluster,
task_arn,
container,
),
"show_volumes": lambda nav, cluster, task_arn, container: nav.show_container_volume_mounts(
cluster,
task_arn,
container,
),
}

_TASK_ACTIONS = {
"show_history": lambda nav, cluster, service, _task_arn, _task_details: nav.show_task_history(cluster, service),
"show_details": lambda nav, _cluster, _service, _task_arn, task_details: nav.display_task_details(task_details),
"compare_definitions": lambda nav, _cluster, _service, _task_arn, task_details: nav.show_task_definition_comparison(
task_details,
),
"open_console": lambda nav, cluster, _service, task_arn, _task_details: nav.open_task_in_console(cluster, task_arn),
}


def _handle_task_features(
navigator: ECSNavigator, cluster_name: str, task_arn: str, task_details: TaskDetails | None, service_name: str
navigator: ECSNavigator,
cluster_name: str,
task_arn: str,
task_details: TaskDetails | None,
service_name: str,
) -> bool:
"""Handle task feature selection and execution. Returns True if back was chosen, False if exit."""
while True:
selection = navigator.select_task_feature(task_details)

# Handle navigation responses
should_continue, should_exit = handle_navigation(selection)
if not should_continue:
return not should_exit # True for back, False for exit
return not should_exit

selection_type, action_name, container_name = parse_selection(selection)
if selection_type == "container_action":
# Map action names to methods
action_methods = {
"tail_logs": navigator.show_container_logs_live_tail,
"show_env": navigator.show_container_environment_variables,
"show_secrets": navigator.show_container_secrets,
"show_ports": navigator.show_container_port_mappings,
"show_volumes": navigator.show_container_volume_mounts,
}

if action_name in action_methods:
action_methods[action_name](cluster_name, task_arn, container_name)

elif selection_type == "task_action":
if action_name == "show_history":
navigator.show_task_history(cluster_name, service_name)
elif action_name == "show_details":
navigator.display_task_details(task_details)
elif action_name == "compare_definitions":
navigator.show_task_definition_comparison(task_details)
elif action_name == "open_console":
navigator.open_task_in_console(cluster_name, task_arn)

if selection_type == "container_action" and action_name in _CONTAINER_ACTIONS:
_CONTAINER_ACTIONS[action_name](navigator, cluster_name, task_arn, container_name)
elif selection_type == "task_action" and action_name in _TASK_ACTIONS:
_TASK_ACTIONS[action_name](navigator, cluster_name, service_name, task_arn, task_details)


if __name__ == "__main__":
Expand Down
44 changes: 35 additions & 9 deletions src/lazy_ecs/aws_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ def get_tasks(self, cluster_name: str, service_name: str) -> list[str]:
return self._task.get_tasks(cluster_name, service_name)

def _with_desired_task_definition(
self, cluster_name: str, service_name: str, operation: Callable[[str | None], Any]
self,
cluster_name: str,
service_name: str,
operation: Callable[[str | None], Any],
) -> Any: # noqa: ANN401
"""Helper to reduce repetition in task operations that need desired task definition."""
desired_task_def_arn = self._service.get_desired_task_definition_arn(cluster_name, service_name)
Expand All @@ -66,13 +69,17 @@ def _with_desired_task_definition(
def get_task_info(self, cluster_name: str, service_name: str) -> list[TaskInfo]:
"""Get detailed task information with human-readable names."""
return self._with_desired_task_definition(
cluster_name, service_name, lambda arn: self._task.get_task_info(cluster_name, service_name, arn)
cluster_name,
service_name,
lambda arn: self._task.get_task_info(cluster_name, service_name, arn),
)

def get_task_details(self, cluster_name: str, service_name: str, task_arn: str) -> TaskDetails | None:
"""Get comprehensive task details."""
return self._with_desired_task_definition(
cluster_name, service_name, lambda arn: self._task.get_task_details(cluster_name, task_arn, arn)
cluster_name,
service_name,
lambda arn: self._task.get_task_details(cluster_name, task_arn, arn),
)

def get_log_config(self, cluster_name: str, task_arn: str, container_name: str) -> LogConfig | None:
Expand All @@ -84,7 +91,10 @@ def get_container_logs(self, log_group: str, log_stream: str, lines: int = 50) -
return self._container.get_container_logs(log_group, log_stream, lines)

def get_live_container_logs_tail(
self, log_group: str, log_stream: str, event_filter_pattern: str = ""
self,
log_group: str,
log_stream: str,
event_filter_pattern: str = "",
) -> Generator[StartLiveTailResponseStreamTypeDef | LiveTailSessionLogEventTypeDef]:
"""Tail container logs in real time from CloudWatch."""
return self._container.get_live_container_logs_tail(log_group, log_stream, event_filter_pattern)
Expand All @@ -94,7 +104,11 @@ def list_log_groups(self, cluster_name: str, container_name: str) -> list[str]:
return self._container.list_log_groups(cluster_name, container_name)

def _with_container_context(
self, cluster_name: str, task_arn: str, container_name: str, operation: Callable[[Any], Any]
self,
cluster_name: str,
task_arn: str,
container_name: str,
operation: Callable[[Any], Any],
) -> Any: # noqa: ANN401
"""Helper to reduce repetition in container operations."""
context = self._container.get_container_context(cluster_name, task_arn, container_name)
Expand All @@ -103,25 +117,37 @@ def _with_container_context(
return operation(context)

def get_container_environment_variables(
self, cluster_name: str, task_arn: str, container_name: str
self,
cluster_name: str,
task_arn: str,
container_name: str,
) -> dict[str, str] | None:
"""Get environment variables for a specific container in a task."""
return self._with_container_context(
cluster_name, task_arn, container_name, self._container.get_environment_variables
cluster_name,
task_arn,
container_name,
self._container.get_environment_variables,
)

def get_container_secrets(self, cluster_name: str, task_arn: str, container_name: str) -> dict[str, str] | None:
"""Get secrets configuration for a specific container in a task."""
return self._with_container_context(cluster_name, task_arn, container_name, self._container.get_secrets)

def get_container_port_mappings(
self, cluster_name: str, task_arn: str, container_name: str
self,
cluster_name: str,
task_arn: str,
container_name: str,
) -> list[dict[str, Any]] | None:
"""Get port mappings for a specific container in a task."""
return self._with_container_context(cluster_name, task_arn, container_name, self._container.get_port_mappings)

def get_container_volume_mounts(
self, cluster_name: str, task_arn: str, container_name: str
self,
cluster_name: str,
task_arn: str,
container_name: str,
) -> list[dict[str, Any]] | None:
"""Get volume mounts for a specific container in a task."""
return self._with_container_context(cluster_name, task_arn, container_name, self._container.get_volume_mounts)
Expand Down
Loading
Loading