### Unbalanced import

#### Context

The import was started with 16 threads, whereas the underlying stream has 24 partitions:

    > import.sh -o import -t 16 -l import/statements_live-us-east -r us-east -b / -a -w 200000 -bulk 

As a result, the number of consumer threads drops by 8 once some of the partitions are fully consumed. The number of threads then decreases regularly until it reaches 2, with a low throughput.

Looking at the offsets:

    > stream.sh lag -k --verbose -l import/statements_live-us-east
    ## Log: Name{id='import-statements_live-us-east', urn='import/statements_live-us-east'} partitions: 24
    ### Group: Name{id='StreamImporter-runDocumentConsumersEx', urn='StreamImporter/runDocumentConsumersEx'}

| partition | lag | pos | end | posOffset | endOffset |
| --- | ---: | ---: | ---: | ---: | ---: |
|All|134614904|410913682|545528586|13325484|22937322|
|0|9497160|13440162|22937322|13440162|22937322|
|1|9478898|13401838|22880736|13401838|22880736|
|2|9460836|13447068|22907904|13447068|22907904|
|3|9480060|13419108|22899168|13419108|22899168|
|4|9494178|13436388|22930566|13436388|22930566|
|5|9545021|13383589|22928610|13383589|22928610|
|6|9418620|13426782|22845402|13426782|22845402|
|7|9522299|13385569|22907868|13385569|22907868|
|8|9239384|13390192|22629576|13390192|22629576|
|9|9324698|13333324|22658022|13333324|22658022|
|10|9275058|13387290|22662348|13387290|22662348|
|11|9314988|13325484|22640472|13325484|22640472|
|12|5352228|17275308|22627536|17275308|22627536|
|13|5406157|17226965|22633122|17226965|22633122|
|14|5364081|17273745|22637826|17273745|22637826|
|15|5441238|17230662|22671900|17230662|22671900|
|16|0|22613778|22613778|22613778|22613778|
|17|0|22633500|22633500|22633500|22633500|
|18|0|22629768|22629768|22629768|22629768|
|19|0|22640196|22640196|22640196|22640196|
|20|0|22680138|22680138|22680138|22680138|
|21|0|22658220|22658220|22658220|22658220|
|22|0|22667322|22667322|22667322|22667322|
|23|0|22607286|22607286|22607286|22607286|

So clearly:

 - 8 threads reached the end of their partitions (partitions 16 to 23 have a lag of 0)
 - then the problems started
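The table can be cross-checked by recomputing the lag as `end - pos` for each partition. The numbers also suggest that the `All` row holds the sums of `lag`, `pos` and `end`, plus the lowest `pos` and highest `end` in the offset columns. That reading comes from the figures themselves, not from the `stream.sh` documentation. Below is a minimal sketch. The class `LagSummary` is hypothetical, and its arrays are copied from the table above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: recompute the per-partition lag (end - pos) from the
// "stream.sh lag" output and list the partitions that are fully consumed.
class LagSummary {
    // "pos" column, indexed by partition number
    static final long[] POS = {
        13440162, 13401838, 13447068, 13419108, 13436388, 13383589, 13426782, 13385569,
        13390192, 13333324, 13387290, 13325484, 17275308, 17226965, 17273745, 17230662,
        22613778, 22633500, 22629768, 22640196, 22680138, 22658220, 22667322, 22607286};
    // "end" column, indexed by partition number
    static final long[] END = {
        22937322, 22880736, 22907904, 22899168, 22930566, 22928610, 22845402, 22907868,
        22629576, 22658022, 22662348, 22640472, 22627536, 22633122, 22637826, 22671900,
        22613778, 22633500, 22629768, 22640196, 22680138, 22658220, 22667322, 22607286};

    // Sum of (end - pos) over all partitions: should match the "All" lag
    static long totalLag(long[] pos, long[] end) {
        long sum = 0;
        for (int p = 0; p < pos.length; p++) {
            sum += end[p] - pos[p];
        }
        return sum;
    }

    // Partitions whose position has reached the end (lag == 0)
    static List<Integer> finishedPartitions(long[] pos, long[] end) {
        List<Integer> done = new ArrayList<>();
        for (int p = 0; p < pos.length; p++) {
            if (end[p] - pos[p] == 0) {
                done.add(p);
            }
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println("total lag: " + totalLag(POS, END));
        System.out.println("finished: " + finishedPartitions(POS, END));
    }
}
```

It reports a total lag of 134614904, the same as the `All` row, and the finished partitions are 16 to 23.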

Tried to stop/restart, but the same phenomenon happened:

<img src="../monitoring/us-east-import-glitch.png"/>


Source stream partitions are assigned to worker threads via:

    protected List<List<LogPartition>> getDefaultAssignments() {
        Map<String, Integer> streams = Collections.singletonMap(logName,
                manager.getAppender(Name.ofUrn(logName)).size());
        return KafkaUtils.roundRobinAssignments(getNbThreads(), streams);
    }

If we have 16 consumer threads but 24 partitions in Kafka, some of the threads will be associated with more than one partition.
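This is an assumption, since the actual Nuxeo implementation is not shown here: `KafkaUtils.roundRobinAssignments` probably deals partitions out one at a time, as its name suggests. Under that assumption, the split of 24 partitions over 16 threads can be sketched as follows. The class `RoundRobinSketch` and its `assign` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a round-robin assignment: partition p goes to thread
// p % nbThreads. Not the actual KafkaUtils code, only what its name suggests.
class RoundRobinSketch {
    static List<List<Integer>> assign(int nbThreads, int nbPartitions) {
        List<List<Integer>> threads = new ArrayList<>();
        for (int t = 0; t < nbThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int p = 0; p < nbPartitions; p++) {
            threads.get(p % nbThreads).add(p);
        }
        return threads;
    }

    public static void main(String[] args) {
        List<List<Integer>> assignment = assign(16, 24);
        for (int t = 0; t < assignment.size(); t++) {
            System.out.println("t" + t + ": " + assignment.get(t));
        }
    }
}
```

With 16 threads, threads t0 to t7 each get two partitions (p0 and p16, p1 and p17, ..., p7 and p23), and threads t8 to t15 get a single one.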

We know that 8 partitions are completed, so any thread reading from one of them is likely to be stuck until a timeout.
As a result, because the 8 "finished" partitions are shared among several threads, we end up blocking most of the threads.

The second run, done with the proper thread count (-t 24), should then allow the import to finish just fine,
but this does not seem to be the case.

<img src="../monitoring/import-thread-dispatch.png"/>

It seems that only 16 threads are created, and the count quickly drops to 2.


#### Trying to solve the problem

Tried to restart the importer again, changing some timeout parameters:

    > import.sh -o import -t 24 -l import/statements_live-us-east -json {\"waitMessageTimeoutSeconds\":600\,\"batchThresholdS\":20\,\"retryDelayS\":10} -r us-east -b / -a -w 400000 -bulk 

It does slow down the decrease in threads, but the problem remains.

In the logs we can see a lot of rebalances:
    
    Rollback current batch because of consumer rebalancing
    
    Incomplete rebalance during poll, raising exception, revoked: true, lost: false

Looking at Datadog, the number of threads decreases regularly and the throughput goes down as well:

<img src="../monitoring/import-us-east-threads.png"/>


#### Understanding the problem

When the importer is started with 24 threads, each thread is associated with exactly one partition:

    - t0: p0
    - t1: p1
    ...
    - t23: p23


Then, because partitions 16 to 23 are finished, the corresponding threads are blocked and, after a "wait msg timeout", there is a rebalance over the 16 remaining threads:

    - t0: p0, p1
    - t1: p2, p3
    ...
    - t7: p14, p15
    - t8: p16
    ...
    - t15: p23


Then threads 8 to 15 will be stuck too, and after a timeout a new rebalance happens over the 8 remaining threads:

    - t0: p0, p1, p2
    - t1: p3, p4, p5
    ...
    - t5: p15, p16, p17
    ...
    - t7: p21, p22, p23

Then threads 6 and 7 are stuck too, and so on, until we end up with:

    - t0: p0, ..., p11
    - t1: p12, ..., p23
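A small simplified model can reproduce the cascade above. Its assumptions are not taken from the importer code:

- partitions are split into contiguous ranges the way Kafka's `RangeAssignor` does for a single topic, so the first `nbPartitions % nbThreads` threads get one extra partition;
- a thread whose partitions are all finished drops out after the wait timeout;
- partitions below `firstFinished` stay active for the whole run.

The class `RangeCascade` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the rebalance cascade with range assignment.
class RangeCascade {
    // Contiguous ranges; the first (nbPartitions % nbThreads) threads get one extra partition
    static List<List<Integer>> rangeAssign(int nbThreads, int nbPartitions) {
        List<List<Integer>> threads = new ArrayList<>();
        int base = nbPartitions / nbThreads;
        int extra = nbPartitions % nbThreads;
        int next = 0;
        for (int t = 0; t < nbThreads; t++) {
            int size = base + (t < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                parts.add(next++);
            }
            threads.add(parts);
        }
        return threads;
    }

    // A thread is stuck when it owns partitions and all of them are >= firstFinished
    static int stuckThreads(List<List<Integer>> assignment, int firstFinished) {
        int stuck = 0;
        for (List<Integer> parts : assignment) {
            boolean allFinished = !parts.isEmpty();
            for (int p : parts) {
                if (p < firstFinished) {
                    allFinished = false;
                    break;
                }
            }
            if (allFinished) {
                stuck++;
            }
        }
        return stuck;
    }

    // Number of live threads after each rebalance, until no thread is stuck
    static List<Integer> simulate(int nbThreads, int nbPartitions, int firstFinished) {
        List<Integer> history = new ArrayList<>();
        int live = nbThreads;
        while (live > 0) {
            history.add(live);
            int stuck = stuckThreads(rangeAssign(live, nbPartitions), firstFinished);
            if (stuck == 0) {
                break;
            }
            live -= stuck; // stuck threads time out and leave, triggering a rebalance
        }
        return history;
    }

    public static void main(String[] args) {
        // 24 partitions, partitions 16..23 already finished, restarted with 24 threads
        System.out.println(simulate(24, 24, 16)); // [24, 16, 8, 6, 4, 3, 2]
    }
}
```

In this model, each rebalance leaves fewer live threads: 24, 16, 8, 6, 4, 3 and finally 2. That matches the steps traced above and the drop to 2 threads observed in the monitoring.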

#### Solving the problem

The first solution is obviously to avoid creating the problem by not making a mistake when choosing the number of threads.

However:

  - mistakes can happen
  - having unbalanced partitions is probably a legitimate use case

We can try to solve this by changing the Kafka partition assignment strategy:

    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

With that, we can expect the thread assignment after a rebalance to look like this:

    - t0: p0, p16
    - t1: p1, p17
    ...
    - t7: p7, p23
    - t8: p8
    ...
    - t15: p15
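For comparison, the same simplified cascade model can be run with partitions dealt out round-robin. For a single topic that all consumers subscribe to, Kafka's `RoundRobinAssignor` sends partition p to consumer p % n. As before, the model assumes that a thread whose partitions are all finished drops out and triggers a rebalance. The class `RoundRobinCascade` is hypothetical and covers only the assignment logic. The status tables that follow are what was actually observed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the rebalance cascade with round-robin assignment.
class RoundRobinCascade {
    // Partition p goes to thread p % nbThreads
    static List<List<Integer>> roundRobinAssign(int nbThreads, int nbPartitions) {
        List<List<Integer>> threads = new ArrayList<>();
        for (int t = 0; t < nbThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int p = 0; p < nbPartitions; p++) {
            threads.get(p % nbThreads).add(p);
        }
        return threads;
    }

    // A thread is stuck when it owns partitions and all of them are >= firstFinished
    static int stuckThreads(List<List<Integer>> assignment, int firstFinished) {
        int stuck = 0;
        for (List<Integer> parts : assignment) {
            boolean allFinished = !parts.isEmpty();
            for (int p : parts) {
                if (p < firstFinished) {
                    allFinished = false;
                    break;
                }
            }
            if (allFinished) {
                stuck++;
            }
        }
        return stuck;
    }

    // Number of live threads after each rebalance, until no thread is stuck
    static List<Integer> simulate(int nbThreads, int nbPartitions, int firstFinished) {
        List<Integer> history = new ArrayList<>();
        int live = nbThreads;
        while (live > 0) {
            history.add(live);
            int stuck = stuckThreads(roundRobinAssign(live, nbPartitions), firstFinished);
            if (stuck == 0) {
                break;
            }
            live -= stuck;
        }
        return history;
    }

    public static void main(String[] args) {
        System.out.println(simulate(24, 24, 16)); // [24, 16]
        System.out.println(roundRobinAssign(16, 24).get(0)); // [0, 16]
    }
}
```

Starting from 24 threads, the 8 threads on partitions 16 to 23 drop out once. After that, every one of the 16 remaining threads still owns at least one active partition, so the model stabilizes at 16 threads instead of cascading down to 2.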

New status:

| partition | lag | pos | end | posOffset | endOffset |
| --- | ---: | ---: | ---: | ---: | ---: |
|All|15851445|529677141|545528586|19422325|22937322|
|0|3457412|19479910|22937322|19479910|22937322|
|1|3458411|19422325|22880736|19422325|22880736|
|2|374833|22533071|22907904|22533071|22907904|
|3|428322|22470846|22899168|22470846|22899168|
|4|396413|22534153|22930566|22534153|22930566|
|5|545269|22383341|22928610|22383341|22928610|
|6|364440|22480962|22845402|22480962|22845402|
|7|445777|22462091|22907868|22462091|22907868|
|8|84672|22544904|22629576|22544904|22629576|
|9|223146|22434876|22658022|22434876|22658022|
|10|3000009|19662339|22662348|19662339|22662348|
|11|3072729|19567743|22640472|19567743|22640472|
|12|0|22627536|22627536|22627536|22627536|
|13|6|22633116|22633122|22633116|22633122|
|14|6|22637820|22637826|22637820|22637826|
|15|0|22671900|22671900|22671900|22671900|
|16|0|22613778|22613778|22613778|22613778|
|17|0|22633500|22633500|22633500|22633500|
|18|0|22629768|22629768|22629768|22629768|
|19|0|22640196|22640196|22640196|22640196|
|20|0|22680138|22680138|22680138|22680138|
|21|0|22658220|22658220|22658220|22658220|
|22|0|22667322|22667322|22667322|22667322|
|23|0|22607286|22607286|22607286|22607286|


Last status, after all the threads are dead:

| partition | lag | pos | end | posOffset | endOffset |
| --- | ---: | ---: | ---: | ---: | ---: |
|All|9762445|535766141|545528586|20225311|22937322|
|0|2578263|20359059|22937322|20359059|22937322|
|1|2655425|20225311|22880736|20225311|22880736|
|2|0|22907904|22907904|22907904|22907904|
|3|0|22899168|22899168|22899168|22899168|
|4|20|22930546|22930566|22930546|22930566|
|5|0|22928610|22928610|22928610|22928610|
|6|19|22845383|22845402|22845383|22845402|
|7|97|22907771|22907868|22907771|22907868|
|8|0|22629576|22629576|22629576|22629576|
|9|0|22658022|22658022|22658022|22658022|
|10|2269399|20392949|22662348|20392949|22662348|
|11|2259210|20381262|22640472|20381262|22640472|
|12|0|22627536|22627536|22627536|22627536|
|13|6|22633116|22633122|22633116|22633122|
|14|6|22637820|22637826|22637820|22637826|
|15|0|22671900|22671900|22671900|22671900|
|16|0|22613778|22613778|22613778|22613778|
|17|0|22633500|22633500|22633500|22633500|
|18|0|22629768|22629768|22629768|22629768|
|19|0|22640196|22640196|22640196|22640196|
|20|0|22680138|22680138|22680138|22680138|
|21|0|22658220|22658220|22658220|22658220|
|22|0|22667322|22667322|22667322|22667322|
|23|0|22607286|22607286|22607286|22607286|