# SEG-D Reader — Step‑by‑Step

This notebook walks you **from a raw SEG‑D file to first visualizations**, without relying on a dedicated SEG‑D Python package.
It includes:
- Lightweight binary helpers (BCD, big‑endian reads).
- A minimal parser for key headers (General Header 1 + Channel Set Descriptors).
- A small `SegDReader` to peek the first record and traces.
- Utilities to estimate samples per trace and compute quick stats.
- Ready‑to‑use plotting helpers for a first look (image and wiggle).

> **Assumptions / limitations**  
> This minimal reader targets common airborne/marine field tapes where sample formats are **16‑bit integers (format code 8036)**
> or **32‑bit floats (format code 8058)**. Other variants may need tweaks in the parsing logic.
> Header layouts in SEG‑D can vary; this notebook focuses on a pragmatic subset for quick QC.

## 0) Prerequisites

- Python 3.9+
- `numpy` and `matplotlib` installed (already imported below).
- A **SEG‑D** file on disk (e.g., copied from media or provided by your acquisition team).

In [1]:

# ==== Imports ====
# Standard library
import os, struct, io, math
from dataclasses import dataclass
from typing import Optional, List

# Third-party
import numpy as np
import matplotlib.pyplot as plt

# Display options
plt.rcParams['figure.figsize'] = (10, 6)

## 1) Point to your SEG‑D file

Update `SEGD_PATH` to the file you want to inspect. The cell validates the path and prints the file size.

In [None]:

# <<< EDIT ME >>>
SEGD_PATH = "Path to Your SEG-D file.segd"  # or .sgd / .segd — use your actual file path

# --- Basic validation ---
assert isinstance(SEGD_PATH, str) and len(SEGD_PATH) > 0, "Please provide a non-empty path string."
if not os.path.exists(SEGD_PATH):
    print("⚠️  The file path does not exist yet — edit SEGD_PATH above.")
else:
    size_mb = os.path.getsize(SEGD_PATH) / (1024*1024)
    print(f"Found: {SEGD_PATH}  |  Size: {size_mb:.2f} MB")

Found: /Users/kavehdehghan/Downloads/SEGD/1001.1.2.expl.segd  |  Size: 868.13 MB


## 2) Binary helpers

These functions decode **BCD** bytes and read **big‑endian** unsigned integers commonly found in SEG‑D headers.
Each helper is tiny but keeps the parsing code below clean and readable.

In [4]:
# ----------------------------
# Small binary helpers
# ----------------------------
def to_bcd_byte(byte_val: int) -> Optional[int]:
    tens = (byte_val >> 4) & 0xF
    ones = byte_val & 0xF
    if tens > 9 or ones > 9:
        return None
    return tens * 10 + ones

def read_uint16_be(b, off):
    return int.from_bytes(b[off:off+2], "big")

def read_uint24_be(b, off):
    return int.from_bytes(b[off:off+3], "big")

def read_uint32_be(b, off):
    return int.from_bytes(b[off:off+4], "big")

## 3) Lightweight data classes

We use small `@dataclass` containers for the key header pieces we parse. This keeps the returned objects
easy to inspect and debug.

In [5]:
# ----------------------------
# Data classes
# ----------------------------
@dataclass
class GeneralHeader1:
    format_code_bcd: Optional[int]
    year_bcd: Optional[int]
    julian_day_bcd: Optional[int]
    hour_bcd: Optional[int]
    minute_bcd: Optional[int]
    second_bcd: Optional[int]
    additional_gh_blocks: int
    channel_sets_per_scan_type_bcd: Optional[int]
    extended_header_blocks_count: Optional[int]
    external_header_blocks_count: Optional[int]

@dataclass
class ChannelSetDescriptor:
    num_channels: int

@dataclass
class ParsedSEGDRecord:
    gh1: GeneralHeader1
    header_size_bytes: int
    channel_sets: List[ChannelSetDescriptor]

## 4) Header parsers (General Header 1 & Channel Set Descriptors)

The functions below extract a **minimal but useful subset** of fields from:
- **General Header 1**: revision, recording system ID, sample interval, format code (via BCD), etc.
- **Channel Set Descriptors**: number of channels and basic sampling info.

