# Imports

In [1]:
import gym, requests, random, sys, struct, string
import numpy as np
import yaml
import json, copy
from requests.exceptions import ChunkedEncodingError

# Data Setup

In [2]:
def create_json_body(object_data, existing_objects: dict):
    """Converts components of the OpenAPI specifications to JSON objects.

    Args:
        object_data (yaml): component to be built.
        existing_objects (dict): components already built that may be referenced in other components.

    Returns:
        JSON: The component in a JSON format. 
    """
    json_body = {}
    property_dict = {}

    if 'properties' in object_data:
        properties = object_data['properties']

        for prop_name, prop_data in properties.items():
            if 'type' in prop_data:
                prop_type = prop_data['type']
                if prop_type == 'array':
                    if 'type' in prop_data['items']:
                        array_type = prop_data['items']['type']
                        property_dict[prop_name] = [array_type]
                    elif '$ref' in prop_data['items']:
                        array_type = prop_data['items']['$ref']
                        property_dict[prop_name] = [existing_objects.get(prop_ref)]
                else:
                    property_dict[prop_name] = prop_type
            elif '$ref' in prop_data:
                prop_ref = prop_data['$ref'].split('/')[-1]
                property_dict[prop_name] = existing_objects.get(prop_ref)
        return property_dict
    else:
        prop_data = object_data['content']['application/json']['schema']
        if '$ref' in prop_data:
            return existing_objects.get(prop_data['$ref'].split('/')[-1])
        elif prop_data['type'] == "array":
            return [existing_objects.get(prop_data['items']['$ref'].split('/')[-1])]
        


In [3]:
def fill_body_values(schema, old_sample, contains_previous, mutation_methods):
    if schema is not None:
        sample = copy.deepcopy(schema)
        if contains_previous:
            sample = fill_previous_body(sample, old_sample)

        for item in sample.items():
            if item[1] is None:
                print("THIS ITEM IS NONEE!!!!")
                print(sample)
            if type(item[1]) is list:
                list_size = random.randint(0,100)
                values = []

                if list_size <= len(item[1]):
                    values = item[1][0:list_size]
                else:
                    values = [schema[item[0]][0]] * list_size
                    values[0:len(item[1])] = item[1]
                
                for i in range(list_size):
                    if type(values[i]) is dict:
                        if type(schema[item[0]]) is list:
                            values[i] = fill_body_values(schema[item[0]][0], values[i], contains_previous, mutation_methods)
                        else:
                            values[i] = fill_body_values(schema[item[0]], values[i], contains_previous, mutation_methods)

                    else:
                        if type(values[i]) is str and values[i] in {"integer", "double", "float", "string", "boolean"}:
                            val = get_mutated_value(None, values[i], None)
                            if val is None:
                                print("Val is None!")
                            values[i] = val
                        else:
                            val = get_mutated_value(values[i], None, mutation_methods[type(values[i])])
                            if val is None:
                                print("Val is None!")
                            values[i] = val

                sample[item[0]] = values

            else:
                if type(item[1]) is dict:
                        sample[item[0]] = fill_body_values(schema[item[0]], item[1], contains_previous, mutation_methods)
                else:
                    if type(item[1]) is str and item[1] in {"integer", "double", "float", "string", "boolean"}:
                        val = get_mutated_value(None, item[1], None)
                        if val is None:
                                print("Val is None!")
                        sample[item[0]] = val
                    else:
                        val = get_mutated_value(item[1], None,  mutation_methods[type(item[1])])
                        if val is None:
                            print("Val is None!")
                        sample[item[0]] = val
        return sample

def fill_previous_body(schema, old_sample):
    sample = copy.deepcopy(schema)
    for item in old_sample.items():
        if type(item[1]) is list:
            if len(item[1]) > 0:
                if item[1][0] is dict:
                    #TODO Implement to when it is a dict inside a list
                    raise ValueError("Unsuported dict inside list verification")
                else:
                    sample[item[0]] = item[1]
        else:
            if type(item[1]) is dict:
                sample[item[0]] = fill_previous_body(sample[item[0]], item[1])
            else:
                sample[item[0]] = item[1]
    

    return sample


