In [7]:
import sys
sys.path.append("../")

In [8]:
import datetime
import pandas as pd
import numpy as np
import re 

from jira_cc_ingestion.application.get_from_db import (get_extraction_depth, get_last_row)
from jira_cc_ingestion.application.extract_from_jira import (retrieve_raw_issues, process_issues,
                                                             retrieve_deleted_issue_keys, get_sprints_data,
                                                             get_todo_deleted_list, get_worklog_deleted_list,
                                                             get_users_data, retrieve_worklogs, retrieve_to_do_list)
from jira_cc_ingestion.application.load_to_db import (loading_info, tag_deleted, delete_insert_sprints_data,
                                                     loading_deleted_todo, loading_deleted_worklog,
                                                     delete_insert_users_data)
from jira_cc_ingestion.infra.jira import JiraData
from jira_cc_ingestion.infra.sql_db import DatabaseHelper
#from jira_cc_ingestion.config import MSSQL_DB, MSSQL_SERVER, PORT
import logging

In [9]:
logger = logging.getLogger(__name__)

In [10]:
token_auth = 'OuxnJVJx41OING5vgcncvXLbcS47EwXYvkPX5h'

In [14]:
project = {'name': "TCC",
               'jira_url': 'https://devstack.vwgroup.com/jira',
               "additional_conditions": 'AND Type = Test AND "KPM-Tickets to test" is not Empty and created >= "2024-01-01"',
               'sql_table_issue': 'raw_jira_technica_TCC_daily',
               'list_of_direct_fields': [
                   {'field': 'key', 'corresponding_customfield': 'key',
                    'method_of_extraction': 'direct_retrieval'},
                   {'field': 'changelog_history', 'corresponding_customfield': 'changelog',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['histories', False]}
               ],
               'list_of_fields': [
                   {'field': 'New KPM Tickets', 'corresponding_customfield': 'customfield_34603',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'KPM-Tickets to test', 'corresponding_customfield': 'customfield_52954',
                    'method_of_extraction': 'field_arg_retrieval'},
                    {'field': 'description', 'corresponding_customfield': 'description',
                   'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'assignee', 'corresponding_customfield': 'assignee',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['displayName']},
                   {'field': 'reporter', 'corresponding_customfield': 'reporter',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['displayName']},
                   {'field': 'components', 'corresponding_customfield': 'components',
                    'method_of_extraction': 'list_field_retrieval_value',
                    'arguments': ['name']},
                   {'field': 'due_date', 'corresponding_customfield': 'duedate',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'description', 'corresponding_customfield': 'description',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'epic_link', 'corresponding_customfield': 'customfield_10000',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'status', 'corresponding_customfield': 'status',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['name']},
                   {'field': 'priority', 'corresponding_customfield': 'priority',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['name']},
                   {'field': 'resolution', 'corresponding_customfield': 'resolution',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['name']},
                   {'field': 'issue_type', 'corresponding_customfield': 'issuetype',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['name']},
                   {'field': 'project', 'corresponding_customfield': 'project',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['key']},
                   {'field': 'parent', 'corresponding_customfield': 'parent',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['key']},
                   {'field': 'issue_id', 'corresponding_customfield': 'issuetype',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['id']},
                   {'field': 'sprint_names', 'corresponding_customfield': 'customfield_10004',
                    'method_of_extraction': 'list_field_retrieval_sprints',
                    'arguments': ['name']},
                   {'field': 'sprint_ids', 'corresponding_customfield': 'customfield_10004',
                    'method_of_extraction': 'list_field_retrieval_sprints',
                    'arguments': ['id']},
                   {'field': 'sprints_startDate', 'corresponding_customfield': 'customfield_10004',
                    'method_of_extraction': 'field_arg_retrieval_sprint',
                    'arguments': ['startDate', 0]},
                   {'field': 'sprint', 'corresponding_customfield': 'customfield_10004',
                    'method_of_extraction': 'field_arg_retrieval_sprint',
                    'arguments': ['name', 0]},
                   {'field': 'sprint_id', 'corresponding_customfield': 'customfield_10004',
                    'method_of_extraction': 'field_arg_retrieval_sprint',
                    'arguments': ['id', 0]},
                   {'field': 'last_sprint', 'corresponding_customfield': 'customfield_10004',
                    'method_of_extraction': 'field_arg_retrieval_sprint',
                    'arguments': ['name', -1]},
                   {'field': 'last_sprint_id', 'corresponding_customfield': 'customfield_10004',
                    'method_of_extraction': 'field_arg_retrieval_sprint',
                    'arguments': ['id', -1]},
                   {'field': 'timespent', 'corresponding_customfield': 'timespent',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'Technology', 'corresponding_customfield': 'customfield_45200',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['value']},
                   {'field': 'Cluster', 'corresponding_customfield': 'customfield_18401',
                    'method_of_extraction': 'field_arg_retrieval_value',
                    'arguments': ['value']},
                   {'field': 'summary', 'corresponding_customfield': 'summary',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'updated', 'corresponding_customfield': 'updated',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'story_points', 'corresponding_customfield': 'customfield_10508',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'resolution_date', 'corresponding_customfield': 'resolutiondate',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'created', 'corresponding_customfield': 'created',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'labels', 'corresponding_customfield': 'labels',
                    'method_of_extraction': 'list_field_retrieval'},
                   {'field': 'SW', 'corresponding_customfield': 'customfield_31804',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'ecu', 'corresponding_customfield': 'customfield_51500',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'Last comment', 'corresponding_customfield': 'customfield_19508',
                    'method_of_extraction': 'field_arg_retrieval'},
                   {'field': 'Acceptance Criterias name', 'corresponding_customfield': 'customfield_10335',
                    'method_of_extraction': 'list_field_retrieval_value',
                    'arguments': ['name']},
                   {'field': 'Acceptance Criterias checks', 'corresponding_customfield': 'customfield_10335',
                    'method_of_extraction': 'list_field_retrieval_value',
                    'arguments': ['checks']},
                    {'field': 'comments_dates', 'corresponding_customfield': 'comment',
                     'method_of_extraction': 'extract_simplified_comments',
                     'arguments': [[('created',)], False]},
               ],
               'list_date_fields': [
                   'created',
                   'updated',
                   'due_date',
                   'resolution_date'
               ]
               }

In [12]:
pd.set_option('max_colwidth', 300)

In [13]:
# logger.info("connecting to sql server...")
# connection_string = 'mssql+pyodbc://{}:{}@{}:{}/{}?driver=SQL+Server'.format(user_sql, password_sql, MSSQL_SERVER,
#                                                                              PORT, MSSQL_DB)
# data_connector = DatabaseHelper(connection_string=connection_string)
# logger.info("sql server is ready!")

logger.info("connecting to jira server...")
jc = JiraData(token=token_auth, jira_url=project['jira_url'])
jc.connect_to_jira()
logger.info("jira server is ready!")

logger.info("getting current date...")
current_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"current date is {current_datetime}")

