In [1]:
from pathlib import Path
# Resolve the folder to work in: take the first entry of the `_dh` global when it exists
# (presumably IPython's directory history, i.e. where the notebook was started — TODO confirm),
# otherwise fall back to the directory that contains this file when run as a plain script.
current_folder = globals()['_dh'][0] if '_dh' in globals() else Path(__file__).parent
current_folder

PosixPath('/Users/gp/Dropbox/projects/qa_scenarios/test/features')

## Preprocess feature files

In [2]:
from assert_feature_files_integrity import act_on_every_file, assert_single_feature_per_file, assert_all_comments_in_separate_line, \
    assert_only_valid_priority_tags_are_used, priority_tags
priority_tags

['prio_trivial', 'prio_minor', 'prio_major', 'prio_critical']

In [3]:
# Hand the "comments on their own line" check to act_on_every_file, starting at current_folder
# (presumably it is applied to every feature file found there — verify against the helper module).
act_on_every_file(current_folder, assert_all_comments_in_separate_line)

In [4]:
# Hand the "single feature per file" check to act_on_every_file, starting at current_folder
# (presumably it is applied to every feature file found there — verify against the helper module).
act_on_every_file(current_folder, assert_single_feature_per_file)

In [5]:
# Hand the "only valid priority tags" check to act_on_every_file, starting at current_folder
# (presumably it is applied to every feature file found there — verify against the helper module).
act_on_every_file(current_folder, assert_only_valid_priority_tags_are_used)

# MAIN

In [6]:
import re
from logging import Logger
from os.path import isfile
from pathlib import Path
from subprocess import Popen
from os import environ
import pytest
from logging import Logger
from time import sleep
from typing import Final
import pytest
import glob
import pprint
pp = pprint.PrettyPrinter(depth=2)
import tempfile
from os.path import getsize
from os import remove

In [7]:
from gherkin.parser import Parser
from gherkin.pickles.compiler import Compiler

In [8]:
from uuid_generator import get_uuid_case, uuid_pattern

In [9]:
def get_comments(gd: dict):
    """Index the comments of a parsed gherkin document by their line number.

    :param gd: parsed document; ``gd['comments']`` is a list of dicts, each carrying
        ``['location']['line']``.
    :return: dict mapping line number -> comment dict.
    :raises AssertionError: when two comments report the same line.
    """
    all_comments = gd['comments']
    by_line = {entry['location']['line']: entry for entry in all_comments}

    # a smaller dict means at least two comments collided on the same line
    assert len(by_line) == len(all_comments), 'Here we have the assumption that every comment belongs in a single line and that a line cannot have multiple comments'

    return by_line

In [10]:
def feature_child_to_scenario(feature_child):
    """Unwrap the scenario dict from a feature child and validate what this tool supports.

    :param feature_child: one entry of ``feature['children']``; it must be a dict whose only
        key is ``'scenario'``.
    :return: the inner scenario dict (the same object, not a copy).
    :raises AssertionError: when the child has other/extra keys, or when the scenario has
        steps or examples (neither is supported by the current implementation).
    """
    assert len(feature_child.keys()) == 1 and 'scenario' in feature_child.keys(), \
    'we have no other occurence yet saying that the scenario object will have any other metadata'
    # the assertion above guarantees the key is present, so no fallback branch is needed
    scenario = feature_child['scenario']
    assert scenario['steps'] == [], 'current implementation works only for scenarios which lack any steps'
    assert scenario['examples'] == [], 'current implementation works only for scenarios and not for scenario outlines that also have examples'
    return scenario

