## Extracción y almacenamiento de datos

### Justificación de elección de API:

Para este trabajo decidí utilizar la API de JCDecaux Developers porque ofrece datos estáticos y dinámicos. Por un lado, proporciona información estática sobre cada contrato, lo cual es útil para realizar una extracción full que obtenga los metadatos de las estaciones de bicicletas. Por otro lado, ofrece datos en tiempo real sobre el estado de cada estación, incluyendo bicicletas disponibles, si se debe pagar, si está abierta, entre otros, los cuales se pueden obtener mediante extracción incremental. A partir de esto, utilcé dos endpoints en particular:
- **Extracción full:** Elegí el endpoint que ofrece los metadatos estáticos de las estaciones de cada contrato, ya que tiene gran cantidad de información que se puede limpiar y procesar luego.
- **Extracción incremental:** Seleccioné un endpoint que me permitió recolectar la información en tiempo real de cada estación, siendo útil para analizar tendencias futuras.

### Desarrollo del trabajo práctico:

##### Instalación de librerias:

In [None]:
!pip install requests
!pip install deltalake
!pip install pyarrow

##### Importación de librerias y módulos:

In [None]:
import requests
import pandas as pd
import json
import pyarrow as pa
import math
from datetime import datetime, timezone
from deltalake import write_deltalake, DeltaTable
from deltalake.exceptions import TableNotFoundError
from configparser import ConfigParser


##### Definición de funciones:

In [None]:
# Manejo de json

def get_last_update (file_path):

    try:
        #with es util porque permite simplificar el manejo de archivos, conexiones, db al asegurarse que se usan y liberan de forma correcta, incluso si hay errores.

        with open(file_path, "r") as file:
            ultimo_update = json.load(file) # leo el archivo json
            return ultimo_update["last_updated"]

    except FileNotFoundError:
        raise FileNotFoundError(f"El archivo JSON en la ruta {file_path} no existe.") # propago el error y lo resuelvo en otro lado!

    except json.JSONDecodeError:
        raise json.JSONDecodeError(f"El archivo JSON en la ruta {file_path} no es válido.")

    except KeyError as k:
        raise KeyError(str(k))


def update_last_update (file_path, last_updated):

    try:
        with open(file_path, "w") as file:
            json.dump({"last_updated": last_updated.isoformat()}, file, indent=4)

    except FileNotFoundError:
        raise FileNotFoundError(f"El archivo JSON en la ruta {file_path} no existe.") # propago el error y lo resuelvo en otro lado!

# Extraccion de datos

def get_data_stations(url, params=None):

    try:
        response = requests.get(url,params=params, timeout = 10)
        response.raise_for_status() # excepcion que captura el except
        data = response.json()
        return create_data_frame(data)

    except requests.exceptions.RequestException as e:
        print(f"Error al obtener los datos. Código de error: {e}")
        return pd.DataFrame()

def create_data_frame(json_data):

    try:
        df = pd.json_normalize(json_data)
        return df

    except Exception as e:
        print(f"Se produjo un error en la construcción del DataFrame: {e}")
        return pd.DataFrame()

def incremental_extraction(url, file_path, params=None):

    try:

        last_update_json = get_last_update(file_path=file_path)
        last_update = pd.to_datetime(last_update_json, utc=True)

        df = get_data_stations(url, params=params)
        if df is None:
            print("No se pudo construir el DataFrame.")
            return pd.DataFrame()

        fechas_convertidas = []
        for f in df["last_update"]:
            if f is not None and not math.isnan(f):
                ft = datetime.fromtimestamp(f / 1000, tz=timezone.utc)
                fechas_convertidas.append(ft)
            else:
                fechas_convertidas.append(pd.NaT) #fecha nula en caso de ser null

        df["last_update"] = fechas_convertidas

        df_incremental = df[df["last_update"] > last_update]

        if df_incremental.empty:
            print("No hay nuevas actualizaciones desde la última consulta")
            return pd.DataFrame()

        max_timestamp = df["last_update"].max()
        update_last_update(file_path=file_path, last_updated=max_timestamp)
        return df_incremental

    except Exception as e:
        print(f"Se produjo un error en la extracción incremental {e}")
        return pd.DataFrame()


# Almacenamiento de datos

def save_data_as_delta(df, path, mode="overwrite", partition_cols=None):

    write_deltalake(path, df, mode = mode, partition_by = partition_cols)

def merge_new_data_as_delta(new_data, data_path, predicate, partition_cols=None):

    try:
        dt = DeltaTable(data_path)
        data_pa = pa.Table.from_pandas(new_data)
        dt.merge(
            source=data_pa,
            source_alias="source",
            target_alias="target",
            predicate=predicate
        ) \
        .when_matched_update_all() \
        .when_not_matched_insert_all() \
        .execute()
    except TableNotFoundError:
      save_data_as_delta(new_data, data_path, partition_cols=partition_cols)


#### Extracción Full:

Los metadatos de las estaciones se obtienen mediante una extracción full, ya que son datos estáticos que cambian muy poco o casi nunca. Por eso, no es necesario actualizarlos constantemente y una extracción full realizada con cierta frecuencia basta para mantenerlos actualizados.

