# Camino mínimo y tiempo de trayecto.
#### En este cuaderno se va a calcular el camino mínimo de un grafo con pesos a través del algoritmo de Dijkstra y se calculará el tiempo que lleva ir de un origen a un destino gracias a las velocidades predichas.

El primer paso es importar todas las librerías necesarias para el correcto funcionamiento de este cuaderno de Jupyter.

In [None]:
# Librerías estándar
from math import sqrt  # Funciones matemáticas
from datetime import datetime, timedelta  # Manejo de fechas y tiempos
import json  # Lectura/escritura de archivos JSON
import pickle  # Serialización de objetos Python (binario)

# Manipulación y análisis de grafos
import networkx as nx  # Estructuras y algoritmos para grafos
from networkx.readwrite import json_graph  # Conversión entre grafos y formato JSON

# Visualización
import matplotlib.pyplot as plt  # Gráficas, incluyendo visualización de grafos

Se define el grafo a partir del documento JSON con pesos de intensidad actualizados.

In [None]:
with open("grafo_dirigido.json") as f:
    data = json.load(f)

G = json_graph.node_link_graph(data, directed=True)

Las intensidades están en los nodos y para aplicar el algoritmo de Dijkstra es necesario que estos pesos estén sobre la arista. Se deben pasar estas intensidades a las aristas que llegan a dicho nodo, y el coste inicial del camino coresponde con la intensidad del nodo origen.

In [None]:
# Regla: weight(u → v) = intensidad[v]
for u, v in G.edges():
    G[u][v]['weight'] = G.nodes[v]['intensidad']

### Camino mínimo.

Calculamos el camino mínimo definiendo antes el nodo origen y nodo destino.

In [None]:
origen = '6798'
destino = '4376'

camino = nx.dijkstra_path(G, origen, destino, weight='weight')
print("Camino mínimo:", camino)

También se visualiza dicho camino.

In [None]:
# Recuperar las aristas del camino mínimo
path_edges = list(zip(camino, camino[1:]))

plt.figure(figsize=(12, 10))

# Posiciones automáticas
pos = nx.spring_layout(G, seed=97, k=2)  # k grande = más separación entre nodos

# Dibujar nodos
nx.draw_networkx_nodes(G, pos, node_size=700, node_color='lightblue')

# Dibujar aristas con flechas
nx.draw_networkx_edges(G, pos, arrowstyle='->', arrowsize=20)

# Dibujar etiquetas de los nodos
nx.draw_networkx_labels(G, pos, font_size=10, font_weight='bold')

# Resaltar el camino mínimo
nx.draw_networkx_edges(
    G, pos,
    edgelist=path_edges,
    width=4,           # más "negrita"
    edge_color='darkorange'   # otro color para resaltar
)

# Dibujar grafo
plt.title("Grafo de tráfico dirigido")
plt.axis('off')
plt.show()

### Calcular el tiempo de desplazamiento.

Se usará la fórmula que divide la distancia y la velocidad.

Primero definiremos algunas funciones.

In [None]:
def distancia_euclidiana(p1, p2):
    '''
    Calcula la distancia euclidiana entre dos puntos p1 y p2.
    p1 y p2 deben ser tuplas (x, y) con coordenadas en metros.
    '''
    x1, y1 = p1
    x2, y2 = p2
    res = sqrt((x2 - x1)**2 + (y2 - y1)**2)
    return res/1000  # resultado en km

In [None]:
def formato_tiempo(horas, hora, programado):
    '''
    Convierte un valor decimal de horas a un formato legible y ajusta una hora dada.
    - horas: valor decimal de horas (float)
    - hora: objeto datetime.time con la hora base
    - programado: True si es un tiempo programado (restar), False si es un tiempo real (sumar)
    '''
    # Convertir la hora (datetime.time) a datetime para poder operar
    base_dt = datetime.combine(datetime.today(), hora)

    # Separar horas y minutos del valor decimal
    horas_int = int(horas)
    minutos = int(round((horas - horas_int) * 60))
    delta = timedelta(hours=horas_int, minutes=minutos)

    # Sumar o restar
    nueva_dt = base_dt - delta if programado else base_dt + delta
    # Extraer la nueva hora
    nueva_hora = nueva_dt.time()

    # Formato legible del tiempo
    if horas_int > 0 and minutos > 0:
        tiempo = f"{horas_int} h {minutos} min"
    elif horas_int > 0:
        tiempo = f"{horas_int} h"
    else:
        tiempo = f"{minutos} min" if minutos > 0 else "< 1 min"

    return tiempo, nueva_hora

In [None]:
def formato_distancia(km):
    '''
    Formatea una distancia en kilómetros a un string legible.
    - km: distancia en kilómetros (float)
    '''
    if km >= 1:
        return f"{km:.1f} km"
    else:
        metros = int(km * 1000)
        return f"{metros} m"

La velocidad se obtiene a partir de las predicciones realizadas previamente en la carpeta `predictions`. Por lo tanto, cargaremos estos datos.

In [None]:
with open('../predictions/predicciones_vmed.pkl', 'rb') as f:
    predicciones_vmed = pickle.load(f)

