# PROYECTO 1

**Curso:** Operaciones de Aprendizaje de Máquina

**Estudiantes:**
- Juan José García
- Ruben Dario Hoyos
- José Rafael Peña

## Setup

### Importación de librerías

In [None]:
# General modules
from pathlib import Path
import os
import requests
from typing import List
from dataclasses import dataclass
import pandas as pd
import pprint as pp

In [None]:
# Sklearn modules
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.model_selection import train_test_split

In [None]:
# Tensorflow module
import tensorflow as tf

# TFX components
from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.v1.components import ImportSchemaGen
from tfx.components import Transform
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from google.protobuf.json_format import MessageToDict

# TFDV modules
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2

In [None]:
import ml_metadata as mlmd
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

### Definición de carpetas

In [None]:
# Directory of the raw data files
data_root = Path('./data/covertype')

# Directory of the preprocessed data files
data_root_prepro = Path('./data/covertype_prepro')

# Path to the raw training data
data_filepath = data_root / 'covertype_train.csv'

# Ensure the data_root directory exists
data_root.mkdir(parents=True, exist_ok=True)

# Ensure the data_root_prepro directory exists
data_root_prepro.mkdir(parents=True, exist_ok=True)

# Directory of the pipeline metadata store
pipeline_root = Path('./pipeline/')

# Ensure the pipeline_root directory exists
pipeline_root.mkdir(parents=True, exist_ok=True)

### Carga de datos

In [None]:
# Download data if it doesn't exist
if not data_filepath.is_file():
    # URL for the dataset
    # https://archive.ics.uci.edu/ml/machine-learning-databases/covtype/
    url = 'https://docs.google.com/uc?export=download&confirm={{VALUE}}&id=1lVF1BCWLH4eXXV_YOJzjR7xZjj-wAGj9'
    
    r = requests.get(url, allow_redirects=True, stream=True)
    
    data_filepath.write_bytes(r.content)

## Pasos proyecto

### **2.1** Carga el dataset

In [None]:
df = pd.read_csv(data_filepath)

In [None]:
df.info()

### **3** Selección de características

In [None]:
@dataclass
class DataConfig:
    target_col: str
    non_numeric_cols: List[str]
    final_df_path: Path

# Creating an instance with specific values
config = DataConfig(
    target_col="Cover_Type",
    non_numeric_cols=list(df.select_dtypes(include=['object']).columns),
    final_df_path= data_root_prepro / "covertype_preprocessed.csv"
)

La ejecución de la siguiente celda se omite mediante el comando `%%script false --no-raise-error`, ya que contiene la normalización de los datos, un proceso que ya se realizó previamente según lo indicado en el documento:  

> **"Recuerde que, primero, debe preparar las características de entrada y de destino:"**  

Sin embargo, más adelante en el documento se asume que los datos conservan sus valores originales, por lo que la normalización se aplica posteriormente utilizando las herramientas de TFX.

In [None]:
%%script false --no-raise-error
# Drop non-numeric columns
df_1 = df.drop(columns=config.non_numeric_cols)

# Separate features and label
X = df_1.drop(columns=[config.target_col])
y = df_1[config.target_col].astype('category')

# Scale data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Convert back to DataFrame with original column names
X_scaled = pd.DataFrame(X_scaled, columns=X.columns)

# Implement f_classif as score function and select the 8 best columns
selector = SelectKBest(score_func=f_classif, k=8)
selector.fit(X, y)

# Create and print a df comparing the column and the result (if its retained or not)
selected_columns_df = pd.DataFrame({
    'Column': X_scaled.columns,
    'Retain': selector.get_support()
})
selected_columns_df

In [None]:
# Drop non-numeric columns
df_1 = df.drop(columns=config.non_numeric_cols)

# Separate features and label
X = df_1.drop(columns=[config.target_col])
y = df_1[config.target_col].astype('category')

# Implement f_classif as score function and select the 8 best columns
selector = SelectKBest(score_func=f_classif, k=8)
selector.fit(X, y)

# Select the best features using boolean mask
X_selected = X.loc[:, selector.get_support()]

# Create and print a df comparing the column and the result (if its retained or not)
selected_columns_df = pd.DataFrame({
    'Column': X.columns,
    'Retain': selector.get_support()
})
selected_columns_df

In [None]:
# Add the target column back
final_df = X_selected.copy()
final_df[config.target_col] = y.values

