# Utils functions

Restrictions that XND imposes when creating an `xnd` object from semi-structured data:

- Keys must not contain whitespace (e.g. replace ' ' with '_').

In [1]:
from copy import deepcopy
from pytest import fixture

In [2]:
# True only when this file is executed as a script; guards the inline
# self-checks that follow each definition so they do not run on import.
run_tests = __name__ == '__main__'

In [3]:
def get_key_value(item):
    """Return an iterable of ``(key, value)`` pairs for a container.

    A dict yields its items view, a list yields ``enumerate`` pairs
    (index, element), and anything else yields an empty tuple.
    """
    if isinstance(item, dict):
        pairs = item.items()
    elif isinstance(item, list):
        pairs = enumerate(item)
    else:
        pairs = ()
    return pairs

if run_tests:
    # Sample inputs: one dict and one list.
    _data1 = {'a': 1}
    _data2 = ['a']
    
    # A dict must come back as its items view.
    assert get_key_value(_data1) == _data1.items() 
    # A list must come back as (index, element) pairs.
    assert list(get_key_value(_data2)) == list(enumerate(_data2))

In [4]:
def highest_value_type(values):
    """Return the highest-weighted type among the non-None items.

    Type weight, from lowest to highest: bool, int, float, str. Any other
    type (dict, list, ...) outranks these scalars; several such types are
    ordered by their ``__name__`` so the result does not depend on set
    iteration order.

    Parameters:
      values: iterable of values; ``None`` items are ignored.

    Returns:
      The selected type, or ``None`` when there is no non-None item.
    """
    _types = {type(v) for v in values if v is not None}

    if not _types:
        return None

    # Known scalar types, ordered from lowest to highest weight.
    ranking = (bool, int, float, str)

    def weight(type_):
        if type_ in ranking:
            return (ranking.index(type_), '')
        # Unranked types sort after every ranked one, then by name.
        return (len(ranking), type_.__name__)

    return max(_types, key=weight)

if run_tests: 
    # None items are ignored, so only int remains.
    assert highest_value_type([1,3,4,5, None]) is int

In [5]:
def highest_type(values):
    """Return the highest-weighted type among the given types.

    Type weight, from lowest to highest: bool, int, float, str. Any other
    type (dict, list, ...) outranks these scalars; several such types are
    ordered by their ``__name__`` so the result does not depend on set
    iteration order.

    Parameters:
      values: iterable of types; ``type(None)`` entries are ignored.

    Returns:
      The selected type, or ``type(None)`` when nothing else is present.
    """
    _types = {v for v in values if v is not type(None)}

    if not _types:
        return type(None)

    # Known scalar types, ordered from lowest to highest weight.
    ranking = (bool, int, float, str)

    def weight(type_):
        if type_ in ranking:
            return (ranking.index(type_), '')
        # Unranked types sort after every ranked one, then by name.
        return (len(ranking), getattr(type_, '__name__', ''))

    return max(_types, key=weight)

if run_tests:
    # NoneType is ignored; str is expected to win over int.
    assert highest_type([int, int ,int , str, type(None)]) is str
    

In [6]:
def has_mixed_type(data_list):
    """Tell whether the items of ``data_list`` have more than one type.

    ``None`` items count with their own type (NoneType). An empty input
    has no types at all and is therefore not mixed.

    Parameters:
      data_list: iterable of values.

    Returns:
      bool: True when at least two distinct item types are present.
    """
    list_types = {type(v) for v in data_list}

    # `> 1` rather than `bool(len - 1)`: the latter is True for an empty set.
    return len(list_types) > 1

if run_tests:
    # A single-element list holds one type only.
    assert not has_mixed_type([{'a': 1}])
    # A dict next to an int is a mixed list.
    assert has_mixed_type([{'a': 1}, 1]) 

In [7]:
def map_type(python_type):
    """Translate a Python type name into its mapped type name.

    'str', 'int' and 'float' are translated to 'string', 'int64' and
    'float64'; any other input is returned unchanged.
    """
    translated = {
        'str': 'string',
        'int': 'int64',
        'float': 'float64',
    }
    return translated.get(python_type, python_type)

