<img align="left" src = https://project.lsst.org/sites/default/files/Rubin-O-Logo_0.png width=170 style="padding: 10px"> 
<br><b>Little Demo: Data Sonification</b> <br>
Contact author: Andrés A. Plazas Malagón <br>
Last verified to run: 2024-07-31 <br>
LSST Science Pipelines version: Weekly 2024_16 <br>
Container size: small <br>

## 1. Introduction

This notebook demonstrates the use of the software [`Astronify`](https://astronify.readthedocs.io/en/latest/astronify/index.html) on the Rubin Science Platform (RSP) for sonifying astronomical data. `Astronify` is a toolkit developed by members of the Space Telescope Science Institute and designed to convert data series into sound by mapping one column to time and another to pitch. Usually, the observation time is scaled to listening time and flux is mapped to pitch, which is related to the sound frequency. Other sonification software in astronomy, such as that developed by the [`Rubin Rhapsodies` project](https://github.com/RileyWClarke/RubinRhapsodies/tree/main), maps additional features, such as flux error to volume (related to amplitude) and filter band to timbre.

The `Astronify` package is under active development, with plans to expand its sonification capabilities further.

Currently, there are issues with installing `Astronify` directly on the RSP due to dependencies like [`pyo`](https://github.com/belangeo/pyo) and [`libsndfile`](https://github.com/libsndfile/libsndfile). However, it is possible to work around this limitation by directly copying the necessary classes and functions from `Astronify`. 

This demo will feature a Kepler light curve, as demonstrated in the `Astronify` documentation, and a light curve from the Data Preview 0.2 (DP0.2) data, as shown in the `07a_DiaObject_samples` tutorial.

### 1.1 Import Packages

The [`lightkurve`](https://docs.lightkurve.org/about/install.html) package can be locally installed by opening a terminal in the RSP and typing:

- `python3 -m pip install lightkurve`

The `astropy`, `warnings`, and `inspect` imports are required by the `Astronify` classes and functions used in this notebook and defined below in section 1.2. 

The `numpy` and `wave` imports are used by the replacement for the `write()` method of `Astronify`'s `SoniSeries` class, which originally relies on `pyo`. The replacement writes the audio to disk as a `WAV` file.

The `IPython` package is used to read and play back the `WAV` audio file.
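As a rough illustration of how a `WAV` file can be produced without `pyo`, the sketch below generates a single sine tone and writes it as mono 16-bit audio. It is not the code used later in this notebook: it relies only on the standard library (`math`, `struct`, `wave`) instead of `numpy`, and the helper names `sine_tone` and `write_wav` are hypothetical.

```python
import io
import math
import struct
import wave

SAMPLE_RATE = 44100    # samples per second
MAX_AMPLITUDE = 32767  # largest value of a signed 16-bit sample


def sine_tone(pitch_hz, duration_s, amplitude=0.5):
    """Return a list of 16-bit integer samples for a single sine tone."""
    n_samples = int(SAMPLE_RATE * duration_s)
    return [
        int(amplitude * MAX_AMPLITUDE
            * math.sin(2 * math.pi * pitch_hz * i / SAMPLE_RATE))
        for i in range(n_samples)
    ]


def write_wav(target, samples):
    """Write mono 16-bit PCM samples to a path or a file-like object."""
    with wave.open(target, "wb") as wf:
        wf.setnchannels(1)   # mono
        wf.setsampwidth(2)   # 2 bytes = 16 bits per sample
        wf.setframerate(SAMPLE_RATE)
        # "<h" packs each sample as a little-endian signed 16-bit integer.
        wf.writeframes(struct.pack(f"<{len(samples)}h", *samples))


# Half a second of a 440 Hz tone, written to memory instead of disk.
tone = sine_tone(440.0, 0.5)
buffer = io.BytesIO()
write_wav(buffer, tone)
```

Passing a file path instead of the in-memory `buffer` would write the same audio to disk.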

In [None]:
from astropy.utils.exceptions import AstropyWarning
import lightkurve
from astropy.table import Table, MaskedColumn
from astropy.time import Time
from astropy.visualization import (SqrtStretch, LogStretch, AsinhStretch, SinhStretch,
                                   LinearStretch, MinMaxInterval, ManualInterval,
                                   AsymmetricPercentileInterval)
import numpy as np
import warnings
from inspect import signature, Parameter
import wave
import IPython

from lsst.rsp import get_tap_service

### 1.2 Define functions and parameters

Define the utilities copied from the files [exceptions.py](https://github.com/spacetelescope/astronify/blob/main/astronify/utils/exceptions.py) and [pitch_mapping.py](https://github.com/spacetelescope/astronify/blob/main/astronify/utils/pitch_mapping.py) in the `Astronify` [repository](https://github.com/spacetelescope/astronify). The code in the following two cells is hidden; click on each cell to show it.
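To give a feel for what the pitch mapping does before reading the full function, here is a minimal sketch of the default case of `data_to_pitch` (linear stretch, median zero point). It assumes no clipping, no inversion, and a center pitch inside the pitch range, and it uses only the standard library. The name `linear_pitch_map` is hypothetical and not part of `Astronify`.

```python
from statistics import median


def linear_pitch_map(values, pitch_range=(100, 10000), center_pitch=440):
    """Map values to pitches (Hz) so that the median lands on center_pitch."""
    low, high = pitch_range
    vmin, vmax = min(values), max(values)
    if vmin == vmax:
        # Constant input: every note gets the center pitch.
        return [float(center_pitch)] * len(values)

    # Scale the data (and its median) linearly onto [0, 1].
    norm = [(v - vmin) / (vmax - vmin) for v in values]
    zero_point = (median(values) - vmin) / (vmax - vmin)
    if zero_point == 0.0:
        zero_point = 1e-6  # avoid dividing by zero, as data_to_pitch does

    # Prefer the scaling that puts the minimum at the lowest pitch ...
    if (1 / zero_point) * (center_pitch - low) + low <= high:
        return [(n / zero_point) * (center_pitch - low) + low for n in norm]
    # ... otherwise put the maximum at the highest pitch instead.
    return [((n - zero_point) / (1 - zero_point)) * (high - center_pitch)
            + center_pitch for n in norm]


pitches = linear_pitch_map([1.0, 2.0, 3.0, 4.0, 5.0])
print(pitches)  # [100.0, 270.0, 440.0, 610.0, 780.0]
```

In this example the median value (3.0) is mapped to 440 Hz and the minimum to 100 Hz, so the maximum reaches only 780 Hz rather than the top of the pitch range.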

In [None]:
class InvalidInputError(Exception):
    """
    Exception to be issued when user input is incorrect in a 
    way that prevents the function from running.
    """
    pass


class InputWarning(AstropyWarning):
    """
    Warning to be issued when user input is incorrect in
    some way but doesn't prevent the function from running.
    """
    pass

In [None]:
def data_to_pitch(data_array, pitch_range=[100, 10000], center_pitch=440, zero_point="median",
                  stretch='linear', minmax_percent=None, minmax_value=None, invert=False):
    """
    Map data array to audible pitches in the given range, and apply stretch and scaling
    as required.

    Parameters
    ----------
    data_array : array-like
        Data to map to pitch values. Individual data values should be floats.
    pitch_range : array
        Optional, default [100,10000]. Range of acceptable pitches in Hz.
    center_pitch : float
        Optional, default 440. The pitch in Hz that the zero point of the
        data will be mapped to.
    zero_point : str or float
        Optional, default "median". The data value that will be mapped to the center
        pitch. Options are mean, median, or a specified data value (float).
    stretch : str
        Optional, default 'linear'. The stretch to apply to the data array.
        Valid values are: asinh, sinh, sqrt, log, linear
    minmax_percent : array
        Optional. Interval based on keeping a specified fraction of data values
        (can be asymmetric) when scaling the data. The format is [lower percentile,
        upper percentile], where data values below the lower percentile and above the upper
        percentile are clipped. Only one of minmax_percent and minmax_value should be specified.
    minmax_value : array
        Optional. Interval based on user-specified data values when scaling the data array.
        The format is [min value, max value], where data values below the min value and above
        the max value are clipped.
        Only one of minmax_percent and minmax_value should be specified.
    invert : bool
        Optional, default False.  If True the pitch array is inverted (low pitches become high
        and vice versa).

    Returns
    -------
    response : array
        The normalized data array, with values in given pitch range.
    """
    # Parsing the zero point
    if zero_point in ("med", "median"):
        zero_point = np.median(data_array)
    if zero_point in ("ave", "mean", "average"):
        zero_point = np.mean(data_array)

    # The center pitch cannot be >= max() pitch range, or <= min() of pitch range.
    # If it is, fall back to using the mean of the pitch range provided.
    if center_pitch <= pitch_range[0] or center_pitch >= pitch_range[1]:
        warnings.warn("Given center pitch is outside the pitch range, defaulting to the mean.",
                      InputWarning)
        center_pitch = np.mean(pitch_range)

    if (data_array == zero_point).all():  # All values are the same, no more calculation needed
        return np.full(len(data_array), center_pitch)

    # Normalizing the data_array and adding the zero point (so it can go through the same transform)
    data_array = np.append(np.array(data_array), zero_point)

    # Setting up the transform with the stretch
    if stretch == 'asinh':
        transform = AsinhStretch()
    elif stretch == 'sinh':
        transform = SinhStretch()
    elif stretch == 'sqrt':
        transform = SqrtStretch()
    elif stretch == 'log':
        transform = LogStretch()
    elif stretch == 'linear':
        transform = LinearStretch()
    else:
        raise InvalidInputError("Stretch {} is not supported!".format(stretch))

    # Adding the scaling to the transform
    if minmax_percent is not None:
        transform += AsymmetricPercentileInterval(*minmax_percent)

        if minmax_value is not None:
            warnings.warn("Both minmax_percent and minmax_value are set, minmax_value will be ignored.",
                          InputWarning)
    elif minmax_value is not None:
        transform += ManualInterval(*minmax_value)
    else:  # Default, scale the entire image range to [0,1]
        transform += MinMaxInterval()

    # Performing the transform and then putting it into the pitch range
    pitch_array = transform(data_array)

    if invert:
        pitch_array = 1 - pitch_array

    zero_point = pitch_array[-1]
    pitch_array = pitch_array[:-1]

    # In rare cases, the zero-point at this stage might be 0.0.
    # One example is an input array of two values where the median() is the same as the
    # lowest of the two values. In this case, the zero-point is 0.0 and will lead to error
    # (divide by zero). Change to small value to avoid dividing by zero. In reality the choice
    # of zero-point calculation by the user was probably poor, but it is not in our purview to
    # mandate or change the user's choice here. May want to consider providing info back to the
    # user about the distribution of pitches actually used based on their sonification options.
    if zero_point == 0.0:
        zero_point = 1E-6

    if ((1/zero_point)*(center_pitch - pitch_range[0]) + pitch_range[0]) <= pitch_range[1]:
        pitch_array = (pitch_array/zero_point)*(center_pitch - pitch_range[0]) + pitch_range[0]
    else:
        pitch_array = (((pitch_array-zero_point)/(1-zero_point))*(pitch_range[1] - center_pitch) +
                       center_pitch)

    return pitch_array

Define the classes copied from the file [series.py](https://github.com/spacetelescope/astronify/blob/main/astronify/series/series.py) in the `Astronify` [repository](https://github.com/spacetelescope/astronify). The `write` method in the `SoniSeries` class has been replaced with a version that uses `numpy` and `wave` instead of `pyo`. Additionally, the call to `self._init_pyo()` in the `SoniSeries` constructor is commented out to avoid dependency issues. The code in the following two cells is hidden; click on each cell to show it.
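The `PitchMap` class in the first of these cells checks its keyword arguments against the signature of the pitch function and drops any that the function does not accept. A standalone sketch of that kind of check, using `inspect.signature`, might look like the following. The names `drop_unknown_kwargs` and `toy_pitch_func` are hypothetical stand-ins, and unlike `PitchMap` this version returns a filtered copy instead of modifying the dictionary in place.

```python
import warnings
from inspect import Parameter, signature


def drop_unknown_kwargs(func, kwargs):
    """Return a copy of kwargs without names that func does not accept."""
    params = signature(func).parameters
    # A **kwargs parameter means the function accepts any keyword.
    if any(p.kind is Parameter.VAR_KEYWORD for p in params.values()):
        return dict(kwargs)
    accepted = {}
    for name, value in kwargs.items():
        if name in params:
            accepted[name] = value
        else:
            warnings.warn(f"{name} is not accepted by {func.__name__} "
                          "and will be ignored")
    return accepted


def toy_pitch_func(data, center_pitch=440):
    """Hypothetical stand-in for data_to_pitch: one fixed pitch per value."""
    return [center_pitch for _ in data]


with warnings.catch_warnings():
    warnings.simplefilter("ignore")  # silence the expected warning in this demo
    args = drop_unknown_kwargs(toy_pitch_func,
                               {"center_pitch": 880, "volume": 0.3})

print(args)  # {'center_pitch': 880}
```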

In [None]:
class PitchMap():

    def __init__(self, pitch_func=data_to_pitch, **pitch_args):
        """
        Class that encapsulates the data value to pitch function 
        and associated arguments.

        Parameters
        ----------
        pitch_func : function
            Optional. Defaults to `~astronify.utils.data_to_pitch`.
            If supplying a function it should take a data array as the first
            parameter, and all other parameters should be optional.
        **pitch_args 
            Default parameters and values for the pitch function. Should include
            all necessary arguments other than the data values.
        """

        # Setting up the default arguments
        if (not pitch_args) and (pitch_func == data_to_pitch):
            pitch_args = {"pitch_range": [100, 10000],
                          "center_pitch": 440,
                          "zero_point": "median",
                          "stretch": "linear"}
        
        self.pitch_map_func = pitch_func
        self.pitch_map_args = pitch_args

        
    def _check_func_args(self):
        """
        Make sure the pitch mapping function and argument dictionary match.

        Note: This function does not check that the function gets all the required arguments.
        """
        # Only test if both pitch func and args are set
        if hasattr(self, "pitch_map_func") and hasattr(self, "pitch_map_args"):

            # Only check parameters if there is no kwargs argument
            param_types = [x.kind for x in signature(self.pitch_map_func).parameters.values()]
            if Parameter.VAR_KEYWORD not in param_types:
                for arg_name in list(self.pitch_map_args):
                    if arg_name not in signature(self.pitch_map_func).parameters:
                        wstr = "{} is not accepted by the pitch mapping function and will be ignored".format(arg_name)
                        warnings.warn(wstr, InputWarning)
                        del self.pitch_map_args[arg_name]

    def __call__(self, data):
        """
        Apply the pitch mapping function to the data, using the stored arguments.
        """
        self._check_func_args()
        return self.pitch_map_func(data, **self.pitch_map_args)

    @property
    def pitch_map_func(self):
        """
        The pitch mapping function. 
        """
        return self._pitch_map_func

    @pitch_map_func.setter
    def pitch_map_func(self, new_func):
        assert callable(new_func), "Pitch mapping function must be a function."
        self._pitch_map_func = new_func
        self._check_func_args()

    @property
    def pitch_map_args(self):
        """
        Dictionary of additional arguments (other than the data array)
        for the pitch mapping function.
        """
        return self._pitch_map_args

    @pitch_map_args.setter
    def pitch_map_args(self, new_args):
        assert isinstance(new_args, dict), "Pitch mapping function args must be in a dictionary."
        self._pitch_map_args = new_args
        self._check_func_args()

             



In [None]:
class SoniSeries():

    def __init__(self, data, time_col="time", val_col="flux"):
        """
        Class that encapsulates a sonified data series.

        Parameters
        ----------
        data : `astropy.table.Table`
            The table of data to be sonified.
        time_col : str
            Optional, default "time". The data column to be mapped to time.
        val_col : str
            Optional, default "flux". The data column to be mapped to pitch.
        """
        self.time_col = time_col
        self.val_col = val_col
        self.data = data

        # Default specs
        self.note_duration = 0.5  # note duration in seconds
        self.note_spacing = 0.01  # spacing between notes in seconds
        self.gain = 0.05  # default gain in the generated sine wave. pyo multiplier, -1 to 1.
        self.pitch_mapper = PitchMap(data_to_pitch)

        # self._init_pyo()  # Disabled: pyo cannot be installed on the RSP.

    def _init_pyo(self):
        self.server = pyo.Server()
        self.streams = None

    @property
    def data(self):
        """ The data table (~astropy.table.Table). """
        return self._data

    @data.setter
    def data(self, data_table):

        if not isinstance(data_table, Table):
            raise TypeError('Data must be an astropy.table.Table object.')

        for c in list(data_table.columns):
            data_table.rename_column(c, c.lower())


        if self.time_col not in data_table.columns:
            raise AttributeError(f"Input Table must contain time column '{self.time_col}'")

        if self.val_col not in data_table.columns:
            raise AttributeError(f"Input Table must contain a value column '{self.val_col}'")

        # Removing any masked values as they interfere with the sonification
        if isinstance(data_table[self.val_col], MaskedColumn):
            data_table = data_table[~data_table[self.val_col].mask]
        if isinstance(data_table[self.time_col], MaskedColumn):
            data_table = data_table[~data_table[self.time_col].mask]

        # Removing any nans as they interfere with the sonification
        data_table = data_table[~np.isnan(data_table[self.val_col])]

        # making sure we have a float column for time
        if isinstance(data_table[self.time_col], Time):
            float_col = "asf_time"
            data_table[float_col] = data_table[self.time_col].jd
            self.time_col = float_col
            
        self._data = data_table

    @property
    def time_col(self):
        """ The data column mapped to time when sonifying. """
        return self._time_col

    @time_col.setter
    def time_col(self, value):
        assert isinstance(value, str), 'Time column name must be a string.'
        self._time_col = value

    @property
    def val_col(self):
        """ The data column mapped to pitch when sonifying. """
        return self._val_col

    @val_col.setter
    def val_col(self, value):
        assert isinstance(value, str), 'Value column name must be a string.'
        self._val_col = value

    @property
    def pitch_mapper(self):
        """ The pitch mapping object that takes data values to pitch values (Hz). """
        return self._pitch_mapper

    @pitch_mapper.setter
    def pitch_mapper(self, value):
        self._pitch_mapper = value

    @property
    def gain(self):
        """ Adjustable gain for output. """
        return self._gain

    @gain.setter
    def gain(self, value):
        self._gain = value

    @property
    def note_duration(self):
        """ How long each individual note will be in seconds."""
        return self._note_duration

    @note_duration.setter
    def note_duration(self, value):
        # Add in min value check
        self._note_duration = value

    @property
    def note_spacing(self):
        """ The spacing of the notes on average (will adjust based on time) in seconds. """
        return self._note_spacing

    @note_spacing.setter
    def note_spacing(self, value):
        # Add in min value check
        self._note_spacing = value
        
    def sonify(self):
        """
        Perform the sonification, two columns will be added to the data table: asf_pitch, and asf_onsets. 
        The asf_pitch column will contain the sonified data in Hz.
        The asf_onsets column will contain the start time for each note in seconds from the first note.
        Metadata will also be added to the table giving information about the duration and spacing 
        of the sonified pitches, as well as an adjustable gain.
        """
        data = self.data
        exptime = np.median(np.diff(data[self.time_col]))

        data.meta["asf_exposure_time"] = exptime
        data.meta["asf_note_duration"] = self.note_duration
        data.meta["asf_spacing"] = self.note_spacing
        
        data["asf_pitch"] = self.pitch_mapper(data[self.val_col])
        data["asf_onsets"] = [x for x in (data[self.time_col] - data[self.time_col][0])/exptime*self.note_spacing]

    def play(self):
        """
        Play the data sonification.
        """

        # Making sure we have a clean server
        if self.server.getIsBooted():
            self.server.shutdown()

        self.server.boot()
        self.server.start()

        # Getting data ready
        duration = self.data.meta["asf_note_duration"]
        pitches = np.repeat(self.data["asf_pitch"], 2)
        delays = np.repeat(self.data["asf_onsets"], 2)

        # TODO: This doesn't seem like the best way to do this, but I don't know
        # how to make it better
        env = pyo.Linseg(list=[(0, 0), (0.01, 1), (duration - 0.1, 1),
                               (duration - 0.05, 0.5), (duration - 0.005, 0)],
                         mul=[self.gain for i in range(len(pitches))]).play(
                             delay=list(delays), dur=duration)

        self.streams = pyo.Sine(list(pitches), 0, env).out(delay=list(delays),
                                                           dur=duration)

    def stop(self):
        """
        Stop playing the data sonification.
        """
        self.streams.stop() 

    def write(self, filepath):
        """
        Save data sonification to the given file in WAV format.
    
        Parameters
        ----------
        filepath : str
            The path to the output file.
        """
        # Sampling parameters
        sample_rate = 44100  # Standard sample rate in Hz
        max_amplitude = 32767  # Max amplitude for 16-bit audio
        amplitude = 0.5  # Amplitude scaling factor
    
        # Prepare the data for writing
        duration = self.data.meta["asf_note_duration"]
        pitches = self.data["asf_pitch"]
        onsets = self.data["asf_onsets"]

        # Calculate the total number of samples
        total_duration = onsets[-1] + duration
        total_samples = int(sample_rate * total_duration)
    
        # Create an array to hold the audio data
        audio_data = np.zeros(total_samples)
    
        for pitch, onset in zip(pitches, onsets):
            if pitch <= 0:
                continue  # Skip silence or invalid pitch values
    
            start_sample = int(sample_rate * onset)
            end_sample = start_sample + int(sample_rate * duration)
            t = np.linspace(0, duration, end_sample - start_sample, endpoint=False)
            sine_wave = amplitude * np.sin(2 * np.pi * pitch * t)
    
            # Apply a simple fade-in and fade-out envelope
            fade_length = int(sample_rate * 0.01)  # 10 ms fade
            envelope = np.ones_like(sine_wave)
            envelope[:fade_length] = np.linspace(0, 1, fade_length)
            envelope[-fade_length:] = np.linspace(1, 0, fade_length)
            sine_wave *= envelope
            
            # Mix the note into the output buffer (overlapping notes add together)
            audio_data[start_sample:end_sample] += sine_wave
    
        # Normalize the audio data to prevent clipping (skip if the output is silent)
        peak = np.max(np.abs(audio_data))
        if peak > 0:
            audio_data = (audio_data / peak) * max_amplitude
        audio_data = audio_data.astype(np.int16)  # Convert to 16-bit PCM format
    
        # Write to WAV file
        with wave.open(filepath, 'w') as wf:
            wf.setnchannels(1)  # Mono sound
            wf.setsampwidth(2)  # 2 bytes per sample (16 bits)
            wf.setframerate(sample_rate)
            wf.writeframes(audio_data.tobytes())

## 2. Sonification of a Kepler light curve

Reproduce one example from the `Astronify` documentation.

The following query is modified slightly from what is shown in the [documentation](https://astronify.readthedocs.io/en/latest/astronify/index.html) to avoid deprecation messages.

In [None]:
kep12b_lc = lightkurve.search_lightcurve("KIC 11804465", cadence="long", quarter=1).download_all()[0].to_table()


Print the table to see its format. 

In [None]:
print(kep12b_lc)

By default, `SoniSeries` looks for columns labeled "time" and "flux", but this can be changed with the `time_col` and `val_col` arguments of the class (see the [documentation](https://astronify.readthedocs.io/en/latest/astronify/index.html)).

In [None]:
kep12b_obj = SoniSeries(kep12b_lc)

Sonify the data. See the `Astronify` [documentation](https://astronify.readthedocs.io/en/latest/astronify/index.html) for more options and a description of the algorithm used.

The warning can be ignored.
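The way `sonify()` turns observation times into note start times can be sketched in isolation: the median gap between consecutive observations is mapped to `note_spacing` seconds of audio. The version below uses only the standard library instead of `numpy`, assumes the times are already sorted and contain at least two points, and the name `note_onsets` is hypothetical.

```python
from statistics import median


def note_onsets(times, note_spacing=0.01):
    """Return the start time of each note, in seconds after the first note."""
    # Typical gap between consecutive observations.
    gaps = [later - earlier for earlier, later in zip(times, times[1:])]
    typical_gap = median(gaps)
    # One typical gap of observing time becomes note_spacing seconds of audio.
    return [(t - times[0]) / typical_gap * note_spacing for t in times]


# Evenly sampled times, then a long pause before the last observation.
times = [0.0, 1.0, 2.0, 3.0, 10.0]
onsets = note_onsets(times)

# The audio lasts until the final note has finished playing.
note_duration = 0.5
total_duration = onsets[-1] + note_duration
```

Here the first four notes start about 0.01 s apart, while the last one starts at roughly 0.1 s, so gaps in the observations translate into proportionally longer gaps between note onsets.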

In [None]:
kep12b_obj.sonify() 

The `SoniSeries` class has a `play()` method that is used to play the sonified data. This method cannot be used on the RSP, as the platform does not support direct sound playback.

Instead, the sonified data can be written to a file in the RSP (such as a `WAV` file) and played in the notebook using `IPython`.

Use the redefined `write` method to produce an audio file in `WAV` format.

In [None]:
kep12b_obj.write("./kepler12_sonification_demo.wav")

Play the file with `IPython`.

In [None]:
IPython.display.Audio(filename="./kepler12_sonification_demo.wav")

## 3. Sonification of a DP0.2 light curve

Use data from DP0.2 tutorial `07a_DiaObject_samples`, section 4.2.2 "Light curve for epochs near SNR>5 detections", to sonify the light curve of `DiaObject` with ID `1250953961339360185`.

In [None]:
DiaObjID = 1250953961339360185
print(DiaObjID)

Use the time and difference-image point source (PS) flux data in the `r` band, `DiaSrc['midPointTai']` and `DiaSrc['psFlux']`, from section 4.2.2 of tutorial `07a_DiaObject_samples`. Time is reported in the DiaSource table as `midPointTai`, which uses the "TAI" time standard (<a href="https://en.wikipedia.org/wiki/International_Atomic_Time">International Atomic Time</a>) and is given in days (in particular, as "<a href="https://en.wikipedia.org/wiki/Julian_day">Modified Julian Days</a>").

The query is a simplified version of the query in that section of the tutorial, retrieving only the quantities needed for this demo.

In [None]:
service = get_tap_service("tap")

In [None]:
results = service.search("SELECT psFlux, "
                         "filterName, midPointTai "
                         "FROM dp02_dc2_catalogs.DiaSource "
                         "WHERE diaObjectId = "+str(DiaObjID))
DiaSrc = results.to_table()
del results

Get the time and flux in the `r` band.

In [None]:
filt = 'r'
fx = np.where(DiaSrc['filterName'] == filt)[0]
time_, flux_ = DiaSrc['midPointTai'][fx], DiaSrc['psFlux'][fx]

`SoniSeries` expects the data to be sorted by time.

In [None]:
index = np.argsort(time_)
time = time_[index]
flux = flux_[index]

Create an `astropy` Table to instantiate the `SoniSeries` object, and call the `sonify` method.

In [None]:
data_table = Table({"time": time,
                    "flux": flux})

data_soni = SoniSeries(data_table)
data_soni.sonify()

Write the data into a `WAV` file.

In [None]:
data_soni.write("./dia_object_sonification_demo.wav")

Play the audio file. The sounds are sparser than the more continuous sounds of the Kepler light curve from section 2, which has a higher cadence.

In [None]:
IPython.display.Audio(filename="./dia_object_sonification_demo.wav")