
Process traffic logs on Amazon EMR.

1 parent 7b28d3c, commit a70a6eac62b26cbd1c6e028a07505d172f3b884b, committed by bsimpson63 on May 25, 2012
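
The pipeline is driven from the new r2.lib.traffic module. For illustration, an operator might kick it off for a single hour roughly like this (the hour string and S3 path are hypothetical, and an initialized pylons environment is assumed so the module can read the TRAFFIC_* settings added below):

    # Hypothetical invocation; names below are placeholders.
    from r2.lib.traffic import process_hour, process_pixel_log

    # Wait for every host's log for this hour to land in S3, then process it.
    process_hour('2012-05-25-14')

    # Or process one already-uploaded raw log file directly.
    process_pixel_log('s3://example-raw-logs/2012-05-25-14.log.bz2')
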
10 r2/example.ini
@@ -328,6 +328,16 @@ min_promote_bid = 20
max_promote_bid = 9999
min_promote_future = 2
+# traffic log processing
+TRAFFIC_ACCESS_KEY =
+TRAFFIC_SECRET_KEY =
+RAW_LOG_DIR =
+PROCESSED_DIR =
+AGGREGATE_DIR =
+AWS_LOG_DIR =
+TRAFFIC_SRC_DIR =
+TRAFFIC_LOG_HOSTS =
+
# -- spreadshirt --
spreadshirt_url =
spreadshirt_vendor_id =
1 r2/r2/lib/app_globals.py
@@ -184,6 +184,7 @@ class Globals(object):
'allowed_pay_countries',
'case_sensitive_domains',
'reserved_subdomains',
+ 'TRAFFIC_LOG_HOSTS',
],
ConfigValue.choice: {
277 r2/r2/lib/emr_helpers.py
@@ -0,0 +1,277 @@
+# The contents of this file are subject to the Common Public Attribution
+# License Version 1.0. (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
+# License Version 1.1, but Sections 14 and 15 have been added to cover use of
+# software over a computer network and provide for limited attribution for the
+# Original Developer. In addition, Exhibit A has been modified to be consistent
+# with Exhibit B.
+#
+# Software distributed under the License is distributed on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
+# the specific language governing rights and limitations under the License.
+#
+# The Original Code is reddit.
+#
+# The Original Developer is the Initial Developer. The Initial Developer of
+# the Original Code is reddit Inc.
+#
+# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
+# Inc. All Rights Reserved.
+###############################################################################
+
+from copy import copy
+import datetime
+
+from pylons import g
+
+from r2.lib.memoize import memoize
+from r2.lib.utils import storage
+
+LIVE_STATES = ['RUNNING', 'STARTING', 'WAITING', 'BOOTSTRAPPING']
+COMPLETED = 'COMPLETED'
+PENDING = 'PENDING'
+NOTFOUND = 'NOTFOUND'
+
+
+@memoize('emr_describe_jobflows', time=30, timeout=60)
+def describe_jobflows_cached(emr_connection):
+ """Return a list of jobflows on this connection.
+
+ It's good to cache this information because hitting AWS too often can
+ result in rate limiting, and it's not particularly detrimental to have
+ slightly out of date information in most cases. Non-running jobflows and
+ information we don't need are discarded to reduce the size of cached data.
+
+ """
+
+ jobflows = emr_connection.describe_jobflows()
+
+ r_jobflows = []
+ for jf in jobflows:
+ # skip jobflows that are not live and haven't run a step recently
+ d = jf.steps[-1].creationdatetime.split('T')[0]
+ last_step_start = datetime.datetime.strptime(d, '%Y-%m-%d').date()
+ now = datetime.datetime.now().date()
+ if (jf.state not in LIVE_STATES and
+ now - last_step_start > datetime.timedelta(2)):
+ continue
+
+ # keep only fields we need
+ r_jf = storage(name=jf.name,
+ jobflowid=jf.jobflowid,
+ state=jf.state)
+ r_bootstrapactions = []
+ for i in jf.bootstrapactions:
+ s = storage(name=i.name,
+ path=i.path,
+ args=[a.value for a in i.args])
+ r_bootstrapactions.append(s)
+ r_jf['bootstrapactions'] = r_bootstrapactions
+ r_steps = []
+ for i in jf.steps:
+ s = storage(name=i.name,
+ state=i.state,
+ jar=i.jar,
+ args=[a.value for a in i.args])
+ r_steps.append(s)
+ r_jf['steps'] = r_steps
+ r_instancegroups = []
+ for i in jf.instancegroups:
+ s = storage(name=i.name,
+ instancegroupid=i.instancegroupid,
+ instancerequestcount=i.instancerequestcount)
+ r_instancegroups.append(s)
+ r_jf['instancegroups'] = r_instancegroups
+ r_jobflows.append(r_jf)
+ return r_jobflows
+
+
+def update_jobflows_cached(emr_connection):
+ describe_jobflows_cached(emr_connection, _update=True)
+
+
+def describe_jobflows_by_ids(emr_connection, jobflow_ids, _update=False):
+ g.reset_caches()
+ jobflows = describe_jobflows_cached(emr_connection, _update=_update)
+ return [jf for jf in jobflows if jf.jobflowid in jobflow_ids]
+
+
+def describe_jobflows_by_state(emr_connection, states, _update=False):
+ g.reset_caches()
+ jobflows = describe_jobflows_cached(emr_connection, _update=_update)
+ return [jf for jf in jobflows if jf.state in states]
+
+
+def describe_jobflows(emr_connection, _update=False):
+ g.reset_caches()
+ jobflows = describe_jobflows_cached(emr_connection, _update=_update)
+ return jobflows
+
+
+def describe_jobflow(emr_connection, jobflow_id, _update=False):
+ r = describe_jobflows_by_ids(emr_connection, [jobflow_id], _update=_update)
+ if r:
+ return r[0]
+
+
+def get_compatible_jobflows(emr_connection, bootstrap_actions=None,
+ setup_steps=None):
+ """Return jobflows that have specified bootstrap actions and setup steps.
+
+ Assumes there are no conflicts with bootstrap actions or setup steps:
+ a jobflow is compatible if it contains at least the requested
+ bootstrap_actions and setup_steps (may contain additional).
+
+ """
+
+ bootstrap_actions = bootstrap_actions or []
+ setup_steps = setup_steps or []
+
+ # update list of running jobflows--ensure we don't pick a recently dead one
+ update_jobflows_cached(emr_connection)
+
+ jobflows = describe_jobflows_by_state(emr_connection, LIVE_STATES,
+ _update=True)
+ if not jobflows:
+ return []
+
+ required_bootstrap_actions = set((i.name, i.path, tuple(sorted(i.args())))
+ for i in bootstrap_actions)
+ required_setup_steps = set((i.name, i.jar(), tuple(sorted(i.args())))
+ for i in setup_steps)
+
+ if not required_bootstrap_actions and not required_setup_steps:
+ return jobflows
+
+ running = []
+ for jf in jobflows:
+ extant_bootstrap_actions = set((i.name, i.path, tuple(sorted(i.args)))
+ for i in jf.bootstrapactions)
+ if not (required_bootstrap_actions <= extant_bootstrap_actions):
+ continue
+
+ extant_setup_steps = set((i.name, i.jar, tuple(sorted(i.args)))
+ for i in jf.steps)
+ if not (required_setup_steps <= extant_setup_steps):
+ continue
+ running.append(jf)
+ return running
+
+
+def get_step_state(emr_connection, jobflowid, step_name):
+ """Return the state of a step.
+
+ If jobflowid/step_name combination is not unique this will return the state
+ of the most recent step.
+
+ """
+
+ jobflow = describe_jobflow(emr_connection, jobflowid)
+ if not jobflow:
+ return NOTFOUND
+
+ for step in reversed(jobflow.steps):
+ if step.name == step_name:
+ return step.state
+ else:
+ return NOTFOUND
+
+
+def get_jobflow_by_name(emr_connection, jobflow_name):
+ """Return the most recent jobflow with specified name."""
+ jobflows = describe_jobflows_by_state(emr_connection, LIVE_STATES,
+ _update=True)
+ for jobflow in jobflows:
+ if jobflow.name == jobflow_name:
+ return jobflow
+ else:
+ return None
+
+
+def terminate_jobflow(emr_connection, jobflow_name):
+ jobflow = get_jobflow_by_name(emr_connection, jobflow_name)
+ if jobflow:
+ emr_connection.terminate_jobflow(jobflow.jobflowid)
+
+
+def modify_slave_count(emr_connection, jobflow_name, num_slaves=1):
+ jobflow = get_jobflow_by_name(emr_connection, jobflow_name)
+ if not jobflow:
+ return
+
+ slave_instancegroupid = None
+ slave_instancerequestcount = 0
+ for instance in jobflow.instancegroups:
+ if instance.name == 'slave':
+ slave_instancegroupid = instance.instancegroupid
+ slave_instancerequestcount = instance.instancerequestcount
+ break
+
+ if slave_instancegroupid and slave_instancerequestcount != num_slaves:
+ print ('Modifying slave instance count of %s (%s -> %s)' %
+ (jobflow_name, slave_instancerequestcount, num_slaves))
+ emr_connection.modify_instance_groups(slave_instancegroupid,
+ num_slaves)
+
+
+class EmrJob(object):
+ def __init__(self, emr_connection, name, steps=[], setup_steps=[],
+ bootstrap_actions=[], log_uri=None, keep_alive=True,
+ ec2_keyname=None, hadoop_version='0.20.205',
+ ami_version='latest', master_instance_type='m1.small',
+ slave_instance_type='m1.small', num_slaves=1):
+
+ self.jobflowid = None
+ self.conn = emr_connection
+ self.name = name
+ self.steps = steps
+ self.setup_steps = setup_steps
+ self.bootstrap_actions = bootstrap_actions
+ self.log_uri = log_uri
+ self.enable_debugging = bool(log_uri)
+ self.keep_alive = keep_alive
+ self.ec2_keyname = ec2_keyname
+ self.hadoop_version = hadoop_version
+ self.ami_version = ami_version
+ self.master_instance_type = master_instance_type
+ self.slave_instance_type = slave_instance_type
+ self.num_instances = num_slaves + 1
+
+ def run(self):
+ steps = copy(self.setup_steps)
+ steps.extend(self.steps)
+
+ job_flow_args = dict(name=self.name,
+ steps=steps, bootstrap_actions=self.bootstrap_actions,
+ keep_alive=self.keep_alive, ec2_keyname=self.ec2_keyname,
+ hadoop_version=self.hadoop_version, ami_version=self.ami_version,
+ master_instance_type=self.master_instance_type,
+ slave_instance_type=self.slave_instance_type,
+ num_instances=self.num_instances,
+ enable_debugging=self.enable_debugging,
+ log_uri=self.log_uri)
+
+ self.jobflowid = self.conn.run_jobflow(**job_flow_args)
+ return
+
+ @property
+ def jobflow_state(self):
+ if self.jobflowid:
+ return describe_jobflow(self.conn, self.jobflowid).state
+ else:
+ return NOTFOUND
+
+ def terminate(self):
+ terminate_jobflow(self.conn, self.name)
+
+ def modify_slave_count(self, num_slaves=1):
+ modify_slave_count(self.conn, self.name, num_slaves)
+
+
+class EmrException(Exception):
+ def __init__(self, msg):
+ self.msg = msg
+
+ def __str__(self):
+ return self.msg
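
For illustration, the EmrJob wrapper above could be exercised by hand along these lines (credentials, bucket, and jobflow name are placeholders):

    from boto.emr.connection import EmrConnection
    from boto.emr.step import InstallPigStep

    from r2.lib.emr_helpers import EmrJob, LIVE_STATES

    conn = EmrConnection('ACCESS_KEY', 'SECRET_KEY')  # placeholder credentials

    job = EmrJob(conn, 'example jobflow',
                 setup_steps=[InstallPigStep()],
                 log_uri='s3://example-bucket/emr-logs',
                 num_slaves=2)
    job.run()                   # launches the jobflow and records its id

    print job.jobflow_state     # looked up through the memoized describe cache
    if job.jobflow_state not in LIVE_STATES:
        job.terminate()
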
111 r2/r2/lib/s3_helpers.py
@@ -0,0 +1,111 @@
+# The contents of this file are subject to the Common Public Attribution
+# License Version 1.0. (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
+# License Version 1.1, but Sections 14 and 15 have been added to cover use of
+# software over a computer network and provide for limited attribution for the
+# Original Developer. In addition, Exhibit A has been modified to be consistent
+# with Exhibit B.
+#
+# Software distributed under the License is distributed on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
+# the specific language governing rights and limitations under the License.
+#
+# The Original Code is reddit.
+#
+# The Original Developer is the Initial Developer. The Initial Developer of
+# the Original Code is reddit Inc.
+#
+# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
+# Inc. All Rights Reserved.
+###############################################################################
+
+import os
+import sys
+
+from boto.s3.key import Key
+
+HADOOP_FOLDER_SUFFIX = '_$folder$'
+
+
+def _to_path(bucket, key):
+ if not bucket:
+ raise ValueError
+ return 's3://%s/%s' % (bucket, key)
+
+
+def _from_path(path):
+ """Return bucket and key names from an s3 path.
+
+ Path of 's3://BUCKET/KEY/NAME' would return 'BUCKET', 'KEY/NAME'.
+
+ """
+
+ if not path.startswith('s3://'):
+ raise ValueError('Bad S3 path %s' % path)
+
+ r = path[len('s3://'):].split('/', 1)
+ bucket = key = None
+
+ if len(r) == 2:
+ bucket, key = r[0], r[1]
+ else:
+ bucket = r[0]
+
+ if not bucket:
+ raise ValueError('Bad S3 path %s' % path)
+
+ return bucket, key
+
+
+def get_text_from_s3(s3_connection, path):
+ """Read a file from S3 and return it as text."""
+ bucket_name, key_name = _from_path(path)
+ bucket = s3_connection.get_bucket(bucket_name)
+ k = Key(bucket)
+ k.key = key_name
+ txt = k.get_contents_as_string()
+ return txt
+
+
+def mv_file_s3(s3_connection, src_path, dst_path):
+ """Move a file within S3."""
+ src_bucket_name, src_key_name = _from_path(src_path)
+ dst_bucket_name, dst_key_name = _from_path(dst_path)
+
+ src_bucket = s3_connection.get_bucket(src_bucket_name)
+ k = Key(src_bucket)
+ k.key = src_key_name
+ k.copy(dst_bucket_name, dst_key_name)
+ k.delete()
+
+
+def s3_key_exists(s3_connection, path):
+ bucket_name, key_name = _from_path(path)
+ bucket = s3_connection.get_bucket(bucket_name)
+ key = bucket.get_key(key_name)
+ return bool(key)
+
+
+def copy_to_s3(s3_connection, local_path, dst_path, verbose=False):
+ def callback(trans, total):
+ sys.stdout.write('%s/%s' % (trans, total))
+ sys.stdout.flush()
+
+ dst_bucket_name, dst_key_name = _from_path(dst_path)
+ bucket = s3_connection.get_bucket(dst_bucket_name)
+
+ filename = os.path.basename(local_path)
+ if not filename:
+ return
+
+ key_name = os.path.join(dst_key_name, filename)
+ k = Key(bucket)
+ k.key = key_name
+
+ kw = {}
+ if verbose:
+ print 'Uploading %s to %s' % (local_path, dst_path)
+ kw['cb'] = callback
+
+ k.set_contents_from_filename(local_path, **kw)
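
A short sketch of how these helpers fit together (bucket and key names are made up):

    from boto.s3.connection import S3Connection

    from r2.lib.s3_helpers import (get_text_from_s3, mv_file_s3, s3_key_exists,
        copy_to_s3)

    conn = S3Connection('ACCESS_KEY', 'SECRET_KEY')  # placeholder credentials

    path = 's3://example-bucket/aggregates/2012-05-25-14/sitewide/part-r-00000'
    if s3_key_exists(conn, path):
        print get_text_from_s3(conn, path)

    # Upload a local log into an S3 "directory", then move it into an archive.
    copy_to_s3(conn, '/tmp/2012-05-25-14.log.bz2',
               's3://example-bucket/incoming', verbose=True)
    mv_file_s3(conn, 's3://example-bucket/incoming/2012-05-25-14.log.bz2',
               's3://example-bucket/archive/2012-05-25-14.log.bz2')
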
23 r2/r2/lib/traffic/__init__.py
@@ -0,0 +1,23 @@
+# The contents of this file are subject to the Common Public Attribution
+# License Version 1.0. (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
+# License Version 1.1, but Sections 14 and 15 have been added to cover use of
+# software over a computer network and provide for limited attribution for the
+# Original Developer. In addition, Exhibit A has been modified to be consistent
+# with Exhibit B.
+#
+# Software distributed under the License is distributed on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
+# the specific language governing rights and limitations under the License.
+#
+# The Original Code is reddit.
+#
+# The Original Developer is the Initial Developer. The Initial Developer of
+# the Original Code is reddit Inc.
+#
+# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
+# Inc. All Rights Reserved.
+###############################################################################
+
+from traffic import *
211 r2/r2/lib/traffic/emr_traffic.py
@@ -0,0 +1,211 @@
+# The contents of this file are subject to the Common Public Attribution
+# License Version 1.0. (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
+# License Version 1.1, but Sections 14 and 15 have been added to cover use of
+# software over a computer network and provide for limited attribution for the
+# Original Developer. In addition, Exhibit A has been modified to be consistent
+# with Exhibit B.
+#
+# Software distributed under the License is distributed on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
+# the specific language governing rights and limitations under the License.
+#
+# The Original Code is reddit.
+#
+# The Original Developer is the Initial Developer. The Initial Developer of
+# the Original Code is reddit Inc.
+#
+# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
+# Inc. All Rights Reserved.
+###############################################################################
+
+from copy import copy
+from pylons import g
+import os
+from time import time, sleep
+
+from boto.emr.step import InstallPigStep, PigStep
+from boto.emr.bootstrap_action import BootstrapAction
+
+from r2.lib.emr_helpers import (EmrJob, get_compatible_jobflows,
+ get_step_state, EmrException, update_jobflows_cached,
+ LIVE_STATES, COMPLETED, PENDING, NOTFOUND)
+
+
+class MemoryIntensiveBootstrap(BootstrapAction):
+ def __init__(self):
+ name = 'memory intensive'
+ path = 's3://elasticmapreduce/bootstrap-actions/' \
+ 'configurations/latest/memory-intensive'
+ args = []
+ BootstrapAction.__init__(self, name, path, args)
+
+
+class TrafficBase(EmrJob):
+
+ """Base class for all traffic jobs.
+
+ Includes required bootstrap actions and setup steps.
+
+ """
+
+ BOOTSTRAP_NAME = 'traffic binaries'
+ BOOTSTRAP_SCRIPT = os.path.join(g.TRAFFIC_SRC_DIR, 'traffic_bootstrap.sh')
+ _defaults = dict(master_instance_type='m1.small',
+ slave_instance_type='m1.xlarge', num_slaves=1)
+
+ def __init__(self, emr_connection, jobflow_name, steps=None, **kw):
+ combined_kw = copy(self._defaults)
+ combined_kw.update(kw)
+ bootstrap_actions = self._bootstrap_actions()
+ setup_steps = self._setup_steps()
+ steps = steps or []
+ EmrJob.__init__(self, emr_connection, jobflow_name,
+ bootstrap_actions=bootstrap_actions,
+ setup_steps=setup_steps,
+ steps=steps,
+ **combined_kw)
+
+ @classmethod
+ def _bootstrap_actions(cls):
+ name = cls.BOOTSTRAP_NAME
+ path = cls.BOOTSTRAP_SCRIPT
+ bootstrap_action_args = [g.TRAFFIC_SRC_DIR, g.tracking_secret]
+ bootstrap = BootstrapAction(name, path, bootstrap_action_args)
+ return [MemoryIntensiveBootstrap(), bootstrap]
+
+ @classmethod
+ def _setup_steps(cls):
+ return [InstallPigStep()]
+
+
+class PigProcessHour(PigStep):
+ STEP_NAME = 'pig process hour'
+ PIG_FILE = os.path.join(g.TRAFFIC_SRC_DIR, 'mr_process_hour.pig')
+
+ def __init__(self, log_path, output_path):
+ self.log_path = log_path
+ self.output_path = output_path
+ self.name = '%s (%s)' % (self.STEP_NAME, self.log_path)
+ pig_args = ['-p', 'OUTPUT=%s' % self.output_path,
+ '-p', 'LOGFILE=%s' % self.log_path]
+ PigStep.__init__(self, self.name, self.PIG_FILE, pig_args=pig_args)
+
+
+class PigAggregate(PigStep):
+ STEP_NAME = 'pig aggregate'
+ PIG_FILE = os.path.join(g.TRAFFIC_SRC_DIR, 'mr_aggregate.pig')
+
+ def __init__(self, input_path, output_path):
+ self.input_path = input_path
+ self.output_path = output_path
+ self.name = '%s (%s)' % (self.STEP_NAME, self.input_path)
+ pig_args = ['-p', 'INPUT=%s' % self.input_path,
+ '-p', 'OUTPUT=%s' % self.output_path]
+ PigStep.__init__(self, self.name, self.PIG_FILE, pig_args=pig_args)
+
+
+class PigCoalesce(PigStep):
+ STEP_NAME = 'pig coalesce'
+ PIG_FILE = os.path.join(g.TRAFFIC_SRC_DIR, 'mr_coalesce.pig')
+
+ def __init__(self, input_path, output_path):
+ self.input_path = input_path
+ self.output_path = output_path
+ self.name = '%s (%s)' % (self.STEP_NAME, self.input_path)
+ pig_args = ['-p', 'INPUT=%s' % self.input_path,
+ '-p', 'OUTPUT=%s' % self.output_path]
+ PigStep.__init__(self, self.name, self.PIG_FILE, pig_args=pig_args)
+
+
+def _add_step(emr_connection, step, jobflow_name, **jobflow_kw):
+ """Add step to a running jobflow.
+
+ Append the step onto a compatible jobflow with the specified name if one
+ exists, otherwise create a new jobflow and run it. Returns the jobflowid.
+ NOTE: jobflow_kw will be used to configure the jobflow ONLY if a new
+ jobflow is created.
+
+ """
+
+ running = get_compatible_jobflows(
+ emr_connection,
+ bootstrap_actions=TrafficBase._bootstrap_actions(),
+ setup_steps=TrafficBase._setup_steps())
+
+ for jf in running:
+ if jf.name == jobflow_name:
+ jobflowid = jf.jobflowid
+ emr_connection.add_jobflow_steps(jobflowid, step)
+ print 'Added %s to jobflow %s' % (step.name, jobflowid)
+ break
+ else:
+ base = TrafficBase(emr_connection, jobflow_name, steps=[step],
+ **jobflow_kw)
+ base.run()
+ jobflowid = base.jobflowid
+ print 'Added %s to new jobflow %s' % (step.name, jobflowid)
+
+ return jobflowid
+
+
+def _wait_for_step(emr_connection, step, jobflowid, sleeptime):
+ """Poll EMR and wait for a step to finish."""
+ sleep(180)
+ start = time()
+ update_jobflows_cached(emr_connection)
+ step_state = get_step_state(emr_connection, jobflowid, step.name)
+ while step_state in LIVE_STATES + [PENDING]:
+ sleep(sleeptime)
+ step_state = get_step_state(emr_connection, jobflowid, step.name)
+ end = time()
+ print '%s took %0.2fs (exit: %s)' % (step.name, end - start, step_state)
+ return step_state
+
+
+def run_traffic_step(emr_connection, step, jobflow_name,
+ wait=True, sleeptime=60, retries=1, **jobflow_kw):
+ """Run a traffic processing step.
+
+ Helper function to force all steps to be executed by the same jobflow
+ (jobflow_name). Can also block until the step completes (wait) and
+ retry on failure (retries).
+
+ """
+
+ jobflowid = _add_step(emr_connection, step, jobflow_name, **jobflow_kw)
+
+ if not wait:
+ return
+
+ attempts = 1
+ exit_state = _wait_for_step(emr_connection, step, jobflowid, sleeptime)
+ while attempts <= retries and exit_state != COMPLETED:
+ jobflowid = _add_step(emr_connection, step, jobflow_name, **jobflow_kw)
+ exit_state = _wait_for_step(emr_connection, step, jobflowid, sleeptime)
+ attempts += 1
+
+ if exit_state != COMPLETED:
+ msg = '%s failed (exit: %s)' % (step.name, exit_state)
+ if retries:
+ msg += ', retried %s times' % retries
+ raise EmrException(msg)
+
+
+def extract_hour(emr_connection, jobflow_name, log_path, output_path,
+ **jobflow_kw):
+ step = PigProcessHour(log_path, output_path)
+ run_traffic_step(emr_connection, step, jobflow_name, **jobflow_kw)
+
+
+def aggregate_interval(emr_connection, jobflow_name, input_path, output_path,
+ **jobflow_kw):
+ step = PigAggregate(input_path, output_path)
+ run_traffic_step(emr_connection, step, jobflow_name, **jobflow_kw)
+
+
+def coalesce_interval(emr_connection, jobflow_name, input_path, output_path,
+ **jobflow_kw):
+ step = PigCoalesce(input_path, output_path)
+ run_traffic_step(emr_connection, step, jobflow_name, **jobflow_kw)
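
For illustration, a single hour could be pushed through these steps directly (the jobflow name and S3 paths are placeholders; importing this module needs an initialized app environment because the step classes read g.TRAFFIC_SRC_DIR):

    from boto.emr.connection import EmrConnection

    from r2.lib.traffic.emr_traffic import extract_hour, aggregate_interval

    conn = EmrConnection('ACCESS_KEY', 'SECRET_KEY')  # placeholder credentials
    jobflow_name = 'Traffic Processing (2012-05-25)'

    # Parse the raw hourly log into per-category (key, unique_id, count) files...
    extract_hour(conn, jobflow_name,
                 's3://example-raw-logs/2012-05-25-14/*',
                 's3://example-processed/hour/2012-05-25-14',
                 log_uri='s3://example-aws-logs')

    # ...then roll them up into per-category (key, uniques, pageviews) totals.
    aggregate_interval(conn, jobflow_name,
                       's3://example-processed/hour/2012-05-25-14',
                       's3://example-aggregates/2012-05-25-14',
                       log_uri='s3://example-aws-logs')
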
415 r2/r2/lib/traffic/traffic.py
@@ -0,0 +1,415 @@
+# The contents of this file are subject to the Common Public Attribution
+# License Version 1.0. (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
+# License Version 1.1, but Sections 14 and 15 have been added to cover use of
+# software over a computer network and provide for limited attribution for the
+# Original Developer. In addition, Exhibit A has been modified to be consistent
+# with Exhibit B.
+#
+# Software distributed under the License is distributed on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
+# the specific language governing rights and limitations under the License.
+#
+# The Original Code is reddit.
+#
+# The Original Developer is the Initial Developer. The Initial Developer of
+# the Original Code is reddit Inc.
+#
+# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
+# Inc. All Rights Reserved.
+###############################################################################
+
+import datetime
+import calendar
+import os
+from time import sleep
+import urllib
+
+from boto.s3.connection import S3Connection
+from boto.emr.connection import EmrConnection
+from boto.exception import S3ResponseError
+from pylons import g
+
+from r2.lib.emr_helpers import (EmrException, terminate_jobflow,
+ modify_slave_count)
+from r2.lib.s3_helpers import get_text_from_s3, s3_key_exists, copy_to_s3
+from r2.lib.traffic.emr_traffic import (extract_hour, aggregate_interval,
+ coalesce_interval)
+from r2.lib.utils import tup
+from r2.models.traffic import (SitewidePageviews, PageviewsBySubreddit,
+ PageviewsBySubredditAndPath, PageviewsByLanguage,
+ ClickthroughsByCodename, TargetedClickthroughsByCodename,
+ AdImpressionsByCodename, TargetedImpressionsByCodename)
+
+
+RAW_LOG_DIR = g.RAW_LOG_DIR
+PROCESSED_DIR = g.PROCESSED_DIR
+AGGREGATE_DIR = g.AGGREGATE_DIR
+AWS_LOG_DIR = g.AWS_LOG_DIR
+
+s3_connection = S3Connection(g.TRAFFIC_ACCESS_KEY, g.TRAFFIC_SECRET_KEY)
+emr_connection = EmrConnection(g.TRAFFIC_ACCESS_KEY, g.TRAFFIC_SECRET_KEY)
+
+traffic_categories = (SitewidePageviews, PageviewsBySubreddit,
+ PageviewsBySubredditAndPath, PageviewsByLanguage,
+ ClickthroughsByCodename, TargetedClickthroughsByCodename,
+ AdImpressionsByCodename, TargetedImpressionsByCodename)
+
+traffic_subdirectories = {
+ SitewidePageviews: 'sitewide',
+ PageviewsBySubreddit: 'subreddit',
+ PageviewsBySubredditAndPath: 'srpath',
+ PageviewsByLanguage: 'lang',
+ ClickthroughsByCodename: 'clicks',
+ TargetedClickthroughsByCodename: 'clicks_targeted',
+ AdImpressionsByCodename: 'thing',
+ TargetedImpressionsByCodename: 'thingtarget',
+}
+
+
+def _get_processed_path(basedir, interval, category_cls, filename):
+ return os.path.join(basedir, interval,
+ traffic_subdirectories[category_cls], filename)
+
+
+def get_aggregate(interval, category_cls):
+ """Return the aggregate output file from S3."""
+ part = 0
+ data = {}
+
+ while True:
+ path = _get_processed_path(AGGREGATE_DIR, interval, category_cls,
+ 'part-r-%05d' % part)
+ if not s3_key_exists(s3_connection, path):
+ break
+
+ # Sometimes S3 doesn't let us read immediately after key is written
+ for i in xrange(5):
+ try:
+ txt = get_text_from_s3(s3_connection, path)
+ except S3ResponseError as e:
+ print 'S3ResponseError on %s, retrying' % path
+ sleep(300)
+ else:
+ break
+ else:
+ print 'Could not retrieve %s' % path
+ raise e
+
+ for line in txt.splitlines():
+ tuples = line.rstrip('\n').split('\t')
+ group, uniques, pageviews = tuples[:-2], tuples[-2], tuples[-1]
+ if len(group) > 1:
+ group = tuple(group)
+ else:
+ group = group[0]
+ data[group] = (int(uniques), int(pageviews))
+
+ part += 1
+
+ if not data:
+ raise ValueError("No data for %s/%s" % (interval,
+ category_cls.__name__))
+
+ return data
+
+
+def report_interval(interval, background=True):
+ if background:
+ from multiprocessing import Process
+ p = Process(target=_report_interval, args=(interval,))
+ p.start()
+ else:
+ _report_interval(interval)
+
+
+def _name_to_kw(category_cls, name):
+ """Get the keywords needed to build an instance of traffic data."""
+ def target_split(name):
+ """Split a name that contains multiple words.
+
+ Name is (link,campaign-subreddit) where link and campaign are
+ thing fullnames. campaign and subreddit are each optional, so
+ the string could look like any of these:
+ (t3_bh,t8_ab-pics), (t3_bh,t8_ab), (t3_bh,-pics), (t3_bh,)
+ Also check for the old format (t3_by, pics)
+
+ """
+
+ link_codename, target_info = name
+ campaign_codename = None
+ if not target_info:
+ subreddit = ''
+ elif target_info.find('-') != -1:
+ campaign_codename, subreddit = target_info.split('-', 1)
+ elif target_info.find('_') != -1:
+ campaign_codename = target_info
+ subreddit = ''
+ else:
+ subreddit = target_info
+ return {'codename': campaign_codename or link_codename,
+ 'subreddit': subreddit}
+
+ d = {SitewidePageviews: lambda n: {},
+ PageviewsBySubreddit: lambda n: {'subreddit': n},
+ PageviewsBySubredditAndPath: lambda n: {'srpath': n},
+ PageviewsByLanguage: lambda n: {'lang': n},
+ ClickthroughsByCodename: lambda n: {'codename': name},
+ AdImpressionsByCodename: lambda n: {'codename': name},
+ TargetedClickthroughsByCodename: target_split,
+ TargetedImpressionsByCodename: target_split}
+ return d[category_cls](name)
+
+
+def _report_interval(interval):
+ """Read aggregated traffic from S3 and write to postgres."""
+ from sqlalchemy.orm import scoped_session, sessionmaker
+ from r2.models.traffic import engine
+ Session = scoped_session(sessionmaker(bind=engine))
+
+ # determine interval_type from YYYY-MM[-DD][-HH]
+ pieces = interval.split('-')
+ pieces = [int(i) for i in pieces]
+ if len(pieces) == 4:
+ interval_type = 'hour'
+ elif len(pieces) == 3:
+ interval_type = 'day'
+ pieces.append(0)
+ elif len(pieces) == 2:
+ interval_type = 'month'
+ pieces.append(1)
+ pieces.append(0)
+ else:
+ raise ValueError('Unrecognized interval %r' % interval)
+
+ pg_interval = "%04d-%02d-%02d %02d:00:00" % tuple(pieces)
+ print 'reporting interval %s (%s)' % (pg_interval, interval_type)
+
+ # Read aggregates and write to traffic db
+ for category_cls in traffic_categories:
+ now = datetime.datetime.now()
+ print '*** %s - %s - %s' % (category_cls.__name__, interval, now)
+ data = get_aggregate(interval, category_cls)
+ len_data = len(data)
+ step = max(len_data / 5, 100)
+ for i, (name, (uniques, pageviews)) in enumerate(data.iteritems()):
+ try:
+ for n in tup(name):
+ unicode(n)
+ except UnicodeDecodeError:
+ print '%s - %s - %s - %s' % (category_cls.__name__, name,
+ uniques, pageviews)
+ continue
+
+ if i % step == 0:
+ now = datetime.datetime.now()
+ print '%s - %s - %s/%s - %s' % (interval, category_cls.__name__,
+ i, len_data, now)
+
+ kw = {'date': pg_interval, 'interval': interval_type,
+ 'unique_count': uniques, 'pageview_count': pageviews}
+ kw.update(_name_to_kw(category_cls, name))
+ r = category_cls(**kw)
+ Session.merge(r)
+ Session.commit()
+ Session.remove()
+ now = datetime.datetime.now()
+ print 'finished reporting %s (%s) - %s' % (pg_interval, interval_type, now)
+
+
+def process_pixel_log(log_path, fast=False):
+ """Process an hourly pixel log file.
+
+ Extract data from the raw hourly log, then aggregate and report it. Also,
+ depending on the specific date and options, aggregate and report the day
+ and month. Setting fast=True is appropriate for backfilling as it
+ eliminates redundant steps.
+
+ """
+
+ if log_path.endswith('/*'):
+ log_dir = log_path[:-len('/*')]
+ date_fields = os.path.basename(log_dir).split('.', 1)[0].split('-')
+ else:
+ date_fields = os.path.basename(log_path).split('.', 1)[0].split('-')
+ year, month, day, hour = (int(i) for i in date_fields)
+ hour_date = '%s-%02d-%02d-%02d' % (year, month, day, hour)
+ day_date = '%s-%02d-%02d' % (year, month, day)
+ month_date = '%s-%02d' % (year, month)
+
+ # All logs from this day use the same jobflow
+ jobflow_name = 'Traffic Processing (%s)' % day_date
+
+ output_path = os.path.join(PROCESSED_DIR, 'hour', hour_date)
+ extract_hour(emr_connection, jobflow_name, log_path, output_path,
+ log_uri=AWS_LOG_DIR)
+
+ input_path = os.path.join(PROCESSED_DIR, 'hour', hour_date)
+ output_path = os.path.join(AGGREGATE_DIR, hour_date)
+ aggregate_interval(emr_connection, jobflow_name, input_path, output_path,
+ log_uri=AWS_LOG_DIR)
+ if not fast:
+ report_interval(hour_date)
+
+ if hour == 23 or (not fast and (hour == 0 or hour % 4 == 3)):
+ # Don't aggregate and report day on every hour
+ input_path = os.path.join(PROCESSED_DIR, 'hour', '%s-*' % day_date)
+ output_path = os.path.join(AGGREGATE_DIR, day_date)
+ aggregate_interval(emr_connection, jobflow_name, input_path,
+ output_path, log_uri=AWS_LOG_DIR)
+ if not fast:
+ report_interval(day_date)
+
+ if hour == 23:
+ # Special tasks for final hour of the day
+ input_path = os.path.join(PROCESSED_DIR, 'hour', '%s-*' % day_date)
+ output_path = os.path.join(PROCESSED_DIR, 'day', day_date)
+ coalesce_interval(emr_connection, jobflow_name, input_path,
+ output_path, log_uri=AWS_LOG_DIR)
+ terminate_jobflow(emr_connection, jobflow_name)
+
+ if not fast:
+ aggregate_month(month_date)
+ report_interval(month_date)
+
+
+def aggregate_month(month_date):
+ jobflow_name = 'Traffic Processing (%s)' % month_date
+ input_path = os.path.join(PROCESSED_DIR, 'day', '%s-*' % month_date)
+ output_path = os.path.join(AGGREGATE_DIR, month_date)
+ aggregate_interval(emr_connection, jobflow_name, input_path, output_path,
+ log_uri=AWS_LOG_DIR, slave_instance_type='m2.2xlarge')
+ terminate_jobflow(emr_connection, jobflow_name)
+
+
+def process_month_hours(month_date, start_hour=0, days=None):
+ """Process hourly logs from entire month.
+
+ Complete monthly backfill requires running [verify_month_inputs,]
+ process_month_hours, aggregate_month, [verify_month_outputs,] and
+ report_entire_month.
+
+ """
+
+ year, month = month_date.split('-')
+ year, month = int(year), int(month)
+
+ days = days or xrange(1, calendar.monthrange(year, month)[1] + 1)
+ hours = xrange(start_hour, 24)
+
+ for day in days:
+ for hour in hours:
+ hour_date = '%04d-%02d-%02d-%02d' % (year, month, day, hour)
+ log_path = os.path.join(RAW_LOG_DIR, '%s.log.gz' % hour_date)
+ if not s3_key_exists(s3_connection, log_path):
+ log_path = os.path.join(RAW_LOG_DIR, '%s.log.bz2' % hour_date)
+ if not s3_key_exists(s3_connection, log_path):
+ print 'Missing log for %s' % hour_date
+ continue
+ print 'Processing %s' % log_path
+ process_pixel_log(log_path, fast=True)
+ hours = xrange(24)
+
+
+def report_entire_month(month_date, start_hour=0, start_day=1):
+ """Report all hours and days from month."""
+ year, month = month_date.split('-')
+ year, month = int(year), int(month)
+ hours = xrange(start_hour, 24)
+
+ for day in xrange(start_day, calendar.monthrange(year, month)[1] + 1):
+ for hour in hours:
+ hour_date = '%04d-%02d-%02d-%02d' % (year, month, day, hour)
+ try:
+ report_interval(hour_date, background=False)
+ except ValueError:
+ print 'Failed for %s' % hour_date
+ continue
+ hours = xrange(24)
+ day_date = '%04d-%02d-%02d' % (year, month, day)
+ try:
+ report_interval(day_date, background=False)
+ except ValueError:
+ print 'Failed for %s' % day_date
+ continue
+ report_interval(month_date, background=False)
+
+
+def verify_month_outputs(month_date):
+ """Check existance of all hour, day, month aggregates for month_date."""
+ year, month = month_date.split('-')
+ year, month = int(year), int(month)
+ missing = []
+
+ for day in xrange(1, calendar.monthrange(year, month)[1] + 1):
+ for hour in xrange(24):
+ hour_date = '%04d-%02d-%02d-%02d' % (year, month, day, hour)
+ for category_cls in traffic_categories:
+ for d in [AGGREGATE_DIR, os.path.join(PROCESSED_DIR, 'hour')]:
+ path = _get_processed_path(d, hour_date, category_cls,
+ 'part-r-00000')
+ if not s3_key_exists(s3_connection, path):
+ missing.append(hour_date)
+
+ day_date = '%04d-%02d-%02d' % (year, month, day)
+ for category_cls in traffic_categories:
+ for d in [AGGREGATE_DIR, os.path.join(PROCESSED_DIR, 'day')]:
+ path = _get_processed_path(d, day_date, category_cls,
+ 'part-r-00000')
+ if not s3_key_exists(s3_connection, path):
+ missing.append(day_date)
+
+ month_date = '%04d-%02d' % (year, month)
+ for category_cls in traffic_categories:
+ path = _get_processed_path(AGGREGATE_DIR, month_date, category_cls,
+ 'part-r-00000')
+ if not s3_key_exists(s3_connection, path):
+ missing.append(month_date)
+
+ for d in sorted(list(set(missing))):
+ print d
+
+
+def verify_month_inputs(month_date):
+ """Check existance of all hourly traffic logs for month_date."""
+ year, month = month_date.split('-')
+ year, month = int(year), int(month)
+ missing = []
+
+ for day in xrange(1, calendar.monthrange(year, month)[1] + 1):
+ for hour in xrange(24):
+ hour_date = '%04d-%02d-%02d-%02d' % (year, month, day, hour)
+ log_path = os.path.join(RAW_LOG_DIR, '%s.log.gz' % hour_date)
+ if not s3_key_exists(s3_connection, log_path):
+ log_path = os.path.join(RAW_LOG_DIR, '%s.log.bz2' % hour_date)
+ if not s3_key_exists(s3_connection, log_path):
+ missing.append(hour_date)
+
+ for d in missing:
+ print d
+
+
+def process_hour(hour_date):
+ """Process hour_date's traffic.
+
+ Can't fire at the very start of an hour because it takes time to bzip and
+ upload the file to S3. Check the bucket for the file and sleep if it
+ doesn't exist.
+
+ """
+
+ SLEEPTIME = 180
+
+ log_dir = os.path.join(RAW_LOG_DIR, hour_date)
+ files_missing = [os.path.join(log_dir, '%s.log.bz2' % h)
+ for h in g.TRAFFIC_LOG_HOSTS]
+ files_missing = [f for f in files_missing
+ if not s3_key_exists(s3_connection, f)]
+
+ while files_missing:
+ print 'Missing log(s) %s, sleeping' % files_missing
+ sleep(SLEEPTIME)
+ files_missing = [f for f in files_missing
+ if not s3_key_exists(s3_connection, f)]
+ process_pixel_log(os.path.join(log_dir, '*'))
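
Following the sequence named in process_month_hours' docstring, a full monthly backfill might look like this (the month is arbitrary and an initialized app environment is assumed):

    from r2.lib.traffic import (verify_month_inputs, process_month_hours,
        aggregate_month, verify_month_outputs, report_entire_month)

    month = '2012-04'

    verify_month_inputs(month)     # optional: print any missing hourly logs
    process_month_hours(month)     # extract and aggregate every hour (fast mode)
    aggregate_month(month)         # roll the daily outputs up into the month
    verify_month_outputs(month)    # optional: print any missing aggregates
    report_entire_month(month)     # write hours, days, and the month to postgres
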
23 scripts/traffic/Makefile
@@ -0,0 +1,23 @@
+CCOPTS=-Wall -O2 -std=c99
+
+all: parse_hour decrypt_userinfo verify
+
+decrypt_userinfo: decrypt_userinfo.o utils.o
+ gcc ${CCOPTS} -o $@ $< utils.o -lcrypto
+
+verify: verify.o utils.o
+ gcc ${CCOPTS} -o $@ $< utils.o -lcrypto
+
+parse_hour: parse.o
+ gcc ${CCOPTS} -o $@ $< -lpcre -lz
+
+%.o: %.c
+ gcc ${CCOPTS} -c -o $@ $<
+
+.PHONY: clean realclean
+
+clean:
+ -rm parse.o utils.o verify.o decrypt_userinfo.o
+
+realclean: clean
+ -rm parse_hour decrypt_userinfo verify
173 scripts/traffic/decrypt_userinfo.c
@@ -0,0 +1,173 @@
+#include <stdio.h>
+#include <assert.h>
+#include <stdbool.h>
+#include <string.h>
+#include <ctype.h>
+#include <stdlib.h>
+
+#include <openssl/bio.h>
+#include <openssl/evp.h>
+
+#include "utils.h"
+
+#define MAX_LINE 2048
+#define KEY_SIZE 16
+
+enum InputField {
+ FIELD_USER=0,
+ FIELD_SRPATH,
+ FIELD_LANG,
+ FIELD_CNAME,
+
+ FIELD_COUNT
+};
+
+int main(int argc, char** argv)
+{
+ const char* secret;
+ secret = getenv("TRACKING_SECRET");
+ if (!secret) {
+ fprintf(stderr, "TRACKING_SECRET not set\n");
+ return 1;
+ }
+
+ char input_line[MAX_LINE];
+ char plaintext[MAX_LINE];
+ const EVP_CIPHER* cipher = EVP_aes_128_cbc();
+
+ while (fgets(input_line, MAX_LINE, stdin) != NULL) {
+ /* split the line into unique_id and query */
+ char *unique_id, *query;
+ split_fields(input_line, &unique_id, &query, NO_MORE_FIELDS);
+
+ /* parse the query string to get the value we need */
+ char *blob = NULL;
+
+ char *key, *value;
+ while (parse_query_param(&query, &key, &value) >= 0) {
+ if (strcmp(key, "v") == 0) {
+ blob = value;
+ break;
+ }
+ }
+
+ if (blob == NULL)
+ continue;
+
+ /* undo url encoding on the query string */
+ int b64_size = url_decode(blob);
+ if (b64_size < 0)
+ continue;
+
+ /* split off the initialization vector from the actual ciphertext */
+ char *initialization_vector, *ciphertext;
+
+ initialization_vector = blob;
+ initialization_vector[KEY_SIZE] = '\0';
+ ciphertext = blob + 32;
+ b64_size -= 32;
+
+ /* base 64 decode and decrypt the ciphertext */
+ BIO* bio = BIO_new_mem_buf(ciphertext, b64_size);
+
+ BIO* b64 = BIO_new(BIO_f_base64());
+ BIO_set_flags(b64, BIO_FLAGS_BASE64_NO_NL);
+ bio = BIO_push(b64, bio);
+
+ BIO* aes = BIO_new(BIO_f_cipher());
+ BIO_set_cipher(
+ aes,
+ cipher,
+ (unsigned char*)secret,
+ (unsigned char*)initialization_vector,
+ 0 /* decryption */
+ );
+ bio = BIO_push(aes, bio);
+
+ /* stream the output through the filters */
+ int plaintext_length = BIO_read(bio, plaintext, b64_size);
+ plaintext[plaintext_length] = '\0';
+
+ if (!BIO_get_cipher_status(bio)) {
+ BIO_free_all(bio);
+ continue;
+ }
+
+ /* clean up */
+ BIO_free_all(bio);
+
+ /* check that the plaintext isn't garbage; if there are
+ * non-ascii characters in it, it's likely bad */
+ bool non_ascii_junk = false;
+ for (unsigned char* c = (unsigned char*)plaintext; *c != '\0'; c++) {
+ if (*c > 0x7F) {
+ non_ascii_junk = true;
+ break;
+ }
+ }
+
+ if (non_ascii_junk) {
+ continue;
+ }
+
+ /* write out the unique id since we don't need it for ourselves */
+ fputs(unique_id, stdout);
+
+ /* split the fields out of the plaintext */
+ char* current_string = plaintext;
+ int field_index = 0;
+
+ for (int i = 0; i < plaintext_length; i++) {
+ char *c = plaintext + i;
+ if (*c != '|')
+ continue;
+
+ *c = '\0';
+
+ switch (field_index) {
+ case FIELD_USER:
+ /* we don't use the username; skip it */
+ break;
+ case FIELD_SRPATH:
+ fputc('\t', stdout);
+ fputs(current_string, stdout);
+
+ fputc('\t', stdout);
+ for (char* c2=current_string; *c2 != '\0'; c2++) {
+ if (*c2 == '-') {
+ *c2 = '\0';
+ break;
+ }
+ }
+ fputs(current_string, stdout);
+ break;
+ case FIELD_LANG:
+ fputc('\t', stdout);
+ for (char* c2=current_string; *c2 != '\0'; c2++) {
+ *c2 = tolower(*c2);
+ }
+ fputs(current_string, stdout);
+ break;
+ case FIELD_CNAME:
+ assert(0!=1);
+ }
+
+ current_string = c + 1;
+ field_index += 1;
+ }
+
+ if (field_index < FIELD_COUNT) {
+ fputc('\t', stdout);
+ fputs(current_string, stdout);
+ field_index += 1;
+ }
+
+ for (; field_index < FIELD_COUNT; field_index++)
+ fputc('\t', stdout);
+
+ /* all done! */
+ fputc('\n', stdout);
+ }
+
+ return 0;
+}
+
157 scripts/traffic/mr_aggregate.pig
@@ -0,0 +1,157 @@
+/* Aggregate output from processed logs:
+ *
+ * Go from entry per unique_id (including # of impressions)
+ * to total # of uniques, total # of impressions
+ *
+ * Needs to be passed: INPUT, OUTPUT
+ */
+
+/****************************************************
+ * DEFINITIONS
+ ****************************************************/
+
+-- Cleanup
+rmf $OUTPUT
+
+/****************************************************
+ * AGGREGATE
+ ****************************************************/
+
+-- sitewide --
+sitewide = LOAD '$INPUT/sitewide' AS (unique_id, count:long);
+
+sitewide_grouped = GROUP sitewide BY unique_id;
+sitewide_combined = FOREACH sitewide_grouped
+ GENERATE group AS unique_id,
+ SUM(sitewide.count) as count;
+
+sitewide_grouped2 = GROUP sitewide_combined ALL;
+sitewide_totals = FOREACH sitewide_grouped2
+ GENERATE group,
+ COUNT(sitewide_combined),
+ SUM(sitewide_combined.count);
+
+STORE sitewide_totals INTO '$OUTPUT/sitewide';
+
+-- subreddit --
+subreddit = LOAD '$INPUT/subreddit' AS (subreddit, unique_id, count:long);
+
+subreddit_grouped = GROUP subreddit BY (subreddit, unique_id);
+subreddit_combined = FOREACH subreddit_grouped
+ GENERATE group.subreddit AS subreddit,
+ group.unique_id AS unique_id,
+ SUM(subreddit.count) AS count;
+
+subreddit_grouped2 = GROUP subreddit_combined BY subreddit;
+subreddit_totals = FOREACH subreddit_grouped2
+ GENERATE group,
+ COUNT(subreddit_combined),
+ SUM(subreddit_combined.count);
+
+STORE subreddit_totals INTO '$OUTPUT/subreddit';
+
+-- subreddit path
+srpath = LOAD '$INPUT/srpath' AS (srpath, unique_id, count:long);
+
+srpath_grouped = GROUP srpath BY (srpath, unique_id);
+srpath_combined = FOREACH srpath_grouped
+ GENERATE group.srpath AS srpath,
+ group.unique_id AS unique_id,
+ SUM(srpath.count) AS count;
+
+srpath_grouped2 = GROUP srpath_combined BY srpath;
+srpath_totals = FOREACH srpath_grouped2
+ GENERATE group,
+ COUNT(srpath_combined),
+ SUM(srpath_combined.count);
+
+STORE srpath_totals INTO '$OUTPUT/srpath';
+
+-- language
+lang = LOAD '$INPUT/lang' AS (lang, unique_id, count:long);
+
+lang_grouped = GROUP lang BY (lang, unique_id);
+lang_combined = FOREACH lang_grouped
+ GENERATE group.lang AS lang,
+ group.unique_id AS unique_id,
+ SUM(lang.count) AS count;
+
+lang_grouped2 = GROUP lang_combined BY lang;
+lang_totals = FOREACH lang_grouped2
+ GENERATE group,
+ COUNT(lang_combined),
+ SUM(lang_combined.count);
+
+STORE lang_totals INTO '$OUTPUT/lang';
+
+-- clicks
+click = LOAD '$INPUT/clicks' AS (fullname, unique_id, count:long);
+
+click_grouped = GROUP click BY (fullname, unique_id);
+click_combined = FOREACH click_grouped
+ GENERATE group.fullname AS fullname,
+ group.unique_id AS unique_id,
+ SUM(click.count) AS count;
+
+click_grouped2 = GROUP click_combined BY fullname;
+click_totals = FOREACH click_grouped2
+ GENERATE group,
+ COUNT(click_combined),
+ SUM(click_combined.count);
+
+STORE click_totals INTO '$OUTPUT/clicks';
+
+-- targeted clicks
+t_click = LOAD '$INPUT/clicks_targeted' AS (fullname, sr, unique_id, count:long);
+
+t_click_grouped = GROUP t_click BY (fullname, sr, unique_id);
+t_click_combined = FOREACH t_click_grouped
+ GENERATE group.fullname AS fullname,
+ group.sr AS sr,
+ group.unique_id AS unique_id,
+ SUM(t_click.count) AS count;
+
+t_click_grouped2 = GROUP t_click_combined BY (fullname, sr);
+t_click_totals = FOREACH t_click_grouped2
+ GENERATE group.fullname,
+ group.sr,
+ COUNT(t_click_combined),
+ SUM(t_click_combined.count);
+
+STORE t_click_totals INTO '$OUTPUT/clicks_targeted';
+
+-- things
+thing = LOAD '$INPUT/thing' AS (fullname, unique_id, count:long);
+
+thing_grouped = GROUP thing BY (fullname, unique_id);
+thing_combined = FOREACH thing_grouped
+ GENERATE group.fullname AS fullname,
+ group.unique_id AS unique_id,
+ SUM(thing.count) AS count;
+
+thing_grouped2 = GROUP thing_combined BY fullname;
+thing_totals = FOREACH thing_grouped2
+ GENERATE group,
+ COUNT(thing_combined),
+ SUM(thing_combined.count);
+
+STORE thing_totals INTO '$OUTPUT/thing';
+
+-- targeted things
+t_thing = LOAD '$INPUT/thingtarget' AS (fullname, sr, unique_id, count:long);
+
+t_thing_grouped = GROUP t_thing BY (fullname, sr, unique_id);
+t_thing_combined = FOREACH t_thing_grouped
+ GENERATE group.fullname AS fullname,
+ group.sr AS sr,
+ group.unique_id AS unique_id,
+ SUM(t_thing.count) AS count;
+
+t_thing_grouped2 = GROUP t_thing_combined BY (fullname, sr);
+t_thing_totals = FOREACH t_thing_grouped2
+ GENERATE group.fullname,
+ group.sr,
+ COUNT(t_thing_combined),
+ SUM(t_thing_combined.count);
+
+STORE t_thing_totals INTO '$OUTPUT/thingtarget';
106 scripts/traffic/mr_coalesce.pig
@@ -0,0 +1,106 @@
+/* EMR Version
+ *
+ * Coalesce output from multiple processed logs within interval
+ * hours --> day
+ * days --> month
+ *
+ * Needs to be passed: INPUT, OUTPUT
+ */
+
+/****************************************************
+ * DEFINITIONS
+ ****************************************************/
+
+-- Cleanup
+rmf $OUTPUT
+
+/****************************************************
+ * COALESCE
+ ****************************************************/
+
+-- sitewide
+sitewide = LOAD '$INPUT/sitewide' AS (unique_id, count:long); -- load all input files (multiple hours)
+
+sitewide_grouped = GROUP sitewide BY unique_id; -- (unique_id, {(unique_id, count), ...}, ...)
+
+sitewide_coalesced = FOREACH sitewide_grouped
+ GENERATE group, SUM(sitewide.count); -- ((unique_id, SUM(sitewide.count), ...)
+
+STORE sitewide_coalesced INTO '$OUTPUT/sitewide';
+
+-- subreddit
+subreddit_counters = LOAD '$INPUT/subreddit' AS (subreddit, unique_id, count:long);
+
+subreddits_grouped = GROUP subreddit_counters BY (subreddit, unique_id);
+
+subreddits_coalesced = FOREACH subreddits_grouped
+ GENERATE group.subreddit, group.unique_id,
+ SUM(subreddit_counters.count) AS count;
+
+STORE subreddits_coalesced INTO '$OUTPUT/subreddit';
+
+-- subreddit path
+srpath = LOAD '$INPUT/srpath' AS (srpath, unique_id, count:long);
+
+srpath_grouped = GROUP srpath BY (srpath, unique_id);
+
+srpath_coalesced = FOREACH srpath_grouped
+ GENERATE group.srpath, group.unique_id,
+ SUM(srpath.count) AS count;
+
+STORE srpath_coalesced INTO '$OUTPUT/srpath';
+
+-- language
+lang = LOAD '$INPUT/lang' AS (lang, unique_id, count:long);
+
+lang_grouped = GROUP lang BY (lang, unique_id);
+
+lang_coalesced = FOREACH lang_grouped
+ GENERATE group.lang, group.unique_id,
+ SUM(lang.count) AS count;
+
+STORE lang_coalesced INTO '$OUTPUT/lang';
+
+-- click
+click = LOAD '$INPUT/clicks' AS (fullname, unique_id, count:long);
+
+click_grouped = GROUP click BY (fullname, unique_id);
+
+click_coalesced = FOREACH click_grouped
+ GENERATE group.fullname, group.unique_id,
+ SUM(click.count) AS count;
+
+STORE click_coalesced INTO '$OUTPUT/clicks';
+
+-- clicktarget
+clicktarget = LOAD '$INPUT/clicks_targeted' AS (fullname, sr, unique_id, count:long);
+
+clicktarget_grouped = GROUP clicktarget BY (fullname, sr, unique_id);
+
+clicktarget_coalesced = FOREACH clicktarget_grouped
+ GENERATE group.fullname, group.sr, group.unique_id,
+ SUM(clicktarget.count) AS count;
+
+STORE clicktarget_coalesced INTO '$OUTPUT/clicks_targeted';
+
+-- thing
+thing = LOAD '$INPUT/thing' AS (fullname, unique_id, count:long);
+
+thing_grouped = GROUP thing BY (fullname, unique_id);
+
+thing_coalesced = FOREACH thing_grouped
+ GENERATE group.fullname, group.unique_id,
+ SUM(thing.count) AS count;
+
+STORE thing_coalesced INTO '$OUTPUT/thing';
+
+-- thingtarget
+thingtarget = LOAD '$INPUT/thingtarget' AS (fullname, sr, unique_id, count:long);
+
+thingtarget_grouped = GROUP thingtarget BY (fullname, sr, unique_id);
+
+thingtarget_coalesced = FOREACH thingtarget_grouped
+ GENERATE group.fullname, group.sr, group.unique_id,
+ SUM(thingtarget.count) AS count;
+
+STORE thingtarget_coalesced INTO '$OUTPUT/thingtarget';
158 scripts/traffic/mr_process_hour.pig
@@ -0,0 +1,158 @@
+/* EMR Version
+ *
+ * Process hourly logfile
+ * for each category sitewide/subreddit/srpath/lang/clicks/clicks_targeted/thing/thingtarget
+ * generate output with entries like:
+ * category, unique_id, count (e.g. subreddit: pics, 123456, 9)
+ *
+ * Needs to be passed: LOGFILE, OUTPUT
+ */
+
+/****************************************************
+ * DEFINITIONS
+ ****************************************************/
+%default TEMPDIR 'hdfs:///tmp/processed_logs';
+
+-- Binaries - location is specified in traffic_bootstrap.sh
+DEFINE PARSE_HOUR `/home/hadoop/traffic/parse_hour`;
+DEFINE DECRYPT_USERINFO `/home/hadoop/traffic/decrypt_userinfo`;
+DEFINE VERIFY `/home/hadoop/traffic/verify`;
+
+-- Pixel definitions
+%default URL_USERINFO '/pixel/of_destiny.png';
+%default URL_ADFRAME '/pixel/of_defenestration.png';
+%default URL_PROMOTEDLINK '/pixel/of_doom.png';
+%default URL_CLICK '/click';
+
+-- Cleanup
+rmf $OUTPUT
+rmf $TEMPDIR
+
+/****************************************************
+ * LOAD LOGFILE
+ ****************************************************/
+
+log_raw = LOAD '$LOGFILE' USING TextLoader() AS (line);
+log_parsed = STREAM log_raw THROUGH PARSE_HOUR AS (ip, path:chararray, query, unique_id);
+
+SPLIT log_parsed INTO
+ pageviews_with_path IF path == '$URL_USERINFO',
+ unverified_hits IF (path == '$URL_ADFRAME' OR
+ path == '$URL_PROMOTEDLINK' OR
+ path == '$URL_CLICK');
+
+pageviews_encrypted = FOREACH pageviews_with_path GENERATE unique_id, query;
+
+-- Store intermediate results because of bug: https://issues.apache.org/jira/browse/PIG-2442
+-- If/when EMR gets to 0.10 or 0.9.3 we can remove this
+STORE pageviews_encrypted INTO '$TEMPDIR/pageviews_encrypted';
+STORE unverified_hits INTO '$TEMPDIR/unverified_hits';
+
+ /****************************************************
+ * PAGEVIEWS
+ ****************************************************/
+
+pageviews_encrypted = LOAD '$TEMPDIR/pageviews_encrypted' AS (unique_id, query);
+
+pageviews = STREAM pageviews_encrypted THROUGH DECRYPT_USERINFO AS (unique_id, srpath, subreddit, lang, cname);
+
+-- sitewide
+sitewide_pageviews = FOREACH pageviews GENERATE unique_id; -- (unique_id)
+sitewide_hourly_uniques_grouped = GROUP sitewide_pageviews BY unique_id; -- (unique_id, {(unique_id), ...}
+sitewide_hourly_uniques = FOREACH sitewide_hourly_uniques_grouped
+ GENERATE group AS unique_id, COUNT(sitewide_pageviews) AS count; -- (unique_id, count)
+
+STORE sitewide_hourly_uniques INTO '$OUTPUT/sitewide';
+
+-- subreddit
+subreddit_pageviews_filtered = FILTER pageviews
+ BY subreddit IS NOT NULL; -- exclude entries without subreddit
+subreddit_pageviews_raw = FOREACH subreddit_pageviews_filtered
+ GENERATE subreddit, unique_id; -- limit to (subreddit, unique_id)
+subreddit_hourly_uniques_grouped = GROUP subreddit_pageviews_raw
+ BY (subreddit, unique_id); -- (subreddit, unique_id, {(subreddit, unique_id), ...})
+subreddit_hourly_uniques = FOREACH subreddit_hourly_uniques_grouped
+ GENERATE group.subreddit, group.unique_id,
+ COUNT(subreddit_pageviews_raw) AS count; -- (subreddit, unique_id, count)
+
+STORE subreddit_hourly_uniques INTO '$OUTPUT/subreddit';
+
+-- subreddit path
+srpath_filtered = FILTER pageviews BY srpath IS NOT NULL;
+srpath_pageviews = FOREACH srpath_filtered
+ GENERATE srpath, unique_id;
+srpath_hourly_uniques_grouped = GROUP srpath_pageviews
+ BY (srpath, unique_id);
+srpath_hourly_uniques = FOREACH srpath_hourly_uniques_grouped
+ GENERATE group.srpath, group.unique_id,
+ COUNT(srpath_pageviews) AS count;
+
+STORE srpath_hourly_uniques INTO '$OUTPUT/srpath';
+
+-- language
+lang_filtered = FILTER pageviews BY lang IS NOT NULL;
+lang_pageviews = FOREACH lang_filtered GENERATE lang, unique_id;
+lang_hourly_uniques_grouped = GROUP lang_pageviews BY (lang, unique_id);
+lang_hourly_uniques = FOREACH lang_hourly_uniques_grouped
+ GENERATE group.lang, group.unique_id,
+ COUNT(lang_pageviews) AS count;
+
+STORE lang_hourly_uniques INTO '$OUTPUT/lang';
+
+ /****************************************************
+ * HITS
+ ****************************************************/
+
+unverified_hits = LOAD '$TEMPDIR/unverified_hits' AS (ip, path:chararray, query, unique_id);
+
+-- process unverified hits
+verified = STREAM unverified_hits THROUGH VERIFY AS (unique_id, path:chararray, fullname, sr);
+
+-- ads and promoted links
+
+SPLIT verified INTO
+ clicks_raw IF path == '$URL_CLICK',
+ ad_impressions IF (path == '$URL_ADFRAME' OR
+ path == '$URL_PROMOTEDLINK');
+
+-- clicks
+clicks = FOREACH clicks_raw GENERATE fullname, unique_id;
+clicks_grouped = GROUP clicks BY (fullname, unique_id);
+clicks_by_hour = FOREACH clicks_grouped
+ GENERATE group.fullname, group.unique_id,
+ COUNT(clicks) AS count;
+
+STORE clicks_by_hour INTO '$OUTPUT/clicks';
+
+-- targeted clicks
+targeted_clicks = FOREACH clicks_raw GENERATE fullname, sr, unique_id;
+targeted_clicks_grouped = GROUP targeted_clicks BY (fullname, sr, unique_id);
+targeted_clicks_by_hour = FOREACH targeted_clicks_grouped
+ GENERATE group.fullname, group.sr, group.unique_id,
+ COUNT(targeted_clicks) AS count;
+
+STORE targeted_clicks_by_hour INTO '$OUTPUT/clicks_targeted';
+
+-- things
+thing_impressions = FOREACH ad_impressions GENERATE fullname, unique_id;
+thing_impressions_grouped = GROUP thing_impressions BY (fullname, unique_id);
+thing_impressions_hourly = FOREACH thing_impressions_grouped
+ GENERATE group.fullname, group.unique_id,
+ COUNT(thing_impressions) AS count;
+
+STORE thing_impressions_hourly INTO '$OUTPUT/thing';
+
+-- targeted things
+targeted_thing_impressions = FOREACH ad_impressions
+ GENERATE fullname, sr, unique_id;
+targeted_thing_impressions_grouped = GROUP targeted_thing_impressions
+ BY (fullname, sr, unique_id);
+targeted_thing_impressions_hourly = FOREACH targeted_thing_impressions_grouped
+ GENERATE group.fullname, group.sr,
+ group.unique_id,
+ COUNT(targeted_thing_impressions)
+ AS count;
+
+STORE targeted_thing_impressions_hourly INTO '$OUTPUT/thingtarget';
+
+rmf $TEMPDIR
133 scripts/traffic/parse.c
@@ -0,0 +1,133 @@
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <stdlib.h>
+
+#include <arpa/inet.h>
+
+#include <pcre.h>
+#include <zlib.h>
+
+#define MAX_LINE 2048
+
+/******************************
+ * this regular expression has the following capture groups in it (in order):
+ * - ip
+ * - path
+ * - query
+ * - user agent
+ ******************************/
+#define RE "(?:[0-9.]+,\\ )*([0-9.]+)"\
+ "[^\"]+"\
+ "\"GET\\s([^\\s?]+)\\?([^\\s]+)\\s[^\"]+\""\
+ "[^\"]+"\
+ "\"[^\"]+\""\
+ "[^\"]+"\
+ "\"([^\"]+)\""
+
+#define GROUP_IP 1
+#define GROUP_PATH 2
+#define GROUP_QUERY 3
+#define GROUP_UA 4
+
+int main(int argc, char** argv)
+{
+ /* compile the pattern */
+ const char* error;
+ int error_offset;
+ pcre *re = pcre_compile(RE, 0, &error, &error_offset, NULL);
+ if (re == NULL) {
+ fprintf(
+ stderr,
+ "character %d: failed to compile regex: %s\n",
+ error_offset,
+ error
+ );
+
+ return 1;
+ }
+
+ /* study it to speed it up */
+ pcre_extra *extra = pcre_study(re, 0, &error);
+
+ /* allocate enough space for the capturing groups */
+ int group_count;
+ pcre_fullinfo(re, extra, PCRE_INFO_CAPTURECOUNT, &group_count);
+ int match_vector_size = (group_count + 1) * 3;
+ int *matches = malloc(sizeof(int) * match_vector_size);
+
+ /* iterate through the input */
+ char input_line[MAX_LINE];
+ while (fgets(input_line, MAX_LINE, stdin)) {
+ int length = strlen(input_line);
+
+ /* run the regular expression against the line */
+ int match_result = pcre_exec(
+ re,
+ extra,
+ input_line,
+ length,
+ 0,
+ 0,
+ matches,
+ match_vector_size
+ );
+
+ /* bail out if the line didn't match */
+ if (match_result < 0) {
+ continue;
+ }
+
+ /* iterate through the groups */
+ /* NOTE: the crc function uses int32_t instead of uint32_t
+ * and has the funky (2^31 - crc) bit of math for backwards
+ * compatibility with the old python code. fix this when
+ * such compatibility is no longer necessary. */
+ uint32_t address = 0;
+ int32_t crc;
+ uint64_t unique_id;
+
+ for (int i = 1; i < match_result; i++) {
+ int start_position = matches[i * 2];
+ int end_position = matches[i * 2 + 1];
+ int substr_length = end_position - start_position;
+
+ switch (i) {
+ case GROUP_UA:
+ crc = crc32(0L, Z_NULL, 0);
+ crc = crc32(crc, (unsigned char*)input_line + start_position,
+ substr_length);
+ unique_id = (((uint64_t)address << 32) & 0xffffffff00000000) |
+ (2147483648 - crc);
+ fprintf(stdout, "%" PRIu64, unique_id);
+ break;
+ case GROUP_IP:
+ /* parse and store the ip so we can use it in GROUP_UA
+ * to calculate the unique id */
+ input_line[end_position] = 0;
+ address = inet_addr(input_line + start_position);
+
+ /* fall through so it gets written out as well */
+ case GROUP_PATH:
+ case GROUP_QUERY:
+ /* write them out verbatim */
+ (void)fwrite(
+ input_line + start_position,
+ sizeof(char),
+ substr_length,
+ stdout
+ );
+ break;
+ }
+
+ /* tab-delimit the data */
+ fputc('\t', stdout);
+ }
+
+ fputc('\n', stdout);
+ }
+
+ return 0;
+}
+
36 scripts/traffic/traffic_bootstrap.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+# The contents of this file are subject to the Common Public Attribution
+# License Version 1.0. (the "License"); you may not use this file except in
+# compliance with the License. You may obtain a copy of the License at
+# http://code.reddit.com/LICENSE. The License is based on the Mozilla Public
+# License Version 1.1, but Sections 14 and 15 have been added to cover use of
+# software over a computer network and provide for limited attribution for the
+# Original Developer. In addition, Exhibit A has been modified to be consistent
+# with Exhibit B.
+#
+# Software distributed under the License is distributed on an "AS IS" basis,
+# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for
+# the specific language governing rights and limitations under the License.
+#
+# The Original Code is reddit.
+#
+# The Original Developer is the Initial Developer. The Initial Developer of
+# the Original Code is reddit Inc.
+#
+# All portions of the code written by reddit are Copyright (c) 2006-2012 reddit
+# Inc. All Rights Reserved.
+###############################################################################
+
+TRAFFIC_SRC_DIR=$1
+TRAFFIC_INSTALL_DIR=$HADOOP_HOME/traffic
+
+# Build traffic processing binaries
+mkdir $TRAFFIC_INSTALL_DIR
+hadoop dfs -copyToLocal $TRAFFIC_SRC_DIR/* $TRAFFIC_INSTALL_DIR
+cd $TRAFFIC_INSTALL_DIR
+make
+
+# Export userinfo secret
+TARGET=$HADOOP_HOME/.pam_environment
+TRACKING_SECRET=$2
+echo "TRACKING_SECRET=$TRACKING_SECRET" >> $TARGET
93 scripts/traffic/utils.c
@@ -0,0 +1,93 @@
+#include "utils.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+
+void split_fields(char *line, ...)
+{
+ va_list args;
+
+ va_start(args, line);
+ char* c = line;
+ for (char** field = va_arg(args, char**); field != NULL;
+ field = va_arg(args, char**)) {
+ *field = c;
+
+ for (; *c != '\t' && *c != '\n'; c++) {
+ assert(*c != '\0');
+ }
+
+ *c = '\0';
+ c += 1;
+ }
+ va_end(args);
+}
+
+int url_decode(char* buffer)
+{
+ char *c = buffer, *o = buffer;
+ int size = 0;
+
+ for (;*c != '\0';) {
+ if (*c == '%') {
+ int count = sscanf(c + 1, "%2hhx", o);
+ if (count != 1) {
+ size = -1;
+ break;
+ }
+
+ c += 3;
+ } else {
+ *o = *c;
+ c += 1;
+ }
+
+ o += 1;
+ size += 1;
+ }
+
+ if (size > 0)
+ *o = '\0';
+
+ return size;
+}
+
+int parse_query_param(char** query, char** key, char** value)
+{
+ if (*query == NULL) {
+ return -1;
+ }
+
+ *key = *query;
+ *value = NULL;
+
+ bool string_ended = false;
+ int value_length = -1;
+ char *c;
+
+ for(c = *query; *c != '&'; c++) {
+ if (*c == '\0') {
+ string_ended = true;
+ break;
+ } else if (*value == NULL && *c == '=') {
+ *c = '\0';
+ *value = c + 1;
+ }
+
+ if (*value != NULL) {
+ value_length += 1;
+ }
+ }
+
+ *c = '\0';
+
+ if (!string_ended) {
+ *query = c + 1;
+ } else {
+ *query = NULL;
+ }
+
+ return value_length;
+}
+
24 scripts/traffic/utils.h
@@ -0,0 +1,24 @@
+#ifndef UTILS_H
+#define UTILS_H
+
+#include <stdarg.h>
+#include <stdbool.h>
+
+/*****
+ * split a tab separated line of text into its constituent fields.
+ */
+#define NO_MORE_FIELDS ((char**)NULL)
+void split_fields(char* line, ...);
+
+/*****
+ * parse one parameter from the query string. returns length of value.
+ */
+int parse_query_param(char** query, char** key, char** value);
+
+/*****
+ * undo url encoding in place. returns length of decoded string.
+ */
+int url_decode(char* encoded);
+
+#endif /* UTILS_H */
+
136 scripts/traffic/verify.c
@@ -0,0 +1,136 @@
+#include <stdio.h>
+#include <assert.h>
+#include <stdbool.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include <openssl/sha.h>
+
+#include "utils.h"
+
+#define MAX_LINE 2048
+
+int main(int argc, char** argv)
+{
+ const char* secret;
+ secret = getenv("TRACKING_SECRET");
+ if (!secret) {
+ fprintf(stderr, "TRACKING_SECRET not set\n");
+ return 1;
+ }
+
+ char input_line[MAX_LINE];
+ unsigned char input_hash[SHA_DIGEST_LENGTH];
+ unsigned char expected_hash[SHA_DIGEST_LENGTH];
+ int secret_length = strlen(secret);
+
+ while (fgets(input_line, MAX_LINE, stdin) != NULL) {
+ /* get the fields */
+ char *ip, *path, *query, *unique_id;
+
+ split_fields(
+ input_line,
+ &ip,
+ &path,
+ &query,
+ &unique_id,
+ NO_MORE_FIELDS
+ );
+
+ /* in the query string, grab the fields we want to verify */
+ char *id = NULL;
+ char *hash = NULL;
+
+ char *key, *value;
+ while (parse_query_param(&query, &key, &value) >= 0) {
+ if (strcmp(key, "id") == 0) {
+ id = value;
+ } else if (strcmp(key, "hash") == 0) {
+ hash = value;
+ }
+ }
+
+ if (id == NULL || hash == NULL)
+ continue;
+
+ /* decode the params */
+ int id_length = url_decode(id);
+ if (id_length < 0)
+ continue;
+
+ if (url_decode(hash) != 40)
+ continue;
+
+ /* turn the expected hash into bytes */
+ bool bad_hash = false;
+ for (int i = 0; i < SHA_DIGEST_LENGTH; i++) {
+ int count = sscanf(&hash[i*2], "%2hhx", &input_hash[i]);
+ if (count != 1) {
+ bad_hash = true;
+ break;
+ }
+ }
+
+ if (bad_hash)
+ continue;
+
+ /* generate the expected hash */
+ SHA_CTX ctx;
+ int result = 0;
+
+ result = SHA1_Init(&ctx);
+ if (result == 0)
+ continue;
+
+ if (strcmp("/pixel/of_defenestration.png", path) != 0) {
+ /* the IP is not included on adframe tracker hashes */
+ result = SHA1_Update(&ctx, ip, strlen(ip));
+ if (result == 0)
+ continue;
+ }
+
+ result = SHA1_Update(&ctx, id, id_length);
+ if (result == 0)
+ continue;
+
+ result = SHA1_Update(&ctx, secret, secret_length);
+ if (result == 0)
+ continue;
+
+ result = SHA1_Final(expected_hash, &ctx);
+ if (result == 0)
+ continue;
+
+ /* check that the hashes match */
+ if (memcmp(input_hash, expected_hash, SHA_DIGEST_LENGTH) != 0)
+ continue;
+
+ /* split out the fullname and subreddit if necessary */
+ char *fullname = id;
+ char *subreddit = NULL;
+
+ for (char *c = id; *c != '\0'; c++) {
+ if (*c == '-') {
+ subreddit = c + 1;
+ *c = '\0';
+ break;
+ }
+ }
+
+ /* output stuff! */
+ fputs(unique_id, stdout);
+ fputc('\t', stdout);
+
+ fputs(path, stdout);
+ fputc('\t', stdout);
+
+ fputs(fullname, stdout);
+ fputc('\t', stdout);
+
+ if (subreddit != NULL) {
+ fputs(subreddit, stdout);
+ }
+
+ fputc('\n', stdout);
+ }
+}
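
Based on the checks above, the hash query parameter is the hex sha1 of ip + id + secret, with the ip omitted for the adframe pixel. A Python rendering of the same validation, for reference (all values below are made up):

    import hashlib

    def tracker_hash_valid(ip, path, id_param, hash_param, secret):
        sha = hashlib.sha1()
        if path != '/pixel/of_defenestration.png':
            # the IP is not included on adframe tracker hashes
            sha.update(ip)
        sha.update(id_param)
        sha.update(secret)
        return sha.hexdigest() == hash_param

    print tracker_hash_valid('10.0.0.1', '/click', 't3_bh-pics',
                             '0' * 40, 'example-secret')
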
13 upstart/reddit-job-hourly_traffic.conf
@@ -0,0 +1,13 @@
+description "process hourly traffic logs on EMR"
+
+manual
+task
+stop on reddit-stop or runlevel [016]
+
+nice 10
+
+script
+ . /etc/default/reddit
+ d=`date -u +"%Y-%m-%d-%H" -d '1 hour ago'`
+ wrap-job paster run $REDDIT_INI -c "from r2.lib.traffic import process_hour; process_hour('$d')"
+end script