El siguiente paso es obtener las coordenadas de cada nodo del grafo, que ya se almacenaron durante el procesamiento de datos en la carpeta `data`.

In [None]:
# Diccionario: id → (x, y)
coordenadas = {}

with open("../data/coordenadas_ids.txt") as f:
    for line in f:
        data = json.loads(line)
        node_id = str(data["id"])
        coordenadas[node_id] = [data["coordenada_x"], data["coordenada_y"]]

También hay que preparar los datos para poder meterlos en la función `formato_tiempo` que se definió unas celdas más arriba.

In [None]:
# Cargamos la fecha de la predicción
with open('../predictions/fecha.pkl', 'rb') as f:
        fecha = pickle.load(f)

fecha_dt = datetime.strptime(fecha, "%d/%m/%Y %H:%M:%S")
hora_inicial = fecha_dt.time()
hora = hora_inicial

Ahora ya se inicia la simulación para calcular la distancia y tiempo para cada paso del camino mínimo. Se calculará el tiempo para cada movimiento del trayecto mediante la fórmula t = distancia / velocidad.

En este primer caso se realiza la simulación de que se calcula una hora de llegada a partir de una de salida.

In [None]:
tiempo_total = 0
distancia_total = 0

print(f"Hora de inicio: {hora_inicial}")
print('Trayecto paso por paso:')

for i in range(len(camino)):
    if i == len(camino) - 1: # Último nodo, no hay siguiente
        break

    # Movimiento entre un nodo y el siguiente
    origen = camino[i]
    destino = camino[i + 1]

    # Distancia entre el origen y el destino
    distancia = distancia_euclidiana(coordenadas[origen], coordenadas[destino])
    distancia_total += distancia

    # Velocidad media entre el origen y el destino
    velocidad = (predicciones_vmed[origen]+predicciones_vmed[destino])/2

    # Tiempo de trayecto
    tiempo = distancia / velocidad
    tiempo_total += tiempo

    # Formateamos la salida
    tiempo, hora = formato_tiempo(tiempo, hora, programado=False)

    # Imprimimos los resultados del tramo
    print(f"{i+1}: \nOrigen: {origen} Destino: {destino}")
    print(f"Velocidad media: {velocidad}")
    print(f"Distancia: {formato_distancia(distancia)}")
    print(f"Tiempo de trayecto: {tiempo} ")
    print(f"Hora de llegada: {hora}")

# Formateamos el tiempo total
tiempo_total, hora_final = formato_tiempo(tiempo_total, hora_inicial, programado=False)

# Imprimimos el resumen del trayecto completo
print("----------------------")
print("Trayecto completo:")
print(f"Origen: {camino[0]} Destino: {camino[-1]}")
print(f"Distancia total: {formato_distancia(distancia_total)}")
print(f"Tiempo total: {tiempo_total}")
print(f"Hora de salida: {hora_inicial} Hora de llegada: {hora_final}")

En este segundo caso se realiza la simulación de que se calcula una hora de salida a partir de una de llegada.

In [None]:
# Volvemos a definir las variables tipo tiempo para que se reinicien
with open('../predictions/fecha.pkl', 'rb') as f:
        fecha = pickle.load(f)

fecha_dt = datetime.strptime(fecha, "%d/%m/%Y %H:%M:%S")
hora_inicial = fecha_dt.time()
hora = hora_inicial

# Comenzamos las simulación
tiempo_total = 0
distancia_total = 0

print(f"Hora de llegada: {hora_inicial}")
print('Trayecto paso por paso:')

# Invertimos el camino para el trayecto inverso
camino_inv = camino[::-1]

for i in range(len(camino_inv)):
    if i == len(camino_inv) - 1: # Último nodo, no hay siguiente
        break

    # Movimiento entre un nodo y el siguiente
    origen = camino_inv[i]
    destino = camino_inv[i + 1]

    # Distancia entre el origen y el destino
    distancia = distancia_euclidiana(coordenadas[origen], coordenadas[destino])
    distancia_total += distancia

    # Velocidad media entre el origen y el destino
    velocidad = (predicciones_vmed[origen]+predicciones_vmed[destino])/2

    # Tiempo de trayecto
    tiempo = distancia / velocidad
    tiempo_total += tiempo

    # Formateamos la salida
    tiempo, hora = formato_tiempo(tiempo, hora, programado=True)

    # Imprimimos los resultados del tramo
    print(f"{i+1}: \nOrigen: {origen} Destino: {destino}")
    print(f"Velocidad media: {velocidad}")
    print(f"Distancia: {formato_distancia(distancia)}")
    print(f"Tiempo de trayecto: {tiempo} ")
    print(f"Hora de salida actualizada: {hora}")

# Formateamos el tiempo total
tiempo_total, hora_final = formato_tiempo(tiempo_total, hora_inicial, programado=True)

# Imprimimos el resumen del trayecto completo
print("----------------------")
print("Trayecto completo:")
print(f"Origen: {camino[0]} Destino: {camino[-1]}")
print(f"Distancia total: {formato_distancia(distancia_total)}")
print(f"Tiempo total: {tiempo_total}")
print(f"Hora de llegada: {hora_inicial} Hora de salida: {hora_final}")