Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial commit #1

Merged
merged 1 commit into from Dec 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions .idea/dictionaries/nishiba.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 86 additions & 1 deletion README.md
@@ -1,2 +1,87 @@
# gokart
A wrapper of the data pipeline library "luigi"
A wrapper of the data pipeline library "luigi".


## How to Use
Please use gokart.TaskOnKart instead of luigi.Task to define your tasks.


### Basic Task with TaskOnKart
```python
import gokart

class BasicTask(gokart.TaskOnKart):
def requires(self):
return TaskA()

def output(self):
# please use TaskOnKart.make_target to make Target.
return self.make_target('basic_task.csv')

def run(self):
        # load data which TaskA outputs
texts = self.load()

# do something with texts, and make results.

# save results with the file path {self.workspace_directory}/basic_task_{unique_id}.csv
self.dump(results)
```

### Details of base functions
#### Make Target with TaskOnKart
`TaskOnKart.make_target` judges the `Target` type by the extension of the passed path. The following extensions are supported.

- pkl
- txt
- csv
- tsv
- gz

#### Make Target for models which generate multiple files when saving.
`TaskOnKart.make_model_target` and `TaskOnKart.dump` are designed to save and load models such as `gensim.models.Word2Vec`.
```python
class TrainWord2Vec(TaskOnKart):
def output(self):
# please use 'zip'.
return self.make_model_target(
'model.zip',
save_function=gensim.model.Word2Vec.save,
load_function=gensim.model.Word2Vec.load)

def run(self):
# make word2vec
self.dump(word2vec)
```

#### Load input data
##### Pattern 1: Load input data individually.
```python
def requires(self):
return dict(data=LoadItemData(), model=LoadModel())

def run(self):
# pass a key in the dictionary `self.requires()`
data = self.load('data')
model = self.load('model')
```

##### Pattern 2: Load input data at once
```python
def run(self):
input_data = self.load()
"""
The above line is equivalent to the following:
input_data = dict(data=self.load('data'), model=self.load('model'))
"""
```


#### Load input data as pd.DataFrame
```python
def requires(self):
return LoadDataFrame()

def run(self):
data = self.load_data_frame(required_columns={'id', 'name'})
```
3 changes: 3 additions & 0 deletions gokart/__init__.py
@@ -0,0 +1,3 @@
from gokart.run import run
from gokart.parameter import TaskInstanceParameter
from gokart.task import TaskOnKart
136 changes: 136 additions & 0 deletions gokart/file_processor.py
@@ -0,0 +1,136 @@
import os
import pickle
from abc import ABC
from abc import abstractmethod
from logging import getLogger

import luigi
import luigi.contrib.s3
import luigi.format
import pandas as pd
import pandas.errors

logger = getLogger(__name__)


class FileProcessor(ABC):
    """Abstract interface for loading/dumping objects from/to file-like targets.

    NOTE: the original declared `@abstractmethod` on a plain `object` subclass,
    which does not enforce anything — the class was silently instantiable and
    subclasses missing a method were never rejected. Inheriting `ABC` makes the
    abstract contract effective; every concrete subclass in this module already
    implements all three methods.
    """

    @abstractmethod
    def format(self):
        """Return the luigi format for the target (e.g. `luigi.format.Nop`), or None for plain text."""

    @abstractmethod
    def load(self, file):
        """Deserialize and return an object from the open file-like object *file*."""

    @abstractmethod
    def dump(self, obj, file):
        """Serialize *obj* and write it to the open file-like object *file*."""


