Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 290 lines (252 sloc) 10.8 KB
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import print_function
import itertools as it, operator as op, functools as ft
from os.path import realpath, ismount, join, exists
import os, sys, io, re, json, subprocess, tempfile, shutil, time, atexit
### systemd unit file:
# [Unit]
# DefaultDependencies=no
# After=systemd-fsck-root.service
# Before=systemd-remount-fs.service -.mount
# ConditionPathExists=!/etc/boot-resize-done
# [Service]
# Type=oneshot
# RemainAfterExit=yes
# ExecStart=/usr/local/bin/resize-rpi-fat32-for-card
# [Install]
def main(args=None, error_func=None):
import argparse
parser = argparse.ArgumentParser(
description='Script to resize card partitions to maximize the size of first FAT one.')
parser.add_argument('--skip-ro-check', action='store_true',
help='Do not abort if rootfs is not read-only when trying to relocate it.')
default='/etc/boot-resize-done', metavar='path',
help='File to create when fs-resize is done.'
' Not checked in the script, but can be checked in e.g. systemd unit.'
' "none" or empty arg disables. Default: %(default)s')
parser.add_argument('-i', '--overlay-image',
default='/srv/resize-card-splash.jpg', metavar='path',
help='Overlay image to set via "splash" tool'
' (from "openvg" project, with -resize option specified).'
' Empty argument or non-existing path will disable it. Default: %(default)s')
parser.add_argument('-n', '--reboot-delay', type=float, metavar='seconds',
help='Insert delay before reboot after actions that require it.'
' Can be used to e.g. read debug stuff that has been displayed on the monitor.')
parser.add_argument('--print-systemd-unit', action='store_true',
help='Print example systemd unit file to stdout and exit.')
parser.add_argument('-d', '--debug', action='store_true', help='Verbose operation mode.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
if opts.print_systemd_unit:
with open(__file__) as src:
echo = False
for line in iter(src.readline, ''):
if echo: print(re.sub(r'^#+\s*', '', line.strip()))
if'^### systemd unit file:', line):
echo = True
elif echo and not line.strip(): return
global log
import logging
format='%(asctime)s :: %(levelname)s :: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG if opts.debug else logging.WARNING )
log = logging.getLogger()
if opts.debug:
from pprint import pformat
log_pp = lambda v: '\n' + pformat(v)
else: log_pp = lambda v: v
os.environ['LANG'] = 'C' # ensures more consistent output
os.environ['PATH'] = '/usr/local/bin:/opt/bin:/usr/bin'
if not error_func:
error_func = lambda fmt,*a,**kw: parser.error(fmt.format(*a, **kw))
if not opts.done_touch_file\
or opts.done_touch_file == 'none'\
or exists(opts.done_touch_file):
opts.done_touch_file = None
root_dev = root_ro = None
boot_part, root_part = 'mmcblk0p1', 'mmcblk0p2'
boot_part_n, root_part_n = 1, 2
checks = set()
out = json.loads(subprocess.check_output('lsblk -Jnb'.split()))
for dev in out['blockdevices']:
root_dev = dev['name']
for n, p in enumerate(dev['children'], 1):
if p['type'] != 'part': continue
if p['name'] == boot_part: # can be not mounted yet
assert n == boot_part_n, [n, boot_part_n]
elif p['mountpoint'] == '/':
assert p['name'] == root_part, p
assert n == root_part_n, [n, root_part_n]
root_ro = int(p['ro'])
elif checks:
error_func('Extra partition detected on root_dev: {}', p)
if checks: break
if checks != {'boot', 'root'}:
error_func('Failed to match rootfs/boot partitions in lsblk output')
root_dev, root_part, boot_part = (
join('/dev', k).encode('utf-8')
for k in [root_dev, root_part, boot_part] )
log.debug('Detected root_dev=%r from lsblk json output', root_dev)
bs, bs_sector = 8 * 2**10, 512
assert bs % bs_sector == 0, [bs, bs_sector]
parse_size = lambda n: int(n.rstrip('B'))
# parted can be replaced with "sfdisk -Jd ...", but that lacks device size
out = subprocess.check_output(
['parted', '-ms', root_dev, 'unit B print free'] )
root_dev_len, parts = None, list()
for n, line in enumerate(out.splitlines()):
assert line.endswith(';'), line
if n == 0:
assert line == 'BYT;', line
line = line[:-1].split(':')
if n == 1:
assert line[0] == root_dev, [root_dev, line]
assert int(line[3]) == int(line[4]) == bs_sector, line
root_dev_len = parse_size(line[1])
n=int(line[0]) if line[4] != 'free' else None,
a=parse_size(line[1]), b=parse_size(line[2]),
len=parse_size(line[3]), t=line[4] ))
assert root_dev_len
assert root_dev_len % bs_sector == 0, [root_dev_len, bs_sector]
log.debug('Detected partition schema: %s', log_pp(parts))
parts_by_n = dict((part['n'], part) for part in parts if part['n'])
assert parts_by_n[boot_part_n]['t'] in ['fat32', 'fat16'], parts
assert parts_by_n[root_part_n]['t'] == 'ext4', parts
assert len(parts_by_n) == 2, parts
part_root, part_boot = parts_by_n[root_part_n], parts_by_n[boot_part_n]
for p in part_root, part_boot:
assert p['a'] % bs == 0, p
assert p['a'] % bs_sector == 0, p
assert p['len'] == p['b'] - p['a'] + 1, p
resize_stage = None
if parts[-1]['t'] == 'free':
assert parts[-2]['t'] == 'ext4', parts
if not opts.skip_ro_check:
if not root_ro:
try: subprocess.check_call(['mount', '-o', 'remount,ro', '/'])
except subprocess.CalledProcessError: pass
log.debug('Remounted rootfs read-only')
root_ro = True
assert root_ro, 'rootfs must be mounted read-only'
resize_stage = 'root'
elif parts[-1]['t'] == 'ext4':
if parts[-2]['t'] == 'free': resize_stage = 'boot' # should not happen
assert parts[-2]['t'] in ['fat32', 'fat16'], parts
assert len(parts) == 3, parts # free, boot, root
fsstat = subprocess.check_output(['fsstat', boot_part])
m =
r'\n\s*File System Layout \(in sectors\)\s*\n'
r'\s*Total Range:\s+0\s+-\s+(\d+)', fsstat)
if not m: error_func('Failed to match FAT32 FS length from fsstat output')
boot_fs_len = (int( + 1) * bs_sector
log.debug( 'boot-fs-size=%s part-size=%s diff=%s',
boot_fs_len, part_boot['len'], part_boot['len'] - boot_fs_len )
if boot_fs_len < part_boot['len']: resize_stage = 'boot-fs'
else: resize_stage = 'done'
if not resize_stage:
error_func('Failed to determine resize_stage from partition table: {}', parts)
log.debug('Resize stage: %s', resize_stage)
if resize_stage == 'done':
if opts.done_touch_file:
log.debug('Creating done-file: %r', opts.done_touch_file)
if root_ro: subprocess.check_call(['mount', '-o', 'remount,rw', '/'])
with open(opts.done_touch_file, 'wb') as dst: pass
except Exception as err:
log.warn('Failed to create done-file: %s', err)
overlay = None
if opts.overlay_image and exists(opts.overlay_image):
try: overlay = subprocess.Popen([
'splash', '-bg', 'black', '-resize', opts.overlay_image ])
except: pass
else: atexit.register(lambda: overlay.terminate())
if resize_stage == 'root':
bs_seek = (root_dev_len - part_root['len']) // bs
assert bs_seek * bs > part_root['b'], [bs_seek * bs, part_root['b']]
bs_count = part_root['len'] // bs + int(bool(part_root['len'] % bs))
boot_part_len_new = (bs_seek * bs) - part_boot['a']
log.debug( 'Cloning data for the rootfs to the'
' new location (seek: %s, count: %s, bs: %s)', bs_seek, bs_count, bs )
subprocess.check_call([ 'dd',
'if={}'.format(root_part), 'of={}'.format(root_dev),
'bs={}'.format(bs), 'count={}'.format(bs_count), 'seek={}'.format(bs_seek),
'status={}'.format('none' if not opts.debug else 'progress') ])
log.debug('Updating partition table for new rootfs location')
label = '\n'.join([
'label: dos', 'label-id: 0xa842b4de',
'device: {}'.format(root_dev), 'unit: sectors',
'{dev} : start= {a}, size= {l}, type=c'.format(
dev=boot_part, a=part_boot['a'] // bs_sector, l=boot_part_len_new // bs_sector ),
'{dev} : start= {a}, size= {l}, type=83'.format(
dev=root_part, a=(bs_seek * bs) // bs_sector,
l=(root_dev_len - bs_seek * bs) // bs_sector + 1 ),
'' ])
proc = subprocess.Popen(['sfdisk', '--no-reread', root_dev], stdin=subprocess.PIPE)
proc = proc.wait()
if proc != 0: error_func('sfdisk exited with error code {}', proc)
elif resize_stage == 'boot':
### This stage should not really be reached
### if partition table gets updated for rootfs and boot in one go.
new_boot_len = part_root['a'] - part_boot['a']
log.debug('Updating partition table for resized /boot')
label = '\n'.join([
'label: dos', 'label-id: 0xa842b4de',
'device: {}'.format(root_dev), 'unit: sectors',
'{dev} : start= {a}, size= {l}, type=c'.format(
dev=boot_part, a=part_boot['a'] // bs_sector, l=new_boot_len // bs_sector ),
'{dev} : start= {a}, size= {l}, type=83'.format(
dev=root_part, a=part_root['a'] // bs_sector, l=part_root['len'] // bs_sector ),
'' ])
proc = subprocess.Popen(['sfdisk', '--no-reread', root_dev], stdin=subprocess.PIPE)
proc = proc.wait()
if proc != 0: error_func('sfdisk exited with error code {}', proc)
elif resize_stage == 'boot-fs':
log.debug('Remounting rootfs rw')
subprocess.Popen(['mount', '-o', 'remount,rw', '/'])
tmp_dir = tempfile.mkdtemp(dir='/var/tmp', prefix='boot_resize.')
# atexit.register(lambda: shutil.rmtree(tmp_dir, ignore_errors=True))
log.debug('rootfs tmp dir for /boot contents: %r', tmp_dir)
log.debug('Backing-up /boot contents (dir: %r)', tmp_dir)
if not ismount('/boot'):['fsck', '-a', boot_part])
subprocess.check_call(['mount', '-o', 'ro', boot_part, '/boot'])
subprocess.check_call(['cp', '-a', '/boot/.', join(tmp_dir, '.')])
subprocess.check_call(['umount', '/boot'])
log.debug('Re-creating /boot fs (dev: %r)', boot_part)
subprocess.check_call(['mkfs.vfat', boot_part])
subprocess.check_call(['mount', '-o', 'rw', boot_part, '/boot'])
log.debug('Restoring /boot contents (from dir: %r)', tmp_dir)
subprocess.check_call(['cp', '-a', join(tmp_dir, '.'), '/boot/.'])
subprocess.check_call(['umount', '/boot'])
if opts.done_touch_file:
log.debug('Creating done-file: %r', opts.done_touch_file)
with open(opts.done_touch_file, 'wb') as dst: pass
raise NotImplementedError(resize_stage)
log.debug( 'Resize stage %r completed successfully,'
' initiating reboot (delay: %.1f)', resize_stage, opts.reboot_delay or 0 )
if opts.reboot_delay: time.sleep(opts.reboot_delay)
os.execvp('systemctl', ['systemctl', 'reboot'])
if __name__ == '__main__': sys.exit(main())