<a href="https://colab.research.google.com/github/vmatiasw/modelos_y_simulacion/blob/main/P4E8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [96]:
import pandas as pd
from collections import Counter
import random
import time
import numpy as np
import math

In [97]:
def medir_tiempo(func):
    inicio = time.perf_counter()
    resultado = func()
    duracion = time.perf_counter() - inicio
    return resultado, duracion

def calcular_TVD(dict_d1, dict_d2):
    """Distancia Total de Variación (TVD) entre dos distribuciones."""
    soporte_total = set(dict_d1) | set(dict_d2)
    return 0.5 * sum(abs(dict_d1.get(k, 0) - dict_d2.get(k, 0)) for k in soporte_total)

def calcular_momentos(dict_fpm):
    """Calcula esperanza, varianza y desviación estándar de una distribución discreta."""
    esperanza = sum(x * p for x, p in dict_fpm.items())
    varianza = sum((x - esperanza)**2 * p for x, p in dict_fpm.items())
    return esperanza, varianza, varianza**0.5

# --------------------------- Pandas ---------------------------

def generar_df_distribucion(dict_fpm):
    """Genera un DataFrame a partir de un diccionario FPM cualquiera."""
    soporte = list(dict_fpm.keys())
    data = [{
        "Datos": f"P({x})",
        "Valor": dict_fpm.get(x, 0)
    } for x in soporte]

    esp, var, desv = calcular_momentos(dict_fpm)

    resumen = [
        {"Datos": "Esperanza", "Valor": esp},
        {"Datos": "Varianza", "Valor": var},
        {"Datos": "Desv. Est.", "Valor": desv},
    ]

    return pd.concat([pd.DataFrame(data), pd.DataFrame(resumen)], ignore_index=True)

def comparar_df_distribuciones(dict_fpm_A, dict_fpm_B):
    df_A = generar_df_distribucion(dict_fpm_A)
    df_B = generar_df_distribucion(dict_fpm_B)

    df_comparacion = pd.DataFrame({
        "Datos": df_B["Datos"],
        "A": df_A["Valor"],
        "B": df_B["Valor"],
    })
    df_comparacion["Diferencia"] = (df_comparacion["A"] - df_comparacion["B"]).abs()

    return df_comparacion

# --------------------------- Calificadores / Analizadores ---------------------------
def calificar_generador_de_muestras(generador, dict_fpm_real, n=100_000):
  soporte = list(dict_fpm_real.keys())

  muestras, tiempos = zip(*(medir_tiempo(generador) for _ in range(n)))
  frec = Counter(muestras)
  dict_fpm_estimada = {x: frec.get(x, 0) / n for x in soporte}

  df_comparacion = comparar_df_distribuciones(dict_fpm_estimada, dict_fpm_real)
  tvd = calcular_TVD(dict_fpm_estimada, dict_fpm_real)
  t_prom = np.mean(tiempos)

  print("Estimada (A) vs Real (B)")
  print(df_comparacion.to_string(index=False))
  print()
  print(f"Distancia total de variación: {tvd:.6f}")
  print(f"Tiempo promedio de ejecución: {t_prom} segundos")

def analizar_generador_de_muestras(generador, n=100_000):
  muestras, tiempos = zip(*(medir_tiempo(generador) for _ in range(n)))

  frec = Counter(muestras)
  soporte = sorted(list(frec.keys()))
  dict_fpm_estimada = {x: frec.get(x, 0) / n for x in soporte}
  df_fpm_estimada = generar_df_distribucion(dict_fpm_estimada)

  t_prom = np.mean(tiempos)

  print("FPM estimada:")
  print(df_fpm_estimada.to_string(index=False))
  print()
  print(f"Tiempo promedio de ejecución: {t_prom} segundos")

# --------------------------- Utilidades ---------------------------

def ordenar_dict_fpm_por_probabilidad(dict_fpm):
  return dict(sorted(dict_fpm.items(), key=lambda x: x[1], reverse=True))

def calcular_dict_fda(dict_fpm):
  """
  Recibe una funcion de probabilidad de masa en forma de diccionario de pares
  (valor, probabilidad) y devuelve su funcion de distribucion acumulada.
  Solo definidas para el soporte.
  """
  dict_fda = {}
  acc = 0
  for v, p in dict_fpm.items():
    acc += p
    dict_fda[v] = acc
  return dict_fda

# def crear_dict_fpm(soporte, fpm):
#   return {v: fpm(v) for v in soporte}

# dict_fpm = dict(gen_fpm)

def probabilidad(condicion, n):
    exitos = sum(condicion() for _ in range(n))
    return exitos / n

# def TI(fpm): # soporte subarreglo de [0, 1, ..., inf]
#   u = random.random()
#   i = 0; i_fda = fpm(0)
#   while u >= i_fda:
#     i += 1; i_fda += fpm(i)
#   return i

