# Algoritmo de BackPropagation Region del Plano



### Código en Python del Algoritmo de BackPropagation

Código en Python que grafica como van evolucionando las rectas que separan a los positivos de los negativos

In [None]:
# conexion al Google Drive
from google.colab import drive
drive.mount('/content/.drive')
!mkdir -p "/content/.drive/My Drive/DMA"
!mkdir -p "/content/buckets"
!ln -s "/content/.drive/My Drive/DMA" /content/buckets/b1

In [None]:
import numpy as np
import math
import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display
import time
from sklearn.preprocessing import StandardScaler

In [None]:
# definicion de las funciones de activacion
#  y sus derivadas
#  ahora agregando las versiones VECTORIZADAS

def func_eval(fname, x):
    match fname:
        case "purelin":
            y = x
        case "logsig":
            y = 1.0 / ( 1.0 + math.exp(-x) )
        case "tansig":
            y = 2.0 / ( 1.0 + math.exp(-2.0*x) ) - 1.0
    return y

# version vectorizada de func_eval
func_eval_vec = np.vectorize(func_eval)


def deriv_eval(fname, y):  #atencion que y es la entrada y=f( x )
    match fname:
        case "purelin":
            d = 1.0
        case "logsig":
            d = y*(1.0-y)
        case "tansig":
            d = 1.0 - y*y
    return d


# version vectorizada de deriv_eval
deriv_eval_vec = np.vectorize(deriv_eval)

In [None]:
# definicion de la clase de graficos, NO hay x0 = 1

class perceptron_plot:
    def __init__(self, X, Y, delay) -> None:
        self.X = X
        self.Y = Y
        self.delay = delay
        x1_min = np.min(X[:,0])
        x2_min = np.min(X[:,1])
        x1_max = np.max(X[:,0])
        x2_max = np.max(X[:,1])
        self.x1_min = x1_min - 0.5*(x1_max - x1_min)
        self.x1_max = x1_max + 0.5*(x1_max - x1_min)
        self.x2_min = x2_min - 0.5*(x2_max - x2_min)
        self.x2_max = x2_max + 0.5*(x2_max - x2_min)
        self.fig = plt.figure(figsize = (10,8))
        self.ax = self.fig.subplots()
        self.ax.set_xlim(self.x1_min, self.x1_max, auto=False)
        self.ax.set_ylim(self.x2_min, self.x2_max, auto=False)

    def graficar(self, W, x0, epoch, fila) -> None:
        display.clear_output(wait =True)
        plt.cla()
        #self.ax = self.fig.subplots()

        self.ax.set_xlim(self.x1_min, self.x1_max)
        self.ax.set_ylim(self.x2_min, self.x2_max)
        plt.title( 'epoch ' + str(epoch) + '  reg ' + str(fila) )
        # ploteo puntos positivos
        self.ax.plot(self.X[self.Y==1,0], self.X[self.Y==1,1], 'o', color="green", markersize=10)
        # ploteo puntos negativos
        self.ax.plot(self.X[self.Y==0,0], self.X[self.Y==0,1], 'o', color="blue", markersize=10)

        # Sobreploteo el punto que no coincidio
        if(fila>=0):
            self.ax.plot(self.X[fila,0], self.X[fila,1], 'o',
                         color= ('green' if self.Y[fila]==1 else 'blue'),
                         markersize= 12, markeredgecolor= 'red')

        #dibujo le recta
        vx2_min = -(W[0]*self.x1_min + x0)/W[1]
        vx2_max = -(W[0]*self.x1_max + x0)/W[1]

        self.ax.plot([self.x1_min, self.x1_max],
                     [vx2_min, vx2_max],
                     linewidth = 2,
                     color = 'red',
                     alpha = 0.5)

        display.display(plt.gcf())
        #plt.cla()
        time.sleep(self.delay)


    def graficarVarias(self, W, x0, epoch, fila) -> None:
        display.clear_output(wait =True)
        plt.cla()
        #self.ax = self.fig.subplots()

        self.ax.set_xlim(self.x1_min, self.x1_max)
        self.ax.set_ylim(self.x2_min, self.x2_max)
        plt.title( 'epoch ' + str(epoch) + '  reg ' + str(fila))
        # ploteo puntos positivos
        self.ax.plot(self.X[self.Y==1,0], self.X[self.Y==1,1], 'o', color="green", markersize=10)
        # ploteo puntos negativos
        self.ax.plot(self.X[self.Y==-1,0], self.X[self.Y==-1,1], 'o', color="blue", markersize=10)
        self.ax.plot(self.X[self.Y==0,0], self.X[self.Y==0,1], 'o', color="blue", markersize=10)

        # Sobreploteo el punto que no coincidio
        if(fila>=0):
            self.ax.plot(self.X[fila,0], self.X[fila,1], 'o',
                         color= ('green' if self.Y[fila]==1 else 'blue'),
                         markersize= 12, markeredgecolor= 'red')

        # dibujo las rectas
        for i in range(len(x0)):
            #vx2_min = -(W[0,i]*self.x1_min + x0[i])/W[1,i]
            #vx2_max = -(W[0,i]*self.x1_max + x0[i])/W[1,i]
            vx2_min = -(W[i,0]*self.x1_min + x0[i])/W[i,1]
            vx2_max = -(W[i,0]*self.x1_max + x0[i])/W[i,1]

            self.ax.plot([self.x1_min, self.x1_max],
                         [vx2_min, vx2_max],
                         linewidth = 2,
                         color = 'red',
                         alpha = 0.5)

        display.display(plt.gcf())
        #plt.cla()
        time.sleep(self.delay)


