In [2]:
import numpy as np
import pandas as pd
animals = pd.DataFrame({'kind': ['cat', 'dog', 'cat', 'dog'],
                         'height': [9.1, 6.0, 9.5, 34.0],
                         'weight': [7.9, 7.5, 9.9, 198.0]})

animals.groupby("kind").agg(
        min_height=('height', 'min'),
        max_height=('height', 'max'),
        sum_height=('height', 'sum'),
        average_weight=('weight', np.mean),
    )



Unnamed: 0_level_0,min_height,max_height,sum_height,average_weight
kind,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
cat,9.1,9.5,18.6,8.9
dog,6.0,34.0,40.0,102.75


In [10]:
from bolt4ds.ingest import pd_to_mysql,pd_readsql

In [8]:
pd_to_mysql(animals, 'mysql+mysqlconnector://root@127.0.0.1/boledw', 'all_uid_v5',if_exists='replace')


creating mysql.csv ok
loading mysql.csv ok


True

In [9]:
animals

Unnamed: 0,kind,height,weight
0,cat,9.1,7.9
1,dog,6.0,7.5
2,cat,9.5,9.9
3,dog,34.0,198.0


In [13]:
url = 'mysql+mysqlconnector://root@127.0.0.1/boledw'
sql = "select kind,height,weight from all_uid_v5"
pd_readsql(url,sql)


Unnamed: 0,0,1,2
0,cat,9.1,7.9
1,dog,6.0,7.5
2,cat,9.5,9.9
3,dog,34.0,198.0


In [14]:
import importlib
import pandas as pd
import glob

import bolt4ds.ingest.combine_csv as d6tc


## Use Case: Checking Column Consistency

Let's say you receive a bunch of csv files you want to ingest them, say for example into pandas, dask, pyspark, database.

In [15]:
cfg_fnames = list(glob.glob('bolt4ds/ingest/examples/test-data/input/test-data-input-csv-clean-*.csv'))
print(cfg_fnames)

['bolt4ds/ingest/examples/test-data/input/test-data-input-csv-clean-feb.csv', 'bolt4ds/ingest/examples/test-data/input/test-data-input-csv-clean-mar.csv', 'bolt4ds/ingest/examples/test-data/input/test-data-input-csv-clean-jan.csv']


### Check column consistency across all files

Even if you think the files have a consistent column layout, it worthwhile using `bolt4ds` to assert that that is actually the case. It's very quick to do even with very many large files!

In [16]:
# get previews
c = d6tc.CombinerCSV(cfg_fnames) # all_strings=True makes reading faster
col_sniff = c.sniff_columns()

sniffing columns ok


In [17]:
print('all columns equal?', c.is_all_equal())
print('')
print('which columns are present in which files?')
print('')
print(c.is_column_present())
print('')
print('in what order do columns appear in the files?')
print('')
print(col_sniff['df_columns_order'].reset_index(drop=True))

all columns equal? True

which columns are present in which files?

                                                    date  sales  cost  profit
file_path                                                                    
bolt4ds/ingest/examples/test-data/input/test-da...  True   True  True    True
bolt4ds/ingest/examples/test-data/input/test-da...  True   True  True    True
bolt4ds/ingest/examples/test-data/input/test-da...  True   True  True    True

in what order do columns appear in the files?

   date  sales  cost  profit
0     0      1     2       3
1     0      1     2       3
2     0      1     2       3


### Preview Combined Data

You can see a preview of what the combined data from all files will look like.

In [18]:
c.combine_preview()

Unnamed: 0,date,sales,cost,profit,filepath,filename
0,2011-02-01,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv
1,2011-02-02,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv
2,2011-02-03,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv
3,2011-01-01,100,-80,20,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-jan.csv
4,2011-01-02,100,-80,20,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-jan.csv
5,2011-01-03,100,-80,20,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-jan.csv
6,2011-03-01,300,-100,200,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-mar.csv
7,2011-03-02,300,-100,200,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-mar.csv
8,2011-03-03,300,-100,200,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-mar.csv


