Skip to content

Commit

Permalink
refactoring: clean up code, extract methods, document some others
Browse files Browse the repository at this point in the history
  • Loading branch information
YYYasin19 committed Aug 23, 2023
1 parent b2af339 commit 76e7c29
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 38 deletions.
12 changes: 9 additions & 3 deletions quetz/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def cleanup_channel_db(
Package.channel_name == channel_name
)
if package_name:
all_packages = all_packages.filter(
all_packages = all_packages.join(PackageVersion).filter(
PackageVersion.package_name == package_name
)
for each_package in all_packages:
Expand Down Expand Up @@ -396,7 +396,7 @@ def cleanup_channel_db(
Package.channel_name == channel_name
)
if package_name:
all_packages = all_packages.filter(
all_packages = all_packages.join(PackageVersion).filter(
PackageVersion.package_name == package_name
)
for each_package in all_packages:
Expand Down Expand Up @@ -429,7 +429,7 @@ def cleanup_channel_db(
Package.channel_name == channel_name
)
if package_name:
all_packages = all_packages.filter(
all_packages = all_packages.join(PackageVersion).filter(
PackageVersion.package_name == package_name
)
for x, each_package in enumerate(all_packages):
Expand Down Expand Up @@ -595,6 +595,7 @@ def get_channel(self, channel_name: str) -> Optional[Channel]:
return self.db.query(Channel).filter(Channel.name == channel_name).one_or_none()

def get_package(self, channel_name: str, package_name: str) -> Optional[Package]:
print(f"get_package: {channel_name}{package_name}")
return (
self.db.query(Package)
.join(Channel)
Expand Down Expand Up @@ -1026,6 +1027,11 @@ def get_channel_datas(self, channel_name: str):
)

def assert_size_limits(self, channel_name: str, size: int):
"""
validate that adding a package of size `size` to channel `channel_name`
does not exceed the channel size limit.
raises: QuotaError
"""
channel_size, channel_size_limit = (
self.db.query(Channel.size, Channel.size_limit)
.filter(Channel.name == channel_name)
Expand Down
4 changes: 4 additions & 0 deletions quetz/pkgstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def support_redirect(self) -> bool:

    @abc.abstractmethod
    def create_channel(self, name: str):
        """
        Create the storage location for the channel with the given name.

        What this means depends on the concrete backend,
        e.g. creating a directory (local store) or a bucket (object store).
        """
        pass

@abc.abstractmethod
Expand Down
95 changes: 60 additions & 35 deletions quetz/tasks/mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,26 @@ def download_file(remote_repository, path_metadata):
return f, package_name, metadata


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    after=after_log(logger, logging.WARNING),
)
def _upload_package(file, channel_name, subdir, pkgstore):
    """
    Upload a single package file into `pkgstore` under `subdir/<filename>`
    of channel `channel_name`.

    Retried up to 3 times with exponential backoff; an AttributeError on the
    file wrapper is treated as transient and converted into a retry.
    """
    dest = os.path.join(subdir, file.filename)
    try:
        # the file may already have been read upstream (e.g. for size
        # checks), so rewind before handing it to the store
        file.file.seek(0)
        msg = f"uploading file {dest} from channel {channel_name} to package store"
        logger.debug(msg)
        pkgstore.add_package(file.file, channel_name, dest)
    except AttributeError as e:
        logger.error(f"Could not upload {file}, {file.filename}. {str(e)}")
        raise TryAgain


def handle_repodata_package(
channel,
files_metadata,
Expand All @@ -217,6 +237,7 @@ def handle_repodata_package(
proxylist = channel.load_channel_metadata().get('proxylist', [])
user_id = auth.assert_user()

# check package format and permissions, calculate total size
total_size = 0
for file, package_name, metadata in files_metadata:
parts = file.filename.rsplit("-", 2)
Expand All @@ -240,38 +261,24 @@ def handle_repodata_package(
total_size += size
file.file.seek(0)

dao.assert_size_limits(channel_name, total_size)

@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
after=after_log(logger, logging.WARNING),
)
def _upload_package(file, channel_name, subdir):
dest = os.path.join(subdir, file.filename)

try:
file.file.seek(0)
logger.debug(
f"uploading file {dest} from channel {channel_name} to package store"
)
pkgstore.add_package(file.file, channel_name, dest)
# create package in database
# channel_data = _load_remote_channel_data(remote_repository)
# create_packages_from_channeldata(channel_name, user_id, channel_data, dao)

except AttributeError as e:
logger.error(f"Could not upload {file}, {file.filename}. {str(e)}")
raise TryAgain
# validate quota
dao.assert_size_limits(channel_name, total_size)

pkgstore.create_channel(channel_name)
nthreads = config.general_package_unpack_threads

with TicToc("upload file without extracting"):
nthreads = config.general_package_unpack_threads
with ThreadPoolExecutor(max_workers=nthreads) as executor:
for file, package_name, metadata in files_metadata:
if proxylist and package_name in proxylist:
# skip packages that should only ever be proxied
continue
subdir = get_subdir_compat(metadata)
executor.submit(_upload_package, file, channel_name, subdir)
executor.submit(_upload_package, file, channel_name, subdir, pkgstore)

with TicToc("add versions to the db"):
for file, package_name, metadata in files_metadata:
Expand Down Expand Up @@ -618,6 +625,24 @@ def create_versions_from_repodata(
create_version_from_metadata(channel_name, user_id, filename, metadata, dao)


def _load_remote_channel_data(remote_repository: RemoteRepository) -> dict:
    """
    Fetch and parse channeldata.json from the remote repository.

    Returns an empty dict when the file is missing or not valid JSON.
    raises: HTTPException (503) when the remote server errors out.
    """
    try:
        return remote_repository.open("channeldata.json").json()
    except (RemoteFileNotFound, json.JSONDecodeError):
        # a missing or malformed channeldata.json is not fatal
        return {}
    except RemoteServerError as e:
        logger.error(f"Remote server error: {e}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Remote channel {remote_repository.host} unavailable",
        )


def synchronize_packages(
channel_name: str,
dao: Dao,
Expand All @@ -628,6 +653,14 @@ def synchronize_packages(
excludelist: List[str] = None,
use_repodata: bool = False,
):
    """synchronize packages from a remote channel.
Args:
channel_name (str): the channel to be updated, e.g. the mirror channel
dao (Dao): database access object
pkgstore (PackageStore): the target channels package store
        use_repodata (bool, optional): whether to create packages from repodata.json
"""
logger.info(f"executing synchronize_packages task in a process {os.getpid()}")

new_channel = dao.get_channel(channel_name)
Expand All @@ -639,22 +672,14 @@ def synchronize_packages(
for mirror_channel_url in new_channel.mirror_channel_urls:
remote_repo = RemoteRepository(mirror_channel_url, session)

user_id = auth.assert_user()
auth.assert_user()

try:
channel_data = remote_repo.open("channeldata.json").json()
if use_repodata:
create_packages_from_channeldata(
channel_name, user_id, channel_data, dao
)
channel_data = _load_remote_channel_data(remote_repo)
subdirs = None
if use_repodata:
# create_packages_from_channeldata(channel_name, user_id, channel_data, dao)
subdirs = channel_data.get("subdirs", [])
except (RemoteFileNotFound, json.JSONDecodeError):
subdirs = None
except RemoteServerError:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=f"Remote channel {mirror_channel_url} unavailable",
)

# if no channel data use known architectures
if subdirs is None:
subdirs = KNOWN_SUBDIRS
Expand Down

0 comments on commit 76e7c29

Please sign in to comment.