# DBLINK and dg transfer

A short example of reading data from a different database. Here we read from another
database (tpch1f) in the same cluster. The source can be any database (Postgres, MySQL,
Oracle) or a non-database data source such as Elasticsearch or Kafka.

```python
# First, connect to the database.
%load_ext sql
%sql postgresql://ftian@localhost/ftian
```


```python
partsupp = """
WITH FOO AS (
select
--
-- Output columns
--
dg_utils.transducer_column_int4(1) as pk,
dg_utils.transducer_column_int4(2) as sk,
dg_utils.transducer_column_int4(3) as avail,
dg_utils.transducer_column_float4(4) as cost,
dg_utils.transducer_column_text(5) as cmt,
dg_utils.transducer($PHI$PhiExec go
// BEGIN INPUT TYPES
// i int32
// END INPUT TYPES
//
// BEGIN OUTPUT TYPES
// pk int32
// sk int32
// avail int32
// cost float32
// cmt string
// END OUTPUT TYPES
//
package main

import (
        "os"
        "github.com/vitesse-ftian/dggo/vitessedata/xtable"
)

func main() {
        // We use lib/pq, which panics if the following two environment variables
        // are set. Phi is forked from Postgres, which sets them.
        os.Unsetenv("PGSYSCONFDIR")
        os.Unsetenv("PGLOCALEDIR")

        // Drain the input: its single row only serves to start the transducer.
        for rec := NextInput(); rec != nil; rec = NextInput() {
        }

        // Connect to the other database, which holds the partsupp table.
        dg := xtable.Deepgreen{
                Host: "localhost",
                Port: "5432",
                Db:   "tpch1",
        }

        err := dg.Connect()
        if err != nil {
                Log("Deepgreen connect failed, err %%v!", err)
                panic("Cannot open connection")
        }
        defer dg.Disconnect()

        ps, err := xtable.MakeXTable(&dg, "partsupp")
        if err != nil {
                Log("Deepgreen partsupp table err %%v!", err)
                panic("Cannot open xtable partsupp")
        }
        rs, err := ps.Execute()
        if err != nil {
                Log("Deepgreen partsupp Execute err %%v!", err)
                panic("Cannot run xtable")
        }
        defer rs.Close()

        // Stream every partsupp row back as one output record.
        for rs.Next() {
                var pk, sk, avail int32
                var cost float32
                var cmt string
                rs.Scan(&pk, &sk, &avail, &cost, &cmt)

                var outrec OutRecord
                outrec.Set_pk(pk)
                outrec.Set_sk(sk)
                outrec.Set_avail(avail)
                outrec.Set_cost(cost)
                outrec.Set_cmt(cmt)
                WriteOutput(&outrec)
        }
        // A nil record signals the end of output.
        WriteOutput(nil)
}
$PHI$),
t.*
from ( select 1::int ) t
)
select count(*), count(distinct pk), count(distinct sk), sum(avail), avg(cost)
from FOO

"""
```

```python
rows = %sql $partsupp
print(rows)
```

| count(*) | count(distinct pk) | count(distinct sk) | sum(avail) | avg(cost) |
|---:|---:|---:|---:|---:|
| 800000 | 200000 | 10000 | 4002581547 | 500.525798170137 |
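To make the transducer contract in the query above easier to follow, here is a minimal, self-contained Go mock of it: drain the input, emit one output record per remote row, then write `nil` to end the output. `NextInput`, `WriteOutput`, `InRecord` and `OutRecord` below are hypothetical stand-ins written for illustration, not the code Phi generates (the mock uses plain struct fields instead of the generated `Set_pk`-style setters), and the `remote` slice stands in for the rows `xtable` would fetch from `partsupp`.

```go
package main

import "fmt"

// InRecord and OutRecord are simplified, hypothetical stand-ins for the
// record types Phi generates from the INPUT/OUTPUT TYPES comments.
type InRecord struct{ i int32 }

type OutRecord struct {
	pk, sk, avail int32
	cost          float32
	cmt           string
}

// Mock runtime state: one input row (like `select 1::int`) and an output sink.
var (
	inputs  = []*InRecord{{i: 1}}
	pos     int
	outputs []OutRecord
	closed  bool
)

// NextInput returns the next input row, or nil once the input is exhausted.
func NextInput() *InRecord {
	if pos >= len(inputs) {
		return nil
	}
	pos++
	return inputs[pos-1]
}

// WriteOutput appends one output row; a nil record marks the end of output.
func WriteOutput(rec *OutRecord) {
	if rec == nil {
		closed = true
		return
	}
	outputs = append(outputs, *rec)
}

// run mirrors the structure of the transducer above, with `remote`
// standing in for the rows read from the other database.
func run(remote []OutRecord) {
	// Drain the input; it only exists to start the transducer.
	for rec := NextInput(); rec != nil; rec = NextInput() {
	}
	// Forward each remote row as one output record.
	for i := range remote {
		WriteOutput(&remote[i])
	}
	WriteOutput(nil) // end of output
}

func main() {
	run([]OutRecord{
		{pk: 1, sk: 2, avail: 3, cost: 4.5, cmt: "a"},
		{pk: 1, sk: 3, avail: 5, cost: 6.5, cmt: "b"},
	})
	fmt.Println(len(outputs), closed) // 2 true
}
```

The single dummy input row is why the query selects `from ( select 1::int ) t`: it gives the transducer one row to start on, and all real data comes from the remote side.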


**dg transfer** uses the transducer to transfer databases, schemas, tables, etc. between two Deepgreen databases. In outline, it does the following:
```
for each table t to be transferred:
    on the source database:      select * from t, piping the result to xdrive
    on the destination database: insert into t select * from xdrive
```
We make sure the data is transferred in parallel, fully utilizing all hosts.


Results for a 1 TB TPC-H database on 4 machines connected with two 10GigE cards, compared with gptransfer (fast mode). Our transfer is network bound.
![Transfer 1TB](./img/transfer.png)

We pumped more than 6 billion rows (the lineitem table) through transducer/xdrive in 10 minutes, which is about 10 million rows per second.
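The throughput figure is simple arithmetic: 6 × 10⁹ rows / 600 s = 10⁷ rows/s. The Go snippet below reproduces it; the per-host number is only an illustration and assumes the load is spread evenly across the 4 machines.

```go
package main

import "fmt"

// rowsPerSecond returns the average throughput for moving `rows` rows
// in `minutes` minutes.
func rowsPerSecond(rows, minutes float64) float64 {
	return rows / (minutes * 60)
}

func main() {
	// Figures quoted above: about 6 billion lineitem rows in 10 minutes.
	total := rowsPerSecond(6e9, 10)
	// Assumption: the load is spread evenly over the 4 machines.
	perHost := total / 4
	fmt.Printf("total: %.0f rows/s, per host: %.0f rows/s\n", total, perHost)
	// prints "total: 10000000 rows/s, per host: 2500000 rows/s"
}
```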
