# Working with the TAPE `Ensemble` object

When working with many lightcurves, the TAPE `Ensemble` object serves as a single interface for storing, filtering, and analyzing timeseries data.
Let's consider an example set of lightcurves, generated as follows:

In [None]:
import numpy as np
import pandas as pd

np.random.seed(1)    

# Generate 10 astronomical objects
n_obj = 10
ids = 8000 + np.arange(n_obj)
names = ids.astype(str)
object_table = pd.DataFrame(
    {
        "id": ids, 
        "name": names,
        "ddf_bool": np.random.randint(0, 2, n_obj),  # 1 if from a deep drilling field, 0 otherwise
        "libid_cadence": np.random.randint(1, 130, n_obj),
    }
)

# Create 1000 measurements spread evenly over the 10 objects (100 per lightcurve)
lc_len = 100
num_points = 1000
all_bands = np.array(["r", "g", "b", "i"])
source_table = pd.DataFrame(
    {
        "id": 8000 + (np.arange(num_points) % n_obj),
        "time": np.arange(num_points),
        "flux":  np.random.random_sample(size=num_points)*10,
        "band": np.repeat(all_bands, num_points // len(all_bands)),  # integer repeat count
        "error": np.random.random_sample(size=num_points),
        "count": np.arange(num_points),
    },
)

We can load these into the `Ensemble` using `Ensemble.from_pandas()`:

In [None]:
from tape.ensemble import Ensemble

ens = Ensemble()  # initialize an ensemble object

# Read in the generated lightcurve data
ens.from_pandas(
    source_frame=source_table,
    object_frame=object_table,
    id_col="id",
    time_col="time",
    flux_col="flux",
    err_col="error",
    band_col="band",
    npartitions=1)

We now have an `Ensemble` object and have provided it with the source and object dataframes constructed above. Within the call to `Ensemble.from_pandas`, we specified which columns of the input dataframes map to the timeseries quantities that the `Ensemble` needs to understand. It's important to link these arguments properly, as the `Ensemble` uses these columns whenever an operation is requested on one of those quantities. For example, if a TAPE analysis function requires the time column, this linking lets the `Ensemble` automatically supply that function with the 'time' column.
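To make the idea of linking concrete, here is a minimal sketch in plain Pandas. It is not how TAPE stores its mapping internally: the `column_roles` dictionary and the `time_span` helper are hypothetical names introduced only for illustration. The point is that a function can ask for a role (such as the time column) rather than a hard-coded label.

```python
import pandas as pd

# Hypothetical mapping from the roles used in from_pandas to column labels
column_roles = {
    "id_col": "id",
    "time_col": "time",
    "flux_col": "flux",
    "err_col": "error",
    "band_col": "band",
}

toy_source = pd.DataFrame(
    {
        "id": [8000, 8000, 8001],
        "time": [0.0, 1.0, 0.0],
        "flux": [1.5, 2.5, 3.0],
        "error": [0.1, 0.2, 0.3],
        "band": ["g", "r", "g"],
    }
)


def time_span(frame, roles):
    """Hypothetical helper: looks up the time column by role, not by label."""
    times = frame[roles["time_col"]]
    return times.max() - times.min()


span = time_span(toy_source, column_roles)
print(span)
```

If the time column were labeled differently in another dataset, only the mapping would need to change, not the helper.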

## Column Mapping with the ColumnMapper

In the above example, we manually provided the column labels within the call to `Ensemble.from_pandas`. Alternatively, the `tape.utils.ColumnMapper` class offers a means to assign the column mappings, either manually as shown before or populated from a known mapping scheme.

In [None]:
from tape.utils import ColumnMapper

# columns assigned manually
col_map = ColumnMapper().assign(id_col="id",
                                time_col="time",
                                flux_col="flux",
                                err_col="error",
                                band_col="band")

# Pass the ColumnMapper along to from_pandas
ens.from_pandas(
    source_frame=source_table,
    object_frame=object_table,
    column_mapper=col_map,
    npartitions=1)

## The Object and Source Frames
The `Ensemble` maintains two dataframes under the hood, the "object dataframe" and the "source dataframe". This borrows from the Rubin Observatory's object-source convention, where object denotes a given astronomical object and source is the collection of measurements of that object. Essentially, the object frame stores one-off information about objects, and the source frame stores the available time-domain data. As a result, `Ensemble` functions that operate on the underlying dataframes need to be pointed at either object or source. In most cases, the default is the object table, as it's a more helpful interface for understanding the contents of the `Ensemble`, especially when dealing with large volumes of data.

We can also access the `Ensemble` frames individually with `Ensemble.source` and `Ensemble.object`.
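The relationship between the two tables can be sketched with plain Pandas. This is only an illustration of the object-source convention under assumed toy data (`toy_object`, `toy_source`), not TAPE code: the object table has one row per `id`, while the source table has many rows per `id`.

```python
import numpy as np
import pandas as pd

n_obj = 3

# One row per astronomical object
toy_object = pd.DataFrame({"id": 8000 + np.arange(n_obj), "name": ["a", "b", "c"]})

# Many measurements per object, linked back through the shared "id" column
toy_source = pd.DataFrame(
    {
        "id": 8000 + (np.arange(12) % n_obj),
        "time": np.arange(12),
        "flux": np.linspace(1.0, 12.0, 12),
    }
)

# Count the measurements per object and attach the count to the object table
n_obs = toy_source.groupby("id").size().rename("n_obs")
summary = toy_object.set_index("id").join(n_obs)
print(summary)
```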

## Dask and "Lazy Evaluation"

Before going any further, note that the `Ensemble` is built on top of `Dask`, which brings with it a powerful framework for parallelization and scalability. However, there are some differences in how `Dask` code works that, if you're unfamiliar with it, are worth establishing right at the start. The first is that `Dask` evaluates code "lazily", meaning that many operations are not executed when the line of code is run, but are instead added to a scheduler to be executed when the result is actually needed. See below for an example:

In [None]:
ens.source  # We have not actually loaded any data into memory

Here we are accessing the Dask dataframe and, despite having run a command to read in our source data, we see only an empty dataframe with some high-level information available. To explicitly bring the data into memory, we must run a `compute()` command on the data's frame.

In [None]:
ens.source.compute()  # Compute lets dask know we're ready to bring the data into memory

With this `compute`, we get back a populated dataframe (a Pandas dataframe, in fact!). As a result, many workflows in Dask, and by extension TAPE, look like a series of lazily evaluated commands that are chained together and then executed with a `.compute()` call at the end of the workflow.

Alternatively, we can use `ens.persist()` to execute the chained commands while keeping the results as Dask dataframes rather than returning them as Pandas dataframes. This can speed up future `compute()` calls.

Note that `Ensemble.source` and `Ensemble.object` are instances of the `tape.SourceFrame` and `tape.ObjectFrame` classes respectively. These are subclasses of Dask dataframes that provide some additional utility for tracking by the ensemble while supporting most of the Dask dataframe API.  
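The lazy pattern itself can be illustrated without Dask. The sketch below uses a toy `LazyPipeline` class, a hypothetical stand-in written for this example and not part of Dask or TAPE, which records each step and only runs them when `compute()` is called.

```python
import pandas as pd


class LazyPipeline:
    """Toy stand-in for a lazily evaluated frame (not Dask's implementation)."""

    def __init__(self, frame, steps=()):
        self._frame = frame
        self._steps = tuple(steps)

    def map(self, func):
        # Record the step and return a new pipeline; nothing is executed yet
        return LazyPipeline(self._frame, self._steps + (func,))

    def compute(self):
        # Run every recorded step, in order, on the underlying data
        result = self._frame
        for step in self._steps:
            result = step(result)
        return result


calls = []  # log of which steps have actually run


def keep_bright(df):
    calls.append("keep_bright")
    return df[df["flux"] > 2.0]


def add_snr(df):
    calls.append("add_snr")
    return df.assign(snr=df["flux"] / df["error"])


toy = pd.DataFrame({"flux": [1.0, 3.0, 5.0], "error": [0.5, 0.5, 1.0]})
pipeline = LazyPipeline(toy).map(keep_bright).map(add_snr)
calls_before = list(calls)  # snapshot taken before compute()
result = pipeline.compute()
print(calls_before, calls)
print(result)
```

Chaining the two steps does no work on its own; the log stays empty until `compute()` is called, which is the same shape a chained TAPE workflow takes.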

## Inspection, Filtering, and Selecting

The `Ensemble` contains an assortment of functions for inspecting and filtering your data.

### Inspection

These functions provide views into the contents of your `Ensemble` dataframe, especially important when dealing with large data volumes that cannot be brought into memory all at once. The first is `Ensemble.info` which provides information on the columns, data types, and memory usage of the dataframe.

In [None]:
# Inspection

ens.info(verbose=True, memory_usage=True)  # Grabs high level information about the dataframes

`Ensemble.info` shows the number of rows in each table and the memory they use, along with the columns we've brought in and their respective data types. If you'd like to actually bring a few rows into memory to inspect, `EnsembleFrame.head` and `EnsembleFrame.tail` provide access to the first n and last n rows respectively.

In [None]:
ens.object.head(5)  # Grabs the first 5 rows of the object table

In [None]:
ens.source.tail(5)  # Grabs the last 5 rows of the source table

Additionally, when you are working with a small enough dataset, `Ensemble.compute` can be used to bring the whole dataframe into memory (as shown previously). 

In [None]:
ens.source.compute()

### Filtering

The `Ensemble` provides a general filtering function [`query`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.query.html) that mirrors a Pandas or Dask `query` command. Specifically, the function takes a string that provides an expression indicating which rows to **keep**. As with other `Ensemble` functions, an optional `table` parameter allows you to filter on either the object or the source table.

For example, the following code filters the sources to only include rows with flux values below the 95th percentile. It uses `ens._flux_col` to retrieve the name of the column with that information.

In [None]:
highest_flux = ens.source[ens._flux_col].quantile(0.95).compute()
ens.source.query(f"{ens._flux_col} < {highest_flux}").compute()

Alternatively, we could use a Dask series of Booleans to indicate which rows to *keep*. We can often compute these series as the result of some operation on the underlying tables:

In [None]:
# Find all of the source points with the lowest 90% of errors.
keep_rows = ens.source["error"] < ens.source["error"].quantile(0.9)
keep_rows.compute()
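The two filtering styles above select the same rows. The following sketch checks this on a small in-memory Pandas dataframe; the toy data and the `flux_col` variable are assumptions made for this example, and the TAPE frames add lazy evaluation on top of the same idea.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
toy_source = pd.DataFrame({"flux": rng.random(200) * 10, "error": rng.random(200)})

flux_col = "flux"  # stands in for the column name stored on the Ensemble
cutoff = toy_source[flux_col].quantile(0.95)

# Style 1: a query string describing which rows to keep
by_query = toy_source.query(f"{flux_col} < {cutoff}")

# Style 2: a Boolean series with one True/False entry per row
keep_rows = toy_source[flux_col] < cutoff
by_mask = toy_source[keep_rows]

print(len(toy_source), len(by_query), len(by_mask))
```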

We also provide filtering at the `Ensemble` level, so you can pass the above series to the `Ensemble.filter_from_series` function:

In [None]:
ens.filter_from_series(keep_rows, table="source")
ens.source.compute()

Additionally, several more specific functions are available for common operations.

In [None]:
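# Cleaning nans (update_ensemble stores each cleaned view back on the Ensemble)
ens.source.dropna().update_ensemble()  # clean nans from source table
ens.object.dropna().update_ensemble()  # clean nans from object table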

# Filtering on number of observations
ens.prune(threshold=10)  # threshold is the minimum number of observations needed to retain the object

ens.info(verbose=True)

In the above operations, we remove any rows that have at least 1 NaN value present, and then filter such that only lightcurves with at least 10 measurements are retained.
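As a rough picture of what pruning on the number of observations means, here is a plain Pandas sketch. It is an assumption about the behavior described above, not TAPE's implementation: count the measurements per object and keep only the objects that reach the threshold.

```python
import pandas as pd

# Three toy lightcurves with 12, 3, and 10 measurements respectively
toy_source = pd.DataFrame(
    {
        "id": [8000] * 12 + [8001] * 3 + [8002] * 10,
        "flux": [float(i) for i in range(25)],
    }
)

threshold = 10  # minimum number of observations needed to retain an object

# Number of non-missing flux values per object, broadcast back to every row
n_obs = toy_source.groupby("id")["flux"].transform("count")
pruned = toy_source[n_obs >= threshold]

print(pruned["id"].unique())
```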

### Selecting

The `Ensemble` also provides a `select` function to filter down to a subset of columns.

In [None]:
# Add a new column and store the result on the Ensemble so we can filter it out later.
ens.source.assign(band2=ens.source["band"] + "2").update_ensemble()
ens.source.compute()

In [None]:
ens.select(["time", "flux", "error", "band"], table="source")
print("The Source table is dirty: " + str(ens.source.is_dirty()))
ens.source.compute()

## Updating an Ensemble's Frames

The `Ensemble` is a manager of `EnsembleFrame` objects (of which `Ensemble.source` and `Ensemble.object` are special cases). When performing operations on one of the tables, the results are not automatically sent to the `Ensemble`.

So while several of the above examples generated filtered views of the source table (for example with `query`), those views left the underlying data unchanged, with no changes to the rows or columns of `Ensemble.source`.

In [None]:
queried_src = ens.source.query(f"{ens._flux_col} < {highest_flux}")

print(len(queried_src))
print(len(ens.source))

When modifying the views of a dataframe tracked by the `Ensemble`, we can update the `Source` or `Object` frame to use the updated view by calling:

`Ensemble.update_frame(view_frame)`

Or alternatively:

`view_frame.update_ensemble()`

In [None]:
# Now apply the view's filter to the source frame.
queried_src.update_ensemble()

ens.source.compute()

Note that the above is still a series of lazy operations that will not be fully evaluated until an operation such as `compute`. So a call to `update_ensemble` will not yet alter or move any underlying data.

## Assignments and Column Manipulation

The ensemble object supports assignment through the Dask `assign` function. We can pass in either a callable or a series to assign to the new column. New column names are produced automatically from the argument name.

For example, if we want to compute the lower bound of an error range as the estimated flux minus twice the estimated error, we would use:

In [None]:
lower_bnd = ens.source.assign(lower_bnd=lambda x: x["flux"] - 2.0 * x["error"])
lower_bnd
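The same `assign` pattern exists in Pandas, which makes it easy to try out on a tiny in-memory table. This sketch uses assumed toy values and shows that a callable and a precomputed series produce the same column, and that the original dataframe is left untouched.

```python
import pandas as pd

toy_source = pd.DataFrame({"flux": [5.0, 7.0], "error": [0.5, 1.0]})

# Option 1: a callable that receives the dataframe and returns the new column
with_callable = toy_source.assign(lower_bnd=lambda x: x["flux"] - 2.0 * x["error"])

# Option 2: a series computed ahead of time
with_series = toy_source.assign(lower_bnd=toy_source["flux"] - 2.0 * toy_source["error"])

print(with_callable)
```

In both cases the new column takes its name from the keyword argument (`lower_bnd`), and `assign` returns a new dataframe rather than modifying `toy_source` in place.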

## Batch Analysis

The `Ensemble` provides a powerful batching interface, `Ensemble.batch`, with built-in parallelization (provided the input data is in multiple partitions). In addition, TAPE has a suite of analysis functions on hand for your use. Below, we show the application of `tape.analysis.calc_stetson_J` on our dataset.

In [None]:
# using tape analysis functions
from tape.analysis import calc_stetson_J

res = ens.batch(calc_stetson_J, compute=True)  # compute is set to true to execute immediately (non-lazily)
res

## Storing and Accessing Result Frames

Note that the above `batch` operation also printed:

`Using generated label, result_1, for a batch result.`

In addition to the source and object frames, the `Ensemble` may track other frames as well, accessed by either generated or user-provided labels.

We can access a saved frame with `Ensemble.select_frame(label)`.

In [None]:
ens.select_frame("result_1").compute()

`Ensemble.batch` has an optional `label` argument that will store the result with a user-provided label.

In [None]:
res = ens.batch(calc_stetson_J, compute=True, label="stetson_j")

ens.select_frame("stetson_j").compute()

Likewise, we can rename a frame by adding it under a new label and dropping the original frame.

In [None]:
ens.add_frame(ens.select_frame("stetson_j"), "stetson_j_result_1") # Add result under new label
ens.drop_frame("stetson_j") # Drop original label

ens.select_frame("stetson_j_result_1").compute()

We can also add our own frames with `Ensemble.add_frame(frame, label)`. For instance, we can copy this result and add it to a new frame for the `Ensemble` to track as well.

In [None]:
ens.add_frame(res.copy(), "new_res")
ens.select_frame("new_res")

Finally, we can also drop frames that we no longer want the `Ensemble` to track.

In [None]:
ens.drop_frame("result_1")

try:
    ens.select_frame("result_1")  # This should raise a KeyError since the frame has been dropped.
except KeyError as e:
    print("As expected, the frame 'result_1' was dropped.\n" + str(e))

## Keeping the Object and Source Tables in Sync

The TAPE `Ensemble` attempts to lazily "sync" the Object and Source tables such that:

* If a series of operations removes all lightcurves for a particular object from the Source table, we will lazily remove that object from the Object table.
* If a series of operations removes an object from the Object table, we will lazily remove all light curves for that object from the Source table.

As an example, let's filter the Object table to keep only objects observed in deep drilling fields. This operation marks the result table as `dirty`, indicating to the `Ensemble` that, if the table is used as part of a result computation, it should check whether the object and source tables are synced.

Note that because we have not called `update_ensemble()` the `Ensemble` is still using the original Object table which is **not** marked `dirty`.


In [None]:
ddf_only = ens.object.query("ddf_bool == True")

print("Object table is dirty: " + str(ens.object.is_dirty()))
print("ddf_only is dirty: " + str(ddf_only.is_dirty()))
ddf_only.compute()

Now let's update the `Ensemble`'s Object table. We can see that the Object table is now considered "dirty", so a sync between the Source and Object tables will be triggered by computing a `batch` operation.

As part of the sync, the Source table is modified to drop all sources for objects not observed via Deep Drilling Fields. This is reflected both in the `batch` result output and in the reduced number of rows in the Source table.

In [None]:
ddf_only.update_ensemble()
print("Updated object table is now dirty: " + str(ens.object.is_dirty()))

print("Length of the Source table before the batch operation: " + str(len(ens.source)))
res = ens.batch(calc_stetson_J, compute=True)
print("Post-computation object table is now dirty: " + str(ens.object.is_dirty()))
print("Length of the Source table after the batch operation: " + str(len(ens.source)))
res

To summarize:

* An operation that alters a frame marks that frame as "dirty".
* Such an operation on `Ensemble.source` or `Ensemble.object` won't cause a sync unless the output frame is stored back to either `Ensemble.source` or `Ensemble.object` respectively. This is usually done by a call to `EnsembleFrame.update_ensemble()`.
* Syncs are done lazily such that even when the Object and/or Source frames are "dirty", a sync between tables won't be triggered until a relevant computation yields an observable output, such as `batch(..., compute=True)`.
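The effect of a sync can be pictured with eager Pandas operations. The sketch below is only an illustration with assumed toy tables; it is not TAPE's sync logic, which is applied lazily and tracked through the "dirty" flag. It shows both directions: removing objects drops their sources, and removing all sources for an object drops that object.

```python
import pandas as pd

toy_object = pd.DataFrame({"id": [8000, 8001, 8002], "ddf_bool": [1, 0, 1]})
toy_source = pd.DataFrame(
    {"id": [8000, 8001, 8002, 8000, 8001], "flux": [1.0, 2.0, 3.0, 4.0, 5.0]}
)

# Object -> Source: keep only sources whose object survived the object filter
ddf_only = toy_object.query("ddf_bool == 1")
synced_source = toy_source[toy_source["id"].isin(ddf_only["id"])]

# Source -> Object: keep only objects that still have at least one source
bright = toy_source.query("flux > 3.5")
synced_object = toy_object[toy_object["id"].isin(bright["id"])]

print(synced_source)
print(synced_object)
```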

## Using light-curve package features

`Ensemble.batch` also supports the use of feature extractors from the [light-curve](https://pypi.org/project/light-curve/) package:

In [None]:
import light_curve as licu

extractor = licu.Extractor(licu.Amplitude(), licu.AndersonDarlingNormal(), licu.StetsonK())
res = ens.batch(extractor, compute=True, band_to_calc="g")
res

## Using a Custom Analysis Function
The analysis functions contained in TAPE are meant to provide a collection of performant, on-hand routines for common timeseries use cases. However, TAPE is also equipped to handle externally defined functions. Let's walk through a short example of defining a simple custom function and applying it through `Ensemble.batch`.

Here we define a simple function that returns an average flux for each photometric band. It requires an array of fluxes and an array of band labels per measurement, and has a keyword argument for determining which averaging strategy to use (mean or median).

In [None]:
import numpy as np


# Defining a simple function
def my_flux_average(flux_array, band_array, method="mean"):
    """Read in an array of fluxes, and return the average of the fluxes by band"""
    res = {}
    for band in np.unique(band_array):
        mask = band_array == band  # Boolean mask selecting this band's measurements
        band_fluxes = flux_array[mask]  # Apply the mask to the flux array
        if method == "mean":
            res[band] = np.mean(band_fluxes)
        elif method == "median":
            res[band] = np.median(band_fluxes)
    return res

With the function defined, we next supply it to `Ensemble.batch`. The column labels of the `Ensemble` columns we want to use as arguments must be provided, as well as any keyword arguments. In this case, we pass along `"flux"` and `"band"`, so that the `Ensemble` will map those columns to `flux_array` and `band_array` respectively. We also pass `method="median"`, which will pass that kwarg along to `my_flux_average`.

In [None]:
# Applying the function to the ensemble
res = ens.batch(my_flux_average, "flux", "band", compute=True, meta=None, method="median")
res
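Before running a custom function across a whole `Ensemble`, it can help to check it on a small in-memory table. The sketch below restates the function so the block is self-contained, then applies it per object with a plain Pandas `groupby` loop. The loop is an assumed stand-in for what a per-lightcurve batch does conceptually; it is not how `Ensemble.batch` is implemented.

```python
import numpy as np
import pandas as pd


def my_flux_average(flux_array, band_array, method="mean"):
    """Return the average flux per band for one lightcurve."""
    res = {}
    for band in np.unique(band_array):
        band_fluxes = flux_array[band_array == band]
        if method == "mean":
            res[band] = np.mean(band_fluxes)
        elif method == "median":
            res[band] = np.median(band_fluxes)
    return res


toy_source = pd.DataFrame(
    {
        "id": [8000, 8000, 8000, 8001, 8001],
        "flux": [1.0, 2.0, 6.0, 4.0, 8.0],
        "band": ["g", "g", "r", "g", "g"],
    }
)

# Apply the function once per object, passing the columns as NumPy arrays
per_object = {
    obj_id: my_flux_average(
        group["flux"].to_numpy(), group["band"].to_numpy(), method="median"
    )
    for obj_id, group in toy_source.groupby("id")
}
print(per_object)
```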

We see that we now have a Pandas `Series` of `my_flux_average` results by object id (lightcurve). In many cases, this may not be the ideal output for your function. This output is controlled by the `Dask` `meta` parameter. For more information on this parameter, you can read the `Dask` [documentation](https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument). You may pass the `meta` parameter through `Ensemble.batch`, as shown above.

In [None]:
ens.client.close()  # Tear down the ensemble client