## Analysing workflow 1 for implementation
It is a simple flow for storing the input data (also known as primary series):  

CSV file as input -> dummy test (e.g. check for missing data) -> transcode -> write to SQLite

- For this domain the input data are time series.
- There are several dataflows (collections) for the domain:
    - the name of the dataflow is contained in the first column (DATAFLOW)
    - for our example it is: EBSSBS_PRL_A
    - for now, this will also be the name of the SQLite file for this dataflow
    - the table name is: originals
    - the columns keep their original names, with the first column removed
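
The naming rules above could be sketched as follows. This is only a minimal sketch under assumptions: `dataflow_name` and `store_originals` are hypothetical helpers, and the `AGENCY:NAME(VERSION)` shape of the DATAFLOW value is inferred solely from the example value `ESTAT:EBSSBS_PRL_A(1.0)` in the sample file.

```python
import re
import sqlite3
from pathlib import Path

import pandas as pd


def dataflow_name(dataflow_id: str) -> str:
    """Extract 'EBSSBS_PRL_A' from an id such as 'ESTAT:EBSSBS_PRL_A(1.0)'.

    Assumption: optional 'AGENCY:' prefix and optional '(VERSION)' suffix.
    """
    match = re.fullmatch(r"(?:[^:]+:)?([^()]+)(?:\([^()]*\))?", dataflow_id.strip())
    if match is None:
        raise ValueError(f"unexpected DATAFLOW value: {dataflow_id!r}")
    return match.group(1)


def store_originals(df: pd.DataFrame, out_dir: Path) -> Path:
    """Write df without its DATAFLOW column to <out_dir>/<dataflow>.sqlite, table 'originals'."""
    names = df["DATAFLOW"].map(dataflow_name).unique()
    if len(names) != 1:
        raise ValueError(f"expected exactly one dataflow, found {list(names)}")
    db_path = Path(out_dir) / f"{names[0]}.sqlite"
    conn = sqlite3.connect(db_path)
    try:
        # Column names are kept as they are; only the first column is dropped.
        df.drop(columns="DATAFLOW").to_sql("originals", conn, if_exists="replace", index=False)
        conn.commit()
    finally:
        conn.close()
    return db_path
```

Raising an error when more than one dataflow is found is a design choice made for the sketch; the behaviour for mixed files is still an open question.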




In [2]:
import pandas as pd

# load csv
df1 = pd.read_csv('./EBSSBS_PRL_A - SDMX-CSV template - V01.csv', sep=';')

# print first 5 lines (head)
print(df1.head())

# some statistics
df1.describe()

                  DATAFLOW FREQ  TIME_PERIOD  REF_AREA INDICATOR ACTIVITY  \
0  ESTAT:EBSSBS_PRL_A(1.0)    A          NaN       NaN       ENT        B   
1  ESTAT:EBSSBS_PRL_A(1.0)    A          NaN       NaN       ENT      B05   
2  ESTAT:EBSSBS_PRL_A(1.0)    A          NaN       NaN       ENT     B051   
3  ESTAT:EBSSBS_PRL_A(1.0)    A          NaN       NaN       ENT     B052   
4  ESTAT:EBSSBS_PRL_A(1.0)    A          NaN       NaN       ENT      B06   

  NUMBER_EMPL PRODUCT TURNOVER CLIENT_RESIDENCE  ...  UNIT_MEASURE UNIT_MULT  \
0          _T      _Z       _Z               _Z  ...            PN         0   
1          _T      _Z       _Z               _Z  ...            PN         0   
2          _T      _Z       _Z               _Z  ...            PN         0   
3          _T      _Z       _Z               _Z  ...            PN         0   
4          _T      _Z       _Z               _Z  ...            PN         0   

   DECIMALS  OBS_STATUS OBS_STATUS_1  CONF_STATUS  CONF_

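|       | TIME_PERIOD | REF_AREA | OBS_VALUE | UNIT_MULT | DECIMALS | OBS_STATUS_1 | CONF_STATUS | CONF_STATUS_1 | DOMINANCE | SHARE_SECOND | COMMENT_OBS |
|-------|-------------|----------|-----------|-----------|----------|--------------|-------------|---------------|-----------|--------------|-------------|
| count | 0.0 | 0.0 | 0.0 | 6532.0 | 6532.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| mean  |  |  |  | 0.945193 | 0.0 |  |  |  |  |  |  |
| std   |  |  |  | 1.393731 | 0.0 |  |  |  |  |  |  |
| min   |  |  |  | 0.0 | 0.0 |  |  |  |  |  |  |
| 25%   |  |  |  | 0.0 | 0.0 |  |  |  |  |  |  |
| 50%   |  |  |  | 0.0 | 0.0 |  |  |  |  |  |  |
| 75%   |  |  |  | 3.0 | 0.0 |  |  |  |  |  |  |
| max   |  |  |  | 3.0 | 0.0 |  |  |  |  |  |  |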
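
The count row of the statistics shows that several columns of the template contain no values at all. A dummy test for missing data could start from a report like the one below. It is a minimal sketch: `missing_report` and `empty_columns` are hypothetical helpers, and the toy frame only stands in for the real file.

```python
import pandas as pd


def missing_report(df: pd.DataFrame) -> pd.DataFrame:
    """Per-column count and share of missing values, worst columns first."""
    missing = df.isna().sum()
    report = pd.DataFrame({
        "missing": missing,
        "share": missing / max(len(df), 1),  # avoid division by zero on an empty frame
    })
    return report.sort_values("missing", ascending=False)


def empty_columns(df: pd.DataFrame) -> list[str]:
    """Names of the columns that contain no value at all."""
    return [col for col in df.columns if df[col].isna().all()]


# Toy data, not taken from the real file
toy = pd.DataFrame({
    "FREQ": ["A", "A"],
    "TIME_PERIOD": [None, None],
    "UNIT_MULT": [0, 3],
})
print(empty_columns(toy))
print(missing_report(toy))
```

Whether an entirely empty column is an error or is acceptable for a template file is a decision the real check would still have to encode.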


In [16]:
# extract column names
print(df1.columns.tolist())
# get implicit column types
print(df1.dtypes)
# check for duplicates
df1.columns.has_duplicates

['DATAFLOW', 'FREQ', 'TIME_PERIOD', 'REF_AREA', 'INDICATOR', 'ACTIVITY', 'NUMBER_EMPL', 'PRODUCT', 'TURNOVER', 'CLIENT_RESIDENCE', 'OBS_VALUE', 'UNIT_MEASURE', 'UNIT_MULT', 'DECIMALS', 'OBS_STATUS', 'OBS_STATUS_1', 'CONF_STATUS', 'CONF_STATUS_1', 'DOMINANCE', 'SHARE_SECOND', 'COMMENT_OBS']
DATAFLOW             object
FREQ                 object
TIME_PERIOD         float64
REF_AREA            float64
INDICATOR            object
ACTIVITY             object
NUMBER_EMPL          object
PRODUCT              object
TURNOVER             object
CLIENT_RESIDENCE     object
OBS_VALUE           float64
UNIT_MEASURE         object
UNIT_MULT             int64
DECIMALS              int64
OBS_STATUS           object
OBS_STATUS_1        float64
CONF_STATUS         float64
CONF_STATUS_1       float64
DOMINANCE           float64
SHARE_SECOND        float64
COMMENT_OBS         float64
dtype: object


False
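
The column list printed above can also serve as a reference for a "mandatory columns" check at load time. The sketch below assumes that all 21 columns are mandatory, which the source does not state, and `check_columns` is a hypothetical helper. Note that `pandas.read_csv` renames repeated headers by default (for example `X` and `X.1`), so a duplicated header would normally surface here as an unexpected name rather than through `has_duplicates`.

```python
# Reference list copied from the printed column names (assumed mandatory).
EXPECTED_COLUMNS = [
    "DATAFLOW", "FREQ", "TIME_PERIOD", "REF_AREA", "INDICATOR", "ACTIVITY",
    "NUMBER_EMPL", "PRODUCT", "TURNOVER", "CLIENT_RESIDENCE", "OBS_VALUE",
    "UNIT_MEASURE", "UNIT_MULT", "DECIMALS", "OBS_STATUS", "OBS_STATUS_1",
    "CONF_STATUS", "CONF_STATUS_1", "DOMINANCE", "SHARE_SECOND", "COMMENT_OBS",
]


def check_columns(columns, expected=EXPECTED_COLUMNS):
    """Compare actual column names with the expected ones.

    Returns a dict with the expected names that are missing and the
    actual names that were not expected, both in their original order.
    """
    actual = list(columns)
    return {
        "missing": [name for name in expected if name not in actual],
        "unexpected": [name for name in actual if name not in expected],
    }


# Example: FREQ was dropped and an extra column appeared.
example = [name for name in EXPECTED_COLUMNS if name != "FREQ"] + ["EXTRA"]
print(check_columns(example))
```

With the loaded frame, the call would be `check_columns(df1.columns)`.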

For now we will consider the following types for each column (as saved in SQLite).
Keep in mind that SQLite is "typeless": a declared column type is only a hint and is not strictly enforced (for more, see https://www.sqlite.org/datatypes.html):

1. FREQ                 VARCHAR(1) = 'A'
1. TIME_PERIOD          INT (with range)
1. REF_AREA             VARCHAR(2)
1. INDICATOR            VARCHAR(4)
1. ACTIVITY             VARCHAR(4)
1. NUMBER_EMPL          VARCHAR(7)
1. PRODUCT              VARCHAR(2) = '_Z'
1. TURNOVER             VARCHAR(2) = '_Z'
1. CLIENT_RESIDENCE     VARCHAR(2) = '_Z'
1. OBS_VALUE            FLOAT ? (not specified)
1. UNIT_MEASURE         VARCHAR(3)
1. UNIT_MULT            VARCHAR(1) in ['0', '3'] (or INT)
1. DECIMALS             VARCHAR(1) in ['0', '1', '2', '3', '4', '5', '6'] (or INT with range)
1. OBS_STATUS           VARCHAR(1)
1. OBS_STATUS_1         VARCHAR(1)
1. CONF_STATUS          VARCHAR(1)
1. CONF_STATUS_1        VARCHAR(1)
1. DOMINANCE            ? (not specified)
1. SHARE_SECOND         ? (not specified)
1. COMMENT_OBS          TEXT
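
Because SQLite does not enforce declared lengths such as `VARCHAR(1)`, restrictions like `= 'A'` or `in ['0', '3']` have to be expressed separately, for example as `CHECK` constraints. The sketch below covers only a few of the columns. The year range 1900 to 2100 and the `VARCHAR(5)` length are assumptions made purely for illustration, and `try_insert` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE originals (
        FREQ        VARCHAR(1) CHECK (FREQ = 'A'),
        TIME_PERIOD INT        CHECK (TIME_PERIOD BETWEEN 1900 AND 2100),  -- assumed range
        UNIT_MULT   INT        CHECK (UNIT_MULT IN (0, 3)),
        COMMENT_OBS VARCHAR(5)  -- the declared length is not enforced by SQLite
    )
    """
)


def try_insert(row: tuple) -> bool:
    """Return True if SQLite accepts the row, False if a CHECK constraint rejects it."""
    try:
        conn.execute("INSERT INTO originals VALUES (?, ?, ?, ?)", row)
        return True
    except sqlite3.IntegrityError:
        return False


# A comment longer than the declared VARCHAR(5) is still stored.
accepted_long_text = try_insert(("A", 2020, 3, "longer than five characters"))
# These two rows violate a CHECK constraint.
rejected_freq = try_insert(("Q", 2020, 0, None))
rejected_unit_mult = try_insert(("A", 2020, 1, None))
print(accepted_long_text, rejected_freq, rejected_unit_mult)
```

Constraints in the database are a second line of defence; the same restrictions can also be checked in pandas before writing.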

The working types for each column as stored in pandas dataframes:

In [54]:
import numpy as np

# using numpy types: https://numpy.org/doc/stable/user/basics.types.html

# it can be a strict representation with a fixed number of bytes
# (np.bytes_ replaces np.string_, which was removed in NumPy 2.0)
print(df1['FREQ'].astype(np.bytes_).head())
# here pandas decides the length of the string
print(df1['ACTIVITY'].astype(np.bytes_).head())
# check for None or numpy.nan in a column (on the original column, before any cast)
print(df1['FREQ'].isna().sum())
# check whether all values in a column equal a specified value
bool((df1['FREQ'] == 'A').all())


0    b'A'
1    b'A'
2    b'A'
3    b'A'
4    b'A'
Name: FREQ, dtype: bytes8
0       b'B'
1     b'B05'
2    b'B051'
3    b'B052'
4     b'B06'
Name: ACTIVITY, dtype: bytes72
0


True

In [55]:
# check possible categories for a category column
print(df1['ACTIVITY'].astype('category').cat.categories)


Index(['B', 'B05', 'B051', 'B052', 'B06', 'B061', 'B062', 'B07', 'B071',
       'B072',
       ...
       'R92', 'R920', 'R93', 'R931', 'R932', 'S95', 'S951', 'S952', 'S96',
       'S960'],
      dtype='object', length=343)
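
Listing the categories shows which codes occur, but not whether they are allowed. If a reference code list is available, a `pd.CategoricalDtype` with fixed categories can be used for validation, because values outside the categories become missing after the cast. This is a minimal sketch: `invalid_codes` is a hypothetical helper and the three allowed codes are only a stand-in for a real code list.

```python
import pandas as pd


def invalid_codes(column: pd.Series, allowed: list[str]) -> pd.Series:
    """Return the values of `column` that are not in `allowed`.

    Values that were already missing are not reported as invalid.
    """
    dtype = pd.CategoricalDtype(categories=allowed)
    converted = column.astype(dtype)  # unknown codes turn into NaN
    return column[converted.isna() & column.notna()]


# Toy data: 'X99' is not part of the (assumed) code list.
codes = pd.Series(["B", "B05", "X99", None])
print(invalid_codes(codes, allowed=["B", "B05", "B051"]))
```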


In [56]:

# change column types based on a dictionary {column name: type}
df1.astype({
    'FREQ': 'S',
    'TIME_PERIOD': pd.Int16Dtype(),  # nullable integer; np.uint16 cannot be used if the column is nullable
    'REF_AREA': str,
    'INDICATOR': 'category',
    # not converted yet:
    # ACTIVITY
    # NUMBER_EMPL
    # PRODUCT
    # TURNOVER
    # CLIENT_RESIDENCE
    # OBS_VALUE
    # UNIT_MEASURE
    # UNIT_MULT
    # DECIMALS
    # OBS_STATUS
    # OBS_STATUS_1
    # CONF_STATUS
    # CONF_STATUS_1
    # DOMINANCE
    # SHARE_SECOND
    # COMMENT_OBS
}).head()

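|   | DATAFLOW | FREQ | TIME_PERIOD | REF_AREA | INDICATOR | ACTIVITY | NUMBER_EMPL | PRODUCT | TURNOVER | CLIENT_RESIDENCE | ... | UNIT_MEASURE | UNIT_MULT | DECIMALS | OBS_STATUS | OBS_STATUS_1 | CONF_STATUS | CONF_STATUS_1 | DOMINANCE | SHARE_SECOND | COMMENT_OBS |
|---|----------|------|-------------|----------|-----------|----------|-------------|---------|----------|------------------|-----|--------------|-----------|----------|------------|--------------|-------------|---------------|-----------|--------------|-------------|
| 0 | ESTAT:EBSSBS_PRL_A(1.0) | b'A' |  |  | ENT | B | _T | _Z | _Z | _Z | ... | PN | 0 | 0 | P |  |  |  |  |  |  |
| 1 | ESTAT:EBSSBS_PRL_A(1.0) | b'A' |  |  | ENT | B05 | _T | _Z | _Z | _Z | ... | PN | 0 | 0 | P |  |  |  |  |  |  |
| 2 | ESTAT:EBSSBS_PRL_A(1.0) | b'A' |  |  | ENT | B051 | _T | _Z | _Z | _Z | ... | PN | 0 | 0 | P |  |  |  |  |  |  |
| 3 | ESTAT:EBSSBS_PRL_A(1.0) | b'A' |  |  | ENT | B052 | _T | _Z | _Z | _Z | ... | PN | 0 | 0 | P |  |  |  |  |  |  |
| 4 | ESTAT:EBSSBS_PRL_A(1.0) | b'A' |  |  | ENT | B06 | _T | _Z | _Z | _Z | ... | PN | 0 | 0 | P |  |  |  |  |  |  |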
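
The dictionary-based conversion can be extended into a parametrised step that also checks the value restrictions of each column. The sketch below is one possible shape, not the final design: `COLUMN_RULES` and `transform` are hypothetical names, and the chosen dtypes are assumptions that only loosely follow the type list of this notebook.

```python
import pandas as pd

# Hypothetical configuration: target dtype plus allowed values (None = no restriction).
COLUMN_RULES = {
    "FREQ": {"dtype": "category", "allowed": {"A"}},
    "TIME_PERIOD": {"dtype": "Int16", "allowed": None},
    "UNIT_MULT": {"dtype": "Int8", "allowed": {0, 3}},
    "DECIMALS": {"dtype": "Int8", "allowed": set(range(7))},
}


def transform(df: pd.DataFrame, rules=COLUMN_RULES):
    """Return a converted copy of df and a list of restriction violations."""
    out = df.copy()
    problems = []
    for name, rule in rules.items():
        if name not in out.columns:
            problems.append(f"{name}: column missing")
            continue
        allowed = rule["allowed"]
        if allowed is not None:
            values = out[name].dropna()  # missing values are not judged here
            bad = values[~values.isin(list(allowed))]
            if not bad.empty:
                problems.append(
                    f"{name}: {bad.nunique()} value(s) outside {sorted(allowed)}"
                )
        out[name] = out[name].astype(rule["dtype"])
    return out, problems


# Toy data: UNIT_MULT contains the value 5, which is not allowed.
toy = pd.DataFrame({
    "FREQ": ["A", "A"],
    "TIME_PERIOD": [2020.0, None],
    "UNIT_MULT": [0, 5],
    "DECIMALS": [0, 1],
})
converted, problems = transform(toy)
print(converted.dtypes)
print(problems)
```

Collecting violations in a list instead of raising immediately keeps the decision (reject the file or only report) outside the conversion itself.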


## Tasks

1. Extract a flow (or any kind of flow available?)
    - input: input file from the folder in-data
    - output: binary pandas DataFrame
    - run: take the file and convert it to pandas (one internal check: are the mandatory columns present?)

2. Transform with check: do a type conversion for each column (parametrisable?) and check the restrictions for each column
    - input: binary pandas
    - output: binary pandas
    - run: conversions and type domain checks

3. Another check (a dummy, or better several dummies) to show the power of parallelism (when a check does not depend on the result of the previous check)
    - input: binary pandas
    - output: a sqlite staged record
    - run: raise an error if the input is not valid, or just not create the output?

4. No idea (I will create a dummy one that makes a conversion x -> x: identity)
    - input: binary pandas
    - output: binary pandas
    - run: conversions

5. Write SQLite
    - input: binary pandas
    - output: write in output (not staged)
    - run: transaction

6. Wrapper to archive the input

7. Wrapper to archive the session
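
Task 3 relies on checks that do not depend on each other, so they can run side by side. The sketch below uses a thread pool from the standard library as one possible way to show this. The three checks and `run_checks` are hypothetical, and threads are an assumption made for simplicity: for CPU-heavy checks, separate processes or the workflow engine's own parallelism may be the better fit.

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd


def check_not_empty(df: pd.DataFrame) -> list[str]:
    return [] if len(df) else ["input has no rows"]


def check_freq_is_annual(df: pd.DataFrame) -> list[str]:
    bad = int((df["FREQ"] != "A").sum())
    return [f"{bad} row(s) with FREQ different from 'A'"] if bad else []


def check_unit_mult(df: pd.DataFrame) -> list[str]:
    bad = int((~df["UNIT_MULT"].isin([0, 3])).sum())
    return [f"{bad} row(s) with UNIT_MULT outside [0, 3]"] if bad else []


def run_checks(df: pd.DataFrame, checks, max_workers: int = 4) -> dict:
    """Run independent checks concurrently; return {check name: messages}."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() keeps the order of `checks`, so results can be zipped back.
        results = list(pool.map(lambda check: check(df), checks))
    return {check.__name__: messages for check, messages in zip(checks, results)}


# Toy data: the second row has FREQ 'Q'.
toy = pd.DataFrame({"FREQ": ["A", "Q"], "UNIT_MULT": [0, 3]})
report = run_checks(toy, [check_not_empty, check_freq_is_annual, check_unit_mult])
print(report)
```

Each check only reads the frame and returns its messages, which is what makes it safe to run them in any order.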

Questions:
1. give a try with staged and not staged
2. what happens if the input contains several flows (first column)?
3. what happens if some input rows can be imported and some cannot?
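
To experiment with questions 2 and 3, the input can be split before it is written. The sketch below shows one possible approach, not an answer to the questions: `split_by_dataflow` and `split_rows` are hypothetical helpers, and treating a missing OBS_VALUE as "cannot be imported" is an assumption used only for the toy example.

```python
import pandas as pd


def split_by_dataflow(df: pd.DataFrame) -> dict:
    """Return {DATAFLOW value: rows of that flow without the DATAFLOW column}.

    Rows with a missing DATAFLOW are left out by groupby (default dropna=True).
    """
    return {
        name: group.drop(columns="DATAFLOW")
        for name, group in df.groupby("DATAFLOW", sort=False)
    }


def split_rows(df: pd.DataFrame, is_valid: pd.Series):
    """Split df into (accepted, rejected) rows using a boolean mask."""
    return df[is_valid], df[~is_valid]


# Toy data with two flows and one row without a value
toy = pd.DataFrame({
    "DATAFLOW": ["F1", "F2", "F1"],
    "OBS_VALUE": [1.0, None, 3.0],
})
parts = split_by_dataflow(toy)
accepted, rejected = split_rows(toy, toy["OBS_VALUE"].notna())
print(list(parts), len(accepted), len(rejected))
```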

Ideas:
1. Because the entire domain (all files: code and configuration) is versioned, add to the log of each process a checksum of the files used, or directly the commit id.
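
Both variants of this idea can be sketched with the standard library. The helpers `file_sha256` and `current_commit_id` are hypothetical names, SHA-256 is just one reasonable choice of checksum, and the second function assumes that the domain lives in a git repository with the `git` command available.

```python
import hashlib
import subprocess


def file_sha256(path, chunk_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def current_commit_id(repo_dir: str = ".") -> str:
    """Return the id of the checked-out commit (assumes a git repository)."""
    result = subprocess.run(
        ["git", "rev-parse", "HEAD"],
        cwd=repo_dir,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()
```

A log entry for a process could then store, for example, `file_sha256(input_path)` next to the name of the input file.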