In [None]:
!pip install sqlalchemy pandas oracledb openai google-generativeai

In [29]:
import cx_Oracle
import re
import os
import json
import logging
import difflib
import openai  

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Retrieve OpenAI API key from environment variable
OPENAI_API_KEY = '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'
print("Done")

Done


In [15]:
# Database credentials
database_credentials = {
    "HERITAGE": {
        "host": "10.176.18.91",
        "port": 1522,
        "service_name": "HERITAGE19C",
        "username": "TQ_GIS",
        "password": "TQ_GIS"
    },
    "NEW_GEMINIA": {
        "host": "10.176.18.110",
        "port": 1523,
        "service_name": "NEW_GEMINIA",
        "username": "TQ_GIS",
        "password": "TQ_GIS"
    },
}
print("Done")

Done


In [30]:
# Functions Definitions
def get_package_source(db_params, package_name, object_type='PACKAGE BODY'):
    logging.info(f"Connecting to database {db_params['service_name']} to retrieve {object_type} '{package_name}'.")
    try:
        dsn_tns = cx_Oracle.makedsn(
            db_params['host'],
            db_params['port'],
            service_name=db_params['service_name']
        )
        conn = cx_Oracle.connect(
            user=db_params['username'],
            password=db_params['password'],
            dsn=dsn_tns
        )
        cursor = conn.cursor()
        query = f"""
        SELECT text
        FROM all_source
        WHERE name = '{package_name.upper()}'
        AND type = '{object_type.upper()}'
        ORDER BY line
        """
        cursor.execute(query)
        source_lines = [row[0] for row in cursor.fetchall()]
        source = ''.join(source_lines)
        logging.info(f"Retrieved {len(source)} characters of source code from {db_params['service_name']}.")
    except cx_Oracle.DatabaseError as e:
        logging.error(f"Database connection failed: {e}")
        source = ""
    finally:
        try:
            cursor.close()
            conn.close()
        except:
            pass
    return source

def parse_package_components(source_code):
    logging.info("Parsing package components.")
    components = {
        'procedures': {},
        'functions': {},
        'cursors': {},
        'types': {},
        'variables': {},
    }

    # Patterns to match full procedure and function definitions
    proc_pattern = re.compile(
        r'PROCEDURE\s+([\w$]+)\s*(\([^;]*?\))?\s*(AUTHID\s+CURRENT_USER\s+)?(IS|AS)\s+(.*?)\s+END\s+\1\s*;',
        re.IGNORECASE | re.DOTALL
    )

    func_pattern = re.compile(
        r'FUNCTION\s+([\w$]+)\s*(\([^;]*?\))?\s+RETURN\s+[\w%\.]+(\s*(AUTHID\s+CURRENT_USER\s+)?(IS|AS)\s+(.*?)\s+END\s+\1\s*;)',
        re.IGNORECASE | re.DOTALL
    )

    # Extract procedures
    procedure_matches = list(proc_pattern.finditer(source_code))
    logging.info(f"Found {len(procedure_matches)} procedures.")
    for match in procedure_matches:
        name = match.group(1)
        definition = match.group(0)
        components['procedures'][name] = definition.strip()
        logging.debug(f"Parsed procedure: {name}")

    # Extract functions
    function_matches = list(func_pattern.finditer(source_code))
    logging.info(f"Found {len(function_matches)} functions.")
    for match in function_matches:
        name = match.group(1)
        definition = match.group(0)
        components['functions'][name] = definition.strip()
        logging.debug(f"Parsed function: {name}")

    # Extract the declaration section between IS/AS and BEGIN
    declaration_section_match = re.search(r'(IS|AS)\s+(.*?)\s+BEGIN', source_code, re.IGNORECASE | re.DOTALL)
    if declaration_section_match:
        declaration_section = declaration_section_match.group(2)

        # Patterns for cursors, types, and variables
        cursor_pattern = re.compile(
            r'CURSOR\s+([\w$]+)\s*(IS|AS)\s+(.*?);',
            re.IGNORECASE | re.DOTALL
        )

        type_pattern = re.compile(
            r'TYPE\s+([\w$]+)\s+(IS|AS)\s+(.*?);',
            re.IGNORECASE | re.DOTALL
        )

        variable_pattern = re.compile(
            r'(\w+)\s+(CONSTANT\s+)?[\w%\.]+(\([\d\s,]*\))?(\s+NOT\s+NULL)?\s*(:=\s*.*?|)\s*;',
            re.IGNORECASE | re.DOTALL
        )

        # Extract cursors
        cursor_matches = list(cursor_pattern.finditer(declaration_section))
        logging.info(f"Found {len(cursor_matches)} cursors.")
        for match in cursor_matches:
            name = match.group(1)
            definition = match.group(0)
            components['cursors'][name] = definition.strip()
            logging.debug(f"Parsed cursor: {name}")

        # Extract types
        type_matches = list(type_pattern.finditer(declaration_section))
        logging.info(f"Found {len(type_matches)} types.")
        for match in type_matches:
            name = match.group(1)
            definition = match.group(0)
            components['types'][name] = definition.strip()
            logging.debug(f"Parsed type: {name}")

        # Extract variables
        variable_matches = list(variable_pattern.finditer(declaration_section))
        logging.info(f"Found {len(variable_matches)} variables.")
        for match in variable_matches:
            name = match.group(1)
            definition = match.group(0)
            components['variables'][name] = definition.strip()
            logging.debug(f"Parsed variable: {name}")

    logging.info("Finished parsing package components.")
    return components

