# Script: Recreate Metastore

## Objetivo

Este script tem como objetivo **recriar o metastore** do Spark/Hive com base na estrutura de diretórios existente no caminho do warehouse.  
Ele é útil em cenários onde o metastore foi perdido ou corrompido, como no caso da reinicialização do cluster, mas os dados físicos (arquivos Delta) ainda estão presentes no armazenamento DBFS.

## Funcionamento

O script percorre o diretório base (`/user/hive/warehouse/`), onde os databases e tabelas do Hive/Spark são armazenados fisicamente, e recria:

- **Databases**, a partir de pastas com sufixo `.db`
- **Tabelas Delta**, baseando-se nas subpastas dos databases

O Spark infere o schema automaticamente para recriar as tabelas no catálogo, assumindo que os dados estão em formato **Delta Lake**.

## Observações 
- O script assume que todas as tabelas são do tipo Delta Lake
- Utiliza dbutils.fs.ls, que é específico do ambiente Databricks
- Requer que os arquivos estejam íntegros e acessíveis no caminho do warehouse


In [0]:
from pyspark.sql import SparkSession
import os

# Inicializa Spark
spark = SparkSession.builder \
    .appName("RecreateMetastore") \
    .enableHiveSupport() \
    .getOrCreate()

# Caminho base onde os databases ficam salvos
warehouse_path = "/user/hive/warehouse/"

# Lista os databases (pastas terminando com .db)
for item in dbutils.fs.ls(warehouse_path):
    if item.isDir() and item.name.endswith(".db/"):
        database_path = item.path
        db_name = os.path.basename(os.path.normpath(database_path)).replace(".db", "")

        # Cria o database se não existir
        spark.sql(f"CREATE DATABASE IF NOT EXISTS `{db_name}` LOCATION '{database_path}'")
        print(f"✅ Database '{db_name}' recriado!")

        # Agora percorre as tabelas dentro do database
        for table in dbutils.fs.ls(database_path):
            if table.isDir():
                table_path = table.path
                table_name = os.path.basename(os.path.normpath(table_path))

                try:
                    # Cria a tabela Delta com base no caminho (Spark infere schema automaticamente)
                    spark.sql(f"""
                        CREATE TABLE IF NOT EXISTS `{db_name}`.`{table_name}`
                        USING DELTA
                        LOCATION '{table_path}'
                    """)
                    print(f"✅ Tabela '{db_name}.{table_name}' recriada!")
                except Exception as e:
                    print(f"❌ Falha ao recriar tabela '{db_name}.{table_name}': {e}")


✅ Database 'business' recriado!
✅ Tabela 'business.d_calendario' recriada!
✅ Tabela 'business.d_players' recriada!
✅ Tabela 'business.d_ranking' recriada!
✅ Tabela 'business.d_tournaments' recriada!
✅ Tabela 'business.f_matches' recriada!
✅ Database 'raw' recriado!
✅ Tabela 'raw.matches' recriada!
✅ Tabela 'raw.players' recriada!
✅ Tabela 'raw.rankings' recriada!
✅ Database 'staging' recriado!
✅ Tabela 'staging.player_id_mapping' recriada!
✅ Database 'trusted' recriado!
❌ Falha ao recriar tabela 'trusted._delta_log': 
You are trying to create an external table `spark_catalog`.`trusted`.`_delta_log`
from `dbfs:/user/hive/warehouse/trusted.db/_delta_log` using Delta, but there is no transaction log present at
`dbfs:/user/hive/warehouse/trusted.db/_delta_log/_delta_log`. Check the upstream job to make sure that it is writing using
format("delta") and that the path is the root of the table.

To learn more about Delta, see dbfs:/user/hive/warehouse/trusted.db/_delta_log
✅ Tabela 'trusted.ma

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()

# Listar todas as tabelas no schema 'business'
tables = spark.catalog.listTables("business")

# Gerar os DDLs para verificar as tabelas do modelo final
for table in tables:
    table_name = table.name
    ddl = spark.sql(f"SHOW CREATE TABLE business.{table_name}").collect()[0][0]
    print(f"-- DDL da tabela: business.{table_name} --\n{ddl}\n")


-- DDL da tabela: business.d_calendario --
CREATE TABLE spark_catalog.business.d_calendario (
  id_data INT,
  data DATE,
  ano INT,
  mes INT,
  nome_mes STRING)
USING delta
LOCATION 'dbfs:/user/hive/warehouse/business.db/d_calendario'
TBLPROPERTIES (
  'delta.minReaderVersion' = '1',
  'delta.minWriterVersion' = '2')


-- DDL da tabela: business.d_players --
CREATE TABLE spark_catalog.business.d_players (
  player_id BIGINT,
  player_name STRING,
  hand STRING,
  height DOUBLE,
  ioc STRING,
  date_of_birth DOUBLE)
USING delta
LOCATION 'dbfs:/user/hive/warehouse/business.db/d_players'
TBLPROPERTIES (
  'delta.minReaderVersion' = '1',
  'delta.minWriterVersion' = '2')


-- DDL da tabela: business.d_ranking --
CREATE TABLE spark_catalog.business.d_ranking (
  player BIGINT,
  ranking_date BIGINT,
  rank BIGINT,
  points DOUBLE)
USING delta
LOCATION 'dbfs:/user/hive/warehouse/business.db/d_ranking'
TBLPROPERTIES (
  'delta.minReaderVersion' = '1',
  'delta.minWriterVersion' = '2')


-- 