In [41]:
# Install the third-party packages imported below into the kernel's environment
# (-q keeps pip's output quiet). A kernel restart may be needed afterwards.
%pip install -q faker
%pip install -q jsonschema

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


### Imports

In [42]:
import json
import jsonschema
from faker import Faker
import string
import random
import os

In [43]:
# Number of valid prompt/response records to generate for each loaded schema.
no_of_training_records_per_schema = 250

### Load Schemas

In [44]:

# Load every ``*.json`` file found in the local ``schema`` directory into
# ``schemas``, keyed by file name.
schema_dir = 'schema'

# os.listdir() returns entries in arbitrary order; sort so the schema order
# (and therefore the generation order later on) is the same on every run.
schema_files = sorted(f for f in os.listdir(schema_dir) if f.endswith('.json'))

schemas = {}
for schema_file in schema_files:
    # Read as UTF-8 explicitly instead of depending on the platform default.
    with open(os.path.join(schema_dir, schema_file), 'r', encoding='utf-8') as f:
        schemas[schema_file] = json.load(f)

print(schemas.keys())

dict_keys(['dtp_schema.json', 'isa_schema.json', 'name_schema.json', 'ref_schema.json'])


### Random Data Generator for JSON Schema Properties

In [45]:
class JsonPropertyValueGenerator:
    """Builds zero-argument callables that produce random JSON property values.

    Each callable returns a value for one schema property, or ``None`` when the
    property is to be treated as absent for that record.
    """

    def __init__(self, faker=None):
        """Create a generator factory.

        Args:
            faker: Faker-like object used to produce values. When falsy, a new
                ``Faker()`` instance is created.
        """
        if faker:
            self._faker = faker
        else:
            self._faker = Faker()
        # Probability with which a nullable generator returns None.
        self.null_threshold = 0.4

    def _nullable(self, produce, include_nulls):
        """Wrap ``produce`` so it may return ``None`` instead of a value.

        Going through this helper (rather than building a lambda inline inside
        a loop) gives every generator its own ``include_nulls`` binding, which
        avoids Python's late-binding closure pitfall.

        Args:
            produce: Zero-argument callable returning a real value.
            include_nulls: When true, the wrapper returns ``None`` with
                probability ``self.null_threshold`` (read at call time).

        Returns:
            A zero-argument callable.
        """
        def generate():
            if include_nulls and random.random() < self.null_threshold:
                return None
            return produce()

        return generate

    def get_property_value_generators(self, schema: dict, include_nulls=True) -> dict:
        """Return a ``{property name: generator}`` mapping for ``schema``.

        Args:
            schema: JSON-schema dict; its ``properties`` and ``required``
                entries are read (both optional).
            include_nulls: When true, properties that are not listed in
                ``required`` may produce ``None``. Required properties of type
                string/integer/boolean never do, except for the unconstrained
                string fallback documented in ``get_string_generator``.

        Returns:
            Dict mapping each property name to a zero-argument callable.
            Properties whose ``type`` is not ``string``, ``integer`` or
            ``boolean`` get a generator that always returns ``None``.
        """
        properties = schema.get('properties', {})
        required_properties = schema.get('required', [])
        value_generators = {}

        for prop_name, prop_schema in properties.items():
            nulls = include_nulls and (prop_name not in required_properties)
            prop_type = prop_schema.get('type')

            if prop_type == 'string':
                value_generators[prop_name] = self.get_string_generator(prop_name, prop_schema, nulls)
            elif prop_type == 'integer':
                value_generators[prop_name] = self._nullable(
                    lambda: self._faker.random_int(min=0, max=1000), nulls)
            elif prop_type == 'boolean':
                value_generators[prop_name] = self._nullable(
                    lambda: self._faker.boolean(), nulls)
            else:
                # 'null' and every unsupported type: always absent.
                value_generators[prop_name] = lambda: None

        return value_generators

    def get_string_generator(self, property_name=None, prop_schema=None, include_nulls=True):
        """Return a generator for a ``string`` property.

        The provider is chosen, in order, from: ``enum`` values, the ``format``
        keyword (``email``, ``date`` as ``%y%m%d``, ``time`` as ``%H%M``), and,
        for length-constrained strings, name-related keywords found in the
        property's description or name. Anything else gets a random string.

        Args:
            property_name: Name of the property; ``None`` is treated as "".
            prop_schema: Schema dict of the property; ``None`` is treated as {}.
            include_nulls: When true, the generator may return ``None`` with
                probability ``self.null_threshold``.

        Returns:
            A zero-argument callable.
        """
        property_name = property_name or ""
        prop_schema = prop_schema or {}
        string_format = prop_schema.get("format")

        if "enum" in prop_schema:
            return self._nullable(
                lambda: self._faker.random_element(elements=prop_schema["enum"]), include_nulls)

        if string_format == "email":
            return self._nullable(lambda: self._faker.email(), include_nulls)

        if string_format == "date":
            return self._nullable(lambda: self._faker.date(pattern="%y%m%d"), include_nulls)

        if string_format == "time":
            return self._nullable(lambda: self._faker.time(pattern="%H%M"), include_nulls)

        if "minLength" in prop_schema or "maxLength" in prop_schema:
            description = (prop_schema.get("description") or "").lower()
            name = property_name.lower()

            # Order matters: the first keyword that matches wins.
            name_part_producers = (
                ("first", lambda: self._faker.first_name()),
                ("last", lambda: self._faker.last_name()),
                ("middle", lambda: self._faker.random_element(elements=list(string.ascii_uppercase))),
                ("prefix", lambda: self._faker.prefix()),
                ("suffix", lambda: self._faker.suffix()),
            )
            for keyword, produce in name_part_producers:
                if ("name" in description and keyword in description) or \
                   ("name" in name and keyword in name):
                    return self._nullable(produce, include_nulls)

            if "name" in description or "name" in name:
                return self._nullable(lambda: self._faker.name(), include_nulls)

            # Fill in whichever bound is missing so that min <= max always
            # holds; pystr rejects max_chars < min_chars.
            min_chars = prop_schema.get("minLength")
            max_chars = prop_schema.get("maxLength")
            if min_chars is None:
                min_chars = 1 if max_chars is None else min(1, max_chars)
            if max_chars is None:
                max_chars = max(10, min_chars)
            return self._nullable(
                lambda: self._faker.pystr(min_chars=min_chars, max_chars=max_chars), include_nulls)

        # NOTE(review): unlike every other branch, this fallback never returns
        # None even when include_nulls is true - confirm whether intentional.
        return lambda: self._faker.pystr(min_chars=1, max_chars=10)

### Util Functions

In [46]:
def validate_json(json_data, target_schema=None) -> bool:
    """Check ``json_data`` against ``target_schema`` with ``jsonschema``.

    Args:
        json_data: The instance to validate.
        target_schema: The schema passed to ``jsonschema.validate``.

    Returns:
        True when validation succeeds; False when a
        ``jsonschema.ValidationError`` is raised (the error is printed).
        Any other exception propagates to the caller.
    """
    is_valid = True
    try:
        jsonschema.validate(instance=json_data, schema=target_schema)
    except jsonschema.ValidationError as e:
        print(f"Validation error for object: {e}")
        is_valid = False
    return is_valid
    

def remove_none(d):
    """Recursively drop ``None`` entries from dicts and lists.

    Dict items whose value is ``None`` and list elements that are ``None`` are
    removed; remaining values are cleaned recursively. Any other object
    (including tuples) is returned unchanged. New containers are built; the
    input is not modified.
    """
    if isinstance(d, dict):
        cleaned = {}
        for key, value in d.items():
            if value is not None:
                cleaned[key] = remove_none(value)
        return cleaned

    if isinstance(d, list):
        return [remove_none(item) for item in d if item is not None]

    return d
    
def get_nm1_segment(rec):
    entity_identifier_code = rec.get("entity_identifier_code", "")
    entity_type_qualifier = rec.get("entity_type_qualifier", "")
    name_last_or_organization_name = rec.get("name_last_or_organizationName", "")
    name_first = rec.get("name_first", "")
    name_middle = rec.get("name_middle", "")
    name_prefix = rec.get("name_prefix", "")
    name_suffix = rec.get("name_suffix", "")
    identification_code_qualifier = rec.get("identification_code_qualifier", "")
    identification_code = rec.get("identification_code", "")
    return f"NM1*{entity_identifier_code}*{entity_type_qualifier}*{name_last_or_organization_name}*{name_first}*{name_middle}*{name_prefix}*{name_suffix}*{identification_code_qualifier}*{identification_code}~"