In [None]:
# Los datos que debe modelar la  Deep Neural Network
# Leo de la web el dataset  de  "cero"  o el de  "ocho"

import polars as pl

df = pl.read_csv('https://storage.googleapis.com/open-courses/austral2025-af91/cero.txt', separator='\t')

registros = df.select(["x1","x2"])
clases =df.select("y")

# Paso las listas a numpy
X = np.array(registros)
Y = np.array(clases).reshape(len(clases),1) # vector columna

# con estos puntos NO hace falta estandarizar

In [None]:
# Arquitectura de la red
# Tamano datos

X_row = X.shape[0]
X_col = X.shape[1]

Y_col = Y.shape[1]

In [None]:
# Defino manualmente la arquitectura de la red
#  input es X_col e  Y_col

# DOS hidden layers
# el primer huddeb layer tiene 8 perceptrones

arquitectura = {
   "input_size" : X_col,
   "layers_qty" : 3, # incluye la capa de salida, pero no la de entrada
   "layers_size" : [8, 2, Y_col],
   "layers_func" : ['logsig','logsig','logsig'],
}


In [None]:
# seteo de la semilla aleatoria
np.random.seed(102191) # mi querida random seed para que las corridas sean reproducibles

In [None]:
# Inicializo la red con pesos al azar
#   a partir de la arquitectura

red = {
    'arq' : arquitectura,
    'layer' : list(),
}

niveles = red["arq"]["layers_qty"]

for i in range(niveles):
  nivel = dict()
  nivel["id"] = i
  nivel["last"] = (i==(niveles-1))
  nivel["size"] = red["arq"]["layers_size"][i]
  nivel["func"] = red["arq"]["layers_func"][i]

  if( i==0 ):
    entrada_size = red["arq"]["input_size"]
  else:
    entrada_size =  red["arq"]["layers_size"][i-1]

  salida_size =  nivel["size"]
  nivel["W"] = np.random.uniform(-0.5, 0.5, [salida_size, entrada_size])
  nivel["w0"] = np.random.uniform(-0.5, 0.5, [salida_size, 1])

  red["layer"].append(nivel)


print( red )

In [None]:
# controles del entrenamiento

# Limite de lo que estoy dispuesto a trabajar
epoch_limit = 10000    # para terminar si no converge

# cuando la mejora del error sea inferior a este valor, me detendré
error_delta_umbral = 1.0e-06

# controla la velocidad de convergencia
learning_rate = 0.2

# controla cada cuanto se grafica un epoch
demora_impresion = 0.1  # segundos

In [None]:
# inicializaciones del bucle principal del backpropagation

epoch = 0
error_epoch = float('inf')
error_ant =  0.0

In [None]:
# el bucle principal del algoritmo BackPropagation

grafico = perceptron_plot(X=X, Y=Y.T[0], delay=demora_impresion)

