In [None]:
%load_ext autoreload
%autoreload 2
# default_exp indexers.indexer

In [None]:
# export
from integrators.data.schema import *
from integrators.pod.client import PodClient, DEFAULT_POD_ADDRESS
from integrators.imports import *

In [None]:
# hide
from nbdev.showdoc import *

# Indexer

In [None]:
# export
# Names of the environment variables that run_integrator reads when no explicit
# run parameters are passed.
POD_FULL_ADDRESS_ENV    = 'POD_FULL_ADDRESS'     # pod url; DEFAULT_POD_ADDRESS is used when unset
RUN_UID_ENV             = 'RUN_UID'              # uid of the run item; parsed with int()
POD_SERVICE_PAYLOAD_ENV = 'POD_SERVICE_PAYLOAD'  # JSON-encoded object holding the two keys below
# Despite the _ENV suffix these two are not environment variable names: they are
# the keys looked up inside the decoded POD_SERVICE_PAYLOAD object.
DATABASE_KEY_ENV        = 'databaseKey'
OWNER_KEY_ENV           = 'ownerKey'


class IndexerBase(Indexer):
    """Base class for indexers that writes indexing results back through an api client."""

    def __init__(self, indexerClass=None, *args, **kwargs):
        """Initialise the underlying Indexer item.

        When `indexerClass` is not given, the name of the concrete class is
        used, so subclasses get their own class name stored automatically.
        """
        resolved_class = self.__class__.__name__ if indexerClass is None else indexerClass
        super().__init__(*args, indexerClass=resolved_class, **kwargs)

    def populate(self, api, updated_items, new_nodes, edges=False):
        """Push the outcome of an indexing run through `api`.

        Every item in `new_nodes` is updated first (without edges); only when
        `edges` is truthy are their edges written afterwards, in a second pass.
        Finally every item in `updated_items` is updated.
        """
        # First pass: the nodes themselves, never their edges.
        for node in new_nodes:
            node.update(api, edges=False)

        # Second pass: edges, only after all new nodes went through update().
        if edges:
            for node in new_nodes:
                node.update_edges(api)

        for changed in updated_items:
            changed.update(api)
    
class IndexerData():
    """Simple attribute bag: every keyword argument becomes an instance attribute."""

    def __init__(self, **kwargs):
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

    def __repr__(self):
        # Header line followed by the raw attribute dictionary.
        return "IndexerData \n" + str(self.__dict__)
        
            
def get_indexer_run_data(client, indexer_run):
    """Return all items whose `_type` equals the run's `targetDataType`.

    The lookup is delegated to `client.search_by_fields`. When the run has no
    `targetDataType`, a notice is printed and `None` is returned.
    """
    target_type = indexer_run.targetDataType
    if target_type is not None:
        # get all items with the specified type
        return client.search_by_fields({"_type": target_type})

    print("No targetDataType defined")
    return None
    
def test_registration(integrator):
    """Assert that `integrator` is registered.

    Registration is necessary to be able to load the right indexer when
    retrieving it from the database. The check passes when the integrator's
    class name is one of the names exposed by `integrators.integrator_registry`.
    """
    import integrators.integrator_registry as registry

    registered_names = dir(registry)
    integrator_name = integrator.__name__
    assert integrator_name in registered_names

# Running your own indexer

When we run an indexer we have four steps: 1) get the indexer and indexer run based on the run uid, 2) retrieve the data to index, 3) run the indexer, and 4) populate the graph with the new information. To mock that, we first create a client and add some toy data.

In [None]:
from integrators.indexers.geo.geo_indexer import GeoIndexer

client = PodClient()

