# Wprowadzenie do biblioteki River

[River](https://github.com/online-ml/river) jest jedną z najbardziej rozszerzonych bibliotek z dziedziny online machine learning. Jest wynikiem połączenia dwóch bibliotek [creme](https://github.com/MaxHalford/creme) oraz [scikit-multiflow](https://github.com/scikit-multiflow/scikit-multiflow). Jest głównie napisana w języku Python, przy czym intensywne obliczeniowo operacje napisane są w języku Rust.

Główna funkcjonalność biblioteki obejmuje m.in.:
- Modele liniowe i klasyczne modele maszynowego uczenia 
- Wykrywanie anomalii i dryftu konceptu
- Systemy rekomendycjne
- Prognozowanie na podstawie szeregów czasowych
- Uczenie niezbalansowanych danych
- Klasteryzacja
- Metryki i statystyki zadaptowane do dziedziny Online Machine Learning

Najważniejsze linki:
- Strona główna [[LINK]](https://riverml.xyz/0.14.0/)
- Tutoriale [[LINK]](https://riverml.xyz/0.14.0/recipes/reading-data/)
- Przykłady [[LINK]](https://riverml.xyz/0.14.0/examples/batch-to-online/)
- Dokumentacja [[LINK]](https://riverml.xyz/0.14.0/api/overview/)


```{warning}

Materiały wraz z tutorialami zostały przygotowane w oparciu o biblioteke `river` w wersji 0.14
```

## Dane w bibliotece river

Dodatkowo, w bibliotce `river` zawarto pakiet zbiorów z dziedziny online machine learning wraz z potrzebnymi loaderami.

```{hint}

Pełną listę zbiorów można przejrzeć na stronie biblioteki [[LINK]](https://riverml.xyz/0.14.0/api/datasets/AirlinePassengers/)
```

Funkcjanolność biblioteki nie ogranicza się tylko do udostępnionych zbiorów. W bibliotece znajdziemy obsługę wielu popularnych formatów danych takich jak np. numpy array, pliki csv czy bezpośrednie pobieranie danych z baz sql.

Pełne przedstawienie formatów danych wspierających przez bibliotekę `river` przedstawiono w poniższej tabeli:

| Format danych | Loader
| --- | --- |
| arff | [stream.iter_arff](https://riverml.xyz/0.14.0/api/stream/iter-arff/) |
| numpy array | [stream.iter_array](https://riverml.xyz/0.14.0/api/stream/iter-array/)|
| csv | [stream.iter_csv](https://riverml.xyz/0.14.0/api/stream/iter-csv/) |
| LIBSVM | [stream.iter_libsvm](https://riverml.xyz/0.14.0/api/stream/iter-libsvm/) |
| pandas DataFrame | [stream.iter_pandas](https://riverml.xyz/0.14.0/api/stream/iter-pandas/) |
| sklearn dataset | [stream.iter_sklearn_dataset](https://riverml.xyz/0.14.0/api/stream/iter-sklearn-dataset/) |
| sql | [stream.iter_sql](https://riverml.xyz/0.14.0/api/stream/iter-sql/) |
| vaex DataFrame | [stream.iter_vaex](https://riverml.xyz/0.14.0/api/stream/iter-vaex/)



## Tutorial

W tutorialu wykorzystano zbiór danych [`MovieLens100K`](https://grouplens.org/datasets/movielens/100k/). Jest to zbiór danych przygotowany przez naukowców z University of Minnesota. Składa się z 100 000 ocen w skali 1-5 1682 filmów pochodzących od 943 użytkowników. 

### Podstawowe operacje na zbiorze danych

#### Wczytanie zbioru danych dostępnego w bibliotece

In [None]:
from river.datasets import MovieLens100K

ds = MovieLens100K()
ds

#### Dostęp do elementów zbioru 
Standardowy dostęp do elementów ze zbioru danych odbywa się za pomocą iteratora.

In [None]:
next(iter(ds))

W przypadku wykorzystania pętli, nie ma potrzeby tworzenia wcześniej iteratora.

```{hint}
W celu przejrzystego wyświetlania danych została wykorzystana metoda [`display_pretty`](https://ipython.readthedocs.io/en/stable/api/generated/IPython.display.html#IPython.display.display_pretty) z modułu `IPython.display`
```

In [None]:
from IPython.display import display_pretty

for x, y in ds:
    display_pretty(x, y)
    break

W przypadku zbiorów danych udostępnionych przez bibliotekę, do dyspozycji mamy również metodę [`take`](https://riverml.xyz/0.14.0/api/datasets/base/Dataset/), która zwraca iterator na n pierwszych elementów

In [None]:
list(ds.take(k=2))

#### Wczytanie zbioru z pliku csv

W celach ćwiczenia wczytamy zbiór danych bezpośrednio z pliku `csv`, ale wykorzystamy już wersję zbioru pobraną wcześniej. Aby dostać się do ścieżki pobranego zbioru odwołamy się do pola `path` z utworzonego wcześniej obiektu zbioru.

In [None]:
path = ds.path
path

Do stworzenia iteratora z pliku skorzystamy z metody [`iter_csv`](https://riverml.xyz/0.14.0/api/stream/iter-csv/)

In [None]:
from river.stream import iter_csv

ds = iter_csv(path, delimiter="\t")

for x, y in ds:
    display_pretty(x, y)
    break

W porównaniu do poprzednego formatu możemy zauważyć nieporządane właściowości:
- Wszystkie cechy wczytane są jako obiekty `string`
- Cecha wyjściowa nie jest ustawiona, w razultacie otrzymujemy obiekt `None`

Aby to poprawić, musimy zdefiniować dodatkowe argumenty, przekazane do metody `iter_csv`:
- Mapowanie pól danych przekozujemy poprzez ustawienie parametru `converters`
- Zdefiniowanie cechy wyjśćiowowej za pomocą parametru `target`

In [None]:
from river.stream import iter_csv

ds = iter_csv(
    path,
    delimiter="\t",
    target="rating",
    converters={
        "rating": float,
        "age": int,
        "timestamp": int,
        "release_date": int,
        "title": str
    },
)

for x, y in ds:
    display_pretty(x, y)
    break

Częściowo możemy to obejść poprzez wykorzystanie biblioteki `pandas` i metody [`iter_pandas`](https://riverml.xyz/0.11.1/api/stream/iter-pandas/). Pola danych zostaną automatycznie zmapowane za wyjątkiem zmiennej wyjściowej.


In [None]:
import pandas as pd
from river.stream import iter_pandas

df = pd.read_csv(path, sep="\t", na_values="")
target = df.pop("rating")
ds = iter_pandas(df, y=target)

for x, y in ds:
    display_pretty(x, y)
    break

```{warning}
Trzeba mieć na uwadze, że takie rozwiązanie nie jest w pełni podejściem `online`, ponieważ plik zostanie najpierw wczytany w całości. Rozwiązanie może być natomiast przydatne w trakcie eksperymentowania.
```

```{danger}

Pomimo tego, że rozwiązania na pierwszy rzut oka wyglądają identycznie, tak na prawdę różnią się.
Dlaczego? Pandas uzupełni brakujące wartości jako `nan`, który jest type float. 
```

Sprawdźmy brakujące wartości

In [None]:
df.isna().any(axis=1).sum()

```{warning}
Łącznie w całym zbiorze mamy 909 rekordów z co najmniej jedną brakującą wartością.
```

Sprawdźmy teraz dla jakich kolumn występowały brakujące wartości

In [None]:
df.isna().any(axis=0)

Wartości brakujące występowały wyłącznie dla zmiennych tekstowych `title` oraz `occupation`
W takim razie wystarczy wykorzystać metodę ```fillna(str)```, aby mieć domyślne zachowanie.

In [None]:
df = pd.read_csv(path, sep="\t").fillna("str")

```{danger}
Metoda `fillna()` powinna być jednak wykorzystana z rozwagą. Dodatkowe sprawdzenia były celowe, ponieważ jeżeli brakujące wartości pojawiły się w polach liczbowych pandas uzupełniłby je wartościami pustymi typu `str`.
```

### Przetwarzanie zbioru danych

#### Rozkład klas liczony od na całym wczytanym zbiorze

Zacznijmy od zbadania rozkładu klas. Zacznijmy od scenariusza, gdzie mamy dostęp do wczytanego pełnego zbioru (wykonanego za pomocą biblioteki `pandas`)

In [None]:
from collections import Counter

import seaborn as sns
import matplotlib.pyplot as plt

counter = Counter(target.values)
sns.barplot(y=list(counter.values()), x=list(counter.keys()), errorbar=None)
plt.show()

#### Obliczenie rozkładu klas z wykorzystaniem podejścia strumieniowego

W bibliotece river możemy również obliczać statystyki w podejściu strumieniowym, czego przykładem jest [`Histogram`](https://riverml.xyz/0.14.0/api/sketch/Histogram/)

In [None]:
from river.sketch import Histogram

ds = iter_pandas(df, y=target)
hist = Histogram(max_bins=5)

for x, y in ds:
    hist = hist.update(y)
    
sns.barplot(y=[b.count for b in hist], x=[b.left for b in hist], errorbar=None)
plt.show()

#### Obliczenie rozkładu klas z wykorzystaniem licznika przybliżonego

Często jednak wartości cech mogą nie mieć zdefiniowanego zakresu, ale pochodzić z całego zbioru liczb rzeczywistych. Obliczenie histogramu dla takich cech, może być zbyt kosztowne obliczeniowo. W takiej sytuacji możemy skorzystać z przybliżonego obliczenia wartości. W tym celu wykorzystamy klasę [Counter](https://riverml.xyz/0.14.0/api/sketch/Counter/)


In [None]:
from river.sketch import Counter

cms = Counter(seed=441)
ds = iter_pandas(df, y=target)
values = set()

for x, y in ds:
    cms = cms.update(int(y))
    values.add(int(y))

```{warning}
Obliczanie przybliżone licznika wystąpień poszczególnych wartości dla cech z ograniczonym podzbiorem, jakim jest cecha wyjściowa zbioru MovieLens100K może nie mieć uzsadnienia. Warto jednak pamiętać o takim rozwiązaniu przy cechach z dużą liczbą unikatowych wartości.
```

#### Porównanie i podsumowanie dokładności podejść obliczenia rozkładu klas

In [None]:
summary = []
for rating in range(1, 6):
    summary.append(
        {
            "Counter": counter[rating],
            "Counter (approx. version)": cms[rating],
            "Histogram (streaming version)": hist[rating - 1].count,
        }
    )

pd.DataFrame(summary, index=range(1, 6))

W testowanym scenariuszu podejścia mają identyczne wyniki. Wynika to z ograniczonego rozmiaru zbioru (przyp. tylko 100K przypadków) oraz ograniczanego zakresu wartości całkowitych <1,5>.

### Transformacje i ekstrakcja cech

W tym podrozdziale omówimy podstawowe transformacje cech z wykorzystaniem river.

Przypomnijmy jakie cechy znajdują się w zbiorze danych

In [None]:
df.head()

W ramach materiału wykonamy poniższych przekształceń:
- Przekształcenie cech `timestamp` oraz `release_date`  z formatu `timestamp` do formatu `datetime`
- Kompozycja cech
- Modelowanie tematyczne tytułów filmów za pomocą algorytmu `LDA` 
- Tworzenie sekwencji kroków przetwarzania za pomocą klasy `Pipeline`
- Agregacje cech z perspektywy danego użytkownika

#### Przekształcenie cech `timestamp` oraz `release_date`  z formatu `timestamp` do formatu `datetime`

In [None]:
import datetime

ds = iter_pandas(df, y=target)

for x, y in ds:
    for date_column in ("timestamp", "release_date"):
        x[date_column] = datetime.datetime.fromtimestamp(x[date_column] / 1e9)
    
    # `break` i `display_pretty` tylko w celach wyświetlenia pojedynczego rekordu
    display_pretty(x)
    break 

#### Kompozycja transformacji w jedną

Zamiast ręcznie pisać pętle możemy do przetwarzenia kilku kolumn, możemy je złożyć w jedną transformację za pomocą klasy [`TransformerUnion`](https://riverml.xyz/0.14.0/api/compose/TransformerUnion/). W tym calu musimy jednak mieć naszą metodę transformującą dane z wykorzystaniem klasy [`FuncTransformer`](https://riverml.xyz/0.14.0/api/compose/FuncTransformer/).

Zaczniemy od przepisania transformacji w klasę.

In [None]:
from typing import Any, Dict


class TimestamptoDateConverter:
    def __init__(self, col_name: str) -> None:
        self.col_name = col_name

    def __call__(self, x: Dict[str, Any]) -> Dict[str, datetime.datetime]:
        # Do not assume incorrectly that something is a date in timestamp format
        assert isinstance(x[self.col_name], int)
        return {
            f"{self.col_name}_datetime": datetime.datetime.fromtimestamp(
                x[self.col_name] / 1e9
            )
        }

Teraz możemy utworzyć obiekt `FuncTransformer`

In [None]:
from river.compose import FuncTransformer

FuncTransformer(TimestamptoDateConverter(col_name="timestamp"))

Teraz możemy złożyć transformację kilku cech za tworząc obiekt klasy [`TransformerUnion`](https://riverml.xyz/0.14.0/api/compose/TransformerUnion/) lub użyć operatora `+`

In [None]:
from river.compose import TransformerUnion

transformations = TransformerUnion(
    FuncTransformer(TimestamptoDateConverter(col_name="timestamp")),
    FuncTransformer(TimestamptoDateConverter(col_name="release_date")),
)
transformations

In [None]:
transformations = (
    FuncTransformer(TimestamptoDateConverter(col_name="timestamp"))
    + FuncTransformer(TimestamptoDateConverter(col_name="release_date")),
)
transformations

In [None]:

transformations = TransformerUnion(
    FuncTransformer(TimestamptoDateConverter(col_name="timestamp")),
    FuncTransformer(TimestamptoDateConverter(col_name="release_date")),
)

for x, y in ds:
    x.update(transformations.learn_one(x).transform_one(x))

    display_pretty(x)
    break

```{warning}

Zamiast nadpisywać obiekty w powyższym kodzie, dodano nowe kolumny. 
To jak zostanie to obsłużone jest po naszej stronie. W niektórych przypadkach źródłowe kolumny usuwa się dopiero na końcu, ponieważ mogą być wymagane do transformacji innych cech.
```

#### Modelowanie tematyczne tytułów filmów za pomocą algorytmu `LDA` 

In [None]:
from river.feature_extraction import BagOfWords
from river.preprocessing import LDA

ds = iter_pandas(df, y=target)

bow = BagOfWords(on="title")
lda = LDA(seed=441)

for x, y in ds:
    bow_title = bow.transform_one(x)

    # `break` i `display_pretty` tylko w celach wyświetlenia pojedynczego rekordu
    display_pretty(lda.learn_transform_one(bow_title))
    break

#### Tworzenie sekwencji kroków przetwarzania za pomocą klasy `Pipeline`

Przekształcimy teraz LDA w jeden pipeline. Możemy to zrobić wykorzystując operator `|`

In [None]:
pipe = BagOfWords(on="title") | LDA(seed=441)
pipe

lub możemy bezpośrednio utworzyć obiekt klasy `Pipeline`

In [None]:
from river.compose.pipeline import Pipeline

pipe = Pipeline(BagOfWords(on="title"), LDA(seed=441))
pipe

Co uproszcza cały kod do następującej postaci:

In [None]:
pipe = Pipeline(BagOfWords(on="title"), LDA(seed=441))

ds = iter_pandas(df, y=target)
for x, y in ds:
    # `break` i `display_pretty` tylko w celach wyświetlenia pojedynczego rekordu
    display_pretty(pipe.learn_one(x).transform_one(x))
    break

#### Agregacje cech z perspektywy danego użytkownika

Aggregacje na strumieniu w bibliotece river wykonujemy za pomocą modułu [Agg](https://riverml.xyz/0.14.0/api/feature-extraction/Agg/)

W ramach materiałów skupimy się na aggregacji zmiennej wyjściowej czyli oceny filmu. Wym celu wykorzystamy klasę [`TargetAgg`](https://riverml.xyz/0.14.0/api/feature-extraction/TargetAgg/), która dokonuje aggrecji cechy wyjściowej.

Obliczmy teraz liczbę recenzji użytkowników dla pierwszych dwudziestu elementów strumienia

In [None]:
from river.feature_extraction import TargetAgg
from river.stats import Count

ds = iter_pandas(df.iloc[0:20], y=target[0:20])

agg = TargetAgg(by=["user"], how=Count())
users_reviews_count = []
for x, y in ds:
    users_reviews_count.append({"user": x["user"], **agg.learn_one(x, y).transform_one(x)})
    
users_reviews_count_df = pd.DataFrame(users_reviews_count)
users_reviews_count_df.index.name = "event_id"
users_reviews_count_df

Aggregacje cechy z perspektywy całej historii użytkownika to tylko jedna z możliwości. Możemy zostosować aggregacje typu [`Rolling`](https://riverml.xyz/0.14.0/api/utils/Rolling/) która dokona aggregacji tylko na określonej liczbie rekordów.

Obliczmy teraz średnia ocena recenzji użytkownika z ostatnich pięciu rekordów

In [None]:
from river.utils import Rolling
from river.stats import Mean

ds = iter_pandas(df.iloc[0:20], y=target[0:20])

agg = TargetAgg(by=["user"], how=Rolling(Mean(), 5))
user_rolling_review_mean = []
for x, y in ds:
    user_rolling_review_mean.append(
        {"user": x["user"], "rating": y, **agg.learn_one(x, y).transform_one(x)}
    )

user_rolling_review_mean_df = pd.DataFrame(user_rolling_review_mean)
user_rolling_review_mean_df.index.name = "event_id"
user_rolling_review_mean_df

Aggregacje również możemy grupować w pipeline za pomocą `TransformerUnion`

In [None]:
aggregations = TargetAgg(by=["user"], how=Rolling(Mean(), 5)) +  TargetAgg(by=["user"], how=Count())
aggregations

In [None]:
ds = iter_pandas(df.iloc[0:20], y=target[0:20])

output = []
for x, y in ds:
    output.append({"user": x["user"], "rating": y, **aggregations.learn_one(x, y).transform_one(x)})
    
output = pd.DataFrame(output)
output.index.name = "event_id"
output