Skip to content

Commit

Permalink
working version of new simple-connector interface
Browse files Browse the repository at this point in the history
  • Loading branch information
raulcf committed Jul 24, 2018
1 parent e6e45f5 commit e2dde6b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import au.com.bytecode.opencsv.CSVReader;

@Deprecated
public class BenchmarkingData {

public List<Attribute> attributes;
Expand Down
19 changes: 14 additions & 5 deletions ddprofiler/src/main/java/sources/implementations/CSVSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import sources.config.SourceConfig;
import sources.deprecated.Attribute;
import sources.deprecated.Record;
import sources.deprecated.TableInfo;

public class CSVSource implements Source {

Expand All @@ -37,7 +36,8 @@ public class CSVSource implements Source {
private CSVSourceConfig config;
private boolean initialized = false;
private CSVReader fileReader;
private TableInfo tableInfo;
// private TableInfo tableInfo;
private List<Attribute> attributes;

// metrics
private long lineCounter = 0;
Expand All @@ -48,10 +48,11 @@ public CSVSource() {

}

public CSVSource(String path, String relationName) {
public CSVSource(String path, String relationName, SourceConfig config) {
this.tid = SourceUtils.computeTaskId(path, relationName);
this.path = path;
this.relationName = relationName;
this.config = (CSVSourceConfig) config;
}

@Override
Expand Down Expand Up @@ -98,7 +99,7 @@ public List<Source> processSource(SourceConfig config) {
String path = f.getParent() + File.separator;
String name = f.getName();
// Make the csv config specific to the relation
CSVSource task = new CSVSource(path, name);
CSVSource task = new CSVSource(path, name, config);
totalFiles++;
// c.submitTask(pt);
tasks.add(task);
Expand All @@ -116,9 +117,16 @@ public SourceType getSourceType() {

@Override
public List<Attribute> getAttributes() throws IOException, SQLException {
if (!initialized) {
String path = this.path + this.relationName;
char separator = this.config.getSeparator().charAt(0);
fileReader = new CSVReader(new FileReader(path), separator);
initialized = true;
}
// assume that the first row is the attributes;
if (lineCounter != 0) {
return tableInfo.getTableAttributes();
// return tableInfo.getTableAttributes();
return attributes;
}
String[] attributes = fileReader.readNext();
lineCounter++;
Expand All @@ -128,6 +136,7 @@ public List<Attribute> getAttributes() throws IOException, SQLException {
Attribute attr = new Attribute(attributes[i]);
attrList.add(attr);
}
this.attributes = attrList;
return attrList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import sources.config.SourceConfig;
import sources.deprecated.Attribute;
import sources.deprecated.Record;
import sources.deprecated.TableInfo;

public class PostgresSource implements Source {

Expand All @@ -48,7 +47,7 @@ public class PostgresSource implements Source {
private Statement theStatement;
private ResultSet theRS;

private TableInfo tableInfo;
private List<Attribute> attributes;

// Metrics
private Counter error_records = Metrics.REG.counter((name(PostgresSource.class, "error", "records")));
Expand All @@ -61,8 +60,9 @@ public PostgresSource() {

}

public PostgresSource(String relationName) {
public PostgresSource(String relationName, PostgresSourceConfig config) {
this.relationName = relationName;
this.config = config;
this.tid = SourceUtils.computeTaskId(this.config.getDatabase_name(), relationName);
}

Expand Down Expand Up @@ -101,7 +101,7 @@ public List<Source> processSource(SourceConfig config) {
PostgresSourceConfig relationPostgresSourceConfig = (PostgresSourceConfig) postgresConfig.selfCopy();
relationPostgresSourceConfig.setRelationName(relation);

PostgresSource ps = new PostgresSource(relation);
PostgresSource ps = new PostgresSource(relation, relationPostgresSourceConfig);
tasks.add(ps);
}
return tasks;
Expand All @@ -124,8 +124,12 @@ public String getRelationName() {

@Override
public List<Attribute> getAttributes() throws IOException, SQLException {
if (tableInfo.getTableAttributes() != null)
return tableInfo.getTableAttributes();
if (!initialized) {
initializeConnection();
}
if (this.attributes != null) {
return this.attributes;
}
DatabaseMetaData metadata = connection.getMetaData();
ResultSet resultSet = metadata.getColumns(null, null, config.getRelationName(), null);
Vector<Attribute> attrs = new Vector<Attribute>();
Expand All @@ -137,7 +141,7 @@ public List<Attribute> getAttributes() throws IOException, SQLException {
attrs.addElement(attr);
}
resultSet.close();
tableInfo.setTableAttributes(attrs);
this.attributes = attrs;
return attrs;
}

Expand All @@ -163,6 +167,7 @@ private void initializeConnection() {

if (this.connectionPools.containsKey(connIdentifier)) {
this.connection = this.connectionPools.get(connIdentifier);
return;
}

try {
Expand Down Expand Up @@ -246,7 +251,7 @@ private boolean read(int num, List<Record> rec_list) throws SQLException {
num--;
// FIXME: profile and optimize this
Record rec = new Record();
for (int i = 0; i < this.tableInfo.getTableAttributes().size(); i++) {
for (int i = 0; i < this.attributes.size(); i++) {
Object obj = theRS.getObject(i + 1);
if (obj != null) {
String v1 = obj.toString();
Expand Down

0 comments on commit e2dde6b

Please sign in to comment.