In [1]:
import datalabframework as dlf
# Report the project profile in use (evaluates to 'default' in this run).
dlf.project.profile()

'default'

In [2]:
# Version tuple of the imported datalabframework package.
dlf.version_info

(0, 5, 9)

In [3]:
# Obtain the framework's 'spark' engine and the context object it exposes.
# NOTE(review): `spark` is presumably the underlying Spark session/context —
# confirm against the engine API.
engine = dlf.engines.get('spark')
spark = engine.context()

PYSPARK_SUBMIT_ARGS:  --packages mysql:mysql-connector-java:8.0.12 pyspark-shell


### MySQL

In [4]:
# Read by resource alias: 'in' is looked up by the engine rather than given as
# an explicit path (the log line for this run shows it resolving to the MySQL
# `actor` table).
df_src = engine.read('in')
df_src.show(5)  # preview the top five rows

2018-10-17 09:07:08,817 - INFO - 49fada1 - datalabframework-demos.git - jovyan - main.ipynb - engine.read - {'format': 'rdbms', 'service': 'mysql', 'path': 'actor', 'url': 'jdbc:mysql://mysql:3306/sakila'}
+--------+----------+---------+-------------------+
|actor_id|first_name|last_name|        last_update|
+--------+----------+---------+-------------------+
|     105|    SIDNEY|    CROWE|2006-02-15 04:34:33|
|     172|   GROUCHO| WILLIAMS|2006-02-15 04:34:33|
|      74|     MILLA|   KEITEL|2006-02-15 04:34:33|
|      48|   FRANCES|DAY-LEWIS|2006-02-15 04:34:33|
|      65|    ANGELA|   HUDSON|2006-02-15 04:34:33|
+--------+----------+---------+-------------------+
only showing top 5 rows



In [5]:
# Number of partitions of the RDD backing the source DataFrame.
df_src.rdd.getNumPartitions()

4

In [6]:
# Read by resource path and provider instead of by alias. The log line for this
# run shows the same resolved resource as the alias-based read.
# NOTE: this rebinds `df_src`.
df_src = engine.read(path='actor', provider='source')
df_src.show(5)  # preview the top five rows

2018-10-17 09:07:09,890 - INFO - 49fada1 - datalabframework-demos.git - jovyan - main.ipynb - engine.read - {'format': 'rdbms', 'service': 'mysql', 'path': 'actor', 'url': 'jdbc:mysql://mysql:3306/sakila'}
+--------+----------+---------+-------------------+
|actor_id|first_name|last_name|        last_update|
+--------+----------+---------+-------------------+
|     105|    SIDNEY|    CROWE|2006-02-15 04:34:33|
|     172|   GROUCHO| WILLIAMS|2006-02-15 04:34:33|
|      74|     MILLA|   KEITEL|2006-02-15 04:34:33|
|      48|   FRANCES|DAY-LEWIS|2006-02-15 04:34:33|
|      65|    ANGELA|   HUDSON|2006-02-15 04:34:33|
+--------+----------+---------+-------------------+
only showing top 5 rows



In [7]:
from pyspark.sql import functions as F
# Add a `date` column derived from the `last_update` timestamp via F.to_date;
# it is used further down as the partitioning column for the HDFS write.
df = df_src.withColumn('date', F.to_date('last_update'))
df.show(5)  # preview the top five rows, now including `date`

+--------+----------+---------+-------------------+----------+
|actor_id|first_name|last_name|        last_update|      date|
+--------+----------+---------+-------------------+----------+
|     105|    SIDNEY|    CROWE|2006-02-15 04:34:33|2006-02-15|
|     172|   GROUCHO| WILLIAMS|2006-02-15 04:34:33|2006-02-15|
|      74|     MILLA|   KEITEL|2006-02-15 04:34:33|2006-02-15|
|      48|   FRANCES|DAY-LEWIS|2006-02-15 04:34:33|2006-02-15|
|      65|    ANGELA|   HUDSON|2006-02-15 04:34:33|2006-02-15|
+--------+----------+---------+-------------------+----------+
only showing top 5 rows



### HDFS

In [8]:
# Write by resource alias: 'out' is looked up by the engine (the log line for
# this run shows it resolving to a parquet target on HDFS).
# NOTE(review): mode='overwrite' is presumably forwarded to the Spark writer — confirm.
engine.write(df, 'out', mode='overwrite')

2018-10-17 09:07:13,042 - INFO - 49fada1 - datalabframework-demos.git - jovyan - main.ipynb - engine.write - {'format': 'parquet', 'service': 'hdfs', 'path': 'actor', 'url': 'hdfs://hdfs-nn:8020/data/ingest/actor'}


In [9]:
# Write by resource path and provider alias, partitioning the output by `date`.
# NOTE(review): the log for this run shows the same target URL as the
# alias-based write in the previous cell, so with mode='overwrite' this call
# presumably replaces that output — confirm this is intended.
engine.write(df, path='actor', provider='ingest', partitionBy=['date'], mode='overwrite')

2018-10-17 09:07:13,452 - INFO - 49fada1 - datalabframework-demos.git - jovyan - main.ipynb - engine.write - {'format': 'parquet', 'service': 'hdfs', 'path': 'actor', 'url': 'hdfs://hdfs-nn:8020/data/ingest/actor'}


## Post-write checks

In [10]:
# Read back from HDFS in parquet format (the log line for this run confirms the
# format, service and URL that were resolved).
# NOTE(review): the second positional argument 'ingest' presumably selects the
# provider — confirm against the engine.read signature.
df_trg = engine.read('out', 'ingest')

2018-10-17 09:07:13,895 - INFO - 49fada1 - datalabframework-demos.git - jovyan - main.ipynb - engine.read - {'format': 'parquet', 'service': 'hdfs', 'path': 'actor', 'url': 'hdfs://hdfs-nn:8020/data/ingest/actor'}


In [11]:
# Post-write validation: the data read back from HDFS must match what was written.
#
# `DataFrame.subtract` is a set difference (EXCEPT DISTINCT), so a single
# direction only proves that every written row can be read back. Checking the
# reverse direction as well catches rows in the target that were never written,
# and the row-count comparison catches dropped or duplicated rows.
# NOTE(review): rows that differ only in their duplicate multiplicity can still
# slip through; `exceptAll` would close that gap where the Spark version has it.
rows_written = df.count()
rows_read = df_trg.count()
assert rows_written == rows_read, \
    'row count mismatch: wrote {}, read back {}'.format(rows_written, rows_read)
assert df.subtract(df_trg).count() == 0, \
    'some written rows are missing from the data read back'
assert df_trg.subtract(df).count() == 0, \
    'the data read back contains rows that were never written'

#### Check the data on HDFS

If everything has worked up to this point, you should be able to browse the written data in the HDFS web file browser at [http://localhost:50070/explorer.html#/data/ingest/actor](http://localhost:50070/explorer.html#/data/ingest/actor).