In [1]:
import cv2
import torch
import traitlets
import PIL.Image
import torchvision
import numpy as np
from torch2trt import TRTModule
import torch.nn.functional as F
from IPython.display import display
import ipywidgets.widgets as widgets
from jetcam.csi_camera import CSICamera
import torchvision.transforms as transforms
from jetracer.nvidia_racecar import NvidiaRacecar
import speedtest



CATEGORIES = ['Train']
device = torch.device('cuda')
model = torchvision.models.resnet18(pretrained=False)
model.fc = torch.nn.Linear(512, 2 * len(CATEGORIES))
model = model.cuda().eval().half()
road_following = TRTModule()
road_following.load_state_dict(torch.load('VD_trt.pth'))
obstacle_avoidance = TRTModule()
obstacle_avoidance.load_state_dict(torch.load('OBSTACLE2_trt.pth'))
trajectoire_2 = TRTModule()
trajectoire_2.load_state_dict(torch.load('externol2_trt.pth'))
car = NvidiaRacecar()
camera = CSICamera(width=224, height=224, capture_fps=65)

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

normalize = torchvision.transforms.Normalize(mean, std)

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

def bgr8_to_jpeg(value, quality=75):
    return bytes(cv2.imencode('.jpeg', value)[1])

image = widgets.Image(format='jpeg', width=224, height=224)
blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')

camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

camera.running = True

display(widgets.VBox([widgets.HBox([image, blocked_slider])]))