def create_toy_dataset(client):
    """Create a minimal dataset through `client` for trying out the GeoIndexer.

    Creates a Location, an Address, an Indexer and an IndexerRun, links the
    run to the indexer and the location to the address, and asserts that both
    edges were created. Returns `(indexer, indexer_run, location, address)`.
    """
    loc = Location.from_data(latitude=-37.81, longitude=144.96)
    addr = Address.from_data()
    geo_indexer = Indexer.from_data(indexerClass="GeoIndexer", name="GeoIndexer")
    run = IndexerRun.from_data(progress=0, targetDataType="Address")

    # Items have to exist before edges between them can be created.
    for item in (loc, addr, geo_indexer, run):
        client.create(item)

    run_edge_ok = client.create_edge(Edge(run, geo_indexer, "indexer"))
    location_edge_ok = client.create_edge(Edge(loc, addr, "location"))

    assert run_edge_ok and location_edge_ok

    return geo_indexer, run, loc, addr
    
indexer, indexer_run, location, address = create_toy_dataset(client)

Before we can move on, we need to make sure that our indexer is registered. This holds for any integrator that we create.

> Important: Note that before running an indexer, it needs to be registered. We can do this by importing it in the `integrators.integrator_registry` module.

In [None]:
test_registration(GeoIndexer)

Now we start with the setting we would normally have: some Memri client makes a call to the pod to execute an indexer run. Let's start by getting the indexer and the indexer run.

In [None]:
indexer_run = client.get(indexer_run.uid)
indexer = indexer_run.indexer[0]

Next, we retrieve the data, which was specified in the client by the `targetDataType`.

In [None]:
data = indexer.get_data(client, indexer_run)
data

1 items found to index


