In [None]:
from clickhouse_driver import Client
from typing import Generator, Any
import os
import json
from tqdm.notebook import tqdm
from dateutil.parser import parse
from collections import defaultdict
import numpy as np
import warnings
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import OneHotEncoder, FunctionTransformer
from sklearn.impute import SimpleImputer
from scipy import sparse
import logging as log
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import roc_auc_score

warnings.filterwarnings("ignore")
log.basicConfig(level=log.INFO)

# ------------------------------
# DATA LOADING AND VALIDATION
# ------------------------------
def generate_game_raw(path_to_games_raw_dir: str = "data/games_raw") -> Generator[dict[str, Any], None, None]:
    for filename in tqdm(os.listdir(path_to_games_raw_dir)):
        try:
            with open(os.path.join(path_to_games_raw_dir, filename), "r", encoding="utf-8") as f:
                yield json.load(f)
        except:
            pass

def validate_game(game: dict) -> bool:
    try:
        int(game["id"])
        parse(game["begin_at"])
        int(game["match"]["league"]["id"])
        int(game["match"]["serie"]["id"])
        int(game["match"]["tournament"]["id"])
        int(game["map"]["id"])

        team_players = defaultdict(list)
        for p in game["players"]:
            team_players[p["team"]["id"]].append(p["player"]["id"])

        assert len(team_players) == 2
        for t_id, p_ids in team_players.items():
            assert len(set(p_ids)) == 5

        team_ids = list(team_players.keys())
        rounds = []
        for r in game["rounds"]:
            assert r["round"] is not None
            assert r["ct"] in team_ids
            assert r["terrorists"] in team_ids
            assert r["winner_team"] in team_ids
            rounds.append(r["round"])
        assert min(rounds) == 1
        assert max(rounds) >= 16
        return True
    except:
        return False

def get_game_ids(path_to_games_raw_dir: str = "data/games_raw") -> list[int]:
    game_ids_valid = []
    game_begin_at_valid = []
    for game in generate_game_raw(path_to_games_raw_dir):
        if validate_game(game):
            game_ids_valid.append(game["id"])
            game_begin_at_valid.append(parse(game["begin_at"]))
    return np.array(game_ids_valid)[np.argsort(game_begin_at_valid)].tolist()

# ------------------------------
# X, y PREPARATION
# ------------------------------
def get_X_y_for_game(path_to_games_raw_dir: str, game_id: int):
    with open(os.path.join(path_to_games_raw_dir, f"{game_id}.json"), "r", encoding="utf-8") as f:
        game = json.load(f)

    X = []
    # Basic game info
    X.append(int(game["id"]))  # 0
    X.append(int(parse(game["begin_at"]).timestamp()))  # 1
    X.append(int(game["match"]["league"]["id"]))  # 2
    X.append(int(game["match"]["serie"]["id"]))  # 3
    X.append(int(game["match"]["tournament"]["id"]))  # 4
    X.append(int(game["map"]["id"]))  # 5

    team_players = defaultdict(list)
    player_kills_game = {}
    for p in game["players"]:
        team_players[p["team"]["id"]].append(p["player"]["id"])
        player_kills_game[p["player"]["id"]] = p.get("kills", 0)

    t1_id, t2_id = sorted(team_players.keys())
    X.extend([t1_id, t2_id])  # 6,7
    X.extend(sorted(team_players[t1_id]))  # 8-12
    X.extend(sorted(team_players[t2_id]))  # 13-17

    y = int(np.mean([r["winner_team"] == t1_id for r in game["rounds"]]) > 0.5)

    return X, y

def get_X_y(game_ids_train: list[int]):
    X, y = [], []
    for game_id in tqdm(game_ids_train):
        _X, _y = get_X_y_for_game(PATH_TO_GAMES_RAW, game_id)
        X.append(_X)
        y.append(_y)
    return np.array(X), np.array(y)

