<a href="https://colab.research.google.com/github/rogra4813/GRUPO6/blob/main/Copia_de_ETL_CP_S3_G6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Ejercicio Práctico Clase 3 - Grupo 6**

Integrantes:
- Francisco García
- Robert Granda
- Fabián Quito
- Gabriel Salazar



---



In [None]:
!pip install selenium
!pip install sqlalchemy



# Librerías

In [None]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import datetime
import os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from IPython.display import HTML
import sqlite3
from sqlalchemy import create_engine, inspect
import locale

# Nombre del archivo CSV donde se almacenarán los datos
archivo_csv = "datos_criptomonedas.csv"
csv_transformaciones = "datos_criptos_oper.csv"

# Extracción de Datos

In [None]:
def extraer_datos():
    """
    Extrae los datos de la tabla de criptomonedas desde la web, limitados a los primeros 10 registros.
    """
    url = "https://es.investing.com/crypto"

    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    # WebDriver
    driver = webdriver.Chrome(options=chrome_options)

    # Carga con tiempo de espera para evitar datos erróneos
    driver.set_page_load_timeout(180)
    try:
        driver.get(url)
    except TimeoutException as ex:
        print(f"Excepción de TimeOut: {ex}")
        driver.quit()  # En caso de no responder, cierra
        return None

    try:
        element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "table"))
        )
    except:
        print("Carga incorrecta. Tiempo máximo alcanzado!")
        driver.quit()
        return None

    html = driver.page_source

    soup = BeautifulSoup(html, "html.parser")
    tables = soup.find_all("table")

    if len(tables) > 0:
        # Identificar la tabla principal
        table = tables[0]

        # Extraer encabezados de la tabla
        header_row = table.find("thead").find_all("th")
        column_names = [header.text.strip() for header in header_row]

        # Extraer filas de datos
        rows = table.find("tbody").find_all("tr")
        data = []

        special_coin_names = {
            "Tether USDt": "tether.png",
            "USDC": "usd-coin.png",
            "Shiba Inu": "shiba-inu.png",
            "Bitcoin Cash": "bitcoin-cash.png",
            "UNUS SED LEO": "unus-sed-leo.png",
            "NEAR Protocol": "near-protocol.png",
            "Polkadot": "polkadot-new.png"
        }

        for row in rows[:10]:  # Limitar a los primeros 10 registros
            cols = row.find_all("td")

            # Obtención de nombre de criptomoneda
            coin_name_element = cols[1].find('a')  # tag seleccionado por inspección en página web
            if coin_name_element:
                coin_name = coin_name_element.text.strip()
                if coin_name in special_coin_names:
                    logo_filename = special_coin_names[coin_name]
                else:
                    logo_filename = f"{coin_name.lower()}.png"
                logo_url = f"https://cdn.investing.com/crypto-logos/20x20/v2/{logo_filename.lower()}"

                # Abrir logotipo de la moneda
                driver.execute_script(f"window.open('{logo_url}', '_blank');")

                # Agregar URL de imagen
                cols_text = [ele.text.strip() for ele in cols[1:]]
                cols_text.insert(0, logo_url)  # Primera columna (logo)
                if cols_text:
                    data.append(cols_text)
            else:
                print("Coin name not found in hypertext for this row.")

        if os.path.exists(archivo_csv):
            existing_df = pd.read_csv(archivo_csv)
            column_names = existing_df.columns.tolist()
            if 'timestamp' in column_names:
                column_names.remove('timestamp')
        else:
            column_names = ['Logo', 'Nombre', 'Símbolo', 'Precio (USD)', 'Vol. (24h)', 'Vol. total', 'Var. (24h)', 'Var. (7d)', 'Cap. mercado']

        # Crear DataFrame con nombres de columnas detectados
        crypto_table = pd.DataFrame(data, columns=column_names)

        # Agregar columna de timestamp
        crypto_table['timestamp'] = datetime.datetime.now()
        return crypto_table
    else:
        print("No se encontraron tablas en la página.")
        return None

# Detección de Cambios

In [None]:
def detectar_cambios(nuevos_datos):
    """
    Compara los nuevos datos con el archivo CSV existente para detectar cambios.
    """
    if os.path.exists(archivo_csv):
        # Leer el archivo CSV existente
        datos_existentes = pd.read_csv(archivo_csv)

        print("\nDatos actuales en el archivo CSV:")
        #display(datos_existentes.head())

        print("\nNuevos datos extraídos:")
        #display(nuevos_datos.head())

        # Comparar todas las columnas clave para identificar cambios
        columnas_clave = ['Logo', 'Nombre', 'Símbolo', 'Precio (USD)', 'Vol. (24h)', 'Vol. total', 'Var. (24h)', 'Var. (7d)', 'Cap. mercado']
        cambios = nuevos_datos[~nuevos_datos[columnas_clave].apply(tuple, axis=1).isin(
            datos_existentes[columnas_clave].apply(tuple, axis=1)
        )]

        if not cambios.empty:
            print("\nCambios detectados en los datos:")
            #print(cambios)
        else:
            print("\nNo se detectaron cambios en los datos.")
        return cambios
    else:
        print("\nNo existe un archivo previo. Todos los datos se considerarán nuevos.")
        print("\nNuevos datos extraídos:")
        #print(nuevos_datos.head())
        return nuevos_datos

