# CKAN data downloader example
Fetches data from portal.opendata.dk every 4.5 minutes and stores it in a PostgreSQL database.

In [1]:
from time import sleep
from urllib import request
import json
import psycopg2

In [2]:
# Schema-qualified name of the table the downloaded records are written to.
table_name = 'iot.aarhus_trafik_data'
# Path and query string of the CKAN datastore_search request; get_ckan_data
# appends it to the portal base URL.
url_link = '/api/3/action/datastore_search?resource_id=b3eeb0ff-c8a8-4824-99d6-e0a3747c8b0d'
# Columns used for the UNIQUE constraint (create_table_def) and as the
# ON CONFLICT target (insert_into_table).
# NOTE: '"TIMESTAMP"' already carries its own double quotes, '_id' does not.
unique_contrain = ['_id','"TIMESTAMP"']

# PostgreSQL connection settings read by do_sql.
# NOTE(review): these look like placeholder values — replace before running.
host = '12.34.56.789'
port ='5432'
dbname = 'dbname'
user = 'username'
password = 'password'

In [9]:
def get_ckan_data(url_link):
    """Fetch a CKAN API response from portal.opendata.dk and parse it as JSON.

    Args:
        url_link: Path and query string appended to the portal base URL,
            e.g. '/api/3/action/datastore_search?resource_id=...'.

    Returns:
        The object obtained by decoding the response body with json.loads.
    """
    full_url = 'http://portal.opendata.dk' + url_link
    with request.urlopen(full_url) as response:
        raw_body = response.read()
    # The whole body is already in memory, so it can be decoded after the
    # response has been closed.
    return json.loads(raw_body)

In [10]:
def do_sql(sql, fetch=False):
    """Execute one SQL statement on a fresh PostgreSQL connection and commit it.

    The connection settings are read from the module-level ``host``, ``port``,
    ``dbname``, ``user`` and ``password`` variables.

    Args:
        sql: The SQL text to execute.
        fetch: When true, return all rows produced by the statement.

    Returns:
        The list of rows from ``cursor.fetchall()`` when ``fetch`` is true,
        otherwise ``None``.
    """
    conn_string = f"host='{host}' port='{port}' dbname='{dbname}' user='{user}' password='{password}'"
    # Log the connection target without exposing the password.
    masked_string = f"host='{host}' port='{port}' dbname='{dbname}' user='{user}' password='***'"
    print("Connecting to database\n ->{0}".format(masked_string))
    conn = psycopg2.connect(conn_string)
    try:
        cursor = conn.cursor()
        print("Connected!\n")
        cursor.execute(sql)
        conn.commit()
        # retrieve the records from the database
        if fetch:
            return cursor.fetchall()
        return None
    finally:
        # Always release the connection; this function is called repeatedly
        # from a long-running polling loop, so leaked connections accumulate.
        conn.close()

In [11]:
def _unquoted_identifier(name):
    """Return *name* without one pair of surrounding double quotes, if present."""
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1]
    return name


def create_table_def(table_def):
    """Create the target table from CKAN field definitions if it does not exist.

    The table name comes from the module-level ``table_name``; the columns of
    the composite UNIQUE constraint come from the module-level
    ``unique_contrain``.

    Args:
        table_def: List of dicts, each with an 'id' (column name) and a
            'type' (SQL type) entry.

    Returns:
        None. Executes a CREATE TABLE statement via do_sql when needed.
    """
    # to_regclass returns NULL when no relation with that name exists.
    check_table = do_sql(f"SELECT to_regclass('{table_name}');", fetch=True)
    print(check_table)
    if check_table[0][0] is not None:
        print(table_name, 'already exists')
        return

    definitions = ['"' + column['id'] + '" ' + column['type'] for column in table_def]

    # Entries in unique_contrain may or may not carry their own double quotes;
    # normalise them so each name is quoted exactly once.
    unique_names = [_unquoted_identifier(name) for name in unique_contrain]
    if any(column['id'] in unique_names for column in table_def):
        quoted_names = ', '.join('"' + name + '"' for name in unique_names)
        # Emit the composite constraint once, as a table-level constraint.
        definitions.append(f'UNIQUE({quoted_names})')

    sql = f'CREATE TABLE {table_name} (' + ', '.join(definitions) + ');'
    print('\n executing sql:', sql)
    do_sql(sql)