# continuo mientras en la iteración anterior modifique algo  y NO llegué al límite de epochs
while (((math.fabs(error_epoch - error_ant) > error_delta_umbral) or( error_epoch>0.01) ) and (epoch < epoch_limit) ):
    epoch += 1
    error_suma = 0.0
    error_ant = error_epoch

    # recorro siempre TODOS los registros de entrada
    for fila in range(X_row):
        # fila es el registro actual
        x = X[fila:fila+1,:] # ej  array([[-1, -1]])
        clase = Y[fila]

        # propagar el x hacia adelante, FORWARD
        entrada = x.T  # la entrada a la red
        niveles = red["arq"]["layers_qty"]

        # etapa forward
        # recorro hacia adelante, nivel a nivel
        vsalida =  [0] *(niveles) # salida de cada nivel de la red

        for i in range(niveles):
          estimulos = red["layer"][i]["W"] @ entrada + red["layer"][i]["w0"]
          vsalida[i] =  func_eval_vec(red["layer"][i]["func"], estimulos)
          entrada = vsalida[i]  # para la proxima vuelta


        # etapa backward
        # calculo los errores en la capa hidden y la capa output
        verror =  [0] *(niveles+1) # inicializo dummy
        verror[niveles] = clase.T - vsalida[niveles-1]

        i = niveles-1
        verror[i] = verror[i+1] * deriv_eval_vec(red["layer"][i]["func"], vsalida[i])

        for i in reversed(range(niveles-1)):
           verror[i] = deriv_eval_vec(red["layer"][i]["func"], vsalida[i])*(red["layer"][i+1]["W"].T @ verror[i+1])

        # ya tengo los errores que comete cada capa
        # corregir matrices de pesos, voy hacia atras
        # backpropagation
        entrada = x.T
        for i in range(niveles):
          red["layer"][i]["W"]  =  red["layer"][i]["W"] + learning_rate *(verror[i] @ entrada.T)
          red["layer"][i]["w0"] =  red["layer"][i]["w0"] + learning_rate * verror[i]
          entrada = vsalida[i]  # para la proxima vuelta



    # ya recalcule las matrices de pesos
    # ahora avanzo la red, feed-forward
    # para calcular el red(X) = Y
    entrada = X.T
    for i in range(niveles):
        estimulos = red["layer"][i]["W"] @ entrada + red["layer"][i]["w0"]
        salida =  func_eval_vec(red["layer"][i]["func"], estimulos)
        entrada = salida  # para la proxima vuelta

    output_salidas = salida  # salida tiene la salida del ultimo layer

    # calculo el error promedio general de TODOS los X
    error_epoch= np.mean( (Y.T - output_salidas)**2 )

    # Grafico las rectas de la Primer Hidden Layer
    # tengo que hacer w0.T[0]  para que pase el vector limpio
    W = red["layer"][0]["W"]
    w0 = red["layer"][0]["w0"]
    if( epoch % 100 == 0 ) : grafico.graficarVarias(W, w0.T[0], epoch, -1)


In [None]:
# el error
print("error_epoch= ", error_epoch)
print("error_ant = ", error_ant)
print("delta = ", math.fabs(error_epoch - error_ant))

In [None]:
# la cantidad de epochs necesarias hasta encontrar una solucion
print("Para converger hicieron falta epochs=",epoch)

In [None]:
# imprimo los niveles de la red
for i in range(red["arq"]["layers_qty"]):
  print( red["layer"][i])

In [None]:
# calculo la salida de la red
#  comprouebo que NO son valores 0 o 1
#  lo que implica que deberé decidir mediante un umbral

niveles = red["arq"]["layers_qty"]

print( "fila\tclase\testimulo\ty=f(estimulo)")
for fila in range(X_row):
    # fila es el registro actual
    x = X[fila:fila+1,:]
    clase = Y[fila]
    entrada = x.T  # la entrada a la red

    # etapa forward
    # recorro hacia adelante, nivel a nivel
    for i in range(niveles):
       estimulos = red["layer"][i]["W"] @ entrada + red["layer"][i]["w0"]
       salida =  func_eval_vec(red["layer"][i]["func"], estimulos)
       entrada = salida  # para la proxima vuelta

    # funcion de activacion
    print( fila, "\t", clase, "\t", estimulos, "\t", salida)