In [None]:
# Step-to-function mapping

import os
import json
import re
from pathlib import Path
import glob
import javalang
import logging
from typing import Dict, List, Optional
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# Emit INFO-and-above records as "<timestamp> - <level> - <message>".
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger used by the code in this cell.
logger = logging.getLogger(__name__)

class FeatureStepMatcher:
    """Pair Gherkin feature steps with Java step-definition methods.

    Steps are read from ``*.feature`` files, step definitions are read from
    Java sources parsed with ``javalang``, and each step is paired with the
    definition whose normalized pattern has the highest TF-IDF cosine
    similarity to the normalized step text.
    """

    # Recognised both as Gherkin step prefixes and as Java annotation names.
    STEP_KEYWORDS = ('Given', 'When', 'Then', 'And', 'But')

    # A candidate is reported only when its similarity is strictly above this.
    MIN_SIMILARITY = 0.1

    def __init__(self, feature_files_path: str):
        """Remember the feature directory and start with empty result stores.

        Args:
            feature_files_path: Directory that directly contains the
                ``*.feature`` files (it is not searched recursively).
        """
        self.feature_files_path = Path(feature_files_path)
        # One dict per step line: original / normalized / type / file.
        self.feature_steps: List[dict] = []
        # Normalized pattern -> details of the Java method implementing it.
        self.step_implementations: Dict[str, Dict] = {}

    def extract_steps_from_features(self) -> None:
        """Collect every step line from the feature files into ``feature_steps``.

        Files that cannot be read are logged and skipped; the remaining files
        are still processed.
        """
        feature_files = glob.glob(str(self.feature_files_path / "*.feature"))

        if not feature_files:
            logger.warning(f"No feature files found in {self.feature_files_path}")
            return

        logger.info(f"Found {len(feature_files)} feature files")

        for file_path in feature_files:
            file_name = str(Path(file_path).name)
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    logger.info(f"Processing feature file: {file_name}")
                    for line in file:
                        step = self._parse_step_line(line, file_name)
                        if step is not None:
                            self.feature_steps.append(step)
            except Exception as e:
                logger.error(f"Error processing feature file {file_path}: {str(e)}")

    def _parse_step_line(self, line: str, file_name: str) -> Optional[dict]:
        """Turn one feature-file line into a step record, or ``None`` if it is not a step.

        Args:
            line: Raw line from a feature file (surrounding whitespace allowed).
            file_name: Name of the feature file, stored in the record.

        Returns:
            A dict with ``original`` (text after the keyword), ``normalized``,
            ``type`` (the keyword) and ``file``; ``None`` for non-step lines.
        """
        keyword, separator, remainder = line.strip().partition(' ')
        # The keyword must be followed by a space, so a bare "Given" is not a step.
        if not separator or keyword not in self.STEP_KEYWORDS:
            return None

        step_text = remainder.lstrip()
        return {
            'original': step_text,
            'normalized': self.normalize_step(step_text),
            'type': keyword,
            'file': file_name
        }

    def parse_step_definitions(self, java_file_content: str, file_path: str) -> None:
        """Parse a Java source and register every annotated step definition.

        Each method carrying one of the ``STEP_KEYWORDS`` annotations is stored
        in ``step_implementations`` under its normalized pattern. A later
        definition with the same normalized pattern replaces the earlier one.

        Args:
            java_file_content: Full text of the Java file.
            file_path: Path of the file, used for logging and stored with
                each definition.
        """
        try:
            tree = javalang.parse.parse(java_file_content)
            source_lines = java_file_content.split('\n')

            for _, node in tree.filter(javalang.tree.MethodDeclaration):
                for annotation in node.annotations:
                    annotation_name = annotation.name
                    if annotation_name not in self.STEP_KEYWORDS:
                        continue

                    try:
                        # Annotations without a single literal value raise
                        # AttributeError here and are skipped with a warning.
                        raw_pattern = annotation.element.value.strip('"')
                        processed_pattern = self._simplify_pattern(raw_pattern)

                        # NOTE(review): assumes node.position is the method's
                        # declaration line and the annotation sits on the line
                        # directly above it - confirm against javalang.
                        declaration_line = node.position.line
                        annotation_index = max(0, declaration_line - 2)
                        end_line = self._method_end_line(source_lines, declaration_line)
                        method_content = '\n'.join(source_lines[annotation_index:end_line])

                        normalized_pattern = self.normalize_step(processed_pattern)

                        self.step_implementations[normalized_pattern] = {
                            'original_pattern': raw_pattern,
                            'processed_pattern': processed_pattern,
                            'annotation': f'@{annotation_name}("{raw_pattern}")',
                            'method_name': node.name,
                            'method_content': method_content,
                            'file_path': file_path,
                            'type': annotation_name,
                            'parameters': self.extract_parameters(raw_pattern)
                        }
                    except AttributeError as e:
                        logger.warning(f"Error processing annotation in {file_path}: {str(e)}")
                        continue

        except Exception as e:
            logger.error(f"Error parsing Java file {file_path}: {str(e)}")

    @staticmethod
    def _simplify_pattern(raw_pattern: str) -> str:
        """Replace regex / Cucumber-expression syntax in a pattern with word tokens.

        The substitutions run in a fixed order: runs of dots become ``.star``,
        ``(...)`` and ``{...}`` become ``VARIABLE``, ``?`` is dropped,
        ``[...]`` becomes ``CHAR_CLASS`` and the ``^`` / ``$`` anchors are
        removed.
        """
        pattern = re.sub(r'\.+', '.star', raw_pattern)
        pattern = re.sub(r'\([^)]+\)', 'VARIABLE', pattern)
        pattern = re.sub(r'\{[^}]+\}', 'VARIABLE', pattern)
        pattern = re.sub(r'\?', '', pattern)
        pattern = re.sub(r'\[[^\]]+\]', 'CHAR_CLASS', pattern)
        return pattern.replace('^', '').replace('$', '')

    @staticmethod
    def _method_end_line(lines: List[str], start_line: int) -> int:
        """Return the 1-based line number on which a method body closes.

        Braces are counted textually from ``start_line`` onwards, so braces
        inside string literals or comments are counted too.

        Args:
            lines: The source file split into lines.
            start_line: 1-based line number of the method declaration.

        Returns:
            The 1-based number of the line where the brace depth returns to
            zero. Because it is 1-based it can be used directly as an
            exclusive slice end that still includes the closing-brace line.
            ``start_line`` is returned when no balanced body is found
            (e.g. a declaration without a body).
        """
        depth = 0
        body_started = False

        for line_number, text in enumerate(lines[start_line - 1:], start_line):
            if not body_started and '{' in text:
                body_started = True
            if body_started:
                depth += text.count('{') - text.count('}')
                if depth == 0:
                    return line_number

        return start_line

    def extract_parameters(self, pattern: str) -> List[str]:
        """List the parameter placeholders found in a step-definition pattern.

        Returns:
            The contents of every ``(...)`` group except the catch-all ``.*``,
            followed by the contents of every ``{...}`` placeholder.
        """
        parameters = [
            match.group(1)
            for match in re.finditer(r'\((.*?)\)', pattern)
            if match.group(1) != ".*"
        ]
        parameters.extend(match.group(1) for match in re.finditer(r'\{([^}]+)\}', pattern))
        return parameters

    def normalize_step(self, step: str) -> str:
        """Replace literals and pattern syntax with generic tokens and lowercase the result.

        The substitutions are order-dependent: quoted strings and standalone
        numbers are tokenized first, then dots, groups and placeholders, and
        finally any leftover regex metacharacters are stripped.
        """
        step = re.sub(r'"[^"]*"', 'QUOTED_STRING', step)
        step = re.sub(r'\b\d+\b', 'NUMBER', step)
        step = re.sub(r'\.+', 'WILDCARD', step)
        step = re.sub(r'\([^)]+\)', 'VARIABLE', step)
        step = re.sub(r'\{[^}]+\}', 'VARIABLE', step)
        # NOTE(review): every dot was already replaced by WILDCARD above, so
        # this substitution can never match. Kept as-is so existing
        # similarity scores do not shift; reorder deliberately if file
        # extensions should get their own token.
        step = re.sub(r'\.[a-zA-Z]+\b', 'FILE_EXT', step)
        step = re.sub(r'\[[^\]]+\]', 'CHAR_CLASS', step)
        step = re.sub(r'[\^\$\*\+\?\[\]\{\}\|\(\)]', '', step)
        return step.lower().strip()

    def find_best_match(self, step: dict) -> Optional[dict]:
        """Find the step definition most similar to ``step``.

        Args:
            step: A record from ``feature_steps``; only ``normalized`` is read.

        Returns:
            ``{'implementation': ..., 'similarity_score': float}`` for the
            best candidate when its score exceeds ``MIN_SIMILARITY``;
            ``None`` when there are no definitions, no candidate is similar
            enough, or the similarity computation fails (the failure is
            logged).
        """
        if not self.step_implementations:
            return None

        step_text = step['normalized']
        candidates = list(self.step_implementations.items())

        try:
            # The step and all candidate patterns share one TF-IDF vocabulary;
            # row 0 is the step, the remaining rows are the candidates.
            vectorizer = TfidfVectorizer(ngram_range=(1, 3))
            corpus = [step_text] + [pattern for pattern, _ in candidates]
            tfidf_matrix = vectorizer.fit_transform(corpus)

            similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:]).flatten()

            best_index = int(np.argmax(similarities))
            # Plain float rather than a numpy scalar so callers can serialize it.
            best_score = float(similarities[best_index])

            if best_score > self.MIN_SIMILARITY:
                return {
                    'implementation': candidates[best_index][1],
                    'similarity_score': best_score
                }

        except Exception as e:
            logger.error(f"Error calculating similarity: {str(e)}")

        return None

    def match_steps_with_implementations(self) -> Dict:
        """Match every collected feature step with its best implementation.

        Returns:
            A dict keyed by the step's original text. Steps with identical
            text share one entry (the last occurrence wins). Unmatched steps
            get ``implementation=None`` and ``similarity_score=0``.
        """
        matches = {}

        for step in self.feature_steps:
            best = self.find_best_match(step)

            matches[step['original']] = {
                'implementation': best['implementation'] if best else None,
                'similarity_score': best['similarity_score'] if best else 0,
                'type': step['type'],
                'feature_file': step['file']
            }

        return matches