logger.info(f"Retrieving extraction depth for {project['name']} project... ")
#last_date = get_extraction_depth(data_connector, project, num_update_days)
last_date = "2024-04-01"
logger.info(
    f"Retrieving update issues since {last_date} and configured fields for the {project['name']} project...")
issues_list = retrieve_raw_issues(jc, project, last_date)
logger.info(f"update issues  and configured fields are extracted.")

logger.info(f"Extracting of needed fields for the {project['name']} project...")
extracted_issues = process_issues(project, issues_list, current_datetime)
#extracted_issues.to_csv("extracted_issues_all_resolution_20241025.csv", index=False, header=1, sep=";")

Trying to connect to the jira server...




Extracting Jira Issues...




KeyError: 'c'

In [15]:
extracted_issues = process_issues(project, issues_list, current_datetime)

In [16]:
def extract_status_changes(key, changelog_history):
    l = [[key, h.created, item.fromString, item.toString] for h in changelog_history for item in h.items  if item.field == "status"]
    return l

In [17]:
l = [[extracted_issues.loc[i, 'key'], h.created, item.fromString, item.toString] for i in range(len(extracted_issues)) 
     for h in extracted_issues.loc[i, 'changelog_history'] for item in h.items  if item.field == "status" ]

In [18]:
#pd.DataFrame(l, columns=['key', 'change_date', 'from_status', 'to_status']).to_csv("change_log_tcc.csv", sep=';', index=False, header=1)

In [19]:
#extracted_issues.apply(lambda x: extract_status_changes(x.key, x.changelog_history), axis=1).explode()

In [20]:
def datetime_diff(d2, d1):
    return round((d2 - d1).days + (d2 - d1).seconds /86400, 1)

In [21]:
def nb_w_days(enddate, startdate):
    return np.busday_count(startdate, enddate)

In [22]:
server_time = jc.jira_connector.server_info()["serverTime"]
extracted_issues["end date"] = extracted_issues["resolution_date"].map(
    lambda d: datetime.datetime.strptime(d[:16], "%Y-%m-%dT%H:%M") if d else datetime.datetime.strptime(server_time[:16], "%Y-%m-%dT%H:%M"))
extracted_issues["created"] = pd.to_datetime(extracted_issues["created"].str[:16])



