In [1]:
import argparse
import logging
import os
import sys

from redbaron import RedBaron

from toolz.curried import keyfilter

# Loading

In [2]:
source_dir = '../../openfisca-france/openfisca_france/model/'
filenames = []

for root, directories, files in os.walk(source_dir):    
    for filename in files:
        complete_filename = os.path.join(root, filename)
        assert complete_filename[:len(source_dir)] == source_dir
        complete_filename = complete_filename[len(source_dir):]
        filenames.append(complete_filename)
#filenames

In [3]:
filenames.remove('base.py')
filenames.remove('datatrees.py')
filenames.remove('prelevements_obligatoires/prelevements_sociaux/cotisations_sociales/preprocessing.py')

In [4]:
redbaron_trees = {}
for filename in filenames:
    with open(source_dir + filename) as source_file:
        source_code = source_file.read()
    red = RedBaron(source_code)
    redbaron_trees[filename] = red
    #print('{} parsed'.format(filename))

# Custom exceptions

In [5]:
angry_rbnode = None
angry_state = None

In [6]:
class ParsingException(Exception):
    def __init__(self, message, rbnode, s):
        global angry_rbnode
        global angry_state

        angry_rbnode = rbnode
        angry_state = s

        super(ParsingException, self).__init__(message)

In [7]:
class NotImplementedParsingError(ParsingException):
    pass

In [8]:
class AssertionParsingError(ParsingException):
    pass

In [9]:
def my_assert(cond, rbnode, s, message=''):
    if cond:
        return
    
    raise AssertionParsingError(message, rbnode, s)

# Helpers

In [10]:
import unicodedata

def rbnode_to_exception(rbnode):
    '''Because exceptions are ASCII only in python2'''
    str1 = rbnode.dumps() # unicode string wrongly known as 'str'
    str2 = unicode(str1, 'utf-8') # unicode string as unicode
    str3 = unicodedata.normalize('NFKD', str2).encode('ascii', 'ignore') # ignore special chars
    
    return str3

In [11]:
def parse_date(atomtrailer, s):
    my_assert(atomtrailer.type == 'atomtrailers', atomtrailer, s)
    my_assert(len(atomtrailer.value) == 2, atomtrailer, s)
    my_assert(atomtrailer.value[0].type == 'name', atomtrailer, s)
    my_assert(atomtrailer.value[0].value == 'date', atomtrailer, s)
    call_node = atomtrailer.value[1]
    my_assert(call_node.type == 'call', atomtrailer, s)
    my_assert(len(call_node.value) == 3, atomtrailer, s)
    my_assert(call_node.value[0].type == 'call_argument', atomtrailer, s)
    my_assert(not call_node.value[0].target, atomtrailer, s)
    my_assert(call_node.value[0].value.type == 'int', atomtrailer, s)
    year = call_node.value[0].value.value
    my_assert(call_node.value[1].type == 'call_argument', atomtrailer, s)
    my_assert(not call_node.value[1].target, atomtrailer, s)
    my_assert(call_node.value[1].value.type == 'int', atomtrailer, s)
    month = call_node.value[1].value.value
    my_assert(call_node.value[2].type == 'call_argument', atomtrailer, s)
    my_assert(not call_node.value[2].target, atomtrailer, s)
    my_assert(call_node.value[2].value.type == 'int', atomtrailer, s)
    day = call_node.value[2].value.value
    
    return {'year': year, 'month': month, 'day': day}

In [12]:
def parse_enum(atomtrailers, s):
    my_assert(atomtrailers.type == 'atomtrailers', rbnode, s)
    
    my_assert(len(atomtrailers.value) == 2, rbnode, s)
    my_assert(atomtrailers.value[0].type == 'name', rbnode, s)
    my_assert(atomtrailers.value[0].value == 'Enum', rbnode, s)
    
    call_node = atomtrailers.value[1]
    my_assert(call_node.type == 'call', rbnode, s)
    my_assert(len(call_node.value) == 1, rbnode, s)
    my_assert(call_node.value[0].type == 'call_argument', rbnode, s)
    my_assert(not call_node.value[0].target, rbnode, s)
    
    enum_list_node = call_node.value[0].value
    my_assert(enum_list_node.type == 'list', rbnode, s)
    
    enum_list = []
    for element in enum_list_node.value:
        my_assert(element.type == 'unicode_string', rbnode, s)
        enum_list.append(element.value)
        
    return enum_list

