In [None]:
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient
from pyapacheatlas.core.util import GuidTracker
from dotenv import load_dotenv
import os

# Load settings from a local .env file (when present) into the environment
# before they are read below.
load_dotenv()

tenant_id = os.environ.get("TENANT_ID")
client_id = os.environ.get("CLIENT_ID")
client_secret = os.environ.get("CLIENT_SECRET")
account_name = os.environ.get("PURVIEW_ACCOUNT")

# os.environ.get returns None for an unset variable. Fail right here with the
# names of the missing settings instead of passing None into the auth object
# and the client, where the problem would only show up on a later request.
_missing_settings = [
    env_name
    for env_name, env_value in (
        ("TENANT_ID", tenant_id),
        ("CLIENT_ID", client_id),
        ("CLIENT_SECRET", client_secret),
        ("PURVIEW_ACCOUNT", account_name),
    )
    if not env_value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

# Service principal credentials used by the client for every request.
auth = ServicePrincipalAuthentication(
    tenant_id=tenant_id,
    client_id=client_id,
    client_secret=client_secret,
)

# Create a client to connect to your service.
client = PurviewClient(
    account_name=account_name,
    authentication=auth,
)


In [None]:
# Call get_all_typedefs() on the client created in the setup cell; as the
# cell's last expression, the returned value is shown as the cell output.
client.get_all_typedefs()

In [None]:
# Show the endpoint URL stored on the client as the cell output.
client.endpoint_url

In [None]:
# Look up a single entity, identified by its type name and qualified name.
client.get_entity(
    typeName="azure_datalake_gen2_resource_set",
    qualifiedName="https://purviewatlaspoc.dfs.core.windows.net/datalake/extended_vaccine_data/{SparkPartitions}",
)

In [None]:
from pyapacheatlas.core import AtlasClient, AtlasEntity, TypeCategory, AtlasProcess

# Tabular schema entity. The column and the resource set defined below both
# refer to it through ts.to_json(minimum=True).
# NOTE(review): the negative guids look like temporary placeholder ids that
# the service replaces on upload — confirm against the pyapacheatlas docs.
ts = AtlasEntity(
    name="demoSchema",
    typeName="tabular_schema",
    qualified_name="pyapache://demotabschema",
    guid=-1,
)

# Column entity, tied to the schema via the "composeSchema" relationship.
col01 = AtlasEntity(
    name="demoColumn",
    typeName="column",
    qualified_name="pyapache://mycolumn",
    guid=-2,
    attributes={
        "type": "String",
        "description": "This is an example column",
    },
    relationshipAttributes={"composeSchema": ts.to_json(minimum=True)},
)

# Resource set entity, tied to the schema via the "tabular_schema" relationship.
rs = AtlasEntity(
    name="demoresourceset",
    typeName="azure_datalake_gen2_resource_set",
    qualified_name="pyapache://demors",
    guid=-3,
    relationshipAttributes={"tabular_schema": ts.to_json(minimum=True)},
)

# Upload all three in one call; the returned value is the cell output.
client.upload_entities([ts, rs, col01])

In [None]:
## Get datasources

import requests

# Base URL of the scanning service, kept as an attribute on the client.
# NOTE(review): the host repeats the account name as a literal — confirm it
# matches the PURVIEW_ACCOUNT setting used to build the client.
client.scan_endpoint_url = "https://purviewatlaspoc.scan.purview.azure.com"
url = client.scan_endpoint_url + "/datasources"

getDataSources = requests.get(
    url,
    headers=client.authentication.get_authentication_headers(),
    params={"api-version": "2018-12-01-preview"},
    # requests has no default timeout; without one an unresponsive endpoint
    # would block this cell forever.
    timeout=30,
)

# Let the client turn the raw HTTP response into its usual result (it is the
# client's private helper, so this may break on a pyapacheatlas upgrade).
response = client._handle_response(getDataSources)
response

In [None]:
## Create or update scan
import requests

scan_name = "datalakeScan"
data_source = "AzureDataLakeStorage-ZSB"

# Base URL of the scanning service, kept as an attribute on the client.
# NOTE(review): the host repeats the account name as a literal — confirm it
# matches the PURVIEW_ACCOUNT setting used to build the client.
client.scan_endpoint_url = "https://purviewatlaspoc.scan.purview.azure.com"
url = client.scan_endpoint_url + \
    f"/datasources/{data_source}/scans/{scan_name}"

# Scan definition sent as the JSON body of the PUT request.
scan_definition = {
    "kind": "AdlsGen2Msi",
    "properties": {
        "scanRulesetName": "AdlsGen2",
        "scanRulesetType": "System",
        "collection": {
            "referenceName": account_name,
            "type": "CollectionReference"
        }
    }
}

createOrUpdateScan = requests.put(
    url,
    headers=client.authentication.get_authentication_headers(),
    params={"api-version": "2018-12-01-preview"},
    json=scan_definition,
    # requests has no default timeout; without one an unresponsive endpoint
    # would block this cell forever.
    timeout=30,
)

# Let the client turn the raw HTTP response into its usual result (it is the
# client's private helper, so this may break on a pyapacheatlas upgrade).
response = client._handle_response(createOrUpdateScan)
response

In [None]:
from pyspark.sql import DataFrame, SparkSession


# Get the current Spark session, or create one with the application name "test".
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)

columns = ["language", "users_count"]
data = [
    ("Java", 20000),
    ("Python", 100000),
    ("Scala", 3000),
]

# Small demo DataFrame; the column names are supplied together with the rows.
df = spark.createDataFrame(data, schema=columns)
df.show()

In [None]:
def register_df(df: DataFrame, name: str, qualified_name: str):
    """Describe a DataFrame as Atlas entities and upload them.

    Builds one ``tabular_schema`` entity, one ``column`` entity for every
    ``(column name, type)`` pair in ``df.dtypes`` and one
    ``azure_datalake_gen2_resource_set`` entity, then uploads them together
    through the notebook-level ``client``.

    Args:
        df: DataFrame whose ``dtypes`` pairs become the column entities.
        name: Name given to the resource set entity.
        qualified_name: Qualified name of the resource set. The schema and
            column qualified names are built from it by appending
            ``_tabular_schema`` and ``_column_<column name>``.

    Returns:
        A ``(resource_set_entity, upload_response)`` tuple.
    """
    # Guids count down from -100: schema first, then one per column, and the
    # resource set takes the value after the last column.
    base_guid = 100

    schema_entity = AtlasEntity(
        name="demoDFSchema",
        typeName="tabular_schema",
        qualified_name=f"{qualified_name}_tabular_schema",
        guid=-base_guid,
    )

    column_entities = [
        AtlasEntity(
            name=col_name,
            typeName="column",
            qualified_name=f"{qualified_name}_column_{col_name}",
            guid=-(base_guid + offset),
            attributes={
                "type": col_type,
                "description": f"Column {col_name} has type {col_type}",
            },
            relationshipAttributes={
                "composeSchema": schema_entity.to_json(minimum=True),
            },
        )
        for offset, (col_name, col_type) in enumerate(df.dtypes, start=1)
    ]

    resource_set = AtlasEntity(
        name=name,
        typeName="azure_datalake_gen2_resource_set",
        qualified_name=qualified_name,
        guid=-(base_guid + len(column_entities) + 1),
        relationshipAttributes={
            "tabular_schema": schema_entity.to_json(minimum=True),
        },
    )

    upload_response = client.upload_entities(
        [schema_entity, resource_set, *column_entities]
    )
    return (resource_set, upload_response)

In [None]:
# Register the demo DataFrame; keep the resource set entity and the upload response.
rs, response = register_df(
    df=df,
    name="testDF",
    qualified_name="pyapache://testDF",
)

In [None]:
# Display the current value of `response` as the cell output.
response

In [None]:
# Put the parent of the working directory (assumed to be the project root) first
# on sys.path so that python can import src.utils. Alternatively, turn the
# project into an installable python package.
import sys
# os.path.dirname understands the platform's path separator; splitting on '/'
# by hand returns the working directory itself on Windows and an empty string
# for a directory directly under the filesystem root.
sys.path.insert(0, os.path.dirname(os.getcwd()))

from src.utils import PurviewPOCClient

# From here on `client` is the project's PurviewPOCClient, built with the same
# account name and credentials as the client in the setup cell.
client = PurviewPOCClient(
    account_name=account_name,
    authentication=auth,
)
# NOTE(review): presumably registers the custom type definitions the later
# cells rely on — confirm in src.utils.
client.create_delta_table_typedefs()

In [None]:
from src.utils import PurviewPOCClient

# Rebuild the project client so this cell does not depend on the previous one.
client = PurviewPOCClient(
    account_name=account_name,
    authentication=auth,
)

# Each call yields three values; the third is unpacked with * in the upload
# below, so it must be iterable.
# NOTE(review): presumably (resource set, tabular schema, column entities) —
# confirm against PurviewPOCClient.register_df in src.utils.
r1, t1, cols1 = client.register_df(
    df, "input1", "pyapache://input1DF"
)
r2, t2, cols2 = client.register_df(
    df.alias("input2"), "input2", "pyapache://input2DF"
)
r3, t3, cols3 = client.register_df(
    df.alias("output1"), "output1", "pyapache://output1DF"
)

# Process entity linking the two inputs to the output.
# NOTE(review): guid -20 must not clash with the guids handed out by
# client.register_df — confirm.
process = AtlasProcess(
    name="test_spark_job",
    typeName="custom_spark_job_process",
    qualified_name="pyapacheatlas://test_spark_job",
    guid=-20,
    attributes={"job_type": "join"},
    inputs=[r1, r2],
    outputs=[r3],
)

# Upload every entity plus the process in one call; the result is the cell output.
client.upload_entities(
    [
        r1, t1, *cols1,
        r2, t2, *cols2,
        r3, t3, *cols3,
        process,
    ]
)