## IMPORTS

In [None]:
import requests
import time
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *

## URIs + HEADERS for requests

In [None]:
# Resource identifier handed to `mssparkutils.credentials.getToken` when a token is requested.
pbi_resource = "https://analysis.windows.net/powerbi/api"
# Base URL of the Power BI REST API. It already ends with '/', so paths appended to it
# should not start with another slash.
pbi_Uri = 'https://api.powerbi.com/v1.0/myorg/'

## Unified functions

In [None]:
def get_token():
    """Return an access token for the resource stored in ``pbi_resource``.

    The token is requested through ``mssparkutils.credentials``.
    NOTE(review): ``mssparkutils`` is not imported in this file; presumably
    the notebook runtime injects it - confirm.
    """
    credentials = mssparkutils.credentials
    return credentials.getToken(pbi_resource)

In [None]:
def get_powerbiAPIclusterURI():
    """Build the cluster-specific base URL of the beta ``groups`` API.

    Calls the ``datasets`` endpoint under ``pbi_Uri`` and reads the
    ``@odata.context`` URL from the response; the host of that URL is reused
    for the returned address.

    Returns:
        ``https://<host>/beta/myorg/groups`` where ``<host>`` is taken from
        the ``@odata.context`` value.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
        requests.RequestException: on connection problems or timeout.
    """
    fullurl = pbi_Uri + 'datasets'
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}
    # Same timeout as the other request helpers, so a stalled connection cannot hang the cell.
    response = requests.get(fullurl, headers=headers, timeout=60)
    # Fail with the real HTTP error instead of a KeyError on the missing '@odata.context'.
    response.raise_for_status()
    unaltered = response.json()['@odata.context']
    # '<scheme>://<host>/...' split on '/' puts the host at index 2.
    host = unaltered.split('/')[2]
    return f'https://{host}/beta/myorg/groups'

# Resolved once when this cell runs; get_WorkspaceUsageMetricsId builds its URLs from it.
clusteredURI = get_powerbiAPIclusterURI()

### Function definitions

In [None]:
def get_AccessibleWorkspaces():
    """Return the workspaces listed by the ``groups`` endpoint.

    The request filters on ``type eq 'Workspace'``.

    Returns:
        The ``value`` list of the JSON response.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
        requests.RequestException: on connection problems or timeout.
    """
    # pbi_Uri ends with '/', so strip it before joining to avoid '.../myorg//groups'.
    fullUrl = pbi_Uri.rstrip('/') + "/groups?$filter=type eq 'Workspace'"
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}
    # Same timeout as the other request helpers, so a stalled connection cannot hang the cell.
    response = requests.get(fullUrl, headers=headers, timeout=60)
    # Surface HTTP failures directly rather than as a KeyError on 'value'.
    response.raise_for_status()
    return response.json()['value']

def get_WorkspaceUsageMetricsId(wsId):
    """Look up the usage-metrics dataset id of a workspace.

    Requests ``<clusteredURI>/<wsId>/usageMetricsReportV2`` and returns the
    ``dbName`` of the first entry in the response's ``models`` list. The
    lookup is best-effort: any failure (network error, unexpected payload)
    is retried, up to four attempts with a 30 second pause between them.

    Args:
        wsId: Workspace id inserted into the request URL.

    Returns:
        The ``dbName`` value, or ``None`` when every attempt failed.
    """
    fullurl = f'{clusteredURI}/{wsId}/usageMetricsReportV2'
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}

    max_attempts = 4
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(fullurl, headers=headers, timeout=60)
            return response.json()['models'][0]['dbName']
        except Exception:
            # Deliberately broad: any failure just triggers a retry. `Exception` (rather than
            # a bare except) still lets KeyboardInterrupt/SystemExit stop the notebook.
            if attempt < max_attempts:
                # No point in sleeping after the final attempt.
                time.sleep(30)
    return None

