# Restore shard data to an existing database 

The `influxd restore` command cannot restore data to an existing database. The recommended procedure is to restore the incremental backup (or a shard) to a temporary database and then use an InfluxQL query to copy the data from the temporary database into the existing one.

The recommended query usually takes a long time to run, depending on the size of the database being copied, and it sometimes fails with the [ERR: partial write: field type conflict](https://community.influxdata.com/t/help-with-err-partial-write-field-type-conflict-error/25531) error.

This notebook outlines a procedure that uses the `aioinflux` async Python client to copy data from one database to another and works around the problems above. The procedure runs one async query per measurement and is much faster than the single-query approach. For a database with ~1000 measurements we have observed data rates a thousand times faster.


## InfluxDB instance

Run this notebook on the same cluster where InfluxDB runs. We connect to the InfluxDB internal cluster address to bypass the ingress; otherwise, with our current cluster configuration, the connection is killed after 60s. Connecting internally avoids the `ClientPayloadError: Response payload is not completed` error.

The InfluxDB instance also needs to be configured with `max-concurrent-queries = 0` and `query-timeout = "0s"`, which disable both limits, so that queries are not killed.

In [None]:
HOST = "sasquatch-influxdb.sasquatch"

Look for the InfluxDB admin credentials in the SQuaRE 1Password.

In [None]:
import getpass
username = "admin"
password = getpass.getpass(prompt='Password for user `{}`: '.format(username))

## Source database and retention policy

In [None]:
src_db = "<source db with the restored incremental data>"
src_rp = "<source db retention policy name>"

## Instantiate the aioinflux client

In [None]:
from aioinflux import InfluxDBClient
import asyncio

The default aiohttp client timeout is only 300s. If you have queries that run longer than that, you also need to set a different timeout here.

In [None]:
client = InfluxDBClient(host=HOST, port='8086', path='/', db=src_db, username=username, password=password, timeout=86400, output='dataframe')
client.url

## Retrieve the list of measurements from the source database

In [None]:
measurements = await client.show_measurements()
measurements.head()

## Copy measurements to the destination database
In this notebook we used a test database as the destination, so that we could validate the procedure by verifying that both databases have the same number of points for each measurement.

In [None]:
dest_db = "<destination db where you want to restore the data>"
dest_rp = "<destination db retention policy name>"

The `GROUP BY *` clause is required to preserve tags. Without it, `SELECT * INTO` writes the tags as fields in the destination measurement.
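
To make the shape of the copy query concrete, here is a minimal sketch that builds the same `SELECT ... INTO ... GROUP BY *` statement as a plain string. The helper names `quote_ident` and `build_copy_query` and the database names in the example are hypothetical. Escaping `\` and `"` inside identifiers is an assumption based on InfluxQL's quoted-identifier syntax and only matters if a measurement name contains those characters.

```python
def quote_ident(name: str) -> str:
    """Double-quote an InfluxQL identifier.

    Assumption: backslashes and double quotes inside the name are
    backslash-escaped, as InfluxQL quoted identifiers allow.
    """
    escaped = name.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def build_copy_query(src_db, src_rp, dest_db, dest_rp, measurement):
    """Build the per-measurement copy query used in this notebook."""
    src = ".".join(quote_ident(part) for part in (src_db, src_rp, measurement))
    dest = ".".join(quote_ident(part) for part in (dest_db, dest_rp, measurement))
    # GROUP BY * keeps tags as tags in the destination measurement.
    return f"SELECT * INTO {dest} FROM {src} GROUP BY *"


# Hypothetical database, retention policy and measurement names.
print(build_copy_query("restore_tmp", "autogen", "mydb", "autogen", "cpu"))
# SELECT * INTO "mydb"."autogen"."cpu" FROM "restore_tmp"."autogen"."cpu" GROUP BY *
```

Printing the query for one measurement before running the full copy is a cheap way to check the database and retention policy names.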

In [None]:
async def copy(measurement):
    """Copy all points of one measurement from the source to the destination database."""
    q = f'''SELECT * INTO "{dest_db}"."{dest_rp}"."{measurement}" FROM "{src_db}"."{src_rp}"."{measurement}" GROUP BY *'''
    return await client.query(q)

We have a large number of measurements to copy over. `BATCH_SIZE` controls the number of async queries executed in each iteration, which gives some control over the load on InfluxDB. The time to process each batch is set by its slowest query.

In [None]:
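BATCH_SIZE = 300
names = list(measurements['name'])
# Walk the list in consecutive slices so that every measurement is copied,
# including the final batch, which may hold fewer than BATCH_SIZE items.
for start in range(0, len(names), BATCH_SIZE):
    batch = names[start:start + BATCH_SIZE]
    print(f"Processing a new batch of {len(batch)} measurements...")
    await asyncio.gather(*(copy(m) for m in batch))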
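
Because each batch waits for its slowest query, one alternative worth considering is to cap the number of in-flight queries with an `asyncio.Semaphore` instead of using fixed batches. The sketch below is not the procedure used in this notebook. The names `copy_all`, `copy_one` and `max_concurrent` are hypothetical, and `copy_one` stands for any coroutine function that copies a single measurement. Failures are collected rather than raised, so that one failing measurement does not stop the others.

```python
import asyncio


async def copy_all(names, copy_one, max_concurrent=300):
    """Run copy_one(name) for every name, with at most max_concurrent in flight.

    Returns a dict mapping each failed name to the exception it raised.
    """
    semaphore = asyncio.Semaphore(max_concurrent)
    failures = {}

    async def guarded(name):
        # A new query starts as soon as any running query finishes.
        async with semaphore:
            try:
                await copy_one(name)
            except Exception as exc:
                # Record the failure and keep copying the other measurements.
                failures[name] = exc

    await asyncio.gather(*(guarded(name) for name in names))
    return failures
```

In this notebook it could be called as `failures = await copy_all(measurements['name'], copy)`, after which `failures` lists the measurements that need to be retried or inspected.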

## Verify copy 
To validate this procedure we restored one shard to an empty database and compared the number of points in each measurement between the source and destination databases. A better test is needed when the copy is done to an existing database.

In [None]:
for measurement in measurements['name']:
    # COUNT(*) returns the number of points per field for the measurement.
    src_count = await client.query(f'''SELECT COUNT(*) FROM "{src_db}"."{src_rp}"."{measurement}"''')
    dest_count = await client.query(f'''SELECT COUNT(*) FROM "{dest_db}"."{dest_rp}"."{measurement}"''')
    assert dest_count.equals(src_count), f"Point counts differ for {measurement}"
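
When the destination already holds data, one possible refinement is to count only the time window covered by the restored shard and to collect all mismatches instead of stopping at the first one. The sketch below only illustrates that idea. The helpers `build_count_query` and `find_mismatches` are hypothetical, the counts are assumed to have been reduced to one integer per measurement, and the comparison is only meaningful if the destination held no points in that window before the copy.

```python
def build_count_query(db, rp, measurement, start=None, stop=None):
    """Build a COUNT(*) query, optionally restricted to start <= time < stop.

    start and stop are RFC3339 timestamps such as '2023-01-01T00:00:00Z'.
    """
    query = f'SELECT COUNT(*) FROM "{db}"."{rp}"."{measurement}"'
    conditions = []
    if start is not None:
        conditions.append(f"time >= '{start}'")
    if stop is not None:
        conditions.append(f"time < '{stop}'")
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    return query


def find_mismatches(src_counts, dest_counts):
    """Return {measurement: (src, dest)} for every measurement whose counts differ.

    A measurement missing from dest_counts is reported with dest set to None.
    """
    return {
        name: (count, dest_counts.get(name))
        for name, count in src_counts.items()
        if dest_counts.get(name) != count
    }
```

An empty result from `find_mismatches` means that every measurement in the source has the same count in the destination for the chosen window.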
                               