# **Trabajo Final**

# Trabajo Pratico 1 *Extraccion de datos*

En los siguientes codigo se va a extraer de dos APIs diferentes. Una de las apis se va a generar un codigo para una extraccion FULL y la otra api se va a generar un codigo para una extraccion incremental

Importo las libreria que se van a utilizar durante todo el codigo

In [20]:
import requests
import pandas as pd
import os
from datetime import datetime, timedelta

En el siguiente codigo se encuentra 2 funciones que se encargan de:


*   get_data:
        Realizar una solicitud GET a una API para obtener datos. y retornar el JSON de "data" o un codigo de error si eso sucede
*   build_table
        Construir un DataFrame de pandas a partir de datos en formato JSON. y retornar el mismo

In [70]:
def get_data(base_url, endpoint, params=None):
    """
    Realiza una solicitud GET a una API para obtener datos.

    :param base_url: La URL base de la API.
    :param endpoint: El endpoint (ruta) de la API para obtener datos específicos.
    :param params: Los parámetros de la solicitud GET.
    :return: Los datos obtenidos de la API en formato JSON.
    """
    try:
        endpoint_url = f"{base_url}/{endpoint}"
        response = requests.get(endpoint_url, params=params)
        response.raise_for_status()  # Levanta una excepción si hay un error en la respuesta HTTP.

        # Verificar si los datos están en formato JSON.
        try:
            data = response.json()
            data = data["data"]
        except:
            print("El formato de respuesta no es el esperado")
            return None
        return data

    except requests.exceptions.RequestException as e:
        # Capturar cualquier error de solicitud, como errores HTTP.
        print(f"La petición ha fallado. Código de error : {e}")
        return None

def build_table(json_data):
    """
    Construye un DataFrame de pandas a partir de datos en formato JSON.

    :param json_data: Los datos en formato JSON obtenidos de la API.
    :return: Un DataFrame de pandas con los datos estructurados.
    """
    df = pd.json_normalize(json_data)
    return df

def save_to_parquet(df, save_path, partition_col=None):
    """
    Guarda un DataFrame en formato Parquet en la ubicación especificada.

    Args:
        df (pd.DataFrame): El DataFrame que se desea guardar.
        save_path (str): La ruta donde se guardará el archivo Parquet.
        partition_col (str or list, optional): Columna(s) por la cual particionar los datos en el formato Parquet.

    Raises:
        TypeError: Si el argumento df no es un DataFrame de pandas.
        ValueError: Si partition_col no es None ni una cadena o lista de cadenas.
    """
    # Verificar que df sea un DataFrame
    if not isinstance(df, pd.DataFrame):
        raise TypeError("El argumento 'df' debe ser un DataFrame de pandas.")

    # Verificar el tipo de partition_col
    if partition_col is not None and not isinstance(partition_col, (str, list)):
        raise ValueError("El argumento 'partition_col' debe ser un string o una lista de strings.")

    # Crear el directorio si no existe
    directory = os.path.dirname(save_path)
    if directory and not os.path.exists(directory):
        os.makedirs(directory)

    try:
        # Guardar el DataFrame en formato Parquet
        df.to_parquet(save_path, partition_cols=partition_col)
        print("DataFrame guardado exitosamente en formato Parquet.")
    except Exception as e:
        print(f"Error al guardar el DataFrame en formato Parquet: {str(e)}")



EXTRACCION DE DATOS

Extraccion de datos de manera FULL

El siguiente codigo extrae datos de una API la cual nos brinda datos de criptomonedas. Se realiza una extraccion FULL ya que extraemos todos los datos disponible del mismo

La siguiente extraccion se realizo en https://docs.coincap.io/#d8fd6001-e127-448d-aadd-bfbfe2c89dbe

In [None]:
url_base = "https://api.coincap.io/v2" # Coloco la url base para su luego utilizacion
endpoint = "assets" # Coloco el endpoint para su luego utilizacion
#Obtengo los datos
assets = get_data(url_base, endpoint)
if assets: # Pregunto si existe y genero el data frame del diccionario extraido
  df_assets = build_table(assets)
