# Env setup

In [122]:
%load_ext autoreload

%autoreload 2

In [123]:
import boto3
import pandas as pd
import numpy as np
import configparser
import psycopg2
from faker import Faker
import constants as cons

# Reading configuration file and credentials

In [124]:
config = configparser.ConfigParser()

config.read(cons.CONFIG_FILE)

['config.cfg']

In [119]:
aws_rds_conn = boto3.client('rds', aws_access_key_id=config.get(cons.USER, cons.ACCESS_KEY),
                            aws_secret_access_key=config.get(cons.USER, cons.SECRET_KEY),
                            region_name='us-east-1'
                            )

# Checking available instances for user

In [120]:
rds_instances_ids = []
aws_response = aws_rds_conn.describe_db_instances()

for db_instance in aws_response['DBInstances']:
    db_instance_identifier = db_instance['DBInstanceIdentifier']
    db_engine = db_instance['Engine']
    if (db_engine == 'mysql'):
      aws_mysql = db_instance
    else:
      aws_postgres = db_instance
    print(f"La instancia '{db_instance_identifier}' utiliza el motor de base de datos '{db_engine}'")


La instancia 'ja3plqub98cr' utiliza el motor de base de datos 'mysql'
La instancia 'xux3br0jo1lp' utiliza el motor de base de datos 'postgres'


In [114]:
print(aws_mysql['Endpoint']['Address'])
print(aws_postgres['Endpoint']['Address'])

ja3plqub98cr.cr0suguo232c.us-east-1.rds.amazonaws.com
xux3br0jo1lp.cr0suguo232c.us-east-1.rds.amazonaws.com


In [21]:
try: 
    response = aws_rds_conn.create_db_instance(
        DBInstanceIdentifier=config.get(cons.DB, cons.INSTANCE_ID),
        DBName=config.get(cons.DB, cons.DB_NAME),
        MasterUsername=config.get(cons.DB, cons.DB_USERNAME),
        MasterUserPassword=config.get(cons.DB, cons.DB_PASSWORD),
        Port=int(config.get(cons.DB, cons.PORT)),
        DBInstanceClass= cons.INSTANCE_CLASS,
        Engine=cons.ENGINE,
        PubliclyAccessible=True,
        AllocatedStorage=cons.ALLOC_STORAGE,
        VpcSecurityGroupIds=[config.get(cons.VPC, cons.SEC_GROUP)]
    )
except aws_rds_conn.exceptions.DBInstanceAlreadyExistsFault:
    print("La instancia ya existe")
except Exception as ex:
    print("Error!! ", ex)

## ETL

### Connection to MySQL

import de connector y confifuracion de la conexion.

In [176]:
import mysql.connector #TODO: Remove, only testing
import psycopg2 #TODO: Remove, only testing




# Establecer la conexión con la base de datos Mysql
conexionMysql = mysql.connector.connect(
		host=aws_mysql['Endpoint']['Address'],  # Cambia esto por la dirección IP o el nombre del host de tu servidor MySQL
		user=config.get('DB-MYSQL', 'DB_MYSQL_USERNAME'),  # Cambia esto por tu nombre de usuario de MySQL
		password=config.get('DB-MYSQL', 'DB_MYSQL_PASSWORD'),  # Cambia esto por tu contraseña de MySQL
		database=config.get('DB-MYSQL', 'DB_MYSQL_NAME') # Cambia esto por el nombre de tu base de datos
)

# Establecer la conexión con la base de datos Postgres
dbname = config.get('DB-POSTGRE', 'DB_POSTGRE_NAME')
user = config.get('DB-POSTGRE', 'DB_POSTGRE_USERNAME') # ! Change for user in local Postgres DB
password = config.get('DB-POSTGRE', 'DB_POSTGRE_PASSWORD')
host = aws_postgres['Endpoint']['Address'] # ! Change for url
port = aws_postgres['Endpoint']['Port']

try:
	conexionPSQL = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
except:
  print('Error de conexion')

### Probando conexion de AWS

Conexion MYSQL

In [177]:
cursor = conexionMysql.cursor()

# Ejecuta una consulta para mostrar las tablas de la base de datos
cursor.execute("select * from venta")

# Obtiene los resultados de la consulta
tables = cursor.fetchall()

# Imprime las tablas
print("Tablas en la base de datos:")
for table in tables:
    print(table)