In [12]:
def _sql_literal(value, sql_type):
    """Render *value* as a quoted SQL literal cast to *sql_type* (NULL for None)."""
    if value is None:
        return 'NULL::' + sql_type
    return "'" + str(value).replace("'", "''") + "'::" + sql_type


def insert_into_table(table_content, types, table_name, unique_columns=None):
    """Build a multi-row INSERT ... ON CONFLICT DO NOTHING statement.

    Args:
        table_content: Non-empty sequence of row dicts mapping column name to
            value. The column list is taken from the first row and every row
            is rendered in that column order.
        types: Dict mapping column name to the SQL type used to cast its values.
        table_name: Name of the table to insert into.
        unique_columns: Column names used as the ON CONFLICT target, inserted
            verbatim. Defaults to the module-level ``unique_contrain``. An
            empty list yields a target-less ``ON CONFLICT DO NOTHING``.

    Returns:
        The SQL statement as a string.

    Raises:
        ValueError: If ``table_content`` is empty.
        KeyError: If a row lacks a column of the first row, or ``types`` has
            no entry for a column.
    """
    rows = list(table_content)
    if not rows:
        raise ValueError('table_content must contain at least one row')
    if unique_columns is None:
        unique_columns = unique_contrain

    column_names = list(rows[0])
    column_list = ','.join('"' + name + '"' for name in column_names)
    value_groups = [
        '(' + ','.join(_sql_literal(row[name], types[name]) for name in column_names) + ')'
        for row in rows
    ]
    conflict_target = ' (' + ', '.join(unique_columns) + ')' if unique_columns else ''

    return (
        f'INSERT INTO {table_name} ({column_list}) VALUES '
        + ','.join(value_groups)
        + f' ON CONFLICT{conflict_target} DO NOTHING;'
    )

In [13]:
def run(url_link):
    """Download every page of a CKAN resource and insert it into the table.

    Fetches the first page, makes sure the target table exists, then keeps
    inserting records and following the 'next' link until a page comes back
    with no records.

    Args:
        url_link: Path and query string of the first datastore_search request.
    """

    def fetch_page(link):
        # Returns (field definitions, records, link to the following page).
        print('getting data via link', link)
        result = get_ckan_data(link)['result']
        return result['fields'], result['records'], result['_links']['next']

    fields, records, next_link = fetch_page(url_link)

    create_table_def(fields)

    while len(records) > 0:
        print('results found')

        column_types = {field['id']: field['type'] for field in fields}
        do_sql(insert_into_table(records, column_types, table_name))

        fields, records, next_link = fetch_page(next_link)

    # Reached whenever the loop ends, i.e. once a page has no records.
    print('no results found')

In [14]:
def timer(url_link, wait=60 * 4.5, retry_delay=60):
    """Run the download forever, pausing between rounds.

    If a round fails, the error is reported and the round is retried once
    after ``retry_delay`` seconds. A failure of that retry is not caught and
    ends the loop.

    Args:
        url_link: Path and query string of the first datastore_search request.
        wait: Seconds to sleep between rounds (default 4.5 minutes).
        retry_delay: Seconds to sleep before the single retry (default 60).
    """
    while True:
        try:
            run(url_link)
        except Exception as error:
            # Catch Exception rather than everything so that Ctrl-C
            # (KeyboardInterrupt) and SystemExit still stop the loop.
            print('run failed:', repr(error), '- retrying in', retry_delay, 'seconds')
            sleep(retry_delay)
            run(url_link)
        print('waiting', round(wait / 60, 2), 'minutes')
        sleep(wait)

In [None]:
# Start the polling loop. timer() loops with `while True`, so this call
# blocks until it is interrupted or an error escapes from it.
timer(url_link)