Skip to content

Commit

Permalink
importer working'
Browse files Browse the repository at this point in the history
  • Loading branch information
rustyrazorblade committed Apr 6, 2016
1 parent 5763cc7 commit cb0de67
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 20 deletions.
23 changes: 4 additions & 19 deletions cdm/context.py
Expand Up @@ -11,6 +11,7 @@

import pandas
from cdm.installer import Installer
from cdm.importer import Importer

class InstallerNotFound(Exception): pass

Expand Down Expand Up @@ -95,7 +96,7 @@ def clean_cache(self):
def feedback(self, msg):
logging.info(msg)

def save_dataframe_to_cassandra(self, dataframe, table, types=None,
def save_dataframe_to_cassandra(self, dataframe, table,
transformation=None):
"""
:param session:
Expand All @@ -106,24 +107,8 @@ def save_dataframe_to_cassandra(self, dataframe, table, types=None,
:return:
"""
logging.info("Saving dataframe to %s", table)
pool = Pool(50)
i = 0

prepared = {}

def f(row):
logging.info(row)
if 'stmt' not in prepared:
prepared['stmt'] = self.prepare(table, row.columns)

logging.info(row)
self.session.execute(prepared['stmt'], row.values())

with progressbar.ProgressBar(max_value=len(dataframe)) as bar:
i += 1
for _ in pool.imap_unordered(f, dataframe.itertuples()):
if i % 10 == 0:
bar.update(i)
importer = Importer(self.session, dataframe, transformation)
importer.load(table)


def prepare(self, table, fields):
Expand Down
2 changes: 1 addition & 1 deletion cdm/importer.py
Expand Up @@ -59,7 +59,7 @@ def save(row):
for _ in pool.imap_unordered(save, self.iter()):
i += 1
if i % 10 == 0:
pool.update(i)
p.update(i)



Expand Down
1 change: 1 addition & 0 deletions tests/test_importer.py
Expand Up @@ -44,6 +44,7 @@ def test_insert_statement(importer):
def test_load(importer):
queries = ["DROP TABLE IF EXISTS test_importer",
"CREATE TABLE test_importer (name text primary key, hats int, nickname text)"]

for q in queries:
importer.session.execute(q)

Expand Down

0 comments on commit cb0de67

Please sign in to comment.