### Read All Files to Pandas

You can quickly load the combined data into a pandas dataframe with a single command. 

In [19]:
c.to_pandas().head()

Unnamed: 0,date,sales,cost,profit,filepath,filename
0,2011-02-01,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv
1,2011-02-02,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv
2,2011-02-03,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv
3,2011-02-04,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv
4,2011-02-05,200,-90,110,bolt4ds/ingest/examples/test-data/input/test-d...,test-data-input-csv-clean-feb.csv


In [23]:

try:
    from tensorflow.python.keras.engine.saving import load_model
except:
    from tensorflow.keras.models import load_model

In [22]:
from tensorflow.keras.models import load_model

In [18]:
import datetime
import bolt4ds.flow.tasks
from bolt4ds import flow as d6tflow
import luigi
class TaskTrain(d6tflow.tasks.TaskPqPandas):
    do_preprocess = luigi.BoolParameter(default=True)
    model = luigi.Parameter(default='xgboost')
    print(model,"model")
d6tflow.run(TaskTrain(do_preprocess=True, model='nnet'))
d6tflow.run(TaskTrain(do_preprocess=True)) # use default model='xgboost'

INFO: Informed scheduler that task   TaskTrain_True_nnet_c261e0a55b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 76393] Worker Worker(salt=559991990, workers=1, host=bogon, username=leepand, pid=76393) running   TaskTrain(do_preprocess=True, model=nnet)
INFO: [pid 76393] Worker Worker(salt=559991990, workers=1, host=bogon, username=leepand, pid=76393) done      TaskTrain(do_preprocess=True, model=nnet)
INFO: Informed scheduler that task   TaskTrain_True_nnet_c261e0a55b   has status   DONE
INFO: Worker Worker(salt=559991990, workers=1, host=bogon, username=leepand, pid=76393) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 TaskTrain(do_preprocess=True, model=nnet)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

INFO: Informed scheduler that task   TaskTrain_True_x

<luigi.parameter.Parameter object at 0x121a47e50> model


True

In [19]:
%%file luigi_hello.py
from bolt4ds import flow as d6tflow
import bolt4ds.flow.tasks
import luigi
import pandas as pd

# define 2 tasks that load raw data
class Task1(d6tflow.tasks.TaskPqPandas):
    
    def run(self):
        df = pd.DataFrame({'a':range(3)})
        self.save(df) # quickly save dataframe

class Task2(Task1):
    pass

# define another task that depends on data from task1 and task2
@d6tflow.requires(Task1,Task2)
class Task3(d6tflow.tasks.TaskPqPandas):
    multiplier = luigi.IntParameter(default=2)
    
    def run(self):
        df1 = self.input()[0].load() # quickly load input data
        df2 = self.input()[1].load() # quickly load input data
        df = df1.join(df2, lsuffix='1', rsuffix='2')
        df['b']=df['a1']*self.multiplier # use task parameter
        self.save(df)

# Execute task including all its dependencies
d6tflow.run(Task3())
'''
* 3 ran successfully:
    - 1 Task1()
    - 1 Task2()
    - 1 Task3(multiplier=2)
'''

Task3().outputLoad() # quickly load output data. Task1().outputLoad() also works
'''
   a1  a2  b
0   0   0  0
1   1   1  2
2   2   2  4
'''

# Intelligently rerun workflow after changing parameters
d6tflow.preview(Task3(multiplier=3))
'''
└─--[Task3-{'multiplier': '3'} (PENDING)] => this changed and needs to run
   |--[Task1-{} (COMPLETE)] => this doesn't change and doesn't need to rerun
   └─--[Task2-{} (COMPLETE)] => this doesn't change and doesn't need to rerun
'''

Writing luigi_hello.py


In [21]:
!python luigi_hello.py Task3 --id=some_id --scheduler-host localhost --scheduler-port 8082

