#### Parse Json File Dynamically

Author: Mahesh KC  
Description: Program to dynamically parse JSON dataset. Creates profiling, flatten dataset with column header.  
Date: 2020-03-11

#### Remove CRLF

In [86]:
def clean_string(string_value):
    if string_value == None:
        return None
    else:
        return string_value.replace("\r","").replace("\n","").replace("\t","").strip()

**Create Directory**

In [87]:
def create_dir(dirname):
    if not os.path.exists(dirname):
        os.makedirs(dirname)

**Directory HouseKeeping**

In [88]:
import os
from datetime import datetime

file_name = 'style_format.json'
incoming_dataset = 'source_dataset'
base_dataset = 'processed_dataset'
profile_metadata = 'profiled_metadata'
processing_dir = file_name.split('.')[0]
now = datetime.now().strftime("%Y%m%d%H%M%S")
today = now[0:8]
tmp_dir = 'tmp'
current_dir = os.path.join(base_dataset, processing_dir,today)
profile_dir = os.path.join(profile_metadata, processing_dir, today)
tmp_dir = os.path.join(tmp_dir, 'tmp' + '_' + today, processing_dir)

# create directory
create_dir(incoming_dataset)
create_dir(current_dir)
create_dir(profile_dir)
create_dir(tmp_dir)

# output file format
file_suffix = 'tsv'
field_delimiter = '\t'
record_delimiter = '\n'

**Move dataset to source directory**

In [89]:
# uncomment if your are testing and json file exists in current directory.
#import shutil
#shutil.move(file_name,incoming_dataset)

**Load JSON File**

In [90]:
import json

with open(os.path.join(incoming_dataset,file_name)) as json_file:
    data_file = json.load(json_file)

#### Recursively parse nested object

In [91]:
element_list = list()
parent_attr = list()
_parent_name = set()
column_name_delimiter = '_'
def recursive_dict(dict_name):
    if isinstance(dict_name, dict):
        for index,(k,v) in enumerate(dict_name.items()):
            if not isinstance(v, dict) and not isinstance(v, list):
                if len(parent_attr) != 0:
                    parent_name = column_name_delimiter.join(parent_attr) + column_name_delimiter + k             
                else:
                    parent_name = k                    
                element_tuple = (parent_name,v)
                element_list.append(element_tuple)
            if isinstance(v, list):
                for i in range(len(v)):
                    parent_attr.append(k)
                    recursive_dict(v[i])
                    parent_attr.pop()
            if isinstance(v, dict):
                parent_attr.append(k)
                _parent_name.add(k)
                #print("Nested dictionary found. Total elements:",len(v))
                recursive_dict(v)
                parent_attr.pop()
    return element_list

#### Create tsv file from original dataset for all nested object

In [92]:
dataset_file = file_name.replace('json','txt')
is_dict = isinstance(data_file, dict)
dataset = list()
if (is_dict):
     for k,v in data_file.items():
            if isinstance(v, list):
                for i in range(len(v)):
                    data = recursive_dict(v[i])
                    dataset.append(data[:])
                    # Writing to file is consuming lot of memory so this is disabled
                    #with open(os.path.join(tmp_dir, dataset_file), 'a+') as datafile:
                        #datafile.writelines(str(data) + "\n")
                    #for tup in range(len(data)):
                        #with open(os.path.join(tmp_dir,'flattened_data.tsv'), 'a+') as realdata:
                            #data_value = clean_string(data[tup][1])
                            #realdata.writelines(data_value + '\t')
                            #if tup == len(data) - 1:
                                #realdata.writelines("\n")
                    element_list.clear()
            else:
                data = recursive_dict(data_file)
                dataset.append(data[:])
                element_list.clear()
                break

#### Create data profiling to store metadata about file and all the records