def fill_parameter_values(input_parameters, contains_previous, mutation_methods):
    for parameter in input_parameters:
        if type(parameter['schema']) is list:
            list_size = random.randint(0,100)
            values = []
            if contains_previous:
                if list_size <= len(parameter['sample']):
                    values = parameter['sample'][0:list_size]
                else:
                    values = [None] * list_size
                    values[0:len(parameter['sample'])] = parameter['sample']
                
                for i in range(list_size):
                    values[i] = get_mutated_value(values[i], None, mutation_methods[type(values[i])])
                
            else:
                for i in range(list_size):
                    values.append(get_mutated_value(None, parameter['schema'][0], None))

            parameter["sample"] = values

        else:
            parameter['sample'] = get_mutated_value(None, parameter['schema'], None)

    return input_parameters


def fill_values(function, contains_previous_values, mutation_methods):
    function['input_parameters'] = fill_parameter_values(function['input_parameters'], contains_previous_values, mutation_methods)

    if type(function['input_body']['schema']) is list:
        list_size = random.randint(0,100)
        list_schema = function['input_body']['schema'][0]
        items = []
        for i in range(list_size):
            items.append(fill_body_values(list_schema, function['input_body']['sample'], contains_previous_values, mutation_methods))

        function['input_body']['sample'] = items
    else:
        function['input_body']['sample'] = fill_body_values(function['input_body']['schema'], function['input_body']['sample'], contains_previous_values, mutation_methods)

    return function

# Mutation Methods

In [19]:
# Bit Flips: Randomly flips individual bits in the input data.
def bit_flips(data):
    if isinstance(data, str):
        chars = list(data)
        for i in range(len(chars)):
            char_code = ord(chars[i])
            flipped_code = char_code ^ random.getrandbits(16)
            chars[i] = chr(flipped_code)
        modified_data = ''.join(chars)
        return modified_data
    elif isinstance(data, bytes):
        flipped_data = bytearray(data)
        for i in range(len(flipped_data)):
            flipped_data[i] = flipped_data[i] ^ random.getrandbits(8)
        return bytes(flipped_data)
    elif isinstance(data, bool):
        flipped_data = not data
        return flipped_data
    else:
        raise ValueError("Unsupported data type for bit flips")

# Byte Shuffling: Shuffles the order of the bytes in the input data.
def byte_shuffling(data):
    if isinstance(data, str):
        char_list = list(data)
        random.shuffle(char_list)
        shuffled_string = ''.join(char_list)
        return shuffled_string
    elif isinstance(data, bytes):
        byte_list = list(data)
        random.shuffle(byte_list)
        return bytes(byte_list)
    elif isinstance(data, bool):
        return data
    else:
        raise ValueError("Unsupported data type for byte shuffling")

# Byte Injection/Deletion: Adds or removes random bytes, causing structural changes to the input data.
def byte_injection(data):
    if isinstance(data, str):
        code_points = list(data)
        code_point_to_inject = chr(random.randint(0, 1114111))
        index = random.randint(0, len(code_points))
        code_points.insert(index, code_point_to_inject)
        return ''.join(code_points)
    elif isinstance(data, bytes):
        mutated_data = bytearray(data)
        byte_to_inject = random.randint(0, 255)
        index = random.randint(0, len(mutated_data))
        mutated_data.insert(index, byte_to_inject)
        return bytes(mutated_data)
    else:
        raise ValueError("Unsupported data type for byte injection/deletion")

def byte_deletion(data):
    if isinstance(data, str):
        code_points = list(data)
        if len(code_points) > 0:
            index = random.randint(0, len(code_points) - 1)
            del code_points[index]
        return ''.join(code_points)
    elif isinstance(data, bytes):
        mutated_data = bytearray(data)
        if len(mutated_data) > 0:
            index = random.randint(0, len(mutated_data) - 1)
            del mutated_data[index]
        return bytes(mutated_data)
    else:
        raise ValueError("Unsupported data type for byte injection/deletion")

