Commit

Merge branch 'dev'
see Changelog for summary.

* dev: (39 commits)
  version 0.5.0
  formatting
  fix a deep merge issue
  fix bug in merge for concepts
  use just name to change column dtype
  remove the type preserve logic
  bug fix
  bug fix
  handle empty dict for filter row
  bug fix in merge
  preserve data types in run_op
  update merge procedure
  make separated file for each procedure
  enable dask in merge()
  update ihme downloader
  bug fix for translate_column
  bug fix: change value to string before concat with other string
  (#107) add default options to read_csv in datapackage reader.
  (#106) add drop empty datapoints option
  bug fix: run_op failed when there is only one indicator
  ...
semio committed Oct 17, 2018
2 parents 7909389 + bc6f97b commit 1ebd142
Showing 40 changed files with 3,166 additions and 1,557 deletions.
CHANGELOG.md: 7 additions & 0 deletions
@@ -1,3 +1,10 @@
+## version 0.5.0 2018-10-17
+
+- split `procedure.py` into separate files for each procedure
+- bug fixes and improvements for procedures
+- bug fixes and improvements for datapackage loading and creation of datapackage.json
+- more factory methods for data sources
+
## version 0.4.2 2018-04-21

- bug fix for various procedures
ddf_utils/__init__.py: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

-__version__ = '0.4.2'
+__version__ = '0.5.0'

from . import (str, cli, datapackage, i18n, io,
               patch, qa, transformer, factory)
ddf_utils/chef/helpers.py: 64 additions & 0 deletions
@@ -4,6 +4,7 @@
import sys
import hashlib
import logging
+import json
from functools import wraps, partial
from collections import Sequence, Mapping
from time import time
@@ -24,6 +25,69 @@ def create_dsk(data, parts=10):
    return data


+def dsk_to_pandas(data):
+    """The reverse for create_dsk function"""
+    for k, v in data.items():
+        if isinstance(v, dd.DataFrame):
+            data[k] = v.compute()
+    return data
+
+
+def build_dictionary(chef, dict_def, ignore_case=False):
+    """build a dictionary from a dictionary definition"""
+    if (len(dict_def) == 3 and
+            'base' in dict_def and
+            'key' in dict_def and
+            'value' in dict_def):
+        value = dict_def['value']
+        key = dict_def['key']
+        if isinstance(key, str):
+            keys = [key]
+        else:
+            keys = key
+        ingredient = chef.dag.node_dict[dict_def['base']].evaluate()
+        if ingredient.dtype == 'synonyms':
+            df = ingredient.get_data()[value]
+        elif ingredient.dtype == 'entities':
+            df = ingredient.get_data()[ingredient.key]
+        else:
+            raise NotImplementedError('unsupported data type {}'.format(ingredient.dtype))
+        return build_dictionary_from_dataframe(df, keys, value, ignore_case)
+    elif isinstance(dict_def, str):
+        base_path = chef.config['dictionaries_dir']
+        path = os.path.join(base_path, dict_def)
+        return build_dictionary_from_file(path)
+    else:
+        return dict_def
+
+
+def build_dictionary_from_file(file_path):
+    d = json.load(open(file_path, 'r'))
+    assert isinstance(d, dict)
+    return d
+
+
+def build_dictionary_from_dataframe(df, keys, value, ignore_case=False):
+    dic = dict()
+    for k in keys:
+        d = (df[[k, value]]
+             .dropna(how='any')
+             .set_index(k)[value]
+             .to_dict())
+        for i, v in d.items():
+            if ignore_case:
+                if isinstance(i, str):
+                    i = i.lower()
+            if i not in dic.keys():
+                dic[i] = v
+            elif dic[i] != v:
+                raise KeyError("ambiguous key: {} is mapped "
+                               "to both {} and {}".format(i, dic[i], v))
+            else:
+                continue
+    return dic
+
+
def prompt_select(selects, text_before=None):
"""ask user to choose in a list of options"""
def value_proc(v):
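For context, here is a minimal sketch (not part of the commit) of how the new `build_dictionary_from_dataframe` helper behaves; the synonym table and column names below are hypothetical:

import pandas as pd

# hypothetical synonym table mapping alias strings to canonical geo ids
df = pd.DataFrame({'synonym': ['USA', 'U.S.A.', 'Sweden'],
                   'geo': ['usa', 'usa', 'swe']})

# build a {synonym -> geo} mapping; keys are lower-cased first
mapping = build_dictionary_from_dataframe(df, keys=['synonym'], value='geo',
                                          ignore_case=True)
print(mapping)  # {'usa': 'usa', 'u.s.a.': 'usa', 'sweden': 'swe'}

If the same key were mapped to two different values, the helper would raise a KeyError instead. The `base`/`key`/`value` form of `build_dictionary` resolves its data from an ingredient in the chef's DAG before delegating here, which makes this plain-DataFrame entry point the easiest to test in isolation.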
ddf_utils/chef/ingredient.py: 39 additions & 24 deletions
@@ -13,13 +13,14 @@
import dask.dataframe as dd
from ddf_utils.model.package import Datapackage
from ddf_utils.model.repo import Repo, is_url
+from itertools import product

from ..str import format_float_digits
from .exceptions import IngredientError
from .helpers import gen_sym, query, read_opt, sort_df


-class BaseIngredient(object):
+class BaseIngredient:
    def __init__(self, chef, ingred_id, key, data=None):
        self.chef = chef
        self.ingred_id = ingred_id
@@ -38,10 +39,12 @@ def dtype(self):
        if len(keys) == 1:
            if keys[0] == 'concept':
                return 'concepts'
-            else:
-                return 'entities'
-        else:
-            return 'datapoints'
+            return 'entities'
+
+        if 'synonym' in keys:
+            return 'synonyms'
+
+        return 'datapoints'

    def key_to_list(self):
        """helper function: make a list that contains primaryKey of this ingredient"""
@@ -63,21 +66,22 @@ def get_data(self):

    def reset_data(self):
        self.data = None
-        return

    def _serve_concepts(self, outpath, **options):
        data = self.compute()
        assert isinstance(data, dict)
        assert len(data) == 1
-        for _, df in data.items():
+        filename = read_opt(options, 'file_name', default='ddf--concepts.csv')
+        for _, df_ in data.items():
            # change boolean into string
            # and remove tailing spaces
+            df = df_.copy()
            for i, v in df.dtypes.iteritems():
                if v == 'bool':
                    df[i] = df[i].map(lambda x: str(x).upper())
                if v == 'object':
                    df[i] = df[i].str.strip()
-            path = os.path.join(outpath, 'ddf--concepts.csv')
+            path = os.path.join(outpath, filename)
            df = sort_df(df, key='concept')
            df.to_csv(path, index=False, encoding='utf8')
@@ -110,7 +114,7 @@ def _serve_entities(self, outpath, **options):
            col = 'is--'+s
            df_ = df[df[col]=='TRUE'].dropna(axis=1, how='all')
            if df_.empty:
-                logging.warning("empty dataframe for {}, not serving".format(s))
+                logging.warning("empty dataframe for %s, not serving", str(s))
                continue
            df_ = df_.loc[:, lambda x: ~x.columns.str.startswith('is--')].copy()
            df_[col] = 'TRUE'
@@ -139,6 +143,8 @@ def _serve_datapoints(self, outpath, **options):
        data = self.compute()
        assert isinstance(data, dict)
        digits = read_opt(options, 'digits', default=5)
+        serve_empty = read_opt(options, 'drop_empty_datapoints', default=True)
+        split_by = read_opt(options, 'split_datapoints_by', default=False)

        def to_disk(df_input, k, path):
            df = df_input.copy()
@@ -153,7 +159,8 @@ def to_disk(df_input, k, path):
            df.to_csv(path, encoding='utf8', index=False)

        for k, df in data.items():
-            split_by = read_opt(options, 'split_datapoints_by', default=False)
+            if not serve_empty and df.empty:
+                continue
            by = self.key_to_list()
            df = sort_df(df, key=by)
            if not split_by:
@@ -166,7 +173,6 @@ def to_disk(df_input, k, path):
                # split datapoints by entities. Firstly we calculate all possible
                # combinations of entities, and then filter the dataframe, create
                # file names and save them into disk.
-                from itertools import product
                values = list()
                [values.append(df[col].unique()) for col in split_by]
                all_combinations = product(*values)
@@ -182,11 +188,12 @@ def to_disk(df_input, k, path):
                    query = ''
                    entity_strings = list()
                    for entity, value in zip(split_by, comb):
-                        entity_strings.append(entity + '-' + value)
+                        value_ = str(value)
+                        entity_strings.append(entity + '-' + value_)
                        if len(query) > 0:
-                            query = query + " and " + "{} == '{}'".format(entity, value)
+                            query = query + " and " + "{} == '{}'".format(entity, value_)
                        else:
-                            query = "{} == '{}'".format(entity, value)
+                            query = "{} == '{}'".format(entity, value_)

                    if by:
                        path = os.path.join(
@@ -210,7 +217,7 @@ def to_disk(df_input, k, path):
                        to_disk(df_part[columns], k, path)

    def serve(self, outpath, **options):
-        """save the ingledient to disk.
+        """save the ingredient to disk.

        Parameters
@@ -223,13 +230,16 @@ def serve(self, outpath, **options):
            how many digits to keep at most.
        path : `str`
            which sub-folder under the outpath to save the output files
+        split_datapoints_by : `str` or `list`
+            split datapoints by a dimension or a list of dimensions.
+        drop_empty_datapoints : bool
+            whether to serve measures with no datapoints.
        """
        logging.info('serving ingredient: ' + self.ingred_id)
        # create outpath if not exists
        if 'path' in options:
            sub_folder = options.pop('path')
-            assert not os.path.isabs(sub_folder)  # sub folder should not be abspath
+            assert not os.path.isabs(sub_folder), "`path` in serve section should not be absolute"
            outpath = os.path.join(outpath, sub_folder)
        os.makedirs(outpath, exist_ok=True)
@@ -240,6 +250,8 @@ def serve(self, outpath, **options):
            self._serve_concepts(outpath, **options)
        elif t == 'entities':
            self._serve_entities(outpath, **options)
+        elif t == 'synonyms':
+            raise NotImplementedError('can not serve synonyms now.')
        else:
            raise IngredientError('Not a correct collection: ' + t)
@@ -504,12 +516,14 @@ def _get_data_concepts(self):
                kw = list(self.values.keys())[0]
                if kw == ['$in']:
                    return {'concept': df[self.values[kw]]}
-                else:
-                    return {'concept': df[df.columns.drop(self.values[kw])]}
-            else:
-                return {'concept': df[self.values]}
-        else:
-            return {'concept': df}
+                return {'concept': df[df.columns.drop(self.values[kw])]}
+            return {'concept': df[self.values]}
+        return {'concept': df}
+
+    def _get_data_synonyms(self):
+        ks = self.key_to_list()
+        ks.remove('synonym')
+        return {ks[0]: self.ddf.synonyms[ks[0]]}

    def get_data(self, copy=False, key_as_index=False):
        """read in and return the ingredient data
@@ -548,7 +562,8 @@ def _get_data_ddf(self, copy=False, key_as_index=False):
        funcs = {
            'datapoints': self._get_data_datapoint,
            'entities': self._get_data_entities,
-            'concepts': self._get_data_concepts
+            'concepts': self._get_data_concepts,
+            'synonyms': self._get_data_synonyms
        }

        def filter_row(df, avaliable_scopes):
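As a usage sketch (not part of the commit), the serve options documented in the docstring above might be passed like this; the `dp` ingredient, output directory, and `geo` dimension are all hypothetical:

# `dp` is assumed to be an already-evaluated datapoints ingredient
dp.serve('./output',
         digits=5,                     # keep at most 5 digits
         path='datapoints',            # sub-folder created under ./output
         split_datapoints_by=['geo'],  # write a separate file per geo value
         drop_empty_datapoints=True)   # new in this commit; see docstring above

This exercises the new `split_datapoints_by` and `drop_empty_datapoints` options together with the pre-existing `digits` and `path` ones.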
