This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Dynamic schema generation #392

Closed
tschaub opened this issue Oct 27, 2022 · 7 comments


@tschaub
Contributor

tschaub commented Oct 27, 2022

I was hoping I might be able to build a schema without having a struct type ahead of time. The use case is reading other structured data (where the type can only be inferred at runtime) and generating a Parquet file.

I see lots of useful logic used by the SchemaOf function, but I'm not yet seeing any exported functions that might let me build a schema without having a struct to use as a model.

It looks like I could write my own implementation of the Node interface and use that as the root in the NewSchema function. Is there another way that a schema might be built dynamically, or does this go against the aim of this library?

@tschaub
Contributor Author

tschaub commented Oct 27, 2022

I came across Group and other Node-returning functions like String and Leaf. I think these will allow me to build up the root for a schema. In this case I'm trying to read in JSON data and convert to an appropriate Parquet schema. Given the limited types in JSON, this looks pretty doable with the functions that are exported.

I'll close this unless anyone has any other suggestions.

@tschaub
Contributor Author

tschaub commented Oct 27, 2022

Hmm, I can create the schema, but am unable to create rows that can be written. Here is a modification of the write (any) example that demonstrates the issue I'm running into:

package main

import (
	"bytes"
	"log"

	"github.com/segmentio/parquet-go"
)

func main() {
	schema := parquet.SchemaOf(struct {
		Name   string
		Number int
	}{})

	buf := new(bytes.Buffer)
	err := parquet.Write(
		buf,
		[]any{
			map[string]any{"Name": "Luke", "Number": 42},
			map[string]any{"Name": "Han", "Number": 21},
		},
		schema,
	)
	if err != nil {
		log.Fatal(err)
	}
}

This panics in value.go with: panic: cannot create parquet value of type BYTE_ARRAY from go value of type interface {}.

In my case, I'm dynamically generating the schema instead of calling SchemaOf with a struct. I'm reading JSON data, so my rows end up as map[string]any. It looks like this "write any" usage only works when the map has a more specific value type.

Curious if anybody thinks there is hope to use this library to write parquet data where the schema is only known at runtime.

@hhoughgg
Contributor

hhoughgg commented Oct 28, 2022

ds := someData()
schema := generateAtRunTime()
structFields := make([]reflect.StructField, 0, len(schema))
for _, field := range schema {
	structFields = append(structFields, reflect.StructField{
		Name: capitalizeString(field.Name),
		Type: field.Type,
		Tag:  reflect.StructTag(field.Tag),
	})
}

customStruct := reflect.StructOf(structFields)
customStructSlice := reflect.MakeSlice(reflect.SliceOf(customStruct), len(ds), len(ds))

You can use reflection to dynamically build a struct based on some run time schema. Then you can set each field also using reflection: customStructSlice.Index(0).Field(0).SetInt(100).

You can convert each struct back to an interface with customStructSlice.Index(0).Interface() and use it as an argument to Write as well as SchemaOf.

I don't think this library supports map[string]any. There are other libraries that do support it, but I would do the above instead, as in my experience the performance is much better. With the approach above you can allocate all your memory in a single allocation, whereas map[string]any requires an allocation for every single map key.

@tschaub
Contributor Author

tschaub commented Oct 28, 2022

Thank you for the suggestion, @hhoughgg. I'll give this approach a try.

I can't generate the schema until I parse (some of) the JSON data, so I'm going to have some map[string]any at some point. I guess the steps will look something like:

  1. decode JSON records as []map[string]any
  2. generate struct type representing the decoded records above
  3. generate schema from struct type
  4. convert map[string]any elements into structs
  5. write the structs to parquet

@tschaub
Contributor Author

tschaub commented Nov 2, 2022

The approach above is working for me, so I'll close this. Thanks for the pointers, @hhoughgg.

@tschaub tschaub closed this as completed Nov 2, 2022
@KasonBraley

