Skip to content

Commit

Permalink
✨Source Facebook Marketing: add integration tests (airbytehq#35061)
Browse files Browse the repository at this point in the history
  • Loading branch information
askarpets authored and jatinyadav-cc committed Feb 26, 2024
1 parent 301d597 commit 41e849e
Show file tree
Hide file tree
Showing 23 changed files with 1,485 additions and 53 deletions.
Expand Up @@ -83,9 +83,9 @@ acceptance_tests:
bypass_reason: is changeable
empty_streams:
- name: "ads_insights_action_product_id"
bypass_reason: "Data not permanent"
bypass_reason: "Data cannot be seeded in the test account, integration tests added for the stream instead"
- name: "videos"
bypass_reason: "Cannot populate"
bypass_reason: "Data cannot be seeded in the test account, integration tests added for the stream instead"
timeout_seconds: 4800
expect_records:
path: "integration_tests/expected_records.jsonl"
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerImageTag: 1.3.2
dockerImageTag: 1.3.3
dockerRepository: airbyte/source-facebook-marketing
documentationUrl: https://docs.airbyte.com/integrations/sources/facebook-marketing
githubIssueLabel: source-facebook-marketing
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.3.2"
version = "1.3.3"
name = "source-facebook-marketing"
description = "Source implementation for Facebook Marketing."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,7 +17,7 @@ include = "source_facebook_marketing"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "==0.61.0"
airbyte-cdk = "==0.62.1"
facebook-business = "==17.0.0"
cached-property = "==1.5.2"
pendulum = "==2.1.2"
Expand Down
Expand Up @@ -6,7 +6,6 @@
import logging
from dataclasses import dataclass
from time import sleep
from typing import List

import backoff
import pendulum
Expand Down Expand Up @@ -35,7 +34,7 @@ class MyFacebookAdsApi(FacebookAdsApi):
# see `_should_restore_page_size` method docstring for more info.
# attribute to handle the reduced request limit
request_record_limit_is_reduced: bool = False
# attribute to save the status of last successfull call
# attribute to save the status of the last successful call
last_api_call_is_successful: bool = False

@dataclass
Expand Down Expand Up @@ -144,7 +143,7 @@ def _update_insights_throttle_limit(self, response: FacebookResponse):

def _should_restore_default_page_size(self, params):
"""
Track the state of the `request_record_limit_is_reduced` and `last_api_call_is_successfull`,
Track the state of the `request_record_limit_is_reduced` and `last_api_call_is_successful`,
based on the logic from `@backoff_policy` (common.py > `reduce_request_record_limit` and `revert_request_record_limit`)
"""
params = True if params else False
Expand Down
Expand Up @@ -14,7 +14,6 @@
DestinationSyncMode,
FailureType,
OAuthConfigSpecification,
SyncMode,
)
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
Expand Down
Expand Up @@ -134,7 +134,7 @@ def _get_current_throttle_value(self) -> float:
"""
Get current ads insights throttle value based on app id and account id.
It evaluated as minimum of those numbers cause when account id throttle
hit 100 it cool down very slowly (i.e. it still says 100 despite no jobs
hit 100 it cools down very slowly (i.e. it still says 100 despite no jobs
running and it capable serve new requests). Because of this behaviour
facebook throttle limit is not reliable metric to estimate async workload.
"""
Expand All @@ -144,7 +144,7 @@ def _get_current_throttle_value(self) -> float:

def _update_api_throttle_limit(self):
"""
Sends <ACCOUNT_ID>/insights GET request with no parameters so it would
Sends <ACCOUNT_ID>/insights GET request with no parameters, so it would
respond with empty list of data so api use "x-fb-ads-insights-throttle"
header to update current insights throttle limit.
"""
Expand Down
Expand Up @@ -44,8 +44,7 @@ class AdsInsights(FBMarketingIncrementalStream):
]

# Facebook store metrics maximum of 37 months old. Any time range that
# older that 37 months from current date would result in 400 Bad request
# HTTP response.
# older than 37 months from current date would result in 400 Bad request HTTP response.
# https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview
INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37)

Expand Down Expand Up @@ -106,8 +105,8 @@ def insights_lookback_period(self):
"""
Facebook freezes insight data 28 days after it was generated, which means that all data
from the past 28 days may have changed since we last emitted it, so we retrieve it again.
But in some cases users my have define their own lookback window, thats
why the value for `insights_lookback_window` is set throught config.
But in some cases users my have define their own lookback window, that's
why the value for `insights_lookback_window` is set through the config.
"""
return pendulum.duration(days=self._insights_lookback_window)