In [13]:
def parse_parameter_path(atomtrailers, first_index, s):
    my_assert(atomtrailers.type == 'atomtrailers', atomtrailers, s)
    
    parameter_path = []
    for i in range(first_index, len(atomtrailers.value)):
        path_component = atomtrailers.value[i]
        my_assert(path_component.type == 'name', rbnode, s)
        parameter_path.append(path_component.value)

    return parameter_path

# Module traversal functions

In [14]:
def visit_module_rbnode(rbnode, s):
    visitors = keyfilter(lambda key: key.startswith('visit_module_'), globals()) # should be defined once
    visitor = visitors.get('visit_module_' + rbnode.type)
    if visitor is None:
        raise NotImplementedParsingError(
            'Module visitor not declared for type="{type}"'.format(
                type=rbnode.type,
                ), rbnode, s)
    ofnode = visitor(rbnode, s)
    return ofnode

In [15]:
def visit_module_endl(rbnode, s):
    return

In [16]:
def visit_module_from_import(rbnode, s):
    # unmodified (TODO)
    s['imports'].append(rbnode)

In [17]:
def visit_module_import(rbnode, s):
    # unmodified (TODO)
    s['imports'].append(rbnode)

In [18]:
def visit_module_comment(rbnode, s):
    # comments are discarded for the moment (TODO)
    return

In [19]:
def visit_module_class(rbnode, s):
    name = rbnode.name
    
    if name == 'rsa_ressource_calculator':
        return
    
    my_assert(not rbnode.decorators, rbnode, s)
    
    upper_classes = []
    for upper_class in rbnode.inherit_from:
        my_assert(upper_class.type == 'name', rbnode, s)
        upper_classes.append(upper_class.value)
        
    class_obj = {
        'type': 'class',
        'name': name,
        'upper_classes': upper_classes,
        'content': rbnode.value,
        }

    s['classes'].append(class_obj)

In [20]:
def visit_module_def(rbnode, s):
    if rbnode.name in ['_revprim', 'preload_zone_apl']:
        return
    
    # unmodified (TODO)
    s['auxiliary_functions'].append(rbnode)

In [21]:
def visit_module_assignment(rbnode, s):
    my_assert(rbnode.operator == '', rbnode, s)
    
    my_assert(rbnode.target.type == 'name', rbnode, s)
    name = rbnode.target.value
    
    if name in ['zone_apl_by_depcom']:
        return
    
    if rbnode.value.type == 'int':
        s['constants'].append({
                'name': name,
                'type': 'int',
                'value': rbnode.value.value,
            })
        return
    
    if rbnode.value.type == 'name':
        my_assert(rbnode.value.value == 'None', rbnode, s)
        s['constants'].append({
                'name': name,
                'type': 'None',
                'value': None,
            })
        return
    
    if rbnode.value.type == 'atomtrailers':
        atomtrailers = rbnode.value

        my_assert(atomtrailers.value[0].type == 'name', rbnode, s)
        function_name = atomtrailers.value[0].value
        if function_name == 'Enum':
            enum_list = parse_enum(atomtrailers, s)

            s['enums'].append({
                'name': name,
                'enum_list': enum_list,
            })
            return

        if function_name == 'logging':
            # ignore logging
            return

        raise NotImplementedParsingError('Unknown atomtrailers', rbnode, s)

    raise NotImplementedParsingError('Unknown type', rbnode, s)


# Module parsing

In [22]:
parsed_modules = {}

