# test_02_remote

In [None]:
#|default_exp test_02_remote
#|export_as_func true

In [None]:
#|hide
import nblite; from nblite import show_doc; nblite.nbl_export()
import tests as this_module

In [None]:
#|top_export
import subprocess
from pathlib import Path
import shutil
import toml
import pytest
import asyncio

from repoyard import const
from repoyard.cmds import *
from repoyard._models import get_repoyard_meta, RepoPart
from repoyard.config import get_config

from tests.utils import *

from dotenv import load_dotenv

In [None]:
#|top_export
@pytest.mark.integration
def test_02_remote():
    asyncio.run(_test_02_remote())

In [None]:
#|set_func_signature
async def _test_02_remote(): ...

Load config from env var. If it doesn't exist then skip test.

In [None]:
#|export
from dotenv import load_dotenv
load_dotenv()
import os

if 'TEST_CONF_PATH' not in os.environ or 'TEST_STORAGE_LOCATION_NAME' not in os.environ or 'TEST_STORAGE_LOCATION_STORE_PATH' not in os.environ:
    pytest.skip("Environment variable TEST_CONF_PATH or TEST_STORAGE_LOCATION_NAME or TEST_STORAGE_LOCATION_STORE_PATH not set.")
else:
    config_path = Path(os.environ['TEST_CONF_PATH']).expanduser().resolve()
    config = get_config(config_path)
    sl_name = os.environ['TEST_STORAGE_LOCATION_NAME']
    sl_store_path = os.environ['TEST_STORAGE_LOCATION_STORE_PATH']

Ensure `repoyard` is installed

In [None]:
#|export
from tests.utils import run_cmd, run_cmd_in_background, CmdFailed

try:
    run_cmd("repoyard")
except CmdFailed:
    pytest.skip("repoyard not installed")

Create repo and sync it

In [None]:
#|export
repo_index_name1 = run_cmd(f"repoyard new -n test-repo-1 -g repoyard-unit-tests -s {sl_name}").strip()
run_cmd(f"repoyard sync -r {repo_index_name1}", capture_output=True);

Create two other repos and see they can both sync at the same time (i.e. test if simultaneous rclone commands can be run)

In [None]:
#|export
repo_index_name2 = run_cmd(f"repoyard new -n test-repo-2 -g repoyard-unit-tests -s {sl_name}").strip()
repo_index_name3 = run_cmd(f"repoyard new -n test-repo-3 -g repoyard-unit-tests -s {sl_name}").strip()

p1 = run_cmd_in_background(f"repoyard sync -r {repo_index_name2}", print_output=False)
p2 = run_cmd_in_background(f"repoyard sync -r {repo_index_name3}", print_output=False)

p1.wait()
p2.wait()

Verify that the repos are there on remote

In [None]:
#|export
from repoyard._models import get_repoyard_meta
repoyard_meta = get_repoyard_meta(config, force_create=True)
repo_meta1 = repoyard_meta.by_index_name[repo_index_name1]
repo_meta2 = repoyard_meta.by_index_name[repo_index_name2]
repo_meta3 = repoyard_meta.by_index_name[repo_index_name3]

from repoyard._utils import rclone_lsjson
for repo_meta in [repo_meta1, repo_meta2, repo_meta3]:
    assert await rclone_lsjson(
        config.rclone_config_path,
        source=sl_name,
        source_path=repo_meta.get_remote_path(config),
    ) is not None

Exclude repos

In [None]:
#|export
for repo_meta in [repo_meta1, repo_meta2, repo_meta3]:
    run_cmd(f"repoyard exclude -r {repo_meta.index_name}")

In [None]:
#|export
async def _task(repo_meta):
    assert await rclone_lsjson(
        config.rclone_config_path,
        source="",
        source_path=repo_meta.get_local_part_path(config, RepoPart.DATA),
    ) is None

await asyncio.gather(*[_task(repo_meta) for repo_meta in [repo_meta1, repo_meta2, repo_meta3]]);

Re-include repos

In [None]:
#|export
for repo_meta in [repo_meta1, repo_meta2, repo_meta3]:
    run_cmd(f"repoyard include -r {repo_meta.index_name}")

In [None]:
#|export
async def _task(repo_meta):
    assert await rclone_lsjson(
        config.rclone_config_path,
        source="",
        source_path=repo_meta.get_local_part_path(config, RepoPart.DATA),
    ) is not None

await asyncio.gather(*[_task(repo_meta) for repo_meta in [repo_meta1, repo_meta2, repo_meta3]]);

Delete repos

In [None]:
#|export
for repo_meta in [repo_meta1, repo_meta2, repo_meta3]:
    run_cmd(f"repoyard delete -r {repo_meta.index_name}")

In [None]:
#|export
async def _task(repo_meta):
    assert await rclone_lsjson(
        config.rclone_config_path,
        source=sl_name,
        source_path=repo_meta.get_remote_path(config),
    ) is None

await asyncio.gather(*[_task(repo_meta) for repo_meta in [repo_meta1, repo_meta2, repo_meta3]]);