In [551]:
import numpy as np
import pandas as pd
import json
from pandas.io.json import json_normalize 
import logging
from functools import reduce

## Configuration 

In [1056]:
#etl = 'kobo2elastic'
etl = 'curis2elastic'
#etl = 'oldcuris2newcuris'

input_schema_file = ''
input_data_file = ''
mapping_file = ''
    
if etl == 'curis2elastic':
    #old curis to elasticsearch
    input_schema_file = 'schema/input/curisSchema.1-item.json'
    input_data_file = 'data/curisData.1-items.json'
    mapping_file = 'schema/map/couchbase2elastic.map.csv'
elif etl == 'kobo2elastic':
    #kobo to elasticsearch
    input_schema_file = 'schema/input/aqmSchema.complete.json'
    input_data_file = 'data/aqmData.2-items.json'
    mapping_file = 'schema/map/kobo2elastic.map.csv'
elif etl == 'oldcuris2newcuris':
    #kobo to elasticsearch
    input_schema_file = 'schema/input/curisData.1-Schema.avro.json'
    input_data_file = 'data/curisData.1-items.json'
    mapping_file = 'schema/map/kobo2elastic.map.csv'

## Get input JSON SChema

In [1059]:
with open(input_schema_file) as f:
    d = json.load(f)

#d['items']['fields']
schema_df = pd.DataFrame()
schema_df = json_normalize(d)
schema_df

Unnamed: 0,$id,$schema,items.$id,items.properties.demographics.$id,items.properties.demographics.properties.active.$id,items.properties.demographics.properties.active.title,items.properties.demographics.properties.active.type,items.properties.demographics.properties.address.$id,items.properties.demographics.properties.address.items.$id,items.properties.demographics.properties.address.items.properties.add_date.$id,...,items.properties.demographics.properties.deceased.properties.year.title,items.properties.demographics.properties.deceased.properties.year.type,items.properties.demographics.properties.deceased.title,items.properties.demographics.properties.deceased.type,items.properties.demographics.title,items.properties.demographics.type,items.title,items.type,title,type
0,http://example.com/root.json,http://json-schema.org/draft-07/schema#,#/items,#/items/properties/demographics,#/items/properties/demographics/properties/active,The Active Schema,boolean,#/items/properties/demographics/properties/add...,#/items/properties/demographics/properties/add...,#/items/properties/demographics/properties/add...,...,The Year Schema,string,The Deceased Schema,object,The Demographics Schema,object,The Items Schema,object,The Root Schema,array


In [1061]:
flattenDict(d)