for name in filenames:
    #print('Visiting ' + name)
    red = redbaron_trees[name]
    

    
    s = {
        'module_name': name,
        'imports': [],
        'classes': [],
        'enums': [],
        'auxiliary_functions': [],
        'constants': [],
        }
    
    for rbnode in red:
        visit_module_rbnode(rbnode, s)
        
    parsed_modules[name] = {
        'imports': s['imports'],
        'classes': s['classes'],
        'enums': s['enums'],
        'auxiliary_functions': s['auxiliary_functions'],
        'constants': s['constants'],
    }

# Class traversal functions

In [23]:
def visit_class_rbnode(rbnode, s):
    visitors = keyfilter(lambda key: key.startswith('visit_class_'), globals()) # should be defined once
    visitor = visitors.get('visit_class_' + rbnode.type)
    if visitor is None:
        raise NotImplementedParsingError(
            'Class visitor not declared for type="{type}"'.format(
                type=rbnode.type,
                ), rbnode, s)
    ofnode = visitor(rbnode, s)
    return ofnode

In [24]:
def visit_class_endl(rbnode, s):
    return

In [25]:
def visit_class_assignment(rbnode, s):
    my_assert(rbnode.operator == '', rbnode, s)
    
    my_assert(rbnode.target.type == 'name', rbnode, s)
    target = rbnode.target.value
    
    if target == 'column':
        if rbnode.value.type == 'atomtrailers':
            my_assert(len(rbnode.value.value) == 2, rbnode, s)

            my_assert(rbnode.value.value[0].type == 'name', rbnode, s)
            column_name = rbnode.value.value[0].value

            call_node = rbnode.value.value[1]
            my_assert(call_node.type == 'call', rbnode, s)
            column_args = {}
            for arg in call_node.value:
                my_assert(arg.target.type == 'name', rbnode, s)
                column_args[arg.target.value] = arg.value

            my_assert('column' not in s['class_variables'].keys(), rbnode, s)
            s['class_variables']['column'] = column_name
            s['class_variables']['column_args'] = column_args

        elif rbnode.value.type == 'name':
            column_name = rbnode.value.value

            my_assert('column' not in s['class_variables'].keys(), rbnode, s)
            s['class_variables']['column'] = column_name
        else:
            raise NotImplementedParsingError('Unknown type', rbnode, s)
    
    elif target == 'entity_class':
        my_assert(rbnode.value.type == 'name', rbnode, s)
        
        my_assert('entity_class' not in s, rbnode, s)
        s['entity_class'] = rbnode.value.value
              
    elif target == 'label':
        # can be unicode_string or string_chain ! (TODO)
        # my_assert(rbnode.value.type == 'unicode_string', rbnode, s)
        
        my_assert('label' not in s, rbnode, s)
        s['label'] = rbnode.value
        
    elif target == 'start_date':
        date = parse_date(rbnode.value, s)
        
        my_assert('start_date' not in s, rbnode, s)
        s['start_date'] = date
         
    elif target == 'stop_date':
        date = parse_date(rbnode.value, s)

        my_assert('stop_date' not in s, rbnode, s)
        s['stop_date'] = date
        
    elif target == 'url':
        # can be a tuple, see revnet (TODO)
        # my_assert(rbnode.value.type in ['string', 'unicode_string'], rbnode, s)
        
        my_assert('url' not in s, rbnode, s)
        s['url'] = rbnode.value.value
        
             
    elif target == 'operation':
        my_assert(rbnode.value.type == 'string', rbnode, s)
        
        my_assert('operation' not in s, rbnode, s)
        s['operation'] = rbnode.value.value
        
             
    elif target == 'variable':
        my_assert(rbnode.value.type == 'name', rbnode, s)
        
        my_assert('variable' not in s, rbnode, s)
        s['variable'] = rbnode.value.value
              
    elif target == 'cerfa_field':
        # my_assert(rbnode.value.type == 'unicode_string', rbnode, s)
        # can be a unicode string or a dict
        
        my_assert('cerfa_field' not in s, rbnode, s)
        s['cerfa_field'] = rbnode.value
                
    elif target == 'is_permanent':
        my_assert(rbnode.value.type == 'name', rbnode, s)
        my_assert(rbnode.value.value in ['True', 'False'], rbnode, s)        
        
        my_assert('is_permanent' not in s, rbnode, s)
        s['is_permanent'] = rbnode.value.value == 'True'
    
    elif target == 'base_function':
        my_assert(rbnode.value.type == 'name', rbnode, s)
        
        my_assert('base_function' not in s, rbnode, s)
        s['base_function'] = rbnode.value.value
              
    elif target == 'calculate_output':
        my_assert(rbnode.value.type == 'name', rbnode, s)
        
        my_assert('calculate_output' not in s, rbnode, s)
        s['calculate_output'] = rbnode.value.value
              
    elif target == 'set_input':
        my_assert(rbnode.value.type == 'name', rbnode, s)
        
        my_assert('set_input' not in s, rbnode, s)
        s['set_input'] = rbnode.value.value
        
    elif target == 'role':
        my_assert(rbnode.value.type == 'name', rbnode, s)
        
        my_assert('role' not in s, rbnode, s)
        s['role'] = rbnode.value.value
              

    else:            
        raise NotImplementedParsingError('Unknown class variable {}'.format(target), rbnode, s)
            


