In [1]:
from officelib.xllib import *
from officelib.xllib import screen_lock as xl_screen_lock
from officelib.wordlib import *
from officelib.wordlib import screen_lock as wd_screen_lock
from officelib.const import xlconst as xlc, wdconst as wdc
from pywintypes import com_error
from scripts.tools.issuetracker import IssuetrackerAPI
from os.path import join as pjoin
import os
import re

In [2]:
def frs_nums(s):
    pos = s.find("FRS")
    num = s[pos+3:]
    return num.split(".")

def num_tuple(s):
    return tuple(int(i) for i in frs_nums(s))
    
def sort_key(t):
    return num_tuple(t[0])

def sort_frs_item(s):
    return num_tuple(s[0])

In [3]:
def paste_data(ws, data):
    cells = ws.Cells
    cr = cells.Range
    
    di = data[0].index("Tested") + 1
    
    header_start = cr("A1")
    
    frs_start = cr("A2")
    frs_end = header_start.Offset(len(data), 1)

    id_start = frs_start.Offset(1, 3)
    id_end = frs_end.Offset(1, 3)

    tested_start = id_start.Offset(1,2)
    tested_end = id_end.Offset(1,2)

    paste_start = header_start
    paste_end = frs_end.Offset(1, len(data[0]))
    
    holdup_end = tested_end.Offset(1, data[0].index("Is Holdup?")+1)
    
    with xl_screen_lock(ws.Application):
        paste_range = cr(paste_start, paste_end)
        print("Pasting test data")
        paste_range.Clear()
        paste_range.Value = data

        print("Applying alignment formatting")
        cr(frs_start, id_end).VerticalAlignment = xlc.xlTop
        cr(id_start, id_end).HorizontalAlignment = xlc.xlRight
        cr(tested_start, holdup_end).HorizontalAlignment = xlc.xlRight
        cr(tested_start, holdup_end).VerticalAlignment = xlc.xlTop

        print("Marking untested cells")
        col_os = paste_end.Column - paste_start.Column + 1
        data_iter = iter(data); next(data_iter)
        for offset, (item, level, t_id, tested, *__) in enumerate(data_iter, 1):
            row = cr(frs_start.Offset(offset, 1), frs_start.Offset(offset, col_os))
            if not tested:
                format_row(row)
            elif tested[0] == "=":
                if row[di].Value not in ("Y", "n/a"):
                    format_row(row)
            
            row[1].IndentLevel = level

        #print("Applying autofilter on untested items")
        #cr(paste_start.Offset(0, 1), paste_end).AutoFilter(Field=col_os, Criteria1="=")

        print("Applying column autofit")
        # fit after filter to account for width of filter icon
        for c in paste_range.Columns:
            c.EntireColumn.AutoFit()
            
        print("Applying row autofit")
        for r in paste_range.Columns:
            r.EntireRow.AutoFit()

def format_row(row):
    rint = row.Interior
    rint.Pattern = xlc.xlSolid
    rint.PatternColorIndex = xlc.xlAutomatic
    rint.ThemeColor = xlc.xlThemeColorAccent2
    rint.TintAndShade = 0.399975585192419
    rint.PatternTintAndShade = 0
    
def get_matrix_sheet(xl):
    matrix = None
    wb = xl.Workbooks.Add()
    return wb.Worksheets(1)

In [4]:
_frs_match = re.compile(r"(.*FRS\d+)\.?([\d\.]*)").match
_toplevel_match = re.compile(r"^\>?[\+\*]{2}(FRS\d+)[\+\*]{2}.*$").match
_subitem_match = re.compile(r"^\>?[\+\*]+\s\*(FRS[\d\.]+)\:?\*\:?.*$").match
_canceled_match = re.compile(r"^[\+\*]+\s\-\*(FRS[\d\.]+)\:?\*\:?.*\-").match
def _key_match(k):
    m = _frs_match(k)
    if m:
        return m.groups()
    return k, ""

In [5]:
# Misc flags for FRS item status
FRS_NA       = 1<<0  # 1

