In [33]:
import boto3
import pandas as pd
import numpy as np
import configparser
import psycopg2
import hashlib
import uuid
from faker import Faker
import random

# Config setup

In [31]:
etl_config = configparser.ConfigParser()
etl_config.read('etl_config.cfg')

['etl_config.cfg']

In [32]:
tran_config = configparser.ConfigParser()
tran_config.read('transactional_config.cfg')

['transactional_config.cfg']

# AWS client connection

In [34]:
access_key = etl_config.get('IAM', 'ACCESS_KEY')
secret_key = etl_config.get('IAM', 'SECRET_KEY')
region = etl_config.get('REGION', 'REGION_NAME')

aws_rds_conn = boto3.client('rds', 
                            aws_access_key_id=access_key, 
                            aws_secret_access_key=secret_key, 
                            region_name=region)

# AWS RDS creation

In [35]:
etl_db_instance_id = etl_config.get('DB', 'DB_INSTANCE_ID')
etl_db_name = etl_config.get('DB', 'DB_NAME')
etl_db_username = etl_config.get('DB', 'DB_USERNAME')
etl_db_password = etl_config.get('DB', 'DB_PASSWORD')
etl_db_port = etl_config.get('DB', 'DB_PORT')
etl_vpc_security_group = etl_config.get('VPC', 'SECURITY_GROUP')

In [36]:
tran_db_instance_id = tran_config.get('DB', 'DB_INSTANCE_ID')
tran_db_name = tran_config.get('DB', 'DB_NAME')
tran_db_username = tran_config.get('DB', 'DB_USERNAME')
tran_db_password = tran_config.get('DB', 'DB_PASSWORD')
tran_db_port = tran_config.get('DB', 'DB_PORT')
tran_vpc_security_group = tran_config.get('VPC', 'SECURITY_GROUP')

In [37]:
try:
    response = aws_rds_conn.create_db_instance(
        DBInstanceIdentifier=etl_db_instance_id,
        DBName=etl_db_name,
        MasterUsername=etl_db_username,
        MasterUserPassword=etl_db_password,
        Port=int(etl_db_port),
        DBInstanceClass='db.t3.micro',
        Engine='mysql',
        PubliclyAccessible=True,
        AllocatedStorage=20,
        VpcSecurityGroupIds=[etl_vpc_security_group]
    )
    print(response)
except aws_rds_conn.exceptions.DBInstanceAlreadyExistsFault:
    print('La instancia ya existe')
except Exception as ex:
    print(f"Error: ${ex}")

La instancia ya existe


# RDS host name

In [42]:
try:
    etl_instance = aws_rds_conn.describe_db_instances(DBInstanceIdentifier=etl_db_instance_id)
    ETL_RDS_HOST_NAME = instance.get('DBInstances')[0].get('Endpoint').get('Address')
    print(ETL_RDS_HOST_NAME)
except Exception as ex:
    print('Error: ', ex)

dwshop.cnkf6pwg5si3.us-east-2.rds.amazonaws.com


In [39]:
etl_driver = f"""mysql://{etl_db_username}:{etl_db_password}@{ETL_RDS_HOST_NAME}:{etl_db_port}/{etl_db_name}"""

In [41]:
try:
    tran_instance = aws_rds_conn.describe_db_instances(DBInstanceIdentifier=tran_db_instance_id)
    TRAN_RDS_HOST_NAME = instance.get('DBInstances')[0].get('Endpoint').get('Address')
    print(TRAN_RDS_HOST_NAME)
except Exception as ex:
    print('Error: ', ex)

dwshop.cnkf6pwg5si3.us-east-2.rds.amazonaws.com


# Creation of ETL system

In [None]:

# -- Creación de la dimensión Categoría
# CREATE TABLE dim_categoria (
#   categoria_id SERIAL PRIMARY KEY,
#   nombre VARCHAR(50),
#   descripcion VARCHAR(255)
# );

# -- Creación de la dimensión Producto
# CREATE TABLE dim_producto (
#   producto_id SERIAL PRIMARY KEY,
#   codigo VARCHAR(50),
#   nombre VARCHAR(100),
#   descripcion VARCHAR(255),
#   categoria_id INTEGER REFERENCES dim_categoria(categoria_id)
# );

# -- Creación de la dimensión Empleado
# CREATE TABLE dim_empleado (
#   empleado_id SERIAL PRIMARY KEY,
#   nombre VARCHAR(100),
#   tipo_documento VARCHAR(20),
#   num_documento VARCHAR(20),
#   direccion VARCHAR(70),
#   telefono VARCHAR(20),
#   email VARCHAR(50)
# );

# -- Creación de la dimensión Comprobante
# CREATE TABLE dim_comprobante (
#   comprobante_id SERIAL PRIMARY KEY,
#   tipo_comprobante VARCHAR(20)
# );

# -- Creación de la tabla de hechos Ventas
# CREATE TABLE fact_ventas (
#   venta_id SERIAL PRIMARY KEY,
#   producto_id INTEGER REFERENCES dim_producto(producto_id),
#   empleado_id INTEGER REFERENCES dim_empleado(empleado_id),
#   comprobante_id INTEGER REFERENCES dim_comprobante(comprobante_id),
#   cantidad INTEGER,
#   precio_venta DECIMAL(11,2),
#   descuento DECIMAL(11,2),
#   total_venta DECIMAL(11,2),
#   fecha TIMESTAMP
# );
sql_query = '''SELECT * FROM categoria;'''
df_categoria = pd.read_sql(sql_query, driver)

df_categoria