In [None]:
import msal
import requests
import pandas as pd
import notebookutils

In [None]:
# Tenant identifier; it is interpolated into the login authority URL
# (https://login.microsoftonline.com/<tenant_id>) further below.
tenant_id = ""

In [None]:
# Table names to export, written WITHOUT the publisher prefix; the export loop
# prepends entity_prefix to each name to build the entity path that is queried.
tables = ["customerrecords","productrecords","salesrecords"]

In [None]:
# --- Key Vault holding the app-registration credentials ---
keyvault_name = ""
keyvault_ir = f'https://{keyvault_name}.vault.azure.net/'

# --- Dataverse environment ---
dataverse_prefix = "" # Example: cr8dc
dataverse_name = "" # Example: org9q35785d
# Region segment of the environment host name. It used to be hard-coded inside
# the resource URL; keep "crm4" unless your environment URL uses another one.
dataverse_region = "crm4"
resource = f'https://{dataverse_name}.{dataverse_region}.dynamics.com'

# --- Credentials are read from Key Vault, never stored in the notebook ---
client_id = notebookutils.credentials.getSecret(keyvault_ir,'dataverseFabricIntegrationClientId')
client_secret = notebookutils.credentials.getSecret(keyvault_ir,'dataverseFabricIntegrationClientSecret')

# Login authority for the tenant configured above.
authority = f'https://login.microsoftonline.com/{tenant_id}'

# Prefix shared by the custom tables and columns, e.g. "cr8dc_".
entity_prefix = f'{dataverse_prefix}_'

In [None]:
def get_access_token():
    """Acquire an app-only access token for the configured Dataverse resource.

    Uses the client-credentials flow with the notebook-level ``client_id``,
    ``client_secret``, ``authority`` and ``resource`` settings.

    Returns:
        str: The bearer token to send in the ``Authorization`` header.

    Raises:
        RuntimeError: If the token response carries no ``access_token``. The
            message includes the ``error`` and ``error_description`` fields of
            the response so the cause is visible immediately instead of
            surfacing later as an unexplained 401.
    """
    app = msal.ConfidentialClientApplication(
        client_id,
        authority=authority,
        client_credential=client_secret
    )
    token_response = app.acquire_token_for_client(scopes=[f"{resource}/.default"])

    access_token = token_response.get("access_token")
    if not access_token:
        # On failure the response is a dict describing the error rather than a token.
        raise RuntimeError(
            "Failed to acquire Dataverse access token: "
            f"{token_response.get('error')} - {token_response.get('error_description')}"
        )
    return access_token

def query_dataverse(api_url, access_token):
    """Fetch all rows behind a Dataverse Web API URL into a DataFrame.

    The Web API returns results in pages; when more rows are available the
    response contains an ``@odata.nextLink`` URL. This function follows those
    links until the last page so large tables are not silently truncated.

    Args:
        api_url: Full Web API URL of the entity set to read.
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        pandas.DataFrame with one row per record in the ``value`` arrays of
        all pages, or ``None`` if any request returns a non-200 status (the
        status code and response text are printed in that case).
    """
    headers = {
        "Authorization": f"Bearer {access_token}",
        "OData-MaxVersion": "4.0",
        "OData-Version": "4.0",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    records = []
    next_url = api_url
    while next_url:
        # Without a timeout a stalled connection would block the notebook forever.
        response = requests.get(next_url, headers=headers, timeout=120)

        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.text}")
            return None

        payload = response.json()
        records.extend(payload.get('value', []))
        # Absent on the last page, which ends the loop.
        next_url = payload.get('@odata.nextLink')

    return pd.DataFrame(records)

def select_and_rename_columns(df, prefix):
    """Return only the columns whose name starts with ``prefix``, prefix removed.

    Args:
        df: Source DataFrame; it is not modified.
        prefix: Leading text a column name must have to be kept.

    Returns:
        A new DataFrame with the matching columns in their original order,
        each renamed to the part of its name that follows ``prefix``.
    """
    offset = len(prefix)
    kept = []
    stripped = {}
    for name in df.columns:
        if name.startswith(prefix):
            kept.append(name)
            stripped[name] = name[offset:]

    subset = df[kept]
    return subset.rename(columns=stripped)

In [None]:
# Demo: read one table (salesrecords) and preview it
entity_name = f'{entity_prefix}salesrecords'
entity_url = "/".join([resource, "api/data/v9.2", entity_name])

# Authenticate first, then run the query with the fresh token
access_token = get_access_token()
result_df = query_dataverse(entity_url, access_token)

# query_dataverse yields None on failure; only a real result is trimmed and shown
if result_df is not None:
    result_df = select_and_rename_columns(result_df, entity_prefix)
    display(result_df)

