In [34]:
import os
import gdal
import ogr
import fiona
import shapely
import logging

from osm2dh import filter_features, check_data_model, create_feature, multi2single_geoms, toUTM
# from DFlowFM_tools import write_layer

# Path to the OpenStreetMap extract that is filtered and validated in this notebook.
# NOTE: machine-specific absolute path -- point this to your own copy of the training data.
osm_fn = r'd:\OneDrive\projects\1230843_Challenge_Fund\training_folder\data\OpenStreetMap\manzese.osm'
# osm_fn = r'c:\Users\hcwin\OneDrive\projects\1230843_Challenge_Fund\training_folder\data\OpenStreetMap\manzese.osm'

# OSM tag (key=value) selecting the features of interest; both are passed to filter_features
key = 'tunnel'
value = 'culvert'
layer_index = 1  # passed as layer_index to filter_features

# Data model, part 1: mandated data type per field.
# This dict has to be called `check_fields`: that is the name used by the rest of
# the notebook (it was previously defined as `check_keys` only, which raised a
# NameError on a fresh kernel).
check_fields = {
                'width': float,
                'depth': float,
                'covered': str,
                'layer': int,
                'diameter': float
                }
check_keys = check_fields  # backward-compatible alias for the old variable name

# type names ('float', 'str', 'int') per field, used as the schema properties
prop_types = {}
for field in check_fields.keys():
    prop_types[field] = check_fields[field].__name__

# Data model, part 2: allowed values per field (an empty list accepts any value)
check_values = {'width': [2, 3],
               'covered': ['yes', 'dirt', 'garbage'],
               'layer': [-1],
               'diameter': [],
               }
schema = {
          'geometry': 'LineString',  # TODO find geometry type in layer
          'properties': prop_types,
         }
check_fields


{'covered': str,
 'depth': float,
 'diameter': float,
 'layer': int,
 'width': float}

## Functions to check data types and value restrictions in a data model

In [49]:
def check_Ftype(ftype, value):
    """
    Try to parse a value to a given data type.

    Args:
        ftype: type or callable used to parse the value (e.g. float, int, str)
        value: raw value to parse

    Returns:
        the parsed value (strings are converted to lower case), or None when
        the value cannot be parsed by ftype
    """
    try:
        v_out = ftype(value)  # parse
        if isinstance(v_out, str):
            v_out = v_out.lower()
    except Exception:
        # Any parsing problem means "not parseable". Catch Exception rather than
        # using a bare except, so that KeyboardInterrupt/SystemExit still propagate.
        v_out = None
    return v_out

def _check_data_model(check_fields, check_values, fieldName, v):
    """
    Checks if value in a given field name is according to a given data model,
    consisting of a mandated data type and allowed values
    
    """
    if not(fieldName in check_fields) and not(fieldName in check_values):
        # nothing to be checked, so return with a flag=None
        return v, None
    
    if (fieldName in check_fields) and (v is not None):
        # field value should have a mandated data type, check valididty
        ftype = check_fields[fieldName]
        v_checked = check_Ftype(ftype, v)
        if v_checked:  # if not an empty string or None is returned
            flag = 0  # there is data, we assume in the right data model if any
            if fieldName in check_values:
                # check on data model needed
                if not((v_checked in check_values[fieldName]) or (len(check_values[fieldName]) == 0)):
                    flag = 1  # there is data, but value not in range
            
        else:
            v_checked = ''
            if v:
                flag = 2  # there is data, but wrong data type
            else:
                flag = 3  # there is no data (v = '')
        
    else:
        # only flags 0, 1 and 3 are possible
        # field value should only follow a valid value range
        flag = 0  # first assume value is within range, then check and update
        if isinstance(v, str):
            v_checked = v.lower()
        else:
            v_checked = v
        if not((v_checked in check_values[fieldName]) or (len(check_values[fieldName]) == 0)):
            flag = 1  # there is data, but value not in range
        if v_checked is None: # v should be checked, but is None, and 
            flag = 3
    return v_checked, flag


In [3]:
# Demonstrate the data model check on a typical float field ('width').
# A single pre-formatted argument to print(...) gives the same output with the
# Python 2 print statement and the Python 3 print function.

print('{} {}'.format('Right data entered:',
                     _check_data_model(check_fields, check_values, 'width', 3.0)))
print('{} {}'.format('Wrong data entered, but correct data model:',
                     _check_data_model(check_fields, check_values, 'width', -1)))
