In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

import warnings
warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np
import tensorflow as tf
import dask.dataframe as dd
import crossfit as cf

### Generate some random data

In [2]:
# Fixed seed so that "Restart Kernel -> Run All" reproduces every number below.
SEED = 42
rng = np.random.default_rng(SEED)

size = 1_000_000  # number of synthetic rows

# Generate some random data: binary labels and independent binary predictions,
# so accuracy / precision / recall computed later should all sit near 0.5.
targets = rng.integers(2, size=size)
predictions = (rng.random(size) > 0.5).astype(int)

# A low-cardinality categorical column to slice the metrics by.
countries = rng.choice(["US", "UK", "DE", "FR", "IT", "NL"], size=size)

df = pd.DataFrame({"targets": targets, "predictions": predictions, "country": countries})
# Wrap the in-memory frame as a Dask DataFrame with two partitions, so the
# aggregations below operate on more than one partition.
ddf = df.pipe(dd.from_pandas, npartitions=2)

ddf

Unnamed: 0_level_0,targets,predictions,country
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
0,int64,int64,object
500000,...,...,...
999999,...,...,...


### Implement a metric in Keras

To use it during training, we would pass it to `compile` (Keras takes metrics there, not in `fit`):

```python
model: tf.keras.Model = ...
model.compile(optimizer=..., loss=..., metrics=[BinaryTruePositives()])
model.fit(data)
```

In [3]:
class BinaryTruePositives(tf.keras.metrics.Metric):
    """Keras metric that accumulates the (optionally weighted) number of true positives.

    ``y_true`` and ``y_pred`` are both cast to ``bool``. An element counts as a
    true positive when both casts are ``True``.
    """

    def __init__(self, name='binary_true_positives', **kwargs):
        super().__init__(name=name, **kwargs)
        # Running total of true positives; starts at zero.
        self.true_positives = self.add_weight(name='tp', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Add the true positives from one batch to the running total.

        Args:
            y_true: ground-truth labels; cast to ``bool``, so any non-zero value
                counts as positive.
            y_pred: predicted labels; cast to ``bool`` the same way.
            sample_weight: optional weights, broadcast to the shape of the
                labels and multiplied into the per-element indicator.
        """
        y_true = tf.cast(y_true, tf.bool)
        y_pred = tf.cast(y_pred, tf.bool)

        # Both tensors are already boolean, so comparing them to `True` is redundant.
        values = tf.cast(tf.logical_and(y_true, y_pred), self.dtype)
        if sample_weight is not None:
            sample_weight = tf.cast(sample_weight, self.dtype)
            # Use the dynamic shape. `values.shape` may contain `None` (e.g. an
            # unknown batch size inside `tf.function`), which `broadcast_to` rejects.
            sample_weight = tf.broadcast_to(sample_weight, tf.shape(values))
            values = tf.multiply(values, sample_weight)
        self.true_positives.assign_add(tf.reduce_sum(values))

    def result(self):
        """Return the accumulated (weighted) true-positive count."""
        return self.true_positives


# Sanity check: run the metric once over the full in-memory arrays,
# converted to TensorFlow tensors.
tp_metric = BinaryTruePositives()
tp_metric(
    cf.convert_array(targets, tf.Tensor),
    cf.convert_array(predictions, tf.Tensor),
)

<tf.Tensor: shape=(), dtype=float32, numpy=249496.0>

### How would we use this in Dask? 
We would like to calculate binary true positives sliced by country.

Something like the snippet below, which of course won't work because it mixes two frameworks:

```python
ddf.groupby("country").agg(BinaryTruePositives())
```

In [4]:
targets_min = ddf["targets"].min()  # lazy Dask scalar; nothing is computed yet
targets_min.compute()

0

Dask typically performs aggregations using a function called [apply_concat_apply](https://github.com/dask/dask/blob/bdb21aedecb36e755d49eadc5e3873192c975411/dask/dataframe/core.py#L6357). This is a more functional approach, which lends itself well to distributed computing. 

![aggregator](./img/aggregator.png)

Keras, on the other hand, uses a more object-oriented approach, in which internal state is mutated by each update step.

### Aggregator
One of the core abstractions for creating metrics with crossfit is the `Aggregator`. Let's turn `BinaryTruePositives` into an `Aggregator`.

```python
class SomeAggregator(cf.Aggregator):
    def prepare(self, data):
        ...

    def present(self, state):
        ...

```

In [5]:
from crossfit.metrics import Sum
from crossfit.backends.dask.aggregate import aggregate


class BinaryTruePositivesAggregator(cf.Aggregator):
    """crossfit aggregator that counts true positives with the Keras ``BinaryTruePositives`` metric.

    Each call to ``prepare`` handles one chunk of data and wraps its count in a
    ``Sum`` state.
    """

    def prepare(self, targets, predictions, sample_weight=None):
        """Count the true positives in one chunk.

        Args:
            targets: ground-truth labels for this chunk (array-like).
            predictions: predicted labels for this chunk (array-like).
            sample_weight: optional per-row weights; converted to a TF tensor
                when given.

        Returns:
            A ``Sum`` whose ``sum`` is the true-positive count, converted back to
            the array type of ``targets``.
        """
        tf_inputs = [cf.convert_array(arr, tf.Tensor) for arr in (targets, predictions)]
        weights = None if sample_weight is None else cf.convert_array(sample_weight, tf.Tensor)

        count = BinaryTruePositives()(*tf_inputs, sample_weight=weights)
        return Sum(sum=cf.convert_array(count, type(targets)))
        

def pre(df):
    """Select the ``targets`` and ``predictions`` columns, in that order, as a tuple."""
    return tuple(df[column] for column in ("targets", "predictions"))


aggregator = BinaryTruePositivesAggregator(pre=pre)

# Run the aggregator over the Dask frame; the final count is exposed via `.result`.
tp_state = aggregate(ddf, aggregator)
tp_state.result

0    249496.0
dtype: float32

Most metrics are actually reduced using a mean. We offer a shorthand for wrapping such Keras metrics: `from_tf_metric`.

In [6]:
from crossfit.backends.tf import from_tf_metric

# Wrap stock Keras metrics with `from_tf_metric` so they can be combined
# into a single crossfit Aggregator.
acc = from_tf_metric(tf.keras.metrics.BinaryAccuracy())
precision = from_tf_metric(tf.keras.metrics.Precision())
recall = from_tf_metric(tf.keras.metrics.Recall())

named_metrics = dict(accuracy=acc, precision=precision, recall=recall)
metrics = cf.Aggregator(named_metrics, pre=pre)

aggregate(ddf, metrics, to_frame=True)

Unnamed: 0,recall,accuracy,precision
0,0.499836,0.500154,0.49931


In [7]:
# Same metrics, sliced by country: nest the aggregator under a `groupby` key.
metrics_by_country = cf.Aggregator(metrics, groupby="country")
aggregate(ddf, metrics_by_country, to_frame=True)

Unnamed: 0_level_0,Aggregator.recall,Aggregator.accuracy,Aggregator.precision
country,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
DE,0.499881,0.500244,0.499301
UK,0.499812,0.500127,0.499358
IT,0.50001,0.500129,0.499411
US,0.511162,0.500156,0.499365
NL,0.499917,0.500132,0.499283
FR,0.499782,0.500056,0.499333
