# Hire Employee
| Version | Validate Date | Editor | Brief Description |
| --- | --- | --- | --- |
| 1| 28/10/2024| Romario Coronel | Notebook: Hire employee |
| 2| 29/10/2024| Romario Coronel | Notebook: Hire employee |

### Design Architecture

![Step 2 screenshot](https://github.com/rcoroneldev/Data-Engineering-Project-End-to-End/blob/main/Images/Hire_employee-Solution%201.drawio.png?raw=true)



### Import libraries


In [0]:
import pyspark
import pandas as pd
from pyspark.sql.functions import col, lit,concat,when, sum, avg, expr
from pyspark.sql.functions import col, year, month, dayofmonth, to_timestamp
from pyspark.sql.functions import when

### Librarie JDBC SQL Server

In [0]:

com.microsoft.sqlserver:mssql-jdbc:11.2.3.jre8

In [0]:
%pip install flask sqlalchemy pandas

In [0]:

%pip install pyodbc sqlalchemy pandas flask


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


### Conection setting to the service Azure SQL Database

In [0]:
jdbcHostname = "hireemployee.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "hireemployee"
jdbcUsername = "rcoronel"
jdbcPassword = "Pa$$word1707%"

# URL JDBC
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase};encrypt=true;trustServerCertificate=false;loginTimeout=30;"

# Leer datos desde SQL Server
query = "(SELECT * FROM departments) AS tmp"

df_departments = spark.read \
  .format("jdbc") \
  .option("url", jdbcUrl) \
  .option("dbtable", query) \
  .option("user", jdbcUsername) \
  .option("password", jdbcPassword)  \
  .load()

display(df_departments)

id,department
1,Marketing Assistant
2,VP Sales
3,Biostatistician IV
4,Account Representative II
5,VP Marketing
6,Environmental Specialist
7,Software Consultant
8,Office Assistant III
9,Information Systems Manager
10,Desktop Support Technician


### Configure Databricks Notebook and SQL Connection

In [0]:
import pandas as pd
from sqlalchemy import create_engine, text

# Azure SQL connection details
server_name = 'hireemployee.database.windows.net'
database_name = 'hireemployee'
username = 'rcoronel'
password = 'Pa$$word1707%'

# Create SQL connection string using SQLAlchemy
conn_str = f'mssql+pyodbc://{username}:{password}@{server_name}:1433/{database_name}?driver=ODBC+Driver+18+for+SQL+Server'
engine = create_engine(conn_str)


### Create the REST API Using Flask

In [0]:
from flask import Flask, request, jsonify
from sqlalchemy.exc import SQLAlchemyError

app = Flask(__name__)

@app.route('/insert', methods=['POST'])
def insert_data():
    try:
        # Parse the incoming JSON request
        content = request.json

        # Extract the table name and data from the request
        table_name = content.get("departments")
        data = content.get("data")

        # # Validate the data using the data dictionary rules
        # validate_data(table_name, data)

        # Convert the data into a Pandas DataFrame for insertion
        df = pd.DataFrame(data)

        # Perform batch insertion into Azure SQL
        with engine.connect() as conn:
            df.to_sql(table_name, conn, if_exists='append', index=False)

        return jsonify({"status": "success", "message": f"Inserted {len(data)} rows into {table_name}"}), 201

    except ValueError as e:
        return jsonify({"status": "error", "message": str(e)}), 400
    except SQLAlchemyError as e:
        return jsonify({"status": "error", "message": str(e.orig)}), 500
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

# Run the Flask app on Databricks (use port 5000)
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)


## Exploration Data Analysis

### Delta Lake Architecture

#### Container landing 

In [0]:
%scala
val storageAccount = "adlgen2hireemployee"  //nombre de la cuenta de almacenamiento
val container = "landing"  //nombre del contenedor dentro del ADLS
val sasKey = "sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2024-10-30T21:48:44Z&st=2024-10-28T13:48:44Z&spr=https&sig=9B3tYlVCcuPzFH6%2FCvF02GPoSvjV%2Bk9MkZmiN8BJU%2Fc%3D" // sas token del ADLS

