# Enunciado
Hola ingeniero 👋. Bienvenido a tu tercer desafio! Ya sabes que en [Vault-Tec Corporation](https://fallout.fandom.com/es/wiki/Vault-Tec_Corporation) tenemos sensores encargados de monitorear las condiciones actuales de nuestros refugios. En ocasiones pasadas detectamos que varios de ellos presentaban fallas recurrentes, impidiendo que pudieramos determinar las condiciones de nuestros bóvedas y darle la seguridad esperada a nuestros clientes en un mundo post apocaliptico 🙃. 

Anteriormente, nuestro equipo de mantenimiento fisico pudo reemplazar los sensores mal funcionantes e hicimos un codigo para poder determinar cuanto fue el tiempo de falla registrado por un sensor para una fecha, de la siguiente forma:

```python
get_sensor_failure_duration("hidra_flow", "2025-07-31", logs)
> {'sensor_id': 'hidra_flow', 'date': datetime.date(2025, 7, 31), 'failure_total_time': -94}
```

Ahora lo que requiere Vault-Tec Corporation es que construyas un [pipeline de datos](https://www.ibm.com/mx-es/think/topics/data-pipeline) el cual automatize lo desarrollado en ocasiones pasadas: 

extraccion de logs -> determinar tiempo de falla (en segundos) para un sensor -> determinar probabilidad de falla del sensor -> determinar probabilidad de falla condicional de dos sensores.  

Tu tarea, si decides aceptarla, es diseñar y construir dicho pipeline.

# Insumos
El equipo encargado del monitoreo de la calidad de los sensores, te ha enviado los [logs](https://keepcoding.io/blog/que-son-logs-y-para-que-sirven/) de dichos sensores. 

Algunos ejemplos de logs de estos sensores se ven de la siguiente forma:
```python
[
    {'sensor_id': 'oxy_guard','event_type': 'FAILURE_START','timestamp': '2025-07-31 07:01:24','duration_seconds': 577},
    {'sensor_id': 'radi_shield','event_type': 'FAILURE_START','timestamp': '2025-07-31 07:19:02','duration_seconds': 542}   
]
```

Recuerda que la vez pasada el equipo de monitoreo se encargo de agrupar los logs, para que solo exista una entrada de inicio y fin de falla por sensor por dia.

# Tu turno!
Ahora es tu turno! Vault-Tec Corp necesita saber lo siguiente:

1. Un reporte semanal:
- Dado un sensor y una fecha de inicio, determina cuantos **segundos** ha permanecido en estado de falla **cada dia de la semana** comprendida entre la fecha de inicio y los 7 días siguientes.
    - En caso de que uno o mas de los dias no existan dentro de los logs asignales un valor de `None` al resultado.
    - Ejemplo: El sensor radi_shield ha fallado un total de 89s en la última semana.

2. Riesgo semanal de falla:
- Dado un sensor y una fecha de inicio, determina la probabilidad de que el sensor falle para **cada dia** de la semana analizada.
    - En caso de que uno o mas de los dias no existan dentro de los logs asignales un valor de `None` al resultado.


3. Riesgo semanal condicional de falla:
- Dado dos sensores, un numero de minutos y una fecha de inicio, determina la probabilidad de que un sensor falle dado que otro sensor también falló por mas de los minutos definidos.
    - En caso de que el dia solicitado no este presente en los logs asignale un valor de `None` al resultado.

💡 Recuerda que todos estos puntos ya los hemos desarollado previamente, ahora el reto es pensar en como integrar y conectarlo todo.

Para poder lograrlo tu equipo te recomienda seguir los siguientes pasos:

1. Crear una clase para la entidad Log: Crea una clase para poder almacenar la informacion correspondiente a un log.

💡 Revisa nuevamente que atributos componen a un log. 

In [None]:
from datetime import datetime
from typing import Literal

from pydantic import BaseModel


class Log(BaseModel):
    """Representa un registro (log) generado por un sensor.

    Atributos:
        sensor_id (Literal): Identificador del sensor que generó el registro.
            Valores posibles: "oxy_guard", "thermo_core", "hidra_flow", "radi_shield".
        event_type (Literal): Tipo de evento registrado.
            Valores posibles: "FAILURE_START" (inicio de falla) o "FAILURE_END" (fin de falla).
        timestamp (datetime): Fecha y hora exacta en que ocurrió el evento.
        duration_seconds (int): Duración de la falla en segundos. En caso de ser "FAILURE_START",
            puede ser 0 si aún no ha finalizado la falla.
    """

    sensor_id: Literal["oxy_guard", "thermo_core", "hidra_flow", "radi_shield"]
    event_type: Literal["FAILURE_END", "FAILURE_START"]
    timestamp: datetime
    duration_seconds: int

2. Extraer los logs: Crea una clase cuya responsabilidad sea extraer y filtrar informacion de un archivo de logs.

💡 Recuerda nombrar tus clases con nombres significativos, tal y como lo dice uno de los principios del core de python: "explicito es mejor que implicito".

In [None]:
import json
from datetime import date
from os.path import abspath


class LogExtractor:
    """Clase para extraer y filtrar logs desde un archivo JSON."""

    def __init__(self, file_path: str):
        """
        Inicializa el extractor de logs con la ruta del archivo.

        Args:
            file_path (str): Ruta del archivo JSON que contiene los logs.
        """
        self.file_abspath = abspath(file_path)

    def from_file(self) -> list["Log"]:
        """
        Carga los logs desde el archivo JSON especificado en la inicialización.

        Returns:
            list[Log]: Lista de instancias de Log extraídas del archivo.
        """
        with open(self.file_abspath) as file:
            raw_logs: list[dict] = json.load(file)
        logs: list[Log] = [Log(**log) for log in raw_logs]
        return logs

    @staticmethod
    def fetch_logs_on_date(
        target_date: date, logs: list["Log"], sensor_id: int | None = None
    ) -> list["Log"]:
        """
        Filtra los logs que corresponden a una fecha y, opcionalmente, a un sensor específico.

        Args:
            target_date (date): Fecha objetivo para filtrar los logs.
            logs (list[Log]): Lista de logs disponibles.
            sensor_id (Optional[int], optional): ID del sensor para filtrar.
                Si es None, se incluyen logs de todos los sensores. Defaults to None.

        Returns:
            list[Log]: Lista de logs que cumplen con los filtros aplicados.
        """
        logs_on_date: list[Log] = list(
            filter(
                lambda x: (
                    x.timestamp.date() == target_date
                    and (sensor_id is None or x.sensor_id == sensor_id)
                ),
                logs,
            )
        )
        return logs_on_date

3. Analizar las fallas por medio de los logs: Crea una clase que tenga los metodos necesarios para poder analizar las fallas presentadas por los sensores segun lo requerido por Vault-Tec Corporation.

💡 Recuerda incorporar las buenas practicas en tu codigo: type annotations y doc-string.

In [None]:
from datetime import date, timedelta


class SensorFailureAnalyzer:
    """Clase para analizar fallos de sensores a partir de registros de eventos.

    Esta clase permite calcular la duración de fallos de sensores,
    la probabilidad de fallo semanal y la probabilidad condicional
    de que un sensor falle dado que otro sensor también falló.
    """

    WEEK_TOTAL_SECONDS: int = 7 * 24 * (60**2)
    """int: Número total de segundos en una semana."""

    def __init__(self, logs: list[Log]):
        """Inicializa el analizador con los registros de logs.

        Args:
            logs (list[Log]): Lista de objetos `Log` que contienen los eventos de los sensores.
        """
        self.logs = logs

    def get_sensor_failure_duration(self, sensor_id: str, searched_date: date) -> float | None:
        """Obtiene la duración total del fallo de un sensor en una fecha específica.

        Busca en los logs los eventos `FAILURE_START` y `FAILURE_END`
        del sensor en la fecha indicada y calcula el tiempo total en segundos.

        Args:
            sensor_id (str): Identificador del sensor a analizar.
            searched_date (date): Fecha para la que se quiere obtener la duración del fallo.

        Returns:
            float | None: Duración del fallo en segundos.
            Devuelve `None` si no hay logs del sensor en esa fecha.
        """
        logs_on_date: list[Log] = LogExtractor.fetch_logs_on_date(
            sensor_id=sensor_id, date=searched_date, logs=self.logs
        )

        if not logs_on_date:
            return None

        failure_start_time: float = next(
            (log.duration_seconds for log in logs_on_date if log.event_type == "FAILURE_START"), 0
        )
        failure_end_time: float = next(
            (log.duration_seconds for log in logs_on_date if log.event_type == "FAILURE_END"), 0
        )

        return abs(failure_end_time - failure_start_time)

    def get_weekly_sensor_failure_duration(
        self, sensor_id: int, start_date: date, end_date: date | None = None
    ) -> list[dict]:
        """Obtiene la duración diaria de fallos de un sensor durante una semana.

        Args:
            sensor_id (int): Identificador del sensor.
            start_date (date): Fecha de inicio del periodo.
            end_date (Optional[date], optional): Fecha final del periodo.
                Si no se proporciona, se toma `start_date + 7 días`.

        Returns:
            list[dict]: Lista de diccionarios con la siguiente información:
                - "sensor_id" (int): Identificador del sensor.
                - "date" (str): Fecha en formato ISO (YYYY-MM-DD).
                - "failure_seconds" (float | None): Duración del fallo en segundos.
        """
        end_date = start_date + timedelta(7) if end_date is None else end_date
        searched_week: list[date] = [
            (start_date + timedelta(days=i)) for i in range((end_date - start_date).days)
        ]
        return [
            {
                "sensor_id": sensor_id,
                "date": str(date),
                "failure_seconds": self.get_sensor_failure_duration(
                    sensor_id=sensor_id, searched_date=date
                ),
            }
            for date in searched_week
        ]

    def get_weekly_sensor_failure_probability(self, sensor_id: int, start_date: date) -> list[dict]:
        """Calcula la probabilidad diaria de fallo de un sensor durante una semana.

        La probabilidad se calcula como la proporción del tiempo en fallo
        respecto al total de segundos en una semana.

        Args:
            sensor_id (int): Identificador del sensor.
            start_date (date): Fecha de inicio del periodo.

        Returns:
            list[dict]: Lista de diccionarios con la siguiente información:
                - "sensor_id" (int): Identificador del sensor.
                - "date" (str): Fecha en formato ISO (YYYY-MM-DD).
                - "failure_probability" (str): Probabilidad del fallo (0 a 1) con 4 decimales.
        """
        weekly_sensor_failure_duration = self.get_weekly_sensor_failure_duration(
            sensor_id, start_date
        )
        return [
            {
                "sensor_id": sensor_id,
                "date": x["date"],
                "failure_probability": f"{x['failure_seconds'] / self.WEEK_TOTAL_SECONDS:.4f}",
            }
            for x in weekly_sensor_failure_duration
        ]

    def get_conditional_failure_probability(
        self,
        sensor_a_id: int,
        sensor_b_id: int,
        start_date: date,
        failure_minutes_threshold: float | None = 3,
    ) -> float:
        """Calcula la probabilidad condicional de que un sensor falle dado que otro sensor falló.

        Se considera que un sensor falló en un día si su duración de fallo
        es mayor o igual a un umbral en minutos.

        Fórmula:
            P(A|B) = (Número de días donde A y B fallaron) / (Número de días donde B falló)

        Args:
            sensor_a_id (int): Identificador del sensor A (sensor condicionado).
            sensor_b_id (int): Identificador del sensor B (sensor condicionante).
            start_date (date): Fecha de inicio del periodo.
            failure_minutes_threshold (Optional[float], optional): Umbral de fallo en minutos.
                Defaults a 3 minutos.

        Returns:
            float: Probabilidad condicional P(A|B) como un valor entre 0 y 1.
            Devuelve 0 si B nunca falló en el periodo.
        """
        failure_seconds_threshold = failure_minutes_threshold * 60

        sensor_b_weekly_failures = self.get_weekly_sensor_failure_duration(sensor_b_id, start_date)
        count_sensor_b_failures = sum(
            1
            for x in sensor_b_weekly_failures
            if x["failure_seconds"] and x["failure_seconds"] >= failure_seconds_threshold
        )

        if count_sensor_b_failures == 0:
            return 0.0

        sensor_a_weekly_failures = self.get_weekly_sensor_failure_duration(sensor_a_id, start_date)
        count_sensor_a_b_failures = sum(
            1
            for x, y in zip(sensor_a_weekly_failures, sensor_b_weekly_failures, strict=False)
            if x["failure_seconds"]
            and y["failure_seconds"]
            and x["failure_seconds"] >= failure_seconds_threshold
            and y["failure_seconds"] >= failure_seconds_threshold
        )

        return count_sensor_a_b_failures / count_sensor_b_failures

In [4]:
from datetime import datetime

searched_date = datetime.strptime("2025-08-01", "%Y-%m-%d").date()

extraction = LogExtractor(
    r"/Users/spuertaf/Desktop/Repos/HenryLectures/LoopsYFunciones/homework/data/logs.json"
)
analyzer = SensorFailureAnalyzer(logs=extraction.from_file())

In [9]:
analyzer.get_weekly_sensor_failure_duration(sensor_id="oxy_guard", start_date=searched_date)

[{'sensor_id': 'oxy_guard', 'date': '2025-08-01', 'failure_seconds': 399},
 {'sensor_id': 'oxy_guard', 'date': '2025-08-02', 'failure_seconds': 0},
 {'sensor_id': 'oxy_guard', 'date': '2025-08-03', 'failure_seconds': 96},
 {'sensor_id': 'oxy_guard', 'date': '2025-08-04', 'failure_seconds': 190},
 {'sensor_id': 'oxy_guard', 'date': '2025-08-05', 'failure_seconds': 280},
 {'sensor_id': 'oxy_guard', 'date': '2025-08-06', 'failure_seconds': 187},
 {'sensor_id': 'oxy_guard', 'date': '2025-08-07', 'failure_seconds': 362}]

In [6]:
analyzer.get_weekly_sensor_failure_probability(sensor_id="thermo_core", start_date=searched_date)

[{'sensor_id': 'thermo_core',
  'date': '2025-08-01',
  'failure_probability': '0.0002'},
 {'sensor_id': 'thermo_core',
  'date': '2025-08-02',
  'failure_probability': '0.0009'},
 {'sensor_id': 'thermo_core',
  'date': '2025-08-03',
  'failure_probability': '0.0009'},
 {'sensor_id': 'thermo_core',
  'date': '2025-08-04',
  'failure_probability': '0.0003'},
 {'sensor_id': 'thermo_core',
  'date': '2025-08-05',
  'failure_probability': '0.0000'},
 {'sensor_id': 'thermo_core',
  'date': '2025-08-06',
  'failure_probability': '0.0000'},
 {'sensor_id': 'thermo_core',
  'date': '2025-08-07',
  'failure_probability': '0.0007'}]

In [10]:
analyzer.get_conditional_failure_probability(
    "thermo_core", "oxy_guard", searched_date, failure_minutes_threshold=5
)

0.5