In [1]:
import datalabframework as dlf

In [2]:
# Fetch the framework metadata and print it in readable form for inspection.
# NOTE(review): the printed metadata includes provider credentials — clear or
# redact the cell output before sharing the notebook.
md = dlf.params.metadata()
dlf.utils.pretty_print(md)

engines:
  spark:
    config:
      jars:
      - http://www.datanucleus.org/downloads/maven2/oracle/ojdbc6/11.2.0.3/ojdbc6-11.2.0.3.jar
      jobname: default
      master: spark://spark-master:7077
    context: spark
loggers:
  stream:
    enable: true
    severity: info
profile: default
providers:
  ingest:
    format: parquet
    hostname: hdfs-nn
    path: /data/ingest
    service: hdfs
    write:
      coalesce: 2
      options:
        mode: append
      repartition: 4
  source:
    database: MMSOFF
    hostname: 172.16.60.18
    password: MyPassword
    port: 1521
    read:
      cache: true
      repartition: 4
    service: oracle
    sid: offline
    username: books_admin
resources:
  .resources.QR_Transaction:
    ingest:
      column: PAY_DATE
      policy: date
    path: QR_Transaction
    provider: source
  .resources.QR_Transaction_Detail:
    ingest:
      column: CREATED_DATE
      policy: date
    path: QR_Transaction_Detail
    provider: source
variables:
  a: 5
  b:

In [3]:
# Look up the engine registered under the name 'spark' in the metadata.
engine = dlf.engines.get("spark")
# Keep a handle on the engine's context for direct use in later cells
# (presumably the Spark session/context — confirm against datalabframework).
spark = engine.context()

PYSPARK_SUBMIT_ARGS:  --jars http://www.datanucleus.org/downloads/maven2/oracle/ojdbc6/11.2.0.3/ojdbc6-11.2.0.3.jar pyspark-shell


In [4]:
# Load the metadata once, then preview the first rows of every declared resource.
md = dlf.params.metadata()
# NOTE(review): `day` is only an alias of `md` and is not used in any visible
# cell — confirm before removing it.
day = md
for resource in md['resources']:
    # `df_src` is rebound on every pass, so after the loop it refers to the
    # last resource that was read.
    df_src = engine.read(resource)
    df_src.show(5)

provider:
  database: MMSOFF
  hostname: 172.16.60.18
  password: MyPassword
  path: ''
  port: 1521
  read:
    cache: true
    repartition: 4
  service: oracle
  sid: offline
  username: books_admin
resource:
  ingest:
    column: PAY_DATE
    policy: date
  path: QR_Transaction
  provider: source
url: null

jdbc:oracle:thin:books_admin/MyPassword@//172.16.60.18:1521/offline
+-----------------+--------------------+-------------+-------------+--------+------------+----------+-----------+---------+----------+-----------------+------------+-------------+------------+-------+-----------+-------+--------------------+------------+-----------+--------------------+------------+-----------+--------------------+------------+-----------+--------------------+------------+-----------+-----------+------+-------------------+--------------------+------------+-----------+-------------+---------------+--------------+--------+---------+-------------------+--------------+---------------+------------+---

## pre checks

schema checks:

 - get table schema from source
 - compare with reference schema
 
value checks:

 - null values
 - invalid values

In [None]:
# Ingest every resource declared in the metadata, writing to the 'ingest' provider.
md = dlf.params.metadata()
for resource in md['resources']:
    engine.ingest(src_resource=resource, dest_provider='ingest')

database: MMSOFF
hostname: 172.16.60.18
password: MyPassword
path: ''
port: 1521
read:
  cache: true
  repartition: 4
service: oracle
sid: offline
username: books_admin

filter:
  column: PAY_DATE
  policy: date
path: QR_Transaction
provider: source

condition: PAY_DATE < "2018-10-18T00:00:00" 
QR_Transaction/schema ingest
provider:
  format: parquet
  hostname: hdfs-nn
  path: data/ingest
  service: hdfs
  write:
    coalesce: 2
    options:
      mode: append
    repartition: 4
