# Prepare ASIF data to S3

# Objective(s)

## Business needs 

Prepare the ASIF data (cleaning and removing unwanted rows) using Athena, and save the output to S3 and Glue.

## Description

**Objective**

The raw data is not fully clean, as shown by the analytical HTML table `Template_analysis_from_lambda-2020-11-22-08-12-20.html` (an analysis of the raw table).

**Steps**

We will clean the table with the following steps:
1. Keep years 1998 to 2007
2. Clean `citycode` → keep only 4-digit codes
3. Clean `setup` → replace with Null every value whose length is not 4
4. Remove rows where `cic` is unknown
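
The four steps above can be illustrated on plain Python dictionaries. This is only a sketch under assumptions: `prepare_rows` and the sample rows are hypothetical, "unknown `cic`" is taken to mean missing or empty, and the real cleaning is done by the Athena query further down (which filters rows on `LENGTH(setup) = 4` rather than nulling the value).

```python
import re

# Survey years kept by step 1 (years are stored as strings in the raw table).
YEARS = {str(y) for y in range(1998, 2008)}


def prepare_rows(rows):
    """Apply the four cleaning steps to an iterable of dict rows."""
    cleaned = []
    for row in rows:
        # Step 1: keep years 1998 to 2007
        if row.get("year") not in YEARS:
            continue
        # Step 2: keep only 4-digit city codes
        if not re.fullmatch(r"\d{4}", row.get("citycode") or ""):
            continue
        # Step 4: drop rows with a missing or empty cic (assumption)
        if not row.get("cic"):
            continue
        # Step 3: null out setup values whose length is not 4
        setup = row.get("setup")
        new_row = dict(row)
        new_row["setup"] = setup if setup and len(setup) == 4 else None
        cleaned.append(new_row)
    return cleaned


sample = [
    {"firm": "A", "year": "1998", "citycode": "1101", "setup": "1990", "cic": "1310"},
    {"firm": "B", "year": "1997", "citycode": "1101", "setup": "1990", "cic": "1310"},
    {"firm": "C", "year": "2000", "citycode": "110", "setup": "1990", "cic": "1310"},
    {"firm": "D", "year": "2001", "citycode": "1101", "setup": "90", "cic": "1310"},
    {"firm": "E", "year": "2002", "citycode": "1101", "setup": "1990", "cic": None},
]
cleaned = prepare_rows(sample)
```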


## Target

* The file is saved in S3: 
  * bucket: datalake-datascience 
  * path: DATA/ECON/FIRM_SURVEY/ASIF_CHINA/PREPARED 
* Glue data catalog should be updated
  * database: firms_survey 
  * table prefix: asif_firms 
    * table name (prefix + last folder S3 path): asif_firms_prepared 
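
As a small illustration of the naming rule above, the table name can be derived from the prefix and the S3 path. `glue_table_name` is a hypothetical helper, and joining with an underscore and lower-casing the folder are assumptions inferred from the expected name `asif_firms_prepared`.

```python
def glue_table_name(prefix, s3_path):
    """Build the table name from the prefix and the last folder of the S3 path."""
    last_folder = s3_path.rstrip("/").split("/")[-1]
    # Assumption: prefix and folder are joined with "_" and the folder is lower-cased
    return "{}_{}".format(prefix, last_folder.lower())


name = glue_table_name("asif_firms", "DATA/ECON/FIRM_SURVEY/ASIF_CHINA/PREPARED")
```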

# Metadata

* Key: fzt56oqnn52261m
* Parent key (for update parent):  
* Notebook US Parent (i.e the one to update): 
  * https://github.com/thomaspernet/Financial_dependency_pollution/blob/master/01_data_preprocessing/02_prepare_tables_model/00_prepare_asif.md
* Epic: Epic 1
* US: US 3
* Date Begin: 11/22/2020
* Duration Task: 0
* Description: Prepare (cleaning & removing unwanted rows) ASIF data using Athena and save output to S3 + Glue. 
* Step type: Prepare table
* Status: Active
* Source URL: US 03 Prepare ASIF
* Task type: Jupyter Notebook
* Users: Thomas Pernet
* Watchers: Thomas Pernet
* User Account: https://468786073381.signin.aws.amazon.com/console
* Estimated Log points: 6
* Task tag: #asif,#athena
* Toggl Tag: #data-preparation
* current nb commits: 
* Meetings:  
* Presentation:  
* Email Information:  
  * thread: Number of threads: 0 (Default 0, to avoid displaying email)

# Input Cloud Storage [AWS/GCP]

## Table/file

* Origin: 
  * Athena
* Name: 
  * china_asif
* Github: 
  * https://github.com/thomaspernet/Financial_dependency_pollution/blob/master/01_data_preprocessing/00_download_data_from/ASIF_PANEL/firm_asif.py

# Destination Output/Delivery

## Table/file

* Origin: 
  * S3
  * Athena
* Name:
  * DATA/ECON/FIRM_SURVEY/ASIF_CHINA/PREPARED
  * asif_firms_prepared
* GitHub:
  * https://github.com/thomaspernet/Financial_dependency_pollution/blob/master/01_data_preprocessing/02_prepare_tables_model/00_prepare_asif.md
* URL: 
  * datalake-datascience/DATA/ECON/FIRM_SURVEY/ASIF_CHINA/PREPARED

# Knowledge

## List of candidates

* [Analytical raw dataset (HTML)](https://s3.console.aws.amazon.com/s3/buckets/datalake-datascience?region=eu-west-3&prefix=ANALYTICS/HTML_OUTPUT/ASIF_UNZIP_DATA_CSV/)

In [1]:
from awsPy.aws_authorization import aws_connector
from awsPy.aws_s3 import service_s3
from awsPy.aws_glue import service_glue
from pathlib import Path
import pandas as pd
import numpy as np
import seaborn as sns
import os, shutil, json

path = os.getcwd()
parent_path = str(Path(path).parent.parent)


name_credential = 'financial_dep_SO2_accessKeys.csv'
region = 'eu-west-3'
bucket = 'datalake-datascience'
path_cred = "{0}/creds/{1}".format(parent_path, name_credential)

In [2]:
con = aws_connector.aws_instantiate(credential = path_cred,
                                       region = region)
client= con.client_boto()
s3 = service_s3.connect_S3(client = client,
                      bucket = bucket, verbose = True) 
glue = service_glue.connect_glue(client = client) 

In [3]:
pandas_setting = True
if pandas_setting:
    cm = sns.light_palette("green", as_cmap=True)
    pd.set_option('display.max_columns', None)
    pd.set_option('display.max_colwidth', None)

# Prepare query 

Write the query and save the result back to the S3 bucket `datalake-datascience`.

# Steps

# Table `asif_firms_prepared`

- Table name: `asif_firms_prepared`


Since the table to create contains missing values, please add the following line at the top of the query:

```
CREATE TABLE database.table_name WITH (format = 'PARQUET') AS
```

Choose a location in S3 to save the output. It is recommended to save it in the `datalake-datascience` bucket. Locate an appropriate folder in the bucket, and make sure all outputs have the same format.
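
A minimal sketch of how the `CREATE TABLE ... AS` header can be prepended to a `SELECT` statement. `build_ctas` is a hypothetical helper, not part of `awsPy`; only the header format comes from the snippet above.

```python
def build_ctas(database, table, select_sql, file_format="PARQUET"):
    """Prefix a SELECT statement with an Athena CREATE TABLE AS header."""
    header = "CREATE TABLE {}.{} WITH (format = '{}') AS".format(
        database, table, file_format
    )
    return header + "\n" + select_sql.strip()


ctas = build_ctas(
    "firms_survey",
    "asif_firms_prepared",
    'SELECT * FROM "firms_survey"."asif_unzip_data_csv"',
)
```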

In [4]:
DatabaseName = 'firms_survey'
table_name = 'asif_firms_prepared'
s3_output = 'SQL_OUTPUT_ATHENA'

First, we need to delete the table (if it exists)

In [5]:
try:
    response = glue.delete_table(
        database=DatabaseName,
        table=table_name
    )
    print(response)
except Exception as e:
    print(e)

An error occurred (EntityNotFoundException) when calling the DeleteTable operation: Table asif_firms_prepared not found.
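
The `EntityNotFoundException` above simply means there was nothing to delete. If the raw boto3 Glue client is used instead of the `awsPy` wrapper, that case can be caught explicitly. The sketch below assumes a boto3 Glue client is passed in; `drop_table_if_exists` is a hypothetical name.

```python
def drop_table_if_exists(glue_client, database, table):
    """Delete a Glue table; return False instead of failing when it is absent."""
    try:
        glue_client.delete_table(DatabaseName=database, Name=table)
        return True
    except glue_client.exceptions.EntityNotFoundException:
        # The table does not exist yet: nothing to clean up
        return False
```

With a real client this would be called as `drop_table_if_exists(boto3.client("glue"), DatabaseName, table_name)`.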


### Brief analysis

- Count size of:
    - `year` in 1998 to 2007
    - `firms` has no digit
    - `citycode` 
    - `cic`
    - `setup`

In [6]:
query = """
SELECT year, COUNT(*) as CNT
FROM "firms_survey"."asif_unzip_data_csv"
WHERE year in ('1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007')
  GROUP BY  year
  ORDER BY year
"""
output = s3.run_query(
                    query=query,
                    database=DatabaseName,
                    s3_output=s3_output,
    filename = 'count_year'
                )
output

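| Unnamed: 0 | year | CNT |
|---|---|---|
| 0 | 1998 | 165118 |
| 1 | 1999 | 162033 |
| 2 | 2000 | 162885 |
| 3 | 2001 | 169031 |
| 4 | 2002 | 181557 |
| 5 | 2003 | 196222 |
| 6 | 2004 | 276474 |
| 7 | 2005 | 271835 |
| 8 | 2006 | 301961 |
| 9 | 2007 | 336768 |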


In [7]:
query = """
WITH test AS (
SELECT digit, COUNT(digit) AS CNT
FROM (
SELECT regexp_like(firm, '\\d+') as digit
FROM "firms_survey"."asif_unzip_data_csv"
WHERE year in ('1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007')
)
GROUP BY digit
)
SELECT CNT, COUNT(CNT)
FROM test
GROUP BY CNT

"""
output = s3.run_query(
                    query=query,
                    database=DatabaseName,
                    s3_output=s3_output,
    filename = 'count_digit'
                )
output

| Unnamed: 0 | CNT | _col1 |
|---|---|---|
| 0 | 2223884 | 1 |
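
Presto's `regexp_like` tests whether the pattern occurs anywhere in the string, so `regexp_like(firm, '\d+')` is true as soon as `firm` contains one digit. The closest Python equivalent is `re.search`, not `re.fullmatch`. A small sketch (`has_digit` is a hypothetical helper):

```python
import re


def has_digit(firm):
    """Mimic regexp_like(firm, '\\d+'): True when firm contains at least one digit."""
    return firm is not None and re.search(r"\d+", firm) is not None


checks = [has_digit("ab12"), has_digit("abc"), has_digit(None)]
```

A single group in the result above means every `firm` value in the selected years falls in the same category.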


In [8]:
query = """
SELECT len, COUNT(*) as CNT
FROM (
SELECT LENGTH(citycode) as len
FROM "firms_survey"."asif_unzip_data_csv"
  )
  GROUP BY len
  ORDER BY CNT
"""
output = s3.run_query(
                    query=query,
                    database=DatabaseName,
                    s3_output=s3_output,
    filename = 'count_citycode'
                )
output

| Unnamed: 0 | len | CNT |
|---|---|---|
| 0 | 1.0 | 3 |
| 1 | 3.0 | 18 |
| 2 | 2.0 | 50 |
| 3 |  | 80 |
| 4 | 0.0 | 917 |
| 5 | 4.0 | 2222860 |


In [9]:
query = """
SELECT len, COUNT(*) as CNT
FROM (
SELECT LENGTH(cic) as len
FROM "firms_survey"."asif_unzip_data_csv"
  )
  GROUP BY len
  ORDER BY CNT
"""
output = s3.run_query(
                    query=query,
                    database=DatabaseName,
                    s3_output=s3_output,
    filename = 'count_cic'
                )
output

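| Unnamed: 0 | len | CNT |
|---|---|---|
| 0 | 19.0 | 1 |
| 1 | 10.0 | 1 |
| 2 | 14.0 | 2 |
| 3 | 20.0 | 2 |
| 4 | 9.0 | 3 |
| 5 | 11.0 | 4 |
| 6 | 16.0 | 4 |
| 7 | 13.0 | 4 |
| 8 | 15.0 | 4 |
| 9 | 18.0 | 6 |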


In [10]:
query = """
SELECT len, COUNT(*) as CNT
FROM (
SELECT LENGTH(setup) as len
FROM "firms_survey"."asif_unzip_data_csv"
  )
  GROUP BY len
  ORDER BY CNT
"""
output = s3.run_query(
                    query=query,
                    database=DatabaseName,
                    s3_output=s3_output,
    filename = 'count_setup'
                )
output

| Unnamed: 0 | len | CNT |
|---|---|---|
| 0 | 6.0 | 2 |
| 1 | 0.0 | 7 |
| 2 | 5.0 | 19 |
| 3 |  | 39 |
| 4 | 1.0 | 1110 |
| 5 | 2.0 | 2628 |
| 6 | 3.0 | 3346 |
| 7 | 4.0 | 2216777 |
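
The three length checks (`citycode`, `cic`, `setup`) differ only by the column name, so the query text could be generated by one function. This is a sketch; `length_count_query` is a hypothetical helper, and the database and table defaults are the ones used in the cells above.

```python
def length_count_query(column, database="firms_survey", table="asif_unzip_data_csv"):
    """Return the Athena query counting rows per string length of `column`."""
    return (
        "SELECT len, COUNT(*) as CNT\n"
        "FROM (\n"
        "SELECT LENGTH({col}) as len\n"
        'FROM "{db}"."{tbl}"\n'
        ")\n"
        "GROUP BY len\n"
        "ORDER BY CNT"
    ).format(col=column, db=database, tbl=table)


queries = {col: length_count_query(col) for col in ("citycode", "cic", "setup")}
```

Each generated string can then be passed to `s3.run_query` exactly like the hand-written queries.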


Clean up the folder containing the previous output files. Be careful: this erases all files inside the folder.

In [11]:
s3_output = 'DATA/ECON/FIRM_SURVEY/ASIF_CHINA/PREPARED'

In [12]:
s3.remove_all_bucket(path_remove = s3_output)

True

In [13]:
%%time
query = """
CREATE TABLE firms_survey.asif_firms_prepared WITH (format = 'PARQUET') AS
SELECT *
FROM "firms_survey"."asif_unzip_data_csv"
WHERE 
 (
  year in ('1998', '1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007')
  )
  AND 
  (
  LENGTH(citycode) = 4
  AND 
  LENGTH(setup) = 4
  AND 
  LENGTH(cic) <= 4
  AND 
  regexp_like(firm, '\\d+') = TRUE
    )  
"""
output = s3.run_query(
                    query=query,
                    database=DatabaseName,
                    s3_output=s3_output
                )

CPU times: user 2.42 s, sys: 117 ms, total: 2.53 s
Wall time: 20.4 s


In [14]:
output

{'Results': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2020, 11, 22, 18, 31, 33, 338000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2020, 11, 22, 18, 31, 53, 229000, tzinfo=tzlocal())},
 'QueryID': '7d092fa5-d52b-44fb-89e6-b98ea727f320'}

We need to remove the metadata files generated by Athena; otherwise the crawler may parse incorrect values from them.
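
The notebook does not show the removal itself. One possible sketch with a boto3 S3 client is given below; the helper names are hypothetical, and the suffixes `.metadata` and `-manifest.csv` are assumptions about the side files Athena writes next to the query output, to be checked against the actual content of the folder before deleting anything.

```python
def athena_side_files(keys, suffixes=(".metadata", "-manifest.csv")):
    """Select the keys that look like Athena side files (assumed suffixes)."""
    return [key for key in keys if key.endswith(suffixes)]


def remove_athena_side_files(s3_client, bucket, prefix):
    """List the objects under `prefix` and delete the Athena side files."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    to_delete = athena_side_files(keys)
    for key in to_delete:
        s3_client.delete_object(Bucket=bucket, Key=key)
    return to_delete
```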

# Validate query

This step is mandatory to validate the query in the ETL. If you are not sure about the quality of the query, go to the next step.

To validate the query, please fill in the JSON below. Don't forget to change the schema so that the crawler can use it.

1. Add a partition key:
    - Indicate whether the table has groups, so that the parser can compute duplicates
2. Add the step number -> Not automatic yet. Start at 0
3. Change the schema if needed. It is highly recommended to add comments to the fields
4. Provide a description -> detail the steps 

1. Add a partition key

In [15]:
partition_keys = ['firm', 'year']

2. Add the step number

In [16]:
step = 0

3. Change the schema

We load the schema from the raw data

In [17]:
schema = glue.get_table_information(
    database = DatabaseName,
    table = 'asif_unzip_data_csv'
)['Table']['StorageDescriptor']['Columns']
schema

[{'Name': 'firm', 'Type': 'string', 'Comment': 'firm ID'},
 {'Name': 'year', 'Type': 'string', 'Comment': 'balance sheet year'},
 {'Name': 'export', 'Type': 'int', 'Comment': 'Where: export delivery value'},
 {'Name': 'dq', 'Type': 'string', 'Comment': 'Province county code'},
 {'Name': 'name', 'Type': 'string', 'Comment': 'Corporate Name'},
 {'Name': 'town', 'Type': 'string', 'Comment': 'Township (town)'},
 {'Name': 'village',
  'Type': 'string',
  'Comment': 'Street (village), house number'},
 {'Name': 'street', 'Type': 'string', 'Comment': 'Street office'},
 {'Name': 'c15',
  'Type': 'string',
  'Comment': 'Community (neighborhood) and village committee'},
 {'Name': 'zip', 'Type': 'string', 'Comment': 'Zip code'},
 {'Name': 'product1_', 'Type': 'string', 'Comment': 'The main products (1)'},
 {'Name': 'c26', 'Type': 'string', 'Comment': 'The main products (2)'},
 {'Name': 'c27', 'Type': 'string', 'Comment': 'The main products (3)'},
 {'Name': 'cic', 'Type': 'string', 'Comment': 'Indu

4. Provide a description

In [18]:
description = """
Prepare ASIF raw data by removing inconsistent year format, industry and birth year
"""

5. Provide metadata

- DatabaseName
- TablePrefix

In [19]:
json_etl = {
    'step': step,
    'description':description,
    'query':query,
    'schema': schema,
    'partition_keys':partition_keys,
    'metadata':{
    'DatabaseName' : DatabaseName,
    'TableName' : table_name,
    'target_S3URI' : os.path.join('s3://',bucket, s3_output),
    'from_athena': 'True'    
    }
}
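
Before saving the step, it can be worth checking that no key is missing from the dictionary. The sketch below is not part of the ETL: `missing_keys` is hypothetical, and the list of required keys is simply the set of keys used in `json_etl` above.

```python
REQUIRED_KEYS = ("step", "description", "query", "schema", "partition_keys", "metadata")
REQUIRED_METADATA = ("DatabaseName", "TableName", "target_S3URI", "from_athena")


def missing_keys(step_dict):
    """Return the keys expected in an ETL step that are absent from `step_dict`."""
    missing = [key for key in REQUIRED_KEYS if key not in step_dict]
    metadata = step_dict.get("metadata", {})
    missing += ["metadata." + key for key in REQUIRED_METADATA if key not in metadata]
    return missing
```

An empty list from `missing_keys(json_etl)` means the step has the same shape as the one built above.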

In [20]:
with open(os.path.join(str(Path(path).parent), 'parameters_ETL_Financial_dependency_pollution.json')) as json_file:
    parameters = json.load(json_file)

In [21]:
parameters['TABLES']['PREPARATION']['STEPS'].pop(0)

{'step': 0,
 'description': '\nPrepare ASIF raw data by removing unconsistent year format, industry and birth year\n',
 'query': '\nCREATE TABLE firms_survey.asif_firms_prepared WITH (format = \'PARQUET\') AS\nSELECT *\nFROM "firms_survey"."asif_unzip_data_csv"\nWHERE \n (\n  year in (\'1998\', \'1999\', \'2000\', \'2001\', \'2002\', \'2003\', \'2004\', \'2005\', \'2006\', \'2007\')\n  )\n  AND \n  (\n  LENGTH(citycode) = 4\n  AND \n  LENGTH(setup) = 4\n  AND \n  LENGTH(cic) <= 4\n  AND \n  regexp_like(firm, \'\\d+\') = TRUE\n    )  \n',
 'schema': [{'Name': 'firm', 'Type': 'string', 'Comment': ''},
  {'Name': 'year', 'Type': 'string', 'Comment': ''},
  {'Name': 'export', 'Type': 'int', 'Comment': ''},
  {'Name': 'dq', 'Type': 'string', 'Comment': ''},
  {'Name': 'name', 'Type': 'string', 'Comment': ''},
  {'Name': 'town', 'Type': 'string', 'Comment': ''},
  {'Name': 'village', 'Type': 'string', 'Comment': ''},
  {'Name': 'street', 'Type': 'string', 'Comment': ''},
  {'Name': 'c15', 'T

In [22]:
parameters['TABLES']['PREPARATION']['STEPS'].append(json_etl)
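
The `pop(0)` followed by `append` above relies on the step to replace being first in the list. An alternative that replaces by step number is sketched below; `upsert_step` is a hypothetical helper, not part of the project code.

```python
def upsert_step(steps, new_step):
    """Replace the entry with the same 'step' number, or append when absent."""
    for position, existing in enumerate(steps):
        if existing.get("step") == new_step["step"]:
            steps[position] = new_step
            return steps
    steps.append(new_step)
    return steps


steps = [{"step": 0, "description": "old"}, {"step": 1, "description": "other"}]
upsert_step(steps, {"step": 0, "description": "new"})
upsert_step(steps, {"step": 2, "description": "added"})
```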

Save JSON

In [23]:
with open(os.path.join(str(Path(path).parent), 'parameters_ETL_Financial_dependency_pollution.json'), "w") as outfile:
    json.dump(parameters, outfile)

# Create or update the data catalog

The query output is saved in S3 (bucket `datalake-datascience`), but the table is not yet available in the Data Catalog. Use the function `create_table_glue` to generate the table and update the catalog.

A few parameters are required:

- name_crawler: Name of the crawler
- Role: Role that temporarily grants access to the service
- DatabaseName: Name of the database in which to create the table
- TablePrefix: Prefix of the table. Full name of the table will be `TablePrefix` + folder name

To update the schema, please use the following structure

```
schema = [
    {
        "Name": "VAR1",
        "Type": "",
        "Comment": ""
    },
    {
        "Name": "VAR2",
        "Type": "",
        "Comment": ""
    }
]
```
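
To fill the `Comment` fields of an existing schema without retyping the whole list, a small mapping can be merged in. This is a sketch; `with_comments` is a hypothetical helper, and the column names reuse the placeholders from the structure above.

```python
def with_comments(schema, comments):
    """Return a copy of `schema` where Comment is taken from `comments` when available."""
    return [
        dict(column, Comment=comments.get(column["Name"], column.get("Comment", "")))
        for column in schema
    ]


example_schema = [
    {"Name": "VAR1", "Type": "string", "Comment": ""},
    {"Name": "VAR2", "Type": "int", "Comment": "kept"},
]
updated = with_comments(example_schema, {"VAR1": "first variable"})
```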

In [24]:
glue.update_schema_table(
    database = DatabaseName,
    table = table_name,
    schema= schema)

{'ResponseMetadata': {'RequestId': 'ae4b4a0e-a152-4ad9-8a4d-c526b7f18ec9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sun, 22 Nov 2020 17:31:54 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'ae4b4a0e-a152-4ad9-8a4d-c526b7f18ec9'},
  'RetryAttempts': 0}}

## Check Duplicates

One of the most important steps when creating a table is to check whether it contains duplicates. The cell below checks that the table generated above is free of duplicates. The code uses the JSON file to build the query run in Athena. 

You are required to define the group(s) that Athena will use to compute the duplicates. For instance, if your table can be grouped by COL1 and COL2 (which need to be string or varchar), pass the list ['COL1', 'COL2'] 

In [25]:
partition_keys = ['firm', 'year']

with open(os.path.join(str(Path(path).parent), 'parameters_ETL_Financial_dependency_pollution.json')) as json_file:
    parameters = json.load(json_file)

In [26]:
### COUNT DUPLICATES
if len(partition_keys) > 0:
    groups = ' , '.join(partition_keys)

    query_duplicates = parameters["ANALYSIS"]['COUNT_DUPLICATES']['query'].format(
                                DatabaseName,table_name,groups
                                )
    dup = s3.run_query(
                                query=query_duplicates,
                                database=DatabaseName,
                                s3_output="SQL_OUTPUT_ATHENA",
                                filename="duplicates_{}".format(table_name))
    display(dup)


| Unnamed: 0 | CNT | CNT_DUPLICATE |
|---|---|---|
| 0 | 1 | 2191919 |
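
The query template stored under `COUNT_DUPLICATES` is not shown in this notebook. The sketch below gives one plausible form, an assumption inferred from the `.format(DatabaseName, table_name, groups)` call and the output columns, together with the same computation in plain Python. A single row with `CNT = 1` means every `(firm, year)` group appears exactly once.

```python
from collections import Counter

# Hypothetical template: {0} = database, {1} = table, {2} = grouping columns
COUNT_DUPLICATES = """
SELECT CNT, COUNT(*) AS CNT_DUPLICATE
FROM (
  SELECT {2}, COUNT(*) AS CNT
  FROM {0}.{1}
  GROUP BY {2}
)
GROUP BY CNT
ORDER BY CNT
"""


def duplicate_profile(rows, keys):
    """Map 'number of rows in a group' -> 'number of groups of that size'."""
    rows_per_group = Counter(tuple(row[key] for key in keys) for row in rows)
    return dict(Counter(rows_per_group.values()))


toy = [
    {"firm": "A", "year": "1998"},
    {"firm": "A", "year": "1998"},
    {"firm": "B", "year": "1998"},
]
profile = duplicate_profile(toy, ["firm", "year"])
```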


# Analytics

In this part, we provide basic summary statistics. Since we have created the tables, we can parse the schema in Glue and use our JSON file to automatically generate the analysis.

The cells below execute the job in the key `ANALYSIS`. You need to change the `primary_key` and `secondary_key`.

For a full analysis of the table, please use the following Lambda function. Be patient, it can take between 5 and 30 minutes. The time varies with the number of columns in your dataset.

Use the function as follows:

- `output_prefix`:  s3://datalake-datascience/ANALYTICS/OUTPUT/TABLE_NAME/
- `region`: region where the table is stored
- `bucket`: Name of the bucket
- `DatabaseName`: Name of the database
- `table_name`: Name of the table
- `group`: variable names to group by when counting the duplicates
- `primary_key`: Variable name to perform the grouping -> Only one variable for now
- `secondary_key`: Variable name to perform the secondary grouping -> Only one variable for now
- `proba`: Chi-square analysis probability
- `y_var`: Continuous target variables

Check the job processing in Sagemaker: https://eu-west-3.console.aws.amazon.com/sagemaker/home?region=eu-west-3#/processing-jobs

The notebook is available: https://s3.console.aws.amazon.com/s3/buckets/datalake-datascience?region=eu-west-3&prefix=ANALYTICS/OUTPUT/&showversions=false

Please download the notebook to your local machine and convert it to HTML:

```
cd "/Users/thomas/Downloads/Notebook"
aws s3 cp s3://datalake-datascience/ANALYTICS/OUTPUT/asif_unzip_data_csv/Template_analysis_from_lambda-2020-11-22-08-12-20.ipynb .

## convert to HTML without code
jupyter nbconvert --no-input --to html Template_analysis_from_lambda-2020-11-22-08-12-20.ipynb
## or convert to HTML with code
jupyter nbconvert --to html Template_analysis_from_lambda-2020-11-22-08-12-20.ipynb
```

Then upload the HTML to: https://s3.console.aws.amazon.com/s3/buckets/datalake-datascience?region=eu-west-3&prefix=ANALYTICS/HTML_OUTPUT/

Add a new folder with the table name in upper case

In [27]:
import boto3

key, secret_ = con.load_credential()
client_lambda = boto3.client(
    'lambda',
    aws_access_key_id=key,
    aws_secret_access_key=secret_,
    region_name = region)

In [28]:
primary_key = 'year'
secondary_key = 'citycode'
y_var = 'output'

In [29]:
payload = {
    "input_path": "s3://datalake-datascience/ANALYTICS/TEMPLATE_NOTEBOOKS/Template_analysis_from_lambda.ipynb",
    "output_prefix": "s3://datalake-datascience/ANALYTICS/OUTPUT/{}/".format(table_name.upper()),
    "parameters": {
        "region": "{}".format(region),
        "bucket": "{}".format(bucket),
        "DatabaseName": "{}".format(DatabaseName),
        "table_name": "{}".format(table_name),
        "group": "{}".format(','.join(partition_keys)),
        "primary_key": "{}".format(primary_key),
        "secondary_key": "{}".format(secondary_key),
        "y_var": "{}".format(y_var),
    },
}
payload

{'input_path': 's3://datalake-datascience/ANALYTICS/TEMPLATE_NOTEBOOKS/Template_analysis_from_lambda.ipynb',
 'output_prefix': 's3://datalake-datascience/ANALYTICS/OUTPUT/ASIF_FIRMS_PREPARED/',
 'parameters': {'region': 'eu-west-3',
  'bucket': 'datalake-datascience',
  'DatabaseName': 'firms_survey',
  'table_name': 'asif_firms_prepared',
  'group': 'firm,year',
  'primary_key': 'year',
  'secondary_key': 'citycode',
  'y_var': 'output'}}

In [30]:
response = client_lambda.invoke(
    FunctionName='RunNotebook',
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=json.dumps(payload),
)
response

{'ResponseMetadata': {'RequestId': '11269bb4-e3c5-4896-bca7-8638f4a76b98',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Sun, 22 Nov 2020 17:32:48 GMT',
   'content-type': 'application/json',
   'content-length': '75',
   'connection': 'keep-alive',
   'x-amzn-requestid': '11269bb4-e3c5-4896-bca7-8638f4a76b98',
   'x-amzn-remapped-content-length': '0',
   'x-amz-executed-version': '$LATEST',
   'x-amz-log-result': 'U1RBUlQgUmVxdWVzdElkOiAxMTI2OWJiNC1lM2M1LTQ4OTYtYmNhNy04NjM4ZjRhNzZiOTggVmVyc2lvbjogJExBVEVTVApFTkQgUmVxdWVzdElkOiAxMTI2OWJiNC1lM2M1LTQ4OTYtYmNhNy04NjM4ZjRhNzZiOTgKUkVQT1JUIFJlcXVlc3RJZDogMTEyNjliYjQtZTNjNS00ODk2LWJjYTctODYzOGY0YTc2Yjk4CUR1cmF0aW9uOiAyMjEzLjcxIG1zCUJpbGxlZCBEdXJhdGlvbjogMjMwMCBtcwlNZW1vcnkgU2l6ZTogMTI4IE1CCU1heCBNZW1vcnkgVXNlZDogODIgTUIJSW5pdCBEdXJhdGlvbjogMjg4LjY1IG1zCQo=',
   'x-amzn-trace-id': 'root=1-5fbaa0be-662dd43e4af22f937d7ca7db;sampled=0'},
  'RetryAttempts': 0},
 'StatusCode': 200,
 'LogResult': 'U1RBUlQgUmVxdWVzdElkOiAxMTI2OWJiNC1lM2M1LTQ4O
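
Because the function is invoked with `LogType='Tail'`, the `LogResult` field holds the tail of the execution log encoded in base64. A short sketch to make it readable (`decode_log_result` is a hypothetical helper; the sample response is made up for illustration):

```python
import base64


def decode_log_result(response):
    """Decode the base64 log tail returned by a Lambda invoke call."""
    return base64.b64decode(response["LogResult"]).decode("utf-8")


# Made-up response, only to show the round trip
sample_response = {"LogResult": base64.b64encode(b"START RequestId: abc\n").decode("ascii")}
log_text = decode_log_result(sample_response)
```

On the real response, `print(decode_log_result(response))` would show the log lines of the invocation.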

For a partial analysis, run the cells below

# Generation report

In [31]:
import os, time, shutil, urllib.request, ipykernel, json
from pathlib import Path
from notebook import notebookapp

In [None]:
def create_report(extension = "html", keep_code = False):
    """
    Create a report from the current notebook and save it in the 
    Reports folder (child of the current directory)
    
    1. Extract the current notebook name
    2. Convert the notebook 
    3. Move the newly created report
    
    Args:
    extension: string. Can be "html", "pdf", "md"
    keep_code: boolean. If False, code cells are hidden in the report
    
    
    """
    
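    ### Get notebook name
    connection_file = os.path.basename(ipykernel.get_connection_file())
    # Connection files are named "kernel-<id>.json": keep the <id> part
    kernel_id = connection_file.split('-', 1)[-1].split('.')[0]

    notebookname = None
    for srv in notebookapp.list_running_servers():
        try:
            if srv['token']=='' and not srv['password']:  
                req = urllib.request.urlopen(srv['url']+'api/sessions')
            else:
                req = urllib.request.urlopen(srv['url']+ \
                                             'api/sessions?token=' + \
                                             srv['token'])
            sessions = json.load(req)
            # Pick the session attached to this kernel, not just the first one
            for session in sessions:
                if session['kernel']['id'] == kernel_id:
                    notebookname = session['name']
        except Exception:
            # Server not reachable or not ours: try the next one
            pass
    if notebookname is None:
        raise RuntimeError("Could not find the name of the current notebook")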
    
    sep = '.'
    path = os.getcwd()
    #parent_path = str(Path(path).parent)
    
    ### Path report
    #path_report = "{}/Reports".format(parent_path)
    #path_report = "{}/Reports".format(path)
    
    ### Path destination
    name_no_extension = notebookname.split(sep, 1)[0]
    source_to_move = name_no_extension +'.{}'.format(extension)
    dest = os.path.join(path,'Reports', source_to_move)
    
    ### Generate notebook
    if keep_code:
        os.system('jupyter nbconvert --to {} {}'.format(
    extension,notebookname))
    else:
        os.system('jupyter nbconvert --no-input --to {} {}'.format(
    extension,notebookname))
    
    ### Move notebook to report folder
    #time.sleep(5)
    shutil.move(source_to_move, dest)
    print("Report available at this address:\n {}".format(dest))

In [None]:
create_report(extension = "html")
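
As a possible variant of the conversion step, the `nbconvert` command can be built as a list of arguments and run with `subprocess.run`, which copes with notebook names containing spaces and raises an error when the conversion fails. This is a sketch, not the function used above; `nbconvert_command` and `convert_notebook` are hypothetical names.

```python
import subprocess


def nbconvert_command(notebook, extension="html", keep_code=False):
    """Build the nbconvert command line as a list of arguments."""
    command = ["jupyter", "nbconvert", "--to", extension]
    if not keep_code:
        # Hide the code cells, as in the --no-input call above
        command.append("--no-input")
    command.append(notebook)
    return command


def convert_notebook(notebook, extension="html", keep_code=False):
    """Run nbconvert and raise CalledProcessError if it fails."""
    subprocess.run(nbconvert_command(notebook, extension, keep_code), check=True)


command = nbconvert_command("00_prepare_asif.ipynb")
```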