|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +""" |
| 4 | +Probe lfs files. |
| 5 | +For each source file provided as input, this will print: |
| 6 | +* "local", if the source file is not an LFS pointer |
| 7 | +* the sha256 hash, a space character and a transient download link obtained via the LFS protocol otherwise |
| 8 | +""" |
| 9 | + |
| 10 | +import sys |
| 11 | +import pathlib |
| 12 | +import subprocess |
| 13 | +import os |
| 14 | +import shutil |
| 15 | +import json |
| 16 | +import urllib.request |
| 17 | +from urllib.parse import urlparse |
| 18 | +import re |
| 19 | +import base64 |
| 20 | +from dataclasses import dataclass |
| 21 | + |
| 22 | + |
@dataclass
class Endpoint:
    """An LFS API endpoint: its URL plus the HTTP headers to send with requests to it."""

    # URL of the endpoint
    href: str
    # HTTP request headers, keyed by header name
    headers: dict[str, str]

    def update_headers(self, d: dict[str, str]):
        """Merge *d* into ``headers``, normalising each name with ``str.capitalize``.

        Normalising the names means a header such as ``AUTHORIZATION`` ends up
        stored under ``Authorization``, overwriting any previous value.
        """
        for name, value in d.items():
            self.headers[name.capitalize()] = value
| 30 | + |
| 31 | + |
# Absolute paths of every file named on the command line.
sources = [pathlib.Path(arg).resolve() for arg in sys.argv[1:]]
# Deepest directory shared by all sources; used only as the starting point for locating the repository.
_common_parent = pathlib.Path(os.path.commonpath(src.parent for src in sources))
# Top-level directory of the git work tree (a plain string); every later git invocation runs from here.
source_dir = subprocess.check_output(
    ["git", "rev-parse", "--show-toplevel"],
    cwd=_common_parent,
    text=True,
).strip()
| 35 | + |
| 36 | + |
def get_env(s, sep="="):
    """Parse ``key<sep>value`` lines from *s* into a dict.

    Each line is split at the first occurrence of *sep*; lines that do not
    contain *sep* are ignored. When a key occurs more than once the first
    value wins. Keys and values are kept verbatim (no whitespace stripping).

    Args:
        s: text holding one entry per line.
        sep: separator between key and value, matched literally.

    Returns:
        A dict mapping each key to its value, both as strings.
    """
    ret = {}
    # escape the separator so that it is matched literally even when it
    # contains regex metacharacters such as "." or "|"
    for m in re.finditer(fr'(.*?){re.escape(sep)}(.*)', s):
        ret.setdefault(*m.groups())
    return ret
| 42 | + |
| 43 | + |
def git(*args, **kwargs):
    """Run ``git`` with *args* from ``source_dir`` and return its stripped standard output.

    Extra keyword arguments (for instance ``check`` or ``input``) are passed
    through to ``subprocess.run``. Unless ``check=True`` is given, a failing
    command does not raise; whatever it printed on stdout is returned.
    """
    command = ("git", *args)
    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        text=True,
        cwd=source_dir,
        **kwargs,
    )
    return completed.stdout.strip()
| 46 | + |
| 47 | + |
def get_endpoint():
    """Build the LFS API endpoint of the repository, including authentication headers.

    The endpoint URL is read from ``git lfs env``. Headers are then gathered
    from the following sources, in this order:

    1. ``git-lfs-authenticate`` over ssh, when ``git lfs env`` reports an SSH
       endpoint (this may also replace the endpoint URL);
    2. the ``http.<scheme>://<host>/.extraheader`` git configuration;
    3. the ``GITHUB_TOKEN`` environment variable, which overrides any
       ``Authorization`` header set so far;
    4. ``git credential fill``, only when no ``Authorization`` header has been
       set by the previous steps.

    Returns:
        An ``Endpoint`` carrying the URL and the request headers to use.

    Raises:
        StopIteration: if ``git lfs env`` reports no ``Endpoint`` entry.
        subprocess.CalledProcessError: if ``git lfs env``, the ssh
            authentication command or ``git credential fill`` fails.
        AssertionError: if an SSH endpoint is reported but no ssh executable is found.
        KeyError: if ``git credential fill`` yields no username or password.
    """
    lfs_env = get_env(subprocess.check_output(["git", "lfs", "env"], text=True, cwd=source_dir))
    href = next(v for k, v in lfs_env.items() if k.startswith('Endpoint'))
    # keep only the url: drop whatever follows the first space
    href, _, _ = href.partition(' ')
    # the SSH line is indented in the `git lfs env` output, so compare the key
    # with surrounding whitespace removed rather than relying on an exact indent
    ssh_endpoint = next((v for k, v in lfs_env.items() if k.strip() == "SSH"), None)
    endpoint = Endpoint(href, {
        "Content-Type": "application/vnd.git-lfs+json",
        "Accept": "application/vnd.git-lfs+json",
    })
    if ssh_endpoint:
        # see https://github.com/git-lfs/git-lfs/blob/main/docs/api/authentication.md
        server, _, path = ssh_endpoint.partition(":")
        ssh_command = shutil.which(os.environ.get("GIT_SSH", os.environ.get("GIT_SSH_COMMAND", "ssh")))
        assert ssh_command, "no ssh command found"
        resp = json.loads(subprocess.check_output([ssh_command, server, "git-lfs-authenticate", path, "download"]))
        # keep the current url when the server does not supply a replacement
        endpoint.href = resp.get("href", endpoint.href)
        endpoint.update_headers(resp.get("header", {}))
    url = urlparse(endpoint.href)
    # this is how actions/checkout persist credentials
    # see https://github.com/actions/checkout/blob/44c2b7a8a4ea60a981eaca3cf939b5f4305c123b/src/git-auth-helper.ts#L56-L63
    # (an unset configuration yields an empty string, hence no extra headers)
    auth = git("config", f"http.{url.scheme}://{url.netloc}/.extraheader")
    endpoint.update_headers(get_env(auth, sep=": "))
    if "GITHUB_TOKEN" in os.environ:
        endpoint.headers["Authorization"] = f"token {os.environ['GITHUB_TOKEN']}"
    if "Authorization" not in endpoint.headers:
        # last chance: use git credentials (possibly backed by a credential helper like the one installed by gh)
        # see https://git-scm.com/docs/git-credential
        credentials = get_env(git("credential", "fill", check=True,
                                  # drop leading / from url.path
                                  input=f"protocol={url.scheme}\nhost={url.netloc}\npath={url.path[1:]}\n"))
        auth = base64.b64encode(f'{credentials["username"]}:{credentials["password"]}'.encode()).decode('ascii')
        endpoint.headers["Authorization"] = f"Basic {auth}"
    return endpoint