df_assets.head(70) # Imprimo 10 datos obtenidos anteriormente

Extraccion de datos de manera incremental

El siguiente codigo extrae datos de concentracion de una 'formula' y lugar especificado. La extraccion se genera de forma incremental por cada ejecucion del mismo. Generando nuevos resultados por cada hora transcurrida

La extraccion se realizo en https://api-docs.luchtmeetnet.nl/

In [None]:
endpoint = "concentrations"
base_url = "https://api.luchtmeetnet.nl/open_api"
start_date = datetime.utcnow() - timedelta(hours=1)

end_date = start_date.strftime("%Y-%m-%dT%H:59:59Z") # Guardo la fecha actual una hora adelantada en el formato requerido por el Provedor del API
start_date = start_date.strftime("%Y-%m-%dT%H:00:00Z") # Guardo la fecha actual en el formato requerido por el Provedor del API

# Creo un diccionario Params para guardar los parametros requeridos por el API
params = {
    "formula": "no2",
    "longitude": "4.458807",
    "latitude": "51.924452",
    "start": start_date,
    "end": end_date
    }

Concentrations = get_data(base_url, endpoint,params) # Obtengo los datos en formato JSON

df_Concentrations = build_table(Concentrations) # Le doy un formato mas legible a los datos obtenidos anteriormente

df_Concentrations.head() # Imprimo el resultado

El siguiente codigo extrae datos de una API la cual nos brinda datos de criptomonedas. Se realiza una extraccion incremental.

Se va a extraer los datos de donde se generan los intercambios de las monedas y cuanto representa esto en el mercado del mismo.

La siguiente extraccion se realizo en https://docs.coincap.io/#d8fd6001-e127-448d-aadd-bfbfe2c89dbe

In [24]:
url_base = "https://api.coincap.io/v2" # Coloco la url base para su luego utilizacion
endpoint = "exchanges" # Coloco el endpoint para su luego utilizacion
#Obtengo los datos
exchanges = get_data(url_base, endpoint)
if exchanges: # Pregunto si existe y genero el data frame del diccionario extraido
  df_exchanges = build_table(exchanges)
df_exchanges.head(5) # Imprimo 10 datos obtenidos anteriormente

Unnamed: 0,exchangeId,name,rank,percentTotalVolume,volumeUsd,tradingPairs,socket,exchangeUrl,updated
0,binance,Binance,1,49.00264348093388,3870363025.95561,833,True,https://www.binance.com/,1693877950682
1,digifinex,DigiFinex,2,6.22910635107628,491991884.38336545,143,False,https://www.digifinex.com/,1693877947582
2,gdax,Coinbase Pro,3,5.767688306368358,455547823.14734,298,True,https://pro.coinbase.com/,1693877949865
3,kraken,Kraken,4,4.561290765257273,360263240.397497,343,False,https://kraken.com,1693877949184
4,gate,Gate,5,3.735963562402067,295076636.91376835,1284,False,https://gate.io/,1693877950625


El siguiente codigo extrae datos de una API la cual nos brinda datos de criptomonedas. Se realiza una extraccion incremental.

Se va a extraer los datos del mercado de criptomonedas.

La siguiente extraccion se realizo en https://docs.coincap.io/#d8fd6001-e127-448d-aadd-bfbfe2c89dbe

In [None]:
url_base = "https://api.coincap.io/v2" # Coloco la url base para su luego utilizacion
endpoint = "markets" # Coloco el endpoint para su luego utilizacion
#Obtengo los datos
markets = get_data(url_base, endpoint)
if markets: # Pregunto si existe y genero el data frame del diccionario extraido
  df_markets = build_table(markets)
df_markets.head(10) # Imprimo 10 datos obtenidos anteriormente

#Trabajo Practico 2 **Almacenamiento**

In [26]:
!pip install -q fastparquet # Instalo FastParquet para agilizar los tiempos

ERROR: Invalid requirement: '#'


