In [1]:
# NOTES
# Version numbers are never reused after deletion: e.g. if version 2 is deleted, the next version is still 3.
# Send the request body as json.dumps(data), i.e. as a JSON string; otherwise the request does not work.
# Pausing takes some time, and a journey version cannot be resumed unless its status is "Paused".

# Edge Cases:
## Weights for A/B Test do not add up to 100%
## Status must remain the same if in ("Running", "Draft", "Paused")
## Journey should be ignored if "Stopped"

# Requirements:
## Save logs in file
## Send logs to Karam
## Run journeys concurrently, since each journey update takes approx. 5 minutes.

In [73]:
import pandas as pd
import numpy as np
import requests
import re
import json
import time
from time import sleep
import datetime
from math import *
import sys
import functools
import warnings
warnings.filterwarnings("ignore")

import concurrent.futures
import multiprocessing
import time

# Logging

In [74]:
import logging

# Send every record (DEBUG and above) to the run's log file,
# formatted as "<timestamp>:<level>:<message>".
logging.basicConfig(
    format="%(asctime)s:%(levelname)s:%(message)s",
    level=logging.DEBUG,
    filename='ikea_journey_weights_update.log',
)

# Handle Errors

In [75]:
class RequestError(Exception):
    """Raised when a Journey Builder API request comes back with a non-success status.

    Attributes:
        response: the response object handed over by the code that raised the error.
        message: human-readable description; also used as ``str(error)``.
    """

    def __init__(self, response, message):
        self.response = response
        self.message = message
        super().__init__(message)

    def __reduce__(self):
        # Exception.args only holds `message`, so the default copy/pickle protocol
        # would rebuild the error by calling RequestError(message) and fail with a
        # TypeError. Rebuild it from both constructor arguments instead.
        return (self.__class__, (self.response, self.message), self.__dict__)

class StatusError(Exception):
    """Raised when a journey is not in the status required for the next step.

    Attributes:
        status: the status value that was observed.
        message: human-readable description; also used as ``str(error)``.
    """

    def __init__(self, status, message):
        self.status = status
        self.message = message
        super().__init__(message)

    def __reduce__(self):
        # Exception.args only holds `message`, so the default copy/pickle protocol
        # would rebuild the error by calling StatusError(message) and fail with a
        # TypeError. Rebuild it from both constructor arguments instead.
        return (self.__class__, (self.status, self.message), self.__dict__)

# Journey Builder API Credentials

In [76]:
# Load the API login parameters from the local config file.
# The file handle gets its own name so it is not confused with the parsed dict.
with open("config.json") as config_file:
    credentials = json.load(config_file)

# Module-level credentials read by JourneyBuilder.__init__
client_id = credentials["client_id"]
client_secret = credentials["client_secret"]

# Journey Builder Class

In [77]:
class JourneyBuilder:
    """Client for one named journey in the Salesforce Journey Builder REST API.

    An instance authenticates on construction and keeps:
        access_token: bearer token, or None if authentication failed.
        access_token_expiration: absolute ``time.time()`` timestamp after which the
            token must be refreshed (0.0 if authentication failed).
        rest_headers: headers sent with every REST call.

    Methods decorated with ``Decorators.refresh_token`` renew the token first when it
    has expired.
    """

    # NOTE(review): TLS certificate verification is disabled for every request, as in
    # the original code. This exposes the client secret and token to interception;
    # set to True (or a CA bundle path) once the environment allows it.
    verify_tls = False

    def __init__(self, journey_name):

        self.journey_name = journey_name

        # Authentication
        self.client_id = client_id
        self.client_secret = client_secret
        self.auth_url = "https://mc42bdlx7mz5h4np2xxvhsb4scvq.auth.marketingcloudapis.com/v2/token"
        self.auth_headers = {"content-type": "application/json"}
        self.auth_payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        self.rest_url = "https://mc42bdlx7mz5h4np2xxvhsb4scvq.rest.marketingcloudapis.com"

        # get_access_token never raises: on failure it returns (None, 0.0, {}), which
        # leaves the instance usable and makes the next decorated call retry.
        self.access_token, self.access_token_expiration, self.rest_headers = self.get_access_token()
        if self.access_token is None:
            print("Request for access token failed.")

    def get_access_token(self):
        """Request a fresh access token from the authentication endpoint.

        Returns:
            (access_token, access_token_expiration, rest_headers), where
            access_token_expiration is an absolute ``time.time()`` timestamp
            (request time + the ``expires_in`` value of the response), so that it can
            be compared against ``time.time()`` by ``Decorators.refresh_token``.
            On any failure the error is printed and ``(None, 0.0, {})`` is returned,
            so callers can always unpack three values.
        """
        requested_at = time.time()
        try:
            authentication_response = requests.post(
                url=self.auth_url,
                data=json.dumps(self.auth_payload),
                headers=self.auth_headers,
                verify=self.verify_tls)
            authentication_response.raise_for_status()
            token_payload = authentication_response.json()
            access_token = token_payload["access_token"]
            # expires_in is a lifetime, not a timestamp: convert it to an absolute expiry.
            access_token_expiration = requested_at + float(token_payload["expires_in"])
        except Exception as e:
            print(e)
            return None, 0.0, {}

        rest_headers = {"authorization": f"Bearer {access_token}"}
        return access_token, access_token_expiration, rest_headers

    class Decorators():
        @staticmethod
        def refresh_token(decorated):
            """Methods with this decorator will refresh the access token if it is expired before running."""
            @functools.wraps(decorated)
            def wrapper(api, *args, **kwargs):
                if time.time() > api.access_token_expiration:
                    api.access_token, api.access_token_expiration, api.rest_headers = api.get_access_token()
                return decorated(api, *args, **kwargs)
            return wrapper

    @staticmethod
    def _raise_unless_2xx(response, message):
        """Raise RequestError(response, message) unless the status code is in 200-299."""
        if not 200 <= response.status_code < 300:
            raise RequestError(response, message)

    @Decorators.refresh_token
    def get_metadata(self, *args):
        """Get journey metadata such as id, key, name, version, status etc..

        Args:
            *args: names of the metadata fields to return.

        Returns:
            A list with one value per requested field, in the order requested, taken
            from the first journey whose name equals ``self.journey_name``.

        Raises:
            RequestError: if the response status is not in the 200s.
            IndexError: if no journey with that name is in the response.
        """

        # Get all journeys for user
        # NOTE(review): only the journeys contained in this single response are
        # searched; confirm whether the endpoint paginates its results.
        response = requests.get(
            url=f"{self.rest_url}/interaction/v1/interactions/",
            headers=self.rest_headers,
            verify=self.verify_tls
        )
        self._raise_unless_2xx(response, "get_metadata: Response not in 200s.")

        # Return metadata for journey = journey_name
        matching_journeys = [journey for journey in json.loads(response.content)["items"]
                             if journey["name"] == self.journey_name]
        if not matching_journeys:
            raise IndexError(
                f"get_metadata: no journey named {self.journey_name!r} found.")
        return [matching_journeys[0][arg] for arg in args]

    @Decorators.refresh_token
    def change_status(self, action):
        """Change journey status with an action: Pause, Resume, Stop.

        Raises:
            RequestError: if the response status is not in the 200s.
        """

        journey_id, journey_version = self.get_metadata("id", "version")

        # The API needs to know where it has to perform the action: journey id + version.
        response = requests.post(
            url=f"{self.rest_url}/interaction/v1/interactions/{action}/{journey_id}?versionNumber={journey_version}",
            headers=self.rest_headers,
            verify=self.verify_tls
        )
        self._raise_unless_2xx(response, "change_status: Response not in 200s.")

    @Decorators.refresh_token
    def get_activities(self):
        """Get all journey activities with the journey id.

        Returns:
            The decoded JSON body of the response.

        Raises:
            RequestError: if the response status is not in the 200s.
        """

        journey_id = self.get_metadata("id")[0]

        # Request gets all activities within journey based on id
        response = requests.get(
            url=f"{self.rest_url}/interaction/v1/interactions/{journey_id}",
            headers=self.rest_headers,
            verify=self.verify_tls
        )
        self._raise_unless_2xx(response, "get_activities: Response not in 200s.")
        return json.loads(response.content)

    @Decorators.refresh_token
    def update_version(self, journey_version_updated):
        """Update journey version with updated activities. Status cannot be 'Published'.

        Args:
            journey_version_updated: JSON-serialisable journey definition to send.

        Raises:
            RequestError: if the response status is not in the 200s.
        """
        response = requests.put(
            url=f"{self.rest_url}/interaction/v1/interactions",
            headers=self.rest_headers,
            # The body must be sent as a JSON string, not as a dict.
            data=json.dumps(journey_version_updated),
            verify=self.verify_tls
        )
        self._raise_unless_2xx(response, "update_journey_version: Response not in 200s.")

"""
# create_version method in case I need it at some point.
    
    @Decorators.refreshToken
    def create_version(self, journey_id, journey_version, new_perc):
        # Update journey version
        journey_activities = self.get_activities_by_id(journey_id)
        new_version_journey_activities = self.update_journey_perc(journey_activities, new_perc, version="new")
        # Create new jounrey version
        r = requests.post(
            url=f'{self.rest_url}/interaction/v1/interactions',
            headers=self.rest_headers,
            data=json.dumps(new_version_journey_activities),
            verify=False
        )
        logging.info(f"Journey: {journey_id, journey_version+1} CREATED SUCCESSFULLY!")
        # Stop old version
        self.change_status(journey_id, journey_version, "stop")
        logging.info(f"Journey: {journey_id, journey_version} STOPPED SUCCESSFULLY!")
        """

# Update Journey Activities with New Weights

In [78]:
"""def update_journey_weights(journey_activities, journey_new_weights):
    
    # journey_new_weights = {"journey_name": "", "email_name": "", "new_weight": 00}
    
    # Iterate through the journey paths / emails
    for email_name in journey_new_weights["email_name"]:
        
        email_name_new_weight = journey_new_weights.loc[journey_new_weights["email_name"] == email_name]["new_weight"].values[0]
        # For each path, iterate through the journey activities
        # Find the activity whose name == path_name / email_name and get the activity key (e.g. "EMAILV2-1")
        # Iterate through the journey activities
        # Find outcome whose next value == activity key (e.g. Corresponding activity == "RANDOMSPLITV2-1")
        # Update that outcome ["arguments"]["percentage"] with "00" + ["metaData"]["label"] with "00%"
        for activity in journey_activities["activities"]:
             if activity["name"] == email_name:
                    key = activity["key"]
        for activity in journey_activities["activities"]:
            for outcome in activity["outcomes"]:
                if "next" in outcome:
                    if outcome["next"] == key:
                        outcome["arguments"]["percentage"] = str(email_name_new_weight)
                        outcome["metaData"]["label"] = str(email_name_new_weight) + "%"
    return journey_activities"""

'def update_journey_weights(journey_activities, journey_new_weights):\n    \n    # journey_new_weights = {"journey_name": "", "email_name": "", "new_weight": 00}\n    \n    # Iterate through the journey paths / emails\n    for email_name in journey_new_weights["email_name"]:\n        \n        email_name_new_weight = journey_new_weights.loc[journey_new_weights["email_name"] == email_name]["new_weight"].values[0]\n        # For each path, iterate through the journey activities\n        # Find the activity whose name == path_name / email_name and get the activity key (e.g. "EMAILV2-1")\n        # Iterate through the journey activities\n        # Find outcome whose next value == activity key (e.g. Corresponding activity == "RANDOMSPLITV2-1")\n        # Update that outcome ["arguments"]["percentage"] with "00" + ["metaData"]["label"] with "00%"\n        for activity in journey_activities["activities"]:\n             if activity["name"] == email_name:\n                    key = activity["ke

In [79]:
# Add condition: Do weights add up to 100%.
def filter_weights_not_adding_up_to_100_perc(journey_new_weights):
    """Keep only the rows whose A/B test weights add up to 100%.

    Weights are summed per (journey_name, split) group; "split" is the A/B test the
    row belongs to and must already be present as a column.

    Args:
        journey_new_weights: DataFrame with at least the columns "journey_name",
            "split", "email_name" and "new_weight". It is not modified.

    Returns:
        A new DataFrame holding the rows of groups that sum to 100, with an extra
        "total_weights" column containing the group sum.
    """
    # Work on a copy so the caller's frame does not silently gain a column.
    weights = journey_new_weights.copy()

    # Column "split" should be present, as it represents the A/B Test name to be grouped by
    weights["total_weights"] = (
        weights.groupby(["journey_name", "split"])["new_weight"].transform("sum")
    )

    # Compare with a small tolerance: float weights such as 33.3 + 33.3 + 33.4 may
    # not sum to exactly 100 in binary floating point.
    adds_up_to_100 = np.isclose(
        weights["total_weights"].to_numpy(dtype=float), 100, rtol=0, atol=1e-6)

    # Get path_names / email_names that do not match this criterion for log purposes
    email_names_not_adding_up_to_100_perc = weights.loc[~adds_up_to_100, "email_name"].to_list()
    if email_names_not_adding_up_to_100_perc:
        logging.debug(f"Email Names {email_names_not_adding_up_to_100_perc} NOT ADDING UP TO 100%.")

    # Filter dataframe keeping only weights adding up to 100% within their respective A/B Test
    return weights.loc[adds_up_to_100].copy()

def update_journey_activities(journey_activities, journey_new_weights):
    """Write the new A/B test weights into a journey's activity definition.

    For every email in ``journey_new_weights`` the function finds the activity with
    that name, then the activity (the split) that has an outcome pointing at it, and
    sets that outcome's percentage and label. Splits whose new weights do not add up
    to 100% are left unchanged.

    Args:
        journey_activities: dict with an "activities" list; each activity has "name",
            "key" and a list of "outcomes". It is updated in place.
        journey_new_weights: DataFrame with the columns "journey_name", "email_name"
            and "new_weight" for a single journey. It is not modified.

    Returns:
        The same ``journey_activities`` object, with the weights updated.
    """
    # journey_new_weights = {"journey_name": "", "email_name": "", "new_weight": 00}
    if journey_new_weights.empty:
        return journey_activities

    # Work on a copy so the caller's frame does not gain tracking columns.
    weights = journey_new_weights.copy()
    # Positional access: does not depend on the frame having an index label 0.
    journey_name = weights["journey_name"].iloc[0]
    activities = journey_activities["activities"]

    # activity name -> activity key (e.g. "EMAILV2-1"); the last activity with a name wins
    activity_key_by_name = {activity["name"]: activity["key"] for activity in activities}

    # outcome target key -> key of the activity owning that outcome (e.g. "RANDOMSPLITV2-1")
    split_key_by_target = {}
    for activity in activities:
        for outcome in activity.get("outcomes") or []:
            if "next" in outcome:
                split_key_by_target[outcome["next"]] = activity["key"]

    # Add 2 columns for tracking purposes: activity key & split ("" when not found).
    weights["activity_key"] = [
        activity_key_by_name.get(email_name, "") for email_name in weights["email_name"]]
    weights["split"] = [
        split_key_by_target.get(activity_key, "") for activity_key in weights["activity_key"]]

    for email_name, activity_key, split_key in zip(
            weights["email_name"], weights["activity_key"], weights["split"]):
        if activity_key:
            print(f"{journey_name}{email_name} Activity key found..")
        if split_key:
            print(f"{journey_name}{email_name} Split found..")

    emails_without_split = weights.loc[weights["split"] == "", "email_name"].to_list()
    if emails_without_split:
        logging.debug(f"Email Names {emails_without_split} NOT FOUND in journey {journey_name}.")

    # Filter activities whose outcome weights do not add up to 100%
    weights = filter_weights_not_adding_up_to_100_perc(weights)
    print(f"{journey_name} Weights filtered..")

    # Assign new weights
    for email_name, activity_key, split_key, new_weight in zip(
            weights["email_name"], weights["activity_key"],
            weights["split"], weights["new_weight"]):
        for activity in activities:
            if activity["key"] != split_key:
                continue
            for outcome in activity.get("outcomes") or []:
                if outcome.get("next") != activity_key:
                    continue
                # Update weights
                outcome["arguments"]["percentage"] = str(new_weight)
                outcome["metaData"]["label"] = str(new_weight) + "%"
                print(f"{journey_name}{email_name} Weights updated..")

    return journey_activities

# Reading Data

In [80]:
# Read journey new weights - "AbandonedCart Program"
new_weights = pd.read_csv("journey_builder_api_test.csv")
new_weights

Unnamed: 0,journey_name,email_name,new_weight,status
0,test_2810,test_2810_A,50,paused
1,test_2810,test_2810_B,50,paused
2,test_kwr_multilanguage_setup,EN_A,50,running
3,test_kwr_multilanguage_setup,EN_B,50,running
4,test_kwr_multilanguage_setup,CA_A,50,running
5,test_kwr_multilanguage_setup,CA_B,50,running
6,test_kwr_multilanguage_setup,ES_A,50,running
7,test_kwr_multilanguage_setup,ES_B,50,running
8,test_kwr_multilanguage_setup,Path4_A,50,running
9,test_kwr_multilanguage_setup,Path4_B,50,running


# Main Function

In [81]:
def _update_journey_weights(journey_name, pause_wait_seconds):
    """Apply the new weights to one journey, following the workflow its status requires."""

    print(f"{journey_name} Starting..")

    # Get journey new weights from file
    journey_new_weights = new_weights.loc[new_weights["journey_name"] == journey_name].reset_index()

    # Instantiate journey object
    journey_object = JourneyBuilder(journey_name)

    # Check the status first: it decides whether activities need to be fetched at all.
    journey_id, journey_version, journey_status = journey_object.get_metadata("id", "version", "status")

    # If status == "Published" then Pause / Update / wait / Resume: log
    # Journey cannot be updated if status == "Published"
    # Journey cannot be resumed if status != "Paused"
    if journey_status == "Published":

        print(f"{journey_name} First cond: Published..")

        # Pause journey
        journey_object.change_status("Pause")
        logging.debug(f"Journey {journey_id}, Version {journey_version}, Status {journey_status}, PAUSED SUCCESSFULLY")

        print(f"{journey_name} Paused Success..")

        # Fetch activities only after the pause request, because the status cannot be
        # "Published" in the activities that are sent back.
        journey_activities = journey_object.get_activities()
        # Update journey activities
        journey_activities_updated = update_journey_activities(journey_activities, journey_new_weights)
        # Update journey version in API
        journey_object.update_version(journey_activities_updated)
        logging.debug(f"Journey {journey_id}, Version {journey_version}, Status {journey_status}, UPDATED SUCCESSFULLY")
        print(f"{journey_name} Updated Success..")

        # Pausing is not immediate: give the journey time to reach "Paused".
        sleep(pause_wait_seconds)

        # Raise an error if status != "Paused"
        journey_status = journey_object.get_metadata("status")[0]
        if journey_status != "Paused":
            raise StatusError(
                journey_status, f"status = {journey_status}, should be Paused.")
        # Resume journey
        journey_object.change_status("Resume")
        logging.debug(f"Journey {journey_id}, Version {journey_version}, Status {journey_status}, RESUMED SUCCESSFULLY")
        print(f"{journey_name} Resumed Success..")

    # If status in ("Paused", "Draft") then Update / Do not publish: log
    elif journey_status in ("Paused", "Draft"):

        print(f"{journey_name} First cond: Paused/Draft..")

        # Get journey activities
        journey_activities = journey_object.get_activities()
        # Update journey activities
        journey_activities_updated = update_journey_activities(journey_activities, journey_new_weights)
        # Update journey version in API
        journey_object.update_version(journey_activities_updated)
        logging.debug(f"Journey {journey_id}, Version {journey_version}, Status {journey_status}, UPDATED SUCCESSFULLY")
        print(f"{journey_name} Updated Success..")

    # Any other status (e.g. "Stopped"): leave the journey alone
    else:
        logging.debug(f"Journey {journey_id}, Version {journey_version}, Status {journey_status}, IGNORED.")