In [11]:
def get_min_scenario_line(scenario: dict):
    """Return the first line occupied by the scenario, counting its tags but not its comments.

    Comments are attached to the scenario by a different function, so they are ignored here.

    :param scenario: scenario dict with ``['location']['line']`` and a ``'tags'`` list whose
        items carry ``['location']['line']``.
    :return: the smallest tag line, or the scenario's own line when it has no tags.
    :raises AssertionError: when the topmost tag is not strictly above the scenario line.
    """
    tags_lines = [tag['location']['line'] for tag in scenario['tags']]
    scenario_line = scenario['location']['line']

    if not tags_lines:
        return scenario_line

    topmost_tag_line = min(tags_lines)
    assert topmost_tag_line < scenario_line, 'because we are under the assumption that a beautifully created feature file will always have any tags above the scenario'
    return min(topmost_tag_line, scenario_line)

In [12]:
def get_next_tags_column(tag_line: int, scenario: dict):
    """Compute the column at which one more tag could be placed on ``tag_line``.

    :param tag_line: the line to inspect.
    :param scenario: scenario dict with ``'tags'`` (each having ``'name'`` and
        ``['location']['line'/'column']``) and ``['location']['column']``.
    :return: when the line holds no tag, the scenario's own column; otherwise the column right
        after the rightmost tag of that line (its column + its name length + 1 for a space).
    :raises Exception: when several tags of the line share the rightmost column.
    """
    line_tags = [tag for tag in scenario['tags'] if tag['location']['line'] == tag_line]

    if line_tags:
        rightmost_col = max(tag['location']['column'] for tag in line_tags)
    else:
        rightmost_col = scenario['location']['column']

    rightmost_tags = [tag for tag in line_tags if tag['location']['column'] == rightmost_col]

    if not rightmost_tags:
        return rightmost_col
    if len(rightmost_tags) > 1:
        raise Exception('Only one tag is expected to match the current max column unless we have no tags at all')

    # leave one space after the last tag name
    return rightmost_col + len(rightmost_tags[0]['name']) + 1

In [13]:
def add_comment_in_scenario(scen: dict, comms, line_no, key='comments'):
    """Move the comment stored under ``line_no`` from ``comms`` into the list ``scen[key]``.

    The list is created when ``scen`` does not have ``key`` yet. Both ``scen`` and ``comms``
    are mutated; a missing ``line_no`` raises ``KeyError``.
    """
    bucket = scen.setdefault(key, [])
    bucket.append(comms.pop(line_no))

In [14]:
def add_above_scenario_comments_and_find_min_line(scenario: dict, comments: dict):
    """Attach the comment lines sitting directly above a scenario and record its first line.

    Starting right above the scenario (tags included), consecutive comment lines are moved
    from ``comments`` into ``scenario['comments_above']`` in top-to-bottom order, and
    ``scenario['min_line']`` is set to the first line of that whole group.

    :param scenario: scenario dict, mutated in place.
    :param comments: dict line number -> comment, the attached entries are removed from it.
    :return: the same ``scenario`` object.
    """
    line_above = get_min_scenario_line(scenario) - 1

    # walk upwards for as long as the lines are comments
    contiguous = []
    while line_above in comments:
        contiguous.append(line_above)
        line_above -= 1

    scenario['min_line'] = line_above + 1

    # collected bottom-up, attached top-down
    for line_no in reversed(contiguous):
        add_comment_in_scenario(scen=scenario, comms=comments, line_no=line_no, key='comments_above')

    return scenario

In [15]:
def set_feature_comments(feature: dict, comments: dict, scenarios: list):
    """Move the comments located before the first scenario into ``feature['comments']``.

    :param feature: feature dict, receives the ``'comments'`` list (mutated in place).
    :param comments: dict line number -> comment; the moved entries are removed from it.
    :param scenarios: scenarios in file order, each already carrying ``'min_line'``. When the
        list is empty there is no scenario to own a comment, so every comment goes to the feature.
    """
    if scenarios:
        first_scenario_min_line = scenarios[0]['min_line']
        keys = [line_no for line_no in comments if line_no < first_scenario_min_line]
    else:
        # a feature without scenarios: all comments belong to the feature itself
        keys = list(comments)

    feature['comments'] = [comments.pop(key) for key in keys]

