# Gerador de Recomendações
---
Este notebook utiliza-se das informações contidas no notebook anterior para realizar as recomendações de perfis para os 85.000+ usuários.

Dado o volume de dados, é paralelizar o programa. Como mencionado anteriormente, o processamento CPU bound em Python só pode ser feito utilizando múltiplos processos. Por isso, este programa tenta minimizar o consumo de memória pois não tivemos tempo para explorar como o conceito de compartilhamento de memória entre processos poderia ser explorado


In [1]:
import os
import os.path
import numpy as np
import pandas as pd
import pynndescent
import duckdb
import pickle
import joblib
import scipy.sparse
import importlib
from datetime import datetime
from knn_indexer import KnnIndex

As variáveis de ambiente abaixo precisam ser configuradas antes da execução deste notebook. Vide o arquivo **setenv.ps1.example**

In [2]:
EXPERIMENTO             = os.environ['EXPERIMENTO']
DATASET                 = os.environ['DATASET']
REMOVED_ROLES           = os.environ['REMOVED_ROLES']
USER_ROLES              = os.environ['USER_ROLES']
ORGUNIT_ROLES           = os.environ['ORGUNIT_ROLES']
FUNCTION_ROLES          = os.environ['FUNCTION_ROLES']
KNN_INDEX               = os.environ['KNN_INDEX']
HASHED_FEATURE_COUNT    = int(os.environ['HASHED_FEATURE_COUNT'])
HASHED_FEATURES         = os.environ['HASHED_FEATURES']
HASHED_FEATURES_IDX     = os.environ['HASHED_FEATURES_IDX']
RESULT                  = os.environ['RESULT']

Parâmetros do Recomendador

In [3]:
N_JOBS          = 20 # nº de processos em paralelo. Deve ser escolhido com base no número de CPUs e memória disponível

Arquivos de Saída

In [4]:
NEIGHBORS_CSV           = f'./DATA/{EXPERIMENTO}/vizinhos.csv' # temporário
RECOMMENDATIONS_CSV     = f'./DATA/{EXPERIMENTO}/recomendacoes.csv' # # temporário
NEIGHBORS_PARQUET       = f'./DATA/{EXPERIMENTO}/vizinhos.parquet'
RECOMMENDATIONS_PARQUET = f'./DATA/{EXPERIMENTO}/recomendacoes.parquet'

Leitura de arquivos de Dados

In [5]:
dataset_df = pd.read_parquet(DATASET)
dataset_df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 91798 entries, 0 to 91797
Data columns (total 22 columns):
 #   Column                   Non-Null Count  Dtype 
---  ------                   --------------  ----- 
 0   chave_usuario            91798 non-null  object
 1   tipo_usuario             91798 non-null  object
 2   centro_custo             39913 non-null  object
 3   lotacao_topo             91798 non-null  object
 4   sigla_lotacao            91798 non-null  object
 5   nome_lotacao             91798 non-null  object
 6   cargo                    91798 non-null  object
 7   enfase                   91798 non-null  object
 8   funcao                   91798 non-null  object
 9   sindicato                39415 non-null  object
 10  area_rh                  91264 non-null  object
 11  imovel                   91264 non-null  object
 12  local_negocio            91261 non-null  object
 13  grupo_prestacao_servico  51118 non-null  object
 14  regime_trabalho          76638 non-nul

