# MITRE ATT&CK Layer Generator
This script processes Jira and Splunk data to generate MITRE ATT&CK Navigator layers for security technique visualization. It combines security incidents from different data sources and maps them to MITRE ATT&CK techniques.

In [None]:
# TODO: Alter distinct values of Technique Status to Continuous. Apply gradient thresholds
# TODO: Create UI as Splunk dashboard - upload files, select platform names, input layer name, select MITRE version
# TODO: Create mapping of WOW domain to platform list. Choosing a WOW domain prefills the selected platform list
# TODO: Upload generated MITRE .json file to Git repo rather than creating "local" copy (File saved to /content dir)
##############################
# DONE: Account for Splunk rule_name values with missing Issue key fields (eg 1087?)

In [None]:
# CELL 1: Imports and Function Definitions

# Install required packages
!pip install numpy pandas mitreattack-python

from google.colab import drive, files
import pandas as pd
import pprint
from mitreattack.navlayers import Layer
from IPython.display import clear_output
import datetime
import json

clear_output()


def process_jira_data(filename):
  """Processes Jira export data to extract relevant labels and combine them.

  Every column whose name contains 'Labels' is scanned; only cells that look
  like a MITRE technique ID ('T' followed by a digit) are kept. Issues with no
  such label are dropped. If the export contains no technique labels at all,
  an empty DataFrame with the documented columns is returned.

  Args:
    filename: The path to the Jira export CSV file (anything accepted by
      pandas.read_csv).

  Returns:
    A pandas DataFrame with 'Issue key', 'Status', and 'Combined Labels' columns.
    'Combined Labels' holds the technique labels of the row joined by ', '.
  """
  df = pd.read_csv(filename)

  # Nested function to check if a label starts with 'T' followed by a number
  # Used to extract Mitre Techniques from Labels fields in csv
  def starts_with_T_number(label):
    # Non-string cells (e.g. NaN for a missing label) are never technique IDs.
    # Slicing with [1:2] is safe for the one-character label 'T'.
    return isinstance(label, str) and label.startswith('T') and label[1:2].isdigit()

  # Select all columns that start with 'Labels'
  # Jira Export have high number of Labels fields
  # Work on an explicit copy so the cleanup below never writes into a view of df.
  labels_df = df.filter(like='Labels').copy()

  # Blank out (NaN) every cell that is not a technique label
  for col in labels_df.columns:
    is_technique = labels_df[col].map(starts_with_T_number).astype(bool)
    labels_df[col] = labels_df[col].where(is_technique)

  # Drop label columns that ended up completely empty
  labels_df = labels_df.dropna(axis=1, how='all')
  label_cols = list(labels_df.columns)

  # Join Issue key and Status to the surviving label columns.
  # 'Issue key' and 'Status' are always kept, even if one of them is entirely
  # empty, so the returned columns match the docstring.
  df_selected = df[['Issue key', 'Status']].join(labels_df)

  if label_cols:
    # Drop rows that carry no technique label in any label column
    df_selected = df_selected.dropna(subset=label_cols, how='all')
  else:
    # No technique labels anywhere in the export: nothing to report
    df_selected = df_selected.iloc[0:0]

  # Merge all Labels values into one column "Combined Labels", separated by commas
  combined_labels = [
      ', '.join(str(label) for label in row if pd.notna(label))
      for row in df_selected[label_cols].itertuples(index=False, name=None)
  ]

  result = df_selected[['Issue key', 'Status']].copy()
  result['Combined Labels'] = combined_labels

  return result