# ------------------------------
# FEATURE EXTRACTORS
# ------------------------------
class PlayerStatFeatureExtractor(BaseEstimator, TransformerMixin):
    def __init__(self, path_to_games_raw_dir: str, game_ids: list[int], key: str = "kills"):
        self.path_to_games_raw_dir = path_to_games_raw_dir
        self.game_ids = game_ids
        self.key = key  # stat key: kills, deaths, etc.
        self.cumulative_dict = {}

    def _get_player_stat(self, game_id: int) -> dict:
        with open(os.path.join(self.path_to_games_raw_dir, f"{game_id}.json"), "r", encoding="utf-8") as f:
            game = json.load(f)
        return {p["player"]["id"]: int(p.get(self.key) or 0) for p in game["players"]}

    def fit(self, X, y=None):
        n_games = X.shape[0]
        X_cumulative = np.zeros((n_games, 10), dtype=int)
        for i in range(n_games):
            game_id = self.game_ids[i]
            player_ids = X[i]
            player_stat_game = self._get_player_stat(game_id)
            row_stat = []
            for p_id in player_ids:
                self.cumulative_dict[p_id] = self.cumulative_dict.get(p_id, 0) + player_stat_game.get(p_id, 0)
                row_stat.append(self.cumulative_dict[p_id])
            X_cumulative[i, :] = row_stat
        self.X_cumulative_train = X_cumulative
        return self

    def transform(self, X):
        n_games = X.shape[0]
        if hasattr(self, "X_cumulative_train") and self.X_cumulative_train.shape[0] == n_games:
            X_cumulative = self.X_cumulative_train
        else:
            X_cumulative = np.zeros((n_games, 10), dtype=int)
            for i in range(n_games):
                X_cumulative[i, :] = np.array([self.cumulative_dict.get(p_id, 0) for p_id in X[i]])
        first5_sorted = np.sort(X_cumulative[:, :5], axis=1)
        last5_sorted = np.sort(X_cumulative[:, 5:], axis=1)
        x = np.hstack([first5_sorted, last5_sorted])
        mean_first5 = np.mean(first5_sorted, axis=1).reshape(-1, 1)
        mean_last5 = np.mean(last5_sorted, axis=1).reshape(-1, 1)
        diff_means = mean_first5 - mean_last5
        x = np.hstack([x, mean_first5, mean_last5, diff_means])
        pairwise_diffs = np.zeros((n_games, 25))
        for row_id in range(x.shape[0]):
            c = 0
            for i in range(5):
                for j in range(5):
                    pairwise_diffs[row_id, c] = first5_sorted[row_id, i] - last5_sorted[row_id, j]
                    c += 1
        x = np.hstack([x, pairwise_diffs])
        return x

