# 001 Extract Workshop

DB Connection

In [1]:
import yaml
import psycopg2
from psycopg2 import sql
from sqlalchemy import create_engine, text
import pandas as pd

In [2]:
def load_config(file_path="config.yaml"):
    with open(file_path, "r") as file:
        return yaml.safe_load(file)

In [21]:
config = load_config()
db_config = config["database"]

# Load credentials
db_user = db_config["user"]
db_password = db_config["password"]
db_host = db_config["host"]
db_port = db_config["port"]
db_name = db_config["name"]

# DB connection
conn = psycopg2.connect(
    dbname="postgres",
    user=db_user,
    password=db_password,
    host=db_host,
    port=db_port
)
conn.autocommit = True

Check if DB Exists

In [4]:
db_name = "etl_db_workshop_1"
try:
    with conn.cursor() as cur:
        cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name)))
        print(f"Base de datos '{db_name}' creada exitosamente.")
except psycopg2.errors.DuplicateDatabase:
    print(f"La base de datos '{db_name}' ya existe.")
finally:
    conn.close()

La base de datos 'etl_db_workshop_1' ya existe.


Load Data

In [5]:
df = pd.read_csv("data/candidates.csv", sep=";")
print(df.columns)
print(df.info())
df.head()

Index(['First Name', 'Last Name', 'Email', 'Application Date', 'Country',
       'YOE', 'Seniority', 'Technology', 'Code Challenge Score',
       'Technical Interview Score'],
      dtype='object')
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 10 columns):
 #   Column                     Non-Null Count  Dtype 
---  ------                     --------------  ----- 
 0   First Name                 50000 non-null  object
 1   Last Name                  50000 non-null  object
 2   Email                      50000 non-null  object
 3   Application Date           50000 non-null  object
 4   Country                    50000 non-null  object
 5   YOE                        50000 non-null  int64 
 6   Seniority                  50000 non-null  object
 7   Technology                 50000 non-null  object
 8   Code Challenge Score       50000 non-null  int64 
 9   Technical Interview Score  50000 non-null  int64 
dtypes: int64(3), object(7)
memor

Unnamed: 0,First Name,Last Name,Email,Application Date,Country,YOE,Seniority,Technology,Code Challenge Score,Technical Interview Score
0,Bernadette,Langworth,leonard91@yahoo.com,2021-02-26,Norway,2,Intern,Data Engineer,3,3
1,Camryn,Reynolds,zelda56@hotmail.com,2021-09-09,Panama,10,Intern,Data Engineer,2,10
2,Larue,Spinka,okey_schultz41@gmail.com,2020-04-14,Belarus,4,Mid-Level,Client Success,10,9
3,Arch,Spinka,elvera_kulas@yahoo.com,2020-10-01,Eritrea,25,Trainee,QA Manual,7,1
4,Larue,Altenwerth,minnie.gislason@gmail.com,2020-05-20,Myanmar,13,Mid-Level,Social Media Community Management,9,7


Fixing column names

In [None]:
# Replacing empty spaces with underscores and lower columns
df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')

In [7]:
print(df.columns)

Index(['first_name', 'last_name', 'email', 'application_date', 'country',
       'yoe', 'seniority', 'technology', 'code_challenge_score',
       'technical_interview_score'],
      dtype='object')


Create SQLAlqchemy Engine

In [8]:
# Crear el motor de SQLAlchemy
engine = create_engine(f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}")

Drop Table if Neccesary 

In [None]:
# Borrar la tabla 'etl_staging_table_workshop_1' #Staging table
# Borrar la tabla 'etl_transformed_table_workshop_1' #Transformed table
# with engine.connect() as conn:
#     conn.execute(text("DROP TABLE IF EXISTS etl_transformed_table_workshop_1"))
#     print("Tabla 'tabla_etl' borrada exitosamente en PostgreSQL.")

Tabla 'tabla_etl' borrada exitosamente en PostgreSQL.


Staging Raw Data

In [9]:
db_table_name = "etl_staging_table_workshop_1"
df.to_sql(db_table_name, engine, if_exists='append', index=False)

print(f"DataFrame guardado exitosamente en la tabla {db_table_name} en PostgreSQL.")

# # Crear la tabla en PostgreSQL
# with engine.connect() as conn:
#     conn.execute(text(f"""
#         CREATE TABLE IF NOT EXISTS {db_table_name} (
#             id SERIAL PRIMARY KEY,
#             first_name VARCHAR(100),
#             last_name VARCHAR(100),
#             email VARCHAR(100),
#             application_date DATE,
#             country VARCHAR(100),
#             yoe INT,
#             seniority VARCHAR(100),
#             technology VARCHAR(100),
#             code_challenge_score INT,
#             technical_interview_score INT
#         );
#     """))
#     conn.commit()
#     print(f"Tabla {db_table_name} creada exitosamente en PostgreSQL.")

# # Guardar el DataFrame en la tabla {db_table_name} en PostgreSQL
# df.to_sql({db_table_name}, engine, if_exists='append', index=False)

DataFrame guardado exitosamente en la tabla etl_staging_table_workshop_1 en PostgreSQL.


Loading / Reading Staging Table

In [12]:
# # Read data from table
# with engine.connect() as conn:
#     result = conn.execute(text(f"SELECT * FROM {db_table_name};"))
#     rows = result.fetchall()

# # Show results
# print(f"Datos en {db_table_name}:")
# for row in rows:
#     print(row)

db_table_name = "etl_staging_table_workshop_1"

with engine.connect() as conn:
    df_staging = pd.read_sql(f"SELECT * FROM {db_table_name}", conn)

