In [29]:
# Standard Library Imports
import uuid
import datetime

# Third-Party Imports
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import lit

# Local Imports
import sempy.fabric as fabric

StatementMeta(, 698799bc-0eff-4d5d-96da-4c27ebe6306e, 41, Finished, Available, Finished)

In [30]:
# Retrieve DAX Queries

df = spark.sql(
    "SELECT Repository_ID, Object_ID, Path, Semantic_Model_Name, DAX_Query FROM DQV_Example.`DAX Query View Test Files` LIMIT 1000"
)

workspace_name = mssparkutils.env.getWorkspaceName()
df = df.withColumn("workspace_name", lit(workspace_name))

def clean_data(df):
    """
    Filter rows where 'Path' does NOT end with the given suffixes.
    """
    df = df.filter(
        ~df["Path"].rlike(r"\.RLS\.Test\.dax$|\.RLS\.Tests\.dax$")
    )
    return df

df_clean = clean_data(df)
display(df_clean)

StatementMeta(, 698799bc-0eff-4d5d-96da-4c27ebe6306e, 42, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 582a9720-4206-4296-b2de-8a3c46eb9c4d)

In [31]:
# Retrieve DAX Queries

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import lit
import uuid
import datetime
import sempy.fabric as fabric

df = spark.sql(
    "SELECT * FROM DQV_Example.`DAX Query View Test Files` LIMIT 1000"
)

workspace_name = mssparkutils.env.getWorkspaceName()
df = df.withColumn("workspace_name", lit(workspace_name))

def clean_data(df):
    """
    Filter rows where 'Path' does NOT end with the given suffixes.
    """
    df = df.filter(
        ~df["Path"].rlike(r"\.RLS\.Test\.dax$|\.RLS\.Tests\.dax$")
    )
    return df

df_clean = clean_data(df)
display(df_clean)

# Define schema explicitly
schema = StructType([
    StructField("Run_GUID", StringType(), True),
    StructField("Test_Name", StringType(), True),
    StructField("Expected_Value", StringType(), True),
    StructField("Actual_Value", StringType(), True),
    StructField("Passed", StringType(), True),
    StructField("Timestamp", TimestampType(), True),
    StructField("Semantic_Model", StringType(), True),
    StructField("Workspace_Name", StringType(), True),
])

# Function to process query and return results
def send_query(row):
    """
    Process DAX query results and return structured data.
    """
    dax_query = row["DAX_Query"]
    semantic_model = row["Semantic_Model_Name"]
    workspace_name = row["workspace_name"]

    # Generate Run GUID and Timestamp
    run_uuid = str(uuid.uuid4())
    run_dt = datetime.datetime.now()

    # Replace with actual function call
    dqs = fabric.evaluate_dax(semantic_model, dax_query, workspace_name)

    # Ensure dqs contains expected columns
    expected_cols = {"[TestName]", "[ExpectedValue]", "[ActualValue]", "[Passed]"}
    if not expected_cols.issubset(dqs.columns):
        raise ValueError(f"Missing expected columns in the DAX result. Found: {list(dqs.columns)}")

    # Extract and rename fields
    test_results = dqs[["[TestName]", "[ExpectedValue]", "[ActualValue]", "[Passed]"]].copy()
    test_results["Run_GUID"] = run_uuid
    test_results["Test_Name"] = test_results["[TestName]"]
    test_results["Expected_Value"] = test_results["[ExpectedValue]"].astype(str)
    test_results["Actual_Value"] = test_results["[ActualValue]"].astype(str)
    test_results["Passed"] = test_results["[Passed]"].astype(str)
    test_results["Timestamp"] = run_dt
    test_results["Semantic_Model"] = semantic_model
    test_results["Workspace_Name"] = workspace_name

    # Convert each row into a Row object for Spark RDD
    return [
        Row(
            Run_GUID=row["Run_GUID"],
            Test_Name=row["Test_Name"],
            Expected_Value=row["Expected_Value"],
            Actual_Value=row["Actual_Value"],
            Passed=row["Passed"],
            Timestamp=datetime.datetime.strptime(str(row["Timestamp"]), "%Y-%m-%d %H:%M:%S.%f"),
            Semantic_Model=row["Semantic_Model"],
            Workspace_Name=row["Workspace_Name"],
        )
        for _, row in test_results.iterrows()
    ]

# Convert DataFrame to RDD, process each row, and convert back with schema
df_rdd = df_clean.rdd.flatMap(send_query)
df_results = spark.createDataFrame(df_rdd, schema)

# Display results in Spark DataFrame
df_results.show(truncate=False)

StatementMeta(, 698799bc-0eff-4d5d-96da-4c27ebe6306e, 43, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 02b1ddfd-8d33-4eb8-9253-5414e713cc4f)

+------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------+------------------------------------+------+--------------------------+--------------+--------------+
|Run_GUID                            |Test_Name                                                                                |Expected_Value                      |Actual_Value                        |Passed|Timestamp                 |Semantic_Model|Workspace_Name|
+------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------+------------------------------------+------+--------------------------+--------------+--------------+
|2b0d72ba-bee2-455a-9674-c96109253a91|Calculated Column: Month Year column should be blank when no date (1/1/0001) is selected.|Not Available                       |Not Available                     

## RLS Example

