# _models

In [None]:
#|default_exp _models

In [None]:
#|hide
import nblite; from nblite import show_doc; nblite.nbl_export()
import repoyard._models as this_module

Environment variable DISABLE_NBLITE_EXPORT is set to True, skipping export.


In [None]:
#|export
from typing import Callable, Literal
from pydantic import BaseModel, Field, model_validator
from pathlib import Path
import toml, json
from datetime import datetime, timezone
from ulid import ULID
from enum import Enum
import repoyard.config
from repoyard import const
from repoyard.config import RepoGroupConfig

# `RepoMeta`

In [None]:
#|export
class RepoPart(Enum):
    DATA = "data"
    META = "meta"
    CONF = "conf"

In [None]:
#|export
class RepoMeta(const.StrictModel):
    ulid: ULID = Field(default_factory=ULID)
    name: str
    storage_location: str
    creator_hostname: str
    groups: list[str]
    
    @property
    def full_name(self) -> str:
        return f"{str(self.ulid)}__{self.name}"

    def get_storage_location_config(self, config: repoyard.config.StorageConfig) -> repoyard.config.StorageConfig:
        return config.storage_locations[self.storage_location]

    def get_remote_path(self, config: repoyard.config.Config) -> Path:
        return config.storage_locations[self.storage_location].store_path / const.REMOTE_REPOS_REL_PATH / self.full_name
    
    def get_remote_repometa_path(self, config: repoyard.config.Config) -> Path:
        return self.get_remote_path(config) / const.REPO_METAFILE_REL_PATH
    
    def get_remote_repoconf_path(self, config: repoyard.config.Config) -> Path:
        return self.get_remote_path(config) / const.REPO_CONF_REL_PATH
    
    def get_remote_repodata_path(self, config: repoyard.config.Config) -> Path:
        return self.get_remote_path(config) / const.REPO_DATA_REL_PATH
    
    def get_local_path(self, config: repoyard.config.Config) -> Path:
        return config.local_store_path / self.storage_location / self.full_name
    
    def get_local_repometa_path(self, config: repoyard.config.Config) -> Path:
        return self.get_local_path(config) / const.REPO_METAFILE_REL_PATH
    
    def get_local_repoconf_path(self, config: repoyard.config.Config) -> Path:
        return self.get_local_path(config) / const.REPO_CONF_REL_PATH
    
    def get_local_repodata_path(self, config: repoyard.config.Config) -> Path:
        return self.get_local_path(config) / const.REPO_DATA_REL_PATH

    def get_local_sync_record_path(self, config: repoyard.config.Config, repo_part: RepoPart) -> Path:
        return config.repoyard_data_path / const.SYNC_RECORDS_REL_PATH / self.full_name / f"{repo_part.value}.rec"

    def get_remote_sync_record_path(self, config: repoyard.config.Config, repo_part: RepoPart) -> Path:
        sl_conf = self.get_storage_location_config(config)
        return sl_conf.store_path / const.SYNC_RECORDS_REL_PATH / self.full_name / f"{repo_part.value}.rec"
    
    def check_included(self, config: repoyard.config.Config) -> bool:
        included_repo_path = self.get_local_repodata_path(config)
        return included_repo_path.is_dir() and included_repo_path.exists()
    
    def save(self, config: repoyard.config.Config):
        save_path = self.get_local_repometa_path(config)
        save_path.parent.mkdir(parents=True, exist_ok=True)
        model_dump = self.model_dump()
        del model_dump['ulid']
        del model_dump['name']
        save_path.write_text(toml.dumps(model_dump))
        
    @model_validator(mode='after')
    def validate_config(self):
        if len(self.groups) != len(set(self.groups)):
            raise ValueError("Groups must be unique.")
        return self

# `RepoyardMeta`

In [None]:
#|export
class RepoyardMeta(const.StrictModel):
    repo_metas: list[RepoMeta]

    @property
    def by_storage_location(self) -> dict[str, dict[str, RepoMeta]]:
        if not hasattr(self, '__by_storage_location'):
            self.__by_storage_location = {
                sl_name: {
                    repo_meta.full_name: repo_meta
                    for repo_meta in self.repo_metas
                    if repo_meta.storage_location == sl_name
            }
            for sl_name in self.by_storage_location
        }
        return self.__by_storage_location

    @property
    def by_ulid(self) -> dict[str, RepoMeta]:
        if not hasattr(self, '__by_ulid'):
            self.__by_ulid = {
                repo_meta.ulid: repo_meta
                for repo_meta in self.repo_metas
            }
        return self.__by_ulid

    @property
    def by_full_name(self) -> dict[str, RepoMeta]:
        if not hasattr(self, '__by_full_name'):
            self.__by_full_name = {
                repo_meta.full_name: repo_meta
                for repo_meta in self.repo_metas
            }
        return self.__by_full_name


