Skip to content

Commit

Permalink
Add support for detecting NAKs issued to the host, and finish USBProxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
ktemkin authored and Kyle J. Temkin committed Sep 11, 2017
1 parent cbeb18f commit d436c82
Show file tree
Hide file tree
Showing 10 changed files with 426 additions and 88 deletions.
19 changes: 10 additions & 9 deletions facedancer-usbproxy.py
Expand Up @@ -11,32 +11,33 @@
from facedancer.filters.logging import USBProxyPrettyPrintFilter
import argparse



def vid_pid(x):
return int(x, 16)

def main():

# TODO: Accept arguments that specify a list of filters to apply,
# from the local directory or the filters directory.
parser = argparse.ArgumentParser(description="FaceDancer USB Proxy")
parser.add_argument('-v', dest='vendorid', metavar='<VendorID>',
type=vid_pid, help="Vendor ID of device",
required=True)
parser.add_argument('-p', dest='productid', metavar='<ProductID>',
type=vid_pid, help="Product ID of device",
required=True)
parser.add_argument('-f', dest='fastsetaddr', action='store_true',
help="Use fast set_addr quirk")
args = parser.parse_args()
quirks = []

if args.fastsetaddr:
quirks.append('fast_set_addr')

u = FacedancerUSBApp(verbose=0)
# Create a new USBProxy device.
u = FacedancerUSBApp(verbose=1)
d = USBProxyDevice(u, idVendor=args.vendorid, idProduct=args.productid, verbose=2, quirks=quirks)

# Add our standard filters.
# TODO: Make the PrettyPrintFilter switchable?
d.add_filter(USBProxyPrettyPrintFilter(verbose=5))
d.add_filter(USBProxySetupFilters(d, verbose=0))
d.add_filter(USBProxySetupFilters(d, verbose=2))

# TODO: Figure these out from the command line!
d.connect()

try:
Expand Down
34 changes: 34 additions & 0 deletions facedancer/USB.py
Expand Up @@ -51,3 +51,37 @@ class USB:
def interface_class_to_descriptor_type(interface_class):
return USB.if_class_to_desc_type.get(interface_class, None)


class USBDescribable(object):
"""
Abstract base class for objects that can be created from USB descriptors.
"""

# Override me!
DESCRIPTOR_TYPE_NUMBER = None

@classmethod
def handles_binary_descriptor(cls, data):
"""
Returns truee iff this class handles the given descriptor. By deafault,
this is based on the class's DESCRIPTOR_TYPE_NUMBER declaration.
"""
return data[1] == cls.DESCRIPTOR_TYPE_NUMBER



@classmethod
def from_binary_descriptor(cls, data):
"""
Attempts to create a USBDescriptor subclass from the given raw
descriptor data.
"""

for subclass in cls.__subclasses__():
# If this subclass handles our binary descriptor, use it to parse the given descriptor.
if subclass.handles_binary_descriptor(data):
return subclass.from_binary_descriptor(data)

return None


85 changes: 81 additions & 4 deletions facedancer/USBConfiguration.py
Expand Up @@ -2,11 +2,26 @@
#
# Contains class definition for USBConfiguration.

class USBConfiguration:
def __init__(self, configuration_index, configuration_string, interfaces, attributes=0xe0, max_power=250):
import struct

from .USB import USBDescribable
from .USBInterface import USBInterface
from .USBEndpoint import USBEndpoint

class USBConfiguration(USBDescribable):

DESCRIPTOR_TYPE_NUMBER = 0x02

def __init__(self, configuration_index, configuration_string_or_index, interfaces, attributes=0xe0, max_power=250):
self.configuration_index = configuration_index
self.configuration_string = configuration_string
self.configuration_string_index = 0

if isinstance(configuration_string_or_index, str):
self.configuration_string = configuration_string_or_index
self.configuration_string_index = 0
else:
self.configuration_string_index = configuration_string_or_index
self.configuration_stirng = None