> If your data uses additional header blocks (e.g., Scan Type, Extended Headers), you can extend these parsers similarly.

In [6]:
# ----------------------------
# Parsers
# ----------------------------
def parse_general_header1(block: bytes) -> GeneralHeader1:
    fmt_code = (block[2] >> 4)*1000 + (block[2]&0xF)*100 + (block[3]>>4)*10 + (block[3]&0xF)
    year2 = to_bcd_byte(block[10])
    julian_hundreds = block[11] & 0xF
    julian_tens_ones = to_bcd_byte(block[12])
    julian_day = julian_hundreds*100 + (julian_tens_ones or 0) if julian_tens_ones else None
    hour = to_bcd_byte(block[13])
    minute = to_bcd_byte(block[14])
    second = to_bcd_byte(block[15])
    addl_gh = (block[11] >> 4) & 0xF
    ch_sets = to_bcd_byte(block[28])
    ext_hdr = None if block[30] == 0xFF else to_bcd_byte(block[30])
    ext_ext = None if block[31] == 0xFF else to_bcd_byte(block[31])
    return GeneralHeader1(
        format_code_bcd=fmt_code,
        year_bcd=year2,
        julian_day_bcd=julian_day,
        hour_bcd=hour,
        minute_bcd=minute,
        second_bcd=second,
        additional_gh_blocks=addl_gh,
        channel_sets_per_scan_type_bcd=ch_sets,
        extended_header_blocks_count=ext_hdr,
        external_header_blocks_count=ext_ext,
    )

def parse_channel_set_descriptor(block: bytes) -> ChannelSetDescriptor:
    num_channels = read_uint16_be(block, 8)  # bytes 9–10
    return ChannelSetDescriptor(num_channels=num_channels)

## 5) A tiny `SegDReader`

`SegDReader` opens the file and parses the **first record**, exposing helpers to:
- Inspect header objects
- Read the first trace
- Estimate samples per trace

It is intentionally minimal; for production, you might add buffered reading and full record iteration.

In [7]:
# ----------------------------
# Reader
# ----------------------------
class SegDReader:
    def __init__(self, path: str):
        self.path = path
        with open(path, "rb") as f:
            self.data = f.read()

    def parse_first_record(self) -> ParsedSEGDRecord:
        b = self.data
        gh1 = parse_general_header1(b[:32])
        gh_blocks_total = 1 + max(gh1.additional_gh_blocks, 0)
        offs = gh_blocks_total * 32

        # Channel sets
        channel_sets = []
        csd_count = gh1.channel_sets_per_scan_type_bcd or 0
        for i in range(csd_count):
            block = b[offs:offs+32]
            if len(block) < 32: break
            channel_sets.append(parse_channel_set_descriptor(block))
            offs += 32

        # Skip extended/external headers
        offs += (gh1.extended_header_blocks_count or 0) * 32
        offs += (gh1.external_header_blocks_count or 0) * 32

        return ParsedSEGDRecord(gh1=gh1, header_size_bytes=offs, channel_sets=channel_sets)

    def read_first_trace(self, header_size: int, sample_count: int, fmt_code: Optional[int]):
        # First 20-byte trace header
        th = self.data[header_size:header_size+20]
        trc_num = int.from_bytes(th[4:6], "big") if len(th) >= 6 else None
        data_offset = header_size + 20

        if fmt_code == 8058:  # IEEE float (4 bytes/sample)
            nbytes = sample_count * 4
            raw = self.data[data_offset:data_offset+nbytes]
            samples = list(struct.unpack(f">{sample_count}f", raw)) if len(raw) == nbytes else []
            return trc_num, samples

        elif fmt_code == 8036:  # 16-bit signed int (2 bytes/sample)
            nbytes = sample_count * 2
            raw = self.data[data_offset:data_offset+nbytes]
            samples = list(struct.unpack(f">{sample_count}h", raw)) if len(raw) == nbytes else []
            return trc_num, samples

        else:
            return trc_num, []

## 6) Utilities

