In [None]:
# default_exp parser

In [None]:
#hide
import sys, os
from pathlib import Path

# Insert in Path Project Directory
sys.path.insert(0, str(Path().cwd().parent))

%load_ext autoreload
%autoreload 2    

# Parser

In [None]:
#export
from collections import defaultdict
from functools import partial
from fastcore.basics import uniqueify
from fastcore.utils import parallel
from fastcore.foundation import L
from rfpy.blocks import create_base_block, block_constructor
from rfpy.utils import bin2int, bin2str, optimize, getattrs
from rfpy.constants import *
import pandas as pd
import numpy as np
import os
from pathlib import Path
from typing import *
from datetime import datetime

# Type aliases for annotations.
# NOTE(review): `Union[str, Any]` places no real constraint on the value because of `Any` — confirm intent.
path_type = Union[str, Any]
bin_val = Union[int, bytes]
# Same definition as `bin_val`.
bytes_encoded = Union[int, bytes]
datetime_object = datetime

Fonte: <https://github.com/fastai/fastai/blob/master/fastai/data/transforms.py#L26>

In [None]:
#export
def _get_files(p, fs, extensions=None):
    p = Path(p)
    res = [p/f for f in fs if not f.startswith('.')
           and ((not extensions) or f'.{f.split(".")[-1].lower()}' in extensions)]
    return res

def get_files(path, extensions=None, recurse=True, folders=None, followlinks=True):
    """Collect the files under `path`, optionally filtered by `extensions`.

    With `recurse` the tree is walked with `os.walk`; hidden directories (names starting
    with a dot) are pruned. When `folders` is given, only those first-level directories
    are descended into, and files sitting directly in `path` are kept only if `'.'` is
    listed in `folders`. Without `recurse` only the regular files directly in `path`
    are considered. The result is returned as an `L`.
    """
    path = Path(path)
    folders = L(folders)
    if extensions is not None:
        # Deduplicate, then compare case-insensitively.
        extensions = {ext.lower() for ext in set(uniqueify(extensions))}
    if not recurse:
        names = [entry.name for entry in os.scandir(path) if entry.is_file()]
        return L(_get_files(path, names, extensions))
    restrict = len(folders) != 0
    found = []
    # os.walk yields (dirpath, dirnames, filenames); editing `dirnames` in place prunes the walk.
    for depth, (dirpath, dirnames, filenames) in enumerate(os.walk(path, followlinks=followlinks)):
        if restrict and depth == 0:
            dirnames[:] = [d for d in dirnames if d in folders]
            # Files at the root are skipped unless the caller asked for '.' explicitly.
            if '.' not in folders:
                continue
        else:
            dirnames[:] = [d for d in dirnames if not d.startswith('.')]
        found += _get_files(dirpath, filenames, extensions)
    return L(found)

In [None]:
#export
def parse_bin(bin_file, bytes_header: int = BYTES_HEADER, marker: bytes = ENDMARKER, slice_=None):
    """Read a `.bin` file and return its header fields plus its classified blocks.

    :param bin_file: path of the binary file to read
    :param bytes_header: number of leading bytes that make up the file header
    :param marker: byte sequence that separates consecutive blocks in the body
    :param slice_: optional `slice` applied to the body before it is split into blocks
    :return: dict with keys `file_version`, `string` and `blocks`
    """
    with open(bin_file, mode='rb') as stream:
        # The first chunk of the file is the header; everything after it is the body.
        raw_header = stream.read(bytes_header)
        payload = stream.read()
    if slice_ is not None:
        # NOTE(review): the check compares `slice_.start` with the header size, yet the slice is
        # applied to the body (which already excludes the header) — confirm the intended coordinates.
        assert slice_.start >= bytes_header, f"The start of your slice has to be >= {bytes_header}, you passed {slice_.start} "
        payload = payload[slice_]
    file_version = bin2int(raw_header[:4])
    description = bin2str(raw_header[4:])
    blocks = classify_blocks(L(payload.split(marker)))
    return {'file_version': file_version, 'string': description, 'blocks': blocks}