# Cierra el cursor y la conexión
cursor.close()
conexionMysql.close()

Tablas en la base de datos:
(1, 427, 33, 'Invoice', 'G', 'G991249', datetime.datetime(2020, 8, 29, 0, 0), Decimal('3.33'), Decimal('6460.65'), 'Processed')
(2, 567, 47, 'Invoice', 'F', 'F482969', datetime.datetime(2020, 12, 18, 0, 0), Decimal('6.19'), Decimal('5714.17'), 'Cancelled')
(3, 455, 2, 'Invoice', 'U', 'U049801', datetime.datetime(2020, 1, 14, 0, 0), Decimal('11.23'), Decimal('5576.60'), 'Processed')
(4, 264, 21, 'Receipt', 'U', 'U781829', datetime.datetime(2021, 7, 30, 0, 0), Decimal('10.98'), Decimal('4194.30'), 'Processed')
(5, 398, 2, 'Invoice', 'Z', 'Z887893', datetime.datetime(2021, 12, 21, 0, 0), Decimal('0.68'), Decimal('6435.98'), 'Processed')
(6, 314, 26, 'Invoice', 'W', 'W303357', datetime.datetime(2021, 5, 28, 0, 0), Decimal('1.67'), Decimal('3649.80'), 'Cancelled')
(7, 226, 14, 'Invoice', 'O', 'O092834', datetime.datetime(2023, 7, 25, 0, 0), Decimal('7.38'), Decimal('517.00'), 'Cancelled')
(8, 340, 27, 'Receipt', 'T', 'T250956', datetime.datetime(2021, 4, 30, 0, 0

Test conexion PSQL

In [140]:
cursor = conexionPSQL.cursor()

# Ejecuta una consulta para mostrar las tablas de la base de datos
cursor.execute("SELECT * FROM dimarticulo")

# Obtiene los resultados de la consulta
tables = cursor.fetchall()

# Imprime las tablas
print("Tablas en la base de datos:")
for table in tables:
    print(table[0])

# Cierra el cursor y la conexión
cursor.close()
conexionPSQL.close()

Tablas en la base de datos:


### Insertando data transaccional

In [175]:
# ! La Data ya fue ingresada
import csv

cursor = conexionMysql.cursor()

try:
	with open('./Proyecto_Final_Data/categoria.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
					cursor.execute("INSERT INTO categoria (idcategoria, nombre, descripcion, estado) VALUES (%s, %s, %s, %s)", row)


	with open('./Proyecto_Final_Data/articulo.csv', 'r') as file:
				csv_data = csv.reader(file)
				next(csv_data)  # Ignora la primera fila (cabecera)

				for row in csv_data:
					cursor.execute("INSERT INTO articulo (idarticulo, idcategoria, codigo, nombre, precio_venta, stock, descripcion, imagen, estado) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)", row)
  
	with open('./Proyecto_Final_Data/persona.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
				cursor.execute("INSERT INTO persona (idpersona, tipo_persona, nombre, tipo_documento, num_documento, direccion, telefono, email) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)", row)

	with open('./Proyecto_Final_Data/rol.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
				cursor.execute("INSERT INTO rol (idrol, nombre, descripcion, estado) VALUES (%s, %s, %s, %s)", row)

	with open('./Proyecto_Final_Data/usuario.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
				cursor.execute("INSERT INTO usuario (idusuario, idrol, nombre,tipo_documento, num_documento, direccion, telefono, email, clave, estado) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", row)

	with open('./Proyecto_Final_Data/ingreso.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
				cursor.execute("INSERT INTO ingreso (idingreso, idproveedor, idusuario, tipo_comprobante, serie_comprobante, num_comprobante, fecha, impuesto, total, estado) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", row)

	with open('./Proyecto_Final_Data/venta.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
				cursor.execute("INSERT INTO venta (idventa, idcliente, idusuario, tipo_comprobante, serie_comprobante, num_comprobante, fecha, impuesto, total, estado) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)", row)

	with open('./Proyecto_Final_Data/detalle_venta.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
				cursor.execute("INSERT INTO detalle_venta (iddetalle_venta, idventa, idarticulo, cantidad, precio, descuento) VALUES (%s, %s, %s, %s, %s, %s)", row)

	with open('./Proyecto_Final_Data/detalle_ingreso.csv', 'r') as file:
			csv_data = csv.reader(file)
			next(csv_data)  # Ignora la primera fila (cabecera)

			for row in csv_data:
				cursor.execute("INSERT INTO detalle_ingreso (iddetalle_ingreso, idingreso, idarticulo, cantidad, precio) VALUES (%s, %s, %s, %s, %s)", row)

	# Confirma los cambios en la base de datos
	conexionMysql.commit()

	# Cierra el cursor y la conexión
	
finally:
	cursor.close()
	conexionMysql.close()

### Codigo ETL

In [101]:
def cerrarConexion():
	conexionMysql.close()
	conexionPSQL.close()
	print("Conexión cerrada")


def extraerData(consulta):
	cursor = conexionMysql.cursor()

	print('Extrayendo data')
	cursor.execute(consulta)

	resultados = cursor.fetchall()

	cursor.close()

	return resultados

def insertarData(resultados, tabla, campos):
	print('Insertando data')
	cursor = conexionPSQL.cursor()

	for fila in resultados:
		campos = ", ".join(campos)

		marcadores = ", ".join(["%s"] * len(fila))

		insert_sql = f"""
		INSERT INTO {tabla} ({campos})
		VALUES ({marcadores})
		"""
		cursor.execute(insert_sql, fila)

	conexionPSQL.commit()

	cursor.close() 

def dimensionesSql ():
  print('Creando tablas dimensionales')
  ruta = 'ddl_dimensional.sql'
  with open(ruta, 'r') as archivo:
    consulta = archivo.read()

  cursor = conexionPSQL.cursor()

  cursor.execute(consulta)
  conexionPSQL.commit()

  cursor.close()


def generarDimArticulo ():
  print('Dimension Articulo')
  consulta_sql = """
    SELECT 
			idarticulo,
			categoria.nombre,
			articulo.codigo,
			articulo.nombre,
			articulo.precio_venta
    FROM articulo
    INNER JOIN categoria ON articulo.idcategoria = categoria.idcategoria
    """

  data = extraerData(consulta_sql)
  insertarData(data, 'dimArticulo', ['idarticulo', 'categoria', 'codigo', 'nombre','precio_venta'])

def generarDimPersona ():
  print('Dimension Persona')
  consulta_sql = """
    SELECT 
			idpersona,
			nombre,
			num_documento,
			email
    FROM persona
    """
  
  data = extraerData(consulta_sql)
  insertarData(data, 'dimPersona',['idpersona','nombre','num_documento','email'])

def generarDimVenta ():
  print('Dimension Venta')
  consulta_sql = """
    SELECT 
			venta.idventa,
			nombre,
			idarticulo,
			cantidad,
			precio,
			descuento,
			fecha,
			total
    FROM venta
		INNER JOIN persona ON venta.idcliente = persona.idpersona
		INNER JOIN detalle_venta on venta.idventa = detalle_venta.idventa
    """
  
  data = extraerData(consulta_sql)
  print(data)
  insertarData(data, 'dimVenta',['idventa','nombreCliente','idarticulo','cantidad', 'precio', 'descuento', 'fecha','total'])

def generarDimFechas ():
  print('Dimension Fechas')


def generarDimensiones():
	#Ejecutar archivo sql
	dimensionesSql()
  #TODO: utilizar el arcivo para generar las tablas de dimensiones
	#dimArticulo
	generarDimArticulo()

	#dimPersona
	generarDimPersona()

	#dimVenta
	generarDimVenta()

	#dimFechas
	generarDimFechas()

	#hechos
	return 0

if conexionMysql.is_connected() and conexionPSQL:
	print("Conexión exitosa a las base de datos, MySQL y PostgreSQL")

	# Realiza operaciones en la base de datos aquí
	generarDimensiones()

	# Cerrar conexion
	cerrarConexion()

Conexión exitosa a las base de datos, MySQL y PostgreSQL
Creando tablas dimensionales
Dimension Articulo
Extrayendo data
Insertando data
Dimension Persona
Extrayendo data
Insertando data
Dimension Venta
Extrayendo data
[(5, 'persona prueba', 2, 1, Decimal('10.50'), Decimal('0.01'), datetime.datetime(2024, 4, 12, 13, 6, 43), Decimal('10.50'))]
Insertando data
Dimension Fechas
Conexión cerrada