In [6]:
def dump(node, level=0):
    for k, v in sorted(node.children.items()):
        print("."*level + str(v.id)+ " "+str(v._tests))
        dump(v, level+1)
        

class Node():

    def __init__(self, id, parent, flags=0):
        self.id = id
        self.flags = flags
        self.parent = parent
        self.children = {}
        self._tests = []
        self.text = ""
        
    def is_na(self):
        return self.flags & FRS_NA
    
    def set_flags(self, flags):
        self.flags = flags
        
    def add_test(self, id_test):
        self._tests.append(id_test)
        for c in self.children.values():
            c.add_test(id_test)
        
    def get_tests(self):
        return sorted(self._tests, key=str)
        
    def get(self, id):
        return self.children.get(id, None)
        
    def add_child(self, id, flags=0):
        child = self.mk_child(id, flags)
        self.children[id] = child
        return child
    
    def mk_child(self, id, flags=0):
        return self.__class__("%s.%s"%(self.id, id), self, flags)
    
    def iter(self):
        # use .items() to sort by item order
        for _, v in sorted(self.children.items()):
            yield v
            yield from v.iter()
            
    def total_len(self):
        n = len(self.children)
        for v in self.children.values():
            n += v.total_len()
        return n
            
    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.id)
    
    def is_tested(self):
        if self._tests:
            return True
        elif not self.children:
            return False
        else:
            for c in self.children.values():
                if not c.is_tested():
                    return False
            return True
        
    def is_leaf(self):
        return not self.children
    
    def has_children(self):
        return not not self.children
        
class Root(Node):
    def __init__(self, key_func):
        super().__init__("", 0, None)
        self._key_func = key_func
        
    def mk_child(self, id, flags):
        return Node(id, self, flags)
        
    def add(self, key, flags):
        root_key, nums = self._key_func(key)
        child = self.get(root_key)
        if not child:
            if nums:
                child = self.add_child(root_key)
            else:
                child = self.add_child(root_key, flags)
        if not nums:
            return child
        path = [int(i) for i in nums.split(".")]
        node = child
        
        for id in path:
            child = node.get(id)
            if child is None:
                child = node.add_child(id)
            node = child
        node.set_flags(flags)
        return node
    
    def lookup(self, key):
        root_key, nums = self._key_func(key)
        node = self.get(root_key)
        if not node or not nums:
            return node
        path = [int(i) for i in nums.split(".")]
        for id in path:
            node = node.get(id)
            if not node: break
        return node

In [7]:
KNOWN_WEBFRS_MAX = 121

class BadFRSNumber(Exception):
    pass

def load_user_test_map(ws):
    cells = ws.Cells
    cr = cells.Range
    id_start = cells.Find("ID_TEST").Offset(2, 1)
    id_end = id_start.End(xlc.xlDown)
    frs_start = cells.Find("List Web FRS").Offset(2, 1)
    frs_end = frs_start.Offset(id_end.Row - id_start.Row + 1, 1)
    test_data = cr(id_start, frs_end).Value
    test_map = []
    fixed_frs = []
    
    # since we iterate over test data here, take the opportunity to
    # convert "\n" -> "\r\n".
    for row in test_data:
        id_test, frs = row[0], row[-1]
        iid_test = int(id_test)
        if id_test == iid_test:
            id_test = iid_test
        frs = frs or ""
        frs = [f.strip() for f in frs.splitlines()]
        fixed_frs.append(("\r\n".join(frs),))
        for f in filter(None, frs):
            test_map.append((f, id_test))
    webfrs_r = cr(frs_start, frs_end)
    webfrs_r.Value = fixed_frs
        
    test_map.sort(key=sort_key)
    return test_map

def build_frs_tree(all_items):
    root = Root(_key_match)
    for frs, flags, text in all_items:
        try:
            node = root.add(frs, flags)
        except:
            print(frs, flags, text)
            raise
        node.text = text
    return root

def _bad_frs_num(frs):
    if frs[:3] == "FDS":  # Old comments left in LV code by Cyth
        return
    #raise BadFRSNumber(frs)