def save_components_to_disk(components, package_name, base_directory='packages'):
    logging.info(f"Saving components of '{package_name}' to disk.")
    package_dir = os.path.join(base_directory, package_name)
    os.makedirs(package_dir, exist_ok=True)

    total_components = 0
    for comp_type, comp_dict in components.items():
        type_dir = os.path.join(package_dir, comp_type)
        os.makedirs(type_dir, exist_ok=True)
        for name, definition in comp_dict.items():
            # Clean the name to be file-system friendly
            safe_name = ''.join(c if c.isalnum() or c in ' _-' else '_' for c in name)
            file_name = f"{safe_name}.sql"
            file_path = os.path.join(type_dir, file_name)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(definition)
            total_components += 1
    logging.info(f"Saved {total_components} components of '{package_name}' to '{package_dir}'.")

def save_components_as_json(components, package_name, base_directory='packages'):
    logging.info(f"Saving components of '{package_name}' as JSON.")
    package_dir = os.path.join(base_directory, package_name)
    os.makedirs(package_dir, exist_ok=True)

    json_path = os.path.join(package_dir, f"{package_name}_components.json")
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(components, f, indent=4)
    logging.info(f"Components saved as JSON to '{json_path}'.")

def compare_components(components1, components2, package_name):
    logging.info("Comparing components with detailed diffs.")
    differences = {}
    diffs_output_dir = os.path.join('diffs', package_name)
    os.makedirs(diffs_output_dir, exist_ok=True)

    for comp_type in components1.keys():
        set1 = set(components1[comp_type].keys())
        set2 = set(components2[comp_type].keys())

        added = set2 - set1
        removed = set1 - set2
        modified = set()

        for common in set1 & set2:
            content1 = components1[comp_type][common].strip().splitlines()
            content2 = components2[comp_type][common].strip().splitlines()
            if content1 != content2:
                modified.add(common)
                # Generate diff
                diff = difflib.unified_diff(
                    content1, content2,
                    fromfile=f'{package_name}_HERITAGE_{comp_type}_{common}.sql',
                    tofile=f'{package_name}_NEW_GEMINIA_{comp_type}_{common}.sql',
                    lineterm=''
                )
                diff_output = '\n'.join(diff)
                # Save diff to file
                diff_file_path = os.path.join(diffs_output_dir, f'{comp_type}_{common}_diff.txt')
                with open(diff_file_path, 'w', encoding='utf-8') as f:
                    f.write(diff_output)
                logging.debug(f"Diff for {comp_type} '{common}' saved to '{diff_file_path}'.")
        differences[comp_type] = {
            'added': list(added),
            'removed': list(removed),
            'modified': list(modified)
        }
    logging.info("Finished comparing components with diffs.")
    return differences
print("Done")

Done


In [37]:
# Function to generate markdown reports using GPT-4 for a specific component
def generate_markdown_report(component_type, component_name, diff_file_path, report_dir='reports'):
    """
    Sends the diff to GPT-4 and generates a markdown report for a specific component.
    """
    logging.info(f"Generating markdown report for {component_type} '{component_name}'.")
    os.makedirs(report_dir, exist_ok=True)
    
    try:
        with open(diff_file_path, 'r', encoding='utf-8') as f:
            diff_content = f.read()

        # Construct the prompt
        prompt = f"""
        You are an expert PL/SQL developer. Below is a unified diff of a {component_type[:-1].capitalize()} named '{component_name}' 
        between two versions of a PL/SQL package. Analyze the changes and generate a detailed markdown report with the following sections:

        - **Summary of Key Changes:**
            - *Reordering of Conditional Logic:* ...
            - *Modification of WHERE Clauses:* ...
            - *Exception Handling Adjustments:* ...
            - *Formatting and Indentation:* ...
        - **Implications of the Changes:** ...
        - **Recommendations for Merging:** ...
        - **Potential Actions Based on Analysis:** ...
        - **Additional Considerations:** ...
        - **Conclusion:** ...

        Below is the unified diff:

        ```diff
        {diff_content}
        ```
        """
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are an expert PL/SQL developer."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,
            max_tokens=2000,
            n=1,
            stop=None,
        )
        report = response['choices'][0]['message']['content']

        # Save the report to a markdown file
        report_file_path = os.path.join(report_dir, f"{component_type}_{component_name}_report.md")
        with open(report_file_path, 'w', encoding='utf-8') as f:
            f.write(report)
        logging.info(f"Markdown report generated and saved to '{report_file_path}'.")
        return True  # Indicate success
    except Exception as e:
        logging.error(f"Failed to generate markdown report for {component_type} '{component_name}': {e}")
        return False

