# NOTICE

This software was produced for the U. S. Government
under Contract No. FA8702-19-C-0001, and is
subject to the Rights in Noncommercial Computer Software
and Noncommercial Computer Software Documentation Clause DFARS 252.227-7014 (FEB 2014)

Copyright 2024 The MITRE Corporation


# Imports

In [1]:
# Standard python packages
import random
import tqdm
import re
from datetime import datetime
import os
import re
import json
import signal
import configparser

# Packages that need to be installed
from pymongo import MongoClient
import openai

# Configuration

The file `CLLaMP.cfg` contains configurable parameters for various system components.  Look in that file for a description of each parameter.

In [2]:
# Load the config file
if __name__ == '__main__':
    # '#' and ';' start inline comments, so text after them on an option line
    # is not part of the option's value.
    config = configparser.ConfigParser(inline_comment_prefixes = ('#', ';'))
    config.read('CLLaMP.cfg')

# Database

We're using the NoSQL database MongoDB because it allows us to evolve the schema dynamically, and the underlying data model (collections of documents) is appropriate for the kind of data we'll be storing - CVEs, PDDL planning operators, etc.

In [3]:
class Mongo:
    """
    This class is a wrapper for accessing the CLLaMP MongoDB.  There is a single database and a single
    collection used for CLLaMP.  Each document in the collection represents a CVE.  CVEs are initially
    loaded from files that are assumed to be in CVE JSON 5.0 format as described here:

    https://www.cve.org/AllResources/CveServices#cve-json-5.

    The schema for documents in the collection is as follows:

    ```
    {
      cve_id: ID assigned to the CVE, e.g., CVE-2022-0991
      raw_data: The raw contents of the file containing the CVE as a string
      date_published: The date the CVE was originally published
      date_inserted: The date the CVE was inserted into this database
      description: The text of the description of the CVE
      pddl {
        operator: The planning operator extracted from the CVE
        inserted: The date the operator was inserted into this database
      }
    }
    ```

    Class instances have the following data members:

      config (ConfigParser)   - CLLaMP config parser
      conn_string (str)       - Mongo connection string
      client (MongoClient)    - Mongo client (None until connect() is called)
      db (Database)           - CLLaMP database (None until connect() is called)
      collection (Collection) - CLLaMP collection (None until connect() is called)
    """

    def __init__(self, config, connect = True):
        """
        The initializer for Mongo instances takes the following arguments.

        Arguments:

          config (ConfigParser) - CLLaMP config parser
          connect (bool)        - If true, connect to Mongo server during init

        Raises:
          AssertionError if the MONGO section lacks 'host', 'port', 'db' or 'collection', or
          if only one of 'username' / 'password' is present.
        """

        # Check the config data.  Credentials are optional, but must come as a pair.
        assert config.has_option('MONGO', 'username') == config.has_option('MONGO', 'password'), "Config must have both MONGO 'username' and 'password' or neither"
        assert config.has_option('MONGO', 'host') and config.has_option('MONGO', 'port'), "Config must have both MONGO 'host' and 'port'"
        assert config.has_option('MONGO', 'db') and config.has_option('MONGO', 'collection'), "Config must have both MONGO 'db' and 'collection'"

        # Build the connection string.  The credentials are inserted verbatim, exactly
        # as they appear in the config file.
        if config.has_option('MONGO', 'username'):
            conn_string = 'mongodb://%s:%s@%s:%s/' % (config.get('MONGO', 'username'),
                                                      config.get('MONGO', 'password'),
                                                      config.get('MONGO', 'host'),
                                                      config.get('MONGO', 'port'))
        else:
            conn_string = 'mongodb://%s:%s/' % (config.get('MONGO', 'host'),
                                                config.get('MONGO', 'port'))

        self.config = config
        self.conn_string = conn_string
        self.client = None
        self.db = None
        self.collection = None

        if connect:
            self.connect()


    def connect(self):
        """
        Connect to the Mongo server.  This must be called before any interactions with the
        database.  By default, it is called on creation of a Mongo instance.

        Also requests a unique index on 'cve_id' for the collection.
        """

        self.client = MongoClient(self.conn_string)
        self.db = self.client[self.config.get('MONGO', 'db')]
        self.collection = self.db[self.config.get('MONGO', 'collection')]

        self.collection.create_index('cve_id', unique = True)


    def drop_db(self):
        """
        Drop the CLLaMP database, then reconnect so that the client, database and
        collection handles (and the 'cve_id' index request) are set up again.
        """

        self.client.drop_database(self.config.get('MONGO', 'db'))
        self.connect()


    def add_cve(self, cve_file, replace = False):
        """
        Add a CVE to the CLLaMP database.

        A CVE is only stored when an English description (a description whose 'lang'
        starts with 'en') is found in one of its containers.  When several are found,
        the last one encountered is used.

        Arguments:
          cve_file (str) - File containing the CVE in CVE JSON 5.0 format
          replace (bool) - If true and the CVE exists in the DB, replace the entire record.  If false and the
                           CVE exists in the DB, exit with a warning.

        Returns:
          None if the CVE already exists and replace is false.  Otherwise the document
          built for the CVE.  Note that the document is returned even when it was not
          stored because no English description was found.
        """

        # Use a context manager so the file handle is closed even if reading fails
        with open(cve_file, encoding = 'utf-8') as cve_fp:
            raw_data = cve_fp.read()
        json_data = json.loads(raw_data)
        cve_id = json_data['cveMetadata']['cveId']

        # Parse the date published.  Only the leading YYYY-MM-DD part is kept; a value
        # that does not start with that shape is stored unchanged as a string.
        date_published = None
        if 'datePublished' in json_data['cveMetadata']:
            date_published = json_data['cveMetadata']['datePublished']
            if re.match(r'\d\d\d\d-\d\d-\d\d', date_published):
                date_published = datetime.strptime(date_published[:10], '%Y-%m-%d')

        # Handle the case where the CVE already exists in the collection
        if self.collection.find_one({'cve_id':cve_id}) and not replace:
            print('CVE with ID %s already exists' % cve_id)
            return None

        # Get CVE description.  Only dict-valued containers can carry a 'descriptions'
        # key; any other shape (e.g. a list) is skipped.
        description = None
        for container in json_data['containers'].values():
            if not isinstance(container, dict):
                continue
            for desc in container.get('descriptions', []):
                if desc['lang'].startswith('en'):
                    description = desc['value']

        # Create document
        doc = {
            'cve_id':cve_id,
            'raw_data':raw_data,
            'date_inserted':datetime.now(),
            'date_published':date_published
        }
        if description:
            doc['description'] = description
            # Remove any existing record first so the insert cannot violate the
            # unique 'cve_id' index.  This discards any PDDL stored on the old record.
            self.collection.delete_one({'cve_id':cve_id})
            self.collection.insert_one(doc)
        elif 'REJECTED' not in raw_data:
            print('CVE %s has no description' % cve_id)

        return doc


    def add_pddl(self, doc, pddl):
        """
        Add PDDL planning operator for a CVE.  The operator is stored under the 'pddl'
        key of the record, together with the time it was stored ('inserted').

        Arguments:
          doc (dict) - CVE document from which the operator was extracted.  It must carry
                       the '_id' of the stored record, which is used to locate it.
          pddl (str) - The planning operator
        """

        self.collection.update_one({'_id': doc['_id']},  {'$set': {'pddl':{'operator':pddl, 'inserted':datetime.now()}}})

