In [1]:
from IPython.display import clear_output

In [2]:
!pip install pymorphy2 sparse_dot_topn swifter
!python -m nltk.downloader stopwords
!python -m nltk.downloader wordnet

clear_output()

In [3]:
import torch

In [4]:
import os
import re

import json
import gzip
import codecs

from itertools import islice, chain, filterfalse
from collections import Counter, defaultdict, namedtuple
from operator import itemgetter

import numpy as np
import scipy.sparse as sp
import scipy.stats as ss
import pandas as pd

import lxml.html as lhtml

# from tqdm.notebook import tqdm
from tqdm import tqdm

In [5]:
WORKDIR = '.'

In [6]:
!mkdir -p "{WORKDIR}/data" "{WORKDIR}/models"

In [7]:
def save_array(a, filename: str, sparse: bool = False, **params):
    if sparse and not sp.issparse(a):
        a = sp.csr_matrix(a)
    elif not sparse and sp.issparse(a):
        a = np.asarray(a.todense())

    with open(filename, 'wb') as f_data:
        save = sp.save_npz if sparse else np.save
        return save(f_data, a, **params)


def load_array(filename: str, sparse: bool = False, **params):
    with open(filename, 'rb') as f_data:
        load = sp.load_npz if sparse else np.load
        return load(f_data, **params)

In [8]:
def parse_specializations(s):
    res = s[1:-1].split(',')
    res = map(int, res)
    res = list(res)
    # res = np.asarray(res, dtype=int)
    return res

vacancies_file = os.path.join(WORKDIR, 'data/vacancies_info.csv.gz')

if not os.path.isfile(vacancies_file):
    # Загружаем специализации для обучения
    df_train_ids = pd.read_csv(
        os.path.join(WORKDIR, 'train_labels.csv.gz'),
        index_col='vacancy_id',
        compression='gzip',
    )

    df_train_ids['specializations'] = df_train_ids['specializations'].map(parse_specializations)
    df_train_ids['is_train'] = True

    # Загружаем специализации для теста
    df_test_ids = pd.read_csv(
        os.path.join(WORKDIR, 'test_vacancy_ids.csv.gz'),
        index_col='vacancy_id',
        compression='gzip',
    )

    # Объединяем в один датафрейм
    df_all_ids = pd.concat([df_train_ids, df_test_ids], axis=0)
    df_all_ids['is_train'].fillna(False, inplace=True)
    df_all_ids.sort_index(inplace=True)

    # Загружаем информацию о каждой из вакансий
    df_vacancies_info = pd.read_csv(
        os.path.join(WORKDIR, 'vacancies_info.csv.gz'),
        index_col='vacancy_id',
        compression='gzip',
    )

    # Объединяем в один датафрейм
    df_all_ids = pd.merge(df_all_ids, df_vacancies_info, left_index=True, right_index=True, how='left')

    df_all_ids.to_csv(vacancies_file, index=True, compression='gzip')
else:
    df_all_ids = pd.read_csv(
        vacancies_file,
        index_col='vacancy_id',
        compression='gzip',
    )
    df_all_ids.loc[df_all_ids['is_train'], 'specializations'] = \
        df_all_ids.loc[df_all_ids['is_train'], 'specializations'].map(parse_specializations)

df_all_ids.head()

  mask |= (ar1 == a)


Unnamed: 0_level_0,specializations,is_train,area_id,compensation_from,compensation_to,creation_date,currency,employer,employment,work_experience,work_schedule
vacancy_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
1,"[242, 256, 302, 324, 358, 440]",True,26,22000.0,24000.0,2019-01-24,RUR,0ce23382345c,full,between1And3,fullDay
2,,False,160,,,2019-07-26,,b9aa259f8724,full,between1And3,fullDay
3,[211],True,1002,,,2019-04-15,,11ecc72a7a76,project,between1And3,fullDay
4,"[389, 412, 437]",True,22,,36000.0,2019-07-12,RUR,e1e424ceb5e4,full,noExperience,fullDay
5,,False,1002,600.0,,2019-01-17,BYR,943fd4a3770a,full,between1And3,fullDay


In [9]:
df_all_ids['joined_work'] = (
    df_all_ids['employment'] + '_' +
    df_all_ids['work_experience'] + '_' +
    df_all_ids['work_schedule']
)

df_all_ids.head()

Unnamed: 0_level_0,specializations,is_train,area_id,compensation_from,compensation_to,creation_date,currency,employer,employment,work_experience,work_schedule,joined_work
vacancy_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
1,"[242, 256, 302, 324, 358, 440]",True,26,22000.0,24000.0,2019-01-24,RUR,0ce23382345c,full,between1And3,fullDay,full_between1And3_fullDay
2,,False,160,,,2019-07-26,,b9aa259f8724,full,between1And3,fullDay,full_between1And3_fullDay
3,[211],True,1002,,,2019-04-15,,11ecc72a7a76,project,between1And3,fullDay,project_between1And3_fullDay
4,"[389, 412, 437]",True,22,,36000.0,2019-07-12,RUR,e1e424ceb5e4,full,noExperience,fullDay,full_noExperience_fullDay
5,,False,1002,600.0,,2019-01-17,BYR,943fd4a3770a,full,between1And3,fullDay,full_between1And3_fullDay


In [10]:
features_vacancy_info = pd.get_dummies(df_all_ids[['employment', 'work_experience', 'work_schedule']], sparse=True)

# features_vacancy_info = pd.get_dummies(df_all_ids['joined_work'], sparse=True)
features_vacancy_info = features_vacancy_info.sparse.to_coo().tocsr()
features_vacancy_info.shape

(2912650, 14)

In [11]:
employer_chosen = df_all_ids.groupby(by='employer')['employer'].count().sort_values(ascending=False)
employer_chosen = set(employer_chosen[employer_chosen >= 5].index)
employer_chosen |= set(df_all_ids.loc[~df_all_ids['is_train'], 'employer'])

len(employer_chosen)

257994

In [12]:
df_all_ids.loc[~df_all_ids['employer'].isin(employer_chosen), 'employer'] = 'UNKNOWN'
df_all_ids.head()

Unnamed: 0_level_0,specializations,is_train,area_id,compensation_from,compensation_to,creation_date,currency,employer,employment,work_experience,work_schedule,joined_work
vacancy_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
1,"[242, 256, 302, 324, 358, 440]",True,26,22000.0,24000.0,2019-01-24,RUR,0ce23382345c,full,between1And3,fullDay,full_between1And3_fullDay
2,,False,160,,,2019-07-26,,b9aa259f8724,full,between1And3,fullDay,full_between1And3_fullDay
3,[211],True,1002,,,2019-04-15,,11ecc72a7a76,project,between1And3,fullDay,project_between1And3_fullDay
4,"[389, 412, 437]",True,22,,36000.0,2019-07-12,RUR,UNKNOWN,full,noExperience,fullDay,full_noExperience_fullDay
5,,False,1002,600.0,,2019-01-17,BYR,943fd4a3770a,full,between1And3,fullDay,full_between1And3_fullDay


In [13]:
features_employer = pd.get_dummies(df_all_ids['employer'], sparse=True)

mapping_employer = features_employer.columns.tolist()
mapping_employer = {e: i for i, e in enumerate(mapping_employer)}

features_employer = features_employer.sparse.to_coo().tocsr()
features_employer.shape

(2912650, 257995)

In [14]:
def make_onehot_csr_matrix(s: pd.Series):
    mapping = defaultdict(lambda: len(mapping))

    data = np.ones(shape=(s.shape[0], ))
    indices = [mapping[k] for k in s]
    indptr = np.arange(0, len(data) + 1)

    X = sp.csr_matrix((data, indices, indptr), shape=(len(indptr) - 1, len(mapping)))
    mapping.default_factory = None
    mapping_inv = sorted(mapping, key=lambda e: mapping[e])

    return mapping, mapping_inv, X

In [15]:
def make_onehot_multiple_csr_matrix(s: pd.Series):
    mapping = defaultdict(lambda: len(mapping))

    data, indices, indptr = [], [], [0, ]

    for row in tqdm(s):
        row = list(map(lambda e: mapping[e], row))

        data.extend([1] * len(row))
        indices.extend(row)
        indptr.append(len(data))

    X = sp.csr_matrix((data, indices, indptr), shape=(len(indptr) - 1, len(mapping)))
    mapping.default_factory = None
    mapping_inv = sorted(mapping, key=lambda e: mapping[e])

    return mapping, mapping_inv, X

In [16]:
mapping_spec, mapping_spec_inv, y_spec = \
    make_onehot_multiple_csr_matrix(df_all_ids.loc[df_all_ids['is_train'], 'specializations'])

y_spec.shape

100%|██████████| 1456325/1456325 [00:02<00:00, 626953.67it/s]


(1456325, 620)

In [17]:
vacancies_parts = (f for f in os.listdir(WORKDIR) if f.startswith('vacancies-'))
vacancies_parts = sorted(vacancies_parts)
vacancies_parts

['vacancies-01.json.gz',
 'vacancies-02.json.gz',
 'vacancies-03.json.gz',
 'vacancies-04.json.gz',
 'vacancies-05.json.gz',
 'vacancies-06.json.gz',
 'vacancies-07.json.gz',
 'vacancies-08.json.gz',
 'vacancies-09.json.gz',
 'vacancies-10.json.gz']

In [18]:
def read_vacancies_part(filename):
    with gzip.open(filename, mode='r') as f_gz:
        records = json.load(f_gz)
        records = {int(k): v for k, v in records.items()}
    return records

In [19]:
from functools import lru_cache

from nltk.corpus import stopwords
from pymorphy2 import MorphAnalyzer


ru_morph = MorphAnalyzer()

@lru_cache(maxsize=15000)
def morph_process(token):
    return ru_morph.parse(token)[0].normal_form

@lru_cache(maxsize=5000)
def preprocess_skill(s):
    parts = re.sub('\s+', ' ', s.strip().lower()).split()
    parts = map(morph_process, parts)
    return '_'.join(parts)

stop_words = map(morph_process, stopwords.words('russian'))
stop_words = stopwords.words('russian') + list(stop_words)

In [20]:
def content_names_reader(vacancies_it, index):
    for vacancy_id, vacancy_info in vacancies_it:
        # name = re.sub('\(.*?\)', '', vacancy_info['name'].lower())
        name = vacancy_info['name'].lower()
        index.append(vacancy_id)
        yield name

def content_skills_reader(vacancies_it, index):
    for vacancy_id, vacancy_info in vacancies_it:
        skills = ' '.join(map(preprocess_skill, vacancy_info['key_skills']))
        index.append(vacancy_id)
        yield skills

In [21]:
from sklearn.feature_extraction.text import TfidfVectorizer

def create_tfidf_vectorizer(mode, **params):
    if mode == 'names':
        vec = TfidfVectorizer(
            stop_words=stop_words,
            token_pattern=r"(?u)\b\w\w+\b",
            preprocessor=morph_process,
            ngram_range=(1, 2),
            min_df=5,
            **params
        )
    elif mode == 'skills':
        vec = TfidfVectorizer(
            stop_words=stop_words,
            token_pattern=r"(?u)\b\w\w+\b",
            min_df=5,
            **params
        )
    else:
        raise ValueError(mode)

    return vec

In [22]:
def create_tfdif_vectors(mode,
                         content_array_file,
                         content_terms_idfs,
                         content_vacancies_mapping, ):
    vacancies_it = map(lambda p: os.path.join(WORKDIR, p), tqdm(vacancies_parts))
    vacancies_it = map(read_vacancies_part, vacancies_it)
    vacancies_it = ((k, v) for p in vacancies_it for k, v in p.items())

    index = []

    if mode == 'names':
        content = tqdm(content_names_reader(vacancies_it, index), position=0)
    elif mode == 'skills':
        content = tqdm(content_skills_reader(vacancies_it, index), position=0)
    else:
        raise ValueError(mode)

    vec = create_tfidf_vectorizer(mode)

    # Считаем tfidf-вектора и сохраняем их
    features = vec.fit_transform(content)
    save_array(features, content_array_file, sparse=True)

    # Сохраняем словарик с idf
    vocabulary_inv = sorted(vec.vocabulary_, key=lambda e: vec.vocabulary_[e])
    with open(content_terms_idfs, mode='w', encoding='utf8') as f_data:
        for word, idf in zip(vocabulary_inv, vec.idf_):
            print(word, "%.16f" % idf, sep='\t', file=f_data)

    # Сохраняем порядок вакансий в матрице
    with open(content_vacancies_mapping, mode='w') as f_data:
        print(*index, sep='\n', file=f_data)

    return vec, features, index


