## Project structure

In [None]:
# Project structure
├── MANIFEST.in
├── Makefile
├── Pipfile
├── docker
│   ├── jupyter
│   │   └── Dockerfile
│   └── postgis
│       ├── Dockerfile
│       └── init.sql
├── docker-compose.yml
├── init_pg.sh
├── postgis
│   └── data
├── pytest.ini
├── set_env.sh
├── setup.py
├── src
│   ├── tutorial
│   │   ├── __init__.py
│   │   └── grpc
│   │       ├── __init__.py
│   │       └── geodatas
│   │           ├── __init__.py
│   │           [...]
└── tests
    ├── __init__.py
    [...]

## Infra - Docker-compose (W.I.P.)

In [None]:
# %load ./docker-compose.yml
version: '2'
services:
    # PostGIS database, built from the local Dockerfile.
    db:
      build: ./docker/postgis/
      hostname: postgis
      volumes:
        - ./postgis:/var/lib/postgresql
      ports:
        # Host port comes from the TUTORIAL_GRPC_PG_PORT environment variable.
        - "0.0.0.0:${TUTORIAL_GRPC_PG_PORT}:5432"
      environment:
        # List-form entries must be KEY=VALUE; quotes would become part of the value.
        - POSTGRES_USER=docker
        [...]

    # Graphite + StatsD image (metrics backend).
    graphite:
      image: hopsoft/graphite-statsd
      hostname: graphite
      ports:
        - "0.0.0.0:80:80"
        [...]

    # Single-node Consul agent (server mode, with UI) used for service discovery.
    consul:
      image: consul
      container_name: "consul"
      hostname: "consul"
      network_mode: "host" # admission of defeat (temporary) ;-)
      command: ["agent", "-server", "-ui", "-bootstrap", "-bind=127.0.0.1"]

## gRPC - Remote Procedure Calls

- Open Source (from Google) - Apache 2.0
- Framework RPC
- HTTP/2 
    - Focus on performance
- Interface Definition Language (IDL) --> Protocol Buffers, aka Protobuf
    - Multi-language: C++, Python, Go, ...
    - Binary Serialization/Deserialization

## gRPC - Communication modes

<img src="./datas/landing-2.svg">

- Unary RPCs: 1-to-1 
- Streaming:
    - Server side
    - Client side
    - Bidirectional

### gRPC - Messages definition

In [None]:
# %load ./src/tutorial/grpc/geodatas/proto/search.proto
syntax = "proto3";

// Request message accepted by the `search` and `search_thing` rpcs.
// Field numbers (not declaration order) identify the fields on the wire,
// so they must not be renumbered.
message SearchRequest {
  string query = 1;
  // Coordinates of the searched position. proto3 `float` is 32-bit, so a
  // value such as 4.85 is received as 4.849999904632568.
  float lat = 4;
  float lng = 5;
  // Presumably pagination controls; how they are applied is not visible
  // here. TODO confirm against the servicer.
  int32 page_number = 2;
  int32 result_per_page = 3;
}

// Reply of the search rpcs: a list of individual results.
message SearchResponses {
    repeated SearchResponse responses = 1;
}

// One search result, carried as a pre-formatted string.
message SearchResponse {
    string response = 1;
}

// Reply of the `monitor` rpc.
message MonitorResponse {
    // Presumably a count of stored "things". TODO confirm.
    int32 n_things = 1;
}

### gRPC - Services definition

In [None]:
# %load ./src/tutorial/grpc/geodatas/proto/search.proto
syntax = "proto3";

import "google/protobuf/empty.proto";

[...]

// gRPC service implemented by the search server. All three rpcs are unary
// (one request message in, one response message out).
service Search {
    // Takes no input (google.protobuf.Empty) and returns a MonitorResponse.
    rpc monitor(google.protobuf.Empty) returns (MonitorResponse) {}
    // Both search rpcs share the same request and response messages.
    rpc search(SearchRequest) returns (SearchResponses) {}
    rpc search_thing(SearchRequest) returns (SearchResponses) {}
}

### gRPC - Client and server code generation

In [None]:
# %load ./setup.py
class BuildPackageProtos(setuptools.Command):
    """setuptools command generating the project's ``*_pb2.py`` modules from its proto files.

    The command declares no options of its own.
    """

    user_options = []
    description = 'build grpc protobuf modules'

    def initialize_options(self):
        """Nothing to initialise: the command has no options."""

    def finalize_options(self):
        """Nothing to validate: the command has no options."""

    def run(self):
        """Build the proto modules for the distribution's root package directory.

        Raises:
            ModuleNotFoundError: when ``GRPC_INSTALLED`` is falsy.
        """
        if not GRPC_INSTALLED:
            raise ModuleNotFoundError("grpcio-tools is needed in order to generate the proto classes")
        # package_dir[''] is the directory mapped to the root package.
        command.build_package_protos(self.distribution.package_dir[''])

### gRPC - Client and server code generation

In [None]:
# %load ./setup.py
class BuildPyCommand(_build_py):
    """Build command that generates the proto modules before the regular build."""

    def run(self):
        """Run ``build_proto_modules`` first, then the parent ``_build_py`` implementation."""
        self.run_command('build_proto_modules')
        super().run()
        
# Package definition (excerpt; `[...]` marks elided arguments).
setup(
    name='tutorial-grpc-geodatas',
    [...]
    # Only the gRPC runtime is listed here as an install requirement.
    install_requires=[
        "grpcio"
    ],
    # Makes `python setup.py build_proto_modules` available.
    # NOTE(review): BuildPyCommand is not registered in this mapping (e.g. under
    # 'build_py'), so as shown nothing triggers proto generation during a
    # regular build. Confirm whether that is intended or elided.
    cmdclass={
        'build_proto_modules': BuildPackageProtos
    },
    [...]
)

### gRPC - Client and server code generation

In [None]:
# %load ./Makefile
# Generate the gRPC/protobuf Python modules via the custom setup.py command.
# Declared phony: the target does not produce a file named `protos`.
.PHONY: protos
protos:
	@python setup.py build_proto_modules

# Search Server

In [None]:
./src/tutorial/grpc/geodatas
├── __init__.py
├── common
│   ├── __init__.py
│   └── base.py <----------- (*)
├── models
│   ├── __init__.py
│   ├── ogrgeojson.py <----- (*)
│   └── thing.py <---------- (*)
├── proto
│   ├── __init__.py
│   ├── search.proto >>-------┐
│   ├── search_pb2.py <---- ((*))
│   └── search_pb2_grpc.py  ((*))
├── rpc
│   ├── __init__.py
│   └── search_servicer.py
├── search_client.py 
├── search_server.py <------ (*)
└── tools
    ├── __init__.py
    ├── geometry.py <------- (*)
    ├── rpc_stub.py <------- (*)
    └── service_discovery.py


- (*)   Files in focus
- ((*)) Generated files

In [None]:
../../src/tutorial/grpc/geodatas
├── __init__.py
├── common
│   ├── __init__.py
│   └── base.py              (*) - SQLAlchemy: engine, Base, session
├── models
│   ├── __init__.py
│   ├── ogrgeojson.py        (*) - SQLAlchemy: reflective table
│   └── thing.py             (*) - SQLAlchemy: table creation
├── proto
│   ├── __init__.py
│   ├── search.proto
│   ├── search_pb2.py       ((*)) - gRPC: Messages definitions (generated from `search.proto`)
│   └── search_pb2_grpc.py  ((*)) - gRPC: Services definitions (generated from `search.proto`)
├── rpc
│   ├── __init__.py
│   └── search_servicer.py
├── search_client.py 
├── search_server.py         (*) - Application: Search Server
└── tools
    ├── __init__.py
    ├── geometry.py          (*) - GeoAlchemy/Shapely: helper for Position (lat, lng, srid) object
    ├── rpc_stub.py          (*) - gRPC/Consul: helpers for stub generation
    └── service_discovery.py

## Application - Search Server

### Signals handler

In [None]:
# %load ../../src/tutorial/grpc/geodatas/search_server.py
import signal

SIGNALS = [signal.SIGINT, signal.SIGTERM]


def _signal_handler(_sig, _stack):
    """ Empty signal handler used to override python default one """
    pass


def serve(block=True, port: int = random.randint(50000, 59000)):
    """Excerpt of the server entry point showing only the signal handling.

    `[...]` marks code elided from this excerpt.
    """
    # NOTE(review): the default for `port` is evaluated once, when the function
    # is defined, so every call omitting `port` in the same process gets the
    # same number. Confirm this is acceptable.
    [...]
    # Register signal handler, only if blocking
    if block:
        for sig in SIGNALS:
            # Override the default handling of SIGINT/SIGTERM with the no-op handler.
            signal.signal(sig, _signal_handler)
    [...]
    # Wait for a signal before exiting: sigwait() suspends the calling thread
    # until one of SIGNALS is delivered and returns its number.
    sig = signal.sigwait(SIGNALS)

### Search Server: gRPC --> instantiation, port allocation, RPC, ...

In [None]:
# %load ../../src/tutorial/grpc/geodatas/search_server.py
from concurrent import futures
import grpc
from tutorial.grpc.geodatas.rpc.search_servicer import SearchServicer
from tutorial.grpc.geodatas.proto import search_pb2_grpc


def serve(block=True, port: int = random.randint(50000, 59000)):
    """Excerpt of the server entry point showing the gRPC server set-up.

    Creates the gRPC server, registers `SearchServicer` on it and binds an
    insecure port on `[::]:<port>`. `[...]` marks code elided from this excerpt.

    Raises:
        ConnectionError: when the port cannot be bound.
    """
    [...]
    max_number_of_clients = 10
    # Creates Server with which RPCs can be serviced
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         maximum_concurrent_rpcs=max_number_of_clients)
    # Register Rpc Handlers with `SearchServicer` server
    search_pb2_grpc.add_SearchServicer_to_server(SearchServicer(), server)

    # Opens an insecure port for accepting RPCs.
    grpc_host_and_port = f'[::]:{port}'
    insecure_port = server.add_insecure_port(grpc_host_and_port)
    if insecure_port == 0:
        # `insecure_port` is 0 on this branch, so report the address that was
        # requested rather than the returned value.
        logger.error(f"Failed to start gRPC server on {grpc_host_and_port}")
        raise ConnectionError(f"Failed to start gRPC server on {grpc_host_and_port}")
    [...]

### Search Server: Consul [un]registration + Start Search Server (RPCs)

In [None]:
# %load ../../src/tutorial/grpc/geodatas/search_server.py
from tutorial.grpc.geodatas.tools.service_discovery import consuled


def serve(block=True, port: int = random.randint(50000, 59000)):
    """Excerpt of the server entry point showing start-up and shutdown.

    Returns `(server, insecure_port)` when `block` is false; otherwise waits
    (elided) and then stops the server. `[...]` marks elided code.
    """
    [...]
    # The server runs inside the `consuled` context manager, which receives
    # the bound port.
    with consuled(insecure_port):
        logger.info(f"Starting server on port {insecure_port}...")
        server.start()
        logger.info("Ready and waiting for connections.")

        if not block:
            # NOTE(review): returning here leaves the `with` block, so whatever
            # `consuled` does on exit (presumably unregistering the service)
            # happens while the server is still running. Confirm this is
            # intended for the non-blocking mode.
            return server, insecure_port

        # Wait for a signal before exiting
        [...]

        # Stop with a 5 second grace period and wait on the returned event.
        server.stop(5).wait()

## Search Servicer: Implementation --> Business Logic

In [None]:
# %load ../../src/tutorial/grpc/geodatas/rpc/search_servicer.py
class SearchServicer(search_pb2_grpc.SearchServicer):
    """Implementation of the generated `SearchServicer` base class.

    Each rpc handler is wrapped in a StatsD timer named after the rpc.
    `[...]` marks code elided from this excerpt.
    """
    # Class-level StatsD client shared by all handlers; host and port are hard-coded.
    stat = statsd.StatsClient('localhost', 8125)

    @stat.timer("search")
    def search(self, request: search_pb2.SearchRequest, _context) -> search_pb2.SearchResponses:
        """Handle the `search` rpc using the request's `lat` and `lng` fields."""
        [...]
        # Alternative lookup, currently disabled:
        # responses = _search_thing(request.lat, request.lng)
        responses = _search_commune(request.lat, request.lng)

        return search_pb2.SearchResponses(responses=responses)

    @stat.timer("monitor")
    def monitor(self, _request, _context):
        """Handle the `monitor` rpc (body elided in this excerpt)."""
        [...]

### Business Logic: Search "commune"

In [None]:
# %load ../../src/tutorial/grpc/geodatas/rpc/search_servicer.py
from sqlalchemy.orm.query import Query
from tutorial.grpc.geodatas.common.base import session_factory
from tutorial.grpc.geodatas.models.ogrgeojson import OGRGeoJSON
from tutorial.grpc.geodatas.proto import search_pb2, search_pb2_grpc
from tutorial.grpc.geodatas.tools.geometry import Position


def _query_to_responses(query: Query,
                        func_response: Callable[[Query], str]) -> List[search_pb2.SearchResponse]:
    """Wrap every record yielded by ``query`` in a ``SearchResponse`` message.

    ``func_response`` is called once per record, in iteration order, and its
    result becomes the ``response`` field of the corresponding message.
    """
    responses = []
    for record in query:
        responses.append(search_pb2.SearchResponse(response=func_response(record)))
    return responses


def _search_commune(lat: float, lng: float, srid: int = 4326) -> List[search_pb2.SearchResponse]:
    """Return the ``OGRGeoJSON`` rows whose geometry contains the given position.

    Args:
        lat, lng: coordinates handed to ``Position``.
        srid: spatial reference id handed to ``Position`` (defaults to 4326).

    Returns:
        One ``SearchResponse`` per matching row, formatted as
        ``"<insee> - <nom> - <wikipedia>"``.
    """
    session = session_factory()  # SQLAlchemy
    try:
        query = session.query(OGRGeoJSON).filter(
            func.ST_Contains(OGRGeoJSON.wkb_geometry,
                             Position(lat, lng, srid).to_wkbelement))
        # The helper iterates the query and builds the full list, so every row
        # has been read before the session is closed below.
        return _query_to_responses(query,
                                   lambda rec: "{} - {} - {}".format(rec.insee, rec.nom, rec.wikipedia))
    finally:
        # Release the session's connection instead of leaking it on every call
        # (assumes session_factory() returns a SQLAlchemy Session).
        session.close()

### Business Logic: Search Thing

In [None]:
# %load ../../src/tutorial/grpc/geodatas/rpc/search_servicer.py
from tutorial.grpc.geodatas.models.thing import Thing


def _search_thing(lat: float, lng: float, srid: int = -1) -> List[search_pb2.SearchResponse]:
    """Return the ``Thing`` rows whose ``geom`` contains the given position.

    Args:
        lat, lng: coordinates handed to ``Position``.
        srid: spatial reference id handed to ``Position`` (defaults to -1).

    Returns:
        One ``SearchResponse`` per matching row, carrying the row's ``name``.
    """
    session = session_factory()
    try:
        query = session.query(Thing).filter(
            func.ST_Contains(Thing.geom,
                             Position(lat, lng, srid).to_wktelement))
        # The helper iterates the query and builds the full list, so every row
        # has been read before the session is closed below.
        return _query_to_responses(query, lambda rec: rec.name)
    finally:
        # Release the session's connection instead of leaking it on every call
        # (assumes session_factory() returns a SQLAlchemy Session).
        session.close()

## Execution logs of Search Server

