Skip to content

Commit

Permalink
Explicit reset position on SeekableInput (#1747)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas committed Mar 12, 2019
1 parent 13cdd7a commit 392f334
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
41 changes: 41 additions & 0 deletions scio-avro/src/it/scala/com/spotify/scio/avro/types/AvroTapIT.scala
@@ -0,0 +1,41 @@
/*
* Copyright 2019 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.spotify.scio.avro.types

import com.spotify.scio.avro.AvroTaps
import com.spotify.scio.io.Taps
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.io.FileSystems
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration.Duration

final class AvroTapIT extends FlatSpec with Matchers {

it should "read avro file" in {
FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create)

val asd = AvroTaps(Taps()).avroFile[GenericRecord](
"gs://data-integration-test-eu/avro-integration-test/folder-a/folder-b/shakespeare.avro")
val result = Await.result(asd, Duration.Inf)

result.value.hasNext shouldBe true
}

}
10 changes: 7 additions & 3 deletions scio-core/src/main/scala/com/spotify/scio/io/FileStorage.scala
Expand Up @@ -52,9 +52,13 @@ private[scio] final class FileStorage(protected[scio] val path: String) {
private def getAvroSeekableInput(meta: Metadata): SeekableInput =
new SeekableInput {
require(meta.isReadSeekEfficient)
private val in =
FileSystems.open(meta.resourceId()).asInstanceOf[SeekableByteChannel]
in.read(ByteBuffer.allocate(1)) // read a single byte to initialize the channel
private val in = {
val channel = FileSystems.open(meta.resourceId()).asInstanceOf[SeekableByteChannel]
// metadata is lazy loaded on GCS FS and only triggered upon first read
channel.read(ByteBuffer.allocate(1))
// reset position
channel.position(0)
}
override def read(b: Array[Byte], off: Int, len: Int): Int =
in.read(ByteBuffer.wrap(b, off, len))
override def tell(): Long = in.position()
Expand Down

0 comments on commit 392f334

Please sign in to comment.