# Bytes Substitution: Randomly replaces bytes with others.
def bytes_substitution(data):
    if isinstance(data, str):
        mutated_data = ""
        for char in data:
            if random.random() < 0.5:  # 50% probability for substitution
                mutated_data += random.choice(string.printable)
            else:
                mutated_data += char
        return mutated_data
    elif isinstance(data, bytes):
        mutated_data = bytearray(data)
        for i in range(len(mutated_data)):
            mutated_data[i] = random.randint(0, 255)
        return bytes(mutated_data)
    else:
        raise ValueError("Unsupported data type for bytes substitution")

# Truncation: Shortens the input data by removing trailing bytes.
def truncation(data):
    if isinstance(data, str):
        if len(data) > 1:
            truncation_length = random.randint(0, len(data)-1)
            return data[:-truncation_length]
        else:
            return data
    elif isinstance(data, bytes):
        if len(data) > 1:
            truncation_length = random.randint(0, len(data)-1)
            return data[:-truncation_length]
        else:
            return data
    else:
        raise ValueError("Unsupported data type for truncation")

# Dictionary Fuzzy: Substitutes certain parameters for other pre-defined ones.
def dictionary_fuzzy(data, substitutions):
    if isinstance(data, str):
        for key, value in substitutions.items():
            data = data.replace(key, value)
        return data
    elif isinstance(data, bytes):
        for key, value in substitutions.items():
            data = data.replace(bytes(key, 'utf-8'), bytes(value, 'utf-8'))
        return data
    else:
        return data

def arithmetic_addition(data):
    if isinstance(data, float):
        return data + random.uniform(sys.float_info.min, sys.float_info.max)
    elif isinstance(data, int):
        new_value = data + random.randrange(-2147483648, 2147483647)
        if new_value > 2147483647 or new_value < -2147483648:
            return data
        return new_value
    else:
        raise ValueError("Unsupported data type for arithmetic operations")

def arithmetic_subtraction(data):
    if isinstance(data, float):
        return data - random.uniform(sys.float_info.min, sys.float_info.max)
    elif isinstance(data, int):
        new_value = data - random.randrange(-2147483648, 2147483647)
        new_value = new_value & 0xFFFFFFFF  # 0xFFFFFFFF represents a 32-bit mask (all 1's)
        # If the number exceeds the positive limit of 2147483647, convert to negative equivalent
        if new_value > 2147483647 or new_value < -2147483648:
            return data
        return new_value
    else:
        raise ValueError("Unsupported data type for arithmetic operations")

def arithmetic_multiplication(data):
    if isinstance(data, float):
        return data * random.uniform(sys.float_info.min, sys.float_info.max)
    elif isinstance(data, int):
        new_value = data * random.randrange(-2147483648, 2147483647)
        new_value = new_value & 0xFFFFFFFF  # 0xFFFFFFFF represents a 32-bit mask (all 1's)
        # If the number exceeds the positive limit of 2147483647, convert to negative equivalent
        if new_value > 2147483647 or new_value < -2147483648:
            return data
        return new_value
    else:
        raise ValueError("Unsupported data type for arithmetic operations")

def arithmetic_division(data):
    if isinstance(data, float):
        return data / random.uniform(sys.float_info.min, sys.float_info.max)
    elif isinstance(data, int):
        return data // random.randrange(-2147483648, 2147483647)
    else:
        raise ValueError("Unsupported data type for arithmetic operations")

