diff --git a/source_modelling/fsp.py b/source_modelling/fsp.py index 58059e3..97822f6 100644 --- a/source_modelling/fsp.py +++ b/source_modelling/fsp.py @@ -26,9 +26,13 @@ from pathlib import Path from typing import IO, Optional +import numpy as np import pandas as pd import parse +from qcore import coordinates +from source_modelling.sources import Plane + HEADER_PATTERN = """EventTAG: {event_tag} Loc : LAT = {latitude:g} LON = {longitude:g} DEP = {depth:g} Size : LEN = {length:g} km WID = {width:g} km Mw = {magnitude:g} Mo = {moment:g} Nm @@ -64,6 +68,78 @@ class FSPParseError(Exception): pass +@dataclasses.dataclass +class Segment: + """A representation of a segment in an FSP file.""" + + strike: Optional[float] = None + """The strike of the segment.""" + dip: Optional[float] = None + """The dip of the segment.""" + length: Optional[float] = None + """The length of the segment.""" + width: Optional[float] = None + """The width of the segment.""" + dtop: Optional[float] = None + """The top depth of the segment.""" + top_centre: Optional[np.ndarray] = None + """The top centre of the segment.""" + hypocentre: Optional[np.ndarray] = None + """The hypocentre of the segment.""" + dx: Optional[float] = None + """The length of the subfaults.""" + dz: Optional[float] = None + """The width of the subfaults.""" + subfaults: Optional[int] = None + """The number of subfaults in the segment.""" + + def as_plane(self) -> Plane: + """Convert the segment to a Plane object. + + Note + ---- + Only works for segments with a defined strike, dip, length, + width, and top_centre. Segments must be within the bounds of + NZTM coordinates to workflow reliably. + + Returns + ------- + Plane + The Plane object representing the segment. + """ + if not ( + self.strike is not None + and self.dip + and self.length + and self.width + and self.dtop is not None + and isinstance(self.top_centre, np.ndarray) + ): + raise ValueError( + "Cannot convert segment to Plane: missing required attributes." + ) + strike_nztm = coordinates.great_circle_bearing_to_nztm_bearing( + self.top_centre, self.width, self.strike + ) + top_centre_nztm = coordinates.wgs_depth_to_nztm(self.top_centre) + dip_dir = strike_nztm + 90 + dip_direction = np.array( + [np.cos(np.radians(dip_dir)), np.sin(np.radians(dip_dir))] + ) + km_to_m = 1000 + projected_width_m = self.width * km_to_m / 2 * np.cos(np.radians(self.dip)) + centroid = projected_width_m * dip_direction + top_centre_nztm + return Plane.from_centroid_strike_dip( + coordinates.nztm_to_wgs_depth(centroid), + self.dip, + self.length, + self.width, + dtop=self.dtop, + strike_nztm=strike_nztm, + dip_dir_nztm=dip_dir, + ) + + @dataclasses.dataclass class FSPFile: """A representation of the FSP file format used to represent source models in SRCMOD. @@ -130,6 +206,8 @@ class FSPFile: are the segement index, length and width respectively. To perform operations over multi-segment data, group by 'segment' and apply the operation to each group. + segments : list[Segment] + A list of segments, each containing a `Segment` object. """ event_tag: str @@ -177,6 +255,8 @@ class FSPFile: segment_count: int data: pd.DataFrame + segments: list[Segment] = dataclasses.field(default_factory=list) + @classmethod def read_from_file(cls: Callable, fsp_ffp: Path) -> "FSPFile": """Parse an FSPFile. @@ -226,9 +306,13 @@ def read_from_file(cls: Callable, fsp_ffp: Path) -> "FSPFile": velocity_model = _parse_velocity_density_structure(fsp_file_handle) # now sniff ahead for the columns - data = _parse_segment_slip(fsp_file_handle, metadata["segment_count"]) + data, segments = _parse_segment_slip( + fsp_file_handle, metadata["segment_count"] + ) - fsp_file = cls(data=data, velocity_model=velocity_model, **metadata) + fsp_file = cls( + data=data, velocity_model=velocity_model, segments=segments, **metadata + ) # A lot of parameters can be 999 or -999 to indicate "no known # value", so we can detect that and set to None @@ -255,7 +339,97 @@ def read_from_file(cls: Callable, fsp_ffp: Path) -> "FSPFile": return fsp_file -def _parse_segment_slip(fsp_file_handle: IO[str], segment_count: int) -> pd.DataFrame: +SEGMENT_HEADER_PATTERN = re.compile( + r""" + %\s*SEGMENT\s*\#\s* + (?P\d+) # Segment index number + \s*:\s* + STRIKE\s*=\s* + (?P[\d.]+) # Strike value in degrees + \s*deg\s* + DIP\s*=\s* + (?P[\d.]+) # Dip value in degrees + \s*deg + """, + re.VERBOSE, +) + +DEPTH_TO_TOP_PATTERN = re.compile( + r""" + %\s*depth\s+to\s+top:\s+ + Z2top\s*=\s* + (?P[\d.]+) # Depth to top in km + \s*km + """, + re.VERBOSE, +) + +LATLON_PATTERN = re.compile( + r""" + %\s*LAT\s*=\s* + (?P[+-]?\d+(?:\.\d+)?) # Latitude value + \s*,\s* + LON\s*=\s* + (?P[+-]?\d+(?:\.\d+)?) # Longitude value + """, + re.VERBOSE, +) + +DXDZ_PATTERN = re.compile( + r""" + %\s*Dx\s*=\s* + (?P[+-]?\d+(?:\.\d+)?) # Dx value in km + \s*km\s+ + Dz\s*=\s* + (?P[+-]?\d+(?:\.\d+)?) # Dz value in km + \s*km + """, + re.VERBOSE, +) + +HYPOCENTRE_PATTERN = re.compile( + r""" + %\s*hypocenter\s+on\s+SEG\s+\#\s*\d+\s*:\s* + along-strike\s*\(X\)\s*=\s* + (?P[\d.]+) # Along-strike value + \s*,\s* + down-dip\s*\(Z\)\s*=\s* + (?P[\d.]+) # Down-dip value + """, + re.VERBOSE, +) + +SUBFAULTS_PATTERN = re.compile( + r""" + %\s+Nsbfs\s*=\s* + (?P\d+) # Number of subfaults + """, + re.VERBOSE, +) + +DIMENSIONS_PATTERN = re.compile( + r""" + %\s+LEN\s*=\s* + (?P\d+(?:\.\d+)?) # Length in km + \s+km\s+ + WID\s*=\s* + (?P\d+(?:\.\d+)?) # Width in km + \s+km + """, + re.VERBOSE, +) + +COLUMN_HEADER_PATTERN = re.compile( + r""" + %\s+LAT\s+LON # Column header indicator + """, + re.VERBOSE, +) + + +def _parse_segment_slip( + fsp_file_handle: IO[str], segment_count: int +) -> tuple[pd.DataFrame, list[Segment]]: """ Parse segment slip data from an FSP file. @@ -275,6 +449,8 @@ def _parse_segment_slip(fsp_file_handle: IO[str], segment_count: int) -> pd.Data pd.DataFrame A DataFrame containing parsed subfault data for all segments. The DataFrame includes a "segment" column identifying the segment index. + list[Segment] + A list of Segment objects representing each segment's properties. Raises ------ @@ -282,55 +458,90 @@ def _parse_segment_slip(fsp_file_handle: IO[str], segment_count: int) -> pd.Data If the column headers for a segment cannot be found or the number of subfaults for a segment is missing. """ - segments: list[pd.DataFrame] = [] + segments: list[Segment] = [] + segment_data: list[pd.DataFrame] = [] + for i in range(segment_count): - subfaults = None - length = width = None + segment = Segment() for line in fsp_file_handle: - # hunt for the line % Nsbfs = x, where x is the number of subfaults. - if match := re.search(r"%\s+Nsbfs\s*=\s*(\d+)", line): - subfaults = match.group(1) - # hunt for the line % LEN = x km WID = y km, where x and y are the - # length and width respectively. - if dimensions := re.match( - r"%\s+LEN\s*=\s*(\d+(\.\d+)?)\s+km\s+WID\s*=\s*(\d+(\.\d+)?)\s+km", line - ): - length = float(dimensions.group(1)) - width = float(dimensions.group(1)) - if re.match(r"%\s+LAT\s+LON", line): + # Parse segment header with strike and dip values + if segment_header := SEGMENT_HEADER_PATTERN.match(line): + segment_index = int(segment_header.group("segment_index")) + if segment_index != i + 1: + raise FSPParseError( + f"Expected segment {i + 1} but found segment {segment_index}" + ) + segment.strike = float(segment_header.group("strike")) + segment.dip = float(segment_header.group("dip")) + + # Parse depth to top + if dtop_match := DEPTH_TO_TOP_PATTERN.match(line): + segment.dtop = float(dtop_match.group("dtop")) + + # Parse latitude and longitude + if latlon_match := LATLON_PATTERN.match(line): + segment.top_centre = np.array( + [float(latlon_match.group("lat")), float(latlon_match.group("lon"))] + ) + + # Parse dx and dz values + if dxdz_match := DXDZ_PATTERN.match(line): + segment.dx = float(dxdz_match.group("dx")) + segment.dz = float(dxdz_match.group("dz")) + + # Parse hypocentre information + if hypocentre_match := HYPOCENTRE_PATTERN.match(line): + segment.hypocentre = np.array( + [ + float(hypocentre_match.group("along_strike")), + float(hypocentre_match.group("down_dip")), + ] + ) + + # Parse number of subfaults + if match := SUBFAULTS_PATTERN.search(line): + segment.subfaults = int(match.group("subfaults")) + + # Parse segment dimensions + if dimensions := DIMENSIONS_PATTERN.match(line): + segment.length = float(dimensions.group("length")) + segment.width = float(dimensions.group("width")) + + # Check for column headers + if COLUMN_HEADER_PATTERN.match(line): break else: raise FSPParseError(f"Cannot find columns for FSP file on segment {i + 1}.") - if subfaults is None: + segments.append(segment) + + if segment.subfaults is None: raise FSPParseError( f"Cannot find number of subfaults for FSP file on segment {i + 1}." ) - subfaults = int(subfaults) columns = line.lower().lstrip("% ").split() _ = next(fsp_file_handle) # Skip header decoration - # read subfaults lines into StringIO buffer + + # Read subfaults lines into StringIO buffer lines = io.StringIO( - "\n".join([next(fsp_file_handle) for _ in range(subfaults)]) + "\n".join([next(fsp_file_handle) for _ in range(segment.subfaults)]) ) + data = pd.read_csv( lines, delimiter=r"\s+", header=None, names=columns, ) + data = data.rename( columns={"x==ew": "x", "y==ns": "y", "x==ns": "x", "y==ew": "y"} ) data["segment"] = i - if length: - data["length"] = length - if width: - data["width"] = width - segments.append(data) + segment_data.append(data) - return pd.concat(segments) + return pd.concat(segment_data), segments def _parse_velocity_density_structure( diff --git a/tests/test_fsp.py b/tests/test_fsp.py index 4b32f0a..ab38fa2 100644 --- a/tests/test_fsp.py +++ b/tests/test_fsp.py @@ -1,5 +1,6 @@ from pathlib import Path +import numpy as np import pandas as pd import pytest @@ -47,11 +48,33 @@ def test_darfield_fsp(): "y": -5.1682, "z": 0.0000, "slip": 4.8065, - "length": 10.0, - "width": 10.0, } + assert fsp_file.segments[0].strike == pytest.approx(307.0) + assert fsp_file.segments[0].dip == pytest.approx(86.2) + assert fsp_file.segments[0].length == pytest.approx(10.0) + assert fsp_file.segments[0].width == pytest.approx(12.0) + assert fsp_file.segments[0].dx == pytest.approx(1.0) + assert fsp_file.segments[0].dz == pytest.approx(1.0) + assert fsp_file.segments[0].dtop == pytest.approx(0.0) + assert fsp_file.segments[0].top_centre == pytest.approx( + np.array([-43.5721, 172.0226]) + ) + assert fsp_file.segments[0].hypocentre == pytest.approx( + np.array([4.151557898146846, 0.5150635970119828]) + ) + assert fsp_file.segments[0].subfaults == 120 assert fsp_file.velocity_model is None + plane = fsp_file.segments[0].as_plane() + assert plane.strike == pytest.approx(307.0, abs=1) + assert plane.dip == pytest.approx(86.2, abs=1) + assert plane.length == pytest.approx(10.0) + assert plane.width == pytest.approx(12.0) + assert plane.bounds[0, -1] == pytest.approx(0.0) + assert plane.wgs_depth_coordinates_to_fault_coordinates( + np.array([-43.5721, 172.0226]) + ) == pytest.approx(np.array([0.5, 0])) + def test_kanto_fsp(): """Check that the Kanto FSP file is parsed with the correct values extracted."""