Skip to content

Commit

Permalink
Implement interpolator reader which takes only geomagnetic component …
Browse files Browse the repository at this point in the history
…and rotates it over starshape
  • Loading branch information
MijnheerD committed May 22, 2024
1 parent c1d2ebd commit 21c7b9d
Showing 1 changed file with 128 additions and 0 deletions.
128 changes: 128 additions & 0 deletions NuRadioReco/modules/io/coreas/readCoREASInterpolator.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,131 @@ def position_contained_in_starshape(channel_positions: np.ndarray, starhape_posi
channel_positions[:, :-1], axis=-1) <= star_radius
return contained


class readCoREASComponentInterpolator(readCoREASInterpolator):
@staticmethod
def signal_to_geo_ce(ant_position, signal):
"""
Decouple the signal in the showerplane coordinate system in the
geomagnetic and charge-excess components.
Parameters
----------
signal : ndarray
The signal traces, shaped as (timesamples, 3)
ant_position : list[float]
The antenna position (in the showerplane)
"""
assert signal.shape[1] == 3, "Please provide signal traces as (timesamples, 3)"

x, y = ant_position[:2]
e_geo = signal[:, 1] * x / y - signal[:, 0]
e_ce = -signal[:, 1] * np.sqrt(x ** 2 + y ** 2) / y

return e_geo, e_ce

@staticmethod
def geo_ce_to_signal(ant_position, geo, ce):
"""
Reform geomagnetic and charge-excess components back into
a 3D electric field signal.
Parameters
----------
ant_position : list[float]
The antenna position (in the showerplane)
geo : ndarray
The geomagnetic trace
ce : ndarray
The charge-excess trace
"""
assert len(geo) == len(ce), "Geomagnetic and charge-excess traces do not have same length"

x, y = ant_position[:2]

signal = np.zeros((len(geo), 3))

signal[:, 0] = - geo - x / np.sqrt(x ** 2 + y ** 2) * ce
signal[:, 1] = - y / np.sqrt(x ** 2 + y ** 2) * ce

return signal

def _set_showerplane_positions_and_signals(self, component='geomagnetic'):
"""
Reads the positions and signals from a star-shape CoREAS simulation,
then takes only the specified component of the eletric field and
converts both to the correct coordinate system and units.
"""
assert self.corsika is not None and self.cs is not None

starpos_vvB = []
signal_dict = {}
trace_start_times = []

for observer in self.corsika['CoREAS/observers'].values():
position_coreas = observer.attrs['position']
position_nr = np.array(
[-position_coreas[1], position_coreas[0], 0]) * units.cm
position_vvB = self.cs.transform_to_vxB_vxvxB(
self.cs.transform_from_magnetic_to_geographic(
position_nr
)
)
starpos_vvB.append(position_vvB)

# Only consider antenna if on positive vxvxB axis
if np.abs(position_vvB[0]) < 20 * units.cm and position_vvB[0] > 0:
nrr_observer = coreas.observer_to_si_geomagnetic(observer)
trace_start_times.append(nrr_observer[0, 0])
signal = self.cs.transform_from_magnetic_to_geographic(
nrr_observer[:, 1:].T)
signal_vvB = self.cs.transform_to_vxB_vxvxB(
signal.T
)
signal_geo, signal_ce = self.signal_to_geo_ce(position_vvB, signal_vvB)

if component == 'geomagnetic':
signal = self.geo_ce_to_signal(position_vvB, signal_geo, np.zeros_like(signal_ce))

elif component == 'charge':
signal = self.geo_ce_to_signal(position_vvB, np.zeros_like(signal_geo), signal_ce)

signal = self.cs.transform_from_vxB_vxvxB(signal)
signal = self.cs.transform_from_ground_to_onsky(signal.T)

if self.kind == "fluence":
filter_response = bandpass_filter.get_filter_response(
np.fft.rfftfreq(signal.shape[1], d=self.sampling_period / units.s) * units.Hz,
[self.lowfreq, self.highfreq], 'rectangular', order=0
)
sampling_rate = 1 / self.sampling_period
signal_fft = fft.time2freq(signal, sampling_rate)
signal_fft *= filter_response # filter the signal
signal = fft.freq2time(signal_fft, sampling_rate, signal.shape[1])
signal = np.sum(np.square(signal[1:])) # get the fluence of vxB and vxvB only

signal_dict[np.around((position_vvB[0] ** 2 + position_vvB[1] ** 2) ** 0.5, 1)] = signal.T

logger.debug(
f"parsed starshape detector at position {position_nr}"
)

starpos_vvB = np.array(starpos_vvB)

dd = (starpos_vvB[:, 0] ** 2 + starpos_vvB[:, 1] ** 2) ** 0.5
logger.info(f"assumed star shape from: {-dd.max()} - {dd.max()}")

# Make signal array containing the signals for all the positions,
# by copying over the data from the vxvxB arm
signals = []
trace_start = []
signals_per_distance = np.array(list(signal_dict.values()))
for pos in starpos_vvB:
distance_to_shower_axis = np.around((pos[0] ** 2 + pos[1] ** 2) ** 0.5, 1)
distance_index = np.argmin(np.abs(np.array(list(signal_dict.keys())) - distance_to_shower_axis))
signals.append(signals_per_distance[distance_index])
trace_start.append(trace_start_times[distance_index])

self.starshape_showerplane = starpos_vvB
self.signals = np.array(signals)
self.trace_start = np.array(trace_start)

0 comments on commit 21c7b9d

Please sign in to comment.