In [None]:
# Enable IPython's autoreload extension in mode 2 so imported modules are
# reloaded before each cell executes; edits to local packages then take effect
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
from datetime import datetime, timezone

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split

from finn_deals.scraping.finn import FinnAPI

In [None]:
# Project-local client (finn_deals.scraping.finn), built with its default
# arguments; the next cell uses it to run the search.
api = FinnAPI()

In [None]:

# Run the "gitar" search and derive the time elapsed since each row's timestamp.
df = api.search_dataframe(query="gitar")

# utc=True makes every parsed timestamp timezone-aware UTC. Without it, naive
# strings cannot be subtracted from the tz-aware "now" below, and strings that
# carry different UTC offsets do not parse into a single datetime column.
df["timestamp"] = pd.to_datetime(df["timestamp"], format="ISO8601", utc=True)
df["time_since_posted"] = datetime.now(timezone.utc) - df["timestamp"]

# Show a preview rather than the whole frame.
df.head()

In [None]:
import numpy as np
from typing import List, Iterable, Dict

class LogScaler:
    """
    Min-max scaler that operates in log space.

    Values are passed through ``log1p`` and then rescaled linearly with the
    minimum and maximum seen during ``fit``, so the smallest fitted value maps
    to 0 and the largest to 1. ``untransform`` inverts both steps.
    """

    def __init__(self):
        # Bounds of log1p(x) observed by fit(); None until the scaler is fitted.
        self.min = None
        self.max = None

    def fit(self, x):
        """
        Record the min and max of ``log1p(x)``.

        Returns ``self`` so calls can be chained.
        """
        x = np.log1p(x)
        self.min = np.min(x)
        self.max = np.max(x)

        if self.max == self.min:
            # Constant input: widen the range slightly so transform() never
            # divides by zero.
            self.max += 1e-9

        return self

    def transform(self, x):
        """
        Map ``x`` to ``(log1p(x) - min) / (max - min)`` using the fitted bounds.

        Raises AssertionError if the scaler has not been fitted.
        """
        assert (self.max is not None) and (self.min is not None), "You need to fit the scaler first!"
        x = np.log1p(x)
        return (x - self.min) / (self.max - self.min)

    def fit_transform(self, x):
        """Fit on ``x`` and return its transformed values."""
        _ = self.fit(x)
        return self.transform(x)

    def untransform(self, x):
        """
        Invert ``transform``: scaled values back to the original units.

        Raises AssertionError if the scaler has not been fitted.
        """
        assert (self.max is not None) and (self.min is not None), "You need to fit the scaler first!"
        # Undo the min-max step. The offset must be ADDED back: transform()
        # subtracted self.min before dividing by the range.
        x = x * (self.max - self.min) + self.min
        # Undo log1p.
        return np.expm1(x)

class WordPieceTokenizer:
    """
    Word-level tokenizer mapping lowercased, whitespace-split words to ids.

    Despite the name, no sub-word (WordPiece) splitting is performed: every
    string is lowercased, split on whitespace, and each word is looked up in
    the fitted vocabulary. Words that are not in the vocabulary receive the id
    of ``unk_token``, which is always 0 after ``fit``.
    """

    def __init__(self, unk_token: str = "<UNK>"):
        self.unk_token = unk_token
        self.encoder: Dict[str, int] = {}
        self.decoder: Dict[int, str] = {}

    def fit(self, vocab: Iterable[str]):
        """
        Build word → id and id → word lookup tables.

        ``vocab`` is an iterable of words; duplicates are ignored. Words should
        already be lowercase, because inputs are lowercased before lookup.
        Ids are contiguous: 0 is the unknown token, followed by the remaining
        words in sorted order. Returns ``self``.
        """
        # Remove the unknown token from the incoming words before prepending
        # it. Otherwise it would be listed twice, the encoder would keep only
        # the later index, and len(self) would be smaller than the largest id.
        words = sorted(set(vocab) - {self.unk_token})  # deterministic ordering
        vocab = [self.unk_token] + words  # unknown token always gets id 0

        self.encoder = {word: idx for idx, word in enumerate(vocab)}
        self.decoder = {idx: word for idx, word in enumerate(vocab)}
        return self

    def _tokenize(self, x: Iterable[str]) -> List[List[str]]:
        """
        Lowercase and split each string in x.
        """
        return [s.lower().split() for s in x]

    def __call__(self, x: Iterable[str]) -> List[List[int]]:
        """
        Convert text → token_ids.

        Returns one list of ids per input string; out-of-vocabulary words map
        to the unknown token's id.
        """
        tokenized = self._tokenize(x)
        return [
            [self.encoder.get(word, self.encoder[self.unk_token]) for word in sentence]
            for sentence in tokenized
        ]

    def __len__(self,):
        """
        Vocab size (including the unknown token).
        """
        return len(self.encoder)