class _LargeLocalFileReader(object):
    """File wrapper that splits huge reads into chunks.

    A single `file.read(n)` with n >= 2**31 can fail or truncate on some
    platforms, so reads that large are performed in bounded batches into a
    pre-allocated buffer. All other attributes (seek, close, readline, ...)
    are delegated to the wrapped file.
    """

    # Largest safe single read: 2**31 - 1 bytes. The original expression
    # `1 << 31 - 1` parsed as `1 << 30` because `-` binds tighter than `<<`.
    _MAX_BATCH_SIZE = (1 << 31) - 1

    def __init__(self, file) -> None:
        self._file = file

    def __getattr__(self, item):
        # Delegate everything except `read` to the underlying file object.
        return getattr(self._file, item)

    def read(self, n):
        """Read *n* bytes; chunk the read when n >= 2**31."""
        if n < (1 << 31):
            return self._file.read(n)

        logger.info(f'reading a large file with total_bytes={n}.')
        buffer = bytearray(n)
        idx = 0
        while idx < n:
            batch_size = min(n - idx, self._MAX_BATCH_SIZE)
            logger.info(f'reading bytes [{idx}, {idx + batch_size})...')
            # NOTE(review): assumes the underlying read returns exactly
            # batch_size bytes except at EOF — confirm for the used targets.
            buffer[idx:idx + batch_size] = self._file.read(batch_size)
            idx += batch_size
        logger.info('done.')
        return buffer


class PickleFileProcessor(FileProcessor):
    """Serialize/deserialize arbitrary objects with pickle."""

    def format(self):
        # Binary target: no newline conversion.
        return luigi.format.Nop

    def load(self, file):
        """Unpickle an object from *file*.

        S3 files are read in one shot; local files go through
        `_LargeLocalFileReader` to work around the 2GB single-read limit.
        """
        if isinstance(file, luigi.contrib.s3.ReadableS3File):
            return pickle.loads(file.read())
        return pickle.load(_LargeLocalFileReader(file))

    def dump(self, obj, file):
        """Pickle *obj* to *file*. Protocol 4 supports objects larger than 4GB."""
        self._write(pickle.dumps(obj, protocol=4), file)

    @staticmethod
    def _write(buffer, file):
        """Write *buffer* to *file* in chunks of at most 2**31 - 1 bytes.

        Fixes two issues in the original: the chunk bound `1 << 31 - 1`
        parsed as `1 << 30` due to operator precedence, and the
        "writing a file" summary line was logged on every iteration instead
        of once.
        """
        total = len(buffer)
        logger.info(f'writing a file with total_bytes={total}...')
        max_batch_size = (1 << 31) - 1
        idx = 0
        while idx < total:
            batch_size = min(total - idx, max_batch_size)
            logger.info(f'writing bytes [{idx}, {idx + batch_size})')
            file.write(buffer[idx:idx + batch_size])
            idx += batch_size
        logger.info('done')


class TextFileProcessor(FileProcessor):
    """Read/write plain-text targets, one element per line."""

    def format(self):
        # Default (text) format.
        return None

    def load(self, file):
        # Drop trailing whitespace/newlines from every line.
        return [line.rstrip() for line in file.readlines()]

    def dump(self, obj, file):
        # Lists are written one element per line; any other object verbatim.
        if not isinstance(obj, list):
            file.write(str(obj))
            return
        for element in obj:
            file.write(str(element) + '\n')


class CsvFileProcessor(FileProcessor):
    """Read/write `pandas.DataFrame` targets as csv/tsv."""

    def __init__(self, sep=','):
        super(CsvFileProcessor, self).__init__()
        # Column separator: ',' for csv, '\t' for tsv.
        self._sep = sep

    def format(self):
        # Default (text) format.
        return None

    def load(self, file):
        # An entirely empty target yields an empty DataFrame instead of raising.
        try:
            return pd.read_csv(file, sep=self._sep)
        except pd.errors.EmptyDataError:
            return pd.DataFrame()

    def dump(self, obj, file):
        assert isinstance(obj, pd.DataFrame), f'requires pd.DataFrame, but {type(obj)} is passed.'
        # The index is dropped so a dump/load round trip preserves the frame.
        obj.to_csv(file, sep=self._sep, index=False)


class GzipFileProcessor(FileProcessor):
    """Read/write gzip-compressed text targets, one element per line."""

    def format(self):
        # Compressed target handled by luigi's gzip format wrapper.
        return luigi.format.Gzip

    def load(self, file):
        # The target yields bytes: strip the newline, then decode.
        return [line.rstrip().decode() for line in file.readlines()]

    def dump(self, obj, file):
        # Lists are written one element per line; any other object verbatim.
        # The target expects bytes, hence the encode() calls.
        if not isinstance(obj, list):
            file.write(str(obj).encode())
            return
        for element in obj:
            file.write((str(element) + '\n').encode())


