Skip to content

Commit

Permalink
style and datum refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
lalmei committed Jan 4, 2021
1 parent a8e1123 commit 18778b1
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 55 deletions.
23 changes: 18 additions & 5 deletions src/whylogs/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
.. autodata:: ALL_SUPPORTED_FORMATS
"""
from logging import getLogger
from typing import List
from typing import List, Dict, Union, Optional

import typing
# import typing
import yaml as yaml
from marshmallow import Schema, fields, post_load, validate

Expand All @@ -18,6 +18,10 @@
"""Supported output formats for whylogs writer configuration"""


SegmentTag = Dict[str, any]
SegmentTags = List[SegmentTag]


class WriterConfig:
"""
Config for whylogs writers
Expand Down Expand Up @@ -56,8 +60,8 @@ def __init__(
type: str,
formats: List[str],
output_path: str,
path_template: typing.Optional[str] = None,
filename_template: typing.Optional[str] = None,
path_template: Optional[str] = None,
filename_template: Optional[str] = None,
):
self.type = type
self.formats = formats
Expand Down Expand Up @@ -116,6 +120,14 @@ class SessionConfig:
A list of `WriterConfig` objects defining writer outputs
verbose : bool, default=False
Output verbosity
with_rotation_time: str, default=None. Rotate profiles on a time interval; takes the overall rotation interval unit:
    "s" for seconds
    "m" for minutes
    "h" for hours
    "d" for days
cache: int, default=1. Sets how many dataset profiles the logger caches during rotation
segments: List
"""

def __init__(
Expand All @@ -126,6 +138,8 @@ def __init__(
verbose: bool = False,
with_rotation_time: str = None,
cache: int = None,
segments: Optional[Union[List[str], List[SegmentTags]]] = None,
full_dataset_profile: bool = True,
):
self.project = project
self.pipeline = pipeline
Expand Down Expand Up @@ -250,4 +264,3 @@ def load_config():
logger.warning("Failed to load YAML config", e)
pass
return None

87 changes: 55 additions & 32 deletions src/whylogs/app/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def __init__(self,
writers=List[Writer],
verbose: bool = False,
with_rotation_time: Optional[str] = None,
interval: int = 1,
cache: int = 1,
segments: Optional[Union[List[SegmentTags], List[str]]] = None,
profile_full_dataset: bool = False,
interval: int = 1
):
"""
"""
Expand All @@ -81,7 +81,7 @@ def __init__(self,
self.set_segments(segments)

self._profiles = []
self.intialize_profiles(dataset_timestamp)
self._intialize_profiles(dataset_timestamp)
# initialize to seconds in the day
self.interval = interval
self.with_rotation_time = with_rotation_time
Expand Down Expand Up @@ -109,6 +109,16 @@ def segmented_profiles(self,)->Dict[str, DatasetProfile]:
"""
return self._profiles[-1]["segmented_profiles"]

def get_segment(self, segment_tags: SegmentTags, copy: bool = False)->Optional[DatasetProfile]:

hashed_seg = hash_segment(segment_tags)
segment_profile = self._profiles[-1]["segmented_profiles"].get(
hashed_seg, None)
if copy and (segment_profile is not None):
return segment_profile.copy()
else:
return segment_profile

def set_segments(self, segments: Union[List[SegmentTags], List[str]]) -> None:
if segments:
if all(isinstance(elem, str) for elem in segments):
Expand All @@ -121,8 +131,8 @@ def set_segments(self, segments: Union[List[SegmentTags], List[str]]) -> None:
self.segments = None
self.segment_type = None

def intialize_profiles(self,
dataset_timestamp: Optional[datetime.datetime] = datetime.datetime.now(datetime.timezone.utc)) -> None:
def _intialize_profiles(self,
dataset_timestamp: Optional[datetime.datetime] = datetime.datetime.now(datetime.timezone.utc)) -> None:

full_profile = None
if self.full_profile_check():
Expand Down Expand Up @@ -204,7 +214,7 @@ def _rotate_time(self):
if len(self._profiles) > self.cache:
self._profiles[-self.cache-1] = None

self.intialize_profiles()
self._intialize_profiles()