@tschaub I am having the same problem as you did, and your steps have helped me a lot to almost get there. But I am stuck on step 5: I am unable to get my custom struct created via reflect to be written correctly. Were you able to use the GenericWriter with your approach? Or did you have to revert to the deprecated Writer type? I've been trying to use the GenericWriter, but the problem I am running into is that a reflect.Type or reflect.Value does not appear to satisfy generics. I am not very familiar with the reflect package, nor with Parquet in general, so I could just be missing something.

This is the error I am mostly getting: panic: reflect: call of reflect.Value.MapIndex on struct Value. Although I've tried various different approaches with no success.

This is essentially what I have, after retrieving the details needed for the schema creation from somewhere else. I am just not sure what the rows type in WriteFile should be, as the code below does not compile.

	group := parquet.Group{}
	structFields := make([]reflect.StructField, 0, len(dtcs))
	for _, dtc := range dtcs {
		tag := reflect.StructTag("parquet:" + strconv.Quote(dtc.ColumnName+",optional"))
		var reflectType reflect.Type

		switch dtc.DatastoreType {
		case "string":
			reflectType = reflect.TypeOf(string(""))
			group[dtc.ColumnName] = parquet.Optional(parquet.String())
		case "int":
			reflectType = reflect.TypeOf(int32(0))
			group[dtc.ColumnName] = parquet.Optional(parquet.Int(32))
		case "long":
			reflectType = reflect.TypeOf(int64(0))
			group[dtc.ColumnName] = parquet.Optional(parquet.Int(64))
		case "double":
			reflectType = reflect.TypeOf(float64(0))
			group[dtc.ColumnName] = parquet.Optional(parquet.Leaf(parquet.DoubleType))
		case "float":
			reflectType = reflect.TypeOf(float32(0))
			group[dtc.ColumnName] = parquet.Optional(parquet.Leaf(parquet.FloatType))
		default:
			log.Println("unsupported datastore type")
		}

		structFields = append(structFields, reflect.StructField{
			Name: caser.String(dtc.ColumnName),
			Type: reflectType,
			Tag:  tag,
		})
	}

	structForSchemaType := reflect.StructOf(structFields)

	newInput := reflect.New(structForSchemaType)

	for logKey, logValue := range m {
		structFieldValue := newInput.Elem().FieldByName(caser.String(logKey))

		if !structFieldValue.IsValid() {
			log.Printf("not valid: %v: %v", logKey, logValue)
			continue
		}

		switch structFieldValue.Kind() {
		case reflect.String:
			structFieldValue.SetString(fmt.Sprintf("%v", logValue))
		case reflect.Int, reflect.Int32, reflect.Int64:
			switch typedLogValue := logValue.(type) {
			case int:
				structFieldValue.SetInt(int64(typedLogValue))
			case string:
				val, err := strconv.ParseInt(typedLogValue, 10, 32)
				if err != nil {
					// return fmt.Errorf("failed to parse 'int' value to type int: %w", err)
				}
				structFieldValue.SetInt(int64(val))
			}
		case reflect.Float32, reflect.Float64:
			switch typedLogValue := logValue.(type) {
			case float64:
				structFieldValue.SetFloat(typedLogValue)
			case string:
				val, err := strconv.ParseFloat(typedLogValue, 64)
				if err != nil {
					// return fmt.Errorf("failed to parse 'double' value to type int: %w", err)
				}
				structFieldValue.SetFloat(val)
			}
		}
	}

	schema := parquet.NewSchema("Record", group)

	// doesn't compile. Error is `variable of type reflect.Value is not a type`
	err = parquet.WriteFile("./foo.parquet", []newInput{newInput}, schema)
	if err != nil {
		log.Fatal(err)
	}

Sorry for commenting on an old & closed issue. I'll probably create a new issue with this question if I don't get a response.

@kevinburkesegment
Contributor

kevinburkesegment commented Jul 28, 2023 via email
