In [1]:
from graphql import DocumentNode
from gql import gql
from os import environ, path
import json
from typing import Any, Dict
from linear_client import LinearClient

TEAM_NAME: str = 'Juristat'
AFTER_DATE: str = '2023-03-01'
BEFORE_DATE: str = '2023-03-31'

TOKEN: str = environ.get('LINEAR_API_KEY')
client: LinearClient = LinearClient(TOKEN)

with open(path.join('queries', 'states.gql'), 'r') as file_handle:
    states_query: DocumentNode = gql(file_handle.read())

variable_values: Dict[str, Any] = {
    'filter': {
        'team': {
            'name': {
                'eq': TEAM_NAME
            }
        },
    }
}

states_result: Dict[str, Any] = client.execute(
    states_query, variable_values=variable_values)

with open('states.json', 'w') as f:
    json.dump(states_result, f)

with open(path.join('queries', 'issues.gql'), 'r') as file_handle:
    issues_query: DocumentNode = gql(file_handle.read())

variable_values: Dict[str, Any] = {
    'filter': {
        'team': {
            'name': {
                'eq': TEAM_NAME
            }
        },
        'createdAt': {
            'gt': AFTER_DATE,
            'lt': BEFORE_DATE,
        },
    }
}

issues_result = client.drain(
    query=issues_query,
    desired_path=('issues', 'nodes'),
    page_info_path=('issues', 'pageInfo'),
    variable_values=variable_values,
)

with open('data.json', 'w') as f:
    json.dump(issues_result, f)


In [2]:
import json
import pandas as pd
from datetime import datetime

TIME_FORMAT = "%d/%m/%Y %H:%M:%SZ"
CURRENT_NOW = pd.to_datetime(datetime.utcnow().strftime(TIME_FORMAT))

# Generate a list of workflow states
with open('states.json') as f:
    states_result = json.load(f)
states = list()
for state in states_result['workflowStates']['nodes']:
    states.append(state['name'])

# Loop through issues and generate a dataframe of issue state dates
columns = ['ID']
for state in states:
    columns.append(f"{state} Start")
    columns.append(f"{state} End")
    columns.append(f"{state} Duration")
issue_state_dates = pd.DataFrame(columns=columns)
with open('data.json') as f:
    issues_result = json.load(f)
oldest_created_date = CURRENT_NOW
for issue in issues_result:
    issue_created_at = pd.to_datetime(issue['createdAt'])
    if issue_created_at < oldest_created_date:
        oldest_created_date = issue_created_at
    row = dict()
    row['ID'] = issue['identifier']
    for state in states:
        row[f"{state} Start"] = pd.NA
        row[f"{state} End"] = pd.NA
        row[f"{state} Duration"] = pd.NA
    oldest_state = 'For Grooming'
    oldest_state_date = CURRENT_NOW
    for history in issue['history']['nodes']:
        if history['fromState'] is None:
            continue
        history_created_at = pd.to_datetime(history['createdAt'])
        row[f"{history['fromState']['name']} End"] = history_created_at
        row[f"{history['toState']['name']} Start"] = history_created_at
        if history_created_at < oldest_state_date:
            oldest_state = history['fromState']['name']
            oldest_state_date = history_created_at
    row[f"{oldest_state} Start"] = issue_created_at
    for state in states:
        if row[f"{state} Start"] is not pd.NA and row[f"{state} End"] is not pd.NA:
            row[f"{state} Duration"] = row[f"{state} End"] - \
                row[f"{state} Start"]
    issue_state_dates.loc[len(issue_state_dates)] = row

# Calculate cycle time
cycle_time_df = issue_state_dates[
    ~issue_state_dates['Done Start'].isna() &
    ~issue_state_dates['In Progress Start'].isna()
]
cycle_time_durations = cycle_time_df['Done Start'] - \
    cycle_time_df['In Progress Start']