- `estimate_samples_per_trace()` — heuristically guesses samples per trace based on header info and file size.
- `count_traces_in_record()` — counts the number of traces in the first record.
- `segd_stats()` — quick amplitude stats for sanity checks.

In [8]:
def estimate_samples_per_trace(rdr: SegDReader) -> Optional[int]:
    b = rdr.data
    gh1 = parse_general_header1(b[:32])
    # GH2 only exists if there are additional GH blocks
    if gh1.additional_gh_blocks >= 1 and len(b) >= 64:
        gh2_block = b[32:64]
        record_length_us = read_uint32_be(gh2_block, 16)   # bytes 17–20
        # Try to decode sample interval
        sample_interval_code = gh1.sample_interval_code if hasattr(gh1, "sample_interval_code") else None
        sample_interval_us = None

        # Simple lookup for common codes (manufacturer-dependent!)
        # This may need adjustment for your data
        lookup = {
            1: 250,    # 250 µs (4 ms)
            2: 500,    # 500 µs (2 ms)
            3: 1000,   # 1000 µs (1 ms)
            4: 2000,   # 2000 µs (0.5 ms)
        }
        if sample_interval_code in lookup:
            sample_interval_us = lookup[sample_interval_code]

        # If GH1 code is invalid, check GH2 bytes 25–27
        if sample_interval_us is None:
            sample_interval_us = read_uint24_be(gh2_block, 24)  # bytes 25–27

        if record_length_us and sample_interval_us:
            return int(record_length_us / sample_interval_us)
    return None

In [9]:
def count_traces_in_record(segd_path: str):
    rdr = SegDReader(segd_path)
    rec = rdr.parse_first_record()

    total_traces = sum(cs.num_channels for cs in rec.channel_sets)
    print(f"Estimated number of traces in this record: {total_traces}")
    return total_traces

In [10]:
def segd_stats(segd_path: str, n_traces: int = 50):
    rdr = SegDReader(segd_path)
    rec = rdr.parse_first_record()
    fmt_code = rec.gh1.format_code_bcd
    nsamples = estimate_samples_per_trace(rdr)

    # ---- sample interval ----
    b = rdr.data
    gh1 = parse_general_header1(b[:32])
    sample_interval_us = None
    if hasattr(gh1, "sample_interval_code"):
        lookup = {1: 250, 2: 500, 3: 1000, 4: 2000}
        sample_interval_us = lookup.get(getattr(gh1, "sample_interval_code", None))
    if sample_interval_us is None and len(b) >= 64:
        sample_interval_us = read_uint24_be(b[32:64], 24)
    dt = sample_interval_us * 1e-6 if sample_interval_us else 0.001

    # ---- read traces ----
    traces = read_traces(rdr, rec.header_size_bytes, n_traces, nsamples, fmt_code)
    if not traces:
        print("No traces decoded")
        return

    import numpy as np
    data = np.array([s for _, s in traces]).T  # shape (samples, traces)

    stats = {
        "Format code": fmt_code,
        "Number of traces read": data.shape[1],
        "Samples per trace": data.shape[0],
        "Sample interval (µs)": sample_interval_us,
        "Sample interval (s)": dt,
        "Trace length (s)": data.shape[0] * dt,
        "Amplitude min": float(np.min(data)),
        "Amplitude max": float(np.max(data)),
        "Amplitude mean": float(np.mean(data)),
        "Amplitude std": float(np.std(data)),
        "Amplitude RMS": float(np.sqrt(np.mean(data**2))),
    }

    # print nicely
    for k, v in stats.items():
        print(f"{k:25s}: {v}")
    return stats

## 7) Quick file summary

`segd_summary(path)` gives you a compact overview: revision, sample interval, format code, channels, traces/record, etc.

In [11]:
import os
import numpy as np