In [32]:
%pip install semantic-link-labs

StatementMeta(, 698799bc-0eff-4d5d-96da-4c27ebe6306e, 48, Finished, Available, Finished)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.



In [40]:
# Imports
import uuid
import datetime

# Third-Party Imports
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import lit

# Local Imports
import sempy_labs as labs

StatementMeta(, 698799bc-0eff-4d5d-96da-4c27ebe6306e, 57, Finished, Available, Finished)

In [45]:
# Retrieve RLS DAX Query Data
rls_df = spark.sql(
    "SELECT  Repository_ID, User, Object_ID, Semantic_Model_Name, DAX_Query FROM DQV_Example.`RLS DAX Query View Test Files` LIMIT 1000"
)

workspace_name = mssparkutils.env.getWorkspaceName()
rls_df = rls_df.withColumn('workspace_name', lit(workspace_name))

# Display DataFrame
display(rls_df)


StatementMeta(, 698799bc-0eff-4d5d-96da-4c27ebe6306e, 62, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3e771fb4-ebd5-4b3f-9f0e-145636213ce7)

In [46]:
# Define schema explicitly
rls_schema = StructType([
    StructField('Run_GUID', StringType(), True),
    StructField('Test_Name', StringType(), True),
    StructField('User', StringType(), True),
    StructField('Expected_Value', StringType(), True),
    StructField('Actual_Value', StringType(), True),
    StructField('Passed', StringType(), True),
    StructField('Timestamp', TimestampType(), True),
    StructField('Semantic_Model', StringType(), True),
    StructField('Workspace_Name', StringType(), True)
])

# Function to process query and return results
def send_rls_query(row):
    dax_query = row["DAX_Query"]
    semantic_model = row["Semantic_Model_Name"]
    workspace_name = row["workspace_name"]
    impersonated_user = row["User"]

    # Generate Run GUID and Timestamp
    run_uuid = str(uuid.uuid4())
    run_dt = datetime.datetime.now()  # Ensure it's a Python datetime object

    # Replace with actual function call
    # dqs = fabric.evaluate_dax(semantic_model, dax_query, workspace_name)  # Ensure output is a DataFrame
    dqs = labs.evaluate_dax_impersonation(
        semantic_model, dax_query, impersonated_user, workspace_name
    )

    # Ensure dqs contains expected columns
    expected_cols = {'[TestName]', '[ExpectedValue]', '[ActualValue]', '[Passed]'}
    if not expected_cols.issubset(dqs.columns):
        raise ValueError(f"Missing expected columns in the DAX result. Found: {list(dqs.columns)}")

    # Extract and rename fields
    test_results = dqs[['[TestName]', '[ExpectedValue]', '[ActualValue]', '[Passed]']].copy()
    test_results['Run_GUID'] = run_uuid
    test_results['Test_Name'] = test_results['[TestName]']
    test_results['Expected_Value'] = test_results['[ExpectedValue]'].astype(str)
    test_results['Actual_Value'] = test_results['[ActualValue]'].astype(str)
    test_results['Passed'] = test_results['[Passed]'].astype(str)
    
    # Ensure Timestamp is correctly converted
    test_results['Timestamp'] = run_dt  # Keep as Python datetime object

    test_results['Semantic_Model'] = semantic_model
    test_results['Workspace_Name'] = workspace_name

    # Convert each row into a Row object for Spark RDD
    return [
        Row(
            Run_GUID=row['Run_GUID'],
            Test_Name=row['Test_Name'],
            User=impersonated_user,
            Expected_Value=row['Expected_Value'],
            Actual_Value=row['Actual_Value'],
            Passed=row['Passed'],
            Timestamp=datetime.datetime.strptime(str(row['Timestamp']), "%Y-%m-%d %H:%M:%S.%f"),  # Explicit conversion
            Semantic_Model=row['Semantic_Model'],
            Workspace_Name=row['Workspace_Name']
        ) 
        for _, row in test_results.iterrows()
    ]

# Convert DataFrame to RDD, process each row, and convert back with schema
rls_df_rdd = rls_df.rdd.flatMap(send_rls_query)  # flatMap handles list output from function
rls_df_results = spark.createDataFrame(rls_df_rdd, rls_schema)

# Display results in Spark DataFrame
rls_df_results.show(truncate=False)


StatementMeta(, 698799bc-0eff-4d5d-96da-4c27ebe6306e, 63, Finished, Available, Finished)

+------------------------------------+--------------------------------------+---------------------+--------------+------------+------+--------------------------+--------------+--------------+
|Run_GUID                            |Test_Name                             |User                 |Expected_Value|Actual_Value|Passed|Timestamp                 |Semantic_Model|Workspace_Name|
+------------------------------------+--------------------------------------+---------------------+--------------+------------+------+--------------------------+--------------+--------------+
|5222c852-f983-43b6-b14b-44b6f5093115|RLS Test: A should not see B          |DataOpsSvc@kerski.net|1             |1           |True  |2025-04-02 07:12:48.851446|RLSExample    |fabcon-2025   |
|5222c852-f983-43b6-b14b-44b6f5093115|RLS Test: A should only see one record|DataOpsSvc@kerski.net|1             |1           |True  |2025-04-02 07:12:48.851446|RLSExample    |fabcon-2025   |
+------------------------------------+--