def root_add_test_map(root, test_map):
    """
    :param test_map: list[(frs, id_test)]
    """
    for frs, id_test in test_map:
        node = root.lookup(frs)
        if not node:
            _bad_frs_num(frs)
        else:
            node.add_test(id_test)

def _extract_frs_line(line):
    matches = (
        (_toplevel_match, 0),
        (_subitem_match, 0),
        (_canceled_match, FRS_NA),
    )
    for match, flags in matches:
        m = match(line)
        if m:
            return m.group(1), flags, line
    return None, 0, ""
    
def load_frs_from_issuetracker():
    relevant = download_relevant_issues()
    all_frs = {(None, 0, "")}
    for v in relevant:
        lines = v.description.splitlines()
        for line in lines:
            frs = _extract_frs_line(line)
            all_frs.add(frs)
    for i in range(1, KNOWN_WEBFRS_MAX + 1):
        all_frs.add(("3.0WebFRS%03d" % i, 0, ""))
    all_frs.remove((None, 0, ""))
    return all_frs

def download_relevant_issues():
    api = IssuetrackerAPI('issue.pbsbiotech.com', 'nstarkweather', 'kookychemist')
    issues = api.download_issues("pbssoftware")
    relevant = []
    for v in issues.values():
        if v.sprint_milestone == "3.0" and \
            v.tracker == "Specification" and \
            v.status not in ("Closed", "Rejected"):
                relevant.append(v)
    return relevant

def _xl_child_yes(row, col, n):
    form = '=IF(COUNTIF(%s,"Y")+COUNTIF(%s, "n/a")=(ROW(%s)-ROW(%s)+1), "Y", "")'
    first = row + 1, col
    last = row + n, col
    rstr = cell_range(first, last)
    r1 = cell_str(*first)
    r2 = cell_str(*last)
    return form % (rstr, rstr, r2, r1)

def make_paste_data(root):
    data = [("FRS Number", "Level", "id_test", "Tested", "Is Leaf?", "Is Holdup?")]
    di = data[0].index("Tested") + 1
    for i, node in enumerate(root.iter(), 2):
        f = node.id
        tests = TEST_ITEM_SEP.join(str(i) for i in node.get_tests())
        if node.is_leaf():
            if node.is_tested():
                tested = "Y"
                holdup = ""
            elif node.is_na():
                tested = "n/a"
                holdup = ""
            else:
                tested = ""
                holdup = "Y"
            leaf = "Y"
        else:
            if node.is_na():
                tested = "n/a"
            else:
                tested = _xl_child_yes(i, di, node.total_len())
            leaf = ""
            holdup = ""
            
        if not tests and node.has_children() and node.is_tested():
            tests = "'--"
        
        if f[:3] == "3.0":
            count = 0
        else:
            count = f.count(".")
        data.append((f, count, tests, tested, leaf, holdup))
    return data
        


In [8]:
TEST_ITEM_SEP = "\n"

In [9]:
def _find_cols(ws, *cols):
    first = ws.Cells(1,1)
    last = first.End(xlc.xlToRight)
    headers = ws.Cells.Range(first,last).Value[0]
    res = [headers.index(c) for c in cols]
    return res

def _unit_data(ws):
    cr = ws.Cells.Range
    first = cr("A2")
    last = first.End(xlc.xlDown).End(xlc.xlToRight)
    return cr(first, last).Value
        
def load_unit_test_map(ws):
    data = _unit_data(ws)
    vi, frs, tested = _find_cols(ws, "VI", "FRS", "Unit tested?")
    utmap = []
    for row in data:
        v = row[vi]
        f = row[frs]
        t = row[tested].lower() == "yes"
        if v: v = "<UnitTest> " + v
        utmap.append((f, v))
    return utmap

In [10]:
def process_user_tests(xl, root, user_test_matrix):
    user_wb = xl.Workbooks.Open(user_test_matrix)
    user_ws = user_wb.Worksheets(1)
    user_test_map = load_user_test_map(user_ws)
    root_add_test_map(root, user_test_map)
    user_wb.Close(False)
    