self.interfaces = interfaces

self.attributes = attributes
Expand All @@ -17,6 +32,68 @@ def __init__(self, configuration_index, configuration_string, interfaces, attrib
for i in self.interfaces:
i.set_configuration(self)


@classmethod
def from_binary_descriptor(cls, data):
"""
Generates a new USBConfiguration object from a configuration descriptor,
handling any attached subordiate descriptors.
data: The raw bytes for the descriptor to be parsed.
"""

length = data[0]

# Unpack the main colleciton of data into the descriptor itself.
descriptor_type, total_length, num_interfaces, index, string_index, \
attributes, max_power = struct.unpack('<xBHBBBBB', data[0:length])

# Extract the subordinate descriptors, and parse them.
interfaces = cls._parse_subordinate_descriptors(data[length:total_length])
return cls(index, string_index, interfaces, attributes, max_power)


@classmethod
def _parse_subordinate_descriptors(cls, data):
"""
Generates descriptor objects from the list of subordinate desciptors.
data: The raw bytes for the descriptor to be parsed.
"""

# TODO: handle recieving interfaces out of order?
interfaces = []

# Continue parsing until we run out of descriptors.
while data:

# Determine the length and type of the next descriptor.
length = data[0]
descriptor = USBDescribable.from_binary_descriptor(data[:length])

# If we have an interface descriptor, add it to our list of interfaces.
if isinstance(descriptor, USBInterface):
interfaces.append(descriptor)
elif isinstance(descriptor, USBEndpoint):
interfaces[-1].add_endpoint(descriptor)

# Move on to the next descriptor.
data = data[length:]

return interfaces


def __repr__(self):
"""
Generates a pretty form of the configuation for printing.
"""
# TODO: make attributes readable

max_power_mA = self.max_power * 2
return "<USBConfiguration index={} num_interfaces={} attributes=0x{:02X} max_power={}mA>".format(
self.configuration_index, len(self.interfaces), self.attributes, max_power_mA)


def set_device(self, device):
self.device = device

Expand Down
7 changes: 7 additions & 0 deletions facedancer/USBDevice.py
Expand Up @@ -200,6 +200,13 @@ def handle_buffer_available(self, ep_num):
if callable(endpoint.handler):
endpoint.handler()

def handle_nak(self, ep_num):
if self.state == USB.state_configured and ep_num in self.endpoints:
endpoint = self.endpoints[ep_num]
if callable(endpoint.nak_callback):
endpoint.nak_callback()


# standard request handlers
#####################################################

Expand Down
56 changes: 54 additions & 2 deletions facedancer/USBEndpoint.py
Expand Up @@ -2,7 +2,13 @@
#
# Contains class definition for USBEndpoint.

class USBEndpoint:
import struct
from .USB import *

class USBEndpoint(USBDescribable):

DESCRIPTOR_TYPE_NUMBER = 0x05

direction_out = 0x00
direction_in = 0x01

Expand All @@ -21,7 +27,7 @@ class USBEndpoint:
usage_type_implicit_feedback = 0x02

def __init__(self, number, direction, transfer_type, sync_type,
usage_type, max_packet_size, interval, handler):
usage_type, max_packet_size, interval, handler=None, nak_callback=None):

self.number = number
self.direction = direction
Expand All @@ -31,13 +37,51 @@ def __init__(self, number, direction, transfer_type, sync_type,
self.max_packet_size = max_packet_size
self.interval = interval
self.handler = handler
self.nak_callback = nak_callback

self.interface = None

self.request_handlers = {
1 : self.handle_clear_feature_request
}

@classmethod
def from_binary_descriptor(cls, data):
"""
Creates an endpoint object from a description of that endpoint.
"""

# Parse the core descriptor into its components...
address, attributes, max_packet_size, interval = struct.unpack("xxBBHB", data)

# ... and break down the packed fields.
number = address & 0x7F
direction = address >> 7
transfer_type = attributes & 0b11
sync_type = attributes >> 2 & 0b1111
usage_type = attributes >> 4 & 0b11

return cls(number, direction, transfer_type, sync_type, usage_type,
max_packet_size, interval)


def set_handler(self, handler):
self.handler = handler

def __repr__(self):
# TODO: make these nice string representations
transfer_type = self.transfer_type
sync_type = self.sync_type
usage_type = self.usage_type
direction = "IN" if self.direction else "OUT"

# TODO: handle high/superspeed; don't assume 1ms frames
interval = self.interval

return "<USBEndpoint number={} direction={} transfer_type={} sync_type={} usage_type={} max_packet_size={} inderval={}ms>".format(
self.number, direction, transfer_type, sync_type, usage_type, self.max_packet_size, interval
)

def handle_clear_feature_request(self, req):
print("received CLEAR_FEATURE request for endpoint", self.number,
"with value", req.value)
Expand Down Expand Up @@ -69,8 +113,13 @@ def send_packet(self, data):
dev = self.interface.configuration.device
dev.maxusb_app.send_on_endpoint(self.number, data)


def send(self, data):

# If we're sending something that's exactly divisible by the
# max packet size, we'll have to send a ZLP once the packet is complete.
send_zlp = (len(data) % self.max_packet_size == 0) and (len(data) > 0)

# Send the relevant data one packet at a time,
# chunking if we're larger than the max packet size.
# This matches the behavior of the MAX3420E.
Expand All @@ -80,6 +129,9 @@ def send(self, data):

self.send_packet(packet)

if self.send_zlp:
self.send_packet([])


def recv(self):
dev = self.interface.configuration.device
Expand Down
43 changes: 37 additions & 6 deletions facedancer/USBInterface.py
Expand Up @@ -2,14 +2,17 @@
#
# Contains class definition for USBInterface.

import struct
from .USB import *

class USBInterface:
class USBInterface(USBDescribable):
DESCRIPTOR_TYPE_NUMBER = 0x4

name = "generic USB interface"

def __init__(self, interface_number, interface_alternate, interface_class,
interface_subclass, interface_protocol, interface_string_index,
verbose=0, endpoints=[], descriptors={}):
verbose=0, endpoints=None, descriptors=None):

self.number = interface_number
self.alternate = interface_alternate
Expand All @@ -18,8 +21,8 @@ def __init__(self, interface_number, interface_alternate, interface_class,
self.protocol = interface_protocol
self.string_index = interface_string_index

self.endpoints = endpoints
self.descriptors = descriptors
self.endpoints = []
self.descriptors = descriptors if descriptors else {}

self.verbose = verbose

Expand All @@ -32,12 +35,40 @@ def __init__(self, interface_number, interface_alternate, interface_class,

self.configuration = None

for e in self.endpoints:
e.set_interface(self)
if endpoints:
for endpoint in endpoints:
self.add_endpoint(endpoint)

self.device_class = None
self.device_vendor = None

@classmethod
def from_binary_descriptor(cls, data):
"""
Generates an interface object from a descriptor.
"""
interface_number, alternate_setting, num_endpoints, interface_class, \
interface_subclass, interface_protocol, interface_string_index \
= struct.unpack("xxBBBBBBB", data)
return cls(interface_number, alternate_setting, interface_class,
interface_subclass, interface_protocol, interface_string_index)


def __repr__(self):
endpoints = [endpoint.number for endpoint in self.endpoints]

return "<USBInterface number={} alternate={} class={} subclass={} protocol={} string_index={} endpoints={}>".format(
self.number, self.alternate, self.iclass, self.subclass, self.protocol, self.string_index, endpoints
)


def add_endpoint(self, endpoint):
"""
Adds
"""
self.endpoints.append(endpoint)
endpoint.set_interface(self)

def set_configuration(self, config):
self.configuration = config

Expand Down

0 comments on commit d436c82

Please sign in to comment.