# Repairing ElasticSearch

Basically we managed to delete the same disk on each of 12 nodes. This killed a lot of primary shards and left the cluster in an unrecoverable state. This is the script I used to bring it back to green.

We've lost data, but I don't want to lose the whole index because of one lost primary shard.

It uses **_cat/shards** to work out the unassigned primary and replica shards, and **_cluster/reroute** to assign shards to data nodes. It works because of the **allow_primary** flag in [**_cluster/reroute**](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-reroute.html):

```
The allow_primary parameter will force a new empty primary shard to be allocated without any data. If a node which has a copy of the original shard (including data) rejoins the cluster later on, that data will be deleted: the old shard copy will be replaced by the new live shard copy.
```
This allows me to replace the lost primary shard with an empty one; reassigning the replicas then copies the new empty primary.
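
The first half of this, picking the unassigned primaries out of the **_cat/shards** listing, can be sketched without pandas. This is a minimal illustration only: `unassigned_primaries` is a hypothetical helper, and the sample text assumes the column order `index shard prirep state ...` with the trailing columns missing for unassigned shards.

```python
def unassigned_primaries(cat_shards_text):
    """Return (index, shard) pairs for primaries listed as UNASSIGNED."""
    found = []
    for line in cat_shards_text.splitlines():
        fields = line.split()
        # Skip blank lines and the header row (its shard column is not a number).
        if len(fields) < 4 or not fields[1].isdigit():
            continue
        index, shard, prirep, state = fields[:4]
        if prirep == "p" and state == "UNASSIGNED":
            found.append((index, int(shard)))
    return found


# Made-up sample in the assumed _cat/shards layout.
sample = """\
index   shard prirep state      docs store ip       node
logs-01 0     p      STARTED    1200 3mb   10.0.0.1 DataNode01
logs-01 0     r      UNASSIGNED
logs-01 1     p      UNASSIGNED
logs-02 0     p      UNASSIGNED
"""

print(unassigned_primaries(sample))  # prints [('logs-01', 1), ('logs-02', 0)]
```

Each pair returned is a candidate for a reroute with **allow_primary** set, which is where the data loss warning applies.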

There is an extra piece that is needed: on a production cluster ElasticSearch will aggressively try to recover primary shards. You get to a point at which you cannot **reroute** primary shards because they are active.

## The process

- Wait for the number of allocated primary shards to stop increasing 
  ( **Warning**: reroute will result in data loss if you start too early )
- Disable primary shard recoveries
```
curl -XPUT localhost:9200/_cluster/settings -d '{
    "transient" : {
        "cluster.routing.allocation.node_initial_primaries_recoveries" : 0
    }
}'
```
- **_cluster/reroute** the missing primary shards with **allow_primary** set to 1; these should now all be in the "UNASSIGNED" state
- Optional: reassign replicas. Once the cluster is back in "yellow" these will proceed automatically, but it can be faster if you do them yourself. In my case the nodes were repeatedly cycling through restarting shards but hitting FileNotFound exceptions.
- Re-enable primary shard recoveries ( the default value is 4 )
```
curl -XPUT localhost:9200/_cluster/settings -d '{
    "transient" : {
        "cluster.routing.allocation.node_initial_primaries_recoveries" : 4
    }
}'
```
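
The two settings calls can also be made from Python. This is a rough sketch rather than part of the tested script: `recoveries_body` and `set_primary_recoveries` are hypothetical helper names, and the client passed in is assumed to expose `put_settings(body=...)` the way `elasticsearch.client.ClusterClient` does.

```python
SETTING = "cluster.routing.allocation.node_initial_primaries_recoveries"


def recoveries_body(count):
    """Build the same transient settings body as the curl calls above."""
    return {"transient": {SETTING: count}}


def set_primary_recoveries(cluster_client, count):
    # Assumption: cluster_client has put_settings(body=...),
    # e.g. an elasticsearch.client.ClusterClient instance.
    return cluster_client.put_settings(body=recoveries_body(count))
```

Usage would look like `set_primary_recoveries(cst_client, 0)` before rerouting and `set_primary_recoveries(cst_client, 4)` afterwards.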

**NOTE: the following script has only been tested with version 1.3.6 of ElasticSearch**

### Further reading 
 - [T37: same method but doesn't stop the primary recoveries](https://t37.net/how-to-fix-your-elasticsearch-cluster-stuck-in-initializing-shards-mode.html)
 - [ElasticSearch: Shard allocation settings](https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html)
 
### Shard allocation settings / rolling restart
 
While trying to resolve these issues I kept disabling shard allocation, as per the [rolling restart instructions](https://www.elastic.co/guide/en/elasticsearch/guide/current/_rolling_restarts.html). The problem is that you cannot reroute while this setting is `none`.
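
A quick guard against this could look like the sketch below. It assumes the setting in question is `cluster.routing.allocation.enable` (the one used in the rolling restart instructions) and that the cluster settings have been fetched with flat keys; `effective_allocation` and `can_reroute` are hypothetical helpers, not part of the script that follows.

```python
ENABLE_KEY = "cluster.routing.allocation.enable"


def effective_allocation(settings):
    """Return the allocation mode in force; transient overrides persistent."""
    for section in ("transient", "persistent"):
        value = settings.get(section, {}).get(ENABLE_KEY)
        if value is not None:
            return value
    return "all"  # default when the setting is not present


def can_reroute(settings):
    # Rerouting is refused while allocation is switched off entirely.
    return effective_allocation(settings) != "none"


# Example with a made-up settings response in the assumed flat layout.
print(can_reroute({"transient": {ENABLE_KEY: "none"}, "persistent": {}}))  # prints False
```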

from string import Template
import elasticsearch
import pandas as pd
import io, json, random, re, sys, time

# The host must not include the port; the timeout entry needs its colon.
client = elasticsearch.Elasticsearch([
    {'host': '127.0.0.1', 'port': 9200, 'timeout': 108000}
])

cat_client = elasticsearch.client.CatClient(client)
cst_client = elasticsearch.client.ClusterClient(client) 

def get_shards():
    global cat_client
    t_str = cat_client.shards(v=True,bytes='k')
    out = []
    for line in t_str.split('\n'):
        line = re.sub(r'[ \t]+', ',',line.strip())
        if line.find('UNASSIGNED') != -1:
            line = ','.join([line,',,,'])
        elif line.find('RELOCATING') != -1:
            continue
        out.append(line)
    t_str = "\n".join(out)
    # pd.read_csv replaces DataFrame.from_csv, which newer pandas no longer provides
    shards = pd.read_csv(io.StringIO(t_str), index_col=[0, 1])
    shards.sort_values(by=['prirep','state'], inplace=True)
    shards.sort_index(inplace=True)
    return shards
shards = get_shards()
shards.head(10)

I get a list of nodes and drop any **master** or **non-data** nodes from it. I also drop **DataNode20** as it already looks to have high memory usage. This creates a list of nodes to which shards can be routed.

In [None]:
def get_nodes():
    t_str = cat_client.nodes(v=True)
    out = []
    for line in t_str.split('\n'):
        line = re.sub(r'[ \t]+', ',',line.strip())
        out.append(line)
    t_str = "\n".join(out)
    # pd.read_csv replaces DataFrame.from_csv, which newer pandas no longer provides
    nodes = pd.read_csv(io.StringIO(t_str), index_col=[0])
    ignore = nodes[(nodes.master=='*') | ( nodes['node.role']!='d')]
    # Also skip DataNode20, which already has high memory usage
    ignore_names = ignore.index.tolist() + ['DataNode20']
    data_nodes = nodes.drop(ignore_names)
    n_list = data_nodes.index.tolist()
    return n_list
n_list = get_nodes()
n_list

Set up the clients, and start allocations from a random point in the node list.

*allocate_shards* will round robin the assignment from the available data nodes. 
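
The round-robin idea on its own can be sketched with `itertools.cycle`. This is only an illustration of the technique, not the code used below: `round_robin` is a hypothetical helper and the node names are made up.

```python
import itertools
import random


def round_robin(nodes, start=None):
    """Return an endless iterator over nodes, beginning at a random offset."""
    if not nodes:
        raise ValueError("no data nodes available")
    if start is None:
        start = random.randrange(len(nodes))
    # Rotate the list so iteration begins at the chosen offset.
    rotated = nodes[start:] + nodes[:start]
    return itertools.cycle(rotated)


picker = round_robin(["DataNode01", "DataNode02", "DataNode03"], start=1)
print([next(picker) for _ in range(4)])
# prints ['DataNode02', 'DataNode03', 'DataNode01', 'DataNode02']
```

Starting at a random offset avoids always loading the first node in the list when the script is restarted.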

In [None]:
global i_node
# randint is inclusive at both ends, so stop at the last valid position
i_node = random.randint(0, len(n_list) - 1)

def allocate_shards( n_index, shard, is_primary ):
    global i_node
    global n_list
    global cst_client
    
    t_str = """
    {
        "commands":[
            { "allocate": { 
                  "index": "$index", 
                  "shard": $shard, 
                  "node": "$node", 
                  "allow_primary": $is_prim 
                }
            }
        ]
    }
    """
    tpl = Template(t_str)
    i_max = len(n_list)
    params = {
        'index': n_index,
        'shard': shard,
        'node': n_list[i_node%i_max],
        'is_prim': is_primary
    }
    i_node +=1 
    # Avoid shadowing the built-in str
    rendered = tpl.substitute(params)
    cmd = json.loads(rendered)
    s_cmd = json.dumps(cmd, indent=2)
    #print(s_cmd)
    cst_client.reroute(body=s_cmd)
    

*get_index_stats()* is basically only here to enable two tests:
 - Check we have the same number of documents before and after reroute
 - Check that we leave the index in *state=yellow*
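
Stripped of the pandas plumbing, the two tests amount to the comparison below. It is a sketch under assumptions: `check_reroute` is a hypothetical helper, and the before/after values are plain dicts keyed by the `health` and `docs.count` column names.

```python
def check_reroute(before, after):
    """Return a list of problems; an empty list means both checks passed."""
    problems = []
    if after["docs.count"] != before["docs.count"]:
        problems.append(
            "docs.count changed: %s -> %s" % (before["docs.count"], after["docs.count"])
        )
    if after["health"] == "red":
        problems.append("index is still red")
    return problems


# Made-up stats for one index before and after the reroute.
before = {"health": "red", "docs.count": 1200}
after = {"health": "yellow", "docs.count": 1200}
print(check_reroute(before, after))  # prints []
```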

In [None]:
def get_index_stats( t_index ):
    t_str = cat_client.indices(index=t_index, v=True)
    out = []
    for line in t_str.split('\n'):
        line = re.sub(r'[ \t]+', ',',line.strip())
        out.append(line)
    t_str = "\n".join(out)
    # pd.read_csv replaces DataFrame.from_csv, which newer pandas no longer provides
    stats = pd.read_csv(io.StringIO(t_str), index_col=[1])
    del stats['docs.deleted']
    del stats['pri']
    del stats['rep']
    return stats

The outer loop was there to allow me to run hands off. I tested without the loop on 200 or so non-critical indexes, and the number of docs was always the same before and after. However, on those runs I had not yet figured out that I had to switch off *primaries_recoveries*, so I kept hitting exceptions. I was confident that when it failed there was no data loss, but it needed to run hands off overnight.

I suspect switching off *primaries_recoveries* and only assigning primaries in a single loop would complete very quickly. 

Some nice points with the approach below are:
 - It runs in batches
 - It backs off if it sees an error
 - It resets on success
 - It checks shard allocation every batch
 - If there is an error it will check node allocation
  
If you are assigning replicas, beware that I had two cases:
 - the primary was also missing: super fast after the primary is assigned, basically a copy of an empty shard
 - the primary was not missing: copying 100GB files across the cluster

Hence the timeouts and backoffs. Getting things to run hands off was tricky, and the approach below could be much improved.
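
The back-off-and-reset behaviour can be isolated into a small helper. This is a sketch of the pattern only, not the loop used below: `run_with_backoff` and `demo_step` are hypothetical names, and `sleep` is injectable so the example runs without waiting.

```python
import time


def run_with_backoff(step, max_tries=8, base_delay=120, sleep=time.sleep):
    """Call step() per batch until it returns False (nothing left to do).

    Consecutive failures back off linearly; any success resets the counter.
    Returns True when the work completed, False after max_tries failures in a row.
    """
    tries = 0
    while tries < max_tries:
        try:
            if not step():
                return True
            tries = 0  # reset on success
        except Exception:
            tries += 1
            if tries < max_tries:
                sleep(tries * base_delay)
    return False


# Demo: one failed batch, one successful batch, then nothing left to do.
outcomes = iter([RuntimeError("node busy"), True, False])


def demo_step():
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


delays = []
print(run_with_backoff(demo_step, sleep=delays.append), delays)  # prints True [120]
```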

In [None]:
max_tries = 8
tries = 0
while( tries != max_tries ):
    global n_list
    shards = get_shards()
    if shards.empty:
        tries = max_tries
        break
    try:
        u_shards = shards[(shards.prirep=='p') & (shards.state=='UNASSIGNED')]
        # The following will also reassign replicas
        # u_shards = shards[shards.state=='UNASSIGNED']

        to_reroute = pd.Series(u_shards.index.get_level_values(0).unique())
        to_reroute = to_reroute.head(3)

        t = 15
        dry_run = True
        for index in to_reroute.values:
            o_stats = get_index_stats(index)
            if o_stats.loc[index]['health'] == 'green':
                print("Nothing to do for: %s skipping \n --- \n"%(index))
                sys.stdout.flush()
                continue
            print("Repairing: %s\n"%(index))
            print(o_stats)
            a_shards = u_shards.loc[index,:]
            print("\ntodo")
            print(a_shards.head(10))
            sys.stdout.flush()
            print("\nAssigning shards")
            # Primary shards
            pu_shards = a_shards[(a_shards.prirep=='p') & (a_shards.state=='UNASSIGNED')]
            for shard in pu_shards.index.values:
                print(index, shard, 1)
                sys.stdout.flush()
                if dry_run==False:
                    allocate_shards(index, shard, 1)
                    time.sleep(t)
            # Replica shards
            ru_shards = a_shards[(a_shards.prirep=='r') & (a_shards.state=='UNASSIGNED')]
            for shard in ru_shards.index.values:
                print(index, shard, 0)
                sys.stdout.flush()
                if dry_run==False:
                    allocate_shards(index, shard, 0)
                    time.sleep(t)
            print("Waiting for %ds to ensure applied before testing"%(t))
            sys.stdout.flush()
            time.sleep(t)
            print("\n")
            n_stats = get_index_stats(index)
            print(n_stats)
            print(n_stats.loc[index]['health'])
            print("----")
            assert n_stats.loc[index]['docs.count'] == o_stats.loc[index]['docs.count'] 
            assert n_stats.loc[index]['health'] != 'red'
            tries=0
    except Exception:
        tries += 1
        n_list = get_nodes()
        if tries==max_tries:
            break
        else:
            time.sleep(tries*120)


    