In [None]:
# MountedWriter class for writing files to OneLake through a mounted lakehouse
class MountedWriter:
    """Write DataFrames as parquet files into a mounted lakehouse.

    The constructor mounts the lakehouse right away. Call ``end_mounting()``
    when finished, or use the instance as a context manager so the unmount
    also happens when an error interrupts the work::

        with MountedWriter(workspace_id, lakehouse_id, "Dataverse") as writer:
            writer.create_file(df, "Sales", "export.parquet")
    """

    def __init__(self, workspace_id, lakehouse_id, parent_folder_name):
        """Store the target settings and mount the lakehouse.

        Args:
            workspace_id: Workspace part of the abfss URL that gets mounted.
            lakehouse_id: Lakehouse part of the abfss URL that gets mounted.
            parent_folder_name: Folder under ``Files`` that holds all output folders.
        """
        self.workspace = workspace_id
        self.lakehouse = lakehouse_id
        self.parent_folder_name = parent_folder_name
        self.mount_name = "/mnt/lakehouse"
        # Declared here so the attribute always exists; get_mounted_path() fills it.
        self.mount_path = None
        self.mount = notebookutils.fs.mount(
            f"abfss://{self.workspace}@onelake.dfs.fabric.microsoft.com/{self.lakehouse}",
            self.mount_name
        )

    def __enter__(self):
        """Return the writer itself; the mount was already made in ``__init__``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Unmount on leaving the ``with`` block; exceptions are not suppressed."""
        self.end_mounting()
        return False

    def get_mounted_path(self):
        """Look up the mount path for ``mount_name``, cache it on the instance and return it."""
        self.mount_path = notebookutils.fs.getMountPath(self.mount_name)
        return self.mount_path

    def _output_dir(self, folder_name):
        """Build ``<mount path>/Files/<parent folder>/<folder_name>``."""
        return f"{self.get_mounted_path()}/Files/{self.parent_folder_name}/{folder_name}"

    def check_or_create_existing_directory(self, folder_name):
        """Call ``notebookutils.fs.mkdirs`` for the output folder.

        Args:
            folder_name: Folder name below the parent folder.

        Returns:
            str: The directory path that was passed to ``mkdirs``.
        """
        output_dir = self._output_dir(folder_name)
        notebookutils.fs.mkdirs(output_dir)
        return output_dir

    def create_file(self, df_to_be_written, folder_name, file_name_parquet):
        """Write a DataFrame to ``<output folder>/<file_name_parquet>``.

        Args:
            df_to_be_written: Object exposing ``to_parquet(path)``, e.g. a pandas DataFrame.
            folder_name: Folder name below the parent folder.
            file_name_parquet: File name including the ``.parquet`` extension.

        Returns:
            str: Full path of the written file.
        """
        # The directory path comes straight from the directory step, so this
        # method no longer relies on state set as a side effect elsewhere.
        output_dir = self.check_or_create_existing_directory(folder_name)
        output_file = f"{output_dir}/{file_name_parquet}"
        df_to_be_written.to_parquet(output_file)
        return output_file

    def end_mounting(self):
        """Unmount the mount point created by the constructor."""
        notebookutils.fs.unmount(self.mount_name)

In [None]:
# Workspace and lakehouse identifiers; both are inserted into the abfss URL
# that MountedWriter mounts.
workspace_id = ""
lakehouse_id = ""

# Output location below the lakehouse "Files" area.
parent_folder_name = "Dataverse"
folder_name = "Sales"
# Files are written to: <mount path>/Files/Dataverse/Sales

# Base file name; the ".parquet" extension is appended below.
file_name = "export"
file_name_parquet = f"{file_name}.parquet"

In [None]:
# Creating the writer mounts the lakehouse immediately (the constructor calls
# notebookutils.fs.mount), so this cell needs valid workspace/lakehouse IDs.
mount_writer = MountedWriter(
    workspace_id = workspace_id, 
    lakehouse_id = lakehouse_id, 
    parent_folder_name = parent_folder_name
)

In [None]:
# Write the demo result to the lakehouse. result_df is None when the demo query
# failed, and writing it would raise AttributeError, so that case is skipped.
if result_df is not None:
    mount_writer.create_file(df_to_be_written = result_df, folder_name = folder_name, file_name_parquet = file_name_parquet)
else:
    print("Nothing to write: the demo query did not return any data")

In [None]:
# Export every configured table into its own folder in the lakehouse.
for table in tables:
    entity_name = f'{entity_prefix}{table}'
    entity_url = f'{resource}/api/data/v9.2/{entity_name}'

    print(f"Currently handling table: {table}")

    # Query Dataverse
    access_token = get_access_token()
    result_df = query_dataverse(entity_url, access_token)

    # query_dataverse returns None on failure (it prints the HTTP error itself);
    # report the skip here and carry on with the remaining tables.
    if result_df is None:
        print(f"Skipping table '{table}': no data could be retrieved from Dataverse")
        continue

    # Write results
    result_df = select_and_rename_columns(result_df, entity_prefix)
    mount_writer.create_file(df_to_be_written = result_df, folder_name = table, file_name_parquet = file_name_parquet)
    print(f"Data from table '{table}' has been written to Lakehouse '{lakehouse_id}'")

In [None]:
# Release the lakehouse mount before ending the run; nothing else in the
# notebook unmounts it.
mount_writer.end_mounting()

notebookutils.notebook.exit("Done!")