In [45]:
import sqlglot

In [46]:
# Sample Snowflake query; it uses Snowflake's IFF() conditional.
snow_sql = """

SELECT
    *
    , IFF(str_length<4,True,False)   AS is_short_string
FROM dbt_db.dbt_schema.data_generator

"""

# Convert the query from the Snowflake dialect to DuckDB.
# The result is printed as-is, so the output is the repr of a list of SQL strings.
duckdb_sql = sqlglot.transpile(
    snow_sql,
    read="snowflake",
    write="duckdb",
)
print(duckdb_sql)

['SELECT *, CASE WHEN str_length < 4 THEN TRUE ELSE FALSE END AS is_short_string FROM dbt_db.dbt_schema.data_generator']


In [85]:
from sqlglot import parse_one, exp, transpile
import os
import shutil

# Source and target SQL dialects used for the transpilation.
source_dialect = "snowflake"
target_dialect = "duckdb"

# Input directory (dbt-compiled SQL) and output directory (transpiled DuckDB models).
# Built with os.path.join so the paths are not tied to Windows separators; the final ""
# keeps the trailing separator that the previous string-concatenated paths had.
sql_directory = os.path.join(os.getcwd(), "target", "compiled", "dbt_eda_tools", "examples", "")
duckdb_directory = os.path.join(os.getcwd(), "examples", "transpiled_duck", "")


def rewrite_tables_as_refs(sql_text, qualified_tables, suffix="_duckdb"):
    """Replace fully qualified table references with dbt ``ref()`` calls.

    Only the complete ``catalog.db.name`` reference is rewritten, so columns or
    aliases that merely contain the table name are left untouched, and a table
    that is referenced several times is never wrapped in ``ref()`` twice.

    Args:
        sql_text: SQL text to rewrite.
        qualified_tables: Iterable of ``(catalog, db, name)`` string triples.
            Duplicates are allowed and are processed once.
        suffix: Text appended to the table name inside the generated ``ref()``.

    Returns:
        ``sql_text`` with every ``catalog.db.name`` occurrence (each part
        optionally double-quoted) replaced by ``{{ref('<name><suffix>')}}``.
    """
    import re  # function-scope import keeps this helper self-contained

    for catalog, db, name in sorted(set(qualified_tables)):
        # Each identifier part may or may not be wrapped in double quotes.
        qualified = r"\.".join('"?' + re.escape(part) + '"?' for part in (catalog, db, name))
        # The look-arounds stop matches that are only a fragment of a longer identifier.
        pattern = r'(?<![\w."])' + qualified + r'(?![\w"])'
        replacement = "{{ref('" + name + suffix + "')}}"
        # A callable replacement avoids backslash/group interpretation of the text.
        sql_text = re.sub(pattern, lambda _match, text=replacement: text, sql_text)
    return sql_text


# Start from an empty output directory on every run. Removal stays best-effort,
# but no longer relies on a bare `except:` (which would also swallow KeyboardInterrupt).
shutil.rmtree(duckdb_directory, ignore_errors=True)
os.makedirs(duckdb_directory, exist_ok=True)

for dirpath, _, filenames in os.walk(sql_directory):
    for filename in filenames:
        is_model = filename.endswith(".sql") and not filename.startswith("assert_")
        if not is_model or dirpath.endswith("transpiled_duck"):
            continue

        file_path = os.path.join(dirpath, filename)
        with open(file_path, "r") as sql_file:
            sql = sql_file.read()

        # Only the first statement of each file is kept.
        statements = transpile(sql, read=source_dialect, write=target_dialect, pretty=True)
        if not statements or not statements[0].strip():
            print(f"Skipping {file_path}: no SQL statement found")
            continue
        transpiled_sql = statements[0]

        # Collect every catalog.db.table reference; the output is DuckDB SQL,
        # so it is parsed with the target dialect.
        qualified_tables = [
            (table.catalog, table.db, table.name)
            for table in parse_one(transpiled_sql, read=target_dialect).find_all(exp.Table)
            if table.catalog and table.db and table.name
        ]
        transpiled_sql = rewrite_tables_as_refs(transpiled_sql, qualified_tables)

        # Swap only the extension (str.replace would touch every ".sql" in the name).
        # NOTE: output is flat, so equally named files from different sub-directories
        # overwrite each other.
        new_file_name = os.path.splitext(filename)[0] + "_duckdb.sql"
        with open(os.path.join(duckdb_directory, new_file_name), "w") as out_file:
            out_file.write(transpiled_sql)


In [48]:
from sqlglot.optimizer.scope import build_scope

# Parse a query in which the CTE `x` reads from the table `y`.
ast = parse_one(
    """
WITH x AS (
  SELECT a FROM y
)
SELECT a FROM x
"""
)

# Build the scope tree for the query; the walk below goes over scopes, not AST nodes.
root = build_scope(ast)

# Gather the alias of every selected source that is a real table.
# `selected_sources` maps the alias used within a scope (e.g. in FROM or JOIN)
# to a (node, source) pair: the source is a Scope for subqueries and CTEs,
# and an exp.Table instance for tables, so only the latter are kept.
tables = [
    source_alias
    for child_scope in root.traverse()
    for source_alias, (_ast_node, resolved_source) in child_scope.selected_sources.items()
    if isinstance(resolved_source, exp.Table)
]

# Print one table alias per line.
for table in tables:
    print(table)

y
