In [2]:
from sqlalchemy import create_engine, text
import oracledb

# Define the DSN and connection string
dsn = oracledb.makedsn("172.16.20.69", 1521, sid="demodb")
connection_string = f"oracle+oracledb://map:mapper@{dsn}"


def get_db_connection():
    try:
        engine = create_engine(connection_string)
        return engine.connect()
    except Exception as e:
        print(f"Database connection error: {str(e)}")
        raise

In [2]:
# Improved implementation with connection established outside the function
import oracledb

# Establish connection outside the function (do this once)
def create_oracle_connection():
    try:
        connection = oracledb.connect(
            user="map", 
            password="mapper", 
            dsn="172.16.20.69:1521/demodb"
        )
        # print("Oracle connection established successfully")
        return connection
    except Exception as e:
        print(f"Error establishing Oracle connection: {str(e)}")
        raise


In [4]:
# Function to create/update mapping with parameters
def create_update_mapping(connection, p_mapref, p_mapdesc, p_trgschm, p_trgtbtyp, 
                         p_trgtbnm, p_frqcd, p_srcsystm, p_lgvrfyflg, p_lgvrfydt, p_stflg,p_blkprcrows):
    """
    Creates or updates a mapping using MAP.PKGDWMAPR.CREATE_UPDATE_MAPPING
    
    Args:
        connection: Oracle connection object
        p_mapref: Mapping reference
        p_mapdesc: Mapping description
        p_trgschm: Target schema
        p_trgtbtyp: Target table type
        p_trgtbnm: Target table name
        p_frqcd: Frequency code
        p_srcsystm: Source system
        p_lgvrfyflg: Logic verification flag
        p_lgvrfydt: Logic verification date
        p_stflg: Status flag
        
    Returns:
        Mapping ID
    """
    cursor = None
    try:
        cursor = connection.cursor()
        
        # Define the output parameter
        v_mapid = cursor.var(oracledb.NUMBER)
        
        # SQL to execute with named parameters
        sql = """
        BEGIN
            :result := MAP.PKGDWMAPR.CREATE_UPDATE_MAPPING(
                p_mapref => :p_mapref,
                p_mapdesc => :p_mapdesc,
                p_trgschm => :p_trgschm,
                p_trgtbtyp => :p_trgtbtyp,
                p_trgtbnm => :p_trgtbnm,
                p_frqcd => :p_frqcd,
                p_srcsystm => :p_srcsystm,
                p_lgvrfyflg => :p_lgvrfyflg,
                p_lgvrfydt => :p_lgvrfydt,
                p_stflg => :p_stflg,
                p_blkprcrows=>:p_blkprcrows
            );
        END;
        """
        
        # Execute with named parameters
        cursor.execute(
            sql,
            result=v_mapid,
            p_mapref=p_mapref,
            p_mapdesc=p_mapdesc,
            p_trgschm=p_trgschm,
            p_trgtbtyp=p_trgtbtyp,
            p_trgtbnm=p_trgtbnm,
            p_frqcd=p_frqcd,
            p_srcsystm=p_srcsystm,
            p_lgvrfyflg=p_lgvrfyflg,
            p_lgvrfydt=p_lgvrfydt,
            p_stflg=p_stflg,
            p_blkprcrows=p_blkprcrows
        )
        
        connection.commit()
        # Get the result
        mapid = v_mapid.getvalue()

        return mapid
        
    except Exception as e:
        print(f"Error creating/updating mapping: {str(e)}")
        raise
        
    finally:
        if cursor:
            cursor.close()


In [5]:
try:
    # Create connection once
    oracle_connection = create_oracle_connection()
    
    # Example 1: Validate logic
    
    # Example 2: Create/update mapping
    import datetime
    mapping_id = create_update_mapping(
        connection=oracle_connection,
        p_mapref='MAP001',
        p_mapdesc='Customer Dimension Mapping',
        p_trgschm='DW_SCHEMA_1',
        p_trgtbtyp='DIM',
        p_trgtbnm='DIM_CUSTOMER',
        p_frqcd='DL',
        p_srcsystm='CRM',
        p_lgvrfyflg='Y',
        p_lgvrfydt=datetime.datetime.now(),
        p_stflg='A',
        p_blkprcrows=30
    )
    print(f"Created/Updated mapping ID: {mapping_id}")
    
