In [1]:
!pip install jsonpath_ng

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting jsonpath_ng
  Downloading jsonpath_ng-1.5.3-py3-none-any.whl (29 kB)
Collecting ply (from jsonpath_ng)
  Downloading ply-3.11-py2.py3-none-any.whl (49 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ply, jsonpath_ng
Successfully installed jsonpath_ng-1.5.3 ply-3.11


In [2]:
import os
import json
import numpy as np 
import pandas as pd 
import sqlite3
import functools as ft
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from jsonpath_ng import parse
from enum import Enum
%matplotlib inline 

# Source to Target Mapping - OOP Approach for Single Table

### Data

In [16]:
Fact_df = pd.read_excel('Fact Table (1).xlsx')
Fact_df
Fact_df.to_json("Fact Table (1).json")
json_Fact_df = Fact_df.astype(str).to_dict(orient='records')
json_Fact_df

[{'Date': '2005-06-30', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2005-07-08', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2005-10-24', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2006-01-08', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2006-02-02', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2006-03-13', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2006-04-08', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2006-04-26', 'PatientNumber': '1', 'Age': '75', 'RoutineTest': '1'},
 {'Date': '2006-10-14', 'PatientNumber': '1', 'Age': '76', 'RoutineTest': '1'},
 {'Date': '2007-02-02', 'PatientNumber': '1', 'Age': '76', 'RoutineTest': '1'},
 {'Date': '2007-04-27', 'PatientNumber': '1', 'Age': '76', 'RoutineTest': '1'},
 {'Date': '2007-09-29', 'PatientNumber': '1', 'Age': '77', 'RoutineTest': '1'},
 {'Date': '2007-12-03', 'PatientNumber':

###  Abstract Base Calss (ABC) for mutual methods

In [4]:
class Interface(ABC):

    @abstractmethod
    def get_data_by_field(self, field_name):
        """Fetch the data by given feild name """

    @abstractmethod
    def get_data_by_id(self, id):
        """Fetch the data by given ID  """

    @abstractmethod
    def get(self):
        """Fetch all data """

### Transform Operations
inherithed from Enum - class that automatic enumrate the variables

In [5]:
class TransformMask(Enum):
    # add here any masks you want 
    CLEAN_STRING = ".strip().lower()" 
    CAPITAL_LETTER = ".strip().lower().title()"
    

### Database Class - Define Common Properties for Source, Target, Mapping

In [6]:
class Database:
    def __init__(self):
        pass

    # built-in function that creates and returns a property object
    # get data by: get_data_source_target_mapping.get(dict_key)
    @property 
    def get_data_source_target_mapping(self):
        return {
            "source": [
                {
                    "id": 1,
                    "source_field_name": "Date",
                    "source_field_mapping": "$.Date",
                    "source_field_type": "str", # use python types
                    "is_required": True,

                },
                {
                    "id": 2,
                    "source_field_name": "PatientNumber",
                    "source_field_mapping": "$.PatientNumber",
                    "source_field_type": "str",
                    "is_required": True,

                },
                {
                    "id": 3,
                    "source_field_name": "Age",
                    "source_field_mapping": "$.Age",
                    "source_field_type": "str",
                    "is_required": True,

                },
                {
                    "id": 4,
                    "source_field_name": "RoutineTest",
                    "source_field_mapping": "$.RoutineTest",
                    "source_field_type": "str",
                    "is_required": True,

                }
            ],
            "destination": [
                {
                    "id": 1,
                    "destination_field_name": "Date",
                    "destination_field_mapping": "Date",
                    "destination_field_type": "str",
                    "default_value": "n/a",
                },
                {
                    "id": 2,
                    "destination_field_name": "PatientNumber",
                    "destination_field_mapping": "PatientNumber",
                    "destination_field_type": "int",
                    "default_value": "0"
                },
                                {
                    "id": 3,
                    "destination_field_name": "Age",
                    "destination_field_mapping": "Age",
                    "destination_field_type": "int",
                    "default_value": "0"
                },
                                {
                    "id": 4,
                    "destination_field_name": "RoutineTest",
                    "destination_field_mapping": "RoutineTest",
                    "destination_field_type": "str",
                    "default_value": "0"
                }

            ],
            "transform": [ # using the Enums 
                {
                    "id": 1,
                    "transform_mask": 'CAPITAL_LETTER'
                },
                {
                    "id": 2,
                    "transform_mask": 'CLEAN_STRING'
                }
            ],
            "mapping": [
                {
                    "id": 1,
                    "mapping_source": 1,
                    "mapping_destination": 1,
                    "mapping_transform": 1
                },
                {
                    "id": 2,
                    "mapping_source": 2,
                    "mapping_destination": 2,
                },
                                {
                    "id": 3,
                    "mapping_source": 3,
                    "mapping_destination": 3,
                },
                                {
                    "id": 4,
                    "mapping_source": 4,
                    "mapping_destination": 4,
                }
            ]
        }

### Source class

Inherited from Interface for the common methods and from Database for common variables

In [7]:
class Source(Interface, Database):
    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("source")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id") == self.id:
                return x
        return None

### Target class

Inherited from Interface for the common methods and from Database for common variables

In [8]:
class Target(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("destination")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Transform Class

Inherited from Interface for the common methods and from Database for common variables

In [9]:
class Transform(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    # should be implemented - inherited from Interface
    def get_data_by_field(self, field_name):
        data = self.get
        for item in data:
            for key, value in item.items():
                if key == field_name:
                    return item
        return None

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("transform", [])

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

### Mapping class

Inherited from Interface for the common methods and from Database for common variables

In [10]:
class Mappings(Interface, Database):

    def __init__(self):
        Database.__init__(self)

    @property
    def get(self):
        return self.get_data_source_target_mapping.get("mapping")

    def get_data_by_id(self, id):
        self.id = id
        data = self.get
        for x in data:
            if x.get("id").__str__() == self.id.__str__():
                return x
        return None

    def get_data_by_field(self, field_name):
        return None

### Format Class - JSON

Search the source data value inside a JSON file 

In [11]:
class JsonQuery:
    def __init__(self, json_path, json_data):
        self.json_path = json_path
        self.json_data = json_data

    def get(self):
        jsonpath_expression = parse(self.json_path)
        match = jsonpath_expression.find(self.json_data)
        source_data_value = match[0].value
        return source_data_value

### Combine it All - STTM

In [12]:
class STTM:
    def __init__(self, input_json):
        self.input_json = input_json
        self.mapping_instance = Mappings()
        self.source_instance = Source()
        self.destination_instance = Target()
        self.transform_instance = Transform()
        self.look_up_mask = {i.name: i.value for i in TransformMask}
        self.json_data_transformed = {}

    def _get_mapping_data(self):
        return self.mapping_instance.get

    def _get_mapping_source_data(self):
        return self.source_instance.get

    def get_transformed_data(self):

        for mappings in self._get_mapping_data():

            """fetch the source mapping """
            mapping_source_id = mappings.get("mapping_source")
            mapping_destination_id = mappings.get("mapping_destination")
            mapping_transform_id = mappings.get("mapping_transform")

            mapping_source_data = self.source_instance.get_data_by_id(id=mapping_source_id)
            transform_data = self.transform_instance.get_data_by_id(id=mapping_transform_id)

            """Fetch Source  field Name"""
            source_field_name = mapping_source_data.get("source_field_name")

            """if field given is not present incoming json """
            if source_field_name not in self.input_json.keys():
                if mapping_source_data.get("is_required"):
                    raise Exception(
                        "Alert ! Field {} is not present in JSON please FIX mappings ".format(source_field_name))
                else:
                    pass

            else:
                source_data_value = JsonQuery(
                    json_path=mapping_source_data.get("source_field_mapping"),
                    json_data=self.input_json
                ).get()

                """check the data type for source if matches with what we have """
                if mapping_source_data.get("source_field_type") != type(source_data_value).__name__:
                    if source_data_value is not None:
                        _message = (
                            "Alert ! Source Field :{} Datatype has changed from {} to {} ".format(source_field_name,
                                                                                                  mapping_source_data.get(
                                                                                                      "source_field_type"),
                                                                                                  type(
                                                                                                      source_data_value).__name__))
                        print(_message)
                        raise Exception(_message)

                """Query and fetch the Destination | target """
                destination_mappings_json_object = self.destination_instance.get_data_by_id(
                    id=mappings.get("mapping_destination"))

                destination_field_name = destination_mappings_json_object.get("destination_field_name")
                destination_field_type = destination_mappings_json_object.get("destination_field_type")

                dtypes = [str, float, list, int, set, dict]

                for dtype in dtypes:

                    """Datatype Conversion """
                    if destination_field_type == str(dtype.__name__):

                        """is source is none insert default value"""
                        if source_data_value is None:
                            self.json_data_transformed[destination_field_name] = dtype.__call__(
                                destination_mappings_json_object.get("default_value")
                            )

                        else:
                            """check if you have items to transform"""
                            if transform_data is not None:
                                """ check for invalid mask name """
                                if transform_data.get("transform_mask") not in list(self.look_up_mask.keys()):
                                    raise Exception(
                                        f"Specified Transform {transform_data.get('transform_mask')} is not available please select from following Options :{list(self.look_up_mask.keys())}")
                                else:
                                    mask_apply = self.look_up_mask.get(transform_data.get("transform_mask"))
                                    converted_dtype = dtype.__call__(source_data_value)
                                    mask = f'converted_dtype{mask_apply}'
                                    curated_value = eval(mask)
                                    self.json_data_transformed[destination_field_name] = curated_value

                            else:
                                self.json_data_transformed[destination_field_name] = dtype.__call__(source_data_value)

        return self.json_data_transformed

In [17]:
transformed_data = []
for item in json_Fact_df:
    helper = STTM(input_json=item)
    response = helper.get_transformed_data()
    transformed_data.append(response)
    print(response)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
{'Date': '2009-02-25', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-03-30', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-05-09', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-05-10', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-05-18', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-06-21', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-09-07', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-09-13', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-09-17', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-09-17', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2009-12-28', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2010-01-15', 'PatientNumber': 273, 'Age': 81, 'RoutineTest': '0'}
{'Date': '2010-02-14', 

In [18]:
pd.DataFrame(transformed_data)

Unnamed: 0,Date,PatientNumber,Age,RoutineTest
0,2005-06-30,1,75,1
1,2005-07-08,1,75,1
2,2005-10-24,1,75,1
3,2006-01-08,1,75,1
4,2006-02-02,1,75,1
...,...,...,...,...
99995,2019-10-09,300,90,0
99996,2019-11-19,300,90,0
99997,2019-11-21,300,90,0
99998,2019-12-17,300,90,0