def segd_summary(segd_path: str):
    rdr = SegDReader(segd_path)
    rec = rdr.parse_first_record()
    fmt_code = rec.gh1.format_code_bcd
    nsamples = estimate_samples_per_trace(rdr)

    # ---- sample interval ----
    b = rdr.data
    gh1 = parse_general_header1(b[:32])
    sample_interval_us = None
    if hasattr(gh1, "sample_interval_code"):
        lookup = {1: 250, 2: 500, 3: 1000, 4: 2000}
        sample_interval_us = lookup.get(getattr(gh1, "sample_interval_code", None))
    if sample_interval_us is None and len(b) >= 64:
        sample_interval_us = read_uint24_be(b[32:64], 24)
    dt = sample_interval_us * 1e-6 if sample_interval_us else 0.001

    # ---- channel sets → traces per record ----
    total_channels = sum(cs.num_channels for cs in rec.channel_sets)

    # ---- trace size ----
    if fmt_code == 8036:  # 16-bit int
        bytes_per_sample = 2
    elif fmt_code == 8058:  # 32-bit float
        bytes_per_sample = 4
    else:
        bytes_per_sample = 2  # fallback

    bytes_per_trace = nsamples * bytes_per_sample
    bytes_per_record = total_channels * bytes_per_trace

    # ---- file size ----
    file_size = os.path.getsize(segd_path)
    est_records = file_size / bytes_per_record if bytes_per_record > 0 else None
    est_total_traces = est_records * total_channels if est_records else None

    # ---- print summary ----
    print("=== SEG-D File Summary ===")
    print(f"File size on disk         : {file_size/1e6:.1f} MB")
    print(f"Format code               : {fmt_code}")
    print(f"Channels per record       : {total_channels}")
    print(f"Samples per trace         : {nsamples}")
    print(f"Sample interval (µs)      : {sample_interval_us}")
    print(f"Sample interval (s)       : {dt}")
    print(f"Trace length (s)          : {nsamples*dt:.2f}")
    print(f"Bytes per trace           : {bytes_per_trace/1e3:.1f} kB")
    print(f"Bytes per record (est.)   : {bytes_per_record/1e6:.1f} MB")
    print(f"Estimated records in file : {est_records:.1f}")
    print(f"Estimated total traces    : {est_total_traces:.0f}")

    return {
        "file_size_bytes": file_size,
        "channels_per_record": total_channels,
        "samples_per_trace": nsamples,
        "sample_interval_s": dt,
        "trace_length_s": nsamples*dt,
        "bytes_per_trace": bytes_per_trace,
        "bytes_per_record": bytes_per_record,
        "estimated_records": est_records,
        "estimated_total_traces": est_total_traces
    }

## 8) First-look plots

Two helpers to visualize your data fast:

- `plot_image_section()` — an **image** (traces stacked) for a quick section view.
- `plot_wiggle_section()` — a classic **wiggle** plot for a subset of traces.

> Tip: Start with a small number of traces to confirm parsing works; then scale up.

In [12]:
import numpy as np
import matplotlib.pyplot as plt

def plot_image_section(segd_path: str, n_traces: int = 50, downsample: int = 4, clip: float = 0.2):
    rdr = SegDReader(segd_path)
    rec = rdr.parse_first_record()
    fmt_code = rec.gh1.format_code_bcd
    nsamples = estimate_samples_per_trace(rdr)

    # ---- sample interval (µs) ----
    b = rdr.data
    gh1 = parse_general_header1(b[:32])
    sample_interval_us = None
    if hasattr(gh1, "sample_interval_code"):
        lookup = {1: 250, 2: 500, 3: 1000, 4: 2000}
        sample_interval_us = lookup.get(getattr(gh1, "sample_interval_code", None))
    if sample_interval_us is None and len(b) >= 64:
        sample_interval_us = read_uint24_be(b[32:64], 24)
    dt = sample_interval_us * 1e-6 if sample_interval_us else 0.001

    # ---- read traces ----
    traces = read_traces(rdr, rec.header_size_bytes, n_traces, nsamples, fmt_code)
    if not traces:
        print("No traces decoded")
        return

    import numpy as np
    data = np.array([s for _, s in traces]).T  # [samples, traces]
    if downsample > 1:
        data = data[::downsample, :]

    # normalize per trace
    max_per_trace = np.max(np.abs(data), axis=0, keepdims=True)
    max_per_trace[max_per_trace == 0] = 1
    data = data / max_per_trace

    # apply clipping (optional)
    data = np.clip(data, -clip, clip)

    # time axis
    nt, nx = data.shape
    times = np.arange(nt) * downsample * dt

    plt.figure(figsize=(12, 8))
    extent = [0, nx, times[-1], times[0]]
    plt.imshow(
        data,
        cmap="gray",
        aspect="auto",
        interpolation="bilinear",  # smoother
        extent=extent,
        vmin=-clip,
        vmax=clip
    )
    plt.title(f"Seismic Section — First {n_traces} Traces (Format {fmt_code})")
    plt.xlabel("Trace index")
    plt.ylabel("Time (s)")
    plt.colorbar(label="Normalized amplitude (clipped)")
    plt.show()