# Guardado de Datos

In [None]:
def guardar_datos(nuevos_datos, conn):
    cursor = conn.cursor()
    datos_actualizados = nuevos_datos
    # Escribir los datos actualizados en el archivo CSV sin el índice
    datos_actualizados.to_csv(archivo_csv, index=False, encoding='utf-8-sig')
    print(f"\nDatos actualizados guardados en {archivo_csv}.")

    #Nuevo csv para transformaciones
    datos = pd.read_csv(archivo_csv)
    datos['Precio (USD)'] = datos['Precio (USD)'].str.replace('.', '', regex=False).str.replace(',', '.', regex=False).astype(float)
    datos.to_csv(csv_transformaciones, index=False, encoding='utf-8-sig')
    print(f"Datos con 'Precio (USD)' transformado guardados en {csv_transformaciones}.")

     # Agregar nuevos datos a la base SQL
    for index, row in datos.iterrows():
        nombre = row['Nombre']
        precio = row['Precio (USD)']
        fecha_actualizacion = row['timestamp']

        cursor.execute("""
            INSERT INTO datos_criptos (Nombre, Precio, Fecha_Actualizacion)
            VALUES (?, ?, ?)
        """, (nombre, precio, fecha_actualizacion))

    conn.commit()

    print("Datos guardados en la base de datos.")

# Base de Datos SQL

In [None]:
def initialize_database(db_file="datos_criptos.db"):
    """Initializes the SQLite database."""
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()

    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='datos_criptos'")
    table_exists = cursor.fetchone() is not None

    if not table_exists:
        cursor.execute("""
            CREATE TABLE datos_criptos (
                Nombre TEXT,
                Precio REAL,
                Fecha_Actualizacion TIMESTAMP
            )
        """)
        conn.commit()

    return conn

# Obtención Estadísticas Recientes

In [None]:
def estadisticas_recientes(conn, timeframe_minutes=10, delete_outdated=False):
    """
    Calculates average, maximum, and minimum prices for each coin in the last 10 minutes
    from the database, and optionally deletes outdated records.
    """
    try:
        cursor = conn.cursor()
        timeframe_ago = datetime.datetime.now() - datetime.timedelta(minutes=timeframe_minutes)

        # Obtención de los nombres de las monedas
        cursor.execute("SELECT DISTINCT Nombre FROM datos_criptos")
        coin_names = [row[0] for row in cursor.fetchall()]

        if delete_outdated:
            cursor.execute(
                "DELETE FROM datos_criptos WHERE Fecha_Actualizacion < ?", (timeframe_ago,)
            )
            conn.commit()
            print(f"\nDatos obsoletos (anteriores a {timeframe_minutes} minutos) eliminados.")
            print("")

        for coin_name in coin_names:
            cursor.execute(
                """
                SELECT AVG(Precio), MAX(Precio), MIN(Precio)
                FROM datos_criptos
                WHERE Nombre = ? AND Fecha_Actualizacion >= ?
                """,
                (coin_name, timeframe_ago),
            )
            result = cursor.fetchone()

            if result:
                avg_price, max_price, min_price = result
            else:
                print(f"No se encontraron datos para {coin_name} en los últimos {timeframe_minutes} minutes.")


    except sqlite3.Error as e:
        print(f"Error accessing the database: {e}")

# DataFrame con Estadísticas Para Visualización

In [None]:
def estadisticas_df(conn, timeframe_minutes=60):
    estadisticas_recientes(conn, timeframe_minutes, delete_outdated=False)

    original_data = pd.read_csv("datos_criptos_oper.csv")
    stats_data = []
    monedas = original_data['Nombre'].tolist()

    cursor = conn.cursor()
    tiempo_pasado = datetime.datetime.now() - datetime.timedelta(minutes=timeframe_minutes)
    ultimo_dato = tiempo_pasado - datetime.timedelta(minutes=timeframe_minutes)

    # Para tiempo especificado
    for moneda in monedas:
        cursor.execute("""
            SELECT AVG(Precio), MAX(Precio), MIN(Precio)
            FROM datos_criptos
            WHERE Nombre = ? AND Fecha_Actualizacion >= ?
        """, (moneda, tiempo_pasado))
        result = cursor.fetchone()

        if result:
            avg_price, max_price, min_price = result

            # Selección de moneda "actual"
            coin_row = original_data[original_data['Nombre'] == moneda].iloc[0]

            # Precio actual
            actual_price = float(coin_row['Precio (USD)'])

            # Obtención precio anterior
            cursor.execute("""
                SELECT AVG(Precio)
                FROM datos_criptos
                WHERE Nombre = ? AND Fecha_Actualizacion BETWEEN ? AND ?
            """, (moneda, ultimo_dato, tiempo_pasado))
            previous_avg_result = cursor.fetchone()

            if previous_avg_result and previous_avg_result[0] is not None:
                previous_avg_price = previous_avg_result[0]

                # Cambio de precio
                if actual_price > previous_avg_price:
                    price_change = "B"
                elif actual_price < previous_avg_price:
                    price_change = "S"
                else:
                    price_change = "Sin Cambios"
            else:
                price_change = "N/A"  # Primer dato

            # Agregar datos a una lista para creación de df
            stats_data.append([coin_row['Logo'], coin_row['Nombre'], coin_row['Símbolo'], coin_row['Precio (USD)'],
                              avg_price, max_price, min_price, price_change])

    stats_df = pd.DataFrame(stats_data,
                            columns=['Logo', 'Nombre', 'Símbolo', 'Actual Pric.', 'AVG Price', 'Higest 1H', 'Lower 1H', 'Signal(B|S)'])

    return stats_df