In [16]:
def add_comments_in_scenarios(scenarios: list, comments: dict):
    """Distribute the remaining comments to the scenarios they fall into.

    A scenario owns the lines from its own ``'min_line'`` up to the line before the next
    scenario's ``'min_line'``; the last scenario owns everything up to the last comment.
    Matching comments are moved from ``comments`` into ``scenario['comments']``.

    :param scenarios: scenarios in file order, each carrying ``'min_line'``; may be empty.
    :param comments: dict line number -> comment, emptied as comments get assigned.
    :raises AssertionError: when some comment could not be assigned to any scenario.
    """
    # pair every scenario with its successor (None for the last one); zip stops at the end of
    # `scenarios`, so an empty scenario list simply skips the loop
    next_scenarios = scenarios[1:] + [None]

    for scenario, next_scenario in zip(scenarios, next_scenarios):
        if not comments:
            break  # nothing else to do here

        line_start = scenario['min_line']
        if next_scenario is None:
            line_stop = max(comments)
        else:
            line_stop = next_scenario['min_line'] - 1

        # snapshot the matching lines first because the helper removes them from `comments`
        line_nos = [line_no for line_no in comments if line_start <= line_no <= line_stop]

        for cur_line_no in line_nos:
            add_comment_in_scenario(scen=scenario, comms=comments, line_no=cur_line_no, key='comments')

    assert len(comments) == 0, 'all comments are expected to be assigned by now'

In [17]:
def is_smoke_scenario(scenario: dict):
    """Tell whether any tag name of the scenario contains 'smoke' (case-insensitive)."""
    lowered_names = [tag['name'].lower() for tag in scenario['tags']]
    for tag_name in lowered_names:
        if 'smoke' in tag_name:
            return True
    return False

In [18]:
def separate_smoke_scenarios(scenarios: list):
    """Split scenarios into those carrying a smoke tag and those that do not.

    The relative order inside each group is the order of the input list.

    :return: tuple ``(smokes, non_smokes)``.
    """
    smokes, non_smokes = [], []

    for scenario in scenarios:
        target = smokes if is_smoke_scenario(scenario) else non_smokes
        target.append(scenario)

    # every scenario must land in exactly one of the two groups
    assert len(smokes) + len(non_smokes) == len(scenarios)

    return smokes, non_smokes

In [19]:
def get_prio_value_of_scenario(scenario: dict):
    """Map the scenario's priority tag to a sortable integer.

    The value is the 1-based position of the tag inside ``priority_tags``; a scenario without
    any priority tag gets 0, which puts it below even the lowest listed priority.

    :raises AssertionError: when a tag name does not start with '@' or when more than one
        priority tag is present.
    """
    tags = scenario['tags']
    bare_names = [tag['name'].lstrip('@') for tag in tags if tag['name'].startswith('@')]
    assert len(tags) == len(bare_names), 'all tags are expected to start with the @ symbol'

    found = [name for name in bare_names if name in priority_tags]
    assert len(found) <= 1, 'only one priority tag shall be found in each scenario or none'

    if len(found) == 1:
        return priority_tags.index(found[0]) + 1
    return 0

In [20]:
def get_sorted_scenarios(smokes: list, non_smokes: list):
    """Return smoke scenarios followed by non-smoke ones, each group by descending priority.

    Sorting is stable, so scenarios of equal priority keep their incoming order. The input
    lists are not modified.
    """
    ordered = []
    for group in (smokes, non_smokes):
        ordered.extend(sorted(group, key=get_prio_value_of_scenario, reverse=True))
    return ordered

In [21]:
class Lines(object):
    """Builds the output of a feature file as text keyed by 1-based line number.

    Content is stored sparsely in ``self.lines`` (line number -> text); line numbers that
    were never written are rendered as empty strings by ``render_output``. Most ``render_*``
    methods place their content after the highest line written so far.
    """

    def __init__(self):
        # line number -> text accumulated so far for that line
        self.lines = {}

    def append_line(self, line_no: int, text: str, column: int = 1):
        """Add ``text`` to the content of line ``line_no``; this never inserts extra lines.

        While the line is still empty the text is left-padded with spaces so that it starts at
        ``column``. Once the line has content, further text is stripped and joined with a single
        space, and ``column`` is ignored.
        """
        current = self.lines.setdefault(line_no, '')
        if current == '':
            addition = ' ' * (column - 1) + text
        else:
            addition = f' {text.strip()}'
        self.lines[line_no] = current + addition

    def get_max_line(self):
        """Return the highest line number written so far, or 0 when nothing was written yet."""
        return max(self.lines, default=0)

    def render_output(self):
        """Return the lines 1..max as a list of strings, filling unwritten lines with ''."""
        return [self.lines.get(line_no, '') for line_no in range(1, self.get_max_line() + 1)]

    def render_feature_tags(self, feature: dict):
        """Write the feature-level tags at their original line and column."""
        assert feature['language'] == 'en', 'currently this script does not support any other language than English'
        for tag in feature['tags']:
            loc = tag['location']
            self.append_line(line_no=loc['line'], text=tag['name'], column=loc['column'])

    def render_feature_body(self, feature: dict):
        """Write the 'Feature: name' line and the description, starting at the feature's own line.

        :raises AssertionError: when the Feature keyword does not start at column 1.
        """
        feature_lines = f"{feature['keyword']}: {feature['name']}\n{feature['description']}".split('\n')

        loc = feature['location']

        # the message only uses values available here, so a failure surfaces as AssertionError
        assert loc['column'] == 1, \
            f"the Feature keyword is expected at the beginning of the line but it was found at column {loc['column']}"

        for cur_line_no, text in enumerate(feature_lines, start=loc['line']):
            self.append_line(line_no=cur_line_no, text=text)

    def render_feature_comments(self, feature: dict):
        """Write ``feature['comments']``, one per line, leaving one blank line after the content so far."""
        start_line = self.get_max_line() + 2

        for comm_line, comment in enumerate(feature['comments'], start=start_line):
            self.append_line(line_no=comm_line, text=comment['text'], column=comment['location']['column'])

    def render_feature(self, feature: dict):
        """Write the whole feature header: tags, body and feature-level comments."""
        self.render_feature_tags(feature=feature)
        self.render_feature_body(feature=feature)
        self.render_feature_comments(feature=feature)

    def render_line_gap(self):
        """Register one empty line right after the content written so far."""
        self.append_line(line_no=self.get_max_line()+1, text='')

    def render_scenario_comments(self, scenario: dict, key = 'comments_above'):
        """Write the scenario comments stored under ``key``, one per line, after the content so far.

        :param key: ``'comments_above'`` or ``'comments'``; a scenario lacking the key is skipped.
        :raises Exception: when ``key`` is not one of the two valid values.
        :raises AssertionError: when a comment does not report column 1.
        """
        valid_keys = ['comments_above', 'comments']
        if key not in valid_keys:
            raise Exception(f'comments key can only be: {valid_keys}')

        if key not in scenario:
            return

        start_line = self.get_max_line() + 1

        for comm_line, comment in enumerate(scenario[key], start=start_line):
            # the comment text already carries its own leading spaces, hence no column is passed
            assert comment['location']['column'] == 1, \
                'for some weird reason the column of the comments are parsed as always 1 even if this is not true'
            self.append_line(line_no=comm_line, text=comment['text'])

    def render_scenario_tags(self, scenario: dict):
        """Write the scenario tags after the content so far, keeping their relative lines and columns."""
        tags = scenario['tags']

        if len(tags) == 0: # neglect if nothing to render
            return

        start_line = self.get_max_line() + 1
        min_tags_line = min(tag['location']['line'] for tag in tags)

        for tag in tags:
            loc = tag['location']
            # shift the original line so that the topmost tag lands on start_line
            self.append_line(line_no=loc['line'] - min_tags_line + start_line, text=tag['name'], column=loc['column'])

    def render_scenario_body(self, scenario: dict):
        """Write the 'Scenario: name' line and the description after the content so far."""
        scenario_lines = f"{scenario['keyword']}: {scenario['name']}\n{scenario['description']}".split('\n')

        col = scenario['location']['column']
        first_line = self.get_max_line() + 1

        for offset, text in enumerate(scenario_lines):
            # respect the column only for the first scenario line and leave scenario's
            # description with whichever indentation it already has
            self.append_line(line_no=first_line + offset, text=text, column=col if offset == 0 else 1)

    def render_scenario(self, scenario: dict):
        """Write a complete scenario: comments above, tags, body and trailing comments."""
        self.render_scenario_comments(scenario=scenario, key='comments_above')
        self.render_scenario_tags(scenario=scenario)
        self.render_scenario_body(scenario=scenario)
        self.render_scenario_comments(scenario=scenario, key='comments')

In [22]:
def render_feature_file(feature, scenarios):
    """Render the feature header followed by the scenarios, in the given order.

    One empty line is registered after the feature and after each scenario.

    :return: the output as a list of text lines.
    """
    document = Lines()

    document.render_feature(feature)
    document.render_line_gap()

    for current in scenarios:
        document.render_scenario(scenario=current)
        document.render_line_gap()

    return document.render_output()

In [23]:
def get_data_size(lines: list):
    """Return the total number of characters over all lines (line breaks are not counted)."""
    return sum(map(len, lines))

In [24]:
def get_file_lines_size(filepath):
    """Return the character count of the file's lines, line breaks excluded.

    The file is read in text mode with the platform's default encoding.
    """
    with open(filepath) as handle:
        content = handle.read()
    return get_data_size(content.splitlines())

In [25]:
def safe_append_uuid_tag(scenario: dict):
    """Append a uuid tag to the scenario unless one of its tags already matches ``uuid_pattern``.

    The new tag is placed on the scenario's first line (see ``get_min_scenario_line``) at the
    column returned by ``get_next_tags_column``. The scenario is mutated in place.

    :return: ``None`` when a uuid tag already existed, otherwise the newly generated uuid.
    """
    existing_tags = scenario['tags']

    uuid_matches = [uuid_pattern.match(tag['name']) for tag in existing_tags]
    if any(match is not None for match in uuid_matches):
        return None

    line = get_min_scenario_line(scenario)
    column = get_next_tags_column(line, scenario)

    numeric_ids = [int(tag['id']) for tag in existing_tags]
    next_id = max(numeric_ids) + 1 if len(numeric_ids) > 0 else 0

    new_uuid = get_uuid_case()

    existing_tags.append({
        'id': str(next_id),  # only to keep consistency with the parsed tags
        'location': {'line': line, 'column': column},
        'name': new_uuid,
    })

    return new_uuid

In [26]:
def reformat_feature_file(feature_file = './sample_feature.feature', add_uuid = False):
    """Parse a feature file and return its reformatted content as a list of lines.

    Reformatting involves the following business logic:
    1) Bring all of the smoke scenarios to the top and leave the rest at the bottom
    2) Sort the scenarios by priority inside each of the smoke / non-smoke groups
    3) Only when ``add_uuid`` is True: add a uuid tag to every scenario lacking one

    Comments are kept with the feature or the scenario they belong to. The input file itself
    is not modified by this function.

    :param feature_file: path of the feature file to read.
    :param add_uuid: whether step 3 is applied (off by default).
    :return: list of output lines.
    :raises AssertionError: when the file does not exist or an internal consistency check fails.
    """

    assert isfile(feature_file), f'feature file not found: {feature_file}'
    # NOTE(review): the path itself is handed to the parser, which relies on gherkin's Parser
    # accepting a file path as well as feature text — confirm against the installed version
    gd = Parser().parse(feature_file)

    comments = get_comments(gd)

    feature = gd['feature']

    scenarios = [feature_child_to_scenario(feat_child)
                 for feat_child in feature['children']]

    # order matters: comments glued above a scenario are claimed first, then whatever is left
    # before the first scenario goes to the feature, and the remainder to the scenarios
    for scenario in scenarios:
        add_above_scenario_comments_and_find_min_line(scenario, comments)

    set_feature_comments(feature, comments, scenarios)

    add_comments_in_scenarios(scenarios, comments)

    smokes, non_smokes = separate_smoke_scenarios(scenarios)

    sorted_scenarios = get_sorted_scenarios(smokes, non_smokes)
    assert len(scenarios) == len(sorted_scenarios)

    if add_uuid:
        for scenario in sorted_scenarios:
            cur_uuid_optional = safe_append_uuid_tag(scenario)
            #cur_uuid_optional can be None if uuid already exists
            assert cur_uuid_optional is None or uuid_pattern.match(cur_uuid_optional) is not None, \
                f'if safe_append_uuid_tag method is implemented appropriately then this assertion should not have failed with cur_uuid {cur_uuid_optional}'

    return render_feature_file(feature, sorted_scenarios)

In [27]:
def write_in_temp_file(lines: list):
    """Write the lines, each followed by a newline, into a new temporary file.

    The text is encoded with ``str.encode()`` defaults and the file is NOT deleted on close,
    so the caller is responsible for removing it.

    :return: the path of the temporary file.
    """
    with tempfile.NamedTemporaryFile(delete=False) as handle:
        handle.writelines((text + '\n').encode() for text in lines)

    return handle.name

In [28]:
def write_in_output_file(filepath, lines: list):
    """Write the lines, each followed by a newline, into ``filepath`` (overwriting it).

    The file is written as UTF-8 explicitly instead of the platform default encoding, which
    matches what ``write_in_temp_file`` produces and keeps non-ASCII scenario text intact on
    every platform.

    :return: the same ``filepath`` that was passed in.
    """
    with open(filepath, 'w', encoding='utf-8') as fp:
        fp.writelines(cur_line + '\n' for cur_line in lines)

    return filepath

In [29]:
def assert_similar_in_out_size(feature_file, output_lines: list, bytediff = 0, debug = False, add_uuid = False):
    """Assert that the reformatted output has (almost) the same size as the original file.

    Sizes are character counts of the lines, line breaks excluded.

    :param feature_file: path of the original feature file.
    :param output_lines: the reformatted lines to compare against it.
    :param bytediff: maximum absolute size difference tolerated when ``add_uuid`` is False.
    :param debug: when True, print the original and the output size.
    :param add_uuid: when True the output is expected to grow because of the added uuid tags,
        so only a relative change below 15% is required; otherwise the change must be below
        1% and within ``bytediff``.
    :raises AssertionError: when the size difference exceeds the applicable limits.
    """
    output_size = get_data_size(output_lines)
    original_size = get_file_lines_size(feature_file)
    if debug:
        print(original_size, output_size)

    size_diff = abs(original_size - output_size)
    if original_size == 0:
        # avoid dividing by zero: an empty original only matches an empty output
        percentage_change = 0.0 if size_diff == 0 else float('inf')
    else:
        percentage_change = size_diff / original_size

    if add_uuid:
        assert percentage_change < 0.15, f'only 15% or smaller alterations are expected and for feature file {feature_file} the percentage change is {percentage_change*100}%'
    else:
        assert percentage_change < 0.01, f'only very small alterations are expected and for feature file {feature_file} the percentage change is {percentage_change*100}%'
        assert size_diff <= bytediff, \
            f'for feature file {feature_file} being strict and demanding size with max diff {bytediff} byte did not work, original_size: {original_size}, output_size: {output_size}'