def TI(dict_fda):
  u = random.random()
  for v, p in dict_fda.items():
    if u < p:
      return v

def AyR(dict_fpm_objetivo, fpm_propuesta, dict_fpm_propuesta, C):
    while True:
        muestra = fpm_propuesta()
        prob_objetivo = dict_fpm_objetivo.get(muestra, 0)
        prob_propuesta = dict_fpm_propuesta.get(muestra, 0)

        if prob_propuesta == 0:
            continue

        if random.random() < prob_objetivo / (C * prob_propuesta):
            return muestra

In [98]:
# a) Desarrolle el método de la Transformada Inversa y el de Rechazo para generar
# una variable aleatoria X cuya distribución de probabilidad está dada por: ...

def gen_poisson(lambd):
  p_anterior = math.exp(-lambd)  # P(X=0)
  acc = p_anterior
  yield 0, p_anterior
  i = 1
  while acc < 1 - 1e-10:  # Debido a los redondeos de flotantes
    p_actual = (lambd / i) * p_anterior  # P(X=i) = (lambda/i) * P(X=i-1)
    yield i, p_actual
    acc += p_actual
    p_anterior = p_actual
    i += 1

def gen_fpm_X(lambd, k):
  gen = gen_poisson(lambd)
  fda_k = sum(next(gen)[1] for _ in range(k+1))

  gen = gen_poisson(lambd)
  for _ in range(k+1):
    i, p = next(gen)
    yield i, p/fda_k

In [99]:
# b) Estime P(X > 2) con k = 10 y λ = 0,7, y 1000 repeticiones. Compare con el valor exacto.
LAMBDA = 0.7
K = 10
N = 1_000
dict_fpm_objetivo = dict(gen_fpm_X(LAMBDA, K))

# TI
dict_fda_objetivo = calcular_dict_fda(dict_fpm_objetivo)
generador_de_muestras_TI = lambda: TI(dict_fda_objetivo)

# AyC
soporte = list(dict_fpm_objetivo.keys())
qy = 1/len(soporte)
dict_fpm_propuesta = dict.fromkeys(soporte, qy)
C = max(dict_fpm_objetivo.values())/qy

def fpm_propuesta():
  return soporte[int(random.random() * len(soporte))]

generador_de_muestras_AyR = lambda: AyR(dict_fpm_objetivo, fpm_propuesta, dict_fpm_propuesta, C)


print(probabilidad(lambda: generador_de_muestras_TI() > 2, N))
print(probabilidad(lambda: generador_de_muestras_AyR() > 2, N))

print(1 - dict_fda_objetivo[2]) # mejor aproximacion

0.036
0.035
0.03414158387347266


In [104]:
# c) Generalice el problema escribiendo un pseudocódigo para el metodo de rechazo
# para cualquier variable aleatoria truncada usando como soporte a la variable
# original (con “cualquier variable aleatoria truncada” nos referimos a una
# variable como la vista en el inciso (a) pero ahora truncada en cualquier
# parte i = (a,...,b).

def gen_fpm_truncada(get_gen_fpm, condicion):
  gen = get_gen_fpm()
  fda_k = sum(p for i, p in gen if condicion(i))

  gen = get_gen_fpm()
  for i, p in gen:
    if not condicion(i):
      continue

    yield i, p/fda_k

# print(dict(gen_fpm_truncada(lambda: gen_poisson(LAMBDA) , lambda i: 0 <= i and i <= K)))
# print(dict(gen_fpm_X(LAMBDA, K)))

dict(gen_fpm_truncada(lambda: gen_poisson(LAMBDA) , lambda i: 3 <= i and i <= 8))

{0: 0.4965853039210938, 1: 0.34760971274476565, 2: 0.12166339946066797, 3: 0.028388126540822522, 4: 0.0049679221446439405, 5: 0.0006955091002501517, 6: 8.114272836251768e-05, 7: 8.114272836251768e-06, 8: 7.099988731720297e-07, 9: 5.522213458004675e-08, 10: 3.865549420603272e-09}
{0: 0.4965853039210938, 1: 0.34760971274476565, 2: 0.12166339946066797, 3: 0.028388126540822522, 4: 0.0049679221446439405, 5: 0.0006955091002501517, 6: 8.114272836251768e-05, 7: 8.114272836251768e-06, 8: 7.099988731720297e-07, 9: 5.522213458004675e-08, 10: 3.865549420603272e-09}


{3: 0.8314838519643127,
 4: 0.1455096740937547,
 5: 0.020371354373125657,
 6: 0.002376658010197993,
 7: 0.0002376658010197993,
 8: 2.0795757589232436e-05}