## CVE and CWE Explorer

In [47]:
!pip install lxml jsonpath_ng requests pandas
from jsonpath_ng import jsonpath, parse
import csv, requests, zipfile, re, io, json, os, random, collections, copy



In [2]:
def get_data(uri):
    fn = uri.split('/')[-1]
    
    if os.path.exists(fn):
        print('opening {}'.format(fn))
    else:
        print('downloading {}'.format(fn))
        res = requests.get(uri)
        with open(fn,'wb') as outf:
            print('wrote {} bytes'.format(outf.write(res.content)))
    
    z = zipfile.ZipFile(fn)
    assert len(z.filelist) == 1
    data= z.open(z.filelist[0].orig_filename).read()
    if z.filelist[0].orig_filename.endswith('.json'):
        data= json.loads(data)
    z.close()
    return data

data_uris = [
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2018.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2017.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2016.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2015.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2014.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2013.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2012.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2011.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2010.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2009.json.zip',
    'https://static.nvd.nist.gov/feeds/json/cve/1.0/nvdcve-1.0-2008.json.zip'
]

sevlist = [
    'accessVector',
    'accessComplexity',
    'authentication',
    'confidentialityImpact',
    'integrityImpact',
    'availabilityImpact',
    'baseScore'
]

sev_items = map(lambda _: [_,parse('impact.baseMetricV2.cvssV2.{}'.format(_))], sevlist)

columns = dict(sev_items,
    cve_id = parse('cve.CVE_data_meta.ID'),
    vendor = parse('cve.affects.vendor.vendor_data.[*].vendor_name'),
    product = parse('cve.affects.vendor.vendor_data.[*].product.[*].product_data.[*].product_name'),
    cwe = parse('cve.problemtype.problemtype_data.[*].description.[*].value')
)
    
def gpv(data, path):
    """get values from a path"""
    res = path.find(data)
    try:
        values = list(set([_ for _ in map(lambda _:_.value, res)]))
    except:
        print(res)
        raise
    if len(values) == 1:
        return values[0]
    else:
        return ','.join(values)
    
def cve_extract_fields(items):
    for item in items:
        yield {
            k:gpv(item, jp) for k, jp in columns.items()
        }

In [3]:
CVEs = []

for uri in data_uris:
    CVEs.extend(get_data(uri)['CVE_Items'])

#Just those CVEs with CWEs listed
CVEs_with_CWEs = list(filter(lambda _: _['cwe'], cve_extract_fields(CVEs)))

opening nvdcve-1.0-2018.json.zip
opening nvdcve-1.0-2017.json.zip
opening nvdcve-1.0-2016.json.zip
opening nvdcve-1.0-2015.json.zip
opening nvdcve-1.0-2014.json.zip
opening nvdcve-1.0-2013.json.zip
opening nvdcve-1.0-2012.json.zip
opening nvdcve-1.0-2011.json.zip
opening nvdcve-1.0-2010.json.zip
opening nvdcve-1.0-2009.json.zip
opening nvdcve-1.0-2008.json.zip


#### An example CVE

In [4]:
print(json.dumps(random.choice(CVEs), indent=1))

