Permalink
Browse files

improve SEATSLoader: one thread per table

  • Loading branch information...
lmwnshn authored and apavlo committed Jan 11, 2018
1 parent ccf3edf commit b7c9ee15aabfe25910af7ba857ed2d981dbc0b6d
Showing with 194 additions and 36 deletions.
  1. +194 −36 src/com/oltpbenchmark/benchmarks/seats/SEATSLoader.java
@@ -128,26 +128,105 @@ public SEATSLoader(SEATSBenchmark benchmark, Connection c) {
public List<LoaderThread> createLoaderThreads() throws SQLException {
List<LoaderThread> threads = new ArrayList<LoaderThread>();
final CountDownLatch fixedLatch = new CountDownLatch(1);
final CountDownLatch loadLatch = new CountDownLatch(3);
// High level locking overview, where step N+1 depends on step N
// and latches are countDown()'d from top to bottom:
//
// 1. [histLatch] Histograms will be loaded on their own
//
// FIXED TABLES [fixedLatch]
// 2.
// [countryLatch] Country will be loaded on their own
// AIRPORT depends on COUNTRY
// AIRLINE depends on COUNTRY
//
// 3. [scalePrepLatch]
// We need to load fixed table data into histograms before we
// start to load scaling tables
//
// SCALING TABLES
// 4.
// [custLatch] CUSTOMER depends on AIRPORT
// [distanceLatch] AIRPORT_DISTANCE depends on AIRPORT
// [flightLatch] FLIGHT depends on AIRLINE, AIRPORT, AIRPORT_DISTANCE
//
// 5. [loadLatch]
// RESERVATIONS depends on FLIGHT, CUSTOMER
// FREQUENT_FLYER depends on FLIGHT, CUSTOMER, AIRLINE
//
// Important note: FLIGHT must come before FREQUENT_FLYER so that
// we can use the flights_per_airline histogram when
// selecting an airline to create a new FREQUENT_FLYER
// account for a CUSTOMER
//
// 6. Then we save the profile
final CountDownLatch histLatch = new CountDownLatch(1);
// 1. [histLatch] HISTOGRAMS
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
SEATSLoader.this.loadHistograms();
loadLatch.countDown();
histLatch.countDown();
}
});
final CountDownLatch fixedLatch = new CountDownLatch(3);
final CountDownLatch countryLatch = new CountDownLatch(1);
// 2. [countryLatch] COUNTRY
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
SEATSLoader.this.loadFixedTables(conn);
try {
histLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadFixedTable(conn, SEATSConstants.TABLENAME_COUNTRY);
fixedLatch.countDown();
countryLatch.countDown();
}
});
// 2. AIRPORT depends on COUNTRY
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
try {
countryLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadFixedTable(conn, SEATSConstants.TABLENAME_AIRPORT);
fixedLatch.countDown();
}
});
// 2. AIRLINE depends on COUNTRY
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
try {
countryLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadFixedTable(conn, SEATSConstants.TABLENAME_AIRLINE);
fixedLatch.countDown();
loadLatch.countDown();
}
});
final CountDownLatch scalingPrepLatch = new CountDownLatch(1);
// 3. [scalingPrepLatch] guards all of the fixed tables and should
// be used from this point onwards instead of individual fixed locks
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
@@ -158,7 +237,96 @@ public void load(Connection conn) throws SQLException {
throw new RuntimeException(e);
}
SEATSLoader.this.loadScalingTables(conn);
// Setup the # of flights per airline
SEATSLoader.this.flights_per_airline.putAll(SEATSLoader.this.profile.getAirlineCodes(), 0);
scalingPrepLatch.countDown();
}
});
final CountDownLatch custLatch = new CountDownLatch(1);
final CountDownLatch distanceLatch = new CountDownLatch(1);
final CountDownLatch flightLatch = new CountDownLatch(1);
// 4. [custLatch] CUSTOMER depends on AIRPORT
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
try {
scalingPrepLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadScalingTable(conn, SEATSConstants.TABLENAME_CUSTOMER);
custLatch.countDown();
}
});
// 4. AIRPORT_DISTANCE depends on AIRPORT
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
try {
scalingPrepLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadScalingTable(conn, SEATSConstants.TABLENAME_AIRPORT_DISTANCE);
distanceLatch.countDown();
}
});
// 4. [flightLatch] FLIGHT depends on AIRPORT_DISTANCE, AIRLINE, AIRPORT
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
try {
distanceLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadScalingTable(conn, SEATSConstants.TABLENAME_FLIGHT);
flightLatch.countDown();
}
});
final CountDownLatch loadLatch = new CountDownLatch(2);
// 5. RESERVATIONS depends on FLIGHT, CUSTOMER
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
try {
flightLatch.await();
custLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadScalingTable(conn, SEATSConstants.TABLENAME_RESERVATION);
loadLatch.countDown();
}
});
// 5. FREQUENT_FLYER depends on FLIGHT, CUSTOMER, AIRLINE
threads.add(new LoaderThread() {
@Override
public void load(Connection conn) throws SQLException {
try {
flightLatch.await();
custLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
SEATSLoader.this.loadScalingTable(conn, SEATSConstants.TABLENAME_FREQUENT_FLYER);
loadLatch.countDown();
}
});
@@ -249,18 +417,16 @@ protected void loadHistograms() {
*
* @param catalog_db
*/
protected void loadFixedTables(Connection conn) {
for (String table_name : SEATSConstants.TABLES_DATAFILES) {
LOG.debug(String.format("Loading table '%s' from fixed file", table_name));
try {
Table catalog_tbl = this.benchmark.getTableCatalog(table_name);
assert (catalog_tbl != null);
Iterable<Object[]> iterable = this.getFixedIterable(catalog_tbl);
this.loadTable(conn, catalog_tbl, iterable, 5000);
} catch (Throwable ex) {
throw new RuntimeException("Failed to load data files for fixed-sized table '" + table_name + "'", ex);
}
} // FOR
protected void loadFixedTable(Connection conn, String table_name) {
LOG.debug(String.format("Loading table '%s' from fixed file", table_name));
try {
Table catalog_tbl = this.benchmark.getTableCatalog(table_name);
assert (catalog_tbl != null);
Iterable<Object[]> iterable = this.getFixedIterable(catalog_tbl);
this.loadTable(conn, catalog_tbl, iterable, 5000);
} catch (Throwable ex) {
throw new RuntimeException("Failed to load data files for fixed-sized table '" + table_name + "'", ex);
}
}
/**
@@ -269,23 +435,15 @@ protected void loadFixedTables(Connection conn) {
*
* @param catalog_db
*/
protected void loadScalingTables(Connection conn) {
// Setup the # of flights per airline
this.flights_per_airline.putAll(this.profile.getAirlineCodes(), 0);
// IMPORTANT: FLIGHT must come before FREQUENT_FLYER so that we
// can use the flights_per_airline histogram when selecting an airline
// to create a new FREQUENT_FLYER account for a CUSTOMER
for (String table_name : SEATSConstants.TABLES_SCALING) {
try {
Table catalog_tbl = this.benchmark.getTableCatalog(table_name);
assert (catalog_tbl != null);
Iterable<Object[]> iterable = this.getScalingIterable(catalog_tbl);
this.loadTable(conn, catalog_tbl, iterable, 5000);
} catch (Throwable ex) {
throw new RuntimeException("Failed to load data files for scaling-sized table '" + table_name + "'", ex);
}
} // FOR
protected void loadScalingTable(Connection conn, String table_name) {
try {
Table catalog_tbl = this.benchmark.getTableCatalog(table_name);
assert (catalog_tbl != null);
Iterable<Object[]> iterable = this.getScalingIterable(catalog_tbl);
this.loadTable(conn, catalog_tbl, iterable, 5000);
} catch (Throwable ex) {
throw new RuntimeException("Failed to load data files for scaling-sized table '" + table_name + "'", ex);
}
}
/**

0 comments on commit b7c9ee1

Please sign in to comment.