### **ETL Process with PostgreSQL and Python**  
##### **Written by: Liseth Esmeralda Erazo Varela**  

In this section, we will set up an **ETL (Extract, Transform, Load) pipeline** using **PostgreSQL** and **Python**.  
The goal is to efficiently load raw data, perform necessary **data cleaning** and **transformations**, and store the processed data in a structured format for analysis.  

##### **Steps in this process:**  
- Establish a **connection to PostgreSQL** from Python.  
- Load **raw data** into a dedicated staging table.  
- Perform **data cleaning and transformations** to ensure data integrity.  
- Store the **cleaned dataset** in a final table for further analysis.  
- Prepare the data for **visualization** in a dashboard.  

By structuring the ETL pipeline this way, we ensure that the process is **efficient, scalable, and reproducible** for future use. 



In [1]:
import psycopg2
import json
import pandas as pd
import numpy as np
from sqlalchemy import create_engine

First of all, load the credentials for the database. I print the config to detect any anomalies before connecting.


In [2]:
with open("credenciales.json", "r", encoding="utf-8") as config_file:
    config = json.load(config_file)
print(config)

{'user': 'postgres', 'password': '12345', 'host': 'localhost', 'port': 5432}
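Printing the whole config also prints the password, as the output above shows. A minimal sketch of a safer check, assuming the secret lives under a key named password; the helper mask_secrets and the example values are hypothetical and not part of the original notebook:

```python
def mask_secrets(config, secret_keys=("password",)):
    """Return a copy of config with secret values replaced by asterisks."""
    return {
        key: ("*****" if key in secret_keys else value)
        for key, value in config.items()
    }


# Placeholder values that mirror the shape of credenciales.json.
example_config = {
    "user": "postgres",
    "password": "12345",
    "host": "localhost",
    "port": 5432,
}

masked = mask_secrets(example_config)
print(masked)
```

The original dictionary is left untouched, so it can still be passed to psycopg2.connect afterwards.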


Now, create the connection to PostgreSQL.

In [8]:
try:
    conn = psycopg2.connect(**config)
    # CREATE DATABASE cannot run inside a transaction block, so enable autocommit.
    conn.autocommit = True
    cursor = conn.cursor()

    cursor.execute("SELECT version();")
    db_version = cursor.fetchone()
    print(f"✅ Conexión exitosa a PostgreSQL - {db_version[0]}")

except Exception as e:
    print(f"❌ Error en la conexión: {e}")



✅ Conexión exitosa a PostgreSQL - PostgreSQL 17.4 on x86_64-windows, compiled by msvc-19.42.34436, 64-bit



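Create the database. It will hold the raw staging table and, later, the cleaned table. If the database already exists from a previous run, PostgreSQL reports an error, as in the output below.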

In [9]:
try:
    cursor.execute("CREATE DATABASE candidatos;")
    print("Base de datos 'candidatos' creada.")
except psycopg2.Error as e:
    print(f"Error al crear la base de datos: {e}")

Error al crear la base de datos: la base de datos «candidatos» ya existe
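The error above appears on every run after the first one. One way to avoid it is to look the name up in the pg_database catalog before creating it. This is only a sketch: the helper create_database_if_missing is hypothetical, and it assumes a psycopg2 cursor on an autocommit connection like the one opened earlier.

```python
def create_database_if_missing(cursor, name):
    """Create database `name` unless pg_database already lists it.

    Assumes `cursor` belongs to an autocommit connection, because
    CREATE DATABASE cannot run inside a transaction block.
    Returns True if the database was created, False if it already existed.
    """
    # Database names cannot be passed as query parameters, so accept only
    # plain identifiers before interpolating the name into the statement.
    if not name.isidentifier():
        raise ValueError(f"Unsafe database name: {name!r}")

    cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s;", (name,))
    if cursor.fetchone() is not None:
        return False

    cursor.execute(f'CREATE DATABASE "{name}";')
    return True


# Usage with the notebook's cursor (needs a live PostgreSQL server):
# create_database_if_missing(cursor, "candidatos")
```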



Close the connection to the database.

In [10]:
cursor.close()
conn.close()
print("🔗 Conexión cerrada.")


🔗 Conexión cerrada.


Next, reopen the connection, this time to the candidatos database, and confirm that we are connected to it before creating the table.

In [11]:
config["dbname"] = "candidatos"  
conn = psycopg2.connect(**config)
cursor = conn.cursor()


cursor.execute("SELECT current_database();")
db_name = cursor.fetchone()[0]
print(f"✅ Conectado a la base de datos: {db_name}")

cursor.close()
conn.close()

✅ Conectado a la base de datos: candidatos


Create the staging table for the raw (dirty) data.

In [12]:
try:
    with psycopg2.connect(**config) as conn:
        with conn.cursor() as cursor:
            create_table_query = """
            CREATE TABLE aplicantes (
                first_name TEXT NOT NULL,
                last_name TEXT NOT NULL,
                email TEXT NOT NULL,
                application_date DATE NOT NULL,
                country TEXT NOT NULL,
                years_of_experience INT,
                seniority TEXT NOT NULL,
                technology TEXT NOT NULL,
                code_challenge_score NUMERIC,
                technical_interview_score NUMERIC
            );
            """
            cursor.execute(create_table_query)
            conn.commit()
            print("Tabla 'aplicantes' creada.")


except Exception as e:
    print(f"Error al crear la tabla: {e}")


Error al crear la tabla: la relación «aplicantes» ya existe



Load the data. The CSV file should be in the same directory as the notebook; otherwise, set csv_path to its full path.

In [None]:
csv_path = "your_path_here.csv"
try:
    with psycopg2.connect(**config) as conn:
        with conn.cursor() as cursor:
            with open(csv_path, "r", encoding="utf-8") as file:
                cursor.copy_expert("""
                COPY aplicantes FROM STDIN WITH CSV HEADER DELIMITER ';';
                """, file)
            
            conn.commit()
            print("Archivo subido exitosamente a PostgreSQL.")

except Exception as e:
    print(f"Error al subir el archivo: {e}")

Archivo subido exitosamente a PostgreSQL.
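With the plain HEADER option, COPY skips the first line of the file and maps the remaining fields to the table columns by position. A quick check of the header before loading can catch a wrong delimiter or a missing column. A minimal sketch, where count_header_columns and the sample header names are hypothetical stand-ins for the real file:

```python
import csv
import io

EXPECTED_COLUMNS = 10  # number of columns in the aplicantes table


def count_header_columns(file_obj, delimiter=";"):
    """Read the first CSV row and return how many fields it has."""
    reader = csv.reader(file_obj, delimiter=delimiter)
    header = next(reader)
    return len(header)


# Hypothetical two-line sample standing in for the real CSV file.
sample = io.StringIO(
    "First Name;Last Name;Email;Application Date;Country;YOE;"
    "Seniority;Technology;Code Challenge Score;Technical Interview Score\n"
    "Ana;Perez;ana@example.com;2021-02-26;Norway;2;Intern;Data Engineer;3;3\n"
)

n_columns = count_header_columns(sample)
header_ok = n_columns == EXPECTED_COLUMNS
print(header_ok)
```

With the real file, the same function could be called on the object returned by open(csv_path, "r", encoding="utf-8") before running copy_expert.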


Convert the data into a DataFrame.

In [14]:
conn = psycopg2.connect(**config)

query = "SELECT * FROM aplicantes;"
df = pd.read_sql(query, conn)

conn.close()

print(df.head())



   first_name   last_name                      email application_date  \
0  Bernadette   Langworth        leonard91@yahoo.com       2021-02-26   
1      Camryn    Reynolds        zelda56@hotmail.com       2021-09-09   
2       Larue      Spinka   okey_schultz41@gmail.com       2020-04-14   
3        Arch      Spinka     elvera_kulas@yahoo.com       2020-10-01   
4       Larue  Altenwerth  minnie.gislason@gmail.com       2020-05-20   

   country  years_of_experience  seniority                         technology  \
0   Norway                    2     Intern                      Data Engineer   
1   Panama                   10     Intern                      Data Engineer   
2  Belarus                    4  Mid-Level                     Client Success   
3  Eritrea                   25    Trainee                          QA Manual   
4  Myanmar                   13  Mid-Level  Social Media Community Management   

   code_challenge_score  technical_interview_score  
0                   3

Now, the .info() method gives us more information about the dataset. It is useful right after loading a dataset to get a quick overview, to check for missing data or incorrect data types, and to understand the structure of the DataFrame before further analysis. Here, for example, application_date comes back as object rather than as a datetime type.

In [15]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 10 columns):
 #   Column                     Non-Null Count   Dtype  
---  ------                     --------------   -----  
 0   first_name                 100000 non-null  object 
 1   last_name                  100000 non-null  object 
 2   email                      100000 non-null  object 
 3   application_date           100000 non-null  object 
 4   country                    100000 non-null  object 
 5   years_of_experience        100000 non-null  int64  
 6   seniority                  100000 non-null  object 
 7   technology                 100000 non-null  object 
 8   code_challenge_score       100000 non-null  float64
 9   technical_interview_score  100000 non-null  float64
dtypes: float64(2), int64(1), object(7)
memory usage: 7.6+ MB


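The describe function calculates summary statistics. For numerical columns it shows count, mean, std, min, the 25th, 50th and 75th percentiles, and max. For categorical columns it shows count (non-null values), unique (number of distinct categories), top (the most frequent category) and freq (how many times that category appears). Called without arguments, as here, it summarizes only the numerical columns.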

In [16]:
df.describe()

| | years_of_experience | code_challenge_score | technical_interview_score |
|---|---|---|---|
| count | 100000.0 | 100000.0 | 100000.0 |
| mean | 15.28698 | 4.9964 | 5.00388 |
| std | 8.830608 | 3.16688 | 3.165066 |
| min | 0.0 | 0.0 | 0.0 |
| 25% | 8.0 | 2.0 | 2.0 |
| 50% | 15.0 | 5.0 | 5.0 |
| 75% | 23.0 | 8.0 | 8.0 |
| max | 30.0 | 10.0 | 10.0 |
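The categorical statistics (count, unique, top, freq) do not appear in the output above because only numerical columns were summarized. A small sketch on a toy column shows what they look like; the data here is made up for illustration and is not taken from the dataset:

```python
import pandas as pd

# Toy column, invented for this example.
toy = pd.DataFrame({
    "seniority": ["Intern", "Junior", "Intern", "Senior"],
    "years_of_experience": [2, 4, 1, 15],
})

# Calling describe on a text column returns count, unique, top and freq.
seniority_summary = toy["seniority"].describe()
print(seniority_summary)
```

On the real DataFrame, the same call on df["seniority"] or df["technology"] would give the equivalent summary for those columns.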


*isnull().sum()* is used to check for missing values in a DataFrame. isnull returns a DataFrame of boolean values: True if the value is missing, False if it is present. sum then adds up the True values (each counted as 1) in each column.

In [18]:
df.isnull().sum()

first_name                   0
last_name                    0
email                        0
application_date             0
country                      0
years_of_experience          0
seniority                    0
technology                   0
code_challenge_score         0
technical_interview_score    0
dtype: int64
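Since the dataset has no missing values, every count above is zero. To see the two steps at work, here is a small sketch on a toy DataFrame with some gaps; the values are invented for illustration:

```python
import numpy as np
import pandas as pd

# Toy data with deliberate gaps.
toy = pd.DataFrame({
    "email": ["a@example.com", None, "c@example.com"],
    "code_challenge_score": [7.0, np.nan, np.nan],
})

mask = toy.isnull()               # True where a value is missing
missing_per_column = mask.sum()   # each True counts as 1
print(missing_per_column)
```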

df.duplicated().sum() checks for duplicate rows in a DataFrame. By default, duplicated compares all columns and marks a row as True when it repeats an earlier row, so the first occurrence is not flagged. sum then counts the True values, which gives the total number of duplicate rows.

In [21]:
df.duplicated().sum()

np.int64(50000)

Next, I want to see the name of each column in the dataset. df.columns.tolist() returns them as a list.

In [23]:
print(df.columns.tolist())

['first_name', 'last_name', 'email', 'application_date', 'country', 'years_of_experience', 'seniority', 'technology', 'code_challenge_score', 'technical_interview_score']


A potential bias is that a candidate might apply more than once. Since we do not have an identification document to verify this, we check for duplicates using the email address. We could drop the duplicates and keep a single entry per email, but since this is a fictional exercise we only write the code that detects them, for learning purposes.

The value_counts function counts how many times each email appears, which lets us see which addresses are repeated and how often.

In [24]:
df["email"].value_counts()

email
fern70@gmail.com              6
marianne31@yahoo.com          6
omari6@gmail.com              4
rodolfo28@gmail.com           4
isaiah24@yahoo.com            4
                             ..
rocky_mitchell@hotmail.com    2
dolores.roob@hotmail.com      2
savanah.stracke@gmail.com     2
vivienne.fritsch@yahoo.com    2
abigayle.crooks@yahoo.com     2
Name: count, Length: 49833, dtype: int64
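For reference, dropping the repeated emails could look like the sketch below. It runs on a toy DataFrame and keeps the earliest application per email, which is an assumption of this example rather than a rule stated in the exercise:

```python
import pandas as pd

# Toy data, invented for this example.
toy = pd.DataFrame({
    "email": ["a@example.com", "b@example.com", "a@example.com"],
    "application_date": ["2021-02-26", "2020-04-14", "2021-09-09"],
})

# Sort so the earliest application per email comes first, then keep it.
# ISO-formatted date strings sort in chronological order.
deduplicated = (
    toy.sort_values("application_date")
       .drop_duplicates(subset="email", keep="first")
       .reset_index(drop=True)
)
print(deduplicated)
```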

Now, let's clean the data and perform data transformations. 

We can create a new column in the dataset named hired. This column helps us determine the status of each candidate. Let's break down the code step by step:

- df["hired"] creates the new column.
- np.where, from the NumPy library, works like a vectorized if-else.
- (df["code_challenge_score"] >= 7) & (df["technical_interview_score"] >= 7) is the condition being evaluated. If the candidate's code_challenge_score and technical_interview_score are both 7 or higher, the value in the hired column is True. Otherwise, it is False.

In [25]:
df["hired"] = np.where((df["code_challenge_score"] >= 7) & (df["technical_interview_score"] >= 7), True, False)

I need to identify the best candidates based on their scores. To do this, I create a function that evaluates each candidate's code_challenge_score and technical_interview_score. If both are 7 or higher, the function returns their sum. If either one is below 7, it returns 0. This way, only candidates who meet the minimum requirement in both areas get a positive score, and I can rank the best candidates by their total score.

In [26]:
def Cts(row): #Cts = calculate total score.
    if row["code_challenge_score"] >= 7 and row["technical_interview_score"] >= 7:
        return row["code_challenge_score"] + row["technical_interview_score"]
    else:
        return 0

Create the total_score column by applying the function to each candidate (row).

In [27]:
df["total_score"] = df.apply(Cts, axis=1)
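Applying a Python function row by row works, but it can be slow on 100,000 rows. As a sketch of an alternative, the same rule can be written with np.where over whole columns. The toy data below is invented, and this is one possible vectorized form rather than the method used in this notebook:

```python
import numpy as np
import pandas as pd

# Toy scores, invented for this example.
toy = pd.DataFrame({
    "code_challenge_score": [10.0, 7.0, 6.0, 9.0],
    "technical_interview_score": [10.0, 7.0, 9.0, 3.0],
})

# One boolean mask serves both derived columns.
passed = (toy["code_challenge_score"] >= 7) & (toy["technical_interview_score"] >= 7)

toy["hired"] = passed.astype(int)
toy["total_score"] = np.where(
    passed,
    toy["code_challenge_score"] + toy["technical_interview_score"],
    0.0,
)
print(toy)
```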

Sort the DataFrame by total_score in descending order.

In [28]:
df = df.sort_values(by="total_score", ascending=False)

The "hired" column currently contains boolean values (True/False).  
For better analysis and integration into a database, it is converted into an integer format (1 for hired, 0 for not hired).  
This transformation allows for easier aggregations and statistical calculations.  

In [29]:
df["hired"] = df["hired"].astype(int)

Grouping the dataset by country and summing the hired column tells us how many candidates were hired in each country.  
Sorting the results in descending order shows which countries have the most hires. Note that these are counts, not rates: a hiring rate would also need the number of applicants per country.

In [30]:
hires_by_country = df.groupby("country")["hired"].sum().reset_index()
hires_by_country = hires_by_country.sort_values(by="hired", ascending=False)
print (hires_by_country)

                               country  hired
160           Northern Mariana Islands     88
93   Heard Island and McDonald Islands     82
156                              Niger     80
204                          Sri Lanka     80
194                         Seychelles     80
..                                 ...    ...
37                              Canada     36
187   Saint Vincent and the Grenadines     32
130                           Maldives     32
86                                Guam     30
143                         Montenegro     30

[244 rows x 2 columns]
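If a rate is what we want, the mean of the 0/1 hired column within each country gives the share of applicants who were hired. A minimal sketch on toy data; the output column names hires, applicants and hire_rate are chosen for this example:

```python
import pandas as pd

# Toy data, invented for this example.
toy = pd.DataFrame({
    "country": ["Norway", "Norway", "Panama", "Panama", "Panama", "Panama"],
    "hired": [1, 0, 1, 1, 1, 0],
})

hiring_by_country = (
    toy.groupby("country")["hired"]
       .agg(hires="sum", applicants="count", hire_rate="mean")
       .reset_index()
       .sort_values("hire_rate", ascending=False)
)
print(hiring_by_country)
```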


Before categorizing, we first inspect the distinct values in the technology column to understand the range of technologies present.

In [31]:
print(df['technology'].unique()) 


['Client Success' 'Data Engineer' 'Development - Frontend'
 'Social Media Community Management' 'Development - Backend'
 'Technical Writing' 'Design' 'Business Intelligence'
 'Development - CMS Backend' 'Security' 'Development - CMS Frontend'
 'Salesforce' 'QA Manual' 'Mulesoft' 'Database Administration'
 'Adobe Experience Manager' 'Game Development' 'Security Compliance'
 'DevOps' 'Business Analytics / Project Management' 'QA Automation'
 'System Administration' 'Development - FullStack' 'Sales']


To simplify the analysis in Power BI, we consolidate the 24 technologies into seven main categories.

In [32]:
# Dictionary that maps each technology to a general category (7 categories)
tech_categories = {
    'Development - Frontend': 'Frontend Development',
    'Development - CMS Frontend': 'Frontend Development',

    'Development - Backend': 'Backend & FullStack Development',
    'Development - FullStack': 'Backend & FullStack Development',
    'Development - CMS Backend': 'Backend & FullStack Development',
    'Game Development': 'Backend & FullStack Development',
    'Mulesoft': 'Backend & FullStack Development',

    'DevOps': 'Infrastructure & Security',
    'System Administration': 'Infrastructure & Security',
    'Security': 'Infrastructure & Security',
    'Security Compliance': 'Infrastructure & Security',

    'Database Administration': 'Databases & Data Science',
    'Data Engineer': 'Databases & Data Science',

    'Business Intelligence': 'Data Analysis & BI',
    'Business Analytics / Project Management': 'Data Analysis & BI',

    'QA Manual': 'QA & Testing',
    'QA Automation': 'QA & Testing',

    'Salesforce': 'Other Areas',
    'Adobe Experience Manager': 'Other Areas',
    'Technical Writing': 'Other Areas',
    'Social Media Community Management': 'Other Areas',
    'Design': 'Other Areas',
    'Client Success': 'Other Areas',
    'Sales': 'Other Areas'
}

# Create the new column with the mapped category
df['tech_category'] = df['technology'].map(tech_categories)

# Check the distribution
print(df['tech_category'].value_counts())



tech_category
Other Areas                        26960
Backend & FullStack Development    23198
Infrastructure & Security          19220
Databases & Data Science            7768
Frontend Development                7642
Data Analysis & BI                  7624
QA & Testing                        7588
Name: count, dtype: int64
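One thing to keep in mind with map: any technology that is missing from the dictionary becomes NaN instead of raising an error. A small sketch of how to detect that; the value Robotics and the reduced dictionary are hypothetical, used only to trigger the case:

```python
import pandas as pd

# Reduced mapping and toy data, invented for this example.
tech_categories_sample = {
    "QA Manual": "QA & Testing",
    "QA Automation": "QA & Testing",
}
toy = pd.DataFrame({"technology": ["QA Manual", "QA Automation", "Robotics"]})

toy["tech_category"] = toy["technology"].map(tech_categories_sample)

# Technologies with no entry in the dictionary end up as NaN.
unmapped = toy.loc[toy["tech_category"].isna(), "technology"].unique().tolist()
print(unmapped)

# One option is to send them to a catch-all category.
toy["tech_category"] = toy["tech_category"].fillna("Other Areas")
```

In this notebook the category counts add up to 100,000, so every technology was mapped. A check like this is mainly useful when new data arrives.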


Now, let's look at the cleaned DataFrame in Python.

In [33]:
df.head()


| | first_name | last_name | email | application_date | country | years_of_experience | seniority | technology | code_challenge_score | technical_interview_score | hired | total_score | tech_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 42366 | Cyril | Larson | ray91@hotmail.com | 2018-06-30 | Madagascar | 2 | Junior | Client Success | 10.0 | 10.0 | 1 | 20.0 | Other Areas |
| 19338 | Caleb | Stehr | vicenta.williamson@yahoo.com | 2021-10-20 | Hong Kong | 15 | Senior | Data Engineer | 10.0 | 10.0 | 1 | 20.0 | Databases & Data Science |
| 9066 | Fletcher | Macejkovic | savanna11@gmail.com | 2020-07-17 | Central African Republic | 15 | Mid-Level | Development - Frontend | 10.0 | 10.0 | 1 | 20.0 | Frontend Development |
| 70066 | Devonte | Boyle | madisen9@hotmail.com | 2020-12-15 | Sri Lanka | 22 | Intern | Social Media Community Management | 10.0 | 10.0 | 1 | 20.0 | Other Areas |
| 41762 | Carroll | Yost | savannah_walter50@gmail.com | 2018-04-26 | Cape Verde | 26 | Intern | Development - Backend | 10.0 | 10.0 | 1 | 20.0 | Backend & FullStack Development |


Create a new table to hold the cleaned data.

In [34]:
conn = psycopg2.connect(**config)

# The table name matches the one used when loading the DataFrame later on
create_table_query = """
    CREATE TABLE IF NOT EXISTS clean_applicants (
        id SERIAL PRIMARY KEY,  -- unique auto-incrementing ID
        first_name TEXT,
        last_name TEXT,
        email TEXT UNIQUE,
        application_date DATE,  -- stored as DATE in PostgreSQL
        country TEXT,
        years_of_experience INT,
        seniority TEXT,
        technology TEXT,
        code_challenge_score FLOAT,
        technical_interview_score FLOAT,
        hired INT,  -- boolean converted to integer
        total_score FLOAT,
        tech_category TEXT
    );
"""

# Run the query; the cursor is closed automatically when the block ends
with conn.cursor() as cur:
    cur.execute(create_table_query)
conn.commit()

# Close the connection
conn.close()

print(" Tabla 'clean_applicants' creada correctamente.")

 Tabla 'clean_applicants' creada correctamente.


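Now, load the cleaned data into the table clean_applicants. We create a SQLAlchemy engine for the connection because .to_sql() needs one to write to PostgreSQL. Note that with if_exists="replace", .to_sql() drops any existing table with that name and recreates it from the DataFrame, so the column types come from pandas rather than from a hand-written CREATE TABLE.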

In [35]:
# Load the credentials from the JSON file
with open("credenciales.json") as f:
    creds = json.load(f)

# Build the connection URL for the 'candidatos' database
db_url = f"postgresql://{creds['user']}:{creds['password']}@{creds['host']}:{creds['port']}/candidatos"

# Create the engine
engine = create_engine(db_url)

# Test the connection
try:
    with engine.connect() as connection:
        print("Conexión exitosa a la base de datos 'candidatos' ")
except Exception as e:
    print(f"Error al conectar: {e}")

Conexión exitosa a la base de datos 'candidatos' 
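The URL above works because the password contains only digits. If it contained characters such as @ or /, the URL would be parsed incorrectly. A minimal sketch of one way to guard against that by URL-encoding the user and password first; build_postgres_url and the example password are hypothetical:

```python
from urllib.parse import quote_plus


def build_postgres_url(creds, dbname):
    """Build a PostgreSQL URL with the user and password URL-encoded."""
    user = quote_plus(creds["user"])
    password = quote_plus(creds["password"])
    return f"postgresql://{user}:{password}@{creds['host']}:{creds['port']}/{dbname}"


# Placeholder credentials with special characters in the password.
example_creds = {
    "user": "postgres",
    "password": "p@ss/word",
    "host": "localhost",
    "port": 5432,
}

example_url = build_postgres_url(example_creds, "candidatos")
print(example_url)
```

The resulting string can be passed to create_engine in the same way as db_url.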


Insert the DataFrame into the Table (Finally!) 

In [36]:
df.to_sql("clean_applicants", con=engine, if_exists="replace", index=False)


1000

Let's check if the data loaded successfully!

In [37]:
query = "SELECT * FROM clean_applicants LIMIT 5;"
df_preview = pd.read_sql(query, con=engine)
df_preview

| | first_name | last_name | email | application_date | country | years_of_experience | seniority | technology | code_challenge_score | technical_interview_score | hired | total_score | tech_category |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Cyril | Larson | ray91@hotmail.com | 2018-06-30 | Madagascar | 2 | Junior | Client Success | 10.0 | 10.0 | 1 | 20.0 | Other Areas |
| 1 | Caleb | Stehr | vicenta.williamson@yahoo.com | 2021-10-20 | Hong Kong | 15 | Senior | Data Engineer | 10.0 | 10.0 | 1 | 20.0 | Databases & Data Science |
| 2 | Fletcher | Macejkovic | savanna11@gmail.com | 2020-07-17 | Central African Republic | 15 | Mid-Level | Development - Frontend | 10.0 | 10.0 | 1 | 20.0 | Frontend Development |
| 3 | Devonte | Boyle | madisen9@hotmail.com | 2020-12-15 | Sri Lanka | 22 | Intern | Social Media Community Management | 10.0 | 10.0 | 1 | 20.0 | Other Areas |
| 4 | Carroll | Yost | savannah_walter50@gmail.com | 2018-04-26 | Cape Verde | 26 | Intern | Development - Backend | 10.0 | 10.0 | 1 | 20.0 | Backend & FullStack Development |


Close the connection


In [None]:

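# The psycopg2 cursor and connection were already closed after creating the table,
# so only the SQLAlchemy engine still holds connections; release its pool.
engine.dispose()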