In [23]:
def extract_assignees_info(changelog_history, end_date, current_assignee, created_date):
    l = ([(created_date[:16], None)] + [(h.created, item.fromString) for h in changelog_history for item in h.items  if item.field == "assignee"]
     + [(end_date, current_assignee)])
    return (" | ".join([ str(l[i + 1][1]) + ":" + str(datetime_diff(datetime.datetime.strptime(l[i + 1][0][:16], "%Y-%m-%dT%H:%M"),
                                                                   datetime.datetime.strptime(l[i][0][:16], "%Y-%m-%dT%H:%M"))) 
                                                     for i in range(len(l) -1)]), datetime_diff(datetime.datetime.strptime(l[-1][0][:16], "%Y-%m-%dT%H:%M"),
                                                                                                datetime.datetime.strptime(l[-2][0][:16], "%Y-%m-%dT%H:%M")),
            datetime.datetime.strptime(l[-2][0][:16], "%Y-%m-%dT%H:%M"))

def extract_status_info(changelog_history, end_date, current_status, created_date):
    l = ([(created_date[:16], None)] + [(h.created, item.fromString) for h in changelog_history for item in h.items  if item.field == "status"]
     + [(end_date, current_status)])
    statuses_list =[(str(l[i + 1][1]), datetime_diff(datetime.datetime.strptime(l[i + 1][0][:16], "%Y-%m-%dT%H:%M"),
                                                     datetime.datetime.strptime(l[i][0][:16], "%Y-%m-%dT%H:%M")))
                    for i in range(len(l) -1)]
    return datetime_diff(datetime.datetime.strptime(l[-1][0][:16], "%Y-%m-%dT%H:%M"),
                         datetime.datetime.strptime(l[-2][0][:16], "%Y-%m-%dT%H:%M")), [e[1] for e in l].count("Test Blocked"), statuses_list

In [24]:
assignee_extracted_info = extracted_issues.apply(lambda r: extract_assignees_info(r["changelog_history"], r["end date"].strftime("%Y-%m-%dT%H:%M"),
                                                                                  r["assignee"], r["created"].strftime("%Y-%m-%dT%H:%M")), axis=1)
extracted_issues["Assignee history"] = [l[0] for l in assignee_extracted_info]
extracted_issues["time with current Assignee"] = [l[1] for l in assignee_extracted_info]
extracted_issues["Current assignee date"] = [l[2] for l in assignee_extracted_info]

In [25]:
status_extracted_info = extracted_issues.apply(lambda r: extract_status_info(r["changelog_history"], r["end date"].strftime("%Y-%m-%dT%H:%M"),
                                                                                 r["status"], r["created"].strftime("%Y-%m-%dT%H:%M")), axis=1)
extracted_issues["time in current status"] =  [round(l[0], 1) for l in status_extracted_info]
extracted_issues["Status_Blocked_Counter"] =  [l[1] for l in status_extracted_info]
extracted_issues["was blocked?"] = extracted_issues["Status_Blocked_Counter"].map(lambda x: "x" if x > 0 else "o")

In [26]:
statuses_list = ['Planned', 'Backlog', 'Test Blocked',
       'Accepted for Implementation', 'Cancelled', 'Test results ok',
       'Implementation Review', 'In Analysis', 'Waiting',
       'Provide Documentation', 'In Progress', 'Test results not ok']
for status in statuses_list:
    extracted_issues[status] = [round(sum([e[1] for e in l[2] if e[0]==status]), 1) for l in status_extracted_info]
extracted_issues["Total"] = extracted_issues[
        ['Planned', 'Backlog', 'Test Blocked',
       'Accepted for Implementation',
       'Implementation Review', 'In Analysis', 'Waiting',
       'Provide Documentation', 'In Progress']].sum(axis=1).round(1)

extracted_issues["Total excluding Blocked/Planned"] = extracted_issues[
        ['Backlog', 'Accepted for Implementation',
       'Implementation Review', 'In Analysis', 'Waiting',
       'Provide Documentation', 'In Progress']].sum(axis=1).round(1)

extracted_issues["Percentage time in Deffect Discussion"] = (extracted_issues['Implementation Review'] 
                                                             / extracted_issues["Total excluding Blocked/Planned"]).round(1)

for status in statuses_list:
    extracted_issues[status] = extracted_issues[status].round(1)

In [27]:
def check_daily_comments(comments_dates, start_date, end_date):
    expected_nb_comments = nb_w_days(end_date.date(), start_date.date())
    nbr_disctinct_comments_dates = len(set([
        d.date() for d in comments_dates if start_date.date() <= d.date() and d.date() <= end_date.date()]))
    return "x" if nbr_disctinct_comments_dates > expected_nb_comments else "o"

extracted_issues["DailyCommentCheck"] = extracted_issues.apply(
    lambda r: check_daily_comments([datetime.datetime.strptime(d[:16], "%Y-%m-%dT%H:%M") for d in r['comments_dates']],
                                   datetime.datetime.strptime(r["sprints_startDate"][:16], "%Y-%m-%dT%H:%M"),
                                   r["end date"]) if len(r["sprints_startDate"]) > 14 else None, axis=1)

