In [1]:
import requests
import pandas as pd
import logging
import sys

'''reading API and CSV'''
def get_json(start_date: str, end_date: str, timeout: float = 10.0) -> dict:
    '''Fetch carbon-intensity records for a date range from the Carbon Intensity API.

    Args:
        start_date: start of the range, inserted verbatim into the URL path.
        end_date: end of the range, inserted verbatim into the URL path.
        timeout: seconds to wait for the server before giving up.

    Returns:
        Whatever the JSON response stores under its 'data' key, or None
        (after logging the error) when the request fails, the body is not
        valid JSON, or the 'data' key is missing.
    '''
    url = 'https://api.carbonintensity.org.uk/intensity/' + start_date + '/' + end_date
    try:
        # Without an explicit timeout requests waits indefinitely on a
        # server that stops responding.
        response = requests.get(url, timeout=timeout)
        return response.json()['data']
    except (requests.exceptions.RequestException, KeyError, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decoding error is not a RequestException subclass.
        logging.error(e)
        return None
        
def get_csv(infile: str) -> pd.DataFrame:
    '''Load a CSV file into a DataFrame.

    Args:
        infile: path of the CSV file to read.

    Returns:
        The parsed DataFrame, or None (after logging the error) when the
        file cannot be opened, is empty, or cannot be parsed as CSV.
    '''
    try:
        return pd.read_csv(infile)
    except (OSError, pd.errors.EmptyDataError, pd.errors.ParserError) as e:
        # OSError is the class IOError aliases in Python 3; the pandas errors
        # cover files that exist but hold no parseable table, so every load
        # failure follows the same log-and-return-None contract.
        logging.error(e)
        return None

In [2]:
from datetime import datetime

'''converting dates'''
def convert_intensity_date(intensity_date: str) -> datetime:
    '''Parse an intensity timestamp such as 2019-11-25T00:30Z into a datetime.

    The trailing Z is matched as a literal character, so the result carries
    no timezone information.
    '''
    intensity_format = '%Y-%m-%dT%H:%MZ'
    parsed = datetime.strptime(intensity_date, intensity_format)
    return parsed

def convert_power_date(power_date: str) -> datetime:
    '''Parse a power-measurement timestamp into a datetime.

    The expected layout is YYYY-MM-DD HH:MM:SS.<fraction>.  strptime's %f
    accepts at most six fractional digits, so a longer all-digit fraction
    (e.g. seven digits) is truncated to microsecond precision instead of
    requiring the extra digit to be a literal 0.

    Args:
        power_date: timestamp text with a fractional-seconds part.

    Returns:
        The parsed datetime (no timezone information).

    Raises:
        ValueError: if the text does not match the expected layout.
    '''
    head, dot, fraction = power_date.partition('.')
    # Only trim clean digit runs; anything else is left for strptime to reject.
    if len(fraction) > 6 and fraction.isdigit():
        power_date = head + dot + fraction[:6]
    return datetime.strptime(power_date, '%Y-%m-%d %H:%M:%S.%f')

In [3]:
import bisect

'''resampling data to common rate (half an hour)'''
def resampling(intensity: dict, power: pd.DataFrame) -> list:
    '''Combine per-interval carbon intensity with averaged power readings.

    For every intensity record from index 1 onward, the power readings that
    fall in the record's from/to window are averaged and multiplied by the
    record's actual intensity.  A window without readings repeats the most
    recent computed value, or NaN when nothing has been computed yet.

    Args:
        intensity: integer-indexable sequence of records, each providing
            'from', 'to' and ['intensity']['actual'].
        power: table whose rows hold the timestamp text in column 0 and the
            power value in column 3.

    Returns:
        A list of (i, value) tuples, where i is the record's index in
        `intensity`.
    '''
    # Record 0 is never processed, so fewer than two records yield no rows
    # (and the power table is not touched at all).
    # NOTE(review): skipping record 0 looks deliberate (ids start at 1) -- confirm.
    if len(intensity) < 2:
        return []

    # Parse and sort the readings once: they do not depend on the interval,
    # so doing this inside the loop only repeated identical work.
    pair_power = sorted(
        ((convert_power_date(row[0]), row[3]) for row in power.values),
        key=lambda pair: pair[0],
    )
    if pair_power:
        date_power, value_power = zip(*pair_power)
    else:
        # An empty table has nothing to unzip; every window is simply empty.
        date_power, value_power = (), ()

    final_values = []
    last_result = None

    for i in range(1, len(intensity)):
        record = intensity[i]
        from_intensity = convert_intensity_date(record['from'])
        to_intensity = convert_intensity_date(record['to'])
        value_intensity = record['intensity']['actual']

        left = bisect.bisect_left(date_power, from_intensity)
        right = bisect.bisect_right(date_power, to_intensity)
        # NOTE(review): left + 1 drops the first reading at or after `from`,
        # whether or not it sits exactly on the boundary -- kept as-is because
        # existing results depend on it; confirm this is intended.
        power_results = value_power[left + 1:right]

        if power_results:
            avg_power = sum(power_results) / len(power_results)
            final_value = avg_power * value_intensity
            last_result = final_value
        elif last_result is not None:
            # No readings in this window: carry the previous value forward.
            final_value = last_result
        else:
            final_value = float('nan')

        final_values.append((i, final_value))

    return final_values

In [4]:
import csv

'''creating csv file and storing calculated data in it'''
def store_in_csv(intensity: dict, power: pd.DataFrame, filename: str) -> int:
    '''Resample the inputs and write the result to a CSV file.

    The file gets a "HalfHourId,Value" header followed by one row per
    resampled (id, value) pair.

    Args:
        intensity: intensity records, passed through to resampling().
        power: power readings, passed through to resampling().
        filename: path of the CSV file to create or overwrite.

    Returns:
        1 on success, -1 (after logging the error) when the file cannot be
        opened or written.
    '''
    values = resampling(intensity, power)

    try:
        # newline='' hands line-ending control to the csv module; without it
        # platforms that translate '\n' emit '\r\r\n' and show blank rows.
        with open(filename, 'w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["HalfHourId","Value"])
            writer.writerows(values)
    except OSError as e:
        logging.error(e)
        return -1

    return 1

In [5]:
import sqlite3

'''creating sqlite3 db and storing calculated data in it'''
def store_in_sqlite3(intensity: dict, power: pd.DataFrame, db_name: str, table_name: str) -> int:
    '''Resample the inputs and store the result in a SQLite table.

    The table is dropped and recreated on every call, so repeated calls
    leave exactly one copy of the rows.  NaN values are stored as SQL NULL.

    Args:
        intensity: intensity records, passed through to resampling().
        power: power readings, passed through to resampling().
        db_name: path of the SQLite database file.
        table_name: name of the table to (re)create.  It is interpolated
            into the SQL text, so it must come from a trusted source.

    Returns:
        1 on success, -1 (after logging the error) on any sqlite3 error.
    '''
    values = resampling(intensity, power)

    # NaN is the only value that differs from itself; bind None for it so the
    # column holds a real SQL NULL rather than the text 'null'.
    rows = [
        (half_hour_id, None if value != value else value)
        for half_hour_id, value in values
    ]

    try:
        conn = sqlite3.connect(db_name)
        try:
            c = conn.cursor()

            # Table names cannot be bound as parameters, hence the formatting.
            c.execute("DROP TABLE IF EXISTS %s" % table_name)
            c.execute("CREATE TABLE %s (HalfHourId real, value real)" % table_name)
            c.executemany("INSERT INTO %s VALUES (?, ?)" % table_name, rows)

            # Uncommitted changes would be lost when the connection closes.
            conn.commit()
        finally:
            # Release the connection even when a statement above fails.
            conn.close()
    except sqlite3.Error as e:
        logging.error(e)
        return -1
    return 1

In [6]:
'''querying from created db'''
def query_from_sqlite3(db_name, table_name, first_id=14, last_id=20):
    '''Read the rows of a stored table whose HalfHourId lies in a range.

    Args:
        db_name: path of the SQLite database file.
        table_name: name of the table to read.  It is interpolated into the
            SQL text, so it must come from a trusted source.
        first_id: lowest HalfHourId to include (inclusive).
        last_id: highest HalfHourId to include (inclusive).

    Returns:
        The matching rows as a list of tuples, or -1 (after logging the
        error) on any sqlite3 error.
    '''
    try:
        conn = sqlite3.connect(db_name)
        try:
            c = conn.cursor()
            # Table names cannot be bound as parameters; the bounds can.
            c.execute(
                'SELECT * FROM %s WHERE HalfHourId >= ? and HalfHourId <= ?' % table_name,
                (first_id, last_id),
            )
            return c.fetchall()
        finally:
            # Runs on both the return path and the error path, so the
            # connection is always released.
            conn.close()
    except sqlite3.Error as e:
        logging.error(e)
        return -1

In [7]:
# Inputs: intensity records from the API and power readings from the local CSV.
intensity = get_json(start_date='2019-11-25', end_date='2019-11-26')
power = get_csv('power_measurements_2019-11-25.csv')

# Output targets.
csv_name, db_name, table_name = 'results.csv', 'results.db', 'results'

# Compute the resampled values and persist them in both formats.
store_in_csv(intensity, power, csv_name)
store_in_sqlite3(intensity, power, db_name, table_name)

# Read the stored rows back; as the last expression this is the cell's output.
query_from_sqlite3(db_name, table_name)

[(14.0, 298053.4147652754),
 (15.0, 298053.4147652754),
 (16.0, 298053.4147652754),
 (17.0, 298053.4147652754),
 (18.0, 298053.4147652754),
 (19.0, 298053.4147652754),
 (20.0, 298053.4147652754)]

In [8]:
# Test: a start date later than the end date must yield None, not an exception.
assert get_json(start_date='2029-11-25', end_date='2019-11-26') is None

ERROR:root:'data'


In [9]:
# Test: a missing file must yield None, not an exception.
# `is None` is used because comparing a DataFrame with `==` is element-wise,
# which would make a regression fail with an ambiguous-truth error instead of
# a clean assertion failure.
assert get_csv('doesnt_exist.csv') is None

ERROR:root:[Errno 2] File b'doesnt_exist.csv' does not exist: b'doesnt_exist.csv'


In [11]:
# Test: store_in_csv must return -1 when the target cannot be opened for writing.
intensity = get_json(start_date='2019-11-25', end_date='2019-11-26')
power = get_csv('power_measurements_2019-11-25.csv')

import os
from stat import S_IREAD

# Create the target only if it is missing (appending to an already read-only
# file would fail), but force it read-only on every run so the test also
# holds when a writable copy was left behind.
# NOTE(review): a privileged user may still be able to write -- confirm the
# environment this runs in.
unwritable = 'cant_write_here.txt'
if not os.path.isfile(unwritable):
    open(unwritable, 'a').close()
os.chmod(unwritable, S_IREAD)

# Writing into the read-only file must be reported, not raised.
assert store_in_csv(intensity, power, unwritable) == -1

ERROR:root:[Errno 13] Permission denied: 'cant_write_here.txt'


In [12]:
# Test: storing succeeds, the queried rows match, and a second run against the
# already-existing database gives the same rows.
intensity = get_json(start_date='2019-11-25', end_date='2019-11-26')
power = get_csv('power_measurements_2019-11-25.csv')

db_name = 'already_exists.db'
table_name = 'already_exists'

# Half-hour ids 14..20 are all expected to carry the same value.
expected_result = [
    (float(half_hour_id), 298053.4147652754)
    for half_hour_id in range(14, 21)
]

# First run.
assert store_in_sqlite3(intensity, power, db_name, table_name) == 1
assert query_from_sqlite3(db_name, table_name) == expected_result

# Second run: the database and table now exist.
assert store_in_sqlite3(intensity, power, db_name, table_name) == 1
assert query_from_sqlite3(db_name, table_name) == expected_result