if run_tests:
    # Names with a mapping are translated.
    assert map_type('int') == 'int64'
    assert map_type('float') == 'float64'
    assert map_type('str') == 'string'
    # Names without a mapping pass through unchanged.
    assert map_type('bool') == 'bool'

In [8]:
def apply_nullable_on_children(parent):
    """Mark every entry below ``parent`` as nullable, in place.

    When ``parent`` is a dict, each of its values gets ``'nullable'`` set
    to True and, if that value has a ``'value'`` entry, the same is done
    recursively on it. Any non-dict ``parent`` is left untouched.

    Returns the ``parent`` object that was passed in.
    """
    if not isinstance(parent, dict):
        return parent

    for entry in parent.values():
        entry['nullable'] = True
        if 'value' in entry:
            apply_nullable_on_children(entry['value'])
    return parent

if run_tests:                 
    # Meta tree in which every entry starts as non-nullable.
    _data1 = {'tree': {'value_type': dict,
      'nullable': False,
      'value': {'leaf1': {'value_type': list,
        'nullable': False,
        'count': 2,
        'value': int}}},
     'cars': {'value_type': list,
      'nullable': False,
      'count': 3,
      'value': {'model': {'nullable': False, 'value_type': str},
       'ports': {'nullable': False,
        'value_type': dict,
        'value': {'numbers': {'value_type': int, 'nullable': False}}}}}
    }
    
    # The call mutates _data1 and returns the same object.
    result = apply_nullable_on_children(_data1)
    
    # Walk the tree and check that every 'nullable' flag found is truthy.
    def test_node(node):
        if isinstance(node, dict):
            for k, v in node.items():
                if 'nullable' in v:
                    assert v['nullable']
                if 'value' in v:
                    test_node(v['value'])
    
    test_node(result)        

In [9]:
def _json_clean_in_place(node):
    """Replace spaces by underscores in every string key below ``node``, in place."""
    if isinstance(node, dict):
        # Iterate over a snapshot of the keys: the dict is modified on the way.
        for key in list(node):
            # Only string keys can contain spaces; other keys are kept as is.
            new_key = key.replace(' ', '_') if isinstance(key, str) else key
            if new_key != key:
                node[new_key] = node.pop(key)
            _json_clean_in_place(node[new_key])
    elif isinstance(node, list):
        for element in node:
            _json_clean_in_place(element)
    return node


def json_cleaning(data):
    """Return a copy of ``data`` whose dict keys contain no spaces.

    Every space in a string key is replaced by an underscore, at any
    nesting depth of dicts and lists. The input is deep-copied once and
    never modified. Non-string keys are left unchanged.

    NOTE: when a renamed key collides with an existing key (e.g. both
    'a b' and 'a_b' are present), the renamed entry overwrites the
    existing one.

    Parameters:
      data: any value; only dicts and lists are traversed.

    Returns:
      The cleaned deep copy of ``data``.
    """
    return _json_clean_in_place(deepcopy(data))

if run_tests:
    # Every key of the sample contains a space, including the one that is
    # nested inside a list.
    _data1 = {'k b': 2, 'a w': {'bla bla': 2, 'x x': [1, 2, {'asdf asdf ': 1}]}}
    result = json_cleaning(_data1)

    def test_node(node, has_space):
        """Assert that each dict key below ``node`` has a space iff ``has_space``."""
        if isinstance(node, dict):
            for k, v in node.items():
                assert (' ' in k) == has_space
                test_node(v, has_space)
        elif isinstance(node, list):
            # Descend into lists too, so dicts nested in lists are checked.
            for v in node:
                test_node(v, has_space)

    # The input must be left untouched; the result must be fully cleaned.
    test_node(_data1, True)
    test_node(result, False)

