diff --git a/source_modelling/rupture_propagation.py b/source_modelling/rupture_propagation.py new file mode 100644 index 0000000..95097c8 --- /dev/null +++ b/source_modelling/rupture_propagation.py @@ -0,0 +1,229 @@ +""" +Rupture Propagation Module + +This module provides functions for computing likely rupture paths from +information about the distances between faults. + +Functions: + - shaw_dieterich_distance_model: + Compute fault jump probabilities using the Shaw-Dieterich distance model. + - prune_distance_graph: Prune the distance graph based on a cutoff value. + - probability_graph: + Convert a graph of distances between faults into a graph of jump + probabilities using the Shaw-Dieterich model. + - probabilistic_minimum_spanning_tree: Generate a probabilistic minimum spanning tree. + +Typing Aliases: + - DistanceGraph: A graph representing distances between faults. + - ProbabilityGraph: A graph representing jump probabilities between faults. + - RuptureCausalityTree: A tree representing the causality of ruptures between faults. +""" + +from collections import defaultdict, namedtuple +from typing import Optional + +import numpy as np + +from qcore import coordinates +from source_modelling import sources + +DistanceGraph = dict[str, dict[str, int]] +ProbabilityGraph = defaultdict[str, dict[str, float]] +RuptureCausalityTree = dict[str, Optional[str]] + +JumpPair = namedtuple("JumpPair", ["from_point", "to_point"]) + + +def shaw_dieterich_distance_model(distance: float, d0: float, delta: float) -> float: + """ + Compute fault jump probabilities using the Shaw-Dieterich distance model[0]. + + Parameters + ---------- + distance : float + The distance between two faults. + d0 : float + The characteristic distance parameter. + delta : float + The characteristic slip distance parameter. + + Returns + ------- + float + The calculated probability. + + References + ---------- + [0]: Shaw, B. E., & Dieterich, J. H. (2007). Probabilities for jumping fault + segment stepovers. Geophysical Research Letters, 34(1). + """ + return min(1, np.exp(-(distance - delta) / d0)) + + +def prune_distance_graph(distances: DistanceGraph, cutoff: int) -> DistanceGraph: + """ + Prune the distance graph based on a cutoff value. + + Parameters + ---------- + distances : DistanceGraph + The graph of distances between faults. + cutoff : int + The cutoff distance in metres. + + Returns + ------- + DistanceGraph + A copy of the input distance graph, keeping only edges that are less + than the cutoff. + """ + return { + fault_u: { + fault_v: distance + for fault_v, distance in neighbours_fault_u.items() + if distance < cutoff + } + for fault_u, neighbours_fault_u in distances.items() + } + + +def probability_graph( + distances: DistanceGraph, d0: float = 3, delta: float = 1 +) -> ProbabilityGraph: + """ + Convert a graph of distances between faults into a graph of jump + probablities using the Shaw-Dieterich model. + + Parameters + ---------- + distances : DistanceGraph + The distance graph between faults. + d0 : float, optional + The d0 parameter for the Shaw_Dieterich model. See `shaw_dieterich_distance_model`. + delta : float, optional + The delta parameter for the Shaw_Dieterich model. See `shaw_dieterich_distance_model`. + + Returns + ------- + ProbabilityGraph + The graph with faults as vertices. Each edge (fault_u, fault_v) + has a log-probability -p as a weight. The log-probability -p here + is the negative of the log-probability a rupture propogates from + fault_u to fault_v, relative to the probability it propogates to + any of the other neighbours of fault_u. + """ + probabilities_raw = { + fault_u: { + fault_v: shaw_dieterich_distance_model(distance / 1000, d0, delta) + for fault_v, distance in neighbours_fault_u.items() + } + for fault_u, neighbours_fault_u in distances.items() + } + probabilities_log = defaultdict(dict) + for fault_u, neighbours_fault_u in probabilities_raw.items(): + normalising_constant = sum(neighbours_fault_u.values()) + if normalising_constant == 0: + for fault_v, _ in neighbours_fault_u.items(): + probabilities_log[fault_u][fault_v] = 1 / len(neighbours_fault_u) + for fault_v, prob in neighbours_fault_u.items(): + probabilities_log[fault_u][fault_v] = prob / normalising_constant + return probabilities_log + + +def probabilistic_minimum_spanning_tree( + probability_graph: ProbabilityGraph, initial_fault: str +) -> RuptureCausalityTree: + """ + Generate a probabilistic minimum spanning tree. + + The minimum spanning tree of the probability graph represents rupture + causality tree with the highest likelihood of occuring assuming that + rupture jumps are independent. + + NOTE: While the overall probability of the minimum spanning tree is high, + the paths from the initial fault may not be the most likely. + + Parameters + ---------- + probability_graph : ProbabilityGraph + The probability graph. + initial_fault : str + The initial fault. + + Returns + ------- + RuptureCausalityTree + The probabilistic minimum spanning tree. + """ + rupture_causality_tree = {initial_fault: None} + path_probabilities = defaultdict(lambda: np.inf) + path_probabilities[initial_fault] = 0 + processed_faults = set() + for _ in range(len(probability_graph)): + current_fault = min( + (fault for fault in probability_graph if fault not in processed_faults), + key=lambda fault: path_probabilities[fault], + ) + processed_faults.add(current_fault) + for fault_neighbour, probability in probability_graph[current_fault].items(): + if ( + fault_neighbour not in processed_faults + and probability < path_probabilities[fault_neighbour] + ): + path_probabilities[fault_neighbour] = probability + rupture_causality_tree[fault_neighbour] = current_fault + return rupture_causality_tree + + +def distance_between( + source_a: sources.IsSource, + source_b: sources.IsSource, + source_a_point: np.ndarray, + source_b_point: np.ndarray, +) -> float: + global_point_a = source_a.fault_coordinates_to_wgs_depth_coordinates(source_a_point) + global_point_b = source_b.fault_coordinates_to_wgs_depth_coordinates(source_b_point) + return coordinates.distance_between_wgs_depth_coordinates( + global_point_a, global_point_b + ) + + +def estimate_most_likely_rupture_propagation( + sources_map: dict[str, sources.IsSource], + initial_source: str, + jump_impossibility_limit_distance: int = 15000, +) -> RuptureCausalityTree: + distance_graph = { + source_a_name: { + source_b_name: distance_between( + source_a, + source_b, + *sources.closest_point_between_sources(source_a, source_b), + ) + for source_b_name, source_b in sources_map.items() + if source_a_name != source_b_name + } + for source_a_name, source_a in sources_map.items() + } + distance_graph = prune_distance_graph( + distance_graph, jump_impossibility_limit_distance + ) + jump_probability_graph = probability_graph(distance_graph) + return probabilistic_minimum_spanning_tree( + jump_probability_graph, initial_fault=initial_source + ) + + +def jump_points_from_rupture_tree( + source_map: dict[str, sources.IsSource], + rupture_causality_tree: RuptureCausalityTree, +) -> dict[str, JumpPair]: + jump_points = {} + for source, parent in rupture_causality_tree.items(): + if parent is None: + continue + source_point, parent_point = sources.closest_point_between_sources( + source_map[source], source_map[parent] + ) + jump_points[source] = JumpPair(parent_point, source_point) + return jump_points diff --git a/source_modelling/sources.py b/source_modelling/sources.py index b98d1f9..850614b 100644 --- a/source_modelling/sources.py +++ b/source_modelling/sources.py @@ -76,56 +76,27 @@ def from_lat_lon_depth(point_coordinates: np.ndarray, **kwargs) -> "Point": @property def coordinates(self) -> np.ndarray: - """Return the coordinates of the point in (lat, lon, depth) format. - - Returns - ------- - np.ndarray - The coordinates of the point in (lat, lon, depth) format. - Depth is in metres. - """ + """np.ndarray: The coordinates of the point in (lat, lon, depth) format. Depth is in metres.""" return coordinates.nztm_to_wgs_depth(self.bounds) @property def length(self) -> float: - """ - Returns - ------- - float - The length of the approximating planar patch (in kilometres). - """ + """float: The length of the approximating planar patch (in kilometres).""" return self.length_m / _KM_TO_M @property def width_m(self) -> float: - """ - Returns - ------- - float - The width of the approximating planar patch (in metres). - """ + """float: The width of the approximating planar patch (in metres).""" return self.length_m @property def width(self) -> float: - """ - Returns - ------- - float - The width of the approximating planar patch (in kilometres). - """ + """float: The width of the approximating planar patch (in kilometres).""" return self.width_m / _KM_TO_M @property def centroid(self) -> np.ndarray: - """ - - Returns - ------- - np.ndarray - The centroid of the point source (which is just the - point's coordinates). - """ + """np.ndarray: The centroid of the point source (which is just the point's coordinates).""" return self.coordinates def fault_coordinates_to_wgs_depth_coordinates( @@ -145,7 +116,6 @@ def fault_coordinates_to_wgs_depth_coordinates( coordinates. Because this is a point-source, the global coordinates are just the location of the point source. """ - return self.coordinates def wgs_depth_coordinates_to_fault_coordinates( @@ -225,105 +195,52 @@ def from_corners(corners: np.ndarray) -> "Plane": @property def corners(self) -> np.ndarray: - """ - Returns - ------- - np.ndarray - The corners of the fault plane in (lat, lon, depth) format. The - corners are the same as in corners_nztm. - """ + """np.ndarray: The corners of the fault plane in (lat, lon, depth) format. The corners are the same as in corners_nztm.""" return coordinates.nztm_to_wgs_depth(self.bounds) @property def length_m(self) -> float: - """ - Returns - ------- - float - The length of the fault plane (in metres). - """ + """float: The length of the fault plane (in metres).""" return np.linalg.norm(self.bounds[1] - self.bounds[0]) @property def width_m(self) -> float: - """ - Returns - ------- - float - The width of the fault plane (in metres). - """ + """float: The width of the fault plane (in metres).""" return np.linalg.norm(self.bounds[-1] - self.bounds[0]) @property def bottom_m(self) -> float: - """ - Returns - ------- - float - The bottom depth (in metres). - """ + """float: The bottom depth (in metres).""" return self.bounds[-1, -1] @property def top_m(self) -> float: - """ - Returns - ------- - float - The top depth of the fault. - """ + """float: The top depth of the fault.""" return self.bounds[0, -1] @property def width(self) -> float: - """ - Returns - ------- - float - The width of the fault plane (in kilometres). - """ + """float: The width of the fault plane (in kilometres).""" return self.width_m / _KM_TO_M @property def length(self) -> float: - """ - Returns - ------- - float - The length of the fault plane (in kilometres). - """ + """float: The length of the fault plane (in kilometres).""" return self.length_m / _KM_TO_M @property def projected_width_m(self) -> float: - """ - Returns - ------- - float - The projected width of the fault plane (in metres). - """ + """float: The projected width of the fault plane (in metres).""" return self.length_m * np.cos(np.radians(self.dip)) @property def projected_width(self) -> float: - """ - Returns - ------- - float - The projected width of the fault plane (in kilometres). - """ + """float: The projected width of the fault plane (in kilometres).""" return self.projected_width_m / _KM_TO_M @property def strike(self) -> float: - """ - Returns - ------- - float - The bearing of the strike direction of the fault - (from north; in degrees) - """ - + """float: The bearing of the strike direction of the fault (from north; in degrees).""" north_direction = np.array([1, 0, 0]) up_direction = np.array([0, 0, 1]) strike_direction = self.bounds[1] - self.bounds[0] @@ -333,12 +250,7 @@ def strike(self) -> float: @property def dip_dir(self) -> float: - """ - Returns - ------- - float - The bearing of the dip direction (from north; in degrees). - """ + """float: The bearing of the dip direction (from north; in degrees).""" if np.isclose(self.dip, 90): return 0 # TODO: Is this right for this case? north_direction = np.array([1, 0, 0]) @@ -351,12 +263,7 @@ def dip_dir(self) -> float: @property def dip(self) -> float: - """ - Returns - ------- - float - The dip angle of the fault. - """ + """float: The dip angle of the fault.""" return np.degrees(np.arcsin(np.abs(self.bottom_m - self.top_m) / self.width_m)) @staticmethod @@ -369,7 +276,7 @@ def from_centroid_strike_dip( length: float, width: float, ) -> "Plane": - """Create a fault plane from the centroid, strike, dip_dir, top, bottom, length, and width + """Create a fault plane from the centroid, strike, dip_dir, top, bottom, length, and width. This is used for older descriptions of sources. Internally converts everything to corners so self.strike ~ strike (but @@ -378,7 +285,7 @@ def from_centroid_strike_dip( Parameters ---------- centroid : np.ndarray - The centre of the fault plane in lat, lon coordinate.s + The centre of the fault plane in lat, lon coordinates. strike : float The strike of the fault (in degrees). dip_dir : Optional[float] @@ -410,21 +317,6 @@ def from_centroid_strike_dip( ) return Plane(coordinates.wgs_depth_to_nztm(corners)) - @property - def centroid(self) -> np.ndarray: - return self.fault_coordinates_to_wgs_depth_coordinates(np.array([1 / 2, 1 / 2])) - - @property - def centroid(self) -> np.ndarray: - """ - - Returns - ------- - np.ndarray - The centre of the fault plane. - """ - return self.fault_coordinates_to_wgs_depth_coordinates(np.array([1 / 2, 1 / 2])) - def fault_coordinates_to_wgs_depth_coordinates( self, plane_coordinates: np.ndarray ) -> np.ndarray: @@ -522,9 +414,7 @@ def wgs_depth_coordinates_in_plane(self, global_coordinates: np.ndarray) -> bool """ try: - plane_coordinates = self.wgs_depth_coordinates_to_fault_coordinates( - global_coordinates - ) + self.wgs_depth_coordinates_to_fault_coordinates(global_coordinates) return True except ValueError: return False @@ -592,25 +482,12 @@ def area(self) -> float: @property def lengths(self) -> np.ndarray: - """The lengths of each plane in the fault. - - Returns - ------- - np.ndarray - A numpy array of each plane length (in km). - """ + """np.ndarray: A numpy array of each plane length (in km).""" return np.array([fault.length for fault in self.planes]) @property def length(self) -> float: - """The length of the fault. - - Returns - ------- - float - The total length of each fault plane. - """ - + """float: The total length of each fault plane.""" return self.lengths.sum() @property @@ -627,47 +504,22 @@ def width(self) -> float: @property def dip_dir(self) -> float: - """The dip direction of the fault. - - Returns - ------- - float - The dip direction of the first fault plane (A fault is - assumed to have planes of constant dip direction). - """ + """float: The dip direction of the first fault plane (A fault is assumed to have planes of constant dip direction).""" return self.planes[0].dip_dir @property def corners(self) -> np.ndarray: - """Get all corners of a fault. - - Returns - ------- - np.ndarray of shape (4n x 3) - The corners in (lat, lon, depth) format of each fault plane in the - fault, stacked vertically. - """ + """np.ndarray of shape (4n x 3): The corners in (lat, lon, depth) format of each fault plane in the fault, stacked vertically.""" return np.vstack([plane.corners for plane in self.planes]) @property def bounds(self) -> np.ndarray: - """Get all corners of a fault. - - Returns - ------- - np.ndarray of shape (4n x 3) - The corners in NZTM format of each fault plane in the fault, stacked vertically. - """ + """np.ndarray of shape (4n x 3): The corners in NZTM format of each fault plane in the fault, stacked vertically.""" return np.vstack([plane.bounds for plane in self.planes]) @property def centroid(self) -> np.ndarray: - """ - Returns - ------- - np.ndarray - The centre of the fault. - """ + """np.ndarray: The centre of the fault.""" return self.fault_coordinates_to_wgs_depth_coordinates(np.array([1 / 2, 1 / 2])) def wgs_depth_coordinates_to_fault_coordinates(