In [None]:
import configparser
import psycopg2
import socket
import sys
import logging
import traceback
import pandas as pd

In [None]:

config = configparser.ConfigParser()
config.read('../config.ini')
db_endpoint_RDS = config['database']['host']
db_name_RDS = config['database']['database_name']
db_user_RDS = config['database']['username']
db_password_RDS = config['database']['password']
db_port_RDS = int(config['database']['port'])

db_endpoint_RSH = config['redshift']['host']
db_name_RSH = config['redshift']['database_name']
db_user_RSH = config['redshift']['username']
db_password_RSH = config['redshift']['password']
db_port_RSH = int(config['redshift']['port'])

def connect_to_RDS():
    conn_rds = psycopg2.connect(
            host=db_endpoint_RDS,
            port = db_port_RDS,
            database = db_name_RDS,
            user=db_user_RDS,
            password=db_password_RDS
        )
    return conn_rds

def connection_to_RedShift():

    conn_rsh = psycopg2.connect(
            host=db_endpoint_RSH,
            port = db_port_RSH,
            database = db_name_RSH,
            user=db_user_RSH,
            password=db_password_RSH
        )
    
    return conn_rsh


try:    

    conn_rds = connect_to_RDS()
    cur_rds = conn_rds.cursor()
    conn_rsh = connection_to_RedShift()
    cur_rsh = conn_rsh.cursor()


except (socket.timeout, psycopg2.OperationalError) as e:
    if isinstance(e, socket.timeout):
        print("Error: Connection timed out.")
    else:
        print("Error during connection:", e)
    sys.exit(1)  # Terminate the program with a non-zero exit code


try:
    # Obtener los símbolos únicos de tbTradingHistoric
    cur_rds.execute("SELECT DISTINCT \"Symbol\" FROM tbTradingHistoric;")
    symbols_tbTradingHistoric = cur_rds.fetchall()
    print("Symbols fetched from tbTradingHistoric successfully!")

    # Obtener los símbolos únicos de tbDimSymbol
    cur_rsh.execute("SELECT DISTINCT \"Symbol\" FROM tbDimSymbol;")
    symbols_tbdimSymbols = cur_rsh.fetchall()
    print("Symbols fetched from tbDimSymbol successfully!")

    # Lista de comprensión para eliminar los símbolos existentes de symbols_tbTradingHistoric
    symbols_tbTradingHistoric = [symbol for symbol in symbols_tbTradingHistoric if symbol not in symbols_tbdimSymbols]

    # Consulta de inserción
    insert_query = "INSERT INTO tbDimSymbol (Symbol) VALUES (%s)"

    with conn_rsh.cursor() as cur_rsh:
        for symbol in symbols_tbTradingHistoric:
            try:
                cur_rsh.execute(insert_query, symbol)
                print(f"Inserted symbol '{symbol[0]}' into tbDimSymbol successfully!")
            except psycopg2.Error as e:
                print(f"Error occurred during insertion for symbol '{symbol[0]}':", e)
                conn_rsh.rollback()  # Rollback the transaction in case of an error

    conn_rsh.commit()  # Commit all the successful insertions

except (socket.timeout, psycopg2.OperationalError) as e:
    if isinstance(e, socket.timeout):
        print("Error: Connection timed out.")
    else:
        print("Error during connection:", e)
    sys.exit(1)  # Terminate the program with a non-zero exit code
except psycopg2.Error as e:
    print("Error occurred during SQL query:", e)
    conn_rsh.rollback()  # Rollback the transaction in case of an error


conn_rds.close()
conn_rsh.close()

In [50]:

