Skip to content
This repository has been archived by the owner on May 1, 2024. It is now read-only.

Commit

Permalink
Add Affiliate Window imports to data warehouse (#735)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmedx committed Jun 19, 2019
1 parent 769cb2e commit 91d5aee
Show file tree
Hide file tree
Showing 3 changed files with 365 additions and 0 deletions.
264 changes: 264 additions & 0 deletions edx/analytics/tasks/warehouse/financial/affiliate_window.py
@@ -0,0 +1,264 @@
"""
Tasks to support pulling Affiliate Window reports from their REST API to the data warehouse.
"""
import csv
import datetime
import json
import logging
import os

import luigi
import requests
from luigi import date_interval

from edx.analytics.tasks.common.pathutil import PathSetTask
from edx.analytics.tasks.util.hive import WarehouseMixin
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
from edx.analytics.tasks.util.retry import retry
from edx.analytics.tasks.util.url import get_target_from_url, url_path_join

logger = logging.getLogger(__name__)


DEFAULT_RETRY_STATUS_CODES = (
requests.codes.internal_server_error, # 500
requests.codes.request_timeout, # 408
requests.codes.too_many_requests, # 429
requests.codes.service_unavailable, # 503
requests.codes.gateway_timeout # 504
)
DEFAULT_TIMEOUT_SECONDS = 300


class AffiliateWindowTaskMixin(OverwriteOutputMixin):
"""
The parameters needed to run Affiliate Window reports.
"""

output_root = luigi.Parameter(
description='The parent folder to write the Affiliate Window report data to.',
)
run_date = luigi.DateParameter(
default=datetime.datetime.utcnow().date(),
description='The date to generate a report for. Default is today, UTC.',
)
advertiser_id = luigi.Parameter(
config_path={'section': 'affiliate_window', 'name': 'advertiser_id'},
description='The Affiliate Window advertiser id, usually a number.',
)
api_token = luigi.Parameter(
config_path={'section': 'affiliate_window', 'name': 'api_token'},
description='The Affiliate Window OAuth2 token.',
)
host = luigi.Parameter(
config_path={'section': 'affiliate_window', 'name': 'host'},
description='The Affiliate Window API host name.',
)
interval_start = luigi.DateParameter(
config_path={'section': 'affiliate_window', 'name': 'interval_start'},
description='The earliest date we can pull Affiliate Window reports from.',
)


class IntervalPullFromAffiliateWindowTask(AffiliateWindowTaskMixin, WarehouseMixin, luigi.WrapperTask):
"""
Determines a set of dates to pull, and requires them.
"""
interval_end = luigi.DateParameter(
default=datetime.datetime.utcnow().date(),
significant=False,
description='Default is today, UTC.',
)

# Overwrite parameter definition to make it optional.
output_root = luigi.Parameter(
default=None,
description='URL of location to write output.',
)

def __init__(self, *args, **kwargs):
super(IntervalPullFromAffiliateWindowTask, self).__init__(*args, **kwargs)

# Provide default for output_root at this level.
if self.output_root is None:
self.output_root = url_path_join(self.warehouse_path, 'fees', 'affiliate_window')

path = self.output_root
file_pattern = '*affiliate_window.tsv'
path_targets = PathSetTask([path], include=[file_pattern], include_zero_length=True).output()

if path_targets:
paths = list(set([os.path.dirname(target.path) for target in path_targets]))
dates = [path.rsplit('/', 2)[-1] for path in paths]
latest_date = sorted(dates)[-1]
latest_completion_date = datetime.datetime.strptime(latest_date, "dt=%Y-%m-%d").date()
self.interval_start = latest_completion_date + datetime.timedelta(days=1)
print("Found previous reports to {}".format(latest_date))
else:
# If this is the first run, start from the beginning
print("Couldn't find last completed date, defaulting to start date: {}".format(self.interval_start))

self.run_interval = date_interval.Custom(self.interval_start, self.interval_end)
print("Running reports from interval {}".format(self.run_interval))

def requires(self):
"""
Internal method to actually calculate required tasks once.
"""
for run_date in self.run_interval:
print(run_date)
yield DailyProcessFromAffiliateWindowTask(
output_root=self.output_root,
run_date=run_date,
advertiser_id=self.advertiser_id,
api_token=self.api_token,
host=self.host,
interval_start=self.interval_start
)

def output(self):
return [task.output() for task in self.requires()]


def should_retry(error):
"""
Retry the request if the response status code is in the set of status codes that are retryable.
"""
error_response = getattr(error, 'response', None)
if error_response is None:
return False

return error_response.status_code in DEFAULT_RETRY_STATUS_CODES


class DailyPullFromAffiliateWindowTask(AffiliateWindowTaskMixin, luigi.Task):
"""
A task that pulls from the Affiliate Window REST API and writes to a file.
Pulls are made for only a single day.
"""
query_date = luigi.DateParameter(
default=datetime.date.today(),
description='Default is today.',
)

REPORT_NAME = 'AffiliateWindowReport'
REPORT_FORMAT = 'json'

def requires(self):
pass

@retry(should_retry=should_retry, timeout=DEFAULT_TIMEOUT_SECONDS)
def fetch_report(self):
params = (
('startDate', self.query_date.strftime('%Y-%m-%dT00:00:00')),
('endDate', self.query_date.strftime('%Y-%m-%dT23:59:59')),
('timezone', 'UTC'),
('accessToken', self.api_token),
)
url = 'https://{}/advertisers/{}/transactions/'.format(self.host, self.advertiser_id)

response = requests.get(url, params=params)
response.raise_for_status()

return response.json()

def run(self):
print("Fetching report from Affiliate Window for {}.".format(self.query_date))
transactions = self.fetch_report()

# if there are no transactions in response something is wrong.
if not transactions:
raise Exception('No transactions to process.')

with self.output().open('w') as output_file:
json.dump(transactions, output_file)

def output(self):
"""
Output is in the form {output_root}/affiliate_window/{CCYY-mm}/affiliate_window_{CCYYmmdd}.json
"""
month_year_string = self.query_date.strftime('%Y-%m') # pylint: disable=no-member
date_string = self.query_date.strftime('%Y%m%d') # pylint: disable=no-member
filename = "affiliate_window_{date_string}.{report_format}".format(
date_string=date_string,
report_format=self.REPORT_FORMAT,
)
url_with_filename = url_path_join(self.output_root, month_year_string, filename)
return get_target_from_url(url_with_filename)


class DailyProcessFromAffiliateWindowTask(AffiliateWindowTaskMixin, luigi.Task):
"""
A task that reads a local file generated from a daily Affiliate Window pull, and writes to a TSV file.
The output file should be readable by Hive.
"""
run_date = luigi.DateParameter(
default=datetime.date.today(),
description='Date to fetch Affiliate Window report. Default is today.',
)
output_root = luigi.Parameter(
description='URL of location to write output.',
)

def requires(self):
args = {
'query_date': self.run_date,
'output_root': self.output_root,
'advertiser_id': self.advertiser_id,
'api_token': self.api_token,
'host': self.host,
'interval_start': self.interval_start,
}
return DailyPullFromAffiliateWindowTask(**args)

def run(self):
print("Processing Affiliate Window report for {}".format(self.run_date))
with self.input().open('r') as input_file:
reader = json.load(input_file)
with self.output().open('w') as output_file:
writer = csv.writer(output_file, delimiter="\t")
for row in reader:
# Break out commonly used fields, put entire row blob into last column
result = [
row['id'],
row['url'],
row['publisherId'],
row['commissionSharingPublisherId'],
row['siteName'],
row['commissionStatus'],
row['commissionAmount']['amount'],
row['saleAmount']['amount'],
row['customerCountry'],
row['clickDate'],
row['transactionDate'],
row['validationDate'],
row['type'],
row['declineReason'],
row['voucherCodeUsed'],
row['voucherCode'],
row['amended'],
row['amendReason'],
row['oldSaleAmount']['amount'] if row['oldSaleAmount'] else None,
row['oldCommissionAmount']['amount'] if row['oldCommissionAmount'] else None,
row['publisherUrl'],
row['orderRef'],
row['paidToPublisher'],
row['paymentId'],
row['transactionQueryId'],
row['originalSaleAmount']['amount'] if row['originalSaleAmount'] else None,
json.dumps(row)
]

result = [col if col is not None else '\N' for col in result]
writer.writerow(result)

def output(self):
"""
The form is {output_root}/fees/affiliate_window/dt={CCYY-mm-dd}/affiliate_window.tsv
"""
date_string = self.run_date.strftime('%Y-%m-%d') # pylint: disable=no-member
filename = "affiliate_window.tsv"
url_with_filename = url_path_join(self.output_root, "dt=" + date_string, filename)
return get_target_from_url(url_with_filename)
100 changes: 100 additions & 0 deletions edx/analytics/tasks/warehouse/financial/fees.py
@@ -0,0 +1,100 @@
"""
Tasks associated with pulling and storing financial fees related data.
"""
import logging

import luigi

from edx.analytics.tasks.common.vertica_load import VerticaCopyTask
from edx.analytics.tasks.util.hive import WarehouseMixin
from edx.analytics.tasks.util.url import ExternalURL, url_path_join
from edx.analytics.tasks.warehouse.financial.affiliate_window import (
AffiliateWindowTaskMixin, IntervalPullFromAffiliateWindowTask
)

logger = logging.getLogger(__name__)


class AffiliateWindowTask(AffiliateWindowTaskMixin, WarehouseMixin, luigi.WrapperTask):
"""
Task to pull and import Affiliate Window report data.
"""
def requires(self):
yield IntervalPullFromAffiliateWindowTask(
output_root=self.output_root,
interval_end=self.run_date,
)

def output(self):
# TODO: Once VerticaCopyTask handles multiple input files update this
# to use the outputs of the sub-jobs instead of always returning all
# files.

# Affiliate Window reports for each day are stored in dated directories.
# We want to be able to load all that data into Vertica in one go, hence we use
# a wildcard('*') here.
url = url_path_join(self.warehouse_path, 'fees', 'affiliate_window') + '/dt=*/'
return ExternalURL(url=url).output()


class LoadFeesToWarehouse(WarehouseMixin, VerticaCopyTask):
"""
An entry point to loading fee-related data to the warehouse.
"""
run_date = luigi.DateParameter()

# Forcing this to overwrite now, as this script will duplicate data
# without it. This does not trickle down to the Affiliate Window calls.
# TODO: Make this optional once VerticaCopyTask handles multiple input files.
overwrite = True

@property
def insert_source_task(self):
return AffiliateWindowTask(
run_date=self.run_date,
output_root=url_path_join(self.warehouse_path, 'fees', 'affiliate_window')
)

@property
def table(self):
return 'affiliate_window_transactions'

@property
def auto_primary_key(self):
"""No automatic primary key here."""
return None

@property
def columns(self):
"""
Most values are mapped back to their original table definitions.
"""
return [
('id', 'INTEGER'),
('url', 'VARCHAR(500)'),
('publisher_id', 'INTEGER'),
('commission_sharing_publisher_id', 'INTEGER'),
('site_name', 'VARCHAR(255)'),
('commission_status', 'VARCHAR(255)'),
('commission_amount', 'DECIMAL(12,2)'),
('sale_amount', 'DECIMAL(12,2)'),
('customer_country', 'VARCHAR(5)'),
('click_date', 'TIMESTAMP'),
('transaction_date', 'TIMESTAMP'),
('validation_date', 'TIMESTAMP'),
('type', 'VARCHAR(255)'),
('decline_reason', 'VARCHAR(512)'),
('voucher_code_used', 'BOOLEAN'),
('voucher_code', 'VARCHAR(255)'),
('amended', 'BOOLEAN'),
('amend_reason', 'VARCHAR(1000)'),
('old_sale_amount', 'DECIMAL(12,2)'),
('old_commission_amount', 'DECIMAL(12,2)'),
('publisher_url', 'VARCHAR(500)'),
('order_ref', 'VARCHAR(255)'),
('paid_to_publisher', 'BOOLEAN'),
('payment_id', 'INTEGER'),
('transaction_query_id', 'INTEGER'),
('original_sale_amount', 'DECIMAL(12,2)'),
('original_json', 'VARCHAR(5000)'),
]
1 change: 1 addition & 0 deletions setup.cfg
Expand Up @@ -85,6 +85,7 @@ edx.analytics.tasks =
orders = edx.analytics.tasks.warehouse.financial.orders_import:OrderTableTask
payment_reconcile = edx.analytics.tasks.warehouse.financial.reconcile:ReconcileOrdersAndTransactionsTask
paypal = edx.analytics.tasks.warehouse.financial.paypal:PaypalTransactionsByDayTask
affiliate_window = edx.analytics.tasks.warehouse.financial.fees:LoadFeesToWarehouse

# export:
data_obfuscation = edx.analytics.tasks.export.data_obfuscation:ObfuscatedCourseDumpTask
Expand Down

0 comments on commit 91d5aee

Please sign in to comment.