Skip to content

Commit

Permalink
wip: more surgery
Browse files Browse the repository at this point in the history
  • Loading branch information
swysocki committed Jan 12, 2022
1 parent 6cf051e commit c2ea588
Showing 1 changed file with 136 additions and 108 deletions.
244 changes: 136 additions & 108 deletions gpt_image/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,141 +6,169 @@
import io
import uuid
from dataclasses import dataclass
from typing import Union
from typing import List

from gpt_image.disk import Geometry


class Table:
"""GPT Partition Table Object
@dataclass
class TableEntry:
"""Individual table entries
Each disk has two partition tables, primary and backup. The
Table class represents one of the partition tables.
Creates a consistent structure for writing GPT table data to its
respective locations. This is used for headers and partitions.
The data field only accepts bytes to simplify writing back to a buffer.
"""

@dataclass
class HeaderEntry:
offset: int
length: int
content: Union[int, str, bytes]

def __init__(
self, geometry: Geometry, is_backup: bool = False, sector_size: int = 512
) -> None:
offset: int
length: int
data: bytes
# @TODO: make default data b"\x00" * length


class Header:
"""GPT Partition Table Header Object
Each table has two GPT headers, primary and secondary (backup). The primary is
written to LBA 1 and secondary is written to LBA -1. The GPT Headers contain
various data including the locations of one another. Therefore two headers
are created for each table.
"""

def __init__(self, geometry: Geometry, is_backup: bool = False):
self.backup = is_backup
self.buffer = io.BytesIO()
self.geometry = geometry
self._header_sig = Table.HeaderEntry(0, 8, b"EFI PART")
self._revision = Table.HeaderEntry(8, 4, b"\x00\x00\x01\x00")
self._header_size = Table.HeaderEntry(12, 4, 92)
self._header_crc = Table.HeaderEntry(16, 4, 0)
self._reserved = Table.HeaderEntry(20, 4, 0)
self._primary_header_lba = Table.HeaderEntry(24, 8, geometry.primary_header_lba)
self._secondary_header_lba = Table.HeaderEntry(
32, 8, geometry.backup_header_lba
self._buffer = io.BytesIO()
self._header_sig = TableEntry(0, 8, b"EFI PART")
self._revision = TableEntry(8, 4, b"\x00\x00\x01\x00")
self._header_size = TableEntry(12, 4, (92).to_bytes(4, "little"))
self._header_crc = TableEntry(16, 4, (0).to_bytes(4, "little"))
self._reserved = TableEntry(20, 4, (0).to_bytes(4, "little"))
self._primary_header_lba = TableEntry(
24, 8, (self.geometry.primary_header_lba).to_bytes(8, "little")
)
if self.backup:
self._primary_header_lba.offset = 32
self._secondary_header_lba.offset = 24
self._partition_start_lba = Table.HeaderEntry(
40, 8, geometry.partition_start_lba
self._secondary_header_lba = TableEntry(
32, 8, (self.geometry.backup_header_lba).to_bytes(8, "little")
)
self._partition_start_lba = TableEntry(
40, 8, (self.geometry.partition_start_lba).to_bytes(8, "little")
)
self._partition_last_lba = Table.HeaderEntry(48, 8, geometry.partition_last_lba)
self._disk_guid = Table.HeaderEntry(56, 16, uuid.uuid4().bytes_le)
self._partition_array_start = Table.HeaderEntry(
72, 8, geometry.primary_array_lba
self._partition_last_lba = TableEntry(
48, 8, (self.geometry.partition_last_lba).to_bytes(8, "little")
)
self._disk_guid = TableEntry(56, 16, uuid.uuid4().bytes_le)
self._partition_array_start = TableEntry(
72, 8, (self.geometry.primary_array_lba).to_bytes(8, "little")
)
self._partition_array_length = TableEntry(80, 4, (128).to_bytes(4, "little"))
self._partition_entry_size = TableEntry(84, 4, (128).to_bytes(4, "little"))
self._partition_array_crc = TableEntry(88, 4, (0).to_bytes(4, "little"))
self._reserved_padding = TableEntry(92, 420, b"\x00" * 420)
# the secondary header adjustments
if self.backup:
self._partition_array_start.content = geometry.backup_header_array_lba
self._partition_array_length = Table.HeaderEntry(80, 4, 128)
self._partition_entry_size = Table.HeaderEntry(84, 4, 128)
self._partition_array_crc = Table.HeaderEntry(88, 4, 0)
self._primary_header_lba.offset = 32
self._secondary_header_lba.offset = 24
self._partition_array_start.data = (
self.geometry.backup_header_array_lba
).to_bytes(8, "little")

# header start byte relative the table itself, not the disk
# primary will be 0 backup will be LBA 32
# primary will be 0 secondary will be LBA 32
self.header_start_byte = 0
self.partition_entry_start_byte = int(1 * sector_size)
self.partition_entry_start_byte = int(1 * self.geometry.sector_size)
if self.backup:
self.header_start_byte = int(32 * sector_size)
self.header_start_byte = int(32 * self.geometry.sector_size)
self.partition_entry_start_byte = 0

def _write_table(self) -> None:
# start with blank table
self.buffer.write(b"\x00" * 33 * self.geometry.sector_size)
# group the header fields to allow byte operations such as
# checksum
self.header_fields = [
self._header_sig,
self._revision,
self._header_size,
self._header_crc,
self._reserved,
self._primary_header_lba,
self._secondary_header_lba,
self._partition_start_lba,
self._partition_last_lba,
self._disk_guid,
self._partition_array_start,
self._partition_array_length,
self._partition_entry_size,
self._partition_array_crc,
self._reserved_padding,
]


class Partition:
@dataclass
class Type:
"""GPT Partition Types
def _write_header(self) -> None:
"""Write the table header to proper location
https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_entries
"""

