# INGESTION DE DATOS TRANSACCIONALES

In [1]:
## CONFIGURACION DE AWS RDS

In [2]:
# Librerias necesarias
import boto3
import pandas as pd
import numpy as np
import psycopg2
import configparser
from sqlalchemy import create_engine
import warnings
warnings.filterwarnings('ignore')


In [3]:
# Documento privado de claves
config = configparser.ConfigParser()
config.read('config.cfg')

['config.cfg']

In [4]:
# Identificacion con AWS
aws_rds_conn = boto3.client('rds', aws_access_key_id=config.get('IAM','ACCESS_KEY'),
                             aws_secret_access_key=config.get('IAM','SECRET_ACCESS_KEY'),
                             region_name='us-east-1')

In [5]:
#Verificacion de instancias creadas anteriormente.

rds_instances_ids = []
aws_response = aws_rds_conn.describe_db_instances() 

for response in aws_response['DBInstances']:
    rds_instances_ids.append(response['DBInstanceIdentifier'])

print(f"Instancias Disponibles : {rds_instances_ids}")

Instancias Disponibles : ['banco-transactional']


In [6]:
#creacion de la base de datos

try:
    response = aws_rds_conn.create_db_instance(
        DBInstanceIdentifier = config.get('TRANSACC','DB_INSTANCE_ID'),
        DBName = config.get('TRANSACC','DB_NAME'),
        DBInstanceClass = 'db.t3.micro',
        Engine = 'postgres',
        MasterUsername = config.get('TRANSACC','DB_USER'),
        MasterUserPassword = config.get('TRANSACC','DB_PASSWORD'),
        Port = int(config.get('TRANSACC','DB_PORT')),
        PubliclyAccessible = True,
        VpcSecurityGroupIds = [config.get('VPC','SECURITY_GROUP')],
        AllocatedStorage = 15
               )
    print(response)
except aws_rds_conn.exceptions.DBInstanceAlreadyExistsFault as ex:
    print('La instancia ya existe')
except Exeption as ex:
    print('ERROR', ex)

La instancia ya existe


### Obtencion del hostname de las instancias

In [7]:
# codigo para obtener el hostname

try: 
    instance = aws_rds_conn.describe_db_instances(DBInstanceIdentifier = config.get('TRANSACC','DB_INSTANCE_ID'))
    RDS_HOSTNAME = instance.get('DBInstances')[0].get('Endpoint').get('Address')
    print(RDS_HOSTNAME)
except Exception as ex:
    print('Error', ex)

banco-transactional.c1m44mee4c6c.us-east-1.rds.amazonaws.com


### Conexion de base de datos desde python

In [8]:
import sql_queries

In [9]:
sql_queries.DDL_QUERY

'\nCREATE TABLE IF NOT EXISTS categoria (\n    idcategoria SERIAL PRIMARY KEY,\n    nombre VARCHAR(50) NOT NULL,\n    descripcion VARCHAR(255),\n    estado BOOLEAN NOT NULL\n);\n\nCREATE TABLE IF NOT EXISTS rol (\n    idrol SERIAL PRIMARY KEY,\n    nombre VARCHAR(30) NOT NULL,\n    descripcion VARCHAR(255),\n    estado BOOLEAN NOT NULL\n);\n\nCREATE TABLE IF NOT EXISTS usuario (\n    idusuario SERIAL PRIMARY KEY,\n    idrol INTEGER NOT NULL,\n    nombre VARCHAR(100) NOT NULL,\n    tipo_documento VARCHAR(20),\n    num_documento VARCHAR(20),\n    direccion VARCHAR(70),\n    telefono VARCHAR(20),\n    email VARCHAR(50) NOT NULL,\n    clave VARCHAR(50) NOT NULL,\n    estado BOOLEAN NOT NULL,\n    CONSTRAINT fk_usuario_rol FOREIGN KEY (idrol) REFERENCES rol (idrol)\n);\n\nCREATE TABLE IF NOT EXISTS persona (\n    idpersona SERIAL PRIMARY KEY,\n    tipo_persona VARCHAR(20) NOT NULL,\n    nombre VARCHAR(100) NOT NULL,\n    tipo_documento VARCHAR(20),\n    num_documento VARCHAR(20),\n    direc

In [10]:
#creacion de database de datos
try:
    db_pg_conn = psycopg2.connect(
                    dbname = config.get('TRANSACC','DB_NAME'),
                    user = config.get('TRANSACC','DB_USER'),
                    password = config.get('TRANSACC','DB_PASSWORD'),
                    host = RDS_HOSTNAME,
                    port = config.get('TRANSACC','DB_PORT')
    )
    cursor = db_pg_conn.cursor()
    cursor.execute(sql_queries.DDL_QUERY)
    db_pg_conn.commit()
    print('Base de datos creada exitosamente')

except Exception as ex:
    print('ERROR', ex)

Base de datos creada exitosamente


### Insertar table en base de datos

In [11]:
data_tipo_transacciones = [
     {'id_tipo_transac': 85095, 'tipo_transaccion': 'Depósito'}, 
     {'id_tipo_transac': 85098, 'tipo_transaccion': 'Retiro'},
     {'id_tipo_transac': 85194, 'tipo_transaccion': 'Transferencia'},
     {'id_tipo_transac': 85133, 'tipo_transaccion': 'Pago Prestamo'}
]

In [12]:
def insertData2SQL(data_dict, table_name, driver):
    df_data = pd.DataFrame.from_records(data_dict)
    try:
        response = df_data.to_sql(table_name, driver, index=False, if_exists='append')
        print(f"Se han insertado {response} nuevos registros")
    except Exception as ex:
        print(ex)

In [13]:
driver = f"""postgresql://{config.get('TRANSACC', 'DB_USER')}:{config.get('TRANSACC', 'DB_PASSWORD')}@{RDS_HOSTNAME}:{config.get('TRANSACC', 'DB_PORT')}/{config.get('TRANSACC', 'DB_NAME')}"""
driver

engine = create_engine(driver)

In [14]:
#insertamos data en tabla tipo_transaccione 
insertData2SQL(data_tipo_transacciones, 'tipo_transacciones', driver)

Using URI string without sqlalchemy installed.
