feat: refactor head layers (#130)
* feat: refactor head layers

* refactor: pytorch and base

* fix: black

* fix: torch

* fix: torch

* fix: torch

* fix: flake8

* fix: torch log generator

* test: add torch tests

* feat: add new losses for paddle

* fix: paddle and tests

* refactor(keras): refactor keras loss layer

* refactor(keras): refactor keras loss layer

* refactor(keras): refactor keras loss layer

* refactor(keras): refactor keras loss layer

* refactor(keras): refactor keras loss layer

* refactor(keras): refactor keras loss layer

* refactor(keras): refactor keras loss layer

* refactor(keras): refactor keras loss layer

Co-authored-by: Han Xiao <han.xiao@jina.ai>
Tadej Svetina and hanxiao committed Oct 17, 2021
1 parent 5a25a72 commit 84585be
Showing 37 changed files with 630 additions and 467 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
@@ -141,6 +141,7 @@ jobs:
fail_ci_if_error: false

test-gpu:
needs: prep-testbed
runs-on: [self-hosted, gpu]
steps:
- uses: actions/checkout@v2
@@ -164,7 +165,7 @@ jobs:

# just for blocking the merge until all parallel core-test are successful
success-all-test:
needs: core-test
needs: [core-test, test-gpu]
if: always()
runs-on: ubuntu-latest
steps:
8 changes: 4 additions & 4 deletions finetuner/__init__.py
@@ -20,7 +20,7 @@ def fit(
eval_data: Optional['DocumentArrayLike'] = None,
epochs: int = 10,
batch_size: int = 256,
head_layer: str = 'CosineLayer',
loss: str = 'CosineSiameseLoss',
learning_rate: float = 1e-3,
optimizer: str = 'adam',
optimizer_kwargs: Optional[Dict] = None,
@@ -37,7 +37,7 @@ def fit(
eval_data: Optional['DocumentArrayLike'] = None,
epochs: int = 10,
batch_size: int = 256,
head_layer: str = 'CosineLayer',
loss: str = 'CosineSiameseLoss',
learning_rate: float = 1e-3,
optimizer: str = 'adam',
optimizer_kwargs: Optional[Dict] = None,
@@ -61,7 +61,7 @@ def fit(
clear_labels_on_start: bool = False,
port_expose: Optional[int] = None,
runtime_backend: str = 'thread',
head_layer: str = 'CosineLayer',
loss: str = 'CosineSiameseLoss',
learning_rate: float = 1e-3,
optimizer: str = 'adam',
optimizer_kwargs: Optional[Dict] = None,
@@ -79,7 +79,7 @@ def fit(
clear_labels_on_start: bool = False,
port_expose: Optional[int] = None,
runtime_backend: str = 'thread',
head_layer: str = 'CosineLayer',
loss: str = 'CosineSiameseLoss',
learning_rate: float = 1e-3,
optimizer: str = 'adam',
optimizer_kwargs: Optional[Dict] = None,
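Taken together, the four overloads above swap the head_layer argument of fit for a loss name. A minimal sketch of a caller after this change, assuming embed_model and train_data already exist (both are placeholders here, not part of the diff):

import finetuner

# Before this commit a head layer was selected by name:
#   finetuner.fit(embed_model, train_data, head_layer='CosineLayer')
# After this commit the loss is selected by name instead:
result = finetuner.fit(
    embed_model,               # placeholder: the DNN to be tuned
    train_data,                # placeholder: DocumentArrayLike of training docs
    epochs=10,
    batch_size=256,
    loss='CosineSiameseLoss',  # default value shown in the diff
)
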
4 changes: 2 additions & 2 deletions finetuner/labeler/__init__.py
@@ -17,7 +17,7 @@ def fit(
clear_labels_on_start: bool = False,
port_expose: Optional[int] = None,
runtime_backend: str = 'thread',
head_layer: str = 'CosineLayer',
loss: str = 'CosineSiameseLoss',
**kwargs,
) -> None:
dam_path = tempfile.mkdtemp()
@@ -44,7 +44,7 @@ def get_embed_model(self):
uses=MyExecutor,
uses_with={
'dam_path': dam_path,
'head_layer': head_layer,
'loss': loss,
},
)
)
6 changes: 3 additions & 3 deletions finetuner/labeler/executor.py
@@ -13,13 +13,13 @@ def __init__(
self,
dam_path: str,
metric: str = 'cosine',
head_layer: str = 'CosineLayer',
loss: str = 'CosineSiameseLoss',
**kwargs,
):
super().__init__(**kwargs)
self._all_data = DocumentArrayMemmap(dam_path)
self._metric = metric
self._head_layer = head_layer
self._loss = loss

@abc.abstractmethod
def get_embed_model(self):
@@ -77,7 +77,7 @@ def fit(self, docs, parameters: Dict, **kwargs):
self._embed_model,
docs,
epochs=int(parameters.get('epochs', 10)),
head_layer=self._head_layer,
loss=self._loss,
)

@requests(on='/save')
4 changes: 2 additions & 2 deletions finetuner/tuner/__init__.py
@@ -29,7 +29,7 @@ def fit(
eval_data: Optional[DocumentArrayLike] = None,
epochs: int = 10,
batch_size: int = 256,
head_layer: str = 'CosineLayer',
loss: str = 'CosineSiameseLoss',
learning_rate: float = 1e-3,
optimizer: str = 'adam',
optimizer_kwargs: Optional[Dict] = None,
@@ -38,7 +38,7 @@
) -> TunerReturnType:
ft = get_tuner_class(embed_model)

return ft(embed_model, head_layer=head_layer).fit(
return ft(embed_model, loss=loss).fit(
train_data,
eval_data,
epochs=epochs,
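The framework-agnostic fit in finetuner/tuner/__init__.py now forwards loss to the tuner constructor instead of head_layer. A sketch of the equivalent explicit dispatch, again treating embed_model and train_data as placeholders:

from finetuner.tuner import get_tuner_class

# Resolve the framework-specific tuner for the model, construct it with a
# loss (the former head_layer argument is gone), then train.
tuner_cls = get_tuner_class(embed_model)
tuner = tuner_cls(embed_model, loss='CosineSiameseLoss')
result = tuner.fit(train_data, eval_data=None, epochs=10, batch_size=256)

train_losses = result['loss']['train']  # per-batch training losses, per the Keras fit below
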
67 changes: 11 additions & 56 deletions finetuner/tuner/base.py
@@ -11,40 +11,21 @@
from ..helper import AnyDNN, AnyDataLoader, AnyOptimizer, DocumentArrayLike


class BaseHead:
class BaseLoss:
arity: int

def __init__(self, arity_model: Optional[AnyDNN] = None):
super().__init__()
self._arity_model = arity_model

def forward(self, *inputs):
if self._arity_model:
inputs = self._arity_model(*inputs)
return self.get_output(*inputs)

@abc.abstractmethod
def get_output(self, *inputs):
...

@abc.abstractmethod
def loss_fn(self, pred_val, target_val):
...

@abc.abstractmethod
def metric_fn(self, pred_val, target_val):
...


class BaseTuner(abc.ABC):
def __init__(
self,
embed_model: Optional[AnyDNN] = None,
head_layer: Union[AnyDNN, str, None] = None,
loss: Union[AnyDNN, str] = 'CosineSiameseLoss',
**kwargs,
):
self._embed_model = embed_model
self._head_layer = head_layer
self._loss = self._get_loss(loss)
self._train_data_len = 0
self._eval_data_len = 0

def _get_optimizer_kwargs(self, optimizer: str, custom_kwargs: Optional[Dict]):
"""Merges user-provided optimizer kwargs with default ones."""
@@ -87,16 +68,6 @@ def embed_model(self) -> AnyDNN:
"""Get the base model of this object."""
return self._embed_model

@property
@abc.abstractmethod
def wrapped_model(self) -> AnyDNN:
"""Get the wrapped model of this object.
A wrapped model is an :py:attr:`.embed_model` replicated by :py:attr:`.arity` times
with a ``head_layer`` that fuses all.
"""
...

@property
def arity(self) -> int:
"""Get the arity of this object.
@@ -105,13 +76,7 @@ def arity(self) -> int:
- ``arity = 2`` corresponds to the siamese network;
- ``arity = 3`` corresponds to the triplet network.
"""
return self.head_layer.arity

@property
@abc.abstractmethod
def head_layer(self) -> AnyDNN:
"""Get the head layer of this object."""
...
return self._loss.arity

@abc.abstractmethod
def _get_optimizer(
@@ -138,11 +103,12 @@ def fit(

@abc.abstractmethod
def save(self, *args, **kwargs):
"""Save the weights of the :py:attr:`.embed_model`.
"""Save the weights of the :py:attr:`.embed_model`."""
...

Note that, the :py:attr:`.head_layer` and :py:attr:`.wrapped_model` do not need to be stored,
as they are auxiliary layers for tuning :py:attr:`.embed_model`.
"""
@abc.abstractmethod
def _get_loss(self, loss: Union[str, BaseLoss]) -> BaseLoss:
"""Get the loss layer."""
...

@abc.abstractmethod
@@ -174,14 +140,3 @@ def __init__(
):
super().__init__()
self._inputs = inputs() if callable(inputs) else inputs


class BaseArityModel:
"""The helper class to copy the network for multi-inputs."""

def __init__(self, embed_model: AnyDNN):
super().__init__()
self._embed_model = embed_model

def forward(self, *args):
return tuple(self._embed_model(a) for a in args)
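
After this refactor BaseLoss is reduced to a marker class carrying an arity attribute; the concrete loss layers live in each framework backend and are applied directly to the embeddings plus the label (the Keras training loop below calls self._loss([*embeddings, label])). As an illustration only, not the implementation shipped in finetuner/tuner/keras/losses.py, a custom pair loss for the Keras backend could look roughly like this:

import tensorflow as tf
from tensorflow.keras.layers import Layer

from finetuner.tuner.base import BaseLoss


class MyCosineSiameseLoss(Layer, BaseLoss):
    """Illustrative pair loss: mean squared error between the cosine
    similarity of two embeddings and the target label."""

    arity = 2  # consumes pairs of embeddings

    def call(self, inputs):
        # calling convention used by the Keras tuner: [emb_left, emb_right, label]
        emb_left, emb_right, label = inputs
        cos_sim = tf.reduce_sum(
            tf.nn.l2_normalize(emb_left, axis=-1)
            * tf.nn.l2_normalize(emb_right, axis=-1),
            axis=-1,
        )
        return tf.reduce_mean(tf.square(cos_sim - tf.cast(label, cos_sim.dtype)))
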
91 changes: 30 additions & 61 deletions finetuner/tuner/keras/__init__.py
@@ -1,42 +1,24 @@
from typing import Dict, Optional
from typing import Dict, Optional, Union

import tensorflow as tf
from jina.logging.profile import ProgressBar
from tensorflow import keras
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer
from tensorflow.keras.optimizers import Optimizer

from . import head_layers, datasets
from .head_layers import HeadLayer
from ..base import BaseTuner
from ...helper import DocumentArrayLike
from . import losses, datasets
from ..base import BaseTuner, BaseLoss
from ..dataset.helper import get_dataset
from ..logger import LogGenerator
from ...helper import DocumentArrayLike


class KerasTuner(BaseTuner):
@property
def head_layer(self) -> HeadLayer:
if isinstance(self._head_layer, str):
return getattr(head_layers, self._head_layer)
elif isinstance(self._head_layer, HeadLayer):
return self._head_layer

@property
def wrapped_model(self) -> Model:
if self.embed_model is None:
raise ValueError('embed_model is not set')

if getattr(self, '_wrapped_model', None) is not None:
return self._wrapped_model

input_shape = self.embed_model.input_shape[1:]
input_values = [keras.Input(shape=input_shape) for _ in range(self.arity)]
head_layer = self.head_layer()
head_values = head_layer(*(self.embed_model(v) for v in input_values))
self._wrapped_model = Model(inputs=input_values, outputs=head_values)

return self._wrapped_model
def _get_loss(self, loss: Union[BaseLoss, str]):
if isinstance(loss, str):
return getattr(losses, loss)()
elif isinstance(loss, BaseLoss):
return loss

def _get_data_loader(self, inputs, batch_size: int, shuffle: bool):

@@ -76,60 +58,54 @@ def _get_optimizer(
return keras.optimizers.SGD(learning_rate=learning_rate, **optimizer_kwargs)

def _train(self, data, optimizer, description: str):
head_layer = self.head_layer()

losses = []
metrics = []

log_generator = LogGenerator('T', losses, metrics)
log_generator = LogGenerator('T', losses)

train_data_len = 0
with ProgressBar(
description,
message_on_done=log_generator,
final_line_feed=False,
total_length=train_data_len,
total_length=self._train_data_len,
) as p:
train_data_len = 0
self._train_data_len = 0
for inputs, label in data:
with tf.GradientTape() as tape:
outputs = self.wrapped_model(inputs, training=True)
loss = head_layer.loss_fn(pred_val=outputs, target_val=label)
metric = head_layer.metric_fn(pred_val=outputs, target_val=label)
embeddings = [self._embed_model(inpt) for inpt in inputs]
loss = self._loss([*embeddings, label])

grads = tape.gradient(loss, self.wrapped_model.trainable_weights)
grads = tape.gradient(loss, self._embed_model.trainable_weights)
optimizer.apply_gradients(
zip(grads, self.wrapped_model.trainable_weights)
zip(grads, self._embed_model.trainable_weights)
)

losses.append(loss.numpy())
metrics.append(metric.numpy())

p.update(message=log_generator())
train_data_len += 1
self._train_data_len += 1

return losses, metrics
return losses

def _eval(self, data, description: str = 'Evaluating', train_log: str = ''):
head_layer = self.head_layer()

losses = []
metrics = []

log_generator = LogGenerator('E', losses, metrics, train_log)
log_generator = LogGenerator('E', losses, train_log)

with ProgressBar(description, message_on_done=log_generator) as p:
with ProgressBar(
description, message_on_done=log_generator, total_length=self._eval_data_len
) as p:
self._eval_data_len = 0
for inputs, label in data:
outputs = self.wrapped_model(inputs, training=False)
loss = head_layer.loss_fn(pred_val=outputs, target_val=label)
metric = head_layer.metric_fn(pred_val=outputs, target_val=label)
embeddings = [self._embed_model(inpt) for inpt in inputs]
loss = self._loss([*embeddings, label])

losses.append(loss.numpy())
metrics.append(metric.numpy())

p.update(message=log_generator())
self._eval_data_len += 1

return losses, metrics
return losses

def fit(
self,
@@ -164,30 +140,23 @@ def fit(
_optimizer = self._get_optimizer(optimizer, optimizer_kwargs, learning_rate)

losses_train = []
metrics_train = []
losses_eval = []
metrics_eval = []

with device:
for epoch in range(epochs):
lt, mt = self._train(
lt = self._train(
_train_data,
_optimizer,
description=f'Epoch {epoch + 1}/{epochs}',
)
losses_train.extend(lt)
metrics_train.extend(mt)

if eval_data:
le, me = self._eval(
_eval_data, train_log=LogGenerator('T', lt, mt)()
)
le = self._eval(_eval_data, train_log=LogGenerator('T', lt)())
losses_eval.extend(le)
metrics_eval.extend(me)

return {
'loss': {'train': losses_train, 'eval': losses_eval},
'metric': {'train': metrics_train, 'eval': metrics_eval},
}

def save(self, *args, **kwargs):
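
With _get_loss in place, the Keras tuner accepts either a loss name (resolved via getattr(losses, loss)() against finetuner.tuner.keras.losses) or an already constructed loss layer. A short usage sketch, reusing the hypothetical MyCosineSiameseLoss from the base.py section above and treating embed_model and train_data as placeholders:

from finetuner.tuner.keras import KerasTuner

# by name: the string is looked up in finetuner.tuner.keras.losses
tuner = KerasTuner(embed_model, loss='CosineSiameseLoss')

# or by instance: any BaseLoss subclass is used as-is
tuner = KerasTuner(embed_model, loss=MyCosineSiameseLoss())

result = tuner.fit(train_data, eval_data=None, epochs=10, batch_size=256)
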