In [0]:
%scala
val conf = "fs.azure.sas." + container + "." + storageAccount + ".blob.core.windows.net"

In [0]:
%scala
dbutils.fs.mount( source = "wasbs://"+container+"@"+storageAccount+".blob.core.windows.net", mountPoint = "/mnt/"+container, extraConfigs = Map(conf -> sasKey))

In [0]:
# Comando para desmontar en databricks 
dbutils.fs.unmount("/mnt/landing")

/mnt/landing has been unmounted.


True

In [0]:
%scala
dbutils.fs.ls("/mnt/"+container)

#### Container bronze 

In [0]:
%scala
val storageAccount = "adlgen2hireemployee"  //nombre de la cuenta de almacenamiento
val container_bronze = "bronze"  //nombre del contenedor dentro del ADLS
val sasKey = "sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2024-10-30T21:48:44Z&st=2024-10-28T13:48:44Z&spr=https&sig=9B3tYlVCcuPzFH6%2FCvF02GPoSvjV%2Bk9MkZmiN8BJU%2Fc%3D" // sas token del ADLS

In [0]:
%scala
val conf = "fs.azure.sas." + container_bronze + "." + storageAccount + ".blob.core.windows.net"

In [0]:
%scala
dbutils.fs.mount( source = "wasbs://"+container_bronze+"@"+storageAccount+".blob.core.windows.net", mountPoint = "/mnt/"+container_bronze, extraConfigs = Map(conf -> sasKey))

#### Container gold 

In [0]:
%scala
val storageAccount = "adlgen2hireemployee"  //nombre de la cuenta de almacenamiento
val container_gold = "gold"  //nombre del contenedor dentro del ADLS
val sasKey = "sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2024-10-30T21:48:44Z&st=2024-10-28T13:48:44Z&spr=https&sig=9B3tYlVCcuPzFH6%2FCvF02GPoSvjV%2Bk9MkZmiN8BJU%2Fc%3D" // sas token del ADLS

In [0]:
%scala
val conf = "fs.azure.sas." + container_bronze + "." + storageAccount + ".blob.core.windows.net"

In [0]:
%scala
dbutils.fs.mount( source = "wasbs://"+container_bronze+"@"+storageAccount+".blob.core.windows.net", mountPoint = "/mnt/"+container_bronze, extraConfigs = Map(conf -> sasKey))

### Read csv files

#### Departments csv 

In [0]:
%scala
dbutils.fs.head("/mnt/"+container+"/departments.csv")

#### Jobs csv 

In [0]:
%scala
dbutils.fs.head("/mnt/"+container+"/departments.csv")

#### Hired employees csv 

In [0]:
%scala
dbutils.fs.head("/mnt/"+container+"/hired_employees.csv")

##  Data Transformation

In [0]:
%scala
val departments_df = spark.read
.option("inferSchema","true")
.csv("/mnt/"+container+"/departments.csv")

In [0]:
%scala
display(departments_df) 

_c0,_c1
1,Product Management
2,Sales
3,Research and Development
4,Business Development
5,Engineering
6,Human Resources
7,Services
8,Support
9,Marketing
10,Training


###  Table Departments


In [0]:
%sql
CREATE TABLE Departments
(
id INTEGER,department STRING
)
USING CSV
OPTIONS (inferSchema 'true', delimiter ',')
LOCATION "/mnt/landing/departments.csv"

