Skip to content

Commit

Permalink
Add logic to batch processing into states
Browse files Browse the repository at this point in the history
Processing will be batched by state if the record count for a single year exceeds 50,000
  • Loading branch information
muya committed Mar 5, 2016
1 parent bcec28b commit 24c7800
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 4 deletions.
1 change: 1 addition & 0 deletions constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Two-letter USPS codes (alphabetical) for US states, DC, and outlying
# territories (AS, GU, MP, PR, VI), plus "NA"; used to batch queries by state.
states_list = [
    'AK', 'AL', 'AR', 'AS', 'AZ', 'CA', 'CO', 'CT', 'DC', 'DE',
    'FL', 'GA', 'GU', 'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY',
    'LA', 'MA', 'MD', 'ME', 'MI', 'MN', 'MO', 'MP', 'MS', 'MT',
    'NA', 'NC', 'ND', 'NE', 'NH', 'NJ', 'NM', 'NV', 'NY', 'OH',
    'OK', 'OR', 'PA', 'PR', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT',
    'VA', 'VI', 'VT', 'WA', 'WI', 'WV', 'WY',
]
36 changes: 32 additions & 4 deletions harvest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import sys
import getopt

import utils
import nass_query


def begin_nass_harvest(database_host, database_name, database_user,
database_password, port, start_date, end_date):
database_password, port, start_date, end_date, api_key):
print "\nThis is a starter script for the Gro Hackathon's NASS harvest. It meets the API " \
"requirements defined for the hackathon\n\n"

Expand All @@ -14,19 +17,36 @@ def begin_nass_harvest(database_host, database_name, database_user,
print "Database Host: {}".format(database_host)
print "Database Name: {}".format(database_name)
print "Database Username: {}".format(database_user)
print "Database Password: {}".format(database_password)
print "Database Password: {}".format("like I'm gonna show you this :-P")
print "Database Port (hard-coded): {}".format(port)
print "Harvest Start Date: {}".format(start_date)
print "Harvest End Date: {}\n".format(end_date)

# validate dates
print (
"DISCLAIMER: POINT-IN-TIME data is not available for all years, "
"so if your dates start or end mid-year (e.g. start date 2015-04-01"
" end date 2015-09-31, data for the whole of 2015 will be returned"
" instead). You have to love API limitations\n\n")

# build query
nq = nass_query.NassQuery()
nq.start_date = start_date
nq.end_date = end_date
nq.validate_dates()
nq.build_get_query()

nass_util = utils.NassUtils(api_key=api_key)
nass_util.fetch_data(nq)


# #################################################
# PUT YOUR CODE ABOVE THIS LINE
# #################################################
def main(argv):
try:
opts, args = getopt.getopt(argv, "h", ["database_host=", "database_name=", "start_date=",
"database_user=", "database_pass=", "end_date="])
"database_user=", "database_pass=", "end_date=", "api_key="])
except getopt.GetoptError:
print 'Flag error. Probably a mis-typed flag. Make sure they start with "--". Run python ' \
'harvest.py -h'
Expand All @@ -40,6 +60,7 @@ def main(argv):
database_password = 'gro123'
start_date = '2005-1-1'
end_date = '2015-12-31'
api_key_given = False

for opt, arg in opts:
if opt == '-h':
Expand All @@ -66,9 +87,16 @@ def main(argv):
start_date = arg
elif opt in ("--end_date"):
end_date = arg
elif opt in ("--api_key"):
api_key = arg
api_key_given = True

if not api_key_given:
print "Please specify API key using --api-key"
sys.exit()

begin_nass_harvest(database_host, database_name, database_user, database_password,
port, start_date, end_date)
port, start_date, end_date, api_key)

if __name__ == "__main__":
main(sys.argv[1:])
66 changes: 66 additions & 0 deletions nass_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from dateutil import parser


class NassQuery(object):
    """
    Holds the parameters of a single NASS Quick Stats query and turns them
    into the GET payload the API expects.

    Usage: set ``start_date``/``end_date``, call ``validate_dates()``,
    then ``build_get_query()`` to populate ``get_payload``.
    """
    def __init__(self):
        # raw date strings as supplied on the command line
        self.start_date = None
        self.end_date = None

        # years extracted from the dates by validate_dates()
        self.start_year = None
        self.end_year = None

        # whether or not the query spans multiple years
        self.has_year_range = False

        # dict of query-string parameters for the API GET request
        self.get_payload = {}

    def validate_dates(self):
        """
        Parse ``start_date``/``end_date``, store their years, and check
        their ordering.

        Raises ``Exception`` when a date cannot be parsed or when the
        start year comes after the end year.
        """
        # NOTE: converted py2-only `print`/`except Exception, e` syntax to
        # the 2/3-compatible forms; behavior is unchanged.
        print("Validating data: start_date: %s | end_date: %s" % (
            self.start_date, self.end_date))

        try:
            self.start_year = parser.parse(self.start_date).year
            self.end_year = parser.parse(self.end_date).year

            if self.start_year > self.end_year:
                print(
                    "C'mon... start year MUST come before end year")
                raise Exception("Start year is after end year")
        except Exception as e:
            print(
                "Exception thrown while parsing dates. Please ensure dates "
                "are correct")
            raise e

    def build_get_query(self):
        """
        Populate ``get_payload`` with the WHAT/WHERE/WHEN parameters used
        in the GET request to the API.

        Requires ``start_year``/``end_year`` to be set (normally by
        ``validate_dates``).
        """
        # the 'WHAT' part
        self.get_payload["sector_desc"] = "CROPS"

        # the 'WHERE' part
        self.get_payload["agg_level_desc"] = "COUNTY"

        # the 'WHEN' part
        # build this depending on whether the start & end years are different
        if self.start_year == self.end_year:
            self.has_year_range = False
            self.get_payload["year"] = self.start_year
        else:
            # dates had already been validated, so start_year < end_year
            self.has_year_range = True
            self.get_payload["year__GE"] = self.start_year
            self.get_payload["year__LE"] = self.end_year

        self.get_payload["freq_desc"] = "ANNUAL"

        print("payload created: %s" % self.get_payload)

        return
5 changes: 5 additions & 0 deletions requirements.text
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
argparse==1.2.1
python-dateutil==2.5.0
requests==2.9.1
six==1.10.0
wsgiref==0.1.2
125 changes: 125 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import requests
import json
import constants


class NassUtils(object):
    """
    Utility functions to use in the NASS (http://quickstats.nass.usda.gov/api/)
    application.

    The API caps responses at 50k records, so ``fetch_data()`` splits
    large queries into per-year (and, when necessary, per-state) batches.
    """
    def __init__(self, **kwargs):
        self.default_nass_api_url = "http://quickstats.nass.usda.gov/api/"
        # the key is required by every endpoint; None when not supplied
        self.nass_api_key = kwargs.get("api_key", None)

    def nass_api_client(self, endpoint, payload=None, **kwargs):
        """
        Issue a request against the NASS API and return the ``requests``
        response object (or None for unsupported HTTP methods).

        :param endpoint: API endpoint, e.g. "get_counts" or "api_GET"
        :param payload: dict of query-string parameters; the API key is
            added to it (NOTE: the dict is mutated in place, as before)
        :param kwargs: ``base_url`` (defaults to the Quick Stats URL),
            ``http_method`` (only "GET" is implemented)
        :raises requests.HTTPError: when the API returns an error status
        """
        # BUG FIX: original used a mutable default argument (payload={}),
        # which leaks state between calls
        if payload is None:
            payload = {}
        base_url = kwargs.get("base_url", self.default_nass_api_url)
        http_method = kwargs.get("http_method", "GET")

        url = "%s/%s" % (base_url.strip("/"), endpoint)

        # key is required in all calls
        payload["key"] = self.nass_api_key

        print("Calling %s with params: %s" % (url, payload))

        if http_method != "GET":
            # only GET is implemented
            return None

        # BUG FIX: the original returned immediately, leaving the status
        # check as dead code that referenced an undefined `r`
        r = requests.get(url, params=payload)
        if r.status_code != requests.codes.ok:
            r.raise_for_status()
        return r

    def fetch_record_count(self, nass_query):
        """
        Ask the get_counts endpoint how many records the query matches;
        return the "count" value from the JSON response.
        """
        r_count_estimate = self.nass_api_client(
            "get_counts",
            nass_query.get_payload)
        print("Size estimate received from API: %s" % r_count_estimate.content)

        return json.loads(r_count_estimate.content).get("count")

    def fetch_records(self, nass_query):
        """
        Fetch the records matching the query; return the API's "data"
        list (empty list when the response carries no data).
        """
        records = self.nass_api_client(
            "api_GET",
            nass_query.get_payload)

        return json.loads(records.content).get("data") or []

    def fetch_data_in_single_batch(self, nass_query):
        """Fetch everything in one request (query is under the size cap)."""
        return self.fetch_records(nass_query)

    def fetch_data_in_annual_batches(self, nass_query, batch_count, **kwargs):
        """
        Fetch the query one year at a time, optionally one state at a
        time within each year; return the concatenated record list.

        :param batch_count: number of consecutive years to fetch,
            starting at ``nass_query.start_year``
        :param kwargs: ``fetch_by_state`` (bool) — when True, issue one
            request per state per year
        """
        fetch_by_state = kwargs.get("fetch_by_state", False)
        all_data = []

        for offset in range(batch_count):
            curr_year = nass_query.start_year + offset

            # narrow the payload down to this single year
            nass_query.get_payload["year"] = curr_year
            nass_query.get_payload.pop("year__GE", None)
            nass_query.get_payload.pop("year__LE", None)

            if fetch_by_state:
                curr_year_data = []
                for state in constants.states_list:
                    nass_query.get_payload["state_alpha"] = state
                    curr_state_data = self.fetch_records(nass_query)
                    print("Record count for state [%s]: %s" % (
                        state, len(curr_state_data)))
                    curr_year_data = curr_year_data + curr_state_data
            else:
                curr_year_data = self.fetch_records(nass_query)

            all_data = all_data + curr_year_data
            # BUG FIX: the original `break`-ed here unconditionally, so
            # only the first year was ever fetched

        return all_data

    def fetch_data(self, nass_query):
        """
        Fetch all records for ``nass_query``, batching by year — and by
        state when a single year exceeds the 50k-record cap — then print
        the data and the final record count.
        """
        # determine how many calls will be required to ensure we fetch all
        # data in batches of max 50k
        total_estimated_rc = self.fetch_record_count(nass_query)
        batch_estimated_rc = int(total_estimated_rc)
        batch_size = 50000
        if nass_query.has_year_range:
            # BUG FIX: the span is inclusive — 2005..2015 is 11 years,
            # not 10; the original dropped the final year
            total_years = nass_query.end_year - nass_query.start_year + 1
        else:
            total_years = 1

        print("total years: %s" % total_years)

        if batch_estimated_rc > batch_size:
            # let's try to split time period into no. of periods that *might*
            # have 50k records each

            # if time periods are > total no. of years, we may have to process
            # by state
            possible_time_periods = int(batch_estimated_rc / batch_size) + 1

            if possible_time_periods > total_years:
                print("More than %s records per year, we'll process by state"
                      % batch_size)
                fetched_data = self.fetch_data_in_annual_batches(
                    nass_query, total_years, fetch_by_state=True)
            else:
                print("Processing will be split into %s time periods"
                      % possible_time_periods)
                fetched_data = self.fetch_data_in_annual_batches(
                    nass_query, total_years)
        else:
            # BUG FIX: original passed an extra argument here, which
            # raised TypeError (fetch_data_in_single_batch takes only
            # the query)
            fetched_data = self.fetch_data_in_single_batch(nass_query)

        print(fetched_data)
        print("Record count: %s" % len(fetched_data))

0 comments on commit 24c7800

Please sign in to comment.