# Save the updated dataframe to CSV
final_df.to_csv(config.final_df_path, index=False)

<span style="color:red; font-weight:bold;">NOTA:</span> Se debe tener cargado el dataset (`final_df`) en memoria, pues posteriormente se debe hacer una división para algunas pruebas.


### **4.1** Configurar el contexto interactivo

In [None]:
context = InteractiveContext(pipeline_root=str(pipeline_root))

### **4.2** Generando ejemplos

In [None]:
# Instantiate ExampleGen with the input CSV dataset
example_gen = CsvExampleGen(input_base=str(data_root_prepro))

# Execute the component
context.run(example_gen)

print("CsvExampleGen ok")

### **4.3** Estadísticas

In [None]:
# get the artifact object
artifact = example_gen.outputs['examples'].get()[0]

# print split names and uri
print(f'split names: {artifact.split_names}')
print(f'artifact uri: {artifact.uri}')

In [None]:
# Instantiate StatisticsGen with the ExampleGen ingested dataset
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])

# Execute the component
context.run(statistics_gen)

print('StatisticsGen OK')

In [None]:
# Show the output statistics
context.show(statistics_gen.outputs['statistics'])

Se puede observar que, efectivamente, como se menciona en el documento, la columna `cover_type` tiene resaltado en rojo el porcentaje de valores en 0. Esto no es preocupante, ya que es la *target* del problema y una variable categórica.  

Por otro lado, se puede mencionar que la *feature* con el coeficiente de variación más alto (`avg/std`) es `Vertical_Distance_To_Hydrology`.

### **4.4** Inferir Esquema

In [None]:
# Instantiate SchemaGen with the StatisticsGen ingested dataset
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    )

# Run the component
context.run(schema_gen)

print('SchemaGen OK')

In [None]:
# Visualize the schema
context.show(schema_gen.outputs['schema'])

### **4.5** Curando Esquema

In [None]:
# Load schema as tensorflow_metadata.proto.v0.schema_pb2
schema_path = schema_gen.outputs['schema'].get()[0].uri + "/schema.pbtxt"
schema = tfdv.load_schema_text(schema_path)
type(schema)

In [None]:
# Set domains for Hillshade_9am, Hillshade_Noon, Slope, Cover_Type
tfdv.set_domain(schema, 'Hillshade_9am', schema_pb2.IntDomain(min=0, max=255))
tfdv.set_domain(schema, 'Hillshade_Noon', schema_pb2.IntDomain(min=0, max=255))
tfdv.set_domain(schema, 'Slope', schema_pb2.IntDomain(min=0, max=99))
domain_cover_type = schema_pb2.StringDomain(value=['0','1', '2', '3', '4', '5', '6'], is_categorical=True)
tfdv.set_domain(schema, 'Cover_Type', domain_cover_type)

Se observó que a pesar de agregar un dominio tipo String y especificar que es categorico, el tipo de la variable `Cover_Type` no cambió por lo que manualmente se asigna el tipo de dato correcto y que al mostrar el esquema, el tipo aparezca como `STRING`

In [None]:
# Manually assign Cover_Type feature type to FeatureType.BYTES
schema.feature[0].type = schema_pb2.FeatureType.BYTES

En los siguientes bloques se muestra que el esquema ha cambiado solo en memoria. Sin embargo, aún es necesario guardarlo en la metadata de SchemaGen para que los cambios sean persistentes y reconocidos por el pipeline.

In [None]:
# Display the updated schema with domains
tfdv.display_schema(schema)

In [None]:
# Display the old schema with no domains
context.show(schema_gen.outputs['schema'])

La siguiente celda garantiza que este quede guardado y ahora sí se podrán observar los cambios por medio del artefacto ya creado schema_gen

In [None]:
# Overwrite the file
tfdv.write_schema_text(schema, schema_path)

# Display updated schema with SchemaGen
context.show(schema_gen.outputs['schema'])

### **4.6** Entornos de esquema

In [None]:
def generate_serving_csv(df: pd.DataFrame,  path: Path, target: str = config.target_col) -> Path:
    train_df, test_df = train_test_split(df, test_size=0.3, random_state=42)
    test_df = test_df.drop(columns=[target])
    test_df.to_csv(path, index=False)
    return str(path)

serving_data = generate_serving_csv(final_df, data_root / "serving_data.csv")

