Skip to content

Commit

Permalink
Merge 0278f8e into b6a2638
Browse files Browse the repository at this point in the history
  • Loading branch information
naddeoa authored Apr 8, 2021
2 parents b6a2638 + 0278f8e commit e08bbd1
Show file tree
Hide file tree
Showing 10 changed files with 3,327 additions and 61 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ The whylogs library is integrated with the following:
- [Java and Apache Spark](https://github.com/whylabs/whylogs-java)
- AWS S3 (for output storage)
- Jupyter Notebooks
- MLflow

### Dependencies

Expand Down
33 changes: 33 additions & 0 deletions examples/anonymous_session_csv_with_timestamp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
Log a series of dataframes grouped by date and send to WhyLabs for visualization
===============
Using data from a Kaggle dataset (https://www.kaggle.com/yugagrawal95/sample-media-spends-data), split
the dataset up by each day using the Calendar_Week column and log each of the data for that day using whylogs.
"""
import pandas as pd
from datetime import datetime
from whylogs.app.session import start_whylabs_session, LoggerKey

csv_file = "data/sample_media_spend.csv"

# Load some sample data
print(f"Loading {csv_file}")
csv_dataframe = pd.read_csv(csv_file)

# Create a WhyLabs logging session
# Note: data collection consent must be explicitly provided since we'll be uploading profiles to WhyLabs.
with start_whylabs_session(data_collection_consent=True) as session:
# Group each of the rows by the day they occur on using the date string in the Calendar_Week col
for day_string, dataframe_for_day in csv_dataframe.groupby(['Calendar_Week']):
# This dataset has dates of the form 9/5/2020
dt = datetime.strptime(day_string, '%m/%d/%Y')
print(f"Logging data for {day_string}")

# whylabs loggers are specific to the dataset's timestamp so we'll be using a different one for each
# date in our dataset.
logger = session.logger(args=LoggerKey(dataset_timestamp=dt))

# log the data to the logger. The logger will write this data out in binary form when it closes, which
# at the end of the with block in the session's internal logic.
logger.log_dataframe(dataframe_for_day)
3,052 changes: 3,052 additions & 0 deletions examples/data/sample_media_spend.csv

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions examples/log_df_to_whylabs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""
Log a dataframe and send the profile to WhyLabs for visualization
===============
Example for logging a dataframe and sending the results to WhyLabs, where the data can be explored further
"""
import pandas as pd
from whylogs.app.session import start_whylabs_session

# Load some sample data
df = pd.read_csv("data/lending_club_1000.csv")

# Create a WhyLabs logging session
# Note: data collection consent must be explicitly provided
with start_whylabs_session(data_collection_consent=True) as session:
# Log statistics for the dataset
# Resulting dataset profile(s) will be sent to WhyLabs,
# and you will receive a link to view the pretty charts!
with session.logger() as logger:
logger.log_dataframe(df)
2 changes: 2 additions & 0 deletions src/whylogs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .core import ColumnProfile, DatasetProfile
from .app.session import get_or_create_session
from .app.session import reset_default_session
from .app.session import start_whylabs_session
from .mlflow import enable_mlflow

__all__ = [
Expand All @@ -12,6 +13,7 @@
"WriterConfig",
"enable_mlflow",
"get_or_create_session",
"start_whylabs_session",
"reset_default_session",
"__version__",
]
5 changes: 4 additions & 1 deletion src/whylogs/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ def __init__(
output_path: str,
path_template: Optional[str] = None,
filename_template: Optional[str] = None,
data_collection_consent: Optional[bool] = False,
):
self.type = type
self.formats = formats
self.output_path = output_path
self.path_template = path_template
self.filename_template = filename_template
self.data_collection_consent = data_collection_consent

def to_yaml(self, stream=None):
"""
Expand Down Expand Up @@ -179,7 +181,7 @@ class WriterConfigSchema(Schema):
Marshmallow schema for :class:`WriterConfig` class.
"""

type = fields.Str(validate=validate.OneOf(["local", "s3"]), required=True)
type = fields.Str(validate=validate.OneOf(["local", "s3", "whylabs"]), required=True)
formats = fields.List(
fields.Str(validate=validate.OneOf(ALL_SUPPORTED_FORMATS)),
required=True,
Expand All @@ -188,6 +190,7 @@ class WriterConfigSchema(Schema):
output_path = fields.Str(required=True)
path_template = fields.Str(required=False, allow_none=True)
filename_template = fields.Str(required=False, allow_none=True)
data_collection_consent = fields.Bool(required=False, allow_none=True)

@post_load
def make_writer(self, data, **kwargs):
Expand Down
171 changes: 111 additions & 60 deletions src/whylogs/app/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,63 @@
whylogs logging session
"""
import datetime
from dataclasses import dataclass
from logging import getLogger as _getLogger
from typing import Dict, List, Optional, Union
from uuid import uuid4

import pandas as pd

from whylogs.app.config import SessionConfig, WriterConfig, load_config
from whylogs.app.logger import Logger
from whylogs.app.writers import Writer, writer_from_config
from whylogs.app.logger import Logger, Segment
from whylogs.app.writers import Writer, writer_from_config, WhyLabsWriter
from whylogs.core import DatasetProfile
from whylogs.core.statistics.constraints import DatasetConstraints


@dataclass
class LoggerKey:
"""
Create a new logger or return an existing one for a given dataset name.
If no dataset_name is specified, we default to project name
Args:
metadata
dataset_name : str
Name of the dataset. Default is the project name
dataset_timestamp: datetime.datetime, optional
The timestamp associated with the dataset. Could be the timestamp
for the batch, or the timestamp
for the window that you are tracking
tags: dict
Tag the data with groupable information. For example, you might want to tag your data
with the stage information (development, testing, production etc...)
metadata: dict
Useful to debug the data source. You can associate non-groupable information in this field
such as hostname,
session_timestamp: datetime.datetime, optional
Override the timestamp associated with the session. Normally you
shouldn't need to override this value
segments:
Can be either:
- List of tag key value pairs for tracking datasetments
- List of tag keys for whylogs to split up the data in the backend
"""
dataset_name: Optional[str] = None
dataset_timestamp: Optional[datetime.datetime] = None
session_timestamp: Optional[datetime.datetime] = None
tags: Dict[str, str] = None
metadata: Dict[str, str] = None
segments: Optional[Union[List[Dict], List[str]]] = None
profile_full_dataset: bool = False
with_rotation_time: str = None
cache_size: int = 1
constraints: DatasetConstraints = None


defaultLoggerArgs = LoggerKey()


class Session:
"""
Parameters
Expand All @@ -38,7 +82,7 @@ def __init__(
writers: List[Writer],
verbose: bool = False,
with_rotation_time: str = None,
cache_size: int = None
cache_size: int = None,
):
if writers is None:
writers = []
Expand All @@ -56,8 +100,18 @@ def __init__(
self.with_rotation_time = with_rotation_time
self.cache_size = cache_size

# enable special logic when starting/closing a Session if we're using whylabs client to save dataset profiles
whylabs_writer_is_present = any(isinstance(w, WhyLabsWriter) for w in self.writers)
self.use_whylabs_writer = _use_whylabs_client or whylabs_writer_is_present

# add WhyLabs writer if it's not already present (which can happen if it's not specified in the config)
if _use_whylabs_client and whylabs_writer_is_present is False:
self.writers.append(WhyLabsWriter(output_path=None, formats=["protobuf"]))

def __enter__(self):
# TODO: configure other aspects
if self.use_whylabs_writer:
from whylogs.whylabs_client.wrapper import start_session
start_session()
return self

def __exit__(self, tpe, value, traceback):
Expand All @@ -66,96 +120,73 @@ def __exit__(self, tpe, value, traceback):
def __repr__(self):
return self._config.to_yaml()

def get_config(self,):
def get_config(self, ):
return self._config

def is_active(self):
return self._active

def logger(
self,
dataset_name: Optional[str] = None,
dataset_timestamp: Optional[datetime.datetime] = None,
session_timestamp: Optional[datetime.datetime] = None,
tags: Dict[str, str] = None,
metadata: Dict[str, str] = None,
segments: Optional[Union[List[Dict], List[str]]] = None,
profile_full_dataset: bool = False,
with_rotation_time: str = None,
cache_size: int = 1,
constraints: DatasetConstraints = None,
self,
dataset_name: Optional[str] = None,
dataset_timestamp: Optional[datetime.datetime] = None,
session_timestamp: Optional[datetime.datetime] = None,
tags: Dict[str, str] = None,
metadata: Dict[str, str] = None,
segments: Optional[List[Segment]] = None,
profile_full_dataset: bool = False,
with_rotation_time: str = None,
cache_size: int = 1,
constraints: DatasetConstraints = None,
) -> Logger:
"""
Create a new logger or return an existing one for a given dataset name.
If no dataset_name is specified, we default to project name
Parameters
----------
metadata
dataset_name : str
Name of the dataset. Default is the project name
dataset_timestamp: datetime.datetime, optional
The timestamp associated with the dataset. Could be the timestamp
for the batch, or the timestamp
for the window that you are tracking
tags: dict
Tag the data with groupable information. For example, you might want to tag your data
with the stage information (development, testing, production etc...)
metadata: dict
Useful to debug the data source. You can associate non-groupable information in this field
such as hostname,
session_timestamp: datetime.datetime, optional
Override the timestamp associated with the session. Normally you
shouldn't need to override this value
segments:
Can be either:
- List of tag key value pairs for tracking datasetments
- List of tag keys for whylogs to split up the data in the backend
args: LoggerKey
The properties of the logger if they're anything but the defaults.
Returns
-------
ylog : whylogs.app.logger.Logger
whylogs logger
"""
if tags is None:
tags = {}

if not self._active:
raise RuntimeError(
"Session is already closed. Cannot create more loggers")

if dataset_name is None:
# using the project name for the datasetname
dataset_name = self.project

if session_timestamp is None:
session_timestamp = self._session_time
if with_rotation_time is None:
with_rotation_time = self.with_rotation_time

# remove inactive loggers first
for name, logger in list(self._loggers.items()):
if not logger.is_active():
self._loggers.pop(name)

logger = self._loggers.get(dataset_name)
logger_key = str(LoggerKey(
dataset_name=dataset_name,
dataset_timestamp=dataset_timestamp,
session_timestamp=session_timestamp,
tags=tags,
metadata=metadata,
segments=segments,
profile_full_dataset=profile_full_dataset,
with_rotation_time=with_rotation_time,
cache_size=cache_size,
constraints=constraints
))
logger = self._loggers.get(logger_key)

if logger is None:
logger = Logger(
session_id=self._session_id,
dataset_name=dataset_name,
dataset_name=dataset_name or self.project,
dataset_timestamp=dataset_timestamp,
session_timestamp=session_timestamp,
session_timestamp=session_timestamp or self._session_time,
writers=self.writers,
tags=tags,
tags=tags or {},
metadata=metadata,
verbose=self.verbose,
with_rotation_time=with_rotation_time,
with_rotation_time=with_rotation_time or self.with_rotation_time,
segments=segments,
profile_full_dataset=profile_full_dataset,
cache_size=cache_size,
constraints=constraints,
)
self._loggers[dataset_name] = logger
self._loggers[logger_key] = logger

return logger

Expand Down Expand Up @@ -296,6 +327,11 @@ def close(self):
logger.close()
self.remove_logger(name)

if self.use_whylabs_writer:
from whylogs.whylabs_client.wrapper import end_session
url = end_session()
print(f"You can explore your data in Observatory here: {url}")

def remove_logger(self, dataset_name: str):
"""
Remove a logger from the dataset. This is called by the logger when it's being closed
Expand All @@ -318,12 +354,17 @@ def remove_logger(self, dataset_name: str):
self._loggers.pop(dataset_name)


#: Global flag for whether whylabs client should be used
_use_whylabs_client = False


def session_from_config(config: SessionConfig) -> Session:
"""
Construct a whylogs session from a `SessionConfig`
"""
writers = list(map(lambda x: writer_from_config(x), config.writers))
return Session(config.project, config.pipeline, writers, config.verbose, config.with_rotation_time, config.cache_size)
return Session(config.project, config.pipeline, writers, config.verbose, config.with_rotation_time,
config.cache_size)


#: A global session
Expand All @@ -346,6 +387,16 @@ def reset_default_session():
_session = session_from_config(config)


def start_whylabs_session(path_to_config: Optional[str] = None, data_collection_consent: Optional[bool] = None):
if not data_collection_consent:
raise PermissionError(
"When creating a session that will send data to WhyLabs, data_collection_consent must be set to True")

global _use_whylabs_client
_use_whylabs_client = True
return get_or_create_session(path_to_config)


def get_or_create_session(path_to_config: Optional[str] = None):
"""
Retrieve the current active global session.
Expand Down
Loading

0 comments on commit e08bbd1

Please sign in to comment.