Utility is converting ini-files for RADAR SCADA to I/O-table

In [1]:
import configparser
import os
import os.path

In [2]:
import pandas as pd

# Parse ini-file

In [3]:
def get_all_keys(section) -> list:
    return [k for k in section]

In [4]:
# Name = Type(R/W), [LinkFlag], [LinkMask], Descr, Period(s), Func, [Param1, Param2, ...]
NETBIOS_COLUMNS = (
    'type',
    'link_flag',
    'link_mask',
    'description',
    'period_sec',
    'function',
    'params',
)
NETBIOS_FUNCTION = (
    '&h40', # чтение ИНР
    '&h00', # Астановка входа Алгоблока
    '&h10', # чтение архива
    '&h11', # чтение часов
    '&h12', # установка часов
    '&h1A', # скорее всего запись ДКМ
)

In [5]:
# %key%=(Mask1,Value1), (Mask2, Value2)
SELECTORS_COLUMNS = (
    'mask_value',
)

In [6]:
# %key%=(TagName),[Parent],(Type),Description,InBuffer,InBufferOffset,[k,b],[Selector/BoolMask/NullZone], \
# [OutBuffer, OutBufferOffset],[Default],[DefaultMin],[DefaultMax],[Hysteresis],[InType],[OutType],[LinkFlag],[LinkMask]
TAGS_COLUMNS = (
    'tag_name',
    'tag_parent',
    'type',
    'description',
    'in_buffer',
    'in_buffer_offset',
    'k',  # slope
    'b',  # intercept
    'selector_mask_nzone',
    'out_buffer',
    'out_buffer_offset',
    'default',
    'default_min',
    'default_max',
    'hyst',
    'in_type',
    'out_type',
    'link_flag',
    'link_mask',
)

In [7]:
OUTPUT_COLUMNS = (
    'tag_full_name',
    'description',
    'data_type',
    'direction',
    'algoblock_type',
    'algoblock_address',
    'algoblock_offset',
    'selector_mask_nzone',
)

## Read NetBios section

In [8]:
def parse_section_netbios(section) -> pd.DataFrame:
    netbios_data = pd.DataFrame(
        index=get_all_keys(section),
        columns=NETBIOS_COLUMNS,
    )
    for k in netbios_data.index:
        values = section[k].split(',')
        for i in range(min(len(NETBIOS_COLUMNS), len(values))):
            netbios_data.loc[k, NETBIOS_COLUMNS[i]] = values[i].strip()
        if len(values) > len(NETBIOS_COLUMNS):
            netbios_data.loc[k, NETBIOS_COLUMNS[-1]] = ','.join(values[len(NETBIOS_COLUMNS):])
    # I suppose &h00 used for internal variables and we don't need this
    # always we don't archive and time functions
    return netbios_data.query('function in ("&h40", "&h1A")').fillna('')

## Read Selectors section

In [9]:
def parse_section_selectors(section) -> pd.DataFrame:
    selectors_data = pd.DataFrame(
        index=get_all_keys(section),
        columns=SELECTORS_COLUMNS,
    )
    for k in selectors_data.index:
        selectors_data.loc[k, SELECTORS_COLUMNS[0]] = section[k].strip()
    return selectors_data.fillna('')

## Read Tags section

In [10]:
def parse_section_tags(section) -> pd.DataFrame:
    tags_data = pd.DataFrame(
        index=get_all_keys(section),
        columns=TAGS_COLUMNS,
    )
    for k in tags_data.index:
        values = section[k].split(',')
        for i in range(min(len(TAGS_COLUMNS), len(values))):
            tags_data.loc[k, TAGS_COLUMNS[i]] = values[i].strip()
        if len(values) > len(TAGS_COLUMNS):
            tags_data.loc[k, TAGS_COLUMNS[-1]] = ','.join(values[len(TAGS_COLUMNS):])
    return tags_data.fillna('')

## Do output table