resource:
  path: QR_Transaction/schema
  provider: ingest
url: hdfs://hdfs-nn:8020/data/ingest/QR_Transaction/schema

schema date 2018-10-18 00:00:00
provider:
  database: MMSOFF
  hostname: 172.16.60.18
  password: MyPassword
  path: ''
  port: 1521
  read:
    cache: true
    repartition: 4
  service: oracle
  sid: offline
  username: books_admin
resource:
  path: QR_Transaction
  provider: source
url: null

jdbc:oracle:thin:books_admin/MyPassword@//172.16.60.18:1521/offline
{'provider': {'format': 'parquet

## Ingest strategy

#### **what to read?**
 
FULL scan:  
Needed when the target tables have been lost or were never ingested before.
 
  - cold start
  - disaster recovery from source
     
INCREMENTAL scan:  
We need a way to select/filter new data

  - time based (default, i.e. last_update column)
  - index based (if index is auto incrementing)
  - hash compare indexes (expensive)
  - full rescan and tag with ingest date
 
#### **where to write?**

  - define a naming convention for the target tables (default schema version: `latest`)  
    `<source>/<db-name>/<table-name>/<schema-version-date>`
    
#### **exceptions**

TODO: decide what to do when an ingest step fails (for now: show the error).

### HDFS

In [5]:
# Read the ingested data back from HDFS (stored in parquet format).
df_trg = engine.read('target', 'ingest')
# Fixed: `=>` is not a Python operator (SyntaxError); the comparison is `>=`.
# NOTE(review): `col` and `colname` are not defined in any visible cell, and the
# result of `where` is not assigned, so `df_trg` itself is left unchanged by
# this line — confirm the intended column/threshold and whether the filtered
# frame should be kept.
df_trg.where(col(colname) >= 'datetime')
df_trg.show()

hdfs://hdfs-nn:8020//data/ingest/extract/sakila.actor
+--------+----------+------------+-------------------+
|actor_id|first_name|   last_name|        last_update|
+--------+----------+------------+-------------------+
|       1|  PENELOPE|     GUINESS|2006-02-15 04:34:33|
|       2|      NICK|    WAHLBERG|2006-02-15 04:34:33|
|       3|        ED|       CHASE|2006-02-15 04:34:33|
|       4|  JENNIFER|       DAVIS|2006-02-15 04:34:33|
|       5|    JOHNNY|LOLLOBRIGIDA|2006-02-15 04:34:33|
|       6|     BETTE|   NICHOLSON|2006-02-15 04:34:33|
|       7|     GRACE|      MOSTEL|2006-02-15 04:34:33|
|       8|   MATTHEW|   JOHANSSON|2006-02-15 04:34:33|
|       9|       JOE|       SWANK|2006-02-15 04:34:33|
|      10| CHRISTIAN|       GABLE|2006-02-15 04:34:33|
|      11|      ZERO|        CAGE|2006-02-15 04:34:33|
|      12|      KARL|       BERRY|2006-02-15 04:34:33|
|      13|       UMA|        WOOD|2006-02-15 04:34:33|
|      14|    VIVIEN|      BERGEN|2006-02-15 04:34:33|
|      15| 

## post checks

In [6]:
# Post-ingest check: subtracting the target frame from the source frame must
# leave no rows (assumes both are Spark DataFrames — confirm).
missing_rows = df_src.subtract(df_trg)
assert missing_rows.count() == 0

In [None]:
# Sketch of the two calling conventions for reading/writing through the engine.
# NOTE(review): 'abcd', 'dsds' and 'resource_alias' look like placeholder
# values, and `df` is not defined in any visible cell — confirm before running.

# 1) Explicit path + provider.
engine.read(path='abcd', provider='dsds')
# Fixed: the bare `write(...)` name is not defined anywhere visible; call the
# engine's method, as the alias-based example below does.
# NOTE(review): confirm that `engine.write` accepts `path`/`provider` keywords.
engine.write(df, path='abcd', provider='ingest')

# 2) Resource alias.
engine.read('resource_alias')
engine.write(df, 'resource_alias')