In [1]:
import socket
import time
import random
from datetime import datetime
import struct
import binascii

class RopeGpsEmulator:
    def __init__(self, server_ip, server_port, device_id):
        self.server_ip = server_ip
        self.server_port = server_port
        self.device_id = device_id
        self.pseudo_ip = self.generate_pseudo_ip(device_id)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sequence = 1
        
    def generate_pseudo_ip(self, sim_number):
        """Genera la dirección IP pseudo según el algoritmo del protocolo"""
        # Asegurarnos de que tenemos al menos 10 dígitos
        if len(sim_number) == 11:
            sim_number = sim_number[1:]  # Remove first digit if length is 11
        
        # Rellenar con ceros si es necesario para tener 10 dígitos
        sim_number = sim_number.zfill(10)
        
        # Dividir en grupos de 2 dígitos
        groups = [sim_number[i:i+2] for i in range(0, 10, 2)]
        
        # Tomar los últimos 4 grupos (índices 1 a 4)
        last_four = [int(groups[i]) for i in range(1, 5)]
        
        # Procesar el primer grupo
        first_group = int(groups[0]) - 30
        binary_first = format(first_group, '04b')
        
        # Generar los bytes pseudo IP
        pseudo_bytes = []
        for i in range(4):
            last_byte = format(last_four[i], '08b')
            new_byte = binary_first[i] + last_byte[1:]
            pseudo_bytes.append(int(new_byte, 2))
            
        return bytes(pseudo_bytes)
    
    def calculate_xor(self, data):
        """Calcula el checksum XOR para los datos"""
        checksum = 0
        for byte in data:
            checksum ^= byte
        return checksum
    
    def send_packet(self, packet_type, data=b''):
        """Envía un paquete al servidor"""
        # Cabecera básica
        header = b'\x24\x24'
        main_command = packet_type.to_bytes(1, 'big')
        length = len(data).to_bytes(2, 'big')
        
        # Construir paquete completo
        packet = header + main_command + length + self.pseudo_ip + data
        
        # Calcular y añadir checksum
        checksum = self.calculate_xor(packet[2:-1])  # Excluye header y checksum final
        packet += checksum.to_bytes(1, 'big') + b'\x0D'  # Añade checksum y footer
        
        # Enviar paquete
        self.socket.sendto(packet, (self.server_ip, self.server_port))
        print(f"Enviado: {binascii.hexlify(packet).decode('utf-8')}")
        return packet
    
    def generate_position_data(self):
        """Genera datos de posición simulados"""
        now = datetime.now()
        
        # Fecha y hora (BCD)
        date_time = bytes([
            (now.year - 2000) // 10 << 4 | (now.year - 2000) % 10,
            now.month // 10 << 4 | now.month % 10,
            now.day // 10 << 4 | now.day % 10,
            now.hour // 10 << 4 | now.hour % 10,
            now.minute // 10 << 4 | now.minute % 10,
            now.second // 10 << 4 | now.second % 10
        ])
        
        # Latitud (37.901 N)
        lat_deg = 37
        lat_min = 901
        latitude = bytes([
            0x80 | (lat_deg // 10 << 4) | (lat_deg % 10),
            (lat_min // 100 << 4) | ((lat_min % 100) // 10),
            ((lat_min % 100) % 10 << 4) | 0x01,  # 0x01 para .901
            0x15  # Parte fraccional
        ])
        
        # Longitud (122.419 W)
        lon_deg = 122
        lon_min = 419
        longitude = bytes([
            0x80 | (lon_deg // 10 << 4) | (lon_deg % 10),  # 0x80 para Oeste
            (lon_min // 100 << 4) | ((lon_min % 100) // 10),
            ((lon_min % 100) % 10 << 4) | 0x04,  # 0x04 para .419
            0x19  # Parte fraccional
        ])
        
        # Velocidad (km/h) y ángulo
        speed = random.randint(0, 120)
        speed_bcd = bytes([
            speed // 100 << 4 | (speed % 100) // 10,
            (speed % 10) << 4 | 0x00
        ])
        
        angle = random.randint(0, 359)
        angle_bcd = bytes([
            angle // 100 << 4 | (angle % 100) // 10,
            (angle % 10) << 4 | 0x00
        ])
        
        # Estado GPS (localizado, 8 satélites)
        gps_status = 0x80 | 0x08  # Bit 7=1 (localizado), 4 satélites
        
        # Entradas digitales
        digital_inputs = 0x40  # Bit 6=1 (sistema en uso)
        
        # Encendido (random)
        ignition = random.randint(0, 1)
        
        # Resistencia de aceite y voltaje
        oil_resistance = random.randint(100, 500).to_bytes(2, 'big')
        voltage = int((random.uniform(11.5, 14.5) * 100)).to_bytes(2, 'big')
        
        # Kilometraje (metros)
        mileage = random.randint(10000, 50000).to_bytes(4, 'big')
        
        # Temperatura (0-40°C)
        temperature = bytes([0x00, random.randint(0, 40)])
        
        # Datos extendidos (opcional)
        extended_data = b''  # Puedes añadir datos extendidos si es necesario
        
        # Construir datos de posición
        position_data = (
            date_time + latitude + longitude + speed_bcd + angle_bcd + 
            bytes([gps_status, digital_inputs, ignition]) + 
            oil_resistance + voltage + mileage + temperature + extended_data
        )
        
        return position_data
    
    def send_heartbeat(self):
        """Envía un paquete de heartbeat"""
        return self.send_packet(0x21, b'\x00')
    
    def send_position(self):
        """Envía datos de posición"""
        position_data = self.generate_position_data()
        return self.send_packet(0x80, position_data)
    
    def send_alarm(self, alarm_type):
        """Envía un paquete de alarma"""
        position_data = self.generate_position_data()
        
        # Tipos de alarma (ver protocolo)
        alarms = {
            'emergency': b'\x01\x01',  # Alarma de emergencia
            'overspeed': b'\x00\x02',  # Exceso de velocidad
            'vibration': b'\x00\x20',  # Alarma de vibración
            'power': b'\x08\x08'       # Problema de potencia
        }
        
        alarm_data = alarms.get(alarm_type, b'\x00\x00')
        return self.send_packet(0x82, position_data + alarm_data)
    
    def send_ibutton_swipe(self, driver_name="Driver1", ibutton_id="000013082D45"):
        """Envía datos de swipe de iButton"""
        status = 0x01  # Modo anti-robo activado
        now = datetime.now()
        
        # Fecha y hora
        date_time = bytes([
            now.year - 2000,
            now.month,
            now.day,
            now.hour,
            now.minute,
            now.second
        ])
        
        # Coordenadas (similares a posición)
        latitude = bytes([0x02, 0x41, 0x71, 0x15])
        longitude = bytes([0x05, 0x43, 0x86, 0x88])
        
        # Datos del conductor
        driver_data = f"<{driver_name},{ibutton_id}>".encode('ascii')
        
        # Construir contenido
        content = (
            bytes([status]) + date_time + latitude + longitude + driver_data
        )
        
        return self.send_packet(0x94, bytes([0x0B]) + content)  # 0x0B = Subcomando para swipe
    
    def run(self, interval=30):
        """Ejecuta el emulador con intervalos regulares"""
        try:
            print(f"Emulador GPS iniciado para dispositivo {self.device_id}")
            while True:
                # Enviar heartbeat cada 3 intervalos
                if self.sequence % 3 == 0:
                    self.send_heartbeat()
                
                # Enviar posición regularmente
                self.send_position()
                
                # Enviar alarma ocasionalmente (10% de probabilidad)
                if random.random() < 0.1:
                    alarm_type = random.choice(['emergency', 'overspeed', 'vibration', 'power'])
                    self.send_alarm(alarm_type)
                
                # Enviar swipe de iButton ocasionalmente (5% de probabilidad)
                if random.random() < 0.05:
                    self.send_ibutton_swipe()
                
                time.sleep(interval)
                self.sequence += 1
                
        except KeyboardInterrupt:
            print("\nEmulador detenido")
        finally:
            self.socket.close()



In [2]:
# Configura estos valores según tu entorno
SERVER_IP = '127.0.0.1'  # IP del servidor
SERVER_PORT = 3001        # Puerto del servidor
DEVICE_ID = '13512345006'  # Número de SIM del dispositivo

In [3]:
mulator = RopeGpsEmulator(SERVER_IP, SERVER_PORT, DEVICE_ID)

In [11]:
# Para enviar paquetes manualmente:
# emulator.send_heartbeat()
mulator.send_position()
# emulator.send_alarm('emergency')
# emulator.send_ibutton_swipe()

Enviado: 242480001f0ca23286250412145913b7901115c2419419076003108840010079054700005f750021690d


b'$$\x80\x00\x1f\x0c\xa22\x86%\x04\x12\x14Y\x13\xb7\x90\x11\x15\xc2A\x94\x19\x07`\x03\x10\x88@\x01\x00y\x05G\x00\x00_u\x00!i\r'

In [4]:
# Configuración y ejecución
if __name__ == "__main__":
    # Configura estos valores según tu entorno
    SERVER_IP = '127.0.0.1'  # IP del servidor
    SERVER_PORT = 3000        # Puerto del servidor
    DEVICE_ID = '13512345006'  # Número de SIM del dispositivo
    
    emulator = RopeGpsEmulator(SERVER_IP, SERVER_PORT, DEVICE_ID)
    
    # Para ejecutar en modo automático:
    emulator.run(interval=10)  # Envia datos cada 10 segundos
    
    # Para enviar paquetes manualmente:
    # emulator.send_heartbeat()
    # emulator.send_position()
    # emulator.send_alarm('emergency')
    # emulator.send_ibutton_swipe()

IndexError: list index out of range