def process_splunk_data(filename):
  """Processes Splunk export data to extract and filter relevant information.

  Reads the 'jira_story', 'notable_status', 'mitre_tech' and 'rule_name'
  columns. Rows without a 'jira_story' value get a generated placeholder key
  of the form 'SplunkNNNN - <rule_name>' (numbered from 0001 in file order) so
  they are kept rather than dropped; the affected rule names are printed.
  Rows whose 'mitre_tech' value contains no valid technique ID are removed.

  Args:
    filename: The path to the Splunk export CSV file (anything accepted by
      pandas.read_csv).

  Returns:
    A pandas DataFrame with 'Issue key', 'Status', and 'Combined Labels' columns.
    'Combined Labels' holds the valid technique IDs of the row joined by ','.
  """
  df = pd.read_csv(filename)

  # Select desired columns and rename them
  df_selected = df[['jira_story', 'notable_status', 'mitre_tech', 'rule_name']].rename(columns={
      'jira_story': 'Issue key',
      'notable_status': 'Status',
      'mitre_tech': 'Labels'
  })

  # A completely empty 'jira_story' column is read as a float column; cast it
  # so the string placeholder keys below can be stored without a dtype clash.
  df_selected['Issue key'] = df_selected['Issue key'].astype(object)

  # Identify rows with empty 'Issue key'
  empty_issue_mask = df_selected['Issue key'].isnull()

  # Create dummy Issue keys for rows with empty 'Issue key'
  if empty_issue_mask.any():
    # Get rule names for empty Issue keys
    rule_names_no_issue_key = df_selected.loc[empty_issue_mask, 'rule_name'].tolist()
    print(f'\nRule names with empty Issue keys: ({len(rule_names_no_issue_key)})')
    print(rule_names_no_issue_key)

    # Generate unique dummy Issue keys, numbered 0001, 0002, ... in row order
    df_selected.loc[empty_issue_mask, 'Issue key'] = [
        f"Splunk{number:04d} - {rule_name}"
        for number, rule_name in enumerate(rule_names_no_issue_key, 1)
    ]

  # Function to check if a string is a valid MITRE technique ID:
  # a 'T' followed only by digits.
  # NOTE(review): sub-technique IDs such as 'T1059.001' are rejected here,
  # while process_jira_data keeps them - confirm this difference is intended.
  def is_valid_technique(label):
    return label.startswith('T') and label[1:].isdigit()

  # Function to process and clean technique strings
  def clean_techniques(tech_string):
    if pd.isna(tech_string):
      return None

    # Split the string by commas and clean each technique
    techniques = [t.strip() for t in str(tech_string).split(',')]
    # Filter only valid techniques
    valid_techniques = [t for t in techniques if is_valid_technique(t)]

    # Return None if no valid techniques remain
    return ','.join(valid_techniques) if valid_techniques else None

  # Apply the function to the 'Labels' column, keeping only valid labels
  df_selected['Labels'] = df_selected['Labels'].apply(clean_techniques)

  # Drop rows that have no valid technique left
  df_selected = df_selected.dropna(subset=['Labels'])

  # Every row has an Issue key at this point (real or placeholder), so no
  # further empty-key filtering is needed; only the helper column is removed.
  df_selected = df_selected.drop(columns='rule_name')
  df_selected = df_selected.rename(columns={'Labels': 'Combined Labels'})

  return df_selected

def merge_dataframes(jira_df, splunk_df):
  """Stacks the Jira and Splunk frames and collapses rows sharing an Issue key.

  For every distinct 'Issue key', the distinct non-null 'Status' values and
  the distinct non-null 'Combined Labels' values are each joined with ', '.

  Args:
    jira_df: The pandas DataFrame containing Jira data.
    splunk_df: The pandas DataFrame containing Splunk data.

  Returns:
    A pandas DataFrame with one row per 'Issue key', sorted by 'Issue key'.
  """
  def _join_distinct(values):
    # Distinct non-null values of one group, in order of first appearance
    return ', '.join(values.dropna().unique())

  stacked = pd.concat([jira_df, splunk_df])
  per_issue = stacked.groupby('Issue key').agg({
      'Status': _join_distinct,
      'Combined Labels': _join_distinct,
  })
  combined = per_issue.reset_index()

  return combined.sort_values('Issue key')