In [28]:
test_iteration_matching = {
    "tryout": "tryout",
    "try_out": "tryout",
    "test1": "test1",
    "test2": "test2",
    "test3": "test3",
    "test4": "test4",
    "test5": "test5"
}
test_system_contains_matching = {
    "CAN": "NWT",
    "FR": "NWT",
    "LIN": "NWT",
    "BAP": "BAP",
    "ETH": "BTST",
    "SOMEIP": "BTST",
    "ViWi": "BTST",
    "FAZIT": "Sec"
}
test_system_equals_matching = {
    "Routing": "Routing",
    "VKMS": "Sec",
    "SOK": "Sec",
    "TLS": "Sec",
    "sSOA": "Sec",
    "SOMEIP": "BTST",
    "ViWi": "BTST"
}
    #"Added Conditional Column4" = Table.AddColumn(#"Expanded fnCountKPMcreated", "SolutionTrain", each if Text.Contains([Labels], "ST1.2") then "ST1.2" else if Text.Contains([Labels], "ST1.1") then "ST1.1" else null),
solutiontrain_contains_matching = {
    "ST1.1": "ST1.1",
    "ST1.2": "ST1.2"
}

In [29]:
def extract_testtype(s):
    lower_s = s.lower()
    if lower_s.startswith("ft"):
        testtype = "FT"
    elif lower_s.startswith("st"):
        testtype = "ST"
    elif lower_s.startswith("pt"):
        testtype = "PT"
    elif lower_s.startswith("dt"):
        testtype = "DT"
    elif lower_s.startswith("try"):
        testtype = "Try"
    else:
        testtype = "undefined"
    return testtype

def extract_test_iteration(s):
    lower_s = s.lower()
    if "tryout" in lower_s or "try_out" in lower_s:
        test_iteration = "tryout"
    elif "test1" in lower_s:
        test_iteration = "test1"
    elif "test2" in lower_s:
        test_iteration = "test2"
    elif "test3" in lower_s:
        test_iteration = "test3"
    elif "test4" in lower_s:
        test_iteration = "test4"
    elif "test5" in lower_s:
        test_iteration = "test5"
    else:
        test_iteration = "undefined"
    return test_iteration

def extract_with_contains_condition(s, contains_matching, default):
    for pattern, result in contains_matching.items():
        if pattern in s:
            return result
    return default

def extract_with_equals_condition(s, contains_matching, default):
    for pattern, result in contains_matching.items():
        if pattern == s:
            return result
    return default

def extract_test_iteration(s):
    return extract_with_contains_condition(s, test_iteration_matching, "undefined")

def extract_test_system(s):
    result_contains = extract_with_contains_condition(s, test_system_contains_matching, None)
    if result_contains:
        return result_contains
    else:
        extract_with_equals_condition(s, test_system_equals_matching, None)

def extract_solutiontrain(s):
    return extract_with_contains_condition(s, solutiontrain_contains_matching, None)

def treansfrom_status(s):
    return ("Test performed" if s == "Test results ok" 
            else "Failed" if s == "Test results not ok" else s)


def transform_epic_link(s):
    return (s
            .replace("TCC-13860", "EDAG")
            .replace("TCC-13870", "PORSCHETESTHAUS")
            .replace("TCC-13859", "DIGITEQ")
            .replace("TCC-14627", "in-tech")
            .replace("TCQ Vernetzung", "Networking")
            .replace("TCC-31666", "KPIT")
            .replace("TCC-31247", "AUDI")
            .replace("TCC-32455", "ESRLabs")
            .replace("TCQ Security", "Security")
           )
    
def transform_components(s):
    return (s
            .replace("TCQ Vernetzung", "Networking")
            .replace("TCQ Security", "Security")
           )

In [30]:
extracted_issues["TestType"] = extracted_issues["summary"].map(extract_testtype)

extracted_issues["test iteration"] = extracted_issues["labels"].map(extract_test_iteration)

extracted_issues["test system"] = extracted_issues["Technology"].map(extract_test_system)

extracted_issues["Tech-Cluster"] = extracted_issues["components"].map(transform_components)

extracted_issues["Custom field (Epic Link)"] = extracted_issues["epic_link"].map(transform_epic_link)

extracted_issues["Solution Train"] = extracted_issues["labels"].map(extract_solutiontrain)

mask_Erstdurchdringung = (extracted_issues["resolution_date"].notna() & (extracted_issues["TestType"] == "FT")
                          & ~extracted_issues["status"].isin(['Cancelled', 'Test results not ok']))
extracted_issues["Erstdurchdringung"] = None
extracted_issues["Cluster"] = extracted_issues["Cluster"].fillna('')
extracted_issues["Solution Train"] = extracted_issues["Solution Train"].fillna('')

extracted_issues.loc[mask_Erstdurchdringung, "Erstdurchdringung"] = (extracted_issues
                                                                     .loc[mask_Erstdurchdringung]
                                                                     .groupby(["summary", "Cluster", "Solution Train"])["key"]
                                                                     .rank(method="dense", ascending=True)
                                                                    )
