In [10]:
import pandas as pd
from sqlglot import parse_one, exp
from sqlglot.dialects.ma import MA
from sqlglot.dialects.tsql import TSQL
import sqlglot
import re


In [11]:

def replace_spaces_in_brackets(input_string: str, replacement: str = "_space_" ) -> str:
    """
    Replaces spaces within square brackets [] in a string with replacement using regex.

    :param input_string: The input string.
    :return: A new string with spaces replaced within square brackets.
    """
    def replace_space(match):
        # Replace spaces within the matched brackets with other sequence
        return match.group(0).replace(' ', replacement)
    
    # Regex pattern to find text within square brackets, including the brackets
    pattern = r'\[.*?\]'
    
    # Use re.sub with a replacement function
    return re.sub(pattern, replace_space, input_string)


def split_sql_string_to_sections(sql_string):
    # List of SQL keywords to identify the start of each section
    keywords = [
        r"DECLARE", r"SELECT", r"INSERT", r"UPDATE", r"DELETE", r"MERGE", r"CREATE", r"ALTER",
        r"DROP", r"TRUNCATE", r"BEGIN", r"DECLARE", r"EXEC(?:UTE)?", r"WITH",
        r"COMMIT", r"ROLLBACK", r"SAVEPOINT", r"USE", r"SHOW", r"DESCRIBE", r"EXPLAIN", r"WHILE"
    ]
    
    # Compile regex pattern to match any of the keywords at the start of a line
    keyword_pattern = re.compile(r"^\s*(" + "|".join(keywords) + r")\b", re.IGNORECASE)

    # Split the input string into lines
    sql_lines = sql_string.splitlines()

    sections = []
    current_section = []
    
    for line in sql_lines:
        # Check if the line starts with any of the keywords
        if keyword_pattern.match(line):
            # If there's an existing section, add it to the list
            if current_section:
                sections.append(" ".join(current_section))
                current_section = []
        # Add the current line to the current section
        current_section.append(line)

    # Add the last section if any
    if current_section:
        sections.append(" ".join(current_section))

    return [replace_spaces_in_brackets(re.sub(r'\s+', ' ', section)).replace('[', '').replace(']', '').strip() for section in sections] # remove multiple spaces and return



def open_query(dir:str) -> list:
    """
    Open TSQL queries from one text file
    """ 
    with open(dir, 'r') as file: 
        file = file.read().strip()#.split(';')
        sql_queries = split_sql_string_to_sections(file)
    return sql_queries

preprocessed_queries = []
sql_queries = open_query('data/queries-txts/queries_rabo_qrm.txt')
for i, query in enumerate(sql_queries, start=1):
    print(f"Query {i}:\n{query}\n")


Query 1:
DECLARE @PortfolioCurrent varchar(255) = ?

Query 2:
DECLARE @Market date = ?

Query 3:
DECLARE @CompanyName varchar(255) = ?

Query 4:
DECLARE @RunId int

Query 5:
DECLARE @RunId1 int; -- Get RunId from QRM.Run_Time_Description main table(Portfolio Run check)

Query 6:
SELECT @RunId = RUNID FROM QRM.Run_Time_Description WHERE Portfolio = @PortfolioCurrent AND CONVERT(date, Market) = CONVERT(date, @Market) AND CompanyName = @CompanyName AND RunParameter in ('Val_shock','Val_shock_old','CSRBB') AND ISDATE(Market) = 1;

Query 7:
Update staging.FTP_Unpivot set Funding_space_Component = d.Funding_Component, Receiving_Legal_Entity = d.Receiving_Legal_Entity From staging.FTP_Unpivot a INNER JOIN cfg.ALM_LKP_Receiving_legal_entity d ON a.Funding_space_Component=d.Funding_Component_Export;

