Merge pull request #30 from transportenergy/diagnostic-A003
Compute freight volume diagnostic
khaeru committed Nov 13, 2020
2 parents 375683f + ece1d35 commit 39de3f4
Showing 18 changed files with 371 additions and 12,839 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/qc.yml → .github/workflows/cq.yml
@@ -1,4 +1,4 @@
name: QC
name: Code quality

on:
push:
@@ -38,6 +38,11 @@ jobs:
pip install flake8
flake8 --count --max-complexity=15 --show-source --statistics
- name: Sort imports with isort
run: |
pip install isort
isort --check-only .
- name: Build package
run: |
pip install twine wheel
2 changes: 1 addition & 1 deletion doc/developing.rst
@@ -28,7 +28,7 @@ Style guide

- Run the following tools on all new and modified code::

isort -rc . && black . && mypy . && flake8
isort . && black . && mypy . && flake8

The continuous integration workflow checks that these have been applied; PRs will fail unless they are.

60 changes: 49 additions & 11 deletions doc/historical.rst
@@ -8,24 +8,19 @@ This module contains the code that implements the `iTEM Open Data project <https

Sources
=======
These are listed in :file:`sources.yaml`, loaded as :data:.`SOURCES`. from the `iTEM metadata repository <https://github.com/transportenergy/metadata>`_.
These are listed in :file:`sources.yaml`, loaded as :data:`.SOURCES`, from the `iTEM metadata repository <https://github.com/transportenergy/metadata>`_.

Input data is retrieved via the OpenKAPSARC and SDMX APIs, according to the type supported by each data source. See :mod:`item.remote`.
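
For example, the metadata and raw input data for a single source can be accessed as follows (a minimal sketch; the exact contents of each entry depend on :file:`sources.yaml`)::

    from item.historical import SOURCES, fetch_source, source_str

    # Metadata for source T001, as loaded from sources.yaml
    info = SOURCES[source_str(1)]

    # Retrieve the corresponding raw data, re-using a cached copy if available
    path = fetch_source(1, use_cache=True)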


Processing
==========

Current
-------

Input data sets are cleaned and transformed by IPython notebooks in the :file:`item/historical/scripts` directory, as listed in :data:`.SCRIPTS`.

In general, the notebook name corresponds to the input data set which it handles, e.g. :file:`T001.ipynb`.

Planned
-------
The general function :func:`~historical.process` applies common cleaning steps to each dataset, while loading and making use of dataset-specific checks, processing steps, and configuration from a submodule like :mod:`.T001`, as listed in :data:`.MODULES`.
See the documentation of :func:`~historical.process` for a detailed description of the tests.
See the documentation of :func:`~historical.process` for a detailed description of the steps.
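
For example, the cleaned data for a single source can be produced as follows (a minimal sketch; :func:`~historical.process` also writes the cleaned data to CSV files in the output directory)::

    from item.historical import process

    # Apply the common steps plus the dataset-specific steps from the .T003 submodule
    df = process(3)

    # The result is a long-format data frame with one observation per row
    print(len(df), "observations")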

*Previously*, input data sets were cleaned and transformed by IPython notebooks in the :file:`item/historical/scripts` directory, as listed in :data:`.SCRIPTS`.
The notebook name corresponds to the input data set which it handles, e.g. :file:`T001.ipynb`.


Diagnostics
@@ -66,6 +61,11 @@ Code reference
.. autodata:: item.historical.SOURCES
:annotation: ← contents of sources.yaml

.. currentmodule:: item.historical.diagnostic

.. automodule:: item.historical.diagnostic
:members:

.. currentmodule:: item.historical.scripts.util.managers.dataframe

.. autoclass:: ColumnName
@@ -103,3 +103,41 @@ T001
.. literalinclude:: ../item/data/historical/sources.yaml
:language: yaml
:lines: 12-20


T003
----

.. currentmodule:: item.historical.scripts.T003

.. automodule:: item.historical.scripts.T003
:members:

.. literalinclude:: ../item/data/historical/sources.yaml
:language: yaml
:lines: 32-40


T009
----

.. currentmodule:: item.historical.scripts.T009

.. automodule:: item.historical.scripts.T009
:members:

.. literalinclude:: ../item/data/historical/sources.yaml
:language: yaml
:lines: 75-80


Quality diagnostics
===================

A003
----

.. currentmodule:: item.historical.diagnostic.A003

.. automodule:: item.historical.diagnostic.A003
:members:
61 changes: 36 additions & 25 deletions item/historical/__init__.py
@@ -1,15 +1,16 @@
from copy import copy
from functools import lru_cache
import logging
import os
from copy import deepcopy
from functools import lru_cache

import pandas as pd
import pycountry
import yaml

from item.common import paths
from item.remote import OpenKAPSARC, get_sdmx
from .scripts import T000, T001

from .scripts import T000, T001, T003, T009
from .scripts.util.managers.dataframe import ColumnName

log = logging.getLogger(__name__)
@@ -21,7 +22,7 @@
# 'T000',
# 'T001',
"T002",
"T003",
# "T003",
"T004",
"T005",
"T006",
@@ -30,7 +31,7 @@
]

#: Submodules usable with :func:`process`.
MODULES = {0: T000, 1: T001}
MODULES = {0: T000, 1: T001, 3: T003, 9: T009}

#: Path for output from :func:`process`.
OUTPUT_PATH = paths["data"] / "historical" / "output"
@@ -56,8 +57,7 @@

with open(paths["data"] / "historical" / "sources.yaml") as f:
#: The current version of the file is always accessible at
#: https://github.com/transportenergy/metadata/blob/master/historical/
#: sources.yaml
#: https://github.com/transportenergy/metadata/blob/master/historical/sources.yaml
SOURCES = yaml.safe_load(f)


@@ -66,20 +66,20 @@ def cache_results(id_str, df):
The files written are:
- :file:`{id_str}_cleaned_PF.csv`, in long or 'programming-friendly'
format, i.e. with a 'Year' column.
- :file:`{id_str}_cleaned_UF.csv`, in wide or 'user-friendly' format, with
one column per year.
- :file:`{id_str}-clean.csv`, in long or 'programming-friendly' format, i.e. with a
'Year' column.
- :file:`{id_str}-clean-wide.csv`, in wide or 'user-friendly' format, with one
column per year.
"""
OUTPUT_PATH.mkdir(exist_ok=True)

# Long format ('programming friendly view')
path = OUTPUT_PATH / f"{id_str}_cleaned_PF.csv"
path = OUTPUT_PATH / f"{id_str}-clean.csv"
df.to_csv(path, index=False)
print(f"Write {path}")
log.info(f"Write {path}")

# Pivot to wide format ('user friendly view') and write to CSV
path = OUTPUT_PATH / f"{id_str}_cleaned_UF.csv"
path = OUTPUT_PATH / f"{id_str}-clean-wide.csv"

# - Set all columns but 'Value' as the index → pd.Series with MultiIndex.
# - 'Unstack' the 'Year' dimension to columns, i.e. wide format.
@@ -88,7 +88,7 @@ def cache_results(id_str, df):
df.set_index([ev.value for ev in ColumnName if ev != ColumnName.VALUE]).unstack(
ColumnName.YEAR.value
).reset_index().to_csv(path, index=False)
print(f"Write {path}")
log.info(f"Write {path}")


def fetch_source(id, use_cache=True):
@@ -105,7 +105,7 @@ def fetch_source(id, use_cache=True):
"""
# Retrieve source information from sources.yaml
id = source_str(id)
source_info = copy(SOURCES[id])
source_info = deepcopy(SOURCES[id])

# Path for cached data. NB OpenKAPSARC does its own caching
cache_path = paths["historical input"] / f"{id}.csv"
@@ -172,20 +172,28 @@ def process(id):
9. Output data to two files. See :meth:`cache_results`.
"""
# Load the data from a common location, based on the dataset ID
id_str = source_str(id)
path = paths["data"] / "historical" / "input" / f"{id_str}_input.csv"
df = pd.read_csv(path)

# Get the module for this data set
dataset_module = MODULES[id]

if getattr(dataset_module, "FETCH", False):
# Fetch directly from source
path = fetch_source(id)
else:
# Load the data from the version stored in the transportenergy/metadata repo
# TODO remove this option; always fetch from source or cache
path = paths["historical input"] / f"{id_str}_input.csv"

# Read the data
df = pd.read_csv(path)

try:
# Check that the input data is of the form expected by process()
dataset_module.check(df)
except AttributeError:
# Optional check() function does not exist
print("No pre-processing checks to perform")
log.info("No pre-processing checks to perform")
except AssertionError as e:
# An 'assert' statement in check() failed
msg = "Input data is invalid"
@@ -201,20 +209,20 @@
drop_cols = columns["drop"]
except KeyError:
# No variable COLUMNS in dataset_module, or no key 'drop'
print(f"No columns to drop for {id_str}")
log.info(f"No columns to drop for {id_str}")
else:
df.drop(columns=drop_cols, inplace=True)
print(f"Drop {len(drop_cols)} extra column(s)")
log.info(f"Drop {len(drop_cols)} extra column(s)")

# Call the dataset-specific process() function; returns a modified df
df = dataset_module.process(df)
print(f"{len(df)} observations")
log.info(f"{len(df)} observations")

# Assign ISO 3166 alpha-3 codes and iTEM regions from a country name column
country_col = columns["country_name"]
# Use pandas.Series.apply() to apply the same function to each entry in
# the column. Join these to the existing data frame as additional columns.
df = pd.concat([df, df[country_col].apply(iso_and_region)], axis=1)
df = df.combine_first(df[country_col].apply(iso_and_region))

# Values to assign across all observations: the dataset ID
assign_values = {ColumnName.ID.value: id_str}
@@ -257,7 +265,10 @@ def iso_and_region(name):

# Use pycountry's built-in, case-insensitive lookup on all fields including
# name, official_name, and common_name
alpha_3 = pycountry.countries.lookup(name).alpha_3
try:
alpha_3 = pycountry.countries.lookup(name).alpha_3
except LookupError:
alpha_3 = ""

# Look up the region, construct a Series, and return
return pd.Series(
47 changes: 47 additions & 0 deletions item/historical/diagnostic/A003.py
@@ -0,0 +1,47 @@
from item.historical import OUTPUT_PATH
from item.utils import convert_units


def compute(activity, stock):
"""Quality diagnostic for freight load factor.
Returns the ratio of road freight traffic from :mod:`.T003` and the total number
of freight vehicles from :mod:`.T009`.
Parameters
----------
activity : pandas.DataFrame
From :mod:`.T003`.
stock : pandas.DataFrame
From :mod:`.T009`.
"""
# Select activity
activity = activity.query("Mode == 'Road' and `Vehicle Type` == 'All'").set_index(
["ISO Code", "Year"]
)

# Select stock
vehicle_types = [
"Light goods road vehicles",
"Lorries (vehicle wt over 3500 kg)",
"Road tractors",
]
mask = stock.Fuel.isin(["All"]) & stock["Vehicle Type"].isin(vehicle_types)
stock = stock[mask].groupby(["ISO Code", "Year"]).sum(numeric_only=True)

df = (
# Compute ratio, drop nulls
(activity["Value"] / stock["Value"])
.dropna()
# Restore column names, for convert_units()
.rename("Value")
.reset_index()
.assign(Variable="Load factor", Service="Freight", Fuel="All", Mode="Road")
# To preferred units
.pipe(convert_units, "Gt km / year / kvehicle", "kt km / year / vehicle")
)

# Save output before returning
df.to_csv(OUTPUT_PATH / "A003.csv")

return df
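
# Example usage (a sketch; assumes the cleaned T003 and T009 data are available):
#
#     from item.historical import process
#     from item.historical.diagnostic import A003
#
#     load_factor = A003.compute(process(3), process(9))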
item/historical/diagnostic/__init__.py
@@ -3,7 +3,12 @@

import pandas as pd

from . import fetch_source, source_str
from item.historical import fetch_source, source_str

from . import A003

# Quality checks
QUALITY = [A003.compute]

# Jinja2 template for diagnostics index page
INDEX_TEMPLATE = """<html><body>
@@ -75,31 +80,48 @@ def coverage(df, area="COUNTRY", measure="VARIABLE", period="TIME_PERIOD"):
def run_all(output_path):
"""Run all diagnostics."""
from zipfile import ZIP_DEFLATED, ZipFile

from jinja2 import Template

output_path = Path(output_path)
output_path.mkdir(parents=True, exist_ok=True)

groups = {"Coverage": []}
source_data_paths = []
data_files = []

# Coverage
groups = {"Coverage": [], "Quality": []}

for source_id in [0, 1, 2, 3]:
# Output filename
filename = f"{source_str(source_id)}.txt"
groups["Coverage"].append(filename)

# Read source data
source_data_paths.append(fetch_source(source_id, use_cache=True))
data = pd.read_csv(source_data_paths[-1])
data_files.append(fetch_source(source_id, use_cache=True))
data = pd.read_csv(data_files[-1])

# Generate coverage and write to file
# TODO this doesn't allow for column names other than the defaults to
# coverage(), above; generalize
(output_path / filename).write_text(coverage(data))

# Archive cached source data
# Quality
from item.historical import process

for check in QUALITY:
# Output filename
filename = f"{check.__name__.split('.')[-1]}.csv"
groups["Quality"].append(filename)

data_files.append(output_path / filename)
# TODO this is specific to A003; generalize
check(process(3), process(9)).to_csv(data_files[-1])

# Archive data files
zf = ZipFile(
output_path / "data.zip", mode="w", compression=ZIP_DEFLATED, compresslevel=9
)
for path in source_data_paths:
for path in data_files:
zf.write(filename=path, arcname=path.name)

groups["Cached raw source data"] = ["data.zip"]