def post_ExecuteQuery(wsId, dsId, daxQ):
    """Run a DAX query through the dataset's ``executeQueries`` endpoint.

    The query is posted with ``includeNulls`` enabled. The call is
    best-effort: any failure (network error, unexpected payload) is retried,
    up to four attempts with a 30 second pause between them.

    Args:
        wsId: Workspace id inserted into the request URL.
        dsId: Dataset id inserted into the request URL.
        daxQ: DAX query text.

    Returns:
        The first table of the first result in the JSON response, or ``None``
        when every attempt failed. Callers must handle the ``None`` case.
    """
    # pbi_Uri ends with '/', so strip it before joining to avoid '.../myorg//groups'.
    fullurl = f"{pbi_Uri.rstrip('/')}/groups/{wsId}/datasets/{dsId}/executeQueries"
    pbi_access_token = get_token()
    headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {pbi_access_token}'}
    content = {"queries": [{"query": daxQ}], "serializerSettings": {"includeNulls": True}}

    max_attempts = 4
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.post(fullurl, json=content, headers=headers, timeout=60)
            return response.json()['results'][0]['tables'][0]
        except Exception:
            # Deliberately broad: any failure just triggers a retry. `Exception` (rather than
            # a bare except) still lets KeyboardInterrupt/SystemExit stop the notebook.
            if attempt < max_attempts:
                # No point in sleeping after the final attempt.
                time.sleep(30)
    return None

def replace_ColumnNames(frame, replacingName):
    """Return ``frame`` with cleaned-up column names.

    Every occurrence of ``replacingName`` is removed from each column name
    first, then all ``[`` and ``]`` characters are dropped. The cleaned names
    are applied positionally through ``frame.toDF``.
    """
    brackets = str.maketrans('', '', '[]')
    cleaned = [name.replace(replacingName, '').translate(brackets) for name in frame.columns]
    return frame.toDF(*cleaned)

def extract_DataFrame(response_list, preCreatedSchema):
    """Flatten query responses into a single Spark DataFrame of rows.

    Each element of ``response_list`` must expose a ``rows`` collection; the
    rows of all responses are exploded, collected on the driver and turned
    into one DataFrame.

    Args:
        response_list: Non-empty list of responses (Spark cannot infer a
            schema from an empty list).
        preCreatedSchema: Schema applied to the extracted rows. When it is
            ``None`` (or otherwise falsy) Spark infers the schema instead.

    Returns:
        A Spark DataFrame with one record per extracted row.
    """
    responses = spark.createDataFrame(response_list)
    # `explode` names its output column "col"; flatMap unwraps the single-field rows.
    rows = responses.select(explode(responses.rows)).select("col").rdd.flatMap(lambda x: x).collect()
    # Decide on the argument that was passed in, not on a global that happens to be
    # called `schema` in the calling cell.
    if not preCreatedSchema:
        return spark.createDataFrame(rows)
    return spark.createDataFrame(rows, preCreatedSchema)

### Tables

In [None]:
# Snapshot of the tables the Spark catalog lists when this cell runs.
tblsInLK = spark.catalog.listTables()
tblsReady = len(tblsInLK)
# Start from "not found"; the catalog snapshot is only queried when it has entries.
doesTableUMDExists = 0
if tblsReady != 0:
    # Re-wrap the listing as a Spark DataFrame (via pandas) so it can be filtered by name.
    tblsInLK = spark.createDataFrame(pd.DataFrame(tblsInLK))
    doesTableUMDExists = tblsInLK.where(col('name') == 'UsageMetricsDatasets').count()

## Workspace and dataset ID retrieval

In [None]:
# Workspaces returned by get_AccessibleWorkspaces(); each entry's 'id' is read in the next cell.
wsList = get_AccessibleWorkspaces()

In [None]:
# Column layout of the UsageMetricsDatasets table. Declaring it explicitly lets an empty
# result still produce a DataFrame (Spark cannot infer a schema from an empty list).
# The field order mirrors the layout the extraction cells read:
# DatasetId, ExtractionState, WorkspaceId.
usgDatasetsSchema = StructType([
    StructField('DatasetId', StringType(), True),
    StructField('ExtractionState', LongType(), True),
    StructField('WorkspaceId', StringType(), True),
])


def _collect_usage_datasets(workspaceIds):
    """Return one record per workspace for which a usage-metrics dataset id was found.

    Workspaces whose lookup returns ``None`` are left out, so no rows with a
    missing ``DatasetId`` are stored. New records start with ``ExtractionState`` 0.
    """
    records = []
    for wsId in workspaceIds:
        dsId = get_WorkspaceUsageMetricsId(wsId)
        if dsId is not None:
            records.append({'WorkspaceId': wsId, 'DatasetId': dsId, 'ExtractionState': 0})
    return records