# Function to process and generate reports for a specific component
def generate_reports_for_specific_component(components, package_name, specific_name, diff_dir='diffs', report_dir='reports'):
    """
    Generates markdown reports for a specific function or procedure in the package.
    """
    logging.info(f"Generating report for the specific component: '{specific_name}' in package '{package_name}'.")
    for comp_type, comp_dict in components.items():
        if specific_name in comp_dict:
            diff_file_path = os.path.join(diff_dir, package_name, f"{comp_type}_{specific_name}_diff.txt")
            if os.path.exists(diff_file_path):
                success = generate_markdown_report(comp_type, specific_name, diff_file_path, report_dir)
                if success:
                    logging.info(f"Report for '{specific_name}' generated successfully.")
                else:
                    logging.error(f"Failed to generate report for '{specific_name}'.")
            else:
                logging.warning(f"Diff file for '{specific_name}' not found at '{diff_file_path}'.")
            break
    else:
        logging.warning(f"Component '{specific_name}' not found in the parsed components.")

specific_procedure_name = "pop_mand_policy_rsk_limits.sql"
generate_reports_for_specific_component(components, package_name="GIN_STP_PKG", specific_name=specific_procedure_name)


NameError: name 'components' is not defined

In [35]:
def compare_plsql_packages(package_name, log_level='INFO', specific_component=None):
    """
    Compare two PL/SQL packages and generate markdown reports.

    Args:
        package_name (str): Name of the package to compare.
        log_level (str): Logging level (default: 'INFO').
        specific_component (str): Optional name of a specific component to generate a report for.
    """
    global failed_reports  # Declare as global to modify within the function
    failed_reports = []  # List to keep track of failed report generations

    # Set logging level
    numeric_level = getattr(logging, log_level.upper(), None)
    if not isinstance(numeric_level, int):
        logging.warning(f"Invalid log level: {log_level}. Defaulting to INFO.")
        numeric_level = logging.INFO
    logging.getLogger().setLevel(numeric_level)

    logging.info(f"Starting comparison for package '{package_name}'.")

    # Get the package body source code from both databases
    logging.info("Retrieving package sources.")
    source_body_heritage = get_package_source(database_credentials['HERITAGE'], package_name, 'PACKAGE BODY')
    source_body_new_geminia = get_package_source(database_credentials['NEW_GEMINIA'], package_name, 'PACKAGE BODY')

    if not source_body_heritage:
        logging.error(f"Failed to retrieve PACKAGE BODY from HERITAGE for package '{package_name}'.")
        return
    if not source_body_new_geminia:
        logging.error(f"Failed to retrieve PACKAGE BODY from NEW_GEMINIA for package '{package_name}'.")
        return

    # Parse components from package body
    logging.info("Parsing package components from HERITAGE.")
    components_body_heritage = parse_package_components(source_body_heritage)
    logging.info("Parsing package components from NEW_GEMINIA.")
    components_body_new_geminia = parse_package_components(source_body_new_geminia)

    # Save components to disk
    logging.info("Saving components to disk.")
    save_components_to_disk(components_body_heritage, package_name + '_HERITAGE_BODY')
    save_components_to_disk(components_body_new_geminia, package_name + '_NEW_GEMINIA_BODY')

    # Optionally, save as JSON
    logging.info("Saving components as JSON.")
    save_components_as_json(components_body_heritage, package_name + '_HERITAGE_BODY')
    save_components_as_json(components_body_new_geminia, package_name + '_NEW_GEMINIA_BODY')

    # If a specific component is specified, generate a report for it and exit
    if specific_component:
        logging.info(f"Generating report only for the specified component: {specific_component}")
        generate_reports_for_specific_component(components_body_new_geminia, package_name, specific_component)
        return

    # Compare packages with detailed diffs
    differences = compare_components(components_body_heritage, components_body_new_geminia, package_name)

    # Save differences to a JSON file for later use
    differences_file = os.path.join('diffs', package_name, 'differences.json')
    with open(differences_file, 'w', encoding='utf-8') as f:
        json.dump(differences, f, indent=4)
    logging.info(f"Differences saved to '{differences_file}'.")

    # Output differences with summaries and generate markdown reports
    logging.info("Outputting differences with summaries and generating markdown reports.")
    for comp_type, diff in differences.items():
        print(f"\nDifferences in {comp_type}:")
        if diff['added']:
            print(f"  Added in NEW_GEMINIA: {diff['added']}")
        if diff['removed']:
            print(f"  Removed from NEW_GEMINIA: {diff['removed']}")
        if diff['modified']:
            print(f"  Modified: {diff['modified']}")
            for name in diff['modified']:
                diff_file_path = os.path.join('diffs', package_name, f'{comp_type}_{name}_diff.txt')
                print(f"    - Diff for {name} saved at: {diff_file_path}")
                # Generate markdown report using GPT-4
                success = generate_markdown_report(comp_type, name, diff_file_path)
                if not success:
                    logging.error(f"Report generation failed for {comp_type} '{name}'.")

    if failed_reports:
        logging.error("Some report generations failed. You can retry generating reports for these components using the 'retry_failed_reports' function.")
        # Save failed reports info to a JSON file
        failed_reports_file = os.path.join('reports', 'failed_reports.json')
        with open(failed_reports_file, 'w', encoding='utf-8') as f:
            json.dump(failed_reports, f, indent=4)
        logging.info(f"Failed report details saved to '{failed_reports_file}'.")

    logging.info(f"Finished comparison for package '{package_name}'.")