extracted_issues.loc[mask_Erstdurchdringung, "Erstdurchdringung"] = extracted_issues.loc[mask_Erstdurchdringung, "Erstdurchdringung"].map(
    lambda x: "x" if x==1 else "o" if x > 1 else None)
# extracted_issues["Erstdurchdringung"] = extracted_issues.groupby(["summary", "Cluster", "Solution Train"])["key"].rank(method="dense", ascending=True)
# extracted_issues.loc[extracted_issues["resolution_date"].isna() | (extracted_issues["TestType"] != "FT"), "Erstdurchdringung"] = None
# extracted_issues["Erstdurchdringung"] = extracted_issues["Erstdurchdringung"].map(lambda x: "x" if x==1 else "o" if x > 1 else None)

extracted_issues["Current teststatus"] = extracted_issues["status"] .map(treansfrom_status)

extracted_issues["created [year/week]"] = extracted_issues["created"].map(lambda d: d.strftime("%Y/%V"))


extracted_issues["resolved [year/week]"] = extracted_issues["resolution_date"].map(lambda d:
                                                                           datetime.datetime.strptime(d[:10], '%Y-%m-%d').strftime("%Y/%V") if d else None)


In [31]:
def extract_betweendelimiters(s, delimter1, delimter2, occ1, occ2):
    if delimter1 not in s:
        return ""
    else:
        split1 = delimter1.join(s.split(delimter1)[occ1+1:])
        if delimter2 == "":
            return split1
        if delimter2 in split1:
            return delimter2.join(split1.split(delimter2)[:occ2+1])
        else:
            return ""

In [32]:
extracted_issues["Test results FEA"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "||Total amount of TCs||iO||niO||offen||Testblocker||Geblockt||Testfall Problem||Testinfrastruktur Problem||Durchführung abgebrochen||",
                                                                         "|", 0, 9))
extracted_issues["Test results CB"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "||Total amount of TCs||passed||failed||blocked||not applicable||not run yet||",
                                                                         "|", 0, 6))
extracted_issues["Test results Raw"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "||Total amount of TCs||iO||niO||offen||nicht geplant||",
                                                                         "|", 0, 5))
extracted_issues["Link to filled FEA"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "Link to filled FEA+*",
                                                                         "*", 0, 0))
extracted_issues["Link to Raw Data"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "Link to Raw Data+*",
                                                                         "*", 0, 0))
extracted_issues["Link to filled Codebeamer Excel"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "Link to filled Codebeamer Excel+*",
                                                                         "*", 0, 0))

In [33]:
def split_series(s, separator, n, cols):
    splited_series = s.str.strip().str.split(separator)
    splited_df = pd.concat([splited_series.map(lambda l: int(l[i]) if i < len(l) and  l[i].isnumeric() else np.nan) for i in range(n)], axis=1)
    splited_df.columns = cols
    return splited_df

In [34]:
#extracted_issues.drop(fea_cols+raw_cols+cb_cols, axis=1, inplace=True)
fea_cols = ["Test results FEA.1", "Number of TCs FEA.2", "io FEA.3", "nio FEA.4", "offen FEA.5", "Testblocker FEA.6",
            "Geblockt FEA.7", "Testfallproblem FEA.8", "Testinfrastruktur FEA.9", "abgebrochen FEA.10"]
raw_cols = ["Test results Raw.1", "Number of TCs Raw.2", "io Raw.3", "nio Raw.4", "offen Raw.5", "nicht geplant Raw.6"]
cb_cols = ["Test results CB.1", "Number of TCs CB.2", "passed CB.3", "failed CB.4",
              "blocked CB.5", "not applicable CB.6", "not run yet CB.7"]
extracted_issues = pd.concat([extracted_issues, split_series(extracted_issues["Test results FEA"], "|", 10,
             fea_cols)], axis=1)
extracted_issues = pd.concat([extracted_issues, split_series(extracted_issues["Test results Raw"], "|", 6,
             raw_cols)], axis=1)

extracted_issues = pd.concat([extracted_issues, split_series(extracted_issues["Test results CB"], "|", 7,
                                                             cb_cols)], axis=1)

In [35]:
to_replace = """\<type here\>\+\*|Please write a detailed summary for the occurred problems after the hashtag summary\.|Please provide confluence link\(s\) to the documentation where you document your new findings / insights for the team \(e\. g\. new ECU configuration\, coding\)\.|\<enter Link\(s\) to Confluence here\>\+\*|\*\(x\) \+|\"\*\+\"\,"""
extracted_issues["#Problems"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "#Problems:",
                                                                         "#Summary", 0, 0)).str.replace(to_replace, "", regex=True).str.strip()
extracted_issues["#Summary"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "#Summary:",
                                                                         "#Links:", 0, 0)).str.replace(to_replace, "", regex=True).str.strip()