def main(suite_name, feature_root=None, java_path=None):
    """Map one suite's feature steps to Java step definitions and save them as JSON.

    Prints a report for every step and writes the matched steps to
    ``step_definitions_<suite_name>.json`` in the current directory.

    Args:
        suite_name: Name of the suite sub-directory under ``feature_root``.
        feature_root: Directory holding one sub-directory of feature files
            per suite. Defaults to the original hard-coded location.
        java_path: Root of the Java sources to scan (recursively) for step
            definitions. Defaults to the original hard-coded location.

    Returns:
        The mapping written to the JSON file:
        ``{step text: {"annotation": ..., "code": ...}}``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    if feature_root is None:
        feature_root = "/Users/ritusaini/Documents/acp-e2e-testing-ajo-cuc-automation-PSDK/cjm-runtime/src/test/resources/com/adobe/platform/testing/e2e"
    if java_path is None:
        java_path = "/Users/ritusaini/Documents/acp-e2e-testing-ajo-cuc-automation-PSDK/cjm-runtime/src/main/java/com/adobe/platform/testing/e2e"

    feature_path = f"{feature_root}/{suite_name}"

    # Local accumulator: the function no longer depends on the caller having
    # created a module-level ``json_data`` beforehand.
    json_data = {}

    try:
        matcher = FeatureStepMatcher(feature_path)
        matcher.extract_steps_from_features()

        # NOTE: the whole Java tree is re-parsed on every call, i.e. once per suite.
        for root, _, files in os.walk(java_path):
            for file in files:
                if not file.endswith('.java'):
                    continue
                print(file)
                file_path = os.path.join(root, file)
                with open(file_path, 'r', encoding='utf-8') as java_file:
                    content = java_file.read()
                matcher.parse_step_definitions(content, file_path)

        matches = matcher.match_steps_with_implementations()
        output_path = f"step_definitions_{suite_name}.json"

        print("\nFeature Step Implementations:\n")
        for step, details in matches.items():
            print(f"{'='*80}")
            print(f"Step: {step}")
            print(f"Feature File: {details['feature_file']}")
            print(f"Type: {details['type']}")

            impl = details['implementation']
            if not impl:
                print("\nNo matching implementation found")
                continue

            # Only matched steps end up in the JSON output.
            json_data[step] = {"annotation": impl['annotation'], "code": impl['method_content']}
            print(f"Similarity Score: {details['similarity_score']:.2f}")
            print("\nImplementation:")
            print(f"File: {impl['file_path']}")
            print(f"Annotation: {impl['annotation']}")
            print(f"Method: {impl['method_name']}")
            print("\nCode:")
            print(impl['method_content'])

        with open(output_path, 'w', encoding='utf-8') as json_file:
            json.dump(json_data, json_file, indent=4)
        print(f"Step definitions saved to {output_path}")

    except Exception as e:
        logger.error(f"Error in main execution: {str(e)}")
        raise

    return json_data

if __name__ == "__main__":
    # Directory that contains one sub-directory of feature files per suite.
    feature_path = "/Users/ritusaini/Documents/acp-e2e-testing-ajo-cuc-automation-PSDK/cjm-runtime/src/test/resources/com/adobe/platform/testing/e2e"

    for suite_name in os.listdir(feature_path):
        # Plain files next to the suite directories are ignored.
        if not os.path.isdir(os.path.join(feature_path, suite_name)):
            continue

        # Start every suite with an empty module-level json_data mapping.
        json_data = {}
        main(suite_name)

In [None]:
#Scenario to steps mapping

import os
import json
import logging
from pathlib import Path
from typing import Dict

# Emit INFO-and-above records as "<timestamp> - <level> - <message>".
# basicConfig() does nothing when the root logger already has handlers,
# e.g. when an earlier cell has configured logging in the same session.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger used by the code in this cell.
logger = logging.getLogger(__name__)

class FeatureParser:
    """Extract scenarios and their step lines from Gherkin feature files.

    Results are kept in ``scenarios`` as ``{scenario name: {"code": steps}}``
    where ``steps`` is the newline-joined text of the scenario's step and
    data-table lines.
    """

    # Line prefixes that open a new scenario.
    SCENARIO_KEYWORDS = ('Scenario:', 'Scenario Outline:')

    # Line prefixes that mark a step (keyword followed by a space).
    STEP_KEYWORDS = ('Given ', 'When ', 'Then ', 'And ', 'But ')

    def __init__(self, feature_files_path: str):
        """Remember the feature directory and start with no scenarios.

        Args:
            feature_files_path: Directory that directly contains the
                ``*.feature`` files (it is not searched recursively).
        """
        self.feature_files_path = Path(feature_files_path)
        self.scenarios: Dict[str, Dict] = {}

    def parse_feature_files(self) -> None:
        """Parse all feature files in the specified directory.

        Files are processed in sorted order so that, when two files contain a
        scenario with the same name, the one that wins is deterministic.
        A file that fails to parse is logged and skipped.
        """
        feature_files = sorted(self.feature_files_path.glob("*.feature"))

        if not feature_files:
            logger.warning(f"No feature files found in {self.feature_files_path}")
            return

        logger.info(f"Found {len(feature_files)} feature files")

        for file_path in feature_files:
            try:
                self._parse_single_feature(file_path)
            except Exception as e:
                logger.error(f"Error processing feature file {file_path}: {str(e)}")

    def _parse_single_feature(self, file_path: Path) -> None:
        """Parse a single feature file and extract scenarios with their steps."""
        with open(file_path, 'r', encoding='utf-8') as file:
            logger.info(f"Processing feature file: {file_path.name}")
            self._collect_scenarios(file)

    def _collect_scenarios(self, lines) -> None:
        """Group step and data-table lines under the scenario they belong to.

        Args:
            lines: Iterable of raw feature-file lines.

        Lines before the first scenario are ignored, and a scenario is only
        stored once it has at least one collected line.
        """
        current_scenario = None
        current_steps = []

        for raw_line in lines:
            line = raw_line.strip()

            # Tags and comments carry no step content. Skipping comments also
            # keeps commented-out table rows ("# | a | b |") out of the steps.
            if line.startswith(('@', '#')):
                continue

            # Handle Scenario or Scenario Outline
            if line.startswith(self.SCENARIO_KEYWORDS):
                # Save previous scenario if exists
                if current_scenario and current_steps:
                    self._save_scenario(current_scenario, current_steps)

                # Start new scenario
                current_scenario = line.split(':', 1)[1].strip()
                current_steps = []
                continue

            if not current_scenario or line.startswith('Feature:'):
                continue

            # Steps, plus any line containing '|' (data table / Examples rows).
            if line.startswith(self.STEP_KEYWORDS) or '|' in line:
                current_steps.append(line)

        # Save the last scenario
        if current_scenario and current_steps:
            self._save_scenario(current_scenario, current_steps)

    def _save_scenario(self, scenario_name: str, steps: list) -> None:
        """Store a scenario's lines as one newline-joined string.

        An existing entry with the same scenario name is overwritten.
        """
        self.scenarios[scenario_name] = {
            "code": '\n'.join(steps)
        }

    def save_to_json(self, output_path: str) -> None:
        """Save the parsed scenarios and steps to a JSON file.

        Failures are logged rather than raised.
        """
        try:
            with open(output_path, 'w', encoding='utf-8') as json_file:
                json.dump(self.scenarios, json_file, indent=4, ensure_ascii=False)
            logger.info(f"Scenarios saved to {output_path}")
        except Exception as e:
            logger.error(f"Error saving JSON file: {str(e)}")

def main():
    """Write one ``scenarios_<suite>.json`` file per suite directory.

    Every sub-directory of the hard-coded feature root is treated as a suite.
    A failure in one suite is logged and does not stop the others.
    """
    # Update this path to match your feature files location
    suites_root = "/Users/ritusaini/Documents/acp-e2e-testing-ajo-cuc-automation-PSDK/cjm-runtime/src/test/resources/com/adobe/platform/testing/e2e"

    for suite_name in os.listdir(suites_root):
        suite_dir = os.path.join(suites_root, suite_name)
        # Only directories are suites; stray files are ignored.
        if not os.path.isdir(suite_dir):
            continue

        try:
            suite_parser = FeatureParser(suite_dir)
            suite_parser.parse_feature_files()
            suite_parser.save_to_json(f"scenarios_{suite_name}.json")
        except Exception as e:
            logger.error(f"Error processing suite {suite_name}: {str(e)}")


if __name__ == "__main__":
    main()

In [None]:
# JSONL Data preparation

import json
import os
import random

input_folder_path = '/Users/ritusaini/Documents/OpenAI'
training_file_path = 'training_data.jsonl'
validation_file_path = 'validation_data.jsonl'

# Fraction of the shuffled entries that goes to the validation file.
validation_split = 0.2
all_entries = []

# Each JSON file is read as a mapping of prompt text -> details; the
# details' "code" entry (empty string when absent) becomes the reply.
for file_name in os.listdir(input_folder_path):
    if not file_name.endswith('.json'):
        continue

    input_file_path = os.path.join(input_folder_path, file_name)
    with open(input_file_path, 'r', encoding='utf-8') as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError as e:
            # A malformed file is reported and skipped.
            print(f"Error decoding JSON in file {file_name}: {e}")
            continue

        # One system / user / assistant conversation per mapping entry.
        all_entries.extend(
            {
                "messages": [
                    {"role": "system", "content": "You are an AI assistant that provides accurate and concise responses."},
                    {"role": "user", "content": step.strip()},
                    {"role": "assistant", "content": details.get("code", "").strip()},
                ]
            }
            for step, details in data.items()
        )

# Shuffle in place, then cut once: the head is training, the tail validation.
random.shuffle(all_entries)
split_index = int(len(all_entries) * (1 - validation_split))
training_data = all_entries[:split_index]
validation_data = all_entries[split_index:]

# JSONL output: one JSON document per line.
with open(training_file_path, 'w', encoding='utf-8') as train_file:
    train_file.writelines(json.dumps(entry) + '\n' for entry in training_data)

with open(validation_file_path, 'w', encoding='utf-8') as val_file:
    val_file.writelines(json.dumps(entry) + '\n' for entry in validation_data)

In [13]:
#Steps Text File

import json
import os

input_folder_path = '/Users/ritusaini/Documents/OpenAI'
output_file_path = 'all_steps.txt'

all_steps = []

# Gather the step texts (the JSON keys) from every step_definitions*.json file.
for file_name in os.listdir(input_folder_path):
    # Logical `and` rather than bitwise `&`: same result for two bools, but
    # it short-circuits and states the intent.
    if file_name.startswith('step_definitions') and file_name.endswith('.json'):
        input_file_path = os.path.join(input_folder_path, file_name)

        with open(input_file_path, 'r', encoding='utf-8') as f:
            try:
                data = json.load(f)

                all_steps.extend(data.keys())
            except json.JSONDecodeError as e:
                # A malformed file is reported and skipped.
                print(f"Error decoding JSON in file {file_name}: {e}")

# De-duplicate and sort: iterating a bare set of strings can yield a
# different order on every run, which made the output file unstable.
all_steps = sorted(set(all_steps))
with open(output_file_path, 'w', encoding='utf-8') as outfile:
    outfile.writelines(step + '\n' for step in all_steps)

In [18]:
#Scenario Text File

import json
import os
import logging
from pathlib import Path

# Emit INFO-and-above records as "<timestamp> - <level> - <message>".
# basicConfig() does nothing when the root logger already has handlers,
# e.g. when an earlier cell has configured logging in the same session.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

# Module-level logger used by the code in this cell.
logger = logging.getLogger(__name__)

def format_scenarios_to_text(input_folder_path: str, output_file_path: str) -> None:
    """
    Merge every ``scenarios_*.json`` file in a folder into one text file.

    Each scenario is written as a ``## Scenario: <name>`` heading followed by
    its non-blank steps, one per line and indented by four spaces, and then a
    blank line. A scenario name seen in several files keeps the steps read
    last. Read and write failures are logged rather than raised.
    """
    steps_by_scenario = {}

    for json_path in Path(input_folder_path).glob('scenarios_*.json'):
        try:
            with open(json_path, 'r', encoding='utf-8') as handle:
                payload = json.load(handle)
            # The "code" value holds the steps as one newline-joined string.
            for name, info in payload.items():
                steps_by_scenario[name] = info['code'].split('\n')
        except json.JSONDecodeError as e:
            logger.error(f"Error decoding JSON in file {json_path.name}: {e}")
        except Exception as e:
            logger.error(f"Error processing file {json_path.name}: {e}")

    try:
        with open(output_file_path, 'w', encoding='utf-8') as outfile:
            for name, step_lines in steps_by_scenario.items():
                section = [f"## Scenario: {name}\n"]
                # Blank steps are dropped; the rest are trimmed and indented.
                section.extend(
                    f"    {text.strip()}\n" for text in step_lines if text.strip()
                )
                section.append("\n")
                outfile.writelines(section)
    except Exception as e:
        logger.error(f"Error writing to output file: {e}")

def main():
    """Write all scenarios found in the hard-coded folder to ``all_scenarios.txt``."""
    format_scenarios_to_text(
        '/Users/ritusaini/Documents/OpenAI',
        'all_scenarios.txt',
    )


if __name__ == "__main__":
    main()