In [26]:
def visit_class_def(rbnode, s):
    name = rbnode.name
    
    decorators = rbnode.decorators

    arguments = []
    for arg in rbnode.arguments:
        my_assert(arg.type == 'def_argument', rbnode, s)
        my_assert(arg.target.type == 'name', rbnode, s)
        arguments.append(arg.target.value)
        my_assert(not arg.value, rbnode, s)
    
    instructions = rbnode.value # unmodified (TODO)
    
    my_assert(name not in s['class_functions'], rbnode, s)
    s['class_functions'][name] = {
        'arguments': arguments,
        'decorators': decorators,
        'instructions': instructions,
    }

In [27]:
def visit_class_comment(rbnode, s):
    # ignored (TODO)
    return

In [28]:
def visit_class_string(rbnode, s):
    # ignored (TODO)
    return

# Class parsing

In [29]:
parsed_classes = {}

for module_name, module in parsed_modules.items():
    # print('Visiting module {} to parse its classes.'.format(module_name))
    
    parsed_classes[module_name] = {
        'parsed_classes': {},
    }
    
    for cl in module['classes']:
        class_name = cl['name']
        # print('Visiting class {}'.format(class_name))
        
        s = {
            'keyword': 'class',
            'class_name': name,
            'class_variables': {},
            'class_functions': {},
            }
    
        for rbnode in cl['content']:
            visit_class_rbnode(rbnode, s)

        parsed_classes[module_name]['parsed_classes'][class_name] = {
            'class_variables': s['class_variables'],
            'class_functions': s['class_functions'],
            }

# Function traversal visitors

In [73]:
def visit_function_rbnode(rbnode, s):
    visitors = keyfilter(lambda key: key.startswith('visit_function_'), globals()) # should be defined once
    visitor = visitors.get('visit_function_' + rbnode.type)
    if visitor is None:
        raise NotImplementedParsingError(
            'Function visitor not declared for type="{type}"'.format(
                type=rbnode.type,
                ), rbnode, s)
    ofnode = visitor(rbnode, s)
    
    do tests here !
    
    return ofnode

SyntaxError: invalid syntax (<ipython-input-73-6124b7b30e9d>, line 11)

In [31]:
def visit_function_endl(rbnode, s):
    return

In [53]:
def visit_function_assignment(rbnode, s):
    my_assert(s['keyword'] == 'function', rbnode, s)
    
    my_assert(rbnode.target.type == 'name', rbnode, s)
    name = rbnode.target.value
    
    child_state = {
        'keyword': 'expression',
        'local_variables': s['local_variables'],
    }
    visit_function_rbnode(rbnode.value, child_state)
    if 'var_tmp' not in child_state:
        raise ParsingError('No var_tmp after expression parsing', rbnode, s)
    s['local_variables'][name] = child_state['var_tmp']