# # Crossbreeding: Merges two different input values of the same type to produce a new one.
# def crossbreeding(data, datatype):
#     generated_data = random_generation(datatype)
#     if isinstance(data, str):
#         data1_list = list(data)
#         data2_list = list(generated_data)
#         random.shuffle(data1_list)
#         random.shuffle(data2_list)
#         crossover_point = random.randint(0, min(len(data1_list), len(data2_list)))
#         new_data = data1_list[:crossover_point] + data2_list[crossover_point:]
#         return ''.join(new_data)
#     elif isinstance(data, bytes):
#         data1_list = list(data)
#         data2_list = list(to_binary(generated_data))
#         # Ensure data1_list and data2_list have the same length
#         max_len = max(len(data1_list), len(data2_list))
#         data1_list.extend([random.choice(data1_list) for _ in range(max_len - len(data1_list))])
#         data2_list.extend([random.choice(data2_list) for _ in range(max_len - len(data2_list))])
#         crossover_point = random.randint(0, max_len)
#         new_data = data1_list[:crossover_point] + data2_list[crossover_point:]
#         return bytes(new_data)

#     return data

# Random Generation: Randomly generates input of a certain type.
def random_generation(data_type):
    if data_type == str:
        size = random.randint(0, 500)  # Random size between int min and int max
        return ''.join(random.choice(string.printable) for _ in range(size))
    elif data_type == int:
        return random.randrange(-2147483648, 2147483647)  # Random integer between int min and int max
    elif data_type == float:
        return random.uniform(sys.float_info.min, sys.float_info.max)
    elif data_type == bool:
        return random.choice([True, False])
    elif data_type == bytes:
        size = random.randint(0, 500)  # Random size between 1 and 10
        return bytes(random.randint(0, 255) for _ in range(size))
    else:
        raise ValueError("Unsupported data type for random generation")

In [18]:
print(truncation("a"))

None


In [5]:
# Method to Convert to Binary
def to_binary(data):
    if isinstance(data, str):
        binary_data = data.encode('utf-8')
    elif isinstance(data, int):
        binary_data = struct.pack("i", data)
    elif isinstance(data, float):
        binary_data = struct.pack('d', data)
    elif isinstance(data, bool):
        binary_data = struct.pack('?', data)
    elif isinstance(data, bytes):
        binary_data = data
    else:
        raise ValueError("Unsupported data type")

    return binary_data

# Method to Convert From Binary to original data type.
def from_binary(binary_data, data_type):
    if data_type == str:
        output_value = binary_data.decode('utf-8')
    elif data_type == int:
        output_value = struct.unpack("i", binary_data)[0]
    elif data_type == float:
        output_value = struct.unpack('d', binary_data)[0]
    elif data_type == bool:
        output_value = struct.unpack('?', binary_data)[0]
    elif data_type == bytes:
        output_value = binary_data
    else:
        raise ValueError("Unsupported data type")

    return output_value

In [6]:
def get_mutated_value(old_value, datatype, method):
    try:
        if old_value is None:
            if datatype == 'integer':
                return random_generation(int)
            elif datatype == 'float' or datatype == 'double':
                return random_generation(float)
            elif datatype == 'boolean':
                return random_generation(bool)
            elif datatype == 'string':
                return random_generation(str)
            else:
                random_generation(bytes)
        else:
            datatype = type(old_value)
            if method is random_generation:
                return random_generation(datatype)
            if datatype is int or datatype is float:
                if method is arithmetic_division or method is arithmetic_addition or method is arithmetic_multiplication or method is arithmetic_subtraction:
                    return method(old_value)
                else:
                    new_value_bin = to_binary(old_value)
                    return from_binary(method(new_value_bin), datatype)
            else:
                print(old_value)
                print(datatype)
                print(method)
                return method(old_value)
    except:
        raise ValueError("NotImplemented")
            


# Reinforcement Learning Environment

In [7]:
import gym
from gym.spaces import MultiDiscrete, Discrete
import numpy as np

