# dbpipe

dbpipe is a lightweight, simple way to manage data pipelines. Its building blocks relate to each other as follows:

```mermaid
graph LR
A(Endpoints)-->B(Pipes)
B-->C(Jobs)
D(Schedules)-->C
C-.->E(Clusters)

```

## Creating Endpoints

In [36]:
```python
from dbpipe import EndPoint


facebook = EndPoint('Facebook','API','https://facebook.com/Posts')
facebook
```

```text
{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}
```

In [37]:
```python
facebook.save()
```

In [38]:
```python
instagram = EndPoint('Instagram','API','https://instagram.com/Posts')
instagram
```

```text
{'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}
```

In [39]:
```python
instagram.save()
```

In [40]:
```python
posttable = EndPoint('DW.Facebook.Posts','Database','ServerName')
posttable
```

```text
{'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}
```

In [41]:
```python
posttable.save()
```

## Creating a Pipe

In [42]:
```python
from dbpipe import Pipe


pipe = Pipe(
        name='DW',
        sources=[facebook,instagram],
        destination=posttable,
        processfile="Test.py"
    )

pipe
```

```text
{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}
```

In [43]:
```python
pipe.to_dict()
```

```text
{'name': 'DW',
 'sources': [{'name': 'Facebook',
   'type': 'API',
   'location': 'https://facebook.com/Posts'},
  {'name': 'Instagram',
   'type': 'API',
   'location': 'https://instagram.com/Posts'}],
 'destination': {'name': 'DW.Facebook.Posts',
  'type': 'Database',
  'location': 'ServerName'},
 'logfile': None,
 'processfile': 'Test.py'}
```

In [44]:
```python
pipe.save()
```

## Creating a Schedule

In [45]:
```python
from dbpipe import Schedule

schedule = Schedule(frequency="Daily", start_time="8:00AM")

schedule
```

```text
{'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}
```

In [46]:
```python
schedule.to_dict()
```

```text
{'frequency': 'Daily',
 'start_time': '8:00AM',
 'end_time': None,
 'time_zone': 'UTC'}
```

## Creating a Pipe Cluster

In [47]:
```python
from dbpipe.core.pipes import Cluster


clstr = Cluster([pipe,pipe])
clstr
```

```text
[{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}]
```

## Creating a Job

In [48]:
```python
from dbpipe import Job


job = Job('My Job',schedule=schedule,jobs=clstr)
job
```

```text
{'name': 'My Job', 'schedule': {'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}, 'jobs': [{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}]}
```

In [49]:
```python
job.save()
```

## Reading a Pipe

In [50]:
```python
from dbpipe import read_pipe


pipe = read_pipe('pipes/DW.json')
pipe
```

```text
{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}
```

In [51]:
```python
pipe.to_dict()
```

```text
{'name': 'DW',
 'sources': [{'name': 'Facebook',
   'type': 'API',
   'location': 'https://facebook.com/Posts'},
  {'name': 'Instagram',
   'type': 'API',
   'location': 'https://instagram.com/Posts'}],
 'destination': {'name': 'DW.Facebook.Posts',
  'type': 'Database',
  'location': 'ServerName'},
 'logfile': None,
 'processfile': 'Test.py'}
```

## Reading a Job

In [52]:
```python
from dbpipe import read_job

job = read_job('jobs/My Job.json')
job
```

```text
{'name': 'My Job', 'schedule': {'frequency': 'Daily', 'start_time': '8:00AM', 'end_time': None, 'time_zone': 'UTC'}, 'jobs': [{'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}, {'name': 'DW', 'sources': [{'name': 'Facebook', 'type': 'API', 'location': 'https://facebook.com/Posts'}, {'name': 'Instagram', 'type': 'API', 'location': 'https://instagram.com/Posts'}], 'destination': {'name': 'DW.Facebook.Posts', 'type': 'Database', 'location': 'ServerName'}, 'logfile': None, 'processfile': 'Test.py'}]}
```

In [53]:
```python
job.to_dict()
```

```text
{'name': 'My Job',
 'schedule': {'frequency': 'Daily',
  'start_time': '8:00AM',
  'end_time': None,
  'time_zone': 'UTC'},
 'jobs': [{'name': 'DW',
   'sources': [{'name': 'Facebook',
     'type': 'API',
     'location': 'https://facebook.com/Posts'},
    {'name': 'Instagram',
     'type': 'API',
     'location': 'https://instagram.com/Posts'}],
   'destination': {'name': 'DW.Facebook.Posts',
    'type': 'Database',
    'location': 'ServerName'},
   'logfile': None,
   'processfile': 'Test.py'},
  {'name': 'DW',
   'sources': [{'name': 'Facebook',
     'type': 'API',
     'location': 'https://facebook.com/Posts'},
    {'name': 'Instagram',
     'type': 'API',
     'location': 'https://instagram.com/Posts'}],
   'destination': {'name': 'DW.Facebook.Posts',
    'type': 'Database',
    'location': 'ServerName'},
   'logfile': None,
   'processfile': 'Test.py'}]}
```

## Lineage

In [54]:
```python
from dbpipe.lineage.mermaid import generate_mermaid_markdown_file


generate_mermaid_markdown_file('pipes','test.md')
```