In [49]:
def visit_function_atomtrailers(rbnode, s):
    my_assert(s['keyword'] == 'expression', rbnode, s, s['keyword'])

    child_state = {
        'keyword': 'expression',
        'local_variables': s['local_variables'],
    }
    visit_function_rbnode(rbnode.value[0], child_state)
    base = child_state['var_tmp']

    for i in range(1, len(rbnode.value)):
        child_state = {
            'keyword': 'atomtrailers',
            'local_variables': s['local_variables'],
            'atomtrailers_base': base,
        }
        visit_function_rbnode(rbnode.value[i], child_state)
        base = child_state['atomtrailers_new_base']
        
    s['var_tmp'] = base

In [34]:
def visit_function_binary_operator(rbnode, s):
    my_assert(s['keyword'] == 'expression', rbnode, s)

    op = rbnode.value
    
    parsed_args = []
    for arg in [rbnode.first, rbnode.second]:
        child_state = {
            'keyword': 'expression',
            'local_variables': s['local_variables'],
        }
        visit_function_rbnode(arg, child_state)
        parsed_arg = child_state['var_tmp']
        parsed_args.append(parsed_arg)

    var_tmp = {
        'type': 'value', 
        'nodetype': 'arithmetic_operation', 
        'op': op, 
        'operands': parsed_args,
    }
    s['var_tmp'] = var_tmp
    return


In [35]:
def visit_function_name(rbnode, s):
    name = rbnode.value

    if s['keyword'] == 'expression':
        if name in s['local_variables']:
            s['var_tmp'] = s['local_variables'][name]
            return

        if name == 'period': 
            var_tmp = {
                'type': 'period', 
                'nodetype': 'builtin-period'
            }
            s['var_tmp'] = var_tmp
            return

        if name == 'simulation':
            var_tmp = {
                'type': 'simulation'
            }
            s['var_tmp'] = var_tmp
            return

        if name == 'self':
            var_tmp = {
                'type': 'self'
            }
            s['var_tmp'] = var_tmp
            return        

        if name in ['round', 'sum', 'not_']:
            var_tmp = {
                'type': 'arithmetic_operation_tmp', 
                'nodetype': 'arithmetic_operation_tmp', 
                'op': name, 
            }
            s['var_tmp'] = var_tmp
            return

        if name in ['CHEF', 'CONJ', 'CREF', 'ENFS', 'PAC1', 'PAC2', 'PAC3', 'PART', 'PREF', 'VOUS']:
            var_tmp = {
                'type': 'role',
                'name': name,
            }
            s['var_tmp'] = var_tmp
            return

        if name == 'apply_thresholds':
            # to deal with specifically (TODO)
            var_tmp = {
                'type': 'apply_thresholds_tmp', 
                'nodetype': 'apply_thresholds', 
            }
            s['var_tmp'] = var_tmp
            return

        raise NotImplementedParsingError('Unknown name {}'.format(name), rbnode, s)
        
    if s['keyword'] == 'atomtrailers':
        base = s['atomtrailers_base']
        
        if base['type'] == 'period':
            period_op = name
            if period_op in ['this_month', 'n_2']:
                var_tmp = {
                    'type': 'period', 
                    'nodetype': 'period-operation', 
                    'operator': period_op, 
                    'operands': [base],
                }
                s['atomtrailers_new_base'] = var_tmp
                return

            if period_op == 'start':
                var_tmp = {
                    'type': 'instant', 
                    'nodetype': 'period-to-instant', 
                    'operator': 'start', 
                    'operands': [base],
                }
                s['atomtrailers_new_base'] = var_tmp
                return

            raise NotImplementedParsingError('Unknown period operand', rbnode, s)
            
        if base['type'] == 'simulation':
            simulation_op = name
            if simulation_op in ['calculate', 'compute', 'calculate_add']:
                var_tmp = {
                    'type': 'simulation_operation_tmp', 
                    'nodetype': 'simulation_operation_tmp', 
                    'operator': simulation_op, 
                }
                s['atomtrailers_new_base'] = var_tmp
                return

            if simulation_op == 'legislation_at':
                var_tmp = {
                    'type': 'legislation_at_tmp', 
                    'nodetype': 'legislation_at_tmp', 
                }
                s['atomtrailers_new_base'] = var_tmp
                return

            raise NotImplementedParsingError('Unknown simulation op.', rbnode, s)

        if base['type'] == 'self':
            self_op = name
            if self_op in ['split_by_roles', 'sum_by_entity', 'filter_role']:
                var_tmp = {
                    'type': 'self_operation_tmp', 
                    'nodetype': 'self_operation_tmp', 
                    'operator': self_op, 
                }
                s['atomtrailers_new_base'] = var_tmp
                return

            raise NotImplementedParsingError('Unknown self op {}.'.format(self_op), rbnode, s)

        if base['type'] == 'parameter':
            var_tmp = {
                'type': 'value',
                'nodetype': 'parameter',
                'instant': base['instant'], 
                'path': base['path'] + [name],
            }
            s['atomtrailers_new_base'] = var_tmp
            return
  
        if base['type'] == 'apply_thresholds_tmp':
            var_tmp = {
                'type': 'value', 
                'nodetype': 'apply_thresholds', 
                'rbnode': rbnode,
            }
            s['atomtrailers_new_base'] = var_tmp
            return

        raise NotImplementedParsingError('Unknown name {}'.format(name), rbnode, s)

    raise NotImplementedParsingError('Wrong keyword {}'.format(name), rbnode, s)


In [36]:
def visit_function_int(rbnode, s):
    my_assert(s['keyword'] == 'expression', rbnode, s)

    var_tmp = {
        'type': 'value', 
        'nodetype': 'int', 
        'value': rbnode.value, 
    }
    s['var_tmp'] = var_tmp
    return


In [37]:
def visit_function_float(rbnode, s):
    my_assert(s['keyword'] == 'expression', rbnode, s)

    var_tmp = {
        'type': 'value', 
        'nodetype': 'float', 
        'value': rbnode.value, 
    }
    s['var_tmp'] = var_tmp
    return


In [38]:
def visit_function_associative_parenthesis(rbnode, s):
    my_assert(s['keyword'] == 'expression', rbnode, s)

    visit_function_rbnode(rbnode.value, s)

In [39]:
def visit_function_return(rbnode, s):
    my_assert(s['keyword'] == 'function', rbnode, s)

    my_assert(rbnode.value.type == 'tuple', rbnode, s)
    
    returned_tuple = rbnode.value
    my_assert(len(returned_tuple.value) ==  2, rbnode, s)

    rb_period = returned_tuple.value[0]
    child_state = {
        'keyword': 'expression',
        'local_variables': s['local_variables'],
    }
    visit_function_rbnode(rb_period, child_state)
    returned_period = child_state['var_tmp']
    my_assert(returned_period['type'] == 'period', rbnode, s)

    rb_value = returned_tuple.value[1]
    child_state = {
        'keyword': 'expression',
        'local_variables': s['local_variables'],
    }
    visit_function_rbnode(rb_value, child_state)
    returned_value = child_state['var_tmp']
    my_assert(returned_value['type'] == 'value', rbnode, s)

    returned_value = {
        'type': 'return', 
        'nodetype': 'return', 
        'period': returned_period, 
        'value': returned_value,
    }
    my_assert('return' not in s, rbnode, s)
    s['return'] = returned_value
    return

    