In [None]:
# Obtengo api-keys

parser = ConfigParser()
parser.read("pipeline.conf")
api_key = parser["api-credentials"]["api-key"]

urlBase = "https://api.jcdecaux.com/vls/v1"

In [None]:
paramsEstatico = {
        "apiKey" : api_key
    }

endpointEstatico = "contracts"

urlEstatica = f"{urlBase}/{endpointEstatico}"

df_estatico = get_data_stations(urlEstatica, paramsEstatico)


#### Extracción Incremental:

En primer lugar, utilicé un método stateful, es decir, almacené el último update en un archivo JSON. Esto se debe a que los datos se actualizan en tiempo real, por lo que de esta forma siempre se obtiene solo la información nueva o modificada desde la última consulta. Para la extracción incremental, se consulta ese último update guardado y se solicitan únicamente los datos que superen ese estado.

In [None]:
endpointDinamico = "stations"

paramsDinamico = {
    "contract": "lyon",
    "apiKey" : api_key
}

urlDinamica= f"{urlBase}/{endpointDinamico}"

path_metadata = "metadata/metadata.json"

df_dinamico = incremental_extraction(urlDinamica, path_metadata, paramsDinamico)

#### Almacenamiento de Datos - Extracción Full

En la extracción full, se sobreescriben todos los datos directamente para evitar la existencia de registros duplicados. Esto garantiza que siempre podamos ver el estado más actualizado.

In [None]:
bronze_dir = "datalake/bronze/jcdecauxDeveloper"

In [None]:
contracts_dir = f"{bronze_dir}/{endpointEstatico}"

merge_new_data_as_delta(df_estatico, contracts_dir, "target.name = source.name")

In [None]:
dt = DeltaTable("datalake/bronze/jcdecauxDeveloper/contracts")
df = dt.to_pandas()
df.sort_values(by= "name")


#### Almacenamiento de Datos - Extracción Incremental

Decidí registrar todos los datos que obtengo de la API en tiempo real, guardando un historial completo del estado de cada estación en cada fecha específica. Es decir, las actualizaciones se realizan considerando cada fecha, evitando duplicados por fecha. Si bien esto puede generar algunos datos repetidos, creo que es una estrategia adecuada porque me permite conocer el estado de cada estación en cada momento. Esto facilita analizar cambios a lo largo del tiempo y detectar tendencias.

In [None]:
stations_dir = f"{bronze_dir}/{endpointDinamico}"

if not df_dinamico.empty:

    df_dinamico["last_update"] = pd.to_datetime(df_dinamico.last_update)

    df_dinamico["fecha"] = df_dinamico.last_update.dt.date

    merge_new_data_as_delta(df_dinamico, stations_dir, predicate= "target.number = source.number AND target.contract_name = source.contract_name AND target.fecha = source.fecha", partition_cols=["fecha"])

In [None]:
dt = DeltaTable(stations_dir)
df = dt.to_pandas()
df.sort_values(by= "number")


-------------------------------------------------------------------------------------------------------------------------------------------------

## Procesamiento de datos

#### Importación de librerías y módulos

In [None]:
from datetime import timedelta

#### Definición de funciones

In [None]:
def read_most_recent_partition(data_path, file_path):
    
    try:
      last_update_json = get_last_update(file_path=file_path)
      requested_date  = pd.to_datetime(last_update_json, utc=True)
      dt = DeltaTable(data_path)
      df_recent = dt.to_pandas(
        partitions=[
        ("fecha", "=", requested_date.date())]
        )
      return df_recent
    except Exception as E:
      raise Exception(f"No se pudo procesar la tabla Delta Lake, por {E}")
      return None

#### Obtención de datos de la capa Bronze - Extracción full

En primer lugar, debo obtener los datos crudos almacenados en la capa Bronze. 

In [None]:
df_estatico_bronze = DeltaTable(contracts_dir).to_pandas()

#### Obtención de datos de la capa Bronze - Extracción incremental

En el caso de los datos dinámicos, los mismos se encuentran almacenados en distintas particiones en la capa bronze. Por ende, debo leer la última partición obtenida utilizando la función read_most_recent_partition().

In [None]:
df_dinamico_bronze = read_most_recent_partition(stations_dir, path_metadata)

#### Transformaciones - Extracción Full

##### 1. Eliminación de duplicados

En primer lugar garantizo que no haya duplicados en los datos provenientes del endpoint estático. Para determinar si las filas están duplicadas me baso en el nombre del contrato, ya que el mismo es el que identifica unívocamente a cada fila en este caso. 

In [None]:
df_estatico_bronze_cleaned = df_estatico_bronze.drop_duplicates(subset="name", keep="first")

##### 2. Eliminación o reemplazo de nulos

En el caso de los datos obtenidos del endpoint estático, se imputarán con un valor por defecto todas las filas que contengan valores nulos excepto el nombre del contrato. En caso de que el mismo sea null se eliminará la fila, ya que el mismo es el que identifica a cada contrato.

