In [1]:
import psycopg2
import json

In [2]:
request = {
    "entities": {
        "capabilities": [
            "Apache Hadoop",
            "Spark",
            "AWS",
            "arquitecturas de datos escalables",
            "análisis de datos en tiempo real",
            "automatización de procesos",
            "Docker",
            "Kubernetes",
            "Java"
        ],
        "role": "Ingeniero de datos"
    },
    "input_token": 90,
    "output_token": 42,
    "tokens": 132
}

In [3]:
import os
import psycopg2
from psycopg2 import DatabaseError
from dotenv import load_dotenv

#cargar las variables de entorno
load_dotenv()

#Crear conexion con PostgreSQL
def get_db_connection():
    try:
        connection = psycopg2.connect(
            host=os.getenv('PGSQL_HOST'),
            user=os.getenv('PGSQL_USER'),
            password=os.getenv('PGSQL_PASSWORD'),
            dbname=os.getenv('PGSQL_DATABASE'),
            port=os.getenv('PGSQL_PORT')
        )
        return connection
    except DatabaseError as ex:
        raise ex

In [4]:
from psycopg2.extras import RealDictCursor

def buscar_candidatos_postgre(rol, capabilities):
    # Formatear la búsqueda para usar en to_tsquery
    
    busqueda_capabilities = ' | '.join(
            [f"'{capability.strip()}'" if ' ' in capability else capability.strip() for capability in capabilities]
        )

    if busqueda_capabilities.startswith(" | "):
        busqueda_capabilities = busqueda_capabilities[3:]
    if busqueda_capabilities.endswith(" | "):
        busqueda_capabilities = busqueda_capabilities[:-3]
        

    if isinstance(rol, str):
        busqueda_rol = rol.strip()  # Eliminar espacios innecesarios
        busqueda_rol = ' & '.join(busqueda_rol.split())  # Reemplazar espacios por " & " para to_tsquery
    else:
        raise ValueError("El parámetro 'rol' debe ser una cadena.")

    print(busqueda_capabilities, busqueda_rol)
    
    consulta = """
    SET enable_nestloop = off; 
    
    EXPLAIN ANALYZE
WITH filtered_skills AS (
    SELECT id, description
    FROM recruitment.skills
    WHERE description IN ('Apache Hadoop', 'Spark', 'AWS', 'Docker', 'Kubernetes', 'Java', 'Python', 'SQL')
),
filtered_job_profile AS (
    SELECT jobs.id AS job_id, jobs.name AS job_name, level_of_exp.name AS level_name, jobs.profile_type_id AS profile_type_id, profiles_type.description AS profile_type_description
    FROM recruitment.job AS jobs
    INNER JOIN recruitment.levelofexp AS level_of_exp 
        ON level_of_exp.id = jobs.levelofexperience_id
    INNER JOIN recruitment.profiletype AS profiles_type
        ON jobs.profile_type_id = profiles_type.id
    WHERE to_tsvector('spanish', jobs.name || ' ' || profiles_type.description || ' ' || level_of_exp.name) 
        @@ to_tsquery('spanish', 'Ingeniero & de & datos')
)
SELECT 
    a_p.id AS applicant_profile_id,
    u.name AS user_name, 
    job.job_name,
    job.level_name AS level_of_exp, 
    job.profile_type_description AS profile_type,
    ARRAY_AGG(skills.description) AS skills,
	workexperience.description 
FROM 
    recruitment.user AS u
INNER JOIN 
    recruitment.applicant_profile AS a_p ON u.user_id = a_p.user_id
INNER JOIN 
    recruitment.applicantprofile_skill AS a_p_skill ON a_p_skill.applicant_profile_id = a_p.id
INNER JOIN 
    filtered_skills AS skills ON skills.id = a_p_skill.skill_id
INNER JOIN 
    recruitment.application AS application ON u.user_id = application.user_id
INNER JOIN 
    filtered_job_profile AS job ON job.job_id = application.job_id
INNER JOIN 
	recruitment.workexperience AS workexperience on workexperience.applicantprofile_id = a_p.id
GROUP BY 
    a_p.id, u.name, job.job_name, job.job_id, job.level_name, job.profile_type_description, workexperience.description; 

    """
    resultados = []
    try:
        conn = get_db_connection()  # Obtén la conexión a la base de datos
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
            # Ejecutar la consulta con el término de búsqueda seguro
            cur.execute(consulta, (busqueda_capabilities,busqueda_rol))
            resultados = cur.fetchall()

        conn.close()
    except Exception as e:
        print(f"Error al consultar PostgreSQL: {e}")
    
    # Convertir los resultados en una lista de diccionarios
    # return json.dumps([dict(row) for row in resultados], default=str)
    return [dict(row) for row in resultados]

