## Setup

Declare constants:

In [None]:
from datetime import datetime # needed here: TIMESTAMP below is computed before the imports cell runs

project = !gcloud config get-value project
PROJECT_ID = project[0] # Project name

REGION = 'us-central1' # Server location to use
EXPERIMENT = '05'
SERIES = '05'
EXPERIMENT_NAME = f'experiment-{SERIES}-{EXPERIMENT}' # assumed naming scheme, used as a Vertex AI label

# BigQuery parameters
BQ_PROJECT = PROJECT_ID # BigQuery project
BQ_DATASET = 'fraud' # Dataset name in BigQuery
BQ_TABLE = 'fraud_prepped' # Table name within the dataset

# Resources
DEPLOY_COMPUTE = 'n1-standard-4' # Machine type for serving
DEPLOY_IMAGE='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-7:latest' # Prebuilt Docker serving image

# Model training parameters
VAR_TARGET = 'Class' # Name of the label column in the table
VAR_OMIT = 'transaction_id' # Columns to omit
EPOCHS = 4 # Training epochs
BATCH_SIZE = 100 # Training batch size

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S") # Timestamp used as a run identifier
RUN_NAME = f'run-{TIMESTAMP}' # assumed naming scheme; a version alias has to start with a letter
BUCKET = PROJECT_ID # Google Cloud Storage bucket
URI = f"gs://{BUCKET}/{SERIES}/{EXPERIMENT}" # Base URI in Google Cloud Storage
DIR = f"temp/{EXPERIMENT}" # Path of the temporary working folder

In [None]:
#!pip install tensorflow==2.10.0 tensorflow-io==0.27.0

Imports:

In [None]:
from google.cloud import bigquery

from tensorflow import dtypes # public alias of tf.dtypes (avoids the private tensorflow.python path)
from tensorflow_io.bigquery import BigQueryClient
import tensorflow as tf

from google.cloud import aiplatform
from datetime import datetime
import os

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np
import pandas as pd
from sklearn import metrics

Initialize the Vertex AI SDK and create the BigQuery client:

In [None]:
aiplatform.init(project = PROJECT_ID, location = REGION)
bq = bigquery.Client(project = PROJECT_ID)

Create the temporary working folder:

In [None]:
!rm -rf {DIR}
!mkdir -p {DIR}

## Training data

The model uses the bank transaction data previously imported into a BigQuery table.

### Table schema

Retrieve information about the columns of the data table.   
In BigQuery, `INFORMATION_SCHEMA` can be queried for information about a table's columns, such as each column's name and the data type it holds.

In [None]:
query = f"SELECT * FROM `{BQ_PROJECT}.{BQ_DATASET}.INFORMATION_SCHEMA.COLUMNS` WHERE TABLE_NAME = '{BQ_TABLE}'" # backticks allow project IDs with hyphens
schema = bq.query(query).to_dataframe()
schema

### Number of classes
Obtain the number of distinct classes in the dataset programmatically.    
We know there are two (fraudulent or normal), but computing the value makes the code reusable with other datasets.

In [None]:
classes = bq.query(query = f'SELECT DISTINCT {VAR_TARGET} FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}` WHERE {VAR_TARGET} IS NOT NULL').to_dataframe()
nclasses = classes.shape[0] # tf.one_hot and the Dense layer need an integer, not a DataFrame
nclasses
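In plain Python, `SELECT DISTINCT ... WHERE ... IS NOT NULL` followed by a row count amounts to the size of the set of non-null labels. The sketch below runs on a hypothetical list of labels; whatever the source, the value later passed to `tf.one_hot` and `Dense` has to be a plain integer.

```python
from typing import Iterable, Optional


def count_classes(labels: Iterable[Optional[int]]) -> int:
    """Number of distinct non-null labels (like SELECT DISTINCT ... IS NOT NULL)."""
    return len({label for label in labels if label is not None})


# Hypothetical labels: 0 = normal, 1 = fraudulent, None = missing value
example_labels = [0, 1, 0, None, 1, 0]
nclasses_example = count_classes(example_labels)
print(nclasses_example)  # 2
```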

### Preparing columns for TensorFlow I/O

Use the table schema to prepare the inputs for TensorFlow I/O:
- Build a list with the names of the columns to read (omitting unneeded ones).
- Define each column's type from the *data type* reported in the schema.

In [None]:
OMIT = VAR_OMIT.split() + ['splits'] # List of columns to omit

selected_fields = schema[~schema.column_name.isin(OMIT)].column_name.tolist() # List of columns to read

# Data types of the columns to read, in the same order as selected_fields (FLOAT64 -> float64, anything else -> int64)
output_types = [dtypes.float64 if x=='FLOAT64' else dtypes.int64 for x in schema[~schema.column_name.isin(OMIT)].data_type.tolist()]
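The filtering and type mapping can be checked without BigQuery. The sketch below applies the same rule to hypothetical schema rows shaped like the `column_name` and `data_type` columns of `INFORMATION_SCHEMA.COLUMNS`, using type names as strings instead of TensorFlow dtypes. Because every non-`FLOAT64` type (including `STRING`) maps to int64, string columns have to be in the omit list.

```python
from typing import Dict, List, Tuple

# Hypothetical schema rows (column names and types are illustrative only)
example_schema = [
    {"column_name": "transaction_id", "data_type": "STRING"},
    {"column_name": "V1", "data_type": "FLOAT64"},
    {"column_name": "Amount", "data_type": "FLOAT64"},
    {"column_name": "Class", "data_type": "INT64"},
    {"column_name": "splits", "data_type": "STRING"},
]


def prepare_fields(schema: List[Dict[str, str]], omit: List[str]) -> Tuple[List[str], List[str]]:
    """Return (column names to read, type names), keeping both lists aligned."""
    kept = [row for row in schema if row["column_name"] not in omit]
    fields = [row["column_name"] for row in kept]
    # Same rule as the notebook: FLOAT64 -> float64, anything else -> int64
    types = ["float64" if row["data_type"] == "FLOAT64" else "int64" for row in kept]
    return fields, types


example_fields, example_types = prepare_fields(example_schema, ["transaction_id", "splits"])
print(example_fields)  # ['V1', 'Amount', 'Class']
print(example_types)   # ['float64', 'float64', 'int64']
```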

## Read data from the BigQuery table with TensorFlow I/O 

### Split inputs into features and label

Create a function that separates each row into features and label.   
It also one-hot encodes the labels.

In [None]:
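def transTable(row_dict):
    # Remove the label from the row; the remaining entries are the features
    target = row_dict.pop(VAR_TARGET)
    # One-hot encode the label (depth = number of classes) and cast it to float32
    target = tf.one_hot(tf.cast(target, tf.int64), nclasses)
    target = tf.cast(target, tf.float32)
    return row_dict, target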
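The same split-and-encode step in NumPy, on a single hypothetical row: the label is removed from the dict and turned into a float32 one-hot vector whose length is the number of classes. This sketch copies the dict first; `transTable` pops in place, which is harmless inside `tf.data` because each element is a fresh dict.

```python
import numpy as np


def split_row(row: dict, target_name: str, nclasses: int):
    """Separate features from the label and one-hot encode the label as float32."""
    features = dict(row)                  # copy so the caller's dict is left untouched
    label = int(features.pop(target_name))
    one_hot = np.eye(nclasses, dtype=np.float32)[label]
    return features, one_hot


# Hypothetical row with two features and a fraudulent label (Class = 1)
example_features, example_target = split_row({"V1": -1.36, "Amount": 149.62, "Class": 1}, "Class", 2)
print(example_features)  # {'V1': -1.36, 'Amount': 149.62}
print(example_target)    # [0. 1.]
```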

### Function to read data with TensorFlow I/O

This function reads the rows of one split from the BigQuery table using TensorFlow I/O (batching is applied afterwards).   
It opens several read streams and reads them in parallel to speed up training.

In [None]:
def bq_reader(split):
    reader = BigQueryClient()

    # Open a BigQuery Storage read session restricted to the rows of the requested split
    session = reader.read_session(
        parent = f"projects/{PROJECT_ID}",
        project_id = BQ_PROJECT,
        table_id = BQ_TABLE,
        dataset_id = BQ_DATASET,
        selected_fields = selected_fields,
        output_types = output_types,
        row_restriction = f"splits='{split}'",
        requested_streams = 3 # number of streams to read in parallel
    )
    
    # Interleave the streams; sloppy=True trades a deterministic row order for speed
    return session.parallel_read_rows(sloppy = True, num_parallel_calls = tf.data.experimental.AUTOTUNE)
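A toy model of what `requested_streams = 3` plus `parallel_read_rows` does: the rows are spread across several streams and the reader interleaves them. With `sloppy = True` the next row comes from whichever stream is ready, so the order can change between passes while every row still appears exactly once. This is a simplified, hypothetical illustration (a random choice stands in for "whichever stream is ready"), not TensorFlow I/O's implementation.

```python
import random
from typing import Iterator, List


def interleave(streams: List[List[int]], sloppy: bool, seed: int = 0) -> Iterator[int]:
    """Toy interleaving of several read streams."""
    iterators = [iter(stream) for stream in streams]
    rng = random.Random(seed)
    active = list(range(len(iterators)))  # streams that may still have rows
    pos = 0
    while active:
        # sloppy: take a "ready" stream (random here); otherwise strict round-robin
        pos = rng.randrange(len(active)) if sloppy else pos % len(active)
        idx = active[pos]
        try:
            value = next(iterators[idx])
        except StopIteration:
            active.pop(pos)  # stream exhausted: drop it and move on
            continue
        yield value
        if not sloppy:
            pos += 1


example_streams = [[1, 4], [2, 5], [3]]  # hypothetical row IDs split across 3 streams
print(list(interleave(example_streams, sloppy=False)))         # [1, 2, 3, 4, 5]
print(list(interleave(example_streams, sloppy=True, seed=1)))  # same rows, order may differ
```

One practical consequence: with a nondeterministic order, predictions and labels should be collected in the same pass over a dataset.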

Apply the function to the training, validation and test sets.

In [None]:
train = bq_reader('TRAIN').map(transTable).shuffle(BATCH_SIZE*10).batch(BATCH_SIZE).prefetch(1)
validate = bq_reader('VALIDATE').map(transTable).batch(BATCH_SIZE).prefetch(1)
test = bq_reader('TEST').map(transTable).batch(BATCH_SIZE).prefetch(1)
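`shuffle(BATCH_SIZE*10)` does not shuffle the whole split. Like `tf.data.Dataset.shuffle`, it keeps a buffer (here 1,000 rows), emits a uniformly random element from it and refills the freed slot, so rows are only mixed within that window. A minimal pure-Python sketch of buffer shuffling followed by batching, on hypothetical integer rows:

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def buffer_shuffle(items: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Shuffle through a fixed-size buffer, in the style of tf.data.Dataset.shuffle."""
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in items:
        buffer.append(item)
        if len(buffer) == buffer_size:
            # Buffer full: emit a random element, making room for the next one
            yield buffer.pop(rng.randrange(len(buffer)))
    # Input exhausted: drain the remaining elements in random order
    rng.shuffle(buffer)
    yield from buffer


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group consecutive elements; the last batch may be smaller (remainder kept)."""
    current: List[T] = []
    for item in items:
        current.append(item)
        if len(current) == batch_size:
            yield current
            current = []
    if current:
        yield current


# Hypothetical 25-row split, buffer of 10, batches of 10
example_batches = list(batched(buffer_shuffle(range(25), buffer_size=10), batch_size=10))
print([len(b) for b in example_batches])  # [10, 10, 5]
```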

### Inspect a training batch

In [None]:
for features, target in train.take(1):
    print('features:\n', list(features.keys()))
    print('\nlabel (first 10 rows):\n', target[0:10])

## Define the model

A logistic regression model is built:

- Define the model inputs.
- Normalize the data with Batch Normalization.
- Build the logistic regression model with a *softmax* activation function.
- Compile the model.

In [None]:
# Logistic regression model

# Model inputs: one numeric feature column and one Keras Input per feature
feature_columns = {header: tf.feature_column.numeric_column(header) for header in selected_fields if header != VAR_TARGET}
feature_layer_inputs = {header: tf.keras.layers.Input(shape = (1,), name = header) for header in selected_fields if header != VAR_TARGET}

# Concatenate the individual feature columns into a single layer
feature_layer_outputs = tf.keras.layers.DenseFeatures(feature_columns.values(), name = 'feature_layer')(feature_layer_inputs)

# Batch normalization step
normalized = tf.keras.layers.BatchNormalization(name = 'batch_normalization_layer')(feature_layer_outputs)

# Fully connected layer with softmax activation
logistic = tf.keras.layers.Dense(nclasses, activation = tf.nn.softmax, name = 'logistic')(normalized)

# Build the model
model = tf.keras.Model(
    inputs = feature_layer_inputs,
    outputs = logistic,
    name = EXPERIMENT
)

# Compile the model
model.compile(
    optimizer = tf.keras.optimizers.SGD(), # SGD or Adam
    loss = tf.keras.losses.CategoricalCrossentropy(),
    metrics = ['accuracy', tf.keras.metrics.AUC(curve = 'PR', name = 'auprc')]
)
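The model's forward pass written out in NumPy: batch normalization, a dense layer, then softmax. This sketch uses batch statistics (as Keras does during training; at inference Keras switches to moving averages), the layer's default epsilon of 0.001 and its initial scale and offset of 1 and 0. The input batch and weights are hypothetical random values.

```python
import numpy as np


def softmax(z: np.ndarray) -> np.ndarray:
    z = z - z.max(axis=1, keepdims=True)  # subtract the row maximum for numerical stability
    e = np.exp(z)
    return e / e.sum(axis=1, keepdims=True)


def forward(x, gamma, beta, w, b, eps=1e-3):
    """BatchNormalization (batch statistics) -> Dense -> softmax."""
    mean = x.mean(axis=0)
    var = x.var(axis=0)
    x_hat = (x - mean) / np.sqrt(var + eps)  # standardize each feature
    normalized = gamma * x_hat + beta        # learned scale and offset
    return softmax(normalized @ w + b)       # one probability per class


rng = np.random.default_rng(0)
x = rng.normal(size=(4, 3))                  # hypothetical batch: 4 rows, 3 features
gamma, beta = np.ones(3), np.zeros(3)        # initial BatchNormalization scale and offset
w, b = rng.normal(size=(3, 2)), np.zeros(2)  # hypothetical weights for 2 classes
example_probs = forward(x, gamma, beta, w, b)
print(example_probs.sum(axis=1))             # each row sums to 1
```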

In [None]:
model.summary()

## Train the model

- Train the model with the *fit* method.
- Note that training runs on the local machine (the notebook's kernel), not as a Vertex AI training job.

In [None]:
# Prepare TensorBoard logs
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir = os.path.join(DIR, "logs", f'{TIMESTAMP}'), histogram_freq=1)
# Train the model
history = model.fit(train, epochs = EPOCHS, callbacks = [tensorboard_callback], validation_data = validate)
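What `fit` does for this kind of model, reduced to NumPy: mini-batch gradient descent on softmax regression with categorical cross-entropy, whose gradient with respect to the logits is `(p - y) / batch_size`. The data, the labeling rule and the learning rate of 0.1 are hypothetical, and batch normalization and the rest of Keras's machinery are left out.

```python
import numpy as np


def softmax(z):
    z = z - z.max(axis=1, keepdims=True)
    e = np.exp(z)
    return e / e.sum(axis=1, keepdims=True)


def cross_entropy(p, y):
    """Mean categorical cross-entropy for one-hot targets y."""
    return float(-np.mean(np.sum(y * np.log(p + 1e-12), axis=1)))


def sgd_epoch(x, y, w, b, lr, batch_size):
    """One pass over the data with mini-batch gradient descent (softmax regression)."""
    for start in range(0, len(x), batch_size):
        xb, yb = x[start:start + batch_size], y[start:start + batch_size]
        p = softmax(xb @ w + b)
        grad_logits = (p - yb) / len(xb)  # gradient of the mean loss w.r.t. the logits
        w = w - lr * xb.T @ grad_logits
        b = b - lr * grad_logits.sum(axis=0)
    return w, b


rng = np.random.default_rng(0)
toy_x = rng.normal(size=(200, 3))                               # hypothetical features
toy_labels = (toy_x[:, 0] + 0.5 * toy_x[:, 1] > 0).astype(int)  # hypothetical labeling rule
toy_y = np.eye(2)[toy_labels]                                   # one-hot targets
w, b = np.zeros((3, 2)), np.zeros(2)

toy_losses = [cross_entropy(softmax(toy_x @ w + b), toy_y)]
for epoch in range(4):                                          # 4 epochs, like EPOCHS
    w, b = sgd_epoch(toy_x, toy_y, w, b, lr=0.1, batch_size=100)
    toy_losses.append(cross_entropy(softmax(toy_x @ w + b), toy_y))
print(toy_losses[0], toy_losses[-1])                            # the loss should go down
```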

## Evaluate the model

Evaluate the model's ability to generalize using the test data.

In [None]:
loss, accuracy, auprc = model.evaluate(test)

In [None]:
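# Predict and collect labels in a single pass over the test set: with
# parallel_read_rows(sloppy = True) the row order may differ between passes,
# so a separate model.predict(test) could be misaligned with the labels.
probs, labels = [], []
for features, target in test:
    probs.append(model.predict_on_batch(features))
    labels.append(target.numpy())
probs = np.concatenate(probs)                         # shape (n, nclasses)
actuals = np.argmax(np.concatenate(labels), axis = 1) # true class index

predictions_proba = probs[:, 1]           # probability of class 1 (fraudulent)
predictions = np.argmax(probs, axis = 1)  # predicted class index

### Compute metrics

In [None]:
# Log loss needs probabilities, not predicted class labels
metrics.log_loss(actuals, predictions_proba)

In [None]:
metrics.accuracy_score(actuals, predictions)

In [None]:
# Average precision ranks examples by score, so it also needs probabilities
metrics.average_precision_score(actuals, predictions_proba)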
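Log loss and average precision are defined on scores, not on hard labels. scikit-learn's `average_precision_score` computes AP = sum_n (R_n - R_{n-1}) * P_n over the thresholds of the ranking, and log loss penalizes the probability given to the true class, so argmax labels discard exactly the information these metrics use. A small NumPy sketch with hypothetical labels and probabilities (it assumes no tied scores and at least one positive):

```python
import numpy as np


def log_loss(y_true, p_pos, eps=1e-15):
    """Binary log loss from the probability of the positive class."""
    p = np.clip(np.asarray(p_pos, dtype=float), eps, 1 - eps)
    y = np.asarray(y_true, dtype=float)
    return float(-np.mean(y * np.log(p) + (1 - y) * np.log(1 - p)))


def average_precision(y_true, scores):
    """AP = sum_k (R_k - R_{k-1}) * P_k along the ranking by descending score."""
    order = np.argsort(-np.asarray(scores, dtype=float))
    y = np.asarray(y_true)[order]
    tp = np.cumsum(y)                            # true positives among the top k
    precision = tp / np.arange(1, len(y) + 1)
    recall = tp / y.sum()
    recall_prev = np.concatenate(([0.0], recall[:-1]))
    return float(np.sum((recall - recall_prev) * precision))


toy_true = [1, 0, 1, 0]
toy_p_fraud = [0.9, 0.6, 0.4, 0.1]                   # hypothetical probabilities of class 1
toy_pred = [int(p >= 0.5) for p in toy_p_fraud]      # hard labels at a 0.5 threshold
toy_accuracy = float(np.mean(np.array(toy_pred) == np.array(toy_true)))

print(log_loss(toy_true, toy_p_fraud))           # about 0.511
print(toy_accuracy)                              # 0.5
print(average_precision(toy_true, toy_p_fraud))  # about 0.833
```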

## Visualize training and validation curves (TensorBoard)

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir $DIR/logs

## Save the model

In [None]:
model.save(f'{URI}/models/{TIMESTAMP}/model')

## Model deployment (API)

### Register the model in Vertex AI

Check that the model has not been registered before.   
Upload the model to the Vertex AI Model Registry.

In [None]:
modelmatch = aiplatform.Model.list(filter = f'display_name={SERIES}_{EXPERIMENT} AND labels.series={SERIES} AND labels.experiment={EXPERIMENT}')

upload_model = True
if modelmatch:
    print("Model already exists")
    upload_model = False
    model = modelmatch[0] # reuse the registered model for the deployment step
else:
    print('Adding model to the registry')
    parent_model = ''

if upload_model:
    model = aiplatform.Model.upload(
        display_name = f'{SERIES}_{EXPERIMENT}',
        model_id = f'model_{SERIES}_{EXPERIMENT}',
        parent_model =  parent_model,
        serving_container_image_uri = DEPLOY_IMAGE,
        artifact_uri = f"{URI}/models/{TIMESTAMP}/model",
        is_default_version = True,
        version_aliases = [RUN_NAME],
        version_description = RUN_NAME,
        labels = {'series' : f'{SERIES}', 'experiment' : f'{EXPERIMENT}', 'experiment_name' : f'{EXPERIMENT_NAME}', 'run_name' : f'{RUN_NAME}'}
    )
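The registry lookup is a get-or-create step keyed on a filter string. A small hypothetical helper (not part of the Vertex AI SDK) that builds the filter in the same `display_name=... AND labels.key=value` form used by the `Model.list` call above keeps the names used for upload and lookup from drifting apart; with labels only, it yields the `labels.series=...` form.

```python
from typing import Dict, Optional


def vertex_filter(labels: Dict[str, str], display_name: Optional[str] = None) -> str:
    """Build a list filter such as 'display_name=X AND labels.k=v AND ...'."""
    clauses = [f"display_name={display_name}"] if display_name else []
    clauses += [f"labels.{key}={value}" for key, value in labels.items()]
    return " AND ".join(clauses)


print(vertex_filter({"series": "05", "experiment": "05"}, display_name="05_05"))
# display_name=05_05 AND labels.series=05 AND labels.experiment=05
print(vertex_filter({"series": "05"}))
# labels.series=05
```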

### Create an endpoint to serve prediction requests

Check whether the *endpoint* already exists.   
If not, create it (empty for now).

In [None]:
endpoints = aiplatform.Endpoint.list(filter = f"labels.series={SERIES}")
if endpoints:
    endpoint = endpoints[0]
    print(f"Endpoint Exists: {endpoints[0].resource_name}")
else:
    endpoint = aiplatform.Endpoint.create(
        display_name = f"{SERIES}",
        labels = {'series' : f"{SERIES}"}    
    )
    print(f"Endpoint Created: {endpoint.resource_name}")
    
print(f'Review the Endpoint in the Console:\nhttps://console.cloud.google.com/vertex-ai/locations/{REGION}/endpoints/{endpoint.name}?project={PROJECT_ID}')

In [None]:
endpoint.display_name

In [None]:
deployed_models = endpoint.list_models()

### Deploy the model to the endpoint
Deploy the model from the Vertex AI registry to the *endpoint* created above.

In [None]:
endpoint.deploy(
    model = model,
    deployed_model_display_name = model.display_name,
    traffic_percentage = 100,
    machine_type = DEPLOY_COMPUTE,
    min_replica_count = 1,
    max_replica_count = 1
)
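`traffic_percentage = 100` sends every request to the newly deployed model. With a lower percentage, the rest has to be split among the models already on the endpoint. The sketch below shows one way to rescale an existing split proportionally so that the total stays at 100; it is an assumption for illustration with hypothetical model IDs, not the SDK's code.

```python
from typing import Dict


def allocate_traffic(current: Dict[str, int], new_model: str, new_percentage: int) -> Dict[str, int]:
    """Give new_percentage to new_model and rescale the existing split (summing to 100)."""
    remaining = 100 - new_percentage
    split = {model_id: share * remaining // 100 for model_id, share in current.items()}
    # Integer division can leave a few points unassigned: hand them out one by one
    leftover = remaining - sum(split.values())
    for model_id in split:
        if leftover == 0:
            break
        split[model_id] += 1
        leftover -= 1
    split[new_model] = new_percentage
    return split


print(allocate_traffic({"old": 100}, "new", 100))        # {'old': 0, 'new': 100}
print(allocate_traffic({"a": 33, "b": 67}, "new", 10))   # {'a': 30, 'b': 60, 'new': 10}
```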

## Online predictions

### Extract data for predictions

Take 10 rows from the *test* split.

In [None]:
n = 10
pred = bq.query(
    query = f"""
        SELECT * EXCEPT({VAR_TARGET}, {VAR_OMIT}, splits)
        FROM `{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}`
        WHERE splits='TEST'
        LIMIT {n}
        """
).to_dataframe()

In [None]:
newobs = pred.to_dict(orient = 'records')
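`newobs` is a list of dicts, one per row, which is the shape sent under the `"instances"` key of the request body. `json.dumps` accepts plain Python numbers but raises `TypeError` on NumPy integer scalars such as `np.int64`, so a defensive conversion can help if such values end up in the rows. A sketch with hypothetical column names and values:

```python
import json
from typing import Any, Dict, List

import numpy as np


def to_native(value: Any) -> Any:
    """Turn NumPy scalars (e.g. np.int64) into plain Python numbers for JSON."""
    return value.item() if isinstance(value, np.generic) else value


def build_request(records: List[Dict[str, Any]]) -> str:
    """Serialize rows as a prediction request body: {"instances": [...]}."""
    instances = [{key: to_native(val) for key, val in row.items()} for row in records]
    return json.dumps({"instances": instances})


# Hypothetical rows; json.dumps on the raw np.int64 value would raise TypeError
example_rows = [{"V1": np.float64(-1.36), "Amount": 149.62, "Time": np.int64(0)}]
example_body = build_request(example_rows)
print(example_body)  # {"instances": [{"V1": -1.36, "Amount": 149.62, "Time": 0}]}
```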

### Get predictions with the Python client

In [None]:
prediction = endpoint.predict(instances = newobs[0:1])
prediction

In [None]:
prediction = endpoint.predict(instances = newobs)
prediction

In [None]:
prediction.predictions[0]

In [None]:
np.argmax(prediction.predictions[0])
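Each entry of `prediction.predictions` is the softmax output for one instance, with one probability per class. The predicted class and its probability can be obtained for all instances at once; the response below is hypothetical, and the class names assume 0 = normal and 1 = fraud.

```python
import numpy as np

# Hypothetical response: one softmax vector per instance, in class order [0, 1]
example_predictions = [[0.98, 0.02], [0.30, 0.70], [0.55, 0.45]]

example_probs = np.asarray(example_predictions)
example_classes = example_probs.argmax(axis=1)   # predicted class per instance
example_confidence = example_probs.max(axis=1)   # probability of that class
class_names = np.array(["normal", "fraud"])      # assumed mapping: 0 = normal, 1 = fraud
for name, p in zip(class_names[example_classes], example_confidence):
    print(f"{name}: {p:.2f}")
```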

### Get predictions with a REST request

In [None]:
with open(f'{DIR}/request.json','w') as file:
    file.write(json.dumps({"instances": newobs[0:1]}))

In [None]:
!curl -X POST \
-H "Authorization: Bearer "$(gcloud auth application-default print-access-token) \
-H "Content-Type: application/json; charset=utf-8" \
-d @{DIR}/request.json \
https://{REGION}-aiplatform.googleapis.com/v1/{endpoint.resource_name}:predict
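The curl command sends a POST to `https://{REGION}-aiplatform.googleapis.com/v1/{endpoint.resource_name}:predict` with a bearer token and the JSON body. The same request can be assembled with Python's standard library. This sketch only builds it (the project, endpoint ID and token are hypothetical); passing it to `urllib.request.urlopen` would send it.

```python
import json
import urllib.request


def build_predict_request(region: str, endpoint_resource_name: str, token: str,
                          instances: list) -> urllib.request.Request:
    """Build (but do not send) the same POST request as the curl command."""
    url = f"https://{region}-aiplatform.googleapis.com/v1/{endpoint_resource_name}:predict"
    data = json.dumps({"instances": instances}).encode("utf-8")
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json; charset=utf-8",
    }
    return urllib.request.Request(url, data=data, headers=headers, method="POST")


# Hypothetical values; a real token could come from
# `gcloud auth application-default print-access-token`
example_req = build_predict_request(
    "us-central1",
    "projects/my-project/locations/us-central1/endpoints/1234567890",
    "TOKEN",
    [{"Amount": 149.62}],
)
print(example_req.get_method(), example_req.full_url)
```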

### Get predictions with gcloud (CLI)

In [None]:
!gcloud beta ai endpoints predict {endpoint.name.rsplit('/',1)[-1]} --region={REGION} --json-request={DIR}/request.json
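`gcloud ... endpoints predict` takes the endpoint ID together with `--region`. `rsplit('/', 1)[-1]` keeps only the last path segment, so the command receives the ID whether it is given an ID or a full resource name of the form `projects/{project}/locations/{location}/endpoints/{endpoint_id}`. A small hypothetical parser for that form:

```python
from typing import Dict


def parse_endpoint_name(resource_name: str) -> Dict[str, str]:
    """Split projects/{p}/locations/{l}/endpoints/{id} into its parts."""
    parts = resource_name.split("/")
    if len(parts) != 6 or parts[0::2] != ["projects", "locations", "endpoints"]:
        raise ValueError(f"unexpected endpoint resource name: {resource_name}")
    return {"project": parts[1], "location": parts[3], "endpoint_id": parts[5]}


example_info = parse_endpoint_name("projects/my-project/locations/us-central1/endpoints/1234567890")
print(example_info["endpoint_id"])  # 1234567890
```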