-
Notifications
You must be signed in to change notification settings - Fork 0
/
readStream.sc
98 lines (83 loc) · 2.89 KB
/
readStream.sc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import $ivy.`org.apache.paimon:paimon-bundle:0.6.0-incubating`
import $ivy.`org.apache.paimon:paimon-s3:0.6.0-incubating`
import $ivy.`org.apache.flink:flink-shaded-hadoop-2-uber:2.8.3-10.0`
import $ivy.`org.apache.hadoop:hadoop-aws:2.8.3`
import $ivy.`org.slf4j:slf4j-log4j12:1.7.15`
import org.apache.paimon.predicate.PredicateBuilder
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.AbstractInnerTableScan
import org.apache.paimon.types.{DataTypes, RowType}
import org.apache.paimon.catalog.{
Catalog,
Identifier,
CatalogContext,
CatalogFactory
}
import org.apache.paimon.fs.Path
import org.apache.paimon.options.CatalogOptions.WAREHOUSE
import org.apache.paimon.options.Options
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.concurrent.duration._
// Root location of the Paimon warehouse (passed as the catalog's WAREHOUSE
// option) and the database whose table this script reads.
val warehouse: String = "s3://vvc-stage-streamhouse/datalake"
val database: String = "default"
/** Creates a Paimon catalog through `CatalogFactory`, rooted at `warehouse`.
  *
  * @param s3Keys extra catalog options (in this script: the S3 credentials).
  *               The WAREHOUSE key is set on top of them and replaces any
  *               value supplied for the same key.
  * @return the catalog produced by `CatalogFactory.createCatalog`
  */
def createFilesystemCatalog(s3Keys: Map[String, String]): Catalog = {
  val catalogOptions = s3Keys.updated(WAREHOUSE.key(), warehouse)
  CatalogFactory.createCatalog(
    CatalogContext.create(Options.fromMap(catalogOptions.asJava))
  )
}
/** Opens the `country_sales` table in `database`.
  *
  * @param s3Keys catalog options forwarded to `createFilesystemCatalog`
  * @return the loaded table paired with the catalog it came from (the
  *         catalog is returned so the caller keeps a handle on it)
  */
def getTable(s3Keys: Map[String, String]) = {
  val catalog = createFilesystemCatalog(s3Keys)
  val loaded = catalog.getTable(Identifier.create(database, "country_sales"))
  (loaded, catalog)
}
// Optional S3 credentials read from the environment. An unset variable is
// simply left out, so the resulting map has zero, one or two entries.
val s3Access: Map[String, String] =
  Seq(
    "S3_ACCESS_KEY" -> "s3.access-key",
    "S3_SECRET_KEY" -> "s3.secret-key"
  ).flatMap { case (envVar, optionKey) =>
    sys.env.get(envVar).map(optionKey -> _)
  }.toMap

val (table, catalog) = getTable(s3Access)

// Predicate builder over a (STRING, INT, INT) row type.
// NOTE(review): `builder` is not referenced anywhere else in this script.
val builder = new PredicateBuilder(RowType.of(DataTypes.STRING(), DataTypes.INT(), DataTypes.INT()))

// Read only the first three columns (indices 0, 1, 2).
val projection = Array.range(0, 3)

// Read builder over a copy of the table that carries a `consumer-id` option.
// NOTE(review): confirm what the consumer id is meant to achieve here; this
// script does not visibly report consumption progress back to Paimon.
val readBuilder = {
  val consumerTable = table.copy(Map("consumer-id" -> "myId").asJava)
  consumerTable.newReadBuilder().withProjection(projection)
}
// Width, in characters, of every cell produced by `formatRow`.
val columnWidth = 25

/** Renders the given values as a single fixed-width line.
  *
  * Each value becomes a cell of exactly `columnWidth` characters: shorter text
  * is right-padded with spaces, longer text is truncated. Cells are
  * concatenated without a separator.
  *
  * A `null` value (Paimon getters such as `getString` may return one) is
  * rendered as the text "null" rather than throwing a NullPointerException.
  *
  * @param row the cell values, in display order
  * @return the concatenated fixed-width cells (empty when `row` is empty)
  */
def formatRow(row: Any*): String =
  // `s"$cell"` is `cell.toString` for non-null values and "null" for null.
  row.map(cell => s"$cell".padTo(columnWidth, ' ').take(columnWidth)).mkString
/** Redraws the terminal view.
  *
  * Clears the screen, prints a status line with the size of the last batch
  * and the current time, then prints each entry of `values` on its own line.
  *
  * @param table  rows of the last read; only its size is displayed
  * @param values pre-formatted report lines, printed in iteration order
  */
def printReport(table: List[(String, String)], values: Iterable[String]) = {
  // ANSI escapes: clear the screen, then move the cursor to the top-left.
  val clearScreenAndHome = "\u001b[2J\u001b[;H"
  print(clearScreenAndHome)
  val lastReadSize = table.size
  println(s"last read size: $lastReadSize, last update: ${new java.util.Date()}")
  for (line <- values) println(line)
}
// Streaming poll loop: every `scanInterval` it plans the next splits, reads
// the changed rows, merges them into `report` (keyed by the first column,
// kept sorted by key) and redraws the terminal. It runs until the process is
// stopped.
val scan = readBuilder.newStreamScan()
// Scan position (`scan.checkpoint()`) taken after the last batch that was read
// AND reported successfully. On failure the scan is restored to it, so a batch
// whose read failed is planned again instead of being silently skipped.
var checkpoint = Option.empty[Long]
val report = mutable.SortedMap[String, String]()
val scanInterval = 2.seconds
while (true) {
  try {
    val splits = scan.plan().splits()
    // `checkpoint()` returns a java.lang.Long. Wrapping it in Option avoids a
    // NullPointerException from unboxing if it is ever null.
    val next = Option(scan.checkpoint()).map(_.longValue())
    val read = readBuilder.newRead()
    // Close the iterator (and with it the reader) even if reading throws.
    val rows = scala.util.Using.resource(
      read.createReader(splits).toCloseableIterator
    ) { it =>
      it.asScala.map { row =>
        row.getString(0).toString ->
          formatRow(
            row.getRowKind(),
            row.getString(0),
            row.getInt(1),
            row.getInt(2)
          )
      }.toList
    }
    report ++= rows
    printReport(rows, report.values)
    // Commit the position only once the batch has been fully processed.
    checkpoint = next
  } catch {
    // NonFatal lets InterruptedException and VM errors propagate, so the
    // script can still be stopped.
    case scala.util.control.NonFatal(e) =>
      e.printStackTrace()
      checkpoint.foreach(c => scan.restore(c))
  }
  // Sleep on both the success and the failure path so a persistent error
  // (e.g. S3 unreachable) does not turn into a tight retry loop.
  Thread.sleep(scanInterval.toMillis)
}