In [30]:
def generate_output_file(simulate=True, ext = 'feature', debug=False, add_uuid=False, postfix = 'out'):
    """Build a callable that reformats one feature file, validates the result and writes it.

    :param simulate: when True every check runs but no output file is written.
    :param ext: extension of the feature files, without the dot; the path handed to the
        returned callable is expected to end with it when ``postfix`` is not None.
    :param debug: forwarded to the size check, which then prints the compared sizes.
    :param add_uuid: whether uuid tags are added to the scenarios lacking one.
    :param postfix: inserted before the extension to name the output file
        (``x.feature`` -> ``x.out.feature``); ``None`` means the output path is the input
        path itself, i.e. the original file gets overwritten when ``simulate`` is False.
    :return: a function taking the feature file path (a str) and returning the output path.
    """
    def _gen_out_file(feature_file):
        # generate output lines
        out_lines = reformat_feature_file(feature_file, add_uuid=add_uuid)

        # assert that the size difference between input and output is very small
        assert_similar_in_out_size(feature_file, out_lines, debug=debug, add_uuid=add_uuid)

        # generate output file and use it as input, the new output should be identical
        outpath = write_in_temp_file(out_lines)
        try:
            again_out_lines = reformat_feature_file(outpath)
        finally:
            # always clean the temporary file up, even when the second pass fails
            remove(outpath)
        assert out_lines == again_out_lines, 'self assert that redoing the sorting yields exactly a file of the same size, since this failed the code needs refactoring'

        # generate actual output file besides the original one and compare manually
        if postfix is None:
            output_path = feature_file
        else:
            output_path = feature_file[:-len(ext)] + f'{postfix}.{ext}'

        if not simulate:
            write_in_output_file(output_path, out_lines)

        return output_path
    return _gen_out_file

In [31]:
# feature_file = './sample_feature.feature'
# Derived from current_folder so the notebook does not depend on one machine's absolute path;
# kept as a str because generate_output_file slices the extension off the path.
sample_feature_file = str(Path(current_folder) / 'parking' / 'licence_plate_unique_ticket.feature')
# fine_file = '/Users/gp/Dropbox/projects/qa_scenarios/test/features/parking/enforcement/fines/fine_calculations.feature'

In [32]:
# gd = Parser().parse(sample_feature_file)
# gd

In [33]:
# Dry run on the single sample file: with simulate=True all checks run but no output file is
# written; debug=True prints the original and the output size; the returned value is the path
# the output would have been written to.
generate_output_file(simulate=True, debug=True, add_uuid=False, postfix='out')(sample_feature_file)

2922 2922


'/Users/gp/Dropbox/projects/qa_scenarios/test/features/parking/licence_plate_unique_ticket.out.feature'

In [34]:
# Dry run (simulate=True, so no output files are written) handed to act_on_every_file,
# presumably applied to every feature file under current_folder — verify against the helper module.
act_on_every_file(current_folder, generate_output_file(simulate=True, debug=False, add_uuid=False), debug=False)
# If no assertion fails then the simulation was successful. YES, leave add_uuid=False for this step.

In [35]:
# WARNING: real run. simulate=False together with postfix=None makes the output path equal to
# the input path, so each processed feature file is overwritten in place, with uuid tags added
# to the scenarios lacking one (add_uuid=True).
act_on_every_file(current_folder, generate_output_file(simulate=False, debug=True, add_uuid=True, postfix=None), debug=True)

/Users/gp/Dropbox/projects/qa_scenarios/test/features/sample_feature.feature
1374 1575
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/login.feature
6699 7099
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/email_communication.feature
934 1009
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/registration.feature
794 897
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/privacy_n_support.feature
1557 1607
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/generic_logic.feature
2996 3371
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/main_menu.feature
1132 1257
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/profile/email_verification_swift_parking.feature
3212 3587
/Users/gp/Dropbox/projects/qa_scenarios/test/features/mobile_app/profile/support_and_privacy_redesign.feature
1004 1054
/Users/gp/Dropbox/projects/qa_scenarios/test/features/web/no_plus_allowed_in_emails.feature
2