# ETLs y DAGs

DISCLAIMER: Esta notebook tiene un proposito mas bien ilustrativo que de implementacion. Es posible que el codigo no logren ejecutarlo exitosamente ya que son fragmentos para mostrar ejemplos.  

In [None]:
import pandas as pd
import datetime as dt
import requests
import io
from sqlalchemy import create_engine, text
import os
from decouple import config
import logging

In [None]:
# URL del dataset que querramos utilizar
URL = 'https://cs.famaf.unc.edu.ar/~mteruel/datasets/diplodatos/sysarmy_survey_2020_processed.csv'

In [None]:
# Cuando nos queremos conectar a una Base de Datos productiva normalmente necesitamos pasar credenciales para acceder.
# Estas credenciales NO deben ser escritas en archivos compartidos subidos a github, sino mas bien en archivos "privados".
# Una buena practica para manejar credenciales es en archivos ".env" que solo quedan registrados en su computadora local.
# La libreria "python decouple" permite leer estos archivos de configuracion .env y manejarlo como variables.
DB_USER = config('DB_USER')
DB_PASSWORD = config('DB_PASSWORD')
DB_HOST = config('DB_HOST')
DB_PORT = config('DB_PORT')

# Una buena practica es dejar el codigo de las queries SQL en archivos separados de la notebook, .sql
SQL_SCRIPT = 'queries.sql'


In [None]:
# En lugar de usar prints para ver el avance a medida que va corriendo el script se utilizan los logs.
# Los logs basicamente son registros que se van dejando para saber el codigo que ha sido ejecutado.
# Es decision arbitraria del programador decidir que desea registrar en los logs.
# En python se utiliza la libreria logging https://docs.python.org/3/library/logging.html#logging-levels
# La libreria permite definir niveles de logs (ERROR, DEBUG, INFO, etc). Segun la criticidad del error.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.basicConfig(format='%(asctime)s %(levelname)s:%(message)s', level=logging.DEBUG, datefmt='%I:%M:%S')


In [None]:
def connection_db():
  '''Connect to DB using SQLAlchemy methods. Returns an engine created and connected'''
  try:
      # ejemplo de conexion a PostgreSQL utilizando SQLalchemy
      engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/survey".format(),
                              echo=False, client_encoding='utf8')
      logger.info('Conexion exitosa a la base de datos')
      return engine

  except ValueError as e:
      logger.error(e)

In [None]:
def extract(url):
  # The extract process could be complex including some SQL queries
  df = pd.read_csv(url)
  logger.info('read_csv exitoso')
  return df

In [None]:
def load(engine):
    '''Toma la conexion a la base de datos engine y a partir del sql.script
    definido y ejecuta sus queries definidos sobre la base'''
    try:
        sql_file = open(SQL_SCRIPT)
        sql_as_string = sql_file.read()
        with engine.connect() as conn:
            rs = conn.execute(text(sql_as_string))
    except Exception as e:
        logger.error(e)

In [None]:
def transformation(filename, engine, tablename):
    '''Toma el nombre del archivo, la conexion a la base (engine) y el nombre de la tabla.
     Crea un dataframe y escribe una tabla a partir del engine
    ingestando los datos del archivo en dicha tabla.'''
    try:
        df['fecha'] = dt.date.today()
        df.columns = df.columns.str.lower()
        df.to_sql(tablename, con=engine, if_exists="replace")
        logger.info('Datos ingestados en la base de datos')
        logger.info('Cantidad de registros en el archivo: {}'.format(len(df.index)))
    except ValueError as e:
        logger.error(e)

In [None]:
# la funcion main es muy utilizada en scripts python cuando tenemos archivos .py por ejemplo etl.py
# al tener la funcion main pueden correr desde la terminal python etl.py y va a ejecutar lo definido en la funcion main
def main():

    logger.info('Comienza la extraccion')

    engine = connection_db()

    df = extract(URL)

    load(filename, engine)

    transformation(engine)

if __name__ == "__main__":
    logger.info('ETL Process Initialized')
    main()

# Ejemplo de DAG en Airflow

In [None]:
# ejemplo de DAG en Airflow
from airflow import DAG

with DAG(
    'dag_test',
    default_args=default_args,
    description='DAG ',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2022, 1, 26),
) as dag:
    extraction = PythonOperator(task_id='extraction',
                               python_callable=extract)  # Consulta SQL
    transformation = PythonOperator(task_id='transformation',
                                    python_callable=transform)    # Procesar datos con pandas
    load = PythonOperator(task_id='load',
                                 python_callable=load) # Carga de datos


    extraction >> transformation >> load