In [11]:
def compute_modbus_address(block_type, block_number, var_offset) -> int:
    if block_type not in ('ИНР', 'ДКМ'):
        return -1
    if block_type == 'ИНР':
        if 0 <= int(block_number) <= 7:
            base_addr = 0
            block_nr = int(block_number)
        elif 8 <= int(block_number) <= 15:
            base_addr = 32_768
            block_nr = int(block_number)-8
        elif 16 <= int(block_number) <= 23:
            base_addr = 34_816
            block_nr = int(block_number)-16
        else:
            return -1
        return base_addr + block_nr * 256 + int(var_offset)
    # ДКМ записывается по номеру блока и выходу
    return -1

In [12]:
def gen_output_table(buffers: pd.DataFrame, selectors: pd.DataFrame, tags: pd.DataFrame) -> pd.DataFrame:
    # at first for reading tags
    sorted_tags = tags.copy()
    sorted_tags['in_buffer_offset'] = pd.to_numeric(sorted_tags['in_buffer_offset'])
    sorted_tags = sorted_tags.sort_values(by=['in_buffer', 'in_buffer_offset'], ignore_index=True)
    output = pd.DataFrame(index=sorted_tags.index, columns=OUTPUT_COLUMNS, data='')
    for idx in sorted_tags.index:
        if sorted_tags.loc[idx, 'selector_mask_nzone'] in selectors.index:
            selector_mask_nzone = selectors.loc[sorted_tags.loc[idx, 'selector_mask_nzone'], 'mask_value']
        else:
            selector_mask_nzone = sorted_tags.loc[idx, 'selector_mask_nzone']
        next_line = {
            'tag_full_name': sorted_tags.loc[idx, 'tag_parent']+'.'+sorted_tags.loc[idx, 'tag_name'],
            'description': sorted_tags.loc[idx, 'description'],
            'data_type': sorted_tags.loc[idx, 'type'].lower(),
            'direction': buffers.loc[sorted_tags.loc[idx, 'in_buffer'], 'type'],
            'algoblock_type': buffers.loc[sorted_tags.loc[idx, 'in_buffer'], 'function'],
            'algoblock_address': buffers.loc[sorted_tags.loc[idx, 'in_buffer'], 'params'],
            'algoblock_offset': sorted_tags.loc[idx, 'in_buffer_offset'],
            'selector_mask_nzone': selector_mask_nzone,
        }
        output.loc[idx] = next_line
    # and now for writing tags
    buffer_index = ', '.join([f'"{t}"' for t in buffers.index])
    sorted_tags = tags.query(f'out_buffer in ({buffer_index})')
#     sorted_tags['out_buffer_offset'] = pd.to_numeric(sorted_tags['out_buffer_offset'])
    sorted_tags = sorted_tags.sort_values(by=['out_buffer', 'out_buffer_offset'], ignore_index=True)
    output2 = pd.DataFrame(index=sorted_tags.index, columns=OUTPUT_COLUMNS, data='')
    for idx in sorted_tags.index:
        next_line = {
            'tag_full_name': sorted_tags.loc[idx, 'tag_parent']+'.'+sorted_tags.loc[idx, 'tag_name'],
            'description': sorted_tags.loc[idx, 'description'],
            'data_type': sorted_tags.loc[idx, 'type'].lower(),
            'direction': buffers.loc[sorted_tags.loc[idx, 'out_buffer'], 'type'],
            'algoblock_type': buffers.loc[sorted_tags.loc[idx, 'out_buffer'], 'function'],
            'algoblock_address': buffers.loc[sorted_tags.loc[idx, 'out_buffer'], 'params'],
            'algoblock_offset': sorted_tags.loc[idx, 'out_buffer_offset'],
            'selector_mask_nzone': sorted_tags.loc[idx, 'selector_mask_nzone'],
        }
        output2.loc[idx] = next_line
    # and concat
    output = pd.concat([output, output2], ignore_index=True)
    # readability
    output['data_type'].replace({'single': 'FLOAT (4 bytes)',
                                 'byte': 'INT (1 byte)',
                                 'integer': 'INT (2 bytes)',
                                 'long': 'INT (4 bytes)',
                                 'boolean': 'BOOL (1 bit)',
                                }, inplace=True)
    output['direction'].replace({'R': 'Read', 'W': 'Write'}, inplace=True)
    output['algoblock_type'].replace({'&h40': 'ИНР', '&h1A': 'ДКМ'}, inplace=True)
    # and add modbus addresses
    output['modbus_address'] = -1
    for idx in output.index:
        output.loc[idx, 'modbus_address'] = compute_modbus_address(
            output.loc[idx, 'algoblock_type'], 
            output.loc[idx, 'algoblock_address'], 
            output.loc[idx, 'algoblock_offset']
        )
    return output