def generate_technique_list(final_df):
  """Generates a list of techniques with their scores and associated issue keys.

  Each technique found in 'Combined Labels' is mapped to the issue keys that
  reference it. Placeholder keys starting with 'Splunk' that share the same
  rule name are collapsed to the first one (in sorted order). The technique
  score is the highest score among its remaining issue keys, where a key's
  score is the highest mapped value among its comma-separated statuses
  (unknown or missing statuses score 0).

  Args:
    final_df: The merged pandas DataFrame with 'Issue key', 'Status' and
      'Combined Labels' columns.

  Returns:
    A list of dictionaries, where each dictionary represents a technique with
    the keys 'techniqueID', 'score', 'enabled' and 'comment' (the issue keys
    joined by ', ').
  """
  # Status-to-score mapping
  status_score_map = {
      # Jira Status'
      'Backlog': 1,
      'To Do': 1,
      'In Progress': 1,
      'Pending': 1,
      'Blocked': 1,
      'Done': 2,

      # Splunk Status'
      'senttophantom': 1,
      'new': 2,
      'development': 2,
      'soar_to_triage': 1,
      'customer_action': 1,
      'closed': 2
  }

  # Function to get the maximum score based on status
  def get_max_score(status_str):
    # A missing status is read as NaN (a float), which cannot be split
    if not isinstance(status_str, str):
      return 0
    return max(status_score_map.get(status.strip(), 0) for status in status_str.split(','))

  # Function to extract rule name from Splunk issue key
  def get_rule_name(issue_key):
    if isinstance(issue_key, str) and issue_key.startswith('Splunk'):
      # Extract everything after "SplunkXXXX- "
      return issue_key[issue_key.find('-')+2:]
    return issue_key

  # One pass over the frame: remember the score of each issue key (first row
  # wins when a key repeats) and collect the issue keys per technique.
  score_by_key = {}
  tech_issue_map = {}

  rows = zip(final_df['Issue key'], final_df['Status'], final_df['Combined Labels'])
  for issue_key, status, labels in rows:
    score_by_key.setdefault(issue_key, get_max_score(status))

    if not isinstance(labels, str):
      continue

    # Split and strip techniques, de-duplicate, and sort so the order of the
    # resulting list does not depend on set iteration order.
    for technique in sorted({t.strip() for t in labels.split(',')}):
      if technique:  # skip blank fragments such as a trailing comma
        tech_issue_map.setdefault(technique, set()).add(issue_key)

  # Create the final list of dictionaries with aggregated issue keys
  tech_list_to_dic = []
  for technique, issue_keys in tech_issue_map.items():
    # Handle duplicate Splunk rule names
    seen_rules = set()
    deduplicated_keys = []

    for key in sorted(issue_keys):
      if key.startswith('Splunk'):
        rule_name = get_rule_name(key)
        if rule_name not in seen_rules:
          seen_rules.add(rule_name)
          deduplicated_keys.append(key)
      else:
        # Keep non-Splunk keys as they are
        deduplicated_keys.append(key)

    max_score = max(score_by_key[key] for key in deduplicated_keys)

    tech_list_to_dic.append({
        'techniqueID': technique,
        'score': max_score,
        'enabled': True,
        'comment': ', '.join(sorted(deduplicated_keys))
    })

  return tech_list_to_dic

def create_mitre_layer(tech_list_to_dic, layer_name, platform_list):
  """Builds a MITRE ATT&CK Navigator layer and writes it to a file.

  The layer is created for the "enterprise-attack" domain, configured with
  fixed version, layout and gradient settings, filtered to the given
  platforms, populated with the given techniques, and saved via
  Layer.to_file using `layer_name` as the output path.

  Args:
    tech_list_to_dic: The list of technique dictionaries.
    layer_name: The desired name for the layer; also used as the output path.
    platform_list: A list of platforms to include in the layer.
  """
  # Start from an empty enterprise-attack layer carrying the requested name
  new_layer = Layer()
  new_layer.from_dict({"name": layer_name, "domain": "enterprise-attack"})

  # Layer settings, applied in this order
  layer_settings = [
      # Versions object
      ("versions", {"layer": "4.5", "attack": "16", "navigator": "5.0.1"}),
      # Platform filter
      ("filters", {"platforms": platform_list}),
      # Layout object
      ("layout", {
          "layout": "side",
          "showID": False,
          "showName": True,
          "showAggregateScores": False,
          "countUnscored": False,
          "aggregateFunction": "average",
      }),
      # Whether or not to hide disabled techniques
      ("hideDisabled", False),
      # Score gradient: scores 0..2 mapped across three colours
      ("gradient", {
          "minValue": 0,
          "maxValue": 2,
          "colors": ["#ff6666ff", "#ffe766ff", "#8ec843ff"],
      }),
      # Collection layer settings
      ("description", ""),
      ("selectTechniquesAcrossTactics", True),
      ("selectSubtechniquesWithParent", False),
      ("tacticRowBackground", "#dddddd"),
      # Listing of techniques in this layer
      ("techniques", tech_list_to_dic),
  ]

  for attribute, value in layer_settings:
    setattr(new_layer.layer, attribute, value)

  # Output file path is `layer_name` exactly as passed in
  new_layer.to_file(layer_name)