def process_unit_tests(xl, root, unit_test_matrix):
    if unit_test_matrix is None:
        print("No unit tests provided - skipping")
        return
    unit_wb = xl.Workbooks.Open(unit_test_matrix)
    unit_ws = unit_wb.Worksheets(1)
    unit_test_map = load_unit_test_map(unit_ws)
    root_add_test_map(root, unit_test_map)
    unit_wb.Close(False)
    
def _is_wd(f):
    return f.endswith(".docx") and f[:2] != "~$"
    
def _get_files(path):
    return [pjoin(path, f) for f in os.listdir(path) if _is_wd(f)]
        

def process_one_code_review(doc, root):
    doc_id = None
    frs = []
    for p in doc.Paragraphs:
        split = p.Range.Text.split(":", 1)
        tag = split[0].strip()
        if tag == "Document ID":
            doc_id = split[1].strip()
        elif tag.startswith("FRS"):
            frs.append(tag)
    if not doc_id:
        print("Did not find document ID for %r" % doc.Name)
        return
    items = [(f, doc_id) for f in frs]
    root_add_test_map(root, items)
    
def process_code_reviews(word, root, path):
    if path is None:
        print("No code reviews provided - skipping")
    docs = word.Documents
    files = _get_files(path)
    for f in files:
        doc = docs.Open(f)
        process_one_code_review(doc, root)
    doc.Close(False)

def main(user_test_matrix, unit_test_matrix=None, code_review_path=None):
    
    print("Downloading FRS items from issuetracker...")
    all_frs_items = load_frs_from_issuetracker()
    reqs = build_frs_tree(all_frs_items)
    
    xl = Excel()
    with xl_screen_lock(xl):
        print("Loading User Test Matrix...")
        process_user_tests(xl, reqs, user_test_matrix)

        print("Loading Unit Test Matrix...")
        process_unit_tests(xl, reqs, unit_test_matrix)

    word = Word()
    with wd_screen_lock(word):
        print("Loading Code Review Matrix...")
        process_code_reviews(word, reqs, code_review_path)
    word.Quit()

    print("Compiling data for final matrix")
    data = make_paste_data(reqs)
    
    ws = get_matrix_sheet(xl)
    paste_data(ws, data)
    print("Done")

In [11]:
# caching scheme for test purposes. 
try:
    _cache
except NameError:
    _cache = None

try:
    _lffi
except NameError:
    _lffi = load_frs_from_issuetracker
    
def load_frs_from_issuetracker():
    global _cache
    if _cache is None:
        all_frs_items = _lffi()
        _cache = all_frs_items
    else:
        all_frs_items = _cache
    return all_frs_items


In [12]:
trace_path = "C:\\Users\\Nathan\\Documents\\PBS\\SW test\\3.0 User Test\\Traceability"
user_tests = "Hello tests 161122 1.xlsx"
unit_tests = "Traceability Matrix (Reqs and VI) 161123.csv"
code_review_path = '\\\\PBSSTATION\\PBSCloudShare\\(2) R&D-Product Engineering\\Software Development\\3.0 Project\\Phase 2 Working Copy\\IS00038B5 Rev A Code Reviews\\'
p1 = os.path.join(trace_path, user_tests)
p2 = os.path.join(trace_path, unit_tests)
#main(p1, p2, code_review_path)

In [13]:
def main2(user_test_matrix, unit_test_matrix=None, code_review_path=None):
    
    print("Downloading FRS items from issuetracker...")
    all_frs_items = load_frs_from_issuetracker()
    reqs = build_frs_tree(all_frs_items)
    
    xl = Excel()
    with xl_screen_lock(xl):
        print("Loading User Test Matrix...")
        process_user_tests(xl, reqs, user_test_matrix)

        print("Loading Unit Test Matrix...")
        process_unit_tests(xl, reqs, unit_test_matrix)

    word = Word()
    with wd_screen_lock(word):
        print("Loading Code Review Matrix...")
        process_code_reviews(word, reqs, code_review_path)
    word.Quit()
    print("Done")
    return reqs
reqs = main2(p1, p2, code_review_path)

