
Trabajé con la API de SpaceX, la cual ofrece información sobre cohetes y lanzamientos. Seleccione esta API, porque me pareció interesante y porque cumplía con los datos que necesitaba( uno endpoit estático y otro temporal).

Tipos de Extracción:

En la extracción completa, la aplique con el endpoint estático, ya que los datos no cambian.
En la extracción incremental, utilicé el endpoint dinámico, dado que este puede actualizarse con nuevos lanzamientos .

Transformaciones:

Limpie y normalicé los datos para que fueran consistentes.
Convertí columnas conflictivas (como diccionarios o listas) en formatos planos, como cadenas de texto.
Reemplacé los valores nulos, mediante valores predeterminados para evitar errores.Ademas optimice con bucles varias lineas de codigo que se repetian, segun su sugerencia .

Almacenamiento:

En el almacenamiento, apliqué las tres capas porque usted me aclaró que era necesario implementarlas en el TP anterior. Los datos quedaron mejor divididos según lo que representa cada capa. Utilicé Delta Lake para gestionar las capas, y el resultado fue el esperado .




In [2]:
# Instalamos cosas a utilizar
!pip install requests
!pip install deltalake
!pip install pandas
!pip install pyarrow
!pip install delta




In [3]:
# Importamos todas la librerias a usar
import requests
import json
import pandas as pd
import pyarrow as pa
import os
import deltalake
from deltalake.writer import write_deltalake
import pyarrow.dataset as ds
from datetime import datetime
import delta as delta
from deltalake import DeltaTable
import ast

In [18]:
# Obtenemos los datos de los endpoints
# Endpoint ESTÁTICO
static_endpoint = "https://api.spacexdata.com/v4/rockets"

# Endpoint DINÁMICO
dynamic_endpoint = "https://api.spacexdata.com/v4/launches"

# Función para extraer datos desde un endpoint
def fetch_data(endpoint):
    # Obtenemos el enpoint
    response = requests.get(endpoint)

    # Verificamos si el status code es bueno
    if response.status_code == 200:
        print(f"Bien, el estatus code es:{requests.status_codes}")

        # Retornamos los datos en json
        return response.json()

    # De lo contrario error
    else:
        print(f"Error al obtener datos: {response.status_code}")
        return None


# Obtenemos los datos del endpoint estático
data_static = fetch_data(static_endpoint)

# Obtenemos los datos del endpoint dinámico
data_dynamic = fetch_data(dynamic_endpoint)



Bien, el estatus code es:<module 'requests.status_codes' from '/usr/local/lib/python3.10/dist-packages/requests/status_codes.py'>
Bien, el estatus code es:<module 'requests.status_codes' from '/usr/local/lib/python3.10/dist-packages/requests/status_codes.py'>


In [19]:

# Convertimos las columnas con diccionarios o listas a cadenas
def normalize_column(column):
    return column.apply(lambda x: str(x) if isinstance(x, (dict, list)) else x)

# Convertimos los json a data frame de una forma que se puedan ver los datos bien
# Con esto verificamos y ajustamos los tipos de datos en el DataFrame
def normalize_dtypes(df):
    # Vemos los tipos de datos
    print("Tipos de datos originales:")
    print(df.dtypes)

    # Convertimos columnas conflictivas
    for column in df.columns:
        if df[column].dtype == 'float64' or df[column].dtype == 'int64':
            df[column] = df[column].astype('float')
    print("Tipos de datos después de la normalización:")
    print(df.dtypes)
    return df

# Convertimos a data frame y lo almcenamos en cada variable
df_static_clean = pd.DataFrame(data_static) if data_static else pd.DataFrame()
df_dynamic_clean = pd.DataFrame(data_dynamic) if data_dynamic else pd.DataFrame()

# Mostramos el data frame statico y Imprimimos los tipos de datos del df_static_clean
print(df_static_clean.head(2))
print(df_static_clean.dtypes)

# Creamos lista de columnas a procesar
df_static_clean_list=['stages','boosters','cost_per_launch','success_rate_pct']

# Iterar sobre cada columna de la lista
for colum in df_static_clean_list:
  # Convertimos las columnas numéricas a un tipo adecuado
  df_static_clean[colum] = pd.to_numeric(df_static_clean[colum], errors='coerce')
  print(f"Covertimos columna {colum}")