# Tabla HTML para visualizacion en Colab

In [None]:
def create_html_table(df):
    html = "<table>"
    # Cabecera
    html += "<tr>"
    for col in df.columns:
        html += f"<th>{col}</th>"
    html += "</tr>"

    # Filas
    for index, row in df.iterrows():
        html += "<tr>"
        for col in df.columns:
            if col == "Logo":
                html += f'<td><img src="{row[col]}" width="25"></td>'
            else:
                html += f"<td>{row[col]}</td>"
        html += "</tr>"

    html += "</table>"
    return html

# Conversión a Formato Original

In [None]:
def formato(price):
    # Obtener decimales
    decimales = len(str(price).split('.')[-1]) if '.' in str(price) else 0

    # Poner formato original de la página de extracción
    precio_formato = "{:,.{decimales}f}".format(price, decimales=decimales)
    precio_formato = precio_formato.replace(",", "X").replace(".", ",").replace("X", ".")

    return precio_formato

# Función Principal

In [None]:
def main():
    """
    Proceso principal de extracción, comparación y guardado de datos.
    """
    conn = initialize_database()

    nuevos_datos = extraer_datos()
    if nuevos_datos is not None:
        cambios = detectar_cambios(nuevos_datos)
        if cambios is not None and not cambios.empty:
            print("\nGuardando cambios detectados...")
            guardar_datos(nuevos_datos, conn)
        elif not os.path.exists(archivo_csv):
            print("\nNo existe archivo previo. Guardando los datos iniciales...")
            guardar_datos(nuevos_datos, conn)
        else:
            print("\nNo se detectaron cambios, y el archivo ya está actualizado.")
            guardar_datos(nuevos_datos, conn)
    else:
        print("\nNo se pudieron extraer datos.")

    print("")
    print("-------------------------------------------")
    print("")

    stats_df = estadisticas_df(conn, timeframe_minutes=10)
    stats_df.to_csv("tabla_final.csv")
    stats_df['Actual Pric.'] = stats_df['Actual Pric.'].apply(formato)
    stats_df['AVG Price'] = stats_df['AVG Price'].apply(formato)
    stats_df['Higest 1H'] = stats_df['Higest 1H'].apply(formato)
    stats_df['Lower 1H'] = stats_df['Lower 1H'].apply(formato)

    try:
        df = stats_df
        html_table = create_html_table(df)
        display(HTML(html_table))
    except FileNotFoundError:
        print("El archivo CSV no se encontró.")
        return

In [None]:
if __name__ == "__main__":
    main()


Datos actuales en el archivo CSV:

Nuevos datos extraídos:

Cambios detectados en los datos:

Guardando cambios detectados...

Datos actualizados guardados en datos_criptomonedas.csv.
Datos con 'Precio (USD)' transformado guardados en datos_criptos_oper.csv.
Datos guardados en la base de datos.

-------------------------------------------



Logo,Nombre,Símbolo,Actual Pric.,AVG Price,Higest 1H,Lower 1H,Signal(B|S)
,Bitcoin,BTC,"94.696,4","94.586,26666666666","94.696,4","94.531,2",B
,Ethereum,ETH,"3.060,05","3.068,0433333333335","3.072,04","3.060,05",S
,Tether USDt,USDT,10008,10007333333333333,10008,10007,B
,Solana,SOL,233902,23482866666666666,235292,233902,S
,BNB,BNB,60163,6018433333333334,60195,60163,S
,XRP,XRP,10974,11059666666666668,11124,10974,S
,Dogecoin,DOGE,0377105,037815100000000007,0378674,0377105,S
,USDC,USDC,09991,09990333333333333,09991,0999,B
,Cardano,ADA,07775,07822333333333332,07846,07775,S
,TRON,TRX,0195105,0195505,0195705,0195105,S
