diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 48d5153..80c650d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,28 +1,7 @@ repos: - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: "v4.4.0" + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.11.13 hooks: - - id: trailing-whitespace - - id: end-of-file-fixer - - id: check-yaml - - id: check-toml - - - repo: https://github.com/asottile/add-trailing-comma - rev: "v2.4.0" - hooks: - - id: add-trailing-comma - args: - - "--py36-plus" - - - repo: https://github.com/PyCQA/isort - rev: "5.12.0" - hooks: - - id: isort - - - repo: https://github.com/pycqa/flake8 - rev: "6.0.0" - hooks: - - id: flake8 - args: - - "--max-line-length=100" - - '--ignore=E501,E226,W503,E402' + - id: ruff-check + args: ["--fix", "--select=I"] + - id: ruff-format diff --git a/pyproject.toml b/pyproject.toml index f545101..2e6502e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,14 +51,8 @@ simple-repository-browser = "simple_repository_browser.__main__:main" [tool.setuptools_scm] version_file = "simple_repository_browser/_version.py" -[tool.isort] -multi_line_output = 3 -include_trailing_comma = true -force_grid_wrap = 0 -use_parentheses = true -ensure_newline_before_comments = true -line_length = 88 -force_sort_within_sections = true +[tool.ruff.lint.isort] +force-sort-within-sections = true # [tool.mypy] # check_untyped_defs = true diff --git a/simple_repository_browser/__init__.py b/simple_repository_browser/__init__.py index 712e9e3..bdb26a3 100644 --- a/simple_repository_browser/__init__.py +++ b/simple_repository_browser/__init__.py @@ -2,6 +2,7 @@ Documentation for the simple_repository_browser package """ + from . import _version # type: ignore __version__ = _version.version # type: ignore diff --git a/simple_repository_browser/__main__.py b/simple_repository_browser/__main__.py index 58f33b3..551827d 100644 --- a/simple_repository_browser/__main__.py +++ b/simple_repository_browser/__main__.py @@ -20,13 +20,25 @@ def configure_parser(parser: argparse.ArgumentParser) -> None: # parsed arguments. parser.set_defaults(handler=handler) - parser.add_argument("repository_url", type=str, nargs='?', default='https://pypi.org/simple/') + parser.add_argument( + "repository_url", type=str, nargs="?", default="https://pypi.org/simple/" + ) parser.add_argument("--host", default="0.0.0.0") parser.add_argument("--port", type=int, default=8080) - parser.add_argument("--cache-dir", type=str, default=Path(os.environ.get('XDG_CACHE_DIR', Path.home() / '.cache')) / 'simple-repository-browser') + parser.add_argument( + "--cache-dir", + type=str, + default=Path(os.environ.get("XDG_CACHE_DIR", Path.home() / ".cache")) + / "simple-repository-browser", + ) parser.add_argument("--url-prefix", type=str, default="") - parser.add_argument('--no-popular-project-crawl', dest='crawl_popular_projects', action='store_false', default=True) - parser.add_argument('--templates-dir', default=here / "templates", type=Path) + parser.add_argument( + "--no-popular-project-crawl", + dest="crawl_popular_projects", + action="store_false", + default=True, + ) + parser.add_argument("--templates-dir", default=here / "templates", type=Path) def handler(args: typing.Any) -> None: @@ -38,9 +50,9 @@ def handler(args: typing.Any) -> None: # Include the base templates so that the given templates directory doesn't have to # implement *all* of the templates. This must be at a lower precedence than the given # templates path, so that they can be overriden. - here/"templates"/"base", + here / "templates" / "base", # Include the "base" folder, such that upstream templates can inherit from "base/...". - here/"templates", + here / "templates", ], static_files_paths=[here / "static"], crawl_popular_projects=args.crawl_popular_projects, @@ -49,10 +61,12 @@ def handler(args: typing.Any) -> None: ).create_app() log_conf = LOGGING_CONFIG.copy() - log_conf["formatters"]["default"]["fmt"] = "%(asctime)s [%(name)s] %(levelprefix)s %(message)s" - log_conf["formatters"]["access"][ - "fmt" - ] = '%(asctime)s [%(name)s] %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s' + log_conf["formatters"]["default"]["fmt"] = ( + "%(asctime)s [%(name)s] %(levelprefix)s %(message)s" + ) + log_conf["formatters"]["access"]["fmt"] = ( + '%(asctime)s [%(name)s] %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s' + ) uvicorn.run( app=app, @@ -71,8 +85,8 @@ def main(): args.handler(args) -if __name__ == '__main__': +if __name__ == "__main__": logging.basicConfig(level=logging.INFO) - logging.getLogger('httpcore').setLevel(logging.WARNING) - logging.getLogger('httpx').setLevel(logging.WARNING) + logging.getLogger("httpcore").setLevel(logging.WARNING) + logging.getLogger("httpx").setLevel(logging.WARNING) main() diff --git a/simple_repository_browser/_app.py b/simple_repository_browser/_app.py index 4fe432b..92f72a3 100644 --- a/simple_repository_browser/_app.py +++ b/simple_repository_browser/_app.py @@ -37,8 +37,8 @@ def __init__( self.crawl_popular_projects = crawl_popular_projects self.browser_version = browser_version - self.cache = diskcache.Cache(str(cache_dir/'diskcache')) - self.db_path = cache_dir / 'projects.sqlite' + self.cache = diskcache.Cache(str(cache_dir / "diskcache")) + self.db_path = cache_dir / "projects.sqlite" self.con = sqlite3.connect( self.db_path, detect_types=sqlite3.PARSE_DECLTYPES, @@ -51,7 +51,6 @@ def create_app(self) -> fastapi.FastAPI: _view = self.create_view() async def lifespan(app: fastapi.FastAPI): - async with ( httpx.AsyncClient(timeout=30) as http_client, aiosqlite.connect(self.db_path, timeout=5) as db, @@ -72,7 +71,9 @@ async def lifespan(app: fastapi.FastAPI): # convenient for development purposes. @app.get("/") async def redirect_to_index(): - return fastapi.responses.RedirectResponse(url=app.url_path_for('index')) + return fastapi.responses.RedirectResponse( + url=app.url_path_for("index") + ) yield @@ -92,7 +93,7 @@ async def catch_exceptions_middleware(request: fastapi.Request, call_next): detail = f"Internal server error ({err})" # raise logging.getLogger("simple_repository_browser.error").error( - 'Unhandled exception', + "Unhandled exception", exc_info=err, ) content = _view.error_page( @@ -104,14 +105,20 @@ async def catch_exceptions_middleware(request: fastapi.Request, call_next): status_code=status_code, ) - app.middleware('http')(catch_exceptions_middleware) + app.middleware("http")(catch_exceptions_middleware) return app def create_view(self) -> view.View: - return view.View(self.template_paths, self.browser_version, static_files_manifest=self.static_files_manifest) + return view.View( + self.template_paths, + self.browser_version, + static_files_manifest=self.static_files_manifest, + ) - def create_crawler(self, http_client: httpx.AsyncClient, source: SimpleRepository) -> crawler.Crawler: + def create_crawler( + self, http_client: httpx.AsyncClient, source: SimpleRepository + ) -> crawler.Crawler: return crawler.Crawler( http_client=http_client, crawl_popular_projects=self.crawl_popular_projects, @@ -120,7 +127,9 @@ def create_crawler(self, http_client: httpx.AsyncClient, source: SimpleRepositor cache=self.cache, ) - def _repo_from_url(self, url: str, http_client: httpx.AsyncClient) -> SimpleRepository: + def _repo_from_url( + self, url: str, http_client: httpx.AsyncClient + ) -> SimpleRepository: if urlparse(url).scheme in ("http", "https"): return HttpRepository( url=url, @@ -129,7 +138,9 @@ def _repo_from_url(self, url: str, http_client: httpx.AsyncClient) -> SimpleRepo else: return LocalRepository(Path(url)) - def create_model(self, http_client: httpx.AsyncClient, database: aiosqlite.Connection) -> model.Model: + def create_model( + self, http_client: httpx.AsyncClient, database: aiosqlite.Connection + ) -> model.Model: source = MetadataInjector( self._repo_from_url(self.repository_url, http_client=http_client), http_client=http_client, @@ -141,7 +152,9 @@ def create_model(self, http_client: httpx.AsyncClient, database: aiosqlite.Conne crawler=self.create_crawler(http_client, source), ) - def create_controller(self, view: view.View, model: model.Model) -> controller.Controller: + def create_controller( + self, view: view.View, model: model.Model + ) -> controller.Controller: return controller.Controller( model=model, view=view, diff --git a/simple_repository_browser/_search.py b/simple_repository_browser/_search.py index 354e1e1..4fcc96e 100644 --- a/simple_repository_browser/_search.py +++ b/simple_repository_browser/_search.py @@ -13,9 +13,9 @@ class Term: class FilterOn(Enum): - name = 'name' - summary = 'summary' - name_or_summary = 'name_or_summary' + name = "name" + summary = "summary" + name_or_summary = "name_or_summary" @dataclasses.dataclass @@ -57,8 +57,8 @@ def prepare_name(term: Filter) -> SafeSQLStmt: value = term.value[1:-1] else: value = normalise_name(term.value) - value = value.replace('*', '%') - return "canonical_name LIKE ?", (f'%{value}%',) + value = value.replace("*", "%") + return "canonical_name LIKE ?", (f"%{value}%",) def prepare_summary(term: Filter) -> SafeSQLStmt: @@ -67,8 +67,8 @@ def prepare_summary(term: Filter) -> SafeSQLStmt: value = term.value[1:-1] else: value = term.value - value = value.replace('*', '%') - return "summary LIKE ?", (f'%{value}%',) + value = value.replace("*", "%") + return "summary LIKE ?", (f"%{value}%",) def build_sql(term: typing.Union[Term, typing.Tuple[Term, ...]]) -> SafeSQLStmt: @@ -76,7 +76,7 @@ def build_sql(term: typing.Union[Term, typing.Tuple[Term, ...]]) -> SafeSQLStmt: # Instead, any user input must be in the parameters, which undergoes sqllite built-in cleaning. if isinstance(term, tuple): if len(term) == 0: - return '', () + return "", () # No known query can produce a multi-value term assert len(term) == 1 @@ -103,7 +103,7 @@ def build_sql(term: typing.Union[Term, typing.Tuple[Term, ...]]) -> SafeSQLStmt: return f"({sql1} OR {sql2})", terms1 + terms2 elif isinstance(term, Not): sql1, terms1 = build_sql(term.term) - return f'(Not {sql1})', terms1 + return f"(Not {sql1})", terms1 else: raise ValueError(f"unknown term type {type(term)}") @@ -145,11 +145,11 @@ def query_to_sql(query) -> SafeSQLStmt: | -> ()) """), { - 'And': And, - 'Or': Or, - 'Filter': Filter, - 'Not': Not, - 'FilterOn': FilterOn, + "And": And, + "Or": Or, + "Filter": Filter, + "Not": Not, + "FilterOn": FilterOn, }, ) @@ -166,7 +166,7 @@ def simple_name_from_query(terms: typing.Tuple[Term, ...]) -> typing.Optional[st for term in terms: if isinstance(term, Filter): if term.filter_on in [FilterOn.name_or_summary, FilterOn.name]: - if '*' in term.value or '"' in term.value: + if "*" in term.value or '"' in term.value: break return normalise_name(term.value) else: diff --git a/simple_repository_browser/compatibility_matrix.py b/simple_repository_browser/compatibility_matrix.py index 543ca3f..0075c4d 100644 --- a/simple_repository_browser/compatibility_matrix.py +++ b/simple_repository_browser/compatibility_matrix.py @@ -13,7 +13,7 @@ class CompatibilityMatrixModel: def compatibility_matrix( - files: tuple[model.File, ...], + files: tuple[model.File, ...], ) -> CompatibilityMatrixModel: """ Look at the given files, and compute a compatibility matrix. @@ -28,18 +28,22 @@ def compatibility_matrix( interpreted_py_abi_tags: dict[tuple[str, str], InterpretedPyAndABITag] = {} for file in files: - if not file.filename.lower().endswith('.whl'): + if not file.filename.lower().endswith(".whl"): continue _, _, _, tags = parse_wheel_filename(file.filename) # Ensure that the tags have a consistent sort order. From # packaging they come as a frozenset, so no such upstream guarantee is provided. - sorted_tags = sorted(tags, key=lambda tag: (tag.platform, tag.abi, tag.interpreter)) + sorted_tags = sorted( + tags, key=lambda tag: (tag.platform, tag.abi, tag.interpreter) + ) for tag in sorted_tags: inter_abi_key = (tag.interpreter, tag.abi) if inter_abi_key not in interpreted_py_abi_tags: - interpreted_py_abi_tags[inter_abi_key] = interpret_py_and_abi_tag(tag.interpreter, tag.abi) + interpreted_py_abi_tags[inter_abi_key] = interpret_py_and_abi_tag( + tag.interpreter, tag.abi + ) tag_interp = interpreted_py_abi_tags[inter_abi_key] compat_matrix[(tag_interp.nice_name, tag.platform)] = file @@ -60,11 +64,11 @@ def compatibility_matrix( # https://packaging.python.org/en/latest/specifications/platform-compatibility-tags/#python-tag py_tag_implementations = { - 'py': 'Python', - 'cp': 'CPython', - 'ip': 'IronPython', - 'pp': 'PyPy', - 'jy': 'Jython', + "py": "Python", + "cp": "CPython", + "ip": "IronPython", + "pp": "PyPy", + "jy": "Jython", } @@ -79,39 +83,49 @@ def interpret_py_and_abi_tag(py_tag: str, abi_tag: str) -> InterpretedPyAndABITa if py_tag[:2] in py_tag_implementations: py_impl, version_nodot = py_tag[:2], py_tag[2:] py_impl = py_tag_implementations.get(py_impl, py_impl) - if '_' in version_nodot: - py_version = Version('.'.join(version_nodot.split('_'))) + if "_" in version_nodot: + py_version = Version(".".join(version_nodot.split("_"))) elif len(version_nodot) == 1: # e.g. Pure python wheels py_version = Version(version_nodot) else: - py_version = Version(f'{version_nodot[0]}.{version_nodot[1:]}') + py_version = Version(f"{version_nodot[0]}.{version_nodot[1:]}") if abi_tag.startswith(py_tag): - abi_tag_flags = abi_tag[len(py_tag):] - if 'd' in abi_tag_flags: - abi_tag_flags = abi_tag_flags.replace('d', '') - py_impl += ' (debug)' - if 'u' in abi_tag_flags: - abi_tag_flags = abi_tag_flags.replace('u', '') + abi_tag_flags = abi_tag[len(py_tag) :] + if "d" in abi_tag_flags: + abi_tag_flags = abi_tag_flags.replace("d", "") + py_impl += " (debug)" + if "u" in abi_tag_flags: + abi_tag_flags = abi_tag_flags.replace("u", "") # A python 2 concept. - py_impl += ' (wide)' - if 'm' in abi_tag_flags: - abi_tag_flags = abi_tag_flags.replace('m', '') + py_impl += " (wide)" + if "m" in abi_tag_flags: + abi_tag_flags = abi_tag_flags.replace("m", "") pass if abi_tag_flags: - py_impl += f' (additional flags: {abi_tag_flags})' - return InterpretedPyAndABITag(f'{py_impl} {py_version}', py_impl, py_version) - elif abi_tag.startswith('pypy') and py_impl == 'PyPy': - abi = abi_tag.split('_')[1] - return InterpretedPyAndABITag(f'{py_impl} {py_version} ({abi})', py_impl, py_version) - elif abi_tag == 'abi3': + py_impl += f" (additional flags: {abi_tag_flags})" + return InterpretedPyAndABITag( + f"{py_impl} {py_version}", py_impl, py_version + ) + elif abi_tag.startswith("pypy") and py_impl == "PyPy": + abi = abi_tag.split("_")[1] + return InterpretedPyAndABITag( + f"{py_impl} {py_version} ({abi})", py_impl, py_version + ) + elif abi_tag == "abi3": # Example PyQt6 - return InterpretedPyAndABITag(f'{py_impl} >={py_version} (abi3)', py_impl, py_version) - elif abi_tag == 'none': + return InterpretedPyAndABITag( + f"{py_impl} >={py_version} (abi3)", py_impl, py_version + ) + elif abi_tag == "none": # Seen with pydantic-core 2.11.0 - return InterpretedPyAndABITag(f'{py_impl} {py_version}', py_impl, py_version) + return InterpretedPyAndABITag( + f"{py_impl} {py_version}", py_impl, py_version + ) else: - return InterpretedPyAndABITag(f'{py_impl} {py_version} ({abi_tag})', py_impl, py_version) + return InterpretedPyAndABITag( + f"{py_impl} {py_version} ({abi_tag})", py_impl, py_version + ) - return InterpretedPyAndABITag(f'{py_tag} ({abi_tag})') + return InterpretedPyAndABITag(f"{py_tag} ({abi_tag})") diff --git a/simple_repository_browser/controller.py b/simple_repository_browser/controller.py index 3977e11..83e5fd4 100644 --- a/simple_repository_browser/controller.py +++ b/simple_repository_browser/controller.py @@ -38,6 +38,7 @@ def route( def dec(fn): self._routes_register[path] = Route(fn, methods, response_class, kwargs) return fn + return dec def get(self, path: str, **kwargs: typing.Any): @@ -83,9 +84,15 @@ def __init__(self, model: model.Model, view: view.View) -> None: self.model = model self.view = view - def create_router(self, static_files_manifest: StaticFilesManifest) -> fastapi.APIRouter: + def create_router( + self, static_files_manifest: StaticFilesManifest + ) -> fastapi.APIRouter: router = self.router.build_fastapi_router(self) - router.mount("/static", HashedStaticFileHandler(manifest=static_files_manifest), name="static") + router.mount( + "/static", + HashedStaticFileHandler(manifest=static_files_manifest), + name="static", + ) return router @router.get("/", name="index") @@ -102,7 +109,9 @@ async def search(self, request: fastapi.Request, query: str, page: int = 1) -> s # Note: page is 1 based. We don't have a page 0. page_size = 50 try: - response = self.model.project_query(query=query, page_size=page_size, page=page) + response = self.model.project_query( + query=query, page_size=page_size, page=page + ) except errors.InvalidSearchQuery as e: raise errors.RequestError( detail=str(e), @@ -111,8 +120,14 @@ async def search(self, request: fastapi.Request, query: str, page: int = 1) -> s return self.view.search_page(response, request) @router.get("/project/{project_name}", name="project", response_model=None) - @router.get("/project/{project_name}/{version}", name='project_version', response_model=None) - @router.get("/project/{project_name}/{version}/{page_section}", name='project_version_section', response_model=None) + @router.get( + "/project/{project_name}/{version}", name="project_version", response_model=None + ) + @router.get( + "/project/{project_name}/{version}/{page_section}", + name="project_version_section", + response_model=None, + ) async def project( self, request: fastapi.Request, @@ -127,19 +142,26 @@ async def project( try: _version = Version(version) except InvalidVersion: - raise errors.RequestError(status_code=404, detail=f"Invalid version {version}.") + raise errors.RequestError( + status_code=404, detail=f"Invalid version {version}." + ) - t = asyncio.create_task(self.model.project_page(project_name, _version, recache)) + t = asyncio.create_task( + self.model.project_page(project_name, _version, recache) + ) # Try for 5 seconds to get the response. Otherwise, fall back to a waiting page which can # re-direct us back here once the data is available. # TODO: Prevent infinite looping. await asyncio.wait([t], timeout=5) if not t.done(): + async def iter_file(): # TODO: use a different view for this. yield self.view.error_page( context=model.ErrorModel( - detail=Markup("
Project metadata is being fetched. This page will reload when ready.
"), + detail=Markup( + "
Project metadata is being fetched. This page will reload when ready.
" + ), ), request=request, ) @@ -150,7 +172,8 @@ async def iter_file(): else: break # We are done (or were in an infinite loop). Signal that we are finished, then exit. - yield 'Done!
\n' + yield "Done!
\n" + return StreamingResponse(iter_file(), media_type="text/html") response = t.result() diff --git a/simple_repository_browser/crawler.py b/simple_repository_browser/crawler.py index 06644fa..f0af9aa 100644 --- a/simple_repository_browser/crawler.py +++ b/simple_repository_browser/crawler.py @@ -23,6 +23,7 @@ class Crawler: A crawler designed to populate and periodically reindex the content of the project's metadata database. """ + def __init__( self, http_client: httpx.AsyncClient, @@ -110,24 +111,26 @@ async def refetch_hook(self) -> None: ) packages_w_dist_info = set() for cache_type, name, version in self._cache: - if cache_type == 'pkg-info': + if cache_type == "pkg-info": packages_w_dist_info.add(name) popular_projects = [] if self._crawl_popular_projects: # Add the top 100 packages (and their dependencies) to the repository - URL = 'https://hugovk.github.io/top-pypi-packages/top-pypi-packages-30-days.min.json' + URL = "https://hugovk.github.io/top-pypi-packages/top-pypi-packages-30-days.min.json" try: resp = await self._http_client.get(URL) s = resp.json() - for _, row in zip(range(100), s['rows']): - popular_projects.append(row['project']) + for _, row in zip(range(100), s["rows"]): + popular_projects.append(row["project"]) except Exception as err: - logging.warning(f'Problem fetching popular projects ({err})') + logging.warning(f"Problem fetching popular projects ({err})") pass projects_to_crawl = packages_w_dist_info | set(popular_projects) - logging.info(f'About to start crawling {len(projects_to_crawl)} projects (and their transient dependencies)') + logging.info( + f"About to start crawling {len(projects_to_crawl)} projects (and their transient dependencies)" + ) await self.crawl_recursively(projects_to_crawl) async def run_reindex_periodically(self) -> None: @@ -147,8 +150,7 @@ async def fetch_pkg_info( releases: dict[Version, ShortReleaseInfo], force_recache: bool, ) -> tuple[model.File, PackageInfo]: - - key = ('pkg-info', prj.name, str(version)) + key = ("pkg-info", prj.name, str(version)) if key in self._cache and not force_recache: info_file, files_used_for_cache, pkg_info = self._cache[key] @@ -163,7 +165,7 @@ async def fetch_pkg_info( return info_file, pkg_info if force_recache: - logging.info('Recaching') + logging.info("Recaching") fetch_projects.insert_if_missing( self._projects_db, @@ -171,16 +173,19 @@ async def fetch_pkg_info( prj.name, ) - info_file, pkg_info = await package_info(releases[version].files, self._source, prj.name) + info_file, pkg_info = await package_info( + releases[version].files, self._source, prj.name + ) self._cache[key] = info_file, releases[version].files, pkg_info release_info = releases[version] - if 'latest-release' in release_info.labels: + if "latest-release" in release_info.labels: fetch_projects.update_summary( self._projects_db, name=canonicalize_name(prj.name), summary=pkg_info.summary, - release_date=info_file.upload_time or datetime.fromtimestamp(0, tz=timezone.utc), + release_date=info_file.upload_time + or datetime.fromtimestamp(0, tz=timezone.utc), release_version=str(version), ) diff --git a/simple_repository_browser/fetch_description.py b/simple_repository_browser/fetch_description.py index 6294a3e..2ed13b4 100644 --- a/simple_repository_browser/fetch_description.py +++ b/simple_repository_browser/fetch_description.py @@ -65,8 +65,8 @@ def discover_extra_markers(cls, ast) -> typing.Generator[str, None, None]: if isinstance(ast[2], (list, tuple)): yield from cls.discover_extra_markers(ast[2]) elif isinstance(ast, tuple): - lhs_v = getattr(ast[0], 'value', None) - if lhs_v == 'extra': + lhs_v = getattr(ast[0], "value", None) + if lhs_v == "extra": yield ast[2].value # Note: Technically, it is possible to build a '"foo" == extra' style # marker. We don't bother with it though, since it isn't something @@ -87,6 +87,7 @@ def extras_for_requirement(cls, requirement: Requirement) -> set[str]: @dataclasses.dataclass class PackageInfo: """Represents a simplified pkg-info/dist-info metadata, suitable for easy (and safe) use in html templates""" + summary: str description: str # This is HTML safe (rendered with readme_renderer). author: typing.Optional[str] = None @@ -108,9 +109,9 @@ async def fetch_file(url, dest): try: r.raise_for_status() except httpx.HTTPError as err: - raise IOError(f'Unable to fetch file (reason: { str(err) })') + raise IOError(f"Unable to fetch file (reason: {str(err)})") chunk_size = 1024 * 100 - with open(dest, 'wb') as fd: + with open(dest, "wb") as fd: async for chunk in r.aiter_bytes(chunk_size): fd.write(chunk) @@ -134,9 +135,9 @@ async def package_info( release_files, key=lambda file: ( not file.dist_info_metadata, # Put those with dist info metadata first. - not file.filename.endswith('.whl'), - not file.filename.endswith('.tar.gz'), - not file.filename.endswith('.zip'), + not file.filename.endswith(".whl"), + not file.filename.endswith(".tar.gz"), + not file.filename.endswith(".zip"), file.upload_time, # Distinguish conflicts by picking the earliest one. ), ) @@ -154,6 +155,7 @@ async def package_info( # Compute the size of each file. # TODO: This should be done as part of the repository component interface. async with httpx.AsyncClient(verify=False) as http_client: + async def semaphored_head(filename: str, url: str): async with limited_concurrency: headers: dict[str, str] = {} @@ -161,6 +163,7 @@ async def semaphored_head(filename: str, url: str): filename, await http_client.head(url, follow_redirects=True, headers=headers), ) + coros = [ semaphored_head(file.filename, file.url) for file in files @@ -169,20 +172,20 @@ async def semaphored_head(filename: str, url: str): for coro in asyncio.as_completed(coros): filename, response = await coro files_info[filename] = FileInfo( - size=int(response.headers['Content-Length']), + size=int(response.headers["Content-Length"]), ) file = files[0] if file.dist_info_metadata: - resource_name = file.filename + '.metadata' + resource_name = file.filename + ".metadata" else: raise ValueError(f"Metadata not available for {file}") - logging.debug(f'Downloading metadata for {file.filename} from {resource_name}') + logging.debug(f"Downloading metadata for {file.filename} from {resource_name}") with tempfile.NamedTemporaryFile( - suffix=os.path.splitext(file.filename)[1], + suffix=os.path.splitext(file.filename)[1], ) as tmp: resource = await repository.get_resource(project_name, resource_name) @@ -191,9 +194,11 @@ async def semaphored_head(filename: str, url: str): if not file.upload_time: # If the repository doesn't provide information about the upload time, estimate # it from the headers of the resource, if they exist. - if ct := resource.context.get('creation-date'): + if ct := resource.context.get("creation-date"): if isinstance(ct, str): - file = dataclasses.replace(file, upload_time=datetime.datetime.fromisoformat(ct)) + file = dataclasses.replace( + file, upload_time=datetime.datetime.fromisoformat(ct) + ) elif isinstance(resource, model.HttpResource): await fetch_file(resource.url, tmp.name) else: @@ -210,11 +215,11 @@ async def semaphored_head(filename: str, url: str): def extract_usernames(emails): names = [] parsed = email.parser.Parser(policy=email.policy.default).parsestr( - f'To: {info.author_email}', + f"To: {info.author_email}", ) - for address in parsed['to'].addresses: + for address in parsed["to"].addresses: names.append(address.display_name) - return ', '.join(names) + return ", ".join(names) if not info.author and info.author_email: info.author = extract_usernames(info.author_email) @@ -223,17 +228,18 @@ def extract_usernames(emails): info.maintainer = extract_usernames(info.maintainer_email) project_urls = { - url.split(',')[0].strip().title(): url.split(',')[1].strip() + url.split(",")[0].strip().title(): url.split(",")[1].strip() for url in info.project_urls or [] } # Ensure that a Homepage exists in the project urls - if info.home_page and 'Homepage' not in project_urls: - project_urls['Homepage'] = info.home_page + if info.home_page and "Homepage" not in project_urls: + project_urls["Homepage"] = info.home_page sorted_urls = { - name: url for name, url in sorted( + name: url + for name, url in sorted( project_urls.items(), - key=lambda item: (item[0] != 'Homepage', item[0]), + key=lambda item: (item[0] != "Homepage", item[0]), ) } @@ -245,7 +251,7 @@ def extract_usernames(emails): reqs.append(InvalidRequirementSpecification(req)) pkg = PackageInfo( - summary=info.summary or '', + summary=info.summary or "", description=description, author=info.author, maintainer=info.maintainer, @@ -270,17 +276,21 @@ def extract_usernames(emails): def generate_safe_description_html(package_info: pkginfo.Distribution) -> str: # Handle the valid description content types. # https://packaging.python.org/specifications/core-metadata - description_type = package_info.description_content_type or 'text/x-rst' - raw_description = package_info.description or '' + description_type = package_info.description_content_type or "text/x-rst" + raw_description = package_info.description or "" # Seen in the wild (internal only: sps-deep-hysteresis-compensation). - description_type = description_type.replace('\"', '') + description_type = description_type.replace('"', "") - if description_type == 'text/x-rst' or description_type.startswith('text/x-rst;'): + if description_type == "text/x-rst" or description_type.startswith("text/x-rst;"): return readme_renderer.rst.render(raw_description) or "" - elif description_type == 'text/markdown' or description_type.startswith('text/markdown;'): # Seen longer form with orjson + elif description_type == "text/markdown" or description_type.startswith( + "text/markdown;" + ): # Seen longer form with orjson return readme_renderer.markdown.render(raw_description) or "" - elif description_type == 'text/plain' or description_type.startswith('text/plain;'): # seen with nbformat + elif description_type == "text/plain" or description_type.startswith( + "text/plain;" + ): # seen with nbformat return readme_renderer.txt.render(raw_description) or "" else: raise ValueError(f"Unknown readme format {description_type}") diff --git a/simple_repository_browser/fetch_projects.py b/simple_repository_browser/fetch_projects.py index 946564f..8e2ebb8 100644 --- a/simple_repository_browser/fetch_projects.py +++ b/simple_repository_browser/fetch_projects.py @@ -8,9 +8,9 @@ def create_table(connection): con = connection with con as cursor: cursor.execute( - '''CREATE TABLE IF NOT EXISTS projects + """CREATE TABLE IF NOT EXISTS projects (canonical_name text unique, preferred_name text, summary text, release_date timestamp, release_version text) - ''', + """, ) @@ -24,28 +24,33 @@ def insert_if_missing(connection, canonical_name, preferred_name): def remove_if_found(connection, canonical_name): with connection as cursor: - cursor.execute('DELETE FROM projects where canonical_name = ?;', (canonical_name,)) + cursor.execute( + "DELETE FROM projects where canonical_name = ?;", (canonical_name,) + ) -def update_summary(conn, name: str, summary: str, release_date: datetime.datetime, release_version: str): +def update_summary( + conn, name: str, summary: str, release_date: datetime.datetime, release_version: str +): with conn as cursor: cursor.execute( - ''' + """ UPDATE projects SET summary = ?, release_date = ?, release_version = ? WHERE canonical_name == ?; - ''', (summary, release_date, release_version, name), + """, + (summary, release_date, release_version, name), ) async def fully_populate_db(connection, repository: SimpleRepository): con = connection - logging.info('Fetching names from repository') + logging.info("Fetching names from repository") project_list = await repository.get_project_list() project_names = [ (project.normalized_name, project.name) for project in project_list.projects ] - logging.info('Inserting all new names (if any)') + logging.info("Inserting all new names (if any)") with con as cursor: for canonical_name, name in project_names: cursor.execute( @@ -54,27 +59,33 @@ async def fully_populate_db(connection, repository: SimpleRepository): ) with con as cursor: - db_canonical_names = {row[0] for row in cursor.execute("SELECT canonical_name FROM projects").fetchall()} + db_canonical_names = { + row[0] + for row in cursor.execute("SELECT canonical_name FROM projects").fetchall() + } index_canonical_names = {normed_name for normed_name, _ in project_names} if not index_canonical_names: - logging.warning("No names found in the repository. Not removing from the database, as this is likely a problem with the repository.") + logging.warning( + "No names found in the repository. Not removing from the database, as this is likely a problem with the repository." + ) return names_in_db_no_longer_in_index = db_canonical_names - index_canonical_names if names_in_db_no_longer_in_index: logging.warning( - f'Removing the following { len(names_in_db_no_longer_in_index) } names from the database:\n ' - "\n ".join(list(names_in_db_no_longer_in_index)[:2000]) + "\n", + f"Removing the following {len(names_in_db_no_longer_in_index)} names from the database:\n " + "\n ".join(list(names_in_db_no_longer_in_index)[:2000]) + + "\n", ) with con as cursor: for name in names_in_db_no_longer_in_index: cursor.execute( - ''' + """ DELETE FROM projects WHERE canonical_name == ?; - ''', + """, (name,), ) - logging.info('DB synchronised with repository') + logging.info("DB synchronised with repository") diff --git a/simple_repository_browser/metadata_injector.py b/simple_repository_browser/metadata_injector.py index 8284d27..7ced78c 100644 --- a/simple_repository_browser/metadata_injector.py +++ b/simple_repository_browser/metadata_injector.py @@ -11,16 +11,16 @@ def get_metadata_from_sdist(package_path: pathlib.Path) -> str: archive = tarfile.TarFile.open(package_path) names = archive.getnames() - pkg_info_files = [x.split('/') for x in names if 'PKG-INFO' in x] + pkg_info_files = [x.split("/") for x in names if "PKG-INFO" in x] ordered_pkg_info = sorted(pkg_info_files, key=lambda pth: -len(pth)) for path in ordered_pkg_info: - candidate = '/'.join(path) + candidate = "/".join(path) f = archive.extractfile(candidate) if f is None: continue data = f.read().decode() - if 'Metadata-Version' in data: + if "Metadata-Version" in data: return data raise ValueError(f"No metadata found in {package_path.name}") @@ -30,27 +30,27 @@ def get_metadata_from_zip(package_path: pathlib.Path) -> str: with zipfile.ZipFile(package_path) as archive: names = archive.namelist() - pkg_info_files = [x.split('/') for x in names if 'PKG-INFO' in x] + pkg_info_files = [x.split("/") for x in names if "PKG-INFO" in x] ordered_pkg_info = sorted(pkg_info_files, key=lambda pth: -len(pth)) for path in ordered_pkg_info: - candidate = '/'.join(path) - f = archive.open(candidate, mode='r') + candidate = "/".join(path) + f = archive.open(candidate, mode="r") if f is None: continue data = f.read().decode() - if 'Metadata-Version' in data: + if "Metadata-Version" in data: return data raise ValueError(f"No metadata found in {package_path.name}") class MetadataInjector(MetadataInjectorRepository): def _get_metadata_from_package(self, package_path: pathlib.Path) -> str: - if package_path.name.endswith('.whl'): + if package_path.name.endswith(".whl"): return self._get_metadata_from_wheel(package_path) - elif package_path.name.endswith('.tar.gz'): + elif package_path.name.endswith(".tar.gz"): return get_metadata_from_sdist(package_path) - elif package_path.name.endswith('.zip'): + elif package_path.name.endswith(".zip"): return get_metadata_from_zip(package_path) raise ValueError("Package provided is not a wheel") diff --git a/simple_repository_browser/model.py b/simple_repository_browser/model.py index 0bf3862..e62a795 100644 --- a/simple_repository_browser/model.py +++ b/simple_repository_browser/model.py @@ -78,13 +78,15 @@ def __init__( def repository_stats(self) -> RepositoryStatsModel: with self.projects_db as cursor: - [n_packages] = cursor.execute('SELECT COUNT(canonical_name) FROM projects').fetchone() + [n_packages] = cursor.execute( + "SELECT COUNT(canonical_name) FROM projects" + ).fetchone() with self.cache as cache: n_dist_info = len(cache) packages_w_dist_info = set() for cache_type, name, version in cache: - if cache_type == 'pkg-info': + if cache_type == "pkg-info": packages_w_dist_info.add(name) n_packages_w_dist_info = len(packages_w_dist_info) @@ -94,7 +96,9 @@ def repository_stats(self) -> RepositoryStatsModel: n_packages_w_dist_info=n_packages_w_dist_info, ) - def _compatibility_matrix(self, files: tuple[File, ...]) -> compatibility_matrix.CompatibilityMatrixModel: + def _compatibility_matrix( + self, files: tuple[File, ...] + ) -> compatibility_matrix.CompatibilityMatrixModel: # Compute the compatibility matrix for the given files. return compatibility_matrix.compatibility_matrix(files) @@ -114,14 +118,14 @@ def project_query(self, query: str, page_size: int, page: int) -> QueryResultMod single_name_proposal = _search.simple_name_from_query(search_terms) exact = None - offset = (page-1) * page_size # page is 1 based. + offset = (page - 1) * page_size # page is 1 based. with self.projects_db as cursor: result_count = cursor.execute( - "SELECT COUNT(*) as count FROM projects WHERE " - f"{condition_query}", condition_terms, + f"SELECT COUNT(*) as count FROM projects WHERE {condition_query}", + condition_terms, ).fetchone() - n_results = result_count['count'] + n_results = result_count["count"] n_pages = math.ceil(n_results / page_size) if n_pages > 0 and (page < 1 or page > n_pages): @@ -131,7 +135,7 @@ def project_query(self, query: str, page_size: int, page: int) -> QueryResultMod if single_name_proposal: exact = cursor.execute( - 'SELECT canonical_name, summary, release_version, release_date FROM projects WHERE canonical_name == ?', + "SELECT canonical_name, summary, release_version, release_date FROM projects WHERE canonical_name == ?", (single_name_proposal,), ).fetchone() results = cursor.execute( @@ -163,17 +167,23 @@ async def project_page( canonical_name = canonicalize_name(project_name) try: prj = await self.source.get_project_page(canonical_name) - fetch_projects.insert_if_missing(self.projects_db, canonical_name, project_name) + fetch_projects.insert_if_missing( + self.projects_db, canonical_name, project_name + ) except PackageNotFoundError: # Tidy up the cache if the project is no longer found. for key in list(self.cache): - if key[:2] == ('pkg-info', canonical_name): + if key[:2] == ("pkg-info", canonical_name): self.cache.pop(key) fetch_projects.remove_if_found(self.projects_db, canonical_name) - raise errors.RequestError(status_code=404, detail=f"Project {project_name} not found.") + raise errors.RequestError( + status_code=404, detail=f"Project {project_name} not found." + ) if not prj.files: - raise errors.RequestError(status_code=404, detail=f"No releases for {project_name}.") + raise errors.RequestError( + status_code=404, detail=f"No releases for {project_name}." + ) releases, latest_version = self._release_info_model.release_infos(prj) @@ -181,19 +191,32 @@ async def project_page( version = latest_version if version not in releases: - raise errors.RequestError(status_code=404, detail=f'Release "{version}" not found for {project_name}.') + raise errors.RequestError( + status_code=404, + detail=f'Release "{version}" not found for {project_name}.', + ) release = releases[version] if not release.files: quarantine_context = "" - if 'quarantined' in release.labels: - quarantine_context = " Files have been identified as quarantined for this project." - raise errors.RequestError(status_code=404, detail=f'Release "{version}" has no files.' + quarantine_context, project_page=prj) + if "quarantined" in release.labels: + quarantine_context = ( + " Files have been identified as quarantined for this project." + ) + raise errors.RequestError( + status_code=404, + detail=f'Release "{version}" has no files.' + quarantine_context, + project_page=prj, + ) - info_file, pkg_info = await self.crawler.fetch_pkg_info(prj, version, releases, force_recache=recache) + info_file, pkg_info = await self.crawler.fetch_pkg_info( + prj, version, releases, force_recache=recache + ) classifiers_by_top_level = { - top_level: tuple(classifier) for top_level, classifier in itertools.groupby( - pkg_info.classifiers, key=lambda s: s.split('::')[0], + top_level: tuple(classifier) + for top_level, classifier in itertools.groupby( + pkg_info.classifiers, + key=lambda s: s.split("::")[0], ) } diff --git a/simple_repository_browser/short_release_info.py b/simple_repository_browser/short_release_info.py index b08dd9a..d683c0f 100644 --- a/simple_repository_browser/short_release_info.py +++ b/simple_repository_browser/short_release_info.py @@ -16,12 +16,16 @@ class ShortReleaseInfo: version: Version files: tuple[model.File, ...] release_date: datetime | None - labels: typing.Mapping[str, typing.Annotated[str, 'reason']] # A mapping between labels (yank, partial-yank, quarantined, latest-release, etc.) to a reason for that label. + labels: typing.Mapping[ + str, typing.Annotated[str, "reason"] + ] # A mapping between labels (yank, partial-yank, quarantined, latest-release, etc.) to a reason for that label. class ReleaseInfoModel: @classmethod - def release_infos(cls, project_detail: model.ProjectDetail) -> tuple[dict[Version, ShortReleaseInfo], Version]: + def release_infos( + cls, project_detail: model.ProjectDetail + ) -> tuple[dict[Version, ShortReleaseInfo], Version]: files_grouped_by_version: dict[Version, list[model.File]] = {} if not project_detail.files: @@ -37,11 +41,11 @@ def release_infos(cls, project_detail: model.ProjectDetail) -> tuple[dict[Versio ), ) except (ValueError, InvalidVersion): - release = Version('0.0rc0') + release = Version("0.0rc0") files_grouped_by_version.setdefault(release, []).append(file) # Ensure there is a release for each version, even if there is no files for it. - for version_str in (project_detail.versions or []): + for version_str in project_detail.versions or []: files_grouped_by_version.setdefault(Version(version_str), []) result: dict[Version, ShortReleaseInfo] = {} @@ -50,39 +54,56 @@ def release_infos(cls, project_detail: model.ProjectDetail) -> tuple[dict[Versio if typing.TYPE_CHECKING: RawQuarantinefile = typing.TypedDict( - 'RawQuarantinefile', { - 'filename': str, 'quarantine_release_time': str, 'upload_time': str, + "RawQuarantinefile", + { + "filename": str, + "quarantine_release_time": str, + "upload_time": str, }, ) Quarantinefile = typing.TypedDict( - 'Quarantinefile', { - 'filename': str, 'quarantine_release_time': datetime, 'upload_time': datetime, + "Quarantinefile", + { + "filename": str, + "quarantine_release_time": datetime, + "upload_time": datetime, }, ) - quarantined_files: list[RawQuarantinefile] = typing.cast(typing.Any, project_detail.private_metadata.get('_quarantined_files')) or [] + quarantined_files: list[RawQuarantinefile] = ( + typing.cast( + typing.Any, project_detail.private_metadata.get("_quarantined_files") + ) + or [] + ) quarantined_files_by_release: dict[Version, list[Quarantinefile]] = {} date_format = "%Y-%m-%dT%H:%M:%SZ" for file_info in quarantined_files: quarantined_file: Quarantinefile = { - 'filename': file_info['filename'], - 'quarantine_release_time': datetime.strptime(file_info['quarantine_release_time'], date_format), - 'upload_time': datetime.strptime(file_info['upload_time'], date_format), + "filename": file_info["filename"], + "quarantine_release_time": datetime.strptime( + file_info["quarantine_release_time"], date_format + ), + "upload_time": datetime.strptime(file_info["upload_time"], date_format), } release = Version( extract_package_version( - filename=quarantined_file['filename'], + filename=quarantined_file["filename"], project_name=canonical_name, ), ) - quarantined_files_by_release.setdefault(release, []).append(quarantined_file) + quarantined_files_by_release.setdefault(release, []).append( + quarantined_file + ) # Make sure there is a record for this release, even if there are no files. files_grouped_by_version.setdefault(release, []) for version, files in sorted(files_grouped_by_version.items()): - quarantined_files_for_release = quarantined_files_by_release.get(version, []) + quarantined_files_for_release = quarantined_files_by_release.get( + version, [] + ) upload_times: list[datetime] = [ file.upload_time for file in files if file.upload_time is not None @@ -101,20 +122,27 @@ def release_infos(cls, project_detail: model.ProjectDetail) -> tuple[dict[Versio else: not_yanked_files += 1 if yanked_files > 0 and not_yanked_files > 0: - labels['partial-yank'] = 'Partially yanked' + labels["partial-yank"] = "Partially yanked" elif yanked_files > 0 and not_yanked_files == 0: - labels['yanked'] = '. '.join(yank_reasons or ['No yank reasons given']) + labels["yanked"] = ". ".join(yank_reasons or ["No yank reasons given"]) if quarantined_files_for_release: - quarantine_release_times = [file['quarantine_release_time'] for file in quarantined_files_for_release] + quarantine_release_times = [ + file["quarantine_release_time"] + for file in quarantined_files_for_release + ] quarantine_release_time = min(quarantine_release_times) # When computing the release time, take into account quarantined files. if not upload_times: - upload_times = [file['upload_time'] for file in quarantined_files_for_release] - labels['quarantined'] = f"Release quarantined. Available from {quarantine_release_time}" + upload_times = [ + file["upload_time"] for file in quarantined_files_for_release + ] + labels["quarantined"] = ( + f"Release quarantined. Available from {quarantine_release_time}" + ) if version == latest_version: - labels['latest-release'] = '' + labels["latest-release"] = "" if upload_times: earliest_release_date = min(upload_times) @@ -131,14 +159,19 @@ def release_infos(cls, project_detail: model.ProjectDetail) -> tuple[dict[Versio return result, latest_version @classmethod - def compute_latest_version(cls, versions: dict[Version, list[typing.Any]]) -> Version: + def compute_latest_version( + cls, versions: dict[Version, list[typing.Any]] + ) -> Version: # Use the pip logic to determine the latest release. First, pick the greatest non-dev version, # and if nothing, fall back to the greatest dev version. If no release is available return None. sorted_versions = sorted( versions, key=lambda version: ( - len(versions[version]) > 0, # Prioritise the releases with files (e.g. not quarantined). + # Prioritise the releases with files (e.g. not quarantined). + len(versions[version]) > 0, + # Then, put the non dev-releases first. not version.is_devrelease and not version.is_prerelease, + # Finally, order by the version. version, ), ) diff --git a/simple_repository_browser/static_files.py b/simple_repository_browser/static_files.py index f6b6f5c..563ce08 100644 --- a/simple_repository_browser/static_files.py +++ b/simple_repository_browser/static_files.py @@ -21,20 +21,22 @@ StaticFilesManifest: typing.TypeAlias = dict[str, tuple[str, pathlib.Path]] -def compile_static_files(*, destination: pathlib.Path, manifest: StaticFilesManifest) -> None: +def compile_static_files( + *, destination: pathlib.Path, manifest: StaticFilesManifest +) -> None: """Compile a static directory from one or more source directories.""" # This function is designed to write the static files, could be useful for serving static # files via apache/nginx/etc. - file_map: dict[str, dict[str, str]] = {'file-map': {}} + file_map: dict[str, dict[str, str]] = {"file-map": {}} for input_filename, (hashed_relpath, source_path) in manifest.items(): target = destination / hashed_relpath target.parent.mkdir(parents=True, exist_ok=True) shutil.copy(source_path, target) - file_map['file-map'][str(input_filename)] = str(target) + file_map["file-map"][str(input_filename)] = str(target) - json.dump(file_map, (destination / '.manifest.json').open('w'), indent=2) - (destination / '.gitignore').write_text('*') + json.dump(file_map, (destination / ".manifest.json").open("w"), indent=2) + (destination / ".gitignore").write_text("*") def generate_manifest(sources: typing.Sequence[pathlib.Path]) -> StaticFilesManifest: @@ -45,17 +47,17 @@ def generate_manifest(sources: typing.Sequence[pathlib.Path]) -> StaticFilesMani files_to_compile: dict[pathlib.Path, pathlib.Path] = {} for source in sources: assert source.exists() - for path in sorted(source.glob('**/*')): + for path in sorted(source.glob("**/*")): if not path.is_file(): continue - if path.name.startswith('.'): + if path.name.startswith("."): continue rel = path.relative_to(source) files_to_compile[rel] = path for rel, source_path in files_to_compile.items(): file_hash = sha256(source_path.read_bytes()).hexdigest()[:12] - name = f'{source_path.stem}.{file_hash}{source_path.suffix}' + name = f"{source_path.stem}.{file_hash}{source_path.suffix}" manifest[str(rel)] = (str(rel.parent / name), source_path) return manifest @@ -83,17 +85,21 @@ async def get_response(self, path: str, scope: Scope) -> Response: def main(argv: typing.Sequence[str]) -> None: - parser = argparse.ArgumentParser(prog='simple_repository_browser.static') + parser = argparse.ArgumentParser(prog="simple_repository_browser.static") subparsers = parser.add_subparsers() - parser_compile_static = subparsers.add_parser('compile', help='Compile the static files into a directory') - parser_compile_static.add_argument('destination', type=pathlib.Path, help='Where to write the static files') + parser_compile_static = subparsers.add_parser( + "compile", help="Compile the static files into a directory" + ) + parser_compile_static.add_argument( + "destination", type=pathlib.Path, help="Where to write the static files" + ) parser_compile_static.add_argument( - 'source', + "source", type=pathlib.Path, - help='The source of static files to combine (may be provided multiple times)', - nargs='+', + help="The source of static files to combine (may be provided multiple times)", + nargs="+", ) parser_compile_static.set_defaults(handler=handle_compile) @@ -102,11 +108,11 @@ def main(argv: typing.Sequence[str]) -> None: def handle_compile(args: argparse.Namespace): - print(f'Writing static files to {args.destination}') + print(f"Writing static files to {args.destination}") manifest = generate_manifest(args.source) compile_static_files(destination=args.destination, manifest=manifest) -if __name__ == '__main__': +if __name__ == "__main__": # Enable simple_repository_browser.static_files CLI. main(sys.argv[1:]) diff --git a/simple_repository_browser/tests/test_RequirementSequence.py b/simple_repository_browser/tests/test_RequirementSequence.py index bdcd8e0..eb13bb2 100644 --- a/simple_repository_browser/tests/test_RequirementSequence.py +++ b/simple_repository_browser/tests/test_RequirementSequence.py @@ -5,8 +5,8 @@ def test_extra__basic(): - s = RequirementsSequence((Requirement('foo'), Requirement('bar; extra == "bar"'))) - assert s.extras() == {'bar'} + s = RequirementsSequence((Requirement("foo"), Requirement('bar; extra == "bar"'))) + assert s.extras() == {"bar"} def test_extras__multiple_specs(): @@ -16,44 +16,50 @@ def test_extras__multiple_specs(): Requirement('wobble; extra == "wibble"'), ), ) - assert s.extras() == {'bar', 'foo', 'wibble'} + assert s.extras() == {"bar", "foo", "wibble"} def test_extras__2_extras_or(): s = RequirementsSequence( ( - Requirement('foo'), + Requirement("foo"), Requirement('bar; extra == "bar" or extra == "foo"'), ), ) - assert s.extras() == {'bar', 'foo'} + assert s.extras() == {"bar", "foo"} def test_extras__2_extras_and(): # Not realistic, but technically possible. s = RequirementsSequence((Requirement('bar; extra == "bar" and extra == "foo"'),)) - assert s.extras() == {'bar', 'foo'} + assert s.extras() == {"bar", "foo"} def test_extras__and_py_version(): s = RequirementsSequence( ( - Requirement('foo'), + Requirement("foo"), Requirement('bar; extra == "bar" and python_version>="4.0"'), ), ) - assert s.extras() == {'bar'} + assert s.extras() == {"bar"} def test_extras__legacy_format(): # As seen in cliff/2.10.0 - s = RequirementsSequence((Requirement("unicodecsv (>=0.8.0); (python_version<'3.0')"),)) + s = RequirementsSequence( + (Requirement("unicodecsv (>=0.8.0); (python_version<'3.0')"),) + ) assert s.extras() == set() def test_extras__none(): - s = RequirementsSequence(( - Requirement("foo; (os_name == 'nt' or sys_platform == 'linux') and python_version <= '3.8'"), - Requirement("bar; python_version > '3.8'"), - )) + s = RequirementsSequence( + ( + Requirement( + "foo; (os_name == 'nt' or sys_platform == 'linux') and python_version <= '3.8'" + ), + Requirement("bar; python_version > '3.8'"), + ) + ) assert s.extras() == set() diff --git a/simple_repository_browser/tests/test_compatibility_matrix.py b/simple_repository_browser/tests/test_compatibility_matrix.py index 5ca5751..7c25eb3 100644 --- a/simple_repository_browser/tests/test_compatibility_matrix.py +++ b/simple_repository_browser/tests/test_compatibility_matrix.py @@ -16,9 +16,9 @@ def test_compat_mtx__pure_wheel(): mtx = compatibility_matrix( (model.File("cycler-0.12.1-py3-none-any.whl", "", {}),), ) - assert mtx.py_and_abi_names == ('Python 3',) - assert mtx.platform_names == ('any',) - assert tuple(mtx.matrix.keys()) == (('Python 3', 'any'),) + assert mtx.py_and_abi_names == ("Python 3",) + assert mtx.platform_names == ("any",) + assert tuple(mtx.matrix.keys()) == (("Python 3", "any"),) def test_compat_mtx__invalid_filename(): @@ -28,8 +28,8 @@ def test_compat_mtx__invalid_filename(): mtx = compatibility_matrix(files) - assert mtx.py_and_abi_names == ('madeup27 (madeup2_7m)',) - assert mtx.platform_names == ('anything_you_like',) + assert mtx.py_and_abi_names == ("madeup27 (madeup2_7m)",) + assert mtx.platform_names == ("anything_you_like",) def test_compat_mtx__pyqt6(): @@ -42,29 +42,43 @@ def test_compat_mtx__pyqt6(): mtx = compatibility_matrix(files) - assert mtx.py_and_abi_names == ('CPython >=3.7 (abi3)',) - assert mtx.platform_names == ('macosx_10_14_universal2', 'manylinux_2_28_x86_64', 'win_amd64') + assert mtx.py_and_abi_names == ("CPython >=3.7 (abi3)",) + assert mtx.platform_names == ( + "macosx_10_14_universal2", + "manylinux_2_28_x86_64", + "win_amd64", + ) - assert tuple(mtx.matrix.keys()) == (('CPython >=3.7 (abi3)', 'macosx_10_14_universal2'), ('CPython >=3.7 (abi3)', 'manylinux_2_28_x86_64'), ('CPython >=3.7 (abi3)', 'win_amd64')) + assert tuple(mtx.matrix.keys()) == ( + ("CPython >=3.7 (abi3)", "macosx_10_14_universal2"), + ("CPython >=3.7 (abi3)", "manylinux_2_28_x86_64"), + ("CPython >=3.7 (abi3)", "win_amd64"), + ) def test_compat_mtx__lxml_py2_flags(): files = [ model.File("lxml-4.9.3-cp27-cp27m-manylinux_2_5_i686.whl", "", {}), model.File("lxml-4.9.3-cp27-cp27mu-manylinux_2_5_i686.whl", "", {}), - model.File("lxml-4.9.3-cp27-cp27du-manylinux_2_5_i686.manylinux1_i686.whl", "", {}), + model.File( + "lxml-4.9.3-cp27-cp27du-manylinux_2_5_i686.manylinux1_i686.whl", "", {} + ), ] mtx = compatibility_matrix(files) - assert mtx.py_and_abi_names == ('CPython 2.7', 'CPython (debug) (wide) 2.7', 'CPython (wide) 2.7') - assert mtx.platform_names == ('manylinux1_i686', 'manylinux_2_5_i686') + assert mtx.py_and_abi_names == ( + "CPython 2.7", + "CPython (debug) (wide) 2.7", + "CPython (wide) 2.7", + ) + assert mtx.platform_names == ("manylinux1_i686", "manylinux_2_5_i686") assert tuple(mtx.matrix.keys()) == ( - ('CPython 2.7', 'manylinux_2_5_i686'), - ('CPython (wide) 2.7', 'manylinux_2_5_i686'), - ('CPython (debug) (wide) 2.7', 'manylinux1_i686'), - ('CPython (debug) (wide) 2.7', 'manylinux_2_5_i686'), + ("CPython 2.7", "manylinux_2_5_i686"), + ("CPython (wide) 2.7", "manylinux_2_5_i686"), + ("CPython (debug) (wide) 2.7", "manylinux1_i686"), + ("CPython (debug) (wide) 2.7", "manylinux_2_5_i686"), ) @@ -75,7 +89,7 @@ def test_compat_mtx__unexpected_flags(): mtx = compatibility_matrix(files) - assert mtx.py_and_abi_names == ('CPython (debug) (additional flags: ab) 2.7',) + assert mtx.py_and_abi_names == ("CPython (debug) (additional flags: ab) 2.7",) def test_compat_mtx__underscore_version(): @@ -85,29 +99,35 @@ def test_compat_mtx__underscore_version(): mtx = compatibility_matrix(files) - assert mtx.py_and_abi_names == ('CPython (debug) 3.99',) + assert mtx.py_and_abi_names == ("CPython (debug) 3.99",) def test_compat_mtx__lxml_pypy_flags(): mtx = compatibility_matrix( - (model.File("lxml-4.9.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", "", {}),), + ( + model.File( + "lxml-4.9.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", "", {} + ), + ), ) - assert mtx.py_and_abi_names == ('PyPy 3.10 (pp73)',) - assert mtx.platform_names == ('manylinux_2_28_x86_64',) + assert mtx.py_and_abi_names == ("PyPy 3.10 (pp73)",) + assert mtx.platform_names == ("manylinux_2_28_x86_64",) def test_compat_mtx__none_abi(): mtx = compatibility_matrix( ( model.File( - "pydantic_core-2.11.0-cp38-cp38-manylinux2014_x86_64.whl", "", {}, + "pydantic_core-2.11.0-cp38-cp38-manylinux2014_x86_64.whl", + "", + {}, ), model.File("pydantic_core-2.11.0-cp38-none-win32.whl", "", {}), model.File("pydantic_core-2.11.0-cp38-none-win_amd64.whl", "", {}), ), ) - assert mtx.py_and_abi_names == ('CPython 3.8',) - assert mtx.platform_names == ('manylinux2014_x86_64', 'win32', 'win_amd64') + assert mtx.py_and_abi_names == ("CPython 3.8",) + assert mtx.platform_names == ("manylinux2014_x86_64", "win32", "win_amd64") def test_compat_mtx__abi3_many_python(): @@ -116,7 +136,9 @@ def test_compat_mtx__abi3_many_python(): mtx = compatibility_matrix( ( model.File( - "PyQt6-6.0.2-cp36.cp37.cp38.cp39-abi3-manylinux_2_28_x86_64.whl", "", {}, + "PyQt6-6.0.2-cp36.cp37.cp38.cp39-abi3-manylinux_2_28_x86_64.whl", + "", + {}, ), ), ) diff --git a/simple_repository_browser/tests/test_search.py b/simple_repository_browser/tests/test_search.py index 6ea6cb7..7ddc323 100644 --- a/simple_repository_browser/tests/test_search.py +++ b/simple_repository_browser/tests/test_search.py @@ -9,56 +9,93 @@ ["query", "expected_expression_graph"], [ ("", ()), - pytest.param("some-name", (Filter(FilterOn.name_or_summary, 'some-name'),)), - pytest.param("some name", (_search.And(Filter(FilterOn.name_or_summary, 'some'), Filter(FilterOn.name_or_summary, 'name')),)), - pytest.param("som*name", (Filter(FilterOn.name_or_summary, 'som*name'),)), + pytest.param("some-name", (Filter(FilterOn.name_or_summary, "some-name"),)), + pytest.param( + "some name", + ( + _search.And( + Filter(FilterOn.name_or_summary, "some"), + Filter(FilterOn.name_or_summary, "name"), + ), + ), + ), + pytest.param("som*name", (Filter(FilterOn.name_or_summary, "som*name"),)), pytest.param('"some name"', (Filter(FilterOn.name_or_summary, '"some name"'),)), pytest.param('"some-name"', (Filter(FilterOn.name_or_summary, '"some-name"'),)), pytest.param('"CASE"', (Filter(FilterOn.name_or_summary, '"CASE"'),)), - pytest.param('-foo', (_search.Not(Filter(FilterOn.name_or_summary, 'foo')),)), - pytest.param('-"foo bar"', (_search.Not(Filter(FilterOn.name_or_summary, '"foo bar"')),)), - pytest.param('-name:"foo bar"', (_search.Not(Filter(FilterOn.name, '"foo bar"')),)), - pytest.param('name:foo', (Filter(FilterOn.name, 'foo'),)), + pytest.param("-foo", (_search.Not(Filter(FilterOn.name_or_summary, "foo")),)), + pytest.param( + '-"foo bar"', (_search.Not(Filter(FilterOn.name_or_summary, '"foo bar"')),) + ), + pytest.param( + '-name:"foo bar"', (_search.Not(Filter(FilterOn.name, '"foo bar"')),) + ), + pytest.param("name:foo", (Filter(FilterOn.name, "foo"),)), pytest.param( - 'name:foo OR name:bar', ( + "name:foo OR name:bar", + ( _search.Or( - Filter(FilterOn.name, 'foo'), - Filter(FilterOn.name, 'bar'), + Filter(FilterOn.name, "foo"), + Filter(FilterOn.name, "bar"), ), ), ), pytest.param( - 'name:foo AND "fiddle AND sticks"', ( + 'name:foo AND "fiddle AND sticks"', + ( _search.And( - Filter(FilterOn.name, 'foo'), + Filter(FilterOn.name, "foo"), Filter(FilterOn.name_or_summary, '"fiddle AND sticks"'), ), ), ), - pytest.param('summary:foo', (Filter(FilterOn.summary, 'foo'),)), + pytest.param("summary:foo", (Filter(FilterOn.summary, "foo"),)), pytest.param( - 'name:"NAME OR" AND "fiddle AND sticks"', ( + 'name:"NAME OR" AND "fiddle AND sticks"', + ( _search.And( Filter(FilterOn.name, '"NAME OR"'), Filter(FilterOn.name_or_summary, '"fiddle AND sticks"'), ), ), ), - pytest.param('(((a)))', (Filter(FilterOn.name_or_summary, 'a'),)), - pytest.param('(((a) OR (b)))', (_search.Or(Filter(FilterOn.name_or_summary, 'a'), Filter(FilterOn.name_or_summary, 'b')),)), + pytest.param("(((a)))", (Filter(FilterOn.name_or_summary, "a"),)), pytest.param( - '(a AND b) OR (c AND d)', ( + "(((a) OR (b)))", + ( _search.Or( - _search.And(Filter(FilterOn.name_or_summary, 'a'), Filter(FilterOn.name_or_summary, 'b')), - _search.And(Filter(FilterOn.name_or_summary, 'c'), Filter(FilterOn.name_or_summary, 'd')), + Filter(FilterOn.name_or_summary, "a"), + Filter(FilterOn.name_or_summary, "b"), ), ), ), pytest.param( - '((a AND b)) OR (c AND -d)', ( + "(a AND b) OR (c AND d)", + ( _search.Or( - _search.And(Filter(FilterOn.name_or_summary, 'a'), Filter(FilterOn.name_or_summary, 'b')), - _search.And(Filter(FilterOn.name_or_summary, 'c'), _search.Not(Filter(FilterOn.name_or_summary, 'd'))), + _search.And( + Filter(FilterOn.name_or_summary, "a"), + Filter(FilterOn.name_or_summary, "b"), + ), + _search.And( + Filter(FilterOn.name_or_summary, "c"), + Filter(FilterOn.name_or_summary, "d"), + ), + ), + ), + ), + pytest.param( + "((a AND b)) OR (c AND -d)", + ( + _search.Or( + _search.And( + Filter(FilterOn.name_or_summary, "a"), + Filter(FilterOn.name_or_summary, "b"), + ), + _search.And( + Filter(FilterOn.name_or_summary, "c"), + _search.Not(Filter(FilterOn.name_or_summary, "d")), + ), ), ), ), @@ -77,7 +114,7 @@ def test_parse_query(query, expected_expression_graph): ("name:foo__unnormed", "foo-unnormed"), ("foo", "foo"), ("some*.Name", None), - ("summary:\"Some Description\"", None), + ('summary:"Some Description"', None), ("foo bar", None), ("foo OR bar", None), ("-name:foo OR -bar", None), @@ -94,15 +131,42 @@ def test_simple_name_proposal(query, expected_result): [ ("", ("", ())), (" ", ("", ())), - ("name:foo", ('canonical_name LIKE ?', ('%foo%',))), - ("name:foo__unnormed", ('canonical_name LIKE ?', ('%foo-unnormed%',))), - ("foo", ('(canonical_name LIKE ? OR summary LIKE ?)', ('%foo%', '%foo%'))), - ("some*.Name", ('(canonical_name LIKE ? OR summary LIKE ?)', ('%some%-name%', '%some%.Name%'))), - ("summary:\"Some Description\"", ('summary LIKE ?', ('%Some Description%',))), - ("foo bar", ('((canonical_name LIKE ? OR summary LIKE ?) AND (canonical_name LIKE ? OR summary LIKE ?))', ('%foo%', '%foo%', '%bar%', '%bar%'))), - ("foo OR bar", ('((canonical_name LIKE ? OR summary LIKE ?) OR (canonical_name LIKE ? OR summary LIKE ?))', ('%foo%', '%foo%', '%bar%', '%bar%'))), - ("-name:foo OR -bar", ('(Not (canonical_name LIKE ? OR (Not (canonical_name LIKE ? OR summary LIKE ?))))', ('%foo%', '%bar%', '%bar%'))), - ("summary:\"Some'; DROP TABLE gotcha; ' Description\"", ('summary LIKE ?', ("%Some'; DROP TABLE gotcha; ' Description%",))), + ("name:foo", ("canonical_name LIKE ?", ("%foo%",))), + ("name:foo__unnormed", ("canonical_name LIKE ?", ("%foo-unnormed%",))), + ("foo", ("(canonical_name LIKE ? OR summary LIKE ?)", ("%foo%", "%foo%"))), + ( + "some*.Name", + ( + "(canonical_name LIKE ? OR summary LIKE ?)", + ("%some%-name%", "%some%.Name%"), + ), + ), + ('summary:"Some Description"', ("summary LIKE ?", ("%Some Description%",))), + ( + "foo bar", + ( + "((canonical_name LIKE ? OR summary LIKE ?) AND (canonical_name LIKE ? OR summary LIKE ?))", + ("%foo%", "%foo%", "%bar%", "%bar%"), + ), + ), + ( + "foo OR bar", + ( + "((canonical_name LIKE ? OR summary LIKE ?) OR (canonical_name LIKE ? OR summary LIKE ?))", + ("%foo%", "%foo%", "%bar%", "%bar%"), + ), + ), + ( + "-name:foo OR -bar", + ( + "(Not (canonical_name LIKE ? OR (Not (canonical_name LIKE ? OR summary LIKE ?))))", + ("%foo%", "%bar%", "%bar%"), + ), + ), + ( + "summary:\"Some'; DROP TABLE gotcha; ' Description\"", + ("summary LIKE ?", ("%Some'; DROP TABLE gotcha; ' Description%",)), + ), ], ) def test_build_sql_predicate(query, expected_predicate): @@ -118,15 +182,15 @@ def test_build_sql_predicate(query, expected_predicate): # ("", ()), ? Should this be an error? Currently explicitly enabled. # (" ", pytest.raises(parsley.ParseError)), ("'s'", pytest.raises(parsley.ParseError)), - ("\"imbalanced", pytest.raises(parsley.ParseError)), + ('"imbalanced', pytest.raises(parsley.ParseError)), ("unacceptable;char", pytest.raises(parsley.ParseError)), ("unacceptable%char", pytest.raises(parsley.ParseError)), ("-name:(foo OR bar)", pytest.raises(parsley.ParseError)), ("name:", pytest.raises(parsley.ParseError)), - ('notallowed:foo', pytest.raises(parsley.ParseError)), + ("notallowed:foo", pytest.raises(parsley.ParseError)), ], ) def test_invalid_query(query, expected_exception): with expected_exception: result = _search.parse(query) - print('Result:', result) + print("Result:", result) diff --git a/simple_repository_browser/tests/test_static_files.py b/simple_repository_browser/tests/test_static_files.py index 5892e64..b870667 100644 --- a/simple_repository_browser/tests/test_static_files.py +++ b/simple_repository_browser/tests/test_static_files.py @@ -9,31 +9,37 @@ def test_cli__help(capsys: pytest.CaptureFixture) -> None: with pytest.raises(SystemExit) as err_info: - main(['--help']) + main(["--help"]) assert err_info.value.code == 0 captured = capsys.readouterr() assert len(captured.out.splitlines()) > 1 def test_cli__compile(tmp_path: pathlib.Path) -> None: - simple_static_dir = pathlib.Path(simple_repository_browser.__file__).parent / 'static' - target_static_dir = tmp_path / 'static' + simple_static_dir = ( + pathlib.Path(simple_repository_browser.__file__).parent / "static" + ) + target_static_dir = tmp_path / "static" orig_files = [ - path for path in simple_static_dir.glob('**/*') if path.is_file() and not path.name.startswith('.') + path + for path in simple_static_dir.glob("**/*") + if path.is_file() and not path.name.startswith(".") ] - main(['compile', str(tmp_path / 'static'), str(simple_static_dir)]) + main(["compile", str(tmp_path / "static"), str(simple_static_dir)]) created_files = [ - path for path in target_static_dir.glob('**/*') if path.is_file() and not path.name.startswith('.') + path + for path in target_static_dir.glob("**/*") + if path.is_file() and not path.name.startswith(".") ] assert len(orig_files) > 1 assert len(created_files) == len(orig_files) - manifest_path = target_static_dir / '.manifest.json' + manifest_path = target_static_dir / ".manifest.json" assert manifest_path.exists() manifest = json.loads(manifest_path.read_text()) - file_map = manifest['file-map'] + file_map = manifest["file-map"] assert len(file_map) == len(orig_files) diff --git a/simple_repository_browser/tests/test_view.py b/simple_repository_browser/tests/test_view.py index 0535aae..7392bf6 100644 --- a/simple_repository_browser/tests/test_view.py +++ b/simple_repository_browser/tests/test_view.py @@ -9,11 +9,12 @@ def test_view_format__no_markers(): ) result = render_markers( - req, format_strings={ - 'expr': "{lhs} :{op}: {rhs}", + req, + format_strings={ + "expr": "{lhs} :{op}: {rhs}", }, ) - assert result == '' + assert result == "" def test_view_format__simple_extra(): @@ -22,8 +23,9 @@ def test_view_format__simple_extra(): ) result = render_markers( - req, format_strings={ - 'expr': "{lhs} :{op}: {rhs}", + req, + format_strings={ + "expr": "{lhs} :{op}: {rhs}", }, ) expected = 'extra :==: "blah"' @@ -37,10 +39,11 @@ def test_view_format__nested(): ) result = render_markers( - req, format_strings={ - 'combine_nested_expr': "[{lhs}] [{op}] [{rhs}]", - 'group_expr': '<<{expr}>>', - 'expr': "|{lhs}/ |{op}/ |{rhs}/", + req, + format_strings={ + "combine_nested_expr": "[{lhs}] [{op}] [{rhs}]", + "group_expr": "<<{expr}>>", + "expr": "|{lhs}/ |{op}/ |{rhs}/", }, ) expected = ( @@ -56,9 +59,10 @@ def test_view_format__legacy_format(): req = Requirement("unicodecsv (>=0.8.0); (python_version<'3.0')") result = render_markers( - req, format_strings={ - 'expr': "{lhs} :{op}: {rhs}", - 'combine_nested_expr': '{lhs} @{op}@ {rhs}', + req, + format_strings={ + "expr": "{lhs} :{op}: {rhs}", + "combine_nested_expr": "{lhs} @{op}@ {rhs}", }, ) expected = 'python_version :<: "3.0"' @@ -72,9 +76,10 @@ def test_view_format__simple_extra_plus_os(): ) result = render_markers( - req, format_strings={ - 'expr': "{lhs} :{op}: {rhs}", - 'combine_nested_expr': '{lhs} @{op}@ {rhs}', + req, + format_strings={ + "expr": "{lhs} :{op}: {rhs}", + "combine_nested_expr": "{lhs} @{op}@ {rhs}", }, ) expected = 'python_version :<=: "3.8" @and@ extra :==: "blah"' diff --git a/simple_repository_browser/view.py b/simple_repository_browser/view.py index d91e93c..a8a3e44 100644 --- a/simple_repository_browser/view.py +++ b/simple_repository_browser/view.py @@ -11,7 +11,12 @@ class View: - def __init__(self, templates_paths: typing.Sequence[Path], browser_version: str, static_files_manifest: StaticFilesManifest): + def __init__( + self, + templates_paths: typing.Sequence[Path], + browser_version: str, + static_files_manifest: StaticFilesManifest, + ): self.templates_paths = templates_paths self.version = browser_version self.static_files_manifest = static_files_manifest @@ -19,10 +24,16 @@ def __init__(self, templates_paths: typing.Sequence[Path], browser_version: str, def create_templates_environment(self) -> jinja2.Environment: loader = jinja2.FileSystemLoader(self.templates_paths) - templates = jinja2.Environment(loader=loader, autoescape=True, undefined=jinja2.StrictUndefined) + templates = jinja2.Environment( + loader=loader, autoescape=True, undefined=jinja2.StrictUndefined + ) @jinja2.pass_context - def url_for(context: typing.Mapping[str, typing.Any], name: str, **path_params: typing.Any) -> URL: + def url_for( + context: typing.Mapping[str, typing.Any], + name: str, + **path_params: typing.Any, + ) -> URL: request: fastapi.Request = context["request"] # We don't use request.url_for, as it always returns an absolute URL. # This prohibits running behind a proxy which doesn't correctly set @@ -32,13 +43,15 @@ def url_for(context: typing.Mapping[str, typing.Any], name: str, **path_params: return URL(str(request.app.url_path_for(name, **path_params))) @jinja2.pass_context - def static_file_url(context: typing.Mapping[str, typing.Any], target_file: str) -> URL: + def static_file_url( + context: typing.Mapping[str, typing.Any], target_file: str + ) -> URL: if target_file.startswith("/"): target_file = target_file[1:] filename, _ = self.static_files_manifest.get(target_file) or [None, None] if not filename: raise ValueError(f"Asset not found in manifest: {target_file}") - return url_for(context, 'static', path=filename) + return url_for(context, "static", path=filename) def sizeof_fmt(num: float, suffix: str = "B"): for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"): @@ -47,11 +60,11 @@ def sizeof_fmt(num: float, suffix: str = "B"): num /= 1024.0 return f"{num:.1f}Yi{suffix}" - templates.globals['url_for'] = url_for - templates.globals['static_file_url'] = static_file_url - templates.globals['fmt_size'] = sizeof_fmt - templates.globals['browser_version'] = self.version - templates.globals['render_markers'] = render_markers + templates.globals["url_for"] = url_for + templates.globals["static_file_url"] = static_file_url + templates.globals["fmt_size"] = sizeof_fmt + templates.globals["browser_version"] = self.version + templates.globals["render_markers"] = render_markers return templates @@ -61,19 +74,27 @@ def render_template( request: fastapi.Request, template: str, ) -> str: - return self.templates_env.get_template(template).render(request=request, **context) + return self.templates_env.get_template(template).render( + request=request, **context + ) # TODO: use typed arguments in the views - def about_page(self, context: model.RepositoryStatsModel, request: fastapi.Request) -> str: + def about_page( + self, context: model.RepositoryStatsModel, request: fastapi.Request + ) -> str: return self.render_template(context, request, "about.html") - def search_page(self, context: model.QueryResultModel, request: fastapi.Request) -> str: + def search_page( + self, context: model.QueryResultModel, request: fastapi.Request + ) -> str: return self.render_template(context, request, "search.html") def index_page(self, request: fastapi.Request) -> str: return self.render_template({}, request, "index.html") - def project_page(self, context: model.ProjectPageModel, request: fastapi.Request) -> str: + def project_page( + self, context: model.ProjectPageModel, request: fastapi.Request + ) -> str: return self.render_template(context, request, "project.html") def error_page(self, context: model.ErrorModel, request: fastapi.Request) -> str: @@ -82,7 +103,7 @@ def error_page(self, context: model.ErrorModel, request: fastapi.Request) -> str def render_markers(requirement: Requirement, *, format_strings: dict[str, str]) -> str: req_marker = requirement.marker - result = '' + result = "" if req_marker: # Access the AST. Not yet a public API, see https://github.com/pypa/packaging/issues/448. markers_ast = req_marker._markers @@ -90,7 +111,9 @@ def render_markers(requirement: Requirement, *, format_strings: dict[str, str]) return result -def render_marker_ast(ast: list | tuple, *, format_strings: dict[str, str]) -> tuple[str, int]: +def render_marker_ast( + ast: list | tuple, *, format_strings: dict[str, str] +) -> tuple[str, int]: # Render the given ast, and return the maximum depth of the ast that was found when rendering. # Comment in https://github.com/pypa/packaging/blob/09f131b326453f18a217fe34f4f7a77603b545db/src/packaging/markers.py#L203C13-L215C16. @@ -115,17 +138,19 @@ def render_marker_ast(ast: list | tuple, *, format_strings: dict[str, str]) -> t if isinstance(ast, list): lhs_str, lhs_maxdepth = render_marker_ast(ast[0], format_strings=format_strings) rhs_str, rhs_maxdepth = render_marker_ast(ast[2], format_strings=format_strings) - group_formatter = format_strings.get('group_expr', '({expr})') + group_formatter = format_strings.get("group_expr", "({expr})") if lhs_maxdepth >= 1: lhs_str = group_formatter.format(expr=lhs_str) if rhs_maxdepth >= 1: rhs_str = group_formatter.format(expr=rhs_str) - format_str = format_strings['combine_nested_expr'] + format_str = format_strings["combine_nested_expr"] result = format_str.format(lhs=lhs_str, op=ast[1], rhs=rhs_str) return result, max([lhs_maxdepth, rhs_maxdepth]) + 1 elif isinstance(ast, tuple): - format_str = format_strings['expr'] - result = format_str.format(lhs=ast[0].serialize(), op=ast[1].serialize(), rhs=ast[2].serialize()) + format_str = format_strings["expr"] + result = format_str.format( + lhs=ast[0].serialize(), op=ast[1].serialize(), rhs=ast[2].serialize() + ) return result, 0 else: - raise TypeError(f'Unhandled marker {ast!r}') + raise TypeError(f"Unhandled marker {ast!r}")