{
 "cve": {
  "data_type": "CVE",
  "data_format": "MITRE",
  "data_version": "4.0",
  "CVE_data_meta": {
   "ID": "CVE-2015-2754",
   "ASSIGNER": "cve@mitre.org"
  },
  "affects": {
   "vendor": {
    "vendor_data": [
     {
      "vendor_name": "gaia-gis",
      "product": {
       "product_data": [
        {
         "product_name": "freexl",
         "version": {
          "version_data": [
           {
            "version_value": "1.0.0h"
           }
          ]
         }
        }
       ]
      }
     },
     {
      "vendor_name": "debian",
      "product": {
       "product_data": [
        {
         "product_name": "debian_linux",
         "version": {
          "version_data": [
           {
            "version_value": "7.0"
           }
          ]
         }
        }
       ]
      }
     }
    ]
   }
  },
  "problemtype": {
   "problemtype_data": [
    {
     "description": [
      {
       "lang": "en",
       "value": "CWE-20"
      }
     ]
    }
   ]
  },
  "ref

### We're interested in CVEs related to wireless technology

In [5]:
wireless_terms = {_:re.compile(r'\b{}\b'.format(_)) for _ in [
    'wireless',
    'bluetooth',
    'zigbee',
    'radio',
    'wifi',
    'wimax',
    'access point',
    'cellular',
    'lte',
    'gsm',
    'wap',
    '802.11',
    'GPRS',
    'router'
]}

CVEs_wireless = []
tech_counter = collections.Counter()

for cve in CVEs:
    _ = json.dumps(cve)
    match = False
    for term, r in wireless_terms.items():
        if r.search(_, re.IGNORECASE) != None:
            match = True
            tech_counter[term] += 1
    
    if match:
        CVEs_wireless.append(cve)


In [6]:
print('Wireless relarted vulnerabilities account for {} ({:.02f}%) out of {:,} total CVEs'.format(
    len(CVEs_wireless),
    len(CVEs_wireless)/float(len(CVEs))*100,
    len(CVEs)
))
 
print('\n\nCount of term frequency (multiple terms can appear in the same CVE)')
for term, count in tech_counter.items():
    print('{:>8,}  {}'.format(count, term))

Wireless relarted vulnerabilities account for 544 (0.75%) out of 73,005 total CVEs


Count of term frequency (multiple terms can appear in the same CVE)
       8  wap
     238  router
      41  radio
     133  wireless
      33  802.11
      39  wifi
      39  bluetooth
       2  gsm
       9  GPRS
      32  access point
      13  lte
       1  cellular


### The next section extracts Common Weakness Enumerations - Categories

We use the XML source because the JSON/CSV sources are incomplete (they don't have categories)

In [7]:
from lxml import etree, objectify

def get_xml():
    cwe = get_data('https://cwe.mitre.org/data/xml/views/2000.xml.zip')
    cwe = etree.fromstring(cwe)
    root = cwe.getroottree().getroot()
    for elem in root.getiterator():
        if not hasattr(elem.tag, 'find'): continue  # (1)
        i = elem.tag.find('}')
        if i >= 0:
            elem.tag = elem.tag[i+1:]
    objectify.deannotate(root, cleanup_namespaces=True)
    return cwe

cwe_xml = get_xml()

opening 2000.xml.zip


In [8]:
weaknesses = cwe_xml.xpath('/Weakness_Catalog/Weaknesses/*')
categories = cwe_xml.xpath('/Weakness_Catalog/Categories/*')
# views
# external references
cwe_cats = {}
for c in categories:
    d = {
      "name": None,
      "id": None,
      "parents": [],
      "platforms": None,
      "modes": None,
      "consequence": None
     }
    for k,v in c.items():
        if k.lower() in d:
            if k == 'ID':
                d[k.lower()]=int(v)
            else:
                d[k.lower()]=v
    cwe_cats[d['id']] = d

### We get the rest of the CWEs from the CSV source

In [9]:
CWEs_raw = []

cwe_csv = get_data('https://cwe.mitre.org/data/csv/2000.csv.zip')
for line in csv.DictReader(cwe_csv.decode('utf-8').split('\n')):
    CWEs_raw.append(dict(line))
    
print('{} CWEs read'.format(len(CWEs_raw)))

def parse_with_fm(cwe, field, filters, maps):
    rws = cwe[field].split('::')
    for f in [lambda _:_] + filters:
        rws = filter(f, rws)
    
    for m in maps:
        rws = map(m, rws)
    
    rws = filter(lambda _:_, rws)
    
    return ', '.join(list(set(rws)))

def get_f12(f):
    f = f.split(':')
    if f[2].endswith('PREVALENCE'):
        return f[0].split(' ')[0] +'-' +f[1]
    
    return '-'.join(f[1:3])

def get_modes(f):
    m = f.split(':')
    try:
        return m[m.index('PHASE')+1]
    except:
        return ''

def get_consequences(f):
    return f.split(':')[1]
    
cwe_fields = dict(
    name=lambda cwe:cwe['Name'],
    id=lambda cwe:cwe['ID'],
    parents= 
    lambda cwe: parse_with_fm(cwe, 'Related Weaknesses', filters=[
        lambda _:_.startswith('NATURE:ChildOf')
    ], maps=[
        lambda _:_.split(':')[3]
    ]),
    platforms = lambda cwe: parse_with_fm(cwe, 'Applicable Platforms',filters=[],maps=[get_f12]),
    modes=lambda cwe:parse_with_fm(cwe, 'Modes Of Introduction', filters=[],maps=[get_modes]),
    consequence=lambda cwe:parse_with_fm(cwe, 'Common Consequences',filters=[], maps=[get_consequences])
)
    
def cwe_extract_details(items):
    
    for item in items:
        yield {
            k:fn(item) for k, fn in cwe_fields.items()
        }
    
CWEs_parsed = list(cwe_extract_details(CWEs_raw))

opening 2000.csv.zip
730 CWEs read


### Then join them with the CWE Categories from the XML

In [48]:
CWEs = {}

for cwe in CWEs_parsed + list(cwe_cats.values()):
    cwe = copy.deepcopy(cwe)
    i = int(cwe['id'])
    cwe['children']= []
    cwe['cves'] = 0
    cwe['score'] = 0.0
    CWEs[i] = cwe

for i, cwe in CWEs.items():
    if not cwe['parents']:
        continue
    for p in map(int, cwe['parents'].split(',')):
        CWEs[p]['children'].append(cwe)

### An example from CWEs

In [52]:
print(json.dumps(random.choice(CWEs), indent=1))

{
 "name": "Missing Initialization of Resource",
 "id": "909",
 "parents": "665",
 "platforms": "LANGUAGE CLASS-Language-Independent",
 "modes": "Implementation",
 "consequence": "Confidentiality, Availability",
 "children": [
  {
   "name": "Missing Initialization of a Variable",
   "id": "456",
   "parents": "909",
   "platforms": "LANGUAGE CLASS-Language-Independent",
   "modes": "Implementation",
   "consequence": "Integrity",
   "children": [],
   "cves": 0,
   "score": 0.0
  }
 ],
 "cves": 0,
 "score": 0.0
}


### Lets weight CWEs by the CVEs that list them

For every CVE with one or more CWEs listed, we add the 'Score' to the associated CWE

In [67]:
def get_yr(cve):
    return cve[4:8]

def make_cwe_graph(CWEs_source,CVEs):
    
    CWEs = copy.deepcopy(CWEs_source)
    print(len(CWEs))

    for cve in CVEs:
        year = int(get_yr(cve['cve_id']))
        for cwe in cve['cwe'].split(','):
            for cweid in cwe.split('-'):
                try:
                    cwe_id = int(cweid)
                    CWEs[cwe_id]['cves'] += 1
                    CWEs[cwe_id]['score'] += float(cve['baseScore'])
                except ValueError:
                    pass

    # And calculate the average too        
    zeros = []
    for cwe in CWEs.keys():
        try:
            CWEs[cwe]['average'] = CWEs[cwe]['score']/CWEs[cwe]['cves']
        except ZeroDivisionError:
            zeros.append(cwe)

    # delete CWEs with no associated CVEs!        
    for z in set(zeros):
        if z in CWEs:
            del CWEs[z]
            
    return CWEs

# print(random.choice(CWEs))

CWEs_weighted = make_cwe_graph(CWEs, CVEs_with_CWEs)

989


### We end up with a CWE object like this

In [58]:
print(json.dumps(random.choice(list(CWEs_weighted.values())), indent=1))

{
 "name": "Out-of-bounds Read",
 "id": "125",
 "parents": "119",
 "platforms": "LANGUAGE-C++, LANGUAGE-C",
 "modes": "Implementation",
 "consequence": "Confidentiality",
 "children": [
  {
   "name": "Buffer Over-read",
   "id": "126",
   "parents": "788, 125",
   "platforms": "LANGUAGE-C++, LANGUAGE-C",
   "modes": "Implementation",
   "consequence": "Confidentiality",
   "children": [],
   "cves": 0,
   "score": 0.0
  },
  {
   "name": "Buffer Under-read",
   "id": "127",
   "parents": "125, 786",
   "platforms": "LANGUAGE-C++, LANGUAGE-C",
   "modes": "Implementation",
   "consequence": "Confidentiality",
   "children": [],
   "cves": 0,
   "score": 0.0
  }
 ],
 "cves": 551,
 "score": 2985.2000000000157,
 "average": 5.417785843920174
}


### Top 20 CWEs based on CVE scores

In [66]:
top=20

for i, cwe in enumerate(sorted(CWEs_weighted.values(), key=lambda k:int(k['score']), reverse=True)[:top]):
    print('{:>2} {score:>10,.0f}  Avg: {average:.0f} {name}'.format(i+1, **cwe))
    

 1     72,201  Avg: 8 Improper Restriction of Operations within the Bounds of a Memory Buffer
 2     36,999  Avg: 6 Permissions, Privileges, and Access Controls
 3     32,430  Avg: 4 Improper Neutralization of Input During Web Page Generation ('Cross-site Scripting')
 4     32,319  Avg: 7 Improper Neutralization of Special Elements used in an SQL Command ('SQL Injection')
 5     30,659  Avg: 6 Improper Input Validation
 6     19,678  Avg: 7 Resource Management Errors
 7     19,407  Avg: 4 Information Exposure
 8     12,827  Avg: 8 Improper Control of Generation of Code ('Code Injection')
 9     12,329  Avg: 5 Cryptographic Issues
10     12,200  Avg: 6 Improper Access Control
11     12,072  Avg: 6 Improper Limitation of a Pathname to a Restricted Directory ('Path Traversal')
12      9,933  Avg: 7 Cross-Site Request Forgery (CSRF)
13      8,782  Avg: 7 Numeric Errors
14      7,523  Avg: 7 Improper Authentication
15      4,326  Avg: 6 Credentials Management
16      3,476  Avg: 8 Use After

### And wireless only?

In [71]:
CVEs_wireless_with_CWEs = list(filter(lambda _: _['cwe'], cve_extract_fields(CVEs_wireless)))
CWEs_weighted_wireless = make_cwe_graph(CWEs, CVEs_wireless_with_CWEs)

top=20

for i, cwe in enumerate(sorted(CWEs_weighted_wireless.values(), key=lambda k:int(k['score']), reverse=True)[:top]):
    print('{:>2} {score:>10,.0f}  Avg: {average:.0f} {name}'.format(i+1, **cwe))
    

989
 1        501  Avg: 7 Improper Restriction of Operations within the Bounds of a Memory Buffer
 2        347  Avg: 7 Permissions, Privileges, and Access Controls
 3        282  Avg: 6 Improper Input Validation
 4        245  Avg: 5 Information Exposure
 5        222  Avg: 7 Improper Authentication
 6        202  Avg: 7 Cross-Site Request Forgery (CSRF)
 7        184  Avg: 6 Resource Management Errors
 8        182  Avg: 7 Credentials Management
 9        169  Avg: 6 Cryptographic Issues
10        167  Avg: 4 Improper Neutralization of Input During Web Page Generation ('Cross-site Scripting')
11         96  Avg: 6 Improper Access Control
12         88  Avg: 5 7PK - Security Features
13         75  Avg: 6 Improper Limitation of a Pathname to a Restricted Directory ('Path Traversal')
14         69  Avg: 9 Improper Neutralization of Special Elements used in an OS Command ('OS Command Injection')
15         61  Avg: 6 Numeric Errors
16         56  Avg: 7 Configuration
17         50  Avg: