In [1]:
from tabulate import tabulate
import wcwidth

def abbrev(data, l=15):
    """Return ``data`` cut to its first ``l`` items with '..' appended.

    When ``len(data)`` does not exceed ``l`` the value is returned unchanged.
    """
    if len(data) <= l:
        return data
    return data[:l] + '..'

In [98]:
from sagas.ofbiz.connector import OfbizConnector
from sagas.ofbiz.finder import Finder
# Module-level handles reused by later cells: `oc` is used throughout,
# `finder` is used by get_related_one.
oc=OfbizConnector()
finder=Finder(oc)
# Show the name of the delegator the connector exposes.
print(oc.delegator.getDelegatorName())

default


In [100]:
def desc_relations(entity_name):
    """Print a psql-style table of the relations of ``entity_name``.

    Each row holds the related entity name (abbreviated to 20), the relation
    type, and the relation title concatenated with the related entity name
    (abbreviated to 25).
    """
    model = oc.delegator.getModelEntity(entity_name)
    rows = [
        (
            abbrev(relation.getRelEntityName(), 20),
            relation.getType(),
            abbrev(relation.getTitle() + relation.getRelEntityName(), 25),
        )
        for relation in model.getRelationsList(True, True, True)
    ]
    print(tabulate(rows, headers=['name', 'type', 'string'], tablefmt='psql'))

In [102]:
# Entity to describe; other candidates tried in this cell: "Testing", "TestingType".
entity_name="TestingNode"
ent=oc.delegator.getModelEntity(entity_name)
# Summary line: plain table name, title and description of the entity.
print(ent.getPlainTableName(), ent.getTitle(), ent.getDescription())
# Column labels; the third column ('string') is filled from fld.getColName().
table_header = ['name','type', 'string']
table_data = []
names=ent.getAllFieldNames()
for field_name in names:
    fld=ent.getField(field_name)
    # One row per field: abbreviated field name, field type, abbreviated column name.
    table_data.append((abbrev(field_name, 20), 
                        fld.getType(), 
                        abbrev(fld.getColName(), 25)))
print(tabulate(table_data, headers=table_header, tablefmt='psql'))
# Follow the field table with the relation table for the same entity.
desc_relations(entity_name)

TESTING_NODE Testing Node None
+---------------------+-------------+------------------------+
| name                | type        | string                 |
|---------------------+-------------+------------------------|
| lastUpdatedStamp    | date-time   | LAST_UPDATED_STAMP     |
| createdTxStamp      | date-time   | CREATED_TX_STAMP       |
| createdStamp        | date-time   | CREATED_STAMP          |
| description         | description | DESCRIPTION            |
| lastUpdatedTxStamp  | date-time   | LAST_UPDATED_TX_STAMP  |
| testingNodeId       | id          | TESTING_NODE_ID        |
| primaryParentNodeId | id          | PRIMARY_PARENT_NODE_ID |
+---------------------+-------------+------------------------+
+-------------------+--------+--------------------------+
| name              | type   | string                   |
|-------------------+--------+--------------------------|
| TestingNode       | one    | PrimaryParentTestingNode |
| TestingNode       | many   | PrimaryChildT

In [20]:
# Entity field types grouped by the Python-level category they map to.
# The date and time types are listed under "string":
#     <field-type-def type="date-time" sql-type="DATETIME(3)" java-type="java.sql.Timestamp"/>
#     <field-type-def type="date" sql-type="DATE" java-type="java.sql.Date"/>
#     <field-type-def type="time" sql-type="TIME(3)" java-type="java.sql.Time"/>
type_mappings={"string":["blob","byte-array","object",
                         "date-time","date","time",
                         "id","id-long","id-vlong",
                         "indicator","very-short","short-varchar","long-varchar","very-long",
                         "comment","description","name","value",
                         "credit-card-number","credit-card-date","email","url","tel-number"],
               "float":["currency-amount","currency-precise","fixed-point","floating-point"],
               "int":["numeric"]}
# Category -> name of the graphene scalar class used in generated code.
gl_mappings={"string":"String", "float":"Float", "int":"Int"}

def get_mapping_type(field_type):
    """Return the ``type_mappings`` key whose list contains ``field_type``.

    Args:
        field_type: an entity field type name such as ``"date-time"``.

    Returns:
        One of the keys of ``type_mappings`` (``"string"``, ``"float"``, ``"int"``).

    Raises:
        ValueError: if ``field_type`` is not listed under any category.
    """
    for category, field_types in type_mappings.items():
        if field_type in field_types:
            return category
    # Format instead of concatenating so that a non-string argument (e.g. None)
    # still yields the ValueError callers expect rather than a TypeError.
    raise ValueError("Cannot find mapping type for {}".format(field_type))

def get_graphql_type(field_type):
    """Return the graphene scalar name (``gl_mappings`` value) for ``field_type``.

    Raises:
        ValueError: propagated from get_mapping_type for an unknown field type.
    """
    return gl_mappings[get_mapping_type(field_type)]