In [40]:
def visit_function_comparison(rbnode, s):
    my_assert(s['keyword'] == 'expression', rbnode, s)

    my_assert(rbnode.value.type == "comparison_operator", rbnode, s)
    op = rbnode.value.first
    my_assert(not rbnode.value.second, rbnode, s)

    parsed_args = []
    for arg in [rbnode.first, rbnode.second]:
        child_state = {
            'keyword': 'expression',
            'local_variables': s['local_variables'],
        }
        visit_function_rbnode(arg, child_state)
        parsed_arg = child_state['var_tmp']
        parsed_args.append(parsed_arg)

    var_tmp = {
        'type': 'value', 
        'nodetype': 'arithmetic_operation', 
        'op': op, 
        'operands': parsed_args,
    }
    s['var_tmp'] = var_tmp
    return


In [41]:
def visit_function_comment(rbnode, s):
    # ignored (TODO)
    return

In [42]:
def visit_function_list(rbnode, s):
    my_assert(s['keyword'] == 'expression', rbnode, s)

    # ignored (TODO)
    var_tmp = {
        'type': 'list', 
        'nodetype': 'list', 
        'rbnode': rbnode,
    }
    s['var_tmp'] = var_tmp
    return


In [43]:
def visit_function_for(rbnode, s):
    my_assert(s['keyword'] == 'function', rbnode, s)

    # ignored (TODO)
    return

In [44]:
def visit_function_string(rbnode, s):
    # ignored (TODO)
    return

In [71]:
def visit_function_call(rbnode, s):
    my_assert(s['keyword'] == 'atomtrailers', rbnode, s)
        
    arg_list = []
    arg_dict = {}
    
    no_target = True
    args = rbnode.value
    for arg in args:
        my_assert(arg.type == 'call_argument', rbnode, s)
        if arg.target:
            no_target = False
            my_assert(arg.target.type == 'name', rbnode, s)
            target = arg.target.value
        else:
            my_assert(no_target, rbnode, s)

        child_state = {
            'keyword': 'expression',
            'local_variables': s['local_variables'],
        }
        visit_function_rbnode(arg.value, child_state)
        if 'var_tmp' not in child_state:
            raise ParsingException('No var_tmp after expression parsing ({})'.format(child_state), rbnode, s)
        value = child_state['var_tmp']
        
        if arg.target:
            arg_dict[target] = value
        else:
            arg_list.append(value)
    

    base = s['atomtrailers_base']
    
    if base['type'] == 'simulation':
        if simulation_op in ['calculate', 'compute', 'calculate_add']:
            my_assert(len(arg_list) in [1, 2], rbnode, s)
            my_assert(not arg_dict, rbnode, s)

            my_assert(arg_list[0]['type'] == 'string', rbnode, s)
            called_var = arg_list[0]['value']
            operands = [called_var]

            if len(arg_list) == 2:
                arg_period = arg_list[1]
                my_assert(arg_period['type'] == 'period', rbnode, s)
                operands.append(arg_period)

            tmp_var = {
                'type': 'value', 
                'nodetype': 'variable_for_period', 
                'operator': simulation_op, 
                'operands': operands,
            }
            s['atomtrailers_new_base'] = tmp_var
            return

        if simulation_op == 'legislation_at':
            my_assert(len(arg_list) == 1, rbnode, s)
            my_assert(len(arg_dict) == 0, rbnode, s)

            instant_arg = arg_list[0]
            my_assert(instant_arg['type'] == 'instant', rbnode, s)

            tmp_var = {
                'type': 'parameter', 
                'nodetype': 'parameter', 
                'instant': instant_arg, 
                'path': [],
            }
            s['atomtrailers_new_base'] = tmp_var
            return

        raise NotImplementedParsingError('Unknown simulation op {}'.format(simulation_op), rbnode, s)
    

    if base['type'] == 'self_operation_tmp':
        self_op = base['self_op']

        if self_op in ['split_by_roles', 'sum_by_entity', 'filter_role']:
            # no assertion about args (TODO)
            tmp_var = {
                'type': 'value', 
                'nodetype': 'self_operation', 
                'operator': self_op, 
                'arg_list': arg_list,
                'arg_dict': arg_dict,
            }
            s['atomtrailers_new_base'] = tmp_var
            return

        raise NotImplementedParsingError('Unknown self_operation_tmp {}'.format(simulation_op), rbnode, s)

    if base['type'] == 'arithmetic_operation_tmp':
        op = base['op']
        
        if op in ['round', 'sum', 'not_']:
            # no assertion about args (TODO)
            tmp_var = {
                'type': 'value', 
                'nodetype': 'arithmetic_operation', 
                'op': base['op'], 
                'arg_list': arg_list,
                'arg_dict': arg_dict,
            }
            s['atomtrailers_new_base'] = tmp_var
            return

        raise NotImplementedParsingError('Unknown arithmetic_operation_tmp.', rbnode, s)

    raise NotImplementedParsingError('Unknown caller', rbnode, s)