Loading postgres module without psycopg2 installed. Will crash at runtime if postgres functionality is used.
Welcome to bolt4ds.flow!
INFO: Informed scheduler that task   Task3_2_fb057692ad   has status   PENDING
INFO: Informed scheduler that task   Task2__99914b932b   has status   PENDING
INFO: Informed scheduler that task   Task1__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 75441] Worker Worker(salt=326048280, workers=1, host=localhost, username=leepand, pid=75441) running   Task1()
INFO: [pid 75441] Worker Worker(salt=326048280, workers=1, host=localhost, username=leepand, pid=75441) done      Task1()
INFO: Informed scheduler that task   Task1__99914b932b   has status   DONE
INFO: [pid 75441] Worker Worker(salt=326048280, workers=1, host=localhost, username=leepand, pid=75441) running   Task2()
INFO: [pid 75441] Worker Worker(salt=326048280, workers=1, host=localhost, username=leepand, pid=75441) done      Task2()
IN

In [23]:
!python luigi_hello2.py HelloWorldTask --id=some_id --scheduler-host localhost --scheduler-port 8082

DEBUG: Checking if HelloWorldTask(id=some_id) is complete
DEBUG: Checking if PrintWordTask(path=results/some_id/hello.txt, word=Hello) is complete
DEBUG: Checking if PrintWordTask(path=results/some_id/world.txt, word=World) is complete
INFO: Informed scheduler that task   HelloWorldTask_some_id_e9ca25296d   has status   PENDING
DEBUG: Checking if MakeDirectory(path=results/some_id) is complete
INFO: Informed scheduler that task   PrintWordTask_results_some_id__World_b6aa9eaeeb   has status   PENDING
INFO: Informed scheduler that task   MakeDirectory_results_some_id_c583f20464   has status   PENDING
INFO: Informed scheduler that task   PrintWordTask_results_some_id__Hello_6f5c8c8326   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 75457] Worker Worker(salt=066806557, workers=1, host=localhost, username=leepand, pid=75457) running   MakeDirectory(path=results/some_id)
INFO: [pid

In [12]:
import bolt4ds.flow.tasks

In [3]:
import bolt4ds.sparsity as sp5


  from pandas.core.index import Index, MultiIndex


In [28]:
#%%file luigi_hello2.py
# my_module.py, available in your sys.path
import luigi
import os


class PrintWordTask(luigi.Task):
    path = luigi.Parameter()
    word = luigi.Parameter()

    def run(self):
        with open(self.path, 'w') as out_file:
            out_file.write(self.word)
            out_file.close()

    def output(self):
        return luigi.LocalTarget(self.path)

    def requires(self):
        return [
            MakeDirectory(path=os.path.dirname(self.path)),
        ]


class HelloWorldTask(luigi.Task):
    id = luigi.Parameter(default='test')

    def run(self):
        with open(self.input()[0].path, 'r') as hello_file:
            hello = hello_file.read()
        with open(self.input()[1].path, 'r') as world_file:
            world = world_file.read()
        with open(self.output().path, 'w') as output_file:
            content = '{} {}!'.format(hello, world)
            output_file.write(content)
            output_file.close()

    def requires(self):
        return [
            PrintWordTask(
                path='results/{}/hello.txt'.format(self.id),
                word='Hello',
            ),
            PrintWordTask(
                path='results/{}/world.txt'.format(self.id),
                word='World',
            ),
        ]

    def output(self):
        path = 'results/{}/hello_world.txt'.format(self.id)
        return luigi.LocalTarget(path)


class MakeDirectory(luigi.Task):
    path = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.path)

    def run(self):
        os.makedirs(self.path)


if __name__ == '__main__':
    luigi.build([HelloWorldTask(id="test2")])

