### filename_basepath =....


In [0]:
from pathlib import Path

# 1️ Base path in Unity Catalog
base_path = "/Volumes/workspace/default/my_file"  # root directory handed to list_all_files()

# Accumulator: list_all_files() appends every discovered file path to this list.
file_paths = []

def list_all_files(path):
    """Collect every file beneath ``path`` into the global ``file_paths`` list.

    Directories are descended into depth-first, in the order that
    ``dbutils.fs.ls`` reports them. Nothing is returned; the result is the
    side effect on ``file_paths``.
    """
    # An explicit stack of directory listings stands in for recursive calls.
    open_listings = [iter(dbutils.fs.ls(path))]
    while open_listings:
        entry = next(open_listings[-1], None)
        if entry is None:
            open_listings.pop()  # this listing is exhausted; resume its parent
        elif entry.isDir():
            open_listings.append(iter(dbutils.fs.ls(entry.path)))
        else:
            file_paths.append(entry.path)

# 3️ Run the function for the given base_path
list_all_files(base_path)  # returns nothing; results accumulate in file_paths

# 4️ Print each file with its base path variable name
def create_variable_name(file_path):
    """Build a ``<stem>_basepath`` variable name from a file path.

    The stem is the file name without its final suffix. Every character that
    is not a letter, digit or underscore (spaces, hyphens, dots, ...) becomes
    an underscore, and a leading digit is prefixed with an underscore, so the
    result can be pasted as a Python variable name.

    Args:
        file_path: Path of the file; only its last component is used.

    Returns:
        The sanitised stem followed by ``_basepath``.
    """
    import re  # local import keeps this notebook cell self-contained

    stem = Path(file_path).stem
    # One underscore per offending character, so names that only contained
    # spaces or hyphens come out exactly as before.
    identifier = re.sub(r"\W", "_", stem)
    if identifier[:1].isdigit():
        identifier = f"_{identifier}"
    return f"{identifier}_basepath"

# Emit one ready-to-paste assignment per discovered file.
for file in file_paths:
    var_name = create_variable_name(file)
    # repr() quotes and escapes the path, so a path containing an apostrophe
    # or a backslash still prints as a valid Python string literal.
    print(f"{var_name} = {file!r}")


### 1.filename_basepath =.... 
### 2.get_latest_partition_path

In [0]:
from pathlib import Path

# 1️Base path in Unity Catalog
# base_path = "my_unity_catalog_metastore.version_contol.version_con_file"

# base_path = "/Volumes/workspace/default/my_file"
base_path = "/Volumes/workspace/default/my_file/bash_path/"  # root directory handed to list_all_files()

# Accumulator: list_all_files() appends every discovered file path to this list.
file_paths = []

def list_all_files(path):
    """Append every file found under ``path`` to the global ``file_paths``.

    Sub-directories are visited recursively, in the order that
    ``dbutils.fs.ls`` lists them. The function itself returns nothing.
    """
    for entry in dbutils.fs.ls(path):
        if not entry.isDir():
            file_paths.append(entry.path)
            continue
        # Directory: descend before moving on to the next sibling.
        list_all_files(entry.path)

# 3️ Run the function for the given base_path
list_all_files(base_path)  # returns nothing; results accumulate in file_paths

# 4️ Print each file with its base path variable name
def create_variable_name(file_path):
    """Build a ``<stem>_basepath`` variable name from a file path.

    The stem is the file name without its final suffix. Every character that
    is not a letter, digit or underscore (spaces, hyphens, dots, ...) becomes
    an underscore, and a leading digit is prefixed with an underscore, so the
    result can be used as a Python variable name.

    Args:
        file_path: Path of the file; only its last component is used.

    Returns:
        The sanitised stem followed by ``_basepath``.
    """
    import re  # local import keeps this notebook cell self-contained

    stem = Path(file_path).stem
    # One underscore per offending character, so names that only contained
    # spaces or hyphens come out exactly as before.
    identifier = re.sub(r"\W", "_", stem)
    if identifier[:1].isdigit():
        identifier = f"_{identifier}"
    return f"{identifier}_basepath"

# Map each generated variable name to its file path for the steps that follow.
# NOTE(review): two files with the same stem in different folders produce the
# same key, so the later path replaces the earlier one - confirm that is acceptable.
file_basepaths = {}
for file in file_paths:
    var_name = create_variable_name(file)
    file_basepaths[var_name] = file
    # repr() quotes and escapes the path, so a path containing an apostrophe
    # or a backslash still prints as a valid Python string literal.
    print(f"{var_name} = {file!r}")

# 5️Function to get latest partition path
def get_latest_partition_path(path):
    """Return the most recently modified sub-directory of ``path``.

    A sub-directory's recency is the newest ``modificationTime`` among its
    direct children, rather than that of whichever entry happens to be listed
    first. An empty sub-directory counts as time 0.

    Args:
        path: Directory whose immediate sub-directories are the candidates.

    Returns:
        The path of the newest sub-directory, or ``None`` when ``path`` has no
        sub-directories. On a tie the first one listed wins.
    """
    partition_dirs = [item.path for item in dbutils.fs.ls(path) if item.isDir()]
    if not partition_dirs:
        return None

    def newest_child_mtime(partition_dir):
        # List each partition exactly once: a second listing doubles the
        # remote calls and can disagree with the first if the directory
        # changes in between.
        children = dbutils.fs.ls(partition_dir)
        return max((child.modificationTime for child in children), default=0)

    return max(partition_dirs, key=newest_child_mtime)

