Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,26 @@ class FileAddress(BaseModel):
file: str
address: str


class DtbVariant(BaseModel):
default: str
address: str
variants: dict[str, str]


class FlasherLogin(BaseModel):
login_prompt: str
username: str | None = None
password: str | None = None
prompt: str


class FlashBundleSpecV1Alpha1(BaseModel):
manufacturer: str
link: Optional[str]
bootcmd: str
shelltype: Literal["busybox"] = Field(default="busybox")
login: FlasherLogin = Field(
default_factory=lambda: FlasherLogin(
login_prompt="login:",
prompt="#")
)
login: FlasherLogin = Field(default_factory=lambda: FlasherLogin(login_prompt="login:", prompt="#"))
default_target: str
targets: dict[str, str]
kernel: FileAddress
Expand All @@ -41,6 +40,7 @@ class FlashBundleSpecV1Alpha1(BaseModel):
class ObjectMeta(BaseModel):
name: str


class FlasherBundleManifestV1Alpha1(BaseModel):
apiVersion: Literal["jumpstarter.dev/v1alpha1"] = Field(default="jumpstarter.dev/v1alpha1")
kind: Literal["FlashBundleManifest"] = Field(default="FlashBundleManifest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

from jumpstarter_driver_flashers.bundle import FlasherBundleManifestV1Alpha1

from .uboot import UbootConsole
from jumpstarter.common.exceptions import ArgumentError

debug_console_option = click.option("--console-debug", is_flag=True, help="Enable console debug mode")


@dataclass(kw_only=True)
class BaseFlasherClient(FlasherClient, CompositeClient):
"""
Expand All @@ -41,6 +41,7 @@ def __post_init__(self):
def set_console_debug(self, debug: bool):
"""Set console debug mode"""
self._console_debug = debug
# TODO: also set console debug on uboot client

@contextmanager
def busybox_shell(self):
Expand All @@ -60,12 +61,8 @@ def bootloader_shell(self):
self.logger.info("Setting up flasher bundle files in exporter")
self.call("setup_flasher_bundle")
with self._services_up():
with self.serial.pexpect() as console:
if self._console_debug:
console.logfile_read = sys.stdout.buffer
uboot = UbootConsole(console=console, power=self.power, logger=self.logger)
uboot.reboot_to_console()
console.sendline("")
with self.uboot.reboot_to_console():
pass
yield self.serial

def flash(
Expand Down Expand Up @@ -99,10 +96,11 @@ def flash(
error_queue = Queue()

# Start the storage write operation in the background
storage_thread = threading.Thread(target=self._transfer_bg_thread,
args=(path, operator, operator_scheme,
os_image_checksum, self.http.storage, error_queue),
name="storage_transfer")
storage_thread = threading.Thread(
target=self._transfer_bg_thread,
args=(path, operator, operator_scheme, os_image_checksum, self.http.storage, error_queue),
name="storage_transfer",
)
storage_thread.start()

# Make the exporter download the bundle contents and set files in the right places
Expand Down Expand Up @@ -155,7 +153,6 @@ def flash(
self.logger.info("Powering off target")
self.power.off()


def _flash_with_progress(self, console, manifest, path, image_url, target_path):
"""Flash image to target device with progress monitoring.

Expand All @@ -170,8 +167,8 @@ def _flash_with_progress(self, console, manifest, path, image_url, target_path):
decompress_cmd = _get_decompression_command(path)
flash_cmd = (
f'( wget -q -O - "{image_url}" | '
f'{decompress_cmd} '
f'dd of={target_path} bs=64k iflag=fullblock oflag=direct) &'
f"{decompress_cmd} "
f"dd of={target_path} bs=64k iflag=fullblock oflag=direct) &"
)
console.sendline(flash_cmd)
console.expect(manifest.spec.login.prompt, timeout=60)
Expand All @@ -190,16 +187,16 @@ def _flash_with_progress(self, console, manifest, path, image_url, target_path):
if "No such file or directory" in console.before.decode(errors="ignore"):
break
data = console.before.decode(errors="ignore")
match = re.search(r'pos:\s+(\d+)', data)
match = re.search(r"pos:\s+(\d+)", data)
if match:
current_bytes = int(match.group(1))
current_time = time.time()
elapsed = current_time - last_time

if elapsed >= 1.0: # Update speed every second
bytes_diff = current_bytes - last_pos
speed_mb = (bytes_diff / (1024*1024)) / elapsed
total_mb = current_bytes/(1024*1024)
speed_mb = (bytes_diff / (1024 * 1024)) / elapsed
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you pass to reformat those files? should we set it in ruff linting?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing, just uv run ruff format && uv run ruff check --fix --unsafe-fixes

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah uv run ruff format thanks! :)