# ---------------- Example Usage ---------------- #

# Keep only rows that have both a title and a price. A missing title breaks the
# string join below, and a missing price makes the scaler's min/max NaN, which
# turns every scaled target into NaN.
df_model = df.dropna(subset=["title", "price_amount"])

tokenizer = WordPieceTokenizer()

# Build vocab from dataframe titles
vocab = " ".join(df_model["title"].str.lower().tolist()).split()

tokenizer.fit(vocab)

# One list of token ids per title.
X = tokenizer(df_model["title"])

# Targets: prices scaled by LogScaler.fit_transform.
scaler = LogScaler()
y = scaler.fit_transform(df_model["price_amount"].values)

assert len(X) == len(y), "features and targets must stay aligned row-for-row"

# NOTE(review): the vocabulary and the scaler are both fitted on the full
# dataset before the split, so test rows influence the preprocessing. Consider
# splitting first and fitting on the training part only.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Titles have different token counts, so the id lists are stored as jagged
# nested tensors instead of being padded to a common length.
# `torch` is imported in the notebook's top import cell.
X_train = torch.nested.nested_tensor(X_train, layout=torch.jagged)
X_test = torch.nested.nested_tensor(X_test, layout=torch.jagged)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

In [None]:
import torch
import torch.nn as nn
from dataclasses import dataclass

@dataclass
class SentimentModelConfig:
    """Hyperparameters read by SentimentModel's constructor."""
    # Number of rows in the embedding table (passed as EmbeddingBag num_embeddings).
    # NOTE(review): hard-coded; presumably the tokenizer's vocabulary size from
    # an earlier run. Confirm it is >= len(tokenizer) for the current data.
    vocab_size: int = 4617
    # Width of each embedding vector and input width of the MLP's first layer.
    embedding_dim: int = 16
    # Passed to EmbeddingBag as padding_idx.
    # NOTE(review): the tokenizer in this notebook gives id 0 to its unknown
    # token, so unknown words would be treated as padding. Confirm this is intended.
    padding_idx: int = 0
    mlp_hidden: int = 32   # size of hidden layer

class PositionalEncoder(nn.Module):
    # Placeholder: no behaviour is implemented yet, and nothing in the visible
    # notebook instantiates this class.
    ...

class SentimentModel(nn.Module):
    """
    Bag-of-embeddings regressor with an output in the range 0–1.

    Token ids are pooled per input by ``nn.EmbeddingBag`` (default mode), fed
    through a two-layer MLP with a ReLU in between, and squashed by a sigmoid.
    """

    def __init__(self, config: SentimentModelConfig = SentimentModelConfig()):
        super().__init__()

        # Pooled embedding lookup; ids equal to padding_idx are handled as padding.
        self.embedding = nn.EmbeddingBag(
            config.vocab_size,
            config.embedding_dim,
            padding_idx=config.padding_idx,
        )

        # embedding_dim -> mlp_hidden -> 1, with a sigmoid to bound the output.
        layers = [
            nn.Linear(config.embedding_dim, config.mlp_hidden),
            nn.ReLU(),
            nn.Linear(config.mlp_hidden, 1),
            nn.Sigmoid(),
        ]
        self.mlp = nn.Sequential(*layers)

    def forward(self, x,):
        """Return one score per input bag, with the trailing size-1 dim removed."""
        pooled = self.embedding(x)   # (batch, embedding_dim)
        scores = self.mlp(pooled)    # (batch, 1)
        return scores.squeeze(-1)    # (batch,)

