9 changes: 9 additions & 0 deletions CHANGES
@@ -33,6 +33,15 @@ $ uvx --from 'vcspull' --prerelease allow vcspull

_Upcoming changes will be written here._

### Bug Fixes

#### `vcspull add --no-merge` preserves duplicate workspace roots (#482)

- Duplicate workspace sections are no longer flattened when adding a repository
  with `--no-merge`; all previously configured repositories stay intact (see the
  sketch below).
- The configuration loader now exposes ordered duplicate entries so CLI writes
  can target the correct section without data loss.
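
A minimal sketch of the fixed behavior (the workspace path, repository names,
and URLs below are hypothetical):

```python
# Hypothetical ~/.vcspull.yaml that deliberately repeats a workspace root:
config_text = """\
~/code/:
  repo-a:
    repo: git+https://github.com/example/repo-a.git
~/code/:
  repo-b:
    repo: git+https://github.com/example/repo-b.git
"""
# Previously, `vcspull add --no-merge <name> <url>` rewrote this file with a
# single `~/code/:` section, silently dropping `repo-a`. Both sections (and
# every existing repository) now survive the write, and the new entry is
# appended to the last section carrying the target label.
```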

## vcspull v1.44.0 (2025-11-02)

### Improvements
39 changes: 29 additions & 10 deletions src/vcspull/_internal/config_reader.py
@@ -228,6 +228,7 @@ def __init__(self, stream: str) -> None:
super().__init__(stream)
self.top_level_key_values: dict[t.Any, list[t.Any]] = {}
self._mapping_depth = 0
self.top_level_items: list[tuple[t.Any, t.Any]] = []


def _duplicate_tracking_construct_mapping(
@@ -248,7 +249,9 @@ def _duplicate_tracking_construct_mapping(
value = construct(value_node)

if loader._mapping_depth == 1:
loader.top_level_key_values.setdefault(key, []).append(copy.deepcopy(value))
duplicated_value = copy.deepcopy(value)
loader.top_level_key_values.setdefault(key, []).append(duplicated_value)
loader.top_level_items.append((copy.deepcopy(key), duplicated_value))

mapping[key] = value

@@ -270,20 +273,27 @@ def __init__(
content: RawConfigData,
*,
duplicate_sections: dict[str, list[t.Any]] | None = None,
top_level_items: list[tuple[str, t.Any]] | None = None,
) -> None:
super().__init__(content)
self._duplicate_sections = duplicate_sections or {}
self._top_level_items = top_level_items or []

@property
def duplicate_sections(self) -> dict[str, list[t.Any]]:
"""Mapping of top-level keys to the list of duplicated values."""
return self._duplicate_sections

@property
def top_level_items(self) -> list[tuple[str, t.Any]]:
"""Ordered list of top-level items, including duplicates."""
return copy.deepcopy(self._top_level_items)

@classmethod
def _load_yaml_with_duplicates(
cls,
content: str,
) -> tuple[dict[str, t.Any], dict[str, list[t.Any]]]:
) -> tuple[dict[str, t.Any], dict[str, list[t.Any]], list[tuple[str, t.Any]]]:
loader = _DuplicateTrackingSafeLoader(content)

try:
@@ -306,33 +316,42 @@ def _load_yaml_with_duplicates(
if len(values) > 1
}

return loaded, duplicate_sections
top_level_items = [
(t.cast("str", key), copy.deepcopy(value))
for key, value in loader.top_level_items
]

return loaded, duplicate_sections, top_level_items

@classmethod
def _load_from_path(
cls,
path: pathlib.Path,
) -> tuple[dict[str, t.Any], dict[str, list[t.Any]]]:
) -> tuple[dict[str, t.Any], dict[str, list[t.Any]], list[tuple[str, t.Any]]]:
if path.suffix.lower() in {".yaml", ".yml"}:
content = path.read_text(encoding="utf-8")
return cls._load_yaml_with_duplicates(content)

return ConfigReader._from_file(path), {}
return ConfigReader._from_file(path), {}, []

@classmethod
def from_file(cls, path: pathlib.Path) -> DuplicateAwareConfigReader:
content, duplicate_sections = cls._load_from_path(path)
return cls(content, duplicate_sections=duplicate_sections)
content, duplicate_sections, top_level_items = cls._load_from_path(path)
return cls(
content,
duplicate_sections=duplicate_sections,
top_level_items=top_level_items,
)

@classmethod
def _from_file(cls, path: pathlib.Path) -> dict[str, t.Any]:
content, _ = cls._load_from_path(path)
content, _, _ = cls._load_from_path(path)
return content

@classmethod
def load_with_duplicates(
cls,
path: pathlib.Path,
) -> tuple[dict[str, t.Any], dict[str, list[t.Any]]]:
) -> tuple[dict[str, t.Any], dict[str, list[t.Any]], list[tuple[str, t.Any]]]:
reader = cls.from_file(path)
return reader.content, reader.duplicate_sections
return reader.content, reader.duplicate_sections, reader.top_level_items
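
Taken together, these loader changes expose three views of one file. A minimal
sketch of the new three-tuple return shape (the config content and temporary
file below are hypothetical):

```python
import pathlib
import tempfile

from vcspull._internal.config_reader import DuplicateAwareConfigReader

# A config that deliberately repeats the "~/study/" workspace root.
yaml_text = """\
~/study/:
  alpha:
    repo: git+https://example.com/alpha.git
~/study/:
  beta:
    repo: git+https://example.com/beta.git
"""

with tempfile.NamedTemporaryFile(
    "w", suffix=".yaml", delete=False, encoding="utf-8"
) as fh:
    fh.write(yaml_text)
    path = pathlib.Path(fh.name)

content, duplicates, items = DuplicateAwareConfigReader.load_with_duplicates(path)

# `content` is the plain mapping view, where a later duplicate key shadows
# an earlier one under standard PyYAML mapping construction.
# `duplicates` maps each repeated top-level key to all of its values.
# `items` preserves every top-level (key, value) pair in document order,
# which is what lets CLI writes round-trip duplicates without flattening.
assert [label for label, _ in items] == ["~/study/", "~/study/"]
assert list(duplicates) == ["~/study/"]
```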
60 changes: 56 additions & 4 deletions src/vcspull/cli/add.py
@@ -2,6 +2,7 @@

from __future__ import annotations

import copy
import logging
import pathlib
import subprocess
@@ -18,6 +19,7 @@
merge_duplicate_workspace_roots,
normalize_workspace_roots,
save_config_yaml,
save_config_yaml_with_items,
workspace_root_label,
)
from vcspull.util import contract_user_home
@@ -335,13 +337,15 @@ def add_repo(
# Load existing config
raw_config: dict[str, t.Any]
duplicate_root_occurrences: dict[str, list[t.Any]]
top_level_items: list[tuple[str, t.Any]]
display_config_path = contract_user_home(config_file_path)

if config_file_path.exists() and config_file_path.is_file():
try:
(
raw_config,
duplicate_root_occurrences,
top_level_items,
) = DuplicateAwareConfigReader.load_with_duplicates(config_file_path)
except TypeError:
log.exception(
@@ -357,11 +361,32 @@
else:
raw_config = {}
duplicate_root_occurrences = {}
top_level_items = []
log.info(
"Config file %s not found. A new one will be created.",
display_config_path,
)

config_items: list[tuple[str, t.Any]] = (
[(label, copy.deepcopy(section)) for label, section in top_level_items]
if top_level_items
else [(label, copy.deepcopy(section)) for label, section in raw_config.items()]
)

def _aggregate_items(items: list[tuple[str, t.Any]]) -> dict[str, t.Any]:
aggregated: dict[str, t.Any] = {}
for label, section in items:
if isinstance(section, dict):
workspace_section = aggregated.setdefault(label, {})
for repo_name, repo_config in section.items():
workspace_section[repo_name] = copy.deepcopy(repo_config)
else:
aggregated[label] = copy.deepcopy(section)
return aggregated

if not merge_duplicates:
raw_config = _aggregate_items(config_items)

duplicate_merge_conflicts: list[str] = []
duplicate_merge_changes = 0
duplicate_merge_details: list[tuple[str, int]] = []
@@ -416,14 +441,18 @@ def add_repo(
cwd = pathlib.Path.cwd()
home = pathlib.Path.home()

aggregated_config = (
raw_config if merge_duplicates else _aggregate_items(config_items)
)

if merge_duplicates:
(
raw_config,
workspace_map,
merge_conflicts,
merge_changes,
) = normalize_workspace_roots(
raw_config,
aggregated_config,
cwd=cwd,
home=home,
)
@@ -435,7 +464,7 @@ def add_repo(
merge_conflicts,
_merge_changes,
) = normalize_workspace_roots(
raw_config,
aggregated_config,
cwd=cwd,
home=home,
)
@@ -458,15 +487,24 @@
)
workspace_map[workspace_path] = workspace_label
raw_config.setdefault(workspace_label, {})
if not merge_duplicates:
config_items.append((workspace_label, {}))

if workspace_label not in raw_config:
raw_config[workspace_label] = {}
if not merge_duplicates:
config_items.append((workspace_label, {}))
elif not isinstance(raw_config[workspace_label], dict):
log.error(
"Workspace root '%s' in configuration is not a dictionary. Aborting.",
workspace_label,
)
return
workspace_sections: list[tuple[int, dict[str, t.Any]]] = [
(idx, section)
for idx, (label, section) in enumerate(config_items)
if label == workspace_label and isinstance(section, dict)
]

# Check if repo already exists
if name in raw_config[workspace_label]:
@@ -517,7 +555,18 @@ def add_repo(
return

# Add the repository in verbose format
raw_config[workspace_label][name] = {"repo": url}
new_repo_entry = {"repo": url}
if merge_duplicates:
raw_config[workspace_label][name] = new_repo_entry
else:
target_section: dict[str, t.Any]
if workspace_sections:
_, target_section = workspace_sections[-1]
else:
target_section = {}
config_items.append((workspace_label, target_section))
target_section[name] = copy.deepcopy(new_repo_entry)
raw_config[workspace_label][name] = copy.deepcopy(new_repo_entry)

# Save or preview config
if dry_run:
Expand All @@ -540,7 +589,10 @@ def add_repo(
)
else:
try:
save_config_yaml(config_file_path, raw_config)
if merge_duplicates:
save_config_yaml(config_file_path, raw_config)
else:
save_config_yaml_with_items(config_file_path, config_items)
log.info(
"%s✓%s Successfully added %s'%s'%s (%s%s%s) to %s%s%s under '%s%s%s'.",
Fore.GREEN,
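
The `_aggregate_items` helper in the hunks above is small enough to lift out.
As a standalone sketch (the sample items are hypothetical), it shows why the
conflict checks still operate on one merged view per workspace root while
`config_items` keeps the ordered sections that are actually written back:

```python
import copy
import typing as t


def aggregate_items(items: list[tuple[str, t.Any]]) -> dict[str, t.Any]:
    """Collapse ordered (label, section) pairs into one mapping per label."""
    aggregated: dict[str, t.Any] = {}
    for label, section in items:
        if isinstance(section, dict):
            # Repeated workspace labels contribute to a single merged view;
            # later entries win on repository-name collisions.
            workspace = aggregated.setdefault(label, {})
            for name, repo in section.items():
                workspace[name] = copy.deepcopy(repo)
        else:
            # Non-dict sections are copied through unchanged.
            aggregated[label] = copy.deepcopy(section)
    return aggregated


items = [
    ("~/code/", {"repo-a": {"repo": "git+https://example.com/a.git"}}),
    ("~/code/", {"repo-b": {"repo": "git+https://example.com/b.git"}}),
]
assert aggregate_items(items) == {
    "~/code/": {
        "repo-a": {"repo": "git+https://example.com/a.git"},
        "repo-b": {"repo": "git+https://example.com/b.git"},
    }
}
```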
1 change: 1 addition & 0 deletions src/vcspull/cli/discover.py
@@ -203,6 +203,7 @@ def discover_repos(
(
raw_config,
duplicate_root_occurrences,
_top_level_items,
) = DuplicateAwareConfigReader.load_with_duplicates(config_file_path)
except TypeError:
log.exception(
2 changes: 1 addition & 1 deletion src/vcspull/cli/fmt.py
@@ -172,7 +172,7 @@ def format_single_config(

# Load existing config
try:
raw_config, duplicate_root_occurrences = (
raw_config, duplicate_root_occurrences, _top_level_items = (
DuplicateAwareConfigReader.load_with_duplicates(config_file_path)
)
except TypeError:
25 changes: 24 additions & 1 deletion src/vcspull/config.py
@@ -271,7 +271,7 @@ def load_configs(
file = pathlib.Path(file)
assert isinstance(file, pathlib.Path)

config_content, duplicate_roots = (
config_content, duplicate_roots, _top_level_items = (
DuplicateAwareConfigReader.load_with_duplicates(file)
)

@@ -481,6 +481,29 @@ def save_config_yaml(config_file_path: pathlib.Path, data: dict[t.Any, t.Any]) -
config_file_path.write_text(yaml_content, encoding="utf-8")


def save_config_yaml_with_items(
config_file_path: pathlib.Path,
items: list[tuple[str, t.Any]],
) -> None:
"""Persist configuration data while preserving duplicate top-level sections."""
documents: list[str] = []

for label, section in items:
dumped = ConfigReader._dump(
fmt="yaml",
content={label: section},
indent=2,
).rstrip()
if dumped:
documents.append(dumped)

yaml_content = "\n".join(documents)
if yaml_content:
yaml_content += "\n"

config_file_path.write_text(yaml_content, encoding="utf-8")
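
# Usage sketch (hypothetical paths and repository names): because each
# (label, section) pair is dumped as its own top-level mapping, a repeated
# workspace root such as "~/code/" is emitted twice rather than collapsed:
#
#     save_config_yaml_with_items(
#         pathlib.Path("~/.vcspull.yaml").expanduser(),
#         [
#             ("~/code/", {"repo-a": {"repo": "git+https://example.com/a.git"}}),
#             ("~/code/", {"repo-b": {"repo": "git+https://example.com/b.git"}}),
#         ],
#     )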


def merge_duplicate_workspace_root_entries(
label: str,
occurrences: list[t.Any],