# AGOL Endpoint to CKAN Datastore

## 1. Context

This snippet takes data from an AGOL endpoint, transforms it as needed for CKAN, and uploads it to a datastore resource

In [1]:
import ckanapi
import json 
import requests
from pathlib import Path
from datetime import datetime

## 2. Setup constants

- Map AGOL data types to CKAN datastore types
- Extraneous geo data fields: since in CKAN geo data is captured within the `geometry` field, other geo fields are duplicated and unnecessary. Those are removed to avoid confusion.

In [2]:
AGOL_CKAN_TYPE_MAP =  {
    "sqlTypeFloat": "float",
    "sqlTypeNVarchar": "text",
    "sqlTypeInteger": "int",
    "sqlTypeOther": "text",
    "sqlTypeTimestamp2": "timestamp",
    "sqlTypeDouble": "float",
    "esriFieldTypeString": "text",
    "esriFieldTypeDate": "timestamp",
    "esriFieldTypeInteger": "int",
    "esriFieldTypeOID": "int",
    "esriFieldTypeDouble": "float"
}

DELETE_FIELDS = [
    "longitude",
    "latitude",
    "shape__length",
    "shape__area",
    "lat",
    "long",
    "lon",
    "x",
    "y",
    "index_"
]

## 3. Initialize CKAN

In [4]:
ckan = ckanapi.RemoteCKAN(
    address="https://ckanadmin0.intra.dev-toronto.ca/",
    apikey=""
)

## 4. Capture datasets/resources to update

Set up as dictionary with the `package_id` as the key, and the value containing another dictionary with the keys:

- `query_url`: AGOL endpoint, can get it from an AGOL dataset page under the `API Explorer` tab. Usually ends in `FeatureServer/0`. **Note**: no querystring parameters, those are added later
- `resource`: dictionary with information of CKAN datastore resource to load (package_id, name, format, is_preview)

Will loop through this list to load the resources from AGOL

In [6]:
datasets = [
    {
        "package_name": "motor-vehicle-collisions-involving-killed-or-seriously-injured-persons",
        "resource_info": {
            "query_url": "https://services.arcgis.com/S9th0jAJ7bqgIRjw/arcgis/rest/services/KSI/FeatureServer/0",
            "resource": {
                "package_id": "motor-vehicle-collisions-involving-killed-or-seriously-injured-persons",
                "name": "Motor Vehicle Collisions with KSI Data",
                "format": "geojson",
                "is_preview": True,
            },
        },
    },
    {
        "package_name": "neighbourhood-crime-rates",
        "resource_info": {
            "query_url": "https://services.arcgis.com/S9th0jAJ7bqgIRjw/arcgis/rest/services/Neighbourhood_Crime_Rates_2020/FeatureServer/0",
            "resource": {
                "package_id": "neighbourhood-crime-rates",
                "name": "neighbourhood-crime-rates",
                "format": "geojson",
                "is_preview": True,
            },
        },
    },
]

## 4. Loop through dataset list and load AGOL resources

In [14]:
def convert_esri_fields_to_ckan(esri_fields):
    """
    Converts esri fields to ckan resource datastore field object, using the map of 
    AGOL data types to CKAN data types from Step #2
    
    [
        {
            "id": field_name,
            "type": ckan_field_data_type,
        },
        {
            "id": field_name2,
            "type": ckan_field_data_type,
        }
    ]
    """
    print(f"  converting esri fields to CKAN")

    
    ckan_fields = []
    
    for field in esri_fields:
        if field["name"].lower() not in DELETE_FIELDS:
            ckan_fields.append({
                "id": field["name"],
                "type": AGOL_CKAN_TYPE_MAP[field["type"]],
            })
        
    return ckan_fields


def esri_timestamp_to_datetime(ts):
    """
    convert esri timestamp (which is unix) to ISO format datetime string
    """
    return  datetime.fromtimestamp(ts/1000).isoformat()


def geojson_to_ckan_records(geojson_features, ckan_fields):
    """
    convert AGOL GeoJSON features to CKAN datastore record
    """
    print(f"  converting GeoJSON to CKAN records")

    rows = []
    datetime_fields = [ field["id"] for field in ckan_fields if field["type"] == "timestamp" ]
    
    for feature in geojson_features:
        row = {
            **feature["properties"],
            "geometry": json.dumps(feature["geometry"]),
        }
        
        # for every row
        for k,v in {**row}.items():
            # delete field if duplicate geo field (x, y, lat, long, etc.)
            if k.lower() in DELETE_FIELDS:
                del row[k]
            
            # delete "<null>" string field
            if isinstance(v, str) and v.lower() == "<null>":
                del row[k]
                
        # convert esri time data types to JSON standard (for ckan)
        for dtf in datetime_fields:
            row[dtf] = esri_timestamp_to_datetime(row[dtf])
        
        rows.append(row)

    return rows


def get_fields_from_agol(query_url):
    print(f"  getting fields from AGOL")

    query_string_params = {
        "where": "1=1",
        "outFields": "*",
        "outSR": 4326,
        "f": "json",
        "resultRecordCount": 1
    }
    
    params = "&".join([ f"{k}={v}" for k,v in query_string_params.items() ])
    url = "https://" + "/".join([ url_part for url_part in f"{query_url}/query?{params}".replace("https://", "").split("/") if url_part ])
    res = requests.get(url)
    assert res.status_code == 200, f"Status code response: {res.status_code}"
    
    return res.json()["fields"]