In [4]:
# Get access to the database
if __name__ == '__main__':
    # 'connect' defaults to True, so constructing the wrapper also connects to the server
    mongo = Mongo(config)

# CVEs

The CVEs used here were downloaded from https://www.cve.org/ on 9/7/2023 and are in CVE JSON 5.0 format.

In [5]:
def get_cve_files(cve_dir = 'cvelistV5-main/cves', filter = None):
    """
    Walk a directory tree and return a list of all of the files in that tree, which are
    each assumed to contain a CVE.

    Arguments:
      cve_dir (str) - Root of directory to walk
      filter (str)  - If specified, regex searched for in each file's name and in its full path
      (cve_dir joined with the sub-directories and the file name).  A file is returned when either
      one matches.  For example, a filter of `json$` will only match files that end with json, a
      filter of `2023` will match paths that contain the year 2023, and a filter of `^CVE` will
      match files whose name starts with CVE.

    Returns:
        List of paths to the selected files.  An empty list is returned if cve_dir
        does not exist or contains no matching files.
    """

    # Compile once instead of on every file visited
    pattern = None if filter is None else re.compile(filter)

    cve_files = []

    for root, _dirs, files in os.walk(cve_dir):
        for file in files:
            path = os.path.join(root, file)
            # The name is tested on its own so that '^'-anchored filters keep working
            if pattern is None or pattern.search(file) or pattern.search(path):
                cve_files.append(path)

    return cve_files