# Convertimos las columnas que contienen cadenas que parecen listas/diccionarios
df_static_clean['height'] = df_static_clean['height'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)
df_static_clean['diameter'] = df_static_clean['diameter'].apply(lambda x: ast.literal_eval(x) if isinstance(x, str) else x)

# Creamos lista de columnas a procesar
df_static_clean_list= ['height','diameter','mass','payload_weights','flickr_images']

# Iterar sobre cada columna de la lista
for colum in df_static_clean_list:
  # Convertimos las columnas con diccionarios o listas a cadenas para permitir comparación
  df_static_clean[colum] = normalize_column(df_static_clean[colum])
  print(f"Covertimos columna {colum}")

# Reemplazamos los valores nulos por un valor predeterminado, "sin asignacion"
df_static_clean = df_static_clean.fillna("No assignment")
df_dynamic_clean = df_dynamic_clean.fillna("No assignment")

# Eliminamos duplicados
df_static_clean = df_static_clean.map(str).drop_duplicates()

# Convertimos la columna 'mass' a numérico antes de aplicar la agregación
df_static_clean['mass'] = pd.to_numeric(df_static_clean['mass'], errors='coerce')

# Cruzamos los DataFrames estático y dinámico usando 'id' como clave, haciendo un inner join
df_merged = pd.merge(df_static_clean, df_dynamic_clean, on='id', how='inner')

# Agrupamos por una columna y calculamos la temperatura promedio
df_grouped = df_static_clean.groupby('type')['mass'].agg(['mean', 'max', 'min'])

# Mostramos las versiones planas
print(df_static_clean.head(3))
print(df_dynamic_clean.head(3))



                          height                       diameter  \
0  {'meters': 22.25, 'feet': 73}  {'meters': 1.68, 'feet': 5.5}   
1  {'meters': 70, 'feet': 229.6}    {'meters': 3.7, 'feet': 12}   

                            mass  \
0     {'kg': 30146, 'lb': 66460}   
1  {'kg': 549054, 'lb': 1207920}   

                                         first_stage  \
