# Datalabframework

The datalabframework is a productivity framework for ETL and ML applications. It simplifies some of the activities that are typical in data pipelines, such as project scaffolding, data ingestion, star schema generation, forecasting, etc.

## Principles

- Both notebooks and code files are first-class citizens

Following Python package conventions, the root of the project is marked by a `__main__.py` file, and each directory of source code (either Python files or notebooks) contains an `__init__.py` file. This way, Python files and notebooks can reference each other.

Python notebooks and Python code can be mixed and matched, and are interoperable: with the datalabframework, you can import notebooks as modules in Python code, and you can import Python modules in a notebook.

- Decouple Code and Data Resources

Data can be located anywhere: on remote HDFS clusters, on object store services exposed via the S3 protocol, or on the local file system. No matter where the data is located, we want to decouple data resources from the code executed in the data pipeline.

Data and code are separated by declaring all data resources and providers as configuration in metadata files. Metadata files make it possible to define aliases for data resources, data services and engine configurations, keeping the ETL and ML code tidy, with no hardcoded parameters.

- Decouple Code from Configuration

Code, whether stored as notebooks or as Python files, should be decoupled from both engine configurations and data locations. All configuration is kept in `metadata.yml` YAML files. Multiple setups for test, exploration and production can be described in the same `metadata.yml` file or in separate files using __profiles__. All profiles inherit from the default profile, to reduce duplication of configuration settings across profiles.
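Profile inheritance can be pictured as a recursive dictionary merge: keys set in the selected profile override the same keys in the default profile, and everything else is inherited. A minimal sketch under that assumption follows; `merge_profiles` and the `prod` values are hypothetical, and the framework's own merge rules (for lists or null values, say) may differ.

```python
from copy import deepcopy


def merge_profiles(default: dict, profile: dict) -> dict:
    """Return a new dict: `default` recursively overridden by `profile`.

    Hypothetical helper illustrating profile inheritance: nested dicts are
    merged key by key, any other value in `profile` replaces the one in
    `default`. Neither input is modified.
    """
    merged = deepcopy(default)
    for key, value in profile.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_profiles(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


default = {
    "engine": {"type": "spark", "master": "local[*]"},
    "providers": {"localfs": {"service": "file", "format": "csv", "path": "data"}},
}
# a hypothetical 'prod' profile that only declares what differs
prod = {
    "profile": "prod",
    "engine": {"master": "spark://spark-master:7077"},
}

print(merge_profiles(default, prod))
```

Only the differing keys need to be declared in `prod`; the `providers` section is inherited unchanged.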

Chapters

  - [Installing](install.ipynb)
  - [Load a project](load.ipynb)
  - [Configuring Metadata](metadata.ipynb)

## Data Lab Framework

In [1]:
import datalabframework as dlf

### Package info
The package version is exposed by the package variables `version_info` and `__version__`.

In [2]:
dlf.version_info

(0, 7, 0)

In [3]:
dlf.__version__

'0.7.0'

Check whether the datalabframework is loaded in the current Python context:

In [4]:
try:
    __DATALABFRAMEWORK__
    print("the datalabframework is loaded")
except NameError:
    print("the datalabframework is not loaded")

the datalabframework is loaded


In [5]:
# list of modules exported by `from datalabframework import *`
dlf.__all__

['logging', 'project']
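`__all__` is the standard Python mechanism that decides which names `from package import *` binds. The self-contained sketch below shows the effect on a throwaway in-memory module named `toy_pkg` (a hypothetical name; it does not import the datalabframework).

```python
import sys
import types

# build a throwaway module in memory (hypothetical name 'toy_pkg')
toy = types.ModuleType("toy_pkg")
toy.logging = "logging submodule placeholder"
toy.project = "project submodule placeholder"
toy.internal_helper = "not exported"
toy.__all__ = ["logging", "project"]  # only these names are star-exported
sys.modules["toy_pkg"] = toy  # make it importable by name

# run the star import in a fresh namespace and see what it binds
namespace = {}
exec("from toy_pkg import *", namespace)

exported = sorted(k for k in namespace if not k.startswith("__"))
print(exported)  # ['logging', 'project']
```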

### Modules: project

The project module sets up the working directories where your notebooks, Python files and configuration files are run and found. When the datalabframework is imported, it searches for a `__main__.py` file, following Python module file-naming conventions. All module and alias paths are relative to this project root path.
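The root detection described above can be sketched as a walk up the directory tree until a marker file is found. This assumes the markers listed in the `load` docstring below (`__main__.py` or `main.ipynb`); `find_rootdir` is a hypothetical helper, not the framework's implementation.

```python
import tempfile
from pathlib import Path
from typing import Optional

# marker files mentioned in the `load` docstring
ROOT_MARKERS = ("__main__.py", "main.ipynb")


def find_rootdir(start) -> Optional[Path]:
    """Return the closest directory, from `start` upwards, holding a marker file.

    Hypothetical helper: returns None if no directory up to the filesystem
    root contains one of ROOT_MARKERS.
    """
    start = Path(start).resolve()
    for directory in (start, *start.parents):
        if any((directory / marker).is_file() for marker in ROOT_MARKERS):
            return directory
    return None


# usage: a project root with a nested notebooks directory
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / "__main__.py").touch()
    nested = root / "notebooks" / "etl"
    nested.mkdir(parents=True)
    print(find_rootdir(nested) == root)  # True
```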

#### Load a project profile

A profile is loaded with the `datalabframework.project.load` function. It looks for files ending with `metadata.yml`. The function can optionally set the current working directory and import the `key=value` pairs of a `.env` file into the Python `os` environment. If no parameters are specified, the default profile is loaded.
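Reading a `.env` file into the process environment can be sketched in a few lines. This is an illustration under stated assumptions: `load_dotenv_file` is a hypothetical helper that skips blank lines and `#` comments, strips one pair of surrounding quotes, and never overwrites variables that are already set; the framework's own parser may treat quoting, `export` prefixes or interpolation differently.

```python
import os
import tempfile
from pathlib import Path


def load_dotenv_file(path, environ=None):
    """Copy KEY=VALUE pairs from a .env file into `environ` (hypothetical helper).

    Blank lines, '#' comments and lines without '=' are skipped; one pair of
    matching quotes around a value is removed; keys already present in
    `environ` are not overwritten. Returns the pairs parsed from the file.
    """
    if environ is None:
        environ = os.environ
    parsed = {}
    for raw in Path(path).read_text().splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key, value = key.strip(), value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
            value = value[1:-1]
        parsed[key] = value
        environ.setdefault(key, value)
    return parsed


# usage with a throwaway .env file and a plain dict instead of os.environ
with tempfile.TemporaryDirectory() as tmp:
    dotenv = Path(tmp) / ".env"
    dotenv.write_text("# project settings\nMY_ENV_VAR=guest\nGREETING='Hi There!'\n")
    env = {"MY_ENV_VAR": "admin"}
    print(load_dotenv_file(dotenv, env))  # {'MY_ENV_VAR': 'guest', 'GREETING': 'Hi There!'}
    print(env)  # {'MY_ENV_VAR': 'admin', 'GREETING': 'Hi There!'}
```

Not overwriting existing variables lets a value exported in the shell take precedence over the file.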

In [6]:
help(dlf.project.load)

Help on function load in module datalabframework.project:

load(profile='default', rootdir_path=None, search_parent_dirs=True, factory_defaults=True)
    Performs the following steps:
        - set rootdir for the given project
        - import variables from  <rootdir>/.env (if present),
        - load the `profile` from the metadata files
        - setup and start the data engine
    
    :param profile: load the given metadata profile (default: 'default')
    :param rootdir_path: root directory for loaded project (default: current working directory)
    
    :param search_parent_dirs: search parent dirs to detect rootdir by 
           looking for a '__main__.py' or 'main.ipynb' file (default: True)
    
    :param factory_defaults: add preset default configuration. 
           Project provided metadata file can override this default values (default: True)
    
    :return: None
    
    Note that:
    
    1)  Metadata files are merged up, so you can split the information in multip

### Metadata profiles

In [7]:
help(dlf.project.metadata)

Help on function metadata in module datalabframework.project:

metadata()
    return a metadata object which provides just one method:
    :return: a Metadata object
    
    Notes about metadata configurations:
    
    #### Metadata files
    
    1) Metadata files are merged up, so you can split the information in 
       multiple files as long as they end with `metadata.yml`.   
       For example: `metadata.yml`, `abc.metadata.yaml`, `abc_metadata.yml` 
       are all valid metadata file names.
    
    2) All metadata files in all subdirectories from the project root directory are loaded,
       unless the directory contains a file `metadata.ignore.yml`
    
    3) Metadata files can provide multiple profile configurations, 
       by separating each profile configuration with as `bare documents` wihtin the same yaml file  
       by separating the configuration with a line containing three hyphens (`---`)  
       (see https://yaml.org/spec/1.2/spec.html#YAML)
    
    4) Each m
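Rules 1) and 2) above can be sketched as a directory walk that keeps files whose names end with `metadata.yml` (or `metadata.yaml`, as in the `abc.metadata.yaml` example). `find_metadata_files` is hypothetical, and skipping the whole subtree below a directory that contains `metadata.ignore.yml` is an assumption; the docstring only says that directory is not loaded.

```python
import os
import tempfile
from pathlib import Path


def is_metadata_file(name):
    """True for names like 'metadata.yml', 'abc.metadata.yaml' or 'abc_metadata.yml'."""
    return name.endswith(("metadata.yml", "metadata.yaml"))


def find_metadata_files(rootdir):
    """Return metadata file paths below `rootdir`, relative to it (hypothetical helper).

    Assumption: a directory containing 'metadata.ignore.yml' is skipped
    together with all of its subdirectories.
    """
    found = []
    for dirpath, dirnames, filenames in os.walk(rootdir):
        if "metadata.ignore.yml" in filenames:
            dirnames[:] = []  # prune: os.walk will not descend further
            continue
        dirnames.sort()  # visit subdirectories in a deterministic order
        for name in sorted(filenames):
            if is_metadata_file(name):
                found.append(Path(dirpath, name).relative_to(rootdir).as_posix())
    return found


# usage: build a small project tree and scan it
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "metadata.yml").write_text("profile: default\n")
    (root / "etl").mkdir()
    (root / "etl" / "abc.metadata.yaml").write_text("")
    (root / "scratch").mkdir()
    (root / "scratch" / "metadata.ignore.yml").write_text("")
    (root / "scratch" / "old_metadata.yml").write_text("")
    print(find_metadata_files(root))  # ['metadata.yml', 'etl/abc.metadata.yaml']
```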

In [4]:
import datalabframework as dlf

# Loading default profile
project = dlf.project.load()
dlf.project.engine().config()

{}
{'profile': 'default', 'variables': {'a': 'hello', 'b': 'hello world', 'c': '/bin/bash', 'd': 'foo', 'e': '2019-04-11 03:36:52', 'f': '2019-04-11', 'my_date_var': '2019-04-11', 'my_env_var': 'guest', 'my_string_concat_var1': 'spark running at local[*]', 'my_string_concat_var2': 'Hi There!: the current date is 2019-04-11', 'my_string_var': 'Hi There!'}, 'engine': {'type': 'spark', 'master': 'local[*]', 'jobname': None, 'timezone': 'naive', 'submit': {'jars': None, 'packages': None, 'py-files': None, '': None}, 'config': {'spark.sql.autoBroadcastJoinThreshold': -1}}, 'providers': {'hdfs': {'format': 'parquet', 'path': '/', 'service': 'hdfs', 'hostname': 'hdfs-namenode'}, 'minio': {'format': 'parquet', 'password': 'minio-password', 'username': 'minio-username', 'path': '/data', 'service': 'minio', 'hostname': 'minio'}, 'pagila': {'format': 'jdbc', 'password': 'postgres', 'username': 'postgres', 'path': 'pagila', 'service': 'postgres', 'hostname': 'postgres'}, 'sakila': {'format': 'jdbc

type: spark
name: default
version: 2.4.0
info:
    python_version: 3.6.8
    hadoop_version: 3.1.1
    hadoop_detect: spark
    hadoop_home: /opt/hadoop
    spark_home: /opt/spark
    spark_classpath:
      - /opt/spark/jars/*
      - /opt/hadoop/etc/hadoop
      - /opt/hadoop/share/hadoop/common/lib/*
      - /opt/hadoop/share/hadoop/common/*
      - /opt/hadoop/share/hadoop/hdfs
      - /opt/hadoop/share/hadoop/hdfs/lib/*
      - /opt/hadoop/share/hadoop/hdfs/*
      - /opt/hadoop/share/hadoop/mapreduce/lib/*
      - /opt/hadoop/share/hadoop/mapreduce/*
      - /opt/hadoop/share/hadoop/yarn
      - /opt/hadoop/share/hadoop/yarn/lib/*
      - /opt/hadoop/share/hadoop/yarn/*
    spark_classpath_source: /opt/spark/conf/spark-env.sh
config:
    spark.driver.host: 5de54992bc62
    spark.repl.local.jars: file:///home/jovyan/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.1.1.jar,file:///home/jovyan/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar,file:///home/jovyan/.ivy2/jars/org.postgresql_pos

In [5]:
dlf.logging.notice('ss', extra={'l':11})
dlf.logging.warning({'l':11}, extra={'a':42})
dlf.logging.warning({'l':11})
dlf.logging.warning('hello')

NOTICE - run_code - ss - {'l': 11}


In [1]:
import datalabframework as dlf

# Loading default profile
project = dlf.project.load()
dlf.project.metadata()

Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


profile: default
variables:
    a: hello
    b: hello world
    c: /bin/bash
    d: foo
    e: '2019-04-11 03:40:07'
    f: '2019-04-11'
    my_date_var: '2019-04-11'
    my_env_var: guest
    my_string_concat_var1: spark running at local[*]
    my_string_concat_var2: 'Hi There!: the current date is 2019-04-11'
    my_string_var: Hi There!
engine:
    type: spark
    master: local[*]
    jobname:
    timezone: naive
    submit:
        jars:
        packages:
        py-files:
    config:
        spark.sql.autoBroadcastJoinThreshold: -1
providers:
    pagila:
        hostname: postgres
        username: postgres
        service: postgres
        format: jdbc
        path: pagila
        password: postgres
    hdfs:
        hostname: hdfs-namenode
        service: hdfs
        format: parquet
        path: /
    localfs:
        service: file
        format: csv
        path: data
    sakila:
        hostname: mysql
        username: mysql
        service: mysql
        format: jdbc
   

In [2]:
dlf.project.engine().config()

type: spark
name: default
version: 2.4.0
info:
    python_version: 3.6.8
    hadoop_version: 3.1.1
    hadoop_detect: spark
    hadoop_home: /opt/hadoop
    spark_home: /opt/spark
    spark_classpath:
      - /opt/spark/jars/*
      - /opt/hadoop/etc/hadoop
      - /opt/hadoop/share/hadoop/common/lib/*
      - /opt/hadoop/share/hadoop/common/*
      - /opt/hadoop/share/hadoop/hdfs
      - /opt/hadoop/share/hadoop/hdfs/lib/*
      - /opt/hadoop/share/hadoop/hdfs/*
      - /opt/hadoop/share/hadoop/mapreduce/lib/*
      - /opt/hadoop/share/hadoop/mapreduce/*
      - /opt/hadoop/share/hadoop/yarn
      - /opt/hadoop/share/hadoop/yarn/lib/*
      - /opt/hadoop/share/hadoop/yarn/*
    spark_classpath_source: /opt/spark/conf/spark-env.sh
config:
    spark.driver.host: 5de54992bc62
    spark.repl.local.jars: file:///home/jovyan/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.1.1.jar,file:///home/jovyan/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar,file:///home/jovyan/.ivy2/jars/org.postgresql_pos

In [3]:
spark = dlf.project.engine().context()

Let's inspect some of the loaded profile metadata.

In [4]:
# inspect loaded metadata
md = dlf.project.metadata()

#print out the sections of the metadata
list(md.keys())

['profile', 'variables', 'engine', 'providers', 'resources', 'loggers']

In [5]:
md['engine']

type: spark
master: local[*]
jobname:
timezone: naive
submit:
    jars:
    packages:
    py-files:
config:
    spark.sql.autoBroadcastJoinThreshold: -1

In [6]:
md['variables']

a: hello
b: hello world
c: /bin/bash
d: foo
e: '2019-04-11 03:40:07'
f: '2019-04-11'
my_date_var: '2019-04-11'
my_env_var: guest
my_string_concat_var1: spark running at local[*]
my_string_concat_var2: 'Hi There!: the current date is 2019-04-11'
my_string_var: Hi There!
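Some variables are composed from others: `my_string_concat_var2` combines `my_string_var` with the current date. The placeholder syntax used in the project's `metadata.yml` is not shown here, so the sketch below assumes Python `string.Template` placeholders (`${name}`) and expands them repeatedly until the values stop changing. `resolve_variables` is a hypothetical helper.

```python
from string import Template


def resolve_variables(variables: dict, max_passes: int = 10) -> dict:
    """Expand ${name} references between variables (hypothetical helper).

    Unknown placeholders are left as-is. Raises ValueError if the values are
    still changing after `max_passes` passes (e.g. a value that keeps growing
    because it references itself).
    """
    resolved = dict(variables)
    for _ in range(max_passes):
        expanded = {
            key: Template(value).safe_substitute(resolved) if isinstance(value, str) else value
            for key, value in resolved.items()
        }
        if expanded == resolved:
            return expanded
        resolved = expanded
    raise ValueError("variables did not converge")


variables = {
    "my_string_var": "Hi There!",
    "my_date_var": "2019-04-11",
    "my_string_concat_var2": "${my_string_var}: the current date is ${my_date_var}",
}
print(resolve_variables(variables)["my_string_concat_var2"])
# Hi There!: the current date is 2019-04-11
```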

In [8]:
md['loggers']

root:
    severity: info
datalabframework:
    name: dlf
    stream:
        severity:
        enable:
    stdio:
        severity: notice
        enable: true
    file:
        severity: notice
        enable: true
        path:
    kafka:
        severity: info
        enable: false
        hosts: kafka-node1:9092 kafka-node2:9092
        topic: dlf

## Inspect current project configuration

You can inspect the current project configuration by calling the `datalabframework.project.config` function.

In [9]:
help(dlf.project.config)

Help on function config in module datalabframework.project:

config()



In [10]:
dlf.project.config()

version: 0.7.0
username: jovyan
session_id: '0x7fe0ccc65c0b11e9'
profile: default
rootdir: /home/jovyan/work/basic
script_path: main.ipynb
dotenv_path: .env
notebooks_files:
  - main.ipynb
  - versions.ipynb
  - hello.ipynb
  - main-Copy1.ipynb
python_files:
  - __main__.py
metadata_files:
  - metadata.yml
repository:
    type:
    committer: ''
    hash: 0
    commit: 0
    branch: ''
    url: ''
    name: ''
    date: ''
    clean: false

Data resources are relative to the project root directory (`rootdir` in the configuration above).

In [11]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# stop the running context, so that the settings below take effect
SparkContext.getOrCreate().stop()

# local master; a standalone cluster would use e.g. 'spark://spark-master:7077'
spark = (
    SparkSession.builder
    .master('local[*]')
    # access the Minio object store through Hadoop's s3a connector
    .config('spark.hadoop.fs.s3a.endpoint', 'http://minio:9000')
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    .config('spark.hadoop.fs.s3a.path.style.access', 'true')
    .config('spark.hadoop.fs.s3a.access.key', 'minio-username')
    .config('spark.hadoop.fs.s3a.secret.key', 'minio-password')
    .getOrCreate()
)

# print the resulting Spark configuration
for i in spark.sparkContext.getConf().getAll():
    print(i)

df = spark.createDataFrame([(1, 2), (3, 4)], ['a', 'b'])

('spark.driver.host', '5de54992bc62')
('spark.repl.local.jars', 'file:///home/jovyan/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.1.1.jar,file:///home/jovyan/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar,file:///home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.5.jar,file:///home/jovyan/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.271.jar,file:///home/jovyan/.ivy2/jars/com.google.protobuf_protobuf-java-2.6.0.jar')
('spark.app.id', 'local-1554954088157')
('spark.hadoop.fs.s3a.path.style.access', 'true')
('spark.jars', 'file:///home/jovyan/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.1.1.jar,file:///home/jovyan/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar,file:///home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.5.jar,file:///home/jovyan/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.271.jar,file:///home/jovyan/.ivy2/jars/com.google.protobuf_protobuf-java-2.6.0.jar')
('spark.executor.extraJavaOptions', '-Duser.timezone=UTC')
('spark.hadoop.fs.s3a.access.key', 'minio-user

In [12]:
    
df = spark.createDataFrame(((1,2), (3,4)))
df.write.parquet('s3a://data/abc',mode='overwrite')

df = spark.read.parquet('s3a://data/abc')
df.show()

+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  3|  4|
+---+---+



In [13]:
import datalabframework as dlf
dlf.project.load('prod')

Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


<datalabframework.project.Project at 0x7fdec84bc940>

In [14]:
spark = dlf.project.engine().context()

In [15]:
df = spark.createDataFrame(((1,2), (3,4)))
df.write.parquet('s3a://data/abc',mode='overwrite')

df = spark.read.parquet('s3a://data/abc')
df.show()

+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  3|  4|
+---+---+



In [16]:
engine = dlf.project.engine()

md = dlf.project.resource('xyz', 'minio')
md

{'hash': '0x9a37c2af26109088',
 'url': 's3a://data/xyz',
 'service': 'minio',
 'format': 'parquet',
 'host': 'minio',
 'port': 9000,
 'driver': None,
 'database': None,
 'schema': None,
 'table': None,
 'username': 'minio-username',
 'password': 'minio-password',
 'resource_path': 'xyz',
 'provider_path': '/data',
 'provider_alias': 'minio',
 'resource_alias': 'xyz',
 'cache': None,
 'date_column': None,
 'date_start': None,
 'date_end': None,
 'date_window': None,
 'date_partition': None,
 'update_column': None,
 'hash_column': None,
 'state_column': None,
 'options': {},
 'mapping': {}}

In [17]:
engine.save(df,md, mode='overwrite')
df=engine.load(md)
df.show()

+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  3|  4|
+---+---+



### Resources

Data binding works through the metadata files. It's good practice to declare the actual binding in the metadata and avoid hardcoding paths in notebooks and Python source files.

In [18]:
md =dlf.project.resource('ascombe')
md

{'hash': '0x1586b5767f0c5518',
 'url': '/home/jovyan/work/basic/data/ascombe.csv',
 'service': 'file',
 'format': 'csv',
 'host': '127.0.0.1',
 'port': None,
 'driver': None,
 'database': None,
 'schema': None,
 'table': None,
 'username': None,
 'password': None,
 'resource_path': 'ascombe.csv',
 'provider_path': '/home/jovyan/work/basic/data',
 'provider_alias': 'localfs',
 'resource_alias': 'ascombe',
 'cache': None,
 'date_column': None,
 'date_start': None,
 'date_end': None,
 'date_window': None,
 'date_partition': None,
 'update_column': None,
 'hash_column': None,
 'state_column': None,
 'options': {'header': True, 'inferSchema': True},
 'mapping': {}}
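The output above shows how a resource alias is bound to a concrete location: the `localfs` provider path `data` is resolved against the project root, and the resource path `ascombe.csv` is appended to form the `url`. Earlier, the `minio` resource `xyz` with provider path `/data` became `s3a://data/xyz`. The sketch below reproduces only those two cases; `resolve_url` and its service-to-scheme rules are inferred from the outputs, not taken from the framework's code.

```python
import posixpath


def resolve_url(service: str, rootdir: str, provider_path: str, resource_path: str) -> str:
    """Build a resource URL from provider and resource paths (hypothetical helper)."""
    if service == "file":
        # relative provider paths are anchored at the project root directory
        # (posixpath.join keeps absolute provider paths as they are)
        base = posixpath.join(rootdir, provider_path)
        return posixpath.normpath(posixpath.join(base, resource_path))
    if service == "minio":
        # S3-compatible object store read through Hadoop's s3a connector;
        # the first segment of the provider path acts as the bucket name
        bucket_path = posixpath.join(provider_path, resource_path).lstrip("/")
        return "s3a://" + bucket_path
    raise NotImplementedError(f"service {service!r} is not covered by this sketch")


print(resolve_url("file", "/home/jovyan/work/basic", "data", "ascombe.csv"))
print(resolve_url("minio", "/home/jovyan/work/basic", "/data", "xyz"))
```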

### Modules: Engines

This submodule lets you start a context from the configuration described in the metadata. It also provides basic load/save data functions based on the aliases defined in the configuration.

Let's start by listing the aliases and the configuration of the engines declared in `metadata.yml`.


__Context: Spark__  
Let's start the engine session by selecting a Spark context from the list. You can declare many Spark contexts, for instance for single node 

In [19]:
engine = dlf.project.engine()
engine.config()

type: spark
name: prod
version: 2.4.0
info:
    python_version: 3.6.8
    hadoop_version: 3.1.1
    hadoop_detect: spark
    hadoop_home: /opt/hadoop
    spark_home: /opt/spark
    spark_classpath:
      - /opt/spark/jars/*
      - /opt/hadoop/etc/hadoop
      - /opt/hadoop/share/hadoop/common/lib/*
      - /opt/hadoop/share/hadoop/common/*
      - /opt/hadoop/share/hadoop/hdfs
      - /opt/hadoop/share/hadoop/hdfs/lib/*
      - /opt/hadoop/share/hadoop/hdfs/*
      - /opt/hadoop/share/hadoop/mapreduce/lib/*
      - /opt/hadoop/share/hadoop/mapreduce/*
      - /opt/hadoop/share/hadoop/yarn
      - /opt/hadoop/share/hadoop/yarn/lib/*
      - /opt/hadoop/share/hadoop/yarn/*
    spark_classpath_source: /opt/spark/conf/spark-env.sh
config:
    spark.driver.host: 5de54992bc62
    spark.repl.local.jars: file:///home/jovyan/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.1.1.jar,file:///home/jovyan/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar,file:///home/jovyan/.ivy2/jars/org.postgresql_postgr

You can quickly inspect the properties of the context with the engine's `config()` method; its `info` section reports the Python, Spark and Hadoop setup.

By calling the `context` method, you access the Spark SQL Context directly. 

In [20]:
spark = engine.context()

Let's read the CSV data once more, this time through the Spark engine: first with the engine `load` utility, then by resolving the resource metadata with `dlf.project.resource` to locate our labeled dataset.

In [21]:
#read using the engine utility (directly using the load function)
# load/save option from the metadata.yml files
df = engine.load('ascombe')
df.show(2)

+---+----+----+----+----+----+----+---+----+
|_c0| _c1| _c2| _c3| _c4| _c5| _c6|_c7| _c8|
+---+----+----+----+----+----+----+---+----+
|idx|  Ix|  Iy| IIx| IIy|IIIx|IIIy|IVx| IVy|
|  0|10.0|8.04|10.0|9.14|10.0|7.46|8.0|6.58|
+---+----+----+----+----+----+----+---+----+
only showing top 2 rows



In [22]:
# resolve the resource metadata for the 'ascombe' alias
md = dlf.project.resource('ascombe')
md

{'hash': '0x1586b5767f0c5518',
 'url': '/home/jovyan/work/basic/data/ascombe.csv',
 'service': 'file',
 'format': 'csv',
 'host': '127.0.0.1',
 'port': None,
 'driver': None,
 'database': None,
 'schema': None,
 'table': None,
 'username': None,
 'password': None,
 'resource_path': 'ascombe.csv',
 'provider_path': '/home/jovyan/work/basic/data',
 'provider_alias': 'localfs',
 'resource_alias': 'ascombe',
 'cache': None,
 'date_column': None,
 'date_start': None,
 'date_end': None,
 'date_window': None,
 'date_partition': None,
 'update_column': None,
 'hash_column': None,
 'state_column': None,
 'options': {'header': True, 'inferSchema': True},
 'mapping': {}}

In [None]:
import datalabframework as dlf
dlf.project.load('default')
engine = dlf.project.engine()
spark  = engine.context()

In [None]:
df = engine.load('ascombe', header=True)
df.show(2)

In [None]:
engine.save(df,'correlation', mode='overwrite')
engine.load('correlation').show(2)

In [None]:
engine.save(df,'correlation', mode='overwrite', header=False)
engine.load('correlation', header=False).show(2)

In [None]:
engine.save(df,'correlation', mode='overwrite', header=True)
engine.load('correlation', header=True).show(2)

In [None]:
import datalabframework as dlf
dlf.project.load('prod')
engine = dlf.project.engine()

df = engine.load('ascombe', header=True)
df.show(2)

In [None]:
engine.save(df,'correlation', mode='overwrite')
engine.load('correlation').show(2)

In [None]:
engine.save(df,'correlation', mode='overwrite', header=False)
engine.load('correlation', header=False).show(2)

In [None]:
engine.save(df,'correlation', mode='overwrite', header=True)
engine.load('correlation', header=True).show(2)

In [None]:
df.toPandas()

In [None]:
#read using the engine utility (also from resource metadata)
md =dlf.project.resource('ascombe')
df = engine.load(md, inferSchema=True, header=True)
df.show(2)

In [None]:
df.printSchema()

In [None]:
df.show()

Finally, let's calculate the correlation between the `x` and `y` columns of each set (I, II, III, IV) and save the result in a separate dataset.

In [None]:
from pyspark.ml.feature import VectorAssembler

for s in ['I', 'II', 'III', 'IV']:
    va = VectorAssembler(inputCols=[s+'x', s+'y'], outputCol=s)
    df = va.transform(df)
    df = df.drop(s+'x', s+'y')
    
df.show()

After assembling the dataframe into four sets of 2D vectors, let's calculate the Pearson correlation for each set. For Anscombe's quartet, all four sets should have (almost) the same Pearson correlation, about 0.816.
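As a cross-check that needs no Spark, the Pearson coefficient can be computed from its definition, r = Sxy / sqrt(Sxx * Syy) with Sxy, Sxx, Syy the sums of centered products, on the published values of Anscombe's quartet (the first row matches the `ascombe.csv` sample shown earlier). This is a plain-Python sketch, not how `pyspark.ml.stat.Correlation` computes it.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    syy = sum((y - mean_y) ** 2 for y in ys)
    return sxy / sqrt(sxx * syy)


# Anscombe's quartet: sets I-III share the same x values
x123 = [10, 8, 13, 9, 11, 14, 6, 4, 12, 7, 5]
quartet = {
    "I": (x123, [8.04, 6.95, 7.58, 8.81, 8.33, 9.96, 7.24, 4.26, 10.84, 4.82, 5.68]),
    "II": (x123, [9.14, 8.14, 8.74, 8.77, 9.26, 8.10, 6.13, 3.10, 9.13, 7.26, 4.74]),
    "III": (x123, [7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73]),
    "IV": ([8, 8, 8, 8, 8, 8, 8, 19, 8, 8, 8],
           [6.58, 5.76, 7.71, 8.84, 8.47, 7.04, 5.25, 12.50, 5.56, 7.91, 6.89]),
}

for name, (xs, ys) in quartet.items():
    print(name, round(pearson(xs, ys), 4))
```

All four coefficients come out at roughly 0.816, even though the four sets look very different when plotted, which is the point of the quartet.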

In [None]:
spark = engine.context()

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.sql.types import StructType, StructField, DoubleType

cols = ['I', 'II', 'III', 'IV']
corr = {}

# Pearson correlation for each set: Correlation.corr returns a one-row
# DataFrame holding a 2x2 correlation matrix; entry [0, 1] is the x/y coefficient
for s in cols:
    matrix = Correlation.corr(df, s, 'pearson').collect()[0][0]
    corr[s] = float(matrix[0, 1])

# declare the schema of the output dataframe (one double column per set)
schema = StructType([StructField(s, DoubleType(), True) for s in cols])

# create the output dataframe: a single row with the four coefficients
corr_df = spark.createDataFrame(data=[corr], schema=schema)

In [None]:
import pyspark.sql.functions as f
corr_df.select([f.round(f.avg(c), 3).alias(c) for c in cols]).show()

Save the results. The data frame is very small; however, when saving CSV files, Spark assumes large datasets and writes the partitions as separate files inside a directory named after the target file. See below:


In [None]:
df.show()

In [None]:
engine.save(corr_df,'correlation', mode='overwrite', header=True)

Round trip: read the data back to check that everything went fine.

In [None]:
engine.load('correlation', header=True, inferSchema=True).show()

#### Access data from HDFS

We can override the default resource provider by explicitly passing a different provider.

In [None]:
md = dlf.project.resource('correlation', 'hdfs')
engine.save(corr_df,md, header=True, mode='overwrite')

In [None]:
engine.load('correlation', 'hdfs', header=True, inferSchema=True).show()

#### Access data from Minio

Minio is an object storage server released under the Apache License v2.0. It is compatible with the Amazon S3 cloud storage service.  
It is best suited for storing unstructured data such as photos, videos, log files, backups and container / VM images.   
Size of an object can range from a few KBs to a maximum of 5TB. More info at https://github.com/minio/minio

In [None]:
md = dlf.project.resource('correlation', 'minio')
md

In [None]:
md = dlf.project.resource('correlation', 'minio')
engine.save(corr_df,md, header=True, mode='overwrite')

In [None]:
engine.load('correlation', 'minio', header=True, inferSchema=True).show()

#### Inspect data objects given a provider

In [1]:
import datalabframework as dlf

In [2]:
dlf.project.load('prod')
engine = dlf.project.engine()

Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


In [3]:
engine.list('localfs').toPandas()

|   | name | type |
|---|------|------|
| 0 | .ipynb_checkpoints | DIRECTORY |
| 1 | correlation.csv | FILE |
| 2 | ascombe.csv | FILE |
| 3 | prod | DIRECTORY |
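For the `localfs` provider, the name/type listing above resembles a plain directory scan. A minimal sketch with `os.scandir`, assuming the `FILE`/`DIRECTORY` labels seen in the output; `list_local` is hypothetical, returns tuples rather than a Spark DataFrame, and sorts entries by name, which the framework may not do.

```python
import os
import tempfile
from pathlib import Path


def list_local(path):
    """Return (name, type) pairs for the entries of `path`, sorted by name (hypothetical helper)."""
    with os.scandir(path) as entries:
        rows = [(entry.name, "DIRECTORY" if entry.is_dir() else "FILE") for entry in entries]
    return sorted(rows)


# usage: a throwaway data directory with one file and one subdirectory
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "ascombe.csv").write_text("idx,Ix,Iy\n")
    (Path(tmp) / "prod").mkdir()
    print(list_local(tmp))  # [('ascombe.csv', 'FILE'), ('prod', 'DIRECTORY')]
```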


In [4]:
engine.list(path='prod', provider='localfs').toPandas()

|   | name | type |
|---|------|------|
| 0 | etl | DIRECTORY |
| 1 | raw | DIRECTORY |
| 2 | reports | DIRECTORY |
| 3 | ml | DIRECTORY |


In [5]:
engine.list('hdfs').toPandas()

|   | name | type |
|---|------|------|
| 0 | correlation.csv | DIRECTORY |


In [6]:
engine.list(path='correlation.csv', provider='hdfs').toPandas()

|   | name | type |
|---|------|------|
| 0 | _temporary | DIRECTORY |


In [7]:
engine.list('minio').toPandas()

|   | name | type |
|---|------|------|


In [8]:
engine.list('school').toPandas()

|   | name | type |
|---|------|------|
| 0 | Department | BASE TABLE |
| 1 | Person | BASE TABLE |
| 2 | OnsiteCourse | BASE TABLE |
| 3 | OnlineCourse | BASE TABLE |
| 4 | StudentGrade | BASE TABLE |
| 5 | CourseInstructor | BASE TABLE |
| 6 | Course | BASE TABLE |
| 7 | OfficeAssignment | BASE TABLE |


In [9]:
engine.load('SELECT * FROM INFORMATION_SCHEMA.TABLES', 'school').toPandas()

|   | TABLE_CATALOG | TABLE_SCHEMA | TABLE_NAME | TABLE_TYPE |
|---|---------------|--------------|------------|------------|
| 0 | School | dbo | Department | BASE TABLE |
| 1 | School | dbo | Person | BASE TABLE |
| 2 | School | dbo | OnsiteCourse | BASE TABLE |
| 3 | School | dbo | OnlineCourse | BASE TABLE |
| 4 | School | dbo | StudentGrade | BASE TABLE |
| 5 | School | dbo | CourseInstructor | BASE TABLE |
| 6 | School | dbo | Course | BASE TABLE |
| 7 | School | dbo | OfficeAssignment | BASE TABLE |


In [10]:
engine.list('sakila').toPandas()

|   | name | type |
|---|------|------|
| 0 | actor | BASE TABLE |
| 1 | actor_info | VIEW |
| 2 | address | BASE TABLE |
| 3 | category | BASE TABLE |
| 4 | city | BASE TABLE |
| 5 | country | BASE TABLE |
| 6 | customer | BASE TABLE |
| 7 | customer_list | VIEW |
| 8 | film | BASE TABLE |
| 9 | film_actor | BASE TABLE |


In [11]:
engine.list('pagila').toPandas()

|   | name | type |
|---|------|------|
| 0 | actor_info | VIEW |
| 1 | customer_list | VIEW |
| 2 | film_list | VIEW |
| 3 | nicer_but_slower_film_list | VIEW |
| 4 | film | BASE TABLE |
| 5 | inventory | BASE TABLE |
| 6 | staff | BASE TABLE |
| 7 | payment_p2007_05 | BASE TABLE |
| 8 | payment_p2007_06 | BASE TABLE |
| 9 | sales_by_film_category | VIEW |


In [12]:
df = engine.list('pagila').toPandas()
df[df['type']=='BASE TABLE'].sort_values('name')

|    | name | type |
|----|------|------|
| 12 | actor | BASE TABLE |
| 25 | address | BASE TABLE |
| 10 | category | BASE TABLE |
| 26 | city | BASE TABLE |
| 14 | country | BASE TABLE |
| 27 | customer | BASE TABLE |
| 4 | film | BASE TABLE |
| 15 | film_actor | BASE TABLE |
| 18 | film_category | BASE TABLE |
| 5 | inventory | BASE TABLE |


In [13]:
query = """
    SELECT c.last_name,
             COUNT(p.amount) AS amount
    FROM customer c
    LEFT JOIN payment p
        ON c.customer_id = p.customer_id
    WHERE c.last_name like 'A%'
    GROUP BY  c.last_name
    ORDER BY  c.last_name ASC;
"""
engine.load(query, 'pagila').toPandas()

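|   | last_name | amount |
|---|-----------|--------|
| 0 | ABNEY | 21 |
| 1 | ADAM | 28 |
| 2 | ADAMS | 27 |
| 3 | ALEXANDER | 27 |
| 4 | ALLARD | 32 |
| 5 | ALLEN | 31 |
| 6 | ALVAREZ | 27 |
| 7 | ANDERSON | 24 |
| 8 | ANDREW | 25 |
| 9 | ANDREWS | 23 |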


### Modules: Export

This submodule lets you export cells and import them in other notebooks as Python packages. Check the notebook [versions.ipynb](versions.ipynb) to see how to export a notebook, then run the code below to check that it really works!


In [14]:
import datalabframework as dlf
dlf.project.load()

In [15]:
from hello import python_version

In [16]:
python_version()

Hello world: python 3.6.3


### Modules: Logging

In [17]:
import datalabframework as dlf
import datalabframework.logging as log

dlf.project.load(factory_defaults=False)
dlf.project.metadata()['loggers']

Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


root:
    severity: info
datalabframework:
    name: dlf
    stream:
        severity:
        enable:
    stdio:
        severity: notice
        enable: true
    file:
        severity: notice
        enable: true
        path:
    kafka:
        severity: info
        enable: false
        hosts: kafka-node1:9092 kafka-node2:9092
        topic: dlf

In [20]:
log.error('test')
log.warning('pipppo', extra={'a':42})
log.notice({'a':42})

ERROR - run_code - test - {}
NOTICE - run_code - data - {'a': 42}
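The framework logs at a `notice` severity, which Python's standard `logging` module does not define. One way to mimic the `loggers` metadata shown earlier with the standard library alone is to register a custom level between `INFO` (20) and `WARNING` (30) and give the console handler a `notice` threshold. The level number 25, the logger name `dlf-sketch` and the format string are assumptions; this is not the datalabframework's implementation.

```python
import io
import logging

NOTICE = 25  # assumed numeric level, between INFO (20) and WARNING (30)
logging.addLevelName(NOTICE, "NOTICE")

stream = io.StringIO()  # stand-in for stdout, so the output can be inspected
handler = logging.StreamHandler(stream)
handler.setLevel(NOTICE)  # mirrors `stdio: severity: notice`
handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))

logger = logging.getLogger("dlf-sketch")
logger.setLevel(logging.INFO)  # mirrors `root: severity: info`
logger.addHandler(handler)
logger.propagate = False  # keep records away from the root logger's handlers

logger.info("below the handler threshold, not written")
logger.log(NOTICE, "notice message")
logger.error("error message")

print(stream.getvalue())
```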


To connect to the `pagila` database from the host:
`docker exec -it 23fc6098cbdf psql --username=postgres --dbname=pagila `

In [21]:
dlf.files.get_dotenv_path(dlf.paths.rootdir())

'.env'

In [22]:
import datalabframework as dlf
dlf.project.load()


Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


<datalabframework.project.Project at 0x7fa8b7d51160>

In [23]:
from datalabframework.metadata import load as get_project_metadata
from datalabframework import files, paths

from datalabframework._utils import Singleton, YamlDict, repo_data, to_ordered_dict, python_version, relpath, abspath

In [24]:
get_project_metadata(
                'default',
                abspath(files.get_metadata_files(paths.rootdir()), paths.rootdir()),
                abspath(dlf.files.get_dotenv_path(dlf.paths.rootdir()), paths.rootdir()))

{'profile': 'default',
 'variables': {'a': 'hello',
  'b': 'hello world',
  'c': '/bin/bash',
  'd': 'foo',
  'e': '2019-04-12 03:33:32',
  'f': '2019-04-12',
  'my_date_var': '2019-04-12',
  'my_env_var': 'guest',
  'my_string_concat_var1': 'spark running at local[*]',
  'my_string_concat_var2': 'Hi There!: the current date is 2019-04-12',
  'my_string_var': 'Hi There!'},
 'engine': {'type': 'spark',
  'master': 'local[*]',
  'jobname': None,
  'timezone': 'naive',
  'submit': {'jars': None, 'packages': None, 'py-files': None},
  'config': {'spark.sql.autoBroadcastJoinThreshold': -1}},
 'providers': {'minio': {'username': 'minio-username',
   'path': '/data',
   'service': 'minio',
   'hostname': 'minio',
   'password': 'minio-password',
   'format': 'parquet'},
  'localfs': {'path': 'data', 'format': 'csv', 'service': 'file'},
  'pagila': {'username': 'postgres',
   'path': 'pagila',
   'service': 'postgres',
   'hostname': 'postgres',
   'password': 'postgres',
   'format': 'jdbc'},

In [25]:
import datalabframework as dlf
dlf.project.load('prod')

engine = dlf.project.engine()

Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


In [26]:
md = dlf.project.resource('staff', 'pagila')
md

{'hash': '0x64af7cc8c77c8fba',
 'url': 'jdbc:postgresql://postgres:5432/pagila',
 'service': 'postgres',
 'format': 'jdbc',
 'host': 'postgres',
 'port': 5432,
 'driver': 'org.postgresql.Driver',
 'database': 'pagila',
 'schema': 'public',
 'table': 'staff',
 'username': 'postgres',
 'password': 'postgres',
 'resource_path': 'staff',
 'provider_path': 'pagila',
 'provider_alias': 'pagila',
 'resource_alias': 'staff',
 'cache': None,
 'date_column': None,
 'date_start': None,
 'date_end': None,
 'date_window': None,
 'date_partition': None,
 'update_column': None,
 'hash_column': None,
 'state_column': None,
 'options': {},
 'mapping': {}}

In [27]:
df = engine.load('staff', 'pagila')
df.select('first_name', 'last_name').toPandas()

|   | first_name | last_name |
|---|------------|-----------|
| 0 | Mike | Hillyer |
| 1 | Jon | Stephens |


In [28]:
#Use JOIN to display the total amount rung up by each staff member
# use tables 'staff' and 'payment'

query = """
    SELECT SUM(p.amount) as total_sales, s.last_name
    FROM payment p 
    INNER JOIN staff s ON p.staff_id = s.staff_id GROUP BY s.last_name
    """
df = engine.load(query, 'pagila')
df.toPandas()

|   | total_sales | last_name |
|---|-------------|-----------|
| 0 | 33927.04 | Stephens |
| 1 | 33489.47 | Hillyer |


In [29]:
md = dlf.project.resource('SELECT COUNT(1) FRom payment', 'pagila')
engine.load(md).toPandas()

|   | count |
|---|-------|
| 0 | 16049 |


In [30]:
query = """
    SEleCT *
    FROM information_schema.tables
    WHERE 
        table_type = 'BASE TABLE' AND
        table_schema = 'public'
    ORDER BY table_schema,table_name
    """
md = dlf.project.resource(query, 'pagila')
md

{'hash': '0x253970984be15485',
 'url': 'jdbc:postgresql://postgres:5432/pagila',
 'service': 'postgres',
 'format': 'jdbc',
 'host': 'postgres',
 'port': 5432,
 'driver': 'org.postgresql.Driver',
 'database': 'pagila',
 'schema': 'public',
 'table': "( SEleCT * FROM information_schema.tables WHERE table_type = 'BASE TABLE' AND table_schema = 'public' ORDER BY table_schema,table_name ) as _query",
 'username': 'postgres',
 'password': 'postgres',
 'resource_path': "\n    SEleCT *\n    FROM information_schema.tables\n    WHERE \n        table_type = 'BASE TABLE' AND\n        table_schema = 'public'\n    ORDER BY table_schema,table_name\n    ",
 'provider_path': 'pagila',
 'provider_alias': 'pagila',
 'resource_alias': "\n    SEleCT *\n    FROM information_schema.tables\n    WHERE \n        table_type = 'BASE TABLE' AND\n        table_schema = 'public'\n    ORDER BY table_schema,table_name\n    ",
 'cache': None,
 'date_column': None,
 'date_start': None,
 'date_end': None,
 'date_window'
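The `table` field above shows what happens when a SQL query is passed instead of a table name: whitespace is collapsed and the query is wrapped as a derived table aliased `_query`, a form that Spark's JDBC data source accepts for its `dbtable` option. The sketch below reproduces that transformation and the PostgreSQL URL shown in `url`. `jdbc_table` and `jdbc_url` are hypothetical helpers, and detecting a query by a leading `SELECT` is an assumption.

```python
def jdbc_table(resource: str) -> str:
    """Return a table name unchanged, or wrap a SELECT query as a derived table."""
    collapsed = " ".join(resource.split())  # collapse newlines and repeated spaces
    if collapsed.lower().startswith("select"):
        return f"( {collapsed} ) as _query"
    return collapsed


def jdbc_url(host: str, port: int, database: str) -> str:
    """PostgreSQL JDBC URL in the form shown above: jdbc:postgresql://host:port/db."""
    return f"jdbc:postgresql://{host}:{port}/{database}"


print(jdbc_table("staff"))
print(jdbc_table("SELECT COUNT(1) FRom payment"))
print(jdbc_url("postgres", 5432, "pagila"))
```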

In [48]:
engine.load(md, 'pagila').toPandas()

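|   | table_catalog | table_schema | table_name | table_type | self_referencing_column_name | reference_generation | user_defined_type_catalog | user_defined_type_schema | user_defined_type_name | is_insertable_into | is_typed | commit_action |
|---|---------------|--------------|------------|------------|------------------------------|----------------------|---------------------------|--------------------------|------------------------|--------------------|----------|---------------|
| 0 | pagila | public | actor | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 1 | pagila | public | address | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 2 | pagila | public | category | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 3 | pagila | public | city | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 4 | pagila | public | country | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 5 | pagila | public | customer | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 6 | pagila | public | film | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 7 | pagila | public | film_actor | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 8 | pagila | public | film_category | BASE TABLE |  |  |  |  |  | YES | NO |  |
| 9 | pagila | public | inventory | BASE TABLE |  |  |  |  |  | YES | NO |  |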


In [49]:
import datalabframework as dlf
dlf.project.load('prod')

engine = dlf.project.engine()

Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


In [50]:
engine.list('pagila').schema
#.show(20,False)

StructType(List(StructField(name,StringType,true),StructField(type,StringType,true)))

In [51]:
engine.list('localfs')

DataFrame[name: string, type: string]

In [52]:
import logging

logger = logging.getLogger('pippo')
logger.name

'pippo'

In [53]:
engine.list('minio')

DataFrame[name: string, type: string]

In [54]:
engine.list('hdfs')

DataFrame[name: string, type: string]

In [55]:
import datalabframework as dlf
import datalabframework.logging as log

dlf.project.load()


Loading packages:
  -  org.apache.hadoop:hadoop-aws:3.1.1
  -  com.microsoft.sqlserver:mssql-jdbc:6.4.0.jre8
  -  mysql:mysql-connector-java:8.0.12
  -  org.postgresql:postgresql:42.2.5


<datalabframework.project.Project at 0x7f55d15ca1d0>

In [56]:
log.notice({'rows':23, 'time':120})
log.warning('my message')
log.warning('this is another message', extra={'action':'join', 'aaa':42})


NOTICE - run_code - data - {'rows': 23, 'time': 120}


In [57]:
log.notice(dict(engine.config()['info']))

NOTICE - run_code - data - {'python_version': '3.6.3', 'hadoop_home': '/opt/hadoop', 'hadoop_detect': 'spark', 'spark_classpath_source': '/opt/spark/conf/spark-env.sh', 'spark_home': '/opt/spark', 'spark_classpath': ['/opt/spark/jars/*', '/opt/hadoop/etc/hadoop', '/opt/hadoop/share/hadoop/common/lib/*', '/opt/hadoop/share/hadoop/common/*', '/opt/hadoop/share/hadoop/hdfs', '/opt/hadoop/share/hadoop/hdfs/lib/*', '/opt/hadoop/share/hadoop/hdfs/*', '/opt/hadoop/share/hadoop/mapreduce/lib/*', '/opt/hadoop/share/hadoop/mapreduce/*', '/opt/hadoop/share/hadoop/yarn', '/opt/hadoop/share/hadoop/yarn/lib/*', '/opt/hadoop/share/hadoop/yarn/*'], 'hadoop_version': '3.1.1'}