cycle_time = {
    'mean': cycle_time_durations.mean(),
    'median': cycle_time_durations.median(),
    'min': cycle_time_durations.min(),
    'max': cycle_time_durations.max(),
}

# Generate throughput off cycle time
throughput = len(cycle_time_durations)

# Calculate lead time of issues that have been committed to
committed_lead_time_df = issue_state_dates[
    ~issue_state_dates['Done Start'].isna() &
    (
        ~issue_state_dates['Todo Start'].isna() |
        ~issue_state_dates['Todo backlog Start'].isna()
    )
]
committed_lead_time_durations = pd.DataFrame(
    columns=['Start', 'End', 'Duration'])
committed_lead_time_durations['End'] = committed_lead_time_df['Done Start']
committed_lead_time_durations['Start'] = committed_lead_time_df['Todo Start']
for index, row in committed_lead_time_durations.iterrows():
    if pd.isna(row['Start']):
        row['Start'] = committed_lead_time_df['Todo backlog Start'][index]
committed_lead_time_durations['Duration'] = committed_lead_time_durations['End'] - \
    committed_lead_time_durations['Start']
committed_lead_time = {
    'mean': committed_lead_time_durations['Duration'].mean(),
    'median': committed_lead_time_durations['Duration'].median(),
    'min': committed_lead_time_durations['Duration'].min(),
    'max': committed_lead_time_durations['Duration'].max(),
}

# Calculate lead time of issues that have been suggested but not committed
suggested_lead_time_df = issue_state_dates[
    ~issue_state_dates['Done Start'].isna() & (
        ~issue_state_dates['Triage Start'].isna() |
        ~issue_state_dates['For Grooming Start'].isna()
    )
]
suggested_lead_time_durations = pd.DataFrame(
    columns=['Start', 'End', 'Duration'])
suggested_lead_time_durations['End'] = suggested_lead_time_df['Done Start']
suggested_lead_time_durations['Start'] = suggested_lead_time_df['Triage Start']
for index, row in suggested_lead_time_durations.iterrows():
    if pd.isna(row['Start']):
        row['Start'] = suggested_lead_time_df['For Grooming Start'][index]
suggested_lead_time_durations['Duration'] = suggested_lead_time_durations['End'] - \
    suggested_lead_time_durations['Start']
suggested_lead_time = {
    'mean': suggested_lead_time_durations['Duration'].mean(),
    'median': suggested_lead_time_durations['Duration'].median(),
    'min': suggested_lead_time_durations['Duration'].min(),
    'max': suggested_lead_time_durations['Duration'].max(),
}

# Create dataframe of state counts per day for cumulative flow diagram
state_counts = pd.DataFrame(columns=['Date'] + states + ['Total'])
for day in pd.date_range(start=oldest_created_date, end=pd.to_datetime(datetime.utcnow().strftime("%d/%m/%Y %H:%M:%SZ")), freq='D', normalize=True):
    next_day = day + pd.Timedelta(days=1)
    row = dict()
    row['Date'] = day
    row['Total'] = 0
    for state in states:
        row[state] = 0
    oldest_possible = oldest_created_date - pd.Timedelta(days=1)
    for index, issue in issue_state_dates.iterrows():
        latest_state = ''
        latest_state_date = oldest_possible
        for state in states:
            if issue[f"{state} Start"] is not pd.NA:
                state_start = pd.to_datetime(issue[f"{state} Start"])
                if state_start < next_day and state_start > latest_state_date:
                    latest_state = state
                    latest_state_date = state_start
        if '' != latest_state:
            row[latest_state] += 1
            row['Total'] += 1
    state_counts.loc[len(state_counts)] = row


In [None]:
import matplotlib.pyplot as plt
from tabulate import tabulate
from IPython.display import HTML, display


def timedelta_to_hours_rounded(td, precision=2):
    return round(td.total_seconds() / 60 / 60, precision)

