Read data from the student table in the Hive database, filter out the records with age greater than 16, and write them into the stuout table.
Flink's Hive integration supports Hive 2.3.4 and 1.2.1, as well as versions close to and compatible with them.
Configure the Hive version:
<hive.version>1.2.1</hive.version>
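The dependency snippets below also reference ${flink.version} and ${scala.binary.version}, which are not defined in this section; a minimal surrounding properties block, assuming Flink 1.9.0 on Scala 2.11, might look like this:
<properties>
    <flink.version>1.9.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hive.version>1.2.1</hive.version>
</properties>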
Add the dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Hadoop Dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
<!-- Hive 2.3.4 is built with Hadoop 2.7.2.
     We pick 2.7.5 which flink-shaded-hadoop is pre-built with,
     but users can pick their own hadoop version,
     as long as it's compatible with Hadoop 2.7.2 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-8.0</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- Hive Metastore -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-vector-code-gen</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-llap-tez</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-shims</artifactId>
        </exclusion>
        <exclusion>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
        </exclusion>
        <exclusion>
            <groupId>commons-httpclient</groupId>
            <artifactId>commons-httpclient</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.antlr</groupId>
            <artifactId>ST4</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.ant</groupId>
            <artifactId>ant</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.ivy</groupId>
            <artifactId>ivy</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.curator</groupId>
            <artifactId>apache-curator</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-all</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-druid</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.calcite.avatica</groupId>
            <artifactId>avatica</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-avatica</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </exclusion>
        <exclusion>
            <groupId>stax</groupId>
            <artifactId>stax-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
    </exclusions>
</dependency>
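Note that the list above does not include the Table API planner itself. Since the job below uses the Blink planner, you would also need something along these lines (version properties as above):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>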
Create a hive-site.xml file under the resources directory, and adjust the Hive Thrift and Hive metastore addresses and connection settings in it to match your environment (the Flink HiveCatalog mainly relies on hive.metastore.uris; the JDBC properties configure the metastore service itself). The file content is as follows:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://127.0.0.1:9083</value>
        <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNotExist=true</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>hive</value>
    </property>
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10009</value>
    </property>
    <property>
        <name>hive.server2.enable.doAs</name>
        <value>false</value>
    </property>
    <property>
        <name>datanucleus.schema.autoCreateAll</name>
        <value>true</value>
    </property>
</configuration>
Create a student.txt file with the following content:
xiaoming,17,3-1
lilei,18,3-2
lucy,17,2-1
lily,15,2-2
Log in to the Hive CLI and create the tables:
create table if not exists student (
name string,
age int,
class string
)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile;
create table if not exists stuout (
name string,
age int,
class string
)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile;
List the tables:
show tables;
Load the data into the student table:
load data local inpath '/home/flink/student.txt' into table student;
Check the data:
select * from student;
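The Flink job below registers a HiveCatalog that points at the hive-site.xml on the classpath, runs the filter query on student, and inserts the result into stuout: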
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * Reading from and writing to Hive tables.
 */
public class HiveTableApi {
    public static void main(String[] args) throws Exception {
        // Use the Blink planner; this Flink version only supports writing
        // to Hive in batch mode.
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        // Table environment (a plain TableEnvironment is enough for batch mode)
        TableEnvironment tableEnv = TableEnvironment.create(bsSettings);

        // Directory containing hive-site.xml (here: the classpath root)
        String rootDir = HiveTableApi.class.getResource("/").toURI().getPath();
        String name = "default";
        String defaultDatabase = "default";
        String hiveConfDir = rootDir;
        String version = "1.2.1"; // 2.3.4 or 1.2.1

        // Register the Hive catalog and make it the current one
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);

        // Filter out the records with age > 16 and write them into stuout
        tableEnv.sqlQuery("select * from student where age > 16")
                .insertInto("stuout");
        // Equivalent SQL form:
        // tableEnv.sqlUpdate("insert into stuout select * from student where age > 16");

        // Writing to a local file instead (this requires a StreamTableEnvironment
        // and an import of org.apache.flink.types.Row):
        // Table src = tableEnv.sqlQuery("select * from student where age > 16");
        // tableEnv.toAppendStream(src, Row.class).writeAsText("hive.txt");

        tableEnv.execute("hive");
    }
}
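To run the job on a cluster, package it into a jar and submit it with the Flink CLI; the jar name below is hypothetical, and the class is assumed to be in the default package:
flink run -c HiveTableApi flink-hive-demo.jar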
Watch out for file permission problems in HDFS: the user running the Flink job needs write access to the stuout table's warehouse directory.
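If the insert fails with a permission error on a test cluster, one coarse workaround, assuming the default warehouse location, is:
hdfs dfs -chmod -R 777 /user/hive/warehouse/stuout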
After the job finishes, check the result in the Hive CLI:
select * from stuout;
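With the sample data above, the result should contain the three records with age greater than 16:
xiaoming 17 3-1
lilei 18 3-2
lucy 17 2-1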
The Flink Hive integration is new in version 1.9 and is still incomplete, so it comes with quite a few restrictions: the current version supports reading data in stream mode, only supports writing data in batch mode, and writing does not support partitioned tables, among other limitations.