def load_tfidf_vectors(mode,
                       content_array_file,
                       content_terms_idfs,
                       content_vacancies_mapping, ):
    # Грузим tfidf-вектора
    features = load_array(content_array_file, sparse=True)

    # Грузим TfIdfVectorizer
    with open(content_terms_idfs, mode='r') as f_data:
        f_data = map(lambda s: s.rstrip().split('\t'), f_data)

        vocabulary_inv, vocabulary_idf = [], []
        for i, (word, idf) in enumerate(f_data):
            vocabulary_inv.append(word)
            vocabulary_idf.append(float(idf))

    vec = create_tfidf_vectorizer(mode=mode, vocabulary=vocabulary_inv)
    vec.idf_ = np.asarray(vocabulary_idf, dtype=float)

    # Грузим порядок документов
    with open(content_vacancies_mapping, mode='r') as f_data:
        index = list(map(int, f_data))

    return vec, features, index

In [23]:
content_names_array_file = os.path.join(WORKDIR, 'data/content_names_2.npz')
content_names_terms_idfs = os.path.join(WORKDIR, 'data/content_names_2.idf')
content_names_vacancies_mapping = os.path.join(WORKDIR, 'data/content_names_2.mapping')

if not os.path.isfile(content_names_array_file):
    index = []

    vec, features_content_names, index = create_tfdif_vectors(
        'names',
        content_names_array_file,
        content_names_terms_idfs,
        content_names_vacancies_mapping,
    )
else:
    vec, features_content_names, index = load_tfidf_vectors(
        'names',
        content_names_array_file,
        content_names_terms_idfs,
        content_names_vacancies_mapping,
    )

# Убеждаемся, что все правильно
assert (np.asarray(index) == df_all_ids.index.values).all()
assert features_content_names.shape == (df_all_ids.shape[0], len(vec.idf_))

features_content_names.shape

(2912650, 139435)

In [24]:
content_skills_array_file = os.path.join(WORKDIR, 'data/content_skills_2.npz')
content_skills_terms_idfs = os.path.join(WORKDIR, 'data/content_skills_2.idf')
content_skills_vacancies_mapping = os.path.join(WORKDIR, 'data/content_skills_2.mapping')

if not os.path.isfile(content_skills_array_file):
    index = []

    vec, features_content_skills, index = create_tfdif_vectors(
        'skills',
        content_skills_array_file,
        content_skills_terms_idfs,
        content_skills_vacancies_mapping,
    )
else:
    vec, features_content_skills, index = load_tfidf_vectors(
        'skills',
        content_skills_array_file,
        content_skills_terms_idfs,
        content_skills_vacancies_mapping,
    )

# Убеждаемся, что все правильно
assert (np.asarray(index) == df_all_ids.index.values).all()
assert features_content_skills.shape == (df_all_ids.shape[0], len(vec.idf_))

features_content_skills.shape

(2912650, 19402)

In [25]:
from sklearn.model_selection import train_test_split

mask = df_all_ids['is_train'].values

features_content_names_train = features_content_names[mask]
features_vacancy_info_train = features_vacancy_info[mask]
features_employer_train = features_employer[mask]
features_content_skills_train = features_content_skills[mask]
vacancy_id_train = df_all_ids[mask].index.values
  
features_content_names_test = features_content_names[~mask]
features_vacancy_info_test = features_vacancy_info[~mask]
features_employer_test = features_employer[~mask]
features_content_skills_test = features_content_skills[~mask]
vacancy_id_test = df_all_ids[~mask].index.values

In [26]:
import math

from torch.utils.data import Dataset


class HeadHunterDataset(Dataset):
    def __init__(self, content_names, vacancy_info, employer_info, skills_info, target,
                 batch_size=100, shuffle=True, random_state=None):
        self.check_shapes(
            content_names,
            vacancy_info,
            employer_info,
            skills_info,
            target,
        )

        self.content_names = content_names
        self.vacancy_info = vacancy_info
        self.employer_info = employer_info
        self.skills_info = skills_info
        self.target = target

        self.batch_size = batch_size
        self.shuffle = shuffle

        if random_state is not None and isinstance(random_state, np.random.RandomState):
            self.random_state = random_state
        else:
            self.random_state = np.random.RandomState(random_state)

        # init index
        self.on_epoch_end()

    def check_shapes(self, *args):
        args = args[:-1] if args[-1] is None else args

        shapes = map(lambda e: e.shape[0], args)
        shapes = list(shapes)

        # https://stackoverflow.com/questions/3844801/check-if-all-elements-in-a-list-are-identical

        assert shapes.count(shapes[0]) == len(shapes)

    def __len__(self):
        return int(math.ceil(self.content_names.shape[0] / self.batch_size))

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        index = self.index[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        batch_content_names = self.content_names[index]
        batch_vacancy_info = self.vacancy_info[index]
        batch_employer_info = self.employer_info[index]
        batch_skills_info = self.skills_info[index]
        
        if self.target is not None:
            batch_y = self.target[index]
            batch_y = batch_y.toarray()
        else:
            # inference mode
            batch_y = None

        batch_x = (
            batch_content_names.toarray(),
            batch_vacancy_info.toarray(),
            batch_employer_info.toarray(),
            batch_skills_info.toarray(),
        )

        return batch_x, batch_y

    def on_epoch_end(self):
        if self.shuffle:
            self.index = self.random_state.permutation(self.content_names.shape[0])
        else:
            self.index = np.arange(self.content_names.shape[0])


In [27]:
from torch import nn

class HeadHunterNetworkEmployer(nn.Module):
    def __init__(self, num_content_names, num_vacancy_info, num_employer_info, num_skills_info, num_target):
        super().__init__()

        self.use_vacancy_info = num_vacancy_info is not None
        self.use_employer_info = num_employer_info is not None
        self.use_skills_info = num_skills_info is not None

        num_features = [num_content_names, num_vacancy_info, num_employer_info, num_skills_info, ]
        num_features = filter(lambda e: e is not None, num_features)
        num_features = sum(num_features)

        self.network = nn.Sequential(
            nn.Linear(
                in_features=num_features,
                out_features=num_target,
                bias=True,
            ),
            nn.Sigmoid(),
        )

    def forward(self, content_names, vacancy_info, employer_info, skills_info):
        x = (content_names, )
        
        if self.use_vacancy_info:
            x += (vacancy_info, )
        
        if self.use_employer_info:
            x += (employer_info, )
        
        if self.use_skills_info:
            x += (skills_info, )
        
        x = torch.cat(x, dim=1)
        x = self.network(x)
        return x


model_3 = HeadHunterNetworkEmployer(
    num_content_names=features_content_names.shape[1],
    num_vacancy_info=features_vacancy_info.shape[1],
    num_employer_info=features_employer.shape[1],
    num_skills_info=features_content_skills.shape[1],
    num_target=len(mapping_spec),
)


model_3_path = os.path.join(WORKDIR, 'models/model_logreg_017_empl_skills.pt')
model_3.load_state_dict(torch.load(model_3_path, map_location=torch.device('cpu')))
model_3.eval()

HeadHunterNetworkEmployer(
  (network): Sequential(
    (0): Linear(in_features=416846, out_features=620, bias=True)
    (1): Sigmoid()
  )
)

In [28]:
def to_tensor(X, use_cuda=False):
    if not torch.is_tensor(X):
        device = 'cuda' if use_cuda else 'cpu'
        X = torch.tensor(X, device=device, dtype=torch.float32)

    if use_cuda and not X.is_cuda:
        X = X.cuda()

    if not torch.is_floating_point(X):
        X = X.float()

    return X

In [30]:
input_train = HeadHunterDataset(
    features_content_names_train, features_vacancy_info_train,
    features_employer_train, features_content_skills_train, y_spec,
    batch_size=1024, shuffle=False, random_state=42,
)

input_test = HeadHunterDataset(
    features_content_names_test, features_vacancy_info_test,
    features_employer_test, features_content_skills_test, None,
    batch_size=1024, shuffle=False, random_state=42,
)

In [31]:
from dataclasses import dataclass, asdict

@dataclass
class RankedEntry:
    vacancy_id: int = -1
    spec_id: int = -1
    counts: int = 0
    min_tfidf: float = 10
    max_tfidf: float = -1
    logreg_score: float = -1

def create_candidates(num):
    candidates = [defaultdict(RankedEntry) for _ in range(num)]
    return candidates
        
RankedEntry()

RankedEntry(vacancy_id=-1, spec_id=-1, counts=0, min_tfidf=10, max_tfidf=-1, logreg_score=-1)

In [32]:
candidates_train = create_candidates(features_content_names_train.shape[0])

In [33]:
candidates_test = create_candidates(features_content_names_test.shape[0])

# KNN

In [34]:
K_best = 13

In [35]:
def choose_knn_candidates(candidates_new, top, tfidf):
    for i, (k, v) in enumerate(candidates_new):
        if i < top:
            yield k, v
        elif v.max_tfidf >= tfidf:
            yield k, v
        else:
            break
    

def add_knn_candidates(candidates, indices, ranks, labels):
    for i in tqdm(range(indices.shape[0]), position=0):
        candidates_row = defaultdict(RankedEntry)
        
        for j, tfidf in zip(indices[i], ranks[i]):
            if j < 0:
                break

            for k in labels[j].indices:
                e = candidates_row[k]

                e.spec_id = k
                e.min_tfidf = min(e.min_tfidf, tfidf)
                e.max_tfidf = max(e.max_tfidf, tfidf)
                e.counts += 1

        candidates_new = sorted(candidates_row.items(), key=lambda p: p[1].counts, reverse=True)
        candidates_new = choose_knn_candidates(candidates_new, top=15, tfidf=0.975)
        candidates_new = defaultdict(RankedEntry, candidates_new)
        
        # choose all candidates from most similar object
        index_first = indices[i, 0]
        candidates_new.update({k: candidates_row[k] for k in labels[index_first].indices})

        candidates_row = candidates[i]
        
        for k, v in candidates_new.items():
            candidates_row[k].spec_id = k
            candidates_row[k].min_tfidf = v.min_tfidf
            candidates_row[k].max_tfidf = v.max_tfidf
            candidates_row[k].counts = v.counts

    return candidates

In [36]:
indices_train_file = os.path.join(WORKDIR, 'data/neigbours-train-all.indices.npz')
ranks_train_file = os.path.join(WORKDIR, 'data/neigbours-train-all.ranks.npz')

indices_knn_train = load_array(indices_train_file, sparse=False)[:,:K_best]
ranks_knn_train = load_array(ranks_train_file, sparse=False)[:,:K_best]

In [39]:
candidates_train = add_knn_candidates(candidates_train, indices_knn_train, ranks_knn_train, y_spec)

list(map(len, candidates_train[:15]))

100%|██████████| 1456325/1456325 [28:39<00:00, 846.95it/s] 


[8, 9, 19, 27, 7, 4, 6, 15, 15, 15, 15, 22, 16, 12, 13]

In [40]:
del indices_knn_train, ranks_knn_train

In [41]:
indices_test_file = os.path.join(WORKDIR, 'data/neigbours-test.indices.npz')
ranks_test_file = os.path.join(WORKDIR, 'data/neigbours-test.ranks.npz')

indices_knn_test = load_array(indices_test_file, sparse=False)[:,:K_best]
ranks_knn_test = load_array(ranks_test_file, sparse=False)[:,:K_best]

In [42]:
candidates_test = add_knn_candidates(candidates_test, indices_knn_test, ranks_knn_test, y_spec)

list(map(len, candidates_test[:15]))

100%|██████████| 1456325/1456325 [30:09<00:00, 804.83it/s] 


[15, 7, 15, 15, 17, 26, 15, 14, 12, 12, 31, 30, 23, 16, 24]

In [43]:
del indices_knn_test, ranks_knn_test

# Counters

In [44]:
vacancies_file = os.path.join(WORKDIR, 'data/vacancies_info.csv.gz')

df_all_ids = pd.read_csv(
    vacancies_file,
    index_col='vacancy_id',
    compression='gzip',
)

df_all_ids.loc[df_all_ids['is_train'], 'specializations'] = \
    df_all_ids.loc[df_all_ids['is_train'], 'specializations'].map(parse_specializations)

df_all_ids.head()

  mask |= (ar1 == a)


Unnamed: 0_level_0,specializations,is_train,area_id,compensation_from,compensation_to,creation_date,currency,employer,employment,work_experience,work_schedule
vacancy_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
1,"[242, 256, 302, 324, 358, 440]",True,26,22000.0,24000.0,2019-01-24,RUR,0ce23382345c,full,between1And3,fullDay
2,,False,160,,,2019-07-26,,b9aa259f8724,full,between1And3,fullDay
3,[211],True,1002,,,2019-04-15,,11ecc72a7a76,project,between1And3,fullDay
4,"[389, 412, 437]",True,22,,36000.0,2019-07-12,RUR,e1e424ceb5e4,full,noExperience,fullDay
5,,False,1002,600.0,,2019-01-17,BYR,943fd4a3770a,full,between1And3,fullDay


In [45]:
df_train = df_all_ids[df_all_ids['is_train']]

In [46]:
common_counter = Counter()
employer_counters = defaultdict(lambda: Counter())

for spec_list, employer_id in zip(df_train['specializations'], df_train['employer']):
    for spec in spec_list:
        spec = mapping_spec[spec]
        common_counter[spec] += 1
        employer_counters[employer_id][spec] += 1

In [47]:
common_counter = pd.DataFrame([
    {'spec_id': k, 'counts': v}
    for k, v in common_counter.items()
])
common_counter.head()

Unnamed: 0,spec_id,counts
0,0,89768
1,1,119524
2,2,36158
3,3,42051
4,4,6909


In [48]:
employer_counters = pd.DataFrame([
    {'spec_id': spec_id, 'employer': employer_id, 'counts': e, }
    for employer_id, v in employer_counters.items()
    for spec_id, e in v.items()
])
employer_counters.head()

Unnamed: 0,spec_id,employer,counts
0,0,0ce23382345c,26
1,1,0ce23382345c,205
2,2,0ce23382345c,25
3,3,0ce23382345c,28
4,4,0ce23382345c,4


In [49]:
spec_counters = pd.merge(employer_counters, common_counter, how='left', on='spec_id', suffixes=('', '_glb'))
spec_counters['counts_prob'] = spec_counters['counts'] / spec_counters['counts_glb']
spec_counters.head()

Unnamed: 0,spec_id,employer,counts,counts_glb,counts_prob
0,0,0ce23382345c,26,89768,0.00029
1,1,0ce23382345c,205,119524,0.001715
2,2,0ce23382345c,25,36158,0.000691
3,3,0ce23382345c,28,42051,0.000666
4,4,0ce23382345c,4,6909,0.000579


In [50]:
spec_counters_top = spec_counters.groupby('employer').apply(
    lambda g: g.sort_values(by='counts_prob', ascending=False).set_index('spec_id')[:6]['counts_prob'])
spec_counters_top = spec_counters_top.reset_index()
spec_counters_top.head()

Unnamed: 0,employer,spec_id,counts_prob
0,00003669d84d,273,0.00026
1,00005eb9ef63,316,0.000188
2,00005eb9ef63,204,5.5e-05
3,0000a9f4dad5,343,0.000447
4,0000a9f4dad5,11,7.5e-05


In [51]:
spec_counters_top = {name: group['spec_id'].tolist()
                     for name, group in tqdm(spec_counters_top.groupby('employer'), position=0)}

100%|██████████| 256539/256539 [00:47<00:00, 5372.26it/s]


In [52]:
def add_counter_candidates(candidates, vacancy_ids):
    assert len(candidates) == len(vacancy_ids)
    
    for candidates_row, vacancy_id in tqdm(zip(candidates, vacancy_ids), position=0, total=len(candidates)):
        employer = df_all_ids.loc[vacancy_id, 'employer']
        
        if employer not in spec_counters_top:
            continue
        
        for spec_id in spec_counters_top[employer]:
            e = candidates_row[spec_id]
            e.spec_id = spec_id

    return candidates

In [53]:
candidates_train = add_counter_candidates(candidates_train, vacancy_id_train)

list(map(len, candidates_train[:15]))

100%|██████████| 1456325/1456325 [00:32<00:00, 44234.96it/s]


[14, 12, 19, 32, 7, 9, 12, 21, 16, 15, 21, 28, 22, 18, 17]

In [54]:
candidates_test = add_counter_candidates(candidates_test, vacancy_id_test)

list(map(len, candidates_test[:15]))

100%|██████████| 1456325/1456325 [00:42<00:00, 34060.62it/s]


[21, 11, 15, 19, 21, 26, 21, 18, 15, 17, 37, 35, 28, 17, 27]

# LogReg

In [55]:
def get_best_ranks(ranks: np.ndarray, top: int, axis: int = 0, return_ranks: bool = False):
    top_slice = (slice(None), ) * axis + (slice(-top, None), )
    inv_slice = (slice(None), ) * axis + (slice(None, None, -1), )

    if top < ranks.shape[axis]:
        indices = np.argpartition(ranks, -top, axis=axis)[top_slice]
        ranks_top = np.take_along_axis(ranks, indices, axis=axis)
        indices = np.take_along_axis(indices, ranks_top.argsort(axis=axis)[inv_slice], axis=axis)
    else:
        indices = np.argsort(ranks, axis=axis)[top_slice]
        indices = indices[inv_slice]

    result = (indices, )

    if return_ranks:
        ranks = np.take_along_axis(ranks, indices, axis=axis)
        result += (ranks, )

    return result if len(result) > 1 else result[0]

In [56]:
def fill_unknown_scores(candidates, model, input_seq, top=6):
    j_min = 0

    for i in tqdm(range(len(input_seq)), position=0, leave=True):
        X_batch, _ = input_seq[i]

        y_batch = model(*map(to_tensor, X_batch))
        y_batch = y_batch.cpu().detach().numpy()

        for j in range(0, y_batch.shape[0]):
            candidates_row = candidates[j + j_min]
            indices = [k for k, v in candidates_row.items() if v.logreg_score < 0]
            ranks = y_batch[j, indices]

            for k, prob in zip(indices, ranks):
                candidates_row[k].logreg_score = prob

        indices_batch = get_best_ranks(y_batch, top=top, axis=1, return_ranks=False)
        
        for j in range(0, y_batch.shape[0]):
            candidates_row = candidates[j + j_min]
            
            indices = indices_batch[j]
            ranks = y_batch[j, indices]
            
            for k, prob in zip(indices, ranks):
                candidates_row[k].logreg_score = prob

        j_min += y_batch.shape[0]
        
    return candidates

In [57]:
candidates_train = fill_unknown_scores(candidates_train, model_3, input_train, top=6)

100%|██████████| 5689/5689 [29:06<00:00,  3.26it/s]


In [58]:
list(map(len, candidates_train[:15]))

[14, 13, 21, 33, 8, 13, 14, 23, 16, 17, 21, 28, 24, 18, 17]

In [91]:
np.quantile([len(c) for c in candidates_train], q=np.arange(0.1, 1.0, 0.1))

array([11., 13., 15., 17., 18., 20., 21., 23., 26.])

In [59]:
candidates_test = fill_unknown_scores(candidates_test, model_3, input_test, top=6)

100%|██████████| 1423/1423 [24:44<00:00,  1.04s/it]


In [60]:
list(map(len, candidates_test[:15]))

[24, 11, 16, 19, 21, 28, 23, 18, 15, 19, 38, 36, 28, 17, 27]

In [88]:
np.quantile([len(c) for c in candidates_test], q=np.arange(0.1, 1.0, 0.1))

array([11., 13., 15., 17., 18., 20., 21., 23., 26.])

# Ranking

In [67]:
import gc

gc.collect()

1426

In [69]:
train_pairs = df_all_ids.loc[vacancy_id_train, 'specializations'].reset_index()
train_pairs = {(k, e) for k, v in zip(train_pairs['vacancy_id'], train_pairs['specializations']) for e in v}

len(train_pairs)

4491645

In [70]:
offset = features_content_names.shape[1] + features_vacancy_info.shape[1]

weights = model_3.state_dict()['network.0.weight'].detach().numpy()
weights = weights[:, offset:offset+features_employer.shape[1]]
weights.shape

(620, 257995)

In [94]:
from dask import dataframe as dd
from dask.dataframe.utils import make_meta
from dask.diagnostics import ProgressBar


def add_ranks(group, cols):
    ranks = group[cols].rank(axis=0, method='average', ascending=False).add_prefix('rank_')
    group = pd.concat([group, ranks], axis=1)
    return group


def create_ranking_df(vacancy_ids, candidates, rank_cols=None, use_logreg_weights=True):
    df_ranking = []

    candidates_zip = zip(vacancy_ids, candidates)
    candidates_zip = tqdm(candidates_zip, position=0, total=len(candidates))

    for vacancy_id, candidates_row in candidates_zip:
        for candidate in candidates_row.values():
            candidate = asdict(candidate)
            candidate['vacancy_id'] = vacancy_id
            df_ranking.append(candidate)

    df_ranking = pd.DataFrame(df_ranking)
    
    # prepare employer column for future features
    df_ranking = pd.merge(
        df_ranking,
        df_all_ids[['employer']].reset_index(),
        how='left',
        on='vacancy_id', 
    )

    # use counts for (spec_id, employer)
    df_ranking = pd.merge(
        df_ranking,
        spec_counters[['spec_id', 'employer', 'counts_prob']],
        how='left',
        on=['spec_id', 'employer'],
    )
    
    df_ranking['counts_prob'] = df_ranking['counts_prob'].fillna(0)
    
    # use logreg weights for (spec_id, employer)
    if use_logreg_weights:
        df_ranking['employer_logreg'] = df_ranking['employer'].map(
            lambda e: mapping_employer.get(e, mapping_employer['UNKNOWN'])
        )
        
        index = df_ranking[['spec_id', 'employer_logreg']].values
        index = (index[:,0], index[:,1], )
        df_ranking['logreg_employer_weight'] = weights[index]

        df_ranking.drop(columns=['employer_logreg'], inplace=True)
        
    # use ranks
    if rank_cols:
        df_ranking = df_ranking.set_index('vacancy_id')
        
        meta = make_meta(df_ranking)
        for col in rank_cols:
            col = 'rank_' + col
            meta[col] = pd.Series(name=col, dtype='float')
        
        with ProgressBar():
            df_ranking = df_ranking.pipe(dd.from_pandas, npartitions=100)\
                                   .groupby('vacancy_id')\
                                   .apply(lambda g: add_ranks(g, rank_cols), meta=meta)\
                                   .compute(num_workers=20)

        """
        df_groups = []
        for _, group in tqdm(df_ranking.groupby('vacancy_id'), position=0):
            group = add_ranks(group, rank_cols)
            df_groups.append(group)
        df_ranking = pd.concat(df_groups, axis=0)
        """
    
    # remove redundant columns
    df_ranking.drop(columns=['employer'], inplace=True)

    df_ranking['spec_id'] = df_ranking['spec_id'].map(lambda e: mapping_spec_inv[e])
    df_ranking.sort_index(inplace=True)

    return df_ranking

In [72]:
%%time

df_ranking_train = create_ranking_df(
    vacancy_id_train, candidates_train,
    rank_cols=['counts', 'counts_prob', 'logreg_score'],
    use_logreg_weights=True,
)
df_ranking_train.head()

100%|██████████| 1456325/1456325 [09:51<00:00, 2463.81it/s]
100%|██████████| 1456325/1456325 [56:35<00:00, 428.92it/s]  


CPU times: user 1h 24min 55s, sys: 4min 30s, total: 1h 29min 25s
Wall time: 1h 29min 9s


Unnamed: 0,vacancy_id,spec_id,counts,min_tfidf,max_tfidf,logreg_score,counts_prob,logreg_employer_weight,rank_counts,rank_counts_prob,rank_logreg_score
0,1,242,12,1.0,1.0,0.609589,0.00029,-0.312096,1.0,12.0,4.0
1,1,256,11,1.0,1.0,0.914891,0.001715,1.468223,2.5,7.0,1.0
2,1,324,11,1.0,1.0,0.776175,0.000666,0.231966,2.5,10.0,2.0
3,1,302,10,1.0,1.0,0.648414,0.000691,0.122485,4.0,8.0,3.0
4,1,440,8,1.0,1.0,0.516882,0.00067,-0.11658,5.0,9.0,5.0


In [74]:
df_ranking_train.shape

(26751060, 11)

In [75]:
df_ranking_train['target'] = [p in train_pairs
                              for p in zip(df_ranking_train['vacancy_id'], df_ranking_train['spec_id'])]
df_ranking_train['target'] = df_ranking_train['target'].astype(int)

df_ranking_train.head()

Unnamed: 0,vacancy_id,spec_id,counts,min_tfidf,max_tfidf,logreg_score,counts_prob,logreg_employer_weight,rank_counts,rank_counts_prob,rank_logreg_score,target
0,1,242,12,1.0,1.0,0.609589,0.00029,-0.312096,1.0,12.0,4.0,1
1,1,256,11,1.0,1.0,0.914891,0.001715,1.468223,2.5,7.0,1.0,1
2,1,324,11,1.0,1.0,0.776175,0.000666,0.231966,2.5,10.0,2.0,1
3,1,302,10,1.0,1.0,0.648414,0.000691,0.122485,4.0,8.0,3.0,1
4,1,440,8,1.0,1.0,0.516882,0.00067,-0.11658,5.0,9.0,5.0,1


In [96]:
%%time

import swifter

def check_dataset_quality(df_ranking):
    counts = df_ranking.groupby('vacancy_id')['target'].sum() > 0

    print('Number of vacancies:', counts.shape[0])
    print('Number of vacancies (pos):', counts.sum())
    print('Fraction of vacancies (pos):', '%.6f' % (counts.sum() / counts.shape[0]))
    
    with ProgressBar():
        counts = df_ranking.set_index('vacancy_id')\
                           .pipe(dd.from_pandas, npartitions=100)\
                           .groupby('vacancy_id')\
                           .apply(lambda g: g.loc[g['target'] > 0, 'spec_id'].tolist(), meta=('list'))\
                           .compute(num_workers=20)

    counts = counts.rename('spec_id').reset_index()
    
#     counts = df_ranking.groupby('vacancy_id').apply(
#         lambda g: g.loc[g['target'] > 0, 'spec_id'].tolist()
#     ).rename('spec_id').reset_index()
    
    counts = pd.merge(
        counts,
        df_all_ids['specializations'].reset_index(),
        on='vacancy_id',
        how='left'
    )
    
    counts['specializations'] = counts['specializations'].swifter.progress_bar(False).apply(set)
    counts['spec_id'] = counts['spec_id'].swifter.progress_bar(False).apply(set)
    
    counts_inter = counts.swifter.progress_bar(False).apply(
        lambda r: len(r['specializations'] & r['spec_id']) / len(r['specializations']),
        axis=1,
    )
    
    print('Recall:', '%.6f' % counts_inter.mean())


check_dataset_quality(df_ranking_train)

Number of vacancies: 1456325
Number of vacancies (pos): 1418555
Fraction of vacancies (pos): 0.974065
[########################################] | 100% Completed |  9min  9.5s
Recall: 0.897708
CPU times: user 10min 14s, sys: 31.3 s, total: 10min 45s
Wall time: 10min 32s


In [80]:
ranking_train_file = os.path.join(WORKDIR, 'data/ranking-dataset-train.csv.gz')

df_ranking_train.to_csv(ranking_train_file, index=False, compression='gzip')
df_ranking_train.head()

Unnamed: 0,vacancy_id,spec_id,counts,min_tfidf,max_tfidf,logreg_score,counts_prob,logreg_employer_weight,rank_counts,rank_counts_prob,rank_logreg_score,target
0,1,242,12,1.0,1.0,0.609589,0.00029,-0.312096,1.0,12.0,4.0,1
1,1,256,11,1.0,1.0,0.914891,0.001715,1.468223,2.5,7.0,1.0,1
2,1,324,11,1.0,1.0,0.776175,0.000666,0.231966,2.5,10.0,2.0,1
3,1,302,10,1.0,1.0,0.648414,0.000691,0.122485,4.0,8.0,3.0,1
4,1,440,8,1.0,1.0,0.516882,0.00067,-0.11658,5.0,9.0,5.0,1


In [81]:
!zcat "{ranking_train_file}" | head

vacancy_id,spec_id,counts,min_tfidf,max_tfidf,logreg_score,counts_prob,logreg_employer_weight,rank_counts,rank_counts_prob,rank_logreg_score,target
1,242,12,1.0,1.0,0.6095885038375854,0.0002896355048569646,-0.312096,1.0,12.0,4.0,1
1,256,11,1.0,1.0,0.9148905873298645,0.0017151367089454838,1.4682232,2.5,7.0,1.0,1
1,324,11,1.0,1.0,0.7761746048927307,0.0006658581246581532,0.23196599,2.5,10.0,2.0,1
1,302,10,1.0,1.0,0.6484136581420898,0.0006914099231152166,0.12248501,4.0,8.0,3.0,1
1,440,8,1.0,1.0,0.516882061958313,0.000670391061452514,-0.11657994,5.0,9.0,5.0,1
1,358,6,1.0,1.0,0.2547731399536133,0.000578954986249819,-1.0417575,6.0,11.0,7.0,1
1,418,2,1.0,1.0,0.0003808138717431575,0.0,-2.3862836,7.0,14.0,12.0,0
1,196,1,1.0,1.0,0.0015391720226034522,3.2814858567959574e-05,-2.7771082,8.0,13.0,10.0,0
1,260,0,10.0,-1.0,0.0005034298519603908,0.005747126436781609,0.3474858,11.5,1.0,11.0,0

gzip: stdout: Broken pipe


In [82]:
%%time

df_ranking_test = create_ranking_df(
    vacancy_id_test, candidates_test,
    rank_cols=['counts', 'counts_prob', 'logreg_score'],
    use_logreg_weights=True,
)
df_ranking_test.head()

100%|██████████| 1456325/1456325 [09:17<00:00, 2611.03it/s]
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result


CPU times: user 1h 21min 6s, sys: 4min 8s, total: 1h 25min 15s
Wall time: 1h 25min 16s


Unnamed: 0,vacancy_id,spec_id,counts,min_tfidf,max_tfidf,logreg_score,counts_prob,logreg_employer_weight,rank_counts,rank_counts_prob,rank_logreg_score
0,2,211,9,0.66391,0.66391,0.401377,0.0,-0.16915,1.0,15.5,1.0
1,2,172,4,0.66391,0.66391,0.03786,0.0,-0.099528,3.0,15.5,8.0
2,2,420,4,0.66391,0.66391,0.027227,0.0,-0.442375,3.0,15.5,9.0
3,2,395,4,0.66391,0.66391,0.052264,0.0,-0.220978,3.0,15.5,6.0
4,2,388,3,0.66391,0.66391,0.006271,0.0,-0.03815,5.5,15.5,16.0


In [83]:
df_ranking_test.shape

(26802688, 11)

In [84]:
ranking_test_file = os.path.join(WORKDIR, 'data/ranking-dataset-test.csv.gz')

df_ranking_test.to_csv(ranking_test_file, index=False, compression='gzip')
df_ranking_test.head()

Unnamed: 0,vacancy_id,spec_id,counts,min_tfidf,max_tfidf,logreg_score,counts_prob,logreg_employer_weight,rank_counts,rank_counts_prob,rank_logreg_score
0,2,211,9,0.66391,0.66391,0.401377,0.0,-0.16915,1.0,15.5,1.0
1,2,172,4,0.66391,0.66391,0.03786,0.0,-0.099528,3.0,15.5,8.0
2,2,420,4,0.66391,0.66391,0.027227,0.0,-0.442375,3.0,15.5,9.0
3,2,395,4,0.66391,0.66391,0.052264,0.0,-0.220978,3.0,15.5,6.0
4,2,388,3,0.66391,0.66391,0.006271,0.0,-0.03815,5.5,15.5,16.0


In [85]:
!zcat "{ranking_test_file}" | head

vacancy_id,spec_id,counts,min_tfidf,max_tfidf,logreg_score,counts_prob,logreg_employer_weight,rank_counts,rank_counts_prob,rank_logreg_score
2,211,9,0.6639101832957769,0.6639101832957769,0.4013766050338745,0.0,-0.16915032,1.0,15.5,1.0
2,172,4,0.6639101832957769,0.6639101832957769,0.03786018490791321,0.0,-0.09952811,3.0,15.5,8.0
2,420,4,0.6639101832957769,0.6639101832957769,0.02722705341875553,0.0,-0.4423752,3.0,15.5,9.0
2,395,4,0.6639101832957769,0.6639101832957769,0.05226431414484978,0.0,-0.22097787,3.0,15.5,6.0
2,388,3,0.6639101832957769,0.6639101832957769,0.0062707820907235146,0.0,-0.038149506,5.5,15.5,16.0
2,82,3,0.6639101832957769,0.6639101832957769,0.13734538853168488,0.0,-0.29671356,5.5,15.5,3.0
2,93,2,0.6639101832957769,0.6639101832957769,0.009908813051879406,0.0,-0.029291844,9.5,15.5,14.0
2,181,2,0.6639101832957769,0.6639101832957769,0.018524378538131714,0.0,-0.07971368,9.5,15.5,10.0
2,278,2,0.6639101832957769,0.6639101832957769,0.038337696343660355,0.0,-0.095194176,9

In [None]:
from dask import dataframe as dd
from dask.diagnostics import ProgressBar


def choose_best_scores_from_group(group, col_rank, top=6, threshold=None):
    group = group.sort_values(col_rank, ascending=False)
    group = group.iloc[:top]
    if threshold is not None:
        mask = (group[col_rank] > threshold).ravel()
        mask[0] = True
        group = group[mask]
    return group['spec_id'].tolist()


def make_dummy_prediction(df_ranking, col_rank, top=3):
    ranking_dummy = partial(choose_best_scores_from_group, col_rank=col_rank, top=top, threshold=None)
    df_predict = df_ranking.set_index('vacancy_id')\
                           .pipe(dd.from_pandas, npartitions=100)\
                           .groupby('vacancy_id')\
                           .apply(ranking_dummy, meta=('list'))\
                           .compute(num_workers=10)
    df_predict = df_predict.rename('specializations').reset_index()
    return df_predict


def make_smart_prediction(df_ranking, col_rank, threshold, top=6):
    ranking_smart = partial(choose_best_scores_from_group, col_rank=col_rank, top=top, threshold=threshold)
    df_predict = df_ranking.set_index('vacancy_id')\
                           .pipe(dd.from_pandas, npartitions=100)\
                           .groupby('vacancy_id')\
                           .apply(ranking_smart, meta=('list'))\
                           .compute(num_workers=10)
    df_predict = df_predict.rename('specializations').reset_index()
    return df_predict

In [None]:
%%time

submission_id = 15
submission_file = os.path.join(WORKDIR, 'submission_{:03d}_check.csv.gz').format(submission_id)

with ProgressBar():
    df_submission = make_dummy_prediction(df_ranking_test, col='logreg_score')
    df_submission.to_csv(submission_file, index=True, compression='gzip')
    df_submission.head()

In [None]:
%%time

submission_id = 11
submission_file = os.path.join(WORKDIR, 'submission_{:03d}_check.csv.gz').format(submission_id)

with ProgressBar():
    df_submission = make_dummy_prediction(df_ranking_test, col='counts')
    df_submission.to_csv(submission_file, index=True, compression='gzip')
    df_submission.head()