# Fix invoice-details Delta Table Schema

This notebook will:
1. Delete the old invoice-details Delta table, whose schema is incompatible
2. Create a new one with a properly nested ChargeAllocations structure
3. Write the invoice record with the correct schema using PySpark

In [None]:
# Install required packages
!pip install azure-storage-file-datalake azure-identity delta-spark pyspark

In [None]:
import json
import os

from azure.storage.filedatalake import DataLakeServiceClient
from azure.identity import DefaultAzureCredential
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from delta import *

# Location of the invoice-details Delta table in ADLS Gen2.
# These three names are read by the delete, write and verify cells below.
table_path = "data/ingestion/DeltaTables/invoice-details.delta"  # path inside the container
container = "macc"  # file system (container) name
storage_account = "maccsynapsedev"  # storage account name

print("✅ Imports successful")

In [None]:
# Step 1: Delete the old delta table directory (all files, then the empty folder tree)
#
# WARNING: destructive. Everything under `table_path` is removed.
credential = DefaultAzureCredential()
service_client = DataLakeServiceClient(
    account_url=f"https://{storage_account}.dfs.core.windows.net",
    credential=credential
)

file_system_client = service_client.get_file_system_client(container)
table_directory_client = file_system_client.get_directory_client(table_path)

deleted_count = 0
if table_directory_client.exists():
    # Snapshot the (recursive) listing first so we never page through a
    # listing that is being mutated by the deletes below.
    old_file_names = [
        path.name
        for path in file_system_client.get_paths(path=table_path)
        if not path.is_directory
    ]

    for file_name in old_file_names:
        file_system_client.get_file_client(file_name).delete_file()
        deleted_count += 1
        print(f"Deleted: {file_name}")

    # Deleting only the files leaves the directory skeleton (e.g. an empty
    # _delta_log/) behind; remove it so the new table starts from a clean path.
    table_directory_client.delete_directory()
else:
    # Listing a missing path raises, so make a fresh run / re-run a no-op instead.
    print(f"Nothing to delete: {table_path} does not exist")

print(f"\n✅ Deleted {deleted_count} old files")

In [None]:
# Step 2: Initialize Spark with Delta Lake support
#
# The service-principal credentials for ABFS OAuth are read from the environment
# instead of being pasted into the notebook, so secrets never end up in the
# saved file or its outputs.
_required_env_vars = ("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET")
_missing_env_vars = [name for name in _required_env_vars if not os.environ.get(name)]
if _missing_env_vars:
    raise EnvironmentError(
        "Set these environment variables before starting Spark: "
        + ", ".join(_missing_env_vars)
    )

tenant_id = os.environ["AZURE_TENANT_ID"]
client_id = os.environ["AZURE_CLIENT_ID"]
client_secret = os.environ["AZURE_CLIENT_SECRET"]

# hadoop-azure provides the abfss:// filesystem; it is not bundled with a
# pip-installed PySpark. Keep this in sync with the Hadoop version shipped with
# the installed PySpark (3.3.4 for PySpark 3.4.x).
HADOOP_AZURE_VERSION = "3.3.4"

