# Provide geometries from CARTO with geostore IDs
This script reads a CARTO table provides each feature with a geostore ID and writes back the table to CARTO.
## Tables to read:
- [River basins - level 3](https://resourcewatch.carto.com/u/wri-rw/tables/wat_068_rw0_watersheds_edit/public?redirected=true) `SELECT * FROM "wri-rw".wat_068_rw0_watersheds_edit WHERE level = 3`

- [Country geometries - coastal](https://resourcewatch.carto.com/u/wri-rw/dataset/gadm36_0) `SELECT * FROM "wri-rw".gadm36_0 WHERE coastal = True`

- [EEZ geometries](https://resourcewatch.carto.com/u/wri-rw/tables/com_011_1_maritime_boundaries_territorial_waters/public?redirected=true): `com_011_1_maritime_boundaries_territorial_waters`  WRI still working out a few discrepancies with the country iso codes used in the two datasets. We want to ensure consistency between the code columns so a country geometry can be matched with its EEZ geometry. We plan to make some edits to the gid_0 and iso_ter1 fields by next week (May31st week), but the underlying geometries should remain the same.

## Methodology
1. Create a geojson from the CARTO table in new row
2. Get ID for geojson from Geostore in new row
3. Publish to CARTO

## Results
Populated columns within each table: `geostore_prod` and `geostore_staging`

## Issues found:
* Some of the geometries will require sanitization as there is a mix between Polygons, Multipolygons and GeometryCollections.
* Geostore size limits for complex Polygons is 16mb, which is very small for a geometry that cover a full country.
* API inconsistency and random errors 503/504
* Carto 429: too many requests at the same time


In [1]:
import os
import sys
from dotenv import load_dotenv
import json
import geojson
import logging
import math
from functools import partial 
import asyncio
import nest_asyncio
from concurrent.futures import ProcessPoolExecutor
from decimal import Decimal
from typing import List, Tuple
import requests
import pandas as pd

load_dotenv("../.env")
logger = logging.getLogger('Script')
logger.setLevel(logging.INFO)
nest_asyncio.apply()

## Global Vars

In [2]:
token = os.environ.get("RW_CARTO_KEY")
account = os.environ.get("RW_CARTO_ACCOUNT")
MAX_WORKERS = 20
SLEEP_TIME = 5

## Functions

In [4]:
def postGeostore(geojson, cartodb_id, env):
    '''
    Register valid geojson to the geostore service. Return the geostore id.
    '''
    serverUrl = {
    'prod': 'https://api.resourcewatch.org/v1/geostore',
    'staging': 'https://staging-api.globalforestwatch.org/v1/geostore'
    }
    try:
        header = {
                'Content-Type':'application/json'
                }
        body = {
                'geojson': json.loads(geojson)
                }
        
        response = requests.post(serverUrl[env], headers=header, json=body)
        response.raise_for_status()
        
        return {
                'geostore_id': response.json().get('data',{}).get('id',''),
                'cartodb_id': cartodb_id,
                'metadata': None
               }
    
    except requests.exceptions.HTTPError as e:
        logger.error(f'Api fetch error in {env}!')
        logger.error(e)
        logger.error(response.text)
        logger.error(response.url)
        status_code = e.response.status_code
        return {
                'cartodb_id': cartodb_id, 
                'geostore_id': None,
                'metadata': {
                    'errror': e,
                    'url': response.url,
                    'text': response.text,
                }
               }
            
    except Exception as e:
        logger.error('is the api!')
        logger.error(e)
        return {
                'cartodb_id': cartodb_id, 
                'geostore_id': None,
                'metadata': {
                    'errror': e,
                }
               }

def get(payload: dict)-> list:
    '''
    
    '''
    cartoUrl = f'https://{account}.carto.com/api/v2/sql'
    try:
        response = requests.get(cartoUrl, params = payload)
        response.raise_for_status()
        logger.info(f'sucess get from page {str(payload.get("page", ""))}')
        return response.json().get('rows',[])        
    except Exception as e:
        logger.error('carto fetch error for call:')
        logger.error(payload)
        logger.error(e)
        return []

def getCount(q: str)-> int:
    '''
    '''
    
    payload = {
        'api_key': token,
        'q': f'select count(*) from ({q}) st'    
    }
    
    result = get(payload)
    
    return result[0]['count'] if len(result) > 0 else None


def composeCallsTask(payload, table_name):
    '''
    
    '''
    partialData = get(payload)
    if len(partialData) > 0:
        partialGeostoreIdsProd = pd.DataFrame.from_dict(registerInGeostore(partialData, 'prod'), orient='columns')
        partialGeostoreIdsStaging = pd.DataFrame.from_dict(registerInGeostore(partialData, 'staging'), orient='columns')

        return updateCarto(partialGeostoreIdsProd, partialGeostoreIdsStaging, table_name)
    return {}


def cartoWorker(q: str, table_name: str):
    '''
    Carto worker that orchestrate and execute
    '''
    #
    paginationSize = 20
     
    nRows = getCount(q)
    nPages = math.ceil(nRows/paginationSize)

    eventLoop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor(max_workers=MAX_WORKERS)
    tasks = [eventLoop.run_in_executor(executor, partial(composeCallsTask, payload = {
                                                                        'api_key': token,
                                                                        'sort_order': 'asc',
                                                                        'q': q,
                                                                        'rows_per_page': paginationSize,
                                                                        'sort_order': 'asc',
                                                                        'page': i,
                                                                        'order_by': 'cartodb_id'
                                                                        },
                                                             table_name = table_name 
                                                         )
                                       ) for i in range(0, nPages) ]
    data=[]
    data = asyncio.gather(*tasks)
    return eventLoop.run_until_complete(data)
        

def registerInGeostore(data, env = 'prod'):
    '''
    There is a different approach that will be passing the table and the carto account and a id filter to generate the geostore
    '''
    dataResult = []
    for row in data:
        result = postGeostore(row.get('geometry'), 
                              row.get('cartodb_id'), 
                              env)
        
        dataResult.append(result)
    
    return dataResult

def updateCarto(produnctionList, stagingList, table_name):
    '''
    
    '''
    data = pd.merge(produnctionList, stagingList, on='cartodb_id', suffixes=['_prod','_staging'])
    values = [f"({str(row['cartodb_id'])}, '{str(row['geostore_id_prod'] or '')}', '{str(row['geostore_id_staging'] or 'null')}')" for row in data.to_dict('records') 
              if (row['geostore_id_prod'] is not None 
              or row['geostore_id_staging'] is not None)]
    
    updateQuery = f'''update {table_name} set geostore_prod=tmp.geostore_prod, geostore_staging=tmp.geostore_staging 
        from (values {', '.join(values)}) as tmp (cartodb_id, geostore_prod, geostore_staging) 
        where {table_name}.cartodb_id=tmp.cartodb_id
        '''
    logger.info(updateQuery)
    try:
        payload = {
        'api_key': token,
        'q': updateQuery   
        }
        
        
        cartoUpdateResponse = get(payload)
        
        return {
            'data': data,
            'sqlUpdate': updateQuery,
            'updateResponse': cartoUpdateResponse
        }
    
    except Exception as e:
        logger.error('is updating carto!')
        logger.error(e)
        return {}
    
def getGeometriesFromCarto(table_name):
    '''
    
    '''
    dataQuery = f'''
    select cartodb_id, 
        CASE
           WHEN GeometryType(the_geom) like 'GeometryCollection' THEN st_asgeojson(st_makevalid(ST_Simplify(ST_CollectionExtract(the_geom, 3), 0.1, true)))
           Else st_asgeojson(st_makevalid(ST_Simplify(ST_RemoveRepeatedPoints(the_geom, 0.1), 0.5, true)))
        END  as geometry
    from {table_name}
    where geostore_prod is null
    '''
    
    dataCarto = cartoWorker(dataQuery, table_name)
    
    return dataCarto

## Processing

In [5]:
getGeometriesFromCarto('gadm36_0')

Api fetch error in prod!
404 Client Error: Not Found for url: https://api.resourcewatch.org/v1/geostore
{"errors":[{"status":404,"detail":"Unknown geometry type: GeometryCollection"}]}
https://api.resourcewatch.org/v1/geostore
Api fetch error in prod!
404 Client Error: Not Found for url: https://api.resourcewatch.org/v1/geostore
{"errors":[{"status":404,"detail":"Unknown geometry type: GeometryCollection"}]}
https://api.resourcewatch.org/v1/geostore
Api fetch error in prod!
404 Client Error: Not Found for url: https://api.resourcewatch.org/v1/geostore
{"errors":[{"status":404,"detail":"Unknown geometry type: GeometryCollection"}]}
https://api.resourcewatch.org/v1/geostore
Api fetch error in prod!
404 Client Error: Not Found for url: https://api.resourcewatch.org/v1/geostore
{"errors":[{"status":404,"detail":"Unknown geometry type: GeometryCollection"}]}
https://api.resourcewatch.org/v1/geostore
Api fetch error in prod!
404 Client Error: Not Found for url: https://api.resourcewatch.org/

[{'data':     cartodb_id                  geostore_id_prod  \
  0            9                              None   
  1           12                              None   
  2           15                              None   
  3           26                              None   
  4           33                              None   
  5           40                              None   
  6           43                              None   
  7           44                              None   
  8           54                              None   
  9           63                              None   
  10          66                              None   
  11          67                              None   
  12          70                              None   
  13          71                              None   
  14          73                              None   
  15          74                              None   
  16          75                              None   
  17          76    

In [None]:
#getGeometriesFromCarto('wat_068_rw0_watersheds_edit')

In [99]:
getGeometriesFromCarto('com_011_1_maritime_boundaries_territorial_waters')

Api fetch error in prod!
504 Server Error: Gateway Timeout for url: https://api.resourcewatch.org/v1/geostore
{"message": "Network error communicating with endpoint"}
https://api.resourcewatch.org/v1/geostore
Api fetch error in prod!
504 Server Error: Gateway Timeout for url: https://api.resourcewatch.org/v1/geostore
{"message": "Network error communicating with endpoint"}
Api fetch error in prod!
https://api.resourcewatch.org/v1/geostore
504 Server Error: Gateway Timeout for url: https://api.resourcewatch.org/v1/geostore
{"message": "Network error communicating with endpoint"}
https://api.resourcewatch.org/v1/geostore
Api fetch error in prod!
504 Server Error: Gateway Timeout for url: https://api.resourcewatch.org/v1/geostore
{"message": "Network error communicating with endpoint"}
https://api.resourcewatch.org/v1/geostore
Api fetch error in prod!
503 Server Error: first byte timeout for url: https://api.resourcewatch.org/v1/geostore

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE h

[{'data':     cartodb_id                  geostore_id_prod  \
  0           10                              None   
  1           13  311b2a5f2215e65a9a2dfce8ae3bedff   
  2           22                              None   
  3           28                              None   
  4           35  dd7fc8debb2aa0c603a2f9b43fd5275f   
  5           42  1bb1ceac337d437643650d0fbbcda432   
  6           47  ebdcfd61205c799e86578d8d47c78e12   
  7           55  b3aafd74cac599064facc56b57991e35   
  8           69  96c94c63cf3d86fe4fc1a6111f8d6101   
  9           79  768d90bdb5c2f0dc09593edb1b35c981   
  10          82  cd12392830423fd34d7d6f10314fe667   
  11         120  f2a3c9817b15a434f2760fdb2977aa4c   
  12         122  fe279bdcbb0432a535bf35626b9230c3   
  13         125  ea159af912f26bf8c31d7399b435fbbb   
  14         130                              None   
  15         145                              None   
  16         151                              None   
  17         154  0d

# Old methodology using cartoframes

### Accessing the tables
#### Countries geometry

In [None]:
countries_df = cf.io.carto.read_carto('SELECT * FROM "wri-rw".gadm36_0 WHERE coastal = True', credentials=creds)
countries_df.head()

#### Watersheds

In [None]:
watersheds_df = cf.io.carto.read_carto('SELECT * FROM "wri-rw".wat_068_rw0_watersheds_edit WHERE level = 3', credentials=creds)
watersheds_df.head()

#### EEZ 

In [None]:
eez_table = 'com_011_1_maritime_boundaries_territorial_waters'
eez_df = cf.io.carto.read_carto(eez_table, credentials=creds)

eez_df.head()

### Creating geojson and getting geostore id

#### Countries geometries

In [None]:
countries_g_id_dict = {}
countries_geojson_dict = {}
for index, row in countries_df.iterrows():
    g = shpToGeojson(row.the_geom)
    g_id = registerGeostore(g)
    if re.search('^Error ', g_id)!=None: #if there is an error with geostore
        countries_geojson_dict[row.cartodb_id] = g
    else: 
        countries_g_id_dict[row.cartodb_id] = g_id

In [None]:
Api fetch error in prod!
Api fetch error in prod!
Api fetch error in prod!print(len(countries_g_id_dict))
print(len(countries_geojson_dict))
len(countries_g_id_dict)+len(countries_geojson_dict)

In [None]:
countries_df.shape

In [None]:
# save dicts locally
with open('countries_g_id_dict.json', 'w') as fp:
    json.dump(countries_g_id_dict, fp)
with open('countries_geojson_dict.json', 'w') as fp:
    json.dump(countries_geojson_dict, fp)

In [None]:
countries_df.to_csv(f'./countries_df_v20210528.csv')

#### Watersheds

In [None]:
watershed_g_id_dict = {}
watershed_geojson_dict = {}
for index, row in watersheds_df.iterrows():
    g = shpToGeojson(row.the_geom)
    g_id = registerGeostore(g)
    if re.search('^Error ', g_id)!=None: #if there is an error with geostore
        watershed_geojson_dict[row.cartodb_id] = g
    else: 
        watershed_g_id_dict[row.cartodb_id] = g_id

In [None]:
print(len(watershed_g_id_dict))
print(len(watershed_geojson_dict))
len(watershed_g_id_dict)+len(watershed_geojson_dict)

In [None]:
watersheds_df.shape

In [None]:
# save dicts locally
with open('watershed_g_id_dict.json', 'w') as fp:
    json.dump(watershed_g_id_dict, fp)


In [None]:
# transform dictionary to table with two columns and merge
watershed_id_df = pd.DataFrame.from_dict(watershed_g_id_dict, orient = 'index', columns = ["geo_id"])
watershed_id_df.reset_index(inplace=True)
watershed_id_df = watershed_id_df.rename(columns = {'index':'cartodb_id'})
watershed_id_df.merge(watersheds_df, left_on='cartodb_id', right_on='cartodb_id')
watershed_id_df.head()

In [None]:
#save locally
watershed_id_df.to_csv(f'./watersheds_df_id_v20210528.csv')

In [None]:
watershed_id_df = pd.read_csv(f'./watersheds_df_id_v20210528.csv')

In [None]:
watershed_id_df.head()

#### EEZ

In [None]:
eez_g_id_dict = {}
eez_geojson_dict = {}
for index, row in eez_df.iterrows():
    g = shpToGeojson(row.the_geom)
    g_id = registerGeostore(g)
    if re.search('^Error ', g_id)!=None: #if there is an error with geostore
        eez_geojson_dict[row.cartodb_id] = g
    else: 
        eez_g_id_dict[row.cartodb_id] = g_id

In [None]:
eez_df.shape

In [None]:
print(len(eez_g_id_dict))
print(len(eez_geojson_dict))
len(eez_g_id_dict)+len(eez_geojson_dict)

In [None]:
# save dicts locally
with open('eez_g_id_dict.json', 'w') as fp:
    json.dump(eez_g_id_dict, fp)
with open('eez_geojson_dict.json', 'w') as fp:
    json.dump(eez_geojson_dict, fp)

In [None]:
## save locally
eez_df.to_csv(f'./{eez_table}_v20210528.csv')

## check failed registration, those elements in geojson_dict
- for countries
- for eez

### EEZ
Process to validate the geometry. Use PostGIS and `ST_MakeValid` ([documentation](https://postgis.net/docs/ST_MakeValid.html)). The geometries are returned as hex and transformed into wkb. The valid geometry can be sent to geostore and added to the dictionary that contains the cartodb_id and the geostore id. 

In [None]:
with open('eez_geojson_dict.json') as json_file:
    eez_geojson_dict = json.load(json_file)

In [None]:
', '.join(eez_geojson_dict.keys())

In [None]:
eez_subset = cf.io.carto.read_carto(f'SELECT the_geom AS before_geom, ST_MakeValid(the_geom) AS after_geom FROM com_011_1_maritime_boundaries_territorial_waters WHERE cartodb_id in ({", ".join(eez_geojson_dict.keys())})', 
                                    credentials=creds)

In [None]:
#eez_subset.to_csv("valid_eez_subset.csv")
eez_subset = gpd.read_file('valid_eez_subset.csv')

In [None]:
eez_subset

In [None]:
eez_df.info()

In [None]:
type(eez_df.the_geom[0])

In [None]:
eez_subset['before_geom']

In [None]:
for g in eez_subset.index:
    try: 
        initial_string2 = eez_subset.loc[[g]].after_geom        
        initial_byte2 = bytes(bytearray.fromhex(initial_string2.values[0]))
        geom_shape = shapely.wkb.loads(initial_byte2)
        if geom_shape.is_valid:
            print("is valid")
            gjson = shpToGeojson(geom_shape)
            print("geojson")           
            g_id = registerGeostore(gjson)
            print(g_id)

            
    except:
        print(len(initial_string2.values))

In [None]:
for g in eez_subset.index:
    try: 
        initial_string2 = eez_subset.loc[[g]].after_geom        
        initial_byte2 = bytes(bytearray.fromhex(initial_string2.values[0]))
        geom_shape = shapely.wkb.loads(initial_byte2)
        if geom_shape.is_valid:
            print("is valid")
            gjson = shpToGeojson(geom_shape)
            print("geojson")           
            g_id = registerGeostore(gjson)
            print(g_id)
            time.sleep(10)
    except:
        print(len(initial_string2.values))

In [None]:
time.sleep(10)

In [None]:
xx = gpd.GeoSeries([geom_shape]).to_json()

In [None]:
body = xx
header = {
    'Content-Type':'application/json'
                }
r = requests.post('http://api.resourcewatch.org/v1/geostore', headers=header, json=body)


In [None]:
r

In [None]:
g_id = registerGeostore(gjson)

In [None]:
for g in eez_geojson_dict:
    rr = eez_df.loc[eez_df['cartodb_id']== int(g)]
    
    #s = rr.the_geom
    print(rr.sovereign1)
    #s2 = s.buffer(0)
    #print(s2.is_valid)
    #g.is_valid
    #break

In [None]:
len(eez_geojson_dict)

In [None]:
## check failed registration, geostore_id  == ''
## s.is_valid, s.make_valid / buffer(0)

### Countries

In [None]:
with open('countries_geojson_dict.json') as json_file:
    countries_geojson_dict = json.load(json_file)

In [None]:
', '.join(countries_geojson_dict.keys())

In [None]:
countries_subset = cf.io.carto.read_carto(f'SELECT * FROM "wri-rw".gadm36_0 WHERE cartodb_id in ({", ".join(countries_geojson_dict.keys())})', 
                                    credentials=creds)

In [None]:
countries_subset.head()

In [None]:
for g in countries_geojson_dict:
    rr = countries_subset.loc[countries_subset['cartodb_id']==int(g)]   
    s = rr.the_geom
    if s.is_valid:
        print(rr)
    else:
        print(g)
    #g.is_valid
    #break

In [None]:
## check failed registration, geostore_id  == ''
## s.is_valid, s.make_valid / buffer(0)

## write carto table

### Watersheds

In [None]:
watershed_id_df = pd.read_csv(f'./watersheds_df_id_v20210528.csv')

In [None]:
watershed_id_df.head()

In [None]:
watershed_id_df.rename(columns = {"geo_id":"geostore_prod"}, inplace=True)

In [None]:
watershed_id_df.head()

#### Using requests code

In [None]:
sql = 'SELECT * FROM wat_068_rw0_watersheds_edit WHERE level = 3'

In [None]:
def get_query_carto(sql, creds=None, account='wri-rw'):
    urlCarto = f"https://{account}.carto.com/api/v2/sql"
    params = {"q": sql}
    if creds: params["api_key"] = creds
    r = requests.get(urlCarto, params=params)
    if r.status_code==200:
        return r.json().get('rows', None)
    return f'Error {r.status_code}'

In [None]:
%%time
watersheds_df = get_query_carto(sql=sql)

In [None]:
print(watersheds_df[0].keys())

In [None]:
sql_alter = '''
    ALTER TABLE wat_068_rw0_watersheds_edit
    ADD COLUMN geostore_prod VARCHAR,
    ADD COLUMN geostore_staging VARCHAR;
'''

In [None]:
%%time
get_query_carto(sql_alter, creds=creds.api_key)

In [None]:
watershed_update_list = watershed_id_df[["cartodb_id", "geostore_prod"]].to_dict(orient = "records")
print(watershed_update_list[0])

In [None]:
%%time
records = []
for d in watershed_update_list:
    geo_id = d['geostore_prod']
    cartodb_id = d['cartodb_id']
    update_sql = f"""
        UPDATE wat_068_rw0_watersheds_edit 
           SET geostore_prod = '{geo_id}'
         WHERE level = 3 
           AND cartodb_id = {cartodb_id}
        """
    response = get_query_carto(update_sql, creds=creds.api_key)
    records += [{
        **d,
        'response':response
    }]

In [None]:
len([r for r in records if r.get("response", None)])

#### Using cartoframes

In [None]:
# cf.io.carto.to_carto(df, <tablename>, if_exists='replace', credentials=creds)
# cf.update_privacy_table(<tablename>, privacy='public', credentials=creds)
# api sql by CARTO, alter table: two new fields (geostore_prod, and geostore_staging), 
# https://carto.com/developers/sql-api/reference/#operation/postSQLStatement
# update row
# bulk update: insert table and then update table using cartodb_id

#### Countries

In [None]:
# cf.io.carto.to_carto(df, <tablename>, if_exists='replace', credentials=creds)
# cf.update_privacy_table(<tablename>, privacy='public', credentials=creds)

#### EEZ

In [None]:
# cf.io.carto.to_carto(df, <tablename>, if_exists='replace', credentials=creds)
# cf.update_privacy_table(<tablename>, privacy='public', credentials=creds)

In [None]:
#simplify, reduce number of the points or reduce the precision of the points (decimals in the lat long). 