# Measure the adoption of your Amazon QuickSight dashboards and manage the assets [2025 version]

In [35]:
!source activate python3 && pip install --upgrade boto3

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Collecting boto3
  Downloading boto3-1.37.38-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.9/139.9 kB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting s3transfer<0.12.0,>=0.11.0
  Downloading s3transfer-0.11.5-py3-none-any.whl (84 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.8/84.8 kB[0m [31m14.8 MB/s[0m eta [36m0:00:00[0m
Collecting botocore<1.38.0,>=1.37.38
  Downloading botocore-1.37.38-py3-none-any.whl (13.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.5/13.5 MB[0m [31m90.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: botocore, s3transfer, boto3
  Attempting uninstall: botocore
    Found existing installation: botocore 1.27.30
    Uninstalling botocore-1.27.30:
      Successfully uninstalled botocore-1.27.30
  Attempting uninstall: s3transfer
    Foun

In [2]:

from IPython.display import IFrame, display
from IPython.core.display import HTML 

def show_webpage(url, width=900, height=500):
    """
    Renders a webpage in an IFrame within a Jupyter Notebook.

    Args:
        url (str): The URL of the webpage to display.
        width (int, optional): The width of the IFrame. Defaults to 900 pixels.
        height (int, optional): The height of the IFrame. Defaults to 500 pixels.
    """
    iframe = IFrame(url, width=width, height=height)
    display(iframe)

In [13]:
from IPython.display import HTML

HTML('<a href="https://d172fagwhsdxeg.cloudfront.net/index.html?dashboardid=deeb0ed5-3260-4023-8b11-dcd604d76332&dashboardregion=us-east-1" target="_blank">Click here to open the sample dashboard</a>')


#show_webpage("https://d172fagwhsdxeg.cloudfront.net/index.html?dashboardid=deeb0ed5-3260-4023-8b11-dcd604d76332&dashboardregion=us-east-1")

In [10]:
from IPython.display import HTML

HTML('<a href="https://aws.amazon.com/blogs/business-intelligence/measure-the-adoption-of-your-amazon-quicksight-dashboards-and-view-your-bi-portfolio-in-a-single-pane-of-glass/" target="_blank">Click here to open the old blog</a>')



## The sample screenshot

In [15]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://d2908q01vomqb2.cloudfront.net/b6692ea5df920cad691c20319a6fffd7a4a766b8/2022/10/13/l4-1024x610.png")

## The old architecture diagram

In [6]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://d32le6a78ncrct.cloudfront.net/image/Old_Admin_Console_Arch.drawio.png")

## The new architecture diagram

In [5]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://d32le6a78ncrct.cloudfront.net/image/Admin_Console_Arch.drawio.png")

**Above workflow involves the following steps:**

1. The AWS Glue Python Shell job is scheduled to run hourly on regular basis. This process calls QuickSight APIs to get the QuickSight namespace, group, user, and asset access permissions information and save the results into Amazon Simple Storage Service (Amazon S3) bucket.
2. AWS CloudTrail logs are stored in an S3 bucket.
3. Based on the file in Amazon S3 that contains user-group information, dataset information, QuickSight assets access permissions information, as well as dashboard views and user login events from the CloudTrail logs, five Amazon Athena tables are created. Optionally,
    a. You can access the Amazon S3 data using Amazon Redshift spectrum tables
    b. Also, the BI engineer can combine these tables with employee information tables to display human resource information of the users.
4. Four QuickSight datasets fetch the data from the Athena tables created in Step 3 and import them into SPICE. Then, based on these datasets, a QuickSight dashboard is created.

**Prerequisites**
For this walkthrough, you should have the following prerequisites:

* An AWS account
* Access to the following AWS services:
    * AWS Athena
    * AWS CloudFormation
    *  ~~AWS Lambda~~ (Retired)
    * AWS QuickSight
    * Amazon Simple Storage Service (Amazon S3)
    * AWS Glue ~~(Optional)~~ (Recommanded)
    *  Amazon Redshift (Optional)
    * Basic knowledge of Python
    * Basic knowledge of SQL

### CloudFormation template for AWS Glue Python Shell job

In [23]:
from IPython.display import HTML

image_url = "https://d32le6a78ncrct.cloudfront.net/image/Picture1.png"
link_url = "https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/create/template?stackName=adminconsoleglue&templateURL=https://admin-console-cfn-dataprepare-code.s3.us-east-1.amazonaws.com/cfn/admin_console_glue_2025.template"

HTML(f'<a href="{link_url}" target="_blank"><img src="{image_url}" width="300"/></a>')

* In the Glue console, manually run the `adminconsoledatasetdashboardinfo` and `adminconsoleuserdataaccessinfo` jobs to quickly generate the datasets instead of waiting for the next scheduled run.
* Verify the data is available in the `admin-console-new-[AWS-account-ID]` S3 bucket after the jobs complete.
* If data is in the S3 bucket, continue deployment of the remaining modules.

### Enable CloudTrail Log

* Create a CloudTrail log if you don’t already have one and note down the S3 bucket name of the log files for future use.
* Note down all the resources created from the previous steps. If the S3 bucket name for the CloudTrail log from step 2 is different from the one in step 1’s output, use the S3 bucket from step 2.

### CloudFormation template for Athena tables<br>
To create your Athena tables, complete the following steps:<br>
The following table summarizes the keys and values you use when creating the Athena tables with the next CloudFormation stack.

| Key            | Value                       | Description                                                           |
|----------------|-----------------------------|-----------------------------------------------------------------------|
| CloudtrailLocation         | for example: cloudtrail-awslogs-[aws-account-id]-do-not-delete/AWSLogs/[aws-account-id]           | The Amazon S3 location of the CloudTrail log end with `CloudTrail/`                      |
| StartDateParameter        | for example: `2023/02/01`       | The date for the first log CloudTrail delivered to that location, in YYYY/MM/DD format         |


You can find the location of your CloudTrail logs in the CloudTrail management console:

In [22]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://d2908q01vomqb2.cloudfront.net/d02560dd9d7db4467627745bd6701e809ffca6e3/2024/09/19/CT1.png")

And the date of the first log file delivered by CloudTrail in the S3 management console:

In [25]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://d2908q01vomqb2.cloudfront.net/d02560dd9d7db4467627745bd6701e809ffca6e3/2024/09/19/CT2.png")

### Choose the below Launch Stack to create the Athena tables.

In [21]:
from IPython.display import HTML

image_url = "https://d32le6a78ncrct.cloudfront.net/image/Picture1.png"
link_url = "https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/create/template?stackName=adminconsoleathena&templateURL=https://admin-console-cfn-dataprepare-code.s3.us-east-1.amazonaws.com/cfn/admin-console-athena-tables-2025.template"

HTML(f'<a href="{link_url}" target="_blank"><img src="{image_url}" width="300"/></a>')

After a successful deployment, you have a database called `admin-console-2025` created in AwsDataCatalog in Athena and five tables in the database: `cloudtrail_logs_pp`, `group_membership`, `object_access`, `datasets_info` and `data_dict`.  
* Confirm the tables via the Athena console.

For instructions on building an Athena table with CloudTrail events, see [Amazon QuickSight Now Supports Audit Logging with AWS CloudTrail](https://aws.amazon.com/blogs/big-data/amazon-quicksight-now-supports-audit-logging-with-cloudtrail/)

* After all five tables are created in Athena, go to the security permissions on the QuickSight console to enable bucket access for s3://admin-console[AWS-account-ID] and s3://cloudtrail-awslogs-[aws-account-id]-do-not-delete (or the appropriate S3 bucket if you used a different name).
* Go to lakeformation to grant the database and table access to QuickSight service role and the admin console execution role
* Enable Athena access under Security & Permissions.<br>
Now QuickSight can access all five tables through Athena.

### CloudFormation template for QuickSight objects

To create the QuickSight objects, complete the following steps:

* Get the QuickSight admin user’s ARN by running following command in the AWS Command Line Interface (AWS CLI):
`aws quicksight describe-user --aws-account-id [aws-account-id] --namespace default --user-name [admin-user-name]` 

For example: arn:aws:quicksight:us-east-1:12345678910:user/default/admin/xyz.  

In [26]:
import json
import boto3
import logging
import csv
import io
import os
import tempfile
from typing import Any, Callable, Dict, List, Optional, Union
import sys
import botocore

#start-Initial set up for the sdk env#
def default_botocore_config() -> botocore.config.Config:
    """Botocore configuration."""
    retries_config: Dict[str, Union[str, int]] = {
        "max_attempts": int(os.getenv("AWS_MAX_ATTEMPTS", "5")),
    }
    mode: Optional[str] = os.getenv("AWS_RETRY_MODE")
    if mode:
        retries_config["mode"] = mode
    return botocore.config.Config(
        retries=retries_config,
        connect_timeout=10,
        max_pool_connections=10,
        user_agent_extra=f"qs_sdk_admin_console",
    )
sts_client = boto3.client("sts", config=default_botocore_config())
account_id = sts_client.get_caller_identity()["Account"]
aws_region = 'us-east-1'
qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())


arn = qs_client.describe_user(
    UserName='admin/wangzyn-Isengard',
    AwsAccountId=account_id,
    Namespace='default'
)

arn = arn['User']['Arn']
arn

'arn:aws:quicksight:us-east-1:499080683179:user/default/admin/wangzyn-Isengard'

### Choose the below Launch Stack to create the QuickSight objects.

In [10]:
from IPython.display import HTML

image_url = "https://d32le6a78ncrct.cloudfront.net/image/Picture1.png"
link_url = "https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/create/template?stackName=admin-console-qs-objects-2025&templateURL=https://admin-console-cfn-dataprepare-code.s3.us-east-1.amazonaws.com/cfn/admin-console-qs-analysis-objects-2025.template"

HTML(f'<a href="{link_url}" target="_blank"><img src="{image_url}" width="300"/></a>')

**Set the CFN execution role to be arn:aws:iam::[accountId]:role/QuickSightAdminConsole2025**

In [7]:
import json
import boto3
import logging
import csv
import io
import os
import tempfile
from typing import Any, Callable, Dict, List, Optional, Union
import sys
import botocore

#start-Initial set up for the sdk env#
def default_botocore_config() -> botocore.config.Config:
    """Botocore configuration."""
    retries_config: Dict[str, Union[str, int]] = {
        "max_attempts": int(os.getenv("AWS_MAX_ATTEMPTS", "5")),
    }
    mode: Optional[str] = os.getenv("AWS_RETRY_MODE")
    if mode:
        retries_config["mode"] = mode
    return botocore.config.Config(
        retries=retries_config,
        connect_timeout=10,
        max_pool_connections=10,
        user_agent_extra=f"qs_sdk_admin_console",
    )
sts_client = boto3.client("sts", config=default_botocore_config())
account_id = sts_client.get_caller_identity()["Account"]
aws_region = 'us-east-1'
#qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
qs_client = boto3.client('quicksight')


"""
permissions=qs_client.describe_analysis(
    AwsAccountId=account_id,
    AnalysisId='admin-console-analysis-2025499080683179',
    
)
"""





response = qs_client.start_asset_bundle_export_job(
    AwsAccountId=account_id,
    AssetBundleExportJobId='12345678',
    ResourceArns=[
        'arn:aws:quicksight:us-east-1:499080683179:analysis/admin-console-analysis-2025499080683179',
    ],
    IncludeAllDependencies=True,
    ExportFormat='CLOUDFORMATION_JSON')


from IPython.display import JSON
JSON(response)

<IPython.core.display.JSON object>

In [8]:
response = qs_client.describe_asset_bundle_export_job(
    AwsAccountId=account_id,
    AssetBundleExportJobId='12345678'
)



from IPython.display import JSON
JSON(response)

<IPython.core.display.JSON object>

In [6]:
permissions=qs_client.describe_analysis(
    AwsAccountId=account_id,
    AnalysisId='admin-console-analysis-2025499080683179',
    
)
from IPython.display import JSON
JSON(permissions)

<IPython.core.display.JSON object>

**Give the role quicksight-admin-console-2025 the trust policy:**

{
  "Effect": "Allow",
  "Principal": {
    "Service": "cloudformation.amazonaws.com"
  },
  "Action": "sts:AssumeRole"
}


* Choose Launch Stack to create the QuickSight datasets and dashboard:

In [9]:
from IPython.display import HTML

image_url = "https://d32le6a78ncrct.cloudfront.net/image/Picture1.png"
link_url = "https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/create/template?stackName=admin-console-qs-objects-2025&templateURL=https://admin-console-cfn-dataprepare-code.s3.us-east-1.amazonaws.com/cfn/admin-console-qs-analysis-objects-2025.template"

HTML(f'<a href="{link_url}" target="_blank"><img src="{image_url}" width="300"/></a>')

In [19]:
import json
import boto3
import logging
import csv
import io
import os
import tempfile
from typing import Any, Callable, Dict, List, Optional, Union
import sys
from awsglue.utils import getResolvedOptions
import botocore

#start-Initial set up for the sdk env#
def default_botocore_config() -> botocore.config.Config:
    """Botocore configuration."""
    retries_config: Dict[str, Union[str, int]] = {
        "max_attempts": int(os.getenv("AWS_MAX_ATTEMPTS", "5")),
    }
    mode: Optional[str] = os.getenv("AWS_RETRY_MODE")
    if mode:
        retries_config["mode"] = mode
    return botocore.config.Config(
        retries=retries_config,
        connect_timeout=10,
        max_pool_connections=10,
        user_agent_extra=f"qs_sdk_admin_console",
    )
#end-Initial set up for the sdk env#

#start-Initial set up for the glue and qs client of boto3#
sts_client = boto3.client("sts", config=default_botocore_config())
account_id = sts_client.get_caller_identity()["Account"]
aws_region = 'us-east-1'
args = getResolvedOptions(sys.argv, ['AWS_REGION'])
print('region', args['AWS_REGION'])
glue_aws_region = args['AWS_REGION']
qs_client = boto3.client('quicksight', config=default_botocore_config())
qs_local_client = boto3.client('quicksight', region_name=glue_aws_region, config=default_botocore_config())

print(glue_aws_region)

ssm = boto3.client('ssm', region_name=glue_aws_region, config=default_botocore_config())
#end-Initial set up for the glue and qs client of boto3#

#start-advanced settings for ssm usage of qs-configuration#
#optional for this use case#
"""
def get_ssm_parameters(ssm_string):
    config_str = ssm.get_parameter(
        Name=ssm_string
    )['Parameter']['Value']
    return json.loads(config_str)


def get_s3_info(account_id, glue_aws_region):
    bucket_name = get_ssm_parameters('/qs/config/groups')
    bucket_name = bucket_name['bucket-name']
    return bucket_name
"""
#end-advanced settings for ssm usage of qs-configuration#

#parent class of list* functions
def _list(
        func_name: str,
        attr_name: str,
        account_id: str,
        aws_region: str,
        **kwargs, ) -> List[Dict[str, Any]]:
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    func: Callable = getattr(qs_client, func_name)
    response = func(AwsAccountId=account_id, **kwargs)
    next_token: str = response.get("NextToken", None)
    result: List[Dict[str, Any]] = response[attr_name]
    while next_token is not None:
        response = func(AwsAccountId=account_id, NextToken=next_token, **kwargs)
        next_token = response.get("NextToken", None)
        result += response[attr_name]
    return result

#start-user related list* functions#
def list_group_memberships(
        group_name: str,
        account_id: str,
        aws_region: str,
        namespace: str = "default"
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_group_memberships",
        attr_name="GroupMemberList",
        account_id=account_id,
        GroupName=group_name,
        Namespace=namespace,
        aws_region=aws_region
    )


def list_users(account_id, aws_region, ns) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_users",
        attr_name="UserList",
        Namespace=ns,
        account_id=account_id,
        aws_region=aws_region
    )



def list_groups(
        account_id, aws_region, ns
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_groups",
        attr_name="GroupList",
        Namespace=ns,
        account_id=account_id,
        aws_region=aws_region
    )


def list_user_groups(UserName, account_id, aws_region, ns) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_user_groups",
        attr_name="GroupList",
        Namespace=ns,
        UserName=UserName,
        account_id=account_id,
        aws_region=aws_region
    )


def list_namespaces(
        account_id, aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_namespaces",
        attr_name="Namespaces",
        account_id=account_id,
        aws_region=aws_region
    )
#end-user related list* functions#

#start-assets related list* functions#
def list_dashboards(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_dashboards",
        attr_name="DashboardSummaryList",
        account_id=account_id,
        aws_region=aws_region
    )

def list_analyses(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_analyses",
        attr_name="AnalysisSummaryList",
        account_id=account_id,
        aws_region=aws_region
    )

def list_themes(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_themes",
        attr_name="ThemeSummaryList",
        account_id=account_id,
        aws_region=aws_region
    )

def list_datasets(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_data_sets",
        attr_name="DataSetSummaries",
        account_id=account_id,
        aws_region=aws_region
    )


def list_datasources(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_data_sources",
        attr_name="DataSources",
        account_id=account_id,
        aws_region=aws_region
    )

def list_ingestions(
        account_id,
        aws_region,
        DataSetId
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_ingestions",
        attr_name="Ingestions",
        account_id=account_id,
        aws_region=aws_region,
        DataSetId=DataSetId
    )


def list_folders(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_folders",
        attr_name="FolderSummaryList",
        account_id=account_id,
        aws_region=aws_region
    )


def list_folder_members(
        account_id,
        aws_region,
        folderid
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_folder_members",
        attr_name="FolderMemberList",
        account_id=account_id,
        aws_region=aws_region,
        FolderId=folderid
    )


def list_topics(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_topics",
        attr_name="TopicsSummaries",
        account_id=account_id,
        aws_region=aws_region
)

#parent class of describe* functions
def _describe(
        func_name: str,
        attr_name: str,
        account_id: str,
        aws_region: str,
        **kwargs, ) -> List[Dict[str, Any]]:
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    func: Callable = getattr(qs_client, func_name)
    response = func(AwsAccountId=account_id, **kwargs)
    result = response[attr_name]
    return result


#start-assets management describe functions
def describe_folder(
        account_id,
        id,
        aws_region) -> Dict[str, Any]:
    return _describe(
        func_name="describe_folder",
        attr_name="Folder",
        account_id=account_id,
        aws_region=aws_region,
        FolderId=id
    )

def describe_dashboard(account_id, dashboardid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_dashboard(
        AwsAccountId=account_id,
        DashboardId=dashboardid
    )
    return res


def describe_analysis(account_id, id, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_analysis(
        AwsAccountId=account_id,
        AnalysisId=id
    )
    return res


def describe_data_set(account_id, id, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_set(
        AwsAccountId=account_id,
        DataSetId=id
    )
    return res


def describe_data_source(account_id, id, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_source(
        AwsAccountId=account_id,
        DataSourceId=id
    )
    return res
#end-assets management describe functions

#start-access control related describe functions
def describe_folder_permissions(
        account_id,
        id,
        aws_region) -> Dict[str, Any]:
    return _describe(
        func_name="describe_folder_permissions",
        attr_name="Permissions",
        account_id=account_id,
        aws_region=aws_region,
        FolderId=id
    )


def describe_dashboard_permissions(
    account_id,
    dashboardid,
    aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_dashboard_permissions(
        AwsAccountId=account_id,
        DashboardId=dashboardid
    )
    return res

def describe_theme_permissions(account_id, aid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_theme_permissions(
        AwsAccountId=account_id,
        ThemeId=aid
    )
    return res


def describe_analysis_permissions(account_id, aid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_analysis_permissions(
        AwsAccountId=account_id,
        AnalysisId=aid
    )
    return res

def describe_theme_permissions(account_id, aid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_theme_permissions(
        AwsAccountId=account_id,
        ThemeId=aid
    )
    return res



def describe_data_set_permissions(account_id, datasetid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_set_permissions(
        AwsAccountId=account_id,
        DataSetId=datasetid
    )
    return res


def describe_data_source_permissions(account_id, DataSourceId, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_source_permissions(
        AwsAccountId=account_id,
        DataSourceId=DataSourceId
    )
    return res
#end-access control related describe functions

if __name__ == "__main__":
    sts_client = boto3.client("sts", region_name=aws_region, config=default_botocore_config())
    account_id = sts_client.get_caller_identity()["Account"]

    # call s3 bucket
    s3 = boto3.resource('s3')
    bucketname = 'admin-console-new-' + account_id
    bucket = s3.Bucket(bucketname)

    key = 'monitoring/quicksight/group_membership/group_membership.csv'
    key2 = 'monitoring/quicksight/object_access/object_access.csv'
    tmpdir = tempfile.mkdtemp()
    local_file_name = 'group_membership.csv'
    local_file_name2 = 'object_access.csv'
    path = os.path.join(tmpdir, local_file_name)

    lists = []
    access = []
    namespaces = list_namespaces(account_id, aws_region)
    for ns in namespaces:
        try:
            ns = ns['Name']
            users = list_users(account_id, aws_region, ns)
        
        except botocore.exceptions.ClientError as error:
            if error.response['Error']['Code'] == 'ResourceNotFoundException':
                print(f"Account " + account_id + " is not signed up with QuickSight with namespace " + ns + " . Skipping this namespace.")
                continue
        except Exception as e:
            print(e)

        for user in users:
            if user['UserName'] == 'N/A':
                continue
            try:
                groups = list_user_groups(user['UserName'], account_id, aws_region, ns)
                if len(groups) == 0:
                    lists.append([account_id, ns, None, user['UserName'], user['Email'], user['Role'], user['IdentityType'], user['Arn']])
                else:
                    for group in groups:
                        lists.append([account_id, ns, group['GroupName'], user['UserName'], user['Email'], user['Role'], user['IdentityType'], user['Arn']])
            except Exception as e:
                print(e)

    print(len(lists))
    #print(lists)

    with open(path, 'w', newline='') as outfile:
        writer = csv.writer(outfile)
        for line in lists:
            writer.writerow(line)
    bucket.upload_file(path, key)

    path = os.path.join(tmpdir, local_file_name2)
    print(path)
    dashboards = list_dashboards(account_id, glue_aws_region)

    for dashboard in dashboards:
        dashboardid = dashboard['DashboardId']

        response = describe_dashboard_permissions(account_id, dashboardid, glue_aws_region)
        permissions = response['Permissions']
        for principal in permissions:
            actions = '|'.join(principal['Actions'])
            principal = principal['Principal'].split("/")
            ptype = principal[0].split(":")
            ptype = ptype[-1]
            additional_info = principal[1]
            if len(principal)==4:
                # This will handle user
                principal = principal[2]+'/'+principal[3]
            elif len(principal)==3:
                # This will handle group
                principal = principal[2]
            else:
                # This will handle namespace
                principal = principal[1]

            access.append(
                [account_id, glue_aws_region, 'dashboard', dashboard['Name'], dashboardid, ptype, principal, additional_info, actions])

    datasets = list_datasets(account_id, glue_aws_region)

    for dataset in datasets:
        if dataset['Name'] not in ['Business Review', 'People Overview', 'Sales Pipeline',
                                   'Web and Social Media Analytics']:
            datasetid = dataset['DataSetId']

            response = describe_data_set_permissions(account_id, datasetid, glue_aws_region)
            permissions = response['Permissions']
            for principal in permissions:
                actions = '|'.join(principal['Actions'])
                principal = principal['Principal'].split("/")
                ptype = principal[0].split(":")
                ptype = ptype[-1]
                additional_info = principal[-2]
                if len(principal)==4:
                    principal = principal[2]+'/'+principal[3]
                elif len(principal)==3:
                    principal = principal[2]
                else:
                    principal = principal[1]
                
                access.append(
                    [account_id, glue_aws_region, 'dataset', dataset['Name'], datasetid, ptype, principal, additional_info, actions])

    datasources = list_datasources(account_id, glue_aws_region)

    for datasource in datasources:
        #print(datasource)
        if datasource['Name'] not in ['Business Review', 'People Overview', 'Sales Pipeline',
                                      'Web and Social Media Analytics']:
            datasourceid = datasource['DataSourceId']
            if 'DataSourceParameters' in datasource:
                #print(datasourceid)
                try:
                    response = describe_data_source_permissions(account_id, datasourceid, glue_aws_region)
                    #print(response)
                    permissions = response['Permissions']
                    #print(permissions)
                    for principal in permissions:
                        actions = '|'.join(principal['Actions'])
                        principal = principal['Principal'].split("/")
                        ptype = principal[0].split(":")
                        ptype = ptype[-1]
                        additional_info = principal[-2]
                        if len(principal)==4:
                            principal = principal[2]+'/'+principal[3]
                        elif len(principal)==3:
                            principal = principal[2]

                        access.append([account_id, glue_aws_region, 'data_source', datasource['Name'], datasourceid, ptype, principal,
                                       additional_info, actions])
                except Exception as e:
                    print(e)

    analyses = list_analyses(account_id, glue_aws_region)

    for analysis in analyses:
        if analysis['Status'] != 'DELETED':
            analysisid = analysis['AnalysisId']

            response = describe_analysis_permissions(account_id, analysisid, glue_aws_region)
            permissions = response['Permissions']
            for principal in permissions:
                actions = '|'.join(principal['Actions'])
                principal = principal['Principal'].split("/")
                ptype = principal[0].split(":")
                ptype = ptype[-1]
                additional_info = principal[-2]
                if len(principal)==4:
                    principal = principal[2]+'/'+principal[3]
                elif len(principal)==3:
                    principal = principal[2]
                else:
                    principal = principal[1]

                access.append(
                    [account_id, glue_aws_region, 'analysis', analysis['Name'], analysisid, ptype, principal, additional_info, actions])

    themes = list_themes(account_id, glue_aws_region)
    for theme in themes:
        if theme['ThemeId'] not in ['SEASIDE', 'CLASSIC', 'MIDNIGHT', 'RAINIER', 'AQUASCAPE']:
            themeid = theme['ThemeId']
            response = describe_theme_permissions(account_id, themeid, glue_aws_region)
            permissions = response['Permissions']
            for principal in permissions:
                actions = '|'.join(principal['Actions'])
                principal = principal['Principal'].split("/")
                ptype = principal[0].split(":")
                ptype = ptype[-1]
                additional_info = principal[-2]
                if len(principal)==4:
                    principal = principal[2]+'/'+principal[3]
                elif len(principal)==3:
                    principal = principal[2]
                else:
                    principal = principal[1]

                access.append(
                    [account_id, glue_aws_region, 'theme', theme['Name'], themeid, ptype, principal, additional_info,
                     actions])

    print(access)
    with open(path, 'w', newline='') as outfile:
        writer = csv.writer(outfile)
        for line in access:
            writer.writerow(line)

    # upload file from tmp to s3 key

    bucket.upload_file(path, key2)




ModuleNotFoundError: No module named 'awsglue'

In [None]:
import json
import boto3
import logging
import csv
import io
import os
import tempfile
from typing import Any, Callable, Dict, List, Optional, Union
import sys
from awsglue.utils import getResolvedOptions
import botocore

def default_botocore_config() -> botocore.config.Config:
    """Botocore configuration."""
    retries_config: Dict[str, Union[str, int]] = {
        "max_attempts": int(os.getenv("AWS_MAX_ATTEMPTS", "5")),
    }
    mode: Optional[str] = os.getenv("AWS_RETRY_MODE")
    if mode:
        retries_config["mode"] = mode
    return botocore.config.Config(
        retries=retries_config,
        connect_timeout=10,
        max_pool_connections=10,
        user_agent_extra=f"qs_sdk_admin_console",
    )

sts_client = boto3.client("sts", config=default_botocore_config())
account_id = sts_client.get_caller_identity()["Account"]
aws_region = 'us-east-1'
args = getResolvedOptions(sys.argv, ['AWS_REGION'])
print('region', args['AWS_REGION'])
glue_aws_region = args['AWS_REGION']
qs_client = boto3.client('quicksight', config=default_botocore_config())
qs_local_client = boto3.client('quicksight', region_name=glue_aws_region, config=default_botocore_config())

def _list(
        func_name: str,
        attr_name: str,
        account_id: str,
        aws_region: str,
        **kwargs, ) -> List[Dict[str, Any]]:
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    func: Callable = getattr(qs_client, func_name)
    response = func(AwsAccountId=account_id, **kwargs)
    next_token: str = response.get("NextToken", None)
    result: List[Dict[str, Any]] = response[attr_name]
    while next_token is not None:
        response = func(AwsAccountId=account_id, NextToken=next_token, **kwargs)
        next_token = response.get("NextToken", None)
        result += response[attr_name]
    return result


def list_dashboards(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_dashboards",
        attr_name="DashboardSummaryList",
        account_id=account_id,
        aws_region=aws_region
    )


def list_analyses(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_analyses",
        attr_name="AnalysisSummaryList",
        account_id=account_id,
        aws_region=aws_region
    )


def list_themes(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_themes",
        attr_name="ThemeSummaryList",
        account_id=account_id,
        aws_region=aws_region
    )

def list_datasets(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_data_sets",
        attr_name="DataSetSummaries",
        account_id=account_id,
        aws_region=aws_region
    )


def list_datasources(
        account_id,
        aws_region
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_data_sources",
        attr_name="DataSources",
        account_id=account_id,
        aws_region=aws_region
    )


def list_ingestions(
        account_id,
        aws_region,
        DataSetId
) -> List[Dict[str, Any]]:
    return _list(
        func_name="list_ingestions",
        attr_name="Ingestions",
        account_id=account_id,
        aws_region=aws_region,
        DataSetId=DataSetId
    )


def describe_dashboard(account_id, dashboardid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_dashboard(
        AwsAccountId=account_id,
        DashboardId=dashboardid
    )
    return res


def describe_analysis(account_id, id, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_analysis(
        AwsAccountId=account_id,
        AnalysisId=id
    )
    return res


def describe_data_set(account_id, id, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_set(
        AwsAccountId=account_id,
        DataSetId=id
    )
    return res


def describe_data_source(account_id, id, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_source(
        AwsAccountId=account_id,
        DataSourceId=id
    )
    return res


def describe_dashboard_permissions(account_id, dashboardid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_dashboard_permissions(
        AwsAccountId=account_id,
        DashboardId=dashboardid
    )
    return res


def describe_analysis_permissions(account_id, aid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_analysis_permissions(
        AwsAccountId=account_id,
        AnalysisId=aid
    )
    return res


def describe_theme_permissions(account_id, aid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_theme_permissions(
        AwsAccountId=account_id,
        ThemeId=aid
    )
    return res


def describe_data_set_permissions(account_id, datasetid, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_set_permissions(
        AwsAccountId=account_id,
        DataSetId=datasetid
    )
    return res


def describe_data_source_permissions(account_id, DataSourceId, aws_region):
    qs_client = boto3.client('quicksight', region_name=aws_region, config=default_botocore_config())
    res = qs_client.describe_data_source_permissions(
        AwsAccountId=account_id,
        DataSourceId=DataSourceId
    )
    return res

if __name__ == "__main__":
    sts_client = boto3.client("sts", region_name=aws_region, config=default_botocore_config())
    account_id = sts_client.get_caller_identity()["Account"]

    # call s3 bucket
    s3 = boto3.resource('s3')
    bucketname = 'admin-console-new-' + account_id
    bucket = s3.Bucket(bucketname)

    key = 'monitoring/quicksight/datsets_info/datsets_info.csv'
    key2 = 'monitoring/quicksight/datsets_ingestion/datsets_ingestion.csv'
    key3 = 'monitoring/quicksight/data_dictionary/data_dictionary.csv'
    tmpdir = tempfile.mkdtemp()
    local_file_name = 'datsets_info.csv'
    local_file_name2 = 'datsets_ingestion.csv'
    local_file_name3 = 'data_dictionary.csv'
    path = os.path.join(tmpdir, local_file_name)
    path2 = os.path.join(tmpdir, local_file_name2)
    path3 = os.path.join(tmpdir, local_file_name3)

    access = []

    data_dictionary = []

    dashboards = list_dashboards(account_id, glue_aws_region)

    for dashboard in dashboards:
        dashboardid = dashboard['DashboardId']

        response = describe_dashboard(account_id, dashboardid, glue_aws_region)
        Dashboard = response['Dashboard']
        Name = Dashboard['Name']
        
        print(Dashboard)
        if 'SourceEntityArn' in Dashboard['Version']:
            SourceEntityArn = Dashboard['Version']['SourceEntityArn']
            SourceType = SourceEntityArn.split(":")[-1].split("/")[0]
            print(SourceType)
            if SourceType == 'analysis':
                Sourceid = SourceEntityArn.split("/")[-1]
                try:
                    Source = describe_analysis(account_id, Sourceid, glue_aws_region)
                    SourceName = Source['Analysis']['Name']
                except botocore.exceptions.ClientError as error:
                    if error.response['Error']['Code'] == 'ResourceNotFoundException':
                        print("Analysis ID: " + Sourceid + " not found/does not exist in your account. So, not able to retrieve the Source ID and Source Name.")
                        Sourceid = 'N/A'
                        SourceName = 'N/A'
                    else:
                        raise error
            else:
                print("Source Type is not in analysis. So, not able to retrieve the Source ID and Source Name.")
                Sourceid = 'N/A'
                SourceName = 'N/A'
        else:
            print("SourceEntityArn does not exists in Dashboard: " + Name +". So, not able to retrieve the Source ID and Source Name.")
            Sourceid = 'N/A'
            SourceName = 'N/A'

        DataSetArns = Dashboard['Version']['DataSetArns']
        for ds in DataSetArns:
            dsid = ds.split("/")
            dsid = dsid[-1]
            try:
                dataset = describe_data_set(account_id, dsid, glue_aws_region)
                dsname = dataset['DataSet']['Name']
                LastUpdatedTime = dataset['DataSet']['LastUpdatedTime']
                PhysicalTableMap = dataset['DataSet']['PhysicalTableMap']
                for sql in PhysicalTableMap:
                    sql = PhysicalTableMap[sql]
                    if 'RelationalTable' in sql:
                        DataSourceArn = sql['RelationalTable']['DataSourceArn']
                        DataSourceid = DataSourceArn.split("/")
                        DataSourceid = DataSourceid[-1]
                        try:
                            datasource = describe_data_source(account_id, DataSourceid, glue_aws_region)
                            datasourcename = datasource['DataSource']['Name']
                        except botocore.exceptions.ClientError as error:
                            if error.response['Error']['Code'] == 'ResourceNotFoundException':
                                print(f"Data source ID: " + DataSourceid + " not found/does not exist in your account. Setting datasourcename and DataSourceid to 'N/A'.")
                                DataSourceid = 'N/A'
                                datasourcename = 'N/A'
                            else:
                                raise error
                        if 'Catalog' in sql['RelationalTable']:
                            Catalog = sql['RelationalTable']['Catalog']
                        else:
                            Catalog = 'N/A'
                        Schema = sql['RelationalTable']['Schema']
                        sqlName = sql['RelationalTable']['Name']

                        access.append(
                            [glue_aws_region, Name, dashboardid, SourceName, Sourceid, dsname, dsid, LastUpdatedTime,
                             datasourcename, DataSourceid, Catalog, Schema, sqlName])

                    if 'CustomSql' in sql:
                        DataSourceArn = sql['CustomSql']['DataSourceArn']
                        DataSourceid = DataSourceArn.split("/")
                        DataSourceid = DataSourceid[-1]
                        try:
                            datasource = describe_data_source(account_id, DataSourceid, glue_aws_region)
                            datasourcename = datasource['DataSource']['Name']
                        except botocore.exceptions.ClientError as error:
                            if error.response['Error']['Code'] == 'ResourceNotFoundException':
                                print(f"Data source ID: " + DataSourceid + " not found/does not exist in your account. Setting datasourcename and DataSourceid to 'N/A'.")
                                DataSourceid = 'N/A'
                                datasourcename = 'N/A'
                            else:
                                raise error                        
                        SqlQuery = sql['CustomSql']['SqlQuery'].replace("\n", "").replace("\r", "").replace("\t", "")
                        sqlName = sql['CustomSql']['Name']

                        access.append(
                            [glue_aws_region, Name, dashboardid, SourceName, Sourceid, dsname, dsid, LastUpdatedTime,
                             datasourcename, DataSourceid, 'N/A', sqlName, SqlQuery])

            except botocore.exceptions.ClientError as error:
                if error.response['Error']['Code'] == 'ResourceNotFoundException':
                    print(f"Dataset ID: " + dsid + " not found/does not exist in your account. Skipping this dataset.")
                    continue     

            except Exception as e:
                if (str(e).find('flat file') != -1):
                    pass
                else:
                    raise e


    with open(path, 'w', newline='') as outfile:
        writer = csv.writer(outfile, delimiter='|')
        for line in access:
            writer.writerow(line)
    outfile.close()
    # upload file from tmp to s3 key

    bucket.upload_file(path, key)

    datasets = list_datasets(account_id, glue_aws_region)
    for item in datasets:
        try:
            dsid = item['DataSetId']
            datasetname = item['Name']
            dataset_details = describe_data_set(account_id, dsid, glue_aws_region)
            OutputColumns = dataset_details['DataSet']['OutputColumns']
            for column in OutputColumns:
                columnname = column['Name']
                columntype = column['Type']
                if 'Description' in column.keys():
                    columndesc = column['Description']
                else:
                    columndesc = None
                data_dictionary.append(
                    [datasetname, dsid, columnname, columntype, columndesc]
                )
        except botocore.exceptions.ClientError as error:
            if error.response['Error']['Code'] == 'ResourceNotFoundException':
                print(f"Dataset ID: " + dsid + " not found/does not exist in your account. Skipping this dataset.")
                continue
        
        except Exception as e:
            if (str(e).find('data set type is not supported') != -1):
                pass
            else:
                raise e

    print(data_dictionary)
    with open(path3, 'w', newline='') as outfile:
        writer = csv.writer(outfile, delimiter=',')
        for line in data_dictionary:
            writer.writerow(line)
    outfile.close()
    # upload file from tmp to s3 key
    bucket.upload_file(path3, key3)
