<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#The-Catalog-Service-API" data-toc-modified-id="The-Catalog-Service-API-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>The Catalog Service API</a></span></li><li><span><a href="#Importing-the-configuration-file" data-toc-modified-id="Importing-the-configuration-file-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Importing the configuration file</a></span></li><li><span><a href="#Catalog-Module" data-toc-modified-id="Catalog-Module-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Catalog Module</a></span></li><li><span><a href="#Get-Datasets" data-toc-modified-id="Get-Datasets-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Get Datasets</a></span></li><li><span><a href="#Get-batches" data-toc-modified-id="Get-batches-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Get batches</a></span></li><li><span><a href="#Get-Observable-Schema" data-toc-modified-id="Get-Observable-Schema-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Get Observable Schema</a></span><ul class="toc-item"><li><span><a href="#Observable-Schema-Manager" data-toc-modified-id="Observable-Schema-Manager-6.1"><span class="toc-item-num">6.1&nbsp;&nbsp;</span>Observable Schema Manager</a></span></li></ul></li><li><span><a href="#Extracting-Batch-data" data-toc-modified-id="Extracting-Batch-data-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Extracting Batch data</a></span></li></ul></div>

In [1]:
import aepp
from aepp import catalog

# The Catalog Service API

The Catalog API is a service that helps you explore the catalog of your data lake.\
With the Catalog API, you can access the datasets and the batches that have been ingested in your organization.\
Catalog is the system of record for data location and lineage within Adobe Experience Platform.\
Catalog Service does not contain the actual files or directories that hold the data. Instead, it holds the metadata and descriptions of those files and directories.

Catalog acts as a metadata store or "catalog" where you can find information about your data within Experience Platform.

Use Catalog to answer the following questions: 
* Where is my data located? 
* At what stage of processing is this data? 
* What systems or processes have acted on my data? 
* What errors occurred during processing? 
* If successful, how much data was processed?

# Importing the configuration file

The complete explanation of how to prepare the configuration file can be found in the first template of this series.\
You can also read the [getting started](https://github.com/adobe/aepp/blob/main/docs/getting-started.md) page of the aepp module on GitHub.

In [2]:
import aepp
prod = aepp.importConfigFile('myconfigFile.json',sandbox='prod',connectInstance=True)

# Catalog Module

Once you have loaded your configuration, you can instantiate the `Catalog` class by passing it through the `config` parameter.

In [3]:
from aepp import catalog

Each submodule has a class to instantiate in order to create the API connection with the service, in this case the Catalog API. The instantiation generates a token for the API connection and takes care of generating a new one when needed.\
It also connects you to the sandbox defined in the config file, or to the sandbox passed through the `sandbox` parameter when importing the config file (as done in this example).

In [4]:
myCatalog = catalog.Catalog(config=prod)

The class has several data attributes that can be useful to you:
* sandbox : the sandbox connected to this instance
* header : the request header, in case you want to copy it to another application (ex: POSTMAN)
* data : some lookup dictionaries, populated once you have run the `getDataSets()` method

In [5]:
myCatalog.sandbox

'prod'

# Get Datasets

One use case of the Catalog API is retrieving the list of datasets.\
You can do this with the `getDataSets` method.

In [7]:
mydatasets = myCatalog.getDataSets()

In [8]:
len(mydatasets)

155

In [10]:
type(mydatasets)

dict

As you can see, the Catalog API returns a dictionary in which each key is a dataset ID and each value is the object describing that dataset.\
This can be painful to handle, because you usually do not know the ID of the dataset you are looking for.\
For that reason, the `data` attribute is automatically populated when the `getDataSets` method is executed.

The `data` attribute contains 3 dictionaries:
* ids : maps each dataset name to its ID (key is the dataset name)
* schema_ref : maps each dataset name to its schema reference (key is the dataset name)
* table_names : maps each dataset name to its table name in Query Service (key is the dataset name)

If you know the name of a dataset, you can easily access its ID with the following selector:

In [15]:
myCatalog.data.ids['datanalyst 1']

'6059fd4fc52f8819484a7c1c'

The same approach works for `schema_ref` and `table_names`.
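The name-keyed lookups can be pictured as an inversion of the dictionary returned by `getDataSets`. The following is a minimal sketch, not aepp's actual implementation. It assumes that each dataset object carries a `name` key and a `schemaRef` dictionary with an `id` key; the helpers `build_lookups` and `find_ids` are hypothetical, and the schema ID in the sample is a placeholder.

```python
from typing import Any, Dict


def build_lookups(datasets: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, str]]:
    """Build name-keyed lookup tables from a {datasetId: datasetObject} dict.

    Assumption: each dataset object has a "name" and, optionally, a
    "schemaRef" dict with an "id" key.
    """
    ids: Dict[str, str] = {}
    schema_ref: Dict[str, str] = {}
    for dataset_id, obj in datasets.items():
        name = obj.get("name", dataset_id)  # fall back to the ID if unnamed
        ids[name] = dataset_id  # if two datasets share a name, the last one wins
        ref = obj.get("schemaRef", {}).get("id")
        if ref is not None:
            schema_ref[name] = ref
    return {"ids": ids, "schema_ref": schema_ref}


def find_ids(ids: Dict[str, str], fragment: str) -> Dict[str, str]:
    """Case-insensitive search for dataset names containing `fragment`."""
    fragment = fragment.lower()
    return {name: i for name, i in ids.items() if fragment in name.lower()}


# Illustrative input shaped like the getDataSets() result
sample = {
    "6059fd4fc52f8819484a7c1c": {
        "name": "datanalyst 1",
        "schemaRef": {"id": "https://ns.adobe.com/tenant/schemas/example"},  # placeholder
    },
}
lookups = build_lookups(sample)
print(lookups["ids"]["datanalyst 1"])  # 6059fd4fc52f8819484a7c1c
```

Because dataset names are not guaranteed to be unique, a name-keyed dictionary can silently hide duplicates; searching with a fragment of the name is a convenient way to spot them.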

# Get batches

Analysing your batches can provide important information about the data ingested into your organization.\
The number of batches returned is limited by default and, as for datasets, the result is a dictionary.\
If you want the batches of a specific dataset, you can use one of the parameters available on the `getBatches` method.

You can always inspect the available parameters of a method with the `help` function:

In [16]:
help(myCatalog.getBatches)

Help on method getBatches in module aepp.catalog:

getBatches(limit: int = 10, n_results: int = None, output: str = 'raw', **kwargs) -> Union[pandas.core.frame.DataFrame, dict] method of aepp.catalog.Catalog instance
    Retrieve a list of batches.
    Arguments:
        limit : Limit response to a specified positive number of objects. Ex. limit=10 (max = 100)
        n_results : OPTIONAL :  number of result you want to get in total. (will loop - "inf" to get as many as possible)
        output : OPTIONAL : Can be "raw" response (dict) or "dataframe".
    Possible kwargs:
        created : Filter by the Unix timestamp (in milliseconds) when this object was persisted.
        createdAfter : Exclusively filter records created after this timestamp. 
        createdBefore : Exclusively filter records created before this timestamp.
        start : Returns results from a specific offset of objects. This was previously called offset. (see next line)
            offset : Will offset to the nex

This prints the complete docstring written for this method.

To get only the batches that have been ingested into a specific dataset, sorted from the most recent, you can use the following call:

In [24]:
lastBatches = myCatalog.getBatches(dataSet="6059fd4fc52f8819484a7c1c",orderBy="desc:created")

In [25]:
len(lastBatches)

10

By default, the method retrieves 10 batches (if that many exist). You can increase the `limit` up to 100, which is the maximum number of batches returned per request. Beyond that, use the `n_results` parameter, which loops over several requests.
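Conceptually, looping with `n_results` means requesting pages of at most 100 objects and moving an offset (the `start` parameter listed in the docstring) until enough objects have been collected. The sketch below illustrates that loop against a hypothetical `fetch_page` stand-in rather than the real API; it is not aepp's code, and the fake backend exists only to make the example runnable.

```python
import math
from typing import Any, Callable, Dict


def fetch_all(fetch_page: Callable[[int, int], Dict[str, Any]],
              n_results: float = math.inf,
              limit: int = 100) -> Dict[str, Any]:
    """Collect objects page by page until n_results are gathered or pages run out.

    fetch_page(start, page_size) is a hypothetical stand-in for one API call
    returning a {id: object} dict with at most page_size entries from offset `start`.
    """
    limit = max(1, min(limit, 100))  # at most 100 objects per request
    results: Dict[str, Any] = {}
    start = 0
    while len(results) < n_results:
        remaining = n_results - len(results)
        # never ask for more than is still needed
        page_size = limit if math.isinf(remaining) else min(limit, int(remaining))
        page = fetch_page(start, page_size)
        results.update(page)
        start += len(page)
        if len(page) < page_size:
            break  # a short (or empty) page means nothing is left
    return results


# Illustrative fake backend holding 250 batch objects
_backend = {f"batch{i:03d}": {"status": "success"} for i in range(250)}
_keys = list(_backend)


def fake_page(start: int, page_size: int) -> Dict[str, Any]:
    return {k: _backend[k] for k in _keys[start:start + page_size]}


print(len(fetch_all(fake_page, n_results=150)))  # 150
print(len(fetch_all(fake_page)))                 # 250
```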

In [27]:
lastBatches[list(lastBatches.keys())[1]]

{'status': 'success',
 'tags': {'acp_stagePath': ['acp_foundation_stream/stage/01HP1Z0A9YZ733QCJN6AXCATHF'],
  'acp_sloPolicyName': ['live10Mb'],
  'aep/siphon/partitions/paths': [],
  'acp_finalized_time': ['1707316152942'],
  'acp_workflow': ['ValveWorkflow'],
  'numberOfDSFs': ['0'],
  'acp_requestType': ['service'],
  'acp_latencyTargetInMillis': ['300000'],
  'acp_dataSetViewId': ['6059fd4fc52f8819484a7c1d'],
  'acp_type': ['ingest'],
  'siphon/valve/stage/ingest': ['{"id":"41664670acb24a32abea8716db0eeb6c","status":"created","createdAt":1707315913577,"batchId":"01HP1Z0A9YZ733QCJN6AXCATHF","imsOrg":"CA590B1B5D783C2A0A495E47@AdobeOrg","bulkHead":"live","service":"platform.siphon.ingest","properties":{}}'],
  'siphon/valve/ingest/status': ['{"id":"41664670acb24a32abea8716db0eeb6c","status":"finished","createdAt":1707316151102,"batchId":"01HP1Z0A9YZ733QCJN6AXCATHF","imsOrg":"CA590B1B5D783C2A0A495E47@AdobeOrg","bulkHead":"live","output":"/acp_foundation_stream/stage/01HP1Z0A9YZ733QCJN

As you can see, you can extract different information from a batch, such as:
* status
* dataset ID (if you do not have it already)
* metrics (ex: record count)
* created time
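A minimal sketch of pulling these fields out of one batch object follows. The key names are assumptions to check against your own output: `status`, `created` (a Unix timestamp in milliseconds, as the `created` filter in the docstring suggests), `relatedObjects` (a list of `{"type", "id"}` entries pointing at the dataset) and `metrics`. The helper `summarize_batch` is hypothetical and the sample batch is illustrative.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def summarize_batch(batch_id: str, batch: Dict[str, Any]) -> Dict[str, Any]:
    """Extract status, creation time, dataset ID and metrics from one batch object.

    Assumed keys: "status", "created" (ms since epoch), "relatedObjects"
    (list of {"type", "id"} dicts) and "metrics" (dict).
    """
    created_ms: Optional[int] = batch.get("created")
    created = (datetime.fromtimestamp(created_ms / 1000, tz=timezone.utc)
               if created_ms is not None else None)
    # keep only the related objects that point at a dataset
    dataset_ids = [obj.get("id") for obj in batch.get("relatedObjects", [])
                   if obj.get("type") == "dataSet"]
    return {
        "batchId": batch_id,
        "status": batch.get("status"),
        "created": created,
        "dataSetId": dataset_ids[0] if dataset_ids else None,
        "metrics": batch.get("metrics", {}),
    }


# Illustrative batch object (not real output)
sample_batch = {
    "status": "success",
    "created": 1707316152942,
    "relatedObjects": [{"type": "dataSet", "id": "6059fd4fc52f8819484a7c1c"}],
}
summary = summarize_batch("01HP1Z0A9YZ733QCJN6AXCATHF", sample_batch)
print(summary["status"], summary["dataSetId"], summary["created"].isoformat())
```

Applied to every entry of `lastBatches`, the resulting list of dictionaries can be passed to `pandas.DataFrame` for a tabular overview.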

The status can take different values, such as:
* success
* failed
* staging
* loading
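To get a quick health check of the batches you retrieved, you can count them per status and isolate the failed ones. This is a minimal sketch working on the `{batchId: batchObject}` dictionary returned by `getBatches`; the helper `status_report` is hypothetical, and a batch without a `status` key is counted as `"unknown"`.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def status_report(batches: Dict[str, Dict[str, Any]]) -> Tuple[Counter, List[str]]:
    """Count batches per status and list the IDs of the failed ones."""
    counts = Counter(b.get("status", "unknown") for b in batches.values())
    failed = [bid for bid, b in batches.items() if b.get("status") == "failed"]
    return counts, failed


# Illustrative input
counts, failed = status_report({
    "a": {"status": "success"},
    "b": {"status": "failed"},
    "c": {"status": "success"},
})
print(counts.most_common(), failed)
```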

# Get Observable Schema

The observable schema describes the fields that have actually been populated in a dataset.\
The schema used by a dataset may contain fields that are never used.\
Getting the observable schema tells you which fields have been populated in this dataset.

In [28]:
danalyst_obs = myCatalog.getDataSetObservableSchema("6059fd4fc52f8819484a7c1c")

## Observable Schema Manager

Just as the `SchemaManager` lets you manipulate the schemas you have created, the `ObservableSchemaManager` lets you analyse the schema actually used by the dataset.\
It differs from the `SchemaManager` because this schema does not exist on its own: it is a subset of an existing schema, without references to Field Groups.
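Because an observable schema is a plain nested structure without Field Group references, its fields can be listed as dotted paths such as `environment.ipV4`. The sketch below shows one way to derive such paths, assuming a JSON-Schema-like layout where objects list their children under `properties` and every node has a `type`; that layout and the helper `flatten_properties` are assumptions, not the exact format returned by `getDataSetObservableSchema`.

```python
from typing import Any, Dict, List, Tuple


def flatten_properties(schema: Dict[str, Any], prefix: str = "") -> List[Tuple[str, str]]:
    """Walk a JSON-Schema-like tree and return (dotted path, type) pairs.

    Assumption: objects keep their children under "properties".
    Arrays are not expanded in this sketch.
    """
    rows: List[Tuple[str, str]] = []
    for name, node in schema.get("properties", {}).items():
        path = f"{prefix}.{name}" if prefix else name
        rows.append((path, node.get("type", "unknown")))
        if node.get("type") == "object":
            rows.extend(flatten_properties(node, path))  # recurse into sub-objects
    return rows


# Illustrative schema fragment
sample_schema = {
    "type": "object",
    "properties": {
        "environment": {
            "type": "object",
            "properties": {"ipV4": {"type": "string"}},
        },
        "eventType": {"type": "string"},
    },
}
for path, typ in flatten_properties(sample_schema):
    print(path, typ)
```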

You need to pass the result of the `getDataSetObservableSchema` method to the `ObservableSchemaManager`:

In [29]:
obs_datanalyt = catalog.ObservableSchemaManager(danalyst_obs)

In [34]:
obs_datanalyt.to_dataframe(description=True).sample(5)

|    | path | type | description |
|---:|:-----|:-----|:------------|
| 51 | _emeaconsulting.datanalyst.searchKeyword | string | |
| 4 | environment | object | Information about the surrounding situation th... |
| 21 | implementationDetails.version | string | The version identifier of the API, e.g h.18. |
| 16 | environment.ipV4 | string | The numerical label assigned to a device parti... |
| 57 | marketing.campaignGroup | string | Name of the campaign group where multiple camp... |


The output above shows what can be derived from the observable schema: you can compare it with the schema referenced by the dataset and see how many fields defined in your schema are not used.
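The comparison itself is a set difference between the paths of the full schema and the observed paths. This minimal sketch takes two lists of dotted paths; the observed ones can come from the `path` column of `to_dataframe()`, while how you export the full schema's paths is left as an assumption. The helpers `unused_fields` and `usage_ratio` are hypothetical and the inputs are illustrative.

```python
from typing import Iterable, List


def unused_fields(schema_paths: Iterable[str], observed_paths: Iterable[str]) -> List[str]:
    """Return the schema paths never observed in the dataset, sorted."""
    return sorted(set(schema_paths) - set(observed_paths))


def usage_ratio(schema_paths: Iterable[str], observed_paths: Iterable[str]) -> float:
    """Share of schema paths that have been observed at least once."""
    schema = set(schema_paths)
    if not schema:
        return 0.0
    return len(schema & set(observed_paths)) / len(schema)


# Illustrative inputs
schema_paths = ["environment", "environment.ipV4", "environment.ipV6", "marketing.campaignGroup"]
observed = ["environment", "environment.ipV4"]
print(unused_fields(schema_paths, observed))  # ['environment.ipV6', 'marketing.campaignGroup']
print(f"{usage_ratio(schema_paths, observed):.0%}")  # 50%
```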

**NOTE** : The observable schema is built at ingestion time. Any field that has been ingested at least once is marked as observable, even if it was populated only once.

# Extracting Batch data

If you want to download the data from a batch, you can combine the `Catalog` service with the `Data Access API`.\
We will not deep dive into the Data Access API, but it is required to read data directly from the data lake.

In [36]:
from aepp import dataaccess
myAccess = dataaccess.DataAccess(config=prod) ## passing the same configuration

You will need to know which batch ID you want to extract.

In [37]:
myAccess.getBatchFiles('01HP1Z0A9YZ733QCJN6AXCATHF') ## passing the batch ID

[{'dataSetFileId': '01HP1Z0A9YZ733QCJN6AXCATHF-DE1',
  'isValid': False,
  '_links': {'self': {'href': 'https://platform.adobe.io:443/data/foundation/export/files/01HP1Z0A9YZ733QCJN6AXCATHF-DE1'}}}]

You will receive a `dataSetFileId` that can be passed to the `getFiles` method.

In [38]:
myAccess.getFiles('01HP1Z0A9YZ733QCJN6AXCATHF-DE1')

{'data': [{'name': 'attempt-01HP24BE2D8WBV5P9SH3ECXA0H_part-00000-f3bcbf29-ba01-44a3-a3cb-0982b69b4dad-c000.snappy.parquet',
   'length': '17396',
   '_links': {'self': {'href': 'https://platform.adobe.io:443/data/foundation/export/files/01HP1Z0A9YZ733QCJN6AXCATHF-DE1?path=attempt-01HP24BE2D8WBV5P9SH3ECXA0H_part-00000-f3bcbf29-ba01-44a3-a3cb-0982b69b4dad-c000.snappy.parquet'}}}],
 '_page': {'limit': 100, 'count': 1}}

This returns, in the `name` field, the path of each file contained in the dataset file.\
You can pass this path to the same `getFiles` call, through the additional `path` parameter, to download that exact file.
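When a batch contains several files, it helps to extract the identifiers programmatically instead of copying them by hand. This minimal sketch parses responses shaped like the `getBatchFiles` and `getFiles` outputs shown above (a list of dictionaries with `dataSetFileId`, and a dictionary with a `data` list of `name` entries); the helper names are hypothetical, and keeping only `.parquet` files is a design choice for this example.

```python
from typing import Any, Dict, List


def dataset_file_ids(batch_files: List[Dict[str, Any]]) -> List[str]:
    """IDs from a getBatchFiles-style response."""
    return [f["dataSetFileId"] for f in batch_files if "dataSetFileId" in f]


def parquet_paths(files_response: Dict[str, Any]) -> List[str]:
    """File paths from a getFiles-style response, keeping only Parquet files."""
    return [f["name"] for f in files_response.get("data", [])
            if f.get("name", "").endswith(".parquet")]


# Responses copied (shortened) from the outputs above
batch_files = [{"dataSetFileId": "01HP1Z0A9YZ733QCJN6AXCATHF-DE1", "isValid": False}]
files = {
    "data": [{
        "name": "attempt-01HP24BE2D8WBV5P9SH3ECXA0H_part-00000-f3bcbf29-ba01-44a3-a3cb-0982b69b4dad-c000.snappy.parquet",
        "length": "17396",
    }],
    "_page": {"limit": 100, "count": 1},
}
print(dataset_file_ids(batch_files))
print(parquet_paths(files))
```

Each resulting file ID and path pair can then be passed to `myAccess.getFiles(file_id, path=...)`, as in the next cell.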

In [39]:
data = myAccess.getFiles('01HP1Z0A9YZ733QCJN6AXCATHF-DE1',path='attempt-01HP24BE2D8WBV5P9SH3ECXA0H_part-00000-f3bcbf29-ba01-44a3-a3cb-0982b69b4dad-c000.snappy.parquet')

Once the data has been received, you can load it with the `pandas` `read_parquet` function (which requires a Parquet engine such as `pyarrow` or `fastparquet`).

In [40]:
import pandas as pd

In [41]:
df = pd.read_parquet(data)

In [42]:
df.head()

|    | _emeaconsulting | device | environment | implementationDetails | placeContext | timestamp | identityMap | marketing | web | eventType | _id |
|---:|:---|:---|:---|:---|:---|:---|:---|:---|:---|:---|:---|
| 0 | {'datanalyst': {'pageSubCategory': 'xdm-the-ul... | {'screenOrientation': 'landscape', 'screenHeig... | {'type': 'browser', 'browserDetails': {'viewpo... | {'environment': 'browser', 'name': 'https://ns... | {'localTimezoneOffset': -330, 'localTime': 202... | 2024-02-07 14:13:45.717000+00:00 | [(ECID, [{'authenticatedState': 'ambiguous', '... | {'trackingCode': '', 'campaignGroup': None, 'c... | {'webPageDetails': {'URL': 'https://www.datana... | web.webpagedetails.pageViews\t | b9e4d850-dda4-49c7-af09-c0aa27e7210a-0 |


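You can then convert the data back into JSON-like records with `to_dict(orient='records')`.\
This gives you the same messages that were recorded during ingestion, as Python dictionaries (nested arrays come back as NumPy arrays).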
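`to_dict` returns Python objects rather than JSON text, and `json.dumps` raises a `TypeError` on NumPy arrays and on timestamps. A minimal sketch of producing an actual JSON string is to give `json.dumps` a `default` fallback; the helper names are hypothetical, and the duck-typed checks (`tolist` for array-like values, `isoformat` for datetime-like values) are a design choice that avoids importing NumPy or pandas.

```python
import json
from datetime import datetime
from typing import Any, Dict, List


def _to_jsonable(obj: Any) -> Any:
    """Fallback for json.dumps: convert array-like and datetime-like values."""
    if hasattr(obj, "tolist"):     # e.g. NumPy arrays and NumPy scalars
        return obj.tolist()
    if hasattr(obj, "isoformat"):  # datetime and pandas Timestamp
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def records_to_json(records: List[Dict[str, Any]], indent: int = 2) -> str:
    """Serialize records (e.g. from df.to_dict(orient='records')) to a JSON string."""
    return json.dumps(records, default=_to_jsonable, indent=indent)


# Illustrative record
print(records_to_json([{"timestamp": datetime(2024, 2, 7, 14, 13, 45), "eventType": "web.webpagedetails.pageViews"}]))
```

In the notebook, this would be called as `records_to_json(df.to_dict(orient='records'))`. Note that missing float values (`NaN`) are written as `NaN`, which is accepted by Python's `json` module but is not strict JSON.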

In [46]:
df.to_dict(orient='records')

[{'_emeaconsulting': {'datanalyst': {'pageSubCategory': 'xdm-the-ultimate-guide',
    'postLength': '24787',
    'searchKeyword': '',
    'pageCategory': 'adobe-experience-platform',
    'postTitle': 'XDM Schemas: The ultimate guide | Datanalyst',
    'returningVisitor': False}},
  'device': {'screenOrientation': 'landscape',
   'screenHeight': 720,
   'screenWidth': 1280},
  'environment': {'type': 'browser',
   'browserDetails': {'viewportHeight': 598,
    'viewportWidth': 1257,
    'userAgent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0',
    'userAgentClientHints': {'platform': 'Windows',
     'brands': array([{'brand': 'Not A(Brand', 'version': '99'},
            {'brand': 'Microsoft Edge', 'version': '121'},
            {'brand': 'Chromium', 'version': '121'}], dtype=object),
     'mobile': False}},
   'ipV4': '223.190.82.15',
   'ipV6': None},
  'implementationDetails': {'environment': 'browser',