In [None]:
options = tfdv.StatsOptions(schema=schema)
serving_stats = tfdv.generate_statistics_from_csv(serving_data, stats_options=options)
serving_anomalies = tfdv.validate_statistics(serving_stats, schema)

tfdv.display_anomalies(serving_anomalies)

In [None]:
# All features are by default in both TRAINING and SERVING environments.
schema.default_environment.append('training')
schema.default_environment.append('serving')

# Specify that 'tips' feature is not in SERVING environment.
tfdv.get_feature(schema, config.target_col).not_in_environment.append('serving')

serving_anomalies_with_env = tfdv.validate_statistics(
    serving_stats, schema, environment='serving')

tfdv.display_anomalies(serving_anomalies_with_env)

Se procede a guardar el archivo de schema ***tanto en la carpeta de SchemaGen para persitirlo en el pipeline y también se guarda en otro archivo local como se indica en el ejercicio***

In [None]:
# Overwrite the file in SchemaGen component files and save it in a local file
tfdv.write_schema_text(schema, schema_path)
tfdv.write_schema_text(schema, data_root.parent / "schema_entornos.pbtxt")

Como se especifica en el documento se procede a verificar que este schema sí tenga los cambios realizados.

> **Como verificación, debe mostrar el esquema que acaba de guardar y verificar que contiene los cambios introducidos**

La opción `display_schema` no brinda ninguna información respecto a los entornos, pero mostrando el archivo texto se debe observar los entornos como se presenta en la documentación:
```
default_environment: "TRAINING"
default_environment: "SERVING"
```

In [None]:
print(schema.default_environment)

In [None]:
print(schema)

### **4.7** Nuevas estadísticas usando el esquema actualizado

In [None]:
# Instantiate SchemaGen with the StatisticsGen ingested dataset
path_local_schema = data_root.parent / "schema_entornos.pbtxt"
schema_gen_2 = ImportSchemaGen(schema_file=str(path_local_schema))

# Run the component
context.run(schema_gen_2)

print('ImportSchemaGen OK')
context.show(schema_gen_2.outputs['schema'])

In [None]:
compute_eval_stats = StatisticsGen(
      examples=example_gen.outputs['examples'],
      schema=schema_gen_2.outputs['schema'],
    
      )

# Execute the component
context.run(compute_eval_stats)

print('New StatisticsGen OK')

In [None]:
# Show the output statistics
context.show(compute_eval_stats.outputs['statistics'])

Se observa un despliegue de estadísticas muy similar al obtenido con el esquema inferido. Esto puede deberse a que CsvExampleGen ya infiere y asigna tipos a las variables, por lo que sería necesario definir estas restricciones de tipo al momento de la lectura.

### **4.8** Comprobar anomalías

In [None]:
# Instantiate ExampleValidator with the StatisticsGen and ImportSchemaGen ingested data
example_validator = ExampleValidator(
    statistics=compute_eval_stats.outputs['statistics'],
    schema=schema_gen_2.outputs['schema'])

# Run the component.
context.run(example_validator)

print('ExampleValidator OK')

In [None]:
# Visualize the results
context.show(example_validator.outputs['anomalies'])

En este caso, era previsibile que resalte anomalías, ya que con las nuevas estadísticas no se logra observar el cambio en el tipo de la variable. Se intuye, sin certeza, que esto sucede porque CsvExampleGen ya infiere los tipos, y para que las estadísticas reflejen los nuevos tipos, se debe modificar la lectura.

### **4.9** Ingeniería de características

In [None]:
# Set the constants module filename
constants_module_file = 'constants.py'