In [93]:
from collections import OrderedDict
from datetime import datetime
import getpass
import hashlib
profile_report_name = file_name.split('.')[0] + now + '_profiling.' + file_name.split('.')[1]
def profile_dataset(data_element):
    data_element_profile = OrderedDict()
    metadata_data = OrderedDict()
    attribute_data = OrderedDict()
    dataset_properties = list()
    metadata_type = ['created_by','created_date', 'email', 'total_data_element']
    attribute_type = ['record_id', 'total_element', 'attribute_name', 'attribute_hash']
    date_format = "%Y-%m-%d %H:%M:%S"
    email_address = 'engineering@analyticstensor.com'
    metadata_data[metadata_type[0]] = getpass.getuser()
    metadata_data[metadata_type[1]] = datetime.now().strftime(date_format)
    metadata_data[metadata_type[2]] = email_address
    metadata_data[metadata_type[3]] = len(data_element)
    data_element_profile['metadata'] = metadata_data
    for i in range(len(data_element)):
        attribute_name = ''        
        for j in range(len(data_element[i])):
            attribute_name += data_element[i][j][0] + ',' 
        attribute_data[attribute_type[0]] = i + 1
        attribute_data[attribute_type[1]] = len(data_element[i])
        attribute_data[attribute_type[2]] = attribute_name[0:len(attribute_name)-1]
        attribute_data[attribute_type[3]] = hashlib.sha1(attribute_data[attribute_type[2]].lower().encode()).hexdigest()
        dataset_properties.append(attribute_data.copy())
        data_element_profile['data_attributes'] = dataset_properties
    profile_report = json.dumps(data_element_profile, separators=(',', ': '), indent = 4)
    with open(os.path.join(profile_dir, profile_report_name), 'a+') as profiling:
        profiling.writelines(profile_report)
    print("Data profiling report for {} is located at {} directory with filename {}".format(file_name,profile_dir,profile_report_name))

In [94]:
profile_dataset(dataset)

Data profiling report for style_format.json is located at profiled_metadata/style_format/20210329 directory with filename style_format20210329180502_profiling.json


#### Data Exploration

In [95]:
import pandas as pd
pd.set_option('display.max_columns', None)   # set pd columns option

df_data = pd.DataFrame.from_records(dataset)
df_data.head(5)

Unnamed: 0,0,1,2,3,4,5,6,7
0,"(id, 0)","(detail_info_country_name, Global Country)","(detail_info_country_capital, None)","(detail_info_popular_state, None)","(detail_info_country_phone_code, 00)","(detail_info_style_version, 1)","(detail_info_style_create_date, 2019-10-04)","(detail_info_style_vowel, 100)"
1,"(id, 1)","(detail_info_country_name, United States)","(detail_info_country_capital, Washing DC)","(detail_info_country_phone_code, 01)","(detail_info_style_version, 1)","(detail_info_style_create_date, 2019-10-04)","(detail_info_style_constant, 80)","(detail_info_style_vowel, 20)"
2,"(id, 2)","(detail_info_country_name, European)","(detail_info_country_capital, Washing DC)","(detail_info_country_phone_code, 00)","(detail_info_style_version, 1)","(detail_info_style_create_date, 2019-10-04)","(detail_info_style_constant, 100)",
3,"(id, 3)","(detail_info_country_name, United Kingdom)","(detail_info_country_capital, Washing DC)","(detail_info_country_phone_code, 00)","(detail_info_style_version, 1)","(detail_info_style_create_date, 2019-10-04)","(detail_info_style_constant, 80)","(detail_info_style_vowel, 20)"


**Categorize and collect records having same hashcode and add header**

In [96]:
data = json.load(open(os.path.join(profile_dir, profile_report_name)))
df_metadata = pd.DataFrame(data["data_attributes"])   # load data_attributes objects
all_records_attributes = df_metadata['attribute_hash']

# Add record_id and attribute_hash in original dataset
df_data['record_id'] = df_metadata['record_id']
df_data['attribute_hash'] = df_metadata['attribute_hash']
df_data['attribute_name'] = df_metadata['attribute_name']

# create a dictionary for grouped records having same attribute_hash key
df_grouped_record = {}
for i in range(len(df_data)):
    df_attr = all_records_attributes[i]
    # add dataframe to dictionary
    df_tmp = df_data[df_data['attribute_hash'] == df_attr].dropna(axis=1)  # filter and remove empty columns
    column_header = df_data['attribute_name'].tolist()[i] + ",record_id,attribute_hash"
    column_header = str(column_header)[:len(str(column_header))].split(',') 
    df_tmp.drop(columns=['attribute_name'], inplace=True)
    df_tmp.columns = column_header
    df_grouped_record[df_attr] = df_tmp

**Processed Dataset**

In [97]:
df_tmp.head(3)