total_mb = current_bytes / (1024 * 1024)
self.logger.info(f"Flash progress: {total_mb:.2f} MB, Speed: {speed_mb:.2f} MB/s")

last_pos = current_bytes
Expand All @@ -209,7 +206,6 @@ def _flash_with_progress(self, console, manifest, path, image_url, target_path):
console.sendline("sync")
console.expect(manifest.spec.login.prompt, timeout=1200)


def _get_target_device(self, target: str, manifest: FlasherBundleManifestV1Alpha1, console) -> str:
"""Get the target device path from the manifest, resolving block devices if needed.

Expand All @@ -229,15 +225,19 @@ def _get_target_device(self, target: str, manifest: FlasherBundleManifestV1Alpha
raise ArgumentError(f"Target {target} not found in manifest")

if target_path.startswith("/sys/class/block#"):
target_path = self._lookup_block_device(
console, manifest.spec.login.prompt, target_path.split("#")[1])
target_path = self._lookup_block_device(console, manifest.spec.login.prompt, target_path.split("#")[1])

return target_path


def _transfer_bg_thread(self, src_path: PathBuf, src_operator: Operator, src_operator_scheme: str,
known_hash: str | None,
to_storage: OpendalClient, error_queue):
def _transfer_bg_thread(
self,
src_path: PathBuf,
src_operator: Operator,
src_operator_scheme: str,
known_hash: str | None,
to_storage: OpendalClient,
error_queue,
):
"""Transfer image to storage in the background