In [32]:
def retry_failed_reports():
    """
    Retries generating markdown reports for components that previously failed.
    """
    global failed_reports
    if not failed_reports:
        logging.info("No failed reports to retry.")
        return

    logging.info("Retrying failed markdown report generations.")
    successful_retries = []
    remaining_failures = []

    for comp_type, name, diff_file_path in failed_reports:
        success = generate_markdown_report(comp_type, name, diff_file_path)
        if success:
            successful_retries.append((comp_type, name))
        else:
            remaining_failures.append((comp_type, name, diff_file_path))

    if successful_retries:
        logging.info(f"Successfully retried and generated reports for: {successful_retries}")

    if remaining_failures:
        logging.error(f"Still failed to generate reports for: {[(ct, n) for ct, n, _ in remaining_failures]}")
        # Update the failed_reports global variable
        failed_reports = remaining_failures
        # Update the failed reports JSON file
        failed_reports_file = os.path.join('reports', 'failed_reports.json')
        with open(failed_reports_file, 'w', encoding='utf-8') as f:
            json.dump(failed_reports, f, indent=4)
    else:
        logging.info("All failed reports have been successfully generated.")
        failed_reports = []  # Clear the failed reports list
        # Remove the failed reports JSON file
        failed_reports_file = os.path.join('reports', 'failed_reports.json')
        if os.path.exists(failed_reports_file):
            os.remove(failed_reports_file)

In [38]:
# Replace 'YOUR_PACKAGE_NAME' with the actual package name and set desired log level
# compare_plsql_packages(package_name='GIN_STP_PKG', log_level='DEBUG')
compare_plsql_packages(package_name="GIN_STP_PKG", specific_component="pop_mand_policy_rsk_limits.sql")


2024-12-05 10:57:07,477 - INFO - Starting comparison for package 'GIN_STP_PKG'.
2024-12-05 10:57:07,483 - INFO - Retrieving package sources.
2024-12-05 10:57:07,500 - INFO - Connecting to database HERITAGE19C to retrieve PACKAGE BODY 'GIN_STP_PKG'.
2024-12-05 10:59:18,992 - INFO - Retrieved 3527752 characters of source code from HERITAGE19C.
2024-12-05 10:59:19,430 - INFO - Connecting to database NEW_GEMINIA to retrieve PACKAGE BODY 'GIN_STP_PKG'.
2024-12-05 11:01:18,733 - INFO - Retrieved 3308783 characters of source code from NEW_GEMINIA.
2024-12-05 11:01:18,976 - INFO - Parsing package components from HERITAGE.
2024-12-05 11:01:18,985 - INFO - Parsing package components.
2024-12-05 11:07:01,602 - INFO - Found 20 procedures.
2024-12-05 11:07:38,325 - INFO - Found 2 functions.
2024-12-05 11:07:38,326 - INFO - Found 0 cursors.
2024-12-05 11:07:38,329 - INFO - Found 0 types.
2024-12-05 11:07:38,330 - INFO - Found 0 variables.
2024-12-05 11:07:38,332 - INFO - Finished parsing package com

In [None]:
# If there were any failures in report generation, you can retry them:
retry_failed_reports()