In [None]:
# Automatically re-import edited modules (e.g. jupyter_playground.core) before each cell runs.
%reload_ext autoreload
%autoreload 2

In [None]:
# | default_exp npg.etl

In [None]:
# | export
import io
import json
import zipfile
from typing import Callable, Union

import pandas as pd
import requests as rq
from fastcore.all import *

import jupyter_playground.core as core
from jupyter_playground.core import DownloadContent, cache

if in_jupyter():
    from tqdm.notebook import tqdm

## Downloading Dataset metadata and attachments

In [None]:
# | export
# Portal sub-domain (queried as https://{HOST}.opendatasoft.com) and the dataset id on that portal.
HOST = "northernpowergrid"
DATASET = "primary-operational-metering"


def _always_true(_):
    return True


def get_dataset_attachments(
    host: str, dataset: str, filter: Callable = _always_true, timeout: float = 30
):
    """Fetch the attachment metadata of an opendatasoft dataset.

    Parameters
    ----------
    host : str
        Portal sub-domain, queried as ``https://{host}.opendatasoft.com``.
    dataset : str
        Dataset id in the portal's v2 catalog.
    filter : callable, optional
        Predicate applied to each attachment dict. Only attachments for which it
        returns a truthy value are kept. By default every attachment is kept.
    timeout : float, optional
        Seconds to wait for the server, passed to ``requests.get``. A stalled
        connection then raises instead of hanging the notebook indefinitely.

    Returns
    -------
    L
        fastcore ``L`` of the attachment dicts from the response's ``"attachments"`` list.

    Raises
    ------
    requests.HTTPError
        If the API answers with an error status (via ``raise_for_status``).
    """
    url = f"https://{host}.opendatasoft.com/api/v2/catalog/datasets/{dataset}/attachments"
    response = rq.get(url, timeout=timeout)
    response.raise_for_status()

    return L(a for a in response.json()["attachments"] if filter(a))


# NOTE(review): this cell is marked `# | export`, so this HTTP request also runs whenever the
# exported module is imported. Consider moving these lines into a separate, non-exported cell.
files = (
    get_dataset_attachments(HOST, DATASET, lambda att: att["href"].endswith("_zip"))
    .map(lambda att: att["href"])
)
files.map(lambda href: href[-10:])  # peek at the date suffixes of the zip attachments

(#12) ['_02_28_zip','_01_31_zip','_04_30_zip','_03_31_zip','_05_31_zip','_06_30_zip','_07_31_zip','_08_31_zip','_09_30_zip','_11_30_zip'...]

For reference, this is the attachments endpoint that `get_dataset_attachments` queries:

In [None]:
base = f"https://{HOST}.opendatasoft.com/api/v2/"
suffix = f"catalog/datasets/{DATASET}/attachments"
print(base + suffix)  # same URL that get_dataset_attachments requests

https://northernpowergrid.opendatasoft.com/api/v2/catalog/datasets/primary-operational-metering/attachments


## Create Parser and Extraction Steps
> A series of functions to unpack and work with the data.

These functions deal with several quirks of the dataset:

1. Replacements of bad JSON sections.
2. Characters which don't parse correctly as UTF-8.
3. Avoiding the need to download and extract multiple large JSON files.
4. Quickly parsing, then discarding objects and files once completed, to minimise memory usage.

In [None]:
# | export
# Pipeline named "npg_etl" that starts with `core.attachment_download`; later cells append more steps.
pipe = core.IncrementalPipeline("npg_etl", funcs=[core.attachment_download])
pipe

IncrementalPipeline(name='npg_etl', _funcs=['attachment_download'])

In [None]:
# | eval: false

# Run the pipeline built so far (download step only) on the first attachment.
files[:1].map(pipe)

                      

(#1) [32d8b228a35b2302ed5871f407311777]

## Extracting the Zipfile and Serialising the Content

Steps:

1. Check the content is a valid zip archive.
2. Check that it contains exactly one (JSON) file.
3. Check that this file is within a given maximum unzipped size.
4. Read the whole file directly as text, fixing a JSON error.
5. Finally, mask the large string by wrapping it in `DownloadContent`.

In [None]:
# | exports


@cache.cache
def parse_attachment_content(
    Download: DownloadContent, max_file_size: Union[int, float] = 4e9
) -> DownloadContent:
    """Extract the single member of a zipped attachment as cleaned-up text.

    Parameters
    ----------
    Download : DownloadContent
        Wrapper whose ``.content`` holds the raw bytes of the downloaded zip attachment.
    max_file_size : int or float, default 4e9
        Upper bound, in bytes, on the member's uncompressed size as recorded in the archive.

    Returns
    -------
    DownloadContent
        Wraps the member decoded as UTF-8 (undecodable bytes dropped), with the first
        ``"[,"`` replaced by ``"["``.

    Raises
    ------
    zipfile.BadZipFile
        If the bytes are not a zip archive at all (raised by ``zipfile.ZipFile``).
    ValueError
        If the archive does not hold exactly one member, if that member is larger than
        ``max_file_size``, or if it fails its CRC-32 check while being read.
    """
    limit = int(max_file_size)
    with zipfile.ZipFile(io.BytesIO(Download.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise ValueError(f"Expecting only one file but found the following: {members}")
        member = members[0]
        # Check the recorded size before decompressing anything, so an oversized
        # member is rejected without inflating it.
        if member.file_size > limit:
            raise ValueError(f"{member} exceeds max_file_size ({limit})")
        try:
            # Reading the member to EOF verifies its CRC-32, so a separate testzip() pass
            # (a second full decompression) is unnecessary.
            # errors="ignore" drops bytes that are not valid UTF-8.
            file_txt = zipfile.Path(archive, at=member.filename).read_text(
                encoding="utf-8", errors="ignore"
            )
        except zipfile.BadZipFile as err:
            raise ValueError(f"Zip member {member.filename!r} is corrupt: {err}") from err
    # "[," (an array opening with a comma) is invalid JSON; only the first occurrence is
    # fixed, and str.replace is much quicker than a regex on a string this large.
    return DownloadContent(file_txt.replace("[,", "[", 1))


pipe.append_func(parse_attachment_content)

IncrementalPipeline(name='npg_etl', _funcs=['attachment_download', 'parse_attachment_content'])

In [None]:
# | eval: false 

# Re-run on the first attachment, now with parse_attachment_content appended to the pipeline.
files[:1].map(pipe)

                      

(#1) [4baf216cf69414089e228916246be3eb]

## Parsing JSON and Building a Dataframe

> Parse the JSON and create a dataframe for the values.

1. Create a dataframe for each timeseries array and concatenate them.
2. Discard the JSON object and initial frames for memory.
3. Fix a bad column that failed to decode from utf-8
4. `Groupby` effectively on each row and extract each set of values.
5. Clear junk index and reset index used in Groupby
6. Downcast float, create column for partitioning per file (year and month) and clarify timezone.
7. Check that rows from neighbouring months haven't slipped into the file, and discard any that have.
8. Cast the label columns (substation, circuit, unit, description) to categories.

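The resulting DataFrames are large: roughly 300 MB each by the original estimate, although the example month below reports 653+ MB in memory in `df.info()`.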


In [None]:
# | exports

# Label columns: used as group keys below and cast to categoricals at the end.
_cat_cols = ["substation", "circuit", "unit", "description"]


# @cache.cache
def build_attachment_dataframe(text: DownloadContent) -> pd.DataFrame:
    """Turn the JSON text of one attachment into a long-format DataFrame.

    Parameters
    ----------
    text : DownloadContent
        Wrapper whose ``.content`` is the JSON text from ``parse_attachment_content``.
        Its ``"timeseries"`` array holds one object per series. Each object carries the
        ``_cat_cols`` labels and a ``"values"`` list of records with (at least)
        ``timestamp`` and ``value`` fields.

    Returns
    -------
    pd.DataFrame
        One row per record, with these columns:

        - ``substation``, ``circuit``, ``unit``, ``description`` (categorical)
        - ``timestamp`` (datetime)
        - ``value`` (float, downcast where possible)
        - ``yyyy_mm`` (``"YYYY-MM"`` string)

        Only rows from the most frequent month are kept, and the row index is not
        reset after that filter.
    """
    df = (
        # json.loads stays inline so the parsed JSON can be freed once concat is done.
        pd.concat(  # one single-row frame per series, stacked together
            [pd.json_normalize(t) for t in json.loads(text.content)["timeseries"]],
            axis=0,
            ignore_index=True,
        )
        .rename({"values": "data"}, axis=1)
        # Blank units become "deg". NOTE(review): presumably the degree symbol was lost
        # when undecodable bytes were dropped upstream; confirm.
        .assign(unit=lambda d: d.unit.replace(r"^\s*$", "deg", regex=True))
        .set_index(_cat_cols)
        .groupby(_cat_cols, sort=False)  # no sort for performance
        # Expand each series' record list into rows. The list is selected by column name,
        # not position, so extra keys in the JSON cannot shift which column is read.
        # NOTE(review): only the first row of each group is expanded, so a label
        # combination repeated across series keeps just its first series.
        .apply(lambda g: pd.json_normalize(g["data"].iat[0]))
        .reset_index(-1, drop=True)  # discard the per-series index from json_normalize
        .reset_index(drop=False)  # move the label columns back out of the index
        .assign(  # fix up types
            timestamp=lambda d: pd.to_datetime(d.timestamp),
            value=lambda d: pd.to_numeric(d.value, downcast="float"),
            yyyy_mm=lambda d: d.timestamp.dt.strftime("%Y-%m"),
        )
    )
    month_counts = df.yyyy_mm.value_counts()
    if len(month_counts) != 1:
        # Rows from neighbouring months slipped into this file; log them for reference.
        print(text, "\n" + repr(month_counts))

    df[_cat_cols] = df[_cat_cols].astype("category")
    # Keep only the dominant month (idxmax gives the label with the largest count).
    return df[df.yyyy_mm == month_counts.idxmax()]


pipe.append_func(build_attachment_dataframe)

IncrementalPipeline(name='npg_etl', _funcs=['attachment_download', 'parse_attachment_content', 'build_attachment_dataframe'])

## Example DataFrame

In [None]:
# | hide
import nbdev

nbdev.nbdev_export()  # regenerate the library modules from this notebook's export cells

In [None]:
# | eval: false

from jupyter_playground.npg.etl import pipe

# NOTE(review): this rebinds `pipe` to the exported module's pipeline object, so later
# cells (e.g. `@pipe.decorate_func`) modify that object rather than the one built above.
# `files` is in API order, not date order, so `files[-1]` is simply the last listed
# attachment (December 2022 in the output below).
df = pipe(files[-1])

df

2it [00:13,  7.51s/it]

16e9515bba143d2ad61d80406e56fbdb 
2022-12    20136044
2023-01           6
Name: yyyy_mm, dtype: int64


                       

Unnamed: 0,substation,circuit,unit,description,timestamp,value,yyyy_mm
0,LINTON 132/25kV (ULGHAM CROSSING),LINTON - TRX1,KW,active power (kw),2022-12-01 00:00:00+00:00,400.0,2022-12
1,LINTON 132/25kV (ULGHAM CROSSING),LINTON - TRX1,KW,active power (kw),2022-12-01 00:30:00+00:00,0.0,2022-12
2,LINTON 132/25kV (ULGHAM CROSSING),LINTON - TRX1,KW,active power (kw),2022-12-01 01:00:00+00:00,0.0,2022-12
3,LINTON 132/25kV (ULGHAM CROSSING),LINTON - TRX1,KW,active power (kw),2022-12-01 01:30:00+00:00,0.0,2022-12
4,LINTON 132/25kV (ULGHAM CROSSING),LINTON - TRX1,KW,active power (kw),2022-12-01 02:00:00+00:00,0.0,2022-12
...,...,...,...,...,...,...,...
20136045,WAVERLEY BUSINESS PARK,WAVERLEY BUSINESS PARK - WHITTLE WAY,A,current (amp),2022-12-31 21:30:00+00:00,7.0,2022-12
20136046,WAVERLEY BUSINESS PARK,WAVERLEY BUSINESS PARK - WHITTLE WAY,A,current (amp),2022-12-31 22:00:00+00:00,8.0,2022-12
20136047,WAVERLEY BUSINESS PARK,WAVERLEY BUSINESS PARK - WHITTLE WAY,A,current (amp),2022-12-31 22:30:00+00:00,7.0,2022-12
20136048,WAVERLEY BUSINESS PARK,WAVERLEY BUSINESS PARK - WHITTLE WAY,A,current (amp),2022-12-31 23:00:00+00:00,8.0,2022-12


In [None]:
# | eval:false
# Dtypes and memory footprint of the example month.
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 20136044 entries, 0 to 20136049
Data columns (total 7 columns):
 #   Column       Dtype              
---  ------       -----              
 0   substation   category           
 1   circuit      category           
 2   unit         category           
 3   description  category           
 4   timestamp    datetime64[ns, UTC]
 5   value        float32            
 6   yyyy_mm      object             
dtypes: category(4), datetime64[ns, UTC](1), float32(1), object(1)
memory usage: 653.1+ MB


## Write the Whole Dataset

Write out the dataset as parquet, partitioned by `description` and month (`yyyy_mm`).

In [None]:
@pipe.decorate_func
def writes_file(df, path="npg.parquet"):
    """Write one month's frame into the parquet dataset at ``path``.

    The dataset is hive-partitioned by ``description`` then ``yyyy_mm``. Writing into
    an existing partitioned dataset (e.g. with the pyarrow engine) adds new, uniquely
    named files next to the ones already there. Re-running the pipeline for a month
    would therefore duplicate its rows. To prevent this, the partition directories of
    every month present in ``df`` are removed first, so a re-run replaces that month.

    Parameters
    ----------
    df : pd.DataFrame
        Frame with ``description`` and ``yyyy_mm`` columns, as produced by
        ``build_attachment_dataframe``.
    path : str, default "npg.parquet"
        Root directory of the partitioned dataset.

    NOTE(review): ``pipe.decorate_func`` presumably registers this function as a
    pipeline step. Re-running this cell may register it twice; confirm against
    ``jupyter_playground.core``.
    """
    import shutil
    from pathlib import Path

    for month in df["yyyy_mm"].unique():
        # Hive layout is <path>/description=<...>/yyyy_mm=<month>/
        for stale_dir in Path(path).glob(f"*/yyyy_mm={month}"):
            shutil.rmtree(stale_dir)
    df.to_parquet(path, partition_cols=["description", "yyyy_mm"])

Finally, run the pipeline against all the files. This takes approx. 60–90 minutes, assuming the files are already downloaded.

In [None]:
# |eval: false
# Run every attachment through the pipeline assembled above.
# NOTE(review): `tqdm` is only imported when `in_jupyter()` is true (see the imports cell).
# `tqdm_position=1` presumably places the pipeline's own progress bar below this outer one.
for f in tqdm(files, leave=False):
    pipe(f, tqdm_position=1)

  0%|          | 0/12 [00:00<?, ?it/s]



16e9515bba143d2ad61d80406e56fbdb 
2022-12    20136044
2023-01           6
Name: yyyy_mm, dtype: int64




In [None]:
# | hide
from nbdev.showdoc import *

In [None]:
# Record the library versions this notebook was run with.
pd.show_versions()




INSTALLED VERSIONS
------------------
commit           : 2e218d10984e9919f0296931d92ea851c6a6faf5
python           : 3.11.0.final.0
python-bits      : 64
OS               : Linux
OS-release       : 5.15.90.1-microsoft-standard-WSL2
Version          : #1 SMP Fri Jan 27 02:56:13 UTC 2023
machine          : x86_64
processor        : x86_64
byteorder        : little
LC_ALL           : None
LANG             : C.UTF-8
LOCALE           : en_US.UTF-8

pandas           : 1.5.3
numpy            : 1.24.2
pytz             : 2022.7.1
dateutil         : 2.8.2
setuptools       : 67.6.0
pip              : 23.0.1
Cython           : None
pytest           : None
hypothesis       : None
sphinx           : None
blosc            : None
feather          : None
xlsxwriter       : 3.0.9
lxml.etree       : None
html5lib         : None
pymysql          : None
psycopg2         : None
jinja2           : 3.1.2
IPython          : 8.11.0
pandas_datareader: None
bs4              : None
bottleneck       : None
brotli 