In [None]:
$ python search_server.py
[...] SQLAlchemy
2019-02-24 16:30:32,138 - __main__ - INFO - Search service, version=0.1.0
2019-02-24 16:30:32,145 - tutorial.grpc.geodatas.tools.service_discovery - INFO - register started
2019-02-24 16:30:32,145 - root - DEBUG - consul_host=127.0.0.1
2019-02-24 16:30:32,184 - tutorial.grpc.geodatas.tools.service_discovery - INFO - services: {'search-service-50361': {'ID': 'search-service-50361', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 50361, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}, 'search-service-53261': {'ID': 'search-service-53261', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 53261, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}, 'search-service-54816': {'ID': 'search-service-54816', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 54816, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}, 'search-service-55823': {'ID': 'search-service-55823', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 55823, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}, 'search-service-56867': {'ID': 'search-service-56867', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 56867, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}}
2019-02-24 16:30:32,185 - __main__ - INFO - Starting server on port 53261...
2019-02-24 16:30:32,188 - __main__ - INFO - Ready and waiting for connections.
[...] Receiving a search request from a client
2019-02-24 16:31:00,992 - tutorial.grpc.geodatas.rpc.search_servicer - INFO - search request: query: "search_client"
result_per_page: 10
lat: 4.849999904632568
lng: 45.75
2019-02-24 16:31:01,008 - tutorial.grpc.geodatas.rpc.search_servicer - INFO - search responses: [response: "69123 - Lyon - fr:Lyon"]
[...] Shutdown (search) server
2019-02-24 16:31:31,481 - __main__ - INFO - Signal 2 received, shutting down...
2019-02-24 16:31:31,482 - tutorial.grpc.geodatas.tools.service_discovery - INFO - unregister started
2019-02-24 16:31:31,510 - tutorial.grpc.geodatas.tools.service_discovery - INFO - services: {'search-service-50361': {'ID': 'search-service-50361', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 50361, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}, 'search-service-54816': {'ID': 'search-service-54816', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 54816, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}, 'search-service-55823': {'ID': 'search-service-55823', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 55823, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}, 'search-service-56867': {'ID': 'search-service-56867', 'Service': 'search-service', 'Tags': [], 'Meta': {}, 'Port': 56867, 'Address': '127.0.0.1', 'Weights': {'Passing': 1, 'Warning': 1}, 'EnableTagOverride': False}}

## Unit tests with Pytest

./tests/
├── __init__.py
├── conftest.py
├── test_search_client.py
└── test_search_server.py

### Pytest Fixtures

In [None]:
# %load ./tests/conftest.py
import grpc
import pytest
from tutorial.grpc.geodatas.search_server import serve
from tutorial.grpc.geodatas.proto import search_pb2_grpc


@pytest.fixture(scope="session", autouse=True)
def start_core_rpc_server(request):
    """Start one non-blocking rpc server for the whole test session.

    The bound port is stored on the ``pytest`` module as ``insecure_port``.
    A finalizer stops the server once the session ends.
    """
    rpc_server, bound_port = serve(block=False)
    assert rpc_server is not None

    pytest.insecure_port = bound_port

    # stop(0): shut the server down without a grace period.
    request.addfinalizer(lambda: rpc_server.stop(0))


@pytest.fixture
def core_rpc_stub():
    """Yield a new rpc stub connected to the session's server.

    The underlying channel is closed when the test using the stub finishes,
    so channels do not accumulate across tests.
    """
    # Port published on the `pytest` module by the session-scoped server fixture.
    port = getattr(pytest, "insecure_port", None)
    with grpc.insecure_channel(f'localhost:{port}') as channel:
        yield search_pb2_grpc.SearchStub(channel)

## Unit tests on Search Server

In [None]:
# %load ./tests/test_search_server.py
from tutorial.grpc.geodatas.models.thing import Thing
from tutorial.grpc.geodatas.proto import search_pb2


def test_search_server(core_rpc_stub, add_to_session):
    """A stored unit square is found for a point inside it and not for a point outside."""
    thing_name = "test_squared_thing"
    add_to_session(Thing(name=thing_name, geom='POLYGON((0 0,1 0,1 1,0 1,0 0))'))

    def _matching_responses(lat, lng):
        """Call `search_thing` at (lat, lng) and keep only the responses naming the test thing."""
        request = search_pb2.SearchRequest(query="search_client", lat=lat, lng=lng, result_per_page=10)
        reply = core_rpc_stub.search_thing(request)
        return [item for item in reply.responses if item.response == thing_name]

    # Point inside the square: exactly one match.
    inside = _matching_responses(0.5, 0.5)
    assert len(inside) == 1
    assert inside[0].response == thing_name

    # Point outside the square: no match.
    assert _matching_responses(-1.0, 0.0) == []