Skip to content

Commit

Permalink
wip: write headers
Browse files Browse the repository at this point in the history
  • Loading branch information
swysocki committed Jan 13, 2022
1 parent c2ea588 commit 05aab61
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 46 deletions.
1 change: 1 addition & 0 deletions gpt_image/disk.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Disk:

def __init__(self, image_path: str, size: int = 0) -> None:
"""Init Disk with a file path and size in bytes"""
# @TODO: check that disk is large enough to contain all table data
self._image_path = pathlib.Path(image_path)
self.name = self._image_path.name
self._size = size
Expand Down
118 changes: 72 additions & 46 deletions gpt_image/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
"""
import binascii
import io
import uuid
from dataclasses import dataclass
from typing import List

from gpt_image.disk import Geometry
from gpt_image.disk import Disk, Geometry


@dataclass
Expand Down Expand Up @@ -40,37 +39,36 @@ class Header:
def __init__(self, geometry: Geometry, is_backup: bool = False):
self.backup = is_backup
self.geometry = geometry
self._buffer = io.BytesIO()
self._header_sig = TableEntry(0, 8, b"EFI PART")
self._revision = TableEntry(8, 4, b"\x00\x00\x01\x00")
self._header_size = TableEntry(12, 4, (92).to_bytes(4, "little"))
self._header_crc = TableEntry(16, 4, (0).to_bytes(4, "little"))
self._reserved = TableEntry(20, 4, (0).to_bytes(4, "little"))
self._primary_header_lba = TableEntry(
self.header_sig = TableEntry(0, 8, b"EFI PART")
self.revision = TableEntry(8, 4, b"\x00\x00\x01\x00")
self.header_size = TableEntry(12, 4, (92).to_bytes(4, "little"))
self.header_crc = TableEntry(16, 4, (0).to_bytes(4, "little"))
self.reserved = TableEntry(20, 4, (0).to_bytes(4, "little"))
self.primary_header_lba = TableEntry(
24, 8, (self.geometry.primary_header_lba).to_bytes(8, "little")
)
self._secondary_header_lba = TableEntry(
self.secondary_header_lba = TableEntry(
32, 8, (self.geometry.backup_header_lba).to_bytes(8, "little")
)
self._partition_start_lba = TableEntry(
self.partition_start_lba = TableEntry(
40, 8, (self.geometry.partition_start_lba).to_bytes(8, "little")
)
self._partition_last_lba = TableEntry(
self.partition_last_lba = TableEntry(
48, 8, (self.geometry.partition_last_lba).to_bytes(8, "little")
)
self._disk_guid = TableEntry(56, 16, uuid.uuid4().bytes_le)
self._partition_array_start = TableEntry(
self.disk_guid = TableEntry(56, 16, uuid.uuid4().bytes_le)
self.partition_array_start = TableEntry(
72, 8, (self.geometry.primary_array_lba).to_bytes(8, "little")
)
self._partition_array_length = TableEntry(80, 4, (128).to_bytes(4, "little"))
self._partition_entry_size = TableEntry(84, 4, (128).to_bytes(4, "little"))
self._partition_array_crc = TableEntry(88, 4, (0).to_bytes(4, "little"))
self._reserved_padding = TableEntry(92, 420, b"\x00" * 420)
self.partition_array_length = TableEntry(80, 4, (128).to_bytes(4, "little"))
self.partition_entry_size = TableEntry(84, 4, (128).to_bytes(4, "little"))
self.partition_array_crc = TableEntry(88, 4, (0).to_bytes(4, "little"))
self.reserved_padding = TableEntry(92, 420, b"\x00" * 420)
# the secondary header adjustments
if self.backup:
self._primary_header_lba.offset = 32
self._secondary_header_lba.offset = 24
self._partition_array_start.data = (
self.primary_header_lba.offset = 32
self.secondary_header_lba.offset = 24
self.partition_array_start.data = (
self.geometry.backup_header_array_lba
).to_bytes(8, "little")

Expand All @@ -84,24 +82,29 @@ def __init__(self, geometry: Geometry, is_backup: bool = False):

# group the header fields to allow byte operations such as
# checksum
# this can be done with the `inspect` module
self.header_fields = [
self._header_sig,
self._revision,
self._header_size,
self._header_crc,
self._reserved,
self._primary_header_lba,
self._secondary_header_lba,
self._partition_start_lba,
self._partition_last_lba,
self._disk_guid,
self._partition_array_start,
self._partition_array_length,
self._partition_entry_size,
self._partition_array_crc,
self._reserved_padding,
self.header_sig,
self.revision,
self.header_size,
self.header_crc,
self.reserved,
self.primary_header_lba,
self.secondary_header_lba,
self.partition_start_lba,
self.partition_last_lba,
self.disk_guid,
self.partition_array_start,
self.partition_array_length,
self.partition_entry_size,
self.partition_array_crc,
self.reserved_padding,
]

def as_bytes(self) -> bytes:
"""Return the header as bytes"""
return [x.data for x in self.header.header_fields]


class Partition:
@dataclass
Expand All @@ -116,7 +119,7 @@ class Type:

def __init__(self):
"""
# @TOOD: add 64 bits of partition attributes
# @TODO: add 64 bits of partition attributes
"""
self.type_guid = TableEntry(
0, 16, uuid.UUID(Partition.Type.LinuxFileSystem).bytes_le
Expand All @@ -136,6 +139,10 @@ def __init__(self):
self.partition_name,
]

def as_bytes(self) -> bytes:
"""Return the partition as bytes"""
return [x.data for x in self.partition.partition_fields]


class Table:
"""GPT Partition Table Object
Expand All @@ -145,22 +152,42 @@ class Table:
is done with this object.
"""

def __init__(self, geometry: Geometry, sector_size: int = 512) -> None:
self.buffer = io.BytesIO()
self.geometry = geometry
self.primary_header = Header(geometry)
self.secondary_header = Header(geometry, is_backup=True)
def __init__(self, disk: Disk, sector_size: int = 512) -> None:
self.disk = disk
self.geometry = disk.geometry
self.primary_header = Header(self.geometry)
self.secondary_header = Header(self.geometry, is_backup=True)
# partition entry array
self.partitions: List[Partition] = []

def write(self):
"""Write the table to disk"""
pass
# calculate partition checksum and write to header
self.checksum_partitions(self.primary_header)
self.checksum_partitions(self.secondary_header)
# calculate header checksum and write to header
self.checksum_header(self.primary_header)
self.checksum_header(self.secondary_header)

with open(self.disk.name, "+b") as f:
# move to primary header location and write
f.seek(self.geometry.primary_header_byte)
f.write(self.primary_header.as_bytes())
# move to secondary header location and write
f.seek(self.geometry.backup_header_byte)
f.write(self.secondary_header.as_bytes())

def checksum_partitions(self, header: Header):
"""Checksum the partition entries"""

partition_bytes = b""
for part in self.partitions:
for field in part.partition_fields:
partition_bytes += field.data
header._partition_array_crc.data = binascii.crc32(partition_bytes)
header.partition_array_crc.data = binascii.crc32(partition_bytes).to_bytes(
4, "little"
)

def checksum_header(self, header: Header):
"""Checksum the table header
Expand All @@ -169,6 +196,5 @@ def checksum_header(self, header: Header):
after that has been written.
"""
# zero the old checksum before calculating
header._header_crc.data = b"\x00" * 4
header_bytes = [x.data for x in header.header_fields]
header._header_crc.data = binascii.crc32(header_bytes)
header.header_crc.data = b"\x00" * 4
header.header_crc.data = binascii.crc32(header.as_bytes).to_bytes(4, "little")

0 comments on commit 05aab61

Please sign in to comment.