class APIFuzzyTestingEnvironment(gym.Env):
    def __init__(self, base_url, function, mutation_methods):
        super(APIFuzzyTestingEnvironment, self).__init__()
        self.function = function
        self.base_url = base_url
        self.response = None
        self.mutation_methods = mutation_methods  # List of mutation methods.
        self.action_space: MultiDiscrete = MultiDiscrete([len(methods) for methods in mutation_methods])
        self.observation_space: Discrete = Discrete(5) # Possible HTTP error codes.

    def step(self, action):
        # Execute the action on the API and get the response
        mutation_methods = {}
        mutation_methods[int] = self.mutation_methods[0][action[0]]
        mutation_methods[float] = self.mutation_methods[1][action[1]]
        mutation_methods[bool] = self.mutation_methods[2][action[2]]
        mutation_methods[bytes] = self.mutation_methods[3][action[3]]
        mutation_methods[str] = self.mutation_methods[4][action[4]]
         
        self.function = fill_values(self.function, True, mutation_methods)
        
        self.response = self._execute_action(self.function)

        # Calculate the reward based on the response
        reward = self._calculate_reward()

        print(self.response.status_code)
        print(reward)
        print(action)

        # Update the state of the environment (if applicable)
        # self._update_environment_state()

        # Return the new state, reward, and episode completion status
        return self.state, reward, True

    def reset(self):
        self.state = 1
        return self.state

    def _execute_action(self, function):
        parameters = {}
        headers = {'Content-type': function['content-type'], 'Accept': '*/*'}
        headers = {}
        path: str = copy.deepcopy(function['path'])
        if len(function['input_parameters']) > 0:
            for item in function['input_parameters']:
                if item['name'] in path:
                    path = path.replace('{' + item['name'] + '}', str(item['sample']))
                else:
                    parameters[item['name']] = str(item['sample'])
            
        if function['method'] == 'GET':
            return requests.get(self.base_url + path, params=parameters, timeout=5)
        
        elif function['method'] == 'PUT':
            return requests.put(self.base_url + path, json=function['input_body']['sample'], headers=headers, params=parameters, timeout=5)

        elif function['method'] == 'DELETE':
            return requests.delete(self.base_url + path, json=function['input_body']['sample'], headers=headers, params=parameters, timeout=5)
        
        elif function['method'] == 'POST':
            try:
                if function['content-type'] != "multipart/form-data":
                    return requests.post(self.base_url + path, json=function['input_body']['sample'], headers=headers, params=parameters, timeout=5)
                else:
                    return requests.post(self.base_url + path, files=function['input_body']['sample'], params=parameters, timeout=5)
                
            except ChunkedEncodingError as ex:
                print(f"Invalid chunk encoding {str(ex)}")

    def _calculate_reward(self):
        # Implement the logic to calculate the reward based on the API response
        # You can define a reward function that suits your testing objective
        # Return the reward value
        if int(self.response.status_code) in range(100, 200):
            return 0
        elif int(self.response.status_code) in range(200, 300):
            return 5
        elif int(self.response.status_code) in range(300, 400):
            return 5
        elif int(self.response.status_code) in range(400, 500):
            return -20
        elif int(self.response.status_code) >= 500:
            return 10


    def _update_environment_state(self, function):
        # Implement any necessary state updates based on the API response
        # For example, update the state with new information after each request

        # Change the state
        if int(self.response.status_code) in range(100, 200):
            self.state = 0
        elif int(self.response.status_code) in range(200, 300):
            self.state = 1
        elif int(self.response.status_code) in range(300, 400):
            self.state = 2
        elif int(self.response.status_code) in range(400, 500):
            self.state = 3
        elif int(self.response.status_code) >= 500:
            self.state = 4

        # update the dictionary with the new ID
        # TODO this

    def _change_environment_function(self, function):
        # Changes the function under test
        self.function = function
    
    def render(self, mode='human'):
        # Implement the logic to visualize the environment if needed
        if mode == 'human':
            # Print the current state or any relevant information for human visualization
            print("Current state:", self.state)
            print("Last response status code:", self.response.status_code)
            # Print any other relevant information about the environment

        elif mode == 'machine':
            # Return the current state or any relevant information for machine visualization
            # You can return it as a dictionary or in any other appropriate format
            return {'current_state': self.state, 'last_response_status': self.response.status_code}

        else:
            raise ValueError("Invalid render mode. Supported modes are 'human' and 'machine'.")

        pass