Unnamed: 0,id,detail_info_country_name,detail_info_country_capital,detail_info_country_phone_code,detail_info_style_version,detail_info_style_create_date,detail_info_style_constant,detail_info_style_vowel,record_id,attribute_hash
1,"(id, 1)","(detail_info_country_name, United States)","(detail_info_country_capital, Washing DC)","(detail_info_country_phone_code, 01)","(detail_info_style_version, 1)","(detail_info_style_create_date, 2019-10-04)","(detail_info_style_constant, 80)","(detail_info_style_vowel, 20)",2,58e25cfcd4f1556adae0ab45fd228e558f85a189
3,"(id, 3)","(detail_info_country_name, United Kingdom)","(detail_info_country_capital, Washing DC)","(detail_info_country_phone_code, 00)","(detail_info_style_version, 1)","(detail_info_style_create_date, 2019-10-04)","(detail_info_style_constant, 80)","(detail_info_style_vowel, 20)",4,58e25cfcd4f1556adae0ab45fd228e558f85a189


**Flatten tuples to retrieve values and store in file for each dictionary keys**

In [98]:
for k,v in df_grouped_record.items():
    filename = os.path.join(tmp_dir, k + "_" + now + "." + file_suffix)
    file = open(filename, "w")
    cols = df_grouped_record[k].columns.tolist()
    clean_col = [c.strip() for c in cols]
    clean_col = str(clean_col).replace("'","").replace(", ",field_delimiter).replace("[","").replace("]","")
    file.write(clean_col)
    file.write(record_delimiter)
    for rec in df_grouped_record[k].itertuples(index=False):   # iteritems()
        record = list(rec)
        for i in range(len(record)):
            if i >=len(record)-2:
                value = str(record[i]) + field_delimiter
            else:
                value = clean_string(str(record[i][1])) + field_delimiter
            file.write(value)
        file.write(record_delimiter)
    file.close()

**Glob multiple files**

In [99]:
import glob
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 500)

file_path = os.path.join(tmp_dir, '*.tsv')
files = glob.glob(file_path)

**Read all file in pandas**

In [100]:
dfAll = pd.concat(map(lambda file: pd.read_csv(file, sep='\t', header='infer', index_col=False), files), ignore_index=True)

of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.


  """Entry point for launching an IPython kernel.


In [101]:
dfAll.head(3)

Unnamed: 0,attribute_hash,detail_info_country_capital,detail_info_country_name,detail_info_country_phone_code,detail_info_popular_state,detail_info_style_constant,detail_info_style_create_date,detail_info_style_version,detail_info_style_vowel,id,record_id
0,58e25cfcd4f1556adae0ab45fd228e558f85a189,Washing DC,United States,1,,80.0,2019-10-04,1,20.0,1,2
1,58e25cfcd4f1556adae0ab45fd228e558f85a189,Washing DC,United Kingdom,0,,80.0,2019-10-04,1,20.0,3,4
2,60fc94cea6d14a741d32683574a73fbba81e7513,Washing DC,European,0,,100.0,2019-10-04,1,,2,3


**Sort columns order based on highest columns attribute**

In [102]:
# get highest attribute_hash value
max_attribute_hash = df_metadata[df_metadata['total_element'] == df_metadata['total_element'].max()]['attribute_hash'].values[0]
ordered_column = df_metadata[df_metadata['attribute_hash'] == max_attribute_hash]['attribute_name'].values[0].split(",")
ordered_column += ['record_id', 'attribute_hash']
dfAll = dfAll[ordered_column]

**Drop empty parent column**

In [103]:
remove_cols = [col for col in dfAll.columns.tolist() if col in _parent_name]
dfAll = dfAll.drop(columns=remove_cols)

**Output Final Dataset**

In [104]:
# Sort order by record_id asc
dfAll = dfAll.sort_values(by=['record_id'])
dfAll.to_csv(os.path.join(current_dir,"allfile.tsv"), sep='\t', index=False)

**Clean tmp directory**

In [105]:
import shutil
shutil.rmtree(tmp_dir)

**@todo:**  
* Sample all the records.  
* Determine column data type check for elastic search usecases.     
* Generate summary of the report.  
* Add logging for all the transactions.   

  **Done**
    * Optimization (Exclude write tmp files to reduce processing time) done

In [106]:
dfAll.describe()

Unnamed: 0,id,detail_info_country_phone_code,detail_info_style_version,detail_info_style_vowel,record_id
count,4.0,4.0,4.0,3.0,4.0
mean,1.5,0.25,1.0,46.666667,2.5
std,1.290994,0.5,0.0,46.188022,1.290994
min,0.0,0.0,1.0,20.0,1.0
25%,0.75,0.0,1.0,20.0,1.75
50%,1.5,0.0,1.0,20.0,2.5
75%,2.25,0.25,1.0,60.0,3.25
max,3.0,1.0,1.0,100.0,4.0
