def _sdk_column_names(columns):
    """Return the names of ``columns``; items without ``.name`` are rendered with ``str()``."""
    try:
        return [c.name for c in columns]
    except AttributeError:
        return [str(c) for c in columns]


def _sdk_describe_columns_and_keys(session, full_path):
    """Best-effort schema lookup through ``describe_table``.

    Tries ``session.describe_table`` first, then ``session.driver.table_client.describe_table``.

    Returns:
        ``(columns, primary_keys, error)``. ``columns`` / ``primary_keys`` are lists of names
        or ``None`` when unavailable; ``error`` is the exception that aborted the lookup, if any.
    """
    try:
        if hasattr(session, "describe_table"):
            desc = session.describe_table(full_path)
        else:
            table_client = getattr(getattr(session, "driver", None), "table_client", None)
            if table_client is None or not hasattr(table_client, "describe_table"):
                return None, None, None
            desc = table_client.describe_table(full_path)

        if desc is None:
            return None, None, None

        raw_cols = getattr(desc, "columns", None) or getattr(desc, "Columns", None)
        columns = _sdk_column_names(raw_cols) if raw_cols else None

        pk = (
            getattr(desc, "primary_key", None)
            or getattr(desc, "primary_keys", None)
            or getattr(desc, "key_columns", None)
        )
        primary_keys = None
        if pk:
            if isinstance(pk, (str, bytes)):
                # A single key column given as a scalar.
                primary_keys = [pk.decode("utf-8", "replace") if isinstance(pk, bytes) else pk]
            else:
                try:
                    # Accept any iterable of names, not only list/tuple: other sequence types
                    # used to be stringified as a whole, which silently disabled ORDER BY.
                    primary_keys = [p if isinstance(p, str) else str(p) for p in pk]
                except TypeError:
                    primary_keys = [str(pk)]
        return columns, primary_keys, None
    except Exception as exc:
        # Schema discovery is best-effort; the caller falls back to probing the table.
        return None, None, exc


def _sdk_probe_columns(session, path_prefix, table_name):
    """Discover column names from the result-set metadata of a ``SELECT * ... LIMIT 1``.

    Returns:
        ``(columns, error)`` where ``columns`` is a list of names or ``None``.
    """
    sql = f'PRAGMA TablePathPrefix("{path_prefix}"); SELECT * FROM {table_name} LIMIT 1;'
    try:
        first = session.transaction().execute(sql, commit_tx=True)[0]
        meta = getattr(first, "columns", None) or getattr(first, "Columns", None)
        return ([c.name for c in meta] if meta else None), None
    except Exception as exc:
        return None, exc


def _sdk_row_value(row, index, column):
    """Read a cell by attribute name, then by position; ``None`` when neither works."""
    try:
        return getattr(row, column)
    except Exception:
        try:
            return row[index]
        except Exception:
            return None


def _sdk_cell_to_text(value):
    """Render a cell as text: ``None`` -> ``""``, bytes are decoded as UTF-8, the rest via ``str()``."""
    if value is None:
        return ""
    try:
        if isinstance(value, (bytes, bytearray)):
            return value.decode("utf-8", "replace")
        return str(value)
    except Exception:
        return repr(value)


def sdk_select_table_rows(session, table, path_prefix="/Root"):
    """Select every row of ``table`` and return it as a list of string lists.

    The column list is discovered at run time (``describe_table`` first, then a
    ``SELECT * ... LIMIT 1`` probe), so the helper works for tables of any shape.

    Args:
        session: object providing ``transaction().execute(sql, commit_tx=True)`` and,
            optionally, ``describe_table(path)`` or ``driver.table_client.describe_table(path)``.
        table: table name relative to ``path_prefix``, or an absolute path starting with ``/``
            (its directory part then becomes the ``TablePathPrefix``).
        path_prefix: database directory used for relative table names.

    Returns:
        ``[header, row1, row2, ...]`` where ``header`` is the list of column names and each
        row holds the cells converted to text (``None`` becomes ``""``).

    Raises:
        AssertionError: the column list could not be determined; the last discovery error,
            if any, is attached as ``__cause__``.
    """
    # Database paths always use "/", independent of the host OS.
    import posixpath

    if table.startswith("/"):
        full_path = table
        table_for_sql = posixpath.basename(table)
        pp = posixpath.dirname(full_path) or path_prefix
    else:
        table_for_sql = table
        full_path = posixpath.join(path_prefix, table)
        pp = path_prefix

    cols, primary_keys, last_error = _sdk_describe_columns_and_keys(session, full_path)
    if not cols:
        cols, probe_error = _sdk_probe_columns(session, pp, table_for_sql)
        last_error = probe_error or last_error

    if not cols:
        raise AssertionError(f"Не удалось получить список колонок для таблицы {full_path}") from last_error

    def q(n):
        return "`" + n.replace("`", "``") + "`"

    select_list = ", ".join(q(c) for c in cols)

    # Order by the primary key when known. Otherwise fall back to the legacy `id` ordering
    # so that callers comparing against expected rows get a deterministic result.
    order_by = [p for p in (primary_keys or []) if p in cols]
    if not order_by and "id" in cols:
        order_by = ["id"]
    order_clause = ""
    if order_by:
        order_clause = " ORDER BY " + ", ".join(q(p) for p in order_by)

    sql = f'PRAGMA TablePathPrefix("{pp}"); SELECT {select_list} FROM {table_for_sql}{order_clause};'
    result_sets = session.transaction().execute(sql, commit_tx=True)

    rows = [list(cols)]
    for r in result_sets[0].rows:
        rows.append([_sdk_cell_to_text(_sdk_row_value(r, i, col)) for i, col in enumerate(cols)])
    return rows