# Smoke checks: a type listed under "string", one listed under "float",
# and the graphene scalar name for the latter.
print(get_mapping_type('date'))
print(get_mapping_type('currency-precise'))
print(get_graphql_type('currency-precise'))

string
float
Float


In [92]:
from sagas.util.str_converters import to_camel_case, to_snake_case

# Source text placed before the generated model classes.
program_header='''import graphene
'''
# Source text that builds the schema from a `Query` class; stored in `footers`.
program_footer='''
schema = graphene.Schema(query=Query)
'''
# Template for one generated resolver method. gen_model fills it with
# str.format using the keys rel_name, method, model and inputs.
# NOTE(review): the variable name is spelled "resover_def" (sic); gen_model
# refers to it by this exact name, so renaming must be done in both places.
resover_def='''
    def resolve_{rel_name}(self, info):
        return {method}("{model}", {model}, {inputs})'''
# Lists of source fragments; each currently holds the single fragment above.
headers=[]
footers=[]
headers.append(program_header)
footers.append(program_footer)
    
def gen_model(entity_name):
    """Generate graphene ``ObjectType`` source code for one entity.

    The generated class has one scalar attribute per entity field (named with
    ``to_snake_case`` and typed via ``get_graphql_type``), one ``graphene.List``
    or ``graphene.Field`` attribute per relation, and one ``resolve_<name>``
    method per relation built from the ``resover_def`` template.

    Args:
        entity_name: name of the entity to look up through ``oc.delegator``.

    Returns:
        The class source as a single newline-joined string.

    Raises:
        ValueError: propagated from get_graphql_type for an unmapped field type.
    """
    ent=oc.delegator.getModelEntity(entity_name)

    lines=["class {ent}(graphene.ObjectType):".format(ent=entity_name)]
    resolvers=[]

    for field_name in ent.getAllFieldNames():
        fldtype=get_graphql_type(ent.getField(field_name).getType())
        lines.append("    {name} = graphene.{type}()".format(name=to_snake_case(field_name),
                                                            type=fldtype))

    # NOTE(review): the three flags are passed exactly as before; presumably they
    # select which relation kinds are returned - confirm against the entity model API.
    for rel in ent.getRelationsList(True, False, True):
        model_name=rel.getRelEntityName()
        # Name the attribute after title + related entity so that two relations
        # to the same entity (e.g. a titled parent and a titled child relation)
        # get distinct attributes and resolvers instead of the later one
        # silently shadowing the earlier. Untitled relations keep their old name.
        rel_name=to_snake_case((rel.getTitle() or "")+model_name)
        # Keyword arguments for the lookup: related field name = own attribute.
        inputs=", ".join(
            "{}=self.{}".format(key.getRelFieldName(),
                                to_snake_case(key.getFieldName()))
            for key in rel.getKeyMaps())
        if rel.getType()=="many":
            method, wrapper="get_relations", "List"
        else:
            method, wrapper="get_related_one", "Field"
        # The lambda defers resolution of the related class until the schema is
        # built, so classes may be generated in any order.
        lines.append("    {name} = graphene.{wrapper}(lambda: {model})".format(
            name=rel_name, wrapper=wrapper, model=model_name))
        resolvers.append(resover_def.format(rel_name=rel_name,
                                            model=model_name,
                                            method=method,
                                            inputs=inputs))
    return "\n".join(lines+resolvers)

all_models=[]
# Entities for which a graphene model class is generated.
models=["Testing", "TestingType", "TestingItem", "TestingNode", "TestingNodeMember"]
for mo in models:
    all_models.append(gen_model(mo))
# Only the header and the model classes are joined; `footers` is not part of
# the generated program.
program=("\n\n".join(headers+all_models))
print(program)
# Execute the generated source so the model classes become names in this
# namespace. The generated resolvers call get_related_one / get_relations,
# which only need to exist by the time a query is resolved.
# NOTE(review): exec of generated code - the text is built from the entity
# model; confirm no untrusted input can reach it.
exec(program)

import graphene


