Skip to content
Browse files

Added berkeleydb from the book "The Berkeley DB Book"

Signed-off-by: Stefan Naewe <stefan.naewe@atlas-elektronik.com>
  • Loading branch information...
1 parent 208b18c commit 31a4f553cc957a31e5f922911d0c3820cbf5b27a Stefan Naewe committed
Showing with 9,843 additions and 0 deletions.
  1. +22 −0 berkeleydb/Chapter04/SConstruct
  2. +64 −0 berkeleydb/Chapter04/env_ex.cc
  3. +66 −0 berkeleydb/Chapter04/hello_world.cc
  4. +27 −0 berkeleydb/Chapter04/hello_world.cc.env
  5. +6 −0 berkeleydb/Chapter04/try.cc
  6. +22 −0 berkeleydb/Chapter05/SConstruct
  7. +152 −0 berkeleydb/Chapter05/cds.cc
  8. +24 −0 berkeleydb/Chapter05/nested
  9. +344 −0 berkeleydb/Chapter05/tds.cc
  10. +29 −0 berkeleydb/Chapter06/SConstruct
  11. +4 −0 berkeleydb/Chapter06/account.txt
  12. +3 −0 berkeleydb/Chapter06/bank.txt
  13. +152 −0 berkeleydb/Chapter06/cds.cc
  14. +603 −0 berkeleydb/Chapter06/load_db.cc
  15. +267 −0 berkeleydb/Chapter06/load_db.h
  16. +94 −0 berkeleydb/Chapter06/misc.cc
  17. +24 −0 berkeleydb/Chapter06/nested
  18. +5 −0 berkeleydb/Chapter06/person.txt
  19. +349 −0 berkeleydb/Chapter06/tds.cc
  20. +40 −0 berkeleydb/Chapter07/SConstruct
  21. +4 −0 berkeleydb/Chapter07/account.txt
  22. +3 −0 berkeleydb/Chapter07/bank.txt
  23. 0 berkeleydb/Chapter07/cd
  24. BIN berkeleydb/Chapter07/cds
  25. +152 −0 berkeleydb/Chapter07/cds.cc
  26. 0 berkeleydb/Chapter07/chap5_cds
  27. +2 −0 berkeleydb/Chapter07/clean_env
  28. +731 −0 berkeleydb/Chapter07/load_db.cc
  29. +313 −0 berkeleydb/Chapter07/load_db.h
  30. 0 berkeleydb/Chapter07/ls
  31. BIN berkeleydb/Chapter07/misc
  32. +94 −0 berkeleydb/Chapter07/misc.cc
  33. +24 −0 berkeleydb/Chapter07/nested
  34. +5 −0 berkeleydb/Chapter07/person.txt
  35. +167 −0 berkeleydb/Chapter07/tds.cc
  36. +1,379 −0 berkeleydb/Chapter07/xx
  37. +20 −0 berkeleydb/Chapter08/SConstruct
  38. +779 −0 berkeleydb/Chapter08/load_db.cc
  39. +329 −0 berkeleydb/Chapter08/load_db.h
  40. +633 −0 berkeleydb/Chapter08/rep.cc
  41. +162 −0 berkeleydb/Chapter08/rep.h
  42. +48 −0 berkeleydb/Chapter08/tds.cc
  43. +121 −0 berkeleydb/Chapter08/watcher.cc
  44. +44 −0 berkeleydb/Chapter08/watcher.h
  45. +91 −0 berkeleydb/Chapter08/worker.cc
  46. +37 −0 berkeleydb/Chapter08/worker.h
  47. +278 −0 berkeleydb/Chapter09/.sconsign
  48. +20 −0 berkeleydb/Chapter09/SConstruct
  49. +294 −0 berkeleydb/Chapter09/dbentry.cc
  50. +233 −0 berkeleydb/Chapter09/dbentry.h
  51. +456 −0 berkeleydb/Chapter09/gtm.cc
  52. +130 −0 berkeleydb/Chapter09/gtm.h
  53. +687 −0 berkeleydb/Chapter09/load_db.cc
  54. +310 −0 berkeleydb/Chapter09/load_db.h