except Exception as e:
    print(f"Error in execution: {str(e)}")
    
finally:
    # Close connection when done with all operations
    if 'oracle_connection' in locals() and oracle_connection:
        oracle_connection.close()
        print("Oracle connection closed")

Created/Updated mapping ID: 128
Oracle connection closed


In [8]:
# Function to create/update mapping detail with parameters
def create_update_mapping_detail(connection, p_mapref, p_trgclnm, p_trgcldtyp, p_trgpkflg, 
                               p_trgpkseq, p_trgcldesc, p_trgnflg, p_maplogic, p_keyclnm, 
                               p_valclnm, p_mapcmbcd, p_excseq, p_scdtyp, p_lgvrfyflg, p_lgvrfydt):
    """
    Creates or updates a mapping detail using MAP.PKGDWMAPR.CREATE_UPDATE_MAPPING_DETAIL
    
    Args:
        connection: Oracle connection object
        p_mapref: Mapping reference (must exist)
        p_trgclnm: Target column name
        p_trgcldtyp: Target column data type
        p_trgpkflg: Is primary key (Y/N)
        p_trgpkseq: Primary key sequence
        p_trgcldesc: Column description
        p_trgnflg: Not nullable flag (Y/N)
        p_maplogic: Mapping logic (SQL)
        p_keyclnm: Key column name
        p_valclnm: Value column name
        p_mapcmbcd: Mapping combination code
        p_excseq: Execution sequence
        p_scdtyp: SCD Type
        p_lgvrfyflg: Logic verification flag (Y/N)
        p_lgvrfydt: Logic verification date
        
    Returns:
        Mapping detail ID
    """
    cursor = None
    try:
        cursor = connection.cursor()
        
        # Define the output parameter
        v_mapdtlid = cursor.var(oracledb.NUMBER)
        
        # SQL to execute with named parameters
        sql = """
        BEGIN
            :result := MAP.PKGDWMAPR.CREATE_UPDATE_MAPPING_DETAIL(
                p_mapref => :p_mapref,
                p_trgclnm => :p_trgclnm,
                p_trgcldtyp => :p_trgcldtyp,
                p_trgpkflg => :p_trgpkflg,
                p_trgpkseq => :p_trgpkseq,
                p_trgcldesc => :p_trgcldesc,
                p_trgnflg => :p_trgnflg,
                p_maplogic => :p_maplogic,
                p_keyclnm => :p_keyclnm,
                p_valclnm => :p_valclnm,
                p_mapcmbcd => :p_mapcmbcd,
                p_excseq => :p_excseq,
                p_scdtyp => :p_scdtyp,
                p_lgvrfyflg => :p_lgvrfyflg,
                p_lgvrfydt => :p_lgvrfydt
            );
        END;
        """
        
        # Execute with named parameters
        cursor.execute(
            sql,
            result=v_mapdtlid,
            p_mapref=p_mapref,
            p_trgclnm=p_trgclnm,
            p_trgcldtyp=p_trgcldtyp,
            p_trgpkflg=p_trgpkflg,
            p_trgpkseq=p_trgpkseq,
            p_trgcldesc=p_trgcldesc,
            p_trgnflg=p_trgnflg,
            p_maplogic=p_maplogic,
            p_keyclnm=p_keyclnm,
            p_valclnm=p_valclnm,
            p_mapcmbcd=p_mapcmbcd,
            p_excseq=p_excseq,
            p_scdtyp=p_scdtyp,
            p_lgvrfyflg=p_lgvrfyflg,
            p_lgvrfydt=p_lgvrfydt
        )
        connection.commit()
        
        # Get the result
        mapdtlid = v_mapdtlid.getvalue()
        return mapdtlid
        
    except Exception as e:
        print(f"Error creating/updating mapping detail: {str(e)}")
        raise
        
    finally:
        if cursor:
            cursor.close()