In [18]:
import struct

def read_traces(rdr, header_size_bytes, n_traces, nsamples, fmt_code):
    """
    Read the first `n_traces` traces from the SEG-D file.

    Parameters
    ----------
    rdr : SegDReader
        The reader object with .data (raw file bytes).
    header_size_bytes : int
        Offset to the first trace (from parse_first_record()).
    n_traces : int
        How many traces to read.
    nsamples : int
        Number of samples per trace (estimated from headers).
    fmt_code : int
        SEG-D format code (e.g., 8036=int16, 8058=float32).

    Returns
    -------
    list of (trace_index, np.ndarray)
        Each entry is (trace number, samples).
    """
    traces = []
    b = rdr.data
    offs = header_size_bytes

    if nsamples is None:
        print("⚠️ Cannot read traces: unknown number of samples.")
        return []

    # Choose decoding rule based on format code
    if fmt_code == 8036:   # 16-bit int
        bytes_per_sample = 2
        fmt = ">h"   # big-endian int16
    elif fmt_code == 8058: # 32-bit float
        bytes_per_sample = 4
        fmt = ">f"   # big-endian float32
    else:
        print(f"⚠️ Unknown format code {fmt_code}, assuming int16")
        bytes_per_sample = 2
        fmt = ">h"

    bytes_per_trace = nsamples * bytes_per_sample

    for i in range(n_traces):
        trc_offs = offs + i * (20 + bytes_per_trace)  # 20-byte trace header
        if trc_offs + 20 + bytes_per_trace > len(b):
            break
        # skip 20-byte trace header
        dbytes = b[trc_offs+20 : trc_offs+20+bytes_per_trace]

        # unpack samples
        samples = [
            struct.unpack(fmt, dbytes[j:j+bytes_per_sample])[0]
            for j in range(0, len(dbytes), bytes_per_sample)
        ]
        traces.append((i, np.array(samples, dtype=float)))

    return traces

In [None]:
def plot_wiggle_section(segd_path: str, n_traces: int = 20, downsample: int = 10):
    rdr = SegDReader(segd_path)
    rec = rdr.parse_first_record()
    fmt_code = rec.gh1.format_code_bcd
    nsamples = estimate_samples_per_trace(rdr)

    # ---- Try to extract sample interval (µs) ----
    b = rdr.data
    gh1 = parse_general_header1(b[:32])
    sample_interval_us = None
    if hasattr(gh1, "sample_interval_code"):
        # simple lookup — may need refinement for your acquisition system
        lookup = {
            1: 250,    # 0.25 ms
            2: 500,    # 0.5 ms
            3: 1000,   # 1 ms
            4: 2000,   # 2 ms
        }
        sample_interval_us = lookup.get(gh1.sample_interval_code)
    if sample_interval_us is None and len(b) >= 64:
        sample_interval_us = read_uint24_be(b[32:64], 24)  # GH2 bytes 25–27

    dt = sample_interval_us * 1e-6 if sample_interval_us else 0.001  # fallback 1 ms

    traces = read_traces(rdr, rec.header_size_bytes, n_traces, nsamples, fmt_code)
    if not traces:
        print("No traces decoded")
        return

    import numpy as np
    plt.figure(figsize=(12, 6))

    for i, (trc_num, s) in enumerate(traces):
        s = np.array(s)[::downsample]  # downsample for speed/clarity
        t = np.arange(len(s)) * downsample * dt  # convert to seconds

        # Normalize each trace individually
        if np.max(np.abs(s)) > 0:
            s = s / np.max(np.abs(s)) * 0.4  # scale to +/-0.4 around center

        x = i + s
        plt.plot(x, t, 'k', lw=0.5)
        plt.fill_betweenx(t, i, x, where=(s > 0), color='black', alpha=0.6)

    plt.gca().invert_yaxis()
    plt.title(f"Wiggle Plot — First {n_traces} traces (Format {fmt_code})")
    plt.xlabel("Trace index")
    plt.ylabel("Time (s)")
    plt.show()