Sorry, we could not display the entire diff because it was too big.
View
22 berkeleydb/Chapter04/SConstruct
@@ -0,0 +1,22 @@
+
+MYLIBPATH = ['/home/hy/sw/install/lib',
+ '/usr/lib']
+MYCPPPATH = ['/home/hy/sw/install/include']
+USED_LIBS = ['db_cxx',
+ 'pthread']
+
+SOURCES_FILES = ['hello_world.cc']
+
+SOURCES_FILES_1 = ['env_ex.cc']
+
+Program( 'hello_world',
+ SOURCES_FILES,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
+
+Program( 'env_ex',
+ SOURCES_FILES_1,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
View
64 berkeleydb/Chapter04/env_ex.cc
@@ -0,0 +1,64 @@
+
+#include <iostream>
+
+#include <db_cxx.h>
+
+using std::cout;
+using std::endl;
+using std::cerr;
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ cout << prefix << " " << errMsg << endl;
+}
+
+int main(int argc, char **argv)
+{
+ DbEnv dbenv(0);
+
+ try
+ {
+ const char *envhome = "./chap4_env";
+
+ dbenv.set_errpfx("env_ex");
+ dbenv.set_errcall(errCallback);
+
+ dbenv.open(envhome, DB_CREATE | DB_INIT_MPOOL, 0);
+
+ Db db(&dbenv, 0);
+ db.open(NULL, "chap4_db", NULL , DB_BTREE, DB_CREATE, 0644);
+
+ char *first_key = "first_record";
+ u_int32_t key_len = (u_int32_t)strlen(first_key);
+
+ char *first_value = "Hello World - Berkeley DB style!!";
+ u_int32_t value_len = (u_int32_t)strlen(first_value);
+
+ Dbt key(first_key, key_len + 1 );
+ Dbt value(first_value, value_len + 1);
+
+ int ret;
+ ret = db.put(0, &key, &value, DB_NOOVERWRITE);
+
+ if (ret == DB_KEYEXIST)
+ {
+ db.err(ret, "");
+ }
+
+ Dbt stored_value;
+ ret = db.get(0, &key, &stored_value, 0);
+
+ cout << (char *)stored_value.get_data() << endl;
+
+ db.close(0);
+
+ }
+ catch(DbException &dbex)
+ {
+ dbenv.err(dbex.get_errno(), "Db exception caught");
+ }
+ catch(...)
+ {
+ cout << "unknown exception caught" << endl;
+ }
+}
View
66 berkeleydb/Chapter04/hello_world.cc
@@ -0,0 +1,66 @@
+
+#include <iostream>
+
+#include <db_cxx.h>
+
+using std::cout;
+using std::endl;
+using std::cerr;
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ cout << prefix << " " << errMsg << endl;
+}
+
+int main(int argc, char **argv)
+{
+ Db db(0, 0);
+
+ try
+ {
+ db.set_errpfx("hello_world");
+ db.set_errcall(errCallback);
+
+ db.open(NULL,
+ "./chap4_db",
+ NULL,
+ DB_BTREE,
+ DB_CREATE,
+ 0644);
+
+ char *first_key = "first_record";
+ u_int32_t key_len = (u_int32_t)strlen(first_key);
+
+ char *first_value = "Hello World - Berkeley DB style!!";
+ u_int32_t value_len = (u_int32_t)strlen(first_value);
+
+ Dbt key(first_key, key_len + 1 );
+ Dbt value(first_value, value_len + 1);
+
+ int ret;
+ ret = db.put(0,
+ &key,
+ &value,
+ DB_NOOVERWRITE);
+
+ if (ret == DB_KEYEXIST)
+ {
+ db.err(ret, "");
+ }
+
+ Dbt stored_value;
+ ret = db.get(0,
+ &key,
+ &stored_value,
+ 0);
+ cout << (char *)stored_value.get_data()
+ << endl;
+
+ db.close(0);
+
+ }
+ catch(DbException &dbex)
+ {
+ db.err(dbex.get_errno(), "Db exception caught");
+ }
+}
View
27 berkeleydb/Chapter04/hello_world.cc.env
@@ -0,0 +1,27 @@
+
+#include <iostream>
+
+#include <db_cxx.h>
+
+int main(int argc, char **argv)
+{
+ try
+ {
+ // Create the db environment handle
+ DbEnv *env = new DbEnv(0);
+
+ //Set the db data directory
+ env->set_data_dir("/tmp/chapter4");
+
+ //Open the db environment
+ env->open(DB_CREATE | DB_INIT_MPOOL, 0);
+
+
+ //Close the db environment
+ env->close(0);
+ }
+ catch(DbException &dbex)
+ {
+ cerr << "hello_world: exception caught: " << dbe.what() << endl;
+ }
+}
View
6 berkeleydb/Chapter04/try.cc
@@ -0,0 +1,6 @@
+#include <stdio.h>
+
+int main(int argc, char **argv)
+{
+ printf("hello world\n");
+}
View
22 berkeleydb/Chapter05/SConstruct
@@ -0,0 +1,22 @@
+
+MYLIBPATH = ['/home/hy/sw/install/lib',
+ '/usr/lib']
+MYCPPPATH = ['/home/hy/sw/install/include']
+USED_LIBS = ['db_cxx',
+ 'pthread',
+ 'ACE',]
+
+SOURCES_FILES = ['cds.cc']
+SOURCES_FILES_1 = ['tds.cc']
+
+Program( 'cds',
+ SOURCES_FILES,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
+
+Program( 'tds',
+ SOURCES_FILES_1,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
View
152 berkeleydb/Chapter05/cds.cc
@@ -0,0 +1,152 @@
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include <db_cxx.h>
+
+#include <ace/Task.h>
+
+using std::cout;
+using std::endl;
+using std::cerr;
+using std::stringstream;
+using std::string;
+
+#define NUM_RECS 5
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ ACE_DEBUG((LM_ERROR, "(%t) %s errMsg: %s\n", prefix, errMsg));
+}
+
+class CDSWorker: public ACE_Task_Base
+{
+ public:
+ CDSWorker(DbEnv *env, Db *db, bool reader, int id);
+ virtual int svc();
+
+ private:
+ void runReader();
+ void runWriter();
+
+ DbEnv *env_;
+ Db *db_;
+ bool reader_;
+ int id_;
+};
+
+CDSWorker::CDSWorker(DbEnv *env, Db *db, bool reader, int id)
+ :env_(env), db_(db), reader_(reader), id_(id)
+{}
+
+int CDSWorker::svc()
+{
+ try
+ {
+ if(reader_)
+ runReader();
+ else
+ runWriter();
+ }
+ catch(...)
+ {
+ ACE_DEBUG((LM_ERROR, "CDSWorker: exception caught\n"));
+ }
+}
+
+void CDSWorker::runReader()
+{
+ Dbc *dbcur;
+ db_->cursor(NULL, &dbcur, 0);
+ Dbt key;
+ Dbt value;
+
+ while (dbcur->get(&key, &value, DB_NEXT) == 0)
+ {
+ ACE_DEBUG((LM_ERROR, "Reader %d: key \"%s\" data \"%s\"\n",
+ id_, (char*)(key.get_data()),
+ (char*)(value.get_data()) ));
+ ACE_OS::sleep(1);
+ }
+ dbcur->close();
+}
+
+void CDSWorker::runWriter()
+{
+ for(int i=0; i < NUM_RECS; i++)
+ {
+ stringstream keyss;
+ keyss << id_;
+ keyss << "_";
+ keyss << i;
+ string keystr = keyss.str();
+
+ stringstream valss;
+ valss << id_;
+ valss << "_";
+ valss << i*i;
+ string valstr = valss.str();
+
+ Dbt key((void*)(keystr.c_str()), keystr.size() + 1);
+ Dbt value((void*)(valstr.c_str()), valstr.size() + 1);
+
+ int ret = db_->put(0, &key, &value, 0);
+ if(ret)
+ env_->err(ret, keystr.c_str());
+ else
+ ACE_DEBUG((LM_ERROR, "Writer %d: key \"%s\" data \"%s\" \n",
+ id_,
+ keystr.c_str(),
+ valstr.c_str() ));
+ ACE_OS::sleep(1);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ DbEnv dbenv(0);
+
+ try
+ {
+ const char *envhome = "./chap5_env";
+
+ dbenv.set_errpfx("env_ex");
+ dbenv.set_errcall(errCallback);
+
+ dbenv.open(envhome, DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB | DB_THREAD, 0);
+
+ Db db_ver(NULL, 0);
+ int verify = db_ver.verify( "./chap5_env/chap5_cds", NULL, NULL, 0);
+
+ ACE_DEBUG((LM_ERROR, "verify returned %d\n", verify));
+
+ Db db(&dbenv, 0);
+ db.open(NULL, "chap5_cds", NULL , DB_BTREE, DB_CREATE | DB_THREAD, 0644);
+
+ CDSWorker *w1 = new CDSWorker(&dbenv, &db, false, 1);
+ if(w1->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch w1\n"));
+
+ CDSWorker *r1 = new CDSWorker(&dbenv, &db, true, 2);
+ if(r1->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch r1\n"));
+
+ w1->wait();
+ r1->wait();
+
+ delete r1;
+ delete w1;
+
+ db.close(0);
+ dbenv.close(0);
+
+ }
+ catch(DbException &dbex)
+ {
+ dbenv.err(dbex.get_errno(), "Db exception caught");
+ }
+ catch(...)
+ {
+ cout << "unknown exception caught" << endl;
+ }
+}
View
24 berkeleydb/Chapter05/nested
@@ -0,0 +1,24 @@
+
+DbTxn *parent;
+try
+{
+ env->txn_begin(NULL, &parent, 0);
+ db->get(parent, &key, &value, 0);
+
+ DbTxn *child;
+ try
+ {
+ env->txn_begin(parent, &child, 0);
+ db->put(child, &key, &value1, 0);
+ child->commit(0);
+ }
+ catch (DbException ex)
+ {
+ child->abort();
+ }
+ parent->commit(0);
+}
+catch (DbException& ex)
+{
+ parent->abort();
+}
View
344 berkeleydb/Chapter05/tds.cc
@@ -0,0 +1,344 @@
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include <db_cxx.h>
+
+#include <ace/Task.h>
+#include <ace/Time_Value.h>
+#include <ace/Thread_Manager.h>
+
+using std::cout;
+using std::endl;
+using std::cerr;
+using std::stringstream;
+using std::string;
+
+#define NUM_RECS 50000
+
+#define READER 1
+#define WRITER 2
+#define NON_CURSOR_READER 3
+#define CHECKPOINT 4
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ ACE_DEBUG((LM_ERROR, "(%t) %s errMsg: %s\n", prefix, errMsg));
+}
+
+class CDSWorker: public ACE_Task_Base
+{
+ public:
+ CDSWorker(DbEnv *env, Db *db, int type, int id);
+ virtual int svc();
+
+ private:
+ void runReader();
+ void runNonCursorReader();
+ void runWriter();
+ void runCheckpoint();
+ void archive(u_int32_t flag);
+
+ DbEnv *env_;
+ Db *db_;
+ int type_;
+ int id_;
+
+};
+
+CDSWorker::CDSWorker(DbEnv *env, Db *db, int type, int id)
+ :env_(env), db_(db), type_(type), id_(id)
+{}
+
+int CDSWorker::svc()
+{
+ try
+ {
+ switch(type_)
+ {
+ case READER:
+ runReader();
+ break;
+ case WRITER:
+ runWriter();
+ break;
+ case NON_CURSOR_READER:
+ runNonCursorReader();
+ break;
+ case CHECKPOINT:
+ runCheckpoint();
+ break;
+ default:
+ ACE_DEBUG((LM_ERROR, "unknown type %d\n", type_));
+ break;
+ }
+ }
+ catch(...)
+ {
+ ACE_DEBUG((LM_ERROR, "CDSWorker: exception caught\n"));
+ }
+}
+
+void CDSWorker::runReader()
+{
+ Dbc *dbcur;
+ Dbt key;
+ Dbt value;
+
+ DbTxn *txn;
+ try
+ {
+ env_->txn_begin(NULL, &txn, 0);
+ db_->cursor(txn, &dbcur, DB_DEGREE_2);
+ while (dbcur->get(&key, &value, DB_NEXT) == 0)
+ {
+ ACE_DEBUG((LM_ERROR, "Reader %d: key \"%s\" data \"%s\"\n",
+ id_, (char*)(key.get_data()),
+ (char*)(value.get_data()) ));
+ }
+ dbcur->close();
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ txn->abort();
+ }
+
+}
+
+void CDSWorker::runWriter()
+{
+ for(int i=0; i < NUM_RECS; i++)
+ {
+ stringstream keyss;
+ keyss << id_;
+ keyss << "_";
+ keyss << i;
+ string keystr = keyss.str();
+
+ stringstream valss;
+ valss << id_;
+ valss << "_";
+ valss << i*i;
+ string valstr = valss.str();
+
+ Dbt key((void*)(keystr.c_str()), keystr.size() + 1);
+ char buff[6000];
+ strcpy(buff, valstr.c_str());
+ Dbt value((void*)(buff), 6000);
+
+ DbTxn *txn;
+ try
+ {
+ env_->txn_begin(NULL, &txn, 0);
+ int ret = db_->put(txn, &key, &value, 0);
+ if(ret)
+ env_->err(ret, keystr.c_str());
+#if 0
+ else
+ ACE_DEBUG((LM_ERROR, "Writer %d: key \"%s\" data \"%s\" \n",
+ id_,
+ keystr.c_str(),
+ valstr.c_str() ));
+#endif
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ env_->err(txn_ex.get_errno(), "writer");
+ txn->abort();
+ }
+ }
+}
+
+void CDSWorker::runNonCursorReader()
+{
+ DbTxn *txn;
+ env_->txn_begin(NULL, &txn, 0);
+ while(true)
+ {
+ int j = 0;
+ for(int i=0; i < NUM_RECS; i++)
+ {
+ stringstream keyss;
+ keyss << "1_";
+ keyss << i;
+ string keystr = keyss.str();
+
+ Dbt key((void*)(keystr.c_str()), keystr.size() + 1);
+ Dbt value;
+ value.set_flags(DB_DBT_MALLOC);
+
+ try
+ {
+ int ret = db_->get(txn, &key, &value,0 );
+ if(ret == DB_NOTFOUND)
+ {
+ //ACE_DEBUG((LM_ERROR, "key not found\n"));
+ txn->commit(0);
+ env_->txn_begin(NULL, &txn, 0);
+ i --;
+ continue;
+ }
+ else if(ret != 0)
+ {
+ env_->err(ret, keystr.c_str());
+ }
+ else
+ {
+#if 0
+ ACE_DEBUG((LM_ERROR, "NonCursorReader %d: key \"%s\" data \"%s\" size %d \n",
+ id_,
+ keystr.c_str(),
+ (char*)(value.get_data()), value.get_size() ));
+#endif
+ }
+ j++;
+ if(j >= 20)
+ {
+ txn->commit(0);
+ env_->txn_begin(NULL, &txn, 0);
+ j = 0;
+ }
+ }
+ catch(DbException &txn_ex)
+ {
+ env_->err(txn_ex.get_errno(), "nonCursorReader");
+ txn->abort();
+ env_->txn_begin(NULL, &txn, 0);
+ }
+ }
+ }
+}
+
+void CDSWorker::runCheckpoint()
+{
+ int ARCH_INTERVAL = 60;
+ ACE_Time_Value last_arch = 0;
+
+ while(true)
+ {
+ try
+ {
+ env_->txn_checkpoint(0,0,0);
+ ACE_Time_Value curr_time = ACE_OS::gettimeofday();
+ if(curr_time.sec() - last_arch.sec() > ARCH_INTERVAL)
+ {
+ ACE_DEBUG((LM_ERROR, "removing unused log files\n"));
+ archive(DB_ARCH_REMOVE);
+
+ ACE_DEBUG((LM_ERROR, "archiving data files\n"));
+ archive(DB_ARCH_DATA);
+
+ ACE_DEBUG((LM_ERROR, "archiving log files\n"));
+ archive(DB_ARCH_LOG);
+ }
+ }
+ catch(DbException &ex)
+ {
+ env_->err(ex.get_errno(), "checkpoint");
+ }
+ ACE_OS::sleep(60);
+ }
+}
+
+void CDSWorker::archive(u_int32_t flags)
+{
+ int ret;
+ char **file_list = NULL;
+
+ ret = env_->log_archive(&file_list, flags);
+ if(ret)
+ {
+ env_->err(ret, "runCheckpoint: archive failed");
+ }
+ else if(file_list != NULL)
+ {
+ char **begin = file_list;
+ for(; *file_list != NULL; ++file_list)
+ {
+ ACE_DEBUG((LM_ERROR, "%s\n", *file_list));
+ }
+ free(begin);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ DbEnv dbenv(0);
+ try
+ {
+ const char *envhome = "./chap5_env";
+
+ dbenv.set_errpfx("env_ex");
+ dbenv.set_errcall(errCallback);
+ dbenv.set_lk_detect(DB_LOCK_DEFAULT);
+
+ u_int32_t envFlags =
+ DB_CREATE | DB_INIT_MPOOL | DB_INIT_TXN |
+ DB_INIT_LOG | DB_INIT_LOCK | DB_RECOVER | DB_THREAD;
+
+ dbenv.open(envhome, envFlags, 0);
+
+ Db db(&dbenv, 0);
+ DbTxn *txn;
+ try
+ {
+ dbenv.txn_begin(NULL, &txn, 0);
+ db.set_pagesize(512);
+ db.open(txn, "chap5_tds", NULL ,
+ DB_BTREE, DB_CREATE | DB_THREAD, 0644);
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ txn->abort();
+ }
+
+ CDSWorker *w1 = new CDSWorker(&dbenv, &db, WRITER, 1);
+ if(w1->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch w1\n"));
+
+ CDSWorker *w2 = new CDSWorker(&dbenv, &db, WRITER, 1);
+ if(w2->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch w1\n"));
+ //ACE_OS::sleep(1);
+
+ CDSWorker *r1 = new CDSWorker(&dbenv, &db, NON_CURSOR_READER, 2);
+ if(r1->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch r1\n"));
+
+ CDSWorker *r2 = new CDSWorker(&dbenv, &db, NON_CURSOR_READER, 3);
+ if(r2->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch r3\n"));
+
+ CDSWorker *r3 = new CDSWorker(&dbenv, &db, NON_CURSOR_READER, 4);
+ if(r3->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch r4\n"));
+
+ CDSWorker *r4 = new CDSWorker(&dbenv, &db, CHECKPOINT, 5);
+ if(r4->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch r5\n"));
+
+ ACE_Thread_Manager *tm = ACE_Thread_Manager::instance();
+ tm->wait();
+
+ delete r1;
+ delete w1;
+ delete r2;
+ delete r3;
+ delete r4;
+
+ db.close(0);
+ dbenv.close(0);
+
+ }
+ catch(DbException &dbex)
+ {
+ dbenv.err(dbex.get_errno(), "Db exception caught");
+ }
+ catch(...)
+ {
+ cout << "unknown exception caught" << endl;
+ }
+}
View
29 berkeleydb/Chapter06/SConstruct
@@ -0,0 +1,29 @@
+
+MYLIBPATH = ['/home/hy/sw/install/lib',
+ '/usr/lib']
+MYCPPPATH = ['/home/hy/sw/install/include']
+USED_LIBS = ['db_cxx',
+ 'pthread',
+ 'ACE',]
+
+SOURCES_FILES = ['cds.cc']
+SOURCES_FILES_1 = ['tds.cc', 'load_db.cc']
+SOURCES_FILES_2 = ['misc.cc']
+
+Program( 'cds',
+ SOURCES_FILES,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
+
+Program( 'tds',
+ SOURCES_FILES_1,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
+
+Program( 'misc',
+ SOURCES_FILES_2,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
View
4 berkeleydb/Chapter06/account.txt
@@ -0,0 +1,4 @@
+21344,111223333,2,4000,1
+34211,111443335,1,4500,2
+54325,111443335,2,5680,1
+65466,111223333,3,7878,2
View
3 berkeleydb/Chapter06/bank.txt
@@ -0,0 +1,3 @@
+1,Bank of America,94086
+2,Wells Fargo Bank,94085
+3,Citibank,94085
View
152 berkeleydb/Chapter06/cds.cc
@@ -0,0 +1,152 @@
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include <db_cxx.h>
+
+#include <ace/Task.h>
+
+using std::cout;
+using std::endl;
+using std::cerr;
+using std::stringstream;
+using std::string;
+
+#define NUM_RECS 5
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ ACE_DEBUG((LM_ERROR, "(%t) %s errMsg: %s\n", prefix, errMsg));
+}
+
+class CDSWorker: public ACE_Task_Base
+{
+ public:
+ CDSWorker(DbEnv *env, Db *db, bool reader, int id);
+ virtual int svc();
+
+ private:
+ void runReader();
+ void runWriter();
+
+ DbEnv *env_;
+ Db *db_;
+ bool reader_;
+ int id_;
+};
+
+CDSWorker::CDSWorker(DbEnv *env, Db *db, bool reader, int id)
+ :env_(env), db_(db), reader_(reader), id_(id)
+{}
+
+int CDSWorker::svc()
+{
+ try
+ {
+ if(reader_)
+ runReader();
+ else
+ runWriter();
+ }
+ catch(...)
+ {
+ ACE_DEBUG((LM_ERROR, "CDSWorker: exception caught\n"));
+ }
+}
+
+void CDSWorker::runReader()
+{
+ Dbc *dbcur;
+ db_->cursor(NULL, &dbcur, 0);
+ Dbt key;
+ Dbt value;
+
+ while (dbcur->get(&key, &value, DB_NEXT) == 0)
+ {
+ ACE_DEBUG((LM_ERROR, "Reader %d: key \"%s\" data \"%s\"\n",
+ id_, (char*)(key.get_data()),
+ (char*)(value.get_data()) ));
+ ACE_OS::sleep(1);
+ }
+ dbcur->close();
+}
+
+void CDSWorker::runWriter()
+{
+ for(int i=0; i < NUM_RECS; i++)
+ {
+ stringstream keyss;
+ keyss << id_;
+ keyss << "_";
+ keyss << i;
+ string keystr = keyss.str();
+
+ stringstream valss;
+ valss << id_;
+ valss << "_";
+ valss << i*i;
+ string valstr = valss.str();
+
+ Dbt key((void*)(keystr.c_str()), keystr.size() + 1);
+ Dbt value((void*)(valstr.c_str()), valstr.size() + 1);
+
+ int ret = db_->put(0, &key, &value, 0);
+ if(ret)
+ env_->err(ret, keystr.c_str());
+ else
+ ACE_DEBUG((LM_ERROR, "Writer %d: key \"%s\" data \"%s\" \n",
+ id_,
+ keystr.c_str(),
+ valstr.c_str() ));
+ ACE_OS::sleep(1);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ DbEnv dbenv(0);
+
+ try
+ {
+ const char *envhome = "./chap5_env";
+
+ dbenv.set_errpfx("env_ex");
+ dbenv.set_errcall(errCallback);
+
+ dbenv.open(envhome, DB_CREATE | DB_INIT_MPOOL | DB_INIT_CDB | DB_THREAD, 0);
+
+ Db db_ver(NULL, 0);
+ int verify = db_ver.verify( "./chap5_env/chap5_cds", NULL, NULL, 0);
+
+ ACE_DEBUG((LM_ERROR, "verify returned %d\n", verify));
+
+ Db db(&dbenv, 0);
+ db.open(NULL, "chap5_cds", NULL , DB_BTREE, DB_CREATE | DB_THREAD, 0644);
+
+ CDSWorker *w1 = new CDSWorker(&dbenv, &db, false, 1);
+ if(w1->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch w1\n"));
+
+ CDSWorker *r1 = new CDSWorker(&dbenv, &db, true, 2);
+ if(r1->activate((THR_NEW_LWP | THR_JOINABLE), 1) != 0)
+ ACE_DEBUG((LM_ERROR, "Could not launch r1\n"));
+
+ w1->wait();
+ r1->wait();
+
+ delete r1;
+ delete w1;
+
+ db.close(0);
+ dbenv.close(0);
+
+ }
+ catch(DbException &dbex)
+ {
+ dbenv.err(dbex.get_errno(), "Db exception caught");
+ }
+ catch(...)
+ {
+ cout << "unknown exception caught" << endl;
+ }
+}
View
603 berkeleydb/Chapter06/load_db.cc
@@ -0,0 +1,603 @@
+#include "load_db.h"
+
+#include <ace/Log_Msg.h>
+#include <netinet/in.h>
+
+static const DbDesc dbDescArr[] =
+{
+ { "Person_Db", "person.txt", PERSON_DB, 0, 0 },
+ { "Account_Db", "account.txt", 2, 0, 0 },
+ { "Bank_Db", "bank.txt", 3, 0, 0 },
+ { "Person_Name_Db", "", PERSON_NAME_DB, 1, PERSON_DB },
+ { "Person_Dob_Db", "", PERSON_DOB_DB, 1, PERSON_DB }
+};
+
+static const size_t numDb = 5;
+
+static int btreeCompare(Db *db, const Dbt *d1, const Dbt *d2);
+
+DbUtils::DbMap DbUtils::dbMap_;
+DbEnv* DbUtils::dbEnv_ = NULL;
+
+void readAccount(std::string& line)
+{
+ //ACE_DEBUG((LM_ERROR, "readAccount: reading %s\n", line.c_str()));
+ std::istringstream iss (line, std::istringstream::in);
+ std::string strVal;
+
+ Account acc;
+ char c;
+
+ iss >> acc.accNumber;
+ iss >> c;
+
+ iss >> acc.ssn;
+ iss >> c;
+
+ iss >> acc.type;
+ iss >> c;
+
+ iss >> acc.balance;
+ iss >> c;
+
+ iss >> acc.bankId;
+
+ ACE_DEBUG((LM_ERROR,
+ "accNumber-%u ssn-%Q type-%u balance-%u bankId-%u\n",
+ acc.accNumber, acc.ssn, acc.type, acc.balance, acc.bankId));
+}
+
+void readBank(std::string& line)
+{
+ //ACE_DEBUG((LM_ERROR, "Load::readBank: reading %s\n", line.c_str()));
+ std::istringstream iss (line, std::istringstream::in);
+ std::string strVal;
+
+ Bank b;
+ char c;
+
+ iss >> b.id;
+ iss >> c;
+
+ DbUtils::getStrVal(iss, strVal);
+ b.name = strVal;
+
+ iss >> b.zip;
+
+ ACE_DEBUG((LM_ERROR,
+ "readBank: id-%u name-%s zip-%u \n",
+ b.id, b.name.c_str(), b.zip));
+}
+
+void DbUtils::loadDbs(DbEnv *env)
+{
+ dbEnv_ = env;
+ for(int i=0; i < numDb; i++)
+ {
+ DbDesc dd = dbDescArr[i];
+ Db *db = new Db(env, 0);
+ DbTxn *txn;
+ try
+ {
+ env->txn_begin(NULL, &txn, 0);
+ db->set_pagesize(512);
+ //db->set_bt_compare(btreeCompare);
+ if(dd.isSec)
+ {
+ db->set_flags(DB_DUPSORT);
+ db->open(txn, dd.dbName, NULL ,
+ DB_BTREE, DB_CREATE | DB_THREAD, 0644);
+ DbMapIter it = dbMap_.find(dd.primaryId);
+ if(it != dbMap_.end())
+ {
+ Db *primary = it->second;
+ primary->associate(txn, db, DbUtils::getSecKey, 0);
+ }
+ else
+ {
+ ACE_DEBUG((LM_ERROR, "unknonw primary db %d\n",
+ dd.primaryId));
+ throw;
+ }
+ }
+ else
+ {
+ db->open(txn, dd.dbName, NULL ,
+ DB_BTREE, DB_CREATE | DB_THREAD, 0644);
+ }
+ txn->commit(0);
+ dbMap_[dd.id] = db;
+ }
+ catch(DbException &txn_ex)
+ {
+ txn->abort();
+ }
+ }
+ for(int i=0; i < numDb; i++)
+ {
+ DbDesc dd = dbDescArr[i];
+ if(!dd.isSec)
+ load(dd.fileName, dd.dbName, dd.id);
+ }
+}
+
+void DbUtils::load(const char *file, const char *dbName, int id)
+{
+ std::ifstream fileStrm(file, std::ios::in);
+ if ( !fileStrm )
+ {
+ ACE_DEBUG((LM_ERROR, "LoadDb::load: unable to open %s\n", file));
+ throw std::exception();
+ }
+
+ Db *db = getDbHandle(id);
+
+ while (!fileStrm.eof())
+ {
+ std::string line;
+ std::getline(fileStrm, line);
+ if(line.length() <= 0)
+ break;
+ switch(id)
+ {
+ case PERSON_DB:
+ {
+ Person p(line);
+ p.dbPut(db);
+ }
+ break;
+ case 2:
+ readAccount(line);
+ break;
+ case 3:
+ readBank(line);
+ break;
+ default:
+ ACE_DEBUG((LM_ERROR, "LoadDb::load: unknown Db id %d\n", id));
+ break;
+ }
+ }
+}
+
+void DbUtils::getStrVal(std::istream& instrm, std::string& value)
+{
+ char c;
+ value.clear();
+ while(instrm.get(c))
+ {
+ switch(c)
+ {
+ case ',':
+ case '\n':
+ return;
+ default:
+ value += c;
+ }
+ }
+}
+
+void DbUtils::closeDbResources()
+{
+ DbMapIter it = dbMap_.begin();
+ for(; it != dbMap_.end(); ++it)
+ {
+ Db *db = it->second;
+ db->close(0);
+ }
+ dbEnv_->close(0);
+}
+
+Db* DbUtils::getDbHandle(int dbId)
+{
+ Db *db = NULL;
+
+ DbMapIter it = dbMap_.find(dbId);
+ if(it != dbMap_.end())
+ db = it->second;
+ return db;
+}
+
+Person::Person(std::string& line)
+{
+ std::istringstream iss (line, std::istringstream::in);
+ std::string strVal;
+
+ iss >> ssn_;
+
+ char c;
+ iss >> c;
+
+ DbUtils::getStrVal(iss, strVal);
+ name_ = strVal;
+
+ DbUtils::getStrVal(iss, strVal);
+ dob_ = strVal;
+
+ ACE_DEBUG((LM_ERROR, "readPerson: ssn-%u name-%s, dob-%s\n",
+ ssn_, name_.c_str(), dob_.c_str()));
+}
+
+
+DbEntry::DbEntry()
+{}
+
+void DbEntry::dbDel(Db *db, Dbt *key, DbTxn* t)
+{
+ DbTxn *txn = t;
+ try
+ {
+ if(t == NULL)
+ DbUtils::dbEnv_->txn_begin(NULL, &txn, 0);
+ int ret = db->del(txn, key, 0);
+ if(ret)
+ DbUtils::dbEnv_->err(ret, "DbEntry::dbGet" );
+ if(t == NULL)
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ DbUtils::dbEnv_->err(txn_ex.get_errno(), "DbEntry::dbGet");
+ if(t == NULL)
+ txn->abort();
+ else
+ throw;
+ }
+}
+
+int DbEntry::dbGet(Db *db, Dbt *key, Dbt* data, DbTxn* t)
+{
+ int ret = -1;
+ DbTxn *txn = t;
+ try
+ {
+ if(t == NULL)
+ DbUtils::dbEnv_->txn_begin(NULL, &txn, 0);
+ ret = db->get(txn, key, data, 0);
+ if(ret)
+ DbUtils::dbEnv_->err(ret, "DbEntry::dbGet" );
+ if(t == NULL)
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ DbUtils::dbEnv_->err(txn_ex.get_errno(), "DbEntry::dbGet");
+ if(t == NULL)
+ txn->abort();
+ else
+ throw;
+ }
+ return ret;
+}
+
+void DbEntry::dbPut(Db *db, DbTxn* t)
+{
+ serialize();
+
+ void *pkey = NULL;
+ int keyLen = 0;
+
+ getPKey(pkey, keyLen);
+
+ Dbt pkeyDbt(pkey, keyLen);
+ Dbt valueDbt(serializedData_, serializedLen_);
+
+ DbTxn *txn = t;
+ try
+ {
+ if(t == NULL)
+ DbUtils::dbEnv_->txn_begin(NULL, &txn, 0);
+ int ret = db->put(txn, &pkeyDbt, &valueDbt, 0);
+ if(ret)
+ DbUtils::dbEnv_->err(ret, "DbEntry::dbPut" );
+ if(t == NULL)
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ DbUtils::dbEnv_->err(txn_ex.get_errno(), "DbEntry::dbPut");
+ if(t == NULL)
+ txn->abort();
+ else
+ throw;
+ }
+
+}
+
+void DbEntry::reverseEndian(unsigned long *intVal)
+{
+ ACE_DEBUG((LM_ERROR, "original value %x\n", *intVal));
+ unsigned long tmpInt = *intVal;
+ char *buf = (char*)intVal;
+ char *bufOrig = (char*)(&tmpInt);
+ int size = sizeof(unsigned long);
+ for(int i=0; i < size; i++)
+ {
+ buf[size - i - 1] = bufOrig[i];
+ }
+ ACE_DEBUG((LM_ERROR, "converted value %x\n", *intVal));
+}
+
+
+Person::Person()
+{}
+
+char* Person::serialize()
+{
+ memset (serializedData_, 0, 1000);
+
+ char *buf= serializedData_;
+
+ int elemLen = 0;
+ int totalLen = 0;
+
+ elemLen = sizeof(unsigned long);
+ memcpy(buf, &ssn_, elemLen);
+ totalLen += elemLen;
+ buf += elemLen;
+
+ elemLen = name_.length() + 1;
+ memcpy(buf, name_.c_str(), elemLen);
+ totalLen += elemLen;
+ buf += elemLen;
+
+ elemLen = dob_.length() + 1;
+ memcpy(buf, dob_.c_str(), elemLen);
+ totalLen += elemLen;
+
+ serializedLen_ = totalLen;
+ return serializedData_;
+
+}
+
+int Person::deserialize(char *buf)
+{
+ char *tmpBuf = buf;
+ int totalLen = 0;
+ int elemLen = 0;
+
+ elemLen = sizeof(ssn_);
+ ssn_ = *((unsigned long *)tmpBuf);
+ totalLen += elemLen;
+ tmpBuf += elemLen;
+
+ name_ = tmpBuf;
+ elemLen = name_.length() + 1;
+ totalLen += elemLen;
+ tmpBuf += elemLen;
+
+ dob_ = tmpBuf;
+ elemLen = dob_.length() + 1;
+ totalLen += elemLen;
+
+ return 0;
+}
+
+void Person::getPKey(void*& key, int& length)
+{
+ ssn_ = htonl(ssn_);
+ key = &ssn_;
+ length = sizeof(ssn_);
+}
+
+void Person::insert(DbTxn* txn)
+{
+ dbPut(getDbHandle(), txn);
+}
+
+int Person::getBySSN(unsigned long ssn, DbTxn* txn)
+{
+ int ret = -1;
+ unsigned long ssnVal = htonl(ssn);
+ Dbt key;
+ Dbt data;
+ data.set_flags(DB_DBT_MALLOC);
+ key.set_data((void*)(&ssnVal));
+ key.set_size(sizeof(ssn));
+ ret = dbGet(DbUtils::getDbHandle(PERSON_DB), &key, &data, txn);
+ if(ret == 0)
+ deserialize((char*)(data.get_data()));
+ return ret;
+
+}
+
+void Person::delBySSN(unsigned long ssn, DbTxn* txn)
+{
+ int ret = -1;
+ unsigned long ssnVal = htonl(ssn);
+ Dbt key;
+ key.set_data((void*)(&ssnVal));
+ key.set_size(sizeof(ssn));
+ dbDel(DbUtils::getDbHandle(PERSON_DB), &key, txn);
+}
+
+Db* Person::getDbHandle()
+{
+ return DbUtils::getDbHandle(PERSON_DB);
+}
+//void Person::getByName(std::string name)
+//{
+//}
+
+void Person::getByNameAndDob(std::string& name,
+ std::string dob, DbTxn* txn)
+{
+ try
+ {
+
+ Dbt key;
+ Dbt value;
+
+ Dbc *nameCur = NULL;
+ Db *personNameDb = DbUtils::getDbHandle(PERSON_NAME_DB);
+ int ret = personNameDb->cursor(NULL, &nameCur, 0);
+ key.set_data((void*)(name.c_str()));
+ key.set_size(name.length() + 1);
+ ret = nameCur->get(&key, &value, DB_SET);
+ if(ret)
+ {
+ DbUtils::dbEnv_->err(ret, "Person::getByNameAndDob");
+ nameCur->close();
+ return;
+ }
+
+ Dbc *dobCur = NULL;
+ Db *personDobDb = DbUtils::getDbHandle(PERSON_DOB_DB);
+ ret = personDobDb->cursor(NULL, &dobCur, 0);
+ key.set_data((void*)dob.c_str());
+ key.set_size(dob.length() + 1);
+ ret = dobCur->get(&key, &value, DB_SET);
+ if(ret)
+ {
+ DbUtils::dbEnv_->err(ret, "Person::getByNameAndDob");
+ nameCur->close();
+ dobCur->close();
+ return;
+ }
+
+ Dbc *curArr[3];
+ curArr[0] = nameCur;
+ curArr[1] = dobCur;
+ curArr[2] = NULL;
+
+ Db *personDb = DbUtils::getDbHandle(PERSON_DB);
+ Dbc *joinCur;
+ ret = personDb->join(curArr, &joinCur, 0);
+ if(ret)
+ {
+ DbUtils::dbEnv_->err(ret, "Person::getByNameAndDob");
+ nameCur->close();
+ dobCur->close();
+ return;
+ }
+
+ while( (ret = joinCur->get(&key, &value, 0)) == 0)
+ {
+ Person p;
+ p.deserialize((char*)(value.get_data()));
+ ACE_DEBUG((LM_ERROR, "Found %s\n", p.toString().c_str()));
+ }
+ nameCur->close();
+ dobCur->close();
+ joinCur->close();
+ }
+ catch(DbException& ex)
+ {
+ DbUtils::dbEnv_->err(ex.get_errno(), "Person::getByNameAndDob");
+ }
+}
+
+void Person::getBulk()
+{
+ try
+ {
+ //the 5K buffer that will be used for bulk retreival
+ int BULK_LEN = 5 * 1024;
+ char buff[BULK_LEN];
+ Dbt bulk;
+ bulk.set_data(buff);
+ bulk.set_ulen(BULK_LEN);
+ bulk.set_flags(DB_DBT_USERMEM);
+
+ //create a cursor on the database
+ Dbc *bulkCur;
+ Db *personDb = DbUtils::getDbHandle(PERSON_DB);
+ int ret = personDb->cursor(NULL, &bulkCur, 0);
+
+ //in a loop retreive 5K worth of records at a time
+ while(true)
+ {
+ Dbt key;
+ ret = bulkCur->get(&key, &bulk,
+ DB_MULTIPLE_KEY | DB_NEXT);
+ if(ret)
+ {
+ DbUtils::dbEnv_->err(ret,
+ "getBulk:bulkCur:get");
+ break;
+ }
+ //DbMultipleKeyDataIterator mIter(bulk);
+ DbMultipleDataIterator mIter(bulk);
+ Dbt d;
+ Dbt k;
+ Person p;
+ //while( (mIter.next(k, d)) )
+ while( (mIter.next(d)) )
+ {
+ p.deserialize((char*)(d.get_data()));
+ ACE_DEBUG((LM_ERROR, "getBulk: %s\n",
+ p.toString().c_str()));
+ }
+ }
+ bulkCur->close();
+ }
+ catch(DbException& ex)
+ {
+ DbUtils::dbEnv_->err(ex.get_errno(),
+ "Person::getBulk");
+ }
+}
+
+std::string Person::toString()
+{
+ std::stringstream ss;
+ ss << "Name: ";
+ ss << name_;
+ ss << " SSN: ";
+ ss << ssn_;
+ ss << " DOB: ";
+ ss << dob_;
+
+ return ss.str();
+}
+
+int btreeCompare(Db *db, const Dbt *d1, const Dbt *d2)
+{
+ unsigned int d1_int;
+ unsigned int d2_int;
+ memcpy(&d1_int, d1->get_data(), sizeof(unsigned int));
+ memcpy(&d2_int, d2->get_data(), sizeof(unsigned int));
+ int ret = d1_int - d2_int;
+ ACE_DEBUG((LM_ERROR, "d1 %d d2 %d returning %d\n", d1_int, d2_int, ret));
+ return ret;
+}
+int DbUtils::getSecKey(Db *secondary, const Dbt *pKey,
+ const Dbt *data, Dbt *secKey)
+{
+ const char *dbName;
+ const char *fileName;
+ int ret = secondary->get_dbname(&fileName, &dbName);
+ std::string fileStr = fileName;
+
+ if(!fileStr.compare("Person_Name_Db"))
+ {
+ Person p;
+ p.deserialize((char*)(data->get_data()));
+ size_t len = p.name_.length() + 1;
+ char *name = (char*)(malloc(len));
+ memcpy(name, p.name_.c_str(), len);
+
+ memset(secKey, 0, sizeof(Dbt));
+ secKey->set_data(name);
+ secKey->set_size(len);
+ secKey->set_flags(DB_DBT_APPMALLOC);
+ ACE_DEBUG((LM_ERROR, "getSecKey called for %s, sec key returned %s\n",
+ fileName, name));
+ }
+ else if(!fileStr.compare("Person_Dob_Db"))
+ {
+ Person p;
+ p.deserialize((char*)(data->get_data()));
+ size_t len = p.dob_.length() + 1;
+ char *dob = (char*)(malloc(len));
+ memcpy(dob, p.dob_.c_str(), len);
+
+ memset(secKey, 0, sizeof(Dbt));
+ secKey->set_data(dob);
+ secKey->set_size(len);
+ secKey->set_flags(DB_DBT_APPMALLOC);
+ ACE_DEBUG((LM_ERROR, "getSecKey called for %s, sec key returned %s\n",
+ fileName, dob));
+ }
+ return 0;
+}
+
View
267 berkeleydb/Chapter06/load_db.h
@@ -0,0 +1,267 @@
+#ifndef _LOAD_DB_
+#define _LOAD_DB_
+
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <fstream>
+#include <map>
+
+#include <db_cxx.h>
+
+#define PERSON_DB 1
+#define ACCOUNT_DB 2
+#define BANK_DB 3
+#define PERSON_NAME_DB 4
+#define PERSON_DOB_DB 5
+
+class Person;
+template <class Entry> class DbIter;
+
+typedef struct
+{
+ const char* dbName;
+ const char* fileName;
+ int id;
+ int isSec;
+ int primaryId;
+} DbDesc;
+
+class DbUtils
+{
+ public:
+ static void loadDbs(DbEnv *env);
+ static void load(const char*file, const char* dbName, int id);
+ static void getStrVal(std::istream& instrm, std::string& value);
+ static void closeDbResources();
+ static int getSecKey(Db *secondary, const Dbt *pKey,
+ const Dbt *data, Dbt *secKey);
+
+ static Db* getDbHandle(int dbId);
+ typedef std::map< int, Db* > DbMap;
+ typedef std::map< int, Db* >::iterator DbMapIter;
+ static DbMap dbMap_;
+ static DbEnv *dbEnv_;
+};
+
+class DbEntry
+{
+ public:
+ DbEntry();
+ virtual int deserialize(char *buf)=0;
+
+ protected:
+ virtual char* serialize()=0;
+ virtual void getPKey(void*& key, int& length)=0;
+ virtual std::string toString()=0;
+
+ void reverseEndian(unsigned long *intVal);
+
+ void dbPut(Db *db, DbTxn* txn=NULL);
+ int dbGet(Db *db, Dbt *key, Dbt *data, DbTxn* txn=NULL);
+ void dbDel(Db *db, Dbt *key, DbTxn* txn=NULL);
+
+ char serializedData_[1000];
+ int serializedLen_;
+
+ friend class DbUtils;
+};
+
+class Person : public DbEntry
+{
+ public:
+ Person();
+ Person(std::string& line);
+ Person(unsigned long ssn, std::string name, std::string dob);
+
+ static void associateSec(Db *primary);
+
+ void insert(DbTxn* txn=NULL);
+
+ int getBySSN(unsigned long ssn, DbTxn* txn=NULL);
+ void delBySSN(unsigned long ssn, DbTxn* txn=NULL);
+
+ void getByNameAndDob(std::string& name,
+ std::string dob, DbTxn* txn=NULL);
+
+ std::string getName()
+ { return name_; }
+
+ void setName(std::string name)
+ { name_ = name; }
+
+ std::string getDOB()
+ { return dob_; }
+
+ void setDOB(std::string dob)
+ { dob_ = dob; }
+
+ unsigned long getSSN()
+ { return ssn_; }
+
+ void setSSN(unsigned long ssn)
+ { ssn_ = ssn; }
+
+ Db* getDbHandle();
+
+ int deserialize(char *buf);
+
+ void getBulk();
+
+ protected:
+ char* serialize();
+ void getPKey(void*& key, int& length);
+ void regSecKeys(Db *db);
+ std::string toString();
+
+ private:
+ unsigned long ssn_;
+ std::string name_;
+ std::string dob_;
+
+ friend class DbUtils;
+};
+
+class Account
+{
+ public:
+ unsigned long accNumber;
+ unsigned long long ssn;
+ unsigned long type;
+ unsigned long balance;
+ unsigned long bankId;
+ friend class LoadDb;
+};
+
+class Bank
+{
+ public:
+ unsigned long id;
+ std::string name;
+ unsigned long zip;
+ friend class LoadDb;
+};
+
+template <class Entry>
+class DbIter
+{
+ public:
+ DbIter();
+ ~DbIter();
+
+ bool next();
+ bool prev();
+ Entry& getEntry() {return curr_;}
+ void close();
+
+ private:
+ void init(u_int32_t flag);
+ int get(u_int32_t flag);
+
+ Dbc *cursor_;
+ int state_;
+ Entry curr_;
+ Db *db_;
+ DbTxn *txn_;
+ bool done_;
+ bool closed_;
+ static const int INACTIVE = 0;
+ static const int ACTIVE = 1;
+};
+
+template <class Entry>
+DbIter<Entry>::DbIter():
+ state_(INACTIVE),
+ cursor_(NULL),
+ done_(false),
+ closed_(false)
+{
+}
+
+template <class Entry>
+DbIter<Entry>::~DbIter()
+{
+ close();
+}
+
+template <class Entry>
+void DbIter<Entry>::init(u_int32_t flag)
+{
+ try
+ {
+ db_ = curr_.getDbHandle();
+ DbUtils::dbEnv_->txn_begin(NULL, &txn_, 0);
+ int ret = db_->cursor(txn_, &cursor_, flag);
+ state_ = ACTIVE;
+ }
+ catch(DbException& ex)
+ {
+ DbUtils::dbEnv_->err(ex.get_errno(), "DbIter::init");
+ txn_->abort();
+ }
+}
+
+template <class Entry>
+void DbIter<Entry>::close()
+{
+ if(closed_)
+ return;
+ try
+ {
+ cursor_->close();
+ txn_->commit(0);
+ }
+ catch(DbException& ex)
+ {
+ DbUtils::dbEnv_->err(ex.get_errno(), "DbIter::close");
+ txn_->abort();
+ }
+ closed_ = true;
+}
+
+template <class Entry>
+bool DbIter<Entry>::next()
+{
+ if(state_ != ACTIVE)
+ init(0);
+ get(DB_NEXT);
+ return !done_;
+}
+
+template <class Entry>
+bool DbIter<Entry>::prev()
+{
+ if(state_ != ACTIVE)
+ init(0);
+ get(DB_PREV);
+ return !done_;
+}
+
+template <class Entry>
+int DbIter<Entry>::get(u_int32_t flag)
+{
+ Dbt key;
+ Dbt value;
+ int ret = -1;
+ if(closed_ || done_)
+ return ret;
+
+ ret = cursor_->get(&key, &value, flag);
+ if(ret == 0)
+ {
+ curr_.deserialize((char*)(value.get_data()));
+ }
+ else if(ret == DB_NOTFOUND)
+ {
+ done_ = true;
+ close();
+ }
+ else
+ {
+ DbUtils::dbEnv_->err(ret, "DbIter::next");
+ }
+ return ret;
+
+}
+typedef DbIter<Person> PersonIter;
+#endif //_LOAD_DB_
View
94 berkeleydb/Chapter06/misc.cc
@@ -0,0 +1,94 @@
+#include <db_cxx.h>
+#include <ace/Task.h>
+#include <string>
+#include <sstream>
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ ACE_DEBUG((LM_ERROR, "(%t) %s errMsg: %s\n", prefix, errMsg));
+}
+
+struct DataAlign
+{
+ int a;
+ char b;
+ int c;
+};
+
+std::string getStr(char *data, int len)
+{
+ std::stringstream ss;
+ for(int i=0; i < len; i++)
+ {
+ ss << data[i];
+ ACE_DEBUG((LM_ERROR, "inserting %X \n", data[i]));
+ }
+ return ss.str();
+}
+
+int main(int argc, char **argv)
+{
+ DbEnv dbenv(0);
+ try
+ {
+ const char *envhome = "./misc_env";
+
+ dbenv.set_errpfx("misc");
+ dbenv.set_errcall(errCallback);
+ dbenv.set_lk_detect(DB_LOCK_DEFAULT);
+
+ u_int32_t envFlags =
+ DB_CREATE | DB_INIT_MPOOL | DB_INIT_TXN |
+ DB_INIT_LOG | DB_INIT_LOCK | DB_RECOVER | DB_THREAD;
+
+ dbenv.open(envhome, envFlags, 0);
+
+ Db miscDb(&dbenv, 0);
+ miscDb.open(NULL, "Misc_Db", NULL, DB_BTREE,
+ DB_CREATE | DB_THREAD, 0644);
+
+ DataAlign d;
+ memset(&d, 0, sizeof(d));
+ d.a = 1;
+ d.b = (char)2;
+ d.c = 3;
+
+ char *dataStr = "Data alignment test";
+
+ Dbt key((void*)&d, sizeof(d));
+ Dbt data((void*)dataStr, strlen(dataStr) + 1);
+
+ miscDb.put(NULL, &key, &data, 0);
+
+ DataAlign d1;
+ memset(&d1, 0, sizeof(d1));
+ d1.a = 1;
+ d1.b = (char)2;
+ d1.c = 3;
+
+ Dbt newKey((void*)&d1, sizeof(d1));
+ ACE_HEX_DUMP((LM_ERROR, (char*)(newKey.get_data()),
+ newKey.get_size()));
+ Dbt value1;
+ value1.set_flags(DB_DBT_MALLOC);
+ int ret = miscDb.get(NULL, &newKey, &value1, 0);
+
+ if(ret == 0)
+ {
+ std::string str = (char*)(value1.get_data());
+ ACE_DEBUG((LM_ERROR, "get: %s\n", str.c_str()));
+ }
+ else
+ {
+ ACE_DEBUG((LM_ERROR, "data not found\n"));
+ }
+ }
+ catch(DbException &dbex)
+ {
+ dbenv.err(dbex.get_errno(), "Db exception caught");
+ }
+ catch(...)
+ {
+ dbenv.err(0, "unknown exception caught");
+ }
+}
View
24 berkeleydb/Chapter06/nested
@@ -0,0 +1,24 @@
+
+DbTxn *parent;
+try
+{
+ env->txn_begin(NULL, &parent, 0);
+ db->get(parent, &key, &value, 0);
+
+ DbTxn *child;
+ try
+ {
+ env->txn_begin(parent, &child, 0);
+ db->put(child, &key, &value1, 0);
+ child->commit(0);
+ }
+ catch (DbException ex)
+ {
+ child->abort();
+ }
+ parent->commit(0);
+}
+catch (DbException& ex)
+{
+ parent->abort();
+}
View
5 berkeleydb/Chapter06/person.txt
@@ -0,0 +1,5 @@
+111223333,Manish Pandey,jan 13 1971
+111223334,Shirish Rai,dec 23 1974
+111443335,Roland Hendel,feb 13 1973
+111443336,Roland Hendel,feb 13 1973
+111443337,Roland Hendel,feb 13 1973
View
349 berkeleydb/Chapter06/tds.cc
@@ -0,0 +1,349 @@
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include <ace/Task.h>
+#include <ace/Time_Value.h>
+#include <ace/Thread_Manager.h>
+
+#include "load_db.h"
+
+using std::cout;
+using std::endl;
+using std::cerr;
+using std::stringstream;
+using std::string;
+
+#define NUM_RECS 50000
+
+#define READER 1
+#define WRITER 2
+#define NON_CURSOR_READER 3
+#define CHECKPOINT 4
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ ACE_DEBUG((LM_ERROR, "(%t) %s errMsg: %s\n", prefix, errMsg));
+}
+
+class CDSWorker: public ACE_Task_Base
+{
+ public:
+ CDSWorker(DbEnv *env, Db *db, int type, int id);
+ virtual int svc();
+
+ private:
+ void runReader();
+ void runNonCursorReader();
+ void runWriter();
+ void runCheckpoint();
+ void archive(u_int32_t flag);
+
+ DbEnv *env_;
+ Db *db_;
+ int type_;
+ int id_;
+
+};
+
+CDSWorker::CDSWorker(DbEnv *env, Db *db, int type, int id)
+ :env_(env), db_(db), type_(type), id_(id)
+{}
+
+int CDSWorker::svc()
+{
+ try
+ {
+ switch(type_)
+ {
+ case READER:
+ runReader();
+ break;
+ case WRITER:
+ runWriter();
+ break;
+ case NON_CURSOR_READER:
+ runNonCursorReader();
+ break;
+ case CHECKPOINT:
+ runCheckpoint();
+ break;
+ default:
+ ACE_DEBUG((LM_ERROR, "unknown type %d\n", type_));
+ break;
+ }
+ }
+ catch(...)
+ {
+ ACE_DEBUG((LM_ERROR, "CDSWorker: exception caught\n"));
+ }
+}
+
+void CDSWorker::runReader()
+{
+ Dbc *dbcur;
+ Dbt key;
+ Dbt value;
+
+ DbTxn *txn;
+ try
+ {
+ env_->txn_begin(NULL, &txn, 0);
+ db_->cursor(txn, &dbcur, DB_DEGREE_2);
+ while (dbcur->get(&key, &value, DB_NEXT) == 0)
+ {
+ ACE_DEBUG((LM_ERROR, "Reader %d: key \"%s\" data \"%s\"\n",
+ id_, (char*)(key.get_data()),
+ (char*)(value.get_data()) ));
+ }
+ dbcur->close();
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ txn->abort();
+ }
+
+}
+
+void CDSWorker::runWriter()
+{
+ for(int i=0; i < NUM_RECS; i++)
+ {
+ stringstream keyss;
+ keyss << id_;
+ keyss << "_";
+ keyss << i;
+ string keystr = keyss.str();
+
+ stringstream valss;
+ valss << id_;
+ valss << "_";
+ valss << i*i;
+ string valstr = valss.str();
+
+ Dbt key((void*)(keystr.c_str()), keystr.size() + 1);
+ char buff[6000];
+ strcpy(buff, valstr.c_str());
+ Dbt value((void*)(buff), 6000);
+
+ DbTxn *txn;
+ try
+ {
+ env_->txn_begin(NULL, &txn, 0);
+ int ret = db_->put(txn, &key, &value, 0);
+ if(ret)
+ env_->err(ret, keystr.c_str());
+#if 0
+ else
+ ACE_DEBUG((LM_ERROR, "Writer %d: key \"%s\" data \"%s\" \n",
+ id_,
+ keystr.c_str(),
+ valstr.c_str() ));
+#endif
+ txn->commit(0);
+ }
+ catch(DbException &txn_ex)
+ {
+ env_->err(txn_ex.get_errno(), "writer");
+ txn->abort();
+ }
+ }
+}
+
+void CDSWorker::runNonCursorReader()
+{
+ DbTxn *txn;
+ env_->txn_begin(NULL, &txn, 0);
+ while(true)
+ {
+ int j = 0;
+ for(int i=0; i < NUM_RECS; i++)
+ {
+ stringstream keyss;
+ keyss << "1_";
+ keyss << i;
+ string keystr = keyss.str();
+
+ Dbt key((void*)(keystr.c_str()), keystr.size() + 1);
+ Dbt value;
+ value.set_flags(DB_DBT_MALLOC);
+
+ try
+ {
+ int ret = db_->get(txn, &key, &value,0 );
+ if(ret == DB_NOTFOUND)
+ {
+ //ACE_DEBUG((LM_ERROR, "key not found\n"));
+ txn->commit(0);
+ env_->txn_begin(NULL, &txn, 0);
+ i --;
+ continue;
+ }
+ else if(ret != 0)
+ {
+ env_->err(ret, keystr.c_str());
+ }
+ else
+ {
+#if 0
+ ACE_DEBUG((LM_ERROR, "NonCursorReader %d: key \"%s\" data \"%s\" size %d \n",
+ id_,
+ keystr.c_str(),
+ (char*)(value.get_data()), value.get_size() ));
+#endif
+ }
+ j++;
+ if(j >= 20)
+ {
+ txn->commit(0);
+ env_->txn_begin(NULL, &txn, 0);
+ j = 0;
+ }
+ }
+ catch(DbException &txn_ex)
+ {
+ env_->err(txn_ex.get_errno(), "nonCursorReader");
+ txn->abort();
+ env_->txn_begin(NULL, &txn, 0);
+ }
+ }
+ }
+}
+
+void CDSWorker::runCheckpoint()
+{
+ int ARCH_INTERVAL = 60;
+ ACE_Time_Value last_arch = 0;
+
+ while(true)
+ {
+ try
+ {
+ env_->txn_checkpoint(0,0,0);
+ ACE_Time_Value curr_time = ACE_OS::gettimeofday();
+ if(curr_time.sec() - last_arch.sec() > ARCH_INTERVAL)
+ {
+ ACE_DEBUG((LM_ERROR, "removing unused log files\n"));
+ archive(DB_ARCH_REMOVE);
+
+ ACE_DEBUG((LM_ERROR, "archiving data files\n"));
+ archive(DB_ARCH_DATA);
+
+ ACE_DEBUG((LM_ERROR, "archiving log files\n"));
+ archive(DB_ARCH_LOG);
+ }
+ }
+ catch(DbException &ex)
+ {
+ env_->err(ex.get_errno(), "checkpoint");
+ }
+ ACE_OS::sleep(60);
+ }
+}
+
+void CDSWorker::archive(u_int32_t flags)
+{
+ int ret;
+ char **file_list = NULL;
+
+ ret = env_->log_archive(&file_list, flags);
+ if(ret)
+ {
+ env_->err(ret, "runCheckpoint: archive failed");
+ }
+ else if(file_list != NULL)
+ {
+ char **begin = file_list;
+ for(; *file_list != NULL; ++file_list)
+ {
+ ACE_DEBUG((LM_ERROR, "%s\n", *file_list));
+ }
+ free(begin);
+ }
+}
+
+
+int main(int argc, char **argv)
+{
+ DbEnv dbenv(0);
+ try
+ {
+ const char *envhome = "./chap6_env";
+
+ dbenv.set_errpfx("env_ex");
+ dbenv.set_errcall(errCallback);
+ dbenv.set_lk_detect(DB_LOCK_DEFAULT);
+
+ u_int32_t envFlags =
+ DB_CREATE | DB_INIT_MPOOL | DB_INIT_TXN |
+ DB_INIT_LOG | DB_INIT_LOCK | DB_RECOVER | DB_THREAD;
+
+ dbenv.open(envhome, envFlags, 0);
+
+ //Load the databases from the CSV files
+ DbUtils::loadDbs(&dbenv);
+
+ //Basic operations on the database
+ Person p;
+ unsigned long ss = 111223334;
+ DbTxn *t;
+ try
+ {
+ DbUtils::dbEnv_->txn_begin(NULL, &t, 0);
+ if( p.getBySSN(ss, t) == 0)
+ {
+ p.setDOB("jan 1 2001");
+ p.insert(t);
+ t->commit(0);
+ }
+ }
+ catch(DbException& ex)
+ {
+ DbUtils::dbEnv_->err(ex.get_errno(), "DbEntry::dbPut");
+ t->abort();
+ }
+
+ if( p.getBySSN(ss) == 0)
+ {
+ ACE_DEBUG((LM_ERROR, "Found record for ssn %u: name %s dob %s \n",
+ ss,
+ p.getName().c_str(),
+ p.getDOB().c_str()));
+ }
+ p.delBySSN(ss);
+ PersonIter iter;
+
+ //Iterator operations
+ while(iter.next())
+ {
+ Person p = iter.getEntry();
+ ACE_DEBUG((LM_ERROR, "Found record for ssn %u: name %s dob %s \n",
+ ss,
+ p.getName().c_str(),
+ p.getDOB().c_str()));
+
+ }
+ iter.close();
+
+ //Equality join
+ Person joinTest;
+ std::string n = "Roland Hendel";
+ std::string dob = "feb 13 1973";
+ joinTest.getByNameAndDob(n, dob);
+
+ //Bulk retieval
+ Person bulk;
+ bulk.getBulk();
+
+ DbUtils::closeDbResources();
+ }
+ catch(DbException &dbex)
+ {
+ dbenv.err(dbex.get_errno(), "Db exception caught");
+ }
+ catch(...)
+ {
+ cout << "unknown exception caught" << endl;
+ }
+}
View
40 berkeleydb/Chapter07/SConstruct
@@ -0,0 +1,40 @@
+
+MYLIBPATH = ['/home/hy/sw/install/lib',
+ '/usr/lib']
+
+MYCPPPATH = ['/home/hy/sw/install/include',
+ '/home/hy/sw/install/include/boost-1_33',
+ ]
+
+USED_LIBS = ['db_cxx',
+ 'pthread',
+ 'ACE',]
+
+SOURCES_FILES = ['cds.cc']
+SOURCES_FILES_1 = ['tds.cc', 'load_db.cc']
+SOURCES_FILES_2 = ['misc.cc']
+SOURCES_FILES_3 = ['watcher.cc', 'load_db.cc']
+
+Program( 'cds',
+ SOURCES_FILES,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
+
+Program( 'tds',
+ SOURCES_FILES_1,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
+
+Program( 'watcher',
+ SOURCES_FILES_3,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
+
+Program( 'misc',
+ SOURCES_FILES_2,
+ LIBS = USED_LIBS,
+ CPPPATH = MYCPPPATH,
+ LIBPATH = MYLIBPATH )
View
4 berkeleydb/Chapter07/account.txt
@@ -0,0 +1,4 @@
+21344,111223333,2,4000,1
+34211,111443335,1,4500,2
+54325,111443335,2,5680,1
+65466,111223333,3,7878,2
View
3 berkeleydb/Chapter07/bank.txt
@@ -0,0 +1,3 @@
+1,Bank of America,94086
+2,Wells Fargo Bank,94085
+3,Citibank,94085
View
0 berkeleydb/Chapter07/cd
No changes.
View
BIN berkeleydb/Chapter07/cds
Binary file not shown.
View
152 berkeleydb/Chapter07/cds.cc
@@ -0,0 +1,152 @@
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include <db_cxx.h>
+
+#include <ace/Task.h>
+
+using std::cout;
+using std::endl;
+using std::cerr;
+using std::stringstream;
+using std::string;
+
+#define NUM_RECS 5
+
+void errCallback (const DbEnv *env, const char *prefix, const char *errMsg)
+{
+ ACE_DEBUG((LM_ERROR, "(%t) %s errMsg: %s\n", prefix, errMsg));
+}
+
+class CDSWorker: public ACE_Task_Base
+{
+ public:
+ CDSWorker(DbEnv *env, Db *db, bool reader, int id);
+ virtual int svc();
+
+ private:
+ void runReader();
+ void runWriter();
+
+ DbEnv *env_;
+ Db *db_;
+ bool reader_;
+ int id_;
+};
+
+CDSWorker::CDSWorker(DbEnv *env, Db *db, bool reader, int id)
+ :env_(env), db_(db), reader_(reader), id_(id)
+{}
+
+int CDSWorker::svc()
+{
+ try
+ {
+ if(reader_)
+ runReader();
+ else
+ runWriter();
+ }
+ catch(...)
+ {
+ ACE_DEBUG((LM_ERROR, "CDSWorker: exception caught\n"));
+ }
+}
+
+void CDSWorker::runReader()
+{
+ Dbc *dbcur;
+ db_->cursor(NULL, &dbcur, 0);
+ Dbt key;
+ Dbt value;
+
+ while (dbcur->get(&key, &value, DB_NEXT) == 0)
+ {
+ ACE_DEBUG((LM_ERROR, "Reader %d: key \"%s\" data \"%s\"\n",
+ id_, (char*)(key.get_data()),
+ (char*)(value.get_data()) ));
+ ACE_OS::sleep(1);
+ }
+ dbcur->close();
+}
+
+void CDSWorker::runWriter()
+{
+ for(int i=0; i < NUM_RECS; i++)
+ {
+ stringstream keyss;
+ keyss << id_;
+ keyss << "_";
+ keyss << i;
+ string keystr = keyss.str();
+
+ stringstream valss;
+ valss << id_;
+ valss << "_";
+ valss << i*i;
+ string valstr = valss.str();
+
+ Dbt key((void*)(keystr.c_str()), keystr.size() + 1);
</