def get_ref_segment(rec):
    """Format the reference fields of ``rec`` as a ``REF*...~`` segment string.

    Args:
        rec: Mapping that may contain ``reference_identification_qualifier``
            and ``reference_identification``; a missing key contributes an
            empty element.

    Returns:
        ``"REF*<qualifier>*<identification>~"``.
    """
    # Only these two fields appear in the output; the previous lookups of
    # "ref_segment" and the NM1 segment were computed but never used.
    qualifier = rec.get("reference_identification_qualifier", "")
    identification = rec.get("reference_identification", "")
    return f"REF*{qualifier}*{identification}~"

from datetime import datetime, timedelta

def dtp_segment(rec):
    """Build a random ``DTP`` segment string.

    A date qualifier and a date format (``D8`` or ``RD8``) are chosen at
    random, and a random date is drawn from Faker. ``D8`` emits the single date
    as ``YYYYMMDD``; ``RD8`` emits a range ``YYYYMMDD-YYYYMMDD`` ending five
    days after the start date.

    Args:
        rec: Unused; accepted so the function can be called like the other
            segment builders.

    Returns:
        ``"DTP*<qualifier>*<format>*<date or date range>"``.

    NOTE(review): unlike the NM1/REF/ISA builders, no trailing ``~`` is
    appended - confirm against the DTP schema before changing.
    """
    date_qualifier = random.choice(["291", "348", "349", "050", "151", "152", "153", "154", "155", "156"])
    date_type = random.choice(["D8","RD8"])
    date_from = Faker().date_object()
    formatted_date_from = date_from.strftime("%Y%m%d")

    # Compare the chosen format, not the builtin ``type`` (which never equals
    # "D8" and used to force every segment down the range branch).
    if date_type == "D8":
        return f"DTP*{date_qualifier}*{date_type}*{formatted_date_from}"

    formatted_date_to = (date_from + timedelta(days=5)).strftime("%Y%m%d")
    return f"DTP*{date_qualifier}*{date_type}*{formatted_date_from}-{formatted_date_to}"


def isa_segment(rec):
    """Format the interchange fields of ``rec`` as an ``ISA*...~`` segment string.

    Sixteen elements are emitted in a fixed order, separated by ``*`` and
    terminated by ``~``; a key missing from ``rec`` contributes an empty
    element.
    """
    element_keys = (
        "authorization_information_qualifier",
        "authorization_information",
        "security_information_qualifier",
        "security_information",
        "interchange_sender_id_qualifier",
        "interchange_sender_id",
        "interchange_receiver_id_qualifier",
        "interchange_receiver_id",
        "interchange_date",
        "interchange_time",
        "repetition_separator",
        "interchange_control_version_number",
        "interchange_control_number",
        "acknowledgment",
        "usage_indicator",
        "element_separator",
    )
    elements = ["ISA"]
    for key in element_keys:
        elements.append(f"{rec.get(key, '')}")
    return "*".join(elements) + "~"


# Schema property name -> function that derives that property's value from a
# generated record (each function takes the record dict and returns a string).
custom_properties = dict(
    nm1_segment=get_nm1_segment,
    ref_segment=get_ref_segment,
    dtp_segment=dtp_segment,
    isa_segment=isa_segment,
)