def make_file_processor(file_path: str) -> FileProcessor:
    """Return the ``FileProcessor`` matching the extension of *file_path*.

    Supported extensions: ``.txt``, ``.csv``, ``.tsv``, ``.pkl`` and ``.gz``.

    Raises:
        ValueError: if the extension is not supported. (The original used
            ``assert``, which is silently stripped under ``python -O``, so an
            unsupported extension would have fallen through to a KeyError.)
    """
    extension2processor = {
        '.txt': TextFileProcessor(),
        '.csv': CsvFileProcessor(sep=','),
        '.tsv': CsvFileProcessor(sep='\t'),
        '.pkl': PickleFileProcessor(),
        '.gz': GzipFileProcessor(),
    }

    extension = os.path.splitext(file_path)[1]
    if extension not in extension2processor:
        raise ValueError(f'{extension} is not supported. The supported extensions are {list(extension2processor.keys())}.')
    return extension2processor[extension]
12 changes: 12 additions & 0 deletions gokart/parameter.py
@@ -0,0 +1,12 @@
import luigi
from luigi import task_register


class TaskInstanceParameter(luigi.Parameter):
    """Luigi parameter whose value is a task instance.

    The instance is (de)serialized as a dict string holding the task family
    name (``type``) and its string-encoded parameters (``params``).
    """

    def parse(self, s):
        # *s* encodes {'type': <task family>, 'params': {...}}.
        description = luigi.DictParameter().parse(s)
        task_cls = task_register.Register.get_task_cls(description['type'])
        return task_cls(**description['params'])

    def serialize(self, x):
        description = dict(type=x.get_task_family(), params=x.to_str_params())
        return luigi.DictParameter().serialize(description)
36 changes: 36 additions & 0 deletions gokart/run.py
@@ -0,0 +1,36 @@
import configparser
import os
import sys
from configparser import ConfigParser

import luigi
import luigi.cmdline
import luigi.retcodes


def _read_environ():
    """Copy every environment variable into luigi's configuration defaults.

    Values are stored with ``section=None`` (the DEFAULT section) so config
    options can reference environment variables via interpolation.
    ``ConfigParser.set`` is invoked through ``super()`` to bypass luigi's own
    ``set`` override. NOTE(review): presumably to skip its section handling —
    confirm against the installed luigi version.
    """
    config = luigi.configuration.get_config()
    for name, raw_value in os.environ.items():
        # '%' is ConfigParser's interpolation marker, so it must be doubled.
        escaped_value = raw_value.replace('%', '%%')
        super(ConfigParser, config).set(section=None, option=name, value=escaped_value)


def _check_config():
    """Fail fast if any luigi config option references an unset environment variable."""
    config = luigi.configuration.LuigiConfigParser.instance()
    for section_name in config.sections():
        try:
            # items() forces interpolation of every option in the section,
            # surfacing any unresolved ${...} reference.
            config.items(section_name)
        except configparser.InterpolationMissingOptionError as error:
            # args[3] holds the missing option name in the raised error.
            missing_name = error.args[3]
            raise luigi.parameter.MissingParameterException(f'Environment variable "{missing_name}" must be set.')


def run(set_retcode=True):
    """Entry point: configure retcodes, load env vars into config, then run luigi.

    Args:
        set_retcode: when True, install distinct process exit codes for each
            luigi failure mode instead of luigi's defaults.
    """
    if set_retcode:
        exit_codes = dict(
            already_running=10,
            missing_data=20,
            not_run=30,
            task_failed=40,
            scheduling_error=50,
        )
        for name, code in exit_codes.items():
            setattr(luigi.retcodes.retcode, name, code)

    _read_environ()
    _check_config()
    luigi.cmdline.luigi_run(sys.argv[1:])