In [None]:
# This code is for capturing the usbtraffic data from usbmon interface
#  in a linux machine and then extracting the details of the traffic 
# captured into a csv file and storing them. This code works only for 
# keyboard and mouse data whose payload is 8 bytes. 

In [43]:
import subprocess
import os
import getpass

def capture_usb_traffic_for_time(output_file, usbmon_interface='usbmon0', duration=300):
    """
    Capture USB traffic data from the specified usbmon interface for a certain duration and save it as a binary file.
    
    Args:
        output_file (str): Path to the output binary file.
        usbmon_interface (str): The usbmon interface to capture data from (default: 'usbmon0').
        duration (int): Duration for capturing traffic in seconds (default: 300 seconds or 5 minutes).
    """
    # Prompt the user for the sudo password
    sudo_password = getpass.getpass("Enter your sudo password: ")

    # Ensure the output directory exists
    output_dir = os.path.dirname(output_file)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Construct the command with sudo
    command = [
        'sudo', '-S', 'timeout', str(duration),   # '-S' allows sudo to read the password from stdin
        'dd', 
        f'if=/dev/{usbmon_interface}',            # Input file: usbmon interface
        f'of={output_file}'                       # Output file: binary file
    ]

    try:
        # Run the command and pass the password to sudo via stdin
        process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate(sudo_password.encode() + b'\n')  # Pass the password to sudo

        # Output any stdout or stderr for debugging
        print(f"STDOUT: {stdout.decode()}")
        print(f"STDERR: {stderr.decode()}")

        # Handle the return code and ignore the expected timeout error (124)
        if process.returncode == 0:
            file_size = os.path.getsize(output_file)
            print(f"Successfully captured USB traffic for {duration} seconds to {output_file}")
            print(f"Length of data captured: {file_size} bytes")
        elif process.returncode == 124:
            # Timeout occurred, but it's expected
            file_size = os.path.getsize(output_file)
            print(f"Capture finished after timeout ({duration} seconds). File saved to {output_file}")
            print(f"Length of data captured: {file_size} bytes")
        else:
            # Unexpected error
            print(f"Error capturing USB traffic. Return code: {process.returncode}")
            if stderr:
                print(f"Detailed error: {stderr.decode()}")

    except Exception as e:
        print(f"An unexpected error occurred: {e}")

# Example usage
capture_usb_traffic_for_time('/home/user/Desktop/USB_Project/usb_capture/usb_traffic.bin', usbmon_interface='usbmon0', duration=45)


Enter your sudo password:  ········


KeyboardInterrupt: 

In [44]:
# parsing of the usbtraffic captured using the linux command "cat /dev/usbmon0 > /home/chills/Desktop/usbtraffic/usb_capture.bin"
# The following code assumes that the bin data file is generated using the above code.
# It is stored in the location mentioned in the command. The captured is keyboard data 
# belonging to interrupt traffic. 
#The interrupt traffic fields are extracted from the bin file and stored 
# in a seperate csv file in the same location as the input usbtraffic data file.


    
import struct
import csv
import os



# Input binary file path (make sure this path is correct)
input_file_path = '/home/user/Desktop/USB_Project/usb_capture/usb_traffic.bin'

# Determine the directory from which the input file was read
output_directory = os.path.dirname(input_file_path)

# Create dictionaries to map transfer types to their respective CSV file paths in the same directory
# transfer_type_files = {
#     2: os.path.join(output_directory, 'control_transfers.csv'),
#     0: os.path.join(output_directory, 'isochronous_transfers.csv'),
#     3: os.path.join(output_directory, 'bulk_transfers.csv'),
#     1: os.path.join(output_directory, 'interrupt_transfers.csv')
# }
transfer_type_files = {
    2: os.path.join(output_directory, 'control_transfers2.csv'),
    0: os.path.join(output_directory, 'isochronous_transfers2.csv'),
    3: os.path.join(output_directory, 'bulk_transfers2.csv'),
    1: os.path.join(output_directory, 'digispark_attack.csv')
}

# Define the columns for the CSV file
csv_columns = [
    'urb_id', 'urb_type', 'transfer_type', 'endpoint_number','urb_direction' ,'device_number',
    'bus_number', 'setup_request', 'data_flag', 'urb_sec', 'urb_usec','urb_status',
    'urb_length', 'data_length', 'urb_unusedsetup_header','payload_data'
]

# Open file for reading
with open(input_file_path, 'rb') as binfile:

    # Dictionary to store file objects and corresponding CSV writers
    file_objects = {}
    csv_writers = {}

    # Create a CSV writer for each transfer type
    for transfer_type, filename in transfer_type_files.items():
        csvfile = open(filename, 'w', newline='')
        file_objects[transfer_type] = csvfile
        csv_writers[transfer_type] = csv.writer(csvfile, escapechar='\\', quoting=csv.QUOTE_NONE)
        csv_writers[transfer_type].writerow(csv_columns)

    while True:
        # Read the 48-byte URB header
        header_data = binfile.read(48)
        if len(header_data) < 48:
            break  # End of file

         
        urbid = header_data[:8]
        urbtype= header_data[8:9] 
        urbtranstype= header_data[9:10] 
        urbendpoint= header_data[10:11] 
        urbdevnumber=header_data[11:12] 
        urbbusnumber=header_data[12:13]
        urbdevsetupreq=header_data[14:15]
        urbdatapresent=header_data[15:16]
        urbtimestampsec=header_data[16:20]
        urbtimestampmicrosec=header_data[24:28]
        urbstatus=header_data[28:32]
        urblength=header_data[32:36]
        urbdatalength=header_data[36:40]
        urbunusedsetupheader=header_data[40:48]
    
        # Unpacking the individual fields
        urb_id = struct.unpack('Q', urbid)[0] #Q 8bytes
        urb_type_ac=struct.unpack('B', urbtype)[0] #B 1 byte  URB packet type submit or complete
        urb_type = chr(urb_type_ac) # converting the code to ascii
        urb_transfer_type=struct.unpack('B', urbtranstype)[0] # B 1 byte ur transfer type
        urb_endpoint_number_temp=struct.unpack('B', urbendpoint)[0] # B 1 byte enpoint number
        urb_endpoint_number=urb_endpoint_number_temp & 0x0f # extracting endpoint number from lower nibble of the byte
        urb_direction=(urb_endpoint_number_temp & 0xf0) >> 4
        urb_dev_number=struct.unpack('B', urbdevnumber)[0]
        urb_bus_number=struct.unpack('B', urbbusnumber)[0]
        urb_dev_setup_req=struct.unpack('B', urbdevsetupreq)[0] # not relevant code 45 else some valid data
        urb_data_present=struct.unpack('B', urbdatapresent)[0] # if present displays value zero else non zero value
        urb_timestamp_sec=struct.unpack('I',urbtimestampsec)[0]
        urb_timestamp_microsec=struct.unpack('I',urbtimestampmicrosec)[0]
        urb_status=struct.unpack('I',urbstatus)[0] # if success 0 else non zero value
        urb_length=struct.unpack('I',urblength)[0] # data size to be fetched issued by host
        urb_datalength=struct.unpack('I',urbdatalength)[0]
        urb_unusedsetup_header=struct.unpack('Q',urbunusedsetupheader)[0]
        parsed_data=[urb_id,urb_type,urb_transfer_type,urb_endpoint_number,urb_direction,\
                    urb_dev_number,urb_bus_number,urb_dev_setup_req,urb_data_present,urb_timestamp_sec,\
                    urb_timestamp_microsec,urb_status,urb_length,urb_datalength,urb_unusedsetup_header]
        #print(parsed_data)            
        # Combine parsed header fields with payload data
        
        
        # Unpack the payload data, which follows the 48-byte header
        if (urb_data_present==0 and urb_datalength >=4):
            # print(urb_datalength)
            
            payload_data = binfile.read(urb_datalength)
            if len(payload_data) != urb_datalength:
                print(f"Error: Expected {urb_datalength} bytes but got {len(payload_data)} bytes.")
            else:
                unpack_format = f'{urb_datalength}B'
                urb_capdata = struct.unpack(unpack_format, payload_data)
            # print(payload_data)
            full_data = parsed_data+ [urb_capdata]
            #print("key pressed full data",full_data)
        # if the capture packet is only the header packet 
        # write the empty paylad to the file.
        else:
            urb_capdata=list([0,0,0,0,0,0,0,0])
            full_data = parsed_data+[urb_capdata]
            #print("key released full data",full_data)
            # print("Empty payload")
                    
        # Determine the transfer type to save to the appropriate file
        transfer_type = full_data[2]  # The transfer type is at index 2

        if transfer_type in csv_writers:
            csv_writers[transfer_type].writerow(full_data)

    # Close all the CSV files
    for file in file_objects.values():
        file.close()

print("Data parsing and saving to CSV files in the same directory as the input file is complete.")


Data parsing and saving to CSV files in the same directory as the input file is complete.


In [46]:
import pandas as pd

file_path = "/home/user/Desktop/USB_Project/usb_capture/digispark_attack.csv"
df = pd.read_csv(file_path)
# df = df[df['device_number'] == 50]
# df.to_csv(file_path)
# df['device_number'].value_counts()
df.head()

ParserError: Error tokenizing data. C error: Expected 19 fields in line 3, saw 23


In [3]:
print(csv_writers[transfer_type])

<_csv.writer object at 0x7c66ea79ad40>
