## Proyecto Final - Ing. de Datos Rocking Data 🤘 - Santiago Sosa

# In [None]:
from sqlalchemy import create_engine, text

# README: follow the steps in create_schema.py first!

import pymysql
import pandas as pd

# Connection settings for the MySQL server that hosts the source schema.
# NOTE(review): credentials are hard-coded in the source file -- consider
# reading them from the environment or a config file kept out of VCS.
host = '127.0.0.1'
port = 3306
user = 'uservpetitions'
password = '12345678'
database = 'visa_petitions'

# Direct PyMySQL connection to the source database.
# `password` / `database` are the canonical keyword names; the previously
# used `passwd` / `db` are deprecated MySQLdb-compatibility aliases.
# NOTE(review): nothing visible in this file uses or closes `conn` (the ETL
# functions go through the SQLAlchemy engine) -- confirm it is still needed.
conn = pymysql.connect(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    charset='utf8mb4',
)


def extract(engine):
    """Snapshot the source tables into the ``staging`` schema, unmodified.

    For each of ``petitions``, ``cities_states``, ``employers`` and ``soc``
    in ``visa_petitions``, drop ``raw_<table>`` in ``staging`` if it exists
    and recreate it with ``CREATE TABLE ... AS SELECT *``.

    Every statement runs on the single connection handed out by
    ``engine.begin()``, so the initial ``USE staging`` is in effect for all
    statements that follow. ``Engine.execute()`` checked a connection out of
    the pool per call (no such guarantee) and was removed in SQLAlchemy 2.0.

    Args:
        engine: SQLAlchemy ``Engine`` connected to the MySQL server.
    """
    source_tables = ("petitions", "cities_states", "employers", "soc")

    with engine.begin() as connection:
        # staging stores the data exactly as it comes from the source
        connection.execute(text("USE staging"))

        for table in source_tables:
            connection.execute(
                text("DROP TABLE IF EXISTS raw_{0};".format(table))
            )
            connection.execute(text("""
                CREATE TABLE raw_{0} AS
                SELECT * FROM visa_petitions.{0}
                ;""".format(table)))

# From here on the code has not been edited yet: transform() below still reads
# raw_* tables that extract() above does not create.
def transform(engine):
    """Rebuild the ``stg_*`` tables in ``staging`` from the ``raw_*`` tables.

    Each staging table is dropped if present and recreated with
    ``CREATE TABLE ... AS SELECT``:

    * ``stg_bt_employee``  -- ``raw_physician`` UNION ``raw_nurse``, tagged
      with a ``role`` literal; ``registered_nurse`` is NULL for physicians.
    * ``stg_bt_procedure`` -- ``raw_procedures``.
    * ``stg_bt_patient``   -- ``raw_patient``.
    * ``stg_bt_stay``      -- ``raw_stay``.
    * ``stg_ft_procedure`` -- ``raw_undergoes``.

    All statements run on the single connection handed out by
    ``engine.begin()``, so ``USE staging`` applies to every statement.
    ``Engine.execute()`` gave no such guarantee and was removed in
    SQLAlchemy 2.0.

    NOTE(review): the ``raw_*`` tables read here are not the ones created by
    ``extract()`` in this file (``raw_petitions``, ``raw_cities_states``,
    ``raw_employers``, ``raw_soc``). This looks like a leftover from another
    project and presumably still has to be ported to the visa_petitions
    schema -- confirm.

    Args:
        engine: SQLAlchemy ``Engine`` connected to the MySQL server.
    """
    # SQL text is kept exactly as before; only the execution path changed.
    statements = (
        "USE staging",

        # Employees: physicians and nurses merged into one table.
        # Plain UNION (not UNION ALL) also removes duplicate rows.
        "DROP TABLE IF EXISTS stg_bt_employee;",
        """
        CREATE TABLE stg_bt_employee AS
            SELECT
                EmployeeID AS id,
                ssn,
                name,
                position,
                'Physician' AS role,
                NULL AS registered_nurse
            FROM raw_physician
            UNION
            SELECT
                EmployeeID AS id,
                ssn,
                name,
                position,
                'Nurse' AS role,
                registered AS registered_nurse
            FROM raw_nurse
        ;""",

        # Procedures: straight column copy.
        "DROP TABLE IF EXISTS stg_bt_procedure;",
        """
        CREATE TABLE stg_bt_procedure AS
            SELECT
                code,
                name,
                cost
            FROM raw_procedures
        ;""",

        # Patients: InsuranceID renamed to insurance_id.
        "DROP TABLE IF EXISTS stg_bt_patient;",
        """
        CREATE TABLE stg_bt_patient AS
            SELECT
                ssn,
                name,
                address,
                phone,
                InsuranceID AS insurance_id,
                pcp
            FROM raw_patient
        ;""",

        # Stays: columns renamed to id / patient_ssn / start / end.
        "DROP TABLE IF EXISTS stg_bt_stay;",
        """
        CREATE TABLE stg_bt_stay AS
            SELECT
                StayID AS id,
                patient AS patient_ssn,
                room,
                StayStart AS start,
                StayEnd AS end
            FROM raw_stay
        ;""",

        # Procedure facts: one row per raw_undergoes row, columns renamed.
        "DROP TABLE IF EXISTS stg_ft_procedure;",
        """
        CREATE TABLE stg_ft_procedure AS
            SELECT
                patient AS patient_ssn,
                Procedures AS procedure_code,
                stay AS stay_id,
                DateUndergoes AS date,
                physician AS physician_id,
                AssistingNurse AS nurse_id
            FROM raw_undergoes
        ;""",
    )

    with engine.begin() as connection:
        for sql in statements:
            connection.execute(text(sql))