extracted_issues["#Links"] = extracted_issues["description"].map(lambda s: extract_betweendelimiters(s,
                                                                         "#Links:",
                                                                         "", 0, 0)).str.replace(to_replace, "", regex=True).str.strip()

In [36]:
def extract_kpm_extracted_list(s):
    kpm_part = extract_betweendelimiters(s, "||KPM Number||Comments if necessary||", "", 0, 1)
    kpm_part_split = [s.replace("KPM_", "").replace("KPM", "").strip() for s in "|".join(kpm_part.split("#(lf)")).split("|")]
    return [re.search("[0-9]{6,12}", s).group().strip() for s in kpm_part_split if re.search("[0-9]{6,12}", s)]

def extract_kpm_verifictaion_list(s):
    kpm_part = extract_betweendelimiters(s, "||KPM Number||Verification status (passed, failed, blocked, not tested)||KPM Ticket updated (Y/N)||Comments if not passed||",
                                         "", 0, 1)
    kpm_part_split = [s.replace("KPM_", "").replace("KPM", "").strip() for s in "|".join(kpm_part.split("#(lf)")).split("|")]
    return [re.search("[0-9]{6,12}", s).group().strip() for s in kpm_part_split if re.search("[0-9]{6,12}", s)]
    
def extract_kpm_verifictaion_statuses(s):
    kpm_part = extract_betweendelimiters(s, "||KPM Number||Verification status (passed, failed, blocked, not tested)||KPM Ticket updated (Y/N)||Comments if not passed||",
                                         "", 0, 1)
    kpm_part_split = [s.replace("KPM_", "").replace("KPM", "").strip().lower() for s in "|".join(kpm_part.split("#(lf)")).split("|")]
    return [re.search("passed|pass|failed|fail|blocked|not tested", s).group().strip() for s in kpm_part_split 
            if re.search("passed|failed|blocked|not tested", s)]

In [37]:
extracted_issues["KPMs created list"] = extracted_issues["New KPM Tickets"].map(extract_kpm_extracted_list)
extracted_issues["KPMs created count"] = extracted_issues["KPMs created list"].map(len)
extracted_issues["KPMs verification list"] = extracted_issues["KPM-Tickets to test"].map(extract_kpm_verifictaion_list)
extracted_issues["KPMs verification statuses"] = extracted_issues["KPM-Tickets to test"].map(extract_kpm_verifictaion_statuses)
extracted_issues["KPMs verification status count passed|failed|blocked|not tested"] = extracted_issues["KPMs verification statuses"].map(lambda l:
    ",".join([str(l.count("passed")), str(l.count("failed")), str(l.count("blocked")), str(l.count("not tested"))]))

In [38]:
def nan_fill(n):
    return 0 if np.isnan(n) else n
def is_not_nan(n):
    return n and not(np.isnan(n))
    
def calculate_passed(r):
    if is_not_nan(r["Number of TCs CB.2"]):
        return r["passed CB.3"]
    if is_not_nan(r["passed CB.3"]):
        return r["io FEA.3"]
    if is_not_nan(r["Number of TCs FEA.2"]):
        return r["io FEA.3"]
    return r["io Raw.3"]
    
def calculate_failed(r):
    if is_not_nan(r["Number of TCs CB.2"]):
        return r["failed CB.4"]
    if is_not_nan(r["Number of TCs FEA.2"]):
        return r["nio FEA.4"]
    return r["nio Raw.4"]

def calculate_blocked(r):
    if is_not_nan(r["Number of TCs CB.2"]):
        return r["blocked CB.5"]
    if is_not_nan(r["Number of TCs FEA.2"]):
        return (nan_fill(r["Geblockt FEA.7"]) + nan_fill(r["Testblocker FEA.6"])
                + nan_fill(r["Testfallproblem FEA.8"]) + nan_fill(r["Testinfrastruktur FEA.9"]))
    return 0

def calculate_not_applicable(r):
    if is_not_nan(r["Number of TCs CB.2"]):
        return r["not applicable CB.6"]
    if is_not_nan(r["Number of TCs FEA.2"]):
        return r["abgebrochen FEA.10"]+r["offen FEA.5"]
    return nan_fill(r["nicht geplant Raw.6"]) + nan_fill(r["offen Raw.5"])

def calculate_not_run_yet(r):
    if is_not_nan(r["Number of TCs CB.2"]):
        return r["not run yet CB.7"]
    if is_not_nan(r["Number of TCs FEA.2"]):
        return 0
    return 0
    