In [8]:
class QLearningAgent:
    def __init__(self, env: APIFuzzyTestingEnvironment, mutation_methods, learning_rate=0.1, discount_factor=0.9, exploration_rate=0.1):
        self.env = env
        self.int_q_table = np.zeros([env.observation_space.n, len(mutation_methods[0])])
        self.float_q_table = np.zeros([env.observation_space.n, len(mutation_methods[1])])
        self.bool_q_table = np.zeros([env.observation_space.n, len(mutation_methods[2])])
        self.byte_q_table = np.zeros([env.observation_space.n, len(mutation_methods[3])])
        self.string_q_table = np.zeros([env.observation_space.n, len(mutation_methods[4])])
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate

    def choose_action(self, state, int_q_table, float_q_table, bool_q_table, byte_q_table, string_q_table):
        # Epsilon-greedy exploration policy
        if np.random.uniform(0, 1) < self.exploration_rate:
            action = self.env.action_space.sample()
        else:
            action = []
            action.append(np.argmax(int_q_table[state]))
            action.append(np.argmax(float_q_table[state]))
            action.append(np.argmax(bool_q_table[state]))
            action.append(np.argmax(byte_q_table[state]))
            action.append(np.argmax(string_q_table[state]))
        return action

    def update_q_table(self, state, action, reward, next_state, q_table):
        # Q-Learning update rule
        max_q_value = np.max(q_table[next_state])
        q_table[state, action] += self.learning_rate * (
            reward + self.discount_factor * max_q_value - q_table[state, action])

    def train(self, num_episodes):
        for episode in range(num_episodes):
            state = self.env.reset()
            done = False

            while not done:
                action = self.choose_action(state, self.int_q_table, self.float_q_table, self.bool_q_table, self.byte_q_table, self.string_q_table)
                next_state, reward, done = self.env.step(action)
                print("NEXT STATE AND REWARD")
                print(next_state)
                print(reward)
                self.update_q_table(state, action[0], reward, next_state, self.int_q_table)
                self.update_q_table(state, action[1], reward, next_state, self.float_q_table)
                self.update_q_table(state, action[2], reward, next_state, self.bool_q_table)
                self.update_q_table(state, action[3], reward, next_state, self.byte_q_table)
                self.update_q_table(state, action[4], reward, next_state, self.string_q_table)
                state = next_state

            # Print episode statistics or perform other necessary actions

    def test(self):
        # Run the trained agent without exploration (exploitation only)
        state = self.env.reset()
        done = False

        while not done:
            action = self.choose_action(state, self.int_q_table, self.float_q_table, self.bool_q_table, self.byte_q_table, self.string_q_table)
            next_state, _, done = self.env.step(action)
            state = next_state

            # Perform necessary actions during testing (e.g., log results, visualize, etc.)

# Execution

In [31]:
# Read the OpenAPI specification file (assuming it's in YAML format)
with open('openapi_petshop.yaml', 'r') as file:
    spec = yaml.safe_load(file)

# Extract the base connection string (servers -> url)
base_url = spec['servers'][0]['url']

# Load schemas
schemas = {}

for object_name, object_data in spec['components']['schemas'].items():
    parameter_schema = create_json_body(object_data, schemas)
    schemas[object_name] = parameter_schema

for object_name, object_data in spec['components']['requestBodies'].items():
    parameter_schema = create_json_body(object_data, schemas)
    schemas[object_name] = parameter_schema