In [27]:
def crear_directorio(dir_nuevo):
   directory = os.path.dirname(dir_nuevo) #Obtengo el directorio de dir_nuevo
   if directory and not os.path.exists(directory): #Pregunto si existe y si no es una cadena vacia, si cumple la condicion lo creo en caso contrario lo ignoro
        os.makedirs(directory) # Creo el directorio si cumple las condiciones

In [86]:
dir_coincap_assets = "datalake_Santi/raw_date/coincap/assets/"
dir_mediciones = "datalake_Santi/raw_date/luchtmeetnet/concentrations/N02/"
dir_coincap_exchanges = "datalake_Santi/raw_date/coincap/exchanges/"
dir_coincap_markets = "datalake_Santi/raw_date/coincap/markets/"
#Creo la ruta donde se van a almacenar los datos extraidos anteriormente
crear_directorio(dir_coincap_assets)
crear_directorio(dir_mediciones)
crear_directorio(dir_coincap_exchanges)
crear_directorio(dir_coincap_markets)

## A continuacion introduzco los data frames extraidos anteriormente en sus respectivos directorios con un formato parquet particionados segun cada uno.

In [29]:
# Guardo los data frames con formato parquet almacenandolos en una carpeta sin hacer particiones
save_to_parquet(df_assets,"datalake_Santi/raw_date/coincap/assets/assets.parquet")
df_assets_pq = pd.read_parquet("datalake_Santi/raw_date/coincap/assets/assets.parquet")

DataFrame guardado exitosamente en formato Parquet.


In [30]:
df_Concentrations["timestamp_measured"] = pd.to_datetime(df_Concentrations.timestamp_measured)
df_Concentrations["fecha"] = df_Concentrations.timestamp_measured.dt.date
df_Concentrations["hora"] = df_Concentrations.timestamp_measured.dt.hour
#Creo un nuevo data Frame con hora y fecha divido para luego guardarlo como un archivo con formato Parquet y particionarlo por fecha y hora
# Creo por cada fecha y hora una carpeta donde guardo los datos de los correspondientes momentos
save_to_parquet(df_Concentrations,"datalake_Santi/raw_date/luchtmeetnet/concentrations/N02",["fecha"])
df_Concentrations_pq = pd.read_parquet("datalake_Santi/raw_date/luchtmeetnet/concentrations/N02")
df_Concentrations.head()

DataFrame guardado exitosamente en formato Parquet.


Unnamed: 0,formula,value,timestamp_measured,fecha,hora
0,NO2,30.0,2023-09-05 01:00:00+00:00,2023-09-05,1
1,NO2,30.0,2023-09-05 02:00:00+00:00,2023-09-05,2
2,NO2,20.0,2023-09-05 03:00:00+00:00,2023-09-05,3
3,NO2,20.0,2023-09-05 04:00:00+00:00,2023-09-05,4
4,NO2,20.0,2023-09-05 05:00:00+00:00,2023-09-05,5


In [67]:
#Creo un nuevo data Frame con fecha divido para luego guardarlo como un archivo con formato Parquet y particionarlo por fecha
df_exchanges['updated'] = pd.to_datetime(df_exchanges['updated'], unit = 'ms') #Cambio el formato UNIX timestamp a formato un formato mas estandar
df_exchanges['updated'] = pd.to_datetime(df_exchanges.updated)
df_exchanges["fecha"] = df_exchanges.updated.dt.date
save_to_parquet(df_exchanges,"datalake_Santi/raw_date/coincap/exchanges",["fecha"])
#df_exchanges.head(5)
df_exchanges_pq = pd.read_parquet("datalake_Santi/raw_date/coincap/exchanges")

DataFrame guardado exitosamente en formato Parquet.


In [None]:
df_markets['updated'] = pd.to_datetime(df_markets['updated'], unit = 'ms')
df_markets['updated'] = pd.to_datetime(df_markets["updated"])
df_markets["fecha"] = df_markets.updated.dt.date
df_markets['fecha'] = pd.to_datetime(df_markets["fecha"])
save_to_parquet(df_markets,"datalake_Santi/raw_date/coincap/markets/markets.parquet")
df_markets = pd.read_parquet("datalake_Santi/raw_date/coincap/markets/markets.parquet")
df_markets