diff --git a/common/src/main/java/com/robin/comm/dal/holder/RecordWriterHolder.java b/common/src/main/java/com/robin/comm/dal/holder/RecordWriterHolder.java index 7bf40be8..ee0e9771 100644 --- a/common/src/main/java/com/robin/comm/dal/holder/RecordWriterHolder.java +++ b/common/src/main/java/com/robin/comm/dal/holder/RecordWriterHolder.java @@ -32,7 +32,7 @@ public void init(DataCollectionMeta colmeta) throws Exception { if(!colmeta.isFsTag()) { writer = TextFileWriterFactory.getWriterByPath(colmeta, outStream); } else{ - writer= CommResWriterFactory.getFileWriterByType(colmeta.getResType(),colmeta); + writer= CommResWriterFactory.getFileWriterByType(colmeta.getFileFormat(),colmeta); } } public void writeRecord(Map map) throws IOException, OperationNotSupportedException { diff --git a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java index 9e07a42e..32d46ccd 100644 --- a/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java +++ b/common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java @@ -15,11 +15,16 @@ */ package com.robin.core.fileaccess.fs; +import com.robin.core.base.util.ResourceConst; import com.robin.core.compress.util.CompressDecoder; import com.robin.core.compress.util.CompressEncoder; import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.util.ResourceUtil; +import org.springframework.util.ObjectUtils; import java.io.*; +import java.nio.file.Files; +import java.nio.file.Paths; /** * abstract resource system access Utils (Local/Hdfs/ApacheVFS(including ftp sftp)/S3/Tencent cloud/aliyun) @@ -95,6 +100,17 @@ protected static InputStream wrapInputStream(InputStream instream){ protected static OutputStream getOutputStreamByPath(String path, OutputStream out) throws IOException{ return CompressEncoder.getOutputStreamByCompressType(path,out); } + protected OutputStream getOutputStream(DataCollectionMeta meta) throws IOException { + OutputStream outputStream; + if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){ + String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta); + String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath()); + outputStream= Files.newOutputStream(Paths.get(tmpFilePath)); + }else { + outputStream = new ByteArrayOutputStream(); + } + return outputStream; + } @Override public void init(DataCollectionMeta meta){ diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java index 093b44ac..7aabe7a2 100644 --- a/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/ArffFileIterator.java @@ -25,8 +25,7 @@ public ArffFileIterator(DataCollectionMeta colmeta) { @Override public void beforeProcess() { super.beforeProcess(); - if(CollectionUtils.isEmpty(colmeta.getColumnList())){ - if(!ObjectUtils.isEmpty(reader)){ + if(CollectionUtils.isEmpty(colmeta.getColumnList()) && (!ObjectUtils.isEmpty(reader))){ try { while (!(readLineStr = reader.readLine()).equalsIgnoreCase("@data")) { if(StringUtils.startsWithIgnoreCase(readLineStr,"@RELATION ")){ @@ -39,7 +38,7 @@ public void beforeProcess() { }catch (IOException ex){ logger.info("{}",ex.getMessage()); } - } + } } private DataSetColumnMeta parseDefine(String content){ diff --git a/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java b/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java index a10419af..2e16917b 100644 --- a/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java +++ b/common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java @@ -38,6 +38,10 @@ public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colm IResourceIterator iterator=getIter(colmeta); return iterator; } + public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor) throws IOException{ + IResourceIterator iterator=getIter(colmeta,accessor); + return iterator; + } public static AbstractFileIterator getProcessReaderIterator(DataCollectionMeta colmeta, AbstractFileSystemAccessor utils){ AbstractFileIterator iterator=null; String fileType=colmeta.getFileFormat(); @@ -67,9 +71,9 @@ public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colm return iterator; } public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colmeta,InputStream in) throws IOException{ - List suffixList=new ArrayList<>(); - FileUtils.parseFileFormat(colmeta.getPath(),suffixList); - String fileFormat=suffixList.get(0); + FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath()); + colmeta.setContent(content); + String fileFormat=content.getFileFormat(); if(StringUtils.isEmpty(colmeta.getFileFormat())){ colmeta.setFileFormat(fileFormat); } @@ -80,7 +84,7 @@ public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colm } private static IResourceIterator getIter(DataCollectionMeta colmeta) throws MissingConfigException { IResourceIterator iterator=null; - String fileType=colmeta.getFileFormat(); + String fileType = getFileType(colmeta); Class iterclass=fileIterMap.get(fileType); try { @@ -93,6 +97,32 @@ private static IResourceIterator getIter(DataCollectionMeta colmeta) throws Miss } return iterator; } + private static IResourceIterator getIter(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor) throws MissingConfigException { + IResourceIterator iterator=null; + String fileType = getFileType(colmeta); + + Class iterclass=fileIterMap.get(fileType); + try { + if (!ObjectUtils.isEmpty(iterclass)) { + iterator = iterclass.getConstructor(DataCollectionMeta.class,AbstractFileSystemAccessor.class).newInstance(colmeta,accessor); + } + iterator.beforeProcess(); + }catch (Exception ex){ + throw new MissingConfigException(ex); + } + return iterator; + } + + private static String getFileType(DataCollectionMeta colmeta) { + String fileType= colmeta.getFileFormat(); + if(ObjectUtils.isEmpty(fileType)){ + FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath()); + colmeta.setContent(content); + fileType=content.getFileFormat(); + } + return fileType; + } + private static void discoverIterator(){ ServiceLoader.load(IResourceIterator.class).iterator().forEachRemaining(i->{ if(AbstractFileIterator.class.isAssignableFrom(i.getClass())) diff --git a/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java b/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java index 0cdacef9..91b1b32d 100644 --- a/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java +++ b/common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java @@ -62,6 +62,16 @@ protected AbstractFileWriter(DataCollectionMeta colmeta){ } checkAccessUtil(colmeta.getPath()); } + protected AbstractFileWriter(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor){ + this.colmeta=colmeta; + formatter=DateTimeFormatter.ofPattern(colmeta.getDefaultTimestampFormat()); + for (DataSetColumnMeta meta:colmeta.getColumnList()) { + columnList.add(meta.getColumnName()); + columnMap.put(meta.getColumnName(), meta.getColumnType()); + } + accessUtil=accessor; + } + @Override public void setWriter(BufferedWriter writer){ this.writer=writer; @@ -161,9 +171,11 @@ protected String getOutputPath(String url){ return url; } protected Const.CompressType getCompressType(){ - List fileSuffix=new ArrayList<>(); - FileUtils.parseFileFormat(getOutputPath(colmeta.getPath()),fileSuffix); - return FileUtils.getFileCompressType(fileSuffix); + if(ObjectUtils.isEmpty(colmeta.getContent())) { + FileUtils.FileContent content = FileUtils.parseFile(colmeta.getPath()); + colmeta.setContent(content); + } + return colmeta.getContent().getCompressType(); } @Override diff --git a/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java b/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java index dfb8c910..4d6dd3dd 100644 --- a/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java +++ b/common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java @@ -16,6 +16,7 @@ package com.robin.core.fileaccess.writer; import com.robin.core.base.util.FileUtils; +import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor; import com.robin.core.fileaccess.meta.DataCollectionMeta; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,11 @@ public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, Buffer fileWriter.setWriter(writer); return fileWriter; } + public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, BufferedWriter writer,AbstractFileSystemAccessor accessor) throws IOException { + IResourceWriter fileWriter = getWriterByType(colmeta,accessor); + fileWriter.setWriter(writer); + return fileWriter; + } public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta, OutputStream writer) throws IOException{ IResourceWriter fileWriter = getWriterByType(colmeta); @@ -47,6 +53,13 @@ public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta, } return fileWriter; } + public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta, OutputStream writer,AbstractFileSystemAccessor accessor) throws IOException{ + IResourceWriter fileWriter = getWriterByType(colmeta,accessor); + if(writer!=null) { + fileWriter.setOutputStream(writer); + } + return fileWriter; + } public static IResourceWriter getWriterByPath(DataCollectionMeta colmeta, OutputStream writer) throws IOException{ IResourceWriter fileWriter = getWriterByType(colmeta); @@ -57,8 +70,8 @@ public static IResourceWriter getWriterByPath(DataCollectionMeta colmeta, Output public static IResourceWriter getWriterByType(DataCollectionMeta colmeta) throws IOException { IResourceWriter fileWriter = null; try { + String fileSuffix = getFileSuffix(colmeta); - String fileSuffix=colmeta.getFileFormat(); Class writerClass=fileWriterMap.get(fileSuffix); if (!ObjectUtils.isEmpty(writerClass)) { fileWriter = writerClass.getConstructor(DataCollectionMeta.class).newInstance(colmeta); @@ -70,6 +83,33 @@ public static IResourceWriter getWriterByType(DataCollectionMeta colmeta) throws } return fileWriter; } + public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, AbstractFileSystemAccessor accessor) throws IOException { + IResourceWriter fileWriter = null; + try { + String fileSuffix = getFileSuffix(colmeta); + + Class writerClass=fileWriterMap.get(fileSuffix); + if (!ObjectUtils.isEmpty(writerClass)) { + fileWriter = writerClass.getConstructor(DataCollectionMeta.class,AbstractFileSystemAccessor.class).newInstance(colmeta,accessor); + logger.info("using resource writer {}",writerClass.getCanonicalName()); + } + + } catch (Exception ex) { + throw new IOException(ex); + } + return fileWriter; + } + + private static String getFileSuffix(DataCollectionMeta colmeta) { + String fileSuffix= colmeta.getFileFormat(); + if(ObjectUtils.isEmpty(fileSuffix)){ + FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath()); + colmeta.setContent(content); + fileSuffix=content.getFileFormat(); + } + return fileSuffix; + } + private static void discoverIterator(Map> fileIterMap){ ServiceLoader.load(IResourceWriter.class).iterator().forEachRemaining(i->{ if(AbstractFileWriter.class.isAssignableFrom(i.getClass())) diff --git a/common/src/main/java/com/robin/core/resaccess/CommResIteratorFactory.java b/common/src/main/java/com/robin/core/resaccess/CommResIteratorFactory.java index d583ab9d..30a65dd0 100644 --- a/common/src/main/java/com/robin/core/resaccess/CommResIteratorFactory.java +++ b/common/src/main/java/com/robin/core/resaccess/CommResIteratorFactory.java @@ -12,12 +12,15 @@ @Slf4j public class CommResIteratorFactory { + private CommResIteratorFactory(){ + + } private static Map> iterMap =new HashMap<>(); static { discoverIterator(iterMap); } - public static AbstractResIterator getIterator(Long resType, DataCollectionMeta colmeta) { + public static AbstractResIterator getIterator(String resType, DataCollectionMeta colmeta) { AbstractResIterator iterator = null; Class clazz = iterMap.get(resType); try { diff --git a/common/src/main/java/com/robin/core/resaccess/CommResWriterFactory.java b/common/src/main/java/com/robin/core/resaccess/CommResWriterFactory.java index b0ab5b65..72d8ed85 100644 --- a/common/src/main/java/com/robin/core/resaccess/CommResWriterFactory.java +++ b/common/src/main/java/com/robin/core/resaccess/CommResWriterFactory.java @@ -16,12 +16,6 @@ @Slf4j public class CommResWriterFactory { - private static final String KAFKA_WRITER_CLASS = "com.robin.comm.resaccess.writer.KafkaResourceWriter"; - private static final String CASSANDRA_WRITER_CLASS = "com.robin.comm.resaccess.writer.CassandraResourceWriter"; - private static final String MONGO_WRITER_CLASS = "com.robin.comm.resaccess.writer.MongoResourceWriter"; - private static final String REDIS_WRITER_CLASS = "com.robin.comm.resaccess.writer.RedisResourceWriter"; - private static final String ROCKET_WRITER_CLASS = "com.robin.comm.resaccess.writer.RocketResourceWriter"; - private static final String HBASE_WRITER_CLASS = "com.robin.comm.resaccess.writer.HbaseResourceWriter"; private static Map> writerMap =new HashMap<>(); static { @@ -30,7 +24,7 @@ public class CommResWriterFactory { private CommResWriterFactory(){ } - public static AbstractResourceWriter getFileWriterByType(Long resType, DataCollectionMeta colmeta) { + public static AbstractResourceWriter getFileWriterByType(String resType, DataCollectionMeta colmeta) { AbstractResourceWriter fileWriter = null; Class clazz = writerMap.get(resType); try { diff --git a/core/src/main/java/com/robin/core/base/dao/CommJdbcUtil.java b/core/src/main/java/com/robin/core/base/dao/CommJdbcUtil.java index e1678248..81e208d1 100644 --- a/core/src/main/java/com/robin/core/base/dao/CommJdbcUtil.java +++ b/core/src/main/java/com/robin/core/base/dao/CommJdbcUtil.java @@ -318,7 +318,7 @@ private Object getRecordValue(ResultSetMetaData rsmd, ResultSet rs, LobHandler l return retObj; } - static void setTargetValue(Object target, Object value, String columnName, String columnType, LobHandler handler, PageQuery> pageQuery) throws DAOException { + static void setTargetValue(Object target, Object value, String columnName, String columnType, PageQuery> pageQuery) throws DAOException { try { if (value != null) { Object targetValue = null; diff --git a/core/src/main/java/com/robin/core/base/dao/SqlMapperDao.java b/core/src/main/java/com/robin/core/base/dao/SqlMapperDao.java index 488e70c7..b967f1e4 100644 --- a/core/src/main/java/com/robin/core/base/dao/SqlMapperDao.java +++ b/core/src/main/java/com/robin/core/base/dao/SqlMapperDao.java @@ -194,7 +194,7 @@ private static ResultSetExtractor> resultSetExtractor(SqlMapperConfigure Map map = new HashMap<>(); for (int i = 0; i < count; i++) { String columnName = rsmd.getColumnName(i + 1); - CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, lobHandler, pageQuery); + CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, pageQuery); } retList.add(map); } else { @@ -208,7 +208,7 @@ private static ResultSetExtractor> resultSetExtractor(SqlMapperConfigure if (!segment1.getColumnMapper().containsKey(columnName)) { throw new DAOException("property " + columnName + " not exist in class " + segment1.getClassName()); } - CommJdbcUtil.setTargetValue(targetObject, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, lobHandler, pageQuery); + CommJdbcUtil.setTargetValue(targetObject, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, pageQuery); } retList.add(targetObject); } catch (IllegalAccessException|InstantiationException ex1) { @@ -220,7 +220,7 @@ private static ResultSetExtractor> resultSetExtractor(SqlMapperConfigure Map map = new HashMap<>(); for (int i = 0; i < count; i++) { String columnName = rsmd.getColumnName(i + 1); - CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), columnName, null, lobHandler, pageQuery); + CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), columnName, null, pageQuery); } retList.add(map); } diff --git a/core/src/main/java/com/robin/core/base/datameta/DataBaseUtil.java b/core/src/main/java/com/robin/core/base/datameta/DataBaseUtil.java index ec9ce6b2..cfb521ce 100644 --- a/core/src/main/java/com/robin/core/base/datameta/DataBaseUtil.java +++ b/core/src/main/java/com/robin/core/base/datameta/DataBaseUtil.java @@ -42,14 +42,15 @@ import java.util.stream.Collectors; @Slf4j +@SuppressWarnings("unused") public class DataBaseUtil { private Connection connection; private static final Logger logger = LoggerFactory.getLogger(DataBaseUtil.class); private BaseDataBaseMeta dataBaseMeta; - private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); - private static final SimpleDateFormat format1 = new SimpleDateFormat("yyyyMMddhhmmss"); - private static final SimpleDateFormat format2 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS"); - private static final SimpleDateFormat format3 = new SimpleDateFormat("yyyy-MM-dd"); + private static final DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss"); + private static final DateTimeFormatter format1 = DateTimeFormatter.ofPattern("yyyyMMddhhmmss"); + private static final DateTimeFormatter format2 = DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss.SSS"); + private static final DateTimeFormatter format3 = DateTimeFormatter.ofPattern("yyyy-MM-dd"); public void connect(BaseDataBaseMeta meta) throws ClassNotFoundException, SQLException { dataBaseMeta = meta; @@ -148,11 +149,7 @@ public List getTableMetaByTableName(String tablename, String if(!org.springframework.util.StringUtils.isEmpty(defaultValue)) { datameta.setDefaultValue(defaultValue); } - if (pklist.contains(columnname)) { - datameta.setPrimaryKey(true); - } else { - datameta.setPrimaryKey(false); - } + datameta.setPrimaryKey(pklist.contains(columnname)); columnlist.add(datameta); } return columnlist; @@ -235,11 +232,7 @@ public static List getTableMetaByTableName(Connection conn, } } setType(columnname, columnType, rs.getInt("DATA_TYPE"), rs.getString("TYPE_NAME"), datalength, nullable, comment, precise, scale, datameta); - if (pklist != null && pklist.contains(columnname)) { - datameta.setPrimaryKey(true); - } else { - datameta.setPrimaryKey(false); - } + datameta.setPrimaryKey(pklist != null && pklist.contains(columnname)); columnlist.add(datameta); } return columnlist; @@ -311,8 +304,7 @@ public static List getQueryMeta(Connection conn, String sql) } } - public static String translateDbType(Integer dbType) { - int type = dbType.intValue(); + public static String translateDbType(Integer type) { String retStr; if (type == Types.INTEGER || type == Types.TINYINT) { retStr = Const.META_TYPE_INTEGER; @@ -344,7 +336,7 @@ public static Map transformDbTypeByObj(Object obj) { Map retMap = new HashMap<>(); String type = null; - SimpleDateFormat targetFormat = null; + DateTimeFormatter targetFormat = null; if (obj instanceof Long) { type = Const.META_TYPE_BIGINT; } else if (obj instanceof Integer) { @@ -374,7 +366,7 @@ public static Map transformDbTypeByObj(Object obj) { return retMap; } - private static boolean isStringValueDate(String value, SimpleDateFormat format) { + private static boolean isStringValueDate(String value, DateTimeFormatter format) { try { format.parse(value); return true; @@ -426,9 +418,8 @@ public static boolean isValueValid(Object value, String type) { validtag = true; } break; - case Const.META_TYPE_STRING: + default: validtag = true; - break; } } return validtag; @@ -473,7 +464,7 @@ public String generateCreateTableSql(BaseDataBaseMeta meta, List { diff --git a/core/src/main/java/com/robin/core/base/service/AbstractMybatisService.java b/core/src/main/java/com/robin/core/base/service/AbstractMybatisService.java index 2578a8fb..f888d907 100644 --- a/core/src/main/java/com/robin/core/base/service/AbstractMybatisService.java +++ b/core/src/main/java/com/robin/core/base/service/AbstractMybatisService.java @@ -416,7 +416,7 @@ public T selectOne(Wrapper wrapper) { public boolean deleteWithRequest(Object queryObject) { try { QueryWrapper wrapper = wrapWithEntity(queryObject); - return delete(wrapper); + return SqlHelper.retBool(baseDao.delete(wrapper)); } catch (Exception ex) { throw new ServiceException(ex); } @@ -426,7 +426,7 @@ public boolean deleteWithRequest(Object queryObject) { public boolean updateWithRequest(T model, Object queryObject) { try { QueryWrapper wrapper = wrapWithEntity(queryObject); - return update(model, wrapper); + return SqlHelper.retBool(baseDao.update(model, wrapper)); } catch (Exception ex) { throw new ServiceException(ex); } diff --git a/core/src/main/java/com/robin/core/base/util/Const.java b/core/src/main/java/com/robin/core/base/util/Const.java index 5a913a0a..6a3d5307 100644 --- a/core/src/main/java/com/robin/core/base/util/Const.java +++ b/core/src/main/java/com/robin/core/base/util/Const.java @@ -390,7 +390,8 @@ public enum FILESYSTEM{ HDFS("hdfs"), S3("s3"), ALIYUN("oss"), - TENCENT("cos"); + TENCENT("cos"), + QINIU("qiniu"); private String value; FILESYSTEM(String value){ this.value=value; @@ -442,7 +443,7 @@ public String getValue(){ //定时任务触发时间点 public static final String TRIGGER_TIMESPAN = "triggerTimeSpan"; - public final static List ESCAPE_CHARACTERS = new ArrayList( + public final static List ESCAPE_CHARACTERS = new ArrayList<>( Lists.newArrayList("$", "(", ")", "*", "+", ".", "[", "?", "\\", "^", "{", "|")); diff --git a/core/src/main/java/com/robin/core/base/util/FileUtils.java b/core/src/main/java/com/robin/core/base/util/FileUtils.java index 73ff4811..99dcab51 100644 --- a/core/src/main/java/com/robin/core/base/util/FileUtils.java +++ b/core/src/main/java/com/robin/core/base/util/FileUtils.java @@ -3,7 +3,10 @@ import cn.hutool.core.io.FileUtil; import com.google.common.collect.Lists; import com.robin.core.fileaccess.meta.DataCollectionMeta; +import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.springframework.util.Assert; +import org.springframework.util.MimeTypeUtils; import org.springframework.util.ObjectUtils; import java.io.File; @@ -25,39 +28,71 @@ public class FileUtils { public static final List compressTypeEnum =Collections.unmodifiableList(Lists.newArrayList(Const.CompressType.COMPRESS_TYPE_GZ, Const.CompressType.COMPRESS_TYPE_LZO, Const.CompressType.COMPRESS_TYPE_BZ2, Const.CompressType.COMPRESS_TYPE_SNAPPY, Const.CompressType.COMPRESS_TYPE_ZIP, Const.CompressType.COMPRESS_TYPE_LZMA,Const.CompressType.COMPRESS_TYPE_LZ4,Const.CompressType.COMPRESS_TYPE_ZSTD,Const.CompressType.COMPRESS_TYPE_BROTLI,Const.CompressType.COMPRESS_TYPE_XZ)); + private static final Map contentTypeMap=new HashMap<>(); + + static { + ResourceBundle bundle=ResourceBundle.getBundle("contenttype"); + if(!ObjectUtils.isEmpty(bundle)){ + Iterator iter=bundle.keySet().iterator(); + while (iter.hasNext()){ + String key=iter.next(); + contentTypeMap.put(key,bundle.getString(key)); + } + } + } + private FileUtils(){ } - public static String parseFileFormat(String path, List suffix) { - String filePath = null; - if (suffix == null) { - suffix = new ArrayList<>(); + + public static FileContent parseFile(String path){ + Assert.isTrue(!ObjectUtils.isEmpty(path),"path must not be null!"); + FileContent fileContent=new FileContent(); + int pos=path.lastIndexOf(File.separator); + if(pos==-1){ + pos=path.lastIndexOf("/"); } - if (path != null && !path.trim().isEmpty()) { - int pos = path.lastIndexOf(File.separator); - if (pos == -1) { - pos = path.lastIndexOf("/"); - } - if (pos != -1) { - String fileName = path.substring(pos + 1); - String[] arr = fileName.split("\\."); - int lastpos = fileName.lastIndexOf("."); - filePath = fileName.substring(0, lastpos); - for (int i = arr.length - 1; i > 0; i--) { - suffix.add(arr[i]); + if(pos!=-1){ + String fileName=path.substring(pos+1); + String filePath=path.substring(0,pos); + fileContent.setFilePath(filePath); + String[] parts=fileName.split("\\."); + List sepParts=new ArrayList<>(); + for(int i=parts.length-1;i>0;i--){ + if(Const.CompressType.COMPRESS_TYPE_NONE.equals(fileContent.getCompressType())){ + Const.CompressType compressType=getFileCompressType(parts[i]); + if(!Const.CompressType.COMPRESS_TYPE_NONE.equals(compressType)) { + fileContent.setCompressType(compressType); + if (contentTypeMap.containsKey(parts[i].toLowerCase())) { + fileContent.setContentType(contentTypeMap.get(parts[i].toLowerCase())); + } + }else{ + parseFileFormat(fileContent,parts[i],sepParts); + } + }else { + parseFileFormat(fileContent,parts[i],sepParts); } } + sepParts.add(parts[0]); + Collections.reverse(sepParts); + fileContent.setFileName(StringUtils.join(sepParts,".")); } - return filePath; + return fileContent; } - - public static Const.CompressType getFileCompressType(List suffixList) { - Const.CompressType type = Const.CompressType.COMPRESS_TYPE_NONE; - if (!suffixList.isEmpty() && avaiableCompressSuffixs.contains(suffixList.get(0).toLowerCase())) { - type = compressTypeEnum.get(avaiableCompressSuffixs.indexOf(suffixList.get(0))); + private static void parseFileFormat(FileContent content,String suffix,List sepParts){ + if(ObjectUtils.isEmpty(content.getFileFormat())){ + content.setFileFormat(suffix); + if(ObjectUtils.isEmpty(content.getContentType())){ + if(contentTypeMap.containsKey(suffix.toLowerCase())){ + content.setContentType(contentTypeMap.get(suffix.toLowerCase())); + } + } + }else{ + sepParts.add(suffix); } - return type; } + + public static Const.CompressType getFileCompressType(String suffix) { Const.CompressType type = Const.CompressType.COMPRESS_TYPE_NONE; if (suffix!=null && !suffix.isEmpty() && avaiableCompressSuffixs.contains(suffix.toLowerCase())) { @@ -65,6 +100,17 @@ public static Const.CompressType getFileCompressType(String suffix) { } return type; } + public static String getContentType(String fileFormat){ + return contentTypeMap.containsKey(fileFormat.toLowerCase())?contentTypeMap.get(fileFormat.toLowerCase()):null; + } + public static String getContentType(DataCollectionMeta meta){ + String fileType=meta.getFileFormat(); + if(ObjectUtils.isEmpty(fileType)){ + FileUtils.FileContent content=FileUtils.parseFile(meta.getPath()); + fileType=content.getFileFormat(); + } + return getContentType(fileType); + } public static boolean mkDirWithGroupAndUser(String path,String group,String user) { FileUtil.mkdir(path); Path filePath= Paths.get(path); @@ -111,4 +157,16 @@ public static String getWorkingPath(DataCollectionMeta meta){ ? meta.getResourceCfgMap().get(ResourceConst.WORKINGPATHPARAM).toString() : org.apache.commons.io.FileUtils.getTempDirectoryPath(); } + @Data + public static class FileContent{ + private String fileName; + private String filePath; + private String fileFormat; + private String contentType; + private Const.CompressType compressType= Const.CompressType.COMPRESS_TYPE_NONE; + } + public static void main(String[] args){ + String path="file:///e:/tmp/test/test1.avro.cs.lz4"; + System.out.println(parseFile(path)); + } } diff --git a/core/src/main/java/com/robin/core/base/util/ResourceConst.java b/core/src/main/java/com/robin/core/base/util/ResourceConst.java index 2b98504b..2929c0cc 100644 --- a/core/src/main/java/com/robin/core/base/util/ResourceConst.java +++ b/core/src/main/java/com/robin/core/base/util/ResourceConst.java @@ -154,4 +154,19 @@ public String getValue() { return value; } } + public enum QINIUPARAM{ + DOMAIN("domain"), + REGION("region"), + ACESSSKEY("accessKey"), + SECURITYKEY("securityKey"); + private String value; + QINIUPARAM(String value){ + this.value=value; + } + + public String getValue() { + return value; + } + + } } diff --git a/core/src/main/java/com/robin/core/collection/util/CollectionMapConvert.java b/core/src/main/java/com/robin/core/collection/util/CollectionMapConvert.java index e5432078..78f7a864 100644 --- a/core/src/main/java/com/robin/core/collection/util/CollectionMapConvert.java +++ b/core/src/main/java/com/robin/core/collection/util/CollectionMapConvert.java @@ -190,14 +190,12 @@ public static List filterListByColumnCondition(List listobj, String sc public static String getColumnValueAppendBySeparate(List listobj, Function column, String separate) throws MissingConfigException,InvocationTargetException,IllegalAccessException { checkType(listobj); - StringBuilder buffer = new StringBuilder(); Assert.notNull(column,""); List values= listobj.stream().collect(Collectors.mapping(column,Collectors.toList())); return StringUtils.join(values,separate); } public static List

getValueListBySeparate(List listobj, Function column) throws MissingConfigException,InvocationTargetException,IllegalAccessException { - List retList = new ArrayList<>(); checkType(listobj); return listobj.stream().collect(Collectors.mapping(column,Collectors.toList())); } diff --git a/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java b/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java index 12381451..82862a1b 100644 --- a/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java +++ b/core/src/main/java/com/robin/core/compress/util/CompressDecoder.java @@ -29,9 +29,8 @@ private CompressDecoder(){ } public static InputStream getInputStreamByCompressType(String path, InputStream rawstream) throws IOException{ InputStream inputStream; - List suffixList=new ArrayList<>(); - FileUtils.parseFileFormat(path,suffixList); - Const.CompressType type=FileUtils.getFileCompressType(suffixList); + FileUtils.FileContent content=FileUtils.parseFile(path); + Const.CompressType type=content.getCompressType(); switch (type){ case COMPRESS_TYPE_GZ: inputStream=new GZIPInputStream(wrapInputStream(rawstream)); diff --git a/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java b/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java index 06a9d3a1..3e613cc7 100644 --- a/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java +++ b/core/src/main/java/com/robin/core/compress/util/CompressEncoder.java @@ -38,9 +38,8 @@ private CompressEncoder(){ */ public static OutputStream getOutputStreamByCompressType(String path,OutputStream rawstream) throws IOException{ OutputStream outputStream; - List suffixList=new ArrayList<>(); - String fileName= FileUtils.parseFileFormat(path,suffixList); - Const.CompressType type=FileUtils.getFileCompressType(suffixList); + FileUtils.FileContent content=FileUtils.parseFile(path); + Const.CompressType type=content.getCompressType(); switch (type){ case COMPRESS_TYPE_GZ: outputStream=new GZIPOutputStream(wrapOutputStream(rawstream)); @@ -58,7 +57,7 @@ public static OutputStream getOutputStreamByCompressType(String path,OutputStrea break; case COMPRESS_TYPE_ZIP: ZipOutputStream stream1=new ZipOutputStream(wrapOutputStream(rawstream)); - stream1.putNextEntry(new ZipEntry(fileName)); + stream1.putNextEntry(new ZipEntry(content.getFileName()+"."+content.getFileFormat())); outputStream=stream1; break; case COMPRESS_TYPE_LZ4: diff --git a/core/src/main/java/com/robin/core/fileaccess/meta/DataCollectionMeta.java b/core/src/main/java/com/robin/core/fileaccess/meta/DataCollectionMeta.java index 429c8c71..ab91a983 100644 --- a/core/src/main/java/com/robin/core/fileaccess/meta/DataCollectionMeta.java +++ b/core/src/main/java/com/robin/core/fileaccess/meta/DataCollectionMeta.java @@ -18,6 +18,7 @@ import com.robin.core.base.datameta.BaseDataBaseMeta; import com.robin.core.base.datameta.DataBaseColumnMeta; import com.robin.core.base.datameta.DataBaseParam; +import com.robin.core.base.util.FileUtils; import com.robin.core.base.util.ResourceConst; import com.robin.core.convert.util.ConvertUtil; import com.robin.core.fileaccess.util.ResourceUtil; @@ -58,6 +59,7 @@ public class DataCollectionMeta implements Serializable { private String dbType; private String tableName; private String protocol; + private FileUtils.FileContent content; public void setAvroSchema(Class clazz){ String fullClassName=clazz.getClass().getCanonicalName(); int pos=fullClassName.lastIndexOf("."); diff --git a/core/src/main/resources/contenttype.properties b/core/src/main/resources/contenttype.properties new file mode 100644 index 00000000..e0efe7df --- /dev/null +++ b/core/src/main/resources/contenttype.properties @@ -0,0 +1,103 @@ +3gp=video/3gpp +asf=video/x-ms-asf +avi=video/x-msvideo +dwg=application/x-dwg +m4u=video/vndmpegurl +m4v=video/x-m4v +mov=video/quicktime + +mp4=video/mp4 +mpg4=video/mp4 + +mpe=video/x-mpeg + +mpeg=video/mpg +mpg=video/mpg + +m3u=audio/x-mpegurl + +m4a=audio/mp4a-latm +m4b=audio/mp4a-latm +m4p=audio/mp4a-latm + + +mp2=x-mpeg +mp3=audio/x-mpeg + +mpga=audio/mpeg +ogg=audio/ogg +rmvb=audio/x-pn-realaudio + +wma=audio/x-ms-wma +wmv=audio/x-ms-wmv + +c=text/plain +java=text/plain +conf=text/plain +cpp=text/plain +h=text/plain +prop=text/plain +rc=text/plain +sh=text/plain +log=text/plain +txt=text/plain +xml=text/plain + +html=text/html +htm=text/html + +css=text/css + + +jpg=image/jpeg +jpeg=image/jpeg +tif=image/tiff +ico=image/x-icon +swf=application/x-shockwave-flash +wav=audio/wav + +bmp=image/bmp +gif=image/gif +png=image/png + + +bin=application/octet-stream +class=application/octet-stream +exe=application/octet-stream + +apk=application/vndandroidpackage-archive +doc=application/msword +docx=application/vndopenxmlformats-officedocumentwordprocessingmldocument +xls=application/vndms-excel +xlsx=application/vndopenxmlformats-officedocumentspreadsheetmlsheet + +gtar=application/x-gtar +gz=application/x-gzip +jar=application/java-archive +js=application/x-javascript +mpc=application/vndmpohuncertificate +msg=application/vndms-outlook +pdf=application/pdf + +pps=application/vndms-powerpoint +ppt=application/vndms-powerpoint + +pptx=application/vndopenxmlformats-officedocumentpresentationmlpresentation +rtf=application/rtf +tar=application/x-tar +tgz=application/x-compressed +wps=application/vndms-works +z=application/x-compress +zip=application/x-zip-compressed + +asp=text/asp +jsp=text/html +parquet=application/octet-stream +snappy=application/octet-stream +lzma=application/octet-stream +xz=application/octet-stream +avro=application/octet-stream +proto=application/octet-stream +orc=application/octet-stream +bz2=application/octet-stream +lz4=application/octet-stream diff --git a/etl/src/main/java/com/robin/etl/util/CommProcessCycleGen.java b/etl/src/main/java/com/robin/etl/util/CommProcessCycleGen.java index b687d7f3..0ccaac34 100644 --- a/etl/src/main/java/com/robin/etl/util/CommProcessCycleGen.java +++ b/etl/src/main/java/com/robin/etl/util/CommProcessCycleGen.java @@ -1,9 +1,7 @@ package com.robin.etl.util; -import com.robin.core.base.util.Const; import com.robin.etl.common.EtlConstant; import org.apache.commons.lang3.tuple.Pair; -import org.apache.tomcat.jni.Local; import org.springframework.lang.NonNull; import org.springframework.util.Assert; @@ -11,7 +9,6 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.WeekFields; -import java.util.Map; public class CommProcessCycleGen implements IprocessCycleGen { private LocalDateTime preTime; diff --git a/hadooptool/pom.xml b/hadooptool/pom.xml index 2a892c09..dc7bdd11 100644 --- a/hadooptool/pom.xml +++ b/hadooptool/pom.xml @@ -332,6 +332,12 @@ + + com.qiniu + qiniu-java-sdk + [7.16.0, 7.16.99] + true + diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java index 64fa0d91..8188ec2b 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java @@ -11,10 +11,13 @@ import com.qcloud.cos.transfer.TransferManagerConfiguration; import com.qcloud.cos.transfer.Upload; import com.robin.core.base.exception.ResourceNotAvailableException; +import com.robin.core.base.util.Const; import com.robin.core.base.util.ResourceConst; import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.util.ResourceUtil; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; @@ -30,11 +33,19 @@ import java.util.concurrent.Executors; /** - * Tencent COS FileSystemAccessor + * Tencent COS FileSystemAccessor,must init individual */ @Slf4j +@Getter public class COSFileSystemAccessor extends AbstractFileSystemAccessor { private COSClient cosClient; + private String regionName; + private HttpProtocol protocol=HttpProtocol.http; + private String securityKey; + private String accessKey; + private COSFileSystemAccessor(){ + this.identifier= Const.FILESYSTEM.TENCENT.getValue(); + } @Override public void init(DataCollectionMeta meta) { @@ -52,6 +63,16 @@ public void init(DataCollectionMeta meta) { meta.getResourceCfgMap().get(ResourceConst.COSPARAM.SECURITYKEY.getValue()).toString()); cosClient = new COSClient(cosCredentials, config); } + public void init(){ + Assert.notNull(regionName,"regionName required!"); + Assert.notNull(accessKey,"accessKey required!"); + Assert.notNull(securityKey,"securityKey required!"); + Region region=new Region(regionName); + ClientConfig config=new ClientConfig(region); + config.setHttpProtocol(protocol); + COSCredentials cosCredentials = new BasicCOSCredentials(accessKey,securityKey); + cosClient = new COSClient(cosCredentials, config); + } @Override public Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException { @@ -109,17 +130,7 @@ private InputStream getInputStreamByConfig(DataCollectionMeta meta) { String objectName= meta.getPath(); return getObject(bucketName,objectName); } - private static OutputStream getOutputStream(DataCollectionMeta meta) throws IOException { - OutputStream outputStream; - if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){ - String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta); - String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath()); - outputStream= Files.newOutputStream(Paths.get(tmpFilePath)); - }else { - outputStream = new ByteArrayOutputStream(); - } - return outputStream; - } + private COSObjectInputStream getObject(@NonNull String bucketName,@NonNull String key) { GetObjectRequest request = new GetObjectRequest(bucketName, key); COSObject object = cosClient.getObject(request); @@ -162,10 +173,13 @@ public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throws IOException,InterruptedException { String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); TransferManager transferManager=getManager(); - PutObjectRequest request=null; + PutObjectRequest request; String tmpFilePath=null; + ObjectMetadata objectMetadata = new ObjectMetadata(); + if(!ObjectUtils.isEmpty(meta.getContent())){ + objectMetadata.setContentType(meta.getContent().getContentType()); + } if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())){ - ObjectMetadata objectMetadata = new ObjectMetadata(); objectMetadata.setContentLength(((ByteArrayOutputStream)outputStream).size()); request = new PutObjectRequest(bucketName, meta.getPath(), new ByteArrayInputStream(((ByteArrayOutputStream)outputStream).toByteArray()),objectMetadata); }else{ @@ -191,5 +205,38 @@ private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throw } return false; } + public static class Builder{ + private COSFileSystemAccessor accessor; + public Builder(){ + accessor=new COSFileSystemAccessor(); + } + public Builder accessKey(String accessKey){ + accessor.accessKey=accessKey; + return this; + } + public Builder secretKey(String secretKey){ + accessor.securityKey=secretKey; + return this; + } + public Builder withMetaConfig(DataCollectionMeta meta){ + accessor.init(meta); + return this; + } + public Builder region(String regionName){ + accessor.regionName=regionName; + return this; + } + public Builder protocol(HttpProtocol protocol){ + accessor.protocol=protocol; + return this; + } + public COSFileSystemAccessor build(){ + if(!ObjectUtils.isEmpty(accessor.getCosClient())){ + accessor.init(); + } + return accessor; + } + + } } diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/HdfsFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/HdfsFileSystemAccessor.java index 26487fac..280d8f49 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/HdfsFileSystemAccessor.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/HdfsFileSystemAccessor.java @@ -15,6 +15,9 @@ import java.util.HashMap; import java.util.Map; +/** + * Singleton HDFS FileSystem Accessor,using defaultName as key + */ public class HdfsFileSystemAccessor extends AbstractFileSystemAccessor { private static final Logger logger=LoggerFactory.getLogger(HdfsFileSystemAccessor.class); private final Map hdfsUtilMap=new MapMaker().concurrencyLevel(16).makeMap(); diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java index 378110d3..7ded03a2 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java @@ -7,6 +7,7 @@ import com.aliyun.oss.common.comm.ResponseMessage; import com.aliyun.oss.model.Bucket; import com.aliyun.oss.model.OSSObject; +import com.aliyun.oss.model.ObjectMetadata; import com.aliyun.oss.model.PutObjectResult; import com.robin.core.base.exception.MissingConfigException; import com.robin.core.base.util.Const; @@ -14,6 +15,7 @@ import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor; import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.util.ResourceUtil; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; @@ -29,11 +31,18 @@ * Aliyun OSS FileSystemAccessor */ @Slf4j +@Getter public class OSSFileSystemAccessor extends AbstractFileSystemAccessor { - public OSSFileSystemAccessor(){ + + private OSS ossClient; + private String endpoint; + private String region; + private String accessKeyId; + private String securityAccessKey; + + private OSSFileSystemAccessor(){ this.identifier= Const.FILESYSTEM.ALIYUN.getValue(); } - private OSS ossClient; @Override public void init(DataCollectionMeta meta){ @@ -43,15 +52,24 @@ public void init(DataCollectionMeta meta){ Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ACESSSKEYID.getValue()),"must provide accessKey"); Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.SECURITYACCESSKEY.getValue()),"must provide securityAccessKey"); - String endpoint=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ENDPOIN.getValue()).toString(); - String region=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.REGION.getValue()).toString(); - String accessKeyId=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ACESSSKEYID.getValue()).toString(); - String securityAccessKey=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.SECURITYACCESSKEY.getValue()).toString(); + endpoint=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ENDPOIN.getValue()).toString(); + region=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.REGION.getValue()).toString(); + accessKeyId=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.ACESSSKEYID.getValue()).toString(); + securityAccessKey=meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.SECURITYACCESSKEY.getValue()).toString(); CredentialsProvider credentialsProvider= CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId,securityAccessKey); ossClient= OSSClientBuilder.create().endpoint(endpoint).credentialsProvider(credentialsProvider) .region(region).build(); } + public void init(){ + Assert.notNull(region,"must provide endpoint"); + Assert.notNull(endpoint,"must provide region"); + Assert.notNull(accessKeyId,"must provide accessKey"); + Assert.notNull(securityAccessKey,"must provide securityAccessKey"); + CredentialsProvider credentialsProvider= CredentialsProviderFactory.newDefaultCredentialProvider(accessKeyId,securityAccessKey); + ossClient= OSSClientBuilder.create().endpoint(endpoint).credentialsProvider(credentialsProvider) + .region(region).build(); + } @Override public Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException { @@ -111,17 +129,7 @@ public void finishWrite(DataCollectionMeta meta,OutputStream outputStream) { log.error("{}",ex.getMessage()); } } - private static OutputStream getOutputStream(DataCollectionMeta meta) throws IOException { - OutputStream outputStream; - if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){ - String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta); - String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath()); - outputStream= Files.newOutputStream(Paths.get(tmpFilePath)); - }else { - outputStream = new ByteArrayOutputStream(); - } - return outputStream; - } + private InputStream getInputStreamByConfig(DataCollectionMeta meta) { Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()),"must provide bucketName"); String bucketName= meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()).toString(); @@ -134,13 +142,20 @@ private Bucket createBucket(String bucketName){ private boolean putObject(String bucketName,DataCollectionMeta meta,OutputStream outputStream) throws IOException{ PutObjectResult result; String tmpFilePath=null; + ObjectMetadata metadata=new ObjectMetadata(); + if(!ObjectUtils.isEmpty(meta.getContent())){ + metadata.setContentType(meta.getContent().getContentType()); + } if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) { - result = ossClient.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(((ByteArrayOutputStream)outputStream).toByteArray())); + ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream; + metadata.setContentLength(byteArrayOutputStream.size()); + result = ossClient.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata); }else{ outputStream.close(); String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta); tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath()); - result=ossClient.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath))); + metadata.setContentLength(Files.size(Paths.get(tmpFilePath))); + result=ossClient.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata); } ResponseMessage message=result.getResponse(); if(message.isSuccessful() && !ObjectUtils.isEmpty(tmpFilePath)){ @@ -160,4 +175,37 @@ private InputStream getObject(String bucketName,String objectName){ throw new MissingConfigException(" key "+objectName+" not in OSS bucket "+bucketName); } } + public static class Builder{ + private OSSFileSystemAccessor accessor; + public Builder(){ + accessor=new OSSFileSystemAccessor(); + } + public Builder region(String region){ + accessor.region=region; + return this; + } + public Builder accessKeyId(String accessKeyId){ + accessor.accessKeyId=accessKeyId; + return this; + } + public Builder endpoint(String endPoint){ + accessor.endpoint=endPoint; + return this; + } + public Builder securityAccessKey(String securityAccessKey){ + accessor.securityAccessKey=securityAccessKey; + return this; + } + public Builder withMetaConfig(DataCollectionMeta meta){ + accessor.init(meta); + return this; + } + public OSSFileSystemAccessor build(){ + if(ObjectUtils.isEmpty(accessor.getOssClient())){ + accessor.init(); + } + return accessor; + } + + } } diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java new file mode 100644 index 00000000..a475db47 --- /dev/null +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java @@ -0,0 +1,174 @@ +package com.robin.comm.fileaccess.fs; + +import com.google.gson.Gson; +import com.qiniu.common.QiniuException; +import com.qiniu.http.Response; +import com.qiniu.storage.BucketManager; +import com.qiniu.storage.Configuration; +import com.qiniu.storage.Region; +import com.qiniu.storage.UploadManager; +import com.qiniu.storage.model.DefaultPutRet; +import com.qiniu.storage.model.FileInfo; +import com.qiniu.util.Auth; +import com.robin.comm.util.json.GsonUtil; +import com.robin.core.base.util.Const; +import com.robin.core.base.util.ResourceConst; +import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor; +import com.robin.core.fileaccess.meta.DataCollectionMeta; +import com.robin.core.fileaccess.util.ResourceUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.springframework.lang.NonNull; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; + +import java.io.*; +import java.net.URL; +import java.net.URLEncoder; +import java.nio.file.Files; +import java.nio.file.Paths; + + +/** + * Qiniu FileSystemAccessor + */ +@Slf4j +public class QiniuFileSystemAccessor extends AbstractFileSystemAccessor { + private UploadManager uploadManager; + private BucketManager bucketManager; + private String domain; + private Auth auth; + private Gson gson= GsonUtil.getGson(); + public QiniuFileSystemAccessor(){ + this.identifier= Const.FILESYSTEM.QINIU.getValue(); + } + + @Override + public void init(DataCollectionMeta meta) { + Assert.isTrue(!CollectionUtils.isEmpty(meta.getResourceCfgMap()),"config map is empty!"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.DOMAIN.getValue()),"must provide domain"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.REGION.getValue()),"must provide region"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.ACESSSKEY.getValue()),"must provide accessKey"); + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.SECURITYKEY.getValue()),"must provide securityKey"); + String accessKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.ACESSSKEY.getValue()).toString(); + String secretKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.SECURITYKEY.getValue()).toString(); + domain=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.DOMAIN.getValue()).toString(); + auth= Auth.create(accessKey,secretKey); + Region region=Region.autoRegion(); + Configuration cfg=new Configuration(region); + cfg.resumableUploadAPIVersion=Configuration.ResumableUploadAPIVersion.V2; + uploadManager=new UploadManager(cfg); + } + + @Override + public Pair getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException { + InputStream inputStream = getInputStreamByConfig(meta); + return Pair.of(getReaderByPath(resourcePath, inputStream, meta.getEncode()),inputStream); + } + + @Override + public Pair getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException { + OutputStream outputStream = getOutputStream(meta); + return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream); + } + + @Override + public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException { + return getOutputStream(meta); + } + + @Override + public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException { + return getOutputStreamByPath(resourcePath, getOutputStream(meta)); + } + + @Override + public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException { + InputStream inputStream = getInputStreamByConfig(meta); + return getInputStreamByPath(resourcePath, inputStream); + } + + @Override + public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException { + return getInputStreamByConfig(meta); + } + + @Override + public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException { + String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); + return isKeyExist(bucketName,resourcePath); + } + + @Override + public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException { + String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); + return getSize(bucketName,resourcePath); + } + + @Override + public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { + String bucketName=meta.getResourceCfgMap().get("bucketName").toString(); + String token=auth.uploadToken(bucketName,meta.getPath()); + try{ + putObject(token,meta,outputStream); + }catch (IOException ex){ + log.error("{}",ex.getMessage()); + } + } + + private InputStream getInputStreamByConfig(DataCollectionMeta meta) { + Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()),"must provide bucketName"); + String bucketName= meta.getResourceCfgMap().get(ResourceConst.OSSPARAM.BUCKETNAME.getValue()).toString(); + String objectName= meta.getPath(); + return getObject(bucketName,objectName); + } + private boolean isKeyExist(String bucketName,String key) { + try { + FileInfo info = bucketManager.stat(bucketName, key); + int status = info.status; + return true; + } catch (QiniuException ex) { + log.error("{}", ex.getMessage()); + } + return false; + } + private long getSize(String bucketName,String key) { + try { + FileInfo info = bucketManager.stat(bucketName, key); + return info.fsize; + } catch (QiniuException ex) { + log.error("{}", ex.getMessage()); + } + return 0L; + } + private boolean putObject(String token,DataCollectionMeta meta,OutputStream outputStream) throws IOException{ + Response result; + String tmpFilePath=null; + if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) { + ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream; + result= uploadManager.put(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),byteArrayOutputStream.size(),meta.getPath(),token,null,meta.getContent().getContentType(),true); + }else{ + outputStream.close(); + String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta); + tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath()); + long size=Files.size(Paths.get(tmpFilePath)); + result=uploadManager.put(Files.newInputStream(Paths.get(tmpFilePath)),size,meta.getPath(),token,null,meta.getContent().getContentType(),true); + } + DefaultPutRet putRet=gson.fromJson(result.bodyString(),DefaultPutRet.class); + if(!ObjectUtils.isEmpty(putRet)){ + return true; + } + return false; + } + private InputStream getObject(@NonNull String bucketName, @NonNull String key) { + try{ + String fileUrl= URLEncoder.encode(bucketName,"UTF-8").replace("+","%20"); + String accessUrl=String.format("%s/%s",domain,fileUrl); + return new URL(accessUrl).openStream(); + }catch (Exception ex){ + + } + return null; + } +} diff --git a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java index fb7d317a..d07295d8 100644 --- a/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java +++ b/hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java @@ -7,6 +7,9 @@ import com.robin.core.fileaccess.meta.DataCollectionMeta; import com.robin.core.fileaccess.util.ResourceUtil; import com.robin.dfs.aws.AwsUtils; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -22,11 +25,17 @@ /** * Amazon AWS FileSystemAccessor */ +@Slf4j +@Getter public class S3FileSystemAccessor extends AbstractFileSystemAccessor { private S3Client client; private S3AsyncClient asyncClient; private Region region; - public S3FileSystemAccessor(){ + private String regionName; + private String accessKey; + private String secret; + + private S3FileSystemAccessor(){ this.identifier= Const.FILESYSTEM.S3.getValue(); } @@ -50,15 +59,14 @@ public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resou @Override public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException { return getOutputStream(meta); - //Pair> pair = AwsUtils.putAsync(asyncClient, bucketName, resourcePath); - //futureMap.put(resourcePath, pair.getValue()); } @Override public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException { - return null; + InputStream inputStream = getRawInputStream(meta,resourcePath); + return getInputStreamByPath(resourcePath, inputStream); } @Override @@ -91,6 +99,15 @@ public void init(DataCollectionMeta meta) { } } } + public void init(){ + Assert.notNull(region,"region name required!"); + Assert.notNull(accessKey,"accessKey name required!"); + Assert.notNull(secret,"secret name required!"); + + region = ObjectUtils.isEmpty(regionName) ? Region.US_EAST_1 : Region.of(regionName.toString()); + client = AwsUtils.getClientByCredential(region,accessKey,secret); + asyncClient = AwsUtils.getAsyncClientByCredential(region, accessKey, secret); + } @Override @@ -98,17 +115,36 @@ public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) { String bucketName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.BUCKETNAME.getValue()).toString(); ByteArrayOutputStream outputStream1=(ByteArrayOutputStream) outputStream; int size=outputStream1.size(); - AwsUtils.put(client,bucketName,meta.getPath(),new ByteArrayInputStream(outputStream1.toByteArray()),new Long(size)); + String contentType=!ObjectUtils.isEmpty(meta.getContent())?meta.getContent().getContentType():null; + AwsUtils.put(client,bucketName,meta.getPath(),contentType,new ByteArrayInputStream(outputStream1.toByteArray()),new Long(size)); } - private static OutputStream getOutputStream(DataCollectionMeta meta) throws IOException { - OutputStream outputStream; - if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){ - String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta); - String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath()); - outputStream= Files.newOutputStream(Paths.get(tmpFilePath)); - }else { - outputStream = new ByteArrayOutputStream(); + public static class Builder{ + private S3FileSystemAccessor accessor; + public Builder(){ + accessor=new S3FileSystemAccessor(); + } + public Builder accessKey(String accessKey){ + accessor.accessKey=accessKey; + return this; + } + public Builder secret(String secret){ + accessor.secret=secret; + return this; + } + public Builder region(String regionName){ + accessor.regionName=regionName; + return this; + } + public Builder withMetaConfig(DataCollectionMeta meta){ + accessor.init(meta); + return this; + } + public S3FileSystemAccessor build(){ + if(!ObjectUtils.isEmpty(accessor.getClient())){ + accessor.init(); + } + return accessor; } - return outputStream; } + } diff --git a/hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java b/hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java index a360734e..2c078024 100644 --- a/hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java +++ b/hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.core.ResponseInputStream; @@ -26,112 +27,138 @@ @Slf4j public class AwsUtils { - public static S3Client getHttpClient(){ + public static S3Client getHttpClient() { return S3Client.builder().httpClientBuilder(ApacheHttpClient.builder()).build(); } - public static S3Client getClientByCredential(Region region,String accessKey,String secret){ - return S3Client.builder().region(region).credentialsProvider(()->new AwsCredentials() { + + public static S3Client getClientByCredential(Region region, String accessKey, String secret) { + return S3Client.builder().region(region).credentialsProvider(() -> new AwsCredentials() { @Override public String accessKeyId() { return accessKey; } + @Override public String secretAccessKey() { return secret; } }).build(); } - public static S3AsyncClient getAsyncClientByCredential(Region region,String accessKey,String secret){ - return S3AsyncClient.builder().region(region).credentialsProvider(()->new AwsCredentials() { + + public static S3AsyncClient getAsyncClientByCredential(Region region, String accessKey, String secret) { + return S3AsyncClient.builder().region(region).credentialsProvider(() -> new AwsCredentials() { @Override public String accessKeyId() { return accessKey; } + @Override public String secretAccessKey() { return secret; } }).build(); } - public static S3Client getClientByRegion(Region region){ - ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create(); + + public static S3Client getClientByRegion(Region region) { + ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create(); return S3Client.builder().credentialsProvider(credentialsProvider).region(region).build(); } - public static S3AsyncClient getAsyncClientByRegion(Region region){ + + public static S3AsyncClient getAsyncClientByRegion(Region region) { ProfileCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create(); return S3AsyncClient.builder().credentialsProvider(credentialsProvider).region(region).build(); } - public static void createBucket(S3Client client,String bucketName){ + + public static void createBucket(S3Client client, String bucketName) { try { client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build()); client.waiter().waitUntilBucketExists(HeadBucketRequest.builder().bucket(bucketName).build()); - }catch (S3Exception ex){ + } catch (S3Exception ex) { } } - public static boolean put(S3Client client,String bucketName, String key, InputStream stream,Long length){ - createBucket(client,bucketName); - PutObjectRequest request=PutObjectRequest.builder().bucket(bucketName).key(key).build(); - PutObjectResponse response= client.putObject(request, RequestBody.fromInputStream(stream,length)); + + public static boolean put(S3Client client, String bucketName, String key, String contentType, InputStream stream, Long length) { + createBucket(client, bucketName); + PutObjectRequest.Builder builder = PutObjectRequest.builder().bucket(bucketName).key(key); + if (!ObjectUtils.isEmpty(contentType)) { + builder.contentType(contentType); + } + PutObjectRequest request = builder.build(); + PutObjectResponse response = client.putObject(request, RequestBody.fromInputStream(stream, length)); return response.bucketKeyEnabled(); } - public static Pair> putAsync(S3AsyncClient s3AsyncClient,String bucketName, String key){ - OutputStreamPublisher publisher=new OutputStreamPublisher(); + public static Pair> putAsync(S3AsyncClient s3AsyncClient, String bucketName, String key) { + + OutputStreamPublisher publisher = new OutputStreamPublisher(); - AsyncRequestBody body=AsyncRequestBody.fromPublisher(publisher); + AsyncRequestBody body = AsyncRequestBody.fromPublisher(publisher); - CompletableFuture responseFuture = s3AsyncClient.putObject(r->r.bucket(bucketName).key(key),body); - return Pair.of(publisher,responseFuture); + CompletableFuture responseFuture = s3AsyncClient.putObject(r -> r.bucket(bucketName).key(key), body); + return Pair.of(publisher, responseFuture); } - public static ResponseInputStream getObject(S3Client client,String bucketName,String key){ + + public static ResponseInputStream getObject(S3Client client, String bucketName, String key) { try { GetObjectRequest request = GetObjectRequest.builder().bucket(bucketName).key(key).build(); return client.getObject(request); - }catch (S3Exception ex){ + } catch (S3Exception ex) { } return null; } - public static boolean exists(S3Client client,String bucketName,String objectName){ - HeadObjectRequest objectRequest= HeadObjectRequest.builder().bucket(bucketName).key(objectName).build(); - try{ + + public static boolean exists(S3Client client, String bucketName, String objectName) { + HeadObjectRequest objectRequest = HeadObjectRequest.builder().bucket(bucketName).key(objectName).build(); + try { client.headObject(objectRequest); return true; - }catch (NoSuchKeyException ex){ + } catch (NoSuchKeyException ex) { return false; } } - public static boolean bucketExists(S3Client client,String bucketName){ - HeadBucketRequest bucketRequest= HeadBucketRequest.builder().bucket(bucketName).build(); - try{ + + public static boolean bucketExists(S3Client client, String bucketName) { + HeadBucketRequest bucketRequest = HeadBucketRequest.builder().bucket(bucketName).build(); + try { client.headBucket(bucketRequest); return true; - }catch (NoSuchBucketException ex){ + } catch (NoSuchBucketException ex) { return false; } } - public static List> list(S3Client client, String bucketName){ - List> retList=new ArrayList<>(); + + public static long size(S3Client client, String bucketName, String key) { + HeadObjectRequest objectRequest = HeadObjectRequest.builder().bucket(bucketName).key(key).build(); + try { + HeadObjectResponse response = client.headObject(objectRequest); + return response.contentLength(); + } catch (NoSuchKeyException ex) { + return 0L; + } + } + + public static List> list(S3Client client, String bucketName) { + List> retList = new ArrayList<>(); try { ListObjectsRequest request = ListObjectsRequest.builder().bucket(bucketName).build(); ListObjectsResponse response = client.listObjects(request); - if(!CollectionUtils.isEmpty(response.contents())){ - for(S3Object obj:response.contents()){ - Map tmap=new HashMap<>(); - tmap.put("key",obj.key()); - tmap.put("owner",obj.owner().displayName()); - tmap.put("size",obj.size()); + if (!CollectionUtils.isEmpty(response.contents())) { + for (S3Object obj : response.contents()) { + Map tmap = new HashMap<>(); + tmap.put("key", obj.key()); + tmap.put("owner", obj.owner().displayName()); + tmap.put("size", obj.size()); retList.add(tmap); } } - }catch (S3Exception ex){ + } catch (S3Exception ex) { } return retList; } - } diff --git a/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor b/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor index 8548b10a..d0366d02 100644 --- a/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor +++ b/hadooptool/src/main/resources/META-INF/services/com.robin.core.fileaccess.fs.IFileSystemAccessor @@ -1,4 +1 @@ -com.robin.comm.fileaccess.fs.HdfsFileSystemAccessor -com.robin.comm.fileaccess.fs.OSSFileSystemAccessor -com.robin.comm.fileaccess.fs.COSFileSystemAccessor -com.robin.comm.fileaccess.fs.S3FileSystemAccessor \ No newline at end of file +com.robin.comm.fileaccess.fs.HdfsFileSystemAccessor \ No newline at end of file diff --git a/metadata/src/main/java/com/robin/meta/service/resource/GlobalResourceService.java b/metadata/src/main/java/com/robin/meta/service/resource/GlobalResourceService.java index 08e9e6e1..a903b868 100644 --- a/metadata/src/main/java/com/robin/meta/service/resource/GlobalResourceService.java +++ b/metadata/src/main/java/com/robin/meta/service/resource/GlobalResourceService.java @@ -201,9 +201,8 @@ public static String getTableClassName(String tableName) { private Schema getFileSchema(AbstractFileSystemAccessor util, DataCollectionMeta meta, GlobalResource resource, int maxReadLines) throws Exception { Schema schema = null; - List suffixList = new ArrayList<>(); - FileUtils.parseFileFormat(meta.getPath(), suffixList); - String fileFormat = suffixList.get(0); + FileUtils.FileContent content=FileUtils.parseFile(meta.getPath()); + String fileFormat =content.getFileFormat(); int columnPos = 0; //read Header 10000 Line int readLines = maxReadLines > 0 ? maxReadLines : 10000; diff --git a/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java b/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java index f5827b4d..91d762d1 100644 --- a/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java +++ b/web/src/main/java/com/robin/core/web/util/HttpContextUtils.java @@ -22,7 +22,10 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import java.io.*; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator;