**Notebook parameters, do not edit**

In [None]:
# Notebook parameter cell: the values below are placeholders that are expected
# to be overridden by the caller (e.g. a pipeline) at run time.

#period to process
# Integer offset used to shift today's date before the YYYYMM period name is
# derived from it; 0 selects the current month (and also triggers removal of
# today's still-incomplete rows later in the notebook).
period = 0
#service principal
# Credentials passed to azure.identity.ClientSecretCredential.
# NOTE(review): client_secret arrives as a plain notebook parameter; consider
# resolving it from a secret store instead - confirm with the pipeline owner.
tenant_id = ""
client_id = ""
client_secret = ""
#customer info
# billing_account_id is interpolated into the Cost Management request URI;
# customer_name is only written to the log table.
billing_account_id = ""
customer_name = ""
#lakehouse
# Table that receives the downloaded cost file (overwritten on every run) and
# table that receives one log row per run.
lakehouse_cost_table = ""
lakehouse_log_table = ""
#warehouse
# Three-part name (warehouse.schema.table) of the table the processed rows are
# appended to.
warehouse_name = ""
warehouse_schema = ""
warehouse_cost_table = ""


**Libraries and configuration of the notebook**

Modify the Lakehouse and/or Warehouse variables if required.

In [None]:
# ==========================
# BIBLIOTECAS
# ==========================

import requests
import json
import datetime
import uuid
import time
from azure.identity import ClientSecretCredential
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, DateType, FloatType
from delta.tables import DeltaTable
from pyspark.sql.functions import lit,col,to_date,regexp_extract,current_date, year, month, dayofmonth

# ==========================
# CONFIGURATION
# ==========================


# Initialise (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.getOrCreate()

# ==========================
# 1. Compute the billing period and authenticate
# ==========================
# `period` is treated as an offset in calendar months from the current month
# (0 = current month, -1 = previous month, ...). Exact month arithmetic is used
# instead of adding `period * 30` days, which can land in the wrong month
# (e.g. 31 March minus 30 days is still March, and 1 March minus 30 days skips
# February in non-leap years).
# The local is deliberately not named `current_date`, so it does not shadow
# pyspark.sql.functions.current_date imported at the top of the notebook.
run_timestamp = datetime.datetime.now()
target_year, target_month_index = divmod(
    run_timestamp.year * 12 + (run_timestamp.month - 1) + period, 12
)
# Billing period in YYYYMM form, as sent in the report request body.
period_name = f"{target_year:04d}{target_month_index + 1:02d}"

try:
    # Service-principal login; the token is scoped to Azure Resource Manager.
    credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
    token = credential.get_token("https://management.azure.com/.default").token
except Exception as e:
    # Chain the original exception so the root cause stays in the traceback.
    raise Exception(f"Error en autenticación: {e}") from e

**Initialize export of the data using the generateCostDetailsReport API and log the status**

In [None]:
# ==========================
# 2. Start the export and write the initial log row
# ==========================
uri = f"https://management.azure.com/providers/Microsoft.Billing/billingAccounts/{billing_account_id}/providers/Microsoft.CostManagement/generateCostDetailsReport?api-version=2022-05-01"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
body = {"metric": "ActualCost", "billingPeriod": period_name}

# Unique identifier of this run; it is the key used to update the log row later.
job_id = str(uuid.uuid4())

# Start timestamp of the run.
start_date_time = datetime.datetime.now()


# Ask the API to generate the cost details report. A timeout is set so a stalled
# connection cannot hang the notebook indefinitely.
response = requests.post(uri, headers=headers, data=json.dumps(body), timeout=60)
if response.status_code != 202:
    raise Exception(f"Error al generar reporte: {response.text}")

# The polling URL is returned in the Location header; fail here, with a clear
# message, rather than later when the polling step tries to request `None`.
export_url = response.headers.get("Location")
if not export_url:
    raise Exception("La respuesta del API no incluye la cabecera Location.")
print(f"Export iniciado: {export_url}")

# Write the initial log row to the lakehouse Delta table. "End" is filled with
# the start time as a placeholder until the run finishes.
log_df = spark.createDataFrame([(job_id,period_name, customer_name, export_url, "Starting",start_date_time,start_date_time)],
                               ["JobID","Period", "Customer", "ExportURL", "Status","Start","End"])

log_df.write.mode("append").saveAsTable(lakehouse_log_table)

**Poll the status of the export and log the result**

In [None]:
# ==========================
# 3. Poll the export status
# ==========================
status_export = "Running"
max_retries = 15
retry_count = 0
# Seconds to wait between attempts (15 attempts x 30 s is roughly 7.5 minutes).
poll_interval_seconds = 30

# Last successfully parsed status payload; the download step reads the blob
# manifest from it, so it must exist even if no attempt succeeds.
status_data = {}

# Handle to the log table, used for the MERGE below.
delta_log_table = DeltaTable.forName(spark, lakehouse_log_table)

while status_export == "Running" and retry_count < max_retries:
    # Count every attempt, including failed ones, so max_retries really bounds
    # the loop (previously failures were not counted and could loop forever).
    retry_count += 1
    try:
        status_response = requests.get(
            export_url,
            headers={"Authorization": f"Bearer {token}"},
            timeout=60,
        )
        if status_response.status_code == 200:
            # The payload is intentionally not printed: its manifest contains
            # pre-signed blob links that should not end up in notebook output.
            status_data = status_response.json()
            status_export = status_data.get("status")
        elif status_response.status_code != 202:
            # 202 means the report is still being generated; anything else is
            # reported and retried until the attempts run out.
            print(f"Respuesta inesperada del API: HTTP {status_response.status_code}")
    except (requests.exceptions.RequestException, ValueError) as e:
        # Network problems or a non-JSON body: report and retry.
        print(f"The file is not ready, trying again ({e})")

    print(f"Intento {retry_count}: Estado = {status_export}")

    # Only wait when another attempt will actually be made.
    if status_export == "Running" and retry_count < max_retries:
        time.sleep(poll_interval_seconds)

# Translate the API outcome into the status label stored in the log table.
export_completed = status_export == "Completed"
status = "File_exported" if export_completed else "Error_getting_file"

# Single-row DataFrame carrying the new status for this job.
update_df = spark.createDataFrame([(job_id,period_name, customer_name, export_url, status,start_date_time,start_date_time)],
                                    ["JobID","Period", "Customer", "ExportURL", "Status","Start","End"])

# MERGE: update the existing log row of this job.
delta_log_table.alias("target").merge(
    update_df.alias("source"),
    "target.JobID = source.JobID"
).whenMatchedUpdate(set={
    "Status": "source.Status",
    "ExportURL": "source.ExportURL"
}).execute()

if not export_completed:
    # Stop the notebook; report the status returned by the API (or "Running"
    # if the attempts were exhausted) rather than the internal log label.
    raise Exception(f"Export falló o no completó. Estado final: {status_export}")

**Schema of the file exported by the API**

In [None]:
# Schema of the cost details file.
# All columns are nullable. The columns listed in _cost_float_columns are read
# as FloatType; every other column is read as a string.
# NOTE(review): FloatType is single precision; for monetary values DoubleType or
# DecimalType may be preferable - confirm impact on existing tables before changing.
_cost_float_columns = {
    "Quantity",
    "EffectivePrice",
    "CostInBillingCurrency",
    "UnitPrice",
    "PayGPrice",
}

# Column names, in schema order.
_cost_column_names = [
    "InvoiceSectionName",
    "AccountName",
    "AccountOwnerId",
    "SubscriptionId",
    "SubscriptionName",
    "ResourceGroup",
    "ResourceLocation",
    "Date",
    "ProductName",
    "MeterCategory",
    "MeterSubCategory",
    "MeterId",
    "MeterName",
    "MeterRegion",
    "UnitOfMeasure",
    "Quantity",
    "EffectivePrice",
    "CostInBillingCurrency",
    "CostCenter",
    "ConsumedService",
    "ResourceId",
    "Tags",
    "OfferId",
    "AdditionalInfo",
    "ServiceInfo1",
    "ServiceInfo2",
    "ResourceName",
    "ReservationId",
    "ReservationName",
    "UnitPrice",
    "ProductOrderId",
    "ProductOrderName",
    "Term",
    "PublisherType",
    "PublisherName",
    "ChargeType",
    "Frequency",
    "PricingModel",
    "AvailabilityZone",
    "BillingAccountId",
    "BillingAccountName",
    "BillingCurrencyCode",
    "BillingPeriodStartDate",
    "BillingPeriodEndDate",
    "BillingProfileId",
    "BillingProfileName",
    "InvoiceSectionId",
    "IsAzureCreditEligible",
    "PartNumber",
    "PayGPrice",
    "PlanName",
    "ServiceFamily",
    "CostAllocationRuleName",
    "benefitId",
    "benefitName",
]

schema = StructType([
    StructField(
        column_name,
        FloatType() if column_name in _cost_float_columns else StringType(),
        True,
    )
    for column_name in _cost_column_names
])


**Download the CSV file generated by the API and load it into a lakehouse table**

In [None]:
# ==========================
# 4. Download the CSV file from blobLink and load it into Delta
# ==========================