Last error: {last_exc}") + def _create_backup_collection(self, collection_src, tables: List[str]): + # create backup collection referencing given table full paths + table_entries = ",\n".join([f"TABLE `/Root/{t}`" for t in tables]) + sql = f""" + CREATE BACKUP COLLECTION `{collection_src}` + ( {table_entries} ) + WITH ( STORAGE = 'cluster', INCREMENTAL_BACKUP_ENABLED = 'false' ); + """ + res = self._execute_yql(sql) + stderr_out = "" + if getattr(res, 'std_err', None): + stderr_out += res.std_err.decode('utf-8', errors='ignore') + if getattr(res, 'std_out', None): + stderr_out += res.std_out.decode('utf-8', errors='ignore') + assert res.exit_code == 0, f"CREATE BACKUP COLLECTION failed: {stderr_out}" + self.wait_for_collection(collection_src, timeout_s=30) + + def _backup_now(self, collection_src): + time.sleep(1.1) + res = self._execute_yql(f"BACKUP `{collection_src}`;") + if res.exit_code != 0: + out = (res.std_out or b"").decode('utf-8', 'ignore') + err = (res.std_err or b"").decode('utf-8', 'ignore') + raise AssertionError(f"BACKUP failed: code={res.exit_code} STDOUT: {out} STDERR: {err}") + + def _restore_import(self, export_dir, exported_item, collection_restore): + bdir = os.path.join(export_dir, exported_item) + r = yatest.common.execute( + [backup_bin(), "--verbose", "--endpoint", "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port, + "--database", self.root_dir, "tools", "restore", + "--path", f"/Root/.backups/collections/{collection_restore}", + "--input", bdir], + check_exit_code=False, + ) + assert r.exit_code == 0, f"tools restore import failed: {r.std_err}" + + def _verify_restored_table_data(self, table, expected_rows): + rows = self.wait_for_table_rows(table, expected_rows, timeout_s=90) + assert rows == expected_rows, f"Restored data for {table} doesn't match expected.\nExpected: {expected_rows}\nGot: {rows}" + + def _capture_acl(self, table_path: str): + # Attempt to capture owner/grants/acl in a readable form. 
+ try: + desc = self.driver.scheme_client.describe_path(table_path) + except Exception: + return None + + acl_info = {} + owner = getattr(desc, "owner", None) + if owner: + acl_info["owner"] = owner + + for cand in ("acl", "grants", "effective_acl", "permission", "permissions"): + if hasattr(desc, cand): + try: + val = getattr(desc, cand) + acl_info[cand] = val + except Exception: + acl_info[cand] = "" + + # Fallback: try SHOW GRANTS via YQL and capture stdout + try: + res = self._execute_yql(f"SHOW GRANTS ON '{table_path}';") + out = (res.std_out or b"").decode('utf-8', 'ignore') + if out: + acl_info["show_grants"] = out + except Exception: + pass + + return acl_info + + def _drop_tables(self, tables: List[str]): + with self.session_scope() as session: + for t in tables: + full = f"/Root/{t}" + try: + session.execute_scheme(f"DROP TABLE `{full}`;") + except Exception: + raise AssertionError("Drop failed") + class TestFullCycleLocalBackupRestore(BaseTestBackupInFiles): def _execute_yql(self, script, verbose=False): @@ -960,3 +1117,320 @@ def record_last_snapshot(): # cleanup if os.path.exists(export_dir): shutil.rmtree(export_dir) + + +class TestFullCycleLocalBackupRestoreWSchemaChange(TestFullCycleLocalBackupRestore): + def _get_columns_from_scheme_entry(self, desc, path_hint: str = None): + # Reuse original robust approach: try multiple candidate attributes + try: + table_obj = getattr(desc, "table", None) + if table_obj is not None: + cols = getattr(table_obj, "columns", None) + if cols: + return [c.name for c in cols] + + cols = getattr(desc, "columns", None) + if cols: + try: + return [c.name for c in cols] + except Exception: + return [str(c) for c in cols] + + for attr in ("schema", "entry", "path"): + nested = getattr(desc, attr, None) + if nested is not None: + table_obj = getattr(nested, "table", None) + cols = getattr(table_obj, "columns", None) if table_obj is not None else None + if cols: + return [c.name for c in cols] + except Exception: + pass + + 
class TestFullCycleLocalBackupRestoreWSchemaChange(TestFullCycleLocalBackupRestore):
    """Full backup/restore cycle in which data, schema and ACLs change between two full backups."""

    @staticmethod
    def _column_names(cols):
        """Return column names; items without ``.name`` are rendered with ``str()``."""
        try:
            return [c.name for c in cols]
        except Exception:
            return [str(c) for c in cols]

    def _columns_from_entry_attributes(self, desc):
        """Look for a column list directly on ``desc`` or on its nested objects.

        Checks ``desc.table.columns``, ``desc.columns`` and ``<nested>.table.columns`` for
        ``nested`` in ``schema``/``entry``/``path``. Returns ``None`` when nothing is found
        or when reading the attributes raises.
        """
        try:
            table_obj = getattr(desc, "table", None)
            if table_obj is not None:
                cols = getattr(table_obj, "columns", None)
                if cols:
                    return [c.name for c in cols]

            cols = getattr(desc, "columns", None)
            if cols:
                return self._column_names(cols)

            for attr in ("schema", "entry", "path"):
                nested = getattr(desc, attr, None)
                if nested is None:
                    continue
                table_obj = getattr(nested, "table", None)
                cols = getattr(table_obj, "columns", None) if table_obj is not None else None
                if cols:
                    return [c.name for c in cols]
        except Exception:
            return None
        return None

    @staticmethod
    def _entry_is_table(desc):
        """Tell whether ``desc`` reports itself as a (row/column) table.

        The flags may be plain attributes or zero-argument methods. A bound method is
        always truthy, so callables are invoked instead of being tested directly.
        NOTE(review): the YDB SDK ``SchemeEntry`` appears to expose these as methods - confirm.
        """
        for flag_name in ("is_table", "is_row_table", "is_column_table"):
            flag = getattr(desc, flag_name, False)
            if callable(flag):
                try:
                    flag = flag()
                except Exception:
                    flag = False
            if flag:
                return True
        return False

    def _columns_via_describe_table(self, table_path, errors):
        """Fetch column names through ``describe_table`` (table client first, then a session).

        Failures are appended to ``errors`` so they can be reported later instead of
        being discarded. Returns ``None`` when neither attempt yields columns.
        """
        try:
            tc = getattr(self.driver, "table_client", None)
            if tc is not None and hasattr(tc, "describe_table"):
                desc_tbl = tc.describe_table(table_path)
                cols = getattr(desc_tbl, "columns", None) or getattr(desc_tbl, "Columns", None)
                if cols:
                    return self._column_names(cols)
        except Exception as e:
            errors.append(f"table_client.describe_table({table_path!r}) raised: {e}")

        try:
            with self.session_scope() as session:
                if hasattr(session, "describe_table"):
                    desc_tbl = session.describe_table(table_path)
                    cols = getattr(desc_tbl, "columns", None) or getattr(desc_tbl, "Columns", None)
                    if cols:
                        return self._column_names(cols)
        except Exception as e:
            errors.append(f"session.describe_table({table_path!r}) raised: {e}")

        return None

    @staticmethod
    def _entry_diagnostics(desc, errors):
        """Build a human-readable dump of ``desc`` for the failure message."""
        diagnostics = ["Failed to find columns via known candidates.\n"]
        if errors:
            diagnostics.append("Errors while describing the table:\n" + "\n".join(errors) + "\n")

        # Evaluate dir() once: if it raises, the attribute sample below is skipped
        # instead of raising a second time outside the guard.
        try:
            attr_names = dir(desc)
            diagnostics.append("dir(desc):\n" + ", ".join(attr_names) + "\n")
        except Exception as e:
            attr_names = []
            diagnostics.append(f"dir(desc) raised: {e}\n")

        readable = []
        for attr in sorted(set(attr_names)):
            if attr.startswith("_"):
                continue
            if len(readable) >= 40:
                break
            try:
                val = getattr(desc, attr)
                if callable(val):
                    continue
                s = repr(val)
                if len(s) > 300:
                    s = s[:300] + "...(truncated)"
                readable.append(f"{attr} = {s}")
            except Exception as e:
                readable.append(f"{attr} = <unreadable: {e}>")

        diagnostics.append("Sample attributes (truncated):\n" + "\n".join(readable) + "\n")
        return diagnostics

    def _get_columns_from_scheme_entry(self, desc, path_hint: str = None):
        """Return the column names of the table described by ``desc``.

        Args:
            desc: result of ``scheme_client.describe_path``.
            path_hint: full table path to use for ``describe_table``; when omitted the path
                is derived from ``desc.name``.

        Raises:
            AssertionError: no column list could be located; the message carries a
                diagnostic dump of ``desc``.
        """
        cols = self._columns_from_entry_attributes(desc)
        if cols:
            return cols

        errors = []
        if self._entry_is_table(desc):
            if path_hint:
                table_path = path_hint
            else:
                name = getattr(desc, "name", None)
                assert name, f"SchemeEntry has no name, can't form path. desc repr: {repr(desc)}"
                table_path = name if name.startswith("/Root") else os.path.join(self.root_dir, name)

            cols = self._columns_via_describe_table(table_path, errors)
            if cols:
                return cols

        raise AssertionError(
            "describe_path returned SchemeEntry in unexpected shape. Cannot locate columns.\n\nDiagnostic dump:\n\n"
            + "\n".join(self._entry_diagnostics(desc, errors))
        )

    def _capture_schema(self, table_path: str):
        """Return the list of column names of ``table_path``."""
        desc = self.driver.scheme_client.describe_path(table_path)
        return self._get_columns_from_scheme_entry(desc, path_hint=table_path)

    def _create_table_with_data(self, session, path, not_null=False):
        """Create ``/Root/<path>`` with ``id``/``number``/``txt``/``expire_at`` and insert three rows.

        Args:
            session: table session used for DDL and the initial ``UPSERT``.
            path: table name relative to ``/Root``.
            not_null: when true the ``id`` key column is declared non-optional.
        """
        full_path = "/Root/" + path
        id_type = ydb.PrimitiveType.Uint32 if not_null else ydb.OptionalType(ydb.PrimitiveType.Uint32)
        session.create_table(
            full_path,
            ydb.TableDescription()
            .with_column(ydb.Column("id", id_type))
            .with_column(ydb.Column("number", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
            .with_column(ydb.Column("txt", ydb.OptionalType(ydb.PrimitiveType.String)))
            .with_column(ydb.Column("expire_at", ydb.OptionalType(ydb.PrimitiveType.Timestamp)))
            .with_primary_keys("id"),
        )

        path_prefix, table = os.path.split(full_path)
        session.transaction().execute(
            (
                f'PRAGMA TablePathPrefix("{path_prefix}"); '
                f'UPSERT INTO {table} (id, number, txt, expire_at) VALUES '
                f'(1, 10, "one", CurrentUtcTimestamp()), (2, 20, "two", CurrentUtcTimestamp()), (3, 30, "three", CurrentUtcTimestamp());'
            ),
            commit_tx=True,
        )

    def _setup_test_collections(self):
        """Create the ``orders`` and ``products`` tables; return ``(collection_name, t1, t2)``."""
        collection_src = f"coll_src_{int(time.time())}"
        t1 = "orders"
        t2 = "products"

        with self.session_scope() as session:
            self._create_table_with_data(session, t1)
            self._create_table_with_data(session, t2)

        return collection_src, t1, t2

    @staticmethod
    def _quote_role(role: str) -> str:
        """Wrap a role name in backticks, dropping any backticks it contains."""
        return "`" + role.replace("`", "") + "`"

    @staticmethod
    def _execute_scheme_checked(session, query, failure_message):
        """Run a scheme query; on failure raise ``AssertionError`` chained to the real error."""
        try:
            session.execute_scheme(query)
        except Exception as e:
            raise AssertionError(f"{failure_message}: {e}") from e

    @staticmethod
    def _assert_grants_preserved(table, expected_acl, restored_acl):
        """Assert the captured ``show_grants`` text (if any) is contained in the restored one."""
        if 'show_grants' not in (expected_acl or {}):
            return
        assert 'show_grants' in (restored_acl or {}) and expected_acl['show_grants'] in restored_acl['show_grants'], (
            f"Grants for {table} after restore differ: expected {expected_acl['show_grants']!r} "
            f"to be contained in {(restored_acl or {}).get('show_grants')!r}"
        )

    def test_full_cycle_local_backup_restore_with_schema_changes(self):
        """Take two full backups around schema/data/ACL changes and verify both restores."""
        collection_src, t1, t2 = self._setup_test_collections()

        # The collection references the two initial tables.
        self._create_backup_collection(collection_src, [t1, t2])

        # --- Stage 1: changes that full backup 1 must capture ---
        with self.session_scope() as session:
            # add & remove data
            session.transaction().execute(
                'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (10, 100, "one-stage");',
                commit_tx=True,
            )
            session.transaction().execute(
                'PRAGMA TablePathPrefix("/Root"); DELETE FROM products WHERE id = 1;',
                commit_tx=True,
            )

            # change ACLs: try several grant syntaxes/roles until one succeeds
            desc_for_acl = self.driver.scheme_client.describe_path("/Root/orders")
            owner_role = getattr(desc_for_acl, "owner", None) or "root@builtin"

            grant_variants = []
            for r in (owner_role, "public", "everyone", "root"):
                role_quoted = self._quote_role(r)
                grant_variants.extend([
                    f"GRANT ALL ON `/Root/orders` TO {role_quoted};",
                    f"GRANT SELECT ON `/Root/orders` TO {role_quoted};",
                    f"GRANT 'ydb.generic.read' ON `/Root/orders` TO {role_quoted};",
                ])

            # any() stops at the first statement that succeeds.
            acl_applied = any(self._execute_yql(cmd).exit_code == 0 for cmd in grant_variants)
            assert acl_applied, "Failed to apply any GRANT variant in step (1)"

            # add more tables; create_table_with_data is a module-level helper defined
            # elsewhere in this file
            create_table_with_data(session, "extra_table_1")

        # capture state after stage 1
        snapshot_stage1_t1 = self._capture_snapshot(t1)
        snapshot_stage1_t2 = self._capture_snapshot(t2)
        schema_stage1_t1 = self._capture_schema(f"/Root/{t1}")
        schema_stage1_t2 = self._capture_schema(f"/Root/{t2}")
        acl_stage1_t1 = self._capture_acl(f"/Root/{t1}")
        acl_stage1_t2 = self._capture_acl(f"/Root/{t2}")

        # Create full backup 1
        self._backup_now(collection_src)
        self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)

        # --- Stage 2: add/remove data and tables, add/alter/drop columns, change ACLs ---
        with self.session_scope() as session:
            # data modifications
            session.transaction().execute(
                'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (11, 111, "two-stage");',
                commit_tx=True,
            )
            session.transaction().execute(
                'PRAGMA TablePathPrefix("/Root"); DELETE FROM orders WHERE id = 2;',
                commit_tx=True,
            )

            # add more tables
            create_table_with_data(session, "extra_table_2")

            # remove the table added in stage 1
            self._execute_scheme_checked(session, 'DROP TABLE `/Root/extra_table_1`;', "DROP failed")

            # add a column to an initial table
            self._execute_scheme_checked(
                session, 'ALTER TABLE `/Root/orders` ADD COLUMN new_col Uint32;', "ADD COLUMN failed"
            )

            # alter table settings
            self._execute_scheme_checked(
                session,
                'ALTER TABLE `/Root/orders` SET (TTL = Interval("PT0S") ON expire_at);',
                "SET TTL failed",
            )

            # drop a column
            self._execute_scheme_checked(
                session, 'ALTER TABLE `/Root/orders` DROP COLUMN number;', "DROP COLUMN failed"
            )

            # change ACLs again for initial tables
            desc_for_acl2 = self.driver.scheme_client.describe_path("/Root/orders")
            owner_role2 = getattr(desc_for_acl2, "owner", None) or "root@builtin"
            owner_quoted = owner_role2.replace('`', '')
            cmd = f"GRANT SELECT ON `/Root/orders` TO `{owner_quoted}`;"
            res = self._execute_yql(cmd)
            assert res.exit_code == 0, "Failed to apply GRANT in stage 2"

        # capture state after stage 2
        snapshot_stage2_t1 = self._capture_snapshot(t1)
        snapshot_stage2_t2 = self._capture_snapshot(t2)
        schema_stage2_t1 = self._capture_schema(f"/Root/{t1}")
        schema_stage2_t2 = self._capture_schema(f"/Root/{t2}")
        acl_stage2_t1 = self._capture_acl(f"/Root/{t1}")
        acl_stage2_t2 = self._capture_acl(f"/Root/{t2}")

        # Create full backup 2
        self._backup_now(collection_src)
        self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)

        # Export backups so each snapshot can be imported into its own collection.
        export_dir, exported_items = self._export_backups(collection_src)
        try:
            # expect at least two exported snapshots (backup1 and backup2)
            assert len(exported_items) >= 2, "Expected at least 2 exported snapshots for verification"

            # create restore collections
            coll_restore_1 = f"coll_restore_v1_{int(time.time())}"
            coll_restore_2 = f"coll_restore_v2_{int(time.time())}"
            self._create_backup_collection(coll_restore_1, [t1, t2])
            self._create_backup_collection(coll_restore_2, [t1, t2])

            # import exported snapshots into the restore collections
            self._restore_import(export_dir, exported_items[0], coll_restore_1)
            self._restore_import(export_dir, exported_items[1], coll_restore_2)

            # RESTORE while the target tables still exist must fail
            res_restore_when_exists = self._execute_yql(f"RESTORE `{coll_restore_1}`;")
            assert res_restore_when_exists.exit_code != 0, "Expected RESTORE to fail when target tables already exist"

            # Remove all tables from DB (orders, products, extras)
            self._drop_tables([t1, t2, "extra_table_2"])

            # === Restore backup 1 ===
            res_restore1 = self._execute_yql(f"RESTORE `{coll_restore_1}`;")
            assert res_restore1.exit_code == 0, f"RESTORE v1 failed: {res_restore1.std_err or res_restore1.std_out}"

            # verify data
            self._verify_restored_table_data(t1, snapshot_stage1_t1)
            self._verify_restored_table_data(t2, snapshot_stage1_t2)

            # verify schema
            restored_schema_t1 = self._capture_schema(f"/Root/{t1}")
            restored_schema_t2 = self._capture_schema(f"/Root/{t2}")
            assert restored_schema_t1 == schema_stage1_t1, f"Schema for {t1} after restore v1 differs: expected {schema_stage1_t1}, got {restored_schema_t1}"
            assert restored_schema_t2 == schema_stage1_t2, f"Schema for {t2} after restore v1 differs: expected {schema_stage1_t2}, got {restored_schema_t2}"

            # verify acl: restored SHOW GRANTS output must contain the captured one
            self._assert_grants_preserved(t1, acl_stage1_t1, self._capture_acl(f"/Root/{t1}"))
            self._assert_grants_preserved(t2, acl_stage1_t2, self._capture_acl(f"/Root/{t2}"))

            # === Remove all tables again and restore backup 2 ===
            # A failing DROP aborts the test here: _drop_tables raises on error.
            self._drop_tables([t1, t2])

            res_restore2 = self._execute_yql(f"RESTORE `{coll_restore_2}`;")
            assert res_restore2.exit_code == 0, f"RESTORE v2 failed: {res_restore2.std_err or res_restore2.std_out}"

            # verify data/schema/acl for backup 2
            self._verify_restored_table_data(t1, snapshot_stage2_t1)
            self._verify_restored_table_data(t2, snapshot_stage2_t2)

            restored_schema2_t1 = self._capture_schema(f"/Root/{t1}")
            restored_schema2_t2 = self._capture_schema(f"/Root/{t2}")
            assert restored_schema2_t1 == schema_stage2_t1, f"Schema for {t1} after restore v2 differs: expected {schema_stage2_t1}, got {restored_schema2_t1}"
            assert restored_schema2_t2 == schema_stage2_t2, f"Schema for {t2} after restore v2 differs: expected {schema_stage2_t2}, got {restored_schema2_t2}"

            self._assert_grants_preserved(t1, acl_stage2_t1, self._capture_acl(f"/Root/{t1}"))
            self._assert_grants_preserved(t2, acl_stage2_t2, self._capture_acl(f"/Root/{t2}"))
        finally:
            # cleanup exported data even when a verification step fails
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)