Skip to content

Commit

Permalink
fix: fixes klaviyo events columns error
Browse files Browse the repository at this point in the history
  • Loading branch information
a-rampalli committed May 24, 2023
1 parent c756151 commit 541dffe
Showing 1 changed file with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import datetime
from abc import ABC, abstractmethod
import json
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import pendulum
Expand Down Expand Up @@ -247,6 +248,19 @@ class Metrics(KlaviyoStream):
def path(self, **kwargs) -> str:
return "metrics"

def processRecord(record):
flattenLevels = 2
processRecord = record
# Recursively traverse record dict and string-ify all json values in the 3rd level
def flattenDict(record, level):
for key, value in record.items():
if isinstance(value, dict):
if level > flattenLevels:
record[key] = json.dumps(value)
else:
flattenDict(value, level + 1)
flattenDict(processRecord, 1)
return processRecord

class Events(IncrementalKlaviyoStream):
"""Docs: https://developers.klaviyo.com/en/reference/metrics-timeline"""
Expand All @@ -269,7 +283,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
record["flow_message_id"] = flow_message_id
record["campaign_id"] = flow_message_id if not flow else None

yield record
yield processRecord(record)


class Flows(ReverseIncrementalKlaviyoStream):
Expand Down

0 comments on commit 541dffe

Please sign in to comment.