In [18]:
import boto3
# Data Exchange client for interactive use in this notebook (publish_to_dx builds its own).
dx = boto3.client(service_name='dataexchange', region_name='us-east-1')

In [19]:
# S3 client; no explicit region is passed here.
s3 = boto3.client(service_name='s3')

In [20]:
# AWS Marketplace Catalog client in the same region as the Data Exchange client.
mc = boto3.client(service_name='marketplace-catalog', region_name='us-east-1')

In [21]:
# Fetch the publishing configuration JSON object from S3.
obj = s3.get_object(Key='case_publish_to_dx.json', Bucket='trifacta-covid-trifactabucket-q1itzd5kh96')

In [22]:
import json
# Read the S3 body and decode it into the config dict (data_set_id, assets, bucket, product_id).
config = json.loads(obj['Body'].read())

In [23]:
# Display the loaded configuration for a quick sanity check before publishing.
config

{'data_set_id': 'f4257e11b4919ddfeef7f2526a870460',
 'assets': ['trifacta/queryResults/admin@trifacta.local/CASE_DATA.csv',
  'trifacta/queryResults/admin@trifacta.local/US_AGG.csv'],
 'bucket': 'trifacta-covid-trifactabucket-q1itzd5kh96',
 'product_id': 'prod-fpr76dnjiujpu'}

In [25]:
# Terminal failure states of the two polled APIs. Any other state that is not
# the success state is treated as "still running" and polled again.
_JOB_FAILED_STATES = ('ERROR', 'CANCELLED', 'TIMED_OUT')
_CHANGE_SET_FAILED_STATES = ('FAILED', 'CANCELLED')


def _utc_stamp():
    """Return the current UTC time formatted like '24 May 2020 11:08PM UTC'."""
    import datetime
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    return datetime.datetime.now(datetime.timezone.utc).strftime("%d %B %Y %I:%M%p UTC")


def _wait_for_job(dataexchange, job_id, poll_interval=0.5, timeout=None):
    """Poll a Data Exchange job until it reaches COMPLETED and return the final GetJob response.

    Raises RuntimeError if the job ends in ERROR/CANCELLED/TIMED_OUT, or if
    ``timeout`` seconds (when given) elapse before it completes.
    """
    import time
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        job = dataexchange.get_job(JobId=job_id)
        state = job['State']
        print('Job Status ' + state)
        if state == 'COMPLETED':
            return job
        if state in _JOB_FAILED_STATES:
            # 'Errors' may be absent for CANCELLED/TIMED_OUT; don't mask the failure with a KeyError.
            raise RuntimeError('JobId: {} failed with state {} and error:{}'.format(
                job_id, state, job.get('Errors', [])))
        if deadline is not None and time.monotonic() >= deadline:
            raise RuntimeError('JobId: {} still {} after {} seconds'.format(job_id, state, timeout))
        time.sleep(poll_interval)


def _wait_for_change_set(marketplace_catalog, changeset_id, poll_interval=1, timeout=None):
    """Poll a Marketplace Catalog change set until it reaches SUCCEEDED and return the final response.

    Raises RuntimeError if it ends in FAILED/CANCELLED, or if ``timeout``
    seconds (when given) elapse before it succeeds.
    """
    import time
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        change_set = marketplace_catalog.describe_change_set(
            Catalog='AWSMarketplace',
            ChangeSetId=changeset_id,
        )
        status = change_set['Status']
        print('Change Set Status ' + status)
        if status == 'SUCCEEDED':
            return change_set
        if status in _CHANGE_SET_FAILED_STATES:
            # FailureDescription is optional; fall back to the per-change details.
            reason = change_set.get('FailureDescription') or change_set.get('ChangeSet')
            raise RuntimeError('ChangeSetID: {} failed with state {} and error:\n{}'.format(
                changeset_id, status, reason))
        if deadline is not None and time.monotonic() >= deadline:
            raise RuntimeError('ChangeSetID: {} still {} after {} seconds'.format(
                changeset_id, status, timeout))
        time.sleep(poll_interval)


def _pick_data_set_arn(product_details, data_set_id):
    """Return the ARN of the product data set that corresponds to ``data_set_id``.

    ``product_details`` is the decoded ``Details`` JSON of a Marketplace
    Catalog entity. The entry whose ``DataSetArn`` ends in ``/<data_set_id>``
    is preferred. If no entry matches, the first listed data set is used,
    which was the previous behaviour.
    """
    data_sets = product_details.get('DataSets') or []
    if not data_sets:
        raise RuntimeError('Product has no data sets to add a revision to')
    suffix = '/' + data_set_id
    for data_set in data_sets:
        if data_set.get('DataSetArn', '').endswith(suffix):
            return data_set['DataSetArn']
    return data_sets[0]['DataSetArn']


