In [1]:
import datalabframework as dlf

In [2]:
# Call workrun with 'ingest-prod' (presumably selects that run profile --
# TODO confirm against the datalabframework docs), then pretty-print the
# metadata returned by dlf.params.metadata().
dlf.project.workrun('ingest-prod')
dlf.utils.pretty_print(
    dlf.params.metadata()
)

engines:
  spark:
    config:
      jobname: default
      master: spark://spark-master:7077
      packages:
      - mysql:mysql-connector-java:8.0.12
    context: spark
loggers:
  stream:
    enable: true
    severity: info
providers:
  ingest:
    format: parquet
    hostname: hdfs-nn
    path: /data/ingest/extract
    service: hdfs
  source:
    database: sakila
    hostname: mysql
    password: root
    port: 3306
    service: mysql
    username: root
resources:
  .resources.actor:
    path: actor
    provider: source
  .resources.address:
    path: address
    provider: source
  .resources.category:
    path: category
    provider: source
  .resources.city:
    path: city
    provider: source
  .resources.country:
    path: country
    provider: source
  .resources.customer:
    path: customer
    provider: source
  .resources.film:
    path: film
    provider: source
  .resources.film_actor:
    path: film_actor
    provider: source
  .resources.film_category:
    path: film_cate

In [3]:
# Look up the engine registered under the name 'spark'; later cells use
# `engine` for every read and write.
engine = dlf.engines.get('spark')
# Keep a handle on the engine's context (presumably the Spark session or
# context object -- TODO confirm the exact type).
spark = engine.context()

PYSPARK_SUBMIT_ARGS:  --packages mysql:mysql-connector-java:8.0.12 pyspark-shell


In [6]:
# Preview the first five rows of every resource served by the 'source' provider.
md = dlf.params.metadata()
for resource in md['resources']:
    ds = md['resources'][resource]
    # Skip anything that does not come from the 'source' provider.
    if ds['provider'] != 'source':
        continue
    print(ds['path'])
    # read the resource and display the first 5 rows
    engine.read(resource).show(5)

actor
jdbc:mysql://mysql:3306/sakila
+--------+----------+------------+-------------------+
|actor_id|first_name|   last_name|        last_update|
+--------+----------+------------+-------------------+
|       1|  PENELOPE|     GUINESS|2006-02-15 04:34:33|
|       2|      NICK|    WAHLBERG|2006-02-15 04:34:33|
|       3|        ED|       CHASE|2006-02-15 04:34:33|
|       4|  JENNIFER|       DAVIS|2006-02-15 04:34:33|
|       5|    JOHNNY|LOLLOBRIGIDA|2006-02-15 04:34:33|
+--------+----------+------------+-------------------+
only showing top 5 rows

address
jdbc:mysql://mysql:3306/sakila
+----------+--------------------+--------+--------+-------+-----------+-----------+--------------------+-------------------+
|address_id|             address|address2|district|city_id|postal_code|      phone|            location|        last_update|
+----------+--------------------+--------+--------+-------+-----------+-----------+--------------------+-------------------+
|         1|   47 MySakila Dr

In [4]:
# Copy every resource served by the 'source' provider to the 'ingest' provider.
md = dlf.params.metadata()
for resource in md['resources']:
    ds = md['resources'][resource]
    # Only resources that come from the 'source' provider are ingested.
    if ds['provider'] != 'source':
        continue
    print(ds['path'])

    # read
    # NOTE: df_src is rebound on every iteration, so after the loop it
    # refers only to the last resource that was read.
    df_src = engine.read(resource)

    # define target path as '<provider>.<path>'
    # NOTE(review): this yields names such as 'source.actor'; confirm this is
    # the naming the 'ingest' provider expects.
    target_path = '{}.{}'.format(ds['provider'], ds['path'])

    # write; mode='overwrite' is passed on every run
    engine.write(df_src, target_path, 'ingest', mode='overwrite')

