<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Setup" data-toc-modified-id="Setup-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Setup</a></span></li><li><span><a href="#Load-the-data" data-toc-modified-id="Load-the-data-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Load the data</a></span></li><li><span><a href="#Create-the-Schema-and-dataset" data-toc-modified-id="Create-the-Schema-and-dataset-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Create the Schema and dataset</a></span><ul class="toc-item"><li><span><a href="#Field-Group-Creation" data-toc-modified-id="Field-Group-Creation-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Field Group Creation</a></span></li><li><span><a href="#Schema-creation" data-toc-modified-id="Schema-creation-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>Schema creation</a></span></li><li><span><a href="#Dataset-Creation" data-toc-modified-id="Dataset-Creation-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>Dataset Creation</a></span></li></ul></li><li><span><a href="#Prepare-for-AEP-data-Ingestion" data-toc-modified-id="Prepare-for-AEP-data-Ingestion-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Prepare for AEP data Ingestion</a></span></li><li><span><a href="#Data-Ingestion" data-toc-modified-id="Data-Ingestion-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Data Ingestion</a></span></li><li><span><a href="#CJA-Connection-and-DataView" data-toc-modified-id="CJA-Connection-and-DataView-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>CJA Connection and DataView</a></span></li></ul></div>

In this notebook, we will use the power of aepp to load the data into the AEP data lake and then use that data in CJA as summary data.\
Due to time constraints, the data has been pre-loaded in CJA for this lab.

# Setup

In [1]:
import aepp
import pandas as pd
from datetime import datetime
import uuid, json

# Load the data

From the previous notebook, we have created a file named `'prediction_orders.2024.2025.csv'`.\
We will load this file with pandas and change some elements to better represent the information we want to load into AEP.

In [2]:
df = pd.read_csv('prediction_orders.2024.2025.csv')

In [3]:
df['Orders'].max()

157.7909723598208

We want the orders to be sent as integers rather than floats, since there is no such thing as half an order in real life.

In [4]:
df['Orders'] = df['Orders'].astype(int)
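Note that casting a float to an integer truncates the fractional part rather than rounding it, so the maximum of 157.79 shown above becomes 157. If rounding to the nearest whole order suits your use case better, round before casting. The sketch below shows the difference with plain Python on that maximum value; with pandas, the equivalent would be `df['Orders'].round().astype(int)`. This is an optional alternative, not what the rest of this notebook does.

```python
# Maximum value shown above for df['Orders'].max()
raw_max = 157.7909723598208

truncated = int(raw_max)       # same behaviour as astype(int): drop the fractional part
rounded = int(round(raw_max))  # round to the nearest integer first, then cast

print(truncated, rounded)  # 157 158
```

Python's `round` (like pandas/NumPy rounding) rounds exact halves to the nearest even number, which is usually acceptable for order counts.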

In [5]:
df.head()

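| Unnamed: 0 | Date | Orders | 7_day_rolling_avg | log_returns |
|---|---|---|---|---|
| 0 | 2024-01-01 | 107 | NaN | NaN |
| 1 | 2024-01-02 | 97 | NaN | -0.928198 |
| 2 | 2024-01-03 | 109 | NaN | 1.136766 |
| 3 | 2024-01-04 | 122 | NaN | 1.13038 |
| 4 | 2024-01-05 | 96 | NaN | -2.415115 |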


# Create the Schema and dataset

We will connect to the AEP API via `aepp` in order to create the different elements needed to load the data into AEP.

In [6]:
prod = aepp.importConfigFile('myconfig.json',sandbox='bighouse',connectInstance=True)

From aepp, we will need the following modules:
* `fieldgroupmanager`: to easily create a field group with the different fields
* `schemamanager`: to easily create a schema
* `catalog`: to create a dataset to load the data into
* `ingestion`: to create the batch that loads the data into the dataset
* `som`: to easily build the objects that we want to load via the `ingestion` module

In [7]:
from aepp import ingestion, catalog, schemamanager, fieldgroupmanager, som

## Field Group Creation

A field group is a set of fields that are usually combined together to describe a business-related view.\
Most of the time, users create field groups via the UI, but in our example we don't want to be pulled away from our notebook, so we will use aepp to generate the fields of the new field group that will receive the data.\
Field groups are always associated with a class in order to limit their footprint; in our case, we will use the class defining summary metrics.

In [8]:
predictionFieldGroup = fieldgroupmanager.FieldGroupManager(title='prediction',fg_class=['https://ns.adobe.com/xdm/classes/summarymetrics'],config=prod)

Checking the current representation

In [9]:
predictionFieldGroup.to_dict()

{'_acxpevangelist': {}}

We will create a node called `prediction` and add to it the predicted number of orders as an integer field (`predictedOrders`).

In [10]:
predictionFieldGroup.addField('_acxpevangelist.prediction',dataType='object',objectComponents={'predictedOrders':'integer'})

{'title': 'prediction',
 'meta:resourceType': 'mixins',
 'description': 'powered by aepp',
 'type': 'object',
 'definitions': {'customFields': {'type': 'object',
   'properties': {'_acxpevangelist': {'properties': {'prediction': {'type': 'object',
       'title': 'prediction',
       'description': '',
       'properties': {'predictedOrders': {'type': 'integer'}}}},
     'type': 'object'}}},
  'property': {'type': 'object',
   'properties': {'_acxpevangelist': {'properties': {}, 'type': 'object'}}}},
 'allOf': [{'$ref': '#/definitions/customFields', 'type': 'object'},
  {'$ref': '#/definitions/property', 'type': 'object'}],
 'meta:intendedToExtend': ['https://ns.adobe.com/xdm/classes/summarymetrics'],
 'meta:containerId': 'tenant',
 'meta:tenantNamespace': '_acxpevangelist'}

The output above shows the field group definition with the new element added.\
Below is a more readable representation:

In [11]:
predictionFieldGroup.to_dict()

{'_acxpevangelist': {'prediction': {'predictedOrders': 'integer'}}}

To create that field group in AEP, we can simply use the `createFieldGroup` method.

In [12]:
predictionFieldGroup.createFieldGroup()

{'$id': 'https://ns.adobe.com/acxpevangelist/mixins/a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c',
 'meta:altId': '_acxpevangelist.mixins.a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c',
 'meta:resourceType': 'mixins',
 'version': '1.0',
 'title': 'prediction',
 'type': 'object',
 'description': 'powered by aepp',
 'definitions': {'customFields': {'type': 'object',
   'properties': {'_acxpevangelist': {'properties': {'prediction': {'type': 'object',
       'title': 'prediction',
       'description': '',
       'properties': {'predictedOrders': {'type': 'integer',
         'minimum': -2147483648,
         'maximum': 2147483647,
         'meta:xdmType': 'int'}},
       'meta:xdmType': 'object'}},
     'type': 'object',
     'meta:xdmType': 'object'}},
   'meta:xdmType': 'object'},
  'property': {'type': 'object',
   'properties': {'_acxpevangelist': {'properties': {},
     'type': 'object',
     'meta:xdmType': 'object'}},
   'meta:xdmType': 'object'}},
 'allOf': [{'$ref': '#/defi

## Schema creation

A schema is composed of a class and field groups.\
We will use the field group created before to build the schema.\
The class used will be the `'https://ns.adobe.com/xdm/classes/summarymetrics'` class, which represents summary data.

In [13]:
cjaSummaryPrediction = schemamanager.SchemaManager(title='AdobeStore Predictions',schemaClass='https://ns.adobe.com/xdm/classes/summarymetrics',config=prod)

We simply need to add the previously created field group to that schema, using the `$id` returned when the field group was created.

In [14]:
cjaSummaryPrediction.addFieldGroup('https://ns.adobe.com/acxpevangelist/mixins/a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c')

{
  "$id": "https://ns.adobe.com/acxpevangelist/mixins/a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c",
  "meta:altId": "_acxpevangelist.mixins.a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c",
  "meta:resourceType": "mixins",
  "version": "1.0",
  "title": "prediction",
  "type": "object",
  "description": "powered by aepp",
  "definitions": {
    "customFields": {
      "type": "object",
      "properties": {
        "_acxpevangelist": {
          "properties": {
            "prediction": {
              "type": "object",
              "title": "prediction",
              "description": "",
              "properties": {
                "predictedOrders": {
                  "type": "integer",
                  "minimum": -2147483648,
                  "maximum": 2147483647,
                  "meta:xdmType": "int"
                }
              },
              "meta:xdmType": "object"
            }
          },
          "type": "object",
          "meta:xdmType": "object"
      

We can verify that the schema looks as expected via the `to_dict()` method.

In [15]:
cjaSummaryPrediction.to_dict()

{'_id': 'string',
 'eventType': 'string',
 'timestamp': 'string',
 '_acxpevangelist': {'prediction': {'predictedOrders': 'integer'}}}

We can then create the schema directly from this `SchemaManager` instance.

In [16]:
cjaSummaryPrediction.createSchema()

{'$id': 'https://ns.adobe.com/acxpevangelist/schemas/1d8f0ad11daa2fece55a19b76283272d0cf6980205103332',
 'meta:altId': '_acxpevangelist.schemas.1d8f0ad11daa2fece55a19b76283272d0cf6980205103332',
 'meta:resourceType': 'schemas',
 'version': '1.0',
 'title': 'AdobeStore Predictions',
 'type': 'object',
 'description': 'powered by aepp',
 'allOf': [{'$ref': 'https://ns.adobe.com/xdm/classes/summarymetrics',
   'type': 'object',
   'meta:xdmType': 'object'},
  {'$ref': 'https://ns.adobe.com/acxpevangelist/mixins/a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c',
   'type': 'object',
   'meta:xdmType': 'object'}],
 'refs': ['https://ns.adobe.com/acxpevangelist/mixins/a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c',
  'https://ns.adobe.com/xdm/classes/summarymetrics'],
 'required': ['xdm:timestamp'],
 'imsOrg': 'D0F83C645C5E1CC60A495CB3@AdobeOrg',
 'additionalInfo': {'numberOfIdentities': 0,
  'numberOfRelationShips': 0,
  'classTitle': 'XDM Summary Metrics',
  'hasRelationShip': False,
  

## Dataset Creation

Once you have created the schema, you can create a dataset based on it.\
As usual, this task can be done via the UI.\
In our notebook, we will keep using the API so that our flow is not interrupted.

The dataset can be created via the Catalog API, which is exposed through the `catalog` module.

In [17]:
cat = catalog.Catalog(config=prod)

To create a dataset, you can use the parameterized version of the `createDataSets` method, passing a `name` and a `schemaId`, in order to simplify your task.
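In this notebook, the field group `$id` and the schema `$id` are copied by hand from earlier outputs. A small helper could read them from the dictionaries returned by `createFieldGroup()` and `createSchema()` instead. The sketch below relies only on what those outputs show: a `$id` key and a `meta:resourceType` of `mixins` or `schemas`. The helper name `resource_id` is hypothetical and not part of aepp.

```python
def resource_id(response: dict, expected_type: str) -> str:
    """Hypothetical helper: return the `$id` of an XDM resource response.

    Checks `meta:resourceType` first, so a schema ID is not accidentally
    used where a field group ID is expected, and vice versa.
    """
    actual_type = response.get('meta:resourceType')
    if actual_type != expected_type:
        raise ValueError(f"expected {expected_type!r}, got {actual_type!r}")
    return response['$id']

# Subsets of the two responses shown earlier in this notebook
fg_response = {
    '$id': 'https://ns.adobe.com/acxpevangelist/mixins/a457146466ee18ccc1f109d8a128ae163d3560948a8fca1c',
    'meta:resourceType': 'mixins',
}
schema_response = {
    '$id': 'https://ns.adobe.com/acxpevangelist/schemas/1d8f0ad11daa2fece55a19b76283272d0cf6980205103332',
    'meta:resourceType': 'schemas',
}

fieldGroupId = resource_id(fg_response, 'mixins')
schemaId = resource_id(schema_response, 'schemas')
print(schemaId)
```

In practice you would capture the return value when calling the method, e.g. `schema_response = cjaSummaryPrediction.createSchema()`, and pass `resource_id(schema_response, 'schemas')` as `schemaId`.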

In [18]:
cat.createDataSets(name='Summary Data - CJA Summit 2025 lab123',
                   schemaId='https://ns.adobe.com/acxpevangelist/schemas/1d8f0ad11daa2fece55a19b76283272d0cf6980205103332')

['@/dataSets/67aba67f109a4f2aee087796']

Save that dataset ID, as it will be needed later for data ingestion.

In [19]:
datasetId = '67aba67f109a4f2aee087796'
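The ID above is copied from the `createDataSets` output, which is a list of strings of the form `'@/dataSets/<id>'`. A minimal sketch for extracting it programmatically, assuming a single dataset was created; `dataset_id_from_response` is a hypothetical helper.

```python
from typing import List

def dataset_id_from_response(response: List[str]) -> str:
    """Hypothetical helper: pull the dataset ID out of a createDataSets() response.

    Each entry looks like '@/dataSets/<id>', so the ID is the last path segment.
    """
    if not response:
        raise ValueError("empty response: no dataset was created")
    return response[0].rsplit('/', 1)[-1]

response = ['@/dataSets/67aba67f109a4f2aee087796']  # output shown above
datasetId = dataset_id_from_response(response)
print(datasetId)  # 67aba67f109a4f2aee087796
```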

# Prepare for AEP data Ingestion

With the data loaded previously from your CSV, you can iterate over the rows and build the payload you would like to send for each of them.\
Our tenant namespace is `_acxpevangelist`, so it is used as the root of the custom fields in each payload.\
The `Som` class is an easy way to build complex data structures without having to handle nested structures yourself.
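To illustrate what dot-notation assignment does, here is a plain-Python sketch that builds the same nested shape as one record of the payload. The `assign` function is a hypothetical stand-in written for this explanation; it is not the implementation of `Som.assign`.

```python
def assign(tree: dict, path: str, value) -> dict:
    """Hypothetical stand-in for Som.assign: set `value` at a dot-separated path."""
    *parents, leaf = path.split('.')
    node = tree
    for key in parents:
        node = node.setdefault(key, {})  # create intermediate dicts on demand
    node[leaf] = value
    return tree

record = {}
assign(record, 'eventType', 'predictionData')
assign(record, '_acxpevangelist.prediction.predictedOrders', 107)
print(record)
# {'eventType': 'predictionData', '_acxpevangelist': {'prediction': {'predictedOrders': 107}}}
```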

In [20]:
dataToLoad = [] ## the complete payload, one record per date
for index, row in df.iterrows():
    mysom = som.Som() ## initiate a new Som object for this row
    date = datetime.fromisoformat(row['Date']) ## parse the date string (YYYY-MM-DD)
    timestamp = date.replace(hour=6).timestamp() ## 06:00 local time, in seconds since epoch
    mysom.assign('_id',str(uuid.uuid4())) ## set a unique id
    mysom.assign('timestamp',int(timestamp*1000)) ## set the timestamp in milliseconds
    mysom.assign('eventType','predictionData')
    mysom.assign('_acxpevangelist.prediction.predictedOrders',row['Orders'])
    dataToLoad.append(mysom.to_dict())

Verifying the data

In [22]:
dataToLoad[:5]

[{'_id': '79d117c7-204f-4567-8cee-946e842c316d',
  'timestamp': 1704085200000,
  'eventType': 'predictionData',
  '_acxpevangelist': {'prediction': {'predictedOrders': 107}}},
 {'_id': '57faea69-fc0f-4c8a-ac77-d4604a86d00c',
  'timestamp': 1704171600000,
  'eventType': 'predictionData',
  '_acxpevangelist': {'prediction': {'predictedOrders': 97}}},
 {'_id': 'c45c24da-09a7-48a7-8fdd-06e0acdc6d34',
  'timestamp': 1704258000000,
  'eventType': 'predictionData',
  '_acxpevangelist': {'prediction': {'predictedOrders': 109}}},
 {'_id': '0315ae76-3791-46b7-85f3-b71770ddfe45',
  'timestamp': 1704344400000,
  'eventType': 'predictionData',
  '_acxpevangelist': {'prediction': {'predictedOrders': 122}}},
 {'_id': '5e29109b-50d2-4b7c-99a1-66c3d65524a2',
  'timestamp': 1704430800000,
  'eventType': 'predictionData',
  '_acxpevangelist': {'prediction': {'predictedOrders': 96}}}]

Verifying the timestamps of the first two records with the `time` module:

In [23]:
import time
print(time.ctime(1704085200000/1000)) ## time.ctime expects seconds, so divide the milliseconds by 1000
print(time.ctime(1704171600000/1000)) ## time.ctime expects seconds, so divide the milliseconds by 1000

Mon Jan  1 06:00:00 2024
Tue Jan  2 06:00:00 2024
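`time.ctime` displays local time, which is why it shows 06:00 here. Likewise, `datetime.timestamp()` treats a naive datetime such as `date.replace(hour=6)` as local time, so the epoch values depend on the machine running the notebook: 1704085200000 corresponds to 06:00 at UTC+1. If you want timestamps that do not depend on where the code runs, one option (an assumption about your needs, not what the notebook does) is to attach an explicit timezone. The sketch below compares a fixed UTC+1 offset with UTC; `to_epoch_ms` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

def to_epoch_ms(day: str, hour: int, tz: timezone) -> int:
    """Epoch milliseconds for `day` (YYYY-MM-DD) at `hour` in the explicit timezone `tz`."""
    dt = datetime.fromisoformat(day).replace(hour=hour, tzinfo=tz)
    return int(dt.timestamp() * 1000)

utc_plus_1 = timezone(timedelta(hours=1))

print(to_epoch_ms('2024-01-01', 6, utc_plus_1))    # 1704085200000, as in the output above
print(to_epoch_ms('2024-01-01', 6, timezone.utc))  # 1704088800000
```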


You can also save the data in JSON format if you wish to send it in a different way (directly in the UI or via a Source connector).

In [None]:
with open('predicted_data.json','w') as f:
    json.dump(dataToLoad,f,indent=2)

# Data Ingestion

As usual, we will stay in the notebook to complete our test by ingesting the data into AEP.\
We will use the `ingestion` module to load a small batch directly into AEP.

In [24]:
dataIngestion = ingestion.DataIngestion(config=prod)

Loading data into AEP via a batch takes 3 steps:
1. Create a batch to receive the data.
2. Upload the data into the batch (using the batch ID returned by the first step).
3. Close the batch by sending the `COMPLETE` operation.
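The three steps can be wrapped in one function so they always run in order with the same batch ID. The sketch below only calls the three `DataIngestion` methods used in this notebook (`createBatch`, `uploadSmallFile`, `uploadSmallFileFinish`) with the same arguments as the cells that follow; the function name `ingest_small_batch` and the default file name are hypothetical, and there is no error handling beyond what aepp itself does.

```python
def ingest_small_batch(data_ingestion, dataset_id: str, records: list,
                       file_name: str = 'data.json') -> str:
    """Create a batch, upload `records` into it, close it, and return the batch ID."""
    # Step 1: create a batch that accepts a multi-line JSON file
    batch = data_ingestion.createBatch(datasetId=dataset_id, format="json", multiline=True)
    batch_id = batch['id']
    # Step 2: upload the records into that batch
    data_ingestion.uploadSmallFile(batchId=batch_id, datasetId=dataset_id,
                                   filePath=file_name, data=records)
    # Step 3: signal that the batch is complete
    data_ingestion.uploadSmallFileFinish(batchId=batch_id)
    return batch_id
```

With the objects from this notebook, it would be called as `batchId = ingest_small_batch(dataIngestion, datasetId, dataToLoad)`.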

In [25]:
mybatch = dataIngestion.createBatch(datasetId=datasetId,format="json",multiline=True)

In [26]:
mybatch

{'id': '01JKV939NTGF1NXKPEDG7AYMSB',
 'imsOrg': 'D0F83C645C5E1CC60A495CB3@AdobeOrg',
 'updated': 1739303528301,
 'status': 'loading',
 'created': 1739303528301,
 'relatedObjects': [{'type': 'dataSet', 'id': '67aba67f109a4f2aee087796'}],
 'version': '1.0.0',
 'tags': {'acp_stagePath': ['acp_foundation_push/stage/01JKV939NTGF1NXKPEDG7AYMSB'],
  'acp_requestType': ['user'],
  'acp_dataSetViewId': ['67aba681109a4f2aee087798'],
  'acp_type': ['ingest'],
  'acp_producer': ['fbb6f6a5024e45d08b5f346a3c9a540b',
   'aep/siphon/bi/uploadMode::']},
 'createdUser': '231A1E846763328F0A495FB7@techacct.adobe.com',
 'updatedUser': '231A1E846763328F0A495FB7@techacct.adobe.com',
 'externalId': '01JKV939NTGF1NXKPEDG7AYMSB',
 'createdClient': 'acp_foundation_push',
 'inputFormat': {'format': 'json', 'isMultiLineJson': True}}

In [27]:
res = dataIngestion.uploadSmallFile(batchId=mybatch['id'],datasetId=datasetId,filePath='data.prediction.json',data=dataToLoad)
res

{}

No error is returned, so we can close the batch.

In [28]:
dataIngestion.uploadSmallFileFinish(batchId=mybatch['id'])

{}

Once your batch is closed, you can monitor its ingestion status via the Catalog API.\
The status will change to `success` if everything went fine.
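Ingestion is asynchronous, so the status may not be final right after the batch is closed. A simple polling loop can wait for it. The sketch assumes the response shape shown in the output that follows (a dict keyed by batch ID with a `status` field) and treats `success` and `failed` as final statuses; that set, the function name `wait_for_batch`, and the timing defaults are assumptions. In this notebook, `get_batch` would be `cat.getBatch`.

```python
import time

FINAL_STATUSES = {'success', 'failed'}  # assumption: statuses that end polling

def wait_for_batch(get_batch, batch_id: str, timeout: float = 600,
                   interval: float = 15) -> str:
    """Poll `get_batch(batch_id)` until a final status is reached or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_batch(batch_id)[batch_id]['status']
        if status in FINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"batch {batch_id} still {status!r} after {timeout}s")
        time.sleep(interval)
```

Usage: `wait_for_batch(cat.getBatch, mybatch['id'])`.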

In [31]:
cat.getBatch(mybatch['id'])

{'01JKV939NTGF1NXKPEDG7AYMSB': {'status': 'success',
  'tags': {'acp_stagePath': ['acp_foundation_push/stage/01JKV939NTGF1NXKPEDG7AYMSB'],
   'acp_sloPolicyName': ['live10Mb'],
   'aep/siphon/partitions/paths': [],
   'acp_finalized_time': ['1739303731036'],
   'acp_workflow': ['ValveWorkflow'],
   'numberOfDSFs': ['0'],
   'acp_requestType': ['user'],
   'acp_latencyTargetInMillis': ['300000'],
   'acp_dataSetViewId': ['67aba681109a4f2aee087798'],
   'acp_type': ['ingest'],
   'siphon/valve/stage/ingest': ['{"id":"3391fce4bc41404bb9cd7de4e993003a","status":"created","createdAt":1739303632025,"batchId":"01JKV939NTGF1NXKPEDG7AYMSB","imsOrg":"D0F83C645C5E1CC60A495CB3@AdobeOrg","bulkHead":"live","service":"platform.siphon.ingest","properties":{}}'],
   'siphon/valve/ingest/status': ['{"id":"3391fce4bc41404bb9cd7de4e993003a","status":"finished","createdAt":1739303728993,"batchId":"01JKV939NTGF1NXKPEDG7AYMSB","imsOrg":"D0F83C645C5E1CC60A495CB3@AdobeOrg","bulkHead":"live","output":"/acp_foun

# CJA Connection and DataView

At the time of writing, once the data has been ingested into AEP, CJA provides neither an API to add this new dataset to an existing connection nor a way to programmatically update the Data View with the new components.\
These operations were therefore done manually via the UI prior to this lab.\
I hope you could still see the possibilities for automation and improvement that this lab brings to your workflow.

Do not forget to vote for the lab, and to support the `cjapy` and `aepp` repositories in any way you can; both are open-source projects.