INFO: Informed scheduler that task   HelloWorldTask_test2_065205024b   has status   PENDING
INFO: Informed scheduler that task   PrintWordTask_results_test2_wo_World_f17b29e2da   has status   PENDING
INFO: Informed scheduler that task   MakeDirectory_results_test2_cf09820580   has status   PENDING
INFO: Informed scheduler that task   PrintWordTask_results_test2_he_Hello_3a8204f044   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 74895] Worker Worker(salt=517990512, workers=1, host=localhost, username=leepand, pid=74895) running   MakeDirectory(path=results/test2)
INFO: [pid 74895] Worker Worker(salt=517990512, workers=1, host=localhost, username=leepand, pid=74895) done      MakeDirectory(path=results/test2)
INFO: Informed scheduler that task   MakeDirectory_results_test2_cf09820580   has status   DONE
INFO: [pid 74895] Worker Worker(salt=517990512, workers=1, host=localhost, username=leepand, pid=74895) running   PrintWordTask(path=re

In [7]:
import bolt4ds.flow.targets

In [6]:
from bolt4ds.flow.settings import dir, dirpath

In [1]:
import pandas as pd
import numpy as np
import uuid
import itertools
import importlib

import bolt4ds.b4djoin.utils
importlib.reload(bolt4ds.b4djoin.utils)

# ******************************************
# generate sample data
# ******************************************
nobs = 10
uuid1 = [str(uuid.uuid4()) for _ in range(nobs)]
dates1 = pd.date_range('1/1/2010','1/1/2011')

df1 = pd.DataFrame(list(itertools.product(uuid1,dates1)),columns=['id','date'])
df1['v']=np.random.sample(df1.shape[0])

In [2]:
df1.groupby(['id']).head(2).head(6)


Unnamed: 0,id,date,v
0,7f30b30f-b6cb-4534-bd7e-2952f1d89656,2010-01-01,0.62731
1,7f30b30f-b6cb-4534-bd7e-2952f1d89656,2010-01-02,0.85739
366,7bd0cd65-53d1-4896-b97e-34fc44973a27,2010-01-01,0.113923
367,7bd0cd65-53d1-4896-b97e-34fc44973a27,2010-01-02,0.271886
732,6bd9796c-ac8a-4340-9f9f-3cf02ba43f84,2010-01-01,0.364606
733,6bd9796c-ac8a-4340-9f9f-3cf02ba43f84,2010-01-02,0.545378


In [4]:

df2 = df1.copy()

j = bolt4ds.b4djoin.PreJoin([df1,df2],['id','date'])
assert j.is_all_matched() # succeeds
assert j.is_all_matched('id') # succeeds
assert j.is_all_matched('date') # succeeds

In [7]:
# create mismatch
df2['id'] = df1['id'].str[1:-1]

j = bolt4ds.b4djoin.PreJoin([df1,df2],['id','date'])

try:
    assert j.is_all_matched() # fails
except:
    print('assert fails!')

assert fails!


In [9]:

print(j.show_unmatched('id')['left'])
print(j.show_unmatched('id')['right'])

                                       id       date         v
732  6bd9796c-ac8a-4340-9f9f-3cf02ba43f84 2010-01-01  0.364606
733  6bd9796c-ac8a-4340-9f9f-3cf02ba43f84 2010-01-02  0.545378
734  6bd9796c-ac8a-4340-9f9f-3cf02ba43f84 2010-01-03  0.454076
                                      id       date         v
1464  9241101-211e-4a49-bf76-b3ba0b02436 2010-01-01  0.818811
1465  9241101-211e-4a49-bf76-b3ba0b02436 2010-01-02  0.085298
1466  9241101-211e-4a49-bf76-b3ba0b02436 2010-01-03  0.463425


In [13]:

# create partial mismatch
uuid_sel = np.array(uuid1)[np.random.choice(nobs, nobs//5, replace=False)].tolist()
df2 = df1[~df1['id'].isin(uuid_sel)]

j = bolt4ds.b4djoin.PreJoin([df1,df2],['id','date'])

try:
    assert j.is_all_matched() # fails
except:
    print('assert fails!')

assert fails!