In [6]:
removed_roles_df = pd.read_parquet(REMOVED_ROLES)
removed_roles_df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 549 entries, 0 to 548
Data columns (total 2 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   chave_usuario  549 non-null    object
 1   role           549 non-null    object
dtypes: object(2)
memory usage: 8.7+ KB


In [7]:
user_roles_df = pd.read_parquet(USER_ROLES)
user_roles_df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 414021 entries, 0 to 414020
Data columns (total 2 columns):
 #   Column         Non-Null Count   Dtype 
---  ------         --------------   ----- 
 0   chave_usuario  414021 non-null  object
 1   role           414021 non-null  object
dtypes: object(2)
memory usage: 6.3+ MB


In [8]:
orgunit_roles_df = pd.read_parquet(ORGUNIT_ROLES)
orgunit_roles_df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 28185 entries, 0 to 28184
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   lotacao_topo   28185 non-null  object
 1   sigla_lotacao  28185 non-null  object
 2   role           28185 non-null  object
 3   atribuicoes    28185 non-null  int64 
dtypes: int64(1), object(3)
memory usage: 880.9+ KB


In [9]:
function_roles_df = pd.read_parquet(FUNCTION_ROLES)
function_roles_df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 14016 entries, 0 to 14015
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   tipo_usuario  14016 non-null  object
 1   cargo         14016 non-null  object
 2   enfase        14016 non-null  object
 3   funcao        14016 non-null  object
 4   role          14016 non-null  object
 5   atribuicoes   14016 non-null  int64 
dtypes: int64(1), object(5)
memory usage: 657.1+ KB


In [10]:
knn_index = KnnIndex.load(KNN_INDEX)
knn_index.get_neighbors(0)

(array([    0, 29301, 16115, 85550, 27715, 69840, 29939, 29008, 25392,
        19955, 40571, 61978,   958,  8853, 27035, 55804, 11492, 26039,
         5356, 63071, 61360,  5624, 62911, 10523, 27483,  5663, 20132,
        27484, 30088,  3673, 25557, 68230, 14074, 63010, 57121, 26533,
        26093, 14721, 62846, 28929, 26792, 35310, 70013,  1609, 34924,
        31611, 85531, 26134, 76791, 26430, 58558, 28586, 64330, 34070,
        35295,  5596, 87270,  9960, 30490, 35230, 34123, 28583,  8680,
        33236,  1021,  5862, 35024, 14868, 63679, 28390, 64022, 19596,
        34034, 68663,  1207, 62974, 18846, 34923,  1707, 25323, 35710,
        24243, 58823, 40446,  2222, 26534, 30285,  2241, 27355,  2235,
        26416, 13147, 26990, 85494, 58697, 40488, 63805, 18435, 73850,
        11969, 32633,  2229, 87377, 26193, 26927, 39901, 76662, 40579,
         2923, 70699,  3185, 16682, 18450,  2937, 32675, 26857, 90735,
        32641, 78922, 31347, 32624, 70475, 28617,  2223, 26635, 16132,
      

In [11]:
hashed_features_idx = pd.read_parquet(HASHED_FEATURES_IDX)
hashed_features_idx.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 91798 entries, 0 to 91797
Data columns (total 8 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   index          91798 non-null  int64 
 1   chave_usuario  91798 non-null  object
 2   lotacao_topo   91798 non-null  object
 3   sigla_lotacao  91798 non-null  object
 4   tipo_usuario   91798 non-null  object
 5   cargo          91798 non-null  object
 6   enfase         91798 non-null  object
 7   funcao         91798 non-null  object
dtypes: int64(1), object(7)
memory usage: 5.6+ MB


In [12]:
conn = duckdb.connect(":memory:")

Os dados necessários para cômputo das atribuições categóricas são pré-processados e escritos como arquivos parquet para otimizar o tempo de execução do recomendador em lote.

Os dados precisam ser consultados por cada um dos N_JOBS processos e é mais rápido ler os dados do disco uma vez para cada processo do que serializar os data frames para cada recomendação.

Os dados que não são mais necessários são posteriormente excluídos

In [13]:
# to remove correlated subqueries and reduce total processing time
atribuido_tipo_usuario_df = conn.execute("SELECT DISTINCT tipo_usuario, role  FROM function_roles_df ORDER BY 1,2").fetchdf()
atribuido_topo_df         = conn.execute("SELECT DISTINCT lotacao_topo, role  FROM orgunit_roles_df  ORDER BY 1,2").fetchdf()
atribuido_lotacao_df      = conn.execute("SELECT DISTINCT sigla_lotacao, role FROM orgunit_roles_df  ORDER BY 1,2").fetchdf()
atribuido_cargo_df        = conn.execute("SELECT DISTINCT cargo, role         FROM function_roles_df ORDER BY 1,2").fetchdf()
atribuido_enfase_df       = conn.execute("SELECT DISTINCT enfase, role        FROM function_roles_df ORDER BY 1,2").fetchdf()
atribuido_funcao_df       = conn.execute("SELECT DISTINCT funcao, role        FROM function_roles_df ORDER BY 1,2").fetchdf()

# write files dataframes to work dir
dataset_df.to_parquet("./WORK/dataset.parquet")
user_roles_df.to_parquet("./WORK/user_roles.parquet")
orgunit_roles_df.to_parquet("./WORK/orgunit_roles.parquet")
function_roles_df.to_parquet("./WORK/function_roles.parquet")
hashed_features_idx.to_parquet("./WORK/hashed_features_idx.parquet")
atribuido_tipo_usuario_df.to_parquet("./WORK/atribuido_tipo_usuario.parquet")
atribuido_topo_df.to_parquet("./WORK/atribuido_topo.parquet")
atribuido_lotacao_df.to_parquet("./WORK/atribuido_lotacao.parquet")
atribuido_cargo_df.to_parquet("./WORK/atribuido_cargo.parquet")
atribuido_enfase_df.to_parquet("./WORK/atribuido_enfase.parquet")
atribuido_funcao_df.to_parquet("./WORK/atribuido_funcao.parquet")

# save memory
del user_roles_df
del orgunit_roles_df
del function_roles_df
del hashed_features_idx
del atribuido_tipo_usuario_df
del atribuido_topo_df
del atribuido_lotacao_df
del atribuido_cargo_df
del atribuido_enfase_df
del atribuido_funcao_df

Como o recomendador irá utilizar múltiplos processos, é preciso que o mesmo seja definido em um módulo diferente do módulo \_\_main\_\_ para que a serialização funcione conforme esperado.

O arquivo abaixo contém a lógica de recomendação de perfis contida no notebook anterior adaptada para execução em um cenário contendo múltiplos arquivos.

Reiteramos aqui o quão frustrante pode ser o processo de tornar um protótipo em Python em um produto que possa atender as necessidades do negócio. A linguagem Python é ótima para prototipação mas para volumes de dados mais elevados, o GIL (Global Interpreter Lock) afeta negativamente a DX (developer experience)

In [14]:
%%writefile recommender.py
import os
import pandas as pd
import duckdb
import pickle
import scipy
import numpy as np
import threading
from knn_indexer import KnnIndex

HASHED_FEATURE_COUNT    = int(os.environ['HASHED_FEATURE_COUNT'])
KNN_INDEX               = os.environ['KNN_INDEX']

class RoleRecommenderState:
    
    # classe utilizada para guardar o estado dos dados do recomendador.
    # A conexão do DuckDB não é serializável e portanto é armazenada em uma variável thread local
    
    def __init__(self):
        self.tld                 = threading.local()
        self.knn_index           = KnnIndex.load(KNN_INDEX)
        self.hashed_features_df  = pd.read_parquet("./WORK/hashed_features_idx.parquet")
        self.on_start_thread()
        
    def on_start_thread(self):
        self.tld.conn                = duckdb.connect(":memory:")
        self.conn.execute("SET enable_progress_bar=false")
        
    @property
    def conn(self):
        return self.tld.conn
                    
class RoleRecommender:
    
    STATE = None
    
    SQL_NEIGHBORS = """
        SELECT  DISTINCT 
                z.chave_usuario as usuario_busca
        ,       a.distance
        ,       b.chave_usuario
        ,       b.sigla_lotacao
        ,       b.cargo
        ,       b.enfase
        ,       b.funcao
        ,       c.role
        ,       CASE WHEN e.role IS NULL THEN 0 ELSE 1 END                   AS atribuido_tipo_usuario
        ,       CASE WHEN f.role IS NULL THEN 0 ELSE 1 END                   AS atribuido_topo        
        ,       CASE WHEN g.role IS NULL THEN 0 ELSE 1 END                   AS atribuido_lotacao
        ,       CASE WHEN h.role IS NULL THEN 0 ELSE 1 END                   AS atribuido_cargo        
        ,       CASE WHEN i.role IS NULL THEN 0 ELSE 1 END                   AS atribuido_enfase
        ,       CASE WHEN j.role IS NULL THEN 0 ELSE 1 END                   AS atribuido_funcao
        FROM    search_df z
                --
                CROSS JOIN _neighbors_df a
                --
                INNER JOIN "./WORK/hashed_features_idx.parquet" b
                ON a.index              = b.index
                --
                INNER JOIN "./WORK/user_roles.parquet" c
                ON b.chave_usuario      = c.chave_usuario
                --
                LEFT OUTER JOIN "./WORK/user_roles.parquet" d
                ON  z.chave_usuario     = d.chave_usuario
                AND c.role              = d.role
                --
                LEFT OUTER JOIN "./WORK/atribuido_tipo_usuario.parquet" e
                ON  z.tipo_usuario      = e.tipo_usuario 
                AND c.role              = e.role
                --
                LEFT OUTER JOIN "./WORK/atribuido_topo.parquet" f
                ON  z.lotacao_topo      = f.lotacao_topo 
                AND c.role              = f.role
                --
                LEFT OUTER JOIN "./WORK/atribuido_lotacao.parquet" g
                ON  z.sigla_lotacao     = g.sigla_lotacao 
                AND c.role              = g.role
                --
                LEFT OUTER JOIN "./WORK/atribuido_cargo.parquet" h
                ON  z.cargo             = h.cargo 
                AND c.role              = h.role
                --
                LEFT OUTER JOIN "./WORK/atribuido_enfase.parquet" i
                ON  z.enfase            = i.enfase 
                AND c.role              = i.role
                --
                LEFT OUTER JOIN "./WORK/atribuido_funcao.parquet" j
                ON  z.funcao            = j.funcao 
                AND c.role              = j.role
                --
        WHERE   b.chave_usuario         <> z.chave_usuario
        AND     z.chave_usuario         IS NOT NULL
        AND     d.role                  IS NULL
        AND     c.role                  NOT IN ('Z:BC_USO_GERAL')
        AND     f.role                  IS NOT NULL -- atribuido_topo        
        AND     e.role                  IS NOT NULL -- atribuido_tipo_usuario
        ORDER   BY a.distance    
    """
    
    SQL_RECOMMENDATIONS = """
        WITH cte_roles AS (
            SELECT  a.usuario_busca
            ,       a.role
            ,       MAX(atribuido_tipo_usuario)     AS atribuido_tipo_usuario
            ,       MAX(atribuido_topo)             AS atribuido_topo
            ,       MAX(atribuido_lotacao)          AS atribuido_lotacao
            ,       MAX(atribuido_cargo)            AS atribuido_cargo
            ,       MAX(atribuido_enfase)           AS atribuido_enfase
            ,       MAX(atribuido_funcao)           AS atribuido_funcao
            ,       MAX(atribuido_tipo_usuario)
            +       MAX(atribuido_topo)
            +       MAX(atribuido_lotacao)
            +       MAX(atribuido_cargo)
            +       MAX(atribuido_enfase)
            +       MAX(atribuido_funcao)           AS atribuicoes_categoricas
            ,       MIN(distance)                   AS min_distance
            ,       AVG(distance)                   AS avg_distance
            ,       (1.0 
                    -   (
                            (
                                MIN(distance) / MAX(distance) 
                            +   AVG(distance) / MAX(distance)
                            ) 
                        *   0.5 
                        )
                    )                               AS distance_factor
            ,       COUNT()                         AS qtd_atribuicoes
            FROM    neighbors_df a
            GROUP   BY a.usuario_busca
            ,       a.role
        )
        , cte_role_scores AS (
            SELECT  a.role
            ,      ( 
                       a.atribuicoes_categoricas 
                   *   a.qtd_atribuicoes 
                   *   a.distance_factor 
                   )
                   /    
                   ( 
                       (SELECT MAX(atribuicoes_categoricas) FROM cte_roles) 
                   *   (SELECT MAX(qtd_atribuicoes)         FROM cte_roles)
                   *   (SELECT MAX(distance_factor)         FROM cte_roles)
                   ) AS score  
            FROM    cte_roles a
        )
        SELECT  a.usuario_busca                AS chave_usuario
        ,       a.role
        ,       a.atribuicoes_categoricas
        ,       a.min_distance
        ,       a.avg_distance
        ,       a.distance_factor
        ,       a.qtd_atribuicoes
        ,       b.score
        ,       a.atribuido_tipo_usuario
        ,       a.atribuido_topo
        ,       a.atribuido_lotacao
        ,       a.atribuido_cargo
        ,       a.atribuido_enfase
        ,       a.atribuido_funcao
        FROM    cte_roles a
                --
                INNER JOIN cte_role_scores b
                ON  a.role                      = b.role
                --
        WHERE   b.score                         > 0.0
        ORDER   BY a.usuario_busca
        ,       b.score DESC
"""
    
    def __init__(self):
        pass
    
    @property
    def state(self):
        return RoleRecommender.STATE
    
    def _get_neighbours(self, search_idx):
        indices, distances = self.state.knn_index.get_neighbors(search_idx)
        _neighbors_df = pd.DataFrame({"index": indices, "distance": distances})
        self.state.conn.register("_neighbors_df", _neighbors_df)
        neighbors_df = self.state.conn.execute(self.SQL_NEIGHBORS).fetchdf()
        self.state.conn.unregister("_neighbors_df")
        return neighbors_df
    
    @classmethod
    def on_process_start(klass):
        klass.STATE = RoleRecommenderState()
        
    def recommend_roles(self, search_user):        
        search_df          = self.state.hashed_features_df[self.state.hashed_features_df["chave_usuario"] == search_user].copy()
        self.state.conn.register('search_df', search_df)
        
        search_index  = search_df.iloc[0]["index"]
        neighbors_df  = self._get_neighbours(search_index)
        self.state.conn.register("neighbors_df", neighbors_df)
        
        recommendations_df = self.state.conn.execute(self.SQL_RECOMMENDATIONS).fetchdf()
        self.state.conn.unregister("search_df")
        self.state.conn.unregister("neighbors_df")
        return search_df, neighbors_df, recommendations_df
        
        
def recommend_roles(search_user):
    if RoleRecommender.STATE is None:
        RoleRecommender.on_process_start()
    return RoleRecommender().recommend_roles(search_user)


Overwriting recommender.py


O código abaixo serve para recarregar o módulo em que a função **recommend_roles** é criada.

Por padrão um módulo não é reimportado caso uma instrução import seja repetida

In [15]:
import importlib
import recommender
importlib.reload(recommender)
from recommender import RoleRecommender, recommend_roles

Segue abaixo uma chamada da função de recomendação de perfis para ilustrar suas saídas

In [16]:
search_df, neighbors_df, recommendations_df = recommend_roles("UR40")
display(search_df)
display(neighbors_df)
display(recommendations_df)
del search_df
del neighbors_df
del recommendations_df

Unnamed: 0,index,chave_usuario,lotacao_topo,sigla_lotacao,tipo_usuario,cargo,enfase,funcao
83535,83535,UR40,TIC,TIC/CORP/DSCESI/DS-PDDS,EMPREGADO,PROF. PETROBRAS DE NIVEL SUPERIOR SENIOR,PCR NS ANALISE DE SISTEMAS ENG SOFTWARE,GERENTE SETORIAL


Unnamed: 0,usuario_busca,distance,chave_usuario,sigla_lotacao,cargo,enfase,funcao,role,atribuido_tipo_usuario,atribuido_topo,atribuido_lotacao,atribuido_cargo,atribuido_enfase,atribuido_funcao
0,UR40,24.964739,CZL1,TIC/CORP/FRIE/PN-CTRI,PROF. PETROBRAS DE NIVEL SUPERIOR MASTER,PCR NS ANALISE DE SISTEMAS ENG SOFTWARE,GERENTE SETORIAL,Z:BC_CHARM_GMUD,1,1,0,1,1,0
1,UR40,27.145971,UQ4P,TIC/CORP/DSCESI/DS-FRJD,PROF. PETROBRAS DE NIVEL SUPERIOR MASTER,PCR NS ANALISE DE SISTEMAS ENG SOFTWARE,GERENTE SETORIAL,Z:FI_AA_PB001_EXE_CON_REL_IMOB,1,1,1,1,1,1
2,UR40,28.400728,CLXN,TIC/CORP/DSCESI/DS-SIG,PROF. PETROBRAS DE NIVEL SUPERIOR MASTER,PCR NS ANALISE DE SISTEMAS INFRAESTRUTURA,GERENTE SETORIAL,Z:EH_AM_PB001_AUD_COMP_VIS,1,1,0,1,1,1
3,UR40,28.400728,CLXN,TIC/CORP/DSCESI/DS-SIG,PROF. PETROBRAS DE NIVEL SUPERIOR MASTER,PCR NS ANALISE DE SISTEMAS INFRAESTRUTURA,GERENTE SETORIAL,Z:EH_AM_PB001_AUD_COMP,1,1,0,1,1,1
4,UR40,31.175816,JAM9,JURIDICO/GG-ACOC/JCORP/CRHC,PROF. PETROBRAS DE NIVEL SUPERIOR SENIOR,PCR NS ADVOCACIA,GERENTE SETORIAL,Z:MM_PBAUT_FISCAL_CONTRATO,1,1,0,1,1,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
60,UR40,36.713062,UQ7E,TIC/PE/PNP-II,PROF. PETROBRAS DE NIVEL SUPERIOR SENIOR,PCR NS ADMINISTRACAO,COORDENADOR(A),Z:FI_AA_PB001_EXE_CON_REL_IMOB,1,1,1,1,1,1
61,UR40,36.734558,CQH3,SRGE/ERGE/SEAI/IAI,PROF. PETROBRAS DE NIVEL SUPERIOR MASTER,PCR NS ENGENHARIA DE EQUIPAMENTOS ELETRONICA,GERENTE SETORIAL,Z:FI_AA_PB001_EXE_CON_REL_IMOB,1,1,1,1,1,1
62,UR40,36.911388,AKZU,AGUP/RES-EE/OPR-II/ATP-BBG-SRR,PROF. PETROBRAS DE NIVEL SUPERIOR SENIOR,PCR NS ENGENHARIA DE PETROLEO,GERENTE SETORIAL,Z:FI_AA_PB001_EXE_CON_REL_IMOB,1,1,1,1,1,1
63,UR40,36.919613,UQ5E,TIC/RGNCL/RGN/DES-DS-GEREI,PROF. PETROBRAS DE NIVEL SUPERIOR MASTER,PCR NS ANALISE DE SISTEMAS PROC NEGOCIOS,GERENTE SETORIAL,Z:FI_AA_PB001_EXE_CON_REL_IMOB,1,1,1,1,1,1


Unnamed: 0,chave_usuario,role,atribuicoes_categoricas,min_distance,avg_distance,distance_factor,qtd_atribuicoes,score,atribuido_tipo_usuario,atribuido_topo,atribuido_lotacao,atribuido_cargo,atribuido_enfase,atribuido_funcao
0,UR40,Z:FI_AA_PB001_EXE_CON_REL_IMOB,6,27.145971,35.079859,0.15728,27,0.673578,1,1,1,1,1,1
1,UR40,Z:EH_AM_PB001_AUD_COMP_VIS,5,28.400728,32.456486,0.159546,5,0.105445,1,1,0,1,1,1
2,UR40,Z:EH_AM_PB001_AUD_COMP,5,28.400728,32.456486,0.159546,5,0.105445,1,1,0,1,1,1
3,UR40,Z:MM_PB001_LIBERADOR_NL_PGTO,5,32.943951,35.020508,0.066079,11,0.096078,1,1,0,1,1,1
4,UR40,Z:MM_PBAUT_FISCAL_CONTRATO,5,31.175816,33.956872,0.085416,7,0.079033,1,1,0,1,1,1
5,UR40,Z:BC_CHARM_GMUD,4,24.964739,31.726342,0.233498,3,0.074074,1,1,0,1,1,0


O trecho de código abaixo serve para estimar o tempo de processamento de todas as recomendações em uma única thread ou processo, desconsiderando o tempo necessário para escrita dos dados

In [17]:
begin = datetime.now()
for i in range(50):
    search_df, neighbors_df, recommendations_df = recommend_roles("U4UL")
end = datetime.now()

delta                  = end - begin
time_per_user          = delta.total_seconds() / 50
total_users            = len(dataset_df["chave_usuario"])
expected_total_time_s  = time_per_user * total_users
expected_total_time_m  = expected_total_time_s / 60
expected_total_time_h  = expected_total_time_m/ 60
print(f"""
delta                  = {delta}
time/user              = {time_per_user}
total users            = {total_users}
expected total time(s) = {expected_total_time_s} seconds
expected total_time(m) = {expected_total_time_m} minutes
expected total_time(h) = {expected_total_time_h} hours

time to write csv is not accounted
""")


delta                  = 0:00:11.971770
time/user              = 0.2394354
total users            = 91798
expected total time(s) = 21979.6908492 seconds
expected total_time(m) = 366.32818082 minutes
expected total_time(h) = 6.105469680333333 hours

time to write csv is not accounted



Os resultados intermediários do recomendador são escritos em modo append em dois arquivos CSV. A cada processamento este arquivos precisam ser sempre excluídos. Ao término, estes arquivos são convertidos e salvos no formato parquet.


O recomendador monta uma lista de chaves de usuário e as processa usando um ProcessPoolExecutor para paralelizar as recomendações. As recomendações são processadas usando a função **recommend_roles** criada acima.

Como não existe necessidade de ordenação dos resultados das recomendações, os objetos contendo os resultado são processados assim que terminam por meio da função **as_completed** do módulo concurrent.futures.

Apesar de pequeno, o trecho de código abaixo apresentou diversos vazamentos de memória e foi necessário cuidado e atenção para que as recomendações rodassem em um tempo satisfatório.

In [18]:
from concurrent.futures import ProcessPoolExecutor, as_completed

if os.path.exists(NEIGHBORS_CSV):
    print(f"removing existing neighbors file")
    os.unlink(NEIGHBORS_CSV)

if os.path.exists(RECOMMENDATIONS_CSV):
    print(f"removing existing recommendations file")
    os.unlink(RECOMMENDATIONS_CSV)

search_users           = list([u for u in removed_roles_df["chave_usuario"].tolist() if u])
neighbors_acc_df       = None
recommendations_acc_df = None
write_headers          = True

# create the futures
with ProcessPoolExecutor(
    max_workers         = N_JOBS
,   initializer         = RoleRecommender.on_process_start
) as executor:
    futures = (executor.submit(recommend_roles, search_user) for search_user in search_users)
    for i, future in enumerate(as_completed(futures)):
        if i % 50 == 0:
            print(F" -> {datetime.now()} :: {i}/{len(search_users)}")        
            if i > 0:
                neighbors_acc_df.to_csv(NEIGHBORS_CSV,             mode='a', header=write_headers, index=False)
                recommendations_acc_df.to_csv(RECOMMENDATIONS_CSV, mode='a', header=write_headers, index=False)
                write_headers = False
                # accumulate results
                neighbors_acc_df.drop(neighbors_acc_df.index, inplace=True)
                recommendations_acc_df.drop(recommendations_acc_df.index, inplace=True)

        _, neighbors_df, recommendations_df = future.result()

        if i > 0:
            neighbors_acc_df       = pd.concat([neighbors_acc_df, neighbors_df])
            recommendations_acc_df = pd.concat([recommendations_acc_df, recommendations_df])        
        else:
            neighbors_acc_df       = neighbors_df
            recommendations_acc_df = recommendations_df
    
    del future
    del _
    del neighbors_df
    del recommendations_df
    
print(F" -> {datetime.now()} :: {len(search_users)}/{len(search_users)}")        
neighbors_acc_df.to_csv(NEIGHBORS_CSV,             mode='a', header=write_headers, index=False)
recommendations_acc_df.to_csv(RECOMMENDATIONS_CSV, mode='a', header=write_headers, index=False)


removing existing neighbors file
removing existing recommendations file
 -> 2023-08-27 20:26:46.082690 :: 0/549
 -> 2023-08-27 20:26:48.605196 :: 50/549
 -> 2023-08-27 20:26:50.615346 :: 100/549
 -> 2023-08-27 20:26:52.813950 :: 150/549
 -> 2023-08-27 20:26:54.986921 :: 200/549
 -> 2023-08-27 20:26:56.955478 :: 250/549
 -> 2023-08-27 20:26:58.965758 :: 300/549
 -> 2023-08-27 20:27:00.918902 :: 350/549
 -> 2023-08-27 20:27:02.986338 :: 400/549
 -> 2023-08-27 20:27:04.921713 :: 450/549
 -> 2023-08-27 20:27:06.823562 :: 500/549
 -> 2023-08-27 20:27:09.804868 :: 549/549


Os CSV's contendo os resultados são posteriormente lidos e convertidos para parquet, representando os artefatos finais entregues para a área de perfis da gerência de gestão de identidades e acessos da área de Segurança da Informação.

In [19]:
recommendations_acc_df.info(verbose=True, show_counts=True)

<class 'pandas.core.frame.DataFrame'>
Index: 602 entries, 0 to 22
Data columns (total 14 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   chave_usuario            602 non-null    object 
 1   role                     602 non-null    object 
 2   atribuicoes_categoricas  602 non-null    int32  
 3   min_distance             602 non-null    float32
 4   avg_distance             602 non-null    float64
 5   distance_factor          602 non-null    float64
 6   qtd_atribuicoes          602 non-null    int64  
 7   score                    602 non-null    float64
 8   atribuido_tipo_usuario   602 non-null    int32  
 9   atribuido_topo           602 non-null    int32  
 10  atribuido_lotacao        602 non-null    int32  
 11  atribuido_cargo          602 non-null    int32  
 12  atribuido_enfase         602 non-null    int32  
 13  atribuido_funcao         602 non-null    int32  
dtypes: float32(1), float64(3), int32

In [20]:
neighbors_df = pd.read_csv(NEIGHBORS_CSV)
neighbors_df.info(verbose=True, show_counts=True)
neighbors_df.to_parquet(NEIGHBORS_PARQUET)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 112637 entries, 0 to 112636
Data columns (total 14 columns):
 #   Column                  Non-Null Count   Dtype  
---  ------                  --------------   -----  
 0   usuario_busca           112637 non-null  object 
 1   distance                112637 non-null  float64
 2   chave_usuario           112637 non-null  object 
 3   sigla_lotacao           112598 non-null  object 
 4   cargo                   57783 non-null   object 
 5   enfase                  56261 non-null   object 
 6   funcao                  11444 non-null   object 
 7   role                    112637 non-null  object 
 8   atribuido_tipo_usuario  112637 non-null  int64  
 9   atribuido_topo          112637 non-null  int64  
 10  atribuido_lotacao       112637 non-null  int64  
 11  atribuido_cargo         112637 non-null  int64  
 12  atribuido_enfase        112637 non-null  int64  
 13  atribuido_funcao        112637 non-null  int64  
dtypes: float64(1), int64

In [21]:
recommendations_df = pd.read_csv(RECOMMENDATIONS_CSV)
recommendations_df.info(verbose=True, show_counts=True)
recommendations_df.to_parquet(RECOMMENDATIONS_PARQUET)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6813 entries, 0 to 6812
Data columns (total 14 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   chave_usuario            6813 non-null   object 
 1   role                     6813 non-null   object 
 2   atribuicoes_categoricas  6813 non-null   int64  
 3   min_distance             6813 non-null   float64
 4   avg_distance             6813 non-null   float64
 5   distance_factor          6813 non-null   float64
 6   qtd_atribuicoes          6813 non-null   int64  
 7   score                    6813 non-null   float64
 8   atribuido_tipo_usuario   6813 non-null   int64  
 9   atribuido_topo           6813 non-null   int64  
 10  atribuido_lotacao        6813 non-null   int64  
 11  atribuido_cargo          6813 non-null   int64  
 12  atribuido_enfase         6813 non-null   int64  
 13  atribuido_funcao         6813 non-null   int64  
dtypes: float64(4), int64(8),

In [25]:
sql = """
WITH cte_ranked_recommendations AS (
    SELECT  chave_usuario
    ,       role
    ,       score
    ,       row_number() over(PARTITION BY chave_usuario ORDER BY score DESC) as rank 
    FROM    recommendations_df
    ORDER   BY chave_usuario
    ,       score DESC
), 
cte_removed_found AS (
    SELECT  a.chave_usuario
    ,       b.role
    ,       b.score
    ,       b.rank
    ,       CASE WHEN b.role IS NOT NULL THEN 1 ELSE 0 END AS role_found
    ,       CASE WHEN rank  = 1          THEN 1 ELSE 0 END AS in_top_01
    ,       CASE WHEN rank <= 3          THEN 1 ELSE 0 END AS in_top_03    
    ,       CASE WHEN rank <= 5          THEN 1 ELSE 0 END AS in_top_05    
    ,       CASE WHEN rank <= 10         THEN 1 ELSE 0 END AS in_top_10
    FROM    removed_roles_df a
            --
            LEFT OUTER JOIN cte_ranked_recommendations b
            ON  a.chave_usuario        = b.chave_usuario
            AND a.role                 = b.role
            --
    ORDER   BY b.chave_usuario
    ,       b.score DESC
)
SELECT  COUNT(*)                                             AS roles_removed
--,       SUM(a.role_found)                                  AS roles_found
--,       SUM(a.role_found * a.in_top_01)                    AS roles_found_in_top_01
--,       SUM(a.role_found * a.in_top_03)                    AS roles_found_in_top_03
--,       SUM(a.role_found * a.in_top_05)                    AS roles_found_in_top_05
--,       SUM(a.role_found * a.in_top_10)                    AS roles_found_in_top_10
,       100.0 * SUM(a.role_found)               / COUNT(*)   AS found_pct
,       100.0 * SUM(a.role_found * a.in_top_01) / COUNT(*)   AS found_in_top_01_pct
,       100.0 * SUM(a.role_found * a.in_top_03) / COUNT(*)   AS found_in_top_03_pct
,       100.0 * SUM(a.role_found * a.in_top_05) / COUNT(*)   AS found_in_top_05_pct
,       100.0 * SUM(a.role_found * a.in_top_10) / COUNT(*)   AS found_in_top_10_pct
FROM    cte_removed_found a
"""
result_df = conn.execute(sql).fetchdf()
result_df.to_parquet(RESULT)
result_df.transpose()

Unnamed: 0,0
roles_removed,549.0
roles_found,451.0
roles_found_in_top_01,380.0
roles_found_in_top_03,425.0
roles_found_in_top_05,425.0
roles_found_in_top_10,425.0
roles_found_pct,82.149362
roles_found_in_top_pct,69.216758
roles_found_in_top_03_pct,77.413479
roles_found_in_top_05_pct,77.413479