```python
# Python >=3.8 only
def binary_iter(bin_file: path_type, marker: bytes = b'\x00UUUU', block_size: int = 4096, bytes_header: int = 36) -> Iterator[bytes]:
    """Lazily yield the header and then every marker-delimited block of a binary file.

    The file is read `block_size` bytes at a time, so it is never loaded whole into memory.

    :param bin_file: binary file that holds the data
    :param marker: byte sequence that separates consecutive blocks
    :param block_size: number of bytes read from the file per iteration
    :param bytes_header: size in bytes of the file header, yielded first
    :return: generator producing the header followed by one binary block at a time;
             empty blocks (e.g. a marker right after the header) are yielded as `b''`,
             and any non-empty data left after the last marker is yielded as the final block
    """
    with open(bin_file, mode='rb') as bfile:
        # The first chunk of the file is the fixed-size header.
        yield bfile.read(bytes_header)
        # The rest is accumulated in `current`, so a marker split across two reads is still found.
        current = b''
        while block := bfile.read(block_size):
            current += block
            # `find` returns -1 when the marker is absent; 0 is a valid hit (marker at the very
            # start of the buffer) and must not be treated as "not found", otherwise the
            # generator stalls and never yields again.
            while (markerpos := current.find(marker)) >= 0:
                yield current[:markerpos]
                current = current[markerpos + len(marker):]
        # Data after the last marker would otherwise be silently dropped.
        if current:
            yield current
```

A função a seguir mapeia o arquivo `.bin` nos devidos blocos

In [None]:
#export
def classify_blocks(blocks: L)->Mapping[Tuple[int, int], L]:
    """Group raw binary blocks by `(block type, thread_id)`.

    :param blocks: an `L` of raw binary blocks; each one is turned into a base block with `create_base_block`
    :return: a `defaultdict(L)` whose keys are `(type, thread_id)` tuples and whose values are the
             objects built by `block_constructor` for that key, in the order they were found
    """
    # NOTE(review): a previous revision stored `((start_byte, stop_byte), block)` pairs here, and
    # `extract_metadata` / `extract_block_levels` still index items as pairs — confirm which shape is intended.
    map_block = defaultdict(L)
    for bloco in blocks.map(create_base_block):
        btype, btid = bloco.type, bloco.thread_id
        if btype == btid == 0:  # Ignore empty block at the beginning if present
            continue
        map_block[(btype, btid)].append(block_constructor(btype, bloco))
    return map_block

In [None]:
#export
def decode_compressed_block(block):
    """Expand the run-length encoded bytes `block.data[block.start:block.stop]` into levels.

    Each source byte is one of:
      * 255 (RUN): the next byte is a count; that many outputs are set to `floor + thresh/2`
      * 254 (ESC): the next byte is taken literally and stored as `floor + byte/2`
      * anything else: stored as `floor + byte/2`
    where `floor = block.offset - 127.5`.

    :return: float16 array of length `block.norig`; positions never written keep the value `floor`
    """
    RUN, ESC = 255, 254
    floor = block.offset - 127.5
    payload = block.data[block.start:block.stop]
    size = len(payload)
    thresh = block.thresh
    decoded = np.full(block.norig, floor, dtype=np.float16)
    read = write = 0
    while read < size:
        code = payload[read]
        read += 1
        if code == RUN:
            # Run marker: the following byte says how many outputs take the threshold level.
            count = payload[read]
            read += 1
            decoded[write:write + count] = floor + thresh / 2.
            write += count
            continue
        if code == ESC:
            # Escape marker: the following byte is a literal value, even if it is 254 or 255.
            decoded[write] = floor + payload[read] / 2.
            read += 1
        else:
            # Plain value
            decoded[write] = floor + code / 2.
        write += 1
    return decoded

In [None]:
#export
def decode_compressed_blocks(block_array):
    """Decode the compressed block carried by an `(array, block)` pair.

    :param block_array: two-item sequence `(array, block)`; only `block` is used
    :return: the array produced by `decode_compressed_block(block)`
    """
    # The original called an undefined `decode_block` and bound the result to a local name,
    # so nothing ever reached the caller; decode with the function defined in this module and return it.
    # NOTE(review): the first item looks like an intended output buffer, but rebinding it never
    # filled it — confirm whether callers expect an in-place write instead of a return value.
    _, block = block_array
    return decode_compressed_block(block)