### Prompt Generator Functions

In [47]:

def generate_prompt(json_data, schema) -> dict:
    """Wrap a generated record as a two-message chat training example.

    Args:
        json_data: The generated (validated) record.
        schema: The JSON schema the record was generated from.

    Returns:
        ``{"messages": [user_message, assistant_message]}`` where the user
        message comes from ``get_user_content`` and the assistant message from
        ``get_assistant_content``.
    """
    user_message = get_user_content(json_data, schema)  # user turn
    assistant_message = get_assistant_content(json_data)  # assistant turn
    return {"messages": [user_message, assistant_message]}


def get_user_content(json_data, schema):
    """Build the user message of a training example.

    The message is a randomly chosen system-style context sentence, then
    (optionally) the full schema, then the prompt from ``get_user_prompt``.
    The schema text is left out only when the prompt already names the schema
    by its non-empty ``title``.

    Args:
        json_data: The generated record the prompt describes.
        schema: The JSON schema dict; ``title`` is optional.

    Returns:
        ``{"role": "user", "content": <text>}``.
    """
    context_example = [
        "You are a Json test data generator generating valid json data based on a json schema. ",
        "You are a sample data generator that creates valid JSON data based on a schema.",
        "You are a structured data generator that creates valid JSON data based on a schema."
    ]

    context = random.choice(context_example)
    prompt = get_user_prompt(json_data, schema)

    # A missing title used to raise TypeError (``None in prompt``) and an
    # empty title matched every prompt, silently dropping the schema. Only a
    # real, non-empty title can count as "the prompt names the schema".
    schema_title = schema.get('title')
    prompt_names_schema = (
        isinstance(schema_title, str) and schema_title != "" and schema_title in prompt
    )
    schema_text = "" if prompt_names_schema else f"\nGiven the Schema: {json.dumps(schema)}"

    return {
        "role": "user",
        "content": f"{context}{schema_text}\n{prompt}"
    }


def get_user_prompt(json_data, schema):
    """Pick a random instruction sentence describing ``json_data``.

    One of three template pools is used: the "all properties" pool when the
    record's keys equal the schema's property names, otherwise the "only
    required" pool when they equal the schema's required names, otherwise the
    general pool. The chosen template is filled in with the record's property
    names, its non-None values, the schema title and the required names.

    Args:
        json_data: The generated record.
        schema: The JSON schema dict (``title``, ``required`` and
            ``properties`` are read; all optional).

    Returns:
        The formatted prompt string.
    """
    required_names = schema.get('required', [])

    # Values for the placeholders the templates may reference.
    fields = {
        "schema_name": schema.get('title', ''),
        "properties": ", ".join(json_data.keys()),
        "values": ", ".join(str(item) for item in json_data.values() if item is not None),
        "required_properties": ", ".join(required_names),
        "properties_and_values": ", ".join(
            f"{name}: {item}" for name, item in json_data.items() if item is not None
        ),
    }

    general_pool = [
        "Create a JSON object that adheres to the given schema",
        "Produce a JSON object that conforms to the specified schema",
        "Generate a valid JSON object with the following properties: {properties}",
        "Generate a JSON object that includes the following properties: {properties}",
        "Create a JSON object with the required properties: {required_properties}",
        "Give a JSON data that has the required properties: {required_properties}",
        "Generate a JSON object with the following properties: {properties} and values: {values}",
        "Create a JSON object that contains the following properties: {properties} and values: {values}",
        "Generate a JSON object that includes the following properties and values: {properties_and_values}",
        "Create a JSON object that contains the following properties and values: {properties_and_values}",
        "Generate a sample json for the {schema_name} schema",
        "Get a sample json for the {schema_name} schema",
        "Give a sample for the {schema_name} schema",
    ]
    required_only_pool = [
        "Generate a JSON object that includes only the required properties",
        "Create a JSON object that contains only the required properties",
        "Produce a JSON object that adheres to the schema with only the required properties: {required_properties}",
        "Generate a JSON object that includes the required properties",
        "Create a JSON object that contains the required properties: {required_properties}",
        "Produce a JSON object that adheres to the schema with only the required properties",
        "Generate a JSON object that includes the required properties: {required_properties}",
        "Create a sample JSON object that contains the required properties: {required_properties}",
        "Generate a sample json for the {schema_name} schema with only required properties",
        "Get a sample json for the {schema_name} schema and include only required properties",
    ]
    # The first three sentences are intentionally listed twice, as in the
    # original pool, so their selection weight is unchanged.
    all_properties_pool = [
        "Generate a JSON object that includes all properties",
        "Create a JSON object that contains all properties",
        "Produce a JSON object that adheres to the schema with all properties",
        "Generate a JSON object that includes all properties",
        "Create a JSON object that contains all properties",
        "Produce a JSON object that adheres to the schema with all properties",
        "Generate a sample json for the {schema_name} schema with all properties",
        "Give a sample for the {schema_name} schema and include all properties",
    ]

    present_keys = set(json_data.keys())
    covers_all = present_keys == set(schema.get('properties', {}).keys())
    covers_required_only = present_keys == set(required_names)

    # "all properties" takes precedence when both conditions hold.
    if covers_all:
        pool = all_properties_pool
    elif covers_required_only:
        pool = required_only_pool
    else:
        pool = general_pool

    return random.choice(pool).format(**fields)