| 81 | + |
| 82 | + |
# see https://github.com/git-lfs/git-lfs/blob/310d1b4a7d01e8d9d884447df4635c7a9c7642c2/docs/api/basic-transfers.md
def get_locations(objects):
    """Resolve each LFS object to a download location through the LFS batch API.

    Args:
        objects: one entry per source file, either a ``{"oid": ..., "size": ...}``
            dict for an LFS pointer or a falsy value (``None``) for a regular file.

    Returns:
        A list of strings parallel to *objects*: ``"local"`` for every falsy
        entry, ``"<oid> <download href>"`` for every LFS object.

    Raises:
        AssertionError: if the server answers with a different number of
            objects than was requested.
        KeyError: if a returned object lacks ``oid`` or a download action.
    """
    indexes = [i for i, o in enumerate(objects) if o]
    ret = ["local" for _ in objects]
    if not indexes:
        # nothing is stored in LFS: no need to authenticate or contact the server
        return ret
    endpoint = get_endpoint()
    req = urllib.request.Request(
        f"{endpoint.href}/objects/batch",
        headers=endpoint.headers,
        data=json.dumps({
            "operation": "download",
            "transfers": ["basic"],
            "objects": [objects[i] for i in indexes],
            "hash_algo": "sha256",
        }).encode("ascii"),
    )
    with urllib.request.urlopen(req) as resp:
        data = json.load(resp)
    received = data["objects"]
    assert len(received) == len(indexes), f"received {len(received)} objects, expected {len(indexes)}"
    # NOTE(review): answers are paired with requests by position, which assumes
    # the server returns objects in the order they were requested -- confirm
    for i, obj in zip(indexes, received):
        ret[i] = f'{obj["oid"]} {obj["actions"]["download"]["href"]}'
    return ret
| 104 | + |
| 105 | + |
def get_lfs_object(path):
    """Read *path* and, if it is a git LFS pointer file, return its object description.

    Args:
        path: the file to inspect.

    Returns:
        ``None`` when the file does not start with the LFS pointer header;
        otherwise ``{"oid": ..., "size": ...}`` where ``oid`` is the pointer's
        oid with the ``sha256:`` prefix removed and ``size`` is an ``int``.

    Raises:
        AssertionError: if the pointer's oid is not a ``sha256:`` one.
        KeyError: if the pointer has no ``oid`` or ``size`` entry.
        ValueError: if ``size`` is not an integer.
        UnicodeDecodeError: if the rest of the pointer is not ASCII.
    """
    lfs_header = b"version https://git-lfs.github.com/spec"
    with open(path, 'rb') as fileobj:
        # read just enough bytes to compare against the header, so that regular
        # (possibly large) files are never loaded in full
        if fileobj.read(len(lfs_header)) != lfs_header:
            return None
        data = get_env(fileobj.read().decode('ascii'), sep=' ')
    assert data['oid'].startswith('sha256:'), f"unknown oid type: {data['oid']}"
    _, _, sha256 = data['oid'].partition(':')
    return {"oid": sha256, "size": int(data['size'])}
| 118 | + |
| 119 | + |
# Classify every source file, then print one result line per file, in argument order.
objects = list(map(get_lfs_object, sources))
for location in get_locations(objects):
    print(location)
0 commit comments