In [None]:
#export
def _extract_level(bloco: Any):
    return np.expand_dims(bloco.block_data, 0) # reshape((-1, 1))


def extract_block_levels(spectrum_blocks: L, pivoted: bool = True, dtypes: Optional[Mapping[str, str]] = None)->pd.DataFrame:
    """Build the matrix of levels from a list of spectrum blocks.

    :param spectrum_blocks: non-empty `L` whose items are indexable, with the block object at position 1
    :param pivoted: if True (default) return one row per block and one column per frequency;
                    if False return the long (unpivoted) form with one row per (block, frequency)
    :param dtypes: column dtypes applied to the unpivoted frame; defaults to `LEVELS`
    :return: DataFrame of levels; the unpivoted frame takes its column names from `LEVELS.keys()`
    :raises AssertionError: if `spectrum_blocks` is empty
    """
    assert len(spectrum_blocks), "The spectrum block list is empty"
    spectrum_blocks = spectrum_blocks.itemgot(1)
    # Each block contributes one row, so `levels` has shape (n_blocks, n_frequencies).
    levels = np.concatenate(parallel(_extract_level, spectrum_blocks, n_workers=os.cpu_count(), progress=False))
    # Frequencies are read from the first block only — assumes every block shares them (TODO confirm).
    frequencies = spectrum_blocks[0].frequencies
    if pivoted:
        return pd.DataFrame(levels, columns=frequencies)

    # Building the long form directly in numpy is faster than pandas' melt.
    n_blocks, n_freqs = levels.shape[:2]
    # `flatten()` is row-major: all frequencies of block 0, then block 1, ... so every block
    # index must be repeated once per frequency (the axes were previously swapped, which
    # mislabelled the rows whenever n_blocks != n_freqs).
    unpivot = np.array([np.repeat(np.arange(n_blocks), n_freqs),
                        np.tile(frequencies, n_blocks),
                        levels.flatten()]).T
    if not dtypes:
        dtypes = LEVELS
    return pd.DataFrame(unpivot, columns=LEVELS.keys()).astype(dtypes)

In [None]:
#export
def meta2df(meta_list: Iterable)->pd.DataFrame:
    """Turn an iterable of metadata records into a DataFrame passed through `optimize`.

    When a `wallclock_datetime` column is present it is handed to `optimize` as the
    datetime feature and then becomes the index of the returned frame.
    """
    frame = pd.DataFrame(meta_list)
    datetime_cols = []
    if 'wallclock_datetime' in frame.columns:
        datetime_cols.append('wallclock_datetime')
    frame = optimize(frame, datetime_cols)
    if not datetime_cols:
        return frame
    return frame.set_index('wallclock_datetime')

In [None]:
#export
def extract_metadata(map_blocks: Mapping[str, Any])->L:
    """Extract the metadata (everything except the spectral data) of the main blocks of a parsed `.bin` file.

    :param map_blocks: mapping with the keys `file_version` and `blocks`, as returned by `parse_bin`;
                       `blocks` maps `(type, thread_id)` to an `L` of items
    :return: an `L` of `((type, thread_id), DataFrame)` pairs, one per key whose type is in `MAIN_BLOCKS`
    """
    # Look the fields up by key rather than unpacking `.values()` positionally, which
    # silently depended on the insertion order and on the mapping having exactly three entries.
    file_version = map_blocks['file_version']
    blocks = map_blocks['blocks']
    metadata = L()
    for (tipo, tid), blocos in blocks.items():
        if tipo not in MAIN_BLOCKS:
            continue
        # Each item is indexed as a pair: item[0] supplies (start_byte, stop_byte), item[1] is the block.
        # NOTE(review): `classify_blocks` in this module appends bare blocks, not pairs — confirm the expected shape.
        df = meta2df(blocos.map(lambda item: {**dict(zip(('start_byte', 'stop_byte'), item[0])), **getattrs(item[1])}))
        df['file_version'] = file_version
        metadata.append(((tipo, tid), df))
    return metadata

In [None]:
#hide
from nbdev.export import notebook2script; notebook2script()

Converted 00_main.ipynb.
Converted 01_parser.ipynb.
Converted 02_utils.ipynb.
Converted 03_blocks.ipynb.
Converted 04_constants.ipynb.
Converted index.ipynb.