Query 8:
UPDATE staging.FTP_Unpivot SET Dirty_space_Price = 0 WHERE ( Level_4 = 'Fixed Rate Mortgages' AND instrument_set IN ( 'mortgage' ,'offset mortgage' ) AND external_counterparty_segment = 'h

In [12]:

def parse_tables(table, table_alias_list, subquery=True):    
    """
    Parses all table information available (db, catalog...)
    """ 

    if subquery == False:
        table_alias =  table.alias.strip()
        table_name = table.name.strip()
        table_db = table.db.strip()
        table_catalog = table.catalog.strip()

    else:
        table_alias = table.alias.strip()
        source = table.this.args["from"].strip()
        table_name= source.this.name.strip()
        table_catalog =  source.this.catalog.strip()
        table_db = source.this.db.strip()
        
    if " " in table_name:
        table_name = table_name.replace(" ", "")
    if table_catalog != "" and table_db != "":
        result = (table_catalog+"."+ table_db+"."+table_name, table_alias)
    elif table_db == "" and table_catalog == "":
        result = (table_name, table_alias)
    elif table_catalog == "": 
        result = (table_db+"."+table_name, table_alias)
    elif table_db == "":
        result = (table_catalog+"."+table_name, table_alias)
        
    table_alias_list.append(result)
    return result


def get_tables(ast: sqlglot.expressions.Select):
    """
    Extracts the table names and their aliases, used to reconstruct a tuple with structure (database+schema+name, alias )
    """
    # find all tables
    table_alias = list(ast.find_all(exp.Table))
    alias_table = []
    # extract information from each table
    for table in table_alias:
        try:
            parse_tables(table, alias_table, False)
        except:
            pass

    return alias_table

def replace_aliases(ast: sqlglot.expressions) -> sqlglot.expressions:
    """
    Replaces the tables' aliases in a query
    """
    #ast = list(ast.find_all(exp.Select))[0]
    alias_table = get_tables(ast)
    
    def transformer_table(node):
        for element in alias_table:
            if isinstance(node, exp.Column) and node.table == element[1]:
                return parse_one(element[0] + "." + node.name)
        return node

    transformed_tree = ast.transform(transformer_table)

    return transformed_tree

In [13]:
# parse declared variables
variables = []

for i, query in enumerate(sql_queries):
    for line in query.split("\n"):
        line=line.strip()
        if 'declare' in line.lower():
            #if line[-1] == '?':
            variable = [i.replace('@', '') for i in re.findall(r"@\w+", line)][0]
            print(variable)
            variables.append(variable)
      
            # if there is an equal not followed by ? then extract hardcoded value
            #print(line)
            #print()
            
variables

PortfolioCurrent
Market
CompanyName
RunId
RunId1


['PortfolioCurrent', 'Market', 'CompanyName', 'RunId', 'RunId1']

In [14]:
# parse simple select

for i, query in enumerate(sql_queries):
    if 'select' in query.lower():
        try:
            ast = parse_one(query)
            ast = replace_aliases(ast)
            print(repr(ast))
            print()
            main_select = list(ast.find_all(exp.Select))[0]
            expressions = main_select.expressions[0]
            #print(expressions)
            eq_statements = list(expressions.find_all(exp.EQ))

            for eq in eq_statements:

                print(list(eq.find_all(exp.Var)))
                print(list(eq.find_all(exp.Column)))


            #print(list(expressions.find_all(exp.Column)))

            print()
            from_statement = list(ast.find_all(exp.From))
            table = list(from_statement[0].find_all(exp.Table))[0]
            print(table)
            print()

            print(list([str(i.sql('tsql')) for i in ast.find_all(exp.Where)]))
            print()
        except:
            ast = parse_one(query)
            print(repr(ast))


Select(
  expressions=[
    EQ(
      this=Parameter(
        this=Var(this=RunId)),
      expression=Column(
        this=Identifier(this=RUNID, quoted=False),
        table=Identifier(this=Run_Time_Description, quoted=False),
        db=Identifier(this=QRM, quoted=False)))],
  from=From(
    this=Table(
      this=Identifier(this=Run_Time_Description, quoted=False),
      db=Identifier(this=QRM, quoted=False))),
  where=Where(
    this=And(
      this=And(
        this=And(
          this=And(
            this=EQ(
              this=Column(
                this=Identifier(this=Portfolio, quoted=False),
                table=Identifier(this=Run_Time_Description, quoted=False),
                db=Identifier(this=QRM, quoted=False)),
              expression=Parameter(
                this=Var(this=PortfolioCurrent))),
            expression=EQ(
              this=Cast(
                this=Column(
                  this=Identifier(this=date, quoted=False),
                  table=Ident

In [16]:
# parse update set statements

for i, query in enumerate(sql_queries):
    if 'update' in query.lower() and 'select' not in query.lower():

        ast = parse_one(query.replace('[', '').replace(']', ''))
        ast = replace_aliases(ast)

        update = list(ast.find_all(exp.Update))[0]

        # source and target table
        table = list(list(update.find_all(exp.Table))[0].find_all(exp.Table))
        print('source and dest table: ', str(list(list(update.find_all(exp.Table))[0].find_all(exp.Table))[0]))
        print()

        # columns (lineages)
        columns = list(update.expressions)

        for column in list(update.expressions):
            print("target col: ", column.this)
            print("source column: ", column.expression)
            print()

        # join statements
        join  = list(ast.find_all(exp.Join))#[0]
        print(join)

        if join != []:
            print('join table: ', str(list(join[0].find_all(exp.Table))[0]))
            print('join type: ', join[0].kind)
            print('join condition: ', join[0].on())

        print()

        # where statements
        wheres  = list(ast.find_all(exp.Where))
        print('where: ', [where.sql('tsql') for where in wheres])

        print()
        print('===================================')
       

source and dest table:  staging.FTP_Unpivot

target col:  staging.FTP_Unpivot.Funding_space_Component
source column:  cfg.ALM_LKP_Receiving_legal_entity.Funding_Component

target col:  staging.FTP_Unpivot.Receiving_Legal_Entity
source column:  cfg.ALM_LKP_Receiving_legal_entity.Receiving_Legal_Entity

[Join(
  this=Table(
    this=Identifier(this=ALM_LKP_Receiving_legal_entity, quoted=False),
    db=Identifier(this=cfg, quoted=False),
    alias=TableAlias(
      this=Identifier(this=d, quoted=False))),
  kind=INNER,
  on=EQ(
    this=Column(
      this=Identifier(this=Funding_space_Component, quoted=False),
      table=Identifier(this=FTP_Unpivot, quoted=False),
      db=Identifier(this=staging, quoted=False)),
    expression=Column(
      this=Identifier(this=Funding_Component_Export, quoted=False),
      table=Identifier(this=ALM_LKP_Receiving_legal_entity, quoted=False),
      db=Identifier(this=cfg, quoted=False))))]
join table:  cfg.ALM_LKP_Receiving_legal_entity AS d
join type:  

## Convert Where Statements to Natural Language

In [17]:
wheres[0].sql('tsql')

"WHERE (staging.FTP_Unpivot.Level_4 = 'Fixed Rate Mortgages' AND staging.FTP_Unpivot.instrument_set IN ('mortgage', 'offset mortgage') AND staging.FTP_Unpivot.external_counterparty_segment = 'households' AND (staging.FTP_Unpivot.market_shock LIKE '%SOT%' OR staging.FTP_Unpivot.market_shock LIKE '%EVEatR%')) OR (staging.FTP_Unpivot.Level_4 = 'Fixed Rate Mortgage Savings' AND staging.FTP_Unpivot.instrument_set = 'mortgage savings' AND (staging.FTP_Unpivot.market_shock LIKE 'SOT%' OR staging.FTP_Unpivot.market_shock LIKE 'EVEatR%')) AND (staging.FTP_Unpivot.Portfolio LIKE '%OBV' OR staging.FTP_Unpivot.Portfolio LIKE '%ABB')"

In [19]:

def sql_to_natural_language(sql_where_clause):
    """
    Converts a SQL WHERE clause into a natural language explanation.

    Parameters:
        sql_where_clause (str): The SQL WHERE clause to be translated.

    Returns:
        str: The natural language explanation.
    """
    # Replace common SQL syntax with natural language equivalents
    replacements = [
        (r"\bAND\b", "and"),
        (r"\bOR\b", "or"),
        (r"=", "is"),
        (r"IN \((.*?)\)", r"is one of \1"),
        (r"LIKE '%(.*?)%'", r"contains '\1'"),
        (r"LIKE '(.*?)%'", r"starts with '\1'"),
        (r"LIKE '%(.*?)'", r"ends with '\1'"),
        (r"\(\s*(.*?)\s*\)", r"(\1)")  # Remove extra spaces inside parentheses
    ]
    
    natural_lang = sql_where_clause.strip()
    for pattern, replacement in replacements:
        natural_lang = re.sub(pattern, replacement, natural_lang, flags=re.IGNORECASE)

    # Add a period after OR conditions for better readability
    groups = re.split(r"\\s*\\bOR\\b\\s*", natural_lang, flags=re.IGNORECASE)
    explanation = []
    
    for group in groups:
        # Keep AND intact in the explanation
        readable_group = re.sub(r"\\s*\\bAND\\b\\s*", " and ", group)
        explanation.append(f"({readable_group.strip()})")
    
    # Rejoin with " or "
    return " or ".join(explanation)


natural_language = sql_to_natural_language(wheres[0].sql('tsql'))
print('tsql statement:')
print(wheres[0].sql('tsql'))
print("Natural Language Explanation:")

print(natural_language)


tsql statement:
WHERE (staging.FTP_Unpivot.Level_4 = 'Fixed Rate Mortgages' AND staging.FTP_Unpivot.instrument_set IN ('mortgage', 'offset mortgage') AND staging.FTP_Unpivot.external_counterparty_segment = 'households' AND (staging.FTP_Unpivot.market_shock LIKE '%SOT%' OR staging.FTP_Unpivot.market_shock LIKE '%EVEatR%')) OR (staging.FTP_Unpivot.Level_4 = 'Fixed Rate Mortgage Savings' AND staging.FTP_Unpivot.instrument_set = 'mortgage savings' AND (staging.FTP_Unpivot.market_shock LIKE 'SOT%' OR staging.FTP_Unpivot.market_shock LIKE 'EVEatR%')) AND (staging.FTP_Unpivot.Portfolio LIKE '%OBV' OR staging.FTP_Unpivot.Portfolio LIKE '%ABB')
Natural Language Explanation:
(WHERE (staging.FTP_Unpivot.Level_4 is 'Fixed Rate Mortgages' and staging.FTP_Unpivot.instrument_set is one of 'mortgage', 'offset mortgage' and staging.FTP_Unpivot.external_counterparty_segment is 'households' and (staging.FTP_Unpivot.market_shock contains 'SOT' or staging.FTP_Unpivot.market_shock contains 'EVEatR')) or (st

In [21]:
def sql_to_natural_language(sql_where_clause):
    """
    Converts a SQL WHERE clause into a natural language explanation.

    Parameters:
        sql_where_clause (str): The SQL WHERE clause to be translated.

    Returns:
        str: The natural language explanation.
    """
    # Replace common SQL syntax with natural language equivalents
    replacements = [
        (r"\bAND\b", "and"),
        (r"\bOR\b", "or"),
        (r"=", "equals"),
        (r"IN \((.*?)\)", r"is one of \1"),
        (r"LIKE '%(.*?)%'", r"contains '\1'"),
        (r"LIKE '(.*?)%'", r"starts with '\1'"),
        (r"LIKE '%(.*?)'", r"ends with '\1'"),
        (r"ISDATE\((.*?)\) = 1", r"where \1 is a valid date"),  # Handle ISDATE
        (r"CAST\((.*?) AS (.*?)\)", r"convert \1 to \2"),  # Handle CAST
        (r"@(\w+)", r"the variable '\1'"),  # Handle variables like @PortfolioCurrent
        (r"\(\s*(.*?)\s*\)", r"(\1)")  # Remove extra spaces inside parentheses
    ]
    
    natural_lang = sql_where_clause.strip()
    for pattern, replacement in replacements:
        natural_lang = re.sub(pattern, replacement, natural_lang, flags=re.IGNORECASE)

    # Handle breaking down logical groups
    groups = re.split(r"\\s*\\bOR\\b\\s*", natural_lang, flags=re.IGNORECASE)
    explanation = []
    
    for group in groups:
        # Keep AND intact in the explanation
        readable_group = re.sub(r"\\s*\\bAND\\b\\s*", " and ", group)
        explanation.append(f"({readable_group.strip()})")

    if 'or' in sql_where_clause.lower():
        return " or ".join(explanation)
    else:
        return natural_lang

where_condition = "WHERE Portfolio = @PortfolioCurrent AND CAST(date AS Market) = CAST(date AS Market) AND CompanyName = @CompanyName AND RunParameter IN ('Val_shock', 'Val_shock_old', 'CSRBB') AND ISDATE(Market) = 1"
natural_language = sql_to_natural_language(where_condition)
print('tsql statement:')
print(where_condition)
print("Natural Language Explanation:")
print(natural_language)

tsql statement:
WHERE Portfolio = @PortfolioCurrent AND CAST(date AS Market) = CAST(date AS Market) AND CompanyName = @CompanyName AND RunParameter IN ('Val_shock', 'Val_shock_old', 'CSRBB') AND ISDATE(Market) = 1
Natural Language Explanation:
(WHERE Portfolio equals the variable 'PortfolioCurrent' and convert date to Market equals convert date to Market and CompanyName equals the variable 'CompanyName' and RunParameter is one of 'Val_shock', 'Val_shock_old', 'CSRBB' and ISDATE(Market) equals 1)


In [None]:
parse_one("""From  staging.FTP_Unpivot a
          INNER JOIN cfg.ALM_LKP_Receiving_legal_entity d 
ON 
   a.Funding_Component=d.Funding_Component_Export""")

Select(
  expressions=[
    Star()],
  from=From(
    this=Table(
      this=Identifier(this=FTP_Unpivot, quoted=False),
      db=Identifier(this=staging, quoted=False),
      alias=TableAlias(
        this=Identifier(this=a, quoted=False)))),
  joins=[
    Join(
      this=Table(
        this=Identifier(this=ALM_LKP_Receiving_legal_entity, quoted=False),
        db=Identifier(this=cfg, quoted=False),
        alias=TableAlias(
          this=Identifier(this=d, quoted=False))),
      kind=INNER,
      on=EQ(
        this=Column(
          this=Identifier(this=Funding_Component, quoted=False),
          table=Identifier(this=a, quoted=False)),
        expression=Column(
          this=Identifier(this=Funding_Component_Export, quoted=False),
          table=Identifier(this=d, quoted=False))))])