# **Detection Rule Analyzer for Google Chronicle SIEM**
To get useful information from detection rules, they must be up to date and well documented. This notebook connects to Chronicle SIEM through its API, retrieves the detection rules, and analyzes them to check their documentation.

Detection rules in Chronicle SIEM are written in [YARA-L](https://cloud.google.com/chronicle/docs/detection/yara-l-2-0-overview) 🔗 format, and the `meta` section holds their documentation fields. For the purpose of this notebook, the following rules apply to those fields (all case-sensitive):

- `author`: String up to 35 characters.
- `description`: String up to 140 characters.
- `reference`: URL (string).
- `response`: URL (string).
- `priority`: String in [`Low`, `Medium`, `High`].
- `severity`: String in [`Low`, `Medium`, `High`].
- `detection_score`: String in [`Basic`, `Fair`, `Good`, `Very Good`, `Excellent`].
- `mitre_technique`: MITRE Techniques list:
  - `T8888`
  - `T8888.888`
  - `T8888,T8889,T8890`
  - `T8888,T8888.888,T8889`

Additionally, this notebook tracks the name of each rule, which is expected to be made of lowercase characters, written in snake_case format, and up to 70 characters long.

---
## **Part 1. Rules Retrieval**
The first step is to log into Chronicle SIEM's API and get the latest detection rules created there. The routines in this section will handle that.

In [None]:
#@title Step 1a. Keyfile Upload and Client Initialization
#@markdown Upload a valid keyfile for Chronicle SIEM to allow the login.
#@markdown If it's OK, a new session will be raised and attached to a client.

from os import rename
from json import JSONDecodeError

from google.colab import files
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
from googleapiclient import _auth

# Fixed local name the uploaded keyfile is renamed to, so the rest of the
# notebook can refer to it regardless of the original file name.
KEYFILE = 'chronicle.key'

print('Select and upload the Chronicle\'s keyfile')
uploaded = files.upload()
if not uploaded:
  # Nothing came back from the upload widget (e.g. the dialog was cancelled);
  # fail with a readable message instead of an IndexError on the lookup below.
  raise Exception('游댮 No keyfile uploaded')
# The first key of the upload result is used as the name of the file to rename.
rename(next(iter(uploaded)), KEYFILE)
print('游릭 Keyfile uploaded successfuly')

def init_webclient(keyfile, region="North America"):
  """Build authenticated web clients from a service-account keyfile.

  Args:
    keyfile: Path of the service-account key file to load.
    region: Display name of the region; must be one of the keys of the
      ``REGIONS`` table below.

  Returns:
    A tuple ``(http_client, session, region_prefix, cbn_region, cli_region)``:
    the authorized HTTP client, an ``AuthorizedSession`` built from the same
    credentials, and the three region settings looked up for ``region``.

  Raises:
    ValueError: If ``region`` is not a known region name.
    Exception: If the keyfile is not valid JSON or is not a valid key.
  """
  SCOPES = [
      'https://www.googleapis.com/auth/chronicle-backstory',  # regular backstory API
      'https://www.googleapis.com/auth/malachite-ingestion',  # ingestion API
      'https://www.googleapis.com/auth/cloud-platform'        # dataplane API (experimenting)
  ]

  # region name -> (region_prefix, cbn_region, cli_region)
  REGIONS = {
      'North America': ('', 'US', 'US'),
      'Europe': ('europe-', 'EUROPE', 'EUROPE'),
      'United Kingdom': ('europe-west2-', 'EUROPE', 'EUROPE-WEST2'),
      'Asia (Singapore)': ('asia-southeast1-', 'ASIA', 'ASIA-SOUTHEAST1'),
      'Australia (Sydney)': ('australia-southeast1-', 'AUSTRALIA', 'AUSTRALIA-SOUTHEAST1'),
      # NOTE(review): cbn_region 'AUSTRALIA' is kept as it was; it looks like a
      # copy-paste from the Sydney entry -- confirm the right value for Tel Aviv.
      'Tel Aviv': ('me-west1-', 'AUSTRALIA', 'ME-WEST1'),
  }

  # Validate the region first: an unknown name used to surface only at the
  # return statement as an UnboundLocalError, after the credentials were loaded.
  try:
    region_prefix, cbn_region, cli_region = REGIONS[region]
  except KeyError:
    raise ValueError(
        f'Unsupported region {region!r}; expected one of: {", ".join(REGIONS)}'
    ) from None

  try:
    credentials = service_account.Credentials.from_service_account_file(keyfile, scopes=SCOPES)
  except JSONDecodeError as err:
    # JSONDecodeError is a ValueError subclass, so it must be handled first.
    raise Exception('游댮 File not in JSON format') from err
  except ValueError as err:
    raise Exception('游댮 Invalid key') from err

  http_client = _auth.authorized_http(credentials)
  session = AuthorizedSession(credentials)
  return (http_client, session, region_prefix, cbn_region, cli_region)

# Log in once with the uploaded keyfile (default region) and keep the clients
# and region settings as notebook-level names for the following cells.
(
  http_client,
  session,
  region_prefix,
  cbn_region,
  cli_region,
) = init_webclient(keyfile=KEYFILE)
print('游릭 Webclient initialized')

In [None]:
#@title Step 1b. Rule Load and Sanity Check
#@markdown This routine will effectively connect to Chronicle SIEM, get the Detection Rules, and load them into a Pandas dataframe. 游냪游냪
#@markdown It'll also run sanity checks to find missing fields and fields out of the norm.

from urllib.parse import urlencode, urlparse
from http import HTTPStatus
from json import loads
from base64 import b64encode, b64decode
from re import compile

import pandas as pd


# Length limits, in characters. XS_STRING caps the author and M_STRING caps the
# description; S_STRING and L_STRING are not referenced by the validators below.
XS_STRING, S_STRING, M_STRING, L_STRING = 35, 70, 140, 240

# Accepted values, compared after lower-casing the input.
SCORES_1 = [  # priority and severity
  'low',
  'medium',
  'high',
]
SCORES_2 = [  # detection score
  'basic',
  'fair',
  'good',
  'very good',
  'excellent',
]

# Rule name: 3 to 70 lowercase letters, digits or underscores.
RE_NAME = compile(r'^[a-z0-9_]{3,70}$')
# Comma-separated technique ids (Txxxx or Txxxx.xxx); whitespace may follow a comma.
RE_TECHNIQUES = compile(r'^T\d{4}(\.\d{3})?(,\s*T\d{4}(\.\d{3})?)*$')


def request(http_client, region_prefix, page_size=2000, page_token=''):
  """Fetch one page of detection rules from the rules endpoint.

  Args:
    http_client: Client whose ``request(uri, 'GET')`` returns a pair
      ``(response, body)`` where ``response.status`` is the HTTP status.
    region_prefix: Prefix placed before ``backstory.googleapis.com``.
    page_size: Value sent as the ``page_size`` query parameter.
    page_token: Value sent as ``page_token``; omitted when empty.

  Returns:
    The decoded JSON body when the status is 200. For any other status, a
    string describing the error: the ``error.message`` field of the body when
    present, otherwise a generic message carrying the HTTP status.
  """
  url_params = {'page_size': page_size}
  if page_token:
    url_params['page_token'] = page_token
  uri = f'https://{region_prefix}backstory.googleapis.com/v2/detect/rules?{urlencode(url_params)}'
  res = http_client.request(uri, 'GET')
  if res[0].status == HTTPStatus.OK:
    return loads(res[1])

  # Error path: the body may be non-JSON (e.g. a proxy error page) or may lack
  # the 'error' object, so every step is checked instead of chaining .get().
  try:
    payload = loads(res[1])
  except ValueError:  # covers JSONDecodeError and undecodable bytes
    payload = None
  message = None
  if isinstance(payload, dict):
    error = payload.get('error')
    if isinstance(error, dict):
      message = error.get('message')
  if message:
    return message
  return f'HTTP {res[0].status} error with no message in the response body'

def check_name(s):
  """Return ``s`` when it matches ``RE_NAME``, otherwise ``None``."""
  return s if RE_NAME.search(s) else None

def check_alerting_live(s):
  """Collapse the flag to a strict boolean: ``True`` when truthy, else ``False``."""
  return bool(s)

def check_author(s):
  """Return ``s`` when it is at most ``XS_STRING`` characters long, else ``None``.

  A value without a length (``None`` or NaN for a rule with no author) also
  yields ``None``, matching how ``check_description`` treats missing values.
  """
  try:
    if len(s) <= XS_STRING:
      return s
    else:
      return None
  except TypeError:
    # len() is undefined for None/NaN: a missing author counts as invalid.
    return None

def check_url(s):
  """Return ``s`` when it parses with both a scheme and a network location.

  Anything else, including input that makes ``urlparse`` raise ``TypeError``,
  yields ``None``.
  """
  try:
    parts = urlparse(s)
  except TypeError:
    return None
  if parts.scheme and parts.netloc:
    return s
  return None

def check_priority_severity(s):
  """Return the lower-cased value when it is listed in ``SCORES_1``, else ``None``.

  Values without a ``lower()`` method (e.g. ``None``) also yield ``None``.
  """
  try:
    lowered = s.lower()
  except AttributeError:
    return None
  return lowered if lowered in SCORES_1 else None

def check_techniques(s):
  """Split a technique string that matches ``RE_TECHNIQUES`` into a list of ids.

  Spaces are removed before splitting on commas. Returns ``None`` when the
  value does not match or when the regex search raises ``TypeError``.
  """
  try:
    matched = RE_TECHNIQUES.search(s)
  except TypeError:
    return None
  if not matched:
    return None
  return s.replace(' ','').split(',')

def check_detection_score(s):
  """Return the lower-cased value when it is listed in ``SCORES_2``, else ``None``.

  Values without a ``lower()`` method (e.g. ``None``) also yield ``None``.
  """
  try:
    lowered = s.lower()
  except AttributeError:
    return None
  return lowered if lowered in SCORES_2 else None

def check_description(s):
  """Return ``s`` when it is at most ``M_STRING`` characters long, else ``None``.

  Values that have no length (``len`` raising ``TypeError``) also yield ``None``.
  """
  try:
    length = len(s)
  except TypeError:
    return None
  return s if length <= M_STRING else None


# Column names used for the rules dataframe. The comments on the loading loop's
# source keys give the API counterpart of each one:
#   id=ruleId, version=versionId, name=ruleName, content=ruleText,
#   alerting=alertingEnabled, live=liveRuleEnabled,
#   version_date=versionCreateTime, compilation=compilationState,
#   type=ruleType, meta.<x>=metadata.<x>
fields = [
  'id', 'version', 'name', 'content',
  'alerting', 'live',
  'version_date', 'compilation', 'type',
  'meta.author', 'meta.priority', 'meta.severity',
  'meta.mitre_technique', 'meta.detection_score',
  'meta.description', 'meta.reference', 'meta.response',
]

# Fields checked for missing or invalid values: the rule name plus every
# documentation ("meta.") field, in the order they appear in `fields`.
manual_fields = ['name'] + [column for column in fields if column.startswith('meta.')]

# Retrieve the rules page by page. request() hands back a dict on success and
# an error message (not a dict) on failure, so a failed call is reported here
# instead of crashing later with an opaque TypeError.
# NOTE(review): paging relies on the response field 'nextPageToken' -- confirm
# against the API reference; when the field is absent only one page is read,
# which is what the previous single call did.
in_rules = {'rules': []}
page_token = ''
while True:
  page = request(http_client, region_prefix, page_token=page_token)
  if not isinstance(page, dict):
    raise Exception(f'游댮 Could not retrieve rules from Chronicle: {page}')
  in_rules['rules'].extend(page.get('rules', []))
  page_token = page.get('nextPageToken', '')
  if not page_token:
    break
print(f'游릭 {len(in_rules["rules"])} rules retrieved from Chronicle')

out_rules = list()
errors = pd.DataFrame(columns=['id','name','error'])

# Documentation fields copied from rule['metadata'] into 'meta.<key>' columns.
META_KEYS = ['author', 'priority', 'severity', 'mitre_technique',
             'detection_score', 'description', 'reference', 'response']

for rule in in_rules['rules']:
  data = {
    'id': rule['ruleId'],
    'name': rule['ruleName'],
    'version': rule['versionId'],
    # stored base64-encoded so the rule text stays a single opaque cell
    'content': b64encode(rule['ruleText'].encode()).decode(),
    'version_date': rule['versionCreateTime'],
    'compilation': rule['compilationState'],
    'type': rule['ruleType']
  }

  # Optional flags: None when the API omits them.
  data['alerting'] = rule.get('alertingEnabled')
  data['live'] = rule.get('liveRuleEnabled')

  # Every documentation field gets its 'meta.' column, None when missing.
  # (The author fallback used to be written to 'author' by mistake, leaving
  # 'meta.author' unset for rules without an author.)
  metadata = rule.get('metadata', {})
  for key in META_KEYS:
    data[f'meta.{key}'] = metadata.get(key)

  out_rules.append(data)

rules = pd.DataFrame(out_rules)
rules.insert(len(rules.columns)-1, 'content', rules.pop('content'))  # content as last column

# Keep the names as received: check_name() replaces invalid ones with None, and
# the errors table needs the real name to point at the offending rule.
original_names = rules['name'].copy()

# Column -> normaliser, applied value by value in this order. Each check_*
# function returns None for a value that is missing or out of the norm.
validators = {
  'name': check_name,
  'version_date': pd.to_datetime,
  'alerting': check_alerting_live,
  'live': check_alerting_live,
  'meta.author': check_author,
  'meta.priority': check_priority_severity,
  'meta.severity': check_priority_severity,
  'meta.mitre_technique': check_techniques,
  'meta.detection_score': check_detection_score,
  'meta.description': check_description,
  'meta.reference': check_url,
  'meta.response': check_url,
}
for column, validator in validators.items():
  rules[column] = rules[column].apply(validator)

# catching up errors: one row per (rule, field) whose value ended up null.
# .assign() builds new frames, so no column is set on a filtered slice
# (the previous form triggered SettingWithCopyWarning).
flagged = []
for field in manual_fields:
  is_null = rules[field].isnull()
  flagged.append(
    rules.loc[is_null, ['id']].assign(
      name=original_names[is_null],
      error=f'{field} not found or out of the norm',
    )
  )
errors = pd.concat([errors, *flagged])
errors = errors.sort_values(by=['name']).drop_duplicates().reset_index(drop=True)

print(f'游릭 rules loaded into 游냪游냪游냪 dataframe ({rules.shape[0]} rows x {rules.shape[1]} columns)')
print(f'游댮 {errors.shape[0]} syntax errors found')


In [None]:
#@title Step 1c. [Optional] Dataframe Troubleshoot
#@markdown This subsection is meant only for troubleshooting reasons. If you wanna see the dataframe resulting from the Chronicle import, run it. If not, you can skip it.

#@markdown If you run it with `show_rules == True`, then you'll see the rules' dataframe, if it is `False`, you'll see the errors' dataframe.

# Lift the pandas display limits so the whole frame is rendered untruncated.
for option, value in (
  ('display.max_columns', None),
  ('display.max_rows', None),
  ('display.max_colwidth', None),
  ('display.expand_frame_repr', False),
):
  pd.set_option(option, value)

show_rules = False #@param['True','False']{type:"raw"}

# True shows the rules dataframe, False shows the errors dataframe.
display(rules if show_rules else errors)

In [None]:
#@title Step 1d. [Optional] Rule Visualization
#@markdown Decodes a rule from the data frame (column `content`) so you can see its code.

encoded_rule = ''  #@param{type:"string"}
# Base64 text in, raw bytes out; the bytes are then read as UTF-8 for printing.
decoded_rule = b64decode(encoded_rule)
rule_text = decoded_rule.decode('utf-8')
print(rule_text)

---
## **Part 2. Rules Analysis**
With all rules in the dataframe, we will analyze if their fields are following the standards.

In [None]:
#@title Step 2a. Chart Setup
#@markdown This snippet imports the libraries needed to deal with charts and sets up the style.

import matplotlib.pylab as plt

# Chart style picked in the Colab form and applied to every chart drawn later.
# full list in matplotlib.style.available
style = 'grayscale'  #@param['Solarize_Light2', '_classic_test_patch', '_mpl-gallery', '_mpl-gallery-nogrid', 'bmh', 'classic', 'dark_background', 'fast', 'fivethirtyeight', 'ggplot', 'grayscale', 'seaborn-v0_8', 'seaborn-v0_8-bright', 'seaborn-v0_8-colorblind', 'seaborn-v0_8-dark', 'seaborn-v0_8-dark-palette', 'seaborn-v0_8-darkgrid', 'seaborn-v0_8-deep', 'seaborn-v0_8-muted', 'seaborn-v0_8-notebook', 'seaborn-v0_8-paper', 'seaborn-v0_8-pastel', 'seaborn-v0_8-poster', 'seaborn-v0_8-talk', 'seaborn-v0_8-ticks', 'seaborn-v0_8-white', 'seaborn-v0_8-whitegrid', 'tableau-colorblind10']
plt.style.use(style)

print(f'游릭 Setup done')

In [None]:
#@title Step 2b. Documentation Analysis
#@markdown Some insights on documentation problems.

from IPython.display import display, Markdown

# Headline counters for the summary rendered below.
is_alerting = rules['alerting'] == True
is_live = rules['live'] == True

total_rules = len(rules)
alerting_rules = len(rules[is_alerting])
live_rules = len(rules[is_live])
# Count distinct rule ids: rules with an invalid name have name == None in the
# dataframe, so counting distinct names would merge all of them into one.
poorly_documented_rules = errors['id'].nunique()
alerting_not_live = len(rules[is_alerting & (rules['live'] == False)])

display(Markdown(f'''
# 游댊 {total_rules} rules in total
# 游뚿 {alerting_rules} rules alerting
# 游댙 {live_rules} live rules
# 游뛀 {poorly_documented_rules} rules with documentation issues
# 游댒 {alerting_not_live} rules alerting and not live
'''))

In [None]:
#@title Step 2c. Priority, Severity, and Detection Score Distribution

# One horizontal bar chart per field, stacked in a 3-row figure:
# (subplot position, dataframe column, y-axis label)
panels = (
  (311, 'meta.priority', 'Priority'),
  (312, 'meta.severity', 'Severity'),
  (313, 'meta.detection_score', 'Detection Score'),
)

fig = plt.figure()
axes = [fig.add_subplot(position) for position, _, _ in panels]

for axis, (_, column, label) in zip(axes, panels):
  rules[column].value_counts().plot(kind='barh', ax=axis)
  axis.set_ylabel(label)

# Only the bottom chart carries the x-axis label.
axes[-1].set_xlabel('Count')

fig.tight_layout()
plt.show()

In [None]:
#@title Step 2d. Word Cloud
# Colormap options follow the Matplotlib ones: https://matplotlib.org/stable/users/explain/colors/colormaps.html

from wordcloud import WordCloud, STOPWORDS
from PIL import Image
import numpy as np


# Disabled: download of an image to be used as the word-cloud mask.
# import urllib.request
# urllib.request.urlretrieve(
#   'https://domain/path/to/image',
#   'nu-cloud.png')


# SHAPE = 'wc-shape.png'
# Named background colors as hex RGBA strings, without the leading '#'
# (the '#' is added where the color is used).
nu_palette = {
    'The Purple': '820AD1FF',
    'The Purple Sur Ton': 'AA68FFFF',
    'Grey': 'E4E4E4FF',
    'Off-White': 'F4F4F4FF',
    'White': 'FFFFFFFF',
    'Black': '000000FF'
}
# Colab form parameters: dataframe column feeding the cloud, background color
# (a key of nu_palette), colormap, word limit, contour width and figure DPI.
wc_source = 'meta.description'  #@param['meta.description','meta.mitre_technique']
# wc_mask = False                     #@param['True','False']{type:'raw'}
wc_bg_color = 'Black'               #@param['The Purple','The Purple Sur Ton','Grey','Off-White','White','Black']
wc_colormap = 'rainbow'             #@param['Purples','viridis','binary','cool','PRGn','Paired','tab20b','tab20c','rainbow']
wc_max_words = 200                  #@param{type:"slider",min:20,max:200,step:10}
wc_contour_width = 0                #@param{type:"slider",min:0,max:10,step:1}
wc_dpi = 100                        #@param{type:"slider",min:100,max:300,step:100}

# Disabled: mask loading, tied to the wc_mask switch above.
# if wc_mask:
#   wc_mask_shape = np.array(Image.open(SHAPE))
# else:
#   wc_mask_shape = None
# Extra words left out of the cloud, on top of the wordcloud STOPWORDS set.
wc_stopwords = ['rule', 'rules', 'based', 'detected', 'many', 'someone']

# Word-cloud generator configured from the form parameters.
wc = WordCloud(
    width=1920,
    height=1080,
    stopwords=wc_stopwords+list(STOPWORDS),
    collocations=True,
    max_words=wc_max_words,
    background_color=f'#{nu_palette[wc_bg_color]}',
    colormap=wc_colormap,
    # mask=wc_mask_shape,
    contour_width=wc_contour_width,
    contour_color='#AA68FFFF'
    )

# Only rules that have a value for the selected column contribute.
wc_values = rules[wc_source].dropna()
if wc_source == 'meta.mitre_technique':
  # Each value is a list of technique ids: flatten lazily. Series.sum() would
  # concatenate the lists pairwise (quadratic) and return the int 0 for an
  # empty selection, which ' '.join() cannot handle.
  wc_words = (tid for tids in wc_values for tid in tids)
else:
  wc_words = wc_values.str.lower()
wc.generate_from_text(' '.join(wc_words))

plt.figure(figsize=(16,9), dpi=wc_dpi)
plt.axis('off')
plt.imshow(wc, interpolation='bilinear')
plt.show()

---
## **Part 3. Integrations**
Routines to export the data to other tools.

In [None]:
#@title Step 3a. MITRE Navigator Heatmap
#@markdown Build and export a [MITRE Navigator](https://mitre-attack.github.io/attack-navigator/) layer based on the loaded rules.
#
#@markdown MITRE Navigator is a great tool to help you compare different scenarios expressed in layers.
#@markdown One good example is to overlay the SIEM layer with another layer from CTI or related to any APT.
#@markdown Having the two layers loaded in Navigator as SIEM (`a`) and the other (`b`), you can create a third layer based on these ones and use an expression like `min(5 - (b - a), 5)` to combine scores from both layers in this new one.
#@markdown This overlay provides insightful ideas on strengths and weaknesses, helping to prioritize actions.

# TODO:
# The user should be able to select if he wants solid colors
# (technique.color:hexcolor) or if he wants the color to be
# an extra dimension like severity, priority, or detection_score.


from copy import deepcopy
from json import dump


# File name the Navigator layer is written to and then downloaded from.
LAYER = 'chronicle-siem-map.json'  #@param{type:'string'}

# Version strings copied into the layer's 'versions' section.
mitre_attack_version = '14'           #@param{type:'string'}
mitre_navigator_version = '4.9.1'     #@param{type:'string'}
mitre_navigator_layer_format = '4.5'  #@param{type:'string'}

# Layer name and description. `source` is not referenced by the layer template
# in this cell.
name = 'SIEM'               #@param{type:'string'}
source = 'Chronicle SIEM'   #@param{type:'string'}
description = 'SIEM Rules'  #@param{type:'string'}

# Tactic row background: on/off switch and hex RGBA color (without '#').
tactic_background = True      #@param['True','False']{type:'raw'}
tactic_hexcolor = '820AD1FF'  #@param{type:'string'}

# Per-technique settings copied into every technique entry.
technique_show_sub = False                       #@param['True','False']{type:'raw'}
technique_comment = 'Chronicle SIEM assessment'  #@param{type:'string'}

# static coloring (background color)
# Only referenced by the commented-out 'color' entry of the technique template.
technique_hexcolor = '008744FF'  #@param{type:'string'}

# dynamic coloring (scores)
# Three gradient colors plus the minimum and maximum values of the gradient.
gradient_color_1 = 'D62D20FF'  #@param{type:'string'}
gradient_color_2 = 'FFA700FF'  #@param{type:'string'}
gradient_color_3 = '008744FF'  #@param{type:'string'}
gradient_min_value = 1         #@param{type:'integer'}
gradient_max_value = 5         #@param{type:'integer'}

# Layer skeleton built from the form parameters; 'techniques' starts empty and
# receives one entry per covered technique.
layer = dict(
  name=name,
  versions=dict(
    attack=mitre_attack_version,
    navigator=mitre_navigator_version,
    layer=mitre_navigator_layer_format,
  ),
  domain='enterprise-attack',
  description=description,
  showTacticRowBackground=tactic_background,
  tacticRowBackground=f'#{tactic_hexcolor}',
  gradient=dict(
    colors=[
      f'#{hexcolor}'
      for hexcolor in (gradient_color_1, gradient_color_2, gradient_color_3)
    ],
    minValue=gradient_min_value,
    maxValue=gradient_max_value,
  ),
  techniques=[],
)

# Blueprint copied for every technique entry. No fixed 'color' is set
# (technique_hexcolor stays unused); the default 'score' of 1 is a placeholder
# that is overwritten per technique.
template_technique = dict(
  techniqueID='',
  score=1,
  comment=technique_comment,
  metadata=[],
  links=[],
  enabled=True,
  showSubtechniques=technique_show_sub,
)

# Detection score label -> numeric score (1 = weakest, 5 = strongest).
mapping = dict(zip(
  ('basic', 'fair', 'good', 'very good', 'excellent'),
  range(1, 6),
))

def score_to_int(s):
  """Return the numeric score for label ``s``, or 0 for an unknown label."""
  return mapping.get(s, 0)

# filter rules with mitre.attack data, live, and alerting
# .copy() makes attack_rulez an independent frame, so adding the score column
# below does not set values on a slice of `rules` (SettingWithCopyWarning).
has_techniques = rules['meta.mitre_technique'].notnull()
is_active = (rules['live'] == True) & (rules['alerting'] == True)
attack_rulez = rules[has_techniques & is_active].copy()
attack_rulez['meta.detection_score_int'] = attack_rulez['meta.detection_score'].apply(score_to_int)
covered_techniques = sorted({tid for tids in attack_rulez['meta.mitre_technique'] for tid in tids})
techniques = list()

for t in covered_techniques:
  # Rules that list technique t. Named `matching` rather than `filter` so the
  # builtin filter() is not shadowed for the rest of the notebook.
  matching = attack_rulez[attack_rulez['meta.mitre_technique'].apply(lambda x: t in x)]
  scores = matching['meta.detection_score_int'].to_list()
  technique = deepcopy(template_technique)
  technique['techniqueID'] = t
  # Floor of the average score; rules without a valid detection score count as 0.
  technique['score'] = sum(scores) // len(scores)
  # A rule whose name failed validation has name None: fall back to its id so
  # the exported value is never null.
  technique['metadata'] = [
    {'name':'rule','value': rule_name if rule_name else rule_id}
    for rule_id, rule_name in zip(matching['id'].to_list(), matching['name'].to_list())
  ]
  # Rules without a valid reference are skipped instead of exporting a null url.
  technique['links'] = [{'label':'reference','url':x} for x in matching['meta.reference'].to_list() if x]
  techniques.append(technique)

layer['techniques'] = techniques

# Serialise the layer to the LAYER file as JSON, then hand it to the browser.
with open(LAYER, mode='w') as layer_file:
  dump(obj=layer, fp=layer_file)

files.download(LAYER)

print('游릭 Layer is ready to roll')

In [None]:
#@title Step 3b. Export Rules in CSV
OUT_FILE = 'chronicle-rules.csv' #@param{type:"string"}
# Write the rules dataframe without its index column, announce the export
# (the count comes from the list of loaded rules), then trigger the download.
rules.to_csv(path_or_buf=OUT_FILE, index=False)
print(f'游릭 {len(out_rules)} rules exported in {OUT_FILE} - the download should start soon...')
files.download(OUT_FILE)