In [2]:
cd ..

/Users/nitastha/Desktop/NitishFiles/Work/Optum/project


In [4]:
import json
import os
from typing import Dict, List, Optional, Any
import google.generativeai as genai
import yaml
from datetime import datetime
import logging

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('test_generation.log'),
        logging.StreamHandler()
    ]
)

class TestCaseGenerator:
    def __init__(self, config_path: str = "config/settings.yaml"):
        self.config = self._load_config(config_path)
        self.llm_client = self._initialize_llm()
        
    def _load_config(self, config_path: str) -> dict:
        """Load configuration from YAML file with error handling."""
        try:
            with open(config_path, "r") as f:
                return yaml.safe_load(f)
        except Exception as e:
            logging.error(f"Failed to load config: {str(e)}")
            raise

    def _initialize_llm(self) -> genai.GenerativeModel:
        """Initialize the LLM client with error handling."""
        try:
            if not self.config.get("gemini_api_key"):
                raise ValueError("Gemini API key not found in config")
            
            genai.configure(api_key=self.config["gemini_api_key"])
            model_name = self.config.get("gemini_model", "gemini-1.5-flash")
            return genai.GenerativeModel(model_name)
        except Exception as e:
            logging.error(f"Failed to initialize LLM: {str(e)}")
            raise

    def _generate_prompt(self, field_name: str, data_type: str, constraints: List[str]) -> str:
        """Generate a more structured and specific prompt for test case generation."""
        return f"""
Generate test cases for the field '{field_name}' with following specifications:
- Data Type: {data_type}
- Constraints: {constraints}

Requirements:
1. Include ONLY the JSON array of test cases in your response
2. Each test case must have these exact fields:
   - "test_case": A clear, unique identifier for the test
   - "description": Detailed explanation of what the test verifies
   - "expected_result": MUST be exactly "Pass" or "Fail"
   - "input": The test input value (can be null, string, number, etc.)

3. Include these types of test cases:
   - Basic valid inputs
   - Basic invalid inputs
   - Null/empty handling
   - Boundary conditions
   - Edge cases
   - Type validation

Return the response in this exact format:
[
    {{
        "test_case": "TC001_Valid_Basic",
        "description": "Basic valid input test",
        "expected_result": "Pass",
        "input": "example"
    }},
    {{
        "test_case": "TC002_Invalid_Null",
        "description": "Test with null input",
        "expected_result": "Fail",
        "input": null
    }}
]

IMPORTANT: Return ONLY the JSON array. No additional text or explanation."""

    def _parse_llm_response(self, response_text: str) -> Optional[List[Dict[str, Any]]]:
        """Parse and validate LLM response with improved error handling."""
        try:
            # Remove any markdown code blocks if present
            cleaned_text = response_text.replace("```json", "").replace("```", "").strip()
            
            # Parse JSON
            test_cases = json.loads(cleaned_text)
            
            # Validate structure
            if not isinstance(test_cases, list):
                raise ValueError("Response is not a JSON array")
            
            # Validate and normalize each test case
            validated_cases = []
            for idx, case in enumerate(test_cases, 1):
                required_fields = {"test_case", "description", "expected_result", "input"}
                if not all(field in case for field in required_fields):
                    logging.warning(f"Test case {idx} missing required fields, skipping")
                    continue
                
                # Normalize expected_result to Pass/Fail
                case["expected_result"] = "Pass" if case["expected_result"].lower() == "pass" else "Fail"
                validated_cases.append(case)
            
            return validated_cases

        except json.JSONDecodeError as e:
            logging.error(f"JSON parsing error: {str(e)}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error parsing response: {str(e)}")
            return None

    def generate_test_cases(self, rules_file: str, output_file: str) -> None:
        """Main method to generate and save test cases."""
        try:
            # Load rules
            with open(rules_file, "r") as f:
                rules = json.load(f)

            all_test_cases = {}
            total_fields = sum(len(details["fields"]) for details in rules.values())
            processed_fields = 0

            # Process each field
            for parent_field, details in rules.items():
                for field_name, field_details in details["fields"].items():
                    full_field_name = f"{parent_field}.{field_name}"
                    logging.info(f"Processing field {processed_fields + 1}/{total_fields}: {full_field_name}")

                    # Generate prompt
                    prompt = self._generate_prompt(
                        field_name,
                        field_details["data_type"],
                        field_details["constraints"]
                    )

                    # Get LLM response with retries
                    max_retries = 3
                    for attempt in range(max_retries):
                        try:
                            response = self.llm_client.generate_content(prompt)
                            test_cases = self._parse_llm_response(response.text)
                            
                            if test_cases:
                                all_test_cases[full_field_name] = test_cases
                                logging.info(f"Successfully generated {len(test_cases)} test cases")
                                break
                            else:
                                logging.warning(f"Attempt {attempt + 1}: Failed to generate valid test cases")
                        except Exception as e:
                            logging.error(f"Attempt {attempt + 1} failed: {str(e)}")
                            if attempt == max_retries - 1:
                                logging.error(f"Failed to generate test cases for {full_field_name} after {max_retries} attempts")
                    
                    processed_fields += 1

            # Save results
            self._save_test_cases(all_test_cases, output_file)
            
            # Generate summary
            self._generate_summary(all_test_cases, output_file)

        except Exception as e:
            logging.error(f"Failed to generate test cases: {str(e)}")
            raise

    def _save_test_cases(self, test_cases: Dict[str, List[Dict[str, Any]]], output_file: str) -> None:
        """Save test cases with backup."""
        try:
            # Create backup of existing file if it exists
            if os.path.exists(output_file):
                backup_file = f"{output_file}.{datetime.now().strftime('%Y%m%d_%H%M%S')}.bak"
                os.rename(output_file, backup_file)
                logging.info(f"Created backup: {backup_file}")

            # Save new test cases
            with open(output_file, "w") as f:
                json.dump(test_cases, f, indent=2)
            logging.info(f"Successfully saved test cases to {output_file}")

        except Exception as e:
            logging.error(f"Failed to save test cases: {str(e)}")
            raise

    def _generate_summary(self, test_cases: Dict[str, List[Dict[str, Any]]], output_file: str) -> None:
        """Generate a summary of the test case generation."""
        total_fields = len(test_cases)
        total_test_cases = sum(len(cases) for cases in test_cases.values())
        
        summary = (
            f"\nTest Case Generation Summary\n"
            f"{'='*30}\n"
            f"Total fields processed: {total_fields}\n"
            f"Total test cases generated: {total_test_cases}\n"
            f"Average test cases per field: {total_test_cases/total_fields:.2f}\n"
            f"Output file: {output_file}\n"
            f"{'='*30}"
        )
        
        logging.info(summary)