In [None]:
#|export
def create_repoyard_meta(
    config: repoyard.config.Config
) -> RepoyardMeta:
    """Create a dict of all repo metas. To be saved in `config.repoyard_meta_path`."""
    repo_metas = []
    for storage_location_name in config.storage_locations:
        local_storage_location_path = config.local_store_path / storage_location_name
        
        for repo_path in local_storage_location_path.glob('*'):
            if not repo_path.is_dir():
                raise ValueError(f"Repo path {repo_path} is not a directory.")
            
            full_name = repo_path.stem
            ulid, name = full_name.split('__', 1)
            
            repometa_path = repo_path / const.REPO_METAFILE_REL_PATH
            if not repometa_path.exists():
                raise ValueError(f"Repo meta file {repometa_path} does not exist.")
            
            repo_metas.append(RepoMeta(**{
                **toml.loads(repometa_path.read_text()),
                'ulid': ulid,
                'name': name,
                'storage_location': storage_location_name,
            }))
    return RepoyardMeta(repo_metas=repo_metas)

In [None]:
#|export
def refresh_repoyard_meta(
    config: repoyard.config.Config,
) -> RepoyardMeta:
    repoyard_meta = create_repoyard_meta(config)
    config.repoyard_meta_path.write_text(repoyard_meta.model_dump_json())

In [None]:
#|export
def get_repoyard_meta(
    config: repoyard.config.Config,
    force_create: bool=False,
) -> RepoyardMeta:
    if not config.repoyard_meta_path.exists() or force_create:
        refresh_repoyard_meta(config)
    return RepoyardMeta.model_validate_json(config.repoyard_meta_path.read_text())

In [None]:
#|export
def get_virtual_repo_group_filters(
    config: repoyard.config.Config,
) -> dict[str, Callable[[RepoMeta], bool]]:
    return {}

In [None]:
#|export
def get_repo_group_configs(
    config: repoyard.config.Config,
    repo_metas: list[RepoMeta],
) -> dict[str, RepoGroupConfig]:
    repo_group_configs = {rgc.group_name for rgc in config.repo_groups}
    for repo_meta in repo_metas.values():
        if not repo_meta.is_included: continue
        for group_name in repo_meta.groups:
            if group_name not in repo_group_configs:
                repo_group_configs[group_name] = RepoGroupConfig(group_name=group_name)
    return repo_group_configs

In [None]:
#|export
def create_user_repos_symlinks(
    config: repoyard.config.Config,
    repo_metas: list[RepoMeta],
):
    for path in config.user_repos_path.glob('*'):
        if path.is_symlink(): path.unlink()
    
    for repo_meta in repo_metas.values():
        if not repo_meta.is_included: continue
        source_path = config.included_repostore_path / repo_meta.full_name
        symlink_path = config.user_repos_path / repo_meta.full_name
        if symlink_path.is_symlink(): 
            if symlink_path.resolve() != source_path.resolve():
                symlink_path.unlink()
            else: continue # already correct
        symlink_path.symlink_to(source_path, target_is_directory=True)

In [None]:
#|export
def create_user_repo_group_symlinks(
    config: repoyard.config.Config,
    repo_metas: list[RepoMeta],
):
    virtual_repo_groups = get_virtual_repo_group_filters(config)
    groups = get_repo_group_configs(config, repo_metas)
    
    symlink_paths = []
    
    for group_name, group_config in groups.items():
        for repo_meta in repo_metas.values():
            if not repo_meta.is_included: continue
            is_in_group = False
            if group_config.is_virtual:
                is_in_group = virtual_repo_groups[group_name](repo_meta)
            else:
                is_in_group = group_name in repo_meta.groups
            if is_in_group:
                source_path = config.included_repostore_path / repo_meta.full_name
                symlink_path = config.user_repo_groups_path / group_name / repo_meta.full_name
                symlink_path.parent.mkdir(parents=True, exist_ok=True)
                symlink_paths.append(symlink_path.resolve().as_posix())
                if symlink_path.is_symlink():
                    if symlink_path.resolve() != source_path.resolve():
                        symlink_path.unlink()
                    else: continue # already correct
                symlink_path.symlink_to(source_path, target_is_directory=True)
    
    # Remove any symlinks that are not in the symlink_paths list
    for group_folder_path in config.user_repo_groups_path.glob('*'):
        for symlink_path in group_folder_path.glob('*'):
            if not symlink_path.is_symlink(): continue
            if symlink_path.resolve().as_posix() not in symlink_paths:
                symlink_path.unlink()
        # Remove the group folder if it is empty after removing symlinks
        if not any(group_folder_path.iterdir()):
            group_folder_path.rmdir()

