Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bispectra #267

Merged
merged 4 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions py_neuromodulation/nm_bispectra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from typing import Iterable
import numpy as np
from pybispectra import compute_fft, get_example_data_paths, WaveShape

from py_neuromodulation import nm_features_abc

class Bispectra(nm_features_abc.Feature):

def __init__(self, settings: dict, ch_names: Iterable[str], sfreq: int | float) -> None:
super().__init__(settings, ch_names, sfreq)
self.sfreq = sfreq
self.ch_names = ch_names
self.s = settings
self.f1s = settings["bispectrum"]["f1s"]
self.f2s = settings["bispectrum"]["f2s"]

@staticmethod
def test_settings(
settings: dict,
ch_names: Iterable[str],
sfreq: int | float,
):
s = settings

def test_range(f_name, filter_range):
assert isinstance(
filter_range[0],
int,
), f"bispectrum frequency range {f_name} needs to be of type int, got {filter_range[0]}"
assert isinstance(
filter_range[1],
int,
), f"bispectrum frequency range {f_name} needs to be of type int, got {filter_range[1]}"
assert (
filter_range[1] > filter_range[0]
), f"second frequency range value needs to be higher than first one, got {filter_range}"
assert filter_range[0] < sfreq and filter_range[1] < sfreq, (
"filter frequency range has to be smaller than sfreq, "
f"got sfreq {sfreq} and filter range {filter_range}"
)

test_range("f1s", s["bispectrum"]["f1s"])
test_range("f2s", s["bispectrum"]["f2s"])

for feature_name, val in s["bispectrum"][
"components"
].items():
assert isinstance(
val, bool
), f"bispectrum component {feature_name} has to be of type bool, got {val}"

for feature_name, val in s["bispectrum"][
"bispectrum_features"
].items():
assert isinstance(
val, bool
), f"bispectrum feature {feature_name} has to be of type bool, got {val}"

assert (
f_band_bispectrum in s["frequency_ranges_hz"]
for f_band_bispectrum in s["bispectrum"]["frequency_bands"]
), (
"bispectrum selected frequency bands don't match the ones"
"specified in s['frequency_ranges_hz']"
f"bispectrum frequency bands: {s['bispectrum']['frequency_bands']}"
f"specified frequency_ranges_hz: {s['frequency_ranges_hz']}"
)

def compute_bs_features(self, spectrum_ch: np.array, features_compute: dict, ch_name: str, component: str, f_band: str) -> dict:

for bispectrum_feature in self.s["bispectrum"]["bispectrum_features"]:

if bispectrum_feature == "mean":
func = np.nanmean
if bispectrum_feature == "sum":
func = np.nansum
if bispectrum_feature == "var":
func = np.nanvar

if f_band is not None:
str_feature = "_".join([ch_name, "Bispectrum", component, bispectrum_feature, f_band])
else:
str_feature = "_".join([ch_name, "Bispectrum", component, bispectrum_feature, "whole_fband_range"])

features_compute[str_feature] = func(spectrum_ch)

return features_compute

def calc_feature(self, data: np.array, features_compute: dict) -> dict:
for ch_idx, ch_name in enumerate(self.ch_names):
fft_coeffs, freqs = compute_fft(
data=np.expand_dims(data[ch_idx, :], axis=(0,1)),
sampling_freq=self.sfreq,
n_points=data.shape[1],
verbose=False,
)

f_spectrum_range = freqs[np.logical_and(freqs >= np.min([self.f1s, self.f2s]), freqs <= np.max([self.f1s, self.f2s]))]

waveshape = WaveShape(
data=fft_coeffs,
freqs=freqs.astype(int),
sampling_freq=self.sfreq,
verbose=False,
)

waveshape.compute(f1s=tuple(self.f1s), f2s=tuple(self.f2s))

bispectrum = np.squeeze(waveshape.results._data)

for component in self.s["bispectrum"]["components"]:
if self.s["bispectrum"]["components"][component]:
if component == "real":
spectrum_ch = bispectrum.real
if component == "imag":
spectrum_ch = bispectrum.imag
if component == "absolute":
spectrum_ch = np.abs(bispectrum)
if component == "phase":
spectrum_ch = np.angle(bispectrum)