def load(engine):
    """Append the staged rows to the warehouse tables in ``dw_hospital``.

    Copies, column for column:

    * ``staging.stg_bt_employee``  -> ``bt_employee``
    * ``staging.stg_bt_procedure`` -> ``bt_procedure``
    * ``staging.stg_bt_patient``   -> ``bt_patient``
    * ``staging.stg_bt_stay``      -> ``bt_stay``
    * ``staging.stg_ft_procedure`` -> ``ft_procedure``

    All statements run on the single connection handed out by
    ``engine.begin()``: ``USE dw_hospital`` therefore applies to every
    INSERT, and the INSERTs share one transaction that is committed at the
    end or rolled back if any of them fails (on transactional storage
    engines), so a failure cannot leave a partially loaded warehouse.
    ``Engine.execute()`` gave neither guarantee and was removed in
    SQLAlchemy 2.0.

    NOTE(review): the target schema is ``dw_hospital`` although the source
    database in this file is ``visa_petitions`` -- presumably a leftover
    that still has to be adapted; confirm.
    NOTE(review): rows are inserted without clearing the targets first, so
    running this twice may duplicate rows or hit key violations depending on
    the target tables' constraints -- confirm the intended behaviour.

    Args:
        engine: SQLAlchemy ``Engine`` connected to the MySQL server.
    """
    # SQL text is kept exactly as before; only the execution path changed.
    statements = (
        "USE dw_hospital",

        # Dimension tables first, fact table last.
        """
        INSERT INTO bt_employee(
            id,
            ssn,
            name,
            position,
            role,
            registered_nurse
        )
        SELECT
            id,
            ssn,
            name,
            position,
            role,
            registered_nurse
        FROM staging.stg_bt_employee
        ;""",

        """
        INSERT INTO bt_procedure(
            code,
            name,
            cost
        )
        SELECT
            code,
            name,
            cost
        FROM staging.stg_bt_procedure
        ;""",

        """
        INSERT INTO bt_patient(
            ssn,
            name,
            address,
            phone,
            insurance_id,
            pcp
        )
        SELECT
            ssn,
            name,
            address,
            phone,
            insurance_id,
            pcp
        FROM staging.stg_bt_patient
        ;""",

        """
        INSERT INTO bt_stay(
            id,
            patient_ssn,
            room,
            start,
            end
        )
        SELECT
            id,
            patient_ssn,
            room,
            start,
            end
        FROM staging.stg_bt_stay
        ;""",

        """
        INSERT INTO ft_procedure(
            patient_ssn,
            procedure_code,
            stay_id,
            date,
            physician_id,
            nurse_id
        )
        SELECT
            patient_ssn,
            procedure_code,
            stay_id,
            date,
            physician_id,
            nurse_id
        FROM staging.stg_ft_procedure
        ;""",
    )

    with engine.begin() as connection:
        for sql in statements:
            connection.execute(text(sql))


if __name__ == '__main__':

    # Connect to the server. The URL is built from the module-level settings
    # so the engine and the PyMySQL connection cannot drift apart. No default
    # database is put in the URL: each ETL step selects its schema with USE.
    engine = create_engine(
        'mysql+pymysql://{0}:{1}@{2}:{3}'.format(user, password, host, port)
    )

    try:
        # Pull the source data and store it in the staging area as-is.
        extract(engine)

        # Reshape the data to match our schema; results stay in staging.
        transform(engine)

        # Append the new data to the data warehouse.
        load(engine)
    finally:
        # Release pooled connections even if one of the steps fails.
        engine.dispose()