In [10]:
def _json_meta_type_list(data, parent_nullable):
    """Describe a list as a meta dict.

    Parameters:
      data: list to describe.
      parent_nullable: nullable flag inherited from the enclosing node.

    Returns:
      dict that always holds 'nullable'. When the list is not of mixed
      types (see has_mixed_type) it also holds 'count' and 'value'. For a
      list of dicts, 'value_type' is dict and 'value' maps every key seen
      in any record to its description; a key that is missing from at
      least one record, or whose value is None in one, is flagged
      nullable together with everything below it.

    Raises:
      Exception: when the type of an item differs from the type reported
        by highest_value_type.
    """
    meta = {
        'nullable': parent_nullable,
    }

    if not has_mixed_type(data):
        meta['count'] = len(data)
        _value_highest_type = highest_value_type(data)

        if _value_highest_type is dict:
            meta['value_type'] = dict
            meta['value'] = {}
            meta_value = meta['value']  # alias
            # first_iter and columns are used to find nullable fields
            first_iter = True
            columns = set()

        for item in data:
            if _value_highest_type != type(item):
                raise Exception('All items from the list should be the same type.')

            if isinstance(item, dict):
                children_value = _json_meta_type_dict(item, meta['nullable'])
                children_keys = item.keys()

                for k, v in children_value.items():
                    # A key first seen after the first record was missing
                    # from the earlier records, hence nullable.
                    nullable = bool(
                        parent_nullable or (not first_iter and k not in columns)
                    )

                    # A None value must not erase a type that is already known.
                    if k not in meta_value or v['value_type'] is not type(None):
                        if k not in meta_value:
                            meta_value[k] = {
                                'nullable': False,
                                'value_type': type(None)
                            }
                        _nullable = meta_value[k]['nullable']

                        v['value_type'] = highest_type(
                            [meta_value[k]['value_type'], v['value_type']]
                        )
                        # NOTE: update() replaces any nested 'value' with the
                        # one of the current record; nested descriptions are
                        # not merged across records.
                        meta_value[k].update(v)
                        # Once nullable, a column stays nullable.
                        meta_value[k]['nullable'] = meta_value[k]['nullable'] | _nullable
                    if v['value_type'] is type(None) or nullable:
                        meta_value[k]['nullable'] = True
                        if 'value' in meta_value[k]:
                            apply_nullable_on_children(meta_value[k]['value'])

                # Columns known from earlier records but absent from this one.
                for c in columns:
                    if parent_nullable or c not in children_keys:
                        meta_value[c]['nullable'] = True
                        # Use the column being flagged (c), not the last key
                        # of the loop above.
                        if 'value' in meta_value[c]:
                            apply_nullable_on_children(meta_value[c]['value'])

                columns |= set(children_keys)

            first_iter = False

        if 'value' not in meta:
            meta['value'] = _value_highest_type
    return meta
        

def _json_meta_type_dict(data, parent_nullable):
    """Describe each entry of ``data`` as a meta dict keyed like ``data``.

    Every entry holds 'value_type' (the class of the value) and
    'nullable' (True when the value is None or ``parent_nullable`` is
    truthy). A dict value additionally gets its own description under
    'value'; a list value has the description built by
    _json_meta_type_list merged into its entry.
    """
    meta = {}
    for key, value in get_key_value(data):
        value_type = value.__class__
        is_nullable = bool(
            value_type is type(None) or parent_nullable or value is None
        )

        entry = {'value_type': value_type, 'nullable': is_nullable}
        meta[key] = entry

        if isinstance(value, dict):
            entry['value'] = json_meta_type(value, is_nullable)
        elif isinstance(value, list):
            entry.update(_json_meta_type_list(value, is_nullable))
    return meta

def json_meta_type(data, parent_nullable=False):
    """Build the meta description of ``data``.

    Dicts and lists are delegated to _json_meta_type_dict and
    _json_meta_type_list respectively; for any other value the class of
    the value is returned.
    """
    if isinstance(data, dict):
        return _json_meta_type_dict(data, parent_nullable)
    if isinstance(data, list):
        return _json_meta_type_list(data, parent_nullable)
    return data.__class__

