# Conduit Examples

This notebook demonstrates various examples of computation using Conduit using streams selected from BTrDB. Note that these particular examples are using data fetched directly from Dominion; but this notebook should be refactored to select other data.

In [1]:
import btrdb
import conduit

import numpy as np
import pandas as pd

In [2]:
db = btrdb.connect(profile="dominion.predictivegrid.com")
db.info()

{'majorVersion': 5, 'build': '5.6.3', 'proxy': {'proxyEndpoints': []}}

## Data Selection

Right now conduits are constructed by instantiating a `Conduit` with a collection of `btrdb.Stream` objects. In my opinion this should be the starting place for most computation, though there are several design elements that must be considered.

In [3]:
def label_prefix(prefix):
    def label_prefix_filter(stream):
        meta, _ = stream.annotations()
        return meta.get("label", "").startswith(prefix)
    return label_prefix_filter

streams = conduit.Conduit(*db.streams_in_collection("dfr/DFR!PMU-NOR2_237"))
streams = streams.subset(func=label_prefix("Northeast Carver 3"))

In [None]:
print(streams.describe())

### Stream Filtering

Right now conduit uses filter masks to select streams to apply various computations. For example, to select all voltage phases, you would do the following:

In [None]:
voltages = streams.subset(streams.unit_mask("VPHM", "VPHA"))
print(voltages.describe())

In [None]:
streams.unit_mask("VPHM", "VPHA")

To get all of the phase A streams you would filter as follows:

In [None]:
phase_a = streams.subset(streams.phase_mask("A"))
print(phase_a.describe())

Masks can combined with binary operators, e.g. `&` and `|` to put together more complex queries. For example to get the Phase A phasor pair (voltage and current phasors) you would combine the two masks with the `&` operator:

In [None]:
phasor_mask = streams.unit_mask("VPHM", "VPHA", "IPHM", "IPHA") & streams.phase_mask("A")
phasor_pair = streams.subset(phasor_mask)
print(phasor_pair.describe())

The `subset` method can take any boolean mask and the `unit_mask` and `phase_mask` helper methods create filter masks based on the metadata of the stream. Note that you can create more complex streams filters with functions as shown in the first stream selection example.

### Phasor Compositions

At the moment, you cannot compose `Phasor`, `PhasorPair`, `PhasorGroup` or `PhasorPairGroup` objects directly from a `Conduit` object since these objects extend a `Conduit` and the current code structure would then have a circular dependency. However, you can use the filter masks from above to quickly construct these objects and call their computations. 

In [None]:
vmask = streams.unit_mask("VPHM", "VPHA")
imask = streams.unit_mask("IPHM", "IPHA")

phaseVA = conduit.Phasor(*streams.subset(vmask & streams.phase_mask("A")))
phaseIB = conduit.Phasor(*streams.subset(imask & streams.phase_mask("B")))

print(phaseVA)
print(phaseIB)

In [None]:
vpg = conduit.PhasorGroup(*streams.subset(vmask))

for phasor in vpg.phasors():
    print(phasor)

The `Phasor` objects validate their inputs and raise an exception if they are composed incorrectly.

In [None]:
try:
    conduit.Phasor(*streams.subset(vmask))
except Exception as e:
    print("{}: {}".format(type(e), str(e)))

In [None]:
try:
    conduit.Phasor(streams[0], streams[1])
except Exception as e:
    print("{}: {}".format(type(e), str(e)))

## Fetch Data

The `Conduit` and `Phasor` compositions are all intended to store meta data about streams, so that data can be fetched easily on demand and various standard computations applied. Data is returned as a `pd.DataFrame` from the `values`, `windows`, and `aligned_windows` methods as follows:

In [None]:
phase_a.values("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

Note that the `DataFrame` will ensure that the time index is joined properly, e.g. if there are any streams that do not have a data point for the time index, they will be filled in with `np.nan`.

You can also use the `windows` and `aligned_windows` queries to get `DataFrames` with aggregate values:

In [None]:
phase_a.aligned_windows("2019-08-22 18:00:00.000", "2019-08-22 20:00:00.000", 39, agg="mean").head()

It may be possible to have more interesting data frame headers from different phase groups and phase group metadata:

In [None]:
vpg.values_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

### Applying Transformations

The first method of applying computations is to apply a transformation function with an optional mask that determines which streams will have the transformation applied on them. The transformation function should be as follows:

```python
def transformer(series, tags={}, annotations={}):
    return series
```

The caveat is that right now the transformer must only accept a single stream and return only a single stream. 

Note that if the transformer filters rows, these will simply become `np.nan` in the final `DataFrame`.

In [None]:
mags = streams.subset(streams.unit_mask("VPHM", "IPHM"))
mags.values("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

In [None]:
def per_unit(units, base_kv=None):
    if units == "IPHM":
        def per_unit_current_transformer(series, tags=None, annotations=None):
            bkv = base_kv or annotations.get("base_kv", None)
            if bkv is None:
                raise ValueError("could not find required base kV for current to per-unit")
            tags["unit"] = "per-unit"
            return series / (100000.0 / (bkv *np.sqrt(3)))
        return per_unit_current_transformer
    
    if units == "VPHM":
        def per_unit_voltage_transformer(series, tags=None, annotations=None):
            bkv = base_kv or annotations.get("base_kv", None)
            if bkv is None:
                raise ValueError("could not find required base kV for voltage to per-unit")
            return series * np.sqrt(3) / (bkv * 1000)
        return per_unit_voltage_transformer
    
    raise ValueError("unknown per-unit transformation unis: {}".format(units))

    
vphm_mask = streams.unit_mask("VPHM")
iphm_mask = streams.unit_mask("IPHM")

mags.apply(per_unit("VPHM", base_kv=140), vphm_mask)
mags.apply(per_unit("IPHM", base_kv=140), iphm_mask)


mags.values("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

### Compositional Computation

This is a proof of concept about how the specific `Phasor` objects could be used to perform compositional style computation.

In [None]:
vpg.complex_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

In [None]:
vpg.symmetrical_components_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

## Phasor Pair Power

In [4]:
pair = conduit.PhasorPair(*streams.subset(streams.phase_mask('A')))

In [5]:
pair.complex_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    (16895121.864918888+3777910.7101272354j)
2019-08-22 18:27:00.066666600    (16682646.532123093+3802666.5067336573j)
2019-08-22 18:27:00.099999700    (16863629.584425475+3872083.8382258536j)
2019-08-22 18:27:00.133333000     (16836907.401072208+3982758.949077471j)
2019-08-22 18:27:00.166666800     (16767446.797394488+3936075.092472428j)
Name: complex power, dtype: complex128

In [8]:
pair.real_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    1.689512e+07
2019-08-22 18:27:00.066666600    1.668265e+07
2019-08-22 18:27:00.099999700    1.686363e+07
2019-08-22 18:27:00.133333000    1.683691e+07
2019-08-22 18:27:00.166666800    1.676745e+07
Name: real power, dtype: float64

In [11]:
pair.reactive_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    3.777911e+06
2019-08-22 18:27:00.066666600    3.802667e+06
2019-08-22 18:27:00.099999700    3.872084e+06
2019-08-22 18:27:00.133333000    3.982759e+06
2019-08-22 18:27:00.166666800    3.936075e+06
Name: reactive power, dtype: float64

In [12]:
pair.apparent_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    1.731236e+07
2019-08-22 18:27:00.066666600    1.711055e+07
2019-08-22 18:27:00.099999700    1.730246e+07
2019-08-22 18:27:00.133333000    1.730156e+07
2019-08-22 18:27:00.166666800    1.722324e+07
Name: apparent power, dtype: float64

In [16]:
pair.power_factor_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    0.975899
2019-08-22 18:27:00.066666600    0.974992
2019-08-22 18:27:00.099999700    0.974638
2019-08-22 18:27:00.133333000    0.973144
2019-08-22 18:27:00.166666800    0.973536
Name: power factor, dtype: float64

## Phasor Pair Group Power


In [6]:
pg = conduit.PhasorPairGroup(*streams.subset(streams.phase_mask('A', 'B', 'C')))

In [7]:
pg.complex_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300     (48859665.16463483+14009554.774793874j)
2019-08-22 18:27:00.066666600      (48582604.93651591+13929174.85814242j)
2019-08-22 18:27:00.099999700    (48910205.167097494+14186188.961366167j)
2019-08-22 18:27:00.133333000    (48951617.038899146+14176898.834545221j)
2019-08-22 18:27:00.166666800    (48720651.197375774+14191429.478326395j)
Name: complex power, dtype: complex128

In [9]:
pg.real_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    4.885967e+07
2019-08-22 18:27:00.066666600    4.858260e+07
2019-08-22 18:27:00.099999700    4.891021e+07
2019-08-22 18:27:00.133333000    4.895162e+07
2019-08-22 18:27:00.166666800    4.872065e+07
Name: real power, dtype: float64

In [10]:
pg.reactive_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    1.400955e+07
2019-08-22 18:27:00.066666600    1.392917e+07
2019-08-22 18:27:00.099999700    1.418619e+07
2019-08-22 18:27:00.133333000    1.417690e+07
2019-08-22 18:27:00.166666800    1.419143e+07
Name: reactive power, dtype: float64

In [13]:
pg.apparent_power_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    5.091303e+07
2019-08-22 18:27:00.066666600    5.062708e+07
2019-08-22 18:27:00.099999700    5.100956e+07
2019-08-22 18:27:00.133333000    5.103601e+07
2019-08-22 18:27:00.166666800    5.082061e+07
Name: apparent power, dtype: float64

In [15]:
pg.power_factor_poc("2019-08-22 18:27:00.000", "2019-08-22 18:27:15.000").head()

2019-08-22 18:27:00.033333300    2.879308
2019-08-22 18:27:00.066666600    2.879450
2019-08-22 18:27:00.099999700    2.876954
2019-08-22 18:27:00.133333000    2.878037
2019-08-22 18:27:00.166666800    2.876576
Name: power factor, dtype: float64