In [1]:
from myml.nbinit import *

In [3]:
import re
from etl.files import yaml_dump

def move_field_to_top(data, field):
    """
    Build a copy of `data` in which `field` is the first key.

    All other keys keep their original relative order. When `field` is not a
    key of `data`, the very same `data` object is handed back unchanged.
    """
    if field in data:
        # Everything except the promoted field, in the original order.
        remainder = {name: item for name, item in data.items() if name != field}
        return {field: data[field], **remainder}

    return data


def dump_yaml_with_anchors(data):
    """
    Dump a dictionary to a YAML string, converting definition keys to anchors
    and replacing quoted alias strings with YAML aliases.

    Two textual rewrites are applied to the output of `yaml_dump`:

    1. Every indented mapping key that starts with "def_" receives an anchor of
       the same name, e.g. `  def_123: text` becomes `  def_123: &def_123 text`.
    2. Every quoted string of the form '*def_123' (single or double quotes) is
       unquoted so that it becomes the alias `*def_123`.

    Args:
        data (dict): The dictionary to dump.

    Returns:
        str: The YAML string with anchors and aliases.
    """
    dumped = yaml_dump(data)

    # Anchor every indented "def_*" key. The indentation and the key are matched
    # with classes that exclude newlines, so a match can never span several lines
    # and the generated anchor name never contains whitespace (which YAML forbids).
    dumped = re.sub(
        r"^([ \t]+)(def_[^:\s]+):(.*)$",
        r"\1\2: &\2\3",
        dumped,
        flags=re.MULTILINE,
    )

    # Replace quoted alias strings like '*def_2329260084214905053'
    # with an unquoted alias *def_2329260084214905053.
    dumped = re.sub(r"""(['"])(\*def_[^'"\s]+)\1""", r"\2", dumped)

    return dumped

In [4]:
import yaml
from collections import defaultdict
from etl.collections.explorer_migration import migrate_csv_explorer
from etl.files import yaml_dump
from etl.paths import EXPLORERS_DIR, STEP_DIR

import hashlib

config = migrate_csv_explorer(EXPLORERS_DIR / "influenza.explorer.backup.tsv")

# Display fields whose text is moved into the shared `definitions` block and
# replaced in the indicator by an alias string ("*def_<digest>").
SHARED_DISPLAY_FIELDS = ('additionalInfo', 'sourceLink', 'dataPublishedBy', 'sourceName')

definitions = defaultdict(dict)

for view in config["views"]:
    # timelineMinTime is set once in common_views below; drop the per-view copy.
    # pop() with a default tolerates views that do not carry the key.
    view['config'].pop('timelineMinTime', None)

    # Create shared definitions
    for indicator in view["indicators"]['y']:
        # Keep only the last segment of catalogPath. Indexing with [-1] leaves a
        # path without "/" untouched instead of raising IndexError.
        indicator['catalogPath'] = indicator['catalogPath'].rsplit('/', 1)[-1]

        # Move some fields into definitions
        display = indicator['display']
        for key in SHARED_DISPLAY_FIELDS:
            if key not in display:
                continue

            raw_text = display[key]
            info = raw_text.replace('\\n', '\n')

            # Built-in hash() is randomised per interpreter process for strings,
            # which would rename every anchor on each kernel restart. A content
            # digest yields the same anchor name for the same text on every run.
            digest = hashlib.sha256(raw_text.encode("utf-8")).hexdigest()[:16]
            anchor = "def_" + digest

            definitions[key][anchor] = info
            display[key] = '*' + anchor

definitions['common_views'] = [
    {
        "config": {
            "timelineMinTime": "-4043",
        },
    }
]

# Store a plain dict so that serialisation does not depend on how the YAML
# dumper treats collections.defaultdict.
config["definitions"] = dict(definitions)

config = move_field_to_top(config, "definitions")

