In [1]:
from onlineLearning.learners import Learner,SafeLearner,EpsilonBanditLearner

In [2]:
le = EpsilonBanditLearner()

In [3]:
le.params

{'family': 'epsilon_bandit', 'epsilon': 0.05}

In [12]:
le.predict([1,2,3],[1,2,3,4])

[0.4875, 0.0125, 0.0125, 0.4875]

In [9]:
le.learn([1,2,3],1,1.5,None,None)

In [11]:
le.learn([1,2,3],4,1.5,None,None)

In [21]:
from abc import ABC, abstractmethod
from typing import Any, TypeVar, Generic, Dict
from typing import Iterable, Any, Sequence, Dict, Callable, Optional, Union, MutableSequence, MutableMapping

_T_out = TypeVar("_T_out", bound=Any, covariant    =True)
_T_in  = TypeVar("_T_in" , bound=Any, contravariant=True)

class Pipe:

    @property
    def params(self) -> Dict[str,Any]:
        """Parameters describing the pipe."""
        return { }

    def __str__(self) -> str:
        return str(self.params)

class Filter(ABC, Pipe, Generic[_T_in, _T_out]):
    """A pipe that can modify an item."""

    @abstractmethod
    def filter(self, item: _T_in) -> _T_out:
        """Filter the item."""
        ...

class Flatten(Filter[Iterable[Any], Iterable[Any]]):
    """A filter which flattens rows in table shaped data."""

    def filter(self, data: Iterable[Any]) -> Iterable[Any]:

        for row in data:

            if isinstance(row,dict):
                row = dict(row)
                for k in list(row.keys()):
                    if isinstance(row[k],(list,tuple)):
                        row.update([(f"{k}_{i}", v) for i,v in enumerate(row.pop(k))])

            elif isinstance(row,list):
                row = list(row)
                for k in reversed(range(len(row))):
                    if isinstance(row[k],(list,tuple)):
                        for v in reversed(row.pop(k)):
                            row.insert(k,v)

            elif isinstance(row,tuple):
                row = list(row)
                for k in reversed(range(len(row))):
                    if isinstance(row[k],(list,tuple)):
                        for v in reversed(row.pop(k)):
                            row.insert(k,v)
                row = tuple(row)

            yield row


context = [[1,3,2],[1,2,4]]
list(Flatten().filter([list(context)]))[0] if context else []

[1, 3, 2, 1, 2, 4]

In [22]:

import numpy as np
 
np.random.seed(10)
a = np.random.randint(1, 10, [5, 3])
print(a)
b = np.amax(a, axis=1)   #找一个每行最大的
print(b)


[[5 1 2]
 [1 2 9]
 [1 9 7]
 [5 4 1]
 [5 7 9]]
[5 9 9 5 9]


In [23]:
a

array([[5, 1, 2],
       [1, 2, 9],
       [1, 9, 7],
       [5, 4, 1],
       [5, 7, 9]])

In [24]:
np.amax(a)

9

In [25]:
np.reshape([1,2,3], (-1, 1))

array([[1],
       [2],
       [3]])

In [28]:
np.array([1,2,3]).T

array([1, 2, 3])

In [33]:
np.zeros(10)

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [34]:
np.zeros((10))

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [59]:
from typing import List

import numpy as np


class UCB1:
    def __init__(self, models: List[str]):
        self.models, n_models = models, len(models)

        self.model_successes = np.zeros((n_models))
        self.model_tries = np.zeros((n_models))

    def _increment_model_tries(self, model: str) -> None:
        self.model_tries[self.models.index(model)] += 1

    def _get_model_with_max_ucb(self) -> str:
        ucb_numerator = 2 * np.log(np.sum(self.model_tries))
        per_model_means = self.model_successes / self.model_tries
        ucb1_estimates = per_model_means + np.sqrt(ucb_numerator / self.model_tries)
        print(ucb1_estimates)
        return self.models[np.nanargmax(ucb1_estimates)]

    def select_model(self) -> str:
        untested_models = np.nonzero(self.model_tries == 0)[0]
        if untested_models.size == 0:
            best_model_so_far = self._get_model_with_max_ucb()
            self._increment_model_tries(best_model_so_far)
            return best_model_so_far
        else:
            untested_model = self.models[untested_models[0]]
            self._increment_model_tries(untested_model)
            return untested_model

    def reward_model(self, model: str) -> None:
        if model not in self.models:
            raise ValueError(f"model {model} not recognized")
        model_index = self.models.index(model)
        self.model_successes[model_index] += 1
        
test=UCB1(["x1","x2","x3"])
test.select_model()

'x1'

In [86]:
test.select_model()

[1.21335052 1.21335052 1.45895062]


'x3'

In [83]:
test.reward_model("x3")