Downloading FRS items from issuetracker...
Downloading projects...
Downloading issues: 658/658      
Loading User Test Matrix...
Loading Unit Test Matrix...
Loading Code Review Matrix...
Done


In [14]:
test_map = {}

In [15]:
import pickle, os, datetime, difflib
from scripts.tools import issuetracker
import inspect


def clear_cache():
    global _tcache
    try:
        os.remove(_cache_file)
    except (FileNotFoundError, NameError):
        pass
    _tcache = None
    _cache = None
    
def setup():
    global IssuetrackerAPI, tests, passed, errors, _cache_file, _tcache
    tests = 0
    passed = 0
    errors = []
    _cache_file = "issues_cache.pkl"
    _tcache = None
    class MockAPI(issuetracker.IssuetrackerAPI):
        
        def __init__(self, *args, **kw):
            super().__init__(*args, login=False, **kw)

        def download_issues(self, *args, _Force=False, **kw):
   
            date, issues, reason = self._load_cache(_cache_file)
            
            if not reason:
                if _Force:
                    reason = "Forced re-cache"
                elif issues is None:
                    reason = "No existing cache found."
                elif date < datetime.date.today():
                    reason = "Old cache: %s < %s" % (date, datetime.date.today())
            
            if reason:
                print("Redownloading issues:", reason)
                issues = super().download_issues(*args, **kw)
                self._cache_issues(_cache_file, issues)
                _tcache = datetime.date.today(), issues
            return issues
        
        def _load_cache(self, file):
                global _tcache
                if _tcache is None:
                    if os.path.exists(file):
                        with open(file, 'rb') as f:
                            try:
                                date, issues = pickle.load(f)
                            except Exception as e:
                                reason = "Error: %s" % str(e)
                            else:
                                _tcache = date, issues
                                reason = ""
                    else:
                        date, issues, reason = None, None, "No Cache Found"
                else:
                    date, issues = _tcache
                    reason = ""
                return date, issues, reason   

        def _cache_issues(self, file, issues):
            date = datetime.date.today()
            with open(file, 'wb') as f:
                ob = (date, issues)
                pickle.dump(ob, f)
    IssuetrackerAPI = MockAPI

            
def fail(msg):
    global tests, errors
    tests += 1
    errors.append(msg)

def success():
    global tests, passed
    tests += 1
    passed += 1
    
def assert_equal(a, b, func=None):
    if a != b:
        if func:
            msg = func(a, b)
        else:
            msg = "%s(): %r != %r" % (inspect.stack()[1].function, a, b)
        fail(msg)
    else:
        success()
        
def get_root():
    all_frs = load_frs_from_issuetracker()
    root = build_frs_tree(all_frs)
    return root

def str_diff(a, b):
    return "\n".join(difflib.Differ().compare([a],[b]))

In [16]:
###########
# Cleanup 
###########
    
def teardown():
    global IssuetrackerAPI
    IssuetrackerAPI = issuetracker.IssuetrackerAPI
    
def finish():
    teardown()
    print("%d / %d tests passed" % (passed, tests))
    if errors:
        print("Errors found")
        for e in errors:
            print(e)
    else:
        print("Success")

In [17]:
###########
# Tests 
###########
setup()

def test_root_all_frs():
    all_frs = load_frs_from_issuetracker()
    root = build_frs_tree(all_frs)
    seen = {(v.id, v.flags, v.text) for v in root.iter()}
    if seen != all_frs:
        d1 = {a: (b,c) for a,b,c in seen-all_frs}
        d2 = {a: (b,c) for a,b,c in all_frs-seen}
        l1 = sorted(d1.items())
        l2 = sorted(d2.items())
        for a,b in zip(l1, l2):
            print(a, b)
        fail("test_root_all_frs")
    else:
        success()
test_root_all_frs()

def test_root_lookup():
    all_frs = load_frs_from_issuetracker()
    root = build_frs_tree(all_frs)
    ids = {(v.id, v.flags) for v in root.iter()}
    all_frs_2 = {(a,b) for a,b,_ in all_frs}
    assert_equal(ids, all_frs_2)
    s1 = []
    s2 = []
    for id, flags in ids:
        s1.append((root.lookup(id).id, flags))
        s2.append((id, flags))
    assert_equal(s1, s2)