if doesTableUMDExists == 0:
    # No UsageMetricsDatasets table yet: look up every accessible workspace and create it.
    response_list = _collect_usage_datasets(ws['id'] for ws in wsList)
    usgDatasetIds = spark.createDataFrame(response_list, usgDatasetsSchema)
    usgDatasetIds.write.mode("overwrite").format("delta").save("Tables/UsageMetricsDatasets")
else:
    # Table exists: reuse the stored dataset ids and only look up workspaces not stored yet.
    sqlPrepare = spark.sql("SELECT * FROM UsageMetricsDatasets WHERE DatasetId IS NOT NULL")
    existingWsIds = sqlPrepare.select(col('WorkspaceId'))
    receivedListOfWs = spark.createDataFrame(wsList).select(col('id').alias("WorkspaceId"))
    # Stored workspaces that are still accessible.
    listOfUsableWs = existingWsIds.intersectAll(receivedListOfWs)
    # Accessible workspaces without a stored dataset id.
    listOfNewWs = receivedListOfWs.exceptAll(existingWsIds)
    extractionPrepare = "('" + "','".join(list(listOfUsableWs.toPandas()['WorkspaceId'])) + "')"
    existedRecords = spark.sql(f'SELECT * FROM UsageMetricsDatasets WHERE WorkspaceId IN {extractionPrepare} AND DatasetId IS NOT NULL')
    # Collect once; the collected list also tells whether there is anything new,
    # so no extra count() job is needed.
    collectionOfNewWs = listOfNewWs.collect()
    response_list = _collect_usage_datasets(ws['WorkspaceId'] for ws in collectionOfNewWs)
    if response_list:
        newRecords = spark.createDataFrame(response_list, usgDatasetsSchema)
        newRecords.write.mode("append").format("delta").save("Tables/UsageMetricsDatasets")
        # Match columns by name; a positional union would silently mix columns if the
        # stored table's column order ever differed.
        usgDatasetIds = existedRecords.unionByName(newRecords)
    else:
        usgDatasetIds = existedRecords

### Data Extraction

#### Preparation

In [None]:
# Bring the workspace/dataset rows to the driver so the extraction loops below can iterate them.
wsListUpdated = usgDatasetIds.collect()

#### Reports

In [None]:
# Tables that are fully re-extracted and overwritten on every run.
basicListOfTables = ['Reports', 'Users', 'Report pages', 'Workspace views']
# No predefined schema for these tables: extract_DataFrame lets Spark infer it.
schema = None
for bsTbl in basicListOfTables:
    response_list = []
    for ws in wsListUpdated:
        # Read by column name so the loop does not depend on the table's column order.
        wsId = ws['WorkspaceId']
        dsId = ws['DatasetId']
        response = post_ExecuteQuery(wsId, dsId, f"EVALUATE '{bsTbl}'")
        if response is None:
            # post_ExecuteQuery returns None once all of its retries have failed.
            print(f"Skipping workspace {wsId}: '{bsTbl}' could not be queried")
        elif response.get('rows'):
            response_list.append(response)
        # Short pause between consecutive API calls.
        time.sleep(0.5)
    if not response_list:
        # Nothing was returned; keep the existing table instead of failing on an empty DataFrame.
        print(f"No rows returned for '{bsTbl}'; table left unchanged")
        continue
    df = extract_DataFrame(response_list, schema)
    df = replace_ColumnNames(df, bsTbl)
    # 'Report pages' -> 'ReportPages': name of the target table under Tables/.
    updatedText = bsTbl.replace("'", "").title().replace(" ", "")
    if bsTbl == 'Users':
        # The Users table is de-duplicated before writing.
        df = df.distinct()
    df.write.mode("overwrite").format("delta").save(f"Tables/{updatedText}")

In [None]:
# Fact tables that are loaded in full the first time and incrementally afterwards.
basicListOfTables = ["Report page views", "Report load times", "Report views"]

