-
Notifications
You must be signed in to change notification settings - Fork 0
/
utilities.py
36 lines (27 loc) · 950 Bytes
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from urllib.parse import urljoin
import requests
from slugify import slugify
import json
# TODO: Hmmm,importing the conf here is not clean.
from conf import *
MAX_DATASET_NAME_LENGTH = 100 # Ckan limitation
class CKANAPIException(Exception):
pass
class Object:
def toJSON(self):
return json.dumps(self, default=lambda o: o.__dict__,
sort_keys=True, indent=4)
def make_ckan_api_call(action_url, params=None):
if not params:
params = {}
url = urljoin(CKAN_INSTANCE_URL, action_url)
headers = {'Authorization': ADMIN_API_KEY}
result = None
try:
r = requests.post(url, json=params, headers=headers)
result = r.json()
return result
except requests.exceptions.ConnectionError as e:
print("There is a problem with your CKAN instance on "+ url+". It is not reachable.")
def dataset_title_to_name(title):
return slugify(title)[:MAX_DATASET_NAME_LENGTH]