From 392f334064db8704158b7b9cd628a5fb19849ffa Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Tue, 12 Mar 2019 15:05:56 -0400 Subject: [PATCH] Explicit reset position on SeekableInput (#1747) --- .../spotify/scio/avro/types/AvroTapIT.scala | 41 +++++++++++++++++++ .../com/spotify/scio/io/FileStorage.scala | 10 +++-- 2 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 scio-avro/src/it/scala/com/spotify/scio/avro/types/AvroTapIT.scala diff --git a/scio-avro/src/it/scala/com/spotify/scio/avro/types/AvroTapIT.scala b/scio-avro/src/it/scala/com/spotify/scio/avro/types/AvroTapIT.scala new file mode 100644 index 0000000000..1cbe595d13 --- /dev/null +++ b/scio-avro/src/it/scala/com/spotify/scio/avro/types/AvroTapIT.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2019 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.spotify.scio.avro.types + +import com.spotify.scio.avro.AvroTaps +import com.spotify.scio.io.Taps +import org.apache.avro.generic.GenericRecord +import org.apache.beam.sdk.io.FileSystems +import org.apache.beam.sdk.options.PipelineOptionsFactory +import org.scalatest.{FlatSpec, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +final class AvroTapIT extends FlatSpec with Matchers { + + it should "read avro file" in { + FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create) + + val asd = AvroTaps(Taps()).avroFile[GenericRecord]( + "gs://data-integration-test-eu/avro-integration-test/folder-a/folder-b/shakespeare.avro") + val result = Await.result(asd, Duration.Inf) + + result.value.hasNext shouldBe true + } + +} diff --git a/scio-core/src/main/scala/com/spotify/scio/io/FileStorage.scala b/scio-core/src/main/scala/com/spotify/scio/io/FileStorage.scala index 5a5bac8c11..dc2f115dea 100644 --- a/scio-core/src/main/scala/com/spotify/scio/io/FileStorage.scala +++ b/scio-core/src/main/scala/com/spotify/scio/io/FileStorage.scala @@ -52,9 +52,13 @@ private[scio] final class FileStorage(protected[scio] val path: String) { private def getAvroSeekableInput(meta: Metadata): SeekableInput = new SeekableInput { require(meta.isReadSeekEfficient) - private val in = - FileSystems.open(meta.resourceId()).asInstanceOf[SeekableByteChannel] - in.read(ByteBuffer.allocate(1)) // read a single byte to initialize the channel + private val in = { + val channel = FileSystems.open(meta.resourceId()).asInstanceOf[SeekableByteChannel] + // metadata is lazy loaded on GCS FS and only triggered upon first read + channel.read(ByteBuffer.allocate(1)) + // reset position + channel.position(0) + } override def read(b: Array[Byte], off: Int, len: Int): Int = in.read(ByteBuffer.wrap(b, off, len)) override def tell(): Long = in.position()