/
api.py
113 lines (81 loc) · 3.71 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from typing import TypeVar
from typing import Optional
from typing import Tuple
from typing import Dict
from typing import Iterator
from datetime import date
from itertools import zip_longest
from io import BytesIO
from xml.etree import ElementTree
from xml.etree.ElementTree import Element
import aiohttp
from numpy import uint32
from numpy import float32
from numpy import nan
from .reports import Report
T = TypeVar('T')

# Type aliases for data parsed out of USDA report XML.
Attributes = Dict[str, str]
ParsedElement = Tuple[str, Element]

# The datamart API expects dates formatted as MM-DD-YYYY.
date_format = "%m-%d-%Y"
base_url = 'https://mpr.datamart.ams.usda.gov/ws/report/v1/hogs'


# PEP 8 (E731): named helpers are defined with `def`, not lambda assignment.
def report_url(report: str) -> str:
    """Return the full URL for the named hogs report slug."""
    return f'{base_url}/{report}'


def date_filter(start: str, end: str) -> str:
    """Build the JSON filter clause restricting results to a report-date range.

    `start`/`end` are pre-formatted date strings (see `date_format`).
    """
    return f'{{"fieldName":"Report date","operatorType":"BETWEEN","values":["{start}","{end}"]}}'


def request_url(report: str, start: str, end: str) -> str:
    """Build the complete request URL for `report` between `start` and `end`."""
    return f'{report_url(report)}?filter={{"filters":[{date_filter(start, end)}]}}'
def strip_commas(value: str) -> str:
    """Return `value` with every comma removed (e.g. '1,234' -> '1234')."""
    return ''.join(value.split(','))
def get_optional(attr: Attributes, key: str) -> Optional[T]:
    """Look up `key` in `attr`, treating a missing key or the literal
    string 'null' as absent (returns None)."""
    value = attr.get(key)
    if value is None or value == 'null':
        return None
    return value
def opt_float(attr: Attributes, key: str) -> float32:
    """Parse `attr[key]` as a float32 after stripping commas.

    A missing key, a 'null' sentinel, or an empty string yields NaN.
    """
    raw = get_optional(attr, key)
    if not raw:
        return nan
    return float32(strip_commas(raw))
def opt_int(attr: Attributes, key: str) -> uint32:
    """Parse `attr[key]` as a uint32 after stripping commas.

    A missing key, a 'null' sentinel, or an empty string yields 0.
    """
    raw = get_optional(attr, key)
    if not raw:
        return 0
    return uint32(strip_commas(raw))
def chunk(iterator: Iterator[T], n: int) -> Iterator[Iterator[T]]:
    """Group consecutive items of `iterator` into n-tuples, padding the
    final group with None (the itertools "grouper" recipe)."""
    # n references to the SAME iterator: zip_longest advances it once per
    # tuple slot, so each emitted tuple holds n consecutive items.
    copies = (iterator,) * n
    return zip_longest(*copies, fillvalue=None)
def filter_section(records: Iterator[Attributes], section: str) -> Iterator[Attributes]:
    """Lazily yield only the records whose 'label' equals `section`."""
    return (record for record in records if record['label'] == section)
def filter_sections(records: Iterator[Attributes], *args: str) -> Iterator[Iterator[Attributes]]:
    """Keep only records whose 'label' is one of `args`, then group them
    into tuples of len(args) consecutive records via `chunk`."""
    matching = (record for record in records if record['label'] in args)
    return chunk(matching, len(args))
async def fetch(report: Report, start: date, end: Optional[date] = None) -> Iterator[Attributes]:
    """Fetch a USDA hogs report between two dates and return parsed records.

    Args:
        report: The report to request (its `.value` is the URL slug).
        start: First report date to include.
        end: Last report date to include; defaults to today *at call time*.

    Returns:
        An iterator of attribute dicts produced by `parse_elements`.

    Note: the previous default `end=date.today()` was evaluated once at
    import time, so a long-running process would silently query a stale
    end date. Resolving None at call time fixes that while remaining
    backward-compatible for callers that pass `end` explicitly.
    """
    if end is None:
        end = date.today()
    url = request_url(
        report=report.value,
        start=start.strftime(date_format),
        end=end.strftime(date_format))
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # Read the entire body into memory before the session closes,
            # so iterparse can consume the BytesIO lazily afterwards.
            data = BytesIO(await response.read())
            elements = ElementTree.iterparse(data, events=['start', 'end'])
            return parse_elements(elements)
def parse_elements(elements: Iterator[ParsedElement], min_depth=1, max_depth=4) -> Iterator[Attributes]:
    """
    Parses a USDA report by saving metadata from parent elements to a dictionary while traversing down the tree.
    When at the maximum depth, yield all collected metadata with each child element's attributes.
    Typical layout of a USDA report:
    <results exportTime>
        <report label slug>
            <record report_date reported_for_date>
                <report label>
                    <record ...attributes/>
    Usually all we care about is the report date (depth=2); the report section label (depth=3);
    and the record data attributes (depth=4).

    `elements` is the (event, element) stream from ElementTree.iterparse
    with both 'start' and 'end' events enabled; `depth` below counts how
    many ancestors are currently open.
    """
    depth = 0
    metadata: Attributes = dict()
    for event, element in elements:
        if event == 'start':
            if min_depth <= depth < max_depth:
                # Parsing a parent element: merge its properties into the metadata
                metadata.update(element.items())
            elif depth == max_depth:
                # Parsing a child element: generate a dict combining metadata and child attributes
                # NOTE(review): `|` here is a set union of two items views, so if
                # metadata and the element share a key with different values the
                # surviving pair is arbitrary — presumably parent/child attribute
                # names never collide; confirm against real report payloads.
                yield dict(metadata.items() | element.items())
            # Increment AFTER handling the event, so `depth` is the number of
            # already-open ancestors when each element starts.
            depth += 1
        if event == 'end':
            depth -= 1
            if depth == min_depth:
                # clear the metadata and element tree after each report section
                # (a depth-2 element just closed); element.clear() frees the
                # parsed subtree so memory stays bounded while streaming.
                element.clear()
                metadata.clear()