Skip to content

Commit

Permalink
FIX: add capability to decode old DX header, add test
Browse files Browse the repository at this point in the history
  • Loading branch information
kmuehlbauer committed Nov 6, 2020
1 parent ad180ee commit dd5a60a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 22 deletions.
82 changes: 60 additions & 22 deletions wradlib/io/radolan.py
Expand Up @@ -116,6 +116,26 @@ def unpack_dx(raw):
return np.array(beam)


def get_dx_header_token():
"""Return array with known header token of dx data
Returns
-------
head : dict
with known header token, value set to None
"""
head = {
"BY": None,
"VS": None,
"CO": None,
"CD": None,
"CS": None,
"EP": None,
"MS": None,
}
return head


def parse_dx_header(header):
"""Internal function to retrieve and interpret the ASCII header of a DWD
DX product file.
Expand All @@ -137,25 +157,32 @@ def parse_dx_header(header):
out["datetime"] = out["datetime"].replace(tzinfo=util.UTC())
# radar location ID (always 10000 for composites)
out["radarid"] = header[8:13]
pos_by = header.find("BY")
pos_vs = header.find("VS")
pos_co = header.find("CO")
pos_cd = header.find("CD")
pos_cs = header.find("CS")
pos_ep = header.find("EP")
pos_ms = header.find("MS")

out["bytes"] = int(header[pos_by + 2 : pos_by + 7])
out["version"] = header[pos_vs + 2 : pos_vs + 4]
out["cluttermap"] = int(header[pos_co + 2 : pos_co + 3])
out["dopplerfilter"] = int(header[pos_cd + 2 : pos_cd + 3])
out["statfilter"] = int(header[pos_cs + 2 : pos_cs + 3])
out["elevprofile"] = [
float(header[pos_ep + 2 + 3 * i : pos_ep + 2 + 3 * (i + 1)]) for i in range(8)
] # noqa
out["message"] = header[
pos_ms + 5 : pos_ms + 5 + int(header[pos_ms + 2 : pos_ms + 5])
] # noqa

head = get_radolan_header_token_pos(header, mode="dx")

# iterate over token and fill output dict accordingly
for k, v in head.items():
if v:
if k == "BY":
out["bytes"] = int(header[v[0] : v[1]]) - len(header) - 1
if k == "VS":
out["version"] = header[v[0] : v[1]]
if k == "CO":
out["cluttermap"] = int(header[v[0] : v[1]])
if k == "CD":
out["dopplerfilter"] = int(header[v[0] : v[1]])
if k == "CS":
out["statfilter"] = int(header[v[0] : v[1]])
if k == "EP":
out["elevprofile"] = [
float(header[v[0] + 3 * i : v[0] + 3 * (i + 1)]) for i in range(8)
]
if k == "MS":
try:
cnt = int(header[v[0] : v[0] + 3])
out["message"] = header[v[0] + 3 : v[0] + 3 + cnt]
except ValueError:
pass

return out

Expand Down Expand Up @@ -331,21 +358,33 @@ def get_radolan_header_token():
return head


def get_radolan_header_token_pos(header):
def get_radolan_header_token_pos(header, mode="composite"):
"""Get Token and positions from DWD radolan header
Parameters
----------
header : string
(ASCII header)
Keyword Arguments
-----------------
mode : str
'composite' or 'dx', defaults to 'composite'
Returns
-------
head : dictionary
with found header tokens and positions
"""

head_dict = get_radolan_header_token()
if mode == "composite":
head_dict = get_radolan_header_token()
elif mode == "dx":
head_dict = get_dx_header_token()
else:
raise ValueError(
f"unknown mode {mode}, use either 'composite' or 'dx' depending on data source"
)

for token in head_dict.keys():
d = header.rfind(token)
Expand Down Expand Up @@ -398,7 +437,6 @@ def parse_dwd_composite_header(header):
# get dict of header token with positions
head = get_radolan_header_token_pos(header)
# iterate over token and fill output dict accordingly
# for k, v in head.iteritems():
for k, v in head.items():
if v:
if k == "BY":
Expand Down
7 changes: 7 additions & 0 deletions wradlib/tests/test_io.py
Expand Up @@ -74,6 +74,13 @@ def test_parse_dx_header(self):
head += str(c.decode())
io.radolan.parse_dx_header(head)

def test_parse_old_dx_header(self):
header = b"DX010000109080100BY 4173VS 1CO2CD2CS0MS***XXXXXXXX"
head = ""
for c in sio.BytesIO(header):
head += str(c.decode())
io.radolan.parse_dx_header(head)

def test_unpack_dx(self):
pass

Expand Down

0 comments on commit dd5a60a

Please sign in to comment.