Skip to content

Commit

Permalink
Merge pull request #118 from rudderlabs/fix-klaviyoColumnsError
Browse files Browse the repository at this point in the history
fix: fixes klaviyo events columns error
  • Loading branch information
a-rampalli committed May 25, 2023
2 parents c756151 + c8a8bcd commit 7df69b0
Showing 1 changed file with 21 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import datetime
from abc import ABC, abstractmethod
import json
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
Expand Down Expand Up @@ -248,6 +249,25 @@ def path(self, **kwargs) -> str:
return "metrics"


FLATTEN_LEVELS: int = 2


def process_record(record):
processed_record = record

# Recursively traverse record dict and string-ify all json values in the 3rd level
def flatten_dict(rec, level):
for key, value in rec.items():
if isinstance(value, dict):
if level > FLATTEN_LEVELS:
rec[key] = json.dumps(value)
else:
flatten_dict(value, level + 1)

flatten_dict(processed_record, 1)
return processed_record


class Events(IncrementalKlaviyoStream):
"""Docs: https://developers.klaviyo.com/en/reference/metrics-timeline"""

Expand All @@ -269,7 +289,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
record["flow_message_id"] = flow_message_id
record["campaign_id"] = flow_message_id if not flow else None

yield record
yield process_record(record)


class Flows(ReverseIncrementalKlaviyoStream):
Expand Down

0 comments on commit 7df69b0

Please sign in to comment.