> [0;32m/Users/mojmir/projects/etl/etl/collections/explorer_migration.py[0m(210)[0;36mextract_config[0;34m()[0m
[0;32m    209 [0;31m                    [0m__import__[0m[0;34m([0m[0;34m"ipdb"[0m[0;34m)[0m[0;34m.[0m[0mset_trace[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m--> 210 [0;31m                    [0mprint[0m[0;34m([0m[0;36m12[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m    211 [0;31m                [0;31m# config[key] = self._convert_special_field(key, value)[0m[0;34m[0m[0;34m[0m[0m
[0m
'hideAnnotationFieldsInTitle'
'hideAnnotationFieldsInTitle'
['true']


In [105]:
# Keep only the views whose dimensions match all of the values below.
wanted_dimensions = {
    'metric': 'confirmed_cases__by_surveillance_type',
    'interval': 'weekly',
    'confirmed_cases_or_symptoms': 'confirmed_cases',
}

new_views = [
    view
    for view in config['views']
    if all(view['dimensions'][name] == value for name, value in wanted_dimensions.items())
]

config['views'] = new_views

In [8]:
# Collect per-table variable metadata from the indicators' display settings.
# The fields copied here (unit, name, shortUnit) are popped, i.e. removed from
# the indicator's display dict in `config`.

tables = defaultdict(dict)

for view in config['views']:
    for indicator in view['indicators']['y']:
        # catalogPath is expected to look like "<table>#<column>".
        table_name, col = indicator['catalogPath'].split('#')
        variables = tables[table_name].setdefault('variables', {})

        display = indicator['display']
        variable_entry = {
            "title": col,
            'unit': display.pop('unit'),
            "display": {"name": display.pop('name')},
        }
        # shortUnit is optional.
        if 'shortUnit' in display:
            variable_entry['short_unit'] = display.pop('shortUnit')

        variables[col] = variable_entry

In [9]:
# Write the explorer config (with YAML anchors and aliases) to the export step.
path_new = STEP_DIR / "export/explorers/who/latest/influenza.config.yml"
# Explicit UTF-8: without it open() falls back to a platform-dependent encoding,
# and the definitions hold free text that may contain non-ASCII characters.
with open(path_new, "w", encoding="utf-8") as f:
    f.write(dump_yaml_with_anchors(config))

# dump metadata for the grapher step
# path_new = STEP_DIR / "data/grapher/who/latest/flu.meta.yml"
# with open(path_new, "w") as f:
#     f.write(yaml_dump({"tables": tables}))

In [45]:
# Preview a definition text with literal "\n" sequences turned into real newlines.
# NOTE(review): `info` is the variable left behind by the last iteration of the
# definitions loop, so this cell only works after that cell has been executed.
print(info.replace('\\n', '\n'))

**Dataset Description:** 
- FluNET is a human influenza surveillance dataset that aggregates data from (1) the Global Influenza Surveillance and Response System (GISRS), (2) other national influenza reference laboratories that collaborate with the GISRS, and (3) other influenza surveillance data uploaded from WHO regional databases.
- Some of these samples are tested to determine whether they are influenza and whether they are influenza A or influenza B. Some surveillance centers also test the samples to identify their subtype. These are described as strains for influenza A (e.g., A H7N9) and lineages for influenza B (e.g., B Yamagata). This testing can use molecular detection, virus culture, or immunological methods.

**Surveillance types:**
- Surveillance data from each country may come from sentinel sites or non-sentinel sites.
- Sentinel sites are health centers in a country that are selected to perform high-quality testing of cases: they test for flu subtypes and lineages in a rou

In [29]:
# Display the shared definitions collected from the indicators' display fields.
definitions

{'-2329260084214905053': "**Dataset Description:** \\n- FluNET is a human influenza surveillance dataset that aggregates data from (1) the Global Influenza Surveillance and Response System (GISRS), (2) other national influenza reference laboratories that collaborate with the GISRS, and (3) other influenza surveillance data uploaded from WHO regional databases.\\n- Some of these samples are tested to determine whether they are influenza and whether they are influenza A or influenza B. Some surveillance centers also test the samples to identify their subtype. These are described as strains for influenza A (e.g., A H7N9) and lineages for influenza B (e.g., B Yamagata). This testing can use molecular detection, virus culture, or immunological methods.\\n\\n**Surveillance types:**\\n- Surveillance data from each country may come from sentinel sites or non-sentinel sites.\\n- Sentinel sites are health centers in a country that are selected to perform high-quality testing of cases: they test 

In [None]:
# import yaml

# from etl.files import yaml_dump

# config = migrate_csv_explorer("/home/lucas/repos/owid-content/explorers/monkeypox.explorer.tsv")
# # print(yaml.dump(config))
# with open("/home/lucas/repos/etl/etl/steps/export/explorers/who/latest/monkeypox2.config.yml", "w") as f:
#     yaml_dump(config, f)

# path_new = ""
# with open(path_new, "w"):
#     yaml.safe_dump(config, default_flow_style=False, sort_keys=False, width=float("inf"))

# 2/ Read all explorers, more raw experimenting
# import pandas as pd

# # Read and parse all config
# explorers = {}
# explorer_dir = Path("/home/lucas/repos/owid-content/explorers/")
# explorers_path = explorer_dir.glob("*.explorer.tsv")
# explorers_path = sorted(list(explorers_path))
# for path in explorers_path:
#     name = Path(path.stem).stem
#     print(name)
#     explorer_json = parse_explorer(name, path)
#     explorers[name] = explorer_json

# Filter and keep public ones
# explorers = {k: v for k, v in explorers.items() if v["isPublished"] == "true"}


# analysis = []
# types_rename = {
#     "grapher": "G",
#     "indicator": "I",
#     "csv": "C",
# }
# settings = []
# for name, explorer in explorers.items():
#     if name in {"global-food"}:
#         continue
#     migration = ExplorerMigration(explorer, name)
#     try:
#         settings_ = migration.run()
#     except TableURLNotInCataloException as e:
#         print(f"{name}: {e}")
#     except NotSupportedException as e:
#         print(f"{name}: {e}")
#     else:
#         settings.append(settings_)

# df = pd.DataFrame(analysis).sort_values("name")