for fb in self.s["bispectrum"]["frequency_bands"]:
range_ = (f_spectrum_range >= self.s["frequency_ranges_hz"][fb][0]) \
& (f_spectrum_range <= self.s["frequency_ranges_hz"][fb][1])
#waveshape.results.plot()
data_bs = spectrum_ch[range_, range_]

features_compute = self.compute_bs_features(data_bs, features_compute, ch_name, component, fb)

if self.s["bispectrum"]["compute_features_for_whole_fband_range"]:
features_compute = self.compute_bs_features(spectrum_ch, features_compute, ch_name, component, None)

return features_compute
3 changes: 3 additions & 0 deletions py_neuromodulation/nm_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
nm_oscillatory,
nm_bursts,
nm_linelength,
nm_bispectra
)


Expand Down Expand Up @@ -69,6 +70,8 @@ def __init__(
FeatureClass = nm_linelength.LineLength
case "mne_connectivity":
FeatureClass = nm_mne_connectivity.MNEConnectivity
case "bispectrum":
FeatureClass = nm_bispectra.Bispectra
case _:
raise ValueError(f"Unknown feature found. Got: {feature}.")

Expand Down
31 changes: 30 additions & 1 deletion py_neuromodulation/nm_settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
"linelength": true,
"coherence": false,
"nolds": false,
"mne_connectivity": false
"mne_connectivity": false,
"bispectrum": false
},
"postprocessing": {
"feature_normalization": true,
Expand Down Expand Up @@ -257,5 +258,33 @@
"mne_connectiviy": {
"method": "plv",
"mode": "multitaper"
},
"bispectrum": {
"f1s": [
5,
35
],
"f2s": [
5,
35
],
"compute_features_for_whole_fband_range": true,
"frequency_bands": [
"theta",
"alpha",
"low beta",
"high beta"
],
"components": {
"absolute": true,
"real": true,
"imag": true,
"phase": true
},
"bispectrum_features": {
"mean": true,
"sum": true,
"var": true
}
}
}
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ dependencies = [
"scipy >= 1.7.1",
"seaborn >= 0.11",
"notebook",
"ipython"
"ipython",
"pybispectra"
]

[project.optional-dependencies]
Expand Down
67 changes: 67 additions & 0 deletions tests/test_bispectra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import pytest
import mne
import numpy as np

from py_neuromodulation import (
nm_analysis,
nm_decode,
nm_define_nmchannels,
nm_IO,
nm_plots,
nm_settings,
nm_stream_offline
)

def test_bispectrum():

(
RUN_NAME,
PATH_RUN,
PATH_BIDS,
PATH_OUT,
datatype,
) = nm_IO.get_paths_example_data()

(
raw,
data,
sfreq,
line_noise,
coord_list,
coord_names,
) = nm_IO.read_BIDS_data(
PATH_RUN=PATH_RUN, BIDS_PATH=PATH_BIDS, datatype=datatype
)

ch_names = raw.ch_names[4]
ch_types = raw.get_channel_types()[4]

nm_channels = nm_define_nmchannels.set_channels(
ch_names=[ch_names],
ch_types=[ch_types],
reference="default",
bads=None,
new_names="default",
used_types=("ecog", "dbs", "seeg"),
target_keywords=("MOV_RIGHT_CLEAN",),
)

settings = nm_settings.get_default_settings()
settings = nm_settings.reset_settings(settings)

settings["features"]["bispectrum"] = True

stream = nm_stream_offline.Stream(
settings=settings,
nm_channels=nm_channels,
path_grids=None,
verbose=True,
sfreq=sfreq,
line_noise=line_noise,
coord_list=coord_list,
coord_names=coord_names,
)

features = stream.run(np.expand_dims(data[3, :], axis=0))

assert features["ECOG_RIGHT_1_Bispectrum_phase_mean_whole_fband_range"].sum() != 0
Loading