In [None]:
%%writefile {constants_module_file}
# Numerical features that are marked as continuous
NUMERIC_FEATURE_KEYS = ['Elevation', 'Hillshade_9am', 'Hillshade_Noon', 'Horizontal_Distance_To_Fire_Points', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways', 'Vertical_Distance_To_Hydrology']

# Feature that can be grouped into buckets
BUCKET_FEATURE_KEYS = ['Slope']

# Number of buckets used by tf.transform for encoding each bucket feature.
FEATURE_BUCKET_COUNT = {'Slope': 4}

# Feature that the model will predict
LABEL_KEY = 'Cover_Type'

# Utility function for renaming the feature
def transformed_name(key):
    return key + '_xf'

In [None]:
# Set the transform module filename
transform_module_file = 'transform.py'

In [None]:
%%writefile {transform_module_file}
import tensorflow as tf
import tensorflow_transform as tft
import constants

# Desempaquetar los contenidos del módulo de constantes
NUMERIC_FEATURE_KEYS = constants.NUMERIC_FEATURE_KEYS
BUCKET_FEATURE_KEYS = constants.BUCKET_FEATURE_KEYS
FEATURE_BUCKET_COUNT = constants.FEATURE_BUCKET_COUNT
LABEL_KEY = constants.LABEL_KEY
transformed_name = constants.transformed_name


def preprocessing_fn(inputs):
    """Función de callback de tf.transform para preprocesar entradas.
    Args:
        inputs: diccionario de características sin transformar.
    Returns:
        Diccionario de características transformadas.
    """
    outputs = {}

    # Escalar características numéricas al rango [0,1]
    for key in NUMERIC_FEATURE_KEYS:
        outputs[transformed_name(key)] = tft.scale_to_0_1(inputs[key])
    
    # Crear agrupaciones por rangos en características categorizables
    for key in BUCKET_FEATURE_KEYS:
        outputs[transformed_name(key)] = tft.bucketize(inputs[key], FEATURE_BUCKET_COUNT[key])

    return outputs


In [None]:
# Ignore TF warning messages
tf.get_logger().setLevel('ERROR')

# Instantiate the Transform component
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen_2.outputs['schema'],
    module_file=transform_module_file)

# Run the component
context.run(transform)

Ahora se procede a obtener ejemplos como se indica en las instrucciones del taller.

In [None]:
# Define a helper function to get individual examples
def get_records(dataset, num_records):
    '''Extracts records from the given dataset.
    Args:
        dataset (TFRecordDataset): dataset saved by ExampleGen
        num_records (int): number of records to preview
    '''
    
    # initialize an empty list
    records = []
    
    # Use the `take()` method to specify how many records to get
    for tfrecord in dataset.take(num_records):
        
        # Get the numpy property of the tensor
        serialized_example = tfrecord.numpy()
        
        # Initialize a `tf.train.Example()` to read the serialized data
        example = tf.train.Example()
        
        # Read the example data (output is a protocol buffer message)
        example.ParseFromString(serialized_example)
        
        # convert the protocol bufffer message to a Python dictionary
        example_dict = (MessageToDict(example))
        
        # append to the records list
        records.append(example_dict)
        
    return records

In [None]:
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
transformed_dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

Se puede observar al ejecutar la siguiente celda que:
- Hay nuevas columnas con "_xf" al final
- Los valores están 0 y 1
- Slope se encuentra entre los valores de 0 a 3, de las tres buckets que definimos

In [None]:
# Get 3 records from the dataset
sample_records_xf = get_records(transformed_dataset, 5)

# Print the output
pp.pprint(sample_records_xf)

### **5.1** Acceso a artefactos almacenados

Para crear un objeto `MetadataStore`, es necesario contar con una configuración de conexión. Existen diferentes formas de establecer esta configuración. Por ejemplo, se puede utilizar una base de datos en memoria mediante SQLite para experimentación rápida y ejecuciones locales:

```python
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.fake_database.SetInParent()  # Configura una base de datos ficticia en memoria.
store = metadata_store.MetadataStore(connection_config)
```

Otra opción es establecer la conexión a través de un archivo SQLite:
```python
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = 'ruta_a_archivo.sqlite'
connection_config.sqlite.connection_mode = 3  # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)
```

En nuestro caso, dado que contamos con un archivo de metadatos en formato SQLite, podemos utilizar la segunda opción. Sin embargo, el contexto interactivo `InteractiveContext` que hemos creado ya incluye esta configuración de conexión, lo que nos permite inicializar `MetadataStore` sin necesidad de definirla manualmente. Esto se puede verificar utilizando la función `dir`, donde se observa que el contexto contiene un atributo `metadata_connection_config`.

In [None]:
# Show attributes of the instance context from InteractiveContext
dir(context)

In [None]:
store = metadata_store.MetadataStore(context.metadata_connection_config)
print(f"Hay un total de {len(store.get_artifacts())} artefactos")
print("Se inspecciona el artefacto 2 de la lista (posición 1 en la lista): ")
store.get_artifacts()[1] # Este método permite ver de manera más exhaustiva todos los artefactos

A continuación se despliegan solo los nombres de los tipos de artefactos en nuestro pipeline