df_staging.head()
# # Close conn
# conn.close()

Unnamed: 0,first_name,last_name,email,application_date,country,yoe,seniority,technology,code_challenge_score,technical_interview_score
0,Bernadette,Langworth,leonard91@yahoo.com,2021-02-26,Norway,2,Intern,Data Engineer,3,3
1,Camryn,Reynolds,zelda56@hotmail.com,2021-09-09,Panama,10,Intern,Data Engineer,2,10
2,Larue,Spinka,okey_schultz41@gmail.com,2020-04-14,Belarus,4,Mid-Level,Client Success,10,9
3,Arch,Spinka,elvera_kulas@yahoo.com,2020-10-01,Eritrea,25,Trainee,QA Manual,7,1
4,Larue,Altenwerth,minnie.gislason@gmail.com,2020-05-20,Myanmar,13,Mid-Level,Social Media Community Management,9,7


Transform Data

In [24]:
db_table_name = "etl_staging_table_workshop_1"
with engine.connect() as conn:
    df_staging = pd.read_sql(f"SELECT * FROM {db_table_name}", conn)

df_staging.head()

Unnamed: 0,first_name,last_name,email,application_date,country,yoe,seniority,technology,code_challenge_score,technical_interview_score
0,Bernadette,Langworth,leonard91@yahoo.com,2021-02-26,Norway,2,Intern,Data Engineer,3,3
1,Camryn,Reynolds,zelda56@hotmail.com,2021-09-09,Panama,10,Intern,Data Engineer,2,10
2,Larue,Spinka,okey_schultz41@gmail.com,2020-04-14,Belarus,4,Mid-Level,Client Success,10,9
3,Arch,Spinka,elvera_kulas@yahoo.com,2020-10-01,Eritrea,25,Trainee,QA Manual,7,1
4,Larue,Altenwerth,minnie.gislason@gmail.com,2020-05-20,Myanmar,13,Mid-Level,Social Media Community Management,9,7


In [25]:
df_transformed = df_staging.copy()

df_transformed["hired"] = df_transformed.apply(
    lambda row: "HIRED" if row["code_challenge_score"] >= 7 and row["code_challenge_score"] >= 7 
    else "NOT_HIRED",
    axis=1
)

# Display transformed DataFrame
print(df_transformed.columns)
print(df_transformed.info())
df_transformed.head()

Index(['first_name', 'last_name', 'email', 'application_date', 'country',
       'yoe', 'seniority', 'technology', 'code_challenge_score',
       'technical_interview_score', 'hired'],
      dtype='object')
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 11 columns):
 #   Column                     Non-Null Count  Dtype 
---  ------                     --------------  ----- 
 0   first_name                 50000 non-null  object
 1   last_name                  50000 non-null  object
 2   email                      50000 non-null  object
 3   application_date           50000 non-null  object
 4   country                    50000 non-null  object
 5   yoe                        50000 non-null  int64 
 6   seniority                  50000 non-null  object
 7   technology                 50000 non-null  object
 8   code_challenge_score       50000 non-null  int64 
 9   technical_interview_score  50000 non-null  int64 
 10  hired              

Unnamed: 0,first_name,last_name,email,application_date,country,yoe,seniority,technology,code_challenge_score,technical_interview_score,hired
0,Bernadette,Langworth,leonard91@yahoo.com,2021-02-26,Norway,2,Intern,Data Engineer,3,3,NOT_HIRED
1,Camryn,Reynolds,zelda56@hotmail.com,2021-09-09,Panama,10,Intern,Data Engineer,2,10,NOT_HIRED
2,Larue,Spinka,okey_schultz41@gmail.com,2020-04-14,Belarus,4,Mid-Level,Client Success,10,9,HIRED
3,Arch,Spinka,elvera_kulas@yahoo.com,2020-10-01,Eritrea,25,Trainee,QA Manual,7,1,HIRED
4,Larue,Altenwerth,minnie.gislason@gmail.com,2020-05-20,Myanmar,13,Mid-Level,Social Media Community Management,9,7,HIRED


In [26]:
db_table_name = "etl_transformed_table_workshop_1"
df_transformed.to_sql(db_table_name, engine, if_exists='append', index=False)

print(f"DataFrame guardado exitosamente en la tabla {db_table_name} en PostgreSQL.")

DataFrame guardado exitosamente en la tabla etl_transformed_table_workshop_1 en PostgreSQL.


Reading / Loading Transformed Table

In [27]:
db_table_name = "etl_transformed_table_workshop_1"
with engine.connect() as conn:
    df_transformed = pd.read_sql(f"SELECT * FROM {db_table_name}", conn)

df_transformed.head()

# # Close conn
# conn.close()

Unnamed: 0,first_name,last_name,email,application_date,country,yoe,seniority,technology,code_challenge_score,technical_interview_score,hired
0,Bernadette,Langworth,leonard91@yahoo.com,2021-02-26,Norway,2,Intern,Data Engineer,3,3,NOT_HIRED
1,Camryn,Reynolds,zelda56@hotmail.com,2021-09-09,Panama,10,Intern,Data Engineer,2,10,NOT_HIRED
2,Larue,Spinka,okey_schultz41@gmail.com,2020-04-14,Belarus,4,Mid-Level,Client Success,10,9,HIRED
3,Arch,Spinka,elvera_kulas@yahoo.com,2020-10-01,Eritrea,25,Trainee,QA Manual,7,1,HIRED
4,Larue,Altenwerth,minnie.gislason@gmail.com,2020-05-20,Myanmar,13,Mid-Level,Social Media Community Management,9,7,HIRED