# Function parsing

In [72]:
parsed_functions = {}

for module_name, module in parsed_classes.items():
    print('Visiting module {}'.format(module_name))
    
    parsed_functions[module_name] = {
        'classes': {},
    }
    
    for class_name, cl in module['parsed_classes'].items():
        print('Visiting class {} to parse its function(s)'.format(class_name))
        
        parsed_functions[module_name]['classes'][class_name] = {
            'parsed_functions': {},
        }
        
        for function_name, fn in cl['class_functions'].items():
            print('Visiting function {}'.format(function_name))
            
            s = {
                'keyword': 'function',
                'module_name': module_name,
                'class_name': class_name,
                'function_name': function_name,
                'local_variables': {},
            }
            
            for rbnode in fn['instructions']:
                visit_function_rbnode(rbnode, s)

            parsed_functions[module_name]['classes'][class_name]['parsed_functions'][function_name] = {
                'return': s['return'],
                }


Visiting module prestations/education.py
Visiting class boursier to parse its function(s)
Visiting class bourse_college to parse its function(s)
Visiting function function


ParsingException: No var_tmp after expression parsing ({'keyword': 'expression', 'local_variables': {'period': {'nodetype': 'period-operation', 'type': 'period', 'operator': 'this_month', 'operands': [{'nodetype': 'builtin-period', 'type': 'period'}]}}})

In [69]:
angry_rbnode

In [70]:
angry_rbnode.help()

[38;5;148mCallNode[39m[38;5;197m([39m[38;5;197m)[39m
[38;5;15m  [39m[38;5;242m# identifiers: call, call_, callnode[39m
[38;5;15m  [39m[38;5;15mvalue[39m[38;5;15m [39m[38;5;197m->[39m
[38;5;15m    [39m[38;5;197m*[39m[38;5;81m CallArgumentNode[39m[38;5;197m([39m[38;5;197m)[39m
[38;5;15m        [39m[38;5;242m# identifiers: call_argument, call_argument_, callargument, callargumentnode[39m
[38;5;15m        [39m[38;5;15mtarget[39m[38;5;15m [39m[38;5;197m->[39m
[38;5;15m          [39m[38;5;186mNone[39m
[38;5;15m        [39m[38;5;15mvalue[39m[38;5;15m [39m[38;5;197m->[39m
[38;5;15m          [39m[38;5;148mStringNode[39m[38;5;197m([39m[38;5;197m)[39m[38;5;15m [39m[38;5;197m...[39m
[38;5;15m    [39m[38;5;197m*[39m[38;5;81m CallArgumentNode[39m[38;5;197m([39m[38;5;197m)[39m
[38;5;15m        [39m[38;5;242m# identifiers: call_argument, call_argument_, callargument, callargumentnode[39m
[38;5;15m        [39m[38;5;15mtarg

In [None]:
angry_state

In [56]:
d = {'a':3}

In [58]:
len(d)

1

In [None]:
parsed_classes['prelevements_obligatoires/impot_revenu/reductions_impot.py']['parsed_classes'][
    'patnat']['class_functions']['function_20100101_20101231']['instructions']