In [None]:
# Build a model with the default SentimentModelConfig. The training cell below
# constructs its own instance, which replaces this one.
model = SentimentModel()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed torch's RNG so weight initialisation, and therefore the run, is
# repeatable, in line with the fixed random_state of the train/test split.
torch.manual_seed(42)

# Size the embedding table from the fitted tokenizer instead of the config's
# hard-coded default. The vocabulary is built from freshly fetched data, so a
# fixed size can be too small and produce out-of-range embedding indices.
model = SentimentModel(SentimentModelConfig(vocab_size=len(tokenizer))).to(device)

X_train = X_train.to(device)
y_train = y_train.to(device).float()

X_test = X_test.to(device)
y_test = y_test.to(device).float()

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_iterations = 5000
patience = 50  # stop after this many consecutive iterations without improvement
best_val_loss = float('inf')
steps_without_improvement = 0
# Snapshot the initial weights so the restore after the loop always has a
# checkpoint, even if no iteration improves on best_val_loss (e.g. a NaN loss
# never compares as smaller).
best_state_dict = {k: v.clone() for k, v in model.state_dict().items()}

# NOTE(review): the test split is also used as the validation set for early
# stopping, so the "Best Test RMSE" reported below is optimistically biased.
# Each iteration is one full-batch gradient step over X_train.
for it in range(1, num_iterations + 1):

    # ---- train step ----
    model.train()
    optimizer.zero_grad()
    pred = model(X_train)
    loss = criterion(pred, y_train)
    loss.backward()
    optimizer.step()

    # ---- validation / test loss ----
    model.eval()
    with torch.no_grad():
        test_pred = model(X_test)
        test_loss = criterion(test_pred, y_test)

    # ---- early stopping ----
    if test_loss.item() < best_val_loss:
        best_val_loss = test_loss.item()
        steps_without_improvement = 0
        # clone() so later optimizer steps do not overwrite the checkpoint.
        best_state_dict = {k: v.clone() for k, v in model.state_dict().items()}
    else:
        steps_without_improvement += 1

    # ---- logging ----
    if it % 10 == 0:
        rmse = math.sqrt(loss.item())
        test_rmse = math.sqrt(test_loss.item())
        print(
            f"Iter {it:05d} | Train RMSE: {rmse:.4f} | Test RMSE: {test_rmse:.4f}"
        )

    if steps_without_improvement >= patience:
        print(f"Early stopping at iteration {it}")
        break

# restore best model
model.load_state_dict(best_state_dict)
print(f"Best Test RMSE: {math.sqrt(best_val_loss):.4f}")


In [None]:
# Predict on the test split. eval() is set explicitly so this cell does not
# depend on the mode the training loop left behind, and no_grad() avoids
# building an autograd graph for a pure inference pass.
model.eval()
with torch.no_grad():
    pred = model(X_test).cpu().numpy()

In [None]:
# Mean absolute difference between predictions and targets after both are
# mapped back through scaler.untransform.
price_true = scaler.untransform(y_test.cpu().detach().numpy())
price_pred = scaler.untransform(pred)
np.mean(np.abs(price_pred - price_true))

In [None]:
# Histogram of residuals (target minus prediction) after un-scaling both.
price_true = scaler.untransform(y_test.cpu().detach().numpy())
price_pred = scaler.untransform(pred)
plt.hist(price_true - price_pred, bins=100)
plt.show()

In [None]:
# Overlay the un-scaled target distribution with the un-scaled predictions
# (drawn second, semi-transparent) on a logarithmic x-axis.
price_true = scaler.untransform(y_test.cpu().detach().numpy())
price_pred = scaler.untransform(pred)
plt.hist(price_true, bins=100)
plt.hist(price_pred, bins=100, alpha=0.6)
plt.xscale("log")
plt.show()

In [None]:
from sklearn.metrics import r2_score

# R² of the predictions, first on un-scaled values and then on the scaled
# targets the model was trained against.
y_true_scaled = y_test.cpu().detach().numpy()

r2_unscaled = r2_score(scaler.untransform(y_true_scaled), scaler.untransform(pred))
print(f"{r2_unscaled:.2%}")

r2_scaled = r2_score(y_true_scaled, pred)
print(f"{r2_scaled:.2%}")