print('{} {}'.format('Data entered in incorrect data type:',
                     _check_data_model(check_fields, check_values, 'width', '3meter')))
print('{} {}'.format('No data entered with empty string',
                     _check_data_model(check_fields, check_values, 'width', '')))
print('{} {}'.format('No data entered with nothing:',
                     _check_data_model(check_fields, check_values, 'width', None)))



Right data entered: (3.0, 0)
Wrong data entered, but correct data model: (-1.0, 1)
Data entered in incorrect data type: ('', 2)
No data entered with empty string ('', 3)
No data entered with nothing: (None, 3)


In [4]:
# Demonstrate the check on keys that are not part of the data model at all:
# the value is returned untouched together with flag=None.

print('{} {}'.format('Data entered (float) not in data model (flag=None expected):',
                     _check_data_model(check_fields, check_values, 'some_key', 3.0)))
print('{} {}'.format('Data entered (string) not in data model (flag=None expected):',
                     _check_data_model(check_fields, check_values, 'some_other_key', 'some randomness')))
print('{} {}'.format('None entered not in data model',
                     _check_data_model(check_fields, check_values, 'yet_another_key', None)))



Data entered (float) not in data model (flag=None expected): (3.0, None)
Data entered (string) not in data model (flag=None expected): ('some randomness', None)
None entered not in data model (None, None)


In [5]:
# Demonstrate the data model check on a typical string field ('covered').

print('{} {}'.format('Right data entered with a capital (only lower case is used, flag=0):',
                     _check_data_model(check_fields, check_values, 'covered', 'Yes')))
print('{} {}'.format('Right data entered with small case (flag=0):',
                     _check_data_model(check_fields, check_values, 'covered', 'yes')))
print('{} {}'.format('Right data entered with second choice (flag=0):',
                     _check_data_model(check_fields, check_values, 'covered', 'DIRT')))
print('{} {}'.format('Wrong data entered, but correct data model (flag=1):',
                     _check_data_model(check_fields, check_values, 'covered', 'something')))
print('{} {}'.format('Data entered in incorrect data type (string is very interpretable, so flag=1):',
                     _check_data_model(check_fields, check_values, 'covered', 2)))
print('{} {}'.format('No data entered with empty string (flag=3)',
                     _check_data_model(check_fields, check_values, 'covered', '')))
print('{} {}'.format('No data entered with nothing (flag=3):',
                     _check_data_model(check_fields, check_values, 'covered', None)))



Right data entered with a capital (only lower case is used, flag=0): ('yes', 0)
Right data entered with small case (flag=0): ('yes', 0)
Right data entered with second choice (flag=0): ('dirt', 0)
Wrong data entered, but correct data model (flag=1): ('something', 1)
Data entered in incorrect data type (string is very interpretable, so flag=1): ('2', 1)
No data entered with empty string (flag=3) ('', 3)
No data entered with nothing (flag=3): (None, 3)


In [2]:
# Here we check whether the new, split functions result in the same dataset as
# the combined, messy function filter_features_old.
# First step: select all features tagged key=value from the OSM file.
all_features = filter_features(
    osm_fn,
    key=key,
    value=value,
    layer_index=layer_index,
    wgs2utm=False,
    logger=logging)

In [30]:
# Second step: run the data model check (mandated types in check_fields, allowed
# values in check_values) on the filtered features.
feats_checked = check_data_model(
    all_features,
    check_keys=check_fields,
    check_values=check_values,
    schema=schema,
    keep_original=True,
    logger=logging)