IndexerData 
{'items_with_location': [Address (#2)]}

In [None]:
updated_items, new_items = indexer.index(data, indexer_run, client)

indexing 1 items
Loading formatted geocoded file...
creating IndexerRun (#4)
('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))


In [None]:
indexer.populate(client, updated_items, new_items)

creating Country (#None)
updating Address (#2)


In [None]:
client.delete_all()

# Running the full Indexer pipeline

In [None]:
# hide
# export
def run_indexer(indexer_run, client):
    """Execute one indexer run end to end.

    Takes the first indexer linked to `indexer_run`, lets it fetch its data,
    index it, and then populate the results through `client`.
    """
    active_indexer = indexer_run.indexer[0]
    run_data = active_indexer.get_data(client, indexer_run)
    index_result = active_indexer.index(run_data, indexer_run, client)
    # index() yields a pair: (items that were updated, items that are new).
    updated, created = index_result
    active_indexer.populate(client, updated, created)

def run_integrator_from_run_uid(run_uid, client):
    """Fetch the run item with uid `run_uid` via `client` and execute it.

    Only `IndexerRun` items can be executed; they are handed to `run_indexer`.

    Raises:
        NotImplementedError: when the fetched item is not an `IndexerRun`.
    """
    run = client.get(run_uid)

    if not isinstance(run, IndexerRun):
        # Report the actual type name (the message talks about the type), and
        # keep the item itself in the message for debugging.
        raise NotImplementedError(f"Cannot execute item of type {type(run).__name__}: {run}")

    run_indexer(run, client)

In [None]:
# export

def run_integrator(environ=None, pod_full_address=None, integrator_run_uid=None, database_key=None, owner_key=None, verbose=False):
    """Runs an integrator, you can either provide the run settings as parameters to this function (for local testing)
    or via environment variables (this is how the pod communicates with integrators).

    Args:
        environ: mapping to read the run settings from when none of the explicit
            parameters are given. Defaults to the process environment (`os.environ`).
        pod_full_address: url of the pod.
        integrator_run_uid: uid of the run item to execute.
        database_key: database key passed to the `PodClient`.
        owner_key: owner key passed to the `PodClient`.
        verbose: when truthy, print the resolved run settings before running.

    Returns:
        None. When a required environment variable or payload key is missing,
        a message is printed and the function returns without running anything.

    Raises:
        AssertionError: when some, but not all, explicit parameters are given.
    """
    params = {
        "pod_full_address": pod_full_address,
        "integrator_run_uid": integrator_run_uid,
        "database_key": database_key,
        "owner_key": owner_key,
    }

    if all(value is None for value in params.values()):
        if environ is None:
            # No explicit mapping given: read from the real process environment
            # instead of failing on `None.get(...)`.
            import os
            environ = os.environ
        try:
            print("Reading run parameters from environment variables")
            pod_full_address    = environ.get(POD_FULL_ADDRESS_ENV, DEFAULT_POD_ADDRESS)
            integrator_run_uid  = int(environ[RUN_UID_ENV])
            pod_service_payload = json.loads(environ[POD_SERVICE_PAYLOAD_ENV])

            database_key = pod_service_payload[DATABASE_KEY_ENV]
            owner_key    = pod_service_payload[OWNER_KEY_ENV]

        except KeyError as e:
            print(f"Environmentvariable {e} not found, exiting")
            return
    else:
        # Either all explicit parameters are given or none; name the missing
        # ones so the caller can see what to add.
        missing = [name for name, value in params.items() if value is None]
        assert not missing, f"Defined some params to run indexer, but not all. Missing {missing}"

    if verbose:
        # NOTE(review): this prints the database and owner keys to stdout;
        # confirm that is acceptable wherever verbose mode is enabled.
        for name, val in [("pod_full_address", pod_full_address), ("integrator_run_uid", integrator_run_uid),
                          ("database_key", database_key), ("owner_key", owner_key)]:
            print(f"{name}={val}")

    client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key)
    run_integrator_from_run_uid(integrator_run_uid, client)
    

In [None]:
show_doc(run_integrator)

<h4 id="run_integrator" class="doc_header"><code>run_integrator</code><a href="__main__.py#L3" class="source_link" style="float:right">[source]</a></h4>

> <code>run_integrator</code>(**`environ`**=*`None`*, **`pod_full_address`**=*`None`*, **`integrator_run_uid`**=*`None`*, **`database_key`**=*`None`*, **`owner_key`**=*`None`*, **`verbose`**=*`False`*)

Runs an integrator, you can either provide the run settings as parameters to this function (for local testing)
or via environment variables (this is how the pod communicates with integrators).

Running an indexer by providing parameters as variables

In [None]:
# Recreate the toy items, then run the integrator with every run setting passed
# explicitly (no environment lookup happens on this path).
indexer, indexer_run, location, address = create_toy_dataset(client)
run_integrator(pod_full_address=DEFAULT_POD_ADDRESS, integrator_run_uid=indexer_run.uid,
               database_key=client.database_key, owner_key=client.owner_key)
# Clean up so the next example starts from an empty pod.
client.delete_all()

1 items found to index
indexing 1 items
updating IndexerRun (#9)
creating Country (#None)
updating Address (#7)


Running an indexer by providing environment variables

In [None]:
# Recreate the toy items for this example.
indexer, indexer_run, location, address = create_toy_dataset(client)

# The database and owner keys travel together as one JSON-encoded payload.
payload = json.dumps({DATABASE_KEY_ENV: client.database_key,
               OWNER_KEY_ENV: client.owner_key})
              
# A plain dict stands in for the process environment here; run_integrator only
# reads from the mapping it is given.
env = {POD_FULL_ADDRESS_ENV: DEFAULT_POD_ADDRESS,
       RUN_UID_ENV: indexer_run.uid,
       POD_SERVICE_PAYLOAD_ENV: payload}
run_integrator(environ=env)
# Clean up the items created for this example.
client.delete_all()

Reading run parameters from environment variables
1 items found to index
indexing 1 items
updating IndexerRun (#14)
creating Country (#None)
updating Address (#12)


In [None]:
client.delete_all()

# Export -

In [None]:
# hide
from nbdev.export import *
notebook2script()

Converted basic.ipynb.
Converted index.ipynb.
Converted indexers.GeoIndexer.ipynb.
Converted indexers.NoteListIndexer.NoteList.ipynb.
Converted indexers.NoteListIndexer.Parser.ipynb.
Converted indexers.NoteListIndexer.ipynb.
Converted indexers.NoteListIndexer.util.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted pod.client.ipynb.