def get_assistant_content(json_data):
    """Build the assistant message: the record serialized with ``json.dumps``.

    Returns:
        ``{"role": "assistant", "content": <JSON text of json_data>}``.
    """
    serialized = json.dumps(json_data)
    return dict(role="assistant", content=serialized)

### Generate Data

In [None]:
# Build one set of per-property value generators for every loaded schema.
value_generator_utils = JsonPropertyValueGenerator()

generators = {}
for schema_name, schema in schemas.items():
    generators[schema_name] = value_generator_utils.get_property_value_generators(schema, include_nulls=True)

# Safety net: a schema whose generated records can never validate (e.g. a
# required property of a type the generator does not support) would otherwise
# keep this cell spinning forever.
max_attempts_per_schema = no_of_training_records_per_schema * 100

records = []
for schema_name, generator in generators.items():
    print(f"Generating {no_of_training_records_per_schema} records for schema: {schema_name}")
    schema = schemas.get(schema_name)
    schema_properties = schema.get('properties', {})
    records_generated = 0
    attempts = 0
    while records_generated < no_of_training_records_per_schema:
        if attempts >= max_attempts_per_schema:
            raise RuntimeError(
                f"Only {records_generated} valid records produced for schema "
                f"{schema_name} after {attempts} attempts; check the schema "
                f"against the value generators."
            )
        attempts += 1

        # Draw a value for every property, then drop the ones left as None so
        # optional properties are simply absent from the record.
        record = remove_none({key: generate() for key, generate in generator.items()})

        # Derived properties are computed from the already-cleaned record.
        for custom_property, custom_function in custom_properties.items():
            if custom_property in schema_properties:
                record[custom_property] = custom_function(record)

        # Keep only records that validate; invalid ones are retried.
        if validate_json(record, schema):
            records.append(generate_prompt(record, schema))
            records_generated += 1


# Shuffle so that examples of different schemas are interleaved.
random.shuffle(records)

# Write one JSON document per line (JSON Lines), creating the target
# directory first so a fresh checkout does not fail at the very last step.
output_path = "../data/combinations.jsonl"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w", encoding="utf-8") as outfile:
    for record in records:
        json.dump(record, outfile)
        outfile.write('\n')

            

Generating 250 records for schema: dtp_schema.json
Generating 250 records for schema: isa_schema.json
Generating 250 records for schema: name_schema.json
Generating 250 records for schema: ref_schema.json