actor
jdbc:mysql://mysql:3306/sakila
address
jdbc:mysql://mysql:3306/sakila
category
jdbc:mysql://mysql:3306/sakila
city
jdbc:mysql://mysql:3306/sakila
country
jdbc:mysql://mysql:3306/sakila
customer
jdbc:mysql://mysql:3306/sakila
film
jdbc:mysql://mysql:3306/sakila
film_actor
jdbc:mysql://mysql:3306/sakila
film_category
jdbc:mysql://mysql:3306/sakila
film_text
jdbc:mysql://mysql:3306/sakila
inventory
jdbc:mysql://mysql:3306/sakila
language
jdbc:mysql://mysql:3306/sakila
payment
jdbc:mysql://mysql:3306/sakila
rental
jdbc:mysql://mysql:3306/sakila
staff
jdbc:mysql://mysql:3306/sakila
store
jdbc:mysql://mysql:3306/sakila


## pre checks

schema checks:

 - get table schema from source
 - compare with reference schema
 
value checks:

 - null values
 - invalid values

## Ingest strategy

#### **what to read?**
 
FULL scan:  
Used when the target tables are lost or were never ingested before
 
  - cold start
  - disaster recovery from source
     
INCREMENTAL scan:  
We need a way to select/filter new data

  - time based (default, i.e. last_update column)
  - index based (if index is auto incrementing)
  - hash compare indexes (expensive)
  - full rescan and tag with ingest date
 
#### **where to write?**

  - define a naming convention for the target tables (default schema version: `latest`)  
    `<source>/<db-name>/<table-name>/<schema-version-date>`
    
#### **exceptions**
What to do when an exception occurs? (show error)

### HDFS

In [5]:
# read back from hdfs in parquet format
df_trg = engine.read('target', 'ingest')
# TODO: filter on a datetime column for the incremental case. The original
# line `df_trg.where(col(colname) => 'datetime')` was removed because `=>` is
# not a Python operator, `col` and `colname` were never defined, and the
# filtered result was discarded anyway.
df_trg.show()

hdfs://hdfs-nn:8020//data/ingest/extract/sakila.actor
+--------+----------+------------+-------------------+
|actor_id|first_name|   last_name|        last_update|
+--------+----------+------------+-------------------+
|       1|  PENELOPE|     GUINESS|2006-02-15 04:34:33|
|       2|      NICK|    WAHLBERG|2006-02-15 04:34:33|
|       3|        ED|       CHASE|2006-02-15 04:34:33|
|       4|  JENNIFER|       DAVIS|2006-02-15 04:34:33|
|       5|    JOHNNY|LOLLOBRIGIDA|2006-02-15 04:34:33|
|       6|     BETTE|   NICHOLSON|2006-02-15 04:34:33|
|       7|     GRACE|      MOSTEL|2006-02-15 04:34:33|
|       8|   MATTHEW|   JOHANSSON|2006-02-15 04:34:33|
|       9|       JOE|       SWANK|2006-02-15 04:34:33|
|      10| CHRISTIAN|       GABLE|2006-02-15 04:34:33|
|      11|      ZERO|        CAGE|2006-02-15 04:34:33|
|      12|      KARL|       BERRY|2006-02-15 04:34:33|
|      13|       UMA|        WOOD|2006-02-15 04:34:33|
|      14|    VIVIEN|      BERGEN|2006-02-15 04:34:33|
|      15| 

## post checks

In [6]:
# Post check: subtracting df_trg from df_src must leave no rows
# (assuming Spark DataFrame.subtract semantics, i.e. rows of df_src that are
# absent from df_trg -- TODO confirm both frames refer to the same table).
assert df_src.subtract(df_trg).count() == 0

In [None]:
# API sketch (never executed): the two calling conventions for read/write.
# `df` is a placeholder for a DataFrame; 'abcd', 'dsds' and 'resource_alias'
# are dummy values.

# 1) explicit path + provider
engine.read(path='abcd', provider='dsds')
# `write` is called on the engine, like every other write in this notebook.
engine.write(df, path='abcd', provider='ingest')

# 2) resource alias declared in the metadata
engine.read('resource_alias')
engine.write(df, 'resource_alias')