In [None]:
# CELL 2: User Input and Data Processing
# Collects the run parameters interactively (data sources, platforms, layer
# name) and builds `final_df` from the selected CSV export(s).

# Get user input for file types (surrounding whitespace is ignored)
file_types = input("Which files do you want to upload? (splunk, jira, both): ").strip().lower()

# Define the platform list
all_platforms = ["Windows", "Linux", "macOS", "Network", "PRE", "Containers", "IaaS", "SaaS", "Office Suite", "Identity Provider"]

# Get user input for platforms
print("Available platforms:")
# Display platforms in a numbered table for user's reference
platform_data = [[i+1, platform] for i, platform in enumerate(all_platforms)]
print(pd.DataFrame(platform_data, columns=["Number", "Platform"]).to_markdown(index=False, numalign="left", stralign="left"))

# User to enter in corresponding number for selected platforms
platform_input = input("Enter the numbers of the platforms to include (comma-separated, or leave blank for all): ")

if platform_input.strip():
  try:
    platform_indices = [int(x.strip()) - 1 for x in platform_input.split(",")]
    # Reject 0 and negative numbers explicitly: as list indices they would
    # silently select platforms counted from the end of the list.
    if any(i < 0 or i >= len(all_platforms) for i in platform_indices):
      raise IndexError("platform number out of range")
    # dict.fromkeys drops repeated numbers while keeping the entered order
    platform_list = [all_platforms[i] for i in dict.fromkeys(platform_indices)]
  except (ValueError, IndexError):
    print("Invalid platform input. Using all platforms.")
    platform_list = all_platforms
else:
  platform_list = all_platforms

# Get layer name from user
layer_name = input("Enter the desired layer name: ")
# Add date to layer name
today = datetime.date.today()
layer_name = f"{today.strftime('%y-%m-%d')}_{layer_name}.json"

# TEST FILES USED. CHANGE TO DESIRED FILENAME SCHEME
jira_filename = '/content/jira_mitre_csv.csv'
splunk_filename = '/content/splunk_export_csv.csv'

# Process data based on user input
if file_types == "splunk":
  splunk_df = process_splunk_data(splunk_filename)
  final_df = splunk_df  # No merging needed
elif file_types == "jira":
  jira_df = process_jira_data(jira_filename)
  final_df = jira_df  # No merging needed
elif file_types == "both":
  jira_df = process_jira_data(jira_filename)
  splunk_df = process_splunk_data(splunk_filename)
  final_df = merge_dataframes(jira_df, splunk_df)
else:
  print("Invalid file type input.")
  # exit() is a helper meant for the interactive shell; raise SystemExit
  # directly to stop this cell with a non-zero status.
  raise SystemExit(1)


Which files do you want to upload? (splunk, jira, both): both
Available platforms:
| Number   | Platform          |
|:---------|:------------------|
| 1        | Windows           |
| 2        | Linux             |
| 3        | macOS             |
| 4        | Network           |
| 5        | PRE               |
| 6        | Containers        |
| 7        | IaaS              |
| 8        | SaaS              |
| 9        | Office Suite      |
| 10       | Identity Provider |
Enter the numbers of the platforms to include (comma-separated, or leave blank for all): 
Enter the desired layer name: test_1


  df = pd.read_csv(filename)



Rule names with empty Issue keys: (249)

Rule names with empty Issue keys: (0)
[]