# Extract the functions, input schemas, and output schemas
functions = {}
for path, path_item in spec['paths'].items():
    for method, operation in path_item.items():
        function_name = operation.get('operationId')
        request_body = operation.get('requestBody')
        request_parameters = operation.get('parameters')
        input_schema = None
        parameter_name = None
        parameter_in  = None
        parameter_schema = None
        parameter_type = None
        schema = None

        function_parameters: list = []

        # Extract parameters
        if request_parameters != None:
            for parameters in request_parameters:
                if request_parameters != None:
                    parameter_name = parameters['name']
                    parameter_in = parameters['in']
                    parameter_schema = parameters['schema']['type']

                    if parameter_schema == 'array':
                        parameter_schema = [parameters['schema']['items']['type']]

                function_parameters.append({
                    "name": parameter_name,
                    "in": parameter_in,
                    "schema": parameter_schema,
                    "sample": copy.deepcopy(parameter_schema)
                })

        # Extract input schema
        if(request_body != None and 'content' in request_body):
            input_schema = request_body['content']
            if 'application/json' in input_schema:
                input_applicaton = 'application/json'
                input_schema = input_schema['application/json']['schema']
            elif 'application/octet-stream' in input_schema:
                input_applicaton = 'application/octet-stream'
                input_schema = input_schema['application/octet-stream']['schema']
            elif 'application/x-www-form-urlencoded' in input_schema:
                input_applicaton = 'application/x-www-form-urlencoded'
                input_schema = input_schema['application/x-www-form-urlencoded']['schema']
            elif 'multipart/form-data' in input_schema:
                input_applicaton = 'multipart/form-data'
                input_schema = input_schema['multipart/form-data']['schema']

            if 'type' in input_schema:
                if input_schema['type'] == 'array':
                    schema = "array"
                    schema_type = input_schema['items']
                    if 'type' in schema_type:
                        schema_type = input_schema['items']['type']
                    elif '$ref' in schema_type:
                        schema = schemas.get(input_schema['items']['$ref'].split('/')[-1])
                else:
                    schema = input_schema['type']
            
            if '$ref' in input_schema:
                schema = schemas.get(input_schema['$ref'].split('/')[-1])
        
        if schema is None:
            input_body = {"schema": [], "sample": None}

        input_body = {"schema": schema, "sample": copy.deepcopy(schema)}

        function = fill_values({
            'path': path,
            'content-type': input_applicaton,
            'method': method.upper(),
            'input_parameters': function_parameters,
            'input_body': input_body
        }, False, None)

        # Store the information in the functions dictionary
        functions[function_name] = function

# Print the base URL and functions dictionary
print("Base URL:", base_url)

Base URL: http://localhost:80/v3


In [32]:
# Initialize mutation methods for different data types
int_mutation_methods = [bit_flips, byte_shuffling, bytes_substitution, arithmetic_addition, arithmetic_subtraction, arithmetic_multiplication, arithmetic_division, random_generation]
float_mutation_methods = [bit_flips, byte_shuffling, bytes_substitution, arithmetic_addition, arithmetic_subtraction, arithmetic_multiplication, arithmetic_division, random_generation]
bool_mutation_methods = [bit_flips, byte_shuffling, random_generation]
byte_mutation_methods = [bit_flips, byte_shuffling, byte_injection, byte_deletion, bytes_substitution, truncation, arithmetic_addition, arithmetic_subtraction, arithmetic_multiplication, arithmetic_division, random_generation]
str_mutation_methods = [bit_flips, byte_shuffling, byte_injection, byte_deletion, bytes_substitution, truncation, random_generation]

# Combine all mutation methods into a single list
mutation_methods = [int_mutation_methods, float_mutation_methods, bool_mutation_methods, byte_mutation_methods, str_mutation_methods]

scenarios = [["addPet", "updatePet", "getPetById"]]
env = None

for scenario_functions in scenarios:
    for function in scenario_functions:
        if env is None:
            # Create an instance of the API testing environment
            env = APIFuzzyTestingEnvironment(base_url, functions[function], mutation_methods)
        else:
            env._update_environment_state(functions[function])
        
        # Create an instance of the Q-Learning agent
        agent = QLearningAgent(env, mutation_methods)

        # Train the agent
        agent.train(num_episodes=10)

        # Test the agent
        agent.test()