def main():
    try:
        generator = TestCaseGenerator()
        generator.generate_test_cases(
            generator.config["constrains_processed_rules_file"],
            generator.config["generated_test_cases_file"]
        )
    except Exception as e:
        logging.error(f"Application failed: {str(e)}")
        raise

if __name__ == "__main__":
    main()

2025-02-17 11:46:55,950 - INFO - Processing field 1/10: Rx Bc Demographics.Rx BC Email
2025-02-17 11:47:03,573 - ERROR - JSON parsing error: Invalid \escape: line 54 column 37 (char 1503)
2025-02-17 11:47:09,708 - INFO - Successfully generated 14 test cases
2025-02-17 11:47:09,709 - INFO - Processing field 2/10: Rx Bc Demographics.Rx BC First Name
2025-02-17 11:47:14,191 - ERROR - JSON parsing error: Expecting ',' delimiter: line 66 column 20 (char 1770)
2025-02-17 11:47:18,208 - INFO - Successfully generated 10 test cases
2025-02-17 11:47:18,208 - INFO - Processing field 3/10: Rx Bc Demographics.Rx BC Last Name
2025-02-17 11:47:23,415 - INFO - Successfully generated 13 test cases
2025-02-17 11:47:23,416 - INFO - Processing field 4/10: Rx BC Email Event.Rx BC Email
2025-02-17 11:47:29,061 - ERROR - JSON parsing error: Invalid \escape: line 54 column 37 (char 1530)
2025-02-17 11:47:34,386 - ERROR - JSON parsing error: Invalid \escape: line 48 column 37 (char 1339)
2025-02-17 11:47:39,95

In [6]:
import json
import os
from typing import Dict, List, Optional, Any, Tuple
import google.generativeai as genai
import yaml
from datetime import datetime
import logging
import re

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('test_generation.log'),
        logging.StreamHandler()
    ]
)