org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `spark_catalog`.`default`.`departments` because it already exists.
Choose a different name, drop the existing object, add the IF NOT EXISTS clause to tolerate pre-existing objects, add the OR REPLACE clause to replace the existing materialized view, or add the OR REFRESH clause to refresh the existing streaming table. SQLSTATE: 42P07
	at org.apache.spark.sql.errors.QueryCompilationErrors$.tableAlreadyExistsError(QueryCompilationErrors.scala:3163)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:64)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$2(commands.scala:84)
	at org.apache.spark.sql.execution.SparkPlan.runCommandWithAetherOff(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.runCommandInAetherOrSpark(SparkPlan.scala:191)
	at org.apache.spark.

In [0]:
%sql
select * from Departments

id,department
1,Product Management
2,Sales
3,Research and Development
4,Business Development
5,Engineering
6,Human Resources
7,Services
8,Support
9,Marketing
10,Training


In [0]:
%sql
describe extended Departments

col_name,data_type,comment
id,int,
department,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,default,
Table,departments,
Owner,root,
Created Time,Mon Oct 28 14:52:42 UTC 2024,
Last Access,UNKNOWN,


#### Read csv file with PySpark

In [0]:

department_df = spark.read.option("inferSchema",True) \
            .option("delimiter",',') \
            .csv("/mnt/landing/departments.csv")

In [0]:
department_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)



In [0]:
display(department_df)

_c0,_c1
1,Product Management
2,Sales
3,Research and Development
4,Business Development
5,Engineering
6,Human Resources
7,Services
8,Support
9,Marketing
10,Training


#### Column Renamed

In [0]:
department_df = department_df.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "department")
display(department_df)

id,department
1,Product Management
2,Sales
3,Research and Development
4,Business Development
5,Engineering
6,Human Resources
7,Services
8,Support
9,Marketing
10,Training


In [0]:
department_df = department_df.withColumnRenamed("id", "id_department") 

display(department_df)


id_department,department
1,Product Management
2,Sales
3,Research and Development
4,Business Development
5,Engineering
6,Human Resources
7,Services
8,Support
9,Marketing
10,Training


#### Write format CSV

In [0]:

department_df.coalesce(1).write.format("com.databricks.spark.csv").mode ("overwrite").option("header", "true").save("/mnt/bronze/departments")

###  Table Jobs


#### Read csv file with PySpark

In [0]:
jobs_df = spark.read.option("inferSchema",True) \
            .option("delimiter",',') \
            .csv("/mnt/landing/jobs.csv")

In [0]:
display(jobs_df)

_c0,_c1
1,Marketing Assistant
2,VP Sales
3,Biostatistician IV
4,Account Representative II
5,VP Marketing
6,Environmental Specialist
7,Software Consultant
8,Office Assistant III
9,Information Systems Manager
10,Desktop Support Technician


#### Column Renamed

In [0]:
jobs_df = (jobs_df.withColumnRenamed("_c0", "id")
                   .withColumnRenamed("_c1", "job") )

display(jobs_df)

id,job
1,Marketing Assistant
2,VP Sales
3,Biostatistician IV
4,Account Representative II
5,VP Marketing
6,Environmental Specialist
7,Software Consultant
8,Office Assistant III
9,Information Systems Manager
10,Desktop Support Technician


In [0]:
jobs_df = jobs_df.withColumnRenamed("id", "id_job")
display(jobs_df)


id_job,job
1,Marketing Assistant
2,VP Sales
3,Biostatistician IV
4,Account Representative II
5,VP Marketing
6,Environmental Specialist
7,Software Consultant
8,Office Assistant III
9,Information Systems Manager
10,Desktop Support Technician


#### Write format CSV

In [0]:

jobs_df.coalesce(1).write.format("com.databricks.spark.csv").mode ("overwrite").option("header", "true").save("/mnt/bronze/jobs")

###  Table Hire Employee


In [0]:
%sql
CREATE TABLE Hire_employees
(
id INTEGER,
name STRING,
datetime STRING,
job_id INTEGER,
department_id INTEGER
)
USING CSV
OPTIONS (inferSchema 'true', delimiter ',')
LOCATION "/mnt/landing/hired_employees.csv"

In [0]:
%sql
select * from Hire_employees

id,name,datetime,job_id,department_id
1,Harold Vogt,2021-11-07T02:48:42Z,2.0,96.0
2,Ty Hofer,2021-05-30T05:43:46Z,8.0,
3,Lyman Hadye,2021-09-01T23:27:38Z,5.0,52.0
4,Lotti Crowthe,2021-10-01T13:04:21Z,12.0,71.0
5,Gretna Lording,2021-10-10T22:22:17Z,6.0,80.0
6,Marlow Antecki,2021-04-23T23:45:42Z,6.0,95.0
7,Joan Rillett,2021-10-10T01:33:31Z,9.0,78.0
8,Ulrick Nucciotti,2021-07-24T01:28:40Z,8.0,169.0
9,Lucretia Northcote,2021-04-01T21:22:47Z,9.0,8.0
10,Arty Giacobo,2022-02-08T12:27:07Z,6.0,62.0


#### Read csv file with PySpark

In [0]:
hire_employee_df = spark.read.option("inferSchema",True) \
            .option("delimiter",',') \
            .csv("/mnt/landing/hired_employees.csv")

In [0]:
display(hire_employee_df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7
1,Harold Vogt,2021-11-07T02:48:42Z,2.0,96.0,37,3,Contrato practica profesional
2,Ty Hofer,2021-05-30T05:43:46Z,8.0,,45,2,Contrato practica profesional
3,Lyman Hadye,2021-09-01T23:27:38Z,5.0,52.0,57,3,Contrato indefinido
4,Lotti Crowthe,2021-10-01T13:04:21Z,12.0,71.0,22,2,Contrato temporal
5,Gretna Lording,2021-10-10T22:22:17Z,6.0,80.0,57,3,Contrato indefinido
6,Marlow Antecki,2021-04-23T23:45:42Z,6.0,95.0,33,3,Contrato temporal
7,Joan Rillett,2021-10-10T01:33:31Z,9.0,78.0,55,3,Contrato temporal
8,Ulrick Nucciotti,2021-07-24T01:28:40Z,8.0,169.0,61,2,Contrato indefinido
9,Lucretia Northcote,2021-04-01T21:22:47Z,9.0,8.0,61,1,Contrato temporal
10,Arty Giacobo,2022-02-08T12:27:07Z,6.0,62.0,39,3,Contrato practica profesional


#### Column Renamed

In [0]:
hire_employee_df = hire_employee_df.withColumnRenamed("_c0", "id_hire_employed") \
                                .withColumnRenamed("_c1", "name") \
                                .withColumnRenamed("_c2", "datetime") \
                                .withColumnRenamed("_c3", "job_id") \
                                .withColumnRenamed("_c4", "department_id") \
                                .withColumnRenamed("_c5", "age") \
                                .withColumnRenamed("_c6", "delete") \
                                .withColumnRenamed("_c7", "contract_type")

display(hire_employee_df)

id_hire_employed,name,datetime,job_id,department_id,age,delete,contract_type
1,Harold Vogt,2021-11-07T02:48:42Z,2.0,96.0,37,3,Contrato practica profesional
2,Ty Hofer,2021-05-30T05:43:46Z,8.0,,45,2,Contrato practica profesional
3,Lyman Hadye,2021-09-01T23:27:38Z,5.0,52.0,57,3,Contrato indefinido
4,Lotti Crowthe,2021-10-01T13:04:21Z,12.0,71.0,22,2,Contrato temporal
5,Gretna Lording,2021-10-10T22:22:17Z,6.0,80.0,57,3,Contrato indefinido
6,Marlow Antecki,2021-04-23T23:45:42Z,6.0,95.0,33,3,Contrato temporal
7,Joan Rillett,2021-10-10T01:33:31Z,9.0,78.0,55,3,Contrato temporal
8,Ulrick Nucciotti,2021-07-24T01:28:40Z,8.0,169.0,61,2,Contrato indefinido
9,Lucretia Northcote,2021-04-01T21:22:47Z,9.0,8.0,61,1,Contrato temporal
10,Arty Giacobo,2022-02-08T12:27:07Z,6.0,62.0,39,3,Contrato practica profesional


#### Decomposition of the datetime column

In [0]:
hire_employee_df.printSchema()

root
 |-- id_hire_employed: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- delete: integer (nullable = true)
 |-- contract_type: string (nullable = true)



In [0]:
hire_employee_df = hire_employee_df.withColumn("datetime_fix", to_timestamp(col("datetime"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))

# display(hire_employee_df)
# hire_employee_df.printSchema()

#### Extract the year, month, day

In [0]:
hire_employee_df = hire_employee_df.withColumn("year", year(col("datetime"))) \
                                    .withColumn("month", month(col("datetime"))) \
                                    .withColumn("day", dayofmonth(col("datetime")))
                                    
hire_employee_df = hire_employee_df.dropna()

In [0]:
hire_employee_df = hire_employee_df.drop("datetime") \
                                .drop("datetime_fix") \
                                 .drop("delete")


In [0]:
hire_employee_df.printSchema()

root
 |-- id_hire_employed: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- contract_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



#### Change the value of integer data type  to categoric value

In [0]:
from pyspark.sql.functions import col, when

hire_employee_df = hire_employee_df.withColumn("new_month", 
                                  when(col('month') == 1, "Enero")
                                  .when(col('month') == 2, "Febrero")
                                  .when(col('month') == 3, "Marzo")
                                  .when(col('month') == 4, "Abril")
                                  .when(col('month') == 5, "Mayo")
                                  .when(col('month') == 6, "Junio")
                                  .when(col('month') == 7, "Julio")
                                  .when(col('month') == 8, "Agosto")
                                  .when(col('month') == 9, "Setiembre")
                                  .when(col('month') == 10, "Octubre")
                                  .when(col('month') == 11, "Noviembre")
                                  .when(col('month') == 12, "Diciembre")
                                  .when(col('month').isNull(), "")
                                  .otherwise(col('month')))
                                
display(hire_employee_df)

id_hire_employed,name,job_id,department_id,age,contract_type,year,month,day,new_month
1,Harold Vogt,2,96,37,Contrato practica profesional,2021,11,7,Noviembre
3,Lyman Hadye,5,52,57,Contrato indefinido,2021,9,1,Setiembre
4,Lotti Crowthe,12,71,22,Contrato temporal,2021,10,1,Octubre
5,Gretna Lording,6,80,57,Contrato indefinido,2021,10,10,Octubre
6,Marlow Antecki,6,95,33,Contrato temporal,2021,4,23,Abril
7,Joan Rillett,9,78,55,Contrato temporal,2021,10,10,Octubre
8,Ulrick Nucciotti,8,169,61,Contrato indefinido,2021,7,24,Julio
9,Lucretia Northcote,9,8,61,Contrato temporal,2021,4,1,Abril
10,Arty Giacobo,6,62,39,Contrato practica profesional,2022,2,8,Febrero
11,Libbi Dowtry,6,41,60,Contrato practica profesional,2021,7,5,Julio


In [0]:
hire_employee_df = hire_employee_df.drop("month")


In [0]:
hire_employee_df.printSchema()

root
 |-- id_hire_employed: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- contract_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- new_month: string (nullable = true)



#### Save the data in the service Azure Datalake Gen 2

#### Write format CSV

In [0]:
hire_employee_df.coalesce(1).write.format("com.databricks.spark.csv").mode ("overwrite").option("header", "true").save("/mnt/bronze/hire_employees")

#### Write format Delta

In [0]:
hire_employee_df.write.format("delta").mode("overwrite").save("dbfs:/user/hive/warehouse/hire_employee.db/hire_employees")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1353675602186368>, line 1[0m
[0;32m----> 1[0m hire_employee_df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msave([38;5;124m"[39m[38;5;124mdbfs:/user/hive/warehouse/hire_employee.db/hire_employees[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m func([38;5;241m*[39margs, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m     48[0m     logger[38;5;2

#### Final Dataframe 

In [0]:
#join hired employed vs join  department

join_one_hire_employee_df = department_df.join(hire_employee_df, department_df.id_department == hire_employee_df.department_id) \
                                    .withColumnRenamed('new_month', 'month')

# hire_employee_df_1.show()

In [0]:
join_one_hire_employee_df.printSchema()

root
 |-- id_department: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- id_hire_employed: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- contract_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)



In [0]:
#join hired employed vs join  jobs

hire_employee_final_df = join_one_hire_employee_df.join(jobs_df, join_one_hire_employee_df.job_id == jobs_df.id_job) 
hire_employee_final_df.printSchema()       

root
 |-- id_department: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- id_hire_employed: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- contract_type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- id_job: integer (nullable = true)
 |-- job: string (nullable = true)



#### Transform data Q1

In [0]:
hire_employee_final_df = hire_employee_final_df.withColumn("Q1", 
                                  when(col('month') == "Enero", "Q1")
                                  .when(col('month') == "Febrero", "Q1")
                                  .when(col('month') == "Marzo", "Q1")
                                  .when(col('month').isNull(), "")
                                  .otherwise(col('month')))
                                
display(hire_employee_final_df)



id_department,department,id_hire_employed,name,job_id,department_id,age,contract_type,year,day,month,id_job,job,Q1
8,Support,9,Lucretia Northcote,9,8,61,Contrato temporal,2021,1,Abril,9,Information Systems Manager,Abril
3,Research and Development,73,Matthus Szymanzyk,7,3,63,Contrato temporal,2021,26,Abril,7,Software Consultant,Abril
11,Legal,398,Kaitlin Gamlen,11,11,48,Contrato indefinido,2021,21,Abril,11,Financial Advisor,Abril
10,Training,701,Clarie Dionsetti,12,10,60,Contrato indefinido,2021,31,Agosto,12,Computer Systems Analyst I,Agosto
9,Marketing,821,Lindsay Yven,10,9,63,Contrato temporal,2021,10,Agosto,10,Desktop Support Technician,Agosto
9,Marketing,837,Wilton Muldrew,3,9,18,Contrato practica profesional,2021,5,Agosto,3,Biostatistician IV,Agosto
7,Services,913,Moll Danat,1,7,40,Contrato temporal,2021,16,Agosto,1,Marketing Assistant,Agosto
10,Training,918,Brooke Liddel,9,10,23,Contrato indefinido,2021,18,Marzo,9,Information Systems Manager,Q1
5,Engineering,1054,Bevin Baseley,8,5,55,Contrato practica profesional,2021,13,Noviembre,8,Office Assistant III,Noviembre
1,Product Management,1163,Nerta Castro,6,1,39,Contrato practica profesional,2021,23,Junio,6,Environmental Specialist,Junio


#### Transform data Q2

In [0]:
hire_employee_final_df = hire_employee_final_df.withColumn("Q2", 
                                  when(col('month') == "Abril", "Q2")
                                  .when(col('month') == "Mayo", "Q2")
                                  .when(col('month') == "Junio", "Q2")
                                  .when(col('month').isNull(), "")
                                  .otherwise(col('month')))
                                
display(hire_employee_final_df)



id_department,department,id_hire_employed,name,job_id,department_id,age,contract_type,year,day,month,id_job,job,Q1,Q2
8,Support,9,Lucretia Northcote,9,8,61,Contrato temporal,2021,1,Abril,9,Information Systems Manager,Abril,Q2
3,Research and Development,73,Matthus Szymanzyk,7,3,63,Contrato temporal,2021,26,Abril,7,Software Consultant,Abril,Q2
11,Legal,398,Kaitlin Gamlen,11,11,48,Contrato indefinido,2021,21,Abril,11,Financial Advisor,Abril,Q2
10,Training,701,Clarie Dionsetti,12,10,60,Contrato indefinido,2021,31,Agosto,12,Computer Systems Analyst I,Agosto,Agosto
9,Marketing,821,Lindsay Yven,10,9,63,Contrato temporal,2021,10,Agosto,10,Desktop Support Technician,Agosto,Agosto
9,Marketing,837,Wilton Muldrew,3,9,18,Contrato practica profesional,2021,5,Agosto,3,Biostatistician IV,Agosto,Agosto
7,Services,913,Moll Danat,1,7,40,Contrato temporal,2021,16,Agosto,1,Marketing Assistant,Agosto,Agosto
10,Training,918,Brooke Liddel,9,10,23,Contrato indefinido,2021,18,Marzo,9,Information Systems Manager,Q1,Marzo
5,Engineering,1054,Bevin Baseley,8,5,55,Contrato practica profesional,2021,13,Noviembre,8,Office Assistant III,Noviembre,Noviembre
1,Product Management,1163,Nerta Castro,6,1,39,Contrato practica profesional,2021,23,Junio,6,Environmental Specialist,Junio,Q2


#### Transform data Q3

In [0]:
hire_employee_final_df = hire_employee_final_df.withColumn("Q3", 
                                  when(col('month') == "Julio", "Q3")
                                  .when(col('month') == "Agosto", "Q3")
                                  .when(col('month') == "Setiembre", "Q3")
                                  .when(col('month').isNull(), "")
                                  .otherwise(col('month')))
                                
display(hire_employee_final_df)



id_department,department,id_hire_employed,name,job_id,department_id,age,contract_type,year,day,month,id_job,job,Q1,Q2,Q3
8,Support,9,Lucretia Northcote,9,8,61,Contrato temporal,2021,1,Abril,9,Information Systems Manager,Abril,Q2,Abril
3,Research and Development,73,Matthus Szymanzyk,7,3,63,Contrato temporal,2021,26,Abril,7,Software Consultant,Abril,Q2,Abril
11,Legal,398,Kaitlin Gamlen,11,11,48,Contrato indefinido,2021,21,Abril,11,Financial Advisor,Abril,Q2,Abril
10,Training,701,Clarie Dionsetti,12,10,60,Contrato indefinido,2021,31,Agosto,12,Computer Systems Analyst I,Agosto,Agosto,Q3
9,Marketing,821,Lindsay Yven,10,9,63,Contrato temporal,2021,10,Agosto,10,Desktop Support Technician,Agosto,Agosto,Q3
9,Marketing,837,Wilton Muldrew,3,9,18,Contrato practica profesional,2021,5,Agosto,3,Biostatistician IV,Agosto,Agosto,Q3
7,Services,913,Moll Danat,1,7,40,Contrato temporal,2021,16,Agosto,1,Marketing Assistant,Agosto,Agosto,Q3
10,Training,918,Brooke Liddel,9,10,23,Contrato indefinido,2021,18,Marzo,9,Information Systems Manager,Q1,Marzo,Marzo
5,Engineering,1054,Bevin Baseley,8,5,55,Contrato practica profesional,2021,13,Noviembre,8,Office Assistant III,Noviembre,Noviembre,Noviembre
1,Product Management,1163,Nerta Castro,6,1,39,Contrato practica profesional,2021,23,Junio,6,Environmental Specialist,Junio,Q2,Junio



#### Write in the final table with the fortmat Delta

In [0]:
hire_employee_final_df.write.format("delta") \
                            .mode("overwrite") \
                            .option("mergeSchema", "true") \
                            .save("dbfs:/user/hive/warehouse/hire_employee.db/hire_employees")

hire_employee_final_df

DataFrame[id_department: int, department: string, id_hire_employed: int, name: string, job_id: int, department_id: int, age: int, contract_type: string, year: int, day: int, month: string, id_job: int, job: string]