def timedelta_to_days_rounded(td, precision=2):
    return round(td.total_seconds() / 60 / 60 / 24, precision)


columns = ['Date', 'Triage', 'For Grooming', 'Todo backlog',
           'Todo', 'In Progress', 'In Review', 'Done', 'Canceled']
plot_states = state_counts[columns]
figure = plt.figure()
ax = figure.add_subplot(1, 1, 1)
ax.stackplot(plot_states['Date'], plot_states.drop(
    'Date', axis=1).T, labels=columns[1:])
ax.legend(loc='upper left')
plt.xticks(rotation=45, ha='right')
plt.show()

display(HTML("<p>For both throughput and cycle time, only cards that had a <code>In Progress</code> status are counted.</p>"))

display(HTML(f"<p>Throughput: {throughput} cards</p>"))

display(HTML(f"<p>Cycle time is the amount of time it takes from when a card has been started by a dev until it hits the <code>Done</code> status.</p>"))
cycle_time_headers = ['Cycle Time', 'Days', 'Hours']
cycle_time_data = [
    ['Mean', timedelta_to_days_rounded(cycle_time['mean']), timedelta_to_hours_rounded(cycle_time['mean'])],
    ['Median', timedelta_to_days_rounded(cycle_time['median']), timedelta_to_hours_rounded(cycle_time['median'])],
    ['Min', timedelta_to_days_rounded(cycle_time['min']), timedelta_to_hours_rounded(cycle_time['min'])],
    ['Max', timedelta_to_days_rounded(cycle_time['max']), timedelta_to_hours_rounded(cycle_time['max'])],
]
cycle_time_table = tabulate(cycle_time_data, headers=cycle_time_headers, tablefmt='html')
display(HTML(cycle_time_table))

display(HTML(f"<p>Committed lead time is the amount of time it takes from when a card has been committed to, ie it has a <code>Todo</code> or <code>Todo backlog</code> state, until it hits the <code>Done</code> status.</p>"))
committed_lead_time_headers = ['Committed Lead Time', 'Days', 'Hours']
committed_lead_time_data = [
    ['Mean', timedelta_to_days_rounded(committed_lead_time['mean']), timedelta_to_hours_rounded(committed_lead_time['mean'])],
    ['Median', timedelta_to_days_rounded(committed_lead_time['median']), timedelta_to_hours_rounded(committed_lead_time['median'])],
    ['Min', timedelta_to_days_rounded(committed_lead_time['min']), timedelta_to_hours_rounded(committed_lead_time['min'])],
    ['Max', timedelta_to_days_rounded(committed_lead_time['max']), timedelta_to_hours_rounded(committed_lead_time['max'])],
]
committed_lead_time_table = tabulate(committed_lead_time_data, headers=committed_lead_time_headers, tablefmt='html')
display(HTML(committed_lead_time_table))

display(HTML(f"<p>Suggested lead time is the amount of time it takes from when a card has been suggested, ie it has a <code>Triage</code> or <code>For Grooming</code> state, until it hits the <code>Done</code> status.</p>"))
suggested_lead_time_headers = ['Suggested Lead Time', 'Days', 'Hours']
suggested_lead_time_data = [
    ['Mean', timedelta_to_days_rounded(suggested_lead_time['mean']), timedelta_to_hours_rounded(suggested_lead_time['mean'])],
    ['Median', timedelta_to_days_rounded(suggested_lead_time['median']), timedelta_to_hours_rounded(suggested_lead_time['median'])],
    ['Min', timedelta_to_days_rounded(suggested_lead_time['min']), timedelta_to_hours_rounded(suggested_lead_time['min'])],
    ['Max', timedelta_to_days_rounded(suggested_lead_time['max']), timedelta_to_hours_rounded(suggested_lead_time['max'])],
]
suggested_lead_time_table = tabulate(suggested_lead_time_data, headers=suggested_lead_time_headers, tablefmt='html')
display(HTML(suggested_lead_time_table))