# Extract the blob link from the status payload. An empty or missing blob list
# is handled explicitly so it produces the intended error, not an IndexError.
blobs = (status_data.get("manifest") or {}).get("blobs") or []
blob_link = blobs[0].get("blobLink") if blobs else None
if not blob_link:
    raise Exception("No se encontró blobLink en la respuesta del API.")

# NOTE(review): only the first blob is processed; if the API splits a report
# into several blobs the remaining ones are currently ignored - confirm.
if len(blobs) > 1:
    print(f"Advertencia: el manifiesto contiene {len(blobs)} blobs; solo se procesa el primero.")

# Log the URL without its query string, which carries the access signature.
print(f"Descargando archivo desde: {blob_link.split('?', 1)[0]}")

# Local lakehouse path where the CSV is stored.
local_file = f"/lakehouse/default/Files/{period_name}_costs.csv"

# Stream the download to disk in chunks instead of buffering the whole file in
# memory; timeouts (connect, read) prevent a stalled transfer from hanging.
with requests.get(blob_link, stream=True, timeout=(30, 600)) as file_response:
    if file_response.status_code != 200:
        raise Exception(f"Error al descargar archivo: {file_response.text}")
    with open(local_file, "wb") as f:
        for chunk in file_response.iter_content(chunk_size=8 * 1024 * 1024):
            f.write(chunk)

print(f"Archivo CSV descargado en: {local_file}")

# Read the CSV with Spark using the explicit schema; a doubled quote is the
# escape character inside quoted fields.
cost_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .schema(schema) \
    .load(f"Files/{period_name}_costs.csv")

# Overwrite the lakehouse Delta table used as staging.
cost_df.write.mode("overwrite").saveAsTable(lakehouse_cost_table)


**Update the log record to completed**

In [None]:
# ==========================
# 5. Mark the log row as completed
# ==========================

# End timestamp of the run.
end_date_time = datetime.datetime.now()

# Handle to the log table.
delta_log_table = DeltaTable.forName(spark, lakehouse_log_table)

# Single-row DataFrame with the final state of this job.
log_columns = ["JobID","Period", "Customer", "ExportURL", "Status","Start","End"]
final_row = (job_id, period_name, customer_name, export_url, "Completed", start_date_time, end_date_time)
final_df = spark.createDataFrame([final_row], log_columns)

# Columns refreshed when the job's row already exists.
matched_updates = {
    "Status": "source.Status",
    "ExportURL": "source.ExportURL",
    "End": "source.End",
}
# Full row inserted when no row exists for this job: every column is taken
# from the source row.
unmatched_inserts = {column_name: f"source.{column_name}" for column_name in log_columns}

# MERGE keyed on JobID: update the existing row or insert a new one.
log_merge = delta_log_table.alias("target").merge(
    final_df.alias("source"),
    "target.JobID = source.JobID"
)
log_merge = log_merge.whenMatchedUpdate(set=matched_updates)
log_merge = log_merge.whenNotMatchedInsert(values=unmatched_inserts)
log_merge.execute()

print("Proceso completado y log actualizado con MERGE.")

**Preprocessing of fields**

In [None]:
# ==========================
# 6. Field processing
# ==========================
from pyspark.sql.functions import col, current_date, year, month, dayofmonth


def extract_numbers(df, source_col, target_col="numeric_value"):
    """
    Return `df` with an extra integer column holding the first run of digits
    found in `source_col`.

    df: DataFrame to extend.
    source_col: name of the column to scan.
    target_col: name of the column that receives the extracted value.
    """
    first_digit_run = regexp_extract(col(source_col), r'(\d+)', 1)
    return df.withColumn(target_col, first_digit_run.cast("int"))


# Parse "Date" with an explicit format, then add UnitOfMeasureNumeric, which is
# used by the warehouse stored procedure.
cost_df = extract_numbers(
    cost_df.withColumn("Date", to_date(col("Date"), "MM/dd/yyyy")),
    "UnitOfMeasure",
    "UnitOfMeasureNumeric",
)

# For the current period (period == 0) today's rows are dropped because the
# data for the current day is still incomplete.
if period == 0:
    # Keep only rows whose date is not today.
    cost_df = cost_df.filter(col("Date") != current_date())

display(cost_df)


**Write the dataframe into a warehouse table**

In [None]:
# presumably this import is what makes `.synapsesql` available on the writer - confirm
import com.microsoft.spark.fabric

# Append the processed rows to the warehouse table, addressed by its
# three-part name (warehouse.schema.table).
warehouse_target = f"{warehouse_name}.{warehouse_schema}.{warehouse_cost_table}"
warehouse_writer = cost_df.write.mode("append")
warehouse_writer.synapsesql(warehouse_target)