Skip to content

Commit

Permalink
move migration to mongod
Browse files Browse the repository at this point in the history
  • Loading branch information
wentingwang committed Apr 6, 2014
1 parent cf47250 commit 5883b18
Show file tree
Hide file tree
Showing 3 changed files with 385 additions and 125 deletions.
108 changes: 38 additions & 70 deletions src/mongo/s/commands_admin.cpp
Expand Up @@ -1659,59 +1659,6 @@ namespace mongo {
BSONObj subObj = sub.done();
BSONObj range = b.done().getOwned();
return range;
}
BSONObj getFetchedDataAsBSON(BSONObj range, long long count)
{

BSONObjBuilder b;
b.append("count",count);

vector<BSONObj> ranges;
ranges.push_back(range);
b.append("ranges",ranges);

BSONObj fetchedData = b.done().getOwned();
cout << "[WWT] block=" <<fetchedData.toString()<<endl;
return fetchedData;
}
void getMinMaxAsBSON(BSONObj range, BSONObj proposedKey, BSONObj& min, BSONObj& max)

{
const char *key = proposedKey.firstElement().fieldName();
BSONObj sub = range[key].Obj();
BSONElement maxElem = sub["$lt"];
BSONElement minElem = sub["$gte"];
cout<<"[WWT] get sub="<<sub.toString()<<endl;
cout<<"[WWT] get min="<<minElem.toString()<<endl;
cout<<"[WWT] get max="<<maxElem.toString()<<endl;

if(maxElem.eoo() && minElem.eoo()){

max = ShardKeyPattern(proposedKey).globalMax();
min = ShardKeyPattern(proposedKey).globalMin();
}
else if (maxElem.eoo() && !minElem.eoo()){

max = ShardKeyPattern(proposedKey).globalMax();
BSONObjBuilder b;
b.appendAs(minElem, key);
min =b.obj();
}
else if (!maxElem.eoo() && minElem.eoo()){
min = ShardKeyPattern(proposedKey).globalMin();
BSONObjBuilder b;
b.appendAs(maxElem, key);
max =b.obj();
}
else{
BSONObjBuilder b1;
b1.appendAs(minElem, key);
min =b1.obj();
BSONObjBuilder b2;
b2.appendAs(maxElem, key);
max =b2.obj();
}

}

void migrateChunk(const string ns, BSONObj proposedKey, BSONObjSet splitPoints, int numChunk, int assignment[], vector<Shard> shards, string removedReplicas[],bool multithread,long long **datainkr)
Expand Down Expand Up @@ -1833,31 +1780,39 @@ namespace mongo {
}
*/

int threadsPerNode[numShards];
for (int i=0;i<numShards;i++){
threadsPerNode[i]=1;
}
if(multithread)
{
for (int i=0;i<numShards;i++){
threadsPerNode[i]=0;
}


int **threads;
threads = new int*[numChunk];
for (int i = 0; i < numChunk; i++)
int **threads;
threads = new int*[numChunk];
for (int i = 0; i < numChunk; i++)
threads[i] = new int[numShards];

for (int i = 0; i < numChunk; i++){
for (int i = 0; i < numChunk; i++){
for (int j = 0; j < numShards; j++){
threads[i][j]=1;
}
}
}

long long **unit;
unit = new long long*[numChunk];
for (int i = 0; i < numChunk; i++)
long long **unit;
unit = new long long*[numChunk];
for (int i = 0; i < numChunk; i++)
unit[i] = new long long[numShards];

for (int i = 0; i < numChunk; i++){
for (int i = 0; i < numChunk; i++){
for (int j = 0; j < numShards; j++){
unit[i][j]=datainkr[i][j];
}
}
}

for (int i = 0; i < numChunk; i++){
for (int i = 0; i < numChunk; i++){
//find the largest unit
int max = 0;
int max_i = 0;
Expand All @@ -1876,14 +1831,27 @@ namespace mongo {
threads[max_i][max_j]++;
unit[max_i][max_j] = datainkr[max_i][max_j]/threads[max_i][max_j];

}
cout<< "[WWT] threads matrix"<<endl;
for (int i = 0; i < numChunk; i++){
}



cout<< "[WWT] threads matrix"<<endl;
for (int i = 0; i < numChunk; i++){
for (int j = 0; j < numShards; j++){
cout<< threads[i][j]<<"\t";
if( j!=assignment[i] && datainkr[i][j]!=0){
threadsPerNode[assignment[i]] +=threads[i][j];
}
}
cout<<endl;
}
}
}//end of multithreads

cout<< "[WWT] threads per nodes:" <<endl;
for(int i=0;i< numShards; i++)
{
cout<< threadsPerNode[i]<<"\t";
}


vector<shared_ptr<boost::thread> > migrateThreads;
Expand Down Expand Up @@ -1921,7 +1889,7 @@ namespace mongo {
//create an object to encapsulate all the params
BSONObj paramObj = params.obj();
cout<<"[WWT] Migrate Command Parameters are "<< paramObj.toString()<<endl;
migrateThreads.push_back(shared_ptr<boost::thread>(new boost::thread (boost::bind(&ReShardCollectionCmd::singleMigrate, this, paramObj, ns, removedReplicas[i], 1))));
migrateThreads.push_back(shared_ptr<boost::thread>(new boost::thread (boost::bind(&ReShardCollectionCmd::singleMigrate, this, paramObj, ns, removedReplicas[i], threadsPerNode[i]))));
}

for (unsigned i = 0; i < migrateThreads.size(); i++) {
Expand Down

0 comments on commit 5883b18

Please sign in to comment.