In [None]:
for i in store.get_artifact_types():
    print(i.name)

Relación por medio de intuición:
- Examples -> Archivo dentro de CsvExampleGen
- ExampleStatistics -> Archivo dentro de StatisticsGen
- Schema -> Archivo dentro de  SchemaGen, ImportSchemaGen
- ExampleAnomalies -> Archivo dentro de ExampleValidator

Observación real:

Vemos que artefactos de tipo Example también hay en la carpeta de Transform

In [None]:
for i in store.get_artifacts_by_type('Examples'):
    print(i.uri)

A continuacións se exploran los artefactos de tipo Schema como se pide en el documento

In [None]:
for i in store.get_artifacts_by_type('Schema'):
    print(i.uri)

Obtener las propiedades de un artefacto en particular como se pide en el documento

In [None]:
for i in store.get_artifacts_by_type('Schema'):
    pp.pprint(i.custom_properties)

In [None]:
for i in store.get_artifacts_by_type('Schema'):
    pp.pprint(i.custom_properties['producer_component'])

Se exploran las propiedades de los artefactos de tipo `ExampleStatistics` y se confirma la existencia de propiedades relacionadas con las divisiones

In [None]:
for i in store.get_artifacts_by_type('ExampleStatistics'):
    pp.pprint(i.custom_properties)

### **5.2** Seguimiento de artefactos

Se asume que no se quiere una función que liste los artefactos definiendo un tipo sino instancias usadas en el código como `SchemaGen`, `CsvExampleGen`, `Transform`. Se explica este supuesto ya que `TransfromGraph` es un tipo de artefacto y ya exploramos la función que provee el `MetadataStore` para tener los artefactos por tipo

In [None]:
def get_artifacts_by_instance(store_obj: mlmd.metadata_store.metadata_store.MetadataStore, instance: str):
    return [i for i in store.get_artifacts() if i.custom_properties['producer_component'].string_value == instance]
        

In [None]:
get_artifacts_by_instance(store, 'SchemaGen')

In [None]:
get_artifacts_by_instance(store, 'Transform')

### **5.3** Obtener artefactos principales

Se exploran los artefactos de tipo schema para obtener el SchemaGen. Se puede observar que este tiene un id 3, y se podría obtener los eventos relacionados

In [None]:
for i in store.get_artifacts_by_type('Schema'):
    print(i.uri, i.id, sep=" - ")

In [None]:
store.get_events_by_artifact_ids([3])

Se puede observar que el `execution_id` es 3, y se pueden extraer los eventos relacionados a este id de ejecución

In [None]:
store.get_events_by_execution_ids([3])

Aquí vemos que el evento se relaciona precisamente con el artefacto de `Statistics`, donde se indica un tipo `INPUT`, lo cual debería ser intuitivo, ya que este artefacto de tipo `InferSchemaGen` requiere de estadísticas previas. Además, observamos que finaliza en un artefacto, el cual es el `OUTPUT`.

Si exploramos el segundo schema, se observan más eventos asociados, esto es coherente debido a que este segundo se uso también en el proceso de transformación por lo que estará asociado a estos procesos

In [None]:
store.get_events_by_artifact_ids([4])

Entre esos eventos debe estar el que se relaciona con la generación de estadísticas realizadas después de importar el esquema. Se identifica que este está relacionado con el ID de ejecución 5. Así que al explorar los eventos, vemos que se usa el artefacto de `Example` y `ImportSchema` como entradas y da salida un artefacto de `Statistics`

In [None]:
for i in store.get_events_by_execution_ids([5]):
    print(f"ID: {i.artifact_id} - Key: {i.path.steps[0]}")

Se explora el `CsvExampleGen` mencionado en el documento

In [None]:
store.get_events_by_artifact_ids([1])

Se observa que se usa en 3 otros procesos como `INPUT`. Posiblemente para la primera y segunda generación de estadísticas, y en el proceso de transformación.

In [None]:
for i in store.get_events_by_execution_ids([2]):
    print(f"ID: {i.artifact_id} - Key: {i.path.steps[0]}")

In [None]:
for i in store.get_events_by_execution_ids([5]):
    print(f"ID: {i.artifact_id} - Key: {i.path.steps[0]}")

In [None]:
for i in store.get_events_by_execution_ids([7]):
    print(f"ID: {i.artifact_id} - Key: {i.path.steps[0]}")

Se confirma la hipótesis