for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# loggin config
logging.basicConfig(filename='./logETLs/scripts.log', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')

console = logging.StreamHandler()


config = configparser.ConfigParser()
config.read('../config.ini')
db_endpoint_RDS = config['database']['host']
db_name_RDS = config['database']['database_name']
db_user_RDS = config['database']['username']
db_password_RDS = config['database']['password']
db_port_RDS = int(config['database']['port'])

db_endpoint_RSH = config['redshift']['host']
db_name_RSH = config['redshift']['database_name']
db_user_RSH = config['redshift']['username']
db_password_RSH = config['redshift']['password']
db_port_RSH = int(config['redshift']['port'])

def connect_to_RDS():
    conn_rds = psycopg2.connect(
            host=db_endpoint_RDS,
            port = db_port_RDS,
            database = db_name_RDS,
            user=db_user_RDS,
            password=db_password_RDS
        )
    return conn_rds

def connection_to_RedShift():

    conn_rsh = psycopg2.connect(
            host=db_endpoint_RSH,
            port = db_port_RSH,
            database = db_name_RSH,
            user=db_user_RSH,
            password=db_password_RSH
        )
    
    return conn_rsh


try:    

    conn_rds = connect_to_RDS()
    cur_rds = conn_rds.cursor()
    conn_rsh = connection_to_RedShift()
    cur_rsh = conn_rsh.cursor()


except (socket.timeout, psycopg2.OperationalError) as e:
    if isinstance(e, socket.timeout):
        print("Error: Connection timed out.")
    else:
        print("Error during connection:", e)
        logging.error(traceback.format_exc())
    sys.exit(1)  # Terminate the program with a non-zero exit code


def get_unique_symbols(cur_rds, cur_rsh):
    # Obtener los símbolos únicos de tbTradingHistoric
    with cur_rds:
        cur_rds.execute("SELECT * FROM tbTradingHistoric;")
        columns = [desc[0] for desc in cur_rds.description]
        data = cur_rds.fetchall()
        df_trading_historic = pd.DataFrame(data, columns=columns)
    
    
    # Obtener los símbolos únicos de tbDimSymbol
    with cur_rsh:
        cur_rsh.execute("SELECT * FROM tbDimSymbol;")
        columns = [desc[0] for desc in cur_rsh.description]
        data = cur_rsh.fetchall()
        df_DimSymbols = pd.DataFrame(data, columns=columns)
    print("Symbols fetched from tbDimSymbol successfully!")

    # Cruza la columna 'symbols' del DataFrame df_trading_historic con la columna 'Symbol' de df_DimSymbols
    merged_df = df_trading_historic.merge(df_DimSymbols, left_on='symbols', right_on='Symbol', how='left')

    # Reemplaza la columna 'symbols' por la columna 'idsymbols' del DataFrame df_DimSymbols
    merged_df['symbols'] = merged_df['idsymbols']

    # Elimina la columna 'Symbol' que fue utilizada solo para el cruce (opcional)
    merged_df.drop(columns='Symbol', inplace=True)

    # Retornar el DataFrame df_trading_historic con los ids seleccionados
    df_trading_historic_with_ids = merged_df

    return df_trading_historic_with_ids


In [51]:
  # Funcion para traer lista con symbol unica
combined_symbols = get_unique_symbols(cur_rds, cur_rsh)

    # Convertir el DataFrame en una lista de tuplas
data_to_insert = combined_symbols.to_records(index=False)


Symbols fetched from tbDimSymbol successfully!


KeyError: 'Symbol'

In [53]:

conn_rds = connect_to_RDS()
cur_rds = conn_rds.cursor()
conn_rsh = connection_to_RedShift()
cur_rsh = conn_rsh.cursor()
with cur_rds:
    cur_rds.execute("SELECT * FROM tbTradingHistoric;")
    columns = [desc[0] for desc in cur_rds.description]
    data = cur_rds.fetchall()
    df_trading_historic = pd.DataFrame(data, columns=columns)
    
    
    # Obtener los símbolos únicos de tbDimSymbol
with cur_rsh:
    cur_rsh.execute("SELECT * FROM tbDimSymbol;")
    columns = [desc[0] for desc in cur_rsh.description]
    data = cur_rsh.fetchall()
    df_DimSymbols = pd.DataFrame(data, columns=columns)
print("Symbols fetched from tbDimSymbol successfully!")

# Cruza la columna 'symbols' del DataFrame df_trading_historic con la columna 'Symbol' de df_DimSymbols
merged_df = df_trading_historic.merge(df_DimSymbols, left_on='Symbol', right_on='symbol', how='left')

# Reemplaza la columna 'symbols' por la columna 'idsymbols' del DataFrame df_DimSymbols
merged_df['Symbol'] = merged_df['idsymbol']

# Elimina la columna 'Symbol' que fue utilizada solo para el cruce (opcional)
merged_df.drop(columns='symbol', inplace=True)

# Retornar el DataFrame df_trading_historic con los ids seleccionados
df_trading_historic_with_ids = merged_df

df_trading_historic_with_ids



Symbols fetched from tbDimSymbol successfully!


KeyError: 'idsymbols'

In [None]:
# Cruza la columna 'symbols' del DataFrame df_trading_historic con la columna 'Symbol' de df_DimSymbols
merged_df = df_trading_historic.merge(df_DimSymbols, left_on='Symbol', right_on='symbol', how='left')

# Reemplaza la columna 'symbols' por la columna 'idsymbols' del DataFrame df_DimSymbols
merged_df['Symbol'] = merged_df['idsymbol']

# Elimina la columna 'Symbol' que fue utilizada solo para el cruce (opcional)
merged_df.drop(columns='Symbol', inplace=True)
merged_df.drop(columns='symbol', inplace=True)
# Retornar el DataFrame df_trading_historic con los ids seleccionados
df_trading_historic_with_ids = merged_df
df_trading_historic_with_ids


In [71]:
import configparser
import psycopg2
import socket
import sys
import traceback
import logging
import pandas as pd



for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# loggin config
logging.basicConfig(filename='./logETLs/scripts.log', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')

console = logging.StreamHandler()


config = configparser.ConfigParser()
config.read('../config.ini')
db_endpoint_RDS = config['database']['host']
db_name_RDS = config['database']['database_name']
db_user_RDS = config['database']['username']
db_password_RDS = config['database']['password']
db_port_RDS = int(config['database']['port'])

db_endpoint_RSH = config['redshift']['host']
db_name_RSH = config['redshift']['database_name']
db_user_RSH = config['redshift']['username']
db_password_RSH = config['redshift']['password']
db_port_RSH = int(config['redshift']['port'])

def connect_to_RDS():
    conn_rds = psycopg2.connect(
            host=db_endpoint_RDS,
            port = db_port_RDS,
            database = db_name_RDS,
            user=db_user_RDS,
            password=db_password_RDS
        )
    return conn_rds

def connection_to_RedShift():

    conn_rsh = psycopg2.connect(
            host=db_endpoint_RSH,
            port = db_port_RSH,
            database = db_name_RSH,
            user=db_user_RSH,
            password=db_password_RSH
        )
    
    return conn_rsh


try:    

    conn_rds = connect_to_RDS()
    cur_rds = conn_rds.cursor()
    conn_rsh = connection_to_RedShift()
    cur_rsh = conn_rsh.cursor()


except (socket.timeout, psycopg2.OperationalError) as e:
    if isinstance(e, socket.timeout):
        print("Error: Connection timed out.")
    else:
        print("Error during connection:", e)
        logging.error(traceback.format_exc())
    sys.exit(1)  # Terminate the program with a non-zero exit code


def get_unique_symbols(cur_rds, cur_rsh):
    # Obtener los símbolos únicos de tbTradingHistoric
    with cur_rds:
        cur_rds.execute("SELECT * FROM tbTradingHistoric;")
        columns = [desc[0] for desc in cur_rds.description]
        data = cur_rds.fetchall()
        df_trading_historic = pd.DataFrame(data, columns=columns)
    
    
    # Obtener los símbolos únicos de tbDimSymbol
    with cur_rsh:
        cur_rsh.execute("SELECT * FROM tbDimSymbol;")
        columns = [desc[0] for desc in cur_rsh.description]
        data = cur_rsh.fetchall()
        df_DimSymbols = pd.DataFrame(data, columns=columns)
    print("Symbols fetched from tbDimSymbol successfully!")

    # Cruza la columna 'symbols' del DataFrame df_trading_historic con la columna 'Symbol' de df_DimSymbols
    merged_df = df_trading_historic.merge(df_DimSymbols, left_on='Symbol', right_on='symbol', how='left')

    # Reemplaza la columna 'symbols' por la columna 'idsymbols' del DataFrame df_DimSymbols
    merged_df['Symbol'] = merged_df['idsymbol']

    # Elimina la columna 'Symbol' que fue utilizada solo para el cruce (opcional)
    merged_df.drop(columns='symbol', inplace=True)
    merged_df.drop(columns='Symbol', inplace=True)

    # Retornar el DataFrame df_trading_historic con los ids seleccionados
    df_trading_historic_with_ids = merged_df

    return df_trading_historic_with_ids


try:

    # Funcion para traer lista con symbol unica
    combined_symbols = get_unique_symbols(cur_rds, cur_rsh)

    # Convertir el DataFrame en una lista de tuplas
    data_to_insert = combined_symbols.to_records(index=False)
    
    data_to_insert = [(str(row[0]), row[1], row[2], row[3], row[4], row[5], int(row[6]), int(row[7])) for row in data_to_insert]

    # Consulta de inserción
    insert_query = "INSERT INTO tbTradingHistoric (\"Date\", \"Open\", \"High\", \"Low\", \"Close\", \"Adj_Close\", \"Volume\", \"idSymbol\") VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"

    combined_symbols.to_sql("tbtradinghistoric",conn_rsh,index=False, if_exists="replace")

    '''# Ejecutar la inserción
    with conn_rsh.cursor() as cur_rsh:
        cur_rsh.executemany(insert_query, data_to_insert)

    # Confirmar la transacción
    conn_rsh.commit()'''

except (socket.timeout, psycopg2.OperationalError) as e:
    if isinstance(e, socket.timeout):
        print("Error: Connection timed out.")
    else:
        print("Error during connection:", e)
        logging.error(traceback.format_exc())
    sys.exit(1)  # Terminate the program with a non-zero exit code
except psycopg2.Error as e:
    print("Error occurred during SQL query:", e)
    logging.error(traceback.format_exc())
    conn_rsh.rollback()  # Rollback the transaction in case of an error


conn_rds.close()
conn_rsh.close()

Symbols fetched from tbDimSymbol successfully!


KeyboardInterrupt: 

In [69]:
data_to_insert

rec.array([(datetime.date(2023, 1, 3), 151.96000671, 153.13000488, 148.47000122, 150.03999329, 149.51045227, 1414300, 3352),
           (datetime.date(2023, 1, 4), 151.6499939 , 153.03999329, 150.24000549, 151.66999817, 151.13470459, 1247400, 3352),
           (datetime.date(2023, 1, 5), 150.        , 153.07000732, 148.77000427, 152.11000061, 151.57316589, 1714600, 3352),
           ...,
           (datetime.date(2023, 7, 26),   9.43999958,   9.76000023,   9.43999958,   9.56999969,   9.56999969,  183200, 4330),
           (datetime.date(2023, 7, 27),   9.75      ,  10.19999981,   9.51000023,   9.75      ,   9.75      ,  285100, 4330),
           (datetime.date(2023, 7, 28),  10.11999989,  10.44999981,   9.88000011,   9.89000034,   9.89000034,  328000, 4330)],
          dtype=[('Date', 'O'), ('Open', '<f8'), ('High', '<f8'), ('Low', '<f8'), ('Close', '<f8'), ('Adj_Close', '<f8'), ('Volume', '<i8'), ('idsymbol', '<i8')])

In [54]:
merged_df

Unnamed: 0,Date,Open,High,Low,Close,Adj_Close,Volume,Symbol,idsymbol,symbol
0,2023-01-03,151.960007,153.130005,148.470001,150.039993,149.510452,1414300,A,3352,A
1,2023-01-04,151.649994,153.039993,150.240005,151.669998,151.134705,1247400,A,3352,A
2,2023-01-05,150.000000,153.070007,148.770004,152.110001,151.573166,1714600,A,3352,A
3,2023-01-06,154.360001,154.639999,143.009995,147.669998,147.148834,2445000,A,3352,A
4,2023-01-09,149.690002,151.279999,147.199997,147.470001,146.949524,1269600,A,3352,A
...,...,...,...,...,...,...,...,...,...,...
943130,2023-07-24,9.180000,9.510000,9.180000,9.250000,9.250000,154000,ZYXI,4330,ZYXI
943131,2023-07-25,9.190000,9.500000,9.190000,9.440000,9.440000,153100,ZYXI,4330,ZYXI
943132,2023-07-26,9.440000,9.760000,9.440000,9.570000,9.570000,183200,ZYXI,4330,ZYXI
943133,2023-07-27,9.750000,10.200000,9.510000,9.750000,9.750000,285100,ZYXI,4330,ZYXI