def get_features_from_agol(query_url):
    print(f"  getting features from AGOL")

    features = []
    overflow = True
    offset = 0
    query_string_params = {
        "where": "1=1",
        "outFields": "*",
        "outSR": 4326,
        "f": "geojson",
        "resultType": "standard",
        "resultOffset": offset,
    }
    
    while overflow is True:
        params = "&".join([ f"{k}={v}" for k,v in query_string_params.items() ])
        url = "https://" + "/".join([ url_part for url_part in f"{query_url}/query?{params}".replace("https://", "").split("/") if url_part ])
        res = requests.get(url)
        assert res.status_code == 200, f"Status code response: {res.status_code}"
        
        geojson = res.json()
        features.extend(geojson["features"])
        overflow = "properties" in geojson and "exceededTransferLimit" in geojson["properties"] and geojson["properties"]["exceededTransferLimit"] is True
        offset = offset + len(geojson["features"])
        query_string_params["resultOffset"] = offset

    return features


def backup_ckan_fields(package_name, resource_name, backup_file_path):
    backup = False

    package = ckan.action.package_show(id=package_name)
    resource = [ r for r in package["resources"] if r["name"] == resource_name ]
    
    if len(resource) == 0:
        print(f"Nothing to backup: {resource_name} not in {package_name}")
        return backup
    
    resource = resource[0]
    if not resource["datastore_active"]:
        print(f"Nothing to backup: {resource_name} in {package_name} but not in datastore")
        return backup
    
    datastore_resource = ckan.action.datastore_search(id=resource["id"], limit=0)
    
    fields = [f for f in datastore_resource["fields"] if f["id"] != "_id" ]
    for f in fields:
        if "info" in f and f["info"]["notes"] != "":
            backup = True
            break
            
    if backup:
        with open(Path(backup_file_path), "w") as f:
            json.dump(fields, f)
        print(f"  backedup existing fields: {backup_file_path}")

    return backup


def get_backedup_fields(backup_file_path):
    print(f"  loading files from backup JSON: {backup_file_path}")
    
    with open(backup_file_path, "r") as f:
        fields = json.load(f)
        
    return fields


def update_fields_with_backup(generated_fields, backedup_fields):
    fields = [ f for f in generated_fields ]
    
    for f in fields:
        for g in backedup_fields:
            if f["id"] == g["id"] and "info" in g:
                f["info"] = g["info"]
                
    return fields


def delete_datastore_resource(package_name, resource_name):
    print(f"  deleting datastore resource: {resource_name}")
    
    package = ckan.action.package_show(id=package_name)
    
    for resource in package["resources"]:
        if resource["name"] == resource_name and resource["datastore_active"]:
            ckan.action.datastore_delete(id=resource["id"])

            
def insert_datastore_records(package_name, resource, fields, records, chunk_size=5000):
    print(f"  inserting to datastore: {resource_name}")
    package = ckan.action.package_show(id=package_name)
    
    resource_exists = False
    for r in package["resources"]:
        if r["name"] == resource["name"]:
            resource_exists = True
            resource = r
            break
            
    if not resource_exists:
        resource = ckan.action.resource_create(
            package_id=package_name,
            **resource
        )
        
    ckan.action.datastore_create(id=resource["id"], fields=fields)
    
    chunks = [
        records[i : i + chunk_size]
        for i in range(0, len(records), chunk_size)
    ]
    
    count = 0
    for n, chunk in enumerate(chunks):
        count = len(chunk) + count
        print(f"    inserting chunk {1+n}/{len(chunks)}: {count}/{len(records)} records")
        ckan.action.datastore_create(id=resource["id"], records=chunk)
        
    ckan.action.resource_patch(id=resource["id"], last_modified=datetime.now().isoformat())

In [15]:
backup_file_path = Path("./backup/")
backup_file_path.mkdir(parents=True, exist_ok=True)

for n, dataset in enumerate(datasets):
    query_url = dataset["resource_info"]["query_url"]
    package_name = dataset["package_name"]
    resource = resource_name = dataset["resource_info"]["resource"]
    resource_name = resource["name"]
    fields_path = backup_file_path / f"{package_name}.{resource_name}.fields.json"
    
    print(f"{1+n}. {package_name}")
    
    # get fields and records from AGOL
    esri_fields = get_fields_from_agol(query_url)
    features = get_features_from_agol(query_url)

    # convert fields and records to CKAN
    generated_ckan_fields = convert_esri_fields_to_ckan(esri_fields)
    records = geojson_to_ckan_records(features, generated_ckan_fields)
        
    # take backup of fields
    if not fields_path.exists():
        backup = backup_ckan_fields(package_name, resource_name, fields_path)
    
    # if backup file exists, use those field definitions
    if fields_path.exists():
        backedup_fields = get_backedup_fields(fields_path)
        fields = update_fields_with_backup(generated_ckan_fields, backedup_fields)
    else:
        fields = generated_ckan_fields
        
    # delete datastore resource
    delete_datastore_resource(package_name, resource_name)
    
    # insert datastore records
    insert_datastore_records(
        package_name=package_name, 
        resource=resource,
        fields=fields,
        records=records,
    )
    
    break

1. motor-vehicle-collisions-involving-killed-or-seriously-injured-persons
  getting fields from AGOL
  getting features from AGOL
  converting esri fields to CKAN
  converting GeoJSON to CKAN records
  loading files from backup JSON: backup\motor-vehicle-collisions-involving-killed-or-seriously-injured-persons.Motor Vehicle Collisions with KSI Data.fields.json
  deleting datastore resource: Motor Vehicle Collisions with KSI Data
  inserting to datastore: Motor Vehicle Collisions with KSI Data
    inserting chunk 1/4: 5000/16860 records
    inserting chunk 2/4: 10000/16860 records
    inserting chunk 3/4: 15000/16860 records
    inserting chunk 4/4: 16860/16860 records
