/
DynamicExtractors.scala
146 lines (129 loc) · 5.72 KB
/
DynamicExtractors.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
* This software is licensed under the Apache 2 license, quoted below.
*
* Copyright 2019 Astraea, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* [http://www.apache.org/licenses/LICENSE-2.0]
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
* SPDX-License-Identifier: Apache-2.0
*
*/
package org.locationtech.rasterframes.expressions
import geotrellis.proj4.CRS
import geotrellis.raster.{CellGrid, Tile}
import geotrellis.vector.Extent
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.jts.JTSTypes
import org.apache.spark.sql.rf.{RasterSourceUDT, TileUDT}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.locationtech.jts.geom.Envelope
import org.locationtech.rasterframes.encoders.CatalystSerializer._
import org.locationtech.rasterframes.model.{LazyCRS, TileContext}
import org.locationtech.rasterframes.ref.{ProjectedRasterLike, RasterRef, RasterSource}
import org.locationtech.rasterframes.tiles.ProjectedRasterTile
/**
 * Collection of partial functions, keyed on a Catalyst `DataType`, that each yield a
 * decoder turning a raw column value into a domain object (tile, raster reference,
 * CRS, extent, numeric argument, ...).
 *
 * Every extractor is only defined for the data types listed in its cases; use
 * `isDefinedAt`/`orElse`/`lift` to probe before applying.
 */
private[rasterframes]
object DynamicExtractors {
  /**
   * Partial function for pulling a tile and its context from an input row.
   *
   * Defined for `TileUDT` (the context is `None`) and for data types conforming to
   * `ProjectedRasterTile` (the context is built from the decoded tile).
   */
  lazy val tileExtractor: PartialFunction[DataType, InternalRow => (Tile, Option[TileContext])] = {
    case _: TileUDT =>
      (row: InternalRow) =>
        (row.to[Tile](TileUDT.tileSerializer), None)
    case t if t.conformsTo[ProjectedRasterTile] =>
      (row: InternalRow) => {
        val prt = row.to[ProjectedRasterTile]
        (prt, Some(TileContext(prt)))
      }
  }

  /** Partial function for pulling a `RasterRef` from an input row. */
  lazy val rasterRefExtractor: PartialFunction[DataType, InternalRow => RasterRef] = {
    case t if t.conformsTo[RasterRef] =>
      (row: InternalRow) => row.to[RasterRef]
  }

  /**
   * Partial function for pulling just a `Tile` from an input row.
   *
   * Tries [[tileExtractor]] first (discarding the context) and otherwise falls back to
   * [[rasterRefExtractor]], taking the `tile` member of the decoded reference.
   */
  lazy val tileableExtractor: PartialFunction[DataType, InternalRow => Tile] =
    tileExtractor.andThen(_.andThen(_._1)).orElse(rasterRefExtractor.andThen(_.andThen(_.tile)))

  /**
   * Variant of [[tileExtractor]] operating on an external `Row` instead of an
   * `InternalRow`; defined for the same data types.
   */
  lazy val rowTileExtractor: PartialFunction[DataType, Row => (Tile, Option[TileContext])] = {
    case _: TileUDT =>
      (row: Row) => (row.to[Tile](TileUDT.tileSerializer), None)
    case t if t.conformsTo[ProjectedRasterTile] =>
      (row: Row) => {
        val prt = row.to[ProjectedRasterTile]
        (prt, Some(TileContext(prt)))
      }
  }

  /**
   * Partial function for pulling a `ProjectedRasterLike` from an input row.
   *
   * Defined for `RasterSourceUDT` and for data types conforming to
   * `ProjectedRasterTile` or `RasterRef`; cases are tried in that order.
   */
  lazy val projectedRasterLikeExtractor: PartialFunction[DataType, InternalRow => ProjectedRasterLike] = {
    case _: RasterSourceUDT =>
      (row: InternalRow) => row.to[RasterSource](RasterSourceUDT.rasterSourceSerializer)
    case t if t.conformsTo[ProjectedRasterTile] =>
      (row: InternalRow) => row.to[ProjectedRasterTile]
    case t if t.conformsTo[RasterRef] =>
      (row: InternalRow) => row.to[RasterRef]
  }

  /**
   * Partial function for pulling a `CellGrid` from an input row.
   *
   * Defined for `TileUDT`, `RasterSourceUDT` and for data types conforming to
   * `RasterRef` or `ProjectedRasterTile`; cases are tried in that order.
   */
  lazy val gridExtractor: PartialFunction[DataType, InternalRow => CellGrid] = {
    case _: TileUDT =>
      (row: InternalRow) => row.to[Tile](TileUDT.tileSerializer)
    case _: RasterSourceUDT =>
      (row: InternalRow) => row.to[RasterSource](RasterSourceUDT.rasterSourceSerializer)
    case t if t.conformsTo[RasterRef] =>
      (row: InternalRow) => row.to[RasterRef]
    case t if t.conformsTo[ProjectedRasterTile] =>
      (row: InternalRow) => row.to[ProjectedRasterTile]
  }

  /**
   * Partial function for pulling a `CRS` from a column value.
   *
   * For `StringType` the value is cast to `UTF8String` and wrapped in a `LazyCRS`; for
   * data types conforming to `CRS` the value is cast to `InternalRow` and decoded.
   */
  lazy val crsExtractor: PartialFunction[DataType, Any => CRS] = {
    case _: StringType =>
      (v: Any) => LazyCRS(v.asInstanceOf[UTF8String].toString)
    case t if t.conformsTo[CRS] =>
      (v: Any) => v.asInstanceOf[InternalRow].to[CRS]
  }

  /**
   * Partial function for pulling an `Extent` from a column value.
   *
   * Defined for the JTS geometry type (the geometry's internal envelope is used) and
   * for data types conforming to `Extent` or `Envelope`.
   */
  lazy val extentLikeExtractor: PartialFunction[DataType, Any => Extent] = {
    // NOTE(review): `WithTypeConformity` is spelled out in full here, presumably to pick
    // the DataType-vs-DataType `conformsTo` rather than the serializer-based one — confirm.
    case t if org.apache.spark.sql.rf.WithTypeConformity(t).conformsTo(JTSTypes.GeometryTypeInstance) =>
      // NOTE(review): the envelope result is presumably adapted to `Extent` by an
      // implicit conversion that is not visible in this file — confirm.
      (input: Any) => JTSTypes.GeometryTypeInstance.deserialize(input).getEnvelopeInternal
    case t if t.conformsTo[Extent] =>
      (input: Any) => input.asInstanceOf[InternalRow].to[Extent]
    case t if t.conformsTo[Envelope] =>
      (input: Any) => Extent(input.asInstanceOf[InternalRow].to[Envelope])
  }

  /** Argument that is either a tile (with optional context) or a scalar number. */
  sealed trait TileOrNumberArg
  /** Scalar numeric argument. */
  sealed trait NumberArg extends TileOrNumberArg
  /** Tile argument together with the context it was extracted with, if any. */
  case class TileArg(tile: Tile, ctx: Option[TileContext]) extends TileOrNumberArg
  /** Floating-point scalar argument. */
  case class DoubleArg(value: Double) extends NumberArg
  /** Integral scalar argument. */
  case class IntegerArg(value: Int) extends NumberArg

  /**
   * Partial function for pulling either a tile or a number from a column value;
   * [[tileArgExtractor]] is tried first, then [[numberArgExtractor]].
   */
  lazy val tileOrNumberExtractor: PartialFunction[DataType, Any => TileOrNumberArg] =
    tileArgExtractor.orElse(numberArgExtractor)

  /**
   * Partial function for pulling a [[TileArg]] from a column value.
   *
   * Defined wherever [[tileExtractor]] is defined. The returned function only accepts
   * `InternalRow` values; any other value results in a `MatchError`.
   */
  lazy val tileArgExtractor: PartialFunction[DataType, Any => TileArg] = {
    case t if tileExtractor.isDefinedAt(t) =>
      // Resolve the row decoder once per data type so that the returned function does
      // not repeat the data-type dispatch (and its conformance checks) on every call.
      val extract = tileExtractor(t)
      (input: Any) => input match {
        case ir: InternalRow =>
          val (tile, ctx) = extract(ir)
          TileArg(tile, ctx)
      }
  }

  /**
   * Partial function for pulling a [[NumberArg]] from a column value;
   * [[doubleArgExtractor]] is tried first, then [[intArgExtractor]].
   */
  lazy val numberArgExtractor: PartialFunction[DataType, Any => NumberArg] =
    doubleArgExtractor.orElse(intArgExtractor)

  /**
   * Partial function for pulling a [[DoubleArg]] from a column value.
   *
   * Defined for `DoubleType`, `FloatType` and `DecimalType`. The returned function
   * accepts `Double`, `Float` and `Decimal` values (widened to `Double`); any other
   * value results in a `MatchError`.
   */
  lazy val doubleArgExtractor: PartialFunction[DataType, Any => DoubleArg] = {
    case _: DoubleType | _: FloatType | _: DecimalType => {
      case d: Double  => DoubleArg(d)
      case f: Float   => DoubleArg(f.toDouble)
      case d: Decimal => DoubleArg(d.toDouble)
    }
  }

  /**
   * Partial function for pulling an [[IntegerArg]] from a column value.
   *
   * Defined for `IntegerType`, `ByteType` and `ShortType`. The returned function
   * accepts `Int`, `Byte`, `Short` and `Char` values (widened to `Int`); any other
   * value results in a `MatchError`.
   */
  lazy val intArgExtractor: PartialFunction[DataType, Any => IntegerArg] = {
    case _: IntegerType | _: ByteType | _: ShortType => {
      case i: Int   => IntegerArg(i)
      case b: Byte  => IntegerArg(b.toInt)
      case s: Short => IntegerArg(s.toInt)
      // NOTE(review): none of the data types matched above is shown here to yield a
      // `Char` value — confirm whether this branch is reachable.
      case c: Char  => IntegerArg(c.toInt)
    }
  }
}