## Main parse

In [13]:
def parse_radar_ini(path: str) -> pd.DataFrame:
    config = configparser.ConfigParser(
        comment_prefixes=r'//',
        interpolation=None,
    )
    config.read(path)
    for sct in ('NetBios', 'Selectors', 'Tags'):
        if not config.has_section(sct):
            print(f'Section {sct} is missing in {path}')
            return pd.DataFrame()
    buffers = parse_section_netbios(config['NetBios'])
    selectors = parse_section_selectors(config['Selectors'])
    tags = parse_section_tags(config['Tags'])
    # only tags with used input buffers
    buffer_index = ', '.join([f'"{t}"' for t in buffers.index])
    tags.query(f'in_buffer in ({buffer_index}) or out_buffer in ({buffer_index})', inplace=True)
    output = gen_output_table(buffers, selectors, tags)
#     display(buffers)
#     display(selectors)
#     display(tags)
#     display(output)
#     return tags
    return output

# Main

In [14]:
INP_DIR = 'input'
OUT_DIR = 'output'

In [15]:
# scan input directory for ini-files
with os.scandir(INP_DIR) as it:
    for entry in it:
        if entry.is_file() and entry.name.endswith('.ini'):
            print(entry.name)
            out = parse_radar_ini(entry.path)
#             break
            out_name = os.path.join(OUT_DIR, os.path.splitext(entry.name)[0]+'.csv')
            out.to_csv(out_name, sep=';', index=False, encoding='cp1251')
#             break

001_KO1_8_Контактный осветлитель 1.ini
002_KO9_16_Контактный осветлитель 2.ini
003_Wash_Промывная станция 2.ini
004_Mkf_Микрофильтры.ini
005_1p_НС 1 подъем шахта 1.ini
006_1p2_НС 1 подъем шахта 2.ini
007_2p_НС 2 Подъем.ini
008_Filters_Скорые фильтры.ini
009_Wash_old_Промывная станция 1.ini
010_KR500_1_Качество воды.ini
011_rashod_ПТК учета.ini
012_sulfat_Сульфат аммония.ini
018_cmo_ЦМО.ini
019_sgus_Сгуститель.ini
020_usr_Усреднитель.ini
021_spiv_СПИВ.ini
023_P3_1_НС 3 Подъем 1.ini
024_NUR_НС НЮР.ini
025_P3_2_НС 3 Подъем 2.ini
026_SZR_ПНС СЗР.ini
027_NLAP_ПНС Лапсары.ini
028_9ptl_ПНС 9-й Пятилетки.ini
029_mkBG_ПНМ мкр Богдановка.ini
030_Sosn_СтД Сосновка.ini
031_okt_СтД Октябрьский.ini
032_sev_СтД Северный 1.ini
033_tal_СтД Сосновка Тальн.ini
034_skv2_СтД Северный 2.ini
035_ctp1_ЦТП1.ini
036_ctp7_ЦТП7.ini
037_ctp6_ЦТП6.ini
038_ctpM38_ЦТП Московский 38Б.ini
