Commit 3fe3283

Merge a1477fe into 063945e
therako committed Aug 2, 2018
2 parents 063945e + a1477fe commit 3fe3283
Showing 4 changed files with 71 additions and 56 deletions.
2 changes: 1 addition & 1 deletion bqsqoop/__init__.py
@@ -1 +1 @@
-__version__ = "0.0.8"
+__version__ = "0.0.9"
49 changes: 32 additions & 17 deletions bqsqoop/extractor/elasticsearch/elasticsearch.py
@@ -31,15 +31,24 @@ def validate_config(self):
         return None
 
     def extract_to_parquet(self):
-        _async_worker = async_worker.AsyncWorker(
-            self._no_of_workers)
-        for i in range(self._no_of_workers):
-            _async_worker.send_data_to_worker(
-                worker_id=i,
-                **self._get_extract_job_fn_and_params()
-            )
+        if self._no_of_workers > 1:
+            _async_worker = async_worker.AsyncWorker(
+                self._no_of_workers)
+            for i in range(self._no_of_workers):
+                _async_worker.send_data_to_worker(
+                    worker_id=i,
+                    **self._get_extract_job_fn_and_params()
+                )
+            logging.debug('Waiting for Extractor job results...')
+            return _async_worker.get_job_results()
+        else:
+            args = self._get_extract_job_fn_and_params()
+            del args['worker_callback']
+            res = helper.ESHelper.scroll_and_extract_data(
+                worker_id=0,
+                **args
+            )
-        logging.debug('Waiting for Extractor job results...')
-        return _async_worker.get_job_results()
+            return [res]
 
     def _get_extract_job_fn_and_params(self):
         search_args = dict(
@@ -52,15 +61,21 @@ def _get_extract_job_fn_and_params(self):
                 }
             }
         )
+        _fields = helper.ESHelper.get_fields(
+            self._config['url'], self._config['index'])
         if '_all' not in self._fields:
             search_args['_source_include'] = ','.join(
                 self._fields)
-        _fields = helper.ESHelper.get_fields(
-            self._config['url'], self._config['index'])
-        return dict(worker_callback=helper.ESHelper.scroll_and_extract_data,
-                    total_worker_count=self._no_of_workers,
-                    es_hosts=self._config['url'],
-                    es_timeout=self._timeout,
-                    output_folder=self._output_folder,
-                    search_args=search_args,
-                    fields=_fields)
+        else:
+            search_args['_source_include'] = ','.join(_fields.keys())
+        fn_params = dict(
+            worker_callback=helper.ESHelper.scroll_and_extract_data,
+            total_worker_count=self._no_of_workers,
+            es_hosts=self._config['url'],
+            es_timeout=self._timeout,
+            output_folder=self._output_folder,
+            search_args=search_args,
+            fields=_fields)
+        if "datetime_format" in self._config:
+            fn_params["datetime_format"] = self._config["datetime_format"]
+        return fn_params
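
Aside: extract_to_parquet now short-circuits the fan-out when only one worker is configured, invoking the scroll callback inline instead of paying multiprocessing overhead for a single job. A minimal sketch of that dispatch pattern, with hypothetical stand-ins for AsyncWorker and the worker callback (the names and pool mechanics here are illustrative, not bqsqoop's actual internals):

    import multiprocessing


    def _extract(worker_id, total_worker_count):
        # Stand-in for ESHelper.scroll_and_extract_data; returns one file path.
        return "part-%d-of-%d.parq" % (worker_id, total_worker_count)


    def extract_all(no_of_workers):
        if no_of_workers > 1:
            # Fan out one job per worker and collect a list of results,
            # mirroring AsyncWorker.send_data_to_worker / get_job_results.
            with multiprocessing.Pool(no_of_workers) as pool:
                jobs = [pool.apply_async(_extract, (i, no_of_workers))
                        for i in range(no_of_workers)]
                return [j.get() for j in jobs]
        # Single worker: run inline; wrap the result so callers always get a list.
        return [_extract(0, 1)]


    print(extract_all(1))  # ['part-0-of-1.parq']

Keeping the return type a list in both branches is what lets the tests below assert the same ["file1.parq"] result whether or not workers fan out.
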
30 changes: 17 additions & 13 deletions bqsqoop/extractor/elasticsearch/helper.py
@@ -21,14 +21,16 @@ def get_fields(self, es_hosts, index):
             _properties = index_type_value['properties']
             _keys = list(_properties.keys())
             for _key in _keys:
-                _fields[_key] = _properties[_key]["type"]
+                if "type" in _properties[_key]:
+                    _fields[_key] = _properties[_key]["type"]
             break
         return _fields
 
     @classmethod
     def scroll_and_extract_data(self, worker_id, total_worker_count, es_hosts,
                                 es_timeout, search_args, fields,
-                                output_folder, progress_bar=True):
+                                output_folder, progress_bar=True,
+                                datetime_format="%Y-%m-%dT%H:%M:%S"):
         _es = self._get_es_client(es_hosts)
         search_args = self._add_slice_if_needed(
             total_worker_count, search_args, worker_id)
@@ -37,34 +39,36 @@ def scroll_and_extract_data(self, worker_id, total_worker_count, es_hosts,
         _page = _es.search(**search_args)
         _data, _sid = self._get_data_from_es_page(_page)
         _total_hits = self._get_total_hits(_page)
-        df = None
         _parquetUtil = parquet_util.ParquetUtil(_output_file)
         _pbar = ProgressBar(
             total_length=_total_hits, position=worker_id, enabled=progress_bar)
         if _data:
-            df = pd.DataFrame.from_dict(_data)
-            self._fix_types_from_es(df, fields)
-            _parquetUtil.append_df_to_parquet(df)
-            _pbar.move_progress(len(_data))
+            self._write_data(_data, fields, _parquetUtil,
+                             _pbar, datetime_format)
         while True:
             _page = _es.scroll(scroll_id=_sid, scroll=es_timeout)
             _data, _sid = self._get_data_from_es_page(_page)
             if _data:
-                df = pd.DataFrame.from_dict(_data)
-                self._fix_types_from_es(df, fields)
-                _parquetUtil.append_df_to_parquet(df)
-                _pbar.move_progress(len(_data))
+                self._write_data(_data, fields, _parquetUtil, _pbar,
+                                 datetime_format)
             else:
                 break
         _parquetUtil.close()
         return _output_file
 
     @classmethod
-    def _fix_types_from_es(self, df, fields):
+    def _write_data(self, data, fields, parquetUtil, pbar, datetime_format):
+        df = pd.DataFrame.from_dict(data)
+        self._fix_types_from_es(df, fields, datetime_format=datetime_format)
+        parquetUtil.append_df_to_parquet(df)
+        pbar.move_progress(len(data))
+
+    @classmethod
+    def _fix_types_from_es(self, df, fields, datetime_format):
         for _name, _type in fields.items():
             if _type == "date":
                 df[_name] = pd.to_datetime(
-                    df[_name], format="%Y-%m-%dT%H:%M:%S")
+                    df[_name], format=datetime_format)
 
     @classmethod
     def _output_file_for(self, output_folder, index):
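
Two fixes ride along in this helper: get_fields now skips mapping entries that carry no "type" (object fields in an Elasticsearch mapping nest sub-"properties" instead of a scalar type), and date columns are parsed with the configurable datetime_format rather than a hard-coded ISO pattern. A self-contained sketch of both behaviors, using a made-up mapping fragment and pandas (pd.to_datetime is the only library call the helper itself uses here):

    import pandas as pd

    # Made-up mapping fragment: "user" is an object field, so it has
    # "properties" but no "type", and the new guard simply skips it.
    properties = {
        "created_at": {"type": "date"},
        "amount": {"type": "float"},
        "user": {"properties": {"name": {"type": "text"}}},
    }
    fields = {k: v["type"] for k, v in properties.items() if "type" in v}
    print(fields)  # {'created_at': 'date', 'amount': 'float'}

    # Date parsing with a caller-supplied format, as the new datetime_format
    # parameter allows (the default stays "%Y-%m-%dT%H:%M:%S").
    df = pd.DataFrame({"created_at": ["02/08/2018 10:30:00"]})
    for name, ftype in fields.items():
        if ftype == "date" and name in df.columns:
            df[name] = pd.to_datetime(df[name], format="%d/%m/%Y %H:%M:%S")
    print(df["created_at"].dtype)  # datetime64[ns]
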
46 changes: 21 additions & 25 deletions tests/extractor/elasticsearch/test_elasticsearch.py
@@ -47,6 +47,7 @@ def test_async_extract_call(self, async_worker, es_helper, mock_uuid):
         _valid_config['no_of_workers'] = 2
         _e = ElasticSearchExtractor(_valid_config)
         es_helper.get_fields = MagicMock()
+        es_helper.get_fields.return_value = {"fieldA": "int", "fieldB": "bool"}
         _mock_worker = MagicMock()
         async_worker.return_value = _mock_worker
         _mock_send_data_to_worker = MagicMock()
@@ -55,7 +56,8 @@ def test_async_extract_call(self, async_worker, es_helper, mock_uuid):
         _mock_worker.get_job_results = _mock_job_results
         _search_args = {
             'index': 'some_es_index', 'scroll': '60s',
-            'size': 1000, 'body': {'query': {'match_all': {}}}
+            'size': 1000, 'body': {'query': {'match_all': {}}},
+            '_source_include': 'fieldA,fieldB'
         }
 
         self.assertEqual(_e.extract_to_parquet(), ["file1.parq"])
@@ -82,33 +84,27 @@ def test_async_extract_call(self, async_worker, es_helper, mock_uuid):
 
     @patch('uuid.uuid4', return_value="F43C2651-18C8-4EB0-82D2-10E3C7226015")
     @patch('bqsqoop.extractor.elasticsearch.helper.ESHelper')
-    @patch('bqsqoop.utils.async_worker.AsyncWorker')
-    def test_extract_specific_fields(self, async_worker, es_helper, mock_uuid):
+    def test_extract_specific_fields_with_single_worker(
+            self, es_helper, mock_uuid):
+        _valid_config['no_of_workers'] = 1
         _valid_config['fields'] = ['field1', 'field2']
         _e = ElasticSearchExtractor(_valid_config)
         es_helper.get_fields = MagicMock()
-        _mock_worker = MagicMock()
-        async_worker.return_value = _mock_worker
-        _mock_send_data_to_worker = MagicMock()
-        _mock_worker.send_data_to_worker = _mock_send_data_to_worker
-        _mock_job_results = MagicMock(return_value=["file1.parq"])
-        _mock_worker.get_job_results = _mock_job_results
-        _search_args = {
-            'index': 'some_es_index', 'scroll': '60s', 'size': 1000,
-            'body': {
-                'query': {
-                    'match_all': {},
-                    '_source_include': 'field1,field2'
-                }
-            }
-        }
+        es_helper.get_fields.return_value = {
+            "field1": "int", "field2": "bool", "ignored_field": "text"}
+        es_helper.scroll_and_extract_data.return_value = "file1.parq"
 
         self.assertEqual(_e.extract_to_parquet(), ["file1.parq"])
-        _call_args = _mock_send_data_to_worker.call_args_list
-        _, _kwargs1 = _call_args[0]
-        _search_args = {
-            'index': 'some_es_index', 'scroll': '60s', 'size': 1000,
-            'body': {'query': {'match_all': {}}},
-            '_source_include': 'field1,field2'}
-        self.assertEqual(_kwargs1['search_args'], _search_args)
+        es_helper.scroll_and_extract_data.assert_called_with(
+            es_hosts='es_endpoint', es_timeout='60s',
+            fields={
+                "field1": "int", "field2": "bool",
+                "ignored_field": "text"},
+            search_args={
+                'index': 'some_es_index', 'scroll': '60s', 'size': 1000,
+                'body': {'query': {'match_all': {}}},
+                '_source_include': 'field1,field2'},
+            total_worker_count=1,
+            worker_id=0,
+            output_folder='./F43C2651'
+        )
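
End to end, the new behavior is driven entirely by the extractor config. A hypothetical config dict combining the keys these tests exercise (the full schema is not shown in this diff, so treat the comments as illustrative):

    config = {
        'url': 'es_endpoint',            # Elasticsearch endpoint
        'index': 'some_es_index',
        'no_of_workers': 1,              # 1 = run inline; >1 = fan out via AsyncWorker
        'fields': ['field1', 'field2'],  # or ['_all'] to pull every mapped field
        # New in 0.0.9: forwarded to scroll_and_extract_data when present.
        'datetime_format': '%d/%m/%Y %H:%M:%S',
    }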
