Skip to content

Commit

Permalink
wip: write geometry tests
Browse files Browse the repository at this point in the history
  • Loading branch information
swysocki committed Jan 3, 2022
1 parent 0105626 commit 28602f4
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 61 deletions.
79 changes: 55 additions & 24 deletions pygpt_disk/disk.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,67 @@
import io
import pathlib


class Geometry:
"""Geometry of disk image
Attributes:
sector_size: statically set to 512 bytes
total_bytes: disk size in bytes
total_sectors: number of sectors on the disk
total_lba: number of logical blocks on the disk
partition_start_lba: logical block where the partitions start
partition_last_lba: logical block where the partitions end
primary_header_lba: logical block location of primary header
primary_header_byte: byte where primary header starts
primary_array_lba: logical block where the primary partition array starts
primary_array_byte: byte where the primary partition array starts
backup_header_lba: logical block where the backup header starts
backup_header_byte: byte where the backup header starts
backup_header_array_lba: logical block where backup partition array starts
backup_header_array_byte: byte where the backup partition array starts
"""

def __init__(self, size: int) -> None:
"""Init Geometry with size in bytes"""
self.sector_size = 512
self.total_bytes = size
self.total_sectors = int(size / self.sector_size)
self.total_lba = int(size / self.sector_size)
self.partition_start_lba = 34
self.partition_last_lba = int(self.total_lba - 34)
self.primary_header_lba = 1
self.primary_header_byte = int(self.primary_header_lba * self.sector_size)
self.primary_array_lba = 2
self.primary_array_byte = int(self.primary_array_lba * self.sector_size)
self.backup_header_lba = int(self.total_lba - 1)
self.backup_header_byte = int(self.backup_header_lba * self.sector_size)
self.backup_header_array_lba = int(self.total_lba - 33)
self.backup_header_array_byte = int(
self.backup_header_array_lba * self.sector_size
)


class Disk:
sector_size = 512
"""GPT disk
def __init__(self, image_path: str, size: int = 0) -> None:
"""Create a disk object
A disk objects represents a new or existing GPT disk image. If the file exists,
it is assumed to be an existing GPT image. If it does not, a new file is created.
A disk objects represents a new or existing GPT disk
image. If the file exists, it is assumed to be an existing
GPT image. If it does not, a new file is created.
Attributes:
image_path: file image path (absolute or relative)
size: disk image size in bytes
geometry:
"""

Args:
image_path: file image path (absolute or relative)
size: disk image size in bytes
"""
def __init__(self, image_path: str, size: int = 0) -> None:
"""Init Disk with a file path and size in bytes"""
self._image_path = pathlib.Path(image_path)
self.name = self._image_path.name
self.size = size
self._size = size
if self._image_path.exists():
self.size = self._image_path.stat().st_size
self.sectors = size / Disk.sector_size
self.buffer = io.BytesIO()

# @NOTE: bad idea here. This could create a large buffer if the
# disk is huge
def create(self) -> None:
"""Create a buffer to write image contents"""
self.buffer.seek(self.size - 1)
self.buffer.write(b"\0")
self._size = self._image_path.stat().st_size
self.geometry = Geometry(self._size)

def write(self) -> None:
"""Write a disk buffer to file"""
self._image_path.write_bytes(self.buffer.getvalue())
"""Write blank disk"""
self._image_path.write_bytes(b"\x00" * self.geometry.total_bytes)
28 changes: 15 additions & 13 deletions pygpt_disk/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import dataclass
from typing import Union

from pygpt_disk.disk import Disk
from pygpt_disk.disk import Geometry


class Table:
Expand All @@ -26,30 +26,33 @@ class HeaderEntry:
content: Union[int, str, bytes]

def __init__(
self, disk: Disk, is_backup: bool = False, sector_size: int = 512
self, geometry: Geometry, is_backup: bool = False, sector_size: int = 512
) -> None:
self.disk = disk
self.backup = is_backup
self.buffer = io.BytesIO()