Expand Down Expand Up @@ -174,7 +173,7 @@ def state(self) -> MutableMapping[str, Any]:
def state(self, value: Mapping[str, Any]):
"""State setter, will ignore saved state if time_increment is different from previous."""
# if the time increment configured for this stream is different from the one in the previous state
# then the previous state object is invalid and we should start replicating data from scratch
# then the previous state object is invalid, and we should start replicating data from scratch
# to achieve this, we skip setting the state
transformed_state = self._transform_state_from_old_format(value, ["time_increment"])
if transformed_state.get("time_increment", 1) != self.time_increment:
Expand Down
Expand Up @@ -61,12 +61,12 @@ def reduce_request_record_limit(details):

def revert_request_record_limit(details):
"""
This method is triggered `on_success` after successfull retry,
This method is triggered `on_success` after successful retry,
sets the internal class flags to provide the logic to restore the previously reduced
`limit` param.
"""
# reference issue: https://github.com/airbytehq/airbyte/issues/25383
# set the flag to the api class that the last api call was ssuccessfull
# set the flag to the api class that the last api call was successful
details.get("args")[0].last_api_call_is_successfull = True
# set the flag to the api class that the `limit` param is restored
details.get("args")[0].request_record_limit_is_reduced = False
Expand Down
Expand Up @@ -36,7 +36,7 @@ def fetch_thumbnail_data_url(url: str) -> Optional[str]:


class AdCreatives(FBMarketingStream):
"""AdCreative is append only stream
"""AdCreative is append-only stream
doc: https://developers.facebook.com/docs/marketing-api/reference/ad-creative
"""

Expand All @@ -48,7 +48,7 @@ def __init__(self, fetch_thumbnail_images: bool = False, **kwargs):
self._fetch_thumbnail_images = fetch_thumbnail_images

def fields(self, **kwargs) -> List[str]:
"""Remove "thumbnail_data_url" field because it is computed field and it's not a field that we can request from Facebook"""
"""Remove "thumbnail_data_url" field because it is a computed field, and it's not a field that we can request from Facebook"""
if self._fields:
return self._fields

Expand Down Expand Up @@ -231,7 +231,7 @@ def list_objects(self, params: Mapping[str, Any], account_id: str) -> Iterable:
return [FBAdAccount(self._api.get_account(account_id=account_id).get_id()).api_get(fields=fields)]
except FacebookRequestError as e:
# This is a workaround for cases when account seem to have all the required permissions
# but despite of that is not allowed to get `owner` field. See (https://github.com/airbytehq/oncall/issues/3167)
# but despite that is not allowed to get `owner` field. See (https://github.com/airbytehq/oncall/issues/3167)
if e.api_error_code() == 200 and e.api_error_message() == "(#200) Requires business_management permission to manage the object":
fields.remove("owner")
return [FBAdAccount(self._api.get_account(account_id=account_id).get_id()).api_get(fields=fields)]
Expand Down
Expand Up @@ -11,7 +11,7 @@
logger = logging.getLogger("airbyte")

# Facebook store metrics maximum of 37 months old. Any time range that
# older that 37 months from current date would result in 400 Bad request
# older than 37 months from current date would result in 400 Bad request
# HTTP response.
# https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview
DATA_RETENTION_PERIOD = 37
Expand Down
@@ -0,0 +1,55 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from __future__ import annotations

from datetime import datetime
from typing import Any, List, MutableMapping

import pendulum

ACCESS_TOKEN = "test_access_token"
ACCOUNT_ID = "111111111111111"
CLIENT_ID = "test_client_id"
CLIENT_SECRET = "test_client_secret"
DATE_FORMAT = "%Y-%m-%d"
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
END_DATE = "2023-01-01T23:59:59Z"
NOW = pendulum.now(tz="utc")
START_DATE = "2023-01-01T00:00:00Z"


class ConfigBuilder:
def __init__(self) -> None:
self._config: MutableMapping[str, Any] = {
"account_ids": [ACCOUNT_ID],
"access_token": ACCESS_TOKEN,
"start_date": START_DATE,
"end_date": END_DATE,
"include_deleted": True,
"fetch_thumbnail_images": True,
"custom_insights": [],
"page_size": 100,
"insights_lookback_window": 28,
"insights_job_timeout": 60,
"action_breakdowns_allow_empty": True,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}

def with_account_ids(self, account_ids: List[str]) -> ConfigBuilder:
self._config["account_ids"] = account_ids
return self

def with_start_date(self, start_date: datetime) -> ConfigBuilder:
self._config["start_date"] = start_date.strftime(DATE_TIME_FORMAT)
return self

def with_end_date(self, end_date: datetime) -> ConfigBuilder:
self._config["end_date"] = end_date.strftime(DATE_TIME_FORMAT)
return self

def build(self) -> MutableMapping[str, Any]:
return self._config
@@ -0,0 +1,24 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from typing import Any, Dict
from urllib.parse import urlunparse

from airbyte_cdk.test.mock_http.request import HttpRequest
from airbyte_cdk.test.mock_http.response_builder import PaginationStrategy

NEXT_PAGE_TOKEN = "QVFIUlhOX3Rnbm5Y"


class FacebookMarketingPaginationStrategy(PaginationStrategy):
def __init__(self, request: HttpRequest, next_page_token: str) -> None:
self._next_page_token = next_page_token
self._next_page_url = f"{urlunparse(request._parsed_url)}&after={self._next_page_token}"

def update(self, response: Dict[str, Any]) -> None:
# set a constant value for paging.cursors.after so we know how the 'next' link is built
# https://developers.facebook.com/docs/graph-api/results
response["paging"]["cursors"]["after"] = self._next_page_token
response["paging"]["next"] = self._next_page_url
@@ -0,0 +1,83 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from __future__ import annotations

from typing import Any, List, Mapping, Optional, Union

from airbyte_cdk.test.mock_http.request import HttpRequest

from .config import ACCESS_TOKEN, ACCOUNT_ID


def get_account_request(account_id: Optional[str] = ACCOUNT_ID) -> RequestBuilder:
return RequestBuilder.get_account_endpoint(access_token=ACCESS_TOKEN, account_id=account_id)


class RequestBuilder:

@classmethod
def get_account_endpoint(cls, access_token: str, account_id: str) -> RequestBuilder:
return cls(access_token=access_token).with_account_id(account_id)

@classmethod
def get_videos_endpoint(cls, access_token: str, account_id: str) -> RequestBuilder:
return cls(access_token=access_token, resource="advideos").with_account_id(account_id)

@classmethod
def get_insights_endpoint(cls, access_token: str, account_id: str) -> RequestBuilder:
return cls(access_token=access_token, resource="insights").with_account_id(account_id)

@classmethod
def get_execute_batch_endpoint(cls, access_token: str) -> RequestBuilder:
return cls(access_token=access_token)

@classmethod
def get_insights_download_endpoint(cls, access_token: str, job_id: str) -> RequestBuilder:
return cls(access_token=access_token, resource=f"{job_id}/insights")

def __init__(self, access_token: str, resource: Optional[str] = "") -> None:
self._account_id = None
self._resource = resource
self._query_params = {"access_token": access_token}
self._body = None

def with_account_id(self, account_id: str) -> RequestBuilder:
self._account_id = account_id
return self

def with_limit(self, limit: int) -> RequestBuilder:
self._query_params["limit"] = limit
return self

def with_summary(self) -> RequestBuilder:
self._query_params["summary"] = "true"
return self

def with_fields(self, fields: List[str]) -> RequestBuilder:
self._query_params["fields"] = self._get_formatted_fields(fields)
return self

def with_next_page_token(self, next_page_token: str) -> RequestBuilder:
self._query_params["after"] = next_page_token
return self

def with_body(self, body: Union[str, bytes, Mapping[str, Any]]) -> RequestBuilder:
self._body = body
return self

def build(self) -> HttpRequest:
return HttpRequest(
url=f"https://graph.facebook.com/v17.0/{self._account_sub_path()}{self._resource}",
query_params=self._query_params,
body=self._body,
)

def _account_sub_path(self) -> str:
return f"act_{self._account_id}/" if self._account_id else ""

@staticmethod
def _get_formatted_fields(fields: List[str]) -> str:
return ",".join(fields)
@@ -0,0 +1,33 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


import json
from http import HTTPStatus
from typing import Any, List, Mapping, Optional, Union

from airbyte_cdk.test.mock_http import HttpResponse

from .config import ACCOUNT_ID


def build_response(
body: Union[Mapping[str, Any], List[Mapping[str, Any]]],
status_code: HTTPStatus,
headers: Optional[Mapping[str, str]] = None,
) -> HttpResponse:
headers = headers or {}
return HttpResponse(body=json.dumps(body), status_code=status_code.value, headers=headers)


def get_account_response(account_id: Optional[str] = ACCOUNT_ID) -> HttpResponse:
response = {"account_id": account_id, "id": f"act_{account_id}"}
return build_response(body=response, status_code=HTTPStatus.OK)


def error_reduce_amount_of_data_response() -> HttpResponse:
response = {
"error": {"code": 1, "message": "Please reduce the amount of data you're asking for, then retry your request"},
}
return build_response(body=response, status_code=HTTPStatus.INTERNAL_SERVER_ERROR)

0 comments on commit 41e849e

Please sign in to comment.