# Data science with Kiln

This notebook demonstrates how the data generated by Kiln can be used to gain insights into how quickly Open Source dependency vulnerabilities are remediated by projects, allowing you to focus your effort on projects that pose more risk to your organisation.

The data for this demo was gathered from the [OWASP RailsGoat](https://github.com/OWASP/railsgoat), [Mastodon](https://github.com/tootsuite/mastodon) and [GitLab](https://gitlab.com/gitlab-org/gitlab) codebases. Kiln was run on each commit to the default branch of each of these projects, from the very first commit up to the most recent one as of 2020/01/18. RailsGoat is a deliberately vulnerable application and serves as a point of comparison, whereas Mastodon and GitLab are fairly large codebases used in production. All three are Ruby projects because, when this demo was originally built, Ruby was the only language Kiln had tools for.

Data is read from the Kiln Kafka cluster using Apache Spark, and most of the heavy lifting is then done by Pandas, the Python Data Analysis Library.
The data coming from Kafka is GZip compressed and encoded using Apache Avro: Spark handles decompressing the data, and the FastAvro library is used to decode it into a usable form.

Avro decoding is very CPU-intensive, and because this notebook was written to analyse ~1,000,000 events that are all read into memory, it is also fairly memory-intensive. To speed up the decoding, the Python multiprocessing library is used to parallelise it across several cores. This was tested on an AWS t3a.2xlarge EC2 instance (8 vCPUs, 32 GiB of memory) and found to perform acceptably.
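
The split-map-concatenate pattern that `parallelize_dataframe` applies to the Kafka messages can be sketched with the standard library alone. This is a minimal illustration on plain Python lists rather than pandas DataFrames; `split_into_chunks`, `decode_chunk` and `parallel_map_chunks` are hypothetical names, and `decode_chunk` only stands in for the real Avro decoding.

```python
from itertools import chain
from multiprocessing import Pool


def split_into_chunks(items, n_chunks):
    """Split a list into n_chunks contiguous pieces whose sizes differ by at most one
    (the same split numpy.array_split produces)."""
    size, remainder = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def decode_chunk(chunk):
    """Hypothetical stand-in for the CPU-heavy per-message work (e.g. Avro decoding):
    each message expands into a list of records."""
    return [[f"record-{value}-{n}" for n in range(2)] for value in chunk]


def parallel_map_chunks(func, items, n_chunks, processes):
    """Apply func to each chunk in a worker process, then concatenate the
    per-chunk results back into one list in the original order."""
    chunks = split_into_chunks(items, n_chunks)
    with Pool(processes) as pool:
        results = pool.map(func, chunks)
    return list(chain.from_iterable(results))


if __name__ == "__main__":
    messages = list(range(10))
    decoded = parallel_map_chunks(decode_chunk, messages, n_chunks=4, processes=2)
    print(len(decoded))  # one entry (a list of records) per input message
```

Because `Pool.map` returns results in the order of its input, concatenating the per-chunk results restores the original message order. The `__main__` guard stops worker processes from re-running the example on platforms that start them with the spawn method.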

For each CVE found in each project, we record the commit that introduced the vulnerable library and the last commit it was seen in. We then use the PyGit2 library to find the next commit to the default branch that no longer contains the vulnerable package, and record the time that commit was made. Next, we load data from the [NIST NVD](https://nvd.nist.gov/vuln/data-feeds) to find when each CVE was published. The difference between the remediation date and the CVE publication date tells us how long the vulnerability remained in the default branch after it became public. We filter out results with a negative remediation time, because these indicate that the package was upgraded in the default branch before the CVE was published. From there, we can calculate some basic statistics on the remediation times for each project and compare them.
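
The core of the method, finding the fixing commit and measuring remediation time, can be sketched without Git or pandas. This is a minimal sketch assuming the default branch is given as a list of commit IDs ordered oldest first (as a first-parent walk yields); `find_fixing_commit` and `remediation_time` are hypothetical helpers, and unlike the notebook, which takes the last sighting by Kiln event timestamp, this sketch takes the last sighting by position on the branch.

```python
from datetime import datetime, timezone


def find_fixing_commit(first_parent_commits, vulnerable_commits):
    """Return the default-branch commit that follows the last commit in which the
    vulnerable package was seen, or None if it is still present at the branch tip.

    first_parent_commits: commit IDs along the default branch, oldest first.
    vulnerable_commits: commit IDs in which the vulnerable package was reported.
    """
    position = {commit: i for i, commit in enumerate(first_parent_commits)}
    # Ignore sightings on commits that are not on the default branch.
    seen = [position[c] for c in vulnerable_commits if c in position]
    if not seen:
        return None
    next_index = max(seen) + 1
    if next_index >= len(first_parent_commits):
        return None  # still vulnerable at the most recent commit
    return first_parent_commits[next_index]


def remediation_time(fix_date, cve_published):
    """Time from CVE publication to the fixing commit. A negative result means the
    package was upgraded before the CVE was published."""
    return fix_date - cve_published


# Hypothetical history and dates, for illustration only.
history = ["a1", "b2", "c3", "d4", "e5"]
fixing = find_fixing_commit(history, ["b2", "c3"])
published = datetime(2019, 6, 11, 17, 29, tzinfo=timezone.utc)
fixed = datetime(2019, 6, 20, 17, 29, tzinfo=timezone.utc)
print(fixing, remediation_time(fixed, published))
```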

In [None]:
from collections import ChainMap
from io import BytesIO
from multiprocessing import Pool
from pygit2 import GIT_SORT_REVERSE, GIT_SORT_TOPOLOGICAL, Oid, Repository
from pyspark.sql import SparkSession
import datetime
import dateutil.parser  # the parser submodule must be imported explicitly
import fastavro
import itertools
import numpy as np
import os
import pandas

In [None]:
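MAX_MEMORY = "28g"
# The variable Python reads is PYTHONPATH (no underscore), and Python does not expand
# shell variables in string literals, so $SPARK_HOME is expanded explicitly.
os.environ['PYTHONPATH'] = os.path.expandvars('$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.8.1-src.zip')
# Load the Spark Kafka connector; this must be set before the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 pyspark-shell'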

spark = SparkSession \
    .builder \
    .appName("kiln-analysis") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

In [None]:
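# Batch-read every message on the DependencyEvents topic. The Kafka consumer decompresses
# GZip-compressed record batches transparently, so no compression option is needed here.
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka-headless:9092") \
  .option("subscribe", "DependencyEvents") \
  .load()
# Only the raw Avro payload in the "value" column is needed; collect it as a pandas DataFrame.
pdf = df.select(df.value).toPandas()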

In [None]:
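num_partitions = 30
num_cores = 7

def parallelize_dataframe(df, func):
    # Split the DataFrame into chunks, process each chunk in a worker process,
    # then stitch the results back together (Pool.map preserves chunk order).
    df_split = np.array_split(df, num_partitions)
    with Pool(num_cores) as pool:
        return pandas.concat(pool.map(func, df_split))

def parse_avro(row):
    # Each Kafka message value is an Avro container holding one or more records.
    reader = fastavro.reader(BytesIO(row['value']))
    return list(reader)

def parallel_parse(df):
    # Decode every row of a chunk; the result is a Series of record lists.
    return df.apply(parse_avro, axis='columns')

# Explode so that each decoded record gets its own row.
parsed_pdf = parallelize_dataframe(pdf, parallel_parse).explode()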

In [None]:
data = pandas.DataFrame.from_records(parsed_pdf.apply(lambda val: dict(val)))
data['timestamp'] = pandas.to_datetime(data['timestamp'])

In [None]:
grouped_data = data.groupby('application_name')
print(grouped_data.size())
grouped_data.head()
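
The explode-then-group steps above can be mirrored in plain Python to show what they compute. This is a minimal sketch on hypothetical records; `flatten_records` and `count_by_application` are illustrative names. One difference: pandas `Series.explode` turns an empty list into a NaN row, whereas this sketch simply drops empty messages.

```python
from collections import Counter
from itertools import chain


def flatten_records(messages):
    """Turn a list of per-message record lists into one flat list of records."""
    return list(chain.from_iterable(messages))


def count_by_application(records):
    """Count records per application, like data.groupby('application_name').size()."""
    return Counter(record["application_name"] for record in records)


# Hypothetical decoded messages: each Kafka message yields a list of records.
messages = [
    [{"application_name": "railsgoat"}, {"application_name": "railsgoat"}],
    [],
    [{"application_name": "mastodon"}],
]
records = flatten_records(messages)
print(count_by_application(records))  # Counter({'railsgoat': 2, 'mastodon': 1})
```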

In [None]:
railsgoat_data = grouped_data.get_group('railsgoat')
mastodon_data = grouped_data.get_group('mastodon')
gitlab_data = grouped_data.get_group('gitlab')

def get_cve_data(raw_data, app_name):
    # CVE IDs reported for this project (advisories without a CVE ID are ignored).
    cve_ids = raw_data[raw_data['advisory_id'].str.startswith('CVE', na=False)]['advisory_id'].unique()
    cve_data = dict()
    repo = Repository(f"/home/jovyan/{app_name}.git")
    # Walk the default branch from oldest to newest, following first parents only.
    walker = repo.walk(repo.head.target, GIT_SORT_TOPOLOGICAL | GIT_SORT_REVERSE)
    walker.simplify_first_parent()
    repo_commits = [str(commit.id) for commit in walker]
    for cve in cve_ids:
        commits = raw_data[raw_data['advisory_id'] == cve].sort_values('timestamp')
        first_commit = commits.head(1)['git_commit_hash'].values[0]
        last_commit = commits.tail(1)['git_commit_hash'].values[0]
        try:
            # The fixing commit is the next default-branch commit after the last one
            # in which the vulnerable package was seen.
            last_vulnerable_index = repo_commits.index(last_commit)
            fixing_commit = repo.get(Oid(hex=repo_commits[last_vulnerable_index + 1]))
            fixing_commit_id = str(fixing_commit.id)
            # commit_time is a Unix timestamp, so it converts straight to UTC.
            fix_date = datetime.datetime.fromtimestamp(fixing_commit.commit_time, datetime.timezone.utc)
        except (ValueError, IndexError):
            # ValueError: the last commit is not on the default branch.
            # IndexError: the package is still vulnerable at the branch tip.
            fixing_commit_id = None
            fix_date = None
        cve_data[cve] = {"first_commit": first_commit, "fixing_commit": fixing_commit_id, "fix_date": fix_date}
    return cve_data

railsgoat_cve_data = get_cve_data(railsgoat_data, "railsgoat")
mastodon_cve_data = get_cve_data(mastodon_data, "mastodon")
gitlab_cve_data = get_cve_data(gitlab_data, "gitlab")

print(f"RailsGoat has had {len(railsgoat_cve_data)} CVEs land in the default branch")
print(f"Mastodon has had {len(mastodon_cve_data)} CVEs land in the default branch")
print(f"GitLab has had {len(gitlab_cve_data)} CVEs land in the default branch")
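
A Git commit time is an absolute Unix timestamp; the committer's UTC offset only changes how that instant is displayed, not the instant itself. The minimal sketch below, using a hypothetical `commit_time_to_utc` helper that takes the two values pygit2 exposes as `Commit.commit_time` (seconds) and `Commit.commit_time_offset` (minutes), shows that applying the offset and then converting to UTC gives the same result as reading the timestamp directly in UTC.

```python
from datetime import datetime, timedelta, timezone


def commit_time_to_utc(commit_time, commit_time_offset):
    """Convert a Git commit timestamp to an aware UTC datetime.

    commit_time: seconds since the Unix epoch.
    commit_time_offset: the committer's UTC offset in minutes.
    """
    local = datetime.fromtimestamp(commit_time, timezone(timedelta(minutes=commit_time_offset)))
    return local.astimezone(timezone.utc)


ts = 1579305600  # 2020-01-18T00:00:00Z
for offset in (-300, 0, 60, 330):
    # Whatever the committer's offset, the UTC instant is the same.
    assert commit_time_to_utc(ts, offset) == datetime.fromtimestamp(ts, timezone.utc)
print(commit_time_to_utc(ts, 60).isoformat())  # 2020-01-18T00:00:00+00:00
```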

In [None]:
# Load the NVD JSON 1.1 yearly feeds plus the "modified" feed, mapping each CVE ID to its publication date.
data_by_year = list()
for year in itertools.chain(range(2002, 2020), ["modified"]):
    filename = f"file:///home/jovyan/vulndata/nvdcve-1.1-{year}.json.gz"
    raw_json = pandas.read_json(filename)
    parsed_vulns = dict()
    for item in raw_json['CVE_Items']:
        parsed_vulns[item['cve']['CVE_data_meta']['ID']] = item['publishedDate']
    data_by_year.append(parsed_vulns)
# ChainMap searches the feeds in order, so a yearly feed takes precedence over "modified".
nvd_cve_data = ChainMap(*data_by_year)

# Attach each CVE's publication date (in UTC); CVEs missing from the feeds get None.
for project_cve_data in (railsgoat_cve_data, mastodon_cve_data, gitlab_cve_data):
    for key, val in project_cve_data.items():
        try:
            val['cve_date'] = dateutil.parser.parse(nvd_cve_data[key]).astimezone(datetime.timezone.utc)
        except (KeyError, ValueError):
            val['cve_date'] = None
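
The NVD lookup above can be sketched with the standard library to make two details explicit: how the feed timestamps parse, and which feed wins when several contain the same CVE. This sketch assumes the NVD JSON 1.1 `publishedDate` format `YYYY-MM-DDTHH:MMZ`; `parse_nvd_date`, `lookup_published` and the `CVE-0000-...` IDs are hypothetical placeholders, not real feed data.

```python
from collections import ChainMap
from datetime import datetime, timezone


def parse_nvd_date(value):
    """Parse a timestamp such as "2019-06-11T17:29Z" (minute precision, literal Z)
    into an aware UTC datetime."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%MZ").replace(tzinfo=timezone.utc)


def lookup_published(feeds, cve_id):
    """Look a CVE up across several {cve_id: publishedDate} feeds. ChainMap returns
    the value from the first feed containing the key, or None if none does."""
    raw = ChainMap(*feeds).get(cve_id)
    return parse_nvd_date(raw) if raw is not None else None


# Hypothetical feeds: the yearly feed is listed first, so it wins on conflicts.
yearly = {"CVE-0000-0001": "2019-06-11T17:29Z"}
modified = {"CVE-0000-0001": "2019-07-01T00:00Z", "CVE-0000-0002": "2019-08-02T10:15Z"}
print(lookup_published([yearly, modified], "CVE-0000-0001"))
```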

In [None]:
railsgoat_df = pandas.DataFrame(railsgoat_cve_data).transpose().dropna()
mastodon_df = pandas.DataFrame(mastodon_cve_data).transpose().dropna()
gitlab_df = pandas.DataFrame(gitlab_cve_data).transpose().dropna()

In [None]:
railsgoat_df['remediation_time'] = railsgoat_df['fix_date'] - railsgoat_df['cve_date']
mastodon_df['remediation_time'] = mastodon_df['fix_date'] - mastodon_df['cve_date']
gitlab_df['remediation_time'] = gitlab_df['fix_date'] - gitlab_df['cve_date']

In [None]:
railsgoat_positive_remediation_time = railsgoat_df.loc[railsgoat_df['remediation_time'].dt.total_seconds() > 0]
mastodon_positive_remediation_time = mastodon_df.loc[mastodon_df['remediation_time'].dt.total_seconds() > 0]
gitlab_positive_remediation_time = gitlab_df.loc[gitlab_df['remediation_time'].dt.total_seconds() > 0]

print(f"RailsGoat has had {len(railsgoat_positive_remediation_time.index)} vulnerable packages remediated in the default branch after their CVE was published")
print(f"Mastodon has had {len(mastodon_positive_remediation_time.index)} vulnerable packages remediated in the default branch after their CVE was published")
print(f"GitLab has had {len(gitlab_positive_remediation_time.index)} vulnerable packages remediated in the default branch after their CVE was published")
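
The filter-then-summarise step can be expressed with the standard library as well. This is a minimal sketch; `summarise_remediation` is a hypothetical helper that keeps strictly positive remediation times, as the cell above does, and reports a subset of what pandas `describe()` gives for a timedelta column (count, mean, median, max), converted to days.

```python
import statistics
from datetime import timedelta


def summarise_remediation(times):
    """Drop non-positive remediation times (the package was upgraded before the
    CVE was published) and summarise the rest in days."""
    days = [t.total_seconds() / 86400 for t in times if t.total_seconds() > 0]
    if not days:
        return {"count": 0}
    return {
        "count": len(days),
        "mean_days": statistics.mean(days),
        "median_days": statistics.median(days),
        "max_days": max(days),
    }


# Hypothetical remediation times for one project.
sample = [timedelta(days=10), timedelta(days=-3), timedelta(days=2), timedelta(days=30)]
print(summarise_remediation(sample))
```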

In [None]:
railsgoat_positive_remediation_time['remediation_time'].describe()

In [None]:
mastodon_positive_remediation_time['remediation_time'].describe()

In [None]:
gitlab_positive_remediation_time['remediation_time'].describe()

In [None]:
mastodon_positive_remediation_time.sort_values('remediation_time').tail(1)

In [None]:
gitlab_positive_remediation_time.sort_values('remediation_time').tail(1)