# Columns of each fact table, in the order they are stored. All are read as strings.
factTableColumns = {
    "Report page views": [
        "UserKey", "UserId", "TenantId", "Timestamp", "AppGuid", "SectionId", "AppName",
        "Date", "DeviceBrowserVersion", "ReportId", "SessionSource", "OriginalReportId",
        "WorkspaceId", "Client", "OriginalWorkspaceId", "DeviceOSVersion",
    ],
    "Report load times": [
        "DeviceOSVersion", "AppGuid", "Timestamp", "UserId", "loadTime", "Client",
        "DeviceBrowserVersion", "OriginalGroupId", "TenantId", "EndTime", "SessionSource",
        "AppName", "Browser", "OriginalReportId", "GroupId", "StartTime", "Date",
        "LocationCity", "Country",
    ],
    "Report views": [
        "AppName", "CapacityId", "CapacityName", "ConsumptionMethod", "CreationTime",
        "DatasetName", "Date", "DistributionMethod", "OriginalConsumptionMethod",
        "ReportId", "ReportName", "ReportType", "UserAgent", "UserId", "UserKey",
    ],
}

md = ""
dax = ""
for bsTbl in basicListOfTables:
    # 'Report page views' -> 'ReportPageViews': name of the target table under Tables/.
    updatedText = bsTbl.replace("'", "").title().replace(" ", "")

    # Look the target table up by its actual name in the catalog snapshot.
    if tblsReady == 0:
        doesTableExists = 0
    else:
        doesTableExists = tblsInLK.filter(col('name') == updatedText).count()

    # Response rows are keyed as "<table>[<column>]", so the schema fields use the same form.
    columns = factTableColumns.get(bsTbl)
    if columns is None:
        schema = None
    else:
        schema = StructType([StructField(f"{bsTbl}[{c}]", StringType(), True) for c in columns])

    fullDax = f"EVALUATE '{bsTbl}'"
    incrementalDax = None
    if doesTableExists == 0:
        md = "overwrite"
    else:
        md = "append"
        # The high-water mark is the same for every workspace, so query it once per table.
        tableLastDate = spark.sql(f"SELECT MAX(Date) AS date FROM {updatedText}").collect()[0]['date']
        if tableLastDate is not None:
            # An empty table has no MAX(Date); it then falls back to a full load.
            fromDate = datetime.strptime(tableLastDate, '%Y-%m-%dT%H:%M:%S').date().strftime('DATE(%Y,%m,%d)')
            incrementalDax = f"DEFINE VAR _td = TODAY() VAR _from = {fromDate} EVALUATE FILTER('{bsTbl}',[Date]>_from && [Date]<_td)"

    response_list = []
    for ws in wsListUpdated:
        # Read by column name so the loop does not depend on the table's column order.
        wsId = ws['WorkspaceId']
        dsId = ws['DatasetId']
        dsExportState = ws['ExtractionState']
        # Workspaces that were never extracted (state 0) get a full load; the others only
        # fetch rows newer than what the table already holds.
        if incrementalDax is None or dsExportState == 0:
            dax = fullDax
        else:
            dax = incrementalDax
        response = post_ExecuteQuery(wsId, dsId, dax)
        if response is None:
            # post_ExecuteQuery returns None once all of its retries have failed.
            print(f"Skipping workspace {wsId}: '{bsTbl}' could not be queried")
        elif response.get('rows'):
            response_list.append(response)
        # Short pause between consecutive API calls.
        time.sleep(0.5)

    if not response_list:
        # Nothing new; avoid failing on an empty DataFrame.
        print(f"No rows returned for '{bsTbl}'; nothing written")
        continue
    exports = extract_DataFrame(response_list, schema)
    exports = replace_ColumnNames(exports, bsTbl)
    exports.write.mode(md).format("delta").save(f"Tables/{updatedText}")

# Mark the workspaces that just received their full load, so the next run extracts them
# incrementally instead of appending the full history again. This is done once, after
# all fact tables, because the in-memory state must stay 0 while the tables are processed.
# NOTE(review): a workspace is marked even if one of its queries was skipped above.
# NOTE(review): assumes the table resolves by name in Spark SQL, as the SELECT in the
# dataset-discovery cell does - confirm for a table created earlier in the same run.
# NOTE(review): lakehouses filled before this fix hold ExtractionState = 0 next to
# already loaded fact tables; set the state to 1 (or drop the fact tables) before the
# first run, otherwise that run appends a second full copy.
pendingWsIds = [ws['WorkspaceId'] for ws in wsListUpdated if ws['ExtractionState'] == 0]
if pendingWsIds:
    pendingIdList = "','".join(pendingWsIds)
    spark.sql(f"UPDATE UsageMetricsDatasets SET ExtractionState = 1 WHERE ExtractionState = 0 AND WorkspaceId IN ('{pendingIdList}')")