In [5]:
data = request
role = data["entities"]["role"]
capabilities = data["entities"]["capabilities"]

# Llamar a la función de búsqueda con el término combinado
aplicantes_filtrados = buscar_candidatos_postgre(role, capabilities)


'Apache Hadoop' | Spark | AWS | 'arquitecturas de datos escalables' | 'análisis de datos en tiempo real' | 'automatización de procesos' | Docker | Kubernetes | Java Ingeniero & de & datos


In [6]:
aplicantes_filtrados

[{'QUERY PLAN': 'GroupAggregate  (cost=29561.12..29869.94 rows=2145 width=200) (actual time=633.028..670.139 rows=35605 loops=1)'},
 {'QUERY PLAN': '  Group Key: a_p.id, u.name, jobs.id, level_of_exp.name, profiles_type.description, workexperience.description'},
 {'QUERY PLAN': '  ->  Gather Merge  (cost=29561.12..29805.59 rows=2145 width=178) (actual time=633.015..651.407 rows=45215 loops=1)'},
 {'QUERY PLAN': '        Workers Planned: 1'},
 {'QUERY PLAN': '        Workers Launched: 1'},
 {'QUERY PLAN': '        ->  Sort  (cost=28561.11..28564.26 rows=1262 width=178) (actual time=626.875..628.774 rows=22608 loops=2)'},
 {'QUERY PLAN': '              Sort Key: a_p.id, u.name, jobs.id, level_of_exp.name, profiles_type.description, workexperience.description'},
 {'QUERY PLAN': '              Sort Method: external merge  Disk: 4096kB'},
 {'QUERY PLAN': '              Worker 0:  Sort Method: external merge  Disk: 4136kB'},
 {'QUERY PLAN': '              ->  Parallel Hash Join  (cost=25177.

In [7]:
import pandas as pd

data = []
for item in aplicantes_filtrados:
    line = item['QUERY PLAN']
    if 'cost=' in line and 'actual time=' in line:
        operation = line.split('  ')[-1].strip()  # Operación
        cost = line.split('cost=')[1].split(' ')[0].split('..')
        actual_time = line.split('actual time=')[1].split(' ')[0].split('..')
        rows = line.split('rows=')[1].split(' ')[0] if 'rows=' in line else None
        data.append({
            'Operación': operation,
            'Costo Inicial': float(cost[0]),
            'Costo Final': float(cost[1]),
            'Tiempo Inicial (ms)': float(actual_time[0]),
            'Tiempo Final (ms)': float(actual_time[1]),
            'Filas Procesadas': int(rows) if rows else None
        })

# Crear un DataFrame
df = pd.DataFrame(data)

df

Unnamed: 0,Operación,Costo Inicial,Costo Final,Tiempo Inicial (ms),Tiempo Final (ms),Filas Procesadas
0,(cost=29561.12..29869.94 rows=2145 width=200) ...,29561.12,29869.94,633.028,670.139,2145
1,(cost=29561.12..29805.59 rows=2145 width=178) ...,29561.12,29805.59,633.015,651.407,2145
2,(cost=28561.11..28564.26 rows=1262 width=178) ...,28561.11,28564.26,626.875,628.774,1262
3,(cost=25177.76..28496.11 rows=1262 width=178) ...,25177.76,28496.11,575.553,605.282,1262
4,(cost=0.00..3065.54 rows=65454 width=97) (actu...,0.0,3065.54,0.003,16.799,65454
5,(cost=25161.98..25161.98 rows=1262 width=89) (...,25161.98,25161.98,575.218,575.231,1262
6,(cost=22810.74..25161.98 rows=1262 width=89) (...,22810.74,25161.98,549.241,567.264,1262
7,(cost=0.00..2100.54 rows=65454 width=51) (actu...,0.0,2100.54,0.003,5.408,65454
8,(cost=22799.56..22799.56 rows=894 width=149) (...,22799.56,22799.56,549.158,549.17,894
9,(cost=13954.78..22799.56 rows=894 width=149) (...,13954.78,22799.56,460.509,537.574,894