In [4]:
from fiona import crs
import numpy as np
def write_layer(db, layer_name, data, write_mode='w', format='ESRI Shapefile', schema=schema, crs=crs.from_epsg(4326), logger=logging):
    """
    Write a list of features to a GIS file with fiona.

    Args:
        db: filename or database
        layer_name: layer name
        data: list of dictionaries with 'geometry' (containing shapely geom) and 'properties' (containing dictionary of attributes).
            The features are not modified.
        write_mode: mode passed to fiona.open; with 'w', an existing file db (and, for a
            '.shp' file, its .cpg/.dbf/.prj/.shx companions) is removed before writing
        format: name of the fiona driver (default 'ESRI Shapefile')
        schema: fiona schema dictionary with 'geometry' and 'properties'. The default is the
            module-level `schema` as it was when this function was defined. If None, a schema
            is derived from the first feature in data (which should not contain 'None'!)
        crs: fiona.crs object (default epsg:4326)
        logger: object with .error and .info methods (default: the logging module)

    Returns:
        Nothing, only a written file

    Raises:
        IndexError: if schema is None and data is empty (no feature to derive a schema from)
    """
    # try remove existing file if exists, otherwise fiona dies without warning when writing
    if write_mode == 'w':
        try:
            if os.path.isfile(db):
                os.unlink(db)
                if db.endswith('.shp'):
                    # db[:-3] keeps the trailing dot of the file name
                    for ext in ['cpg', 'dbf', 'prj', 'shx']:
                        companion = '{}{}'.format(db[:-3], ext)
                        if os.path.isfile(companion):
                            os.unlink(companion)
        except OSError as e:
            # WindowsError is a subclass of OSError and only exists on Windows;
            # catching OSError works on every platform and Python version.
            logger.error(e)

    if schema is None:
        # try to make a schema based upon the first feature in the list of features (should not contain 'None'!)
        template = data[0]
        props = {}
        for key, prop_value in template['properties'].items():
            if isinstance(prop_value, np.generic):
                # convert numpy scalars to the equivalent python type
                # (.item() replaces the deprecated np.asscalar)
                prop_value = prop_value.item()
            props[key] = type(prop_value).__name__
        schema = {
                  'geometry': type(template['geometry']).__name__,
                  'properties': props,
                  }

    # Write a new file
    with fiona.open(db, write_mode, format, schema, layer=layer_name, crs=crs) as c:
        for o in data:
            # write a shallow copy, so that the caller's features keep their shapely
            # geometry and can be written (or processed) again afterwards
            record = dict(o)
            record['geometry'] = shapely.geometry.mapping(o['geometry'])
            c.write(record)
    logger.info('file successfully written to {}'.format(db))
    return


In [5]:
fn_out = 'culvert_new.json'

# Extend the property types with an integer '<property>_flag' entry per property.
prop_with_flags = {}
for prop in prop_types:
    prop_with_flags[prop] = prop_types[prop]
    prop_with_flags['{}_flag'.format(prop)] = 'int'

schema_flag = dict(
    geometry='LineString',  # TODO find geometry type in layer
    properties=prop_with_flags,
)

# Write the checked features, with the extended schema, to a GeoJSON file.
write_layer(
    fn_out,
    None,
    feats_checked,
    format='GeoJSON',
    write_mode='w',
    crs=fiona.crs.from_epsg(4326),
    schema=schema_flag,
    logger=logging)

In [7]:
# Filter the culverts (tunnel=culvert) from the OSM file; wgs2utm is left at its default here.
culverts_all = filter_features(
    osm_fn,
    key='tunnel',
    value='culvert',
    layer_index=layer_index,
    logger=logging)
# Check the culverts against the data model, this time with keep_original=False.
culvert_new = check_data_model(
    culverts_all,
    check_keys=check_fields,
    check_values=check_values,
    schema=schema,
    keep_original=False,
    logger=logging)

## Reporting

In [31]:
import pandas as pd

# Row labels of the report, in the order of the flag values 0, 1, 2 and 3.
flag_labels = ['correct', 'invalid value', 'invalid data type', 'missing value']

total_features = len(feats_checked)
validation_report = {}
# NOTE: the loop variable must not be called `key`; at notebook level that would
# overwrite the OSM filter key ('tunnel') that is passed to filter_features.
for field_name in schema['properties']:
    # flags of this field over all checked features
    flags = [feat['properties'][field_name + '_flag'] for feat in feats_checked]
    # amount of flags equal to 0, 1, 2 and 3; any other flag (e.g. None) is not counted
    validation_report[field_name] = [flags.count(code) for code in range(4)]

# one column per field, one row per flag value
df = pd.DataFrame(validation_report)
df.index = flag_labels
df.index.name = 'validation'
#     data = {'Column 1'     : [1., 2., 3., 4.],
#         'Index Title'  : ["Apples", "Oranges", "Puppies", "Ducks"]}




    

In [33]:
# Export the validation report table (df) to the Excel file 'test.xlsx' (relative path).
df.to_excel('test.xlsx')



In [29]:
# Display the first checked feature to inspect its geometry and its properties with their '_flag' values.
feats_checked[0]


{'geometry': <shapely.geometry.linestring.LineString at 0x4241940>,
 'properties': {'covered': 'yes',
  'covered_flag': 0,
  'depth': None,
  'depth_flag': None,
  'diameter': 1.0,
  'diameter_flag': None,
  'layer': -1,
  'layer_flag': None,
  'width': None,
  'width_flag': 3}}