test_root_lookup()

def test_frs_strings():
    xl = Excel()
    wb = xl.Workbooks.open(test_file_path)
    ws = wb.Worksheets("Sheet1")
    test_map = load_test_map(ws)
    all_frs = load_frs_from_issuetracker()
    frs_strings = {f for f, _ in test_map}
    diff = frs_strings - all_frs
    bad_tests = {}
    for d in diff:
        tests = test_map2.get(d)
        for t in tests:
            bad = bad_tests.get(t, None)
            if bad is None:
                bad_tests[t] = bad = []
            bad.append(d)
    #     print(test_map2.get(d), d)
    for t, frs in sorted(bad_tests.items()):
        print(t, frs)
    if not bad_tests:
        print("No Bad Tests Found")
    else:
        assert False, "Bad Tests found"
        
def test_cleanup():
    canceled_match = re.compile(r"^[ \*\+]*\-.*\-").match
    relevant = download_relevant_issues()
    for v in relevant:
        lines = v.description.splitlines()
        for line in lines:
            if canceled_match(line):
                for match in (_toplevel_match, _subitem_match):
                    m = match(line)
                    if m:
                        print(line, m.groups())
                        break
                else:
                    print(line)
#test_cleanup()

def test_regex_match(s, exp, match, should_match, msg):
    m = match(s)
    if should_match:
        if not m:
            fail(msg)
            return
        assert_equal(m.group(1), exp)
    else:
        assert_equal(m, None)

def test_canceled(s, exp, match=True):
    test_regex_match(s, exp, _canceled_match, match, 
                     "%r did not match canceled regex" % s)


exp = "FRS123.4"
test_canceled("* -*FRS123.4*: bob-", exp)
test_canceled("* -*FRS123.4*-: bob", exp)
test_canceled("* -*FRS123.4*:- bob", exp)
#test_canceled("* -*FRS123.4-*: bob")
test_canceled("* *FRS123.4*: bob", exp, False)

def test_child_yes(r, c, n, exp):
    
    def on_err(a,b):
        err = "test_child_yes"
        err += "(%s, %s, %s):\n"%(r,c,n) + str_diff(a,b)
        return err
    res = _xl_child_yes(r,c,n)
    assert_equal(exp.lower(), res.lower(), on_err)
    
test_child_yes(758, 4, 4, 
               '=if(countif(D759:D762,"Y")+countif(D759:D762, "n/a")=(ROW(D762)'
               '-ROW(D759)+1), "Y", "")')


def test_extract_frs_line(line, exp, exp_flag):
    
    def on_err(a,b):
        return "extract_frs_line(%r):\n%s" % (line, str_diff(str(a),str(b)))
    
    frs = _extract_frs_line(line)
    assert_equal(frs, (exp, exp_flag, line), on_err)
    
def test_key_match(s, exp):
    res = _key_match(s)
    assert_equal(res, exp)
    
test_key_match("FRS123.4.5.6", ("FRS123", "4.5.6"))
test_key_match("FRS123", ("FRS123", ""))
test_key_match("3.0WebFRS123.4.5", ("3.0WebFRS123", "4.5"))
test_key_match("3.0WebFRS123", ("3.0WebFRS123", ""))
test_key_match("FDS_SFW_pHControlManualMode", ("FDS_SFW_pHControlManualMode", ""))
    
    
test_extract_frs_line("*+FRS1234+*", "FRS1234", 0)
test_extract_frs_line("* *FRS1234.1:*", "FRS1234.1", 0)
test_extract_frs_line("* -*FRS123.4*: bob-", "FRS123.4", FRS_NA)
test_canceled("* -*FRS123.4*-: bob", "FRS123.4", FRS_NA)
test_canceled("* -*FRS123.4*:- bob", "FRS123.4", FRS_NA)
    
finish()

set()
18 / 18 tests passed
Success