# `SyncRecord`

In [None]:
#|export
class SyncDirection(Enum):
    PUSH = "push" # local -> remote
    PULL = "pull" # remote -> local

In [None]:
#|export
class SyncRecord(const.StrictModel):
    ulid: ULID = Field(default_factory=ULID)
    creator_hostname: str
    direction: SyncDirection

    @property
    def datetime(self) -> datetime:
        return self.ulid.datetime

    @classmethod
    def create(cls, direction: SyncDirection, creator_hostname: str|None=None) -> None:
        from repoyard._utils import get_hostname
        return SyncRecord(
            creator_hostname=creator_hostname or get_hostname(),
            direction=direction,
        )

    def rclone_save(self, rclone_config_path: str, dest: str, dest_path: str) -> None:
        from repoyard._utils import rclone_copyto
        import tempfile
        temp_path = Path(tempfile.mkstemp(suffix='.json')[1])
        temp_path.write_text(self.model_dump_json())
        rclone_copyto(
            rclone_config_path=rclone_config_path,
            source="",
            source_path=temp_path.as_posix(),
            dest=dest,
            dest_path=dest_path,
            dry_run=False,
        )

    @classmethod
    def rclone_read(cls, rclone_config_path: str, source: str, sync_record_path: str) -> str:
        from repoyard._utils import rclone_cat
        sync_record_exists, sync_record = rclone_cat(
            rclone_config_path=rclone_config_path,
            source=source,
            source_path=sync_record_path,
        )
        
        if sync_record_exists:
            return SyncRecord.model_validate_json(sync_record)
        else:
            return None

In [None]:
#|export
from typing import NamedTuple

class SyncCondition(Enum):
    SYNCED = "synced"
    CONFLICT = "conflict"
    NEEDS_PUSH = "needs_push"
    NEEDS_PULL = "needs_pull"

class SyncStatus(NamedTuple):
    sync_condition: SyncCondition
    local_path_exists: bool
    remote_path_exists: bool
    local_sync_record: SyncRecord
    remote_sync_record: SyncRecord
    is_dir: bool

def get_sync_status(
    rclone_config_path: str,
    local_path: str,
    local_sync_record_path: str,
    remote: str,
    remote_path: str,
    remote_sync_record_path: str,
) -> tuple[SyncStatus, bool, bool, SyncRecord, SyncRecord]:
    from repoyard._utils import check_last_time_modified
    from repoyard._utils import rclone_path_exists

    local_path_exists, local_path_is_dir = rclone_path_exists(
        rclone_config_path=rclone_config_path,
        source="",
        source_path=local_path,
    )

    remote_path_exists, remote_path_is_dir = rclone_path_exists(
        rclone_config_path=rclone_config_path,
        source=remote,
        source_path=remote_path,
    )

    if (local_path_exists and remote_path_exists) and (local_path_is_dir != remote_path_is_dir):
        _local = "directory" if local_path_is_dir else "file"
        _remote = "directory" if remote_path_is_dir else "file"
        raise Exception(f"Local and remote paths are not both files or both directories. Local is {_local} and remote is {_remote}.")

    if not local_path_exists and not remote_path_exists:
        raise Exception(f"Local and remote paths do not exist!")
    
    is_dir = local_path_is_dir or remote_path_is_dir

    local_sync_record = SyncRecord.rclone_read(
        rclone_config_path=rclone_config_path,
        source="",
        sync_record_path=local_sync_record_path,
    )

    remote_sync_record = SyncRecord.rclone_read(
        rclone_config_path=rclone_config_path,
        source=remote,
        sync_record_path=remote_sync_record_path,
    )

    sync_records_match = (local_sync_record is not None and remote_sync_record is not None) and \
        (local_sync_record.ulid == remote_sync_record.ulid)

    local_last_modified = check_last_time_modified(local_path)

    if sync_records_match:
        if local_last_modified > local_sync_record.datetime:
            sync_condition = SyncCondition.NEEDS_PUSH
        else:
            sync_condition = SyncCondition.SYNCED
    else:
        if local_path_exists and not remote_path_exists:
            sync_condition = SyncCondition.NEEDS_PUSH
        elif not local_path_exists and remote_path_exists:
            sync_condition = SyncCondition.NEEDS_PULL
        else:
            sync_condition = SyncCondition.CONFLICT

    return SyncStatus(
        sync_condition=sync_condition,
        local_path_exists=local_path_exists,
        remote_path_exists=remote_path_exists,
        local_sync_record=local_sync_record,
        remote_sync_record=remote_sync_record,
        is_dir=is_dir,
    )