Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void init(DataCollectionMeta colmeta) throws Exception {
if(!colmeta.isFsTag()) {
writer = TextFileWriterFactory.getWriterByPath(colmeta, outStream);
} else{
writer= CommResWriterFactory.getFileWriterByType(colmeta.getResType(),colmeta);
writer= CommResWriterFactory.getFileWriterByType(colmeta.getFileFormat(),colmeta);
}
}
public void writeRecord(Map<String,Object> map) throws IOException, OperationNotSupportedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/
package com.robin.core.fileaccess.fs;

import com.robin.core.base.util.ResourceConst;
import com.robin.core.compress.util.CompressDecoder;
import com.robin.core.compress.util.CompressEncoder;
import com.robin.core.fileaccess.meta.DataCollectionMeta;
import com.robin.core.fileaccess.util.ResourceUtil;
import org.springframework.util.ObjectUtils;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;

/**
* abstract resource system access Utils (Local/Hdfs/ApacheVFS(including ftp sftp)/S3/Tencent cloud/aliyun)
Expand Down Expand Up @@ -95,6 +100,17 @@ protected static InputStream wrapInputStream(InputStream instream){
protected static OutputStream getOutputStreamByPath(String path, OutputStream out) throws IOException{
return CompressEncoder.getOutputStreamByCompressType(path,out);
}
protected OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
OutputStream outputStream;
if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
}else {
outputStream = new ByteArrayOutputStream();
}
return outputStream;
}

@Override
public void init(DataCollectionMeta meta){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public ArffFileIterator(DataCollectionMeta colmeta) {
@Override
public void beforeProcess() {
super.beforeProcess();
if(CollectionUtils.isEmpty(colmeta.getColumnList())){
if(!ObjectUtils.isEmpty(reader)){
if(CollectionUtils.isEmpty(colmeta.getColumnList()) && (!ObjectUtils.isEmpty(reader))){
try {
while (!(readLineStr = reader.readLine()).equalsIgnoreCase("@data")) {
if(StringUtils.startsWithIgnoreCase(readLineStr,"@RELATION ")){
Expand All @@ -39,7 +38,7 @@ public void beforeProcess() {
}catch (IOException ex){
logger.info("{}",ex.getMessage());
}
}

}
}
private DataSetColumnMeta parseDefine(String content){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colm
IResourceIterator iterator=getIter(colmeta);
return iterator;
}
public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor) throws IOException{
IResourceIterator iterator=getIter(colmeta,accessor);
return iterator;
}
public static AbstractFileIterator getProcessReaderIterator(DataCollectionMeta colmeta, AbstractFileSystemAccessor utils){
AbstractFileIterator iterator=null;
String fileType=colmeta.getFileFormat();
Expand Down Expand Up @@ -67,9 +71,9 @@ public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colm
return iterator;
}
public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colmeta,InputStream in) throws IOException{
List<String> suffixList=new ArrayList<>();
FileUtils.parseFileFormat(colmeta.getPath(),suffixList);
String fileFormat=suffixList.get(0);
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
colmeta.setContent(content);
String fileFormat=content.getFileFormat();
if(StringUtils.isEmpty(colmeta.getFileFormat())){
colmeta.setFileFormat(fileFormat);
}
Expand All @@ -80,7 +84,7 @@ public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colm
}
private static IResourceIterator getIter(DataCollectionMeta colmeta) throws MissingConfigException {
IResourceIterator iterator=null;
String fileType=colmeta.getFileFormat();
String fileType = getFileType(colmeta);

Class<? extends IResourceIterator> iterclass=fileIterMap.get(fileType);
try {
Expand All @@ -93,6 +97,32 @@ private static IResourceIterator getIter(DataCollectionMeta colmeta) throws Miss
}
return iterator;
}
private static IResourceIterator getIter(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor) throws MissingConfigException {
IResourceIterator iterator=null;
String fileType = getFileType(colmeta);

Class<? extends IResourceIterator> iterclass=fileIterMap.get(fileType);
try {
if (!ObjectUtils.isEmpty(iterclass)) {
iterator = iterclass.getConstructor(DataCollectionMeta.class,AbstractFileSystemAccessor.class).newInstance(colmeta,accessor);
}
iterator.beforeProcess();
}catch (Exception ex){
throw new MissingConfigException(ex);
}
return iterator;
}

private static String getFileType(DataCollectionMeta colmeta) {
String fileType= colmeta.getFileFormat();
if(ObjectUtils.isEmpty(fileType)){
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
colmeta.setContent(content);
fileType=content.getFileFormat();
}
return fileType;
}

private static void discoverIterator(){
ServiceLoader.load(IResourceIterator.class).iterator().forEachRemaining(i->{
if(AbstractFileIterator.class.isAssignableFrom(i.getClass()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ protected AbstractFileWriter(DataCollectionMeta colmeta){
}
checkAccessUtil(colmeta.getPath());
}
protected AbstractFileWriter(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor){
this.colmeta=colmeta;
formatter=DateTimeFormatter.ofPattern(colmeta.getDefaultTimestampFormat());
for (DataSetColumnMeta meta:colmeta.getColumnList()) {
columnList.add(meta.getColumnName());
columnMap.put(meta.getColumnName(), meta.getColumnType());
}
accessUtil=accessor;
}

@Override
public void setWriter(BufferedWriter writer){
this.writer=writer;
Expand Down Expand Up @@ -161,9 +171,11 @@ protected String getOutputPath(String url){
return url;
}
protected Const.CompressType getCompressType(){
List<String> fileSuffix=new ArrayList<>();
FileUtils.parseFileFormat(getOutputPath(colmeta.getPath()),fileSuffix);
return FileUtils.getFileCompressType(fileSuffix);
if(ObjectUtils.isEmpty(colmeta.getContent())) {
FileUtils.FileContent content = FileUtils.parseFile(colmeta.getPath());
colmeta.setContent(content);
}
return colmeta.getContent().getCompressType();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.robin.core.fileaccess.writer;

import com.robin.core.base.util.FileUtils;
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
import com.robin.core.fileaccess.meta.DataCollectionMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +40,11 @@ public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, Buffer
fileWriter.setWriter(writer);
return fileWriter;
}
public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, BufferedWriter writer,AbstractFileSystemAccessor accessor) throws IOException {
IResourceWriter fileWriter = getWriterByType(colmeta,accessor);
fileWriter.setWriter(writer);
return fileWriter;
}

public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta, OutputStream writer) throws IOException{
IResourceWriter fileWriter = getWriterByType(colmeta);
Expand All @@ -47,6 +53,13 @@ public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta,
}
return fileWriter;
}
public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta, OutputStream writer,AbstractFileSystemAccessor accessor) throws IOException{
IResourceWriter fileWriter = getWriterByType(colmeta,accessor);
if(writer!=null) {
fileWriter.setOutputStream(writer);
}
return fileWriter;
}

public static IResourceWriter getWriterByPath(DataCollectionMeta colmeta, OutputStream writer) throws IOException{
IResourceWriter fileWriter = getWriterByType(colmeta);
Expand All @@ -57,8 +70,8 @@ public static IResourceWriter getWriterByPath(DataCollectionMeta colmeta, Output
public static IResourceWriter getWriterByType(DataCollectionMeta colmeta) throws IOException {
IResourceWriter fileWriter = null;
try {
String fileSuffix = getFileSuffix(colmeta);

String fileSuffix=colmeta.getFileFormat();
Class<? extends IResourceWriter> writerClass=fileWriterMap.get(fileSuffix);
if (!ObjectUtils.isEmpty(writerClass)) {
fileWriter = writerClass.getConstructor(DataCollectionMeta.class).newInstance(colmeta);
Expand All @@ -70,6 +83,33 @@ public static IResourceWriter getWriterByType(DataCollectionMeta colmeta) throws
}
return fileWriter;
}
public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, AbstractFileSystemAccessor accessor) throws IOException {
IResourceWriter fileWriter = null;
try {
String fileSuffix = getFileSuffix(colmeta);

Class<? extends IResourceWriter> writerClass=fileWriterMap.get(fileSuffix);
if (!ObjectUtils.isEmpty(writerClass)) {
fileWriter = writerClass.getConstructor(DataCollectionMeta.class,AbstractFileSystemAccessor.class).newInstance(colmeta,accessor);
logger.info("using resource writer {}",writerClass.getCanonicalName());
}

} catch (Exception ex) {
throw new IOException(ex);
}
return fileWriter;
}

private static String getFileSuffix(DataCollectionMeta colmeta) {
String fileSuffix= colmeta.getFileFormat();
if(ObjectUtils.isEmpty(fileSuffix)){
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
colmeta.setContent(content);
fileSuffix=content.getFileFormat();
}
return fileSuffix;
}

private static void discoverIterator(Map<String,Class<? extends IResourceWriter>> fileIterMap){
ServiceLoader.load(IResourceWriter.class).iterator().forEachRemaining(i->{
if(AbstractFileWriter.class.isAssignableFrom(i.getClass()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@

@Slf4j
public class CommResIteratorFactory {
private CommResIteratorFactory(){

}
private static Map<String,Class<? extends IResourceIterator>> iterMap =new HashMap<>();
static {
discoverIterator(iterMap);
}

public static AbstractResIterator getIterator(Long resType, DataCollectionMeta colmeta) {
public static AbstractResIterator getIterator(String resType, DataCollectionMeta colmeta) {
AbstractResIterator iterator = null;
Class<? extends IResourceIterator> clazz = iterMap.get(resType);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@

@Slf4j
public class CommResWriterFactory {
private static final String KAFKA_WRITER_CLASS = "com.robin.comm.resaccess.writer.KafkaResourceWriter";
private static final String CASSANDRA_WRITER_CLASS = "com.robin.comm.resaccess.writer.CassandraResourceWriter";
private static final String MONGO_WRITER_CLASS = "com.robin.comm.resaccess.writer.MongoResourceWriter";
private static final String REDIS_WRITER_CLASS = "com.robin.comm.resaccess.writer.RedisResourceWriter";
private static final String ROCKET_WRITER_CLASS = "com.robin.comm.resaccess.writer.RocketResourceWriter";
private static final String HBASE_WRITER_CLASS = "com.robin.comm.resaccess.writer.HbaseResourceWriter";

private static Map<String,Class<? extends IResourceWriter>> writerMap =new HashMap<>();
static {
Expand All @@ -30,7 +24,7 @@ public class CommResWriterFactory {
private CommResWriterFactory(){

}
public static AbstractResourceWriter getFileWriterByType(Long resType, DataCollectionMeta colmeta) {
public static AbstractResourceWriter getFileWriterByType(String resType, DataCollectionMeta colmeta) {
AbstractResourceWriter fileWriter = null;
Class<? extends IResourceWriter> clazz = writerMap.get(resType);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private Object getRecordValue(ResultSetMetaData rsmd, ResultSet rs, LobHandler l
return retObj;
}

static void setTargetValue(Object target, Object value, String columnName, String columnType, LobHandler handler, PageQuery<Map<String, Object>> pageQuery) throws DAOException {
static void setTargetValue(Object target, Object value, String columnName, String columnType, PageQuery<Map<String, Object>> pageQuery) throws DAOException {
try {
if (value != null) {
Object targetValue = null;
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/com/robin/core/base/dao/SqlMapperDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private static ResultSetExtractor<List<?>> resultSetExtractor(SqlMapperConfigure
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < count; i++) {
String columnName = rsmd.getColumnName(i + 1);
CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, lobHandler, pageQuery);
CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, pageQuery);
}
retList.add(map);
} else {
Expand All @@ -208,7 +208,7 @@ private static ResultSetExtractor<List<?>> resultSetExtractor(SqlMapperConfigure
if (!segment1.getColumnMapper().containsKey(columnName)) {
throw new DAOException("property " + columnName + " not exist in class " + segment1.getClassName());
}
CommJdbcUtil.setTargetValue(targetObject, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, lobHandler, pageQuery);
CommJdbcUtil.setTargetValue(targetObject, resultSet.getObject(i + 1), segment1.getColumnMapper().get(columnName).left, segment1.getColumnMapper().get(columnName).right, pageQuery);
}
retList.add(targetObject);
} catch (IllegalAccessException|InstantiationException ex1) {
Expand All @@ -220,7 +220,7 @@ private static ResultSetExtractor<List<?>> resultSetExtractor(SqlMapperConfigure
Map<String, Object> map = new HashMap<>();
for (int i = 0; i < count; i++) {
String columnName = rsmd.getColumnName(i + 1);
CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), columnName, null, lobHandler, pageQuery);
CommJdbcUtil.setTargetValue(map, resultSet.getObject(i + 1), columnName, null, pageQuery);
}
retList.add(map);
}
Expand Down
Loading
Loading