StreamKM "Width" Parameter #97

Closed
richard-moulton opened this issue Jul 17, 2017 · 5 comments
Comments

@richard-moulton
Contributor

Good afternoon,

I am trying to understand the source code for StreamKM++ in MOA. Simply put, as I understand it, the width parameter for StreamKM++ plays two conflicting roles. [All of the following is done using MOA's Clustering tab, with the default RBF stream generator, StreamKM++ as Algorithm 1, and Algorithm 2 cleared.]

  1. The width parameter is defined in the help file as the "Size of Window for training learner." This is borne out in the source code, for example in StreamKM.java's trainOnInstanceImpl() method:
public void trainOnInstanceImpl(Instance inst) {

    ...

    manager.insertPoint(new Point(inst, this.numberInstances));

    this.numberInstances++;
    if (this.numberInstances % widthOption.getValue() == 0) {

        //compute 5 clusterings of the coreset with kMeans++ and take the best
        ...
    }
}

Clusterings are only computed every width instances, and until width instances have been processed the clusterings returned by the getClusteringResult() method have only NULL entries in the centresStreamingCoreset array. This means that no measurements can be calculated for StreamKM++'s performance. When I set width accordingly and run StreamKM++, however, I run into:

 java.lang.ArrayIndexOutOfBoundsException: 6
    at moa.clusterers.streamkm.BucketManager.insertPoint(BucketManager.java:92)
    at moa.clusterers.streamkm.StreamKM.trainOnInstanceImpl(StreamKM.java:71)
    ...
  2. This seems to be occurring because width is also used to calculate the number of buckets, as seen in BucketManager.java's constructor:
public BucketManager(int n,int d,int maxsize, MTRandom random){
        this.clustererRandom = random;
        this.numberOfBuckets = (int) Math.ceil(Math.log((double)n/(double)maxsize) / Math.log(2) )+2;
        this.maxBucketsize = maxsize;
        this.buckets = new Bucket[this.numberOfBuckets];
        for(int i=0; i<this.numberOfBuckets; i++){
            this.buckets[i] = new Bucket(d,maxsize);
        }
        this.treeCoreset = new TreeCoreset();
        //System.out.printf("Created manager with %d buckets of dimension %d \n",this.numberOfBuckets,d);
    }

This constructor is called by StreamKM.java upon initialization in the trainOnInstanceImpl() method: the argument n is, in fact, width (via StreamKM.java's variable length). The problem this causes is that the BucketManager fails once all of its buckets are full: the ArrayIndexOutOfBoundsException is thrown when the BucketManager tries to move points from the last bucket into a non-existent next bucket.
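To make the numbers concrete, the constructor's formula can be checked in isolation. The following is a standalone sketch, not MOA code: numberOfBuckets mirrors the constructor line above, while approxCapacity is my own rough estimate of how many points the bucket cascade can absorb before it runs past the last bucket.

```java
public class BucketCountDemo {
    // Mirrors BucketManager's formula: buckets = ceil(log2(n / maxsize)) + 2
    public static int numberOfBuckets(int n, int maxsize) {
        return (int) Math.ceil(Math.log((double) n / (double) maxsize) / Math.log(2)) + 2;
    }

    // Rough capacity estimate: each merge halves the data, so the cascade can
    // absorb about maxsize * 2^(numberOfBuckets - 1) points before it would
    // need a bucket that does not exist.
    public static long approxCapacity(int n, int maxsize) {
        return (long) maxsize << (numberOfBuckets(n, maxsize) - 1);
    }

    public static void main(String[] args) {
        // width = 10000 and coreset size = 1000 give 6 buckets: enough for
        // the first ~32000 instances, but not for an unbounded stream
        System.out.println(numberOfBuckets(10000, 1000)); // prints 6
        System.out.println(approxCapacity(10000, 1000));  // prints 32000
    }
}
```

Note that the estimated capacity always covers n itself, so the exception never appears within the first width instances; it appears once the stream runs well past width, which a MOA stream generator will always do.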

Although it would be most intuitive to me to "fix" this behaviour, it is consistent with the algorithm's description in Marcel R. Ackermann, Christiane Lammersen, Marcus Märtens, Christoph Raupach, Christian Sohler, Kamil Swierkot: StreamKM++: A Clustering Algorithm for Data Streams. ALENEX 2010: 173-187 (the paper cited in StreamKM.java's opening comments). The argument n is described therein as the size of the data stream. The paper also describes coresets as obtainable at any point during the data stream, which is not currently the case.

My question is: am I missing something in the code or an assumption by the developers? Or does it make sense to modify StreamKM.java's getClusteringResult() method in order to provide proper clusterings as it appears was envisioned in the original paper?

Richard

@richard-moulton
Contributor Author

I have also posted this question in the MOA development Google Group here.

@richard-moulton
Contributor Author

I have resolved this issue in my fork of MOA as well as a couple of other issues that I discovered along the way.

In StreamKM.java:
-getClusteringResult() now obtains the current coreset and performs 5 k-means++ runs in order to return a non-empty clustering; and
-lloydPlusPlus() now calculates cluster radii.

In Point.java:
-toCluster() now takes the radius of the cluster as an argument, instead of making all radii equal to one.

Created CoresetCostTriple.java:
-This is a simple data structure which allows StreamKM's lloydPlusPlus() to return the coreset centres, radii of the clusters and cost of the clustering. Previously the associated centres were never returned or stored globally.

I have also included a new FlagOption parameter which permits only the final clustering to be evaluated, as in the original code.
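For reference, the reworked getClusteringResult() has roughly the following shape. This is an illustrative sketch only: the CoresetCostTriple getter names and the local variable names are placeholders, not necessarily the code that was merged.

```java
public Clustering getClusteringResult() {
    // pull the current coreset out of the BucketManager on demand,
    // instead of waiting for the next width boundary
    Point[] coreset = manager.getCoresetFromManager(dimension);

    // run k-means++ five times on the coreset and keep the cheapest result
    CoresetCostTriple best = null;
    for (int i = 0; i < 5; i++) {
        CoresetCostTriple candidate = lloydPlusPlus(numberOfCentres, coresetSize, dimension, coreset);
        if (best == null || candidate.getCoresetCost() < best.getCoresetCost()) {
            best = candidate;
        }
    }

    // convert the centres to clusters, passing each cluster's computed
    // radius to toCluster() instead of a hard-coded 1.0
    Clustering clustering = new Clustering();
    for (int i = 0; i < numberOfCentres; i++) {
        clustering.add(best.getCoresetCentres()[i].toCluster(best.getRadii()[i]));
    }
    return clustering;
}
```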

@richard-moulton
Contributor Author

Pull request #100 was merged into MOA's master branch; issue is resolved.

@mustafaevam

mustafaevam commented May 31, 2019

@abifet
@richard-moulton
I have come across the same bug and got the following exception while training a StreamKM model.

java.lang.ArrayIndexOutOfBoundsException: 6
       at moa.clusterers.streamkm.BucketManager.insertPoint(BucketManager.java:93)
       at moa.clusterers.streamkm.StreamKM.trainOnInstanceImpl(StreamKM.java:77)
       at com.evam.moakmeansplusplus.service.StreamKMService.train(StreamKMService.java:45)
       at com.evam.moakmeansplusplus.communication.TrainQueueProcessor.runOperation(TrainQueueProcessor.java:50)
       at com.evam.moakmeansplusplus.utils.EvamIntelligenceThread.run(EvamIntelligenceThread.java:31)

I have traced the source code and found the bug in the following method:

/**
inserts a single point into the bucketmanager
**/
void insertPoint(Point p){
   
   //check if there is enough space in the first bucket
   int cursize = this.buckets[0].cursize; 
   if(cursize >= this.maxBucketsize) {
      //printf("Bucket 0 full \n");
      //start spillover process
      int curbucket  = 0;
      int nextbucket = 1;

      //check if the next bucket is empty
      if(this.buckets[nextbucket].cursize == 0){
         //copy the bucket  
         int i;
         for(i=0; i<this.maxBucketsize; i++){
            this.buckets[nextbucket].points[i] = this.buckets[curbucket].points[i].clone();
            //copyPointWithoutInit: we should not copy coordinates? 
         }
         //bucket is now full
         this.buckets[nextbucket].cursize = this.maxBucketsize;
         //first bucket is now empty
         this.buckets[curbucket].cursize = 0;
         cursize = 0;
      } else {
         //printf("Bucket %d full \n",nextbucket);
         //copy bucket to spillover and continue
         int i;
         for(i=0;i<this.maxBucketsize;i++){
            this.buckets[nextbucket].spillover[i] = this.buckets[curbucket].points[i].clone();
            //copyPointWithoutInit: we should not copy coordinates? 
         }
         this.buckets[0].cursize=0;
         cursize = 0;
         curbucket++;
         nextbucket++;
         /*
         as long as the next bucket is full output the coreset to the spillover of the next bucket
         */
         while(this.buckets[nextbucket].cursize == this.maxBucketsize){
            //printf("Bucket %d full \n",nextbucket);
            this.treeCoreset.unionTreeCoreset(this.maxBucketsize,this.maxBucketsize,
               this.maxBucketsize,p.dimension, 
               this.buckets[curbucket].points,this.buckets[curbucket].spillover,
               this.buckets[nextbucket].spillover, this.clustererRandom);
            //bucket now empty
            this.buckets[curbucket].cursize = 0;
            curbucket++;
            nextbucket++;
         }
         this.treeCoreset.unionTreeCoreset(this.maxBucketsize,this.maxBucketsize,
               this.maxBucketsize,p.dimension, 
               this.buckets[curbucket].points,this.buckets[curbucket].spillover,
               this.buckets[nextbucket].points, this.clustererRandom);
         this.buckets[curbucket].cursize = 0;
         this.buckets[nextbucket].cursize = this.maxBucketsize;
      }
   }
   //insert point into the first bucket
   this.buckets[0].points[cursize] = p.clone();
   //copyPointWithoutInit: we should not copy coordinates? 
   this.buckets[0].cursize++;
}

The exception is thrown at this line:
while(this.buckets[nextbucket].cursize == this.maxBucketsize){

It means that the BucketManager's buckets are full and it is trying to move points from the last bucket to a non-existent next bucket, as Richard mentioned above.

I have debugged the source code line by line and got the following result.
To be clearer, let's assume that my initial parameters for the StreamKM model are the following:
n (lengthOption) = 10000
maxsize (sizeCoresetOption) = 1000

BucketManager determines its maximum number of buckets using the following formula:
this.numberOfBuckets = (int) Math.ceil(Math.log((double)n/(double)maxsize) / Math.log(2) )+2;
With my n and maxsize values, BucketManager assigns 6 to the numberOfBuckets variable, which means it has 6 buckets and each bucket can store 2000 data-points: 1000 (maxsize) in the points array and 1000 (maxsize) in the spillover array. For a better understanding, take a look at the Bucket class:

protected class Bucket {
   int cursize;
   Point[] points;
   Point[] spillover;
   
   public Bucket(int d, int maxsize){  // d is the number of features, i.e. the dimension
      this.cursize = 0;
      this.points = new Point[maxsize];
      this.spillover = new Point[maxsize];
      for(int i=0; i<maxsize; i++){
         this.points[i] = new Point(d);
         this.spillover[i] = new Point(d);
      }
   }
   
};

You can follow the code flow in the picture; I also put some notes on the drawings.
After the initialization, I have a BucketManager which holds 6 buckets.
Each bucket has two arrays (points[1000] and spillover[1000]), i.e. 1000 + 1000 capacity for data-points.

On each iteration, the code inserts the incoming data-point into the buckets[0].points array.
If the buckets[0].points array is full, it moves the buckets[0].points array into the buckets[1].points array.
If the buckets[1].points array is also full, it moves it into the buckets[1].spillover array.
When both the buckets[1].points and buckets[1].spillover arrays are full, the code merges[1] these two arrays and moves the result into buckets[2].points.
The same pattern continues up the chain.

At the last bucket (buckets[5]), when both buckets[5].points and buckets[5].spillover are full, the program merges these two arrays and tries to move the result into buckets[6].points.
However, there are only 6 buckets and buckets[5] is the last one, so it throws the ArrayIndexOutOfBoundsException.
After the exception, the code only has the buckets[5].points array and loses the buckets[5].spillover data-points.
It keeps running, but it loses half of the data-points at the last bucket, and the last bucket's points array is never updated: it holds the first 1000 data-points indefinitely.
That means StreamKM++ is partially failing to update itself. [This part is complicated; take a look at the getCoresetFromManager method for a better understanding.]
[attached image: WhatsApp Image 2019-05-31 at 12.15.46.jpeg]
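The cascade above can be reproduced with a toy model that tracks only how many coresets each bucket currently holds, not the points themselves. This is a standalone sketch, not MOA code; the merge step behaves exactly like a carry in a binary counter:

```java
public class CascadeDemo {
    // Toy model of BucketManager's spillover cascade: full[i] counts how many
    // coresets bucket i currently holds (0, 1, or 2 = points + spillover).
    // Bucket 0 is the raw buffer; each time it fills, one unit enters level 1.
    // Two units at a level are merged into one unit pushed to the next level.
    public static long pointsUntilOverflow(int maxsize, int numberOfBuckets) {
        int[] full = new int[numberOfBuckets + 1]; // extra slot to detect overflow
        long processed = 0;
        while (true) {
            processed += maxsize;           // bucket 0's points array fills up again
            int level = 1;
            full[level]++;
            while (full[level] == 2) {      // points and spillover both occupied:
                full[level] = 0;            // merge them into one coreset ...
                level++;                    // ... and push it one level up
                full[level]++;
            }
            if (level >= numberOfBuckets) { // carried past the last real bucket:
                return processed;           // this is where insertPoint() throws
            }
        }
    }

    public static void main(String[] args) {
        // maxsize = 1000 with 6 buckets overflows after 32 * 1000 points,
        // well past n = 10000 but inevitable on an unbounded stream
        System.out.println(pointsUntilOverflow(1000, 6)); // prints 32000
    }
}
```

With 6 buckets the overflow arrives only after about maxsize * 2^5 points, which is why the exception appears long after the first width instances rather than immediately.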

My solution is:
When the last bucket is full, I am going to copy its data-points back into the first bucket's array, which is the buckets[0].points array.
This moves the last bucket's data-points to the first bucket and preserves them.
So both the exception and the data-loss problem are gone.

Now I am implementing the code; I will let you know about the progress.
Any suggestion or correction would be wonderful.

[1] Some merging algorithm: it takes 1000 data-points from the points array and 1000 data-points from the spillover array and merges them. As output, it produces another 1000-point array that is a kind of summary of the 2000 data-points.

@mustafaevam

This issue should be reopened.