In [6]:
if __name__ == '__main__':
    
    # Select the CVE files matching the regex 'CVE-2023-01'.  This is a subset of the
    # 2023 CVEs (e.g. IDs CVE-2023-0100 through CVE-2023-0199), not all of them.
    cve_files = get_cve_files(filter = 'CVE-2023-01')
    
    print('%d files found' % (len(cve_files)))

    # Add them to the database, replacing any record already stored for the same CVE
    for cve_file in tqdm.tqdm(cve_files):
        mongo.add_cve(cve_file, replace = True)

    # Some will not be added because they have been rejected by the CVE team
    print('%d CVEs in collection' % mongo.collection.count_documents({}))

96 files found


100%|██████████████████████████████████████████| 96/96 [00:00<00:00, 437.91it/s]

211483 CVEs in collection





# LLM

We're using OpenAI LLMs for this implementation.

In [7]:
class LLM:
    """
    A large language model used to extract PDDL from CVEs.  This class assumes that the model is from
    OpenAI.

    Instances of this class have the following data members:

      config (ConfigParser) - CLLaMP config parser
      model (str)           - Name of the LLM to use
      api_key (str)         - API key used to access (and charge for access to) the model
      temperature (float)   - Controls creativity of the text generated by LLM (closer to 0 = more deterministic)
      top_p (float)         - Consider only tokens in the top p percent of the probability mass for next token

    A good discussion of temperature and top_p can be found here:

    https://community.openai.com/t/cheat-sheet-mastering-temperature-and-top-p-in-chatgpt-api-a-few-tips-and-tricks-on-controlling-the-creativity-deterministic-output-of-prompt-responses/172683
    """

    def __init__(self, config):
        """
        The initializer for LLM instances takes the following arguments.

        Creating an instance also sets the module-level `openai.api_key`, so the key of the
        most recently created instance is the one in effect.

        Arguments:

          config (ConfigParser) - CLLaMP config parser

        Raises:
          AssertionError if the OPENAI section lacks 'model', 'api_key', 'temperature' or 'top_p'.
        """

        # Test validity of config file
        assert config.has_option('OPENAI', 'model'), "Config must have  OPENAI 'model'"
        assert config.has_option('OPENAI', 'api_key'), "Config must have OPENAI 'api_key'"
        assert config.has_option('OPENAI', 'temperature'), "Config must have OPENAI 'temperature'"
        assert config.has_option('OPENAI', 'top_p'), "Config must have OPENAI 'top_p'"

        self.config = config

        self.model = config.get('OPENAI', 'model')
        self.api_key = config.get('OPENAI', 'api_key')
        openai.api_key = self.api_key
        self.temperature = float(config.get('OPENAI', 'temperature'))
        self.top_p = float(config.get('OPENAI', 'top_p'))


    def make_message(self, role, content):
        """
        Create a message to send to the LLM

        Arguments:
          role (str)    - The role of the message - system, assistant, user
          content (str) - The message's text

        Returns: Correctly formatted message as a dict
        """

        return {'role':role, 'content':content}


    @staticmethod
    def _extract_pddl(reply):
        """
        Pull the planning operator out of an LLM reply.

        The operator is expected inside a Markdown code fence tagged `pddl`.  The match is
        non-greedy so that, when the reply contains several fenced blocks, only the contents
        of the first one are returned instead of everything between the first opening fence
        and the last closing fence.

        Arguments:
          reply (str) - Text of the LLM reply

        Returns: the stripped contents of the first ```pddl block, or None if there is none
        """

        match = re.search(r"```pddl(.*?)```", reply, re.DOTALL)
        if match is None:
            return None
        return match.group(1).strip()


    def CVE2PDDL(self, doc, verbose = False):
        """
        Extract a PDDL representation of a CVE.

        The prompts are sent one at a time in a single conversation: first the worked
        examples, then the request to convert this CVE.  One chat completion is requested
        per prompt, and the operator is taken from the reply to the last one.

        Arguments:
          doc (dict)     - Document from the database containing the CVE.  Its 'cve_id' and
                           'description' entries are used to build the request.
          verbose (bool) - If true print status information

        Returns: operator as a string (None if the final reply contains no ```pddl block),
        list of messages sent/received to/from LLM
        """

        messages = []
        messages.append(
            self.make_message('system', 'You are an intelligent assistant.')
        )

        prompts = [
            """
            I want to convert CVEs into planning operators in PDDL.  The preconditions will test whether a system is
            vulnerable to the attack described in the CVE.  The effects will indicate what the attacker gains.

            I'm going to show you example CVEs and their corresponding planning operators in the format I want.
        
            Below is an example CVE:

            CVE-2023-2387
            A vulnerability classified as problematic was found in Netgear SRX5308 up to 4.3.5-3. 
            Affected by this vulnerability is an unknown functionality of the file 
            scgi-bin/platform.cgi?page=dmz_setup.htm of the component Web Management Interface. 
            The manipulation of the argument winsServer1 leads to cross site scripting. 
            The attack can be launched remotely. The exploit has been disclosed to the public and may be used. 
            The identifier VDB-227665 was assigned to this vulnerability. 
            NOTE: The vendor was contacted early about this disclosure but did not respond in any way.

            Here is the corresponding planning operator:

            (:action EXPLOIT-CVE-2023-2387
                 :parameters (?s ?a)
                 :precondition (and (system ?s)
                                    (attacker ?a)
                                    (has_remote_access ?a ?s)
                                    (has_component ?s 'Netgear SRX5308')
                                    (has_version 'Netgear SRX5308' 'up to 4.3.5-3')
                                    (manipulates ?a 'argument winsServer1'))
                 :effect (and (gain-access ?a ?s 'cross site scripting')))

            Here is another CVE:
    
            CVE-2023-2738
            A vulnerability classified as critical has been found in Tongda OA 11.10. 
            This affects the function actionGetdata of the file GatewayController.php. 
            The manipulation leads to unrestricted upload. It is possible to initiate the attack remotely. 
            The exploit has been disclosed to the public and may be used. 
            The identifier VDB-229149 was assigned to this vulnerability. 
            NOTE: The vendor was contacted early about this disclosure but did not respond in any way.

            Here is the corresponding planning operator:

            (:action EXPLOIT-CVE-2023-2738
                 :parameters (?s ?a)
                 :precondition (and (system ?s)
                                    (attacker ?a)
                                    (has_remote_access ?a ?s)
                                    (has_component ?s 'Tongda OA')
                                    (has_version 'Tongda OA' '11.10')
                                    (manipulates ?a 'actionGetdata'))
                 :effect (and (gain-access ?a ?s 'unrestricted upload')))

            Here is another CVE:

            CVE-2023-2368
            A vulnerability was found in SourceCodester Faculty Evaluation System 1.0. 
            It has been declared as critical. 
            This vulnerability affects unknown code of the file index.php?page=manage_questionnaire. 
            The manipulation of the argument id leads to sql injection. The attack can be initiated remotely. 
            The exploit has been disclosed to the public and may be used. The identifier of this vulnerability is VDB-227644.

            Here is the corresponding planning operator:
        
            (:action EXPLOIT-CVE-2023-2368
                 :parameters (?s ?a)
                 :precondition (and (system ?s)
                                    (attacker ?a)
                                    (has_remote_access ?a ?s)
                                    (has_component ?s 'SourceCodester Faculty Evaluation System')
                                    (has_version 'SourceCodester Faculty Evaluation System' '1.0')
                                    (manipulates ?a 'argument id'))
                 :effect (and (gain-access ?a ?s 'sql injection')))

            Here is another CVE:

            CVE-2023-2041
            A vulnerability classified as critical was found in novel-plus 3.6.2. 
            Affected by this vulnerability is an unknown functionality of the file /category/list?limit=10&offset=0&order=desc. 
            The manipulation of the argument sort leads to sql injection. 
            The attack can be launched remotely. The exploit has been disclosed to the public and may be used. 
            The associated identifier of this vulnerability is VDB-225919. 
            NOTE: The vendor was contacted early about this disclosure but did not respond in any way.

            Here is the corresponding planning operator:

            (:action EXPLOIT-CVE-2023-2041
                 :parameters (?s ?a)
                 :precondition (and (system ?s)
                                    (attacker ?a)
                                    (has_remote_access ?a ?s)
                                    (has_component ?s 'novel-plus')
                                    (has_version 'novel-plus' '3.6.2')
                                    (manipulates ?a 'argument sort'))
                 :effect (and (gain-access ?a ?s 'sql injection')))
        
            """,
        
        """Now convert the following CVE to a planning operator.\n\n%s\n%s""" % (doc['cve_id'], doc['description'])
        ]

        for prompt in prompts:
            # A prompt whose text starts with '#' is treated as disabled and is not sent
            if prompt.startswith('#'):
                continue
            if verbose:
                print('\n\n%s' % prompt)
            messages.append(self.make_message('user', prompt))

            # The whole conversation so far is sent with every request
            chat = openai.ChatCompletion.create(model = self.model,
                                                messages = messages,
                                                temperature = self.temperature,
                                                top_p = self.top_p)

            reply = chat.choices[0].message.content
            if verbose:
                print('\n%s' % reply)
            messages.append(
                self.make_message('assistant', reply)
            )

        # The operator is read from the most recent message in the conversation
        pddl = self._extract_pddl(messages[-1]['content'])

        return pddl, messages

