diff --git a/README.md b/README.md
index 71266dfa..09eaa6c0 100644
--- a/README.md
+++ b/README.md
@@ -8,40 +8,85 @@ Slightly Framework design to support Spring based java or Bigdata program.
 1.Introduction
-	I.This project is base on Spring Framework and has four modules:
-	|----------------------------------------------------------------------------------------------------|
-	| Module   | Description                                                                             |
-	|----------------------------------------------------------------------------------------------------|
-	| Core     | the core class include data access layer basic class(model,dao,service) and etc.       |
-	|----------------------------------------------------------------------------------------------------|
-	| Comm     | FileSystem Access tool(local/vfs),support FileFormat(csv/xml/json/avro/parquet/protobuf)|
-	|          | ,support Compress Format(gzip/bzip2/snappy/lzo/zip/lzma/lz4)                            |
-	|          | ,read and write excel,read word or PowerPoint                                           |
-	|----------------------------------------------------------------------------------------------------|
-	|Hadooptool|FileSystem Access tool(hdfs), comm tool to access to HDFS,Hbase,Hive,Mongdb and etc     |
-	|----------------------------------------------------------------------------------------------------|
-	|Example   |springmvc config based and spring boot based Example;                                   |
-	|----------------------------------------------------------------------------------------------------|
-	|Web       |struts1,struts2 and springmvc support web component and required class.                 |
-	|----------------------------------------------------------------------------------------------------|
-	|Webui     |Spring Boot with Oauth2 Thymeleaf Example;                                              |
-	|----------------------------------------------------------------------------------------------------|
-	|Estool    | ElasticSearch Comm Query tool                                                          |
-	|----------------------------------------------------------------------------------------------------|
-	|Tracer    | Zipkin Brave tracing,Can trace All Database and Record parameters                      |
-	|----------------------------------------------------------------------------------------------------|
-
-	II. Special feature
+	I.This project is based on the Spring Framework and has the following modules:
+	|----------------------------------------------------------------------------------------------------------------|
+	| Module     | Description                                                                                       |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Core       | core classes, including the data access layer base classes (model, dao, service), etc.           |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Comm       | filesystem access tools (local/vfs); supports file formats (csv/xml/json/avro/parquet/protobuf), |
+	|            | compression formats (gzip/bzip2/snappy/lzo/zip/lzma/lz4),                                         |
+	|            | reading and writing Excel, and reading Word or PowerPoint files                                   |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Hadooptool | filesystem access tool (hdfs); common tools to access HDFS, HBase, Hive, MongoDB, etc.           |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Example    | simple framework examples, both springmvc config based and spring boot based                     |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Web        | web components and required classes supporting struts1, struts2 and springmvc                    |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Webui      | Spring Boot with OAuth2 and Thymeleaf example                                                     |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Estool     | ElasticSearch common query tool                                                                   |
+	|----------------------------------------------------------------------------------------------------------------|
+	| Tracer     | Zipkin Brave tracing; can trace all database access and record query parameters                  |
+	|----------------------------------------------------------------------------------------------------------------|
+
+
+	It is available under the terms of either the Apache Software License 2.0 or the Eclipse Public License 1.0.
+
+2.Supported Features
+	I. Constructing a simple Java framework
+	includes user and sysrole management, with the relations linking users to privileges and roles;
+	1.xml based standard framework: see example/config-example
+	2.spring based standard framework: see example/boot-example
+
+	II. Bigdata support
+	hadooptool:
+	HDFS tool: com.robin.hadoop.hdfs can access HDFS with kerberos security
+	Hbase tool: com.robin.hadoop.hbase hbase tool
+	Cassandra tool: CassandraUtils
+
+	III. BigData common file format read/write tool support (including compression support)
+	AVRO
+	PARQUET
+	ORC
+	PROTOBUF
+	CSV
+	XML
+	JSON
+	ARFF(weka format)
+
+	IV. File storage and cloud storage support
+	LocalFileSystem
+	ApacheVFS
+	HDFS
+	Amazon S3
+	Aliyun OSS
+	Tencent COS
+	Apache Kafka
+	RabbitMQ
+
+	V. Iterator and writer support integrating storage and file format
+	mixes any storage with any file format to support cross-storage read/write
+
+	VI. Spring cloud support
+	WebUI: a simple web UI based on dhtmlxGrid 5.1 with spring boot native support;
+	related project: microservices (a separate project of mine)
+
+	VII. Zipkin integration
+	the tracer sub project aims to make all database access traceable and to record query parameters.
+
+	VIII. Special features
 	a.A user defined xml Query config system,similar to mybatis,but easy config.
 	b.Support defined annotation or jpa annotation in JdbcDao with ORM.
 	c. BaseAnnotationService can access DB with minimize code,and use transaction with annotation.
 	d.A common db access meta and util,can access all kind of db.
 	e.Spring cloud based WebUI
 	f.support Hadoop plateform
-
-	It is available under the terms of either the Apache Software License 2.0 or the Eclipse Public License 1.0.
-
-	2.Development
+	g. Excel read/write utils supporting auto-merged columns and custom header definitions.
+
+
+3.Development
 	I.Model Layer:Simple ORM tool, Support JAVA JPA or my BaseObject Annotation
 	Demostration:(support Composite primary key)
@@ -221,14 +266,5 @@ Slightly Framework design to support Spring based java or Bigdata program.
 
 	upon feature aim to simplify the work to develop standard MVC java code.
-
-	II. Bigdata supprot
-	hadooptool:
-	HDFS tool: com.robin.hadoop.hdfs can access HDFS with kerberos security
-	Hbase tool: com.robin.hadoop.hbase hbase tool
-	Cassandra tool : CassandraUtils
-
-	III. Spring cloud support
-	WebUI simple webui base on dhtmlxGrid 5.1 with spring boot native
-	related project in my another project microservices
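Reviewer note: the feature list above is abstract, so here is a minimal sketch of how features IV–VI compose in practice. It is hypothetical wiring built only from APIs that appear later in this patch (MinioFileSystemAccessor.Builder, TextFileIteratorFactory, the new setAccessUtil hook); the DataCollectionMeta constructor and setters are assumed for illustration and may not match the real class.

```java
import com.robin.comm.fileaccess.fs.MinioFileSystemAccessor;
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
import com.robin.core.fileaccess.iterator.AbstractFileIterator;
import com.robin.core.fileaccess.iterator.TextFileIteratorFactory;
import com.robin.core.fileaccess.meta.DataCollectionMeta;

import java.util.Map;

public class CrossStorageReadSketch {
    public static void main(String[] args) throws Exception {
        // Accessor for a MinIO bucket, via the Builder added in this patch.
        AbstractFileSystemAccessor accessor = MinioFileSystemAccessor.Builder.builder()
                .endpoint("http://127.0.0.1:9000")   // assumed local MinIO endpoint
                .accessKey("testAccessKey")          // assumed credentials
                .secretKey("testSecretKey")
                .bucket("demo-bucket")
                .build();

        // Assumed: DataCollectionMeta exposes a no-arg constructor and these setters.
        DataCollectionMeta colmeta = new DataCollectionMeta();
        colmeta.setPath("input/sample.json");
        colmeta.setEncode("UTF-8");

        // The factory picks the iterator for the file format and, after this patch,
        // injects the accessor through setAccessUtil(...) instead of a constructor arg.
        try (AbstractFileIterator iterator =
                     TextFileIteratorFactory.getProcessReaderIterator(colmeta, accessor)) {
            while (iterator.hasNext()) {
                Map<String, Object> record = iterator.next();
                // process one parsed record, independent of where the bytes live
                System.out.println(record);
            }
        }
    }
}
```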
diff --git a/common/src/main/java/com/robin/comm/util/xls/ExcelProcessor.java b/common/src/main/java/com/robin/comm/util/xls/ExcelProcessor.java
index bb5ff58e..a9003989 100644
--- a/common/src/main/java/com/robin/comm/util/xls/ExcelProcessor.java
+++ b/common/src/main/java/com/robin/comm/util/xls/ExcelProcessor.java
@@ -232,8 +232,7 @@ public static Object readValue(Cell cell, String type, DateTimeFormatter format,
                 break;
             case Const.META_TYPE_BIGINT:
                 if (!StringUtils.isEmpty(str)) {
-                    Double d = Double.valueOf(str);
-                    strCell = d.longValue();
+                    strCell = Double.valueOf(str).longValue();
                 } else {
                     strCell = 0L;
                 }
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
index 77fc62df..a09abcb7 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java
@@ -144,4 +144,8 @@ public String getIdentifier() {
     public AbstractFileSystemAccessor getFileSystemAccessor(){
         return accessUtil;
     }
+
+    public void setAccessUtil(AbstractFileSystemAccessor accessUtil) {
+        this.accessUtil = accessUtil;
+    }
 }
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java
index 45d860c0..f94e6a38 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/AbstractResIterator.java
@@ -1,6 +1,7 @@
 package com.robin.core.fileaccess.iterator;
 
 import com.robin.core.base.exception.OperationNotSupportException;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.DataSetColumnMeta;
 import org.slf4j.Logger;
@@ -51,4 +52,9 @@ public void setReader(BufferedReader reader) {
     public void setInputStream(InputStream inputStream) {
         throw new OperationNotSupportException("");
     }
+
+    @Override
+    public void setAccessUtil(AbstractFileSystemAccessor accessUtil) {
+        throw new OperationNotSupportException("");
+    }
 }
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java
index 7aabe7a2..9b5c17c3 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java
@@ -2,6 +2,7 @@
 import com.google.common.collect.Lists;
 import com.robin.core.base.util.Const;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.DataSetColumnMeta;
 import org.springframework.util.CollectionUtils;
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
index c337e5e4..a274deb9 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/IResourceIterator.java
@@ -1,4 +1,6 @@
 package com.robin.core.fileaccess.iterator;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
+
 import java.io.BufferedReader;
 import java.io.Closeable;
 import java.io.InputStream;
@@ -11,4 +13,5 @@ public interface IResourceIterator extends Iterator<Map<String, Object>>, Closeable {
     String getIdentifier();
     void setInputStream(InputStream inputStream);
     void setReader(BufferedReader reader);
+    void setAccessUtil(AbstractFileSystemAccessor accessUtil);
 }
\ No newline at end of file
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java
index 56ac7b6d..32181c6d 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java
@@ -18,6 +18,7 @@
 import com.google.gson.stream.JsonReader;
 import com.robin.core.base.util.Const;
 import com.robin.core.convert.util.ConvertUtil;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.DataSetColumnMeta;
 import org.springframework.util.CollectionUtils;
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java
index c042ce4e..4c03e6d9 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/PlainTextFileIterator.java
@@ -18,6 +18,7 @@
 import com.robin.core.base.util.Const;
 import com.robin.core.base.util.StringUtils;
 import com.robin.core.convert.util.ConvertUtil;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.DataSetColumnMeta;
 import org.springframework.util.ObjectUtils;
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java b/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java
index 2e16917b..f6c63103 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java
@@ -49,7 +49,8 @@ public static AbstractFileIterator getProcessReaderIterator(DataCollectionMeta c
         Class iterclass=fileIterMap.get(fileType);
         try {
             if (!ObjectUtils.isEmpty(iterclass)) {
-                iterator = (AbstractFileIterator) iterclass.getConstructor(DataCollectionMeta.class, AbstractFileSystemAccessor.class).newInstance(colmeta,utils);
+                iterator = (AbstractFileIterator) iterclass.getConstructor(DataCollectionMeta.class).newInstance(colmeta);
+                iterator.setAccessUtil(utils);
             }
             iterator.beforeProcess();
         }catch (Exception ex){
diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java
index 549cbcf6..60e3bd20 100644
--- a/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java
+++ b/common/src/main/java/com/robin/core/fileaccess/iterator/XmlFileIterator.java
@@ -17,6 +17,7 @@
 import com.robin.core.base.util.Const;
 import com.robin.core.convert.util.ConvertUtil;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.DataSetColumnMeta;
@@ -44,7 +45,10 @@ public XmlFileIterator(){
     public XmlFileIterator(DataCollectionMeta metaList) {
         super(metaList);
         identifier= Const.FILEFORMATSTR.XML.getValue();
-
+    }
+    public XmlFileIterator(DataCollectionMeta metaList, AbstractFileSystemAccessor accessor) {
+        super(metaList,accessor);
+        identifier= Const.FILEFORMATSTR.XML.getValue();
     }
     @Override
     public void beforeProcess() {
diff --git a/common/src/main/java/com/robin/core/fileaccess/writer/AbstractQueueWriter.java b/common/src/main/java/com/robin/core/fileaccess/writer/AbstractQueueWriter.java
index 1229bcff..ea625509 100644
--- a/common/src/main/java/com/robin/core/fileaccess/writer/AbstractQueueWriter.java
+++ b/common/src/main/java/com/robin/core/fileaccess/writer/AbstractQueueWriter.java
@@ -3,6 +3,7 @@
 import com.robin.core.base.exception.OperationNotSupportException;
 import com.robin.core.base.util.Const;
 import com.robin.core.base.util.ResourceConst;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.twitter.bijection.Injection;
 import com.twitter.bijection.avro.GenericAvroCodecs;
@@ -86,4 +87,8 @@ public void setOutputStream(OutputStream outputStream){
         throw new OperationNotSupportException("");
     }
 
+    @Override
+    public void setAccessUtil(AbstractFileSystemAccessor accessUtil) {
+        throw new OperationNotSupportException("");
+    }
 }
diff --git a/common/src/main/java/com/robin/core/fileaccess/writer/IResourceWriter.java b/common/src/main/java/com/robin/core/fileaccess/writer/IResourceWriter.java
index a9986254..425208b9 100644
--- a/common/src/main/java/com/robin/core/fileaccess/writer/IResourceWriter.java
+++ b/common/src/main/java/com/robin/core/fileaccess/writer/IResourceWriter.java
@@ -1,5 +1,7 @@
 package com.robin.core.fileaccess.writer;
 
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
+
 import javax.naming.OperationNotSupportedException;
 import java.io.BufferedWriter;
 import java.io.Closeable;
 import java.io.OutputStream;
@@ -17,5 +19,6 @@ public interface IResourceWriter extends Closeable {
     String getIdentifier();
     void setWriter(BufferedWriter writer);
     void setOutputStream(OutputStream outputStream);
+    void setAccessUtil(AbstractFileSystemAccessor accessUtil);
 }
diff --git a/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java b/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java
index 4d6dd3dd..6e2f43aa 100644
--- a/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java
+++ b/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java
@@ -90,7 +90,8 @@ public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, Abstra
             Class writerClass=fileWriterMap.get(fileSuffix);
             if (!ObjectUtils.isEmpty(writerClass)) {
-                fileWriter = writerClass.getConstructor(DataCollectionMeta.class,AbstractFileSystemAccessor.class).newInstance(colmeta,accessor);
+                fileWriter = writerClass.getConstructor(DataCollectionMeta.class).newInstance(colmeta);
+                fileWriter.setAccessUtil(accessor);
                 logger.info("using resource writer {}",writerClass.getCanonicalName());
             }
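Reviewer note: the two factory hunks above (TextFileIteratorFactory and TextFileWriterFactory) switch from reflecting on a two-argument (DataCollectionMeta, AbstractFileSystemAccessor) constructor to a one-argument constructor plus setter injection. A minimal sketch of the resulting pattern, under the assumption that the classes compile as shown in this patch; the helper name is hypothetical:

```java
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
import com.robin.core.fileaccess.iterator.AbstractFileIterator;
import com.robin.core.fileaccess.meta.DataCollectionMeta;

public final class IteratorWiringSketch {
    // Mirrors the pattern both factories adopt in this patch: construct with the
    // meta only, inject the accessor via the new setter, then run the pre-hook.
    static AbstractFileIterator wire(Class<? extends AbstractFileIterator> iterClass,
                                     DataCollectionMeta colmeta,
                                     AbstractFileSystemAccessor accessor) throws Exception {
        AbstractFileIterator iterator =
                iterClass.getConstructor(DataCollectionMeta.class).newInstance(colmeta);
        iterator.setAccessUtil(accessor); // replaces the old two-argument constructor
        iterator.beforeProcess();
        return iterator;
    }
}
```

One design consequence visible in the patch: implementations with no backing filesystem (AbstractQueueIterator, AbstractQueueWriter, JdbcResIterator) no longer carry an unused constructor parameter and instead throw OperationNotSupportException from setAccessUtil.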
diff --git a/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java b/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java
index ec9fe306..2a63d4d6 100644
--- a/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java
+++ b/common/src/main/java/com/robin/core/resaccess/iterator/AbstractQueueIterator.java
@@ -1,5 +1,7 @@
 package com.robin.core.resaccess.iterator;
 
+import com.robin.core.base.exception.OperationNotSupportException;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.iterator.AbstractResIterator;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.util.AvroUtils;
@@ -46,4 +48,9 @@ public Map<String, Object> next() {
         return null;
     }
     public abstract List<Map<String, Object>> pollMessage() throws IOException;
+
+    @Override
+    public void setAccessUtil(AbstractFileSystemAccessor accessUtil) {
+        throw new OperationNotSupportException("");
+    }
 }
diff --git a/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java b/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java
index 123eae81..cc2ddb9a 100644
--- a/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java
+++ b/common/src/main/java/com/robin/core/resaccess/iterator/JdbcResIterator.java
@@ -4,7 +4,9 @@
 import com.robin.comm.dal.holder.db.JdbcResourceHolder;
 import com.robin.comm.dal.pool.ResourceAccessHolder;
 import com.robin.core.base.dao.SimpleJdbcDao;
+import com.robin.core.base.exception.OperationNotSupportException;
 import com.robin.core.base.spring.SpringContextHolder;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
 import com.robin.core.fileaccess.iterator.AbstractResIterator;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import lombok.extern.slf4j.Slf4j;
@@ -107,4 +109,8 @@ public String getIdentifier() {
         return "jdbc";
     }
 
+    @Override
+    public void setAccessUtil(AbstractFileSystemAccessor accessUtil) {
+        throw new OperationNotSupportException("");
+    }
 }
diff --git a/core/src/main/java/com/robin/core/base/dao/JdbcDao.java b/core/src/main/java/com/robin/core/base/dao/JdbcDao.java
index cfb1d98e..2a174145 100644
--- a/core/src/main/java/com/robin/core/base/dao/JdbcDao.java
+++ b/core/src/main/java/com/robin/core/base/dao/JdbcDao.java
@@ -524,26 +524,24 @@ public P createVO(T obj, Class

             returnTemplate().update(factory.newPreparedStatementCreator(insertSegment.getParams()), keyHolder);
             retval = keyHolder.getKey().longValue();
-            if (!ObjectUtils.isEmpty(retval)) {
-                //assign increment column
-                if (generateColumn != null) {
-                    FieldContent pkColumn = AnnotationRetriever.getPrimaryField(fields);
-                    if (pkColumn == null) {
-                        throw new DAOException("model " + obj.getClass().getSimpleName() + " does not have primary key");
-                    }
-                    Object targetVal = ReflectUtils.getIncrementValueBySetMethod(generateColumn.getSetMethod(), retval);
-                    if (pkColumn.getPrimaryKeys() == null) {
-                        generateColumn.getSetMethod().invoke(obj, targetVal);
-                        retObj = (P) targetVal;
-                    } else {
-                        for (FieldContent field : pkColumn.getPrimaryKeys()) {
-                            if (field.isIncrement() || field.isSequential()) {
-                                field.getSetMethod().invoke(generateColumn.getGetMethod().invoke(obj), retval);
-                            }
+            if (!ObjectUtils.isEmpty(retval) && (generateColumn != null)) {
+                FieldContent pkColumn = AnnotationRetriever.getPrimaryField(fields);
+                if (pkColumn == null) {
+                    throw new DAOException("model " + obj.getClass().getSimpleName() + " does not have primary key");
+                }
+                Object targetVal = ReflectUtils.getIncrementValueBySetMethod(generateColumn.getSetMethod(), retval);
+                if (pkColumn.getPrimaryKeys() == null) {
+                    generateColumn.getSetMethod().invoke(obj, targetVal);
+                    retObj = (P) targetVal;
+                } else {
+                    for (FieldContent field : pkColumn.getPrimaryKeys()) {
+                        if (field.isIncrement() || field.isSequential()) {
+                            field.getSetMethod().invoke(generateColumn.getGetMethod().invoke(obj), retval);
                         }
-                        retObj = (P) pkColumn.getGetMethod().invoke(obj);
                     }
+                    retObj = (P) pkColumn.getGetMethod().invoke(obj);
                 }
+            }
         } else {
             //no pk model insert
@@ -811,15 +809,18 @@ private List<Map<String, Object>> queryItemList(final PageQuery
 
     private List<Map<String, Object>> queryAllItemList(final String querySQL, Object... obj) {
-        return this.returnTemplate().query(querySQL, obj, new SplitPageResultSetExtractor(0, 0, lobHandler) {});
+        return this.returnTemplate().query(querySQL, obj, new SplitPageResultSetExtractor(0, 0, lobHandler) {
+        });
     }
 
     private List<Map<String, Object>> queryAllItemList(final String querySQL, final List mappingFieldList, Object[] obj) {
-        return this.returnTemplate().query(querySQL, obj, new SplitPageResultSetExtractor(0, 0, lobHandler, mappingFieldList) {});
+        return this.returnTemplate().query(querySQL, obj, new SplitPageResultSetExtractor(0, 0, lobHandler, mappingFieldList) {
+        });
     }
 
     private void generateQuerySqlBySingleFields(FieldContent columncfg, Const.OPERATOR oper, StringBuilder queryBuffer, int length) {
diff --git a/core/src/main/java/com/robin/core/base/service/IMybatisBaseService.java b/core/src/main/java/com/robin/core/base/service/IMybatisBaseService.java
index 267ebb79..703b1e7d 100644
--- a/core/src/main/java/com/robin/core/base/service/IMybatisBaseService.java
+++ b/core/src/main/java/com/robin/core/base/service/IMybatisBaseService.java
@@ -2,7 +2,6 @@
 import com.baomidou.mybatisplus.core.conditions.Wrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
 import com.baomidou.mybatisplus.extension.service.IService;
diff --git a/core/src/main/java/com/robin/core/base/util/ResourceConst.java b/core/src/main/java/com/robin/core/base/util/ResourceConst.java
index 2929c0cc..4437edef 100644
--- a/core/src/main/java/com/robin/core/base/util/ResourceConst.java
+++ b/core/src/main/java/com/robin/core/base/util/ResourceConst.java
@@ -4,6 +4,7 @@ public class ResourceConst {
     public static final String WORKINGPATHPARAM="output.workingPath";
     public static final String USETMPFILETAG="output.usingTmpFiles";
+    public static final String BUCKETNAME="bucketName";
 
     public enum IngestType {
         TYPE_HDFS(1L,"HDFS"),
@@ -111,8 +112,7 @@ public String toString() {
     public enum S3PARAM{
         ACCESSKEY("S3AccessKey"),
         SECRET("S3Secret"),
-        REGION("S3Region"),
-        BUCKETNAME("S3BucketName");
+        REGION("S3Region");
         private String value;
         S3PARAM(String value){
             this.value=value;
@@ -122,12 +122,25 @@ public String getValue() {
         return value;
         }
     }
+    public enum BOSPARAM{
+        ENDPOIN("endpoint"),
+        REGION("region"),
+        ACESSSKEYID("accessKeyId"),
+        SECURITYACCESSKEY("securityAccessKey");
+        private String value;
+        BOSPARAM(String value){
+            this.value=value;
+        }
+
+        public String getValue() {
+            return value;
+        }
+    }
     public enum OSSPARAM{
         ENDPOIN("endpoint"),
         REGION("region"),
         ACESSSKEYID("accessKeyId"),
         SECURITYACCESSKEY("securityAccessKey"),
-        BUCKETNAME("bucketName"),
         OBJECTNAME("objectName");
         private String value;
         OSSPARAM(String value){
@@ -143,7 +156,6 @@ public enum COSPARAM{
         REGION("region"),
         ACESSSKEY("accessKey"),
         SECURITYKEY("securityKey"),
-        BUCKETNAME("bucketName"),
         OBJECTNAME("objectName");
         private String value;
         COSPARAM(String value){
@@ -158,7 +170,8 @@ public enum QINIUPARAM{
         DOMAIN("domain"),
         REGION("region"),
         ACESSSKEY("accessKey"),
-        SECURITYKEY("securityKey");
+        SECURITYKEY("securityKey"),
+        DOWNDOMAIN("downDomain");
         private String value;
         QINIUPARAM(String value){
             this.value=value;
@@ -169,4 +182,18 @@ public String getValue() {
         }
     }
 
+    public enum MINIO{
+        ENDPOINT("endpoint"),
+        ACESSSKEY("accessKey"),
+        SECURITYKEY("securityKey");
+        private String value;
+        MINIO(String value){
+            this.value=value;
+        }
+
+        public String getValue() {
+            return value;
+        }
+
+    }
 }
diff --git a/dataming/src/main/java/com/robin/dataming/weka/algorithm/KMeansModeler.java b/dataming/src/main/java/com/robin/dataming/weka/algorithm/KMeansModeler.java
index 8d877f1a..a4831d4b 100644
--- a/dataming/src/main/java/com/robin/dataming/weka/algorithm/KMeansModeler.java
+++ b/dataming/src/main/java/com/robin/dataming/weka/algorithm/KMeansModeler.java
@@ -1,10 +1,8 @@
 package com.robin.dataming.weka.algorithm;
 
-import com.robin.dataming.weka.utils.WekaUtils;
 import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
-import weka.clusterers.Clusterer;
 import weka.clusterers.SimpleKMeans;
 import weka.core.EuclideanDistance;
 import weka.core.Instances;
diff --git a/hadooptool/pom.xml b/hadooptool/pom.xml
index dc7bdd11..83b56b09 100644
--- a/hadooptool/pom.xml
+++ b/hadooptool/pom.xml
@@ -338,6 +338,23 @@
             <version>[7.16.0, 7.16.99]</version>
             <optional>true</optional>
         </dependency>
+        <dependency>
+            <groupId>com.baidubce</groupId>
+            <artifactId>bce-java-sdk</artifactId>
+            <version>0.10.353</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.eclipse.paho</groupId>
+                    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.minio</groupId>
+            <artifactId>minio</artifactId>
+            <version>8.4.0</version>
+            <optional>true</optional>
+        </dependency>
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/BOSFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/BOSFileSystemAccessor.java
new file mode 100644
index 00000000..25ea8520
--- /dev/null
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/BOSFileSystemAccessor.java
@@ -0,0 +1,159 @@
+package com.robin.comm.fileaccess.fs;
+
+import com.baidubce.auth.DefaultBceCredentials;
+import com.baidubce.services.bos.BosClient;
+import com.baidubce.services.bos.BosClientConfiguration;
+import com.baidubce.services.bos.model.BosObject;
+import com.robin.core.base.exception.MissingConfigException;
+import com.robin.core.base.util.ResourceConst;
+import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
+import com.robin.core.fileaccess.meta.DataCollectionMeta;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
+
+import java.io.*;
+
+@Slf4j
+@Getter
+public class BOSFileSystemAccessor extends AbstractFileSystemAccessor {
+    private String endpoint;
+    private String accessKeyId;
+    private String securityAccessKey;
+    private String bucketName;
+    private BosClient client;
+
+    @Override
+    public void init(DataCollectionMeta meta) {
+        Assert.isTrue(!CollectionUtils.isEmpty(meta.getResourceCfgMap()),"config map is empty!");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.BOSPARAM.ENDPOIN.getValue()),"must provide endpoint");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.BOSPARAM.ACESSSKEYID.getValue()),"must provide accessKey");
+        Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.BOSPARAM.SECURITYACCESSKEY.getValue()),"must provide securityAccessKey");
+
+        endpoint=meta.getResourceCfgMap().get(ResourceConst.BOSPARAM.ENDPOIN.getValue()).toString();
+        accessKeyId=meta.getResourceCfgMap().get(ResourceConst.BOSPARAM.ACESSSKEYID.getValue()).toString();
+        securityAccessKey=meta.getResourceCfgMap().get(ResourceConst.BOSPARAM.SECURITYACCESSKEY.getValue()).toString();
+        BosClientConfiguration config=new BosClientConfiguration();
+        config.setCredentials(new DefaultBceCredentials(accessKeyId,securityAccessKey));
+        config.setEndpoint(endpoint);
+        client=new BosClient(config);
+    }
+    public void init(){
+        Assert.notNull(endpoint,"must provide endpoint");
+        Assert.notNull(accessKeyId,"must provide accessKey");
+        Assert.notNull(securityAccessKey,"must provide securityAccessKey");
+        BosClientConfiguration config=new BosClientConfiguration();
+        config.setCredentials(new DefaultBceCredentials(accessKeyId,securityAccessKey));
+        config.setEndpoint(endpoint);
+        client=new BosClient(config);
+    }
+
+    @Override
+    public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
+        InputStream inputStream = getInputStreamByConfig(meta);
+        return Pair.of(getReaderByPath(resourcePath, inputStream, meta.getEncode()),inputStream);
+    }
+
+    @Override
+    public Pair<BufferedWriter, OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
+        OutputStream outputStream = getOutputStream(meta);
+        return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream);
+    }
+
+    @Override
+    public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStream(meta);
+    }
+
+    @Override
+    public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getOutputStreamByPath(resourcePath, getOutputStream(meta));
+    }
+
+    @Override
+    public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        InputStream inputStream = getInputStreamByConfig(meta);
+        return getInputStreamByPath(resourcePath, inputStream);
+    }
+
+    @Override
+    public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
+        return getInputStreamByConfig(meta);
+    }
+
+    @Override
+    public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName= getBucketName(meta);
+        return client.doesObjectExist(bucketName,resourcePath);
+    }
+
+    @Override
+    public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException {
+        String bucketName= getBucketName(meta);
+        if(exists(meta,resourcePath)){
+            BosObject object=client.getObject(bucketName,resourcePath);
+            return object.getObjectMetadata().getContentLength();
+        }
+        return 0;
+    }
+    private InputStream getInputStreamByConfig(DataCollectionMeta meta) {
+        String bucketName= getBucketName(meta);
+        String objectName= meta.getPath();
+        return getObject(bucketName,objectName);
+    }
+    private String getBucketName(DataCollectionMeta meta) {
+        return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString();
+    }
+    private InputStream getObject(String bucketName,String objectName){
+        if(client.doesObjectExist(bucketName,objectName)) {
+            BosObject object = client.getObject(bucketName, objectName);
+            if (!ObjectUtils.isEmpty(object)) {
+                return object.getObjectContent();
+            } else {
+                throw new RuntimeException("object " + objectName + " could not be fetched!");
+            }
+        }else{
+            throw new MissingConfigException(" key "+objectName+" not in BOS bucket "+bucketName);
+        }
+    }
+    public static class Builder{
+        private BOSFileSystemAccessor accessor;
+        public Builder(){
+            accessor=new BOSFileSystemAccessor();
+        }
+        public static Builder builder(){
+            return new Builder();
+        }
+        public Builder accessKeyId(String accessKeyId){
+            accessor.accessKeyId=accessKeyId;
+            return this;
+        }
+        public Builder endpoint(String endPoint){
+            accessor.endpoint=endPoint;
+            return this;
+        }
+        public Builder securityAccessKey(String securityAccessKey){
+            accessor.securityAccessKey=securityAccessKey;
+            return this;
+        }
+        public Builder withMetaConfig(DataCollectionMeta meta){
+
accessor.init(meta); + return this; + } + public Builder bucket(String bucketName){ + accessor.bucketName=bucketName; + return this; + } + public BOSFileSystemAccessor build(){ + if(ObjectUtils.isEmpty(accessor.getClient())){ + accessor.init(); + } + return accessor; + } + + } + +} diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java index 8188ec2b..7e0faa86 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java @@ -43,6 +43,7 @@ public class COSFileSystemAccessor extends AbstractFileSystemAccessor { private HttpProtocol protocol=HttpProtocol.http; private String securityKey; private String accessKey; + private String bucketName; private COSFileSystemAccessor(){ this.identifier= Const.FILESYSTEM.TENCENT.getValue(); } @@ -109,13 +110,13 @@ public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePat @Override public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException { - String bucketName= meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName= getBucketName(meta); return exists(bucketName,resourcePath); } @Override public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException { - String bucketName= meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName= getBucketName(meta); if(exists(bucketName,resourcePath)){ ObjectMetadata metadata=cosClient.getObjectMetadata(bucketName,resourcePath); if(!ObjectUtils.isEmpty(metadata)){ @@ -125,8 +126,7 @@ public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) thr return 0; } private InputStream getInputStreamByConfig(DataCollectionMeta meta) { - Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()),"must provide bucketName"); - String bucketName= meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()).toString(); + String bucketName= getBucketName(meta); String objectName= meta.getPath(); return getObject(bucketName,objectName); } @@ -162,7 +162,6 @@ private TransferManager getManager() { @Override public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { - Assert.notNull(meta.getResourceCfgMap().get("bucketName"),"must provide bucketName"); try{ upload(meta,outputStream); }catch (InterruptedException | IOException ex){ @@ -171,7 +170,7 @@ public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { } private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throws IOException,InterruptedException { - String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName= getBucketName(meta); TransferManager transferManager=getManager(); PutObjectRequest request; String tmpFilePath=null; @@ -205,11 +204,19 @@ private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throw } return false; } + + private String getBucketName(DataCollectionMeta meta) { + return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString(); + } + public static class Builder{ private COSFileSystemAccessor accessor; public Builder(){ accessor=new COSFileSystemAccessor(); } + public static Builder builder(){ + return new Builder(); + } public Builder accessKey(String accessKey){ 
accessor.accessKey=accessKey; return this; @@ -230,6 +237,10 @@ public Builder protocol(HttpProtocol protocol){ accessor.protocol=protocol; return this; } + public Builder bucket(String bucketName){ + accessor.bucketName=bucketName; + return this; + } public COSFileSystemAccessor build(){ if(!ObjectUtils.isEmpty(accessor.getCosClient())){ accessor.init(); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/MinioFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/MinioFileSystemAccessor.java new file mode 100644 index 00000000..5f16592d --- /dev/null +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/MinioFileSystemAccessor.java @@ -0,0 +1,133 @@ +package com.robin.comm.fileaccess.fs; + +import com.robin.core.base.util.ResourceConst; +import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.dfs.minio.MinioUtils; +import io.minio.MinioClient; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; + +import java.io.*; + +/** + * Minio FileSystemAccessor,must init individual + */ +@Slf4j +@Getter +public class MinioFileSystemAccessor extends AbstractFileSystemAccessor { + private MinioClient client; + private String accessKey; + private String secretKey; + private String endpoint; + private String bucketName; + + @Override + public void init(DataCollectionMeta meta) { + Assert.isTrue(!CollectionUtils.isEmpty(meta.getResourceCfgMap()),"config map is empty!"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.MINIO.ENDPOINT.getValue()),"must provide endpoint"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.MINIO.ACESSSKEY.getValue()),"must provide accessKey"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.MINIO.SECURITYKEY.getValue()),"must provide securityKey"); + endpoint=meta.getResourceCfgMap().get(ResourceConst.MINIO.ENDPOINT.getValue()).toString(); + accessKey=meta.getResourceCfgMap().get(ResourceConst.MINIO.ACESSSKEY.getValue()).toString(); + secretKey=meta.getResourceCfgMap().get(ResourceConst.MINIO.SECURITYKEY.getValue()).toString(); + MinioClient.Builder builder=MinioClient.builder().endpoint(endpoint).credentials(accessKey,secretKey); + client=builder.build(); + } + public void init(){ + Assert.notNull(endpoint,"must provide endpoint"); + Assert.notNull(accessKey,"must provide accessKey"); + Assert.notNull(secretKey,"must provide securityKey"); + MinioClient.Builder builder=MinioClient.builder().endpoint(endpoint).credentials(accessKey,secretKey); + client=builder.build(); + } + + @Override + public Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException { + InputStream inputStream = getObject(getBucketName(meta),resourcePath); + return Pair.of(getReaderByPath(resourcePath, inputStream, meta.getEncode()),inputStream); + } + + @Override + public Pair getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException { + OutputStream outputStream = getOutputStream(meta); + return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream); + } + + @Override + public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException { + return getOutputStream(meta); + } + + @Override + public OutputStream 
getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException { + return getOutputStreamByPath(resourcePath, getOutputStream(meta)); + } + + @Override + public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException { + return getInputStreamByPath(resourcePath, getObject(getBucketName(meta),resourcePath)); + } + + @Override + public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException { + return getObject(getBucketName(meta),resourcePath); + } + + @Override + public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException { + return MinioUtils.exists(client,getBucketName(meta),resourcePath); + } + + @Override + public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException { + return MinioUtils.size(client,getBucketName(meta),resourcePath); + } + private String getBucketName(DataCollectionMeta meta){ + return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString(); + } + private InputStream getObject(String bucketName,String objectName) throws IOException{ + return MinioUtils.getObject(client,bucketName,objectName); + } + public static class Builder{ + private MinioFileSystemAccessor accessor; + public Builder(){ + accessor=new MinioFileSystemAccessor(); + } + public static Builder builder(){ + return new Builder(); + } + public Builder accessKey(String accessKey){ + accessor.accessKey=accessKey; + return this; + } + public Builder secretKey(String secretKey){ + accessor.secretKey=secretKey; + return this; + } + public Builder endpoint(String endpoint){ + accessor.endpoint=endpoint; + return this; + } + + public Builder bucket(String bucketName){ + accessor.bucketName=bucketName; + return this; + } + public Builder withMetaConfig(DataCollectionMeta meta){ + accessor.init(meta); + return this; + } + + public MinioFileSystemAccessor build(){ + if(ObjectUtils.isEmpty(accessor.getClient())){ + accessor.init(); + } + return accessor; + } + } +} diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java index 7ded03a2..14075623 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java @@ -39,6 +39,7 @@ public class OSSFileSystemAccessor extends AbstractFileSystemAccessor { private String region; private String accessKeyId; private String securityAccessKey; + private String bucketName; private OSSFileSystemAccessor(){ this.identifier= Const.FILESYSTEM.ALIYUN.getValue(); @@ -106,19 +107,24 @@ public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePat @Override public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException { - String bucketName= meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName= getBucketName(meta); return ossClient.doesObjectExist(bucketName,resourcePath); } @Override public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException { - String bucketName= meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName= getBucketName(meta); if(exists(meta,resourcePath)){ OSSObject object=ossClient.getObject(bucketName,resourcePath); return object.getObjectMetadata().getContentLength(); } return 0; } + + private String 
getBucketName(DataCollectionMeta meta) { + return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString(); + } + @Override public void finishWrite(DataCollectionMeta meta,OutputStream outputStream) { Assert.notNull(meta.getResourceCfgMap().get("bucketName"),"must provide bucketName"); @@ -131,8 +137,7 @@ public void finishWrite(DataCollectionMeta meta,OutputStream outputStream) { } private InputStream getInputStreamByConfig(DataCollectionMeta meta) { - Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()),"must provide bucketName"); - String bucketName= meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()).toString(); + String bucketName= getBucketName(meta); String objectName= meta.getPath(); return getObject(bucketName,objectName); } @@ -180,6 +185,9 @@ public static class Builder{ public Builder(){ accessor=new OSSFileSystemAccessor(); } + public static Builder builder(){ + return new Builder(); + } public Builder region(String region){ accessor.region=region; return this; @@ -200,6 +208,10 @@ public Builder withMetaConfig(DataCollectionMeta meta){ accessor.init(meta); return this; } + public Builder bucket(String bucketName){ + accessor.bucketName=bucketName; + return this; + } public OSSFileSystemAccessor build(){ if(ObjectUtils.isEmpty(accessor.getOssClient())){ accessor.init(); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java index a475db47..91fee8f7 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java @@ -16,6 +16,7 @@ import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.util.ResourceUtil; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.springframework.lang.NonNull; @@ -34,13 +35,19 @@ * Qiniu FileSystemAccessor */ @Slf4j +@Getter public class QiniuFileSystemAccessor extends AbstractFileSystemAccessor { private UploadManager uploadManager; private BucketManager bucketManager; private String domain; private Auth auth; + private String accessKey; + private String secretKey; + private Region region; + private String bucketName; private Gson gson= GsonUtil.getGson(); - public QiniuFileSystemAccessor(){ + private String downDomain; + private QiniuFileSystemAccessor(){ this.identifier= Const.FILESYSTEM.QINIU.getValue(); } @@ -51,11 +58,23 @@ public void init(DataCollectionMeta meta) { Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.REGION.getValue()),"must provide region"); Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.ACESSSKEY.getValue()),"must provide accessKey"); Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.SECURITYKEY.getValue()),"must provide securityKey"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.DOWNDOMAIN.getValue()),"must provide downDomain"); String accessKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.ACESSSKEY.getValue()).toString(); String secretKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.SECURITYKEY.getValue()).toString(); domain=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.DOMAIN.getValue()).toString(); auth= 
Auth.create(accessKey,secretKey); - Region region=Region.autoRegion(); + region=Region.autoRegion(); + downDomain=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.DOWNDOMAIN.getValue()).toString(); + Configuration cfg=new Configuration(region); + cfg.resumableUploadAPIVersion=Configuration.ResumableUploadAPIVersion.V2; + uploadManager=new UploadManager(cfg); + } + public void init(){ + Assert.notNull(domain,"must provide domain"); + Assert.notNull(region,"must provide region"); + Assert.notNull(accessKey,"must provide accessKey"); + Assert.notNull(secretKey,"must provide securityKey"); + auth= Auth.create(accessKey,secretKey); Configuration cfg=new Configuration(region); cfg.resumableUploadAPIVersion=Configuration.ResumableUploadAPIVersion.V2; uploadManager=new UploadManager(cfg); @@ -96,19 +115,21 @@ public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePat @Override public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException { - String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName= getBucketName(meta); return isKeyExist(bucketName,resourcePath); } + + @Override public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException { - String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName=getBucketName(meta); return getSize(bucketName,resourcePath); } @Override public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { - String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); + String bucketName=getBucketName(meta); String token=auth.uploadToken(bucketName,meta.getPath()); try{ putObject(token,meta,outputStream); @@ -118,8 +139,7 @@ public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { } private InputStream getInputStreamByConfig(DataCollectionMeta meta) { - Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()),"must provide bucketName"); - String bucketName= meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()).toString(); + String bucketName= getBucketName(meta); String objectName= meta.getPath(); return getObject(bucketName,objectName); } @@ -171,4 +191,50 @@ private InputStream getObject(@NonNull String bucketName, @NonNull String key) { } return null; } + private String getBucketName(DataCollectionMeta meta) { + return !ObjectUtils.isEmpty(this.getBucketName()) ? 
this.getBucketName() : meta.getResourceCfgMap().get("bucketName").toString();
+    }
+    public static class Builder{
+        private QiniuFileSystemAccessor accessor;
+        public Builder(){
+            accessor=new QiniuFileSystemAccessor();
+        }
+        public static Builder builder(){
+            return new Builder();
+        }
+        public Builder accessKey(String accessKey){
+            accessor.accessKey=accessKey;
+            return this;
+        }
+        public Builder secretKey(String secretKey){
+            accessor.secretKey=secretKey;
+            return this;
+        }
+        public Builder domain(String domain){
+            accessor.domain=domain;
+            return this;
+        }
+        public Builder region(Region region){
+            accessor.region=region;
+            return this;
+        }
+        public Builder bucket(String bucketName){
+            accessor.bucketName=bucketName;
+            return this;
+        }
+        public Builder withMetaConfig(DataCollectionMeta meta){
+            accessor.init(meta);
+            return this;
+        }
+        public Builder downDomain(String downDomain){
+            accessor.downDomain=downDomain;
+            return this;
+        }
+        public QiniuFileSystemAccessor build(){
+            if(ObjectUtils.isEmpty(accessor.getUploadManager())){
+                accessor.init();
+            }
+            return accessor;
+        }
+    }
 }
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java
index d07295d8..6ed1bb87 100644
--- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java
@@ -17,6 +17,7 @@
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 
 import java.io.*;
 import java.nio.file.Files;
@@ -34,6 +35,7 @@ public class S3FileSystemAccessor extends AbstractFileSystemAccessor {
     private String regionName;
     private String accessKey;
     private String secret;
+    private String bucketName;
 
     private S3FileSystemAccessor(){
         this.identifier= Const.FILESYSTEM.S3.getValue();
@@ -71,19 +73,24 @@ public InputStream getInResourceByStream(DataCollectionMeta meta, String resourc
 
     @Override
     public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
-        String bucketName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.BUCKETNAME.getValue()).toString();
+        String bucketName = getBucketName(meta);
         return AwsUtils.getObject(client, bucketName, resourcePath);
     }
 
+    private String getBucketName(DataCollectionMeta meta) {
+        return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString();
+    }
+
     @Override
     public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException {
-        String bucketName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.BUCKETNAME.getValue()).toString();
+        String bucketName = getBucketName(meta);
         return AwsUtils.exists(client,bucketName,meta.getPath());
     }
 
     @Override
     public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException {
-        return 0;
+        String bucketName=getBucketName(meta);
+        return AwsUtils.size(client,bucketName,resourcePath);
     }
 
     @Override
@@ -100,7 +107,6 @@ public void init(DataCollectionMeta meta) {
         }
     }
     public void init(){
-        Assert.notNull(region,"region name required!");
         Assert.notNull(accessKey,"accessKey name required!");
         Assert.notNull(secret,"secret name required!");
@@ -112,7 +118,7 @@ public void init(){
 
     @Override
     public void finishWrite(DataCollectionMeta meta, OutputStream
outputStream) { - String bucketName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.BUCKETNAME.getValue()).toString(); + String bucketName = getBucketName(meta); ByteArrayOutputStream outputStream1=(ByteArrayOutputStream) outputStream; int size=outputStream1.size(); String contentType=!ObjectUtils.isEmpty(meta.getContent())?meta.getContent().getContentType():null; @@ -120,6 +126,9 @@ public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { } public static class Builder{ private S3FileSystemAccessor accessor; + public static Builder builder(){ + return new Builder(); + } public Builder(){ accessor=new S3FileSystemAccessor(); } @@ -135,6 +144,10 @@ public Builder region(String regionName){ accessor.regionName=regionName; return this; } + public Builder bucket(String bucketName){ + accessor.bucketName=bucketName; + return this; + } public Builder withMetaConfig(DataCollectionMeta meta){ accessor.init(meta); return this; diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/util/MockFileSystem.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/util/MockFileSystem.java index 90c924a8..b4d5fcde 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/util/MockFileSystem.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/util/MockFileSystem.java @@ -21,7 +21,6 @@ public class MockFileSystem extends FileSystem { byte[] streamBytes; final List streams = new ArrayList<>(); OutputStream outputStream; - AbstractFileSystemAccessor accessUtil; DataCollectionMeta colmeta; @Override @@ -36,8 +35,8 @@ public MockFileSystem(Configuration conf, byte[] streamBytes) { setConf(conf); this.streamBytes = streamBytes; } - public MockFileSystem(DataCollectionMeta colmeta, AbstractFileSystemAccessor accessUtil) { - this.accessUtil=accessUtil; + public MockFileSystem(DataCollectionMeta colmeta,OutputStream outputStream) { + this.outputStream=outputStream; this.colmeta=colmeta; } @@ -57,10 +56,6 @@ public FSDataInputStream open(Path path, int i) throws IOException { @Override public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean b, int i, short i1, long l, Progressable progressable) throws IOException { - if(outputStream!=null){ - throw new FileAlreadyExistsException("file already write,wait for closing"); - } - outputStream=accessUtil.getRawOutputStream(colmeta, ResourceUtil.getProcessPath(path.toString())); return new FSDataOutputStream(outputStream,new Statistics("")); } diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java index 7a92f6ca..1318d0fa 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/util/ParquetUtil.java @@ -178,16 +178,16 @@ private static void readFullyDirectBuffer(ByteBuffer byteBufr, byte[] tmpBuf, In } } - public static OutputFile makeOutputFile(@NonNull AbstractFileSystemAccessor accessUtil, @NonNull DataCollectionMeta colmeta, @NonNull String filePath) { + public static OutputFile makeOutputFile(@NonNull OutputStream outputStream, @NonNull DataCollectionMeta colmeta, @NonNull String filePath) { return new OutputFile() { @Override public PositionOutputStream create(long l) throws IOException { - return makePositionOutputStream(accessUtil, colmeta, filePath, IO_BUF_SIZE); + return makePositionOutputStream(outputStream, colmeta, filePath, IO_BUF_SIZE); } @Override public PositionOutputStream createOrOverwrite(long 
l) throws IOException { - return makePositionOutputStream(accessUtil, colmeta, filePath, IO_BUF_SIZE); + return makePositionOutputStream(outputStream, colmeta, filePath, IO_BUF_SIZE); } @Override @@ -202,9 +202,8 @@ public long defaultBlockSize() { }; } - private static PositionOutputStream makePositionOutputStream(@NonNull AbstractFileSystemAccessor accessUtil, DataCollectionMeta colmeta, @Nonnull String filePath, int ioBufSize) + private static PositionOutputStream makePositionOutputStream(@NonNull OutputStream output, DataCollectionMeta colmeta, @Nonnull String filePath, int ioBufSize) throws IOException { - final OutputStream output = accessUtil.getRawOutputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); return new PositionOutputStream() { private long position = 0; private boolean isClosed=false; diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/OrcFileWriter.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/OrcFileWriter.java index eb60f2e7..a1305dfc 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/OrcFileWriter.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/OrcFileWriter.java @@ -6,6 +6,7 @@ import com.robin.core.base.util.ResourceConst; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.meta.DataSetColumnMeta; +import com.robin.core.fileaccess.util.ResourceUtil; import com.robin.core.fileaccess.writer.AbstractFileWriter; import com.robin.hadoop.hdfs.HDFSUtil; import org.apache.hadoop.conf.Configuration; @@ -74,14 +75,15 @@ public void beginWrite() throws IOException { compressionKind=CompressionKind.ZLIB; } - if(!ObjectUtils.isEmpty(colmeta.getSourceType()) && colmeta.getSourceType().equals(ResourceConst.IngestType.TYPE_HDFS.getValue())){ + if(Const.FILESYSTEM.HDFS.getValue().equals(colmeta.getFsType())){ HDFSUtil util=new HDFSUtil(colmeta); conf=util.getConfig(); fs= FileSystem.get(conf); }else{ conf=new Configuration(); checkAccessUtil(null); - fs=new MockFileSystem(colmeta,accessUtil); + out= accessUtil.getRawOutputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); + fs=new MockFileSystem(colmeta,out); } schema= OrcUtil.getSchema(colmeta); owriter= OrcFile.createWriter(new Path(colmeta.getPath()), OrcFile.writerOptions(conf).setSchema(schema).compress(compressionKind).fileSystem(fs)); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java index f5b3a652..7d7c4982 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java @@ -94,10 +94,11 @@ public void beginWrite() throws IOException { mapWriter = new CustomParquetWriter(new Path(colmeta.getPath()), schema, true, codecName); } }else{ + out=accessUtil.getRawOutputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath())); if(useAvroEncode) { - avroWriter = AvroParquetWriter.builder(ParquetUtil.makeOutputFile(accessUtil, colmeta, ResourceUtil.getProcessPath(colmeta.getPath()))).withCompressionCodec(codecName).withSchema(avroSchema).build(); + avroWriter = AvroParquetWriter.builder(ParquetUtil.makeOutputFile(out, colmeta, ResourceUtil.getProcessPath(colmeta.getPath()))).withCompressionCodec(codecName).withSchema(avroSchema).build(); }else { - mapWriter = new CustomParquetWriter.Builder>(ParquetUtil.makeOutputFile(accessUtil, colmeta, 
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java
index f5b3a652..7d7c4982 100644
--- a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ParquetFileWriter.java
@@ -94,10 +94,11 @@ public void beginWrite() throws IOException {
                 mapWriter = new CustomParquetWriter(new Path(colmeta.getPath()), schema, true, codecName);
             }
         }else{
+            out=accessUtil.getRawOutputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
             if(useAvroEncode) {
-                avroWriter = AvroParquetWriter.builder(ParquetUtil.makeOutputFile(accessUtil, colmeta, ResourceUtil.getProcessPath(colmeta.getPath()))).withCompressionCodec(codecName).withSchema(avroSchema).build();
+                avroWriter = AvroParquetWriter.builder(ParquetUtil.makeOutputFile(out, colmeta, ResourceUtil.getProcessPath(colmeta.getPath()))).withCompressionCodec(codecName).withSchema(avroSchema).build();
             }else {
-                mapWriter = new CustomParquetWriter.Builder<Map<String, Object>>(ParquetUtil.makeOutputFile(accessUtil, colmeta, ResourceUtil.getProcessPath(colmeta.getPath())), schema).withCompressionCodec(codecName).build();
+                mapWriter = new CustomParquetWriter.Builder<Map<String, Object>>(ParquetUtil.makeOutputFile(out, colmeta, ResourceUtil.getProcessPath(colmeta.getPath())), schema).withCompressionCodec(codecName).build();
             }
         }
     }
diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java
index c6bd6e3d..9860d4d4 100644
--- a/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java
+++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java
@@ -7,16 +7,17 @@
 import com.google.protobuf.DynamicMessage;
 import com.robin.comm.fileaccess.util.ProtoBufUtil;
 import com.robin.core.base.util.Const;
+import com.robin.core.compress.util.CompressEncoder;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.meta.DataSetColumnMeta;
 import com.robin.core.fileaccess.util.ResourceUtil;
 import com.robin.core.fileaccess.writer.AbstractFileWriter;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;
-import org.tukaani.xz.LZMAOutputStream;

 import javax.naming.OperationNotSupportedException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.Map;
@@ -27,6 +28,7 @@ public class ProtoBufFileWriter extends AbstractFileWriter {
     private DynamicSchema.Builder schemaBuilder;
     private DynamicMessage.Builder mesgBuilder;
     private MessageDefinition definition;
+    private OutputStream wrapOutputStream;
     public ProtoBufFileWriter(){
         this.identifier= Const.FILEFORMATSTR.PROTOBUF.getValue();
     }
@@ -53,8 +55,9 @@ public void beginWrite() throws IOException {
             mesgBuilder= DynamicMessage.newBuilder(schema.getMessageDescriptor(colmeta.getValueClassName()));
         }
         checkAccessUtil(null);
-        out = accessUtil.getOutResourceByStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
-
+        out=accessUtil.getRawOutputStream(colmeta,ResourceUtil.getProcessPath(colmeta.getPath()));
+        wrapOutputStream = CompressEncoder.getOutputStreamByCompressType(ResourceUtil.getProcessPath(colmeta.getPath()),out); //accessUtil.getOutResourceByStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
+        getCompressType();
     }catch (Exception ex){
         ex.printStackTrace();
     }
@@ -63,12 +66,12 @@ public void beginWrite() throws IOException {
 }
 @Override
 public void finishWrite() throws IOException {
-    out.close();
+    wrapOutputStream.close();
 }
 @Override
 public void flush() throws IOException {
-    out.flush();
+    wrapOutputStream.flush();
 }
 @Override
@@ -84,6 +87,6 @@ public void writeRecord(Map map) throws IOException, OperationNo
             }
         }
         DynamicMessage message=msgBuilder.build();
-        message.writeDelimitedTo(out);
+        message.writeDelimitedTo(wrapOutputStream);
     }
 }
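
Note: the protobuf writer now fetches the raw stream and wraps it itself; CompressEncoder picks the codec from the target path's suffix (for example .lzma), which is what makes the direct org.tukaani.xz.LZMAOutputStream import unnecessary. Usage sketch with the names from the diff above:

    // wrap the raw stream according to the path suffix, then stream records
    OutputStream wrapped = CompressEncoder.getOutputStreamByCompressType(
            ResourceUtil.getProcessPath(colmeta.getPath()), out);
    DynamicMessage message = mesgBuilder.build();
    message.writeDelimitedTo(wrapped);  // length-delimited, so records can be read back one by one
    wrapped.close();                    // closing the wrapper also flushes the codec trailer
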
diff --git a/hadooptool/src/main/java/com/robin/dfs/minio/MinioUtils.java b/hadooptool/src/main/java/com/robin/dfs/minio/MinioUtils.java
new file mode 100644
index 00000000..66963db3
--- /dev/null
+++ b/hadooptool/src/main/java/com/robin/dfs/minio/MinioUtils.java
@@ -0,0 +1,100 @@
+package com.robin.dfs.minio;
+
+
+import io.minio.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.compress.utils.IOUtils;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.*;
+import java.util.ResourceBundle;
+
+@Slf4j
+public class MinioUtils {
+    public static boolean bucketExists(MinioClient client,String bucketName){
+        boolean found=false;
+        try{
+            found=client.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
+        }catch (Exception e){
+            log.error("{}",e);
+        }
+        return found;
+    }
+
+    public static boolean putBucket(MinioClient client,String bucketName, String objectName, InputStream inputStream,long fileSize,String contentType){
+        try{
+            PutObjectArgs args= PutObjectArgs.builder().bucket(bucketName).object(objectName)
+                    .stream(inputStream,fileSize,-1).contentType(contentType).build();
+            client.putObject(args);
+        }catch (Exception ex){
+            log.error("{}",ex);
+            return false;
+        }
+        return true;
+    }
+    public static boolean exists(MinioClient client,String bucketName,String key){
+        try{
+            client.statObject(StatObjectArgs.builder().bucket(bucketName).object(key).build());
+            return true;
+        }catch (Exception ex){
+            return false;
+        }
+    }
+    public static long size(MinioClient client,String bucketName,String key){
+        try{
+            StatObjectResponse response=client.statObject(StatObjectArgs.builder().bucket(bucketName).object(key).build());
+            return response.size();
+        }catch (Exception ex){
+            return 0L;
+        }
+    }
+    public static InputStream getObject(MinioClient client,String bucketName,String objectName) throws IOException{
+        try {
+            GetObjectArgs args = GetObjectArgs.builder().bucket(bucketName).object(objectName).build();
+            return client.getObject(args);
+        }catch (Exception ex){
+            throw new IOException(ex);
+        }
+    }
+
+    public static boolean download(MinioClient client,String bucketName,String objectName, OutputStream outputStream){
+        boolean executeOk=false;
+        GetObjectArgs args= GetObjectArgs.builder().bucket(bucketName).object(objectName).build();
+        try(GetObjectResponse response1=client.getObject(args)){
+            IOUtils.copy(response1,outputStream);
+            executeOk=true;
+        }catch (Exception ex){
+            log.error("{}",ex);
+        }
+        return executeOk;
+    }
+    public static void download(MinioClient client,String bucketName,String objectName,String contentType, HttpServletResponse response) throws Exception{
+        GetObjectArgs args= GetObjectArgs.builder().bucket(bucketName).object(objectName).build();
+        try(GetObjectResponse response1=client.getObject(args)){
+            int pos=objectName.lastIndexOf("/");
+            String fileName=objectName.substring(pos);
+            response.setCharacterEncoding("utf-8");
+            response.setContentType(contentType);
+            response.addHeader("Content-Disposition", "attachment;fileName=" + fileName);
+            IOUtils.copy(response1,response.getOutputStream());
+            response.getOutputStream().flush();
+        }catch (Exception ex){
+            throw ex;
+        }
+    }
+    public static void main(String[] args){
+        ResourceBundle bundle=ResourceBundle.getBundle("application");
+        MinioClient.Builder builder= MinioClient.builder().endpoint(bundle.getString("minio.endpoint")).credentials(bundle.getString("minio.accessKey"),bundle.getString("minio.secretKey"));
+        MinioClient client=builder.build();
+        ByteArrayOutputStream outputStream=new ByteArrayOutputStream();
+        try(FileInputStream inputStream=new FileInputStream("e:/SF000010595188.pdf");){
+            //download(client,"20240510/SF000010715871.pdf",outputStream);
+            File file=new File("e:/SF000010595188.pdf");
+
+        }catch (Exception ex){
+            ex.printStackTrace();
+        }
+        System.out.println(outputStream);
+    }
+
+}
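
Note: MinioUtils is a thin, stateless convenience layer over the MinIO Java client; every method takes the client explicitly. A hypothetical usage sketch (endpoint, credentials, bucket and file names are placeholders, not values from this repository):

    MinioClient client = MinioClient.builder()
            .endpoint("http://127.0.0.1:9000")
            .credentials("accessKey", "secretKey")
            .build();
    try (InputStream in = new FileInputStream("report.pdf")) {
        MinioUtils.putBucket(client, "demo-bucket", "2024/report.pdf",
                in, new File("report.pdf").length(), "application/pdf");
    }
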
diff --git a/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java b/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java
index e9fcf537..99c5d89a 100644
--- a/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java
+++ b/hadooptool/src/test/java/com/robin/test/TestResourceReadWrite.java
@@ -1,5 +1,7 @@
 package com.robin.test;

+import com.qiniu.storage.Region;
+import com.robin.comm.fileaccess.fs.QiniuFileSystemAccessor;
 import com.robin.core.base.dao.SimpleJdbcDao;
 import com.robin.core.base.datameta.BaseDataBaseMeta;
 import com.robin.core.base.datameta.DataBaseMetaFactory;
@@ -10,18 +12,19 @@
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.writer.AbstractFileWriter;
 import com.robin.core.fileaccess.writer.TextFileWriterFactory;
-import com.robin.core.query.extractor.ResultSetOperationExtractor;
+import com.robin.core.query.extractor.ResultSetExtractorUtils;
 import junit.framework.TestCase;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.Test;

+import javax.naming.OperationNotSupportedException;
 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Timestamp;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.ResourceBundle;
+
 @Slf4j
 public class TestResourceReadWrite extends TestCase {
     @Test
@@ -30,37 +33,47 @@ public void testWrite(){
         BaseDataBaseMeta meta=DataBaseMetaFactory.getDataBaseMetaByType(BaseDataBaseMeta.TYPE_MYSQL, param);
         try(Connection conn=SimpleJdbcDao.getConnection(meta)){
             DataCollectionMeta.Builder builder=new DataCollectionMeta.Builder();
-            builder.addColumn("id",Const.META_TYPE_BIGINT,null);
-            builder.addColumn("line_code",Const.META_TYPE_STRING,null);
-            builder.addColumn("line_name",Const.META_TYPE_STRING,null);
-            builder.addColumn("tdate",Const.META_TYPE_BIGINT,null);
+            builder.addColumn("param_sn",Const.META_TYPE_INTEGER,null);
+            builder.addColumn("weixin_order_id",Const.META_TYPE_STRING,null);
+            builder.addColumn("open_id",Const.META_TYPE_STRING,null);
+            builder.addColumn("total_money",Const.META_TYPE_INTEGER,null);
+            builder.addColumn("subscribe_id",Const.META_TYPE_STRING,null);
+            builder.addColumn("product_name",Const.META_TYPE_STRING,null);
+            //builder.addColumn("start_time",Const.META_TYPE_TIMESTAMP,null);
             builder.resourceCfg("hostName", "127.0.0.1").resourceCfg("protocol", "ftp")
-                    .resourceCfg("port", 21).resourceCfg("userName", "test").resourceCfg("password", "test").fileFormat(Const.FILEFORMATSTR.PARQUET.getValue())
-                    .resPath("/tmp/test1.parquet.snappy").protocol(Const.VFS_PROTOCOL.FTP.getValue()).fsType(Const.FILESYSTEM.VFS.getValue());
+                    .resourceCfg("file.useAvroEncode","true")
+                    .resourceCfg("port", 21).resourceCfg("userName", "test").resourceCfg("password", "test").fileFormat(Const.FILEFORMATSTR.PROTOBUF.getValue())
+                    .resPath("tmp/test2.proto.lzma").protocol(Const.VFS_PROTOCOL.FTP.getValue());
+            QiniuFileSystemAccessor.Builder builder1=new QiniuFileSystemAccessor.Builder();
+            ResourceBundle bundle=ResourceBundle.getBundle("qiniu");
+            builder1.domain(bundle.getString("domain")).region(Region.autoRegion()).bucket(bundle.getString("bucket"))
+                    .accessKey(bundle.getString("accessKey")).secretKey(bundle.getString("secretKey"));
+            QiniuFileSystemAccessor accessor=builder1.build();
             DataCollectionMeta colmeta=builder.build();
-            final AbstractFileWriter jwriter=(AbstractFileWriter) TextFileWriterFactory.getWriterByType(colmeta);
+            final AbstractFileWriter jwriter=(AbstractFileWriter) TextFileWriterFactory.getWriterByType(colmeta,accessor);
             System.out.println(new Date());
             jwriter.beginWrite();
-            ResultSetOperationExtractor extractor=new ResultSetOperationExtractor() {
-                @Override
-                public boolean executeAdditionalOperation(Map map, ResultSetMetaData rsmd)
-                        throws SQLException {
-                    try{
-                        map.put("tdate",((Timestamp)map.get("start_time")).getTime());
-                        map.remove("start_time");
+            SimpleJdbcDao.executeOperationWithHandler(conn,"select param_sn,weixin_order_id,open_id,total_money,subscribe_id,product_name from t_business_bill_detail",false, rs -> {
+                int pos=0;
+                try {
+                    Map<String, Object> map = new HashMap<>();
+                    while (rs.next()) {
+                        map.clear();
+                        ResultSetExtractorUtils.wrapResultSetToMap(rs, "UTF-8", map);
+                        //map.put("tdate",((Timestamp)map.get("start_time")).getTime());
                         jwriter.writeRecord(map);
-                    }catch(Exception ex){
-                        ex.printStackTrace();
-                        throw new SQLException(ex);
+                        pos++;
                     }
-                    return true;
+                }catch (IOException | OperationNotSupportedException ex){
+                    log.error("{}",ex);
                 }
-            };
-            SimpleJdbcDao.executeOperationWithQuery(conn, "select id,line_code,line_name,start_time from test_line",false, extractor);
+                return pos;
+            });
+            //SimpleJdbcDao.executeOperationWithQuery(conn, "select param_sn,weixin_order_id,open_id,total_money,subscribe_id,product_name from t_business_bill_detail",false, extractor);
             jwriter.flush();
             jwriter.finishWrite();
@@ -69,6 +82,10 @@ public boolean executeAdditionalOperation(Map map, ResultSetMeta
         }catch(Exception ex){
             ex.printStackTrace();
         }
+    }
+    @Test
+    public void testReadFromQiniu(){
+
     }
     @Test
     public void testRead(){
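
Note: the reworked test streams rows straight from JDBC into the protobuf writer through a row-handler lambda, with the writer backed by a QiniuFileSystemAccessor built from the "qiniu" ResourceBundle. That bundle resolves to a properties file on the test classpath, roughly as below (all values hypothetical):

    # src/test/resources/qiniu.properties
    domain=http://cdn.example.com
    bucket=test-bucket
    accessKey=yourAccessKey
    secretKey=yourSecretKey
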
map.put("tdate",((Timestamp)map.get("start_time")).getTime()); - map.remove("start_time"); + SimpleJdbcDao.executeOperationWithHandler(conn,"select param_sn,weixin_order_id,open_id,total_money,subscribe_id,product_name from t_business_bill_detail",false, rs -> { + int pos=0; + try { + Map map = new HashMap<>(); + while (rs.next()) { + map.clear(); + ResultSetExtractorUtils.wrapResultSetToMap(rs, "UTF-8", map); + //map.put("tdate",((Timestamp)map.get("start_time")).getTime()); jwriter.writeRecord(map); - }catch(Exception ex){ - ex.printStackTrace(); - throw new SQLException(ex); + pos++; } - return true; + }catch (IOException | OperationNotSupportedException ex){ + log.error("{}",ex); } - }; - SimpleJdbcDao.executeOperationWithQuery(conn, "select id,line_code,line_name,start_time from test_line",false, extractor); + return pos; + }); + //SimpleJdbcDao.executeOperationWithQuery(conn, "select param_sn,weixin_order_id,open_id,total_money,subscribe_id,product_name from t_business_bill_detail",false, extractor); jwriter.flush(); jwriter.finishWrite(); @@ -69,6 +82,10 @@ public boolean executeAdditionalOperation(Map map, ResultSetMeta }catch(Exception ex){ ex.printStackTrace(); } + } + @Test + public void testReadFromQiniu(){ + } @Test public void testRead(){ diff --git a/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java b/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java index 91d762d1..2cba17cd 100644 --- a/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java +++ b/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; @@ -95,7 +96,7 @@ public static Map parseRequest(HttpServletRequest request){ try { if (StringUtils.startsWithIgnoreCase(request.getContentType(), MediaType.APPLICATION_JSON_VALUE)) { byte[] requestbytes=getRequestJson(request); - retMap.putAll(gson.fromJson(new String(requestbytes,"UTF8"),new TypeToken>(){}.getType())); + retMap.putAll(gson.fromJson(new String(requestbytes, StandardCharsets.UTF_8),new TypeToken>(){}.getType())); } else if (StringUtils.startsWithIgnoreCase(request.getContentType(), MediaType.APPLICATION_FORM_URLENCODED_VALUE)) { Map reqMap=request.getParameterMap(); if(!CollectionUtils.isEmpty(reqMap)){