self.geometry = geometry
self._header_sig = Table.HeaderEntry(0, 8, b"EFI PART")
self._revision = Table.HeaderEntry(8, 4, b"\x00\x00\x01\x00")
self._header_size = Table.HeaderEntry(12, 4, 92)
self._header_crc = Table.HeaderEntry(16, 4, 0)
self._reserved = Table.HeaderEntry(20, 4, 0)
self._primary_header_lba = Table.HeaderEntry(24, 8, 1)
self._primary_header_lba = Table.HeaderEntry(24, 8, geometry.primary_header_lba)
self._secondary_header_lba = Table.HeaderEntry(
32, 8, int(self.disk.sectors - 1)
32, 8, geometry.backup_header_lba
)
if self.backup:
self._primary_header_lba.offset = 32
self._secondary_header_lba.offset = 24
self._partition_start_lba = Table.HeaderEntry(40, 8, 34)
self._partition_last_lba = Table.HeaderEntry(48, 8, int(self.disk.sectors - 34))
self._partition_start_lba = Table.HeaderEntry(
40, 8, geometry.partition_start_lba
)
self._partition_last_lba = Table.HeaderEntry(48, 8, geometry.partition_last_lba)
self._disk_guid = Table.HeaderEntry(56, 16, uuid.uuid4().bytes_le)
self._partition_array_start = Table.HeaderEntry(72, 8, 2)
self._partition_array_start = Table.HeaderEntry(
72, 8, geometry.primary_array_lba
)
if self.backup:
self._partition_array_start.content = int(self.disk.sectors - 33)
self._partition_array_start.content = geometry.backup_header_array_lba
self._partition_array_length = Table.HeaderEntry(80, 4, 128)
self._partition_entry_size = Table.HeaderEntry(84, 4, 128)
self._partition_array_crc = Table.HeaderEntry(88, 4, 0)
Expand All @@ -64,17 +67,16 @@ def __init__(

def _write_table(self) -> None:
# start with blank table
self.buffer.write(b"\x00" * 33 * self.disk.sector_size)
self.buffer.write(b"\x00" * 33 * self.geometry.sector_size)

def _write_header(self) -> None:
"""Write the table header to proper location
The header is typically 92 bytes with the remainder of bytes in the sector
zeroed. The primary header is is at LBA 1 with the partition entries at 2 - 33.
zeroed. The primary header is at LBA 1 with the partition entries at 2 - 33.
The backup header is at LBA -1 with the partition entries at -2 to -33.
"""

self._write_section(self._header_sig)
self._write_section(self._revision)
self._write_section(self._header_size)
Expand Down
Empty file added tests/__init__.py
Empty file.
35 changes: 19 additions & 16 deletions tests/test_disk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from pygpt_disk import disk

import pytest
from pygpt_disk import disk

DISK_SIZE = 8 * 1024 * 1024
DISK_NAME = "test-image.img"
Expand All @@ -13,28 +14,30 @@ def test_disk(tmp_path):
return p.resolve().__str__()


def test_init(test_disk):
# init new disk
d = disk.Disk(DISK_NAME, DISK_SIZE)
assert d.size == DISK_SIZE
assert d.name == DISK_NAME
assert d.sector_size == 512
assert d.sectors == DISK_SIZE / d.sector_size

# init existing disk
d = disk.Disk(test_disk)
assert d.size == DISK_SIZE
def test_geometry():
geo = disk.Geometry(DISK_SIZE)
assert geo.sector_size == 512
assert geo.total_bytes == DISK_SIZE
assert geo.total_sectors == int(DISK_SIZE / 512)
assert geo.total_lba == int(DISK_SIZE / 512)
assert geo.partition_last_lba == int((DISK_SIZE / 512) - 34)
assert geo.primary_header_byte == int(1 * 512)
assert geo.primary_array_byte == int(2 * 512)
assert geo.backup_header_lba == int((DISK_SIZE / 512) - 1)
assert geo.backup_header_byte == int(DISK_SIZE - 512)
assert geo.backup_header_array_lba == int((DISK_SIZE / 512) - 33)
assert geo.backup_header_array_byte == int(DISK_SIZE - (512 * 33))


def test_create():
def test_disk_init(test_disk):
# init new disk
d = disk.Disk(DISK_NAME, DISK_SIZE)
d.create()
assert len(d.buffer.getvalue()) == DISK_SIZE
assert d._size == DISK_SIZE
assert d.name == DISK_NAME


def test_write(tmp_path):
name = tmp_path / DISK_NAME
d = disk.Disk(name, DISK_SIZE)
d.create()
d.write()
assert os.path.exists(name)
17 changes: 9 additions & 8 deletions tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ def fresh_disk(tmp_path):


def test_init(fresh_disk: disk.Disk):
t = table.Table(fresh_disk)
assert type(t.disk) == disk.Disk
t = table.Table(fresh_disk.geometry)
assert not t.backup
assert t._header_sig.content == SIGNATURE


def test__write_header(fresh_disk: disk.Disk):
primary_header = table.Table(fresh_disk)
primary_header = table.Table(fresh_disk.geometry)
primary_header._write_table()

def read_header(header):
def test_header(header):
assert header.buffer.read(8) == SIGNATURE
assert header.buffer.read(4) == REVISION
assert header.buffer.read(4) == HEADER_SIZE
Expand Down Expand Up @@ -68,12 +69,12 @@ def read_header(header):
primary_header._write_header()
primary_header.buffer.seek(0)

read_header(primary_header)
test_header(primary_header)

backup_header = table.Table(fresh_disk, is_backup=True)
backup_header = table.Table(fresh_disk.geometry, is_backup=True)
backup_header._write_table()
backup_header._write_header()
backup_header.buffer.seek(
int(32 * fresh_disk.sector_size)
int(32 * fresh_disk.geometry.sector_size)
) # secondary header lba start
read_header(backup_header)
test_header(backup_header)

0 comments on commit 28602f4

Please sign in to comment.