# Data Synchronization

Sometimes you would like to “synchronize” data from Tetra Data Platform (TDP) to downstream systems, so that your users (SDEs, Data Scientists, Data Analysts) can work with it in the analytical systems they are accustomed to. This could mean:

* Making any new or updated data that’s ingested into TDP available in the downstream system
* Synchronizing a particular subset of data (in practice this is generally “all data” or “all data from xyz inputs”)

Examples:

* Bring data from the JSON files created by Tetra Data IDS into Snowflake or another OLAP database, for more performant exploratory data analysis or to build transformed and aggregated SQL tables
* Copy data into a specific software application

In v4.0+ of Tetra Data Platform, the capability also exists to “listen” or “subscribe” to data updates via [Event Subscriptions](https://developers.tetrascience.com/docs/event-subscriptions). However, should your use case require a polling mechanism, the following implementation provides a performant and scalable way to do this.

Note: This solution pattern does not cover propagating changes made in downstream systems back into Tetra Data Platform.

## Solution Overview

1. Set up a regular process that calls the searchEql API, with a query for your specific subset, to pull the data updated since the last “poll”
    a. “Poll” the Search API for updated data based on date / time
2. For each result of the query, retrieve the data with the /v1/datalake/retrieve endpoint
3. Process the data into your data target (this step is specific to your target)

## Import Statements

In [None]:
import os
import json
import requests

In [None]:
AUTH_FILE_DIR = "./"
AUTH_FILENAME = "auth.json"
API_SEARCHEQL = "https://api.tetrascience.com/v1/datalake/searchEql"

## 1. “Poll” Search API for updated data based on date / time

To synchronize data effectively, use a process that calls the /v1/datalake/searchEql API endpoint on a regular cadence (e.g. every 4 hours). Each call uses the indexedAt field to query for the data updated since the previous call, and the results are then “upserted” or “merged” into the downstream system.

All data from this search will have an indexedAt value that represents when the data was last updated.

If a query is expected to return more than 10,000 results, the solution will need to implement support for pagination.

The consuming program will need to maintain a mechanism to ensure that no data is “skipped”: either re-query an overlapping look-back window and handle the resulting duplicates (which an upsert / merge can do), or keep track of the “last date / time” polled.
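One way to keep track of the “last date / time” polled is a small checkpoint file. The sketch below is only an illustration under stated assumptions: the function names, the `last_lt` key, the checkpoint file layout, and the five-minute lag that keeps the upper bound in the past are all hypothetical choices, not part of the Tetra Data Platform API.

```python
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"  # same UTC format as the example queries


def next_window(checkpoint_path, now=None, lag=timedelta(minutes=5),
                default_start="1970-01-01T00:00:00"):
    """Return the (gte, lt) bounds for the next poll.

    gte is the lt of the previous successful poll (or default_start on the
    first run); lt is "now minus lag", so it always lies in the past.
    """
    path = Path(checkpoint_path)
    gte = default_start
    if path.exists():
        gte = json.loads(path.read_text())["last_lt"]
    now = now or datetime.now(timezone.utc)
    lt = (now - lag).strftime(TIME_FORMAT)
    return gte, lt


def commit_window(checkpoint_path, lt):
    """Persist lt only after the whole window was processed successfully."""
    Path(checkpoint_path).write_text(json.dumps({"last_lt": lt}))
```

Calling `commit_window` only after the downstream write succeeds means a failed run is retried over the same window, and the upsert / merge absorbs any duplicates.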

### Use authentication file for API headers

In [None]:
# Read the token and organization slug from the local auth file
with open(os.path.join(AUTH_FILE_DIR, AUTH_FILENAME), "r") as f:
    auth_data = json.load(f)

headers = {"ts-auth-token": auth_data["auth_token"],
           "x-org-slug": auth_data["org"],
           "Content-Type": "application/json"}

### Example query where IDS data contains project information

In [None]:
query_basic = {
                  "query": {
                      "match": {
                          "data.project.name": "my_project"
                      }
                  }
              }

In [None]:
example_searcheql_basic = requests.post(API_SEARCHEQL, headers=headers, data=json.dumps(query_basic))

### Example query with date range

* "size": Can be anything reasonable up to 10,000 and defines how many results we will get per query. A good upper bound for general use is 1,000.

* "query": The query itself, built out of the components

* "bool" (boolean query): To search for files that match boolean combinations of a set of queries.

* "must":  The clause (query) must appear in matching files. This is like logical AND. 

* "term": An exact value that must be found in the files, in this case in the IDS JSON. In this example, the query returns files that have the category "IDS" and the source.type "empower". Other fields can be added if needed.

* "range": This defines the dates over which to query the indexedAt field. The greater-than-or-equals time should always be the same as the less-than time of the previous execution of the job. The times are specified here in UTC. The query format supports time-zone information should it be necessary. The less-than time must always be in the past to avoid missing any files.

* "_source": This defines what fields you want returned from your query. Here only the fileId is returned. You can speed up query times by specifying only a few fields.

* "sort": This specifies how the returned search results are ordered. Here they are sorted in ascending order by fileId.
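The components above can be assembled by a small helper so that each poll only has to supply its time window. This is a sketch, not an official client: `build_ids_query` and its parameters are hypothetical names, and the field names are simply those used in the example query in this section.

```python
def build_ids_query(source_type, gte, lt, size=1000):
    """Build a searchEql body for IDS files of one source type.

    gte / lt are UTC timestamps such as "2023-03-01T00:00:00"; gte should be
    the lt of the previous run, and lt should lie in the past.
    """
    return {
        "size": size,
        "query": {
            "bool": {
                "must": [
                    {"term": {"source.type": source_type}},
                    {"term": {"category": "IDS"}},
                    {"range": {"indexedAt": {"gte": gte, "lt": lt}}},
                ]
            }
        },
        "_source": ["fileId"],          # return only the fileId of each hit
        "sort": [{"fileId": "asc"}],    # stable order for later pagination
    }
```

For example, `build_ids_query("empower", "2023-03-01T00:00:00", "2023-03-20T00:00:00", size=50)` produces the same body as the hand-written query that follows.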

In [None]:
query_empower = {
                   "size": 50,
                   "query": {
                       "bool": {
                           "must": [
                               {
                                   "term": { "source.type": "empower" }
                               },
                               {
                                   "term": { "category": "IDS" }
                               },
                               {
                                   "range": {
                                       "indexedAt": {
                                          "gte": "2023-03-01T00:00:00",
                                          "lt": "2023-03-20T00:00:00"
                                       }
                                   }
                               }
                           ]
                       }
                   },
                   "_source": [ "fileId" ],
                   "sort": [
                       {"fileId": "asc"}
                   ]
                }

In [None]:
example_searcheql_empower = requests.post(API_SEARCHEQL, headers=headers, data=json.dumps(query_empower))

### Example subsequent query

To continue through the same set of files from where the previous query stopped, add a "search_after" criterion containing the fileId of the final result of the previous result set. The query then returns the next 50 items (the "size" specified in the query) after that file.

In [None]:
query_empower_next = {
                        "size": 50,
                        "query": {
                            "bool": {
                                "must": [
                                    {
                                        "term": { "source.type": "empower" }
                                    },
                                    {
                                        "term": { "category": "IDS" }
                                    },
                                    {
                                        "range": {
                                            "indexedAt": {
                                                "gte": "2023-03-01T00:00:00",
                                                "lt": "2023-03-20T00:00:00"
                                            }
                                        }
                                    }
                                ]
                            }
                        },
                        "_source": [ "fileId" ],
                        # Resume after the last fileId returned by the previous query
                        "search_after": ["11ee50ef-6b0d-4266-863d-461d02f6f1b1"],
                        "sort": [
                            {"fileId": "asc"}
                        ]
                    }
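The "search_after" pattern can be wrapped in a loop that keeps requesting pages until one comes back short. The following is a minimal sketch rather than a prescribed method: `iter_hits` and the `fetch_page` callable are hypothetical names, the base query is assumed to contain "size" and a sort on fileId, and each hit is assumed to expose its fileId under `_source`, as in Elasticsearch-style responses.

```python
import copy


def iter_hits(base_query, fetch_page):
    """Yield every hit for base_query, one page at a time.

    fetch_page(query) must send the query to the searchEql endpoint and
    return the parsed JSON response as a dict.
    """
    search_after = None
    while True:
        query = copy.deepcopy(base_query)  # never mutate the caller's query
        if search_after is not None:
            query["search_after"] = search_after
        hits = fetch_page(query).get("hits", {}).get("hits", [])
        if not hits:
            break
        yield from hits
        if len(hits) < query["size"]:
            break  # a short page means there is nothing left
        # Continue after the fileId of the final hit on this page
        search_after = [hits[-1]["_source"]["fileId"]]
```

In this notebook, `fetch_page` could be `lambda q: requests.post(API_SEARCHEQL, headers=headers, data=json.dumps(q)).json()`, which keeps the HTTP details out of the paging logic.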

### Example query returned items

The Empower API calls above (and other queries you construct) return a response that contains objects representing “files” to process. Iterate over each record in the hits.hits[] array.
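As a minimal sketch of that iteration, the helper below collects the fileId values from a parsed response. It assumes an Elasticsearch-style layout in which each hit carries the requested fields under `_source`; the function name and the sample body are illustrative only.

```python
def file_ids_from_response(body):
    """Collect the fileId of every hit in a parsed searchEql response body."""
    hits = body.get("hits", {}).get("hits", [])
    return [
        hit["_source"]["fileId"]
        for hit in hits
        if "fileId" in hit.get("_source", {})
    ]


# Made-up body for illustration; a real one would come from
# example_searcheql_empower.json() in the cells above.
sample_body = {
    "hits": {
        "hits": [
            {"_source": {"fileId": "11ee50ef-6b0d-4266-863d-461d02f6f1b1"}},
        ]
    }
}
print(file_ids_from_response(sample_body))
```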

## 2. Retrieve the data with the /v1/datalake/retrieve endpoint

For each object in the hits.hits[] array, use its _source.fileId to retrieve the actual data from the https://api.tetrascience.com/v1/datalake/retrieve API endpoint.

See documentation: https://developers.tetrascience.com/reference/retrieve-a-file 

This will return either:
* an application/octet-stream file containing the data record
* a JSON object with an S3 URL to download (if the preSigned parameter was set to true in the API request)
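A hedged sketch of the retrieval call follows. It assumes the endpoint is called with GET and takes `fileId` and `preSigned` as query parameters; check the linked reference before relying on this. `retrieve_file` is a hypothetical helper, and `session` is anything with a requests-style `get` method, such as the `requests` module itself or a `requests.Session`. Because the layout of the pre-signed JSON object is not described here, the helper returns it unparsed for the caller to inspect.

```python
API_RETRIEVE = "https://api.tetrascience.com/v1/datalake/retrieve"


def retrieve_file(session, file_id, headers, pre_signed=False):
    """Fetch one file by fileId.

    Returns the raw bytes of the data record, or, when pre_signed is True,
    the parsed JSON object that describes where to download it.
    """
    params = {"fileId": file_id}
    if pre_signed:
        params["preSigned"] = "true"  # assumed parameter spelling
    response = session.get(API_RETRIEVE, headers=headers, params=params, timeout=60)
    response.raise_for_status()  # surface HTTP errors instead of bad data
    if pre_signed:
        return response.json()
    return response.content
```

With the `headers` built earlier, a call could look like `raw = retrieve_file(requests, file_id, headers)`, followed by `json.loads(raw)` when the file is an IDS JSON.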

## 3. Access the file and process / transform it into the tables / schema that you need
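This step depends entirely on your target, so the following is only a stand-in that shows the “upsert” idea with Python's built-in sqlite3 module; a warehouse such as Snowflake would typically use its own MERGE statement instead. The table name, columns, and function names are hypothetical, and the `ON CONFLICT ... DO UPDATE` syntax requires SQLite 3.24 or later.

```python
import json
import sqlite3


def ensure_table(conn):
    """Create the illustrative target table, keyed by fileId."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS ids_records (
            file_id    TEXT PRIMARY KEY,
            indexed_at TEXT NOT NULL,
            payload    TEXT NOT NULL
        )
        """
    )


def upsert_record(conn, file_id, indexed_at, payload):
    """Insert a record, or overwrite the existing row with the same fileId."""
    conn.execute(
        """
        INSERT INTO ids_records (file_id, indexed_at, payload)
        VALUES (?, ?, ?)
        ON CONFLICT(file_id) DO UPDATE SET
            indexed_at = excluded.indexed_at,
            payload    = excluded.payload
        """,
        (file_id, indexed_at, json.dumps(payload)),
    )
    conn.commit()
```

Keying the table on fileId means that re-processing an overlapping time window updates rows in place rather than duplicating them.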