def main_function(journey_name, pause_wait_seconds=300):
    """Update the A/B test weights of one journey according to its current status.

    - "Published": pause, update, wait ``pause_wait_seconds``, verify the journey is
      "Paused", then resume.
    - "Paused" / "Draft": update only; the status is left as it is.
    - anything else: the journey is ignored.

    Each step is written to the log. Any failure is logged with its traceback before
    being re-raised, so that errors raised inside a worker process also end up in the
    log file.

    Args:
        journey_name: name of the journey, as found in ``new_weights["journey_name"]``.
        pause_wait_seconds: how long to wait after the update before checking that the
            journey has reached "Paused" (default 300, as pausing takes approx. 5 minutes).

    Raises:
        StatusError: if a published journey is not "Paused" after the wait.
        RequestError: if one of the API requests fails.
    """
    try:
        _update_journey_weights(journey_name, pause_wait_seconds)
    except Exception:
        logging.exception(f"Journey {journey_name} FAILED.")
        raise
        

In [82]:
start = time.perf_counter()

# One worker process per journey, so the long pause/update/resume cycles overlap
# instead of running one after the other.
journey_names = list(new_weights["journey_name"].unique())
processes = []

for journey_name in journey_names:
    p = multiprocessing.Process(target=main_function, args=(journey_name,))
    p.start()
    processes.append(p)

for process in processes:
    process.join()

# A worker that ended with an exception has a non-zero exit code; report those
# journeys instead of letting the failure go unnoticed.
failed_journeys = [
    journey_name
    for journey_name, process in zip(journey_names, processes)
    if process.exitcode != 0
]
if failed_journeys:
    logging.error(f"Journeys {failed_journeys} FAILED.")
    print(f"failed journeys: {failed_journeys}")

end = time.perf_counter()

print(f"total time: {end-start}")

test_2810 Starting..
test_kwr_multilanguage_setup Starting..
test_2810 First cond: Paused/Draft..
test_2810test_2810_A Activity key found..
test_2810test_2810_A Split found..
test_2810test_2810_B Activity key found..
test_2810test_2810_B Split found..
test_2810 Weights filtered..
test_2810test_2810_A Weights updated..
test_2810test_2810_B Weights updated..
test_kwr_multilanguage_setup First cond: Published..
test_2810 Updated Success..
test_kwr_multilanguage_setup Paused Success..
test_kwr_multilanguage_setupEN_A Activity key found..
test_kwr_multilanguage_setupEN_A Split found..
test_kwr_multilanguage_setupEN_B Activity key found..
test_kwr_multilanguage_setupEN_B Split found..
test_kwr_multilanguage_setupCA_A Activity key found..
test_kwr_multilanguage_setupCA_A Split found..
test_kwr_multilanguage_setupCA_B Activity key found..
test_kwr_multilanguage_setupCA_B Split found..
test_kwr_multilanguage_setupES_A Activity key found..
test_kwr_multilanguage_setupES_A Split found..
test_kwr_