# Example usage:
plot_wiggle_section(SEGD_PATH, n_traces=10, downsample=5)

In [20]:
def plot_image_section_real_amp(
    segd_path: str,
    n_traces: int = 50,
    downsample: int = 4,
    clip: Optional[float] = None,
    tmin: float = 0.0,
    tmax: Optional[float] = None
):
    rdr = SegDReader(segd_path)
    rec = rdr.parse_first_record()
    fmt_code = rec.gh1.format_code_bcd
    nsamples = estimate_samples_per_trace(rdr)

    # ---- sample interval ----
    b = rdr.data
    gh1 = parse_general_header1(b[:32])
    sample_interval_us = None
    if hasattr(gh1, "sample_interval_code"):
        lookup = {1: 250, 2: 500, 3: 1000, 4: 2000}
        sample_interval_us = lookup.get(getattr(gh1, "sample_interval_code", None))
    if sample_interval_us is None and len(b) >= 64:
        sample_interval_us = read_uint24_be(b[32:64], 24)
    dt = sample_interval_us * 1e-6 if sample_interval_us else 0.001

    # ---- read traces ----
    traces = read_traces(rdr, rec.header_size_bytes, n_traces, nsamples, fmt_code)
    if not traces:
        print("No traces decoded")
        return

    import numpy as np
    data = np.array([s for _, s in traces]).T
    if downsample > 1:
        data = data[::downsample, :]

    # ---- build time axis ----
    nt, nx = data.shape
    times = np.arange(nt) * downsample * dt

    # ---- select time window ----
    if tmax is None:
        tmax = times[-1]
    mask = (times >= tmin) & (times <= tmax)
    data = data[mask, :]
    times = times[mask]

    # ---- clipping ----
    if clip is None:
        limit = np.percentile(np.abs(data), 98)
    else:
        limit = clip

    # ---- plot ----
    plt.figure(figsize=(12, 6))
    extent = [0, nx, times[-1], times[0]]
    plt.imshow(
        data,
        cmap="gray",
        aspect="auto",
        interpolation="bilinear",
        extent=extent,
        vmin=-limit,
        vmax=limit
    )
    plt.title(f"Seismic Section — First {n_traces} traces (Format {fmt_code})")
    plt.xlabel("Trace index")
    plt.ylabel("Time (s)")
    plt.colorbar(label="Amplitude")
    plt.show()

## 9) Example: Summarize and plot

Run the cells below after setting `SEGD_PATH` to your file.

In [None]:

# --- Summary ---
if os.path.exists(SEGD_PATH):
    summary = segd_summary(SEGD_PATH)
else:
    summary = None
summary

In [None]:

# --- First-look image section ---
# Tweak n_traces / downsample / clip as needed for performance/visibility.
if os.path.exists(SEGD_PATH):
    plot_image_section(SEGD_PATH, n_traces=64, downsample=2, clip=0.8)

In [None]:

# --- Optional: Wiggle plot for first N traces ---
if os.path.exists(SEGD_PATH):
    plot_wiggle_section(SEGD_PATH, n_traces=24, downsample=5)

---

### Notes & next steps

- If you hit an unknown **format code**, add a `elif` branch where samples are unpacked (see comments in the reader).
- For **batch processing** or conversion (e.g., to SEG‑Y), consider wrapping the record/trace iteration with writers
  from libraries like `segyio` (for SEG‑Y) once you extract trace arrays here.
- If your dataset uses **extended headers** or specialized **demultiplexed layouts**, extend the header parsers accordingly.