0  {'thrust_sea_level': {'kN': 420, 'lbf': 94000}...   
1  {'thrust_sea_level': {'kN': 7607, 'lbf': 17100...   

                                        second_stage  \
0  {'thrust': {'kN': 31, 'lbf': 7000}, 'payloads'...   
1  {'thrust': {'kN': 934, 'lbf': 210000}, 'payloa...   

                                             engines  \
0  {'isp': {'sea_level': 267, 'vacuum': 304}, 'th...   
1  {'isp': {'sea_level': 288, 'vacuum': 312}, 'th...   

                                landing_legs  \
0            {'number': 0, 'material': None}   
1  {'number': 4, 'material': 'carbon fiber'}   

                                     

In [20]:

# Configuramo directorios para el Data Lake
base_dir = "/content/delta_lake_data"

# Asignamos las distintas capas
bronze_dir = os.path.join(base_dir, "bronze")
silver_dir = os.path.join(base_dir, "silver")
gold_dir = os.path.join(base_dir, "gold")
os.makedirs(bronze_dir, exist_ok=True)
os.makedirs(silver_dir, exist_ok=True)
os.makedirs(gold_dir, exist_ok=True)

# Creamos rutas para los datos en Delta Lake
rockets_bronze_path = os.path.join(bronze_dir, "rockets")
launches_bronze_path = os.path.join(bronze_dir, "launches")
rockets_silver_path = os.path.join(silver_dir, "rockets")
launches_silver_path = os.path.join(silver_dir, "launches")
rockets_gold_path = os.path.join(gold_dir, "rockets")
launches_gold_path = os.path.join(gold_dir, "launches")

# Función para guardar en Delta Lake
def save_to_delta(df, delta_path, mode="overwrite"):
    try:
        # Aseguramos que todas las columnas tengan un tipo definido
        for col in df.columns:

            # Si toda la columna es nula
            if df[col].isnull().all():
                df[col] = df[col].fillna("sin asignacion")

            # Convertir objetos a cadenas de texto
            elif df[col].dtype == 'object':
                df[col] = df[col].astype(str)

        # Convertimos el DataFrame a una tabla Arrow
        table = pa.Table.from_pandas(df)

        # Guardamos la tabla en formato Delta Lake
        write_deltalake(delta_path, table, mode=mode)
        print(f"Datos guardados exitosamente en {delta_path}.\n")

    except Exception as e:
        print(f"Error al guardar datos en Delta Lake: {e}")

# Guardamos el DataFrame transformado en Delta Lake
save_to_delta(df_static_clean, rockets_silver_path, mode="overwrite")
save_to_delta(df_dynamic_clean, launches_silver_path, mode="overwrite")


Datos guardados exitosamente en /content/delta_lake_data/silver/rockets.

Datos guardados exitosamente en /content/delta_lake_data/silver/launches.



In [21]:

# Funcion para hacer la extraccion full
def full_load(api_endpoint, delta_path):
    print(f"Realizando extracción completa desde {api_endpoint}...")

    # Extraemos datos de la API
    response = requests.get(api_endpoint)
    data = response.json()

    # Convertimos los datos a un DataFrame de Pandas
    df = pd.DataFrame(data)

    # Lista de columnas que existen en la tabla Delta actual
    existing_columns = [
        'height', 'diameter', 'mass', 'first_stage', 'second_stage', 'engines',
        'landing_legs', 'payload_weights', 'flickr_images', 'name', 'type',
        'active', 'stages', 'boosters', 'cost_per_launch', 'success_rate_pct',
        'first_flight', 'country', 'company', 'wikipedia', 'description', 'id'
    ]

    # Eliminamos las columnas adicionales que no están en la tabla Delta
    df = df[existing_columns]

    # Convertimos el DataFrame a una tabla de PyArrow
    table = pa.Table.from_pandas(df)

    # Guardamos en formato Delta Lake
    try:
        write_deltalake(delta_path, table, mode="overwrite")
        print(f"Datos guardados exitosamente en {delta_path}.\n")
    except Exception as e:
        print(f"Error al guardar datos en Delta Lake: {e}")


# Llamamos la funcion
full_load(static_endpoint, "/content/delta_lake_data/bronze/rockets")



Realizando extracción completa desde https://api.spacexdata.com/v4/rockets...
Datos guardados exitosamente en /content/delta_lake_data/bronze/rockets.



In [22]:
# Funcion para hacer la extraccion incremental
def incremental_load(api_endpoint, delta_path, id_column="id", last_loaded_id=None):
    """
    Realiza una carga incremental desde un endpoint a Delta Lake.
    """
    try:
        # Obtenemos los datos desde el endpoint
        data = fetch_data(api_endpoint)

        # Convertimos los datos en un DataFrame de pandas si es necesario
        if isinstance(data, list):
            df = pd.DataFrame(data)
        else:
            df = data

        # Verificamos si el DataFrame está vacío
        if df.empty:
            print("No se encontraron datos en el endpoint.")
            return

        # Aseguramos que el'id_column' sea numérico
        df['id'] = pd.to_numeric(df['id'], errors='coerce')

        print(df.head())

        # Filtramos los nuevos datos basados en el último ID cargado
        if last_loaded_id is not None:
            new_data = df[df[id_column] > last_loaded_id]
        else:
            new_data = df

        # Si esta vacio
        if new_data.empty:
            print("No se encontraron nuevos datos para agregar.")
            return

        # Guardamos los datos nuevos en Delta Lake
        delta.write_deltalake(delta_path, new_data, mode="append")
        print(f"Carga incremental completada. Nuevos registros: {len(new_data)}")

    except Exception as e:
        print(f"Error durante la carga incremental: {e}")

# Guardamos la fecha y hora de la última ejecución
last_execution_time = datetime(2024, 11, 23, 10, 0, 0)

# Definimos el tiempo de la última actualización
last_update_time = datetime(2023, 1, 1)  # Establece la fecha de inicio para la carga incremental

# Llamamos a la función incremental_load para cargar los datos nuevos
incremental_load(static_endpoint, rockets_bronze_path, id_column="id", last_loaded_id=1000)



Bien, el estatus code es:<module 'requests.status_codes' from '/usr/local/lib/python3.10/dist-packages/requests/status_codes.py'>
                          height                        diameter  \
0  {'meters': 22.25, 'feet': 73}   {'meters': 1.68, 'feet': 5.5}   
1  {'meters': 70, 'feet': 229.6}     {'meters': 3.7, 'feet': 12}   
2  {'meters': 70, 'feet': 229.6}  {'meters': 12.2, 'feet': 39.9}   
3   {'meters': 118, 'feet': 387}       {'meters': 9, 'feet': 30}   

                             mass  \
0      {'kg': 30146, 'lb': 66460}   
1   {'kg': 549054, 'lb': 1207920}   
2  {'kg': 1420788, 'lb': 3125735}   
3  {'kg': 1335000, 'lb': 2943000}   

                                         first_stage  \
0  {'thrust_sea_level': {'kN': 420, 'lbf': 94000}...   
1  {'thrust_sea_level': {'kN': 7607, 'lbf': 17100...   
2  {'thrust_sea_level': {'kN': 22819, 'lbf': 5130...   
3  {'thrust_sea_level': {'kN': 128000, 'lbf': 287...   

                                        second_stage  \
0  {'t

In [23]:
# Función para transformar los datos en formato Silver
def transform_to_silver(data):
    try:
        # Transformamos estructuras anidadas a cadenas
        for item in data:
            if "diameter" in item and isinstance(item["diameter"], dict):
                item["diameter"] = str(item["diameter"])
            if "height" in item and isinstance(item["height"], dict):
                item["height"] = str(item["height"])
            if "mass" in item and isinstance(item["mass"], dict):
                item["mass"] = str(item["mass"])

        # Retornamos los datos transformados
        return data

  # En caso de un error
    except Exception as e:
        print(f"Error durante la transformación: {e}")
        return None

# Obtenemos los datos del endpoint estático
data_static = fetch_data(static_endpoint)

# Verificamos que data_static no es None
if data_static:
    # Llamamos a la función transform_to_silver
    transformed_data = transform_to_silver(data_static)

    # Ahora transformed_data contiene los datos procesados
    print("Datos transformados exitosamente:")
    print(transformed_data)
else:
    print("No se pudo transformar porque no se obtuvieron datos estáticos.")


Bien, el estatus code es:<module 'requests.status_codes' from '/usr/local/lib/python3.10/dist-packages/requests/status_codes.py'>
Datos transformados exitosamente:
[{'height': "{'meters': 22.25, 'feet': 73}", 'diameter': "{'meters': 1.68, 'feet': 5.5}", 'mass': "{'kg': 30146, 'lb': 66460}", 'first_stage': {'thrust_sea_level': {'kN': 420, 'lbf': 94000}, 'thrust_vacuum': {'kN': 480, 'lbf': 110000}, 'reusable': False, 'engines': 1, 'fuel_amount_tons': 44.3, 'burn_time_sec': 169}, 'second_stage': {'thrust': {'kN': 31, 'lbf': 7000}, 'payloads': {'composite_fairing': {'height': {'meters': 3.5, 'feet': 11.5}, 'diameter': {'meters': 1.5, 'feet': 4.9}}, 'option_1': 'composite fairing'}, 'reusable': False, 'engines': 1, 'fuel_amount_tons': 3.38, 'burn_time_sec': 378}, 'engines': {'isp': {'sea_level': 267, 'vacuum': 304}, 'thrust_sea_level': {'kN': 420, 'lbf': 94000}, 'thrust_vacuum': {'kN': 480, 'lbf': 110000}, 'number': 1, 'type': 'merlin', 'version': '1C', 'layout': 'single', 'engine_loss_max'

In [24]:
# Con esta funcion tranformamos a la capa gold
def transform_to_gold(silver_path, gold_path):
    try:
        # Leemos los datos desde la capa silver utilizando DeltaTable
        # Cambiamos de  ds.dataset a DeltaTable
        silver_data = deltalake.DeltaTable(silver_path)

        # Convertimos a DataFrame de pandas
        silver_df = silver_data.to_pandas()

        # en caso de que este vacio
        if silver_df.empty:
            print(f"No se encontraron datos en {silver_path} para transformar.")
            return

        # Aplicamos reglas de negocio (selección de columnas)
         # Selección de columnas clave
        gold_df = silver_df[['id', 'name', 'description']].copy()

        # Guardamos en la capa gold
        # Guardamos los datos en el formato Delta
        write_deltalake(gold_path, gold_df, mode="overwrite")
        print(f"Datos transformados y guardados en {gold_path}.")

   # En caso de error
    except Exception as e:
        print(f"Error durante la transformación: {e}")

# Transformamos a Gold
transform_to_gold(
    rockets_silver_path,
    rockets_gold_path
)



Datos transformados y guardados en /content/delta_lake_data/gold/rockets.
