
Production Use #2

Closed
kelindar opened this issue Oct 12, 2017 · 33 comments

Comments

@kelindar

Hi @xitongsys, it seems like this is still under active development; do you think it's ready for production use? We'd like to replace some of our Spark pipelines :)

@nutzhub

nutzhub commented Oct 12, 2017

I'm testing it out as well; it would be a great idea to provide godoc and CI badges.

@xitongsys
Owner

Hi @kelindar @nut-abctech, sorry for the delayed reply, and thanks for your suggestions.
I'm working on adding more unit tests and regression tests, plus godoc and CI. Help is needed and everything is welcome :)
To be on the safe side, I suggest waiting to use it in production until the unit and regression tests are finished.
Testing and feedback are welcome, thanks!

@IkiM0no

IkiM0no commented Oct 14, 2017

Hi @xitongsys , we are looking to convert .csv files on S3 to parquet. Would this tool be suitable?
From what I can see in /example/benchmark/WriteParquet.go, it looks like at line 80 you are generating some demo data in a loop.
Theoretically, could one do something similar, but instead loop over some .csv file, parse each row to an object like Student, and write to .parquet as you have? Thanks for your consideration.

@xitongsys
Owner

xitongsys commented Oct 14, 2017

@IkiM0no Yes, that works, but you would need to implement a similar file interface for S3 to the one in my example.

In addition, I'm considering some other work:
(1) add more examples of converting csv/orc... to parquet.
(2) since flat data is by far the most common case, add some dedicated functions for converting it to parquet, which will streamline the process and give much better performance.
So this project is still under active development and will gain many more functions. You can follow the latest updates :)
Hope this helps.

@IkiM0no

IkiM0no commented Oct 14, 2017

Very helpful @xitongsys thank you very much.
I will attempt the .csv conversion to parquet using your library. If successful, I'll post a gist that perhaps you could incorporate into your examples.
Regarding "you should provide the similar interface of S3 as my example", could you point me to where this is? I can't seem to find it by searching "S3" in this repo. Thanks again!

@xitongsys
Owner

@IkiM0no oh... sorry for the misleading wording.
I have provided local/HDFS examples, which include the interface implementation. If you want to read/write files on S3, you need to provide a similar interface.

@IkiM0no

IkiM0no commented Oct 16, 2017

Thanks for the clarification @xitongsys :)
Sorry, but one more question. I was able to create the parquet file from csv, but when I upload it to S3 and create a table from the parquet file, all the data is 'broken', meaning the columns contain byte characters.

I suspect this may be due to the parameters I am passing to ph.WriteInit(), particularly np and objAveSize. Should these values correspond to the dimensions of the table? What should I pass here if, for example, my data is 3 columns and 3 rows?

@xitongsys
Owner

xitongsys commented Oct 16, 2017

@IkiM0no
These two parameters only influence performance.
objAveSize is the approximate size of each row of data.

What do you mean by "broken"? Do you store strings but get byte values back? Like
"ABC" -> [65, 66, 67]
If that's the situation, you can use the UTF8 type instead of BYTE_ARRAY.
BYTE_ARRAY is a base type in parquet, so it can store many kinds of values.
Also, I don't know how you create the table from S3 (maybe Hive?); if you upload the results, that would give more hints.
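
For readers following along, a rough sketch of what that change could look like in a struct-based writer. It assumes parquet-go's type package exports Go field types named UTF8, BYTE_ARRAY and INT64, as the Student struct in example/benchmark/WriteParquet.go uses; the import path and type names are assumptions and may differ by version.

package main

import (
	// Assumed import path for the parquet field types; adjust to your version.
	. "github.com/xitongsys/parquet-go/ParquetType"
)

// BYTE_ARRAY is the physical parquet type (raw bytes), so downstream tools
// show byte values for it. UTF8 annotates the same bytes as a string, which
// is what Hive/Impala need to render readable text columns.
type Person struct {
	First_name UTF8  // instead of BYTE_ARRAY
	Last_name  UTF8  // instead of BYTE_ARRAY
	Person_age INT64
}

func main() {}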

@IkiM0no

IkiM0no commented Oct 17, 2017

@xitongsys, thanks, let me provide some clarification and my steps.

My csv sitting on s3 is very simple, 3 rows, 3 columns like this:

rob,pike,50
jane,doe,33
foo,bar,22

My test script is just grabbing this file from S3, no problems there.
The rest of the script is taken from your example: /example/benchmark/WriteParquet.go
For each row in the csv I create a 'Person' struct (with the appropriate UTF8 and INT64 datatypes), just like your 'Student' struct, and write it to the parquet file.
I put this file back on s3 manually and in Impala create a table:

CREATE EXTERNAL TABLE temp.parqtest (`First` STRING, `Last` STRING, `Age` INT) 
STORED AS PARQUET 
LOCATION  's3a://path/to/parquet/directory/'
;

When I select * from that table temp.parqtest, the data is all jumbled together. It doesn't create the appropriate rows and columns I expect. I'll post a screen shot tomorrow.
Now I am attaching the parquet file and the go script I used.
Thank you very much for your assistance!

tmp.zip

@xitongsys
Owner

Hi, @IkiM0no
In your pq.go, line 62,
'ph.WriteInit(f, new(Person), 3, 30)' is wrong;
you should use
'ph.WriteInit(f, new(PP), 3, 30)'

You can try it, and if there are still errors, please tell me :)

@IkiM0no

IkiM0no commented Oct 17, 2017

Ah! Please forgive my user error. I have corrected this issue. I have also changed the variable names so they aren't the SQL reserved words 'First' and 'Last', so 'First' > 'First_name', etc.

Now I can inspect the schema and file using parquet-tools.

$ parquet-tools schema test.parquet
message parquet_go_root {
  required int64 Person_age;
  required binary Last_name (UTF8);
  required binary First_name (UTF8);
}
$ parquet-tools head -n 5 test.parquet
Person_age = 50
Last_name = pike
First_name = rob
... etc

Great!

Interestingly though, the columns appear in reverse order. I have seen that this can be an issue for Impala.

Anyway, I dropped and re-created my table using either column order, and refreshed:

CREATE EXTERNAL TABLE staging.ptest 
(Person_age INT, Last_name STRING, First_name STRING)
--( First_name STRING, Last_name STRING, Person_age INT)
STORED AS PARQUET
LOCATION  's3a://<my_bucket>/path/to/test_parq/'
;
REFRESH staging.ptest;

Now when I query the table, Impala complains:

File 's3a://<my_bucket>/path/to/test_parq/test.parquet' uses an unsupported encoding: DELTA_BINARY_PACKED for column 'First'.

Can I specify some other encoding?

EDIT: I found this page stating that Impala supports certain encodings: https://www.cloudera.com/documentation/enterprise/5-3-x/topics/impala_parquet.html

parquet-tools meta shows:

$ parquet-tools meta test.parquet
creator:     null

file schema: parquet_go_root
---------------------------------------------------------------------------------------------------------------------------------------
Person_age:  REQUIRED INT64 R:0 D:0
Last_name:   REQUIRED BINARY O:UTF8 R:0 D:0
First_name:  REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:3 TS:438
---------------------------------------------------------------------------------------------------------------------------------------
First_name:   BINARY SNAPPY DO:0 FPO:0 SZ:150/144/0.96 VC:3 ENC:PLAIN,DELTA_BINARY_PACKED,RLE,BIT_PACKED
Person_age:   INT64 SNAPPY DO:0 FPO:150 SZ:147/141/0.96 VC:3 ENC:PLAIN,DELTA_BINARY_PACKED,RLE,BIT_PACKED
Last_name:    BINARY SNAPPY DO:0 FPO:297 SZ:141/135/0.96 VC:3 ENC:PLAIN,DELTA_BINARY_PACKED,RLE,BIT_PACKED

@xitongsys
Owner

@IkiM0no
These two issues are bugs and I have fixed them. You can try the latest version.
Thanks :)

@IkiM0no

IkiM0no commented Oct 18, 2017

@xitongsys
Happy to report that since your latest commit, my example is working! :)
Thank you for your support.
I will try on a production-scale test next to see what we can produce.

@xitongsys
Owner

@IkiM0no
Happy to hear that; I look forward to your further feedback :) thanks

@IkiM0no

IkiM0no commented Oct 23, 2017

@xitongsys something I've been working on is abstracting the need to write/hard-code a struct for each csv file I am converting to parquet (with the help of parquet-go, of course (: ).

I'm expressing this as schemata.go in my project:

package main
import (
	"github.com/xitongsys/parquet-go/parquet"
)
type Record struct {
	Cols []Column
}
type Column struct {
	Value  string
	PqType parquet.ConvertedType
}

and generating a list of type Record from my csv data at runtime.

The issue is that type Record is not the kind of struct expected by parquet-go/ParquetHandler/Write.go.

How would you recommend approaching this?

I feel like maybe Write.go should expose a method that implements Write functionality for structs like Record so that users of your library can avoid hard-coding data structures.

@xitongsys
Owner

@IkiM0no
Thanks for your suggestions. I have added a plugin named CSVWriter which can meet your requirement.
A description has been added to the readme file and there is an example in example/csv_write.go. You can download the latest version.

@IkiM0no

IkiM0no commented Oct 27, 2017

@xitongsys apologies for the delay, but I have now had time to test the CSVWriter and I'm happy to report it works brilliantly! Many thanks. I've been telling people about this library and how great it is. Will continue to test and provide feedback as we scale up our efforts :)

@xitongsys
Owner

xitongsys commented Oct 28, 2017

@IkiM0no
Thanks for your testing and feedback.
Parquet-go is still under active development and it will keep changing. If your code doesn't work with the latest version, you can read the readme file :)
Today I added some changes:

  1. You should call Flush() before WriteStop().
  2. CSVWriter has also changed: it now supports two types of input, strings and values.

You may need to update your code (see the sketch below).

The readme and examples have been updated; you can read them.
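
For readers following the thread, here is a rough sketch of that call order (WriteString for string input, then Flush() before WriteStop()). The names below (MetadataType, ParquetFile, NewCSVWriterHandler, WriteInit, WriteString, Flush, WriteStop) are assembled from code posted later in this thread rather than from the readme, and exact signatures may have changed between versions.

// Sketch only; names and signatures follow the handler-style code shown
// later in this thread and may differ in newer versions of parquet-go.
package example

import (
	. "github.com/xitongsys/parquet-go/ParquetHandler"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

func writeRows(md []MetadataType, f ParquetFile, records [][]string) {
	ph := NewCSVWriterHandler()
	ph.WriteInit(md, f, 10, 30) // np and objAveSize hints, as in the examples

	for _, rec := range records {
		// WriteString expects []*string and converts each field to the
		// type declared in the metadata; Write takes typed values instead.
		row := make([]*string, len(rec))
		for i := range rec {
			row[i] = &rec[i]
		}
		ph.WriteString(row)
	}

	ph.Flush()     // flush the buffered rows first...
	ph.WriteStop() // ...then finalize the parquet footer
	f.Close()
}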

@IkiM0no

IkiM0no commented Oct 29, 2017 via email

@IkiM0no

IkiM0no commented Nov 17, 2017

Hi @xitongsys. I'm now using parquet-go to iterate over a data set and want to write to multiple parquet files based on a partition column in the data. So all rows with partition col = "01" write to 01.parq, all rows with partition col = "02" write to 02.parq, etc.

I am spinning off a 'worker' goroutine for each row and passing the row over a channel to a worker() function.

Now I would like to create a different version of the Create() method from your example that creates the parquet file if it does not exist (that partition has not yet been seen), or simply appends to that .parq file if it has already been created. I have done this successfully with plain .csv files, but I'm having trouble with parquet-go because ParquetFile is an interface and I'm not sure how to extend it.

This is my basic method:

func (self *PqFile) CreateOrAppend(name string) (ParquetFile, error) {
	//file, err := os.Create(name)
	file, err := os.OpenFile(name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	myFile := new(PqFile)
	myFile.File = file
	return myFile, err
}

Do you have advice on how I can add this new method to the *PqFile object?

@xitongsys
Owner

Hi @IkiM0no, do you mean you must use 'CreateOrAppend' as the function name? If not, why don't you put your code in the Create() method?

@IkiM0no

IkiM0no commented Nov 21, 2017

@xitongsys good suggestion, I have implemented Create() as suggested:

func (self *PqFile) Create(name string) (ParquetFile, error) {
	//file, err := os.Create(name)
	file, err := os.OpenFile(name, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	myFile := new(PqFile)
	myFile.File = file
	return myFile, err
}

Below is writer.go in my project.

As a test, I added a csv writer to workerP() to check whether I had a bug in my concurrency.

The csv files write as expected, and the .parq files are created, but only contain a string 'PAR1'. I know I have a bug somewhere. I have added lots of print statements, verified that the channels are sending and receiving data, but cannot seem to locate it.

Do you see anything I have implemented incorrectly in my NewCSVWriterHandler setup, or maybe in the Create function?

package main

import (
	"os"
	"io"
	"fmt"
	"log"
	"sync"
	"strings"
	"crypto/rand"
	"encoding/csv"
	. "github.com/xitongsys/parquet-go/ParquetHandler"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

var (
	// list of channels to communicate with workers
	// workers accessed synchronously, no mutex required
	workers = make(map[string]chan []string)

	// wg is to make sure all workers done before exiting main
	wg = sync.WaitGroup{}

	// mu used only for sequential printing, not relevant for program logic
	mu = sync.Mutex{}
	path = localPath
)

func writerT(csvRaw string, tableMetaData []MetadataType) {

	// wait for all workers to finish up before exit
	defer wg.Wait()

	// Set local instance of tableMetaData from what was passed in
	tableMetaData = tableMetaData
	s := strings.NewReader(csvRaw)
	r := csv.NewReader(s)

	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}
	numLines := len(lines) -1

	for idx, rec := range lines {
		if idx == numLines {
			stopWorkers()
			return
		}
		process(rec)
	}
}

func process(rec []string) {
	l := len(rec)
	part := rec[l-1]

	if c, ok := workers[part]; ok {
		// send rec to worker
		c <- rec
	} else {
		// if no worker for the partition

		// make a chan
		nc := make(chan []string)
		workers[part] = nc

		// start worker with this chan
		go workerP(nc)

		// send rec to worker via chan
		nc <- rec
	}
}

func workerP(c chan []string) {

	// wg.Done signals to main worker completion
	wg.Add(1)
	defer wg.Done()

	u, _ := newUUID()

	// CSV
	csvName := localPath + u + ".csv"
	// If the file doesn't exist, create it, or append to the file
	csvFile, err := os.OpenFile(csvName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	writer := csv.NewWriter(csvFile)

	fName := localPath + u + ".parq"
	fmt.Println(fName)

	// PARQUET
	var f ParquetFile
	f = &PqFile{}
	f, err = f.Create(fName)
	if err != nil {
		fmt.Println(err)
	}
	ph := NewCSVWriterHandler()
	ph.WriteInit(tableMetaData, f, 10, 30)

	part := [][]string{}
	for {
		// wait for a rec or close(chan)
		rec, ok := <-c
		if ok {
			// save the rec
			// instead of accumulation in memory
			// this can be saved to file directly

			mu.Lock()
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}
			ph.Write(row)

			// CSV
			if err := writer.Write(rec); err != nil {
				log.Fatal(err)
			}
			mu.Unlock()

			part = append(part, rec)
		} else {
			// channel closed on EOF
			// dump partition
			// lock ensures sequential printing
			// not required for independent files

			mu.Lock()
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}
			ph.Write(row)

			// CSV
			for _, p := range part {
				if err := writer.Write(p); err != nil {
					log.Fatal(err)
				}
				writer.Flush()
			}
			mu.Unlock()

			return
		}
	}
	ph.Flush()
	ph.WriteStop()
	f.Close()
	csvFile.Close()
}

// simply signals to workers to stop
func stopWorkers() {
	for _, c := range workers {
		// signal to all workers to exit
		close(c)
	}
}

// newUUID generates a random UUID according to RFC 4122
func newUUID() (string, error) {
	uuid := make([]byte, 16)
	n, err := io.ReadFull(rand.Reader, uuid)
	if n != len(uuid) || err != nil {
		return "", err
	}
	// variant bits; see section 4.1.1
	uuid[8] = uuid[8]&^0xc0 | 0x80
	// version 4 (pseudo-random); see section 4.1.3
	uuid[6] = uuid[6]&^0xf0 | 0x40
	return fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]), nil
}

@xitongsys
Owner

xitongsys commented Nov 21, 2017

hi @IkiM0no
In your code, the following part may be wrong:

row := make([]*string, len(rec))
for j := 0; j < len(rec); j++ {
	row[j] = &rec[j]
}
ph.Write(row)

You should use ph.WriteString(rec).
The Write function is used to write values directly. The WriteString function is used to write strings, and it converts the strings to the corresponding types in the metadata.
For details, you can read the example in the readme file. Hope it's helpful :)
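
For readers following along, note that the follow-up below shows WriteString taking []*string rather than []string, so the pointer conversion is still needed; only the Write call changes to WriteString. A small sketch of that conversion as a reusable helper:

// toPtrRow converts one CSV record ([]string) into the []*string form that
// WriteString accepts in the versions used later in this thread.
func toPtrRow(rec []string) []*string {
	row := make([]*string, len(rec))
	for i := range rec {
		row[i] = &rec[i]
	}
	return row
}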

@IkiM0no

IkiM0no commented Nov 22, 2017

Thank you for the suggestion @xitongsys :) I see what happened, I did not go get the latest version of Plugin/CSVWriter explicitly, so I had an older version without the WriteString() method.

I have implemented the change you suggested, but I still see only "PAR1" in the parquet files, while the csv files (written by the same workerP()) contain complete data.

I wonder what the issue is, because those are the only 4 bytes contained in each parquet file. I made one small change to drop the last item in the row slice, since that is the partition column and is not contained in tableMetaData, but apart from that it looks just like the README you provided. Anyway, I confirmed the worker can still 'see' the tableMetaData by printing it just before the call to ph.WriteInit(). The Create method is the same as I provided before.

This is my updated writer.go with your suggestion to use WriteString(), but still not working as expected.

I wonder, do you think that I need to go get all the packages in parquet-go again? Like maybe I missed some critical push you have done in the past month? Or do you see some issue with my code?

package main

import (
	"os"
	//"io"
	"fmt"
	"log"
	"sync"
	//"reflect"
	"strings"
	"encoding/csv"
	. "github.com/xitongsys/parquet-go/ParquetHandler"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
)

var (
	// list of channels to communicate with workers
	// workers accessed synchronously, no mutex required
	workers = make(map[string]chan []string)

	// wg is to make sure all workers done before exiting main
	wg = sync.WaitGroup{}

	// mu used only for sequential printing, not relevant for program logic
	mu = sync.Mutex{}
	path = localPath

	// Moved to global scope
	//tableMetaData []MetadataType
)

func writerT(csvRaw string, tableMetaData []MetadataType) {

	// wait for all workers to finish up before exit
	defer wg.Wait()

	// Set local instance of tableMetaData from what was passed in
	tableMetaData = tableMetaData
	s := strings.NewReader(csvRaw)
	r := csv.NewReader(s)

	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}
	numLines := len(lines) -1

	for idx, rec := range lines {
		if idx == numLines {
			stopWorkers()
			return
		}
		process(rec)
	}
}

func process(rec []string) {
	l := len(rec)
	part := rec[l-1]

	if c, ok := workers[part]; ok {
		// send rec to worker
		c <- rec
	} else {
		// if no worker for the partition

		// make a chan
		nc := make(chan []string)
		workers[part] = nc

		// start worker with this chan
		go workerP(nc)

		// send rec to worker via chan
		nc <- rec
	}
}

func workerP(c chan []string) {

	// wg.Done signals to main worker completion
	wg.Add(1)
	defer wg.Done()

	u, _ := newUUID()

	// CSV
	csvName := localPath + u + ".csv"
	// If the file doesn't exist, create it, or append to the file
	csvFile, err := os.OpenFile(csvName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	writer := csv.NewWriter(csvFile)

	fName := localPath + u + ".parq"
	fmt.Println(fName)

	// PARQUET
	var pf ParquetFile
	//var err error
	pf = &MyFile{}
	pf, err = pf.Create(fName)
	if err != nil {
		fmt.Println(err)
	}

	ph := NewCSVWriterHandler()
	fmt.Println("TMD:", tableMetaData)
	ph.WriteInit(tableMetaData, pf, 10, 30)

	part := [][]string{}
	for {
		// wait for a rec or close(chan)
		rec, ok := <-c
		if ok {
			// save the rec
			// instead of accumulation in memory
			// this can be saved to file directly

			mu.Lock()
			// PARQUET
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}

			// drop partition col
			row = row[:len(row) - 1]
			ph.WriteString(row)

			// cannot use rec (type []string) as type []*string in argument to ph.WriteString
			//rec = rec[:len(rec) - 1]
			//ph.WriteString(rec)

			// CSV
			if err := writer.Write(rec); err != nil {
				log.Fatal(err)
			}
			mu.Unlock()

			part = append(part, rec)

		} else {
			// channel closed on EOF
			// dump partition
			// lock ensures sequential printing
			// not required for independent files

			mu.Lock()
			// PARQUET
			for _, p := range part {
				row := make([]*string, len(p))
				for j := 0; j < len(p); j++ {
					row[j] = &p[j]
					// drop partition col
					noPart := row[:len(row) - 1]
					ph.WriteString(noPart)
				}
			}

			// CSV
			for _, p := range part {
				if err := writer.Write(p); err != nil {
					log.Fatal(err)
				}
				writer.Flush()
			}
			mu.Unlock()

			return
		}
	}
	ph.Flush()
	ph.WriteStop()
	pf.Close()
	csvFile.Close()
}

// simply signals to workers to stop
func stopWorkers() {
	for _, c := range workers {
		// signal to all workers to exit
		close(c)
	}
}

@xitongsys
Owner

xitongsys commented Nov 25, 2017

Hi @IkiM0no, sorry about the delay. These days I have rewritten many parts of parquet-go, which gives a great performance improvement. Some of the functions have changed, so please use the latest version.

The parquet writer has an internal buffer, so you can't create a new writer to append more data to the same parquet file. I have changed many places and fixed some bugs in your code. Now it works.
(One thing confuses me, though: why do you write all the data twice to the same file?)
Hope this helps :)

@xitongsys
Owner

xitongsys commented Nov 26, 2017

@IkiM0no I have updated parquet-go again; the reader/writer is now implemented inside the library, so users don't need to implement it themselves. Please use the latest version. I have changed and tested your code, and it works now.

package main

import (
	"encoding/csv"
	"fmt"
	"github.com/satori/go.uuid"
	. "github.com/xitongsys/parquet-go/ParquetFile"
	. "github.com/xitongsys/parquet-go/Plugin/CSVWriter"
	"log"
	"os"
	"strings"
	"sync"
)

var (
	workers       = make(map[string]chan []string)
	parquetWriter = make(map[string]*CSVWriter)
	csvWriter     = make(map[string]*csv.Writer)
	wg            = sync.WaitGroup{}
	mu            = sync.Mutex{}
	localPath     = "./"
	path          = localPath
	md            = []MetadataType{
		{Type: "UTF8", Name: "Name"},
		{Type: "INT32", Name: "Age"},
		{Type: "INT64", Name: "Id"},
		{Type: "FLOAT", Name: "Weight"},
		//{Type: "BOOLEAN", Name: "Sex"},
	}
)

func writerT(csvRaw string, tableMetaData []MetadataType) {
	defer wg.Wait()
	tableMetaData = tableMetaData
	s := strings.NewReader(csvRaw)
	r := csv.NewReader(s)

	lines, err := r.ReadAll()
	if err != nil {
		log.Fatalf("error reading all lines: %v", err)
	}
	numLines := len(lines) - 1

	for idx, rec := range lines {
		if idx == numLines {
			stopWorkers()
			return
		}
		process(rec)
	}
}

func process(rec []string) {
	l := len(rec)
	part := rec[l-1]

	if c, ok := workers[part]; ok {
		c <- rec
	} else {
		nc := make(chan []string)
		workers[part] = nc

		u := fmt.Sprintf("%v", uuid.NewV4())
		csvName := localPath + u + ".csv"
		csvFile, _ := os.OpenFile(csvName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
		csvWriter[part] = csv.NewWriter(csvFile)

		fName := localPath + u + ".parq"
		pf, _ := NewLocalFileWriter(fName)
		parquetWriter[part], _ = NewCSVWriter(md, pf, 10)

		go workerP(nc)
		nc <- rec
	}
}

func workerP(c chan []string) {
	wg.Add(1)
	defer wg.Done()

	var pw *CSVWriter
	var writer *csv.Writer

	part := [][]string{}
	for {
		// wait for a rec or close(chan)
		rec, ok := <-c
		if ok {
			key := rec[len(rec)-1]
			pw = parquetWriter[key]
			writer = csvWriter[key]

			mu.Lock()
			// PARQUET
			row := make([]*string, len(rec))
			for j := 0; j < len(rec); j++ {
				row[j] = &rec[j]
			}
			// drop partition col
			row = row[:len(row)-1]
			pw.WriteString(row)

			// CSV
			if err := writer.Write(rec); err != nil {
				log.Fatal(err)
			}
			mu.Unlock()
			part = append(part, rec)

		} else {
			if len(part) <= 0 {
				break
			}

			key := part[0][len(part[0])-1]
			pw = parquetWriter[key]
			writer = csvWriter[key]

			mu.Lock()
			// PARQUET
			for _, p := range part {
				row := make([]*string, len(p))
				for j := 0; j < len(p); j++ {
					row[j] = &p[j]
				}
				pw.WriteString(row[:len(row)-1])
			}

			// CSV
			for _, p := range part {
				if err := writer.Write(p); err != nil {
					log.Fatal(err)
				}
				writer.Flush()
			}
			mu.Unlock()

			break
		}
	}
	pw.Flush(true)
	pw.WriteStop()
}

// simply signals to workers to stop
func stopWorkers() {
	for _, c := range workers {
		// signal to all workers to exit
		close(c)
	}
}

func main() {
	num := 10
	res := ""
	for i := 0; i < num; i++ {
		res += fmt.Sprintf("%s_%d", "Student Name", i) + "," +
			fmt.Sprintf("%d", 20+i%5) + "," +
			fmt.Sprintf("%d", i) + "," +
			fmt.Sprintf("%f", 50.0+float32(i)*0.1) + "," +
			fmt.Sprintf("%t", i%2 == 0) + "\n"
	}
	log.Println(res)
	writerT(res, md)
}

@IkiM0no

IkiM0no commented Nov 26, 2017

Thank you @xitongsys very much for your continued support and development on parquet-go! :)
I will begin testing on the new version tomorrow and will report back progress.

@IkiM0no

IkiM0no commented Nov 28, 2017

@xitongsys happy to report this latest v0.9.8 is working very, very well! 👍 The new Writer capabilities are making this very fast now that we can use concurrency.

One thing I'm still wondering about is how to properly pass dates to parquet-go.
In my data, I have timestamp columns in this format: "2017-11-02 18:32:45.94". I have made sure that in the table metadata these are assigned to the "DATE" type, but when I inspect the data with parquet-tools (CLI utility) I see only the year, for example:

$ parquet-tools head -n 10 myfile.parq
ingest_datetime = 2017
event_datetime = 2017
... etc, more columns

This is the output for inspecting the same columns schema:

$ parquet-tools schema myfile.parq
 optional int32 ingest_datetime (DATE);
 optional int32 event_datetime (DATE);
... etc, more columns

Should I be formatting the timestamp(s) prior to writing to the file? Perhaps it prefers Unix epoch time format?

@xitongsys
Owner

@IkiM0no "DATE is used to for a logical date type, without a time of day. It must annotate an int32 that stores the number of days from the Unix epoch, 1 January 1970."
An example is in example/local_flat.go. I tested the parquet file in spark/hive, and the output is:

+-----------+---+---+------+-----+----------+
|       Name|Age| Id|Weight|  Sex|       Day|
+-----------+---+---+------+-----+----------+
|StudentName| 20|  0|  50.0| true|2017-11-28|
|StudentName| 21|  1|  50.1|false|2017-11-28|
|StudentName| 22|  2|  50.2| true|2017-11-28|
|StudentName| 23|  3|  50.3|false|2017-11-28|
|StudentName| 24|  4|  50.4| true|2017-11-28|
|StudentName| 20|  5|  50.5|false|2017-11-28|
|StudentName| 21|  6|  50.6| true|2017-11-28|
|StudentName| 22|  7|  50.7|false|2017-11-28|
|StudentName| 23|  8|  50.8| true|2017-11-28|
|StudentName| 24|  9|  50.9|false|2017-11-28|
+-----------+---+---+------+-----+----------+

If you want to store a timestamp, I think you should use TIMESTAMP_MILLIS.
"TIMESTAMP_MILLIS is used for a combined logical date and time type, with millisecond precision. It must annotate an int64 that stores the number of milliseconds from the Unix epoch, 00:00:00.000 on 1 January 1970, UTC."
For more information, you can see the parquet-format project: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
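
For anyone hitting the same issue, a small sketch of those two conversions using only the Go standard library; the input format "2017-11-02 18:32:45.94" is taken from the comment above, and the variable names are just illustrative.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Parse the timestamp format used earlier in this thread (assumed UTC).
	ts, err := time.Parse("2006-01-02 15:04:05.99", "2017-11-02 18:32:45.94")
	if err != nil {
		panic(err)
	}

	// DATE: an int32 counting whole days since the Unix epoch (time of day dropped).
	days := int32(ts.Unix() / 86400)

	// TIMESTAMP_MILLIS: an int64 counting milliseconds since the Unix epoch.
	millis := ts.UnixNano() / int64(time.Millisecond)

	fmt.Println(days, millis) // the values to store in the parquet columns
}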

@tejasmanohar

I was wondering if there's an update here as to whether this library should be considered stable for production use.

@xitongsys
Owner

As far as I know, several people have used this library in their production environments, so I think you can give it a try too :)

@aohua
Contributor

aohua commented Jun 1, 2018

@tejasmanohar I'm already using it in a production application. This was the only Go library I found that can read and write parquet files. It saves me a lot of work and it has worked very well so far. However, in my case performance is not critical. Why not give it a try? Maybe you can help improve it as well.

@tejasmanohar

tejasmanohar commented Jun 1, 2018

@xitongsys @aohua Sweet. I just wanted to know if folks are depending on it.

Why not give it a try

Because I can also just use the JVM like we have to for Spark :trollface:
