Collecting en_core_web_sm==2.3.1
Using cached en_core_web_sm-2.3.1-py3-none-any.whl
Requirement already satisfied: spacy<2.4.0,>=2.3.0 in c:\users\mail\anaconda3\lib\site-packages (from en_core_web_sm==2.3.1) (2.3.9)
Requirement already satisfied: murmurhash<1.1.0,>=0.28.0 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (1.0.7)
Requirement already satisfied: wasabi<1.1.0,>=0.4.0 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (0.10.1)
Requirement already satisfied: tqdm<5.0.0,>=4.38.0 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (4.64.0)
Requirement already satisfied: catalogue<1.1.0,>=0.0.7 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (1.0.2)
Requirement already satisfied: blis<0.8.0,>=0.4.0 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (0.7.7)
Requirement already satisfied: setuptools in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (61.2.0)
Requirement already satisfied: preshed<3.1.0,>=3.0.2 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (3.0.6)
Requirement already satisfied: srsly<1.1.0,>=1.0.2 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (1.0.6)
Requirement already satisfied: cymem<2.1.0,>=2.0.2 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (2.0.6)
Requirement already satisfied: numpy>=1.15.0 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (1.21.5)
Requirement already satisfied: requests<3.0.0,>=2.13.0 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (2.27.1)
Requirement already satisfied: plac<1.2.0,>=0.9.6 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (1.1.3)
Requirement already satisfied: thinc<7.5.0,>=7.4.1 in c:\users\mail\anaconda3\lib\site-packages (from spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (7.4.6)
Requirement already satisfied: idna<4,>=2.5 in c:\users\mail\anaconda3\lib\site-packages (from requests<3.0.0,>=2.13.0->spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (3.3)
Requirement already satisfied: charset-normalizer~=2.0.0 in c:\users\mail\anaconda3\lib\site-packages (from requests<3.0.0,>=2.13.0->spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (2.0.4)
Requirement already satisfied: urllib3<1.27,>=1.21.1 in c:\users\mail\anaconda3\lib\site-packages (from requests<3.0.0,>=2.13.0->spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (1.26.9)
Requirement already satisfied: certifi>=2017.4.17 in c:\users\mail\anaconda3\lib\site-packages (from requests<3.0.0,>=2.13.0->spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (2022.12.7)
Requirement already satisfied: colorama in c:\users\mail\anaconda3\lib\site-packages (from tqdm<5.0.0,>=4.38.0->spacy<2.4.0,>=2.3.0->en_core_web_sm==2.3.1) (0.4.4)
[+] Download and installation successful
You can now load the model via spacy.load('en_core_web_sm')
[nltk_data] Downloading package punkt to
[nltk_data] C:\Users\mail\AppData\Roaming\nltk_data...
[nltk_data] Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data] C:\Users\mail\AppData\Roaming\nltk_data...
[nltk_data] Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data] C:\Users\mail\AppData\Roaming\nltk_data...
[nltk_data] Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data] C:\Users\mail\AppData\Roaming\nltk_data...
[nltk_data] Package stopwords is already up-to-date!
[nltk_data] Downloading package words to
[nltk_data] C:\Users\mail\AppData\Roaming\nltk_data...
[nltk_data] Package words is already up-to-date!
[nltk_data] Downloading package maxent_ne_chunker to
[nltk_data] C:\Users\mail\AppData\Roaming\nltk_data...
[nltk_data] Package maxent_ne_chunker is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data] C:\Users\mail\AppData\Roaming\nltk_data...
[nltk_data] Package averaged_perceptron_tagger is already up-to-date!
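The install and download messages above come from the environment setup. A minimal sketch of a setup cell that would produce this output (the actual cell is not part of this export) could look like this:

# Minimal setup sketch (assumed, reconstructed from the output above)
import nltk
import spacy.cli

# Download the small English spaCy model used by the NLP class below
spacy.cli.download("en_core_web_sm")

# Fetch the NLTK resources used for tokenization, stemming, and stop word removal
for package in ["punkt", "wordnet", "omw-1.4", "stopwords", "words",
                "maxent_ne_chunker", "averaged_perceptron_tagger"]:
    nltk.download(package)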
(0) Define helper classes and functions
(0.1) Class for clustering (deprecated)
class Cluster:

    def preprocess_text(self, text: str, remove_stopwords: bool) -> str:
        # Remove special chars and numbers
        text = re.sub(r"[^A-Za-z1-9]+", " ", text)
        # Remove stopwords and stem
        if remove_stopwords:
            tokens = nltk.word_tokenize(text)
            tokens = [nltk.stem.PorterStemmer().stem(w) for w in tokens if w.lower() not in nltk.corpus.stopwords.words("english")]
            text = " ".join(tokens)
        # Return text in lower case and stripped of whitespace
        return text.lower().strip()

    def vectorize_strings(self, texts, min_df, max_df):
        vectorizer = TfidfVectorizer(sublinear_tf=True, min_df=min_df, max_df=max_df, stop_words=["playbook", "workflow", "story"])
        return vectorizer, vectorizer.fit_transform(texts)

    def initialize_kmeans(self, clusters, X):
        # Initialize k-means with the given number of centroids
        kmeans = KMeans(n_clusters=clusters, init='k-means++', max_iter=100, n_init=1, random_state=42)
        # Fit the model
        kmeans.fit(X)
        # Calculate silhouette score
        s_score = silhouette_score(X, kmeans.labels_)
        # Calculate Calinski-Harabasz score
        c_score = calinski_harabasz_score(X.toarray(), kmeans.labels_)
        # Return scores and cluster labels
        return s_score, c_score, kmeans.labels_

    def initialize_pca(self, X, clusters):
        pca = PCA(n_components=2, random_state=42)
        pca_vecs = pca.fit_transform(X.toarray())
        df_tmp1 = pd.DataFrame(pca_vecs, columns=['x0', 'x1'])
        df_tmp1['cluster'] = clusters
        return df_tmp1

    def get_top_keywords(self, n_terms, vectorizer, clusters, X):
        df_tmp1 = pd.DataFrame(X.todense()).groupby(clusters).mean()
        terms = vectorizer.get_feature_names_out()
        for i, r in df_tmp1.iterrows():
            top_terms = [terms[t] for t in np.argsort(r)[-n_terms:]]
            print(f"\nCluster {i}: {', '.join(top_terms)}")

    def plot_cluster(self, df, plot_title):
        fig, ax = plt.subplots(figsize=(12, 7))
        sns.scatterplot(data=df, x='x0', y='x1', hue='cluster', ax=ax)
        ax.set_xlabel("X0", fontsize=16)
        ax.set_ylabel("X1", fontsize=16)
        if plot_title:
            ax.set_title(plot_title)
        plt.show()

    def cluster_field(self, df, field, num_clusters, min_df, max_df, num_keywords, cluster_mapping=None, plot_title=None):
        # Preprocess text
        df['cleaned'] = df[field].apply(lambda x: self.preprocess_text(str(x), remove_stopwords=True))
        # Vectorize text
        vectorizer, X = self.vectorize_strings(df['cleaned'], min_df, max_df)
        # Cluster data using KMeans
        s_score, c_score, cluster_labels = self.initialize_kmeans(num_clusters, X)
        # Initialize PCA to visualize data
        df = self.initialize_pca(X, cluster_labels)
        # Print top keywords for each cluster
        self.get_top_keywords(num_keywords, vectorizer, cluster_labels, X)
        # Apply cluster mapping if provided
        if cluster_mapping is not None:
            df['cluster'] = df['cluster'].map(cluster_mapping)
        # Plot clusters
        self.plot_cluster(df, plot_title)
        return s_score, c_score, df
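As a usage illustration (the class is kept only for reference), cluster_field could be called roughly like this on any DataFrame with a text column; the toy data, column name, and parameter values below are hypothetical:

# Hypothetical call of the deprecated Cluster class on a small toy DataFrame
clusterer = Cluster()
s_score, c_score, df_clustered = clusterer.cluster_field(
    df=pd.DataFrame({"description": ["block malicious ip", "create ticket", "scan endpoint"]}),
    field="description", num_clusters=2, min_df=1, max_df=1.0,
    num_keywords=3, plot_title="Toy example")
print(s_score, c_score)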
(0.2) Class for NLP operations
class NLP:

    def __init__(self):
        # Load spaCy model
        self.nlp = spacy.load("en_core_web_sm")

    def extract_actions(self, sentence):
        # Parse sentence using spaCy model
        doc = self.nlp(sentence)
        # Extract lemmatized verbs that are not stop words
        verbs = [token.lemma_ for token in doc if token.pos_ == "VERB" and not token.is_stop]
        return verbs

    def extract_artifacts(self, sentence):
        # Parse sentence using spaCy model
        doc = self.nlp(sentence)
        # Extract named entities of specific types
        entity_types = ["PERSON", "NORP", "ORG", "GPE", "PRODUCT", "EVENT"]
        entities = [ent.text for ent in doc.ents if ent.label_ in entity_types]
        # If there are no named entities, extract direct and prepositional objects that are nouns or proper nouns
        if not entities:
            entities = [token.text for token in doc if token.dep_ in ["dobj", "pobj"] and token.pos_ in ["NOUN", "PROPN"]]
        return entities

    def remove_auxiliary_verbs(self, df):
        # Remove "is" and "did" from the action and artifact columns
        for column in ["action", "artifact"]:
            df[column] = df[column].apply(lambda verbs: [verb for verb in verbs if verb not in ["is", "did"]])
        return df

    def parse_dependencies(self, sentence):
        doc = self.nlp(sentence)
        root = [token for token in doc if token.head == token][0]
        verb = ""
        obj = ""
        compound = []
        for token in doc:
            if token.dep_ == "ROOT":
                verb = str(token)[:-1]
            if token.dep_ == "dobj":
                obj = token
            if token.dep_ == "compound":
                compound.append(str(token))
        compound = " ".join(compound)
        obj = str(compound + " " + str(obj))
        obj = obj.strip()
        return verb.lower(), obj.lower()
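A quick illustration of how the helper is used; the sentence is made up and the commented results are only indicative, since they depend on the spaCy model:

# Illustrative, hypothetical usage of the NLP helper class
nlp_helper = NLP()
sentence = "Platform blocks the malicious IP address on the firewall."
print(nlp_helper.extract_actions(sentence))     # lemmatized verbs, e.g. ['block']
print(nlp_helper.extract_artifacts(sentence))   # entities or noun objects, e.g. ['address', 'firewall']
print(nlp_helper.parse_dependencies(sentence))  # (root verb, compound + object), e.g. ('block', 'ip address')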
(1) Load playbooks
def get_dataframe_from_vendor(file):
    # Open the file for reading
    with open(f"./vendor/{file}", "r") as file_handle:
        # Read the contents of the file
        data = file_handle.read()
        # Use the ast library to evaluate the literal string as a Python data structure
        data = ast.literal_eval(data)
        # Return the data as a pandas DataFrame
        return pd.DataFrame(data)

# Call the function for each file and store the returned DataFrame in a list
playbook_array = [get_dataframe_from_vendor(file) for file in ["fortinet.py", "tines.py", "demisto.py", "catalyst.py", "iacd.py",
                                                               "logichub.py", "oasisopen.py", "rapid7.py", "resolve.py", "shuffle.py",
                                                               "chronicle.py", "splunk.py", "threatconnect.py", "xsoar.py", "cisa.py"]]

# Concatenate all the DataFrames into one DataFrame
df_playbooks = pd.concat(playbook_array)

# Replace empty values and NaN values in 'playbook_name' and 'playbook_description'
df_playbooks['playbook_name'].replace("", "no playbook name", inplace=True)
df_playbooks['playbook_description'].fillna("no description", inplace=True)

# Reorder the columns in the DataFrame
df_playbooks = df_playbooks.reindex(columns=["id", "vendor", "playbook_name", "playbook_description", "tags", "steps", "actuator",
                                             "step_types", "step_names"])
df_playbooks = df_playbooks.reset_index()

# Add an 'id' column to the DataFrame with a unique identifier for each row
df_playbooks['id'] = 'p' + (df_playbooks.index + 1).astype(str)  # indexing in pandas starts at 0, so we add 1 to start the 'id' from 1

# Save playbooks
df_playbooks.to_csv("./coding/playbooks.csv", sep=',', index=False)

# Print the final DataFrame
df_playbooks.head()
|   | index | id | vendor | playbook_name | playbook_description | tags | steps | actuator | step_types | step_names |
| 0 | 0 | p1 | FortiSOAR | Action (Type All) - Block Indicators | Blocks all types of indicators on the firewall... | [Subroutine, Mitigation] | 10 | [, , , , , , , , , cyops_utilities] | [Start, Configuration, Condition, Playbook exe... | [Start, Configuration, Check Type of Indicator... |
| 1 | 1 | p2 | FortiSOAR | Action - Asset Mitigation | Carries out a sequence of processes such as Cl... | [ManualAction, Mitigation] | 17 | [, , , , , , , , , , , , , , , , ] | [Manual task, Manual task, User decision, User... | [Note about AV Scan, Add Note, Take Snapshot a... |
| 2 | 2 | p3 | FortiSOAR | Action - Domain - Block (Indicator) | Blocks the indicators of type 'Domain' on the ... | [ManualAction, Mitigation] | 6 | [, , , , , ] | [Start, API call, Manual task, API call, User ... | [Start, Mark as TBD, Add note to Indicator, Ma... |
| 3 | 3 | p4 | FortiSOAR | Action - Domain - Block (Specified by User) | Creates an indicator for the domain name speci... | [ManualAction, Mitigation] | 4 | [, , , cyops_utilities] | [Playbook execution, Start, Manual task, Conne... | [Block Domain, Start, Create Domain Indicator ... |
| 4 | 4 | p5 | FortiSOAR | Action - Domain - Unblock (Indicator) | Unblocks the indicators of type 'Domain' on th... | [ManualAction, Mitigation] | 6 | [, , , , , ] | [Manual task, Start, API call, User decision, ... | [Add note to Indicator, Start, Mark as Unblock... |
(2) Transform playbooks to get atomic steps
# Explode playbooks
df_steps = df_playbooks.explode(['step_names', "step_types", "actuator"])
df_steps['actuator'].replace("", "Platform", inplace=True)

# Add a unique identifier for each step
df_steps['step_id'] = ['s' + str(i) for i in range(1, len(df_steps) + 1)]

# Add an 's' to the end of the first word of the step name, if applicable
add_s_to_first_word = lambda x: ' '.join([str(x).split()[0].lower() + 's'] + str(x).split()[1:]) if len(str(x).split()) > 1 else str(x).lower() + "s"
df_steps['step_names'] = df_steps['step_names'].apply(add_s_to_first_word)

# Create a step description by combining the actuator and step name
df_steps['step_description'] = df_steps.apply(lambda x: str(x['actuator']).strip() + ' ' + str(x['step_names']).strip() if str(x['step_names']).strip().lower() else "", axis=1)

# Reorder columns
df_steps = df_steps[['id', 'step_id', 'vendor', 'playbook_name', 'playbook_description', 'step_names', 'step_types', 'step_description', 'actuator']]

# Filter out NaN values
df_steps = df_steps.dropna(subset=['step_names', 'step_types'])

# Write the resulting DataFrame to a CSV file
df_steps.to_csv("./coding/steps.csv", sep=',', index=False)

# Display the resulting DataFrame
df_steps.head()
|   | id | step_id | vendor | playbook_name | playbook_description | step_names | step_types | step_description | actuator |
| 0 | p1 | s1 | FortiSOAR | Action (Type All) - Block Indicators | Blocks all types of indicators on the firewall... | starts | Start | Platform starts | Platform |
| 0 | p1 | s2 | FortiSOAR | Action (Type All) - Block Indicators | Blocks all types of indicators on the firewall... | configurations | Configuration | Platform configurations | Platform |
| 0 | p1 | s3 | FortiSOAR | Action (Type All) - Block Indicators | Blocks all types of indicators on the firewall... | checks Type of Indicator | Condition | Platform checks Type of Indicator | Platform |
| 0 | p1 | s4 | FortiSOAR | Action (Type All) - Block Indicators | Blocks all types of indicators on the firewall... | blocks IP | Playbook execution | Platform blocks IP | Platform |
| 0 | p1 | s5 | FortiSOAR | Action (Type All) - Block Indicators | Blocks all types of indicators on the firewall... | blocks Domain | Playbook execution | Platform blocks Domain | Platform |
(4) Apply NLP dependency parsing to extract SVO (actuator, action, and artifact)
(4.1) Prepare the manual SVO review by removing duplicates
# Select the columns that we want to use to identify duplicates:
# in this case, 'step_names', 'step_types', and 'actuator'
columns_to_check = ['step_names', 'step_types', 'actuator']

# Drop the duplicate rows based on columns_to_check
df_dedup = df_steps.drop_duplicates(subset=columns_to_check)
df_dedup.to_csv("./coding/steps-to-be-coded.csv")
(4.2) Manual review and update the SVO triplets
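The manual pass annotates the exported steps-to-be-coded.csv with the 'actuator', 'action', 'artifact', and 'automation' columns that step (4.3) reads back in as steps-coded.csv. Purely as an illustration, the dependency parser from (0.2) could pre-fill the action and artifact columns before the hand review; the bootstrapping below is an assumption, not the documented procedure:

# Hypothetical pre-annotation before the manual review (assumed workflow)
nlp_helper = NLP()
df_precoded = df_dedup.copy()
# Pre-fill action and artifact from the dependency parse of each step description
df_precoded[["action", "artifact"]] = df_precoded["step_description"].apply(
    lambda s: pd.Series(nlp_helper.parse_dependencies(str(s))))
df_precoded["automation"] = 1  # reviewed by hand afterwards (1 = automated, 0 = manual; assumed encoding)
df_precoded.to_csv("./coding/steps-coded.csv", sep=";", index=False)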
(4.3) Merge reviewed SVO triplets
# Load the DataFrame from the steps-coded.csv file
df_SOV = pd.read_csv('./coding/steps-coded.csv', sep=';')

# Use tqdm to add a progress bar when iterating through the DataFrame
tqdm.pandas()

# Create an empty list to store the results
playbook_steps = []
naa = []

# Iterate through each row in the df_steps DataFrame
for index, row in tqdm(df_steps.iterrows(), total=df_steps.shape[0]):
    # Find all matching rows in the df_SOV DataFrame based on the 'step_id' column
    matching_rows = df_SOV[(df_SOV['step_id'] == row["step_id"])]
    # If there are no matching rows, try to find a parent based on the 'step_description'
    # and 'step_types' columns, and if that doesn't work, use the 'step_names' column
    if len(matching_rows) == 0:
        parent_rows = df_SOV[(df_SOV['step_description'] == row["step_description"]) &
                             (df_SOV['step_types'] == row["step_types"])]
        if len(parent_rows) == 0:
            parent_rows = df_SOV[(df_SOV['step_names'] == row["step_names"])]
        # Create a new dictionary containing the relevant data from both DataFrames
        try:
            playbook_step = {
                "playbook_id": row["id"],
                "step_id": row["step_id"],
                "vendor": row["vendor"],
                "playbook_name": row["playbook_name"],
                "playbook_description": row["playbook_description"],
                "step_name": row["step_names"],
                "step_type": row["step_types"],
                "step_description": row["step_description"],
                "actuator": parent_rows.iloc[0]['actuator'],
                "action": parent_rows.iloc[0]['action'],
                "artifact": parent_rows.iloc[0]['artifact'],
                "automation": parent_rows.iloc[0]['automation']
            }
            playbook_steps.append(playbook_step)
        except Exception:
            print(row)

# Create a new DataFrame from the playbook_steps list
df_tmp = pd.DataFrame(playbook_steps)

# Concatenate the df_SOV and df_tmp DataFrames and reset the index
df_SOV = pd.concat([df_SOV, df_tmp], ignore_index=True)

# Update the step description by combining the action and artifact
df_SOV['step_description'] = df_SOV.apply(lambda x: str(x['action']).strip() + " " + str(x['artifact']).strip() if str(x['action']).strip().lower() else "", axis=1)
(5) Playbooks contain a median of 10 steps and are 97% automated (Result 1)
(5.1) Data preparation: group the dataframe, filter out vendors, and calculate descriptive statistics.
# calculate mean automation by playbook id and convert to percentage
df_SOV_grouped = df_SOV.groupby('playbook_id')['automation'].mean().reset_index()
df_SOV_grouped['automation'] = df_SOV_grouped['automation'] * 100

# join playbooks with mean automation by playbook id
df_result1 = df_playbooks.merge(df_SOV_grouped, left_on='id', right_on='playbook_id')

# filter out vendors with fewer than 2 entries
df_result1 = df_result1.groupby('vendor').filter(lambda x: len(x) >= 2)

# order vendors by median playbook steps for the boxplot in (5.2)
vendor_median_steps = df_result1.groupby('vendor')['steps'].median().sort_values(ascending=False)

# global median of playbook steps (horizontal line in the plot)
median = df_result1['steps'].median()

# mean automation per vendor (optional second y-axis in the plot)
df_automation = df_result1.groupby('vendor')['automation'].mean().reset_index()

# global mean automation (horizontal line in the plot)
mean_automation = df_result1['automation'].mean()
print(f"Automation level: {mean_automation}")

vendor_counts = df_result1['vendor'].value_counts()
Automation level: 96.93898110449203
(5.2) Plot the playbook steps per vendor and the automation level.
# Set figure size
fig, ax = plt.subplots(figsize=(20, 6))
sns.set(style='white')

# Create boxplot
sns.boxplot(x='vendor', y='steps', data=df_result1, ax=ax, order=vendor_median_steps.index, boxprops={"facecolor": "white"})

# Add horizontal line for global median steps
median = df_result1['steps'].median()
ax.axhline(y=median, color='black', linestyle='--', label='Playbook steps median ({:.2f} steps)'.format(median))

# Add bar plot of automation on a second y-axis
#ax2 = ax.twinx()
#sns.lineplot(x='vendor', y='automation', data=df_automation, color='#005b96', ax=ax2, linewidth=3)
# Add horizontal line for global mean automation
#ax2.axhline(y=mean_automation, color='black', linestyle='-', label='Playbook automation mean ({:.2f}%)'.format(mean_automation))

# Set axis labels and font weights
ax.set_xlabel('SOAR vendor playbooks', fontsize=16, fontweight='bold')
ax.set_ylabel('Steps per playbook', fontsize=16, fontweight='bold')
#ax2.set_ylabel('Playbook automation (in %)', fontsize=16, fontweight='bold')

# Add padding to axis labels
ax.xaxis.labelpad = 15
ax.yaxis.labelpad = 15
#ax2.yaxis.labelpad = 13

# Set font size of tick labels
ax.tick_params(axis='both', labelsize=13)
#ax2.tick_params(axis='y', labelsize=13)

# Set x-axis labels with vendor counts
xtick_labels = ['{}\n(n={})'.format(label.get_text(), vendor_counts[label.get_text()]) for label in ax.get_xticklabels()]
ax.set_xticklabels(xtick_labels)

# Set y-axis limits for automation
#ax2.set_ylim(0, 100)

# Add legend
handles = [
    #plt.Line2D([], [], color='#005b96', alpha=0.3, label='Playbook automation by vendor (mean)'),
    #plt.Line2D([], [], color='black', linestyle='-', label='Playbook automation mean ({:.2f}%)'.format(mean_automation)),
    plt.Line2D([], [], color='black', linestyle='--', label='Playbook steps median ({:.2f} steps)'.format(median))
]
ax.legend(handles=handles, fontsize=13, borderaxespad=0., frameon=False)

# Save and show plot
plt.savefig('./charts/playbook-steps-automation.pdf', format='pdf', dpi=200, bbox_inches='tight')
plt.show()

# Print summary statistics
summary = df_result1['steps'].describe()
print('Playbook Mean: {:.2f}'.format(summary['mean']))
print('Playbook Median: {:.2f}'.format(summary['50%']))
print('Playbook 0.25 quartile: {:.2f}'.format(summary['25%']))
print('Playbook 0.75 quartile: {:.2f}'.format(summary['75%']))
(6) Playbook steps serve either logic, utility, alerting/ticketing, investigation, or remediation purposes (Result 2).
(6.1) Group steps by action and export the results as CSV
# Group the artifacts by action
actions = df_SOV.groupby('action')['artifact'].agg(list)

# Export the results
actions.to_csv("./coding/step-action-grouped.csv")
(6.2) Read in coded step types and map them to the steps
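The next cell relies on a DataFrame df_result2 that carries the manually coded category for each step; the cell that builds it is not included in this export. A plausible sketch, assuming the coded categories were saved next to the grouped actions from (6.1) in a file such as step-action-coded.csv:

# Hypothetical reconstruction of df_result2 (file name, separator, and column names are assumptions)
df_categories = pd.read_csv("./coding/step-action-coded.csv", sep=";")
# Map the coded 'type' string (e.g. "1" or "3,5") onto every step via its action
df_result2 = df_SOV.merge(df_categories[["action", "type"]], on="action", how="left")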
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Group the DataFrame by the 'type' column and get the size of each group
grouped_types = df_result2.groupby('type').size()

# Convert the resulting Series to a dictionary
grouped_types_dict = grouped_types.to_dict()

# Initialize a new dictionary to hold the cleaned data
cleaned_dict = {}

# Loop over each key-value pair in the original dictionary
for key, value in grouped_types_dict.items():
    # Split the key string into an array of individual values
    key_arr = key.split(",")
    # Loop over each value in the key array
    for val in key_arr:
        # If the value is not already in the cleaned dictionary, initialize it to 0
        if val not in cleaned_dict:
            cleaned_dict[val] = 0
        # Add the group size divided by the number of values in the key array to the value in the cleaned dictionary
        cleaned_dict[val] = cleaned_dict[val] + value / len(key_arr)

# Sort the cleaned dictionary by key in ascending order
cleaned_dict = dict(sorted(cleaned_dict.items(), key=lambda x: int(x[0])))

# Convert the cleaned dictionary to a DataFrame, with the keys as the index and a single 'Frequency' column
df_tmp = pd.DataFrame.from_dict(cleaned_dict, orient='index', columns=['Frequency'])

# Rename the index values using a dictionary
df_tmp = df_tmp.rename(index={'1': 'Logic', '2': 'Utility', '3': 'Ticketing', '4': 'Investigation', '5': 'Countermeasure'})

total_frequency = df_tmp['Frequency'].sum()

# Add a new column called "Percentage"
df_tmp['Percentage'] = 0

# Calculate the percentage value for each row and assign it to the "Percentage" column
for index, row in df_tmp.iterrows():
    percentage = round((row['Frequency'] / total_frequency) * 100, 2)
    df_tmp.at[index, 'Percentage'] = percentage

# Print the first few rows of the resulting DataFrame
print(df_tmp.head())

# Create a barplot using the cleaned DataFrame
ax = sns.barplot(x=df_tmp.index, y='Frequency', data=df_tmp, palette='crest_r', alpha=0.8)

# Set padding for the x- and y-axis labels
ax.xaxis.labelpad = 15
ax.yaxis.labelpad = 15

# Set y-axis limits for better visualization
ax.set_ylim([0, max(df_tmp['Frequency']) * 1.1])

# Set x- and y-axis labels and font sizes
ax.set_xlabel('Step category', fontsize=16, fontweight='bold')
ax.set_ylabel('Category frequency', fontsize=16, fontweight='bold')

# Set font size of tick labels and rotate x-axis tick labels for better visibility
ax.tick_params(axis='both', labelsize=13)
plt.xticks(rotation=45)

# Show the resulting plot
plt.show()
(7) Playbooks are mostly built from more than one step cluster (Result 3)
(7.1) Prepare step clusters
def get_sorted_values(x):
    values = list(x.str.split(','))
    sorted_values = sorted(values)
    return ','.join(sorted_values)

# Group dataframe by playbook_id and apply function to the type column
df_playbook_types = df_result2[df_result2['type'].notna()]
df_playbook_types = df_playbook_types[df_playbook_types['type'].apply(lambda x: "," not in x)]
df_playbook_types = df_playbook_types.groupby('playbook_id').agg({'type': lambda x: ','.join(x.astype(str))})

# Create new columns for each value from 1 to 5
for i in range(1, 6):
    df_playbook_types[i] = 0

# Iterate over each row
for index, row in df_playbook_types.iterrows():
    # Split the "type" column by ","
    types = row["type"].split(",")
    # Calculate the frequency of each number using value_counts
    counts = pd.Series(types).value_counts()
    # Update the respective columns using loc
    for col, val in counts.iteritems():
        df_playbook_types.loc[index, int(col)] = val

df_playbook_types['type_size'] = df_playbook_types['type'].apply(lambda x: len(set(x.split(','))))
df_playbook_types.drop('type', axis=1, inplace=True)

counts = (df_playbook_types.iloc[:, :-1] == 0).sum(axis=1).value_counts(normalize=True)
counts = counts.apply(lambda x: x * 100)

grouped = df_playbook_types.groupby('type_size').agg({col: 'sum' for col in df_playbook_types.columns[:-1]})

# Rename the columns and calculate the percentage of each value within a row
new_columns = {
    1: 'Logic',
    2: 'Utility',
    3: 'Ticketing',
    4: 'Investigation',
    5: 'Countermeasure'
}
grouped = grouped.rename(columns=new_columns).apply(lambda row: row / row.sum() * 100, axis=1)
grouped = grouped.drop(index=1)
grouped = grouped.sort_values(by='type_size', ascending=False)
grouped = grouped.rename(index={2: f'2 categ.\n ({round(counts[3],1)}%)', 3: f'3 categ.\n ({round(counts[2],1)}%)', 4: f'4 categ.\n ({round(counts[1],1)}%)', 5: f'5 categ.\n ({round(counts[0],1)}%)'})

# Create the stacked bar plot
ax = grouped.plot.barh(stacked=True, figsize=(10, 6), color=sns.color_palette('gray', 5))

# Add data labels in percent
for rect in ax.containers:
    # Iterate over the rectangles in the container and add the label
    for r in rect:
        width = r.get_width()
        if width > 3:
            # Calculate the percentage and format the label
            label = f'{int(width)}%'
            # Add the label to the rectangle
            ax.annotate(label, xy=(r.get_x() + r.get_width() / 2, r.get_y() + r.get_height() / 2),
                        xytext=(0, 0), textcoords="offset points", color="white",
                        ha='center', va='center', fontsize=13, fontweight='bold')

# Add padding to axis labels
ax.xaxis.labelpad = 15
ax.yaxis.labelpad = 15

# Set x-axis limits
ax.set_xlim(0, 100)

# Set axis labels and font weights
ax.set_xlabel('Step category frequency (in %)', fontsize=16, fontweight='bold')
ax.set_ylabel('Step categories per playbook', fontsize=16, fontweight='bold')

# Set font size of tick labels
ax.tick_params(axis='both', labelsize=13)

# Remove legend border
ax.legend(frameon=False)

# Move legend to the top of the diagram
ax.legend(loc='upper center', fontsize=13, bbox_to_anchor=(0.5, +1.1), ncol=5, frameon=False)

plt.savefig('./charts/multi-purpose-playbooks.pdf', format='pdf', dpi=200, bbox_inches='tight')

# Show the plot
plt.show()