if run_tests:
    # Case 1: a dict holding a flat list of ints.
    _data1 = {'data': [0]*1000}
    _meta1 = {
        'data': {
            'value_type': list, 
            'nullable': False, 
            'count': 1000, 
            'value': int}
    }
    assert json_meta_type(_data1) == _meta1
    
    # Case 2: a top-level list of records sharing the same keys.
    _data2 = [{'x': 1, 'y': 10}, {'x': 2, 'y': 20}]
    _meta2 = {
        'value_type': dict, 
        'count': 2,
        'nullable': False,
        'value': {
            'x': {'nullable': False, 'value_type': int},
            'y': {'nullable': False, 'value_type': int},
        }
    }
    assert json_meta_type(_data2) == _meta2
    
    # Case 3: a nested field that is None in one record and an int in another.
    _data3 = [{'x': 1, 'y': {'z': None}}, {'x': 2, 'y': {'z': 1}}]
    _meta3 = {
        'value_type': dict, 
        'count': 2,
        'nullable': False,
        'value': {
            'x': {'nullable': False, 'value_type': int},
            'y': {'nullable': False, 'value_type': dict, 'value': {
                'z': {
                    'value_type': int,
                    'nullable': True
                }
            }},
        }
    }
    # NOTE(review): this check is disabled; presumably nested descriptions
    # are not merged across records yet, so 'z' does not end up nullable --
    # confirm before re-enabling.
    # assert json_meta_type(_data3) == _meta3

In [11]:
def _get_xnd_type_item(k, item):
    nullable_str = '?' if 'nullable' in item and item['nullable'] else ''
        
    if 'value_type' not in item:
        return []
    
    if k is None:
        k_str = ''
    else:
        k_str = '{}: '.format(k)
        
    if 'count' in item:
        count_str = '{} * '.format(item['count'])
    else:
        count_str = ''

    if item['value_type'] is dict:
        if isinstance(item['value'], dict):
            _xnd_item_value = get_xnd_type(item['value'])
        else:
            _xnd_item_value = item['value'].__name__
        _xnd_item_template = '{}{}{}{{}}'.format(k_str, count_str, nullable_str)

    elif item['value_type'] is list:
        if isinstance(item['value'], dict):
            _xnd_item_value = get_xnd_type(item['value'])
        else:
            _xnd_item_value = map_type(item['value'].__name__)
        _xnd_item_template = '{}{}{}{{}}'.format(k_str, count_str, nullable_str)
    else:
        _xnd_item_value = map_type(item['value_type'].__name__)
        _xnd_item_template = '{}{}{}{{}}'.format(k_str, count_str, nullable_str)
    return [_xnd_item_template.format(_xnd_item_value)]

def get_xnd_type(meta):
    """Render a meta description as a type string.

    Parameters:
      meta: dict. Without a 'value_type' key it is read as a mapping of
        field names to meta entries and rendered inside braces, fields
        separated by ', '. With a 'value_type' key it is rendered as a
        single unnamed entry.

    Returns:
      str: the rendered type string.
    """
    if 'value_type' in meta:
        # A single entry, rendered without a name and without braces.
        fragments = list(_get_xnd_type_item(None, meta))
        template = '{}'
    else:
        # A mapping of field names to entries.
        fragments = [
            fragment
            for name, entry in meta.items()
            for fragment in _get_xnd_type_item(name, entry)
        ]
        template = '{{{}}}'

    return template.format(', '.join(fragments))

if run_tests:
    # Case 1: a mapping with one field that is a list of 1000 ints.
    _meta1 = {
        'data': {
            'value_type': list, 
            'nullable': False, 
            'count': 1000, 
            'value': int}
    }
    _type1 = '{data: 1000 * int64}'
    assert get_xnd_type(_meta1) == _type1
    
    # Case 2: a top-level list of two records with fields x and y.
    _meta2 = {
        'value_type': dict, 
        'count': 2,
        'nullable': False,
        'value': {
            'x': {'nullable': False, 'value_type': int},
            'y': {'nullable': False, 'value_type': int},
        }
    }
    _type2 = '2 * {x: int64, y: int64}'
    assert get_xnd_type(_meta2) == _type2

