Streaming new records into an existing parquet file in S3 #125

Open
designreact opened this issue May 13, 2021 · 5 comments

@designreact

I'm attempting to aggregate records by id as they are processed from SQS via lambda into S3.

I do get a merged file uploaded into S3, as I can see the file size increasing each time the Lambda runs, but when I inspect the data with parquet-tools I only see one result. I have a feeling this is due to multiple headers being written into the file, with parquet-tools only reading the latest entry.

Can anyone help me figure out a way to adapt my approach using parts of the parquetjs library? The aim is to correctly stitch the streams together. I think I need to parse the oldStream chunks into parquet rows and then write them into a new write stream, but being new to parquet and parquetjs I don't know where to start.

My approach could well be a poor one, but if possible I'd rather not create and maintain another process, e.g. a CloudWatch scheduled event to aggregate and repartition my data.

I think this may relate to: #120

Thanks to all the contributors for your hard work; this is a great library from what I've seen so far 👍🏻

My current approach (though a little broken):

Read existing parquet file as stream

const oldStream = new Stream.PassThrough();
getS3FileStream(Bucket, Key) // Key: guid=xyz/year=2021/month=04/2021.04.xyz.parquet
  .pipe(oldStream);
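
(getS3FileStream isn't shown; a minimal sketch of such a helper, assuming the AWS SDK v2 aws-sdk package:)

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Hypothetical helper: return the S3 object's bytes as a readable stream.
const getS3FileStream = (Bucket, Key) =>
  s3.getObject({ Bucket, Key }).createReadStream();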

Create new parquet stream from SQS record

const recordStream = new Stream.PassThrough();
createRecordStream(record) // formats the SQS record data in line with the schema
  .pipe(StreamArray.withParser())
  .pipe(new parquet.ParquetTransformer(schema, { useDataPageV2: false }))
  .pipe(recordStream);

Merge streams together

const combinedStream = new Stream.PassThrough();
mergeStream(oldStream, recordStream)
  .pipe(combinedStream);

Upload to S3

const upload = s3Stream.upload({
  Bucket,
  Key // Key: guid=xyz/year=2021/month=04/2021.04.xyz.parquet
});
combinedStream.pipe(upload);
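
(s3Stream above is assumed to be the s3-upload-stream package wrapped around an AWS SDK v2 client, along these lines:)

const AWS = require('aws-sdk');

// Hypothetical setup for the s3Stream helper used above.
const s3Stream = require('s3-upload-stream')(new AWS.S3());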
@designreact (Author) commented May 14, 2021

Using the parquets fork, which adds an openBuffer method, I've managed to get this working with ParquetReader and ParquetWriter, but not via streaming 😔

If my understanding is correct, this means the whole file is loaded into memory to perform updates. Given my S3 key partitioning and the size of my SQS records this may not be an issue in the short term, but it's not really the solution I was hoping for.

const upload = s3Stream.upload({ Bucket, Key });
const existingRecords = await getS3FileBuffer(Bucket, Key);

const reader = await parquet.ParquetReader.openBuffer(existingRecords);
const writer = await parquet.ParquetWriter.openStream(schema, upload);

// Copy the existing rows, then append the new record.
const cursor = reader.getCursor();
let rec = null;
while ((rec = await cursor.next())) {
  await writer.appendRow(rec);
}
await writer.appendRow({ value: transformRecord(sqsRecord) });
await writer.close();
await reader.close();
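
(getS3FileBuffer isn't shown; a minimal sketch of such a helper, again assuming AWS SDK v2:)

const AWS = require('aws-sdk');
const s3 = new AWS.S3();

// Hypothetical helper: load the whole object into memory as a Buffer.
const getS3FileBuffer = async (Bucket, Key) => {
  const { Body } = await s3.getObject({ Bucket, Key }).promise();
  return Body;
};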

@paulocoghi

Did you find any alternative solution, @designreact? I also plan to save to S3.

@aijazkhan81

@paulocoghi, @designreact, did you find any way to upload the file to S3?

@mmuller88 commented Dec 21, 2021

That seems to work:

import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
// Assumed imports for the code below:
import * as stream from 'stream';
import * as https from 'https';
import * as unzipper from 'unzipper';
import * as ndjson from 'ndjson';
import * as parquet from 'parquetjs';

const createUploadStream = (
  fileName: string,
): { passStream: stream.PassThrough; parallelUploadS3: Upload } => {
  const passStream = new stream.PassThrough();
  const uploadKeyName = fileName + '.parquet';
  const parallelUploadS3 = new Upload({
    client: new S3Client({}),
    queueSize: 4,
    params: {
      Bucket: downloadBucketName,
      Key: uploadKeyName,
      Body: passStream,
    },
  });

  parallelUploadS3.on('httpUploadProgress', (progress) => {
    console.log(`Download part: ${progress.part}`);
  });

  return { passStream, parallelUploadS3 };
};

export const handler = async (event: DownloadLambdaEvent) => {
  console.log('Processing event: ', JSON.stringify(event, null, 2));
  try {
    const { parallelUploadS3, passStream } = createUploadStream(event.fileName);

    https.get(`${DownloadUrlPrefix}${event.fileName}.zip`, (fileStream) => {
      fileStream
        .pipe(unzipper.ParseOne())
        .pipe(ndjson.parse())
        .pipe(new parquet.ParquetTransformer(schema, opts))
        .pipe(passStream);
    });
    ...
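
(One detail the snippet elides: with @aws-sdk/lib-storage the upload only completes once the Upload instance's done() promise resolves, so the handler presumably finishes with something like:)

// Sketch: wait for the multipart upload to finish before the handler returns.
await parallelUploadS3.done();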

@maelp commented Jan 21, 2024

Does someone know how we would do this for Google Cloud bucket?
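
(A rough sketch of the GCS equivalent, assuming the @google-cloud/storage package; bucket and object names are placeholders:)

const { Storage } = require('@google-cloud/storage');

// Hypothetical: pipe a parquet output stream into a GCS object write stream.
const gcsWriteStream = new Storage()
  .bucket('my-bucket')
  .file('year=2021/data.parquet')
  .createWriteStream();

parquetStream.pipe(gcsWriteStream); // e.g. the output of ParquetTransformer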