extracted_issues["passed"] = extracted_issues.apply(calculate_passed, axis=1)
extracted_issues["failed"] = extracted_issues.apply(calculate_failed, axis=1)
extracted_issues["blocked"] = extracted_issues.apply(calculate_blocked, axis=1)
extracted_issues["not_applicable"] = extracted_issues.apply(calculate_not_applicable, axis=1)
extracted_issues["not run yet"] = extracted_issues.apply(calculate_not_run_yet, axis=1)
extracted_issues["Total applicable TCs"] = (extracted_issues["passed"].fillna(0) + extracted_issues["failed"].fillna(0) 
                                           + extracted_issues["blocked"].fillna(0) + extracted_issues["not run yet"].fillna(0))
extracted_issues["Total TCs"] = (extracted_issues["passed"].fillna(0) + extracted_issues["failed"].fillna(0) 
                                 + extracted_issues["blocked"].fillna(0) + extracted_issues["not run yet"].fillna(0)
                                 + extracted_issues["not_applicable"].fillna(0))
extracted_issues["Testcoverage (passed+failed)/ applicable"] = extracted_issues.apply(lambda r: (nan_fill(r["passed"])
                                                                                                         + nan_fill(r["failed"]))
                                                                                      /  float(r["Total applicable TCs"]) 
                                                                                      if r["Total applicable TCs"] > 0 else 0, axis=1).round(2)

In [39]:

extracted_issues["Results table filled"]  = extracted_issues.apply(lambda r: "-" if r["status"]!="Test results ok"
                                                                  else "x" if r["Total applicable TCs"]>0 else "o", axis=1)
extracted_issues["Logfile available"]  = extracted_issues.apply(lambda r: "-" if r["status"]!="Test results ok"
                                                                  else "x" if "volkswagengroup.sharepoint.com" in r["Link to Raw Data"]
                                                                  else "o", axis=1)

extracted_issues["Jira ticket correct filled"] = extracted_issues.apply(lambda r: "-" if r["status"]!="Test results ok"
                                                                  else "x" if (r["Results table filled"] == 'x'
                                                                               and r["Logfile available"] == 'x'
                                                                               and r["#Problems"] 
                                                                               and r["#Summary"] )
                                                                  else "o", axis=1)
extracted_issues["Codebeamer Excels/FEAs available"] = extracted_issues.apply(lambda r: "-" if r["status"]!="Test results ok"
                                                                  else "x" if "volkswagengroup.sharepoint.com" in r["Link to filled FEA"]
                                                                              and "volkswagengroup.sharepoint.com" in r["Link to filled Codebeamer Excel"]
                                                                  else "o", axis=1)
extracted_issues["KPMs finished"] = extracted_issues.apply(lambda r: "-" if r["status"]!="Test results ok"
                                                                  else "x" if r["Acceptance Criterias name"]
                                                                  or "false" not in  r["Acceptance Criterias checks"]
                                                                  else "o", axis=1)
extracted_issues["Percentage of not aborted / executed TCs"] = extracted_issues.apply(lambda r: (nan_fill(r["passed"]) + nan_fill(r["failed"])
                                                                                      + nan_fill(r["blocked"])) /  float(r["Total TCs"]) if r["Total TCs"] > 0
                                                                  else 0, axis=1).round(2)
extracted_issues["Percentage of passed applicable TCs"] = extracted_issues.apply(
    lambda r: nan_fill(r["passed"]) / (nan_fill(r["passed"]) + nan_fill(r["failed"]) + nan_fill(r["not_applicable"]) + nan_fill(r["not run yet"])) 
    if nan_fill(r["passed"]) + nan_fill(r["failed"]) + nan_fill(r["not_applicable"]) + nan_fill(r["not run yet"]) > 0 else 0, axis=1).round(2)

In [40]:
extracted_issues["Duration current Assignee_nw"] = extracted_issues.apply(lambda r: nb_w_days(r["end date"].date(),
                                                                                              r["Current assignee date"].date()), axis=1)

In [41]:
initial_columns = ['summary', 'key', 'sprint_names', 'SW', 'TestType', 'test iteration', 'ecu', 'Technology', 'test system', 'Tech-Cluster', 'Custom field (Epic Link)',
 'Solution Train', 'Cluster', 'Current teststatus', 'time in current status', 'assignee', 'time with current Assignee', 'Assignee history', 'was blocked?',
 'Erstdurchdringung', 'created [year/week]', 'resolution_date', 'resolved [year/week]', 'Testcoverage (passed+failed)/ applicable',"Total applicable TCs",
 'passed', 'failed', 'blocked', 'not_applicable', 'not run yet',                 'Results table filled',
 'Jira ticket correct filled', 'Logfile available',  'Codebeamer Excels/FEAs available', 'KPMs finished', "DailyCommentCheck", 'Percentage of not aborted / executed TCs',
                   "Percentage of passed applicable TCs",
'Planned', 'Backlog', 'Accepted for Implementation', 'In Progress', 'Waiting', 'In Analysis', 'Implementation Review','Provide Documentation', 'Test Blocked',
 'Total', "Total excluding Blocked/Planned", "Percentage time in Deffect Discussion", '#Problems', '#Summary', '#Links', "KPMs created list", "KPMs created count",
                   "KPMs verification list", "KPMs verification statuses",
                   "KPMs verification status count passed|failed|blocked|not tested"]

renamed_columns = ['Summary', 'TCC', 'Sprint', 'SW', 'Test Type', 'Test Iteration', 'ECU', 'Technology', 'Test system', 'Tech-Cluster', 'Trade',
 'Solution Train', 'Cluster', 'Current teststatus', 'time in current status', 'current Assignee', 'time with current Assignee', 'Assignee history', 'was blocked?',
                   'first coverage? (Erstdurchdringung)','created [year/week]', 'resolved', 'resolved [year/week]', 'Testcoverage (passed+failed)/ applicable',"Total applicable TCs",
                   'passed', 'failed', 'blocked', 'not applicable', 'not run yet',  'Results table filled',
 'Jira ticket correct filled', 'Logfile available',  'Codebeamer Excels/FEAs available', 'KPMs finished', "Daily comments", 'Percentage of not aborted / executed TCs',
                   "Percentage of passed applicable TCs",
"Planned",	'Backlog', 'Test in Preparation',	'Test Running',	'Waiting', 'Test Result in Analysis', 'Defects Discussion',
                   'Test in Documentation', 'Test Blocked', 'Total', "Total excluding Blocked/Planned", "Percentage time in Deffect Discussion",
  '#Problems', '#Summary', '#Links', "KPMs created list", "KPMs created count", "KPMs verification list", "KPMs verification statuses",
                   "KPMs verification status count passed|failed|blocked|not tested"]

In [42]:
IntegrityCriteria = extracted_issues[initial_columns].copy()
IntegrityCriteria.columns = renamed_columns

In [43]:
extracted_issues.to_csv("jira_transformed_all_status_20241028.csv", index=False, header=1, sep=";")

In [44]:
IntegrityCriteria.to_excel("IntegrityCriteria_20241028.xlsx", index=False, header=1)

In [47]:
with pd.ExcelWriter('IC_auto_test_1.xlsx', engine='openpyxl', mode='a', if_sheet_exists='overlay') as writer:
    IntegrityCriteria.to_excel(writer, sheet_name='IntegrityCriteria', index=False, header=1)

In [None]:
#Ew = pd.ExcelWriter('IC_auto_fill_test.xlsx', mode='a', if_sheet_exists='overlay')

In [None]:
list(zip(initial_columns, renamed_columns))

In [48]:
len(extracted_issues)

7468

In [54]:
import os

In [74]:
extracted_issues.to_excel(
    f"IntegrityCriteria_test.xlsx",
    sheet_name="IC_All_Status_All_Sprints",
    index=False,
    header=1,
)

In [82]:
import pandas as pd
writer = pd.ExcelWriter(f"IntegrityCriteria_test.xlsx", engine="xlsxwriter")
extracted_issues.to_excel(
    writer,
    sheet_name="IC_All_Status_All_Sprints",
    index=False,
    header=1,
)
workbook = writer.book
worksheet = writer.sheets["IC_All_Status_All_Sprints"]

# Add a percent number format.
percent_format = workbook.add_format({"num_format": "0%"})
for col in ["Percentage of not aborted / executed TCs", "Percentage of passed applicable TCs",
            "Percentage time in Deffect Discussion", "Testcoverage (passed+failed)/ applicable"]:
    col_posi = extracted_issues.columns.indice(col)
    # Apply the number format to Grade column.
    worksheet.set_column(col_posi, col_posi, None, percent_format)

    # Close the Pandas Excel writer and output the Excel file.
writer.close()

NameError: name 'transformed_issues' is not defined

In [86]:
percent_format = workbook.add_format({"num_format": "0%"})
for col in ["Percentage of not aborted / executed TCs", "Percentage of passed applicable TCs",
            "Percentage time in Deffect Discussion"]:
    col_posi = list(extracted_issues.columns).index(col)
    # Apply the number format to Grade column.
    worksheet.set_column(col_posi, col_posi, None, percent_format)

    # Close the Pandas Excel writer and output the Excel file.
writer.close()

In [76]:
worksheet = writer.sheets["IC_All_Status_All_Sprints"]

KeyError: 'IC_All_Status_All_Sprints'

In [79]:
workbook = writer.book

In [66]:
percent_format = workbook.add_format({"num_format": "0%"})
for col in ["Percentage of not aborted / executed TCs", "Percentage of passed applicable TCs",
            "Percentage time in Deffect Discussion"]:
    col_posi = transformed_issues.columns.indice(col)
    # Apply the number format to Grade column.
    worksheet.set_column(col_posi, col_posi, None, percent_format)

AttributeError: 'Workbook' object has no attribute 'add_format'

In [67]:
import xlsxwriter

In [72]:
writer.close()

In [80]:
writer.sheets

{}