In [12]:
def _json_normalize_in_place(data, meta):
    """Fill the fields listed in ``meta`` but missing from ``data``, in place."""
    # A list-of-records description keeps its fields under 'value'.
    if 'value_type' in meta and meta['value_type'] is dict:
        meta_value = meta['value']
    else:
        meta_value = meta

    if isinstance(data, dict):
        for k_meta in meta_value:
            if k_meta not in data:
                data[k_meta] = None
        for k_data, item_data in data.items():
            meta_k = meta_value[k_data]
            if 'value' in meta_k and isinstance(meta_k['value'], dict):
                _json_normalize_in_place(item_data, meta_k['value'])
    elif isinstance(data, list):
        for k_meta, item_meta in meta_value.items():
            # Only field descriptions are dicts; entries such as 'count' or
            # 'nullable' of a scalar-list description are skipped.
            if not isinstance(item_meta, dict):
                continue
            for item_data in data:
                # Only records (dicts) can receive missing fields.
                if not isinstance(item_data, dict) or k_meta in item_data:
                    continue
                if 'value' in item_meta and isinstance(item_meta['value'], dict):
                    item_data[k_meta] = _json_normalize_in_place({}, item_meta['value'])
                else:
                    item_data[k_meta] = None
    return data


def json_normalize(data, meta=None):
    """Return a copy of ``data`` in which every record has every field.

    Fields described by ``meta`` but missing from a dict or from a record
    of a list are added with the value None; a missing field that has a
    nested description is added as a dict of None-filled fields instead.
    The input is deep-copied once and never modified.

    Parameters:
      data: dict, list or scalar. Scalars, lists of scalars and
        mixed-type lists are returned as an unchanged copy.
      meta: description as produced by json_meta_type; computed from
        ``data`` when None.

    Returns:
      The normalized deep copy of ``data``.

    Raises:
      KeyError: when a dict in ``data`` has a key that ``meta`` does not
        describe.
    """
    data = deepcopy(data)

    # Nothing to normalize in a scalar.
    if not isinstance(data, (dict, list)):
        return data

    if meta is None:
        meta = json_meta_type(data)

    return _json_normalize_in_place(data, meta)

if run_tests:
    # Case 1: flat records with disjoint keys; missing keys become None.
    _data1 = [
        {'a': 1},
        {'b': 2},
    ]
    _result_expected1 = [
        {'a': 1, 'b': None},
        {'a': None, 'b': 2}
    ]
    assert json_normalize(_data1) == _result_expected1
    
    # Case 2: a missing nested dict is added with its own fields set to None.
    _data2 = [
        {'a': {'c': True}},
        {'b': {'d': False}},
    ]
    _result_expected2 = [
        {'a': {'c': True}, 'b': {'d': None}},
        {'a': {'c': None}, 'b': {'d': False}}
    ]
    _result2 = json_normalize(_data2)
    assert _result2 == _result_expected2
    
    # Case 3: a dict mixing a nested dict of scalar lists (left as is) with a
    # list of records that have different keys.
    _data3 = {
        'tree': {
            'leaf1': [1, 2],
            'leaf2': [2, 4]}, 
        'cars': [
            {'model': 'model1', 'year': 2010},
            {'model': 'model2', 'color': 'blue'},
            {'ports': {'numbers': 4}}
        ]
    }
    _result_expected3 = {
        'tree': {
            'leaf1': [1, 2],
            'leaf2': [2, 4]}, 
        'cars': [
            {'model': 'model1', 'year': 2010, 'ports': {'numbers': None}, 'color': None},
            {'model': 'model2', 'color': 'blue', 'year': None, 'ports': {'numbers': None}},
            {'ports': {'numbers': 4}, 'model': None, 'year': None, 'color': None}
        ]
    }
    _result3 = json_normalize(_data3)
    assert _result3 == _result_expected3