# 6️ Example usage for each base path variable
# NOTE(review): file_basepaths holds file paths, while
# get_latest_partition_path() only considers sub-directories of its argument.
# Presumably this loop therefore prints nothing - confirm whether directory
# paths were intended here.
for var_name in file_basepaths:
    latest_partition = get_latest_partition_path(file_basepaths[var_name])
    if not latest_partition:
        continue  # no sub-directory found for this entry
    print(f"latest_path_{var_name} = '{latest_partition}'")


In [0]:
%sql
-- Lookup table holding a base path's short name and its full location.
-- IF NOT EXISTS lets this cell be re-run without failing once the table exists.
CREATE TABLE IF NOT EXISTS my_unity_catalog_metastore.base_path.base_path_info(
  base_path_name STRING,
  path_location STRING
);

In [0]:
%sql
-- Table of per-file rows: a name column and a path-location column.
-- IF NOT EXISTS lets this cell be re-run without failing once the table exists.
CREATE TABLE IF NOT EXISTS my_unity_catalog_metastore.base_path.latest_partition_path_info(
  large_path_Name STRING,
  latest_path_location STRING
)

### Final working code

In [0]:
from pathlib import Path

# ---------- CONFIG ----------
base_path = "/Volumes/workspace/default/my_file/bash_path/"  # <-- your base path; relative paths are cut by this prefix's length, so keep the trailing slash

# ---------- HELPERS ----------
def sql_escape(s: str) -> str:
    """Escape ``s`` for embedding inside a single-quoted Spark SQL literal.

    Spark SQL treats the backslash as the escape character in string literals
    under its default parser settings. A raw backslash in ``s`` would therefore
    swallow the character after it, including the closing quote. Backslashes
    are doubled first, then each single quote is backslash-escaped.

    Args:
        s: Raw text, for example a file name or path.

    Returns:
        The text with backslashes and single quotes escaped. The surrounding
        quotes are not added.
    """
    # Order matters: escape backslashes before introducing new ones for quotes.
    return s.replace("\\", "\\\\").replace("'", "\\'")

file_paths = []  # accumulator that list_all_files() appends discovered file paths to

def list_all_files(path: str):
    """Gather the files below `path` into the global `file_paths` (uses dbutils).

    Sub-directories are walked recursively. Any exception raised while
    handling `path` is printed and swallowed, so the walk carries on with the
    remaining directories instead of aborting.
    """
    try:
        entries = dbutils.fs.ls(path)
        for entry in entries:
            if not entry.isDir():
                file_paths.append(entry.path)
                continue
            list_all_files(entry.path)
    except Exception as e:
        # Best effort: report the failure and keep going.
        print(f"Error listing {path}: {e}")

# ---------- STEP 1: collect files ----------
list_all_files(base_path)

# ---------- STEP 2: insert single base_path row ----------
# derive base_path_name as the parent folder of the last segment (e.g. 'my_file')
base_path_name = Path(base_path).parts[-2]  # -> 'my_file'

# Rows are written as DataFrame data instead of being spliced into SQL text,
# so quotes or backslashes inside a path cannot break or alter the statement.
# insertInto() matches columns by position; the schema strings below follow
# the same column order the INSERT statements used.
spark.createDataFrame(
    [(base_path_name, base_path)],
    "base_path_name STRING, path_location STRING",
).write.insertInto(
    "my_unity_catalog_metastore.base_path.base_path_info", overwrite=False
)

# ---------- STEP 3: insert file rows (filename + relative path) ----------
DBFS_SCHEME = "dbfs:"
file_rows = []
for fp in file_paths:
    # NOTE(review): dbutils.fs.ls appears to report Volume paths with a
    # "dbfs:" scheme prefix, which would stop them matching base_path -
    # confirm on your runtime. The prefix is ignored for the comparison only;
    # a path that still does not match is stored unchanged, as before.
    unprefixed = fp[len(DBFS_SCHEME):] if fp.startswith(DBFS_SCHEME) else fp
    rel = unprefixed[len(base_path):] if unprefixed.startswith(base_path) else fp
    file_rows.append((Path(fp).name, rel))

if file_rows:
    # One batched append instead of one INSERT statement per file.
    spark.createDataFrame(
        file_rows,
        "large_path_Name STRING, latest_path_location STRING",
    ).write.insertInto(
        "my_unity_catalog_metastore.base_path.latest_partition_path_info",
        overwrite=False,
    )

print(f"Inserted base path: {base_path_name} -> {base_path}")
print(f"Inserted {len(file_paths)} file rows into latest_partition_path_info.")


In [0]:
%sql
-- Show every row of the base-path table.
SELECT *
FROM my_unity_catalog_metastore.base_path.base_path_info

In [0]:
%sql
-- Show every row of the per-file path table.
SELECT *
FROM my_unity_catalog_metastore.base_path.latest_partition_path_info;

In [0]:
%sql
-- truncate table my_unity_catalog_metastore.base_path.base_path_info;
    -- 
-- truncate table my_unity_catalog_metastore.base_path.latest_partition_path_info;