In [2]:
import os
import csv
import io
import json
from uuid import uuid4
from genson import SchemaBuilder
from flatten_json import flatten, unflatten
from datamodel_code_generator import DataModelType, PythonVersion
from datamodel_code_generator.model import get_data_model_types
from datamodel_code_generator.parser.jsonschema import JsonSchemaParser



def json_to_json_schema(input_json):
    input_dict = input_json.data
    builder = SchemaBuilder()
    builder.add_object(input_dict)
    json_schema = builder.to_schema()
    return json_schema

In [67]:
test_data = {"hello":[
    "home",
    "state",
    "country", 
    "world"
  ],
  "goodbye":{
      "lovers":[
      ],
      "friends":[
          "Rick", 
          "Bob"
          ]
      },
  "fu":256,
  "bar":[]
  }


In [68]:
builder = SchemaBuilder()
builder.add_object(test_data)
json_schema = builder.to_schema()

In [65]:

def unflatten_schema(json_schema):
    response = unflatten(json_schema)
    return response

def key_cleaner(key):
    clean_key = key.replace("properties", "")[1:]
    clean_key = clean_key.replace("..", ".")
    clean_key = clean_key.replace(".type", "")
    clean_key = clean_key.replace("items", "")
    clean_key = clean_key[:-1] if clean_key[-1]=="." else clean_key
    return clean_key

def empty_array_or_object_cleanup(clean_item, 
                                  empty_object_warning):
    if not empty_object_warning:
        clean_item['type']=None
    else:
        clean_item['type']="object"
    return clean_item

def clean_item_builder(clean_key, 
                       entry,
                       expecting_data_type):
    empty_object_warning = False
    clean_item = {
                        "uid": str(uuid4()),
                        "name": clean_key
                        }
    if entry == "array":
        clean_item['array']=True
        expecting_data_type = True
    elif entry=="object":
        clean_item['array']=False
        expecting_data_type=True
        empty_object_warning=True
    else:
        clean_item["array"]=False
        clean_item["type"] = entry
    return clean_item, expecting_data_type, empty_object_warning


def flat_schema_adjuster(flattened: dict):
    clean_entries = {}
    entries_being_processed = []
    clean_entry_array =[]
    loop_start = True
    expecting_data_type=False
    key_counter = 0
    adjustable_keys = [key for key in flattened.keys() if key.startswith("properties")]
    key_in_process = adjustable_keys[-1]

    for key in adjustable_keys:
        clean_key = key_cleaner(key)
        entry = flattened[key]

        if "required" in key or "data" in key:
            continue

        if key_in_process in key:
            empty_object_warning=False
            expecting_data_type=False

        if not loop_start and clean_key != key_in_process:
            key_counter = 0
            if key_counter == 0 and expecting_data_type:
                clean_item = empty_array_or_object_cleanup(clean_item,
                                                           entry)
                clean_entry_array.append(clean_item)

        if clean_key not in clean_entries.keys() and clean_key not in key_in_process:
            expecting_data_type=False
            clean_item, expecting_data_type, empty_object_warning = clean_item_builder(clean_key, 
                                                                                       entry,
                                                                                       expecting_data_type)
        else:
            clean_item['type'] = entry
            expecting_data_type = False
            
        if "type" in clean_item.keys():
            clean_entries[clean_key]="processed"
            clean_entry_array.append(clean_item)
        if clean_key not in entries_being_processed:
            entries_being_processed.append(clean_key)

        key_in_process = clean_key
        key_counter +=1
        loop_start = False

    if "type" not in clean_item.keys() and key==adjustable_keys[-1]: 
        clean_item = empty_array_or_object_cleanup(clean_item,
                                                  empty_object_warning)
        clean_entry_array.append(clean_item)
    return clean_entry_array



def csv_string_writer(clean_array: list):
    # Convert dictionary list to CSV string
    output = io.StringIO()
    fieldnames = clean_array[0].keys()
    writer = csv.DictWriter(output, fieldnames=fieldnames)
    
    writer.writeheader()
    writer.writerows(clean_array)

    csv_string = output.getvalue()
    return csv_string


def schema_to_flat_csv(json_schema:dict):
    flattened = flatten(json_schema,
                       separator=".")
    adjusted_schema = flat_schema_adjuster(flattened)
    for entry in adjusted_schema:
        entry['Required']=True
        entry["Default"]=None
    schema_csv = csv_string_writer(adjusted_schema)
    return schema_csv




In [69]:
flattened = flatten(json_schema,
                       separator=".")
adjusted_schema = flat_schema_adjuster(flattened)
for entry in adjusted_schema:
    entry['Required']=True
    entry["Default"]=None

In [70]:
adjusted_schema

[{'uid': '46f48237-d6b8-4943-aa49-94abc9762a63',
  'name': 'hello',
  'array': True,
  'type': 'string',
  'Required': True,
  'Default': None},
 {'uid': 'c17c3217-a0a0-4333-867b-07e1f40e9d2b',
  'name': 'goodbye.lovers',
  'array': True,
  'type': 'object',
  'Required': True,
  'Default': None},
 {'uid': '42d0ed18-53b8-48c3-9ce8-af23e8ec2804',
  'name': 'goodbye.friends',
  'array': True,
  'type': 'string',
  'Required': True,
  'Default': None},
 {'uid': '78d289b3-fa3c-4891-8078-7c03a967ff13',
  'name': 'fu',
  'array': False,
  'type': 'integer',
  'Required': True,
  'Default': None},
 {'uid': '8d60594d-86a8-4959-bc34-ef1e13c65275',
  'name': 'bar',
  'array': True,
  'type': None,
  'Required': True,
  'Default': None}]

In [31]:
data_model_types = get_data_model_types(
    DataModelType.PydanticV2BaseModel,
    target_python_version=PythonVersion.PY_312
)
parser = JsonSchemaParser(
   json.dumps(test_schema),
   data_model_type=data_model_types.data_model,
   data_model_root_type=data_model_types.root_model,
   data_model_field_type=data_model_types.field_model,
   data_type_manager_type=data_model_types.data_type_manager,
   dump_resolve_reference_action=data_model_types.dump_resolve_reference_action,
                       )
result = parser.parse()
print(result)

from __future__ import annotations

from typing import List, Optional

from pydantic import BaseModel


class Goodbye(BaseModel):
    lovers: Optional[List] = None
    friends: List[str]


class Model(BaseModel):
    hello: List[str]
    goodbye: Goodbye
    fu: str



In [53]:
str(uuid4())

'0fb0a94f-70ed-4739-aeea-750c567517e6'