class TestCaseGenerator:
    def __init__(self, config_path: str = "config/settings.yaml"):
        self.config = self._load_config(config_path)
        self.llm_client = self._initialize_llm()
        self.field_specific_rules = self._initialize_field_rules()
        
    def _load_config(self, config_path: str) -> dict:
        """Load configuration from YAML file with error handling."""
        try:
            with open(config_path, "r") as f:
                return yaml.safe_load(f)
        except Exception as e:
            logging.error(f"Failed to load config: {str(e)}")
            raise

    def _initialize_llm(self) -> genai.GenerativeModel:
        """Initialize the LLM client with error handling."""
        try:
            if not self.config.get("gemini_api_key"):
                raise ValueError("Gemini API key not found in config")
            
            genai.configure(api_key=self.config["gemini_api_key"])
            model_name = self.config.get("gemini_model", "gemini-1.5-flash")
            return genai.GenerativeModel(model_name)
        except Exception as e:
            logging.error(f"Failed to initialize LLM: {str(e)}")
            raise

    def _initialize_field_rules(self) -> Dict[str, Dict[str, Any]]:
        """Initialize specific rules for different field types."""
        return {
            "Date": {
                "valid_formats": [
                    "%Y-%m-%d %H:%M:%S",
                    "%Y/%m/%d %H:%M:%S",
                    "%m/%d/%Y %H:%M:%S"
                ],
                "extra_validation": self._validate_date_format
            },
            "String": {
                "extra_validation": self._validate_string_format
            }
        }

    def _validate_date_format(self, test_case: Dict[str, Any]) -> Tuple[bool, str]:
        """Validate date format test cases."""
        if test_case["input"] is None:
            return True, ""
            
        if isinstance(test_case["input"], str):
            for date_format in self.field_specific_rules["Date"]["valid_formats"]:
                try:
                    datetime.strptime(test_case["input"], date_format)
                    return True, ""
                except ValueError:
                    continue
            return False, f"Invalid date format. Expected formats: {self.field_specific_rules['Date']['valid_formats']}"
        return False, "Date input must be a string"

    def _validate_string_format(self, test_case: Dict[str, Any]) -> Tuple[bool, str]:
        """Validate string format test cases."""
        if test_case["input"] is None:
            return True, ""
            
        if not isinstance(test_case["input"], (str, type(None))):
            if test_case["expected_result"] == "Pass":
                return False, "String field with non-string input should fail"
        return True, ""

    def _generate_prompt(self, field_name: str, data_type: str, constraints: List[str], description: str = "") -> str:
        """Generate a more structured and specific prompt for test case generation."""
        field_specific_info = ""
        if data_type == "Date":
            field_specific_info = "\nFor Date fields, use these formats only:\n" + \
                                "\n".join(f"- {fmt}" for fmt in self.field_specific_rules["Date"]["valid_formats"])

        return f"""
Generate test cases for the field '{field_name}' with following specifications:
- Data Type: {data_type}
- Constraints: {constraints}
- Description: {description}{field_specific_info}

Requirements:
1. Include ONLY the JSON array of test cases in your response
2. Each test case must have these exact fields:
   - "test_case": A clear, unique identifier for the test
   - "description": Detailed explanation of what the test verifies
   - "expected_result": MUST be exactly "Pass" or "Fail"
   - "input": The test input value (can be null, string, number, etc.)

3. Include these types of test cases:
   - Basic valid inputs
   - Basic invalid inputs
   - Null/empty handling
   - Boundary conditions
   - Edge cases
   - Type validation

4. Consider field-specific requirements:
   - For Date fields: Include only valid date formats specified
   - For String fields: Consider length limits and character restrictions
   - Handle nullable fields appropriately based on constraints

Return the response in this exact format:
[
    {{
        "test_case": "TC001_Valid_Basic",
        "description": "Basic valid input test",
        "expected_result": "Pass",
        "input": "example"
    }}
]

IMPORTANT: Return ONLY the JSON array. No additional text or explanation."""

    def _validate_test_case(self, test_case: Dict[str, Any], data_type: str) -> Tuple[bool, str]:
        """Validate a single test case based on field type and rules."""
        if not all(field in test_case for field in ["test_case", "description", "expected_result", "input"]):
            return False, "Missing required fields"

        if test_case["expected_result"] not in ["Pass", "Fail"]:
            return False, "Invalid expected_result value"

        # Apply field-specific validation
        if data_type in self.field_specific_rules:
            return self.field_specific_rules[data_type]["extra_validation"](test_case)

        return True, ""

    def _parse_llm_response(self, response_text: str, data_type: str) -> Optional[List[Dict[str, Any]]]:
        """Parse and validate LLM response with improved error handling."""
        try:
            # Remove Markdown JSON blocks if present
            cleaned_text = response_text.replace("```json", "").replace("```", "").strip()

            # Handle invalid escape sequences
            cleaned_text = re.sub(r'\\([^"\\])', r'\\\\\1', cleaned_text)  

            # Parse JSON
            test_cases = json.loads(cleaned_text)

            # Validate structure
            if not isinstance(test_cases, list):
                raise ValueError("Response is not a JSON array")

            # Validate and normalize each test case
            validated_cases = []
            for idx, case in enumerate(test_cases, 1):
                is_valid, error_msg = self._validate_test_case(case, data_type)
                if not is_valid:
                    logging.warning(f"Test case {idx} validation failed: {error_msg}")
                    continue
                
                # Normalize expected_result to Pass/Fail
                case["expected_result"] = "Pass" if case["expected_result"].lower() == "pass" else "Fail"
                validated_cases.append(case)

            return validated_cases

        except json.JSONDecodeError as e:
            logging.error(f"JSON parsing error: {str(e)} - Raw response: {response_text}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error parsing response: {str(e)}")
            return None


    def generate_test_cases(self, rules_file: str, output_file: str) -> None:
        """Main method to generate and save test cases."""
        try:
            # Load rules
            with open(rules_file, "r") as f:
                rules = json.load(f)

            all_test_cases = {}
            total_fields = sum(len(details["fields"]) for details in rules.values())
            processed_fields = 0


            for parent_field, details in rules.items():
                for field_name, field_details in details["fields"].items():
                    full_field_name = f"{parent_field}.{field_name}"
                    logging.info(f"Processing field {processed_fields + 1}/{total_fields}: {full_field_name}")
                    if full_field_name in all_test_cases:
                        logging.warning(f"Skipping {full_field_name}, already processed.")
                        continue
            # # Process each field
            # for parent_field, details in rules.items():
            #     for field_name, field_details in details["fields"].items():
            #         full_field_name = f"{parent_field}.{field_name}"
            #         logging.info(f"Processing field {processed_fields + 1}/{total_fields}: {full_field_name}")

                    # Generate prompt
                    prompt = self._generate_prompt(
                        field_name,
                        field_details["data_type"],
                        field_details["constraints"],
                        field_details.get("description", "")
                    )

                    # Get LLM response with retries
                    max_retries = 3
                    for attempt in range(max_retries):
                        try:
                            response = self.llm_client.generate_content(prompt)
                            test_cases = self._parse_llm_response(response.text, field_details["data_type"])
                            
                            if test_cases:
                                all_test_cases[full_field_name] = test_cases
                                logging.info(f"Successfully generated {len(test_cases)} test cases")
                                break
                            else:
                                logging.warning(f"Attempt {attempt + 1}: Failed to generate valid test cases")
                        except Exception as e:
                            logging.error(f"Attempt {attempt + 1} failed: {str(e)}")
                            if attempt == max_retries - 1:
                                logging.error(f"Failed to generate test cases for {full_field_name} after {max_retries} attempts")
                    
                    processed_fields += 1

            # Save results
            self._save_test_cases(all_test_cases, output_file)
            
            # Generate summary
            self._generate_summary(all_test_cases, output_file)

        except Exception as e:
            logging.error(f"Failed to generate test cases: {str(e)}")
            raise

    def _save_test_cases(self, test_cases: Dict[str, List[Dict[str, Any]]], output_file: str) -> None:
        """Save test cases with backup."""
        try:
            # Create backup of existing file if it exists
            if os.path.exists(output_file):
                backup_file = f"{output_file}.{datetime.now().strftime('%Y%m%d_%H%M%S')}.bak"
                os.rename(output_file, backup_file)
                logging.info(f"Created backup: {backup_file}")

            # Save new test cases
            with open(output_file, "w") as f:
                json.dump(test_cases, f, indent=2)
            logging.info(f"Successfully saved test cases to {output_file}")

        except Exception as e:
            logging.error(f"Failed to save test cases: {str(e)}")
            raise

    def _generate_summary(self, test_cases: Dict[str, List[Dict[str, Any]]], output_file: str) -> None:
        """Generate a summary of the test case generation."""
        total_fields = len(test_cases)
        total_test_cases = sum(len(cases) for cases in test_cases.values())
        
        summary = (
            f"\nTest Case Generation Summary\n"
            f"{'='*30}\n"
            f"Total fields processed: {total_fields}\n"
            f"Total test cases generated: {total_test_cases}\n"
            f"Average test cases per field: {total_test_cases/total_fields:.2f}\n"
            f"Output file: {output_file}\n"
            f"{'='*30}"
        )
        
        logging.info(summary)