# compute new rotate_at and while loop in case current function
# takes longer than interval
Expand Down Expand Up @@ -283,7 +293,7 @@ def log(
:param feature_name: a dictionary of key->value for multiple features. Each entry represent a single columnar feature
:param feature_name: name of a single feature. Cannot be specified if 'features' is specified
:param value: value of as single feature. Cannot be specified if 'features' is specified
:param segment: define either a list of segment keys or a list of segment tags:
:param segment: define either a list of segment keys or a list of segment tags:
`[{"key":<featurename>,"value": <featurevalue>},... ]`
:param profile_full_data: when segmenting dataset, an option to keep the full unsegmented profile of the dataset.
Expand All @@ -307,22 +317,35 @@ def log(

if features is not None:
# full profile
if self.full_profile_check():
self._profiles[-1]["full_profile"].track(features)

if self.segments:
self.log_segments(pd.DataFrame(features))
self.log_dataframe(pd.DataFrame([features]))
else:
if full_profile_check():
if self.full_profile_check():
self._profiles[-1]["full_profile"].track_datum(
feature_name, value)

if self.segments:
if (segment_type == "keys") and (feature_name in self.segments):
pass
self.log_segment_datum(feature_name, value)

def log_segment_datum(self, feature_name, value):
segment_tags = []
segment_tags.append(
{"key": feature_name, "value": value})
segment_profile = self.get_segment(segment_tags)
if (segment_type == "keys"):
if (feature_name in self.segments):

if segment_profile is None:
return
else:
for each_profile in self._profiles[-1]["segmented_profiles"]:
each_profile.track_datum(feature_name, value)
segment_profile.track_datum(feature_name, value)
else:
for each_profile in self._profiles[-1]["segmented_profiles"]:
each_profile.track_datum(feature_name, value)
elif (segment_type == "set"):
if segment_tags not in self.segments:
return
else:
segment_profile.track_datum(feature_name, value)

def log_csv(self,
filepath_or_buffer: FilePathOrBuffer,
Expand All @@ -347,12 +370,7 @@ def log_csv(self,
self.set_segments(segments)

df = pd.read_csv(filepath_or_buffer, **kwargs)

if self.full_profile_check():
self._profiles[-1]["full_profile"].track_dataframe(df)

if self.segments:
self.log_segments(df)
self.log_dataframe(df)

def log_dataframe(self, df,
segments: Optional[Union[List[SegmentTags], List[str]]] = None,
Expand Down Expand Up @@ -390,12 +408,11 @@ def log_segments_keys(self, data):
try:
grouped_data = data.groupby(self.segments)
except KeyError as e:
return
raise e

segments = grouped_data.groups.keys()

for each_segment in segments:
# assert len(each_segment) == len(self.segments)
try:
segment_df = grouped_data.get_group(each_segment)
segment_tags = []
Expand All @@ -408,24 +425,29 @@ def log_segments_keys(self, data):

def log_fixed_segments(self, data):

for each_seg in self.segments:
segment_keys = [feature["key"] for feature in each_seg]
seg = tuple([feature["value"] for feature in each_seg])
# we group each segment seperately since the segment tags are allowed
# to overlap

for segment_tag in self.segments:
# create keys
segment_keys = [feature["key"] for feature in segment_tag]
seg = tuple([feature["value"] for feature in segment_tag])

grouped_data = data.groupby(segment_keys)

if len(seg) == 1:
seg = seg[0]
# check if segment exist
if seg not in grouped_data.groups:
continue
segment_df = grouped_data.get_group(seg)
self.log_df_segment(segment_df, each_seg)

self.log_df_segment(segment_df, segment_tag)

def log_df_segment(self, df, segment_tags: SegmentTags):
segment_tags = sorted(segment_tags, key=lambda x: x["key"])
# check if segment is being tracked
hashed_seg = hash_segment(segment_tags)
segment_profile = self._profiles[-1]["segmented_profiles"].get(
hashed_seg, None)

segment_profile = self.get_segment(segment_tags)
if segment_profile is None:
segment_profile = DatasetProfile(
self.dataset_name,
Expand All @@ -436,6 +458,7 @@ def log_df_segment(self, df, segment_tags: SegmentTags):
session_id=self.session_id
)
segment_profile.track_dataframe(df)
hashed_seg = hash_segment(segment_tags)
self._profiles[-1]["segmented_profiles"][hashed_seg] = segment_profile
else:
segment_profile.track_dataframe(df)
Expand Down
9 changes: 6 additions & 3 deletions src/whylogs/core/columnprofile.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ def to_summary(self):
opts = dict(
counters=self.counters.to_protobuf(),
frequent_items=self.frequent_items.to_summary(),
unique_count=self.cardinality_tracker.to_summary(_UNIQUE_COUNT_BOUNDS_STD),
unique_count=self.cardinality_tracker.to_summary(
_UNIQUE_COUNT_BOUNDS_STD),
)
if self.string_tracker is not None and self.string_tracker.count > 0:
opts["string_summary"] = self.string_tracker.to_summary()
Expand Down Expand Up @@ -190,6 +191,8 @@ def from_protobuf(message):
schema_tracker=SchemaTracker.from_protobuf(message.schema),
number_tracker=NumberTracker.from_protobuf(message.numbers),
string_tracker=StringTracker.from_protobuf(message.strings),
frequent_items=FrequentItemsSketch.from_protobuf(message.frequent_items),
cardinality_tracker=HllSketch.from_protobuf(message.cardinality_tracker),
frequent_items=FrequentItemsSketch.from_protobuf(
message.frequent_items),
cardinality_tracker=HllSketch.from_protobuf(
message.cardinality_tracker),
)
7 changes: 6 additions & 1 deletion tests/unit/app/test_segments.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import os
import shutil
import pytest
from freezegun import freeze_time
from pandas import util
import datetime
import json
import hashlib

from whylogs.app.config import load_config
from whylogs.app.session import session_from_config, get_or_create_session
from whylogs.app.config import SessionConfig, WriterConfig
Expand Down Expand Up @@ -91,8 +94,10 @@ def test_segments_with_rotation(df_lending_club, tmpdir):
frozen_time.tick(delta=datetime.timedelta(seconds=1))
logger.log_dataframe(df_lending_club)
frozen_time.tick(delta=datetime.timedelta(seconds=1))

df = util.testing.makeDataFrame()
logger.log_dataframe(df)
with pytest.raises(KeyError):
logger.log_dataframe(df)
output_files = []
for root, subdirs, files in os.walk(output_path):
output_files += files
Expand Down
35 changes: 21 additions & 14 deletions tests/unit/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ def test_init_empty_dir(tmp_path):
with open(project_directory/"plain.txt", 'a'):
os.utime(project_directory/"plain.txt", None)
result = runner.invoke(
client.cli, ['init','--project-dir', project_directory],input='no\n'.format(project_directory))
client.cli, ['init', '--project-dir', project_directory], input='no\n'.format(project_directory))

assert result.exit_code == 0


def test_init_dir(tmp_path):

runner = CliRunner()
Expand All @@ -27,53 +28,59 @@ def test_init_dir(tmp_path):
project_directory.mkdir()

result = runner.invoke(
client.cli, ['-v','init','--project-dir', project_directory],input='yes\ntest\ntest\n{}\n\n'.format(project_directory))
client.cli, ['-v', 'init', '--project-dir', project_directory], input='yes\ntest\ntest\n{}\n\n'.format(project_directory))

assert result.exit_code == 0


def test_demo_init_empty_dir(tmp_path):

runner = CliRunner()

project_directory = tmp_path / "sub"
project_directory.mkdir()

with open(project_directory/"plain.txt", 'a'):
os.utime(project_directory/"plain.txt", None)
result = runner.invoke(democli, ['-v','init','--project-dir', project_directory],input='no\n'.format(tmp_path))

result = runner.invoke(democli, [
'-v', 'init', '--project-dir', project_directory], input='no\n'.format(tmp_path))

assert result.exit_code == 0

os.remove(project_directory/"plain.txt")


def test_demo_init_dir(tmpdir):

runner = CliRunner()

project_directory = tmpdir.mkdir("sub")

result = runner.invoke(democli, ['-v','init','--project-dir', project_directory],input='yes\ntest\ntest\n{}\nyes\n1\nyes\n'.format(project_directory))

result = runner.invoke(democli, ['-v', 'init', '--project-dir', project_directory],
input='yes\ntest\ntest\n{}\nyes\n1\nyes\n'.format(project_directory))

assert result.exit_code == 0


def test_demo_noweb_dir(tmpdir):

runner = CliRunner()

project_directory = tmpdir.mkdir("sub")

result = runner.invoke(democli, ['-v','init','--project-dir', project_directory],input='yes\ntest\ntest\n{}\nyes\n1\nno\n'.format(project_directory))

result = runner.invoke(democli, ['-v', 'init', '--project-dir', project_directory],
input='yes\ntest\ntest\n{}\nyes\n1\nno\n'.format(project_directory))

assert result.exit_code == 0


def test_demo_no_init_prof_dir(tmpdir):

runner = CliRunner()

project_directory = tmpdir.mkdir("sub")

result = runner.invoke(democli, ['-v', 'init', '--project-dir', project_directory],
input='yes\ntest\ntest\n{}\nno\n'.format(project_directory))

result = runner.invoke(democli, ['-v','init','--project-dir', project_directory],input='yes\ntest\ntest\n{}\nno\n'.format(project_directory))

assert result.exit_code == 0

0 comments on commit 18778b1

Please sign in to comment.