In [10]:

# Example usage of the new function
try:
    # Create connection (or reuse existing connection)
    # if 'oracle_connection' not in locals() or oracle_connection.ping() != True:
    oracle_connection = create_oracle_connection()
    
    # Example: Create/update mapping detail
    import datetime
    mapping_detail_id = create_update_mapping_detail(
        connection=oracle_connection,
        p_mapref='MAP001',
        p_trgclnm='CUSTOMER_NAMEakshay',
        p_trgcldtyp='VARCHAR(20)',
        p_trgpkflg='Y',
        p_trgpkseq=1,
        p_trgcldesc='Unique Customer NAME',
        p_trgnflg='N',
        p_maplogic='SELECT * FROM NAME;',
        p_keyclnm='NAME',
        p_valclnm='CUST_ID',
        p_mapcmbcd=None,
        p_excseq=1,
        p_scdtyp=1,
        p_lgvrfyflg='Y',
        p_lgvrfydt=datetime.datetime.now()
    )
    print(f"Created/Updated mapping detail ID: {mapping_detail_id}")
    
except Exception as e:
    print(f"Error in execution: {str(e)}")
    
finally:
    # Close connection when done with all operations
    if 'oracle_connection' in locals() and oracle_connection:
        oracle_connection.close()
        print("Oracle connection closed")


Created/Updated mapping detail ID: 646
Oracle connection closed


In [6]:
def validate_logic2(connection, p_logic, p_keyclnm, p_valclnm):
    """
    Validates SQL logic using MAP.PKGDWMAPR.VALIDATE_LOGIC2
    
    Args:
        connection: Oracle connection object
        p_logic: SQL logic to validate
        p_keyclnm: Key column name
        p_valclnm: Value column name
    
    Returns:
        Tuple: (Validation result (Y/N), Error message if any)
    """
    cursor = None
    try:
        cursor = connection.cursor()
        
        # Define the output parameters
        v_is_valid = cursor.var(oracledb.STRING)
        v_error = cursor.var(oracledb.STRING, 4000)  # Error output parameter
        
        # SQL to execute with named parameters
        sql = """
        BEGIN
            :result := MAP.PKGDWMAPR.VALIDATE_LOGIC2(
                p_logic => :p_logic,
                p_keyclnm => :p_keyclnm,
                p_valclnm => :p_valclnm,
                p_err => :p_err
            );
        END;
        """
        
        # Execute with named parameters
        cursor.execute(
            sql,
            result=v_is_valid,
            p_logic=p_logic,
            p_keyclnm=p_keyclnm,
            p_valclnm=p_valclnm,
            p_err=v_error
        )
        connection.commit()
        
        # Get the results
        is_valid = v_is_valid.getvalue()
        error_message = v_error.getvalue()
        
        return is_valid, error_message
    
    except Exception as e:
        print(f"Error validating logic: {str(e)}")
        raise
    
    finally:
        if cursor:
            cursor.close()

In [7]:
from sqlalchemy import create_engine, text
import oracledb

# Define the DSN and connection string
dsn = oracledb.makedsn("172.16.20.69", 1521, sid="demodb")
connection_string = f"oracle+oracledb://map:mapper@{dsn}"



# Example call

result, error = validate_logic2(
    connection=oracle_connection,
    p_logic='SELECT cod_acct_no FROM src.ln_acct_dtls',
    p_keyclnm='cod_acct_no',
    p_valclnm='cod_acct_no'
)

print(f"Logic is valid? {result}")
print(f"Error message: {error if error else 'None'}")

NameError: name 'oracle_connection' is not defined