def main():
    try:
        generator = TestCaseGenerator()
        generator.generate_test_cases(
            generator.config["constrains_processed_rules_file"],
            generator.config["generated_test_cases_file"]
        )
    except Exception as e:
        logging.error(f"Application failed: {str(e)}")
        raise

if __name__ == "__main__":
    main()

2025-02-17 15:29:42,126 - INFO - Processing field 1/10: Rx Bc Demographics.Rx BC Email
2025-02-17 15:29:50,073 - INFO - Successfully generated 14 test cases
2025-02-17 15:29:50,074 - INFO - Processing field 2/10: Rx Bc Demographics.Rx BC First Name
2025-02-17 15:29:55,202 - ERROR - JSON parsing error: Expecting ',' delimiter: line 54 column 17 (char 1443) - Raw response: ```json
[
  {
    "test_case": "TC001_Valid_Basic",
    "description": "Basic valid input test",
    "expected_result": "Pass",
    "input": "John"
  },
  {
    "test_case": "TC002_Valid_LongName",
    "description": "Valid input with a long name",
    "expected_result": "Pass",
    "input": "JohnathanChristopherSmith"
  },
  {
    "test_case": "TC003_Valid_SpecialChars",
    "description": "Valid input with special characters allowed (if any)",
    "expected_result": "Pass",
    "input": "John O'Malley"
  },
  {
    "test_case": "TC004_Invalid_Null",
    "description": "Null input test",
    "expected_result": "Fail",

In [7]:
import json
import uuid
import logging
import os

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('add_keys.log'),
        logging.StreamHandler()
    ]
)

def add_unique_keys(input_file: str, output_file: str):
    """Read test cases, add unique keys, and save to a new file."""
    try:
        # Load JSON file
        with open(input_file, "r") as f:
            test_cases = json.load(f)
        
        # Process each field and test case
        for field_name, cases in test_cases.items():
            for case in cases:
                case["key"] = str(uuid.uuid4())  # Assign a unique UUID
        
        # Save updated test cases with a backup mechanism
        if os.path.exists(output_file):
            backup_file = f"{output_file}.{uuid.uuid4().hex}.bak"
            os.rename(output_file, backup_file)
            logging.info(f"Backup created: {backup_file}")
        
        with open(output_file, "w") as f:
            json.dump(test_cases, f, indent=2)
        
        logging.info(f"Successfully saved updated test cases to {output_file}")
    except Exception as e:
        logging.error(f"Error processing test cases: {str(e)}")
        raise

def main():
    input_file = "data/anth1.json"
    output_file = "key_test.json"
    add_unique_keys(input_file, output_file)

if __name__ == "__main__":
    main()


2025-02-17 16:36:13,685 - INFO - Successfully saved updated test cases to key_test.json


In [18]:
import pandas as pd
import json
from typing import List, Dict

def flatten_test_cases(data: Dict) -> List[Dict]:
    """
    Flattens nested JSON test cases into a list of dictionaries.
    
    Args:
        data (dict): The nested JSON data
        
    Returns:
        list: A list of flattened test case dictionaries
    """
    flattened_data = []
    
    # Iterate through each main category
    for category, test_cases in data.items():
        # Extract the main category name (e.g., "Rx Bc Demographics.Rx BC Email")
        
        # Iterate through each test case in the category
        for test_case in test_cases:
            # Create a new dictionary with all fields plus the category
            test_case_dict = {
                'category': category,
                'test_case': test_case['test_case'],
                'description': test_case['description'],
                'expected_result': test_case['expected_result'],
                'input': str(test_case['input']),  # Convert input to string to handle various types
                'key': test_case['key']
            }
            flattened_data.append(test_case_dict)
    
    return flattened_data

# Read and process the JSON
def process_test_cases(json_data: str) -> pd.DataFrame:
    """
    Process JSON test cases and convert to a DataFrame.
    
    Args:
        json_data (str): The JSON data as a string
        
    Returns:
        pd.DataFrame: A DataFrame containing all test cases
    """
    # Parse JSON
    data = json.loads(json_data)
    
    # Flatten the data
    flattened_data = flatten_test_cases(data)
    
    # Convert to DataFrame
    df = pd.DataFrame(flattened_data)
    
    # Reorder columns for better readability
    column_order = ['category', 'test_case', 'description', 'expected_result', 'input', 'key']
    df = df[column_order]
    
    return df

# Example usage:
with open('key_test.json', 'r') as file:
    json_data = file.read()

# Create DataFrame
df = process_test_cases(json_data)

# Display basic information about the DataFrame


# Display first few rows
print("\nFirst few rows of the DataFrame:")


# Get some basic statistics
print("\nTest case statistics:")
print(f"Total number of test cases: {len(df)}")
print(f"Number of unique categories: {df['category'].nunique()}")
print(f"Number of pass/fail cases: \n{df['expected_result'].value_counts()}")


First few rows of the DataFrame:

Test case statistics:
Total number of test cases: 105
Number of unique categories: 10
Number of pass/fail cases: 
expected_result
Fail    54
Pass    51
Name: count, dtype: int64


In [19]:
df

Unnamed: 0,category,test_case,description,expected_result,input,key
0,Rx Bc Demographics.Rx BC Email,TC001_Valid_Basic,Basic valid input test,Pass,test@example.com,3686623b-2261-4c79-8c10-6b6f8bc0824c
1,Rx Bc Demographics.Rx BC Email,TC002_Valid_LongEmail,Valid email with a long local part,Pass,verylongusername1234567890@example.com,c1a3a7ff-83f4-46be-b207-2c15d2f188d2
2,Rx Bc Demographics.Rx BC Email,TC003_Valid_MultipleDots,Valid email with multiple dots in local part,Pass,user.name.123@example.com,c2194d6f-2a23-4082-a85b-a89583f10c0c
3,Rx Bc Demographics.Rx BC Email,TC004_Invalid_Null,Null input test,Fail,,b6d56536-cec8-449b-8a7b-1a40a3e5dcf8
4,Rx Bc Demographics.Rx BC Email,TC005_Invalid_Empty,Empty string input test,Fail,,68a1de47-95ef-4ece-b8c2-bac72cdbb124
...,...,...,...,...,...,...
100,Rx BC Email Event.Rx BC User Group,TC006_Invalid_WhitespaceOnly,Whitespace only input test,Fail,,839b6025-dc2f-42da-b44a-02dcde1f10cf
101,Rx BC Email Event.Rx BC User Group,TC007_Invalid_Number,Numeric input test,Fail,123,b96a4cb6-f12d-4974-8f3d-a29472a03c60
102,Rx BC Email Event.Rx BC User Group,TC008_Invalid_Boolean,Boolean input test,Fail,True,ef30bcf4-51f7-4338-9a7c-58c06c8b31af
103,Rx BC Email Event.Rx BC User Group,TC009_Edge_SingleChar,Single character input test,Pass,A,5082683e-f4e8-44ba-b074-b842225d1239


## laude with open ai

In [None]:
import json
import os
from typing import Dict, List, Optional, Any, Tuple
import openai
from azure.identity import DefaultAzureCredential
import yaml
from datetime import datetime
import logging
import re

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('test_generation.log'),
        logging.StreamHandler()
    ]
)

class TestCaseGenerator:
    def __init__(self, config_path: str = "config/settings.yaml"):
        self.config = self._load_config(config_path)
        self.llm_client = self._initialize_llm()
        self.field_specific_rules = self._initialize_field_rules()
        
    def _load_config(self, config_path: str) -> dict:
        """Load configuration from YAML file with error handling."""
        try:
            with open(config_path, "r") as f:
                return yaml.safe_load(f)
        except Exception as e:
            logging.error(f"Failed to load config: {str(e)}")
            raise

    def _initialize_llm(self) -> openai.AzureOpenAI:
        """Initialize the OpenAI client with error handling."""
        try:
            # Get Azure credentials
            default_credential = DefaultAzureCredential()
            access_token = default_credential.get_token("https://cognitiveservices.azure.com/.default")
            
            if not access_token:
                raise ValueError("Failed to obtain Azure access token")
            
            # Initialize OpenAI client with Azure configuration
            return openai.AzureOpenAI(
                api_version=self.config.get("openai_api_version", "2024-06-01"),
                azure_endpoint=self.config.get("azure_openai_endpoint", "https://prod-1.services.unitedaistudio.uhg.com/aoai-shared-openai-prod-1"),
                api_key=access_token.token,
                azure_deployment=self.config.get("deployment_name", "gpt-4o_2024-05-13"),
                default_headers={
                    "projectId": self.config.get("project_id", "0bef8880-4e98-413c-bc0b-41c280fd1b2a")
                }
            )
        except Exception as e:
            logging.error(f"Failed to initialize OpenAI client: {str(e)}")
            raise

    def _initialize_field_rules(self) -> Dict[str, Dict[str, Any]]:
        """Initialize specific rules for different field types."""
        return {
            "Date": {
                "valid_formats": [
                    "%Y-%m-%d %H:%M:%S",
                    "%Y/%m/%d %H:%M:%S",
                    "%m/%d/%Y %H:%M:%S"
                ],
                "extra_validation": self._validate_date_format
            },
            "String": {
                "extra_validation": self._validate_string_format
            }
        }

    def _validate_date_format(self, test_case: Dict[str, Any]) -> Tuple[bool, str]:
        """Validate date format test cases."""
        if test_case["input"] is None:
            return True, ""
            
        if isinstance(test_case["input"], str):
            for date_format in self.field_specific_rules["Date"]["valid_formats"]:
                try:
                    datetime.strptime(test_case["input"], date_format)
                    return True, ""
                except ValueError:
                    continue
            return False, f"Invalid date format. Expected formats: {self.field_specific_rules['Date']['valid_formats']}"
        return False, "Date input must be a string"

    def _validate_string_format(self, test_case: Dict[str, Any]) -> Tuple[bool, str]:
        """Validate string format test cases."""
        if test_case["input"] is None:
            return True, ""
            
        if not isinstance(test_case["input"], (str, type(None))):
            if test_case["expected_result"] == "Pass":
                return False, "String field with non-string input should fail"
        return True, ""

    def _generate_prompt(self, field_name: str, data_type: str, constraints: List[str], description: str = "") -> List[Dict[str, str]]:
        """Generate a more structured and specific prompt for test case generation."""
        field_specific_info = ""
        if data_type == "Date":
            field_specific_info = "\nFor Date fields, use these formats only:\n" + \
                                "\n".join(f"- {fmt}" for fmt in self.field_specific_rules["Date"]["valid_formats"])

        prompt_content = f"""
Generate test cases for the field '{field_name}' with following specifications:
- Data Type: {data_type}
- Constraints: {constraints}
- Description: {description}{field_specific_info}

Requirements:
1. Include ONLY the JSON array of test cases in your response
2. Each test case must have these exact fields:
   - "test_case": A clear, unique identifier for the test
   - "description": Detailed explanation of what the test verifies
   - "expected_result": MUST be exactly "Pass" or "Fail"
   - "input": The test input value (can be null, string, number, etc.)

3. Include these types of test cases:
   - Basic valid inputs
   - Basic invalid inputs
   - Null/empty handling
   - Boundary conditions
   - Edge cases
   - Type validation

4. Consider field-specific requirements:
   - For Date fields: Include only valid date formats specified
   - For String fields: Consider length limits and character restrictions
   - Handle nullable fields appropriately based on constraints

Return the response in this exact format:
[
    {{
        "test_case": "TC001_Valid_Basic",
        "description": "Basic valid input test",
        "expected_result": "Pass",
        "input": "example"
    }}
]

IMPORTANT: Return ONLY the JSON array. No additional text or explanation."""

        return [{"role": "user", "content": prompt_content}]

    def _validate_test_case(self, test_case: Dict[str, Any], data_type: str) -> Tuple[bool, str]:
        """Validate a single test case based on field type and rules."""
        if not all(field in test_case for field in ["test_case", "description", "expected_result", "input"]):
            return False, "Missing required fields"

        if test_case["expected_result"] not in ["Pass", "Fail"]:
            return False, "Invalid expected_result value"

        # Apply field-specific validation
        if data_type in self.field_specific_rules:
            return self.field_specific_rules[data_type]["extra_validation"](test_case)

        return True, ""

    def _parse_llm_response(self, response_text: str, data_type: str) -> Optional[List[Dict[str, Any]]]:
        """Parse and validate LLM response with improved error handling."""
        try:
            # Remove Markdown JSON blocks if present
            cleaned_text = response_text.replace("```json", "").replace("```", "").strip()

            # Handle invalid escape sequences
            cleaned_text = re.sub(r'\\([^"\\])', r'\\\\\1', cleaned_text)  

            # Parse JSON
            test_cases = json.loads(cleaned_text)

            # Validate structure
            if not isinstance(test_cases, list):
                raise ValueError("Response is not a JSON array")

            # Validate and normalize each test case
            validated_cases = []
            for idx, case in enumerate(test_cases, 1):
                is_valid, error_msg = self._validate_test_case(case, data_type)
                if not is_valid:
                    logging.warning(f"Test case {idx} validation failed: {error_msg}")
                    continue
                
                # Normalize expected_result to Pass/Fail
                case["expected_result"] = "Pass" if case["expected_result"].lower() == "pass" else "Fail"
                validated_cases.append(case)

            return validated_cases

        except json.JSONDecodeError as e:
            logging.error(f"JSON parsing error: {str(e)} - Raw response: {response_text}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error parsing response: {str(e)}")
            return None

    def generate_test_cases(self, rules_file: str, output_file: str) -> None:
        """Main method to generate and save test cases."""
        try:
            # Load rules
            with open(rules_file, "r") as f:
                rules = json.load(f)

            all_test_cases = {}
            total_fields = sum(len(details["fields"]) for details in rules.values())
            processed_fields = 0

            for parent_field, details in rules.items():
                for field_name, field_details in details["fields"].items():
                    full_field_name = f"{parent_field}.{field_name}"
                    logging.info(f"Processing field {processed_fields + 1}/{total_fields}: {full_field_name}")
                    
                    if full_field_name in all_test_cases:
                        logging.warning(f"Skipping {full_field_name}, already processed.")
                        continue

                    # Generate prompt
                    messages = self._generate_prompt(
                        field_name,
                        field_details["data_type"],
                        field_details["constraints"],
                        field_details.get("description", "")
                    )

                    # Get OpenAI response with retries
                    max_retries = 3
                    for attempt in range(max_retries):
                        try:
                            response = self.llm_client.chat.completions.create(
                                model=self.config.get("model_name", "gpt-4o"),
                                messages=messages,
                            )
                            
                            test_cases = self._parse_llm_response(
                                response.choices[0].message.content,
                                field_details["data_type"]
                            )
                            
                            if test_cases:
                                all_test_cases[full_field_name] = test_cases
                                logging.info(f"Successfully generated {len(test_cases)} test cases")
                                break
                            else:
                                logging.warning(f"Attempt {attempt + 1}: Failed to generate valid test cases")
                        except Exception as e:
                            logging.error(f"Attempt {attempt + 1} failed: {str(e)}")
                            if attempt == max_retries - 1:
                                logging.error(f"Failed to generate test cases for {full_field_name} after {max_retries} attempts")
                    
                    processed_fields += 1

            # Save results
            self._save_test_cases(all_test_cases, output_file)
            
            # Generate summary
            self._generate_summary(all_test_cases, output_file)

        except Exception as e:
            logging.error(f"Failed to generate test cases: {str(e)}")
            raise

    def _save_test_cases(self, test_cases: Dict[str, List[Dict[str, Any]]], output_file: str) -> None:
        """Save test cases with backup."""
        try:
            # Create backup of existing file if it exists
            if os.path.exists(output_file):
                backup_file = f"{output_file}.{datetime.now().strftime('%Y%m%d_%H%M%S')}.bak"
                os.rename(output_file, backup_file)
                logging.info(f"Created backup: {backup_file}")

            # Save new test cases
            with open(output_file, "w") as f:
                json.dump(test_cases, f, indent=2)
            logging.info(f"Successfully saved test cases to {output_file}")

        except Exception as e:
            logging.error(f"Failed to save test cases: {str(e)}")
            raise

    def _generate_summary(self, test_cases: Dict[str, List[Dict[str, Any]]], output_file: str) -> None:
        """Generate a summary of the test case generation."""
        total_fields = len(test_cases)
        total_test_cases = sum(len(cases) for cases in test_cases.values())
        
        summary = (
            f"\nTest Case Generation Summary\n"
            f"{'='*30}\n"
            f"Total fields processed: {total_fields}\n"
            f"Total test cases generated: {total_test_cases}\n"
            f"Average test cases per field: {total_test_cases/total_fields:.2f}\n"
            f"Output file: {output_file}\n"
            f"{'='*30}"
        )
        
        logging.info(summary)

def main():
    try:
        generator = TestCaseGenerator()
        generator.generate_test_cases(
            generator.config["constrains_processed_rules_file"],
            generator.config["generated_test_cases_file"]
        )
    except Exception as e:
        logging.error(f"Application failed: {str(e)}")
        raise

if __name__ == "__main__":
    main()

## gpt 

In [None]:
import json
import os
from typing import Dict, List, Optional, Any, Tuple
import openai
import yaml
from datetime import datetime
import logging
import re
from azure.identity import DefaultAzureCredential

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('test_generation.log'),
        logging.StreamHandler()
    ]
)

class TestCaseGenerator:
    def __init__(self, config_path: str = "config/settings.yaml"):
        self.config = self._load_config(config_path)
        self.llm_client = self._initialize_llm()
        self.field_specific_rules = self._initialize_field_rules()
        
    def _load_config(self, config_path: str) -> dict:
        """Load configuration from YAML file with error handling."""
        try:
            with open(config_path, "r") as f:
                return yaml.safe_load(f)
        except Exception as e:
            logging.error(f"Failed to load config: {str(e)}")
            raise

    def _initialize_llm(self):
        """Initialize the OpenAI LLM client using Azure authentication."""
        try:
            credential = DefaultAzureCredential()
            access_token = credential.get_token("https://cognitiveservices.azure.com/.default").token
            
            api_key = access_token
            os.environ["api_key"] = api_key
            os.environ["AZURE_OPENAI_API_KEY"] = api_key
            
            return openai.AzureOpenAI(
                api_version=self.config.get("openai_api_version", "2024-06-01"),
                azure_endpoint=self.config.get("azure_openai_endpoint"),
                api_key=api_key,
                azure_deployment=self.config.get("deployment_name", "gpt-4o_2024-05-13"),
                default_headers={"projectId": self.config.get("project_id", "")}
            )
        except Exception as e:
            logging.error(f"Failed to initialize OpenAI LLM: {str(e)}")
            raise

    def _generate_prompt(self, field_name: str, data_type: str, constraints: List[str], description: str = "") -> str:
        """Generate a structured prompt for test case generation."""
        return f"""
Generate test cases for the field '{field_name}' with following specifications:
- Data Type: {data_type}
- Constraints: {constraints}
- Description: {description}

Return ONLY a JSON array with fields: "test_case", "description", "expected_result", "input"."""

    def generate_test_cases(self, rules_file: str, output_file: str) -> None:
        """Main method to generate and save test cases."""
        try:
            with open(rules_file, "r") as f:
                rules = json.load(f)

            all_test_cases = {}
            for parent_field, details in rules.items():
                for field_name, field_details in details["fields"].items():
                    prompt = self._generate_prompt(
                        field_name,
                        field_details["data_type"],
                        field_details["constraints"],
                        field_details.get("description", "")
                    )
                    
                    response = self.llm_client.chat.completions.create(
                        model="gpt-4o",
                        messages=[{"role": "user", "content": prompt}]
                    )
                    
                    test_cases = json.loads(response.choices[0].message.content)
                    all_test_cases[field_name] = test_cases
                    logging.info(f"Generated {len(test_cases)} test cases for {field_name}")

            with open(output_file, "w") as f:
                json.dump(all_test_cases, f, indent=2)
            logging.info(f"Successfully saved test cases to {output_file}")

        except Exception as e:
            logging.error(f"Failed to generate test cases: {str(e)}")
            raise


def main():
    try:
        generator = TestCaseGenerator()
        generator.generate_test_cases(
            generator.config["constrains_processed_rules_file"],
            generator.config["generated_test_cases_file"]
        )
    except Exception as e:
        logging.error(f"Application failed: {str(e)}")
        raise

if __name__ == "__main__":
    main()