class TeamBagEncoder(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        X = np.asarray(X)
        self.d = {val: idx for idx, val in enumerate(np.unique(X.flatten()))}
        log.info(f"TeamBagEncoder: {len(self.d)} unique teams found.")
        return self
    def transform(self, X):
        X = np.asarray(X)
        rows, cols, data = [], [], []
        for i, row in enumerate(X):
            for j, val in enumerate(row):
                if val in self.d:
                    rows.append(i)
                    cols.append(self.d[val])
                    data.append(1 if j == 0 else -1)
        return sparse.csr_matrix((data, (rows, cols)), shape=(X.shape[0], len(self.d)), dtype=int)

class PlayerBagEncoder(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        X = np.asarray(X)
        self.d = {val: idx for idx, val in enumerate(np.unique(X.flatten()))}
        log.info(f"PlayerBagEncoder: {len(self.d)} unique players found.")
        return self
    def transform(self, X):
        X = np.asarray(X)
        rows, cols, data = [], [], []
        for i, row in enumerate(X):
            for j, val in enumerate(row):
                if val in self.d:
                    rows.append(i)
                    cols.append(self.d[val])
                    data.append(1 if j < 5 else -1)
        return sparse.csr_matrix((data, (rows, cols)), shape=(X.shape[0], len(self.d)), dtype=int)
    
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import TimeSeriesSplit
from scipy import sparse
import numpy as np

class PermutationImportanceFeatureSelector(BaseEstimator, TransformerMixin):
    def __init__(self, model=None, scoring="roc_auc", n_repeats=1, n_splits=10, n_jobs=8, random_state=42):
        self.model = model or LogisticRegression(random_state=random_state, max_iter=1000)
        self.scoring = scoring
        self.n_repeats = n_repeats
        self.n_splits = n_splits
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.feature_mask_ = None

    def fit(self, X, y):
        X_current = X.copy()
        feature_mask = np.ones(X_current.shape[1], dtype=bool)
        tscv = TimeSeriesSplit(n_splits=self.n_splits)

        while True:
            permutation_importances = np.zeros(X_current.shape[1])

            # CV loop
            for tr_idx, eval_idx in tscv.split(X_current, y):
                self.model.fit(X_current[tr_idx], y[tr_idx])
                perm_importances = permutation_importance_sparse_parallel(
                    self.model,
                    X_current[eval_idx],
                    y[eval_idx],
                    n_repeats=self.n_repeats,
                    n_jobs=self.n_jobs,
                    scoring=self.scoring
                )
                permutation_importances += perm_importances

            # Average across CV splits
            permutation_importances /= tscv.get_n_splits()

            # Keep only features with positive importance
            keep_mask = permutation_importances > 0

            if np.all(keep_mask):
                break

            # Update global feature mask
            new_feature_mask = np.zeros_like(feature_mask)
            new_feature_mask[feature_mask] = keep_mask
            feature_mask = new_feature_mask

            if np.sum(keep_mask) == 0:
                break

            # Reduce X_current for next iteration
            if sparse.issparse(X_current):
                X_current = X_current[:, keep_mask]
            else:
                X_current = X_current[:, keep_mask]

        self.feature_mask_ = feature_mask
        return self

    def transform(self, X):
        if self.feature_mask_ is None:
            raise RuntimeError("The selector has not been fitted yet.")
        if sparse.issparse(X):
            return X[:, self.feature_mask_]
        else:
            return X[:, self.feature_mask_]


# ------------------------------
# COLUMN SELECTORS
# ------------------------------
def select_columns(cols):
    return FunctionTransformer(lambda X: X[:, cols], validate=False)

from sklearn.metrics import get_scorer
from scipy import sparse
import numpy as np
from tqdm.notebook import tqdm
import multiprocessing as mp

# Worker function for a single feature
def _perm_importance_worker(args):
    model, X, y, col, n_repeats, scoring, seed = args
    rng = np.random.default_rng(seed)
    scorer = get_scorer(scoring)
    scores = []

    # Compute baseline score once
    baseline_score = scorer(model, X, y)
    print(f"Feature {col}: baseline score = {baseline_score:.4f}")

    for i in range(n_repeats):
        if sparse.issparse(X):
            X_permuted = X.copy().tocsc()
            col_data = X_permuted[:, col].toarray().ravel()
            rng.shuffle(col_data)
            X_permuted[:, col] = sparse.csc_matrix(col_data).T
        else:
            X_permuted = X.copy()
            col_data = X_permuted[:, col].copy()
            rng.shuffle(col_data)
            X_permuted[:, col] = col_data

        perm_score = scorer(model, X_permuted, y)
        print(f"Feature {col}, repeat {i+1}: permuted score = {perm_score:.4f}")
        scores.append(perm_score)
    
    return col, np.mean(scores)

# Main function to compute permutation importances in parallel
def permutation_importance_sparse_parallel(model, X, y, scoring="roc_auc", n_repeats=1, n_jobs=None, random_state=42):
    if sparse.issparse(X):
        X = X.tocsr()
    n_samples, n_features = X.shape
    scorer = get_scorer(scoring)
    baseline_score = scorer(model, X, y)

    rng = np.random.default_rng(random_state)
    seeds = rng.integers(0, 1_000_000, size=n_features)
    args = [(model, X, y, col, n_repeats, scoring, seeds[col]) for col in range(n_features)]

    with mp.Pool(n_jobs or mp.cpu_count()) as pool:
        results = list(tqdm(pool.imap(_perm_importance_worker, args), total=n_features))

    importances = np.zeros(n_features)
    for col, perm_score in results:
        importances[col] = baseline_score - perm_score

    return importances


# ------------------------------
# DATA SPLIT
# ------------------------------
PATH_TO_GAMES_RAW = "data/games_raw"
TEST_SIZE = 100

game_ids = get_game_ids(PATH_TO_GAMES_RAW)
game_ids_train, game_ids_test = game_ids[:-TEST_SIZE], game_ids[-TEST_SIZE:]

X_train, y_train = get_X_y(game_ids_train)
X_test, y_test = get_X_y(game_ids_test)

timestamp_cols = [1]
categorical_cols = [2,3,4,5]
team_cols = [6,7]
player_cols = list(range(8,18))



# ------------------------------
# PIPELINES
# ------------------------------


# ------------------------------
# PIPELINES
# ------------------------------
timestamp_pipeline = Pipeline([
    ('select', select_columns([1]))
])

categorical_pipeline = Pipeline([
    ('select', select_columns([2,3,4,5])),
    ('imputer', SimpleImputer(strategy='constant', fill_value=-1)),
    ('onehot', OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])

team_pipeline = Pipeline([
    ('select', select_columns([6,7])),
    ('bag', TeamBagEncoder())
])

player_pipeline = Pipeline([
    ('select', select_columns(list(range(8,18)))),
    ('bag', PlayerBagEncoder())
])

# Player stats pipelines with MinMax scaling
player_stats_keys = [
    "kills", "deaths", "assists", "headshots",
    "flash_assists", "first_kills_diff", "k_d_diff"
]

player_stats_pipelines = [
    (key, Pipeline([
        ('select', select_columns(list(range(8,18)))),
        ('stat', PlayerStatFeatureExtractor(PATH_TO_GAMES_RAW, game_ids_train, key)),
        ('scaler', MinMaxScaler())
    ]))
    for key in player_stats_keys
]

# Combine all pipelines into full feature union
full_pipeline = FeatureUnion(
    [('timestamp', timestamp_pipeline),
     ('categorical', categorical_pipeline),
     ('team_bag', team_pipeline),
     ('player_bag', player_pipeline)] + player_stats_pipelines
)

full_pipeline = Pipeline([
    ('features', full_pipeline),  
    ('feature_selector', PermutationImportanceFeatureSelector(
        model=LogisticRegression(random_state=13, max_iter=1000),
        scoring="roc_auc",
        n_repeats=1,
        n_splits=5,
        n_jobs=8,
        random_state=42
    ))
])

# Fit the pipeline
full_pipeline.fit(X_train[:1000], y_train[:1000])

# Transform training and test sets
X_train_selected = full_pipeline.transform(X_train[:1000])
X_test_selected = full_pipeline.transform(X_test)






  0%|          | 0/56493 [00:00<?, ?it/s]

  0%|          | 0/37974 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

INFO:root:TeamBagEncoder: 46 unique teams found.
INFO:root:PlayerBagEncoder: 237 unique players found.


  0%|          | 0/674 [00:00<?, ?it/s]

  0%|          | 0/674 [00:00<?, ?it/s]

  0%|          | 0/674 [00:00<?, ?it/s]

  0%|          | 0/674 [00:00<?, ?it/s]

  0%|          | 0/674 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

  0%|          | 0/1 [00:00<?, ?it/s]

In [2]:
X_train_selected

<Compressed Sparse Row sparse matrix of dtype 'float64'
	with 1000 stored elements and shape (1000, 1)>

 84%|████████▍ | 8131/9670 [02:21<00:26, 57.57it/s]Process ForkPoolWorker-10:
Process ForkPoolWorker-13:
Process ForkPoolWorker-15:
Process ForkPoolWorker-12:
Process ForkPoolWorker-11:
Process ForkPoolWorker-9:
Process ForkPoolWorker-16:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.10/multiprocessing/pro

In [2]:
PATH_TO_GAMES_RAW = "data/games_raw"
TEST_SIZE = 100

game_ids = get_game_ids(PATH_TO_GAMES_RAW)
game_ids_train, game_ids_test = game_ids[:-TEST_SIZE], game_ids[-TEST_SIZE:]

X_train, y_train = get_X_y(game_ids_train)
X_test, y_test = get_X_y(game_ids_test)

  0%|          | 0/56493 [00:00<?, ?it/s]

  0%|          | 0/37974 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

In [None]:


# Extract only player IDs for feature extractor
X_train_players = X_train_full[:, -10:]
X_test_players = X_test_full[:, -10:]

# Initialize extractor
fe = PlayerKillsFeatureExtractor(PATH_TO_GAMES_RAW)

# Fit using game_ids and X matrix of player IDs
fe.fit(game_ids_train, X_train_players, "kills")

# Transform
X_train_cum = fe.transform(X_train_players)
X_test_cum = fe.transform(X_test_players)

print("X_train_cum shape:", X_train_cum.shape)
print("X_test_cum shape:", X_test_cum.shape)

  0%|          | 0/56493 [00:00<?, ?it/s]

  0%|          | 0/37974 [00:00<?, ?it/s]

  0%|          | 0/100 [00:00<?, ?it/s]

X_train_cum shape: (37974, 38)
X_test_cum shape: (100, 38)


In [6]:
import pandas as pd
pd.DataFrame(X_train_cum).astype(int)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,28,29,30,31,32,33,34,35,36,37
0,9,18,19,20,23,5,6,10,12,15,...,15,14,10,8,5,18,17,13,11,8
1,30,32,37,38,40,18,19,22,23,32,...,20,19,16,15,6,22,21,18,17,8
2,7,12,12,17,23,13,14,19,21,28,...,4,3,-2,-4,-11,10,9,4,2,-5
3,19,24,25,31,37,27,35,36,45,46,...,4,-4,-5,-14,-15,10,2,1,-8,-9
4,33,34,38,46,47,30,37,39,42,49,...,16,9,7,4,-3,17,10,8,5,-2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
37969,2159,3600,4646,4958,5712,4850,5201,5906,7961,17878,...,108,-243,-948,-3003,-12920,862,511,-194,-2249,-12166
37970,2175,3618,4667,4976,5732,4864,5222,5928,7987,17890,...,112,-246,-952,-3011,-12914,868,510,-196,-2255,-12158
37971,4602,8196,8530,8559,10140,5048,9208,9506,16265,17623,...,3511,-649,-947,-7706,-9064,5092,932,634,-6125,-7483
37972,4624,8216,8545,8571,10157,5062,9225,9530,16288,17636,...,3509,-654,-959,-7717,-9065,5095,932,627,-6131,-7479


In [None]:
class PlayerKillsFeatureExtractor:
    def __init__(self):
        self._kills_dict = {}
        
    def fit(self, X, y):
        

(37974, 54)

4568

In [2]:
def fetch_games_by_ids(client: Client, game_ids, clickhouse_db="cs2_db"):
    if len(game_ids) == 0:
        return np.array([]), np.array([])
    
    ids_str = ",".join(str(i) for i in game_ids)

    query = f"""
    SELECT
        g.game_id,
        any(g.begin_at) AS timestamp,
        any(g.map_id) AS map_id,
        any(g.league_id) AS league_id,
        any(g.serie_id) AS serie_id,
        any(g.tournament_id) AS tournament_id,
        any(g.tier_id) AS tier_id,
        t.teams[1] AS t1_id,
        t.teams[2] AS t2_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 1) AS p1_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 2) AS p2_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 3) AS p3_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 4) AS p4_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 5) AS p5_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 1) AS p6_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 2) AS p7_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 3) AS p8_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 4) AS p9_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 5) AS p10_id,
        sumIf(g.round_win, g.team_id = t.teams[1]) > sumIf(g.round_win, g.team_id = t.teams[2]) AS team1_win
    FROM {clickhouse_db}.games_flatten AS g
    INNER JOIN (
        SELECT
            game_id,
            arraySort(arrayDistinct(groupArray(team_id))) AS teams
        FROM {clickhouse_db}.games_flatten
        WHERE game_id IN ({ids_str})
        GROUP BY game_id
    ) AS t USING (game_id)
    GROUP BY g.game_id, t.teams
    ORDER BY timestamp ASC
    """

    result = client.query_dataframe(query)
    return result
    if not result:
        return np.array([]), np.array([])

    rows = [list(r) for r in result[0]]

    for row in rows:
        if hasattr(row[1], "timestamp"):
            row[1] = row[1].timestamp()

    X = np.array([r[1:-1] for r in rows], dtype=float)
    y = np.array([r[-1] for r in rows], dtype=int)

    return X, y

In [50]:
df_train

Unnamed: 0,game_id,timestamp,map_id,league_id,serie_id,tournament_id,tier_id,t1_id,t2_id,p1_id,p2_id,p3_id,p4_id,p5_id,p6_id,p7_id,p8_id,p9_id,p10_id,team1_win
0,1744,2016-04-13 16:30:00,8,4569,1544,1583,1,3216,3224,17539,17540,17542,17570,17602,17529,17576,17578,17579,17588,1
1,2365,2016-06-11 03:01:00,8,4154,1559,1714,0,3209,3224,17507,17508,17509,17510,17524,17529,17576,17578,17579,17655,0
2,1287,2016-09-08 13:30:00,8,4165,1591,1574,0,3209,3216,17507,17508,17509,17510,17524,17525,17539,17540,17541,17542,1
3,2011,2016-12-10 00:40:00,4,4153,1552,1567,0,3222,3245,17513,17515,17516,17568,17708,17501,17581,17658,17664,17665,1
4,4436,2017-03-01 21:00:00,3,4161,1583,1642,0,3212,3245,17522,17523,17524,17565,17569,17501,17543,17581,17664,17665,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
37026,70114,2022-04-13 10:15:39,2,4163,4541,7883,2,126709,129501,20569,25589,27555,28854,34000,17499,17747,21433,21439,25439,0
37027,70112,2022-04-13 10:18:37,8,4163,4541,7883,2,125802,126377,20678,20684,20692,23776,29284,17497,17498,17501,17543,19666,0
37028,70113,2022-04-13 11:29:14,2,4163,4541,7883,2,125802,126377,20678,20684,20692,23776,29284,17497,17498,17501,17543,19666,0
37029,70115,2022-04-13 11:30:45,7,4163,4541,7883,2,126709,129501,20569,25589,27555,28854,34000,17499,17747,21433,21439,25439,1


In [1]:
from clickhouse_driver import Client
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

# -----------------------------
# ClickHouse connection
# -----------------------------
client = Client(
    host="localhost",
    port=9000,
    user="cs2_user",
    password="cs2_password",
    database="cs2_db"
)

# -----------------------------
# Fetch games by IDs
# -----------------------------
def fetch_games_by_ids(client: Client, game_ids, clickhouse_db="cs2_db"):
    if len(game_ids) == 0:
        return pd.DataFrame()
    
    ids_str = ",".join(str(i) for i in game_ids)
    query = f"""
    SELECT
        g.game_id,
        any(g.begin_at) AS timestamp,
        any(g.map_id) AS map_id,
        any(g.league_id) AS league_id,
        any(g.serie_id) AS serie_id,
        any(g.tournament_id) AS tournament_id,
        any(g.tier_id) AS tier_id,
        t.teams[1] AS t1_id,
        t.teams[2] AS t2_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 1) AS p1_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 2) AS p2_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 3) AS p3_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 4) AS p4_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 5) AS p5_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 1) AS p6_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 2) AS p7_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 3) AS p8_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 4) AS p9_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 5) AS p10_id,
        sumIf(g.round_win, g.team_id = t.teams[1]) > sumIf(g.round_win, g.team_id = t.teams[2]) AS team1_win
    FROM {clickhouse_db}.games_flatten AS g
    INNER JOIN (
        SELECT
            game_id,
            arraySort(arrayDistinct(groupArray(team_id))) AS teams
        FROM {clickhouse_db}.games_flatten
        WHERE game_id IN ({ids_str})
        GROUP BY game_id
    ) AS t USING (game_id)
    GROUP BY g.game_id, t.teams
    ORDER BY timestamp ASC
    """
    df = client.query_dataframe(query)
    return df

# -----------------------------
# Initialize cumulative columns
# -----------------------------
def initialize_cumulative_columns(df: pd.DataFrame, prefix="p", stat="kills", n_players=10):
    cols = [f"{prefix}{i+1}_{stat}_sum" for i in range(n_players)]
    for col in cols:
        df[col] = 0
    return df, cols

# -----------------------------
# Fill cumulative stat
# -----------------------------
def fill_cumulative_stat(client: Client, df: pd.DataFrame, stat: str = "kills", view_name="player_cumulative_view"):
    df, stat_cols = initialize_cumulative_columns(df, stat=stat)
    
    for game_id in tqdm(df["game_id"], desc=f"Filling {stat}"):
        # Fetch the game data
        game = client.query_dataframe(
            "SELECT * FROM games_flatten WHERE game_id = %(game_id)s",
            params={"game_id": game_id}
        )
        if game.empty:
            continue
        
        begin_at = game["begin_at"].iloc[0]
        team_player = game.groupby("team_id")["player_id"].apply(lambda x: np.unique(x)).to_dict()
        
        
        stat_all = []
        for team_id, p_ids in team_player.items():
            stat_all.extend(sorted([client.query_dataframe(
                f"SELECT sum({stat}) AS value FROM {view_name} WHERE player_id = %(player_id)s AND begin_at < %(begin_at)s",
                params={"player_id": p_id, "begin_at": begin_at}
            )["value"].iloc[0]
            for p_id in p_ids]))
        
        df.loc[df["game_id"] == game_id, stat_cols] = stat_all
    
    return df

# -----------------------------
# Usage
# -----------------------------
game_ids = client.query_dataframe("SELECT DISTINCT(game_id) FROM games_flatten ORDER BY begin_at")["game_id"].values
game_ids_train, game_ids_test = game_ids[:-100], game_ids[-100:]

df_train = fetch_games_by_ids(client, game_ids_train)
df_test  = fetch_games_by_ids(client, game_ids_test)

df_train = fill_cumulative_stat(client, df_train, stat="kills")
df_test  = fill_cumulative_stat(client, df_test, stat="kills")


Filling kills:   0%|          | 0/37031 [00:00<?, ?it/s]

ValueError: Must have equal len keys and value when setting with an iterable

In [58]:
df_train

Unnamed: 0,game_id,timestamp,map_id,league_id,serie_id,tournament_id,tier_id,t1_id,t2_id,p1_id,...,p1_kills_sum,p2_kills_sum,p3_kills_sum,p4_kills_sum,p5_kills_sum,p6_kills_sum,p7_kills_sum,p8_kills_sum,p9_kills_sum,p10_kills_sum
0,1744,2016-04-13 16:30:00,8,4569,1544,1583,1,3216,3224,17539,...,0,0,0,0,0,0,0,0,0,0
1,2365,2016-06-11 03:01:00,8,4154,1559,1714,0,3209,3224,17507,...,0,0,0,0,0,0,0,0,0,0
2,1287,2016-09-08 13:30:00,8,4165,1591,1574,0,3209,3216,17507,...,0,0,0,0,0,0,0,0,0,0
3,2011,2016-12-10 00:40:00,4,4153,1552,1567,0,3222,3245,17513,...,0,0,0,0,0,0,0,0,0,0
4,4436,2017-03-01 21:00:00,3,4161,1583,1642,0,3212,3245,17522,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
37026,70114,2022-04-13 10:15:39,2,4163,4541,7883,2,126709,129501,20569,...,0,0,0,0,0,0,0,0,0,0
37027,70112,2022-04-13 10:18:37,8,4163,4541,7883,2,125802,126377,20678,...,0,0,0,0,0,0,0,0,0,0
37028,70113,2022-04-13 11:29:14,2,4163,4541,7883,2,125802,126377,20678,...,0,0,0,0,0,0,0,0,0,0
37029,70115,2022-04-13 11:30:45,7,4163,4541,7883,2,126709,129501,20569,...,0,0,0,0,0,0,0,0,0,0


In [55]:
df_train.loc[df_train["game_id"]]

Unnamed: 0,game_id,timestamp,map_id,league_id,serie_id,tournament_id,tier_id,t1_id,t2_id,p1_id,p2_id,p3_id,p4_id,p5_id,p6_id,p7_id,p8_id,p9_id,p10_id,team1_win
0,1744,2016-04-13 16:30:00,8,4569,1544,1583,1,3216,3224,17539,17540,17542,17570,17602,17529,17576,17578,17579,17588,1
1,2365,2016-06-11 03:01:00,8,4154,1559,1714,0,3209,3224,17507,17508,17509,17510,17524,17529,17576,17578,17579,17655,0
2,1287,2016-09-08 13:30:00,8,4165,1591,1574,0,3209,3216,17507,17508,17509,17510,17524,17525,17539,17540,17541,17542,1
3,2011,2016-12-10 00:40:00,4,4153,1552,1567,0,3222,3245,17513,17515,17516,17568,17708,17501,17581,17658,17664,17665,1
4,4436,2017-03-01 21:00:00,3,4161,1583,1642,0,3212,3245,17522,17523,17524,17565,17569,17501,17543,17581,17664,17665,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
37026,70114,2022-04-13 10:15:39,2,4163,4541,7883,2,126709,129501,20569,25589,27555,28854,34000,17499,17747,21433,21439,25439,0
37027,70112,2022-04-13 10:18:37,8,4163,4541,7883,2,125802,126377,20678,20684,20692,23776,29284,17497,17498,17501,17543,19666,0
37028,70113,2022-04-13 11:29:14,2,4163,4541,7883,2,125802,126377,20678,20684,20692,23776,29284,17497,17498,17501,17543,19666,0
37029,70115,2022-04-13 11:30:45,7,4163,4541,7883,2,126709,129501,20569,25589,27555,28854,34000,17499,17747,21433,21439,25439,1


ServerException: Code: 62.
DB::Exception: Syntax error: failed at position 88 ('14'): 14:09:41 . Expected one of: token, DoubleColon, OR, AND, IS NOT DISTINCT FROM, IS NULL, IS NOT NULL, BETWEEN, NOT BETWEEN, LIKE, ILIKE, NOT LIKE, NOT ILIKE, REGEXP, IN, NOT IN, GLOBAL IN, GLOBAL NOT IN, MOD, DIV, alias, AS, GROUP BY, WITH, HAVING, WINDOW, ORDER BY, LIMIT, OFFSET, FETCH, SETTINGS, UNION, EXCEPT, INTERSECT, INTO OUTFILE, FORMAT, end of query. Stack trace:

0. DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x000000000cdd0af7 in /usr/bin/clickhouse
1. DB::Exception::createDeprecated(String const&, int, bool) @ 0x00000000079b862d in /usr/bin/clickhouse
2. DB::parseQueryAndMovePosition(DB::IParser&, char const*&, char const*, String const&, bool, unsigned long, unsigned long) @ 0x00000000142b13fc in /usr/bin/clickhouse
3. DB::executeQueryImpl(char const*, char const*, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum, DB::ReadBuffer*) @ 0x0000000012bdf1d2 in /usr/bin/clickhouse
4. DB::executeQuery(String const&, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum) @ 0x0000000012bde49a in /usr/bin/clickhouse
5. DB::TCPHandler::runImpl() @ 0x0000000013aa3649 in /usr/bin/clickhouse
6. DB::TCPHandler::run() @ 0x0000000013ab7ff9 in /usr/bin/clickhouse
7. Poco::Net::TCPServerConnection::start() @ 0x000000001658f854 in /usr/bin/clickhouse
8. Poco::Net::TCPServerDispatcher::run() @ 0x0000000016590a51 in /usr/bin/clickhouse
9. Poco::PooledThread::run() @ 0x000000001669c167 in /usr/bin/clickhouse
10. Poco::ThreadImpl::runnableEntry(void*) @ 0x000000001669a3fc in /usr/bin/clickhouse
11. ? @ 0x00007955764ba609 in ?
12. ? @ 0x00007955763df353 in ?


array([ 1744,  2365,  1287, ..., 70490, 70489, 70491], shape=(37131,))

In [6]:
def fetch_games_by_ids(client: Client, game_ids, clickhouse_db="cs2_db"):
    if not game_ids:
        log.warning("Список game_ids пуст. Возвращаю пустые массивы.")
        return np.array([]), np.array([])
    log.info(f"Получение данных для {len(game_ids)} игр из ClickHouse...")
    ids_str = ",".join(str(i) for i in game_ids)
    query = f"""
    SELECT
        g.game_id,
        any(g.begin_at) AS timestamp,
        any(g.map_id) AS map_id,
        any(g.league_id) AS league_id,
        any(g.serie_id) AS serie_id,
        any(g.tournament_id) AS tournament_id,
        any(g.tier_id) AS tier_id,
        t.teams[1] AS t1_id,
        t.teams[2] AS t2_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 1) AS p1_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 2) AS p2_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 3) AS p3_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 4) AS p4_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[1]))), 1, 5), 5) AS p5_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 1) AS p6_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 2) AS p7_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 3) AS p8_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 4) AS p9_id,
        arrayElement(arraySlice(arraySort(arrayDistinct(groupArrayIf(player_id, team_id = t.teams[2]))), 1, 5), 5) AS p10_id,
        sumIf(g.round_win, g.team_id = t.teams[1]) > sumIf(g.round_win, g.team_id = t.teams[2]) AS team1_win
    FROM {clickhouse_db}.games_flatten AS g
    INNER JOIN (
        SELECT
            game_id,
            arraySort(arrayDistinct(groupArray(team_id))) AS teams
        FROM {clickhouse_db}.games_flatten
        WHERE game_id IN ({ids_str})
        GROUP BY game_id
    ) AS t USING (game_id)
    GROUP BY g.game_id, t.teams
    ORDER BY timestamp ASC
    """
    result = client.execute(query, with_column_types=True)
    if not result:
        log.warning("Запрос вернул пустой результат.")
        return np.array([]), np.array([])
    rows = [list(r) for r in result[0]]
    for row in rows:
        if hasattr(row[1], "timestamp"):
            row[1] = row[1].timestamp()
    X = np.array([r[1:-1] for r in rows], dtype=float)
    y = np.array([r[-1] for r in rows], dtype=int)
    log.info(f"Получено {len(X)} строк с {X.shape[1]} признаками.")
    return X, y

Unnamed: 0,uniqExact_game_id_
0,37131