The header is typically 92 bytes with the remainder of bytes in the sector
zeroed. The primary header is at LBA 1 with the partition entries at 2 - 33.
The backup header is at LBA -1 with the partition entries at -2 to -33.
LinuxFileSystem = "0FC63DAF-8483-4772-8E79-3D69D8477DE4"
EFISystemPartition = "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"

def __init__(self):
"""
self._write_section(self._header_sig)
self._write_section(self._revision)
self._write_section(self._header_size)
self._write_section(self._header_crc)
self._write_section(self._reserved)
self._write_section(self._primary_header_lba)
self._write_section(self._secondary_header_lba)
self._write_section(self._partition_start_lba)
self._write_section(self._partition_last_lba)
self._write_section(self._disk_guid)
self._write_section(self._partition_array_start)
self._write_section(self._partition_array_length)
self._write_section(self._partition_entry_size)
self._write_section(self._partition_array_crc)

# @TODO: write partition entries

# once the partitions are written its CRC is calculated
self._checksum_partitions()

# once the header is written its CRC is calculated
self._checksum_header()

def _write_section(self, entry: HeaderEntry) -> None:
"""Write a GPT header entry
Args:
entry: HeaderEntry object
# @TOOD: add 64 bits of partition attributes
"""
# seek to section's offset
self.buffer.seek(self.header_start_byte + entry.offset)
if type(entry.content) != bytes:
self.buffer.write((entry.content).to_bytes(entry.length, "little"))
else:
self.buffer.write(entry.content)
self.type_guid = TableEntry(
0, 16, uuid.UUID(Partition.Type.LinuxFileSystem).bytes_le
)
self.partition_guid = TableEntry(16, 16, uuid.uuid4().bytes_le)
self.first_lba = TableEntry(32, 8, b"\x00" * 8)
self.last_lba = TableEntry(40, 8, b"\x00" * 8)
self.attribute_flags = TableEntry(48, 8, b"\x00" * 8)
self.partition_name = TableEntry(56, 72, b"")

self.partition_fields = [
self.type_guid,
self.partition_guid,
self.first_lba,
self.last_lba,
self.attribute_flags,
self.partition_name,
]


def _checksum_header(self) -> None:
"""Calculate the header checksum
class Table:
"""GPT Partition Table Object
A table contains a primary and secondary header and a primary
and secondary partition entry table. All management of their entries
is done with this object.
"""

def __init__(self, geometry: Geometry, sector_size: int = 512) -> None:
self.buffer = io.BytesIO()
self.geometry = geometry
self.primary_header = Header(geometry)
self.secondary_header = Header(geometry, is_backup=True)
# partition entry array
self.partitions: List[Partition] = []

def checksum_partitions(self, header: Header):
"""Checksum the partition entries"""

partition_bytes = b""
for part in self.partitions:
for field in part.partition_fields:
partition_bytes += field.data
header._partition_array_crc.data = binascii.crc32(partition_bytes)

def checksum_header(self, header: Header):
"""Checksum the table header
This CRC includes the partition checksum, and must be calculated
after that has been written.
"""
# zero field before calculating
self._header_crc.content = 0
self._write_section(self._header_crc)
# read header
self.buffer.seek(self.header_start_byte)
raw_header = self.buffer.read(self._header_size.content)
self._header_crc.content = binascii.crc32(raw_header)
self._write_section(self._header_crc)

def _checksum_partitions(self) -> None:
"""Write the partition checksum
Move to the start of the partition entries. Read the partition
entry array length * the partition entry size. CRC that value.
"""
# read partitions entries relative to the table
self.buffer.seek(self.partition_entry_start_byte)
if self.backup:
self.buffer.seek(self.partition_entry_start_byte)

raw_partitions = self.buffer.read(
self._partition_array_length.content * self._partition_entry_size.content
)
self._partition_array_crc.content = binascii.crc32(raw_partitions)
self._write_section(self._partition_array_crc)
# zero the old checksum before calculating
header._header_crc.data = b"\x00" * 4
header_bytes = [x.data for x in header.header_fields]
header._header_crc.data = binascii.crc32(header_bytes)

0 comments on commit c2ea588

Please sign in to comment.