In [10]:
def call_validate_mapping_details(connection, p_mapref):
    """
    Calls the Oracle function MAP.PKGDWMAPR.VALIDATE_MAPPING_DETAILS
    
    Args:
        connection: Oracle connection object
        p_mapref: Mapping reference to validate
        
    Returns:
        tuple: A tuple containing (result, error_message) where:
            - result: The function's return value ('Y' or 'N')
            - error_message: Any error message returned by the Oracle function
    """
    cursor = None
    try:
        cursor = connection.cursor()
        
        # Define the output parameters
        v_result = cursor.var(oracledb.STRING)
        v_err = cursor.var(oracledb.STRING, 400)  # VARCHAR2(400) in Oracle
        
        # SQL to execute with named parameters
        sql = """
        BEGIN
            :result := MAP.PKGDWMAPR.VALIDATE_MAPPING_DETAILS(
                p_mapref => :p_mapref,
                p_err => :p_err
            );
        END;
        """
        
        # Execute with named parameters
        cursor.execute(
            sql,
            result=v_result,
            p_mapref=p_mapref,
            p_err=v_err
        )
        connection.commit()
        # Get the results
        result = v_result.getvalue()
        error_message = v_err.getvalue()
        
        return result, error_message
        
    except Exception as e:
        print(f"Error validating mapping details: {str(e)}")
        raise
        
    finally:
        if cursor:
            cursor.close()

# Example usage
if __name__ == "__main__":
    # This would be replaced with your actual connection code
    # import oracledb
    connection =create_oracle_connection()
    
    mapping_ref = "MAP02"  # Example mapping reference
    
    # Call the function (uncommenting when you have a real connection)
    result, error = call_validate_mapping_details(connection, mapping_ref)
    
    # Display the results
    print(result, error)

    
    connection.close()

N Some/All target columns logic validation failed, please verify logic(SQL).


In [4]:
def call_create_update_job(connection, p_mapref):
    """
    Calls the Oracle function CREATE_UPDATE_JOB
    
    Args:
        connection: Oracle connection object
        p_mapref: Mapping reference to create/update job
    
    Returns:
        tuple: A tuple containing (job_id, error_message) where:
            - job_id: The returned job ID from the function
            - error_message: Any error message returned by the Oracle function
    """
    cursor = None
    try:
        cursor = connection.cursor()
        
        # Define the output parameter for job_id
        v_job_id = cursor.var(oracledb.NUMBER)  # Assuming jobid is a NUMBER in Oracle
        
        # SQL to execute with named parameters
        sql = """
        BEGIN
            :job_id := CREATE_UPDATE_JOB(
                p_mapref => :p_mapref
            );
        END;
        """
        
        # Execute with named parameters
        cursor.execute(
            sql,
            job_id=v_job_id,
            p_mapref=p_mapref
        )
        connection.commit()
        
        # Get the result
        job_id = v_job_id.getvalue()
        
        return job_id, None
    
    except Exception as e:
        error_message = f"Error creating/updating job: {str(e)}"
        print(error_message)
        return None, error_message
    
    finally:
        if cursor:
            cursor.close()

# Example usage
if __name__ == "__main__":
    # This would be replaced with your actual connection code
    # import oracledb
    connection = create_oracle_connection()
    
    mapping_ref = "MAP_CUSTOMER_DATA"  # Example mapping reference
    
    # Call the function
    job_id, error = call_create_update_job(connection, mapping_ref)
    
    # Display the results
    if error:
        print(f"Error: {error}")
    else:
        print(f"Job created/updated successfully. Job ID: {job_id}")
    
    connection.close()

Error creating/updating job: ORA-06550: line 2, column 24:
PLS-00201: identifier 'CREATE_UPDATE_JOB' must be declared
ORA-06550: line 2, column 13:
PL/SQL: Statement ignored
Help: https://docs.oracle.com/error-help/db/ora-06550/
Error: Error creating/updating job: ORA-06550: line 2, column 24:
PLS-00201: identifier 'CREATE_UPDATE_JOB' must be declared
ORA-06550: line 2, column 13:
PL/SQL: Statement ignored
Help: https://docs.oracle.com/error-help/db/ora-06550/