Exception in callback BaseAsyncIOLoop._handle_events(484, 1)
handle: <Handle BaseAsyncIOLoop._handle_events(484, 1)>
Traceback (most recent call last):
  File "c:\DEV_TOOLS\envs\FuzzTheREST_Env\Lib\site-packages\jupyter_client\session.py", line 100, in json_packer
    ).encode("utf8", errors="surrogateescape")
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
UnicodeEncodeError: 'utf-8' codec can't encode character '\udf49' in position 93: surrogates not allowed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\DEV_TOOLS\envs\FuzzTheREST_Env\Lib\asyncio\events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "c:\DEV_TOOLS\envs\FuzzTheREST_Env\Lib\site-packages\tornado\platform\asyncio.py", line 206, in _handle_events
    handler_func(fileobj, events)
  File "c:\DEV_TOOLS\envs\FuzzTheREST_Env\Lib\site-packages\zmq\eventloop\zmqstream.py", line 634, in _handle_events
    self._handle_recv()
  Fi

In [7]:
falta organizar o dicionario que guardará elementos para passar p op proximo agente

Falta meter metricas no agente


SyntaxError: invalid syntax (1758034295.py, line 1)

In [25]:
print(functions['addPet'])

{'path': '/pet', 'content-type': 'application/json', 'method': 'POST', 'input_parameters': [], 'input_body': {'schema': {'id': 'integer', 'category': {'id': 'integer', 'name': 'string'}, 'name': 'string', 'photoUrls': ['string'], 'tags': [{'id': 'integer', 'name': 'string'}], 'status': 'string'}, 'sample': {'id': 714660785, 'category': {'id': 770653212, 'name': '\uf61b鱐눡蹨㹶쩲꺾큶駪죎鶔Χ䫝緙ﾈ⁚ⰻ෴凈皬⚥謕醮쮼ց敛舼ᙉ岽ᰓ슋쩠པ謘䉊纊\uf806怲᎘傯䠚쨝딇䛀矜\uee18ꡘ䚤\uef78藴턂\ud889\uf2ff鏅䴶檮佋궏韠ᯇ趰懯\ud81a\ueceb\ue0e4꺘კ欁퀫\ue45f멈쏺妟颽ꤎ䲣ސ\u1f58﮼鏽ݖꬎ虼ᢉ菼ﻹ二ᦶ全-\uec39赺䧹\ue836鸠Ꮫ鑕䭛\udb5b（饁\uf373㙻ᚵ\ue100퓳飕찅ʂ琧\uf410㤥\udd84썟层凟섚⼗㺚፞攃ᦙﺥ仈㕯\uea62䑀㜢轚\ueaa5뮰㠯乚\uf400ꞑ⏖᠇讗뭷Ⓓ͢㑯슧ꨗ汰Ꮉ\udb04㕉偾댟덡욂㶞䥹ϕᇒᖲ㷓녰ഔ墒풚쁪ꠕ瀶呬;뷥祿ရ˜據怭誝બ㖾眧腪鱅괈⾝\uf485껑갥끆댥韏뛰\uf63a\ud86e㯺낅䦽\ud862➚Ꮂ㲀\x97\uef18\ue36a武珑錢쀸ꍽ킺ᑈ첿敒᫄궛䳁፲猞ɼ\ue990碱熄琨磅ꎔࢿᆻ萩鋣窱䢽휭᧚镣\udbb9飜덮\ue0f9ﶪ뷼辰籗䞴\ue871ᾇ'}, 'name': '윿髁쌰ꓛ\ua6fa헗秸圯㭻', 'photoUrls': ['얣\uf523硴쀺떟\ueffd', '平㯨镹舷̦', '坦⟇䂮烻䗲봨玣ြꠖᬺ彯\uf582鍤檘栉喞먩駄褃鐍힔ἶ몎勭䜯\uf85c\udce4杣峌怨탺⺜\udc36\ued20\uf4e0꼿蹏⽒\uefb2䑋㇝ዌ\uee9a䁄瓊䬒섨ᥱѸ訛\uf6fbᚼ㫬荥䯊퀍䕈悤ꆇ湁怎ᢔ桦㰺ࠃ䱓읣\uec63ᎁ\ufde6\ufaf9\uee41废Վ\uef33\uf256퍴螋긷팜璩濎

In [None]:
Falta o método de dicionario