In [None]:
df_estatico_bronze_cleaned = df_estatico_bronze_cleaned.dropna(subset=["name"])

imputation_mapping_e = {
    "commercial_name": "No especificado",
    "cities": "No especificado",
    "country_code": "No especificado"
}

df_estatico_bronze_cleaned = df_estatico_bronze_cleaned.fillna(imputation_mapping_e)

##### 3. Eliminación de campos multivaluados

Con el fin de evitar tener campos multivaluados, decidí crear un registro por cada una de las ciudades que conforman la lista de ciudades del endpoint estático.

In [None]:
df_estatico_bronze_cleaned = df_estatico_bronze_cleaned[["name", "commercial_name", "cities", "country_code"]].copy()

df_estatico_bronze_cleaned = df_estatico_bronze_cleaned.explode("cities")

df_estatico_bronze_cleaned = df_estatico_bronze_cleaned.rename(columns={"cities": "city"})

##### 4. Conversión de tipos de datos de columnas

A continuación, se realizan las conversiones de tipos de datos de columnas del endpoint estático, con el objetivo de evitar ocupar más espacio del necesario y establecer el tipo adecuado para cada columna.

In [None]:
conversion_mapping_e = {
    "name": "string",
    "commercial_name": "string",
    "city": "string",
    "country_code": "category"

}

df_estatico_bronze_cleaned = df_estatico_bronze_cleaned.astype(conversion_mapping_e)

#### Transformaciones - Extracción Incremental

##### 1. Eliminación de duplicados

Respecto a los datos provenientes del endpoint dinamico, me baso en el número de la estación y el nombre del contrato (cada número de estación es único dentro de un contrato particular).

In [None]:
df_dinamico_bronze_cleaned = df_dinamico_bronze.drop_duplicates(subset=["contract_name", "number", "last_update"], keep="first")

##### 2. Eliminación o reemplazo de nulos

Del mismo modo, los datos obtenidos del endpoint dinámico se imputarán con un valor por defecto, excepto por el número de la estación y el nombre del contrato ya que ambos son parte de la clave primaria que sirve para identificar cada estación.

In [None]:
df_dinamico_bronze_cleaned = df_dinamico_bronze_cleaned.dropna(subset=["number", "contract_name"])

imputation_mapping_d = {
    "name": "No especificado",
    "address": "No especificado",
    "position": "No especificado",
    "banking": "No especificado",
    "bonus": "No especificado",
    "bike_stands": 0,
    "available_bike_stands": 0,
    "available_bikes": 0,
    "status": "No especificado"
}

df_dinamico_bronze_cleaned = df_dinamico_bronze_cleaned.fillna(imputation_mapping_d)

##### 3. Creación de nuevas columnas

Para el caso del endpoint dinámico renombré las columnas de latitud y longitud para que en vez de aparecer position.lat aparezca position_lat, por ejemplo.

In [None]:
df_dinamico_bronze_cleaned = df_dinamico_bronze_cleaned.rename(columns=lambda col: col.replace(".","_"))



##### 4. Mejora en la legibilidad de los campos banking y bonus

Con el fin de mejorar la expresividad de los valores de las columnas banking y bonus, decidí reemplazar los valores True y False por una sentencia más clara para el usuario.

In [None]:
df_dinamico_bronze_cleaned["banking"] = df_dinamico_bronze_cleaned["banking"].replace({True: "Payment required", False: "No payment required"})
df_dinamico_bronze_cleaned["bonus"] = df_dinamico_bronze_cleaned["bonus"].replace({True: "Bonus station", False: "No bonus station"})


##### 5. Eliminación de campos vacios o blank

In [None]:
df_dinamico_bronze_cleaned = df_dinamico_bronze_cleaned.replace({"": "No especificado", " ": "No especificado"})

##### 6. Conversión de tipos de datos de columnas

In [None]:
conversion_mapping_d = {
    "number": "int16",
    "contract_name": "string",
    "address": "string",
    "position_lat": "float32",
    "position_lng": "float32",
    "bike_stands": "int8",
    "available_bike_stands": "int8",
    "available_bikes": "int8",
    "status": "category",
    "last_update": "datetime64[ms, UTC]",
    "banking": "string",
    "bonus": "string"
}

df_dinamico_bronze_cleaned = df_dinamico_bronze_cleaned.astype(conversion_mapping_d)

#### Almacenamiento en capa Silver - Extracción Full

In [None]:
silver_dir = "datalake/silver/jcdecauxDeveloper"

contracts_dir_silver = f"{silver_dir}/contracts"

merge_new_data_as_delta(df_estatico_bronze_cleaned, contracts_dir_silver, predicate="target.name = source.name")

#### Almacenamiento en capa Silver - Extracción Incremental

In [None]:
stations_dir_silver = f"{silver_dir}/stations"

merge_new_data_as_delta(df_dinamico_bronze_cleaned, stations_dir_silver, predicate="target.number = source.number AND target.fecha = source.fecha AND target.contract_name=source.contract_name", partition_cols=["fecha"])