In [8]:
if __name__ == '__main__':

    # Allocate access to an LLM.  The initializer also sets the module-level
    # openai.api_key from the OPENAI section of the config.
    llm = LLM(config)

# Operator extraction

In [9]:
def timeout_handler(signum, frame):
    """
    Simple timeout handler, meant to be installed as the handler for SIGALRM.  It accepts
    the two arguments every signal handler receives and ignores them, because it is only
    called on a timeout.

    Arguments:
      signum (int) - Number of the signal that fired (ignored)
      frame (frame) - Stack frame that was interrupted (ignored)

    Raises:
      TimeoutError mentioning the LLM timeout.  TimeoutError is a subclass of Exception,
      so code that catches Exception still catches it.
    """

    raise TimeoutError('Timeout waiting on LLM response')

In [10]:
def extract_operators(docs, mongo, llm, config, replace = False, verbose = False):
    """
    Extract planning operators from a set of documents, each corresponding to a CVE,
    and update the database to include the operators.

    This function includes logic to manage LLM requests that run long or that are
    terminated by OpenAI: if the config has an OPENAI 'timeout' option, an alarm of that
    many seconds is armed around each document, and any exception raised while
    processing a document is printed and the loop moves on to the next one.  The alarm
    relies on SIGALRM; on platforms that do not provide it no timeout is enforced.

    Whatever the LLM wrapper returns is stored, including None when no operator
    could be extracted from the reply.

    Arguments:
      docs (iterable of dict) - CVE documents to process; each must have 'cve_id' and 'description'
      mongo (Mongo)         - Mongo instance
      llm (LLM)             - LLM instance
      config (ConfigParser) - CLLaMP configuration parser
      replace (bool)        - If false and doc has PDDL already, do not recompute and replace it
      verbose (bool)        - Print progress information if true; otherwise show a progress bar
    """

    # The alarm machinery only exists on POSIX platforms
    use_alarm = config.has_option('OPENAI', 'timeout') and hasattr(signal, 'SIGALRM')
    timeout = int(config.get('OPENAI', 'timeout')) if use_alarm else 0

    if not verbose:
        docs = tqdm.tqdm(docs)

    # Install the handler once, remembering the previous one so it can be put back
    previous_handler = None
    if use_alarm:
        previous_handler = signal.signal(signal.SIGALRM, timeout_handler)

    try:
        for doc in docs:

            if not replace and 'pddl' in doc:
                continue

            # Set an alarm before processing each doc
            if use_alarm:
                signal.alarm(timeout)

            try:

                if verbose:
                    print(doc['cve_id'])
                    print(doc['description'])

                # The conversation itself is not needed here, only the operator
                pddl, _messages = llm.CVE2PDDL(doc, verbose = False)
                mongo.add_pddl(doc, pddl)

                if verbose:
                    print('%s\n' % pddl)

            except Exception as ex:
                # Best effort: report the failure and carry on with the next document
                print('Caught exception: %s' % ex)

            finally:
                # Cancel the timer, whatever happened above
                if use_alarm:
                    signal.alarm(0)

    finally:
        # signal.signal() reports None for a handler that was not installed from Python;
        # such a handler cannot be re-installed, so it is left alone.
        if use_alarm and previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

