Skip to content

Commit

Permalink
Merge pull request #2102 from selenamarie/bug970406-pull-adi-crontabber
Browse files Browse the repository at this point in the history
Fixes bug 970406 - pull ADI/blocklist pings from Hive with crontabber
  • Loading branch information
selenamarie committed Jun 12, 2014
2 parents f263cdf + c69284c commit 2c0e7a7
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 0 deletions.
@@ -0,0 +1,36 @@
"""Fixes bug 970406 - add raw_adi_logs table
Revision ID: 32b54dec3fc0
Revises: 1ab8d5514ce2
Create Date: 2014-06-12 11:47:19.398882
"""

# revision identifiers, used by Alembic.
revision = '32b54dec3fc0'
down_revision = '1ab8d5514ce2'

from alembic import op
from socorro.lib import citexttype, jsontype, buildtype
from socorro.lib.migrations import fix_permissions, load_stored_proc

import sqlalchemy as sa
from sqlalchemy import types
from sqlalchemy.dialects import postgresql
from sqlalchemy.sql import table, column

def upgrade():
op.create_table('raw_adi_logs',
sa.Column('report_date', sa.DATE(), nullable=True),
sa.Column('product_name', sa.TEXT(), nullable=True),
sa.Column('product_os_platform', sa.TEXT(), nullable=True),
sa.Column('product_os_version', sa.TEXT(), nullable=True),
sa.Column('product_version', sa.TEXT(), nullable=True),
sa.Column('build', sa.TEXT(), nullable=True),
sa.Column('build_channel', sa.TEXT(), nullable=True),
sa.Column('product_guid', sa.TEXT(), nullable=True),
sa.Column('count', sa.INTEGER(), nullable=True)
)

def downgrade():
op.drop_table('raw_adi_logs')
2 changes: 2 additions & 0 deletions requirements.txt
Expand Up @@ -111,3 +111,5 @@ path.py==5.1
pep8>=1.4.5
# sha256: P6gKELNtUWhr93RPXcmWIs1cmM6O1kAi5imGiq_Bd2k
pyflakes==0.8.1
# sha256: IBHATOBcLURttngxfeVP4PgNdykbnQ1G-sknzAV8ZXo
https://github.com/BradRuderman/pyhs2/archive/870d5590a97d53855a4aa6da70be8439ce4d690c.zip#egg=pyhs2
162 changes: 162 additions & 0 deletions socorro/cron/jobs/fetch_adi_from_hive.py
@@ -0,0 +1,162 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import datetime
import urllib2
import csv
import getpass
import os
import tempfile

import pyhs2

from configman import Namespace
from crontabber.base import BaseCronApp
from crontabber.mixins import (
as_backfill_cron_app,
with_postgres_transactions,
with_single_postgres_transaction
)

"""
Detailed documentation on columns avaiable from our Hive system at:
https://intranet.mozilla.org/Metrics/Blocklist
Columns being queried are:
report_date
product_name
product_os_platform
product_os_version
product_version
build
build_channel
product_guid
count
"""

_QUERY = """
select
ds,
split(request_url,'/')[5],
split(split(request_url,'/')[10], '%%20')[0],
split(split(request_url,'/')[10], '%%20')[1],
split(request_url,'/')[4],
split(request_url,'/')[6],
split(request_url,'/')[9],
split(request_url,'/')[3],
count(*)
FROM v2_raw_logs
WHERE
domain='addons.mozilla.org'
and request_url like '/blocklist/3/%%'
and ds=%s
GROUP BY
ds,
split(request_url,'/')[5],
split(split(request_url,'/')[10], '%%20')[0],
split(split(request_url,'/')[10], '%%20')[1],
split(request_url,'/')[4],
split(request_url,'/')[6],
split(request_url,'/')[9],
split(request_url,'/')[3]
"""


@as_backfill_cron_app
@with_postgres_transactions()
@with_single_postgres_transaction()
class FetchADIFromHiveCronApp(BaseCronApp):
""" This cron is our daily blocklist ping web logs query
that rolls up all the browser checkins and let's us know
how many browsers we think were active on the internet
for a particular day """
app_name = 'fetch-adi-from-hive'
app_description = 'Fetch ADI From Hive App'
app_version = '0.1'

required_config = Namespace()
required_config.add_option(
'query',
default=_QUERY,
doc='Hive query for fetching ADI data')

required_config.add_option(
'hive_host',
default='localhost',
doc='Hostname to run Hive query on')

required_config.add_option(
'hive_port',
default=10000,
doc='Port to run Hive query on')

required_config.add_option(
'hive_user',
default='socorro',
doc='User to connect to Hive with')

required_config.add_option(
'hive_password',
default='ignored',
doc='Password to connect to Hive with')

required_config.add_option(
'hive_database',
default='default',
doc='Database name to connect to Hive with')

required_config.add_option(
'hive_auth_mechanism',
default='PLAIN',
doc='Auth mechanism for Hive')

def run(self, connection, date):
target_date = (date - datetime.timedelta(days=1)).strftime('%Y-%m-%d')

raw_adi_logs_pathname = os.path.join(
tempfile.gettempdir(),
"%s.raw_adi_logs.TEMPORARY%s" % (
target_date,
'.txt'
)
)
try:
with open(raw_adi_logs_pathname, 'w') as f:
hive = pyhs2.connect(
host=self.config.hive_host,
port=self.config.hive_port,
authMechanism=self.config.hive_auth_mechanism,
user=self.config.hive_user,
password=self.config.hive_password,
database=self.config.hive_database
)

cur = hive.cursor()
query = self.config.query % target_date
cur.execute(query)
for row in cur:
f.write("\t".join(str(v) for v in row))
f.write("\n")

with open(raw_adi_logs_pathname, 'r') as f:
pgcursor = connection.cursor()
pgcursor.copy_from(
f,
'raw_adi_logs',
null='None',
columns=[
'report_date',
'product_name',
'product_os_platform',
'product_os_version',
'product_version',
'build',
'build_channel',
'product_guid',
'count'
]
)
finally:
if os.path.isfile(raw_adi_logs_pathname):
os.remove(raw_adi_logs_pathname)
28 changes: 28 additions & 0 deletions socorro/external/postgresql/models.py
Expand Up @@ -349,6 +349,34 @@ class PluginsReport(DeclarativeBase):
__mapper_args__ = {"primary_key": (report_id, plugin_id, date_processed, version)}


class RawAdiLogs(DeclarativeBase):
__tablename__ = 'raw_adi_logs'

#column definitions
report_date = Column(u'report_date', DATE())
product_name = Column(u'product_name', TEXT())
product_os_platform = Column(u'product_os_platform', TEXT())
product_os_version = Column(u'product_os_version', TEXT())
product_version = Column(u'product_version', TEXT())
build = Column(u'build', TEXT())
build_channel = Column(u'build_channel', TEXT())
product_guid = Column(u'product_guid', TEXT())
count = Column(u'count', INTEGER())

__mapper_args__ = {"primary_key": (
report_date,
product_name,
product_os_platform,
product_os_version,
product_version,
build,
build_channel,
product_guid,
count
)
}


class RawAdu(DeclarativeBase):
__tablename__ = 'raw_adu'

Expand Down
125 changes: 125 additions & 0 deletions socorro/unittest/cron/jobs/test_fetch_adi_from_hive.py
@@ -0,0 +1,125 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import datetime
from mock import patch
from nose.plugins.attrib import attr
from nose.tools import eq_

from crontabber.app import CronTabber
from crontabber.tests.base import IntegrationTestCaseBase

from socorro.cron.jobs import fetch_adi_from_hive


@attr(integration='postgres')
class TestFetchADIFromHive(IntegrationTestCaseBase):
    """Integration test for the fetch-adi-from-hive crontabber app:
    runs it against a mocked-out Hive and a real PostgreSQL database.
    """

    def setUp(self):
        super(TestFetchADIFromHive, self).setUp()

    def tearDown(self):
        # Remove any rows the job inserted so later tests start clean.
        cursor = self.conn.cursor()
        cursor.execute("TRUNCATE raw_adi_logs")
        super(TestFetchADIFromHive, self).tearDown()

    def _setup_config_manager(self):
        # Register the app under test on a daily ('1d') schedule.
        _super = super(TestFetchADIFromHive, self)._setup_config_manager
        return _super(
            'socorro.cron.jobs.fetch_adi_from_hive.FetchADIFromHiveCronApp|1d'
        )

    def test_mocked_fetch(self):
        config_manager = self._setup_config_manager()

        def return_test_data(fake):
            # Stand-in for iterating a pyhs2 cursor: yields one list per
            # Hive result row, in the query's column order.  The second
            # row's build is None, exercising NULL handling end to end.
            test_data = [
                ['2014-01-01',
                 'WinterWolf',
                 'Ginko',
                 '2.3.1',
                 '10.0.4',
                 'nightly-ww3v20',
                 'nightly',
                 'a-guid',
                 '1'],
                ['2019-01-01',
                 'NothingMuch',
                 'Ginko',
                 '2.3.2',
                 '10.0.5a',
                 None,
                 'release',
                 'a-guid',
                 '2']
            ]
            for item in test_data:
                yield item

        # Make ``for row in hive.cursor():`` iterate our fake rows.
        with patch('socorro.cron.jobs.fetch_adi_from_hive.pyhs2') as fake_hive:
            fake_hive.connect.return_value \
                .cursor.return_value.__iter__ = return_test_data

            with config_manager.context() as config:
                tab = CronTabber(config)
                tab.run_all()

                information = self._load_structure()
                assert information['fetch-adi-from-hive']

                # Surface any error crontabber recorded as a test failure.
                if information['fetch-adi-from-hive']['last_error']:
                    raise AssertionError(
                        information['fetch-adi-from-hive']['last_error']
                    )

                # The app must connect with the default config values.
                fake_hive.connect.assert_called_with(
                    database='default',
                    authMechanism='PLAIN',
                    host='localhost',
                    user='socorro',
                    password='ignored',
                    port=10000
                )

        # Verify both fake rows actually landed in raw_adi_logs, with
        # the None build stored as SQL NULL and count cast to integer.
        pgcursor = self.conn.cursor()
        columns = (
            'report_date',
            'product_name',
            'product_os_platform',
            'product_os_version',
            'product_version',
            'build',
            'build_channel',
            'product_guid',
            'count'
        )
        pgcursor.execute(
            """ select %s from raw_adi_logs """ % ','.join(columns)
        )

        adi = [dict(zip(columns, row)) for row in pgcursor.fetchall()]

        eq_(adi, [
            {
                'report_date': datetime.date(2014, 1, 1),
                'product_name': 'WinterWolf',
                'product_os_platform': 'Ginko',
                'product_os_version': '2.3.1',
                'product_version': '10.0.4',
                'build': 'nightly-ww3v20',
                'build_channel': 'nightly',
                'product_guid': 'a-guid',
                'count': 1
            }, {
                'report_date': datetime.date(2019, 1, 1),
                'product_name': 'NothingMuch',
                'product_os_platform': 'Ginko',
                'product_os_version': '2.3.2',
                'product_version': '10.0.5a',
                'build': None,
                'build_channel': 'release',
                'product_guid': 'a-guid',
                'count': 2
            }
        ])

0 comments on commit 2c0e7a7

Please sign in to comment.