In [None]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from dotenv import load_dotenv
import os

# Carregar o arquivo .env com verificação de erro
try:
    load_dotenv('C:\\Users\\Lincoln\\source\\repos\\lakehouse\\delta\\.env')
except Exception as e:
    print(f"Erro ao carregar o arquivo .env: {str(e)}")
    raise

# Definir todas as variáveis necessárias com verificação
required_vars = [
    "AZURE_ACCOUNT_NAME",
    "AZURE_CLIENT_ID",
    "AZURE_ACCOUNT_KEY",
    "AZURE_ACCOUNT_SECRET"
]

# Verificar se todas as variáveis estão definidas
missing_vars = []
for var in required_vars:
    value = os.getenv(var)
    if value is None:
        # Defina valores padrão para desenvolvimento/teste
        os.environ[var] = f"dummy-{var.lower()}"
        print(f"Variável de ambiente '{var}' não encontrada. Usando valor padrão para desenvolvimento.")

# Criar o dicionário storage_options apenas após verificar todas as variáveis
storage_options = {
    "AZURE_STORAGE_ACCOUNT_NAME": os.getenv("AZURE_STORAGE_ACCOUNT_NAME"),
    "AZURE_STORAGE_ACCESS_KEY": os.getenv("AZURE_STORAGE_ACCESS_KEY"),
    "AZURE_STORAGE_CLIENT_ID": os.getenv("AZURE_STORAGE_CLIENT_ID"),
    "AZURE_STORAGE_CLIENT_SECRET": os.getenv("AZURE_STORAGE_CLIENT_SECRET")
}

print("Variáveis carregadas com sucesso:", storage_options)



In [None]:
import duckdb

# Necessário para escrita no Delta Lake
from delta.tables import DeltaTable
# Configurar Spark com Delta Lake
builder = SparkSession.builder.appName("DeltaLakeExample") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.azure.account.key." + os.getenv("AZURE_STORAGE_ACCOUNT_NAME") + ".blob.core.windows.net", os.getenv("AZURE_STORAGE_ACCESS_KEY"))

spark = builder.getOrCreate()

def escreve_delta(data, path, mode):
    uri = f'az://bronze/delta/delta-operacoes/{path}'
    try:
        data.write.format("delta") \
            .mode(mode) \
            .option("overwriteSchema", "true") \
            .save(uri)
        print("Dados escritos com sucesso no Delta Lake!")
    except Exception as e:   print(f"Erro ao escrever dados no Delta Lake: {e}")

# %%
def ler_delta(path):
    return DeltaTable.forPath(spark, f'az://bronze/delta/delta-operacoes/{path}')

con = duckdb.connect()
df = con.sql("SELECT * FROM 'C:\\Users\\Lincoln\\source\\repos\\lakehouse\\categories_test.csv'").to_df()
df

# %%
#con.close()



In [None]:
def escreve_delta(data, path, mode):
    uri = f'az://bronze/delta/delta-operacoes/{path}'
    try:
        write_deltalake(uri,
                         data,
                         mode=mode,
                         storage_options=storage_options)
        print("Dados escritos com sucesso no Delta Lake!")
    except DeltaError as e:
        print(f"Erro ao escrever dados no Delta Lake: {e}")

def ler_delta(path):
    return DeltaTable(f'az://bronze/delta/delta-operacoes/{path}', storage_options=storage_options)

df = con.sql("SELECT * FROM 'C:\\Users\\Lincoln\\source\\repos\\lakehouse\\categories_test.csv'").to_df()
df

con.close()

escreve_delta(df, 'categories', mode='overwrite')

dados = ler_delta('categories')
dados.to_pandas()

dt = ler_delta('categories')
#dt.to_pandas()
versao = dt.version()
print("Versão da tabela Delta:", versao)

# %%
dt.delete(dt['category_id'] > 4)

# %%
df_dt = dt.to_pandas()
df_filtrado = df_dt[df_dt["category_id"] <= 4]

# Sobrescrever a tabela Delta com os dados filtrados
df_filtrado
#escreve_delta(df_filtrado, 'categories', mode='overwrite')

# %%
# Usando o objeto dt já carregado com deltalake
try:
    dt = ler_delta('categories')
    print("Tabela Delta carregada com sucesso!")
    print("Versão da tabela Delta:", dt.version())
    # Exibir algumas informações básicas sobre a tabela
    print("Caminho da tabela:", dt.path)
    print("Número de arquivos:", len(dt.files()))
    print("Número de partições:", len(dt.to_pandas().columns))
    print("Número de versões:", dt.history().count())
    print("Número de colunas:", len(dt.to_pandas().columns))
    print("Número de linhas:", dt.to_pandas().shape[0])
    print("Colunas:", dt.to_pandas().columns.tolist())
    print("Schema:", dt.schema().json())
except DataError as e:
    print(f"Tabela não encontrada ou erro: {str(e)}")
from delta import DeltaTable, write_deltalake, DeltaError, DataError    


In [None]:
def ler_delta(path):
    return DeltaTable(f'az://bronze/delta/delta-operacoes/{path}', storage_options=storage_options)

In [None]:
df = con.sql("SELECT * FROM 'C:\\Users\\Lincoln\\source\\repos\\lakehouse\\categories_test.csv'").to_df()
df

In [None]:
con.close()

In [None]:
escreve_delta(df, 'categories', mode='overwrite')

In [None]:
dados = ler_delta('categories')
dados.to_pandas()

In [None]:
dt = ler_delta('categories')
#dt.to_pandas()
versao = dt.version()
print("Versão da tabela Delta:", versao)

In [None]:
dt.delete(dt['category_id'] > 4)

In [None]:
df_dt = dt.to_pandas()
df_filtrado = df_dt[df_dt["category_id"] <= 4]

# Sobrescrever a tabela Delta com os dados filtrados
df_filtrado
#escreve_delta(df_filtrado, 'categories', mode='overwrite')

In [None]:
# Usando o objeto dt já carregado com deltalake

try:
    # Mostrar informações básicas da tabela Delta
    print("Informações da tabela:")
    print("Número de linhas:", dt.to_pandas().shape[0])
    print("Colunas:", dt.to_pandas().columns.tolist())
    print("Schema:", dt.schema().json())
except DeltaError as e:
    print(f"Tabela não encontrada ou erro: {str(e)}")