Args:
Expand Down Expand Up @@ -285,7 +285,6 @@ def _transfer_bg_thread(self, src_path: PathBuf, src_operator: Operator, src_ope
raise

def _sha256_file(self, src_operator, src_path) -> str:

m = hashlib.sha256()
with src_operator.open(src_path, "rb") as f:
while True:
Expand All @@ -299,11 +298,13 @@ def _sha256_file(self, src_operator, src_path) -> str:
def _create_metadata_and_json(self, src_operator, src_path) -> tuple[Metadata, str]:
"""Create a metadata json string from a metadata object"""
metadata = src_operator.stat(src_path)
return metadata, json.dumps({
"path": str(src_path),
"content_length": metadata.content_length,
"etag": metadata.etag,
})
return metadata, json.dumps(
{
"path": str(src_path),
"content_length": metadata.content_length,
"etag": metadata.etag,
}
)

def _lookup_block_device(self, console, prompt, address: str) -> str:
"""Lookup block device for a given address.
Expand All @@ -317,7 +318,7 @@ def _lookup_block_device(self, console, prompt, address: str) -> str:
# lrwxrwxrwx 1 root root 0 Jan 1
# 00:00 mmcblk1 -> ../../devices/platform/bus@100000/4fb0000.mmc/mmc_host/mmc1/mmc1:aaaa/block/mmcblk1
output = console.before.decode(errors="ignore")
match = re.search(r'\s(\w+)\s->', output)
match = re.search(r"\s(\w+)\s->", output)
if match:
return "/dev/" + match.group(1)
else:
Expand Down Expand Up @@ -359,54 +360,53 @@ def _services_up(self):
self.http.stop()
self.tftp.stop()


def _generate_uboot_env(self):
"""Generate a uboot environment dictionary, may need specific overrides for different targets"""
tftp_host = self.tftp.get_host()
return {
"serverip": tftp_host,
}


@contextmanager
def _busybox(self):
"""Start a busybox shell.

This is a helper context manager that boots the device into uboot and returns a console object.
"""
with self.serial.pexpect() as console:
if self._console_debug:
console.logfile_read = sys.stdout.buffer
uboot = UbootConsole(console=console, power=self.power, logger=self.logger)
# make sure that the device is booted into the uboot console
uboot.reboot_to_console()

# make sure that the device is booted into the uboot console
with self.uboot.reboot_to_console():
# run dhcp discovery and gather details useful for later
self._dhcp_details = uboot.setup_dhcp()
self._dhcp_details = self.uboot.setup_dhcp()
self.logger.info(f"discovered dhcp details: {self._dhcp_details}")

# configure the environment necessary
env = self._generate_uboot_env()
uboot.set_env_dict(env)
self.uboot.set_env_dict(env)

# load any necessary files to RAM from the tftp storage
manifest = self.manifest
kernel_filename = Path(manifest.get_kernel_file()).name
kernel_address = manifest.get_kernel_address()

uboot.run_command(f"tftpboot {kernel_address} {kernel_filename}", timeout=120)
self.uboot.run_command(f"tftpboot {kernel_address} {kernel_filename}", timeout=120)

if manifest.get_initram_file():
initram_filename = Path(manifest.get_initram_file()).name
initram_address = manifest.get_initram_address()
uboot.run_command(f"tftpboot {initram_address} {initram_filename}", timeout=120)
self.uboot.run_command(f"tftpboot {initram_address} {initram_filename}", timeout=120)

if manifest.get_dtb_file():
dtb_filename = Path(manifest.get_dtb_file()).name
dtb_address = manifest.get_dtb_address()
uboot.run_command(f"tftpboot {dtb_address} {dtb_filename}", timeout=120)
self.uboot.run_command(f"tftpboot {dtb_address} {dtb_filename}", timeout=120)

with self.serial.pexpect() as console:
if self._console_debug:
console.logfile_read = sys.stdout.buffer

self.logger.info(f"Running boot command: {manifest.spec.bootcmd}")
console.send(manifest.spec.bootcmd +"\n")
console.send(manifest.spec.bootcmd + "\n")

# if manifest has login details, we need to login
if manifest.spec.login.username:
Expand Down Expand Up @@ -438,7 +438,7 @@ def use_initram(self, path: PathBuf, operator: Operator | None = None):
def use_kernel(self, path: PathBuf, operator: Operator | None = None):
"""Use kernel file"""
if operator is None:
path, operator, operator_scheme = operator_for_path(path)
path, operator, operator_scheme = operator_for_path(path)

...

Expand All @@ -461,27 +461,37 @@ def base():
@base.command()
@click.argument("file")
@click.option("--partition", type=str)
@click.option('--os-image-checksum',
help='SHA256 checksum of OS image (direct value)')
@click.option('--os-image-checksum-file',
help='File containing SHA256 checksum of OS image',
type=click.Path(exists=True, dir_okay=False))
@click.option('--force-exporter-http', is_flag=True, help='Force use of exporter HTTP')
@click.option('--force-flash-bundle', type=str, help='Force use of a specific flasher OCI bundle')
@click.option("--os-image-checksum", help="SHA256 checksum of OS image (direct value)")
@click.option(
"--os-image-checksum-file",
help="File containing SHA256 checksum of OS image",
type=click.Path(exists=True, dir_okay=False),
)
@click.option("--force-exporter-http", is_flag=True, help="Force use of exporter HTTP")
@click.option("--force-flash-bundle", type=str, help="Force use of a specific flasher OCI bundle")
@debug_console_option
def flash(file, partition, os_image_checksum, os_image_checksum_file,
console_debug, force_exporter_http, force_flash_bundle):
def flash(
file,
partition,
os_image_checksum,
os_image_checksum_file,
console_debug,
force_exporter_http,
force_flash_bundle,
):
"""Flash image to DUT from file"""
if os_image_checksum_file and os.path.exists(os_image_checksum_file):
with open(os_image_checksum_file) as f:
os_image_checksum = f.read().strip().split()[0]
self.logger.info(f"Read checksum from file: {os_image_checksum}")

self.set_console_debug(console_debug)
self.flash(file,
partition=partition,
force_exporter_http=force_exporter_http,
force_flash_bundle=force_flash_bundle)
self.flash(
file,
partition=partition,
force_exporter_http=force_exporter_http,
force_flash_bundle=force_flash_bundle,
)

@base.command()
@debug_console_option
Expand Down Expand Up @@ -522,8 +532,8 @@ def _get_decompression_command(filename_or_url) -> str:
filename = urlparse(filename_or_url).path.split("/")[-1]

filename = filename.lower()
if filename.endswith(('.gz', '.gzip')):
return 'zcat |'
elif filename.endswith('.xz'):
return 'xzcat |'
return ''
if filename.endswith((".gz", ".gzip")):
return "zcat |"
elif filename.endswith(".xz"):
return "xzcat |"
return ""
Loading