builder = (
    SparkSession.builder
    .appName("FixInvoiceDetailsSchema")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("fs.azure.account.auth.type", "OAuth")
    .config("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    .config("fs.azure.account.oauth2.client.id", client_id)
    .config("fs.azure.account.oauth2.client.secret", client_secret)
    .config("fs.azure.account.oauth2.client.endpoint", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")
)

# configure_spark_with_delta_pip sets spark.jars.packages itself (Delta artifact
# matching the installed delta-spark version), overwriting any value set on the
# builder, so additional jars must be passed through extra_packages.
spark = configure_spark_with_delta_pip(
    builder,
    extra_packages=[f"org.apache.hadoop:hadoop-azure:{HADOOP_AZURE_VERSION}"],
).getOrCreate()

print("✅ Spark session initialized")

In [None]:
# Step 3: Define the proper schema with nested structures
def _nullable(name, dtype=None):
    """Return a nullable StructField; the type defaults to StringType."""
    return StructField(name, StringType() if dtype is None else dtype, True)


# Element type of the ChargeAllocations array: one struct per allocation
_payment_instrument_type = StructType([
    _nullable("family"),
    _nullable("type"),
])

_charge_allocation = StructType([
    _nullable("chargeAmount", DoubleType()),
    _nullable("isPaymentInstrumentTaxExempt", BooleanType()),
    _nullable("paymentInstrumentType", _payment_instrument_type),
    _nullable("paymentReferences", ArrayType(StringType())),
    _nullable("taxDetails", ArrayType(StringType())),
])

charge_allocations_schema = ArrayType(_charge_allocation)

# Full schema for invoice-details (every column nullable; string unless a type is given)
invoice_schema = StructType([
    _nullable("AccountId"),
    _nullable("Action"),
    _nullable("AssetFriendlyName"),
    _nullable("AssetId"),
    _nullable("Availability"),
    _nullable("AvailabilityReference"),
    _nullable("BillingCurrencyCode"),
    _nullable("BillingGroupId"),
    _nullable("BillingPeriodEndDate", TimestampType()),
    _nullable("BillingPeriodId"),
    _nullable("BillingPeriodStartDate", TimestampType()),
    _nullable("BillingPlanCurrency"),
    _nullable("BillingPlanPrice"),
    _nullable("BillingRecordLineItemReferences"),
    _nullable("CaptureId"),
    _nullable("CaptureVersion"),
    _nullable("ChargeAllocations", charge_allocations_schema),
    _nullable("ChargeType"),
    _nullable("CompanyCode"),
    _nullable("CompanyName"),
    _nullable("CountryCode"),
    _nullable("CreditDocumentDisplayNumber"),
    _nullable("CurrencyCode"),
    _nullable("CustomerId"),
    _nullable("CustomerIntent"),
    _nullable("DiscountDescriptions"),
    _nullable("DiscountPercentage"),
    _nullable("DisplayDescription"),
    _nullable("DocumentCreatedDatetime", TimestampType()),
    _nullable("DocumentReference"),
    _nullable("DocumentState"),
    _nullable("DocumentType"),
    _nullable("EventDate", TimestampType()),
    _nullable("ExchangeRate"),
    _nullable("ExchangeRateDate", TimestampType()),
    _nullable("ExcludedPaymentInstruments"),
    _nullable("GroupId"),
    _nullable("IngestionDate", TimestampType()),
    _nullable("IngestionTimestamp", TimestampType()),
    _nullable("InvoiceId"),
    _nullable("InvoiceLineItemId"),
    _nullable("IsBillImmediateDocument"),
    _nullable("IsBillingPlan"),
    _nullable("IsConsumption", BooleanType()),
    _nullable("IsDuplicateRebill", BooleanType()),
    _nullable("IsImmediateSettleDocument"),
    _nullable("IsTelco", BooleanType()),
    _nullable("IsThirdParty", BooleanType()),
    _nullable("IsTrial", BooleanType()),
    _nullable("LineItemCreatedDatetime", TimestampType()),
    _nullable("LineItemDetails", ArrayType(StringType())),
    _nullable("ListUnitPrice"),
    _nullable("OrderId"),
    _nullable("OrderSetId"),
    _nullable("OrderVersion"),
    _nullable("OriginalDocumentId"),
    _nullable("PartNumber"),
    _nullable("PricingCurrencyCode"),
    _nullable("ProducerId"),
    _nullable("Product"),
    _nullable("ProductDescription"),
    _nullable("ProductFamily"),
    _nullable("ProductId"),
    _nullable("ProjectId"),
    _nullable("ProjectName"),
    _nullable("PromotionId"),
    _nullable("PublisherId"),
    _nullable("PublisherName"),
    _nullable("PurchaseRecordLineItemReference"),
    _nullable("Quantity", IntegerType()),
    _nullable("QuoteId"),
    _nullable("ReasonCode"),
    _nullable("RebillFor"),
    _nullable("RebillForDocumentCreatedDatetime"),
    _nullable("RecipientInfo"),
    _nullable("ResellerMpnId"),
    _nullable("SchemaVersion"),
    _nullable("ServiceFamily"),
    _nullable("ServicePeriodEndDate", TimestampType()),
    _nullable("ServicePeriodStartDate", TimestampType()),
    _nullable("Sku"),
    _nullable("SkuId"),
    _nullable("SubTotal"),
    _nullable("TaxCalculationId"),
    _nullable("TaxDetails"),
    _nullable("TaxTotal"),
    _nullable("TaxationAddress"),
    _nullable("TermDescription"),
    _nullable("TermEndDate", TimestampType()),
    _nullable("TermId"),
    _nullable("TermStartDate", TimestampType()),
    _nullable("Total"),
    _nullable("TotalRetailPrice"),
    _nullable("UnitPrice"),
    _nullable("UnitType"),
    _nullable("Units"),
])

print("✅ Schema defined")

In [None]:
# Step 4: Create the invoice record data
from datetime import datetime

# Timestamps that recur across several columns of the record
period_start = datetime(2025, 10, 1, 0, 0, 0)
period_end = datetime(2025, 12, 31, 23, 59, 59)
created_at = datetime(2025, 10, 26, 4, 0, 0)

# The single nested ChargeAllocations entry carried by this invoice line
charge_allocation = {
    "chargeAmount": 10.56,
    "isPaymentInstrumentTaxExempt": None,
    "paymentInstrumentType": {"family": "commitment", "type": "prepay"},
    "paymentReferences": [],
    "taxDetails": [],
}

invoice_record = {
    "AccountId": "ACC-org-ayla2",
    "Action": "Purchase",
    "AssetFriendlyName": "Azure VM",
    "AssetId": "asset-ayla2",
    "Availability": "Available",
    "AvailabilityReference": "AV-ayla2",
    "BillingCurrencyCode": "USD",
    "BillingGroupId": "bg-ayla2",
    "BillingPeriodEndDate": period_end,
    "BillingPeriodId": "BP-2025-10-ayla2",
    "BillingPeriodStartDate": period_start,
    "BillingPlanCurrency": "USD",
    "BillingPlanPrice": "0",
    "BillingRecordLineItemReferences": "BRLI-ayla2",
    "CaptureId": "CAP-ayla2",
    "CaptureVersion": "v1",
    "ChargeAllocations": [charge_allocation],
    "ChargeType": "Usage",
    "CompanyCode": "MS001",
    "CompanyName": "Ayla Test Org 2",
    "CountryCode": "US",
    "CreditDocumentDisplayNumber": "",
    "CurrencyCode": "USD",
    "CustomerId": "org-ayla2",
    "CustomerIntent": "Standard",
    "DiscountDescriptions": "None",
    "DiscountPercentage": "0",
    "DisplayDescription": "Azure VM Usage for commitment test",
    "DocumentCreatedDatetime": created_at,
    "DocumentReference": "G20251001957621847",
    "DocumentState": "Active",
    "DocumentType": "Invoice",
    "EventDate": created_at,
    "ExchangeRate": "1",
    "ExchangeRateDate": datetime(2025, 10, 26, 0, 0, 0),
    "ExcludedPaymentInstruments": "None",
    "GroupId": "GRP-ayla2",
    "IngestionDate": created_at,
    "IngestionTimestamp": created_at,
    "InvoiceId": "INV-G20251001957621847-008",
    "InvoiceLineItemId": "LINE-G20251001957621847-008",
    "IsBillImmediateDocument": "No",
    "IsBillingPlan": "Standard",
    "IsConsumption": True,
    "IsDuplicateRebill": False,
    "IsImmediateSettleDocument": "No",
    "IsTelco": False,
    "IsThirdParty": False,
    "IsTrial": False,
    "LineItemCreatedDatetime": created_at,
    "LineItemDetails": [],
    "ListUnitPrice": "0.096",
    "OrderId": "ORD-G20251001957621847",
    "OrderSetId": "ORDSET-ayla2",
    "OrderVersion": "v1",
    "OriginalDocumentId": "ORIG-ayla2",
    "PartNumber": "AAA-16582",
    "PricingCurrencyCode": "USD",
    "ProducerId": "copilot-test-producer",
    "Product": "Azure Virtual Machines",
    "ProductDescription": "Azure VM Compute",
    "ProductFamily": "Compute",
    "ProductId": "DZH318Z0BQFF",
    "ProjectId": "proj-ayla2",
    "ProjectName": "Ayla Test Project",
    "PromotionId": "",
    "PublisherId": "PUB-MS",
    "PublisherName": "Microsoft",
    "PurchaseRecordLineItemReference": "PRLI-ayla2",
    "Quantity": 100,
    "QuoteId": "Q-ayla2",
    "ReasonCode": "Usage",
    "RebillFor": "",
    "RebillForDocumentCreatedDatetime": "2025-01-01 00:00:00",
    "RecipientInfo": "org-ayla2",
    "ResellerMpnId": "",
    "SchemaVersion": "v1",
    "ServiceFamily": "Compute",
    "ServicePeriodEndDate": period_end,
    "ServicePeriodStartDate": period_start,
    "Sku": "Standard_D2s_v3",
    "SkuId": "SKU-0001",
    "SubTotal": "9.6",
    "TaxCalculationId": "TAX-ayla2",
    "TaxDetails": "No tax",
    "TaxTotal": "0.96",
    "TaxationAddress": "US Address",
    "TermDescription": "3 months",
    "TermEndDate": period_end,
    "TermId": "TERM-ayla2",
    "TermStartDate": period_start,
    "Total": "10.56",
    "TotalRetailPrice": "10.56",
    "UnitPrice": "0.096",
    "UnitType": "Hours",
    "Units": "100",
}

# createDataFrame expects a list of rows; this notebook writes exactly one
invoice_data = [invoice_record]

print("✅ Invoice data prepared")

In [None]:
# Step 5: Build the DataFrame from the prepared record and append it to the Delta table
delta_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{table_path}"
df = spark.createDataFrame(invoice_data, schema=invoice_schema)

# Preview the row and the resolved schema before anything is written
print("Data to write:")
df.show(truncate=False)

print("\nSchema:")
df.printSchema()

# Append to the table at delta_path
delta_writer = df.write.format("delta").mode("append")
delta_writer.save(delta_path)

print(f"\n✅ Successfully wrote invoice record to {delta_path}")

In [None]:
# Step 6: Read the table back and show the key columns of what was written
verify_df = spark.read.format("delta").load(delta_path)

record_count = verify_df.count()
print(f"Total records in delta table: {record_count}")

print("\nSample record:")
preview_columns = ["InvoiceId", "BillingGroupId", "DocumentReference", "ChargeAllocations", "Total"]
verify_df.select(*preview_columns).show(truncate=False)

print("\n✅ Verification complete!")