def publish_to_dx(config, region=None, poll_timeout=None):
    """Import S3 assets into a new Data Exchange revision and add it to a Marketplace product.

    Steps:
      1. Create a revision on ``config['data_set_id']``.
      2. Import every key in ``config['assets']`` from ``config['bucket']``.
      3. Finalize the revision.
      4. Submit an ``AddRevisions`` change set for ``config['product_id']`` and
         wait for it to succeed.

    Args:
        config: dict with keys ``data_set_id``, ``assets`` (list of S3 keys),
            ``bucket`` and ``product_id``. An optional ``region`` key is honoured.
        region: AWS region for both clients. Defaults to ``config['region']``,
            or to ``'us-east-1'`` when that key is absent.
        poll_timeout: optional number of seconds to wait for the import job,
            and separately for the change set, before giving up. ``None``
            (the default) waits indefinitely.

    Returns:
        True once the change set has SUCCEEDED.

    Raises:
        ValueError: if ``config['assets']`` is empty (checked before any AWS call,
            so no orphaned revision is created).
        RuntimeError: if the import job or change set ends in a failure state
            or exceeds ``poll_timeout``.
    """
    import boto3, json
    region = region or config.get('region', 'us-east-1')
    data_set_id = config['data_set_id']
    product_id = config['product_id']
    bucket_name = config['bucket']
    assets = config['assets']
    if not assets:
        raise ValueError('config["assets"] must list at least one S3 key to import')

    dataexchange = boto3.client(service_name='dataexchange', region_name=region)
    marketplace_catalog = boto3.client(service_name='marketplace-catalog', region_name=region)

    # 1. Create a new, unfinalized revision on the configured data set.
    revision = dataexchange.create_revision(
        DataSetId=data_set_id,
        Comment='Revision created programmatically on ' + _utc_stamp(),
    )
    revision_id = revision['Id']
    revision_arn = revision['Arn']

    # 2. Import the S3 objects into that revision; the job must be started explicitly.
    job = dataexchange.create_job(
        Type='IMPORT_ASSETS_FROM_S3',
        Details={
            'ImportAssetsFromS3': {
                'DataSetId': data_set_id,
                'RevisionId': revision_id,
                'AssetSources': [{'Bucket': bucket_name, 'Key': asset} for asset in assets],
            }
        },
    )
    job_id = job['Id']
    dataexchange.start_job(JobId=job_id)
    _wait_for_job(dataexchange, job_id, timeout=poll_timeout)

    # 3. Finalize the revision so it can be added to the product.
    finalized_at = _utc_stamp()
    print(finalized_at)
    dataexchange.update_revision(
        DataSetId=data_set_id,
        RevisionId=revision_id,
        Finalized=True,
        Comment='Revision finalized programmatically on ' + finalized_at,
    )
    print('New dataset version created and finalized')

    # 4. Add the finalized revision to the existing product via a change set.
    entity = marketplace_catalog.describe_entity(Catalog='AWSMarketplace', EntityId=product_id)
    dataset_arn = _pick_data_set_arn(json.loads(entity['Details']), data_set_id)
    change_set = marketplace_catalog.start_change_set(
        Catalog='AWSMarketplace',
        ChangeSetName="Adding revision to my Product",
        ChangeSet=[
            {
                "ChangeType": "AddRevisions",
                "Entity": {
                    "Identifier": entity['EntityIdentifier'],
                    "Type": entity['EntityType'],
                },
                "Details": json.dumps({
                    "DataSetArn": dataset_arn,
                    "RevisionArns": [revision_arn],
                }),
            }
        ],
    )
    _wait_for_change_set(marketplace_catalog, change_set['ChangeSetId'], timeout=poll_timeout)

    print('Your data has been published successfully')
    return True

In [26]:
# Run the end-to-end publish using the configuration loaded above.
publish_to_dx(config=config)

Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status IN_PROGRESS
Job Status COMPLETED
24 May 2020 11:08PM UTC
New dataset version created and finalized
Change Set Status PREPARING
Change Set Status PREPARING
Change Set Status PREPARING
Change Set Status PREPARING
Change Set Status PREPARING
Change Set Status APPLYING
Change Set Status APPLYING
Change Set Status APPLYING
Change Set Status APPLYING
Change Set Status APPLYING
Change Set Status SUCCEEDED
Your data has been published successfully


True

In [27]:
# Re-display the configuration after publishing.
config

{'data_set_id': 'f4257e11b4919ddfeef7f2526a870460',
 'assets': ['trifacta/queryResults/admin@trifacta.local/CASE_DATA.csv',
  'trifacta/queryResults/admin@trifacta.local/US_AGG.csv'],
 'bucket': 'trifacta-covid-trifactabucket-q1itzd5kh96',
 'product_id': 'prod-fpr76dnjiujpu'}

In [29]:
# Write the same configuration back to the bucket as dx_job.json.
job_payload = json.dumps(config)
s3.put_object(Bucket='trifacta-covid-trifactabucket-q1itzd5kh96', Key='dx_job.json', Body=job_payload)

{'ResponseMetadata': {'RequestId': '96F692FCEE8D6A17',
  'HostId': 'dzCYD+Qr5YYIE5mB2KWlV0nmbOrYu9J/hmktAlYLxX/SSNSGYys7iljKPQNjAFlV8+ZizOPvhd8=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'dzCYD+Qr5YYIE5mB2KWlV0nmbOrYu9J/hmktAlYLxX/SSNSGYys7iljKPQNjAFlV8+ZizOPvhd8=',
   'x-amz-request-id': '96F692FCEE8D6A17',
   'date': 'Sun, 24 May 2020 23:27:31 GMT',
   'etag': '"73f94729cd14f51de67eaeda3e9856ef"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"73f94729cd14f51de67eaeda3e9856ef"'}

In [35]:
# SQS queue, presumably for publish requests (note: us-west-2, unlike the us-east-1 clients above).
queue_url = 'https://sqs.us-west-2.amazonaws.com/163305015547/dataexchange-publish'
sqs = boto3.client(service_name='sqs', region_name='us-west-2')

In [37]:
# Send a plain-text test message to the queue.
sqs.send_message(MessageBody='first sqs message', QueueUrl=queue_url)

{'MD5OfMessageBody': '8b803385021da7a42aeb4811513ac631',
 'MessageId': 'b99c8f1e-42d4-4138-ae4d-20ecd0304ae6',
 'ResponseMetadata': {'RequestId': '74327d55-64e4-55bf-afea-2a10a34091a0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '74327d55-64e4-55bf-afea-2a10a34091a0',
   'date': 'Mon, 25 May 2020 13:17:24 GMT',
   'content-type': 'text/xml',
   'content-length': '378'},
  'RetryAttempts': 0}}