## Instalar las librerias necesarias

In [0]:
%pip install requests

## Reiniciar el kernel
El kernel es el motor de ejecución, dada la instalación de una nueva librería, es necesario reiniciarlo

In [0]:
dbutils.library.restartPython()

## Importar las librerías necesarias

In [0]:
import requests
import time
import json
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import pandas as pd

## 1. Configurar CoinGecko API
La API de CoinGecko no requiere autenticación, lo que la hace fácil de usar.

Usaremos el siguiente endpoint para obtener los precios actuales de criptomonedas:
Endpoint: https://api.coingecko.com/api/v3/simple/price.

## 2. Consumir datos desde CoinGecko
Creamos una función para obtener precios actuales(USD) de criptomonedas como Bitcoin, Ethereum, y más.

In [0]:
# Función para obtener datos de CoinGecko API
def fetch_crypto_prices(crypto_ids, vs_currency="usd"):
    url = "https://api.coingecko.com/api/v3/simple/price"
    params = {
        "ids": ",".join(crypto_ids),  # Lista de criptomonedas separada por comas
        "vs_currencies": vs_currency
    }
    response = requests.get(url, params=params)
    
    # Manejar errores
    if response.status_code != 200:
        raise Exception(f"Error en la API: {response.status_code}, {response.text}")
    
    return response.json()

# Probar la función
crypto_ids = ["bitcoin", "ethereum", "dogecoin"]
prices = fetch_crypto_prices(crypto_ids)
print(prices)

## 3. Transformar los datos en un DataFrame
Convertimos los datos JSON en un PySpark DataFrame para procesamiento adicional.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import pandas as pd

# Crear un DataFrame con los datos de precios
def create_dataframe(prices):
    # Transformar datos JSON a una lista de filas
    rows = [
        {"crypto": key, "price_usd": value["usd"]}
        for key, value in prices.items()
    ]
    
    # Crear un DataFrame de pandas
    pandas_df = pd.DataFrame(rows)
    
    # Convertir a PySpark DataFrame
    return spark.createDataFrame(pandas_df)

# Crear el DataFrame en Spark
spark_df = create_dataframe(prices)
spark_df.show()


## 4. Guardar datos en Databricks
Los datos pueden ser guardados en formatos como Delta Lake o CSV para análisis posterior.

In [0]:
# Guardar en formato Delta
output_path = "/tmp/crypto_prices_delta"
spark_df.write.format("delta").mode("overwrite").save(output_path)

# Confirmar guardado
print(f"Datos guardados en: {output_path}")

## 5. Visualizar los datos
Usa Databricks SQL y gráficos para analizar los datos de manera visual.

In [0]:
%sql
CREATE TABLE crypto_prices
USING DELTA
LOCATION '/tmp/crypto_prices_delta';

In [0]:
%sql
SELECT * FROM crypto_prices;

## 6. Agregar datos en tiempo real
Simularemos un flujo de data en tiempo real

In [0]:
import time

# Simular datos en tiempo real
for _ in range(5):  # 5 iteraciones
    prices = fetch_crypto_prices(crypto_ids)
    spark_df = create_dataframe(prices)
    spark_df.show()

    # Guardar en formato Delta
    spark_df.write.format("delta").mode("append").save(output_path)
    
    time.sleep(300)  # Esperar 5 minutos entre iteraciones

## Stream de la primera función
Obtendra el precio de la criptomoneda en la moneda solicitada cada 5 segundos

In [0]:
def stream_crypto_prices():
    while True:
        prices = fetch_crypto_prices()
        timestamp = int(time.time())
        for crypto, data in prices.items():
            yield {
                "timestamp": timestamp,
                "crypto": crypto,
                "price_usd": data["usd"]
            }
        time.sleep(5)  # Esperar 5 segundos entre solicitudes

In [0]:
output_dir = "/FileStore/test_lab_1"
os.makedirs(output_dir, exist_ok=True)

for record in stream_crypto_prices():
    with open(f"{output_dir}{record['timestamp']}.json", "w") as f:
        f.write(json.dumps(record))

In [0]:
from pyspark.sql.types import StructType, StringType, DoubleType, LongType

schema = StructType() \
    .add("timestamp", LongType()) \
    .add("crypto", StringType()) \
    .add("price_usd", DoubleType())

crypto_df = (
    spark.readStream
    .schema(schema)
    .json("/FileStore/test_lab_1")
)

In [0]:
dbutils.fs.ls("/dbfs/tmp/")