Skip to content
This repository has been archived by the owner on Apr 30, 2022. It is now read-only.

Commit

Permalink
[FLINK-4108] [scala] Respect ResultTypeQueryable for InputFormats.
Browse files Browse the repository at this point in the history
This closes apache#2619
  • Loading branch information
twalthr authored and skidder committed Dec 27, 2016
1 parent fca5326 commit 796ad5f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
*/
@PublicEvolving
def getRestartStrategy: RestartStrategyConfiguration = {
javaEnv.getRestartStrategy()
javaEnv.getRestartStrategy
}

/**
Expand Down Expand Up @@ -381,7 +381,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
require(inputFormat != null, "InputFormat must not be null.")
require(filePath != null, "File path must not be null.")
inputFormat.setFilePath(new Path(filePath))
createInput(inputFormat, implicitly[TypeInformation[T]])
createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]]))
}

/**
Expand All @@ -392,7 +392,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
if (inputFormat == null) {
throw new IllegalArgumentException("InputFormat must not be null.")
}
createInput(inputFormat, implicitly[TypeInformation[T]])
createInput(inputFormat, explicitFirst(inputFormat, implicitly[TypeInformation[T]]))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ package org.apache.flink.api
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, TypeUtils, ScalaNothingTypeInfo}
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo, ScalaNothingTypeInfo, TypeUtils}

import _root_.scala.reflect.ClassTag
import language.experimental.macros
Expand Down Expand Up @@ -52,6 +53,14 @@ package object scala {
// We need to wrap Java DataSet because we need the scala operations
private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]) = new DataSet[R](set)

// Checks if object has explicit type information using ResultTypeQueryable
private[flink] def explicitFirst[T](
funcOrInputFormat: AnyRef,
typeInfo: TypeInformation[T]): TypeInformation[T] = funcOrInputFormat match {
case rtq: ResultTypeQueryable[T] => rtq.getProducedType
case _ => typeInfo
}

private[flink] def fieldNames2Indices(
typeInfo: TypeInformation[_],
fields: Array[String]): Array[Int] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.typeutils

import org.apache.flink.api.common.io.FileInputFormat
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.TypeExtractionTest.CustomTypeInputFormat
import org.apache.flink.util.TestLogger
import org.junit.Assert.assertEquals
import org.junit.Test
import org.scalatest.junit.JUnitSuiteLike


class TypeExtractionTest extends TestLogger with JUnitSuiteLike {

@Test
def testResultTypeQueryable(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val productedType = env.createInput(new CustomTypeInputFormat).getType()
assertEquals(productedType, BasicTypeInfo.LONG_TYPE_INFO)
}

}

object TypeExtractionTest {
class CustomTypeInputFormat extends FileInputFormat[String] with ResultTypeQueryable[Long] {

override def getProducedType: TypeInformation[Long] =
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]

override def reachedEnd(): Boolean = throw new UnsupportedOperationException()

override def nextRecord(reuse: String): String = throw new UnsupportedOperationException()
}
}

0 comments on commit 796ad5f

Please sign in to comment.