class Testing(graphene.ObjectType):
    last_updated_stamp = graphene.String()
    comments = graphene.String()
    created_tx_stamp = graphene.String()
    testing_type_id = graphene.String()
    testing_size = graphene.Int()
    created_stamp = graphene.String()
    testing_id = graphene.String()
    description = graphene.String()
    last_updated_tx_stamp = graphene.String()
    testing_date = graphene.String()
    testing_name = graphene.String()
    testing_type = graphene.Field(lambda: TestingType)
    testing_item = graphene.List(lambda: TestingItem)
    testing_node_member = graphene.List(lambda: TestingNodeMember)

    def resolve_testing_type(self, info):
        return get_related_one("TestingType", TestingType, testingTypeId=self.testing_type_id)

    def resolve_testing_item(self, info):
        return get_relations("TestingItem", TestingItem, testingId=self.testing_id)

    def resolve_testing_node_member(self, info):
        return get_relations("Testi

In [95]:
def fill_record(model, val, rec):
    """Copy every field of ``rec`` onto ``val`` as a snake_case attribute.

    Field names come from ``model.getAllFieldNames()``; each value is read as
    ``rec[field_name]`` and stored under ``to_snake_case(field_name)``.
    """
    for source_name in model.getAllFieldNames():
        attr_name = to_snake_case(source_name)
        setattr(val, attr_name, rec[source_name])
def create_list(*args):
    """Return a new ``oc.j.ArrayList`` containing ``args`` in order."""
    java_list = oc.j.ArrayList()
    for item in args:
        java_list.append(item)
    return java_list

def get_related_one(entity, of_type, **kwargs):
    """Look up a single ``entity`` record and return it as an ``of_type`` object.

    Args:
        entity: entity name passed to the model lookup and to ``finder.find_one``.
        of_type: class instantiated with no arguments to hold the record.
        **kwargs: field name/value pairs identifying the record.

    Returns:
        A populated ``of_type`` instance, or ``None`` when the lookup yields
        no record.
    """
    model=oc.delegator.getModelEntity(entity)
    rec=finder.find_one(entity, oc.jmap(**kwargs))
    # Without this guard fill_record would fail on rec[field_name] when there
    # is no related record; returning None lets the field resolve to null.
    # NOTE(review): assumes find_one signals "not found" with None - confirm.
    if rec is None:
        return None
    instance=of_type()
    fill_record(model, instance, rec)
    return instance

def get_relations(entity, of_type, **kwargs):
    """Return all ``entity`` records matching ``kwargs`` as ``of_type`` objects.

    The records are fetched with ``oc.delegator.findByAnd`` using an empty
    order-by list and ``True`` as the last argument, then converted one by one.
    """
    model = oc.delegator.getModelEntity(entity)
    conditions = oc.jmap(**kwargs)
    no_ordering = create_list()
    found = oc.delegator.findByAnd(entity, conditions, no_ordering, True)
    # Same per-record conversion as the inline loop: instantiate, fill, collect.
    return fill_records(model, of_type, found)

def fill_records(model, of_type, recs):
    """Convert each record in ``recs`` into a populated ``of_type`` instance.

    Returns a list with one object per record, in iteration order.
    """
    def hydrate(rec):
        # Fresh instance per record, populated through fill_record.
        obj = of_type()
        fill_record(model, obj, rec)
        return obj

    return [hydrate(rec) for rec in recs]

class Query(graphene.ObjectType):
    """Root query type exposing all ``Testing`` and ``TestingNode`` records.

    ``graphene``, ``Testing`` and ``TestingNode`` must already exist in the
    namespace; in this notebook they come from exec-ing the generated program.
    """
    testing = graphene.List(lambda: Testing)
    testing_node = graphene.List(lambda: TestingNode)

    def resolve_testing(self, info):
        """Return every ``Testing`` record as a ``Testing`` object."""
        entity_name = "Testing"
        ent = oc.delegator.getModelEntity(entity_name)
        # fill_records takes (model, of_type, recs); the entity model must be
        # passed first, otherwise the call fails with a TypeError.
        return fill_records(ent, Testing, oc.all(entity_name))

    def resolve_testing_node(self, info):
        """Return every ``TestingNode`` record as a ``TestingNode`` object."""
        entity_name = "TestingNode"
        ent = oc.delegator.getModelEntity(entity_name)
        return fill_records(ent, TestingNode, oc.all(entity_name))

# Build the schema with Query as its query type; used by schema.execute later.
schema = graphene.Schema(query=Query)

In [97]:
import json

# q1: Testing records with selected fields of their related TestingType.
# It is defined here but not executed in this cell.
q1 = '''
{
  testing {
    testingId
    testingName
    testingTypeId
    testingType{
        lastUpdatedTxStamp
        description
    }
  }
}
'''.strip()
# q2: TestingNode records with their TestingNodeMember rows.
q2 = '''
{
  testingNode {
    testingNodeId
    testingNodeMember{
        testingNodeId
        testingId
    }
  }
}
'''.strip()
# Only q2 is run.
result = schema.execute(q2)
# NOTE(review): only result.data is printed; presumably execution errors are
# reported separately on the result object - confirm and surface them.
print(json.dumps(result.data, indent=2, ensure_ascii=False))

{
  "testingNode": [
    {
      "testingNodeId": "NODE_1",
      "testingNodeMember": [
        {
          "testingNodeId": "NODE_1",
          "testingId": "PERF_TEST_5"
        },
        {
          "testingNodeId": "NODE_1",
          "testingId": "PERF_TEST_6"
        },
        {
          "testingNodeId": "NODE_1",
          "testingId": "PERF_TEST_7"
        },
        {
          "testingNodeId": "NODE_1",
          "testingId": "PERF_TEST_8"
        },
        {
          "testingNodeId": "NODE_1",
          "testingId": "PERF_TEST_9"
        }
      ]
    }
  ]
}
