diff --git a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py index c0a7132fe24bc..7a97e535515a0 100644 --- a/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py +++ b/airbyte-integrations/connectors/source-klaviyo/source_klaviyo/streams.py @@ -4,6 +4,7 @@ import datetime from abc import ABC, abstractmethod +import json from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum @@ -248,6 +249,25 @@ def path(self, **kwargs) -> str: return "metrics" +FLATTEN_LEVELS: int = 2 + + +def process_record(record): + processed_record = record + + # Recursively traverse record dict and string-ify all json values in the 3rd level + def flatten_dict(rec, level): + for key, value in rec.items(): + if isinstance(value, dict): + if level > FLATTEN_LEVELS: + rec[key] = json.dumps(value) + else: + flatten_dict(value, level + 1) + + flatten_dict(processed_record, 1) + return processed_record + + class Events(IncrementalKlaviyoStream): """Docs: https://developers.klaviyo.com/en/reference/metrics-timeline""" @@ -269,7 +289,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp record["flow_message_id"] = flow_message_id record["campaign_id"] = flow_message_id if not flow else None - yield record + yield process_record(record) class Flows(ReverseIncrementalKlaviyoStream):