In [12]:
if __name__ == '__main__':
    
    # Get a random subset of the documents.  Every document in the collection is
    # loaded into a list first, then shuffled, and at most the first 100 are kept.
    docs = list(mongo.collection.find())
    random.shuffle(docs)
    docs = docs[:100]

    # Extract the operators, recomputing any PDDL already stored for these documents
    extract_operators(docs, mongo, llm, config, replace = True, verbose = True)

CVE-2014-7121
The Dhanam (aka com.magzter.dhanam) application 3.1 for Android does not verify X.509 certificates from SSL servers, which allows man-in-the-middle attackers to spoof servers and obtain sensitive information via a crafted certificate.
(:action EXPLOIT-CVE-2014-7121
     :parameters (?s ?a)
     :precondition (and (system ?s)
                        (attacker ?a)
                        (has_component ?s 'Dhanam')
                        (has_version ?s '3.1')
                        (man-in-the-middle-attack ?a ?s))
     :effect (and (gain-access ?a ?s 'sensitive information')))

CVE-2023-3999
The Waiting: One-click countdowns plugin for WordPress is vulnerable to authorization bypass due to missing capability checks on its AJAX calls in versions up to, and including, 0.6.2. This makes it possible for authenticated attackers, with subscriber-level permissions and above, to create and delete countdowns as well as manipulate other plugin settings.
(:action EXPLOIT-CVE-2023-

# API usage examples

Below are examples of how to use the API.  They are written as functions so that they will show up in the auto-generated documentation.

In [12]:
def API_example_load_CVEs():
    """
    Example: read CVE files from disk and store them in the database, without any PDDL.
    When reading this in the generated API documentation, open the source to see the
    inline comments.
    """

    # Parse the CLLaMP configuration
    cfg = configparser.ConfigParser(inline_comment_prefixes = ('#', ';'))
    cfg.read('CLLaMP.cfg')

    # Open the database
    db = Mongo(cfg)

    # Collect the CVE files selected by the regex 'CVE-2018-01'
    paths = get_cve_files(filter = 'CVE-2018-01')
    n_found = len(paths)
    print('%d files found' % n_found)

    # Store each one; a record already present for the same CVE is replaced
    for path in tqdm.tqdm(paths):
        db.add_cve(path, replace = True)

In [13]:
if __name__ == '__main__':
    # Run the CVE loading example only when executed as a script
    API_example_load_CVEs()

93 files found


100%|██████████████████████████████████████████| 93/93 [00:00<00:00, 708.59it/s]


In [14]:
def API_example_extract_PDDL():
    """
    Example: take CVEs that are already stored, extract PDDL from them and write the
    PDDL back to the database.  When reading this in the generated API documentation,
    open the source to see the inline comments.
    """

    # Parse the CLLaMP configuration
    cfg = configparser.ConfigParser(inline_comment_prefixes = ('#', ';'))
    cfg.read('CLLaMP.cfg')

    # Both the database wrapper and the LLM wrapper are built from the same config
    db = Mongo(cfg)
    model = LLM(cfg)

    # Ask the database for a random sample of 10 documents
    sample_stage = {'$sample': {'size': 10 }}
    sampled = db.collection.aggregate([sample_stage])

    # Extract an operator for every sampled document and store it, overwriting
    # any PDDL already present, and print progress along the way.
    extract_operators(sampled, db, model, cfg, replace = True, verbose = True)

In [15]:
if __name__ == '__main__':
    # Run the PDDL extraction example only when executed as a script
    API_example_extract_PDDL()

CVE-2017-4899
VMware Workstation Pro/Player 12.x before 12.5.3 contains a security vulnerability that exists in the SVGA driver. An attacker may exploit this issue to crash the VM or trigger an out-of-bound read. Note: This issue can be triggered only when the host has no graphics card or no graphics drivers are installed.
(:action EXPLOIT-CVE-2017-4899
     :parameters (?s ?a)
     :precondition (and (system ?s)
                        (attacker ?a)
                        (has_component ?s 'VMware Workstation Pro/Player')
                        (has_version ?s '12.x')
                        (has_no_graphics_card ?s)
                        (has_no_graphics_drivers ?s))
     :effect (and (crash-vm ?s ?a)
                  (trigger-out-of-bound-read ?s ?a)))

CVE-2021-44478
A vulnerability has been identified in Polarion ALM (All versions < V21 R2 P2), Polarion WebClient for SVN (All versions). A cross-site scripting is present due to improper neutralization of data sent to the web p

In [13]:
def API_example_db_queries():
    """
    Example: a handful of counting queries against the CVE collection.  When reading
    this in the generated API documentation, open the source to see the inline comments.
    """

    # Parse the CLLaMP configuration and open the database
    cfg = configparser.ConfigParser(inline_comment_prefixes = ('#', ';'))
    cfg.read('CLLaMP.cfg')
    cves = Mongo(cfg).collection

    # Every CVE in the collection
    total = cves.count_documents({})
    print('%d CVEs in collection' % total)

    # CVEs whose record has a 'pddl' entry
    with_pddl = cves.count_documents({'pddl': {'$exists': True}})
    print('%d CVEs in collection with PDDL' % with_pddl)

    # CVEs whose raw text matches 'Oracle', ignoring case
    oracle_query = {'raw_data': {'$regex' : 'Oracle', '$options' : 'i'}}
    print('%d CVEs in collection that mention Oracle' % cves.count_documents(oracle_query))

    # CVEs whose publication date lies between 2023-01-01 and 2023-01-31, both inclusive
    first_day = datetime.strptime('2023-01-01', '%Y-%m-%d')
    last_day = datetime.strptime('2023-01-31', '%Y-%m-%d')
    january_query = {'date_published': {'$gte': first_day, '$lte': last_day}}
    print('%d CVEs in collection published in January of 2023' % cves.count_documents(january_query))


In [14]:
if __name__ == '__main__':
    # Run the database query example only when executed as a script
    API_example_db_queries()

211483 CVEs in collection
133 CVEs in collection with PDDL
11115 CVEs in collection that mention Oracle
2351 CVEs in collection published in January of 2023


In [18]:
def API_example_load_all_CVEs(really = False, reallyreally = False):
    """
    Load every CVE file into the database, replacing the record of any CVE that is
    already stored.

    To avoid calling this as a mistake you must call it with keywords 'really' and
    'reallyreally' set to True; otherwise a message is printed and nothing is done.

    NOTE(review): the code does not drop the database before loading, so records
    for CVEs that are not among the files are left in place - confirm whether a
    drop was intended.
    """

    # Both confirmations are required before anything is touched
    if not (really and reallyreally):
        print('You are not serious enough')
        return

    # Parse the CLLaMP configuration and open the database
    cfg = configparser.ConfigParser(inline_comment_prefixes = ('#', ';'))
    cfg.read('CLLaMP.cfg')
    db = Mongo(cfg)

    # Every file selected by the regex 'CVE.*json$'
    paths = get_cve_files(filter = 'CVE.*json$')
    n_found = len(paths)
    print('%d files found' % n_found)

    # Store each one, replacing existing records
    for path in tqdm.tqdm(paths):
        db.add_cve(path, replace = True)

    # Some will not be added because they have been rejected by the CVE team
    n_stored = db.collection.count_documents({})
    print('%d CVEs in collection' % n_stored)
