This repository has been archived by the owner on Nov 1, 2023. It is now read-only.

Improve metadata handling #90

Closed
wants to merge 20 commits
Changes from 7 commits

Commits (20)
ad3d6d3
:construction: Let operations between variables handle metadata properly
pabloarosado May 17, 2023
db6e0be
:white_check_mark: Update tests
pabloarosado May 17, 2023
33d8a99
:sparkles: Minor improvements in style and documentation
pabloarosado May 18, 2023
ee9e981
:white_check_mark: Update tests
pabloarosado May 18, 2023
d1be5e3
:tada: Add metadata handling for all common dunder methods
pabloarosado May 18, 2023
26b31ff
:white_check_mark: Add tests
pabloarosado May 18, 2023
a99c0df
:tada: Add merge function that handles metadata (WIP)
pabloarosado May 18, 2023
3c732ab
feat: Let processing log properly store variable name
pabloarosado May 22, 2023
d34a294
test: Update tests
pabloarosado May 22, 2023
43ea392
feat: Add other existing pandas method to properly handle metadata, a…
pabloarosado May 22, 2023
8865d24
feat: Add processing log when loading, creating, or saving a table
pabloarosado May 23, 2023
38159ae
test: Update tests
pabloarosado May 23, 2023
1621297
feat: Improve metadata handling when reading new files and merging ta…
pabloarosado May 24, 2023
967f3c7
feat: Add processing log entry after renaming columns, and other mino…
pabloarosado May 24, 2023
51b0112
test: Update tests
pabloarosado May 24, 2023
789fd0a
feat: Get sources and licenses from dataset if not defined for each v…
pabloarosado May 24, 2023
65f767e
feat: Implement logic to handle metadata for melt function
pabloarosado May 24, 2023
61bc8b1
feat: Implement logic to handle metadata for concat function
pabloarosado May 24, 2023
89e6cf2
Merge branch 'master' of github.com:owid/owid-catalog-py into improve…
pabloarosado Jun 14, 2023
2821139
style: Improve format
pabloarosado Jun 14, 2023
3 changes: 2 additions & 1 deletion owid/catalog/__init__.py
@@ -1,6 +1,6 @@
__version__ = "0.1.0"

- from . import utils
+ from . import processing, utils
from .catalogs import CHANNEL, LocalCatalog, RemoteCatalog, find, find_latest, find_one
from .datasets import Dataset
from .meta import DatasetMeta, License, Source, TableMeta, VariableMeta
@@ -23,4 +23,5 @@
    "License",
    "utils",
    "CHANNEL",
+   "processing",
]
1 change: 1 addition & 0 deletions owid/catalog/meta.py
@@ -105,6 +105,7 @@ class VariableMeta:
    short_unit: Optional[str] = None
    display: Optional[Dict[str, Any]] = None
    additional_info: Optional[Dict[str, Any]] = None
+   processing_log: Optional[List[Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        ...
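For reference, a minimal sketch of the entries this new field is meant to hold, based on the entries appended in tables.py and variables.py below (the URI is made up):

    from owid.catalog import VariableMeta

    meta = VariableMeta(title="Population", unit="people")
    meta.processing_log = []
    # Each entry records the affected variable, its parents, and the operation applied.
    meta.processing_log.append(
        {"variable": "population", "parents": ["garden/demography/2023/population"], "operation": "load"}
    )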
6 changes: 6 additions & 0 deletions owid/catalog/processing.py
@@ -0,0 +1,6 @@
"""Common operations performed on tables and variables."""

from .tables import concat, melt, merge, pivot, read_csv, read_excel

__all__ = ["concat", "melt", "merge", "pivot", "read_csv", "read_excel"]
Collaborator
Cherry on top would be monkey-patching pandas when we are inside an enable_tracing context manager:

    from contextlib import contextmanager

    import pandas as pd

    from owid.catalog import tables as t

    @contextmanager
    def enable_tracing():
        # Temporarily swap pandas functions for the metadata-aware versions.
        original_concat = pd.concat
        pd.concat = t.concat
        # ... patch the other functions similarly ...
        try:
            yield
        finally:
            # ... restore the other functions ...
            pd.concat = original_concat

It's not necessary though, and might do more harm than good.
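For context, a minimal usage sketch of the new processing module (the tables and columns are made up; merge is the metadata-aware wrapper defined in tables.py below):

    from owid.catalog import Table, processing

    tb1 = Table({"country": ["Spain"], "population": [47_000_000]})
    tb2 = Table({"country": ["Spain"], "gdp": [1_400_000]})
    # Unlike pd.merge, processing.merge returns a Table and copies each
    # column's metadata over from the input tables.
    tb = processing.merge(tb1, tb2, on="country")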

83 changes: 79 additions & 4 deletions owid/catalog/tables.py
@@ -125,15 +125,21 @@ def read(cls, path: Union[str, Path]) -> "Table":
        path = path.as_posix()

        if path.endswith(".csv"):
-           return cls.read_csv(path)
+           table = cls.read_csv(path)
        elif path.endswith(".feather"):
-           return cls.read_feather(path)
+           table = cls.read_feather(path)
        elif path.endswith(".parquet"):
-           return cls.read_parquet(path)
+           table = cls.read_parquet(path)
+       else:
+           raise ValueError(f"could not detect a suitable format to read from: {path}")

-       raise ValueError(f"could not detect a suitable format to read from: {path}")
+       # Add processing log to the metadata of each variable in the table.
+       # TODO: Unit tests fail when adding processing log to each variable.
+       # table = _add_processing_log_to_each_loaded_variable(table)
+       return table

    # Mypy complains about this not matching the definition of NDFrame.to_csv but I don't understand why
    def to_csv(self, path: Any, **kwargs: Any) -> None:  # type: ignore
@@ -505,3 +511,72 @@ def reset_index(self, *args, **kwargs) -> Optional["Table"]:  # type: ignore
        # preserve metadata in _fields, calling reset_index() on a table drops it
        t._fields = self._fields
        return t  # type: ignore

    def merge(self, right, *args, **kwargs) -> "Table":
        return merge(left=self, right=right, *args, **kwargs)

    def melt(self, *args, **kwargs) -> "Table":
        return melt(frame=self, *args, **kwargs)

    def pivot(self, *args, **kwargs) -> "Table":
        return pivot(data=self, *args, **kwargs)


def merge(left, right, *args, **kwargs) -> Table:
    # TODO: This function needs further logic. For example, to handle "on"/"left_on"/"right_on" columns,
    # or suffixes, or overlapping columns (that will end in "_x" and "_y" by default), or indexes.
    tb = Table(pd.merge(left, right, *args, **kwargs))
    columns_that_were_in_left = set(tb.columns) & set(left.columns)
    columns_that_were_in_right = set(tb.columns) & set(right.columns)

    for column in columns_that_were_in_left:
        tb[column].metadata = left[column].metadata
    for column in columns_that_were_in_right:
        tb[column].metadata = right[column].metadata

    return tb


# TODO: Handle metadata and processing info for each of the following functions.
def concat(*args, **kwargs) -> Table:
    return Table(pd.concat(*args, **kwargs))


def melt(*args, **kwargs) -> Table:
    return Table(pd.melt(*args, **kwargs))


def pivot(*args, **kwargs) -> Table:
    return Table(pd.pivot(*args, **kwargs))
Contributor
In this case, perhaps the table level metadata should be copied (excl: primary key).
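A minimal sketch of that suggestion, reusing the pd and Table names from this module and assuming concat receives the tables as a positional argument and that TableMeta's primary_key is a plain settable field:

    import copy

    def concat(*args, **kwargs) -> Table:
        tables = args[0]
        tb = Table(pd.concat(*args, **kwargs))
        # Hypothetical: carry over table-level metadata from the first input,
        # resetting the primary key as suggested above.
        tb.metadata = copy.deepcopy(tables[0].metadata)
        tb.metadata.primary_key = []
        return tb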



def read_csv(*args, **kwargs) -> Table:
    return Table(pd.read_csv(*args, **kwargs))


def read_excel(*args, **kwargs) -> Table:
    return Table(pd.read_excel(*args, **kwargs))


def _add_processing_log_to_each_loaded_variable(table):
    # Add an entry to the processing log, specifying that each variable was loaded from this table.
    # Generate a URI for the table.
    table_uri = f"{table.metadata.dataset.uri}/{table.metadata.short_name}"
    # Get the names of the columns currently used for the index of the table (if any).
    index_columns = table.metadata.primary_key

    # If the table has an index, reset it so we have access to all variables in the table.
    if len(index_columns) > 0:
        table = table.reset_index(drop=False)
    for column in table.columns:
        # If no processing log is found for a variable, start a new one.
        if table[column].metadata.processing_log is None:
            table[column].metadata.processing_log = []
        # Append a new entry to the processing log.
        table[column].metadata.processing_log.append(
            {"variable": column, "parents": [table_uri], "operation": "load"}
        )
    if len(index_columns) > 0:
        table = table.set_index(index_columns)

    return table
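As a quick illustration of the behaviour merge aims for, a sketch assuming column metadata is set as below:

    from owid.catalog import Table
    from owid.catalog.tables import merge

    tb1 = Table({"country": ["Spain"], "population": [47.4]})
    tb1["population"].metadata.unit = "million people"
    tb2 = Table({"country": ["Spain"], "gdp": [1.4]})
    # The merged table should keep the unit of the population column.
    tb = merge(tb1, tb2, on="country")
    assert tb["population"].metadata.unit == "million people"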
207 changes: 205 additions & 2 deletions owid/catalog/variables.py
Expand Up @@ -4,16 +4,33 @@

import json
from os import path
from typing import Any, Dict, Optional, cast
from typing import Any, Dict, List, Optional, Union, cast

import pandas as pd
import structlog
from pandas._typing import Scalar
from pandas.core.series import Series

from .meta import VariableMeta
from .meta import License, Source, VariableMeta
from .properties import metadata_property

log = structlog.get_logger()

SCHEMA = json.load(open(path.join(path.dirname(__file__), "schemas", "table.json")))
METADATA_FIELDS = list(SCHEMA["properties"])

# When creating a new variable, we need to pass a temporary name. For example, when doing tb["a"] + tb["b"]:
# * If variable.name is None, a ValueError is raised.
# * If variable.name = self.checked_name then the metadata of the first variable summed ("a") is modified.
# * If variable.name is always a random string (that does not coincide with an existing variable) then
# when replacing a variable (e.g. tb["a"] += 1) the original variable loses its metadata.
# For these reasons, we ensure that variable.name is always filled, even with a temporary name.
# In fact, if the new variable becomes a column in a table, its name gets overwritten by the column name (which is a
# nice feature). For example, when doing tb["c"] = tb["a"] + tb["b"], the variable name of "c" will be "c", even if we
# passed a temporary variable name. Therefore, this temporary name may be irrelevant in practice.
# TODO: Is there a better solution for these issues?
UNNAMED_VARIABLE = "**TEMPORARY UNNAMED VARIABLE**"


class Variable(pd.Series):
    _name: Optional[str] = None

@@ -49,6 +66,9 @@ def name(self, name: str) -> None:
        # make sure there is always a placeholder metadata object
        if name not in self._fields:
            self._fields[name] = VariableMeta()
+       # else:
+       #     # See comments above, where UNNAMED_VARIABLE is defined, explaining this.
+       #     name = UNNAMED_VARIABLE

        self._name = name

@@ -87,10 +107,193 @@ def astype(self, *args: Any, **kwargs: Any) -> "Variable":
        v.name = self.name
        return cast(Variable, v)

    # TODO: If I set the type hint of the following functions to -> "Variable" I get typing errors.

    def __add__(self, other: Union[Scalar, Series, "Variable"]) -> Series:
        # variable = super().__add__(other)
        variable = Variable(self.values + other, name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, other], operation="+", name=self.name)
        return variable

    def __sub__(self, other: Union[Scalar, Series, "Variable"]) -> Series:
        # variable = super().__sub__(other)
        variable = Variable(self.values - other, name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, other], operation="-", name=self.name)
        return variable

    def __mul__(self, other: Union[Scalar, Series, "Variable"]) -> Series:
        # variable = super().__mul__(other)
        variable = Variable(self.values * other, name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, other], operation="*", name=self.name)
        return variable

    def __truediv__(self, other: Union[Scalar, Series, "Variable"]) -> Series:
        # variable = super().__truediv__(other)
        variable = Variable(self.values / other, name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, other], operation="/", name=self.name)
        return variable

    def __floordiv__(self, other: Union[Scalar, Series, "Variable"]) -> Series:
        # variable = super().__floordiv__(other)
        variable = Variable(self.values // other, name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, other], operation="//", name=self.name)
        return variable

    def __mod__(self, other: Union[Scalar, Series, "Variable"]) -> Series:
        # variable = super().__mod__(other)
        variable = Variable(self.values % other, name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, other], operation="%", name=self.name)
        return variable

    def __pow__(self, other: Union[Scalar, Series, "Variable"]) -> Series:
        # For some reason, the following line modifies the metadata of the original variable.
        # variable = super().__pow__(other)
        # So, instead, we define a new variable.
        variable = Variable(self.values**other, name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, other], operation="**", name=self.name)
        return variable

    def fillna(self, value=None, *args, **kwargs) -> Series:
        # variable = super().fillna(value)
        # NOTE: Argument "inplace" will modify the original variable's data, but not its metadata.
        # But we should not use "inplace" anyway.
        if "inplace" in kwargs and kwargs["inplace"] is True:
            log.warning("Avoid using fillna(inplace=True); it may not handle metadata as expected.")
        variable = Variable(super().fillna(value, *args, **kwargs), name=UNNAMED_VARIABLE)  # type: ignore
        variable.metadata = combine_variables_metadata(variables=[self, value], operation="fillna", name=self.name)
        return variable

    # TODO: Should we also include the "add", "sub", "mul", "truediv" methods here? For example:
    # def add(self, other: Union[Scalar, Series, "Variable"]) -> "Variable":
    #     return self.__add__(other=other)
    # These methods have some additional arguments, namely axis='columns', level=None, fill_value=None, that would
    # need to be implemented here.


# dynamically add all metadata properties to the class
for k in VariableMeta.__dataclass_fields__:
    if hasattr(Variable, k):
        raise Exception(f'metadata field "{k}" would overwrite a Pandas built-in')

    setattr(Variable, k, metadata_property(k))


def _combine_variable_units_or_short_units(variables: List[Variable], operation, unit_or_short_unit) -> Optional[str]:
    # Gather units (or short units) of all variables.
    units_or_short_units = pd.unique([getattr(variable.metadata, unit_or_short_unit) for variable in variables])
    # Initialise the unit (or short unit) of the output variable.
    unit_or_short_unit_combined = None
    if operation in ["+", "-"]:
        # If units (or short units) do not coincide among all variables, raise a warning and assign None.
        if len(units_or_short_units) != 1:
            log.warning(f"Different values of '{unit_or_short_unit}' detected among variables: {units_or_short_units}")
            unit_or_short_unit_combined = None
        else:
            # Otherwise, assign the common unit.
            unit_or_short_unit_combined = units_or_short_units[0]

    return unit_or_short_unit_combined


def combine_variables_units(variables: List[Variable], operation: str) -> Optional[str]:
    return _combine_variable_units_or_short_units(variables=variables, operation=operation, unit_or_short_unit="unit")


def combine_variables_short_units(variables: List[Variable], operation) -> Optional[str]:
    return _combine_variable_units_or_short_units(
        variables=variables, operation=operation, unit_or_short_unit="short_unit"
    )


def _combine_variables_titles_and_descriptions(
    variables: List[Variable], operation: str, title_or_description: str
) -> Optional[str]:
    # Keep the title only if all variables have exactly the same title.
    # Otherwise we assume that the variable has a different meaning, and its title should be manually handled.
    title_or_description_combined = None
    if operation in ["+", "-"]:
        titles_or_descriptions = pd.unique([getattr(variable.metadata, title_or_description) for variable in variables])
        if len(titles_or_descriptions) == 1:
            title_or_description_combined = titles_or_descriptions[0]

    return title_or_description_combined


def combine_variables_titles(variables: List[Variable], operation: str) -> Optional[str]:
    return _combine_variables_titles_and_descriptions(
        variables=variables, operation=operation, title_or_description="title"
    )


def combine_variables_descriptions(variables: List[Variable], operation: str) -> Optional[str]:
    return _combine_variables_titles_and_descriptions(
        variables=variables, operation=operation, title_or_description="description"
    )


def get_unique_sources_from_variables(variables: List[Variable]) -> List[Source]:
    # Make a list of all sources of all variables.
    sources = sum([variable.metadata.sources for variable in variables], [])

    # Get unique array of tuples of source fields (respecting the order).
    unique_sources_array = pd.unique([tuple(source.to_dict().items()) for source in sources])

    # Make a list of sources.
    unique_sources = [Source.from_dict(dict(source)) for source in unique_sources_array]  # type: ignore

    return unique_sources


def get_unique_licenses_from_variables(variables: List[Variable]) -> List[License]:
    # Make a list of all licenses of all variables.
    licenses = sum([variable.metadata.licenses for variable in variables], [])

    # Get unique array of tuples of license fields (respecting the order).
    unique_licenses_array = pd.unique([tuple(license.to_dict().items()) for license in licenses])

    # Make a list of licenses.
    unique_licenses = [License.from_dict(dict(license)) for license in unique_licenses_array]

    return unique_licenses


def combine_variables_processing_logs(variables: List[Variable]):
    # Make a list with all entries in the processing log of all variables.
    # TODO: Currently, the processing log does not capture the name of the newly created variable, but that of the
    # first variable involved in the operation.
    processing_log = sum(
        [
            variable.metadata.processing_log if variable.metadata.processing_log is not None else []
            for variable in variables
        ],
        [],
    )

    return processing_log


def combine_variables_metadata(
    variables: List[Any], operation: str, name: Optional[str] = UNNAMED_VARIABLE
) -> VariableMeta:
    # Initialise an empty metadata.
    metadata = VariableMeta()

    # Skip other objects passed in variables that may not contain metadata (e.g. a scalar).
    variables_only = [variable for variable in variables if hasattr(variable, "metadata")]

    # Combine each metadata field using the logic of the specified operation.
    metadata.title = combine_variables_titles(variables=variables_only, operation=operation)
    metadata.description = combine_variables_descriptions(variables=variables_only, operation=operation)
    metadata.unit = combine_variables_units(variables=variables_only, operation=operation)
    metadata.short_unit = combine_variables_short_units(variables=variables_only, operation=operation)
    metadata.sources = get_unique_sources_from_variables(variables=variables_only)
    metadata.licenses = get_unique_licenses_from_variables(variables=variables_only)
    metadata.processing_log = combine_variables_processing_logs(variables=variables_only)

    # List names of variables and scalars (or other objects passed in variables).
    variables_and_scalars_names = [
        variable.name if hasattr(variable, "name") else str(variable) for variable in variables
    ]
    metadata.processing_log.extend(
        [{"variable": name, "parents": variables_and_scalars_names, "operation": operation}]
    )

    return metadata
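Finally, a short end-to-end sketch of what these helpers produce when two variables are added (names and units are illustrative):

    from owid.catalog import Table

    tb = Table({"a": [1, 2], "b": [3, 4]})
    tb["a"].metadata.unit = "people"
    tb["b"].metadata.unit = "people"
    # For "+" the unit is kept only because both inputs agree; if they differed,
    # a warning would be logged and the combined unit would be None.
    tb["c"] = tb["a"] + tb["b"]
    assert tb["c"].metadata.unit == "people"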