VBox(children=(HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x…

In [None]:
import time
import socket
from threading import Thread, Event, Lock

# Paramètres pour le suivi de trajectoire
kp = 2.25
kd = 8.0
last_x = 0

# Configuration initiale de la voiture
car.throttle = -0.0
car.steering_offset = 0

# Drapeau pour contrôler l'exécution de la logique d'évitement d'obstacles
is_blocked_executed = False

# Événement pour contrôler l'arrêt du robot
stop_event = Event()

# Verrou pour synchroniser l'accès à car.throttle
throttle_lock = Lock()

def check_internet_socket(min_speed_mbps=5):
    try:
        st = speedtest.Speedtest()
        st.download()  # Effectue un test de débit descendant
        download_speed_mbps = st.results.download / 1_000_000  # Convertir en Mbps
        print(f"Download speed: {download_speed_mbps:.2f} Mbps")
        if download_speed_mbps >= min_speed_mbps:
            print("Internet speed is sufficient")
            return True
        else:
            print("Internet speed is too low")
            return True #mettre False
    except Exception as ex:
        print(f"Failed to test internet speed: {ex}")
        return True #mettre False
'''
# Fonction pour vérifier la connexion Internet
def check_internet_socket(host="8.8.8.8", port=53, timeout=3):
    try:
        socket.setdefaulttimeout(timeout)
        socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
        print("Internet is up")
        return True
    except socket.error as ex:
        print("Internet is down")
        return False
'''
# Keep alive function running in a separate thread
def keep_alive():
    while not stop_event.is_set():
        if not check_internet_socket():
            print("No internet connection, stopping the robot")
            stop_event.set()
            with throttle_lock:
                car.throttle = 0  # Stop the robot
        time.sleep(0.001)  # Check every 10 seconds for internet connection

# Fonction pour conduire la voiture
def drive_forward(car, x, last_x):
    if stop_event.is_set():
        return last_x  # Ne rien faire si l'événement stop est activé

    # Contrôle PD (Proportionnel-Dérivé)
    car.steering = - (x * kp + (x - last_x) * kd)
    with throttle_lock:
        if not stop_event.is_set():  # Double vérification
            car.throttle = -0.21  # continuer à avancer
    return x  # Mettre à jour last_x

# Fonction pour arrêter la voiture et détecter les obstacles pendant 3 secondes
def blocked(car):
    if stop_event.is_set():
        return  # Ne rien faire si l'événement stop est activé
    
    with throttle_lock:
        if stop_event.is_set():
            return  # Double vérification
        car.throttle = -0.21
    car.steering = 0.4
    time.sleep(1.2)
    
    start_time = time.time()
    while time.time() - start_time < 1.2:  # Exécuter pendant 3 secondes
        if stop_event.is_set():
            break
        image = camera.value
        image = preprocess(image).half()
        x = trajectoire_2(image).detach().cpu().numpy().flatten()[0]
        global last_x
        last_x = drive_forward(car, x, last_x)
    
    car.steering = 0
    time.sleep(0.05)
    
    #car.steering = -0.05
    #time.sleep(0.05)

# Variable globale pour vérifier si l'obstacle a déjà été géré
obstacle_handled = False

# Fonction de mise à jour pour la détection d'obstacles
def update(change):
    global last_x, is_blocked_executed, obstacle_handled
    if stop_event.is_set():
        return  # Ne rien faire si l'événement stop est activé

    image = change['new']
    image = preprocess(image).half()

    # Détection d'obstacle
    y = obstacle_avoidance(image)
    y = F.softmax(y, dim=1)
    prob_blocked = float(y.flatten()[0])

    blocked_slider.value = prob_blocked

    if not is_blocked_executed:
        if prob_blocked < 0.90:
            if not is_blocked_executed:
                x = road_following(image).detach().cpu().numpy().flatten()[0]
                last_x = drive_forward(car, x, last_x)
        else:
            if not is_blocked_executed:
                is_blocked_executed = True
                blocked(car)
                print("obstacle franchis")
        obstacle_handled = True
    else:
        x = road_following(image).detach().cpu().numpy().flatten()[0]
        last_x = drive_forward(car, x, last_x)

# Initialisation du script
update({'new': camera.value})  # Appel initial pour initialiser

camera.observe(update, names='value')  # Attacher la fonction de mise à jour à la caméra

# Démarrer le thread keep_alive
keep_alive_thread = Thread(target=keep_alive)
keep_alive_thread.start()

# Gérer l'arrêt du programme de manière sécurisée
try:
    while not stop_event.is_set():
        time.sleep(0.001)  # Garder le programme actif
except KeyboardInterrupt:
    print("Interruption par l'utilisateur, arrêt du robot")
    stop_event.set()
finally:
    with throttle_lock:
        car.throttle = 0  # Assurez-vous que le robot s'arrête
    keep_alive_thread.join()


Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: HTTP Error 403: Forbidden
Failed to test internet speed: 

In [None]:
import time
import socket
from threading import Thread, Event, Lock

# Paramètres pour le suivi de trajectoire
kp = 2.25
kd = 8.0
last_x = 0

# Configuration initiale de la voiture
car.throttle = -0.0
car.steering_offset = 0

# Drapeau pour contrôler l'exécution de la logique d'évitement d'obstacles
is_blocked_executed = False

# Événement pour contrôler l'arrêt du robot
stop_event = Event()

# Verrou pour synchroniser l'accès à car.throttle
throttle_lock = Lock()

def check_internet_socket(min_speed_mbps=20):
    try:
        st = speedtest.Speedtest()
        st.download()  # Effectue un test de débit descendant
        download_speed_mbps = st.results.download / 1_000_000  # Convertir en Mbps
        print(f"Download speed: {download_speed_mbps:.2f} Mbps")
        if download_speed_mbps >= min_speed_mbps:
            print("Internet speed is sufficient")
            return True
        else:
            print("Internet speed is too low")
            return True  # mettre False
    except Exception as ex:
        print(f"Failed to test internet speed: {ex}")
        return True  # mettre False

# Fonction pour vérifier la connexion Internet
'''def check_internet_socket(host="8.8.8.8", port=53, timeout=3):
    try:
        socket.setdefaulttimeout(timeout)
        socket.socket(socket.AF_INET, socket.SOCK_STREAM).connect((host, port))
        print("Internet is up")
        return True
    except socket.error as ex:
        print("Internet is down")
        return False
'''
# Keep alive function running in a separate thread
def keep_alive():
    while not stop_event.is_set():
        if not check_internet_socket():
            print("No internet connection, stopping the robot")
            stop_event.set()
            with throttle_lock:
                car.throttle = 0  # Stop the robot
        time.sleep(0.001)  # Check every 10 seconds for internet connection

# Fonction pour conduire la voiture
def drive_forward(car, x, last_x):
    if stop_event.is_set():
        return last_x  # Ne rien faire si l'événement stop est activé

    # Contrôle PD (Proportionnel-Dérivé)
    car.steering = - (x * kp + (x - last_x) * kd)
    with throttle_lock:
        if not stop_event.is_set():  # Double vérification
            car.throttle = -0.21  # continuer à avancer
    return x  # Mettre à jour last_x

# Fonction pour arrêter la voiture et détecter les obstacles pendant 3 secondes
def blocked(car):
    if stop_event.is_set():
        return  # Ne rien faire si l'événement stop est activé
    
    with throttle_lock:
        if stop_event.is_set():
            return  # Double vérification
        car.throttle = -0.21
    car.steering = 0.4
    time.sleep(1.2)
    
    start_time = time.time()
    while time.time() - start_time < 3:  # Exécuter pendant 3 secondes
        if stop_event.is_set():
            break
        image = camera.value
        image = preprocess(image).half()
        x = trajectoire_2(image).detach().cpu().numpy().flatten()[0]
        global last_x
        last_x = drive_forward(car, x, last_x)
    
    car.steering = 0
    time.sleep(0.05)

# Fonction pour réinitialiser `is_blocked_executed` après 30 secondes
def reset_is_blocked_executed():
    global is_blocked_executed
    time.sleep(15)  # Attendre 30 secondes
    is_blocked_executed = False
    print("is_blocked_executed has been reset to False")

# Variable globale pour vérifier si l'obstacle a déjà été géré
obstacle_handled = False

# Fonction de mise à jour pour la détection d'obstacles
def update(change):
    global last_x, is_blocked_executed, obstacle_handled
    if stop_event.is_set():
        return  # Ne rien faire si l'événement stop est activé

    image = change['new']
    image = preprocess(image).half()

    # Détection d'obstacle
    y = obstacle_avoidance(image)
    y = F.softmax(y, dim=1)
    prob_blocked = float(y.flatten()[0])

    blocked_slider.value = prob_blocked

    if not is_blocked_executed:
        if prob_blocked < 0.90:
            if not is_blocked_executed:
                x = road_following(image).detach().cpu().numpy().flatten()[0]
                last_x = drive_forward(car, x, last_x)
        else:
            if not is_blocked_executed:
                is_blocked_executed = True
                blocked(car)
                print("Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed")
                Thread(target=reset_is_blocked_executed).start()  # Démarrer le thread pour réinitialiser is_blocked_executed
        obstacle_handled = True
    else:
        x = road_following(image).detach().cpu().numpy().flatten()[0]
        last_x = drive_forward(car, x, last_x)

# Initialisation du script
update({'new': camera.value})  # Appel initial pour initialiser

camera.observe(update, names='value')  # Attacher la fonction de mise à jour à la caméra

# Démarrer le thread keep_alive
keep_alive_thread = Thread(target=keep_alive)
keep_alive_thread.start()

# Gérer l'arrêt du programme de manière sécurisée
try:
    while not stop_event.is_set():
        time.sleep(0.001)  # Garder le programme actif
except KeyboardInterrupt:
    print("Interruption par l'utilisateur, arrêt du robot")
    stop_event.set()
finally:
    with throttle_lock:
        car.throttle = 0  # Assurez-vous que le robot s'arrête
    keep_alive_thread.join()


Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed
is_blocked_executed has been reset to False
Failed to test internet speed: Unable to connect to servers to test latency.
Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed
is_blocked_executed has been reset to False
Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed
is_blocked_executed has been reset to False
Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed
is_blocked_executed has been reset to False
Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed
is_blocked_executed has been reset to False
Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed
is_blocked_executed has been reset to False
Download speed: 0.00 Mbps
Internet speed is too low
Obstacle franchi, démarrage du timer pour réinitialiser is_blocked_executed
Failed to test internet speed: <urlopen error [Errno -2] Name or service no