{'$schema': 'http://json-schema.org/draft-07/schema#',
 '$id': 'http://example.com/root.json',
 'type': 'array',
 'title': 'The Root Schema',
 'items.$id': '#/items',
 'items.type': 'object',
 'items.title': 'The Items Schema',
 'items.properties.demographics.$id': '#/items/properties/demographics',
 'items.properties.demographics.type': 'object',
 'items.properties.demographics.title': 'The Demographics Schema',
 'items.properties.demographics.properties.awh_id.$id': '#/items/properties/demographics/properties/awh_id',
 'items.properties.demographics.properties.awh_id.type': 'string',
 'items.properties.demographics.properties.awh_id.title': 'The Awh_id Schema',
 'items.properties.demographics.properties.awh_id.pattern': '^(.*)$',
 'items.properties.demographics.properties.active.$id': '#/items/properties/demographics/properties/active',
 'items.properties.demographics.properties.active.type': 'boolean',
 'items.properties.demographics.properties.active.title': 'The Active Schema',
 '

## Get valid index List only (for JSON Schema Only)

In [1062]:
validIndexLists = []
for indexList in list(schema_df):
    if "._" not in indexList:
        if len(indexList.split(sep='.')) > 2:
            if indexList.split(sep='.')[-1] == 'type' or indexList.split(sep='.')[-1] == 'title':
                validIndexLists.append(indexList)

In [1063]:
required_field_df = pd.DataFrame()
required_field_df = schema_df[validIndexLists]
required_field_df.T

Unnamed: 0,0
items.properties.demographics.properties.active.title,The Active Schema
items.properties.demographics.properties.active.type,boolean
items.properties.demographics.properties.address.items.properties.add_date.title,The Add_date Schema
items.properties.demographics.properties.address.items.properties.add_date.type,string
items.properties.demographics.properties.address.items.properties.commnty.title,The Commnty Schema
items.properties.demographics.properties.address.items.properties.commnty.type,string
items.properties.demographics.properties.address.items.properties.country.title,The Country Schema
items.properties.demographics.properties.address.items.properties.country.type,string
items.properties.demographics.properties.address.items.properties.is_recent.title,The Is_recent Schema
items.properties.demographics.properties.address.items.properties.is_recent.type,boolean


## Clean index

In [1065]:
def clean_value(x):
    return x.lower().replace("/", ".").replace("the", "").replace("schema", "").strip()

def clean_index(x):
    return x.lower().replace("/", ".").replace(".properties", "").replace("items.", "").strip()

newSchema_df = pd.DataFrame()
newSchema_df['value'] = required_field_df.T[0].apply(clean_value)
newSchema_df.reset_index(level=0, inplace=True)
newSchema_df['index'] = newSchema_df['index'].apply(clean_index)
newSchema_df

Unnamed: 0,index,value
0,demographics.active.title,active
1,demographics.active.type,boolean
2,demographics.address.add_date.title,add_date
3,demographics.address.add_date.type,string
4,demographics.address.commnty.title,commnty
5,demographics.address.commnty.type,string
6,demographics.address.country.title,country
7,demographics.address.country.type,string
8,demographics.address.is_recent.title,is_recent
9,demographics.address.is_recent.type,boolean


In [557]:
valueSchema_df = pd.DataFrame()

number = len(newSchema_df)
index = 0
counter = 0
counter1 = 1

array_key = []
array_type = []

while (counter < number):
    array_key.append(newSchema_df.iloc[counter]['index'].replace('.title',''))
    counter += 2
        
while (counter1 < number):
    array_type.append(newSchema_df.iloc[counter1]['value'])
    counter1 += 2

valueSchema_df['source_key'] = array_key
valueSchema_df['source_type'] = array_type


#type(valueSchema_df['source_type'][6])
#valueSchema_df.head(10)
valueSchema_df = valueSchema_df.sort_values(['source_key']).reset_index(drop=True)
valueSchema_df

Unnamed: 0,source_key,source_type
0,demographics,object
1,demographics.active,boolean
2,demographics.address,object
3,demographics.address,array
4,demographics.address.add_date,string
5,demographics.address.commnty,string
6,demographics.address.country,string
7,demographics.address.is_recent,boolean
8,demographics.address.zip,integer
9,demographics.awh_id,string


## Flatten Dict

In [1053]:
def flattenDict(d, result=None):
    if result is None:
        result = {}
    for key in d:
        value = d[key]
        if isinstance(value, str):
            if "\n" in value:
                print('----------------',value) 
                value = value.replace("\n", ' and ')
                print('----------------',value) 

        if isinstance(value, dict):
        #if d['type'] =='object':
            value1 = {}
            for keyIn in value:
                value1[".".join([key,keyIn])]=value[keyIn]
            flattenDict(value1, result)
        elif isinstance(value, (list, tuple)):   
            for indexB, element in enumerate(value):
                if isinstance(element, dict):
                    value1 = {}
                    index = 0
                    for keyIn in element:
                        newkey = ".".join([key,keyIn])        
                        value1[".".join([key,keyIn])]=value[indexB][keyIn]
                        index += 1
                    for keyA in value1:
                        flattenDict(value1, result)   
        else:
            result[key]=value
    return result


In [1054]:
d = []
with open(input_data_file) as f:
    #d = json.load(f)
    d = json.load(f)
    print('----')
    print(d)
    print('---type--')
print(type(d))

----
[{'id': '0003ff38-28fb-4005-9437-d276cbb9da4d', 'address': [{'barangay': 'naganacan', 'country': 'Philippines', 'lot_or_house_number': '', 'postal_code': '', 'province': 'santa maria isabela', 'contact_number': [{'fax_number': '0234-324'}, {'mobile_number': [{'country_code': '+63', 'number': '9914293423'}]}, {'landline_number': [{'country_code': '+03', 'number': '64752233'}, {'country_code': '+03', 'number': '12345686'}, {'country_code': '+02', 'number': '34223212'}]}]}, {'barangay': 'siam', 'country': 'Reap', 'lot_or_house_number': '', 'postal_code': '', 'province': 'santa maria isabela', 'contact_number': [{'fax_number': '0234-324'}, {'mobile_number': [{'country_code': '+63', 'number': '9914293423'}]}, {'landline_number': [{'country_code': '+22', 'number': '22222'}, {'country_code': '+22', 'number': '333333'}]}]}], 'birthdate': '09/19/1951', 'contact_number': {'country_code': '+63', 'number': None}, 'email_address': None, 'family_members': ['5289d20e-c80f-4c9e-9e79-7cd3cc2a3e90'

## Get input data

In [None]:

flat_json_0 = {}
flat_json_1 = {}
flat_json_2 = {}

flat_json_0 = flattenDict(d[0])
print('--flatten_dict_type--')
print(type(flat_json_0))
print('--flatten_dict_data--')
flat_json_0

print('--normalize flatten_dict--')
json_flat_norm_0 = json_normalize(flat_json_0)


flat_json_1 = flattenDict(d[1])
json_flat_norm_1 = json_normalize(flat_json_1)

flat_json_2 = flattenDict(d[2])
json_flat_norm_2 = json_normalize(flat_json_2)



In [561]:
json_flat_norm_0

Unnamed: 0,demographics.active,demographics.address.add_date,demographics.address.commnty,demographics.address.country,demographics.address.is_recent,demographics.address.zip,demographics.awh_id,demographics.deceased.is_dead
0,True,2019-02-27T13:11:01.453528,New community,KHM,False,121217158,c92118421a1f8a87a73dadcd954dd75e,False


In [562]:
json_flat_norm_1

Unnamed: 0,demographics.active,demographics.address.add_date,demographics.address.country,demographics.address.is_recent,demographics.address.zip,demographics.awh_id,demographics.deceased.is_dead,demographics.deceased.year
0,True,2013-03-27T13:11:01.454978,KHM,False,17158,1ac9dc3ce4fa83906374505bf91d0f1c,False,


In [563]:
json_flat_norm_2

Unnamed: 0,demographics.active,demographics.address.country,demographics.awh_id
0,True,KHM,2ac9dc3ce4fa83906374505bf91d0f1c


## TESTING: flattendict with nested array of objects

In [1048]:

def flatten_json(nested_json):
    """
        Flatten json object with nested keys into a single level.
        Args:
            nested_json: A nested json object.
        Returns:
            The flattened json object if successful, None otherwise.
    """
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '.')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + '[' + str(i) + '].')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(nested_json)
    return out


def flattenDictArrays(d, result=None):
    print('flatendDict')
    
    if result is None:
        result = {}
    for key in d:
        
        value = d[key]
        print('-----value=d[key]----')
        print(type(value))
        print(value)
        
        
        if isinstance(value, str):
            print('-----isinstance(value, str):----')
            if "\n" in value:
                print('----------------',value) 
                value = value.replace("\n", ' and ')
                print('----------------',value) 

        if isinstance(value, dict):
            print('-----isinstance(value, dict):----')
        #if d['type'] =='object':
            value1 = {}
            for keyIn in value:

                print('-----value----')
                print(type(value))
                print((value))
                
                print('-----keyIn----')
                print(type(keyIn))
                print((keyIn))
                value1[".".join([key,keyIn])]=value[keyIn]
            
            print('-----value1 = {}----')
            print(type(value1))
            print(value1)
            
            flattenDict(value1, result)
        elif isinstance(value, (list, tuple)):   
            print('-----isinstance(value, (list, tuple)):----')
            
            for indexB, element in enumerate(value):
                if isinstance(element, dict):
                    value1 = {}
                    index = 0
                    for keyIn in element:
                        newkey = ".".join([key,keyIn])        
                        value1[".".join([key,keyIn])]=value[indexB][keyIn]
                        index += 1
                    for keyA in value1:
                        flattenDict(value1, result)   
        else:
            result[key]=value
    return result

def get_flat_json(json_data, header_string, header, row):
    """Parse json files with nested key-vales into flat lists using nested column labeling"""
    for root_key, root_value in json_data.items():
        if isinstance(root_value, dict):
            get_flat_json(root_value, header_string + '_' + str(root_key), header, row)
        elif isinstance(root_value, list):
            for value_index in range(len(root_value)):
                for nested_key, nested_value in root_value[value_index].items():
                    header[0].append((header_string +
                                      '_' + str(root_key) +
                                      '_' + str(nested_key) +
                                      '_' + str(value_index)).strip('_'))
                    if nested_value is None:
                        nested_value = ''
                    row[0].append(str(nested_value))
        else:
            if root_value is None:
                root_value = ''
            header[0].append((header_string + '_' + str(root_key)).strip('_'))
            row[0].append(root_value)
    return header, row

In [1051]:
dx = []
dx = {}
dx_list = []
dx_dict = {}
dx_normalize_df = pd.DataFrame()

input_data_file_test = 'data/curisData.1-items.json'
with open(input_data_file_test) as f:
    dx_list = json.load(f)
    #dx = json.load(f)

type(dx_list[0])

dict

In [1052]:
dx_flatten = flatten_json(dx_list)
dx_flatten

{'[0].id': '0003ff38-28fb-4005-9437-d276cbb9da4d',
 '[0].address.[0].barangay': 'naganacan',
 '[0].address.[0].country': 'Philippines',
 '[0].address.[0].lot_or_house_number': '',
 '[0].address.[0].postal_code': '',
 '[0].address.[0].province': 'santa maria isabela',
 '[0].address.[0].contact_number.[0].fax_number': '0234-324',
 '[0].address.[0].contact_number.[1].mobile_number.[0].country_code': '+63',
 '[0].address.[0].contact_number.[1].mobile_number.[0].number': '9914293423',
 '[0].address.[0].contact_number.[2].landline_number.[0].country_code': '+03',
 '[0].address.[0].contact_number.[2].landline_number.[0].number': '64752233',
 '[0].address.[0].contact_number.[2].landline_number.[1].country_code': '+03',
 '[0].address.[0].contact_number.[2].landline_number.[1].number': '12345686',
 '[0].address.[0].contact_number.[2].landline_number.[2].country_code': '+02',
 '[0].address.[0].contact_number.[2].landline_number.[2].number': '34223212',
 '[0].address.[1].barangay': 'siam',
 '[0].a

In [886]:
dx_normalize_df = json_normalize(dx_flatten)

dx_normalize_df.T.to_csv('processed_data.csv',sep=',')
type(dx_normalize_df)

pandas.core.frame.DataFrame

In [887]:
list(dx_normalize_df.columns)

['[1].demographics.active',
 '[1].demographics.address.[1].contact_number.[1].fax_number',
 '[1].demographics.address.[1].country',
 '[1].demographics.address.[1].description',
 '[1].demographics.address.[1].is_preferred',
 '[1].demographics.address.[1].location.[1].geopoint.[1].latitude',
 '[1].demographics.address.[1].location.[1].geopoint.[2].longitude',
 '[1].demographics.address.[1].location.[2].meridian',
 '[1].demographics.address.[1].start_date',
 '[1].demographics.address.[1].zip',
 '[1].demographics.address.[2].contact_number',
 '[1].demographics.address.[2].description',
 '[1].demographics.address.[2].is_preferred',
 '[1].demographics.address.[2].location',
 '[1].demographics.address.[2].start_date',
 '[1].demographics.address.[2].zip',
 '[1].demographics.address.[3].contact_number.[1].fax_number',
 '[1].demographics.address.[3].contact_number.[2].mobile_number.[1].country_code',
 '[1].demographics.address.[3].contact_number.[2].mobile_number.[1].number',
 '[1].demographics.

In [815]:
dx_normalize_df

Unnamed: 0,[0].demographics.active,[0].demographics.address.[0].contact_number,[0].demographics.address.[0].country,[0].demographics.address.[0].description,[0].demographics.address.[0].is_preferred,[0].demographics.address.[0].location.[0].geopoint.[0].latitude,[0].demographics.address.[0].location.[0].geopoint.[1].longitude,[0].demographics.address.[0].location.[1].meridian,[0].demographics.address.[0].start_date,[0].demographics.address.[0].zip,...,[1].demographics.address.[0].country,[1].demographics.address.[0].is_preferred,[1].demographics.address.[0].start_date,[1].demographics.address.[0].zip,[1].demographics.awh_id,[1].demographics.deceased.is_dead,[1].demographics.deceased.year,[2].demographics.active,[2].demographics.address.[0].country,[2].demographics.awh_id
0,True,,PHL,permanent address,True,10.02332,20.02332,+UT8,2018-03-27T13:11:01.453528,92000,...,KHM,False,2013-03-27T13:11:01.454978,17158,1ac9dc3ce4fa83906374505bf91d0f1c,False,,True,KHM,2ac9dc3ce4fa83906374505bf91d0f1c


## TODO: ITERATE OVER LIST and MERGE normalize headers

In [824]:
input_data_df = pd.DataFrame()
input_data_df = json_flat_norm_0
input_data_df = input_data_df.append(json_flat_norm_1, sort=True)
input_data_df = input_data_df.append(json_flat_norm_2, sort=True)
input_data_df.sort_values(['demographics.awh_id']).reset_index(drop=True)
#input_data_df.sort_values(p'd)

Unnamed: 0,demographics.active,demographics.address.add_date,demographics.address.commnty,demographics.address.country,demographics.address.is_recent,demographics.address.zip,demographics.awh_id,demographics.deceased.is_dead,demographics.deceased.year
0,True,2013-03-27T13:11:01.454978,,KHM,False,17158.0,1ac9dc3ce4fa83906374505bf91d0f1c,False,
1,True,,,KHM,,,2ac9dc3ce4fa83906374505bf91d0f1c,,
2,True,2019-02-27T13:11:01.453528,New community,KHM,False,121217158.0,c92118421a1f8a87a73dadcd954dd75e,False,


In [565]:
type(input_data_df)

pandas.core.frame.DataFrame

## Clean input data column header

In [566]:
input_data_df.columns = input_data_df.columns.str.lower().str.replace('/','.')
input_data_df

Unnamed: 0,demographics.active,demographics.address.add_date,demographics.address.commnty,demographics.address.country,demographics.address.is_recent,demographics.address.zip,demographics.awh_id,demographics.deceased.is_dead,demographics.deceased.year
0,True,2019-02-27T13:11:01.453528,New community,KHM,False,121217158.0,c92118421a1f8a87a73dadcd954dd75e,False,
0,True,2013-03-27T13:11:01.454978,,KHM,False,17158.0,1ac9dc3ce4fa83906374505bf91d0f1c,False,
0,True,,,KHM,,,2ac9dc3ce4fa83906374505bf91d0f1c,,


## Get Mapping 

In [567]:
mapping_df = pd.read_csv(mapping_file, skiprows=0)
mapping_df

Unnamed: 0,source_key,destination_key
0,demographics.awh_id,demographics.awh.id
1,demographics.address.country,address.country
2,demographics.address.commnty,address.commnty
3,demographics.address.zip,address.zip


## Get values input data that are included mapping

In [568]:
selected_data_df = pd.DataFrame()
selected_data_df = input_data_df[list(mapping_df['source_key'])]
selected_data_df

Unnamed: 0,demographics.awh_id,demographics.address.country,demographics.address.commnty,demographics.address.zip
0,c92118421a1f8a87a73dadcd954dd75e,KHM,New community,121217158.0
0,1ac9dc3ce4fa83906374505bf91d0f1c,KHM,,17158.0
0,2ac9dc3ce4fa83906374505bf91d0f1c,KHM,,


## Renamce source header fields into destination header fields

In [569]:
selected_data_df.columns = list(mapping_df['destination_key'])
selected_data_df

Unnamed: 0,demographics.awh.id,address.country,address.commnty,address.zip
0,c92118421a1f8a87a73dadcd954dd75e,KHM,New community,121217158.0
0,1ac9dc3ce4fa83906374505bf91d0f1c,KHM,,17158.0
0,2ac9dc3ce4fa83906374505bf91d0f1c,KHM,,


## Encode to as Filesystem (HDFS) or S3 (avro) format)

## Decode for cleaning

## Decode to for computation

## Decode to for analytics transformation

## Transform flat file into output schema format

In [570]:
flat_json = ''
flat_json = selected_data_df.to_json(orient='records')
json_json = json.loads(flat_json)
json_json

[{'demographics.awh.id': 'c92118421a1f8a87a73dadcd954dd75e',
  'address.country': 'KHM',
  'address.commnty': 'New community',
  'address.zip': 121217158.0},
 {'demographics.awh.id': '1ac9dc3ce4fa83906374505bf91d0f1c',
  'address.country': 'KHM',
  'address.commnty': None,
  'address.zip': 17158.0},
 {'demographics.awh.id': '2ac9dc3ce4fa83906374505bf91d0f1c',
  'address.country': 'KHM',
  'address.commnty': None,
  'address.zip': None}]

## Convert dot notated fields into nested json

In [571]:
input_file = 'data/data.json'
with open(input_file) as f:
    d = (f)
 


In [572]:

def dot_to_json(a):
    output = {}
    for key, value in a.items():
        path = key.split('.')[1:]  # ignore the json. prefix
        #path = key
        target = reduce(lambda d, k: d.setdefault(k, {}), path[:-1], output)
        target[path[-1]] = value
    return output
 
data = {'json.message.status.time':50, 'json.message.code.response':80, 'json.time':100}
type(data)


data

{'json.message.status.time': 50,
 'json.message.code.response': 80,
 'json.time': 100}

## data

In [573]:
data

{'json.message.status.time': 50,
 'json.message.code.response': 80,
 'json.time': 100}

## dot data

In [574]:
dict_json = dot_to_json(data)
dict_json

{'message': {'status': {'time': 50}, 'code': {'response': 80}}, 'time': 100}

## dictionary to json

In [575]:
def _dict2json(results):
    counter = 0
    data = []

    for row in results: 
        data.append(json.dumps(row))
        counter += 1
    
    return data

json_dict = _dict2json(dict_json) 
json_dict

['"message"', '"time"']

In [968]:
iris = pd.DataFrame()
iris = pd.read_csv('data/sql.csv')
iris
#iris.to_json(orient='records')
list(iris['0'])

['demographics.awh_id',
 'demographics.active',
 'demographics.address.country',
 'demographics.address.description',
 'demographics.address.is_preferred',
 'demographics.address.location',
 'demographics.address.location.geopoint.latitude',
 'demographics.address.location.geopoint.longitude',
 'demographics.address.location.meridian',
 'demographics.address.start_date',
 'demographics.address.zip',
 'demographics.address.contact_number',
 'demographics.address.contact_number.landline_number.area_code',
 'demographics.address.contact_number.landline_number.number',
 'demographics.address.contact_number.mobile_number.area_code',
 'demographics.address.contact_number.mobile_number.number',
 'demographics.address.contact_number.fax_number',
 'demographics.deceased.is_dead',
 'demographics.org']

In [980]:
iris

Unnamed: 0,0,1,2,3,4,5,6
0,demographics.awh_id,c92118421a1f8a87a73dadcd954dd75e,c92118421a1f8a87a73dadcd954dd75e,c92118421a1f8a87a73dadcd954dd75e,c92118421a1f8a87a73dadcd954dd75e,c92118421a1f8a87a73dadcd954dd75e,c92118421a1f8a87a73dadcd954dd75e
1,demographics.active,,,,,,
2,demographics.address.country,,,,,,
3,demographics.address.description,mailing address,working address,,,,
4,demographics.address.is_preferred,FALSE,FALSE,,,,
5,demographics.address.location,PO Box 23423,,,,,
6,demographics.address.location.geopoint.latitude,,221 Baker Street,,,,
7,demographics.address.location.geopoint.longitude,,,,,,
8,demographics.address.location.meridian,,,,,,
9,demographics.address.start_date,2019-02-27T13:11:01.453528,2019-02-27T13:11:01.453528,,,,
