# Imports

In [1]:
import pandas as pd
import numpy as np
import requests
import json
from json import JSONDecodeError
import chardet
import csv
from io import StringIO
import datetime
from pathlib import Path
import os
import yaml
from datetime import datetime
import time
import emails

# Parameters

In [2]:
#API PARAMETERS

API_KEY = os.environ['DATAGOUV_API_KEY']

#API URL
#api_url = 'https://demo.data.gouv.fr/api/1/'
api_url = 'https://www.data.gouv.fr/api/1/'

HEADERS = {
    'X-API-KEY': API_KEY,
    }

In [3]:
#MAIL PARAMETERS
smtp_host = os.environ['SCHEMA_BOT_MAIL_SMTP']
smtp_user = os.environ['SCHEMA_BOT_MAIL_USER']
smtp_password = os.environ['SCHEMA_BOT_MAIL_PASSWORD']

In [4]:
current_path = Path(os.getenv('WORKING_DIR')) if os.getenv('WORKING_DIR') else Path().absolute()
current_path

PosixPath('/Users/anthonyauffret/Projects/consolidation_schemas')

In [5]:
config_path = os.path.join(current_path, 'config_tableschema.yml')

In [6]:
schemas_list_url = 'https://schema.data.gouv.fr/schemas/schemas.json'

In [7]:
schema_url_base = api_url + 'datasets/?schema={schema_name}'
tag_url_base = api_url + 'datasets/?tag={tag}'
search_url_base = api_url + 'datasets?q={search_word}'

In [8]:
validata_base_url = "https://validata-api.app.etalab.studio/validate?schema={schema_url}&url={rurl}"

In [9]:
#Template for consolidation datasets title

datasets_title_template = 'Fichiers consolidés des données respectant le schéma "{schema_title}"'

#Template for consolidation datasets description (Markdown)

datasets_description_template = '''
Ceci est un jeu de données généré automatiquement par Etalab. Il regroupe les données qui respectent le schéma {schema_name}, par version du schéma.

La fiche présentant le schéma et ses caractéristiques est disponible sur [https://schema.data.gouv.fr/{schema_name}/latest.html](https://schema.data.gouv.fr/{schema_name}/latest.html)

### Qu'est-ce qu'un schéma ?

Les schémas de données permettent de décrire des modèles de données : quels sont les différents champs, comment sont représentées les données, quelles sont les valeurs possibles, etc.

Vous pouvez retrouver l'ensemble des schémas au référentiel sur le site schema.data.gouv.fr

### Comment sont produites ces données ?

Ces données sont produites à partir des ressources publiées sur le site [data.gouv.fr](http://data.gouv.fr) par différents producteurs. Etalab détecte automatiquement les ressources qui obéissent à un schéma et concatène l'ensemble des données en un seul fichier, par version de schéma.

Ces fichiers consolidés permettent aux réutilisateurs de manipuler un seul fichier plutôt qu'une multitude de ressources et contribue ainsi à améliorer la qualité de l'open data.

### Comment intégrer mes données dans ces fichiers consolidés ?

Si vous êtes producteurs de données et que vous ne retrouvez pas vos données dans ces fichiers consolidés, c'est probablement parce que votre ressource sur [data.gouv.fr](http://data.gouv.fr) n'est pas conforme au schéma. Vous pouvez vérifier la conformité de votre ressource via l'outil [https://publier.etalab.studio/upload?schema={schema_name}](https://publier.etalab.studio/upload?schema={schema_name})

En cas de problème persistant, vous pouvez contacter le support data.gouv [lien vers [https://support.data.gouv.fr/](https://support.data.gouv.fr/)].

### Comment produire des données conformes ?

Un certain nombre d'outils existent pour accompagner les producteurs de données. Vous pouvez notamment vous connecter sur le site [https://publier.etalab.studio/select?schema={schema_name}](https://publier.etalab.studio/select?schema={schema_name}) pour pouvoir saisir vos données selon trois modes :

- upload de fichier existant
- saisie via formulaire
- saisie via tableur
'''

In [10]:
#Template for mail/comment (added, updated and deleted schema)

added_schema_comment_template = '''
Bonjour,

Vous recevez ce message car suite à un contrôle automatique de vos données par notre robot de validation, nous constatons que le fichier {resource_title} de ce jeu de données est conforme au schéma {schema_name} (version {most_recent_valid_version}).
Nous avons donc automatiquement ajouté à ce fichier la métadonnée de schéma correspondante, ce qui atteste de la qualité des données que vous avez publiées.

Une question ? Écrivez à validation@data.gouv.fr en incluant l'URL du jeu de données concerné.
'''

updated_schema_comment_template = '''
Bonjour,

Vous recevez ce message car suite à un contrôle automatique de vos données par notre robot de validation, nous constatons que le fichier {resource_title} de ce jeu de données (qui respecte le schéma {schema_name}) n'avait pas dans ses métadonnées la version de schéma la plus récente qu'il respecte.
Nous avons donc automatiquement mis à jour les métadonnées du fichier en indiquant la version adéquate du schéma.

Version précédemment indiquée : {initial_version_name}
Version mise à jour : {most_recent_valid_version}

Une question ? Écrivez à validation@data.gouv.fr en incluant l'URL du jeu de données concerné.
'''

deleted_schema_mail_template_org = '''
Bonjour,<br />
<br />
Vous recevez ce message automatique car vous êtes admin de l'organisation {organisation_name} sur data.gouv.fr. Votre organisation a publié le jeu de données {dataset_title}, dont le fichier {resource_title} se veut conforme au schéma {schema_name}.<br />
Cependant, suite à un contrôle automatique de vos données par notre robot de validation, il s'avère que ce fichier ne respecte aucune version de ce schéma.<br />
Nous avons donc automatiquement supprimé la métadonnée de schéma associée à ce fichier.<br />
<br />
Vous pouvez consulter le [rapport de validation](https://validata.etalab.studio/table-schema?input=url&schema_url={schema_url}&url={resource_url}&repair=true) pour vous aider à corriger les erreurs (ce rapport est relatif à la version la plus récente du schéma, mais votre fichier a bien été testé vis-à-vis de toutes les versions possibles du schéma).<br />
<br />
Vous pourrez alors restaurer la métadonnée de schéma une fois un fichier valide publié.<br />
<br />
Une question ? Écrivez à validation@data.gouv.fr en incluant l'URL du jeu de données concerné.<br />
<br />
Cordialement,<br />
<br />
L'équipe de data.gouv.fr
'''

deleted_schema_mail_template_own = '''
Bonjour,<br />
<br />
Vous recevez ce message automatique car vous avez publié sur data.gouv.fr le jeu de données {dataset_title}, dont le fichier {resource_title} se veut conforme au schéma {schema_name}.<br />
Cependant, suite à un contrôle automatique de vos données par notre robot de validation, il s'avère que ce fichier ne respecte aucune version de ce schéma.<br />
Nous avons donc automatiquement supprimé la métadonnée de schéma associée à ce fichier.<br />
<br />
Vous pouvez consulter le [rapport de validation](https://validata.etalab.studio/table-schema?input=url&schema_url={schema_url}&url={resource_url}&repair=true) pour vous aider à corriger les erreurs (ce rapport est relatif à la version la plus récente du schéma, mais votre fichier a bien été testé vis-à-vis de toutes les versions possibles du schéma).<br />
<br />
Vous pourrez alors restaurer la métadonnée de schéma une fois un fichier valide publié.<br />
<br />
Une question ? Écrivez à validation@data.gouv.fr en incluant l'URL du jeu de données concerné.<br />
<br />
Cordialement,<br />
<br />
L'équipe de data.gouv.fr
'''

deleted_schema_comment_template = '''
Bonjour,

Vous recevez ce message car suite à un contrôle automatique de vos données par notre robot de validation, nous constatons que le fichier {resource_title} de ce jeu de données se veut conforme au schéma {schema_name} alors qu'il ne respecte aucune version de ce schéma.
Nous avons donc automatiquement supprimé la métadonnée de schéma associée à ce fichier.

Vous pouvez consulter le [rapport de validation](https://validata.etalab.studio/table-schema?input=url&schema_url={schema_url}&url={resource_url}&repair=true) pour vous aider à corriger les erreurs (ce rapport est relatif à la version la plus récente du schéma, mais votre fichier a bien été testé vis-à-vis de toutes les versions possibles du schéma).

Vous pourrez alors restaurer la métadonnée de schéma une fois un fichier valide publié.

Une question ? Écrivez à validation@data.gouv.fr en incluant l'URL du jeu de données concerné.
'''

# Functions

In [11]:
#Get the dictionnary with information on the schema (when schemas catagalogue list already loaded)
def get_schema_dict(schema_name, schemas_catalogue_list) :
    res = None
    for schema in schemas_catalogue_list :
        if schema['name'] == schema_name :
            res = schema
    
    if res is None :
        print("No schema named '{}' found.".format(schema_name))
    
    return res

In [12]:
#Add the schema default configuration in the configuration YAML file
def add_schema_default_config(schema_name, config_path) :
    
    schema_dict = get_schema_dict(schema_name, schemas_catalogue_list)
    schema_title = schema_dict['title']
    
    default_schema_config_dict = {
        'consolidate': False,
        'search_words':[schema_title] # setting schema title as a default search keyword for resources
    }
    
    if os.path.exists(config_path) :
        with open(config_path, 'r') as infile :
            config_dict = yaml.safe_load(infile)
    else :
        config_dict = {}
        
    config_dict[schema_name] = default_schema_config_dict
    
    with open(config_path, 'w') as outfile:
        yaml.dump(config_dict, outfile, default_flow_style=False)

In [13]:
#API parsing to get resources infos based on schema metadata, tags and search keywords
def parse_api(url):
    r = requests.get(url)
    data = r.json()
    nb_pages = int(data['total']/data['page_size'])+1
    arr = []
    for i in range(1,nb_pages+1):
        r = requests.get(url+"&page="+str(i))
        data = r.json()
        for dataset in data['data']:
            for res in dataset['resources']:
                if 'format=csv' in res['url']:
                    filename = res['url'].split('/')[-3] + '.csv'
                else:
                    filename = res['url'].split('/')[-1]
                ext = filename.split('.')[-1]
                obj = {}
                obj['dataset_id'] = dataset['id']
                obj['dataset_title'] = dataset['title']
                obj['dataset_slug'] = dataset['slug']
                obj['dataset_page'] = dataset['page']
                obj['resource_id'] = res['id']
                obj['resource_title'] = res['title']
                obj['resource_url'] = res['url']
                obj['resource_last_modified'] = res['last_modified']
                if ext != 'csv':
                    obj['error_type'] = "wrong-file-format"
                else:
                    if not dataset['organization'] and not dataset['owner']:
                        obj['error_type'] = "orphan-dataset"
                    else:
                        obj['organization_or_owner'] = dataset['organization']['slug'] if dataset['organization'] else dataset['owner']['slug']
                        obj['error_type'] = None
                arr.append(obj)
    df = pd.DataFrame(arr)
    return df

In [14]:
#Make the validation report based on the resource url, schema url and validation url
def make_validata_report(rurl, schema_url, validata_base_url=validata_base_url) :
    r = requests.get(validata_base_url.format(schema_url=schema_url, rurl=rurl))
    time.sleep(0.5)
    return r.json()

In [15]:
#Returns if a resource is valid or not regarding a schema (version)
def is_validata_valid(rurl, schema_url, validata_base_url=validata_base_url) :
    try :
        report = make_validata_report(rurl, schema_url, validata_base_url=validata_base_url)
        try :
            res = report['report']['valid']
        except KeyError :
            print('{} ---- 🔴 No info in validata report for resource: {}'.format(datetime.today(), rurl))
            res = False
    except JSONDecodeError :
        print('{} ---- 🔴 Could not make JSON from validata report for resource: {}'.format(datetime.today(), rurl))
        res = False
    return res

In [16]:
#Returns if a resource is valid based on its "ref_table" row
def is_validata_valid_row(row, schema_url) :
    if row['error_type'] is None : #if no error
        rurl = row['resource_url']
        return is_validata_valid(rurl, schema_url)
    else :
        return False

In [17]:
#Creates a dataset on data.gouv.fr for consolidation files (used only if does not exist yet in config file)
def create_schema_consolidation_dataset(schema_name, schemas_catalogue_list, api_url) :
    global HEADERS, datasets_description_template, datasets_title_template
    
    schema_title = get_schema_dict(schema_name, schemas_catalogue_list)['title']
    
    response = requests.post(api_url + 'datasets/', json={
    'title': datasets_title_template.format(schema_title=schema_title),
    'description': datasets_description_template.format(schema_name=schema_name),
    'organization':'534fff75a3a7292c64a77de4',
    'license':'lov2'
    }, headers=HEADERS)
    
    return response

In [18]:
#Generic function to update a field (key) in the config file
def update_config_file(schema_name, key, value, config_path) :
    with open(config_path, 'r') as f :
        config_dict = yaml.safe_load(f)
    
    config_dict[schema_name][key] = value
    
    with open(config_path, 'w') as outfile:
        yaml.dump(config_dict, outfile, default_flow_style=False)

In [19]:
#Adds the resource ID of the consolidated file for a given schema version in the config file
def update_config_version_resource_id(schema_name, version_name, r_id, config_path) :
    with open(config_path, 'r') as f :
        config_dict = yaml.safe_load(f)
    
    if 'latest_resource_ids' not in config_dict[schema_name] :
        config_dict[schema_name]['latest_resource_ids'] = {version_name: r_id}
    else :
        config_dict[schema_name]['latest_resource_ids'][version_name] = r_id
        
    with open(config_path, 'w') as outfile:
        yaml.dump(config_dict, outfile, default_flow_style=False)

In [20]:
#Based on validation columns by version, adds a column to the ref_table that shows the most recent version of the schema for which the resource is valid
def add_most_recent_valid_version(df_ref) :
    
    version_cols_list = [col for col in df_ref.columns if col.startswith('is_valid_v_')]
    
    df_ref['most_recent_valid_version'] = ''
    
    for col in sorted(version_cols_list, reverse=True) :
        df_ref.loc[(df_ref['most_recent_valid_version'] == ''), 'most_recent_valid_version'] = df_ref.loc[(df_ref['most_recent_valid_version'] == ''), col].apply(lambda x : x*col.replace('is_valid_v_',''))
    
    df_ref.loc[(df_ref['most_recent_valid_version'] == ''), 'most_recent_valid_version'] = np.nan
    
    return df_ref

In [21]:
#Gets the current metadata of schema version of a resource (based of ref_table row)
def get_resource_schema_version(row, api_url) :
    
    dataset_id = row['dataset_id']
    resource_id = row['resource_id']
    
    url = api_url + 'datasets/{}/resources/{}/'.format(dataset_id, resource_id)
    r = requests.get(url)
    if r.status_code == 200 :
        r_json = r.json()
        if 'schema' in r_json.keys() :
            if 'version' in r_json['schema'].keys() :
                return r_json['schema']['version']
            else :
                return np.nan
        else :
            return np.nan
    else :
        return np.nan

#Returns if resource schema (version) metadata should be updated or not based on what we know about the resource
def is_schema_version_to_update(row) :
    initial_version_name = row['initial_version_name']
    most_recent_valid_version = row['most_recent_valid_version']
    resource_found_by = row['resource_found_by']
    
    return (resource_found_by == '1 - schema request') and (most_recent_valid_version == most_recent_valid_version) and (initial_version_name != most_recent_valid_version)

#Returns if resource schema (version) metadata should be added or not based on what we know about the resource
def is_schema_to_add(row) :
    resource_found_by = row['resource_found_by']
    is_valid_one_version = row['is_valid_one_version']
    
    return (resource_found_by != '1 - schema request') and is_valid_one_version

#Returns if resource schema (version) metadata should be deleted or not based on what we know about the resource
def is_schema_to_drop(row) :
    resource_found_by = row['resource_found_by']
    is_valid_one_version = row['is_valid_one_version']
    
    return (resource_found_by == '1 - schema request') and (is_valid_one_version == False)

In [22]:
#Function that adds a schema (version) metadata on a resource
def add_resource_schema(api_url, dataset_id, resource_id, schema_name, version_name) :
    
    global HEADERS
    
    schema = {
        "name": schema_name,
        "version": version_name
    }
    
    try :
        url = api_url + 'datasets/{}/resources/{}/'.format(dataset_id, resource_id)
        r = requests.get(url, headers=HEADERS)
        extras = r.json()['extras']
    except :
        extras = {}
    
    extras['consolidation_schema:add_schema'] = schema_name
    
    obj = {'schema': schema, 'extras': extras}
                
    url = api_url + 'datasets/{}/resources/{}/'.format(dataset_id, resource_id)
    response = requests.put(url, json=obj, headers=HEADERS)
    
    if response.status_code != 200 :
        print('🔴 Schema could not be added on resource. Dataset ID: {} - Resource ID: {}'.format(dataset_id, resource_id))
        
    return response.status_code == 200

In [23]:
#Function that updates a schema (version) metadata on a resource
def update_resource_schema(api_url, dataset_id, resource_id, schema_name, version_name) :
    
    global HEADERS
    
    schema = {
        "name": schema_name,
        "version": version_name
    }
    
    try :
        url = api_url + 'datasets/{}/resources/{}/'.format(dataset_id, resource_id)
        r = requests.get(url, headers=HEADERS)
        extras = r.json()['extras']
    except :
        extras = {}
    
    extras['consolidation_schema:update_schema'] = schema_name
    
    obj = {'schema': schema, 'extras': extras}
                
    url = api_url + 'datasets/{}/resources/{}/'.format(dataset_id, resource_id)
    response = requests.put(url, json=obj, headers=HEADERS)
    
    if response.status_code != 200 :
        print('🔴 Resource schema could not be updated. Dataset ID: {} - Resource ID: {}'.format(dataset_id, resource_id))
        
    return response.status_code == 200

In [24]:
#Function that deletes a schema (version) metadata on a resource
def delete_resource_schema(api_url, dataset_id, resource_id, initial_schema_name) :
    
    global HEADERS
    
    schema = {}
    
    try :
        url = api_url + 'datasets/{}/resources/{}/'.format(dataset_id, resource_id)
        r = requests.get(url, headers=HEADERS)
        extras = r.json()['extras']
    except :
        extras = {}
    
    extras['consolidation_schema:remove_schema'] = schema_name
    
    obj = {'schema': schema, 'extras': extras}
                
    url = api_url + 'datasets/{}/resources/{}/'.format(dataset_id, resource_id)
    response = requests.put(url, json=obj, headers=HEADERS)
    
    if response.status_code != 200 :
        print('🔴 Resource schema could not be deleted. Dataset ID: {} - Resource ID: {}'.format(dataset_id, resource_id))
    
    return response.status_code == 200

In [25]:
#Get the (list of) e-mail address(es) of the owner or of the admin(s) of the owner organization of a dataset
def get_owner_or_admin_mails(dataset_id, api_url) :
    r = requests.get(api_url + 'datasets/{}/'.format(dataset_id))
    r_dict = r.json()
    
    if r_dict['organization'] is not None :
        org_id = r_dict['organization']['id']
    else :
        org_id = None
    
    if r_dict['owner'] is not None :
        owner_id = r_dict['owner']['id']
    else :
        owner_id = None
    
    mails_type = None
    mails_list = []
    
    if org_id is not None :
        mails_type = 'organisation_admins'
        r_org = requests.get(api_url + 'organizations/{}/'.format(org_id))
        members_list = r_org.json()['members']
        for member in members_list :
            if member['role'] == 'admin' :
                user_id = member['user']['id']
                r_user = requests.get(api_url + 'users/{}/'.format(user_id), headers=HEADER)
                user_mail = r_user.json()['email']
                mails_list += [user_mail]
                
    else :
        if owner_id is not None :
            mails_type = 'owner'
            r_user = requests.get(api_url + 'users/{}/'.format(owner_id), headers=HEADER)
            user_mail = r_user.json()['email']
            mails_list += [user_mail]
    
    return (mails_type, mails_list)

In [26]:
#Function to send a e-mail
def send_email(subject, message, mail_from, mail_to, smtp_host, smtp_user, smtp_password):
    message = emails.html(html='<p>%s</p>' % message,
                        subject=subject,
                        mail_from=mail_from)
    smtp = {
        'host': smtp_host,
        'port': 587,
        'tls': True,
        'user': smtp_user,
        'password': smtp_password,
    }

    _ = message.send(to=mail_to, smtp=smtp)
    
    return _

In [27]:
#Function to post a comment on a dataset
def post_comment_on_dataset(dataset_id, title, comment, api_url) :
    
    global HEADER
    
    post_object = {
        'title':title,
        'comment' : comment,
        'subject': {'class': 'Dataset', 'id': dataset_id}
    }
    
    _ = requests.post(api_url + 'discussions/', json=post_object, headers=HEADER)
    
    return _

# Main code

## Utils

In [28]:
consolidation_date_str = datetime.now().strftime('%Y%m%d')
print(consolidation_date_str)

20211109


In [29]:
data_path = current_path / 'data' / 'tableschema' / consolidation_date_str
data_path.mkdir(parents=True, exist_ok=True)

In [30]:
consolidated_data_path = current_path / 'consolidated_data' / 'tableschema' / consolidation_date_str
consolidated_data_path.mkdir(parents=True, exist_ok=True)

In [31]:
ref_tables_path = current_path / 'ref_tables' / 'tableschema' / consolidation_date_str
ref_tables_path.mkdir(parents=True, exist_ok=True)

In [32]:
report_tables_path = current_path / 'report_tables' / 'tableschema' / consolidation_date_str
report_tables_path.mkdir(parents=True, exist_ok=True)

## Loading schemas documentation and config

### Schemas documentation

We keep **only table schemas**.

In [33]:
# Getting schemas list :

#Keeping track of schema info
schemas_report_dict = {}

schemas_catalogue_dict = requests.get(schemas_list_url).json()
print('Schema catalogue URL: {}'.format(schemas_catalogue_dict['$schema']))
print('Version: {}'.format(schemas_catalogue_dict['version']))

schemas_catalogue_list = [schema for schema in schemas_catalogue_dict['schemas'] if schema['schema_type'] == 'tableschema']
nb_schemas = len(schemas_catalogue_list)
print('Total number of schemas: {}'.format(nb_schemas))

for schema in schemas_catalogue_list :
    print('- {} ({} versions)'.format(schema['name'], len(schema['versions'])))
    schemas_report_dict[schema['name']] = {'nb_versions':len(schema['versions'])}

Schema catalogue URL: https://opendataschema.frama.io/catalog/schema-catalog.json
Version: 1
Total number of schemas: 24
- etalab/schema-irve (7 versions)
- etalab/schema-decp-dpa (1 versions)
- scdl/catalogue (2 versions)
- scdl/deliberations (6 versions)
- scdl/equipements (2 versions)
- scdl/subventions (3 versions)
- etalab/schema-lieux-covoiturage (6 versions)
- etalab/schema-stationnement (5 versions)
- scdl/budget (1 versions)
- arsante/schema-dae (3 versions)
- NaturalSolutions/schema-arbre (4 versions)
- etalab/schema-stationnement-cyclable (5 versions)
- etalab/schema-inclusion-numerique (2 versions)
- etalab/schema-hautes-remunerations (1 versions)
- scdl/menus-collectifs (9 versions)
- scdl/plats-menus-collectifs (4 versions)
- MTES-MCT/acceslibre-schema (3 versions)
- etalab/schema-sdirve (1 versions)
- Archivistes75/registre_entrees (1 versions)
- CEREMA/schema-arrete-circulation-marchandises (8 versions)
- openmaraude/schema-stationstaxi (3 versions)
- etalab/schema-vehi

### Creating/updating config file with missing schemas

In [34]:
if os.path.exists(config_path) :
    with open(config_path, 'r') as f :
        config_dict = yaml.safe_load(f)
else :
    config_dict = {}
    
for schema in schemas_catalogue_list :
    if schema['name'] not in config_dict.keys() :
        add_schema_default_config(schema['name'], config_path)
        schemas_report_dict[schema['name']]['new_config_created'] = True
        print('{} - ➕ Schema {} added to config file.'.format(datetime.today(), schema['name']))
    else :
        schemas_report_dict[schema['name']]['new_config_created'] = False
        print('{} - 🆗 Schema {} already in config file.'.format(datetime.today(), schema['name']))

2021-11-09 16:43:24.326850 - 🆗 Schema etalab/schema-irve already in config file.
2021-11-09 16:43:24.326998 - 🆗 Schema etalab/schema-decp-dpa already in config file.
2021-11-09 16:43:24.327032 - 🆗 Schema scdl/catalogue already in config file.
2021-11-09 16:43:24.327055 - 🆗 Schema scdl/deliberations already in config file.
2021-11-09 16:43:24.327117 - 🆗 Schema scdl/equipements already in config file.
2021-11-09 16:43:24.327153 - 🆗 Schema scdl/subventions already in config file.
2021-11-09 16:43:24.327177 - 🆗 Schema etalab/schema-lieux-covoiturage already in config file.
2021-11-09 16:43:24.327409 - 🆗 Schema etalab/schema-stationnement already in config file.
2021-11-09 16:43:24.327449 - 🆗 Schema scdl/budget already in config file.
2021-11-09 16:43:24.327471 - 🆗 Schema arsante/schema-dae already in config file.
2021-11-09 16:43:24.327499 - 🆗 Schema NaturalSolutions/schema-arbre already in config file.
2021-11-09 16:43:24.327520 - 🆗 Schema etalab/schema-stationnement-cyclable already in c

**⚠️⚠️⚠️ EDIT CONFIG FILE IF NEEDED (especially for new schemas) ⚠️⚠️⚠️**

Then, reload config file:

In [35]:
with open(config_path, 'r') as f :
    config_dict = yaml.safe_load(f)

## Building reference tables (parsing and listing resources + Validata check)

In [36]:
%%time
for schema_name in config_dict.keys() :
    
    print('{} - ℹ️ STARTING SCHEMA: {}'.format(datetime.now(), schema_name))
    
    #NEEDED PARAMETERS
    
    #Schema description and consolidation configuration
    schema_config = config_dict[schema_name]
    
    if schema_config['consolidate'] == True :
    
        #Schema official specification (in catalogue)
        schema_dict = get_schema_dict(schema_name, schemas_catalogue_list)

        #Datasets to exclude (from config)
        datasets_to_exclude = []
        if 'consolidated_dataset_id' in schema_config.keys() :
            datasets_to_exclude += [schema_config['consolidated_dataset_id']]
        if 'exclude_dataset_ids' in schema_config.keys() :
            if type(schema_config['exclude_dataset_ids']) == list :
                datasets_to_exclude += schema_config['exclude_dataset_ids']

        #Tags and search words to use to get resources that could match schema (from config)
        tags_list = []
        if 'tags' in schema_config.keys() :
            tags_list += schema_config['tags']

        search_words_list = []
        if 'search_words' in schema_config.keys() :
            search_words_list = schema_config['search_words']

        #Schema versions not to consolidate
        drop_versions = []
        if 'drop_versions' in schema_config.keys() :
            drop_versions += schema_config['drop_versions']
        
        schemas_report_dict[schema_name]['nb_versions_to_drop_in_config'] = len(drop_versions)

        #PARSING API TO GET ALL ELIGIBLE RESOURCES FOR CONSOLIDATION

        df_list = []

        #Listing resources by schema request
        df_schema = parse_api(schema_url_base.format(schema_name=schema_name))
        schemas_report_dict[schema_name]['nb_resources_found_by_schema'] = len(df_schema)
        if len(df_schema) > 0 :
            df_schema['resource_found_by'] = '1 - schema request'
            df_schema['initial_version_name'] = df_schema.apply(lambda row : get_resource_schema_version(row, api_url), axis=1)
            df_list += [df_schema]

        #Listing resources by tag requests
        schemas_report_dict[schema_name]['nb_resources_found_by_tags'] = 0
        for tag in tags_list : 
            df_tag = parse_api(tag_url_base.format(tag=tag))
            schemas_report_dict[schema_name]['nb_resources_found_by_tags'] += len(df_tag)
            if len(df_tag) > 0 :
                df_tag['resource_found_by'] = '2 - tag request'
                df_list += [df_tag]

        #Listing resources by search (keywords) requests
        schemas_report_dict[schema_name]['nb_resources_found_by_search_words'] = 0
        for search_word in search_words_list :
            df_search_word = parse_api(search_url_base.format(search_word=search_word))
            schemas_report_dict[schema_name]['nb_resources_found_by_search_words'] += len(df_search_word)
            if len(df_search_word) > 0 :
                df_search_word['resource_found_by'] = '3 - search request'
                df_list += [df_search_word]

        if len(df_list) > 0 :
            df = pd.concat(df_list, ignore_index=True)
            df = df[~(df['dataset_id'].isin(datasets_to_exclude))]
            df = df.sort_values('resource_found_by')
            df = df.drop_duplicates(subset=['resource_id'], keep='first')
            
            print('{} -- 🔢 {} resource(s) found for this schema.'.format(datetime.now(), len(df)))
            
            if 'initial_version_name' not in df.columns : # in case there is no resource found by schema request
                df['initial_version_name'] = np.nan

            #FOR EACH RESOURCE AND SCHEMA VERSION, CHECK IF RESOURCE MATCHES THE SCHEMA VERSION

            #Apply validata check for each version that is not explicitly dropped in config file
            version_names_list = []
            
            for version in schema_dict['versions'] :
                version_name = version['version_name']
                if version_name not in drop_versions :
                    schema_url = version['schema_url']
                    df['is_valid_v_{}'.format(version_name)] = df.apply(lambda row : is_validata_valid_row(row, schema_url), axis=1)
                    version_names_list += [version_name]
                    print('{} --- ☑️ Validata check done for version {}'.format(datetime.now(), version_name))
                else :
                    print('{} --- ❌ Version {} to drop according to config file'.format(datetime.now(), version_name))
            
            if len(version_names_list) > 0 :
                #Check if resources are at least matching one schema version (only those matching will be downloaded in next step)
                df['is_valid_one_version'] = sum([df['is_valid_v_{}'.format(version_name)] for version_name in version_names_list]) > 0
                schemas_report_dict[schema_name]['nb_valid_resources'] = df['is_valid_one_version'].sum()
                df = add_most_recent_valid_version(df)
                df.to_csv(os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_'))), index=False)
                print('{} -- ✅ Validata check done for {}.'.format(datetime.now(), schema_name))
            else :
                schemas_report_dict[schema_name]['nb_valid_resources'] = 0
                print('{} -- ❌ All possible versions for this schema were dropped by config file.'.format(datetime.now()))

        else :
            print('{} -- ⚠️ No resource found for this schema.'.format(datetime.now(), schema_name))
            
    else :
        print('{} -- ❌ Schema not to consolidate according to config file.'.format(datetime.now()))

2021-11-09 16:43:27.708685 - ℹ️ STARTING SCHEMA: Archivistes75/registre_entrees
2021-11-09 16:43:27.708822 -- ❌ Schema not to consolidate according to config file.
2021-11-09 16:43:27.708857 - ℹ️ STARTING SCHEMA: CEREMA/schema-arrete-circulation-marchandises
2021-11-09 16:43:27.708879 -- ❌ Schema not to consolidate according to config file.
2021-11-09 16:43:27.708899 - ℹ️ STARTING SCHEMA: MTES-MCT/acceslibre-schema
2021-11-09 16:43:27.708918 -- ❌ Schema not to consolidate according to config file.
2021-11-09 16:43:27.708935 - ℹ️ STARTING SCHEMA: NaturalSolutions/schema-arbre
2021-11-09 16:43:27.708988 -- ❌ Schema not to consolidate according to config file.
2021-11-09 16:43:27.709028 - ℹ️ STARTING SCHEMA: arsante/schema-dae
2021-11-09 16:43:27.709047 -- ❌ Schema not to consolidate according to config file.
2021-11-09 16:43:27.709071 - ℹ️ STARTING SCHEMA: datakode/schema-pei
2021-11-09 16:43:27.709093 -- ❌ Schema not to consolidate according to config file.
2021-11-09 16:43:27.709111 - 

## Downloading valid data

We download only data that is valid for at least one version of the schema.

In [37]:
%%time
for schema_name in config_dict.keys() :
    
    print('{} - ℹ️ STARTING SCHEMA: {}'.format(datetime.now(), schema_name))
    
    ref_table_path = os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_')))
    
    if os.path.exists(ref_table_path) :
        df_ref = pd.read_csv(ref_table_path)
        df_ref['is_downloaded'] = False
        
        if len(df_ref[df_ref['is_valid_one_version'] == True]) > 0 :
        
            schema_data_path = Path(data_path) / schema_name.replace('/','_')
            schema_data_path.mkdir(exist_ok=True)

            for index,row in df_ref[df_ref['is_valid_one_version'] == True].iterrows():
                rurl = row['resource_url']
                r = requests.get(rurl, allow_redirects=True)
                
                if r.status_code == 200 :
                    p = Path(schema_data_path) / row['dataset_slug']
                    p.mkdir(exist_ok=True)
                    written_filename = '{}.csv'.format(row['resource_id'])

                    with open('{}/{}'.format(p, written_filename), 'wb') as f:
                        f.write(r.content)
                    
                    df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'is_downloaded'] = True
                    
                    print('{} --- ⬇️✅ downloaded file [{}] {}'.format(datetime.now(), row['resource_title'], rurl))
                else :
                    print('{} --- ⬇️❌ File could not be downloaded: [{}] {}'.format(datetime.now(), row['resource_title'], rurl))
                    
        else :
            print('{} -- ⚠️ No valid resource for this schema'.format(datetime.now()))
            
        df_ref.to_csv(ref_table_path, index=False)
    
    else :
        print('{} -- ❌ No reference table made for this schema (schema not to consolidate, no version to consolidate or no resource found).'.format(datetime.now()))

2021-11-09 16:56:22.936418 - ℹ️ STARTING SCHEMA: Archivistes75/registre_entrees
2021-11-09 16:56:22.936701 -- ❌ No reference table made for this schema (schema not to consolidate, no version to consolidate or no resource found).
2021-11-09 16:56:22.936733 - ℹ️ STARTING SCHEMA: CEREMA/schema-arrete-circulation-marchandises
2021-11-09 16:56:22.936792 -- ❌ No reference table made for this schema (schema not to consolidate, no version to consolidate or no resource found).
2021-11-09 16:56:22.936814 - ℹ️ STARTING SCHEMA: MTES-MCT/acceslibre-schema
2021-11-09 16:56:22.937041 -- ❌ No reference table made for this schema (schema not to consolidate, no version to consolidate or no resource found).
2021-11-09 16:56:22.937077 - ℹ️ STARTING SCHEMA: NaturalSolutions/schema-arbre
2021-11-09 16:56:22.937243 -- ❌ No reference table made for this schema (schema not to consolidate, no version to consolidate or no resource found).
2021-11-09 16:56:22.937272 - ℹ️ STARTING SCHEMA: arsante/schema-dae
2021-1

## Consolidation

In [38]:
%%time
for schema_name in config_dict.keys() :
    
    print('{} - ℹ️ STARTING SCHEMA: {}'.format(datetime.now(), schema_name))
    
    schema_data_path = Path(data_path) / schema_name.replace('/','_')
    
    if os.path.exists(schema_data_path) :
        
        schema_consolidated_data_path = Path(consolidated_data_path) / schema_name.replace('/','_')
        schema_consolidated_data_path.mkdir(exist_ok=True)
        
        ref_table_path = os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_')))
        df_ref = pd.read_csv(ref_table_path) #(This file necessarily exists if data folder exists)
        
        #We will test if downloaded files are empty or not (so we set default values)
        df_ref['is_empty'] = np.nan
        df_ref.loc[(df_ref['is_downloaded'] == True), 'is_empty'] = False
        
        schema_dict = get_schema_dict(schema_name, schemas_catalogue_list)
        
        version_names_list = [col.replace('is_valid_v_','') for col in df_ref.columns if col.startswith('is_valid_v_')]
        
        for version in schema_dict['versions'] :
            version_name = version['version_name']
            if version_name in version_names_list :
                df_ref_v = df_ref[(df_ref['is_valid_v_'+version_name] == True) & (df_ref['is_downloaded'] == True)]
                
                if len(df_ref_v) > 0 :
                    #Get schema version parameters for ddup
                    version_dict = requests.get(version['schema_url']).json()
                    version_cols_list = [field_dict['name'] for field_dict in version_dict['fields']]
                    
                    if 'primaryKey' in version_dict.keys() :
                        primary_key = version_dict['primaryKey']
                    else :
                        primary_key = None
                        
                    df_r_list = []
                    
                    for index,row in df_ref_v.iterrows():
                        file_path = os.path.join(schema_data_path, row['dataset_slug'], '{}.csv'.format(row['resource_id']))
                        with open(file_path,'rb') as f:
                            encoding = chardet.detect(f.read()).get('encoding')
                            if(encoding == 'Windows-1254'):
                                encoding = 'iso-8859-1'
                        
                        df_r = pd.read_csv(file_path, sep=None, engine="python", dtype='str', encoding=encoding, na_filter=False)
                        
                        if len(df_r) > 0 : #Keeping only non empty files
                            #Keep only schema columns (and add empty columns for missing ones)
                            df_r = df_r[[col for col in version_cols_list if col in df_r.columns]]
                            for col in version_cols_list :
                                if col not in df_r.columns :
                                    df_r[col] = np.nan

                            df_r['last_modified'] = row['resource_last_modified']
                            df_r['datagouv_dataset_id'] = row['dataset_id']
                            df_r['datagouv_resource_id'] = row['resource_id']
                            df_r['datagouv_organization_or_owner'] = row['organization_or_owner']
                            df_r_list += [df_r]
                            
                        else :
                            df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'is_empty'] = True
                    
                    if len(df_r_list) >= 5 :
                        df_conso = pd.concat(df_r_list, ignore_index=True)

                        #Sorting by most recent (resource last modification date at the moment)
                        df_conso = df_conso.sort_values('last_modified', ascending=False)

                        #Deduplication
                        if primary_key is not None :
                            ddup_cols = primary_key
                        else :
                            ddup_cols = version_cols_list

                        df_conso = df_conso.drop_duplicates(ddup_cols, keep='first').reset_index(drop=True)  
                        
                        #Avoid "null" value in CSV files
                        df_conso = df_conso.fillna('')
                        
                        df_conso.to_csv(os.path.join(schema_consolidated_data_path, 'consolidation_{}_v_{}_{}.csv'.format(schema_name.replace('/','_'), version_name, consolidation_date_str)), index=False, encoding="utf-8",na_rep='null')
                        print('{} -- ✅ DONE: {} version {}'.format(datetime.today(), schema_name, version_name))
                    
                    else :
                        print('{} -- ⚠️ Less than 5 (non-empty) valid resources for version {} : consolidation file is not built'.format(datetime.today(), version_name))
                    
                else :
                    print('{} -- ⚠️ No valid resource for version {} of this schema'.format(datetime.today(), version_name))
        
        df_ref.to_csv(ref_table_path, index=False)
    
    else :
        print('{} -- ❌ No data downloaded for this schema.'.format(datetime.today()))

2021-11-09 16:56:42.774333 - ℹ️ STARTING SCHEMA: Archivistes75/registre_entrees
2021-11-09 16:56:42.774882 -- ❌ No data downloaded for this schema.
2021-11-09 16:56:42.774939 - ℹ️ STARTING SCHEMA: CEREMA/schema-arrete-circulation-marchandises
2021-11-09 16:56:42.775057 -- ❌ No data downloaded for this schema.
2021-11-09 16:56:42.775089 - ℹ️ STARTING SCHEMA: MTES-MCT/acceslibre-schema
2021-11-09 16:56:42.775167 -- ❌ No data downloaded for this schema.
2021-11-09 16:56:42.775195 - ℹ️ STARTING SCHEMA: NaturalSolutions/schema-arbre
2021-11-09 16:56:42.775285 -- ❌ No data downloaded for this schema.
2021-11-09 16:56:42.775315 - ℹ️ STARTING SCHEMA: arsante/schema-dae
2021-11-09 16:56:42.775395 -- ❌ No data downloaded for this schema.
2021-11-09 16:56:42.775630 - ℹ️ STARTING SCHEMA: datakode/schema-pei
2021-11-09 16:56:42.775920 -- ❌ No data downloaded for this schema.
2021-11-09 16:56:42.776187 - ℹ️ STARTING SCHEMA: etalab/schema-decp-dpa
2021-11-09 16:56:42.776337 -- ❌ No data downloaded fo

## Upload

In [49]:
%%time
for schema_name in config_dict.keys() :
    
    with open(config_path, 'r') as f :
        config_dict = yaml.safe_load(f)
    
    print('{} - ℹ️ STARTING SCHEMA: {}'.format(datetime.now(), schema_name))
    
    schema_consolidated_data_path = Path(consolidated_data_path) / schema_name.replace('/','_')
    
    if os.path.exists(schema_consolidated_data_path) :
        #Check if dataset_id is in config. If not, create a dataset on datagouv
        schema_config = config_dict[schema_name]
        if 'consolidated_dataset_id' not in schema_config.keys() :
            response = create_schema_consolidation_dataset(schema_name, schemas_catalogue_list, api_url)
            if response.status_code == 201 :
                consolidated_dataset_id = response.json()['id']
                update_config_file(schema_name, 'consolidated_dataset_id', consolidated_dataset_id, config_path)
                print('{} -- 🟢 No consolidation dataset for this schema - Successfully created (id: {})'.format(datetime.today(), consolidated_dataset_id))
            else :
                print('{} -- 🔴 No consolidation dataset for this schema - Failed to create one'.format(datetime.today()))
        else :
            consolidated_dataset_id = schema_config['consolidated_dataset_id']
            
        schemas_report_dict[schema_name]['consolidated_dataset_id'] = consolidated_dataset_id
            
        #Creating last consolidation resources
        version_names_list = [filename.replace('consolidation_'+schema_name.replace('/','_')+'_v_', '').replace('_'+ consolidation_date_str +'.csv','') for filename in os.listdir(schema_consolidated_data_path) if not filename.startswith('.')]
        
        for version_name in sorted(version_names_list) :
            with open(config_path, 'r') as f :
                config_dict = yaml.safe_load(f)
        
            schema = {
                "name": schema_name,
                "version": version_name
            }
            obj = {}
            obj['schema'] = schema
            obj['type'] = 'main'
            obj['title'] = "Dernière version consolidée (v{} du schéma) - {}".format(version_name, consolidation_date_str)
            
            file_path = os.path.join(schema_consolidated_data_path, 'consolidation_{}_v_{}_{}.csv'.format(schema_name.replace('/','_'), version_name, consolidation_date_str))
            
            
            #Uploading file (creating a new resource if version was not there before)
            try :
                r_id = config_dict[schema_name]['latest_resource_ids'][version_name]
                url = api_url + 'datasets/' + consolidated_dataset_id + '/resources/' + r_id + '/upload/'
                r_to_create = False
                expected_status_code = 200
                
            except KeyError :
                url = api_url + 'datasets/' + consolidated_dataset_id + '/upload/'
                r_to_create = True
                expected_status_code = 201
            
            with open(file_path, 'rb') as file:
                files = {'file': (file_path.split('/')[-1], file.read())}
            
            response = requests.post(url, files=files, headers=HEADERS)

            if response.status_code == expected_status_code :
                if r_to_create == True :
                    r_id = response.json()['id']
                    update_config_version_resource_id(schema_name, version_name, r_id, config_path)
                    print('{} --- ➕ New latest resource ID created for {} v{} (id: {})'.format(datetime.today(), schema_name, version_name, r_id))
            else :
                r_id = None
                print('{} --- ⚠️ Version {}: file could not be uploaded.'.format(datetime.today(), version_name))
                
                
            if r_id is not None :
                r_url = api_url + 'datasets/{}/resources/{}/'.format(consolidated_dataset_id, r_id)
                r_response = requests.put(r_url, json=obj, headers=HEADERS)

                if r_response.status_code == 200 :
                    if r_to_create == True :
                        print('{} --- ✅ Version {}: Successfully created consolidated file.'.format(datetime.today(), version_name))
                    else :
                        print('{} --- ✅ Version {}: Successfully updated consolidated file.'.format(datetime.today(), version_name))
                else :
                    print('{} --- ⚠️ Version {}: file uploaded but metadata could not be updated.'.format(datetime.today(), version_name))
                
    else :
        schemas_report_dict[schema_name]['consolidated_dataset_id'] = np.nan
        print('{} -- ❌ No consolidated file for this schema.'.format(datetime.today()))

#Reopening config file to update config_dict (in case it has to be reused right after)
with open(config_path, 'r') as f :
    config_dict = yaml.safe_load(f)

2021-10-12 12:18:16.667065 - ℹ️ STARTING SCHEMA: Archivistes75/registre_entrees
2021-10-12 12:18:16.667481 -- ❌ No consolidated file for this schema.
2021-10-12 12:18:16.679848 - ℹ️ STARTING SCHEMA: CEREMA/schema-arrete-circulation-marchandises
2021-10-12 12:18:16.680125 -- ❌ No consolidated file for this schema.
2021-10-12 12:18:16.692072 - ℹ️ STARTING SCHEMA: MTES-MCT/acceslibre-schema
2021-10-12 12:18:16.692241 -- ❌ No consolidated file for this schema.
2021-10-12 12:18:16.703748 - ℹ️ STARTING SCHEMA: NaturalSolutions/schema-arbre
2021-10-12 12:18:16.703989 -- ❌ No consolidated file for this schema.
2021-10-12 12:18:16.716180 - ℹ️ STARTING SCHEMA: arsante/schema-dae
2021-10-12 12:18:16.716565 -- ❌ No consolidated file for this schema.
2021-10-12 12:18:16.728458 - ℹ️ STARTING SCHEMA: datakode/schema-pei
2021-10-12 12:18:16.728810 -- ❌ No consolidated file for this schema.
2021-10-12 12:18:16.741020 - ℹ️ STARTING SCHEMA: etalab/schema-decp-dpa
2021-10-12 12:18:16.741443 -- ❌ No consol

## Schemas (versions) feedback loop on resources

### Adding needed infos for each resource in reference tables

In [51]:
%%time
for schema_name in config_dict.keys() :
    
    ref_table_path = os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_')))
    
    if os.path.isfile(ref_table_path) :
        df_ref = pd.read_csv(ref_table_path)

        df_ref = add_most_recent_valid_version(df_ref)
        df_ref['is_schema_version_to_update'] = df_ref.apply(is_schema_version_to_update, axis=1)
        df_ref['is_schema_to_add'] = df_ref.apply(is_schema_to_add, axis=1)
        df_ref['is_schema_to_drop'] = df_ref.apply(is_schema_to_drop, axis=1)
        
        df_ref.to_csv(ref_table_path, index=False)

        print('{} - ✅ Infos added for schema {}'.format(datetime.today(), schema_name))
        
    else :
        print('{} - ❌ No reference table for schema {}'.format(datetime.today(), schema_name))

2021-10-12 12:20:11.787472 - ❌ No reference table for schema Archivistes75/registre_entrees
2021-10-12 12:20:11.787779 - ❌ No reference table for schema CEREMA/schema-arrete-circulation-marchandises
2021-10-12 12:20:11.788208 - ❌ No reference table for schema MTES-MCT/acceslibre-schema
2021-10-12 12:20:11.788283 - ❌ No reference table for schema NaturalSolutions/schema-arbre
2021-10-12 12:20:11.788460 - ❌ No reference table for schema arsante/schema-dae
2021-10-12 12:20:11.788665 - ❌ No reference table for schema datakode/schema-pei
2021-10-12 12:20:11.788869 - ❌ No reference table for schema etalab/schema-decp-dpa
2021-10-12 12:20:11.789286 - ❌ No reference table for schema etalab/schema-hautes-remunerations
2021-10-12 12:20:11.789493 - ❌ No reference table for schema etalab/schema-inclusion-numerique
2021-10-12 12:20:11.827797 - ✅ Infos added for schema etalab/schema-irve
2021-10-12 12:20:11.828091 - ❌ No reference table for schema etalab/schema-lieux-covoiturage
2021-10-12 12:20:11.

### Updating resources schemas and sending comments/mails to notify producers

⚠️⚠️⚠️ **TODO: UNCOMMENT MAIL SENDING AND DISCUSSION COMMENTING (+ DELETE PRINTS) FOR NOTIFICATION TO PRODUCERS.**

In [54]:
%%time
for schema_name in config_dict.keys() :
    
    ref_table_path = os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_')))
    
    if os.path.isfile(ref_table_path) :
        df_ref = pd.read_csv(ref_table_path)
        df_ref['resource_schema_update_success'] = np.nan
        df_ref['producer_notification_success'] = np.nan

        for idx, row in df_ref.iterrows() :
            if row['is_schema_version_to_update'] :
                resource_update_success = update_resource_schema(api_url, row['dataset_id'], row['resource_id'], schema_name, row['most_recent_valid_version'])
                df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'resource_schema_update_success'] = resource_update_success
                
                if resource_update_success == True :
                    title = 'Mise à jour de la version de la métadonnée schéma'
                    comment = updated_schema_comment_template.format(resource_title = row['resource_title'],
                                                                     schema_name = schema_name,
                                                                     initial_version_name = row['initial_version_name'],
                                                                     most_recent_valid_version = row['most_recent_valid_version']
                                                                    )
                    #comment_post = post_comment_on_dataset(dataset_id=row['dataset_id'],
                    #                                       title=title,
                    #                                       comment=comment,
                    #                                       api_url=api_url
                    #                                      )
                    #
                    #producer_notification_success = (comment_post.status_code == 201)
                    
                    #df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'producer_notification_success'] = producer_notification_success
                    #No notification at the moment:
                    df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'producer_notification_success'] = False
                    
            
            elif row['is_schema_to_add'] :
                resource_update_success = add_resource_schema(api_url, row['dataset_id'], row['resource_id'], schema_name, row['most_recent_valid_version'])
                df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'resource_schema_update_success'] = resource_update_success
                
                if resource_update_success == True :
                    title = 'Ajout de la métadonnée schéma'
                    comment = added_schema_comment_template.format(resource_title = row['resource_title'],
                                                             schema_name = schema_name,
                                                             most_recent_valid_version = row['most_recent_valid_version']
                                                            )
                    #comment_post = post_comment_on_dataset(dataset_id=row['dataset_id'],
                    #                                       title=title,
                    #                                       comment=comment,
                    #                                       api_url=api_url
                    #                                      )
                    #
                    #producer_notification_success = (comment_post.status_code == 201)
                    #df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'producer_notification_success'] = producer_notification_success
                    #No notification at the moment:
                    df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'producer_notification_success'] = False
            
            #Right now, we don't drop schema and do no notification
            elif row['is_schema_to_drop'] :
            #    resource_update_success = delete_resource_schema(api_url, row['dataset_id'], row['resource_id'], schema_name)
            #    df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'resource_schema_update_success'] = resource_update_success
            #    
            #    if resource_update_success == True :
            #        title = 'Suppression de la métadonnée schéma'
            #        
            #        mails_type, mails_list = get_owner_or_admin_mails(row['dataset_id'], api_url)
            #        
            #        if len(mails_list) > 0 : #If we found some email addresses, we send mails
            #            
            #            if mails_type == 'organisation_admins' :
            #                message = deleted_schema_mail_template_org.format(organisation_name=row['organization_or_owner'],
            #                                                                  dataset_title=row['dataset_title'],
            #                                                                  resource_title=row['resource_title'],
            #                                                                  schema_name=schema_name,
            #                                                                  schema_url=get_schema_dict(schema_name, schemas_catalogue_list)['schema_url'],
            #                                                                  resource_url=row['resource_url']
            #                                                                 )
            #            elif mails_type == 'owner' :
            #                message = deleted_schema_mail_template_own.format(dataset_title=row['dataset_title'],
            #                                                                  resource_title=row['resource_title'],
            #                                                                  schema_name=schema_name,
            #                                                                  schema_url=get_schema_dict(schema_name, schemas_catalogue_list)['schema_url'],
            #                                                                  resource_url=row['resource_url']
            #                                                                 )
            #                
            #            
            #            #Sending mail
            #            
            #            producer_notification_success_list = []
            #            print('- {} | {}:'.format(row['dataset_title'], row['resource_title']))
            #            for mail_to in mails_list :
            #                #mail_send = send_email(subject=title,
            #                #                       message=message,
            #                #                       mail_from=mail_from,
            #                #                       mail_to=mail_to,
            #                #                       smtp_host=smtp_host,
            #                #                       smtp_user=smtp_user,
            #                #                       smtp_password=smtp_password)

            #                #producer_notification_success_list += [(mail_send.status_code == 250)]
            #            
            #            #producer_notification_success = any(producer_notification_success_list) # Success if at least one person receives the mail
            #            
            #        else : #If no mail address, we post a comment on dataset
            #            comment = deleted_schema_comment_template.format(resource_title=row['resource_title'],
            #                                                             schema_name=schema_name,
            #                                                             schema_url=get_schema_dict(schema_name, schemas_catalogue_list)['schema_url'],
            #                                                             resource_url=row['resource_url']
            #                                                            )
            #            
            #            #comment_post = post_comment_on_dataset(dataset_id=row['dataset_id'],
            #            #                                       title=title,
            #            #                                       comment=comment,
            #            #                                       api_url=api_url
            #            #                                      )
            #        
            #            #producer_notification_success = (comment_post.status_code == 201)
            #        
            #        #df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'producer_notification_success'] = producer_notification_success
            
                #TO DROP when schema will be deleted and producer notified:
                df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'resource_schema_update_success'] = False
                df_ref.loc[(df_ref['resource_id'] == row['resource_id']), 'producer_notification_success'] = False
            
        
        df_ref.to_csv(ref_table_path, index=False)

        print('{} - ✅ Resources updated for schema {}'.format(datetime.today(), schema_name))
        
    else :
        print('{} - ❌ No reference table for schema {}'.format(datetime.today(), schema_name))

2021-10-12 12:44:23.439698 - ❌ No reference table for schema Archivistes75/registre_entrees
2021-10-12 12:44:23.440106 - ❌ No reference table for schema CEREMA/schema-arrete-circulation-marchandises
2021-10-12 12:44:23.440200 - ❌ No reference table for schema MTES-MCT/acceslibre-schema
2021-10-12 12:44:23.440256 - ❌ No reference table for schema NaturalSolutions/schema-arbre
2021-10-12 12:44:23.440320 - ❌ No reference table for schema arsante/schema-dae
2021-10-12 12:44:23.440364 - ❌ No reference table for schema datakode/schema-pei
2021-10-12 12:44:23.440405 - ❌ No reference table for schema etalab/schema-decp-dpa
2021-10-12 12:44:23.440632 - ❌ No reference table for schema etalab/schema-hautes-remunerations
2021-10-12 12:44:23.440861 - ❌ No reference table for schema etalab/schema-inclusion-numerique
2021-10-12 12:44:46.576639 - ✅ Resources updated for schema etalab/schema-irve
2021-10-12 12:44:46.576964 - ❌ No reference table for schema etalab/schema-lieux-covoiturage
2021-10-12 12:

## Updating consolidation documentation resource

In [56]:
%%time
for schema_name in config_dict.keys() :
    
    ref_table_path = os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_')))
    
    with open(config_path, 'r') as f :
        config_dict = yaml.safe_load(f)
    
    print('{} - ℹ️ STARTING SCHEMA: {}'.format(datetime.now(), schema_name))
    
    schema_config = config_dict[schema_name]
    
    if os.path.isfile(ref_table_path) :
    
        if 'consolidated_dataset_id' in schema_config.keys() :
            consolidated_dataset_id = schema_config['consolidated_dataset_id']
            
            obj = {}
            obj['type'] = 'documentation'
            obj['title'] = "Documentation sur la consolidation - {}".format(consolidation_date_str)

            #Uploading documentation file (creating a new resource if version was not there before)
            try :
                doc_r_id = config_dict[schema_name]['documentation_resource_id']
                url = api_url + 'datasets/' + consolidated_dataset_id + '/resources/' + doc_r_id + '/upload/'
                doc_r_to_create = False
                expected_status_code = 200

            except KeyError :
                url = api_url + 'datasets/' + consolidated_dataset_id + '/upload/'
                doc_r_to_create = True
                expected_status_code = 201

            with open(ref_table_path, 'rb') as file:
                files = {'file': (ref_table_path.split('/')[-1], file.read())}

            response = requests.post(url, files=files, headers=HEADERS)
            
            if response.status_code == expected_status_code :
                if doc_r_to_create == True :
                    doc_r_id = response.json()['id']
                    update_config_file(schema_name, 'documentation_resource_id', doc_r_id, config_path)
                    print('{} --- ➕ New documentation resource ID created for {} (id: {})'.format(datetime.today(), schema_name, doc_r_id))
            else :
                doc_r_id = None
                print('{} --- ⚠️ Documentation file could not be uploaded.'.format(datetime.today()))


            if doc_r_id is not None :
                doc_r_url = api_url + 'datasets/{}/resources/{}/'.format(consolidated_dataset_id, doc_r_id)
                doc_r_response = requests.put(doc_r_url, json=obj, headers=HEADERS)
                if doc_r_response.status_code == 200 :
                    if doc_r_to_create == True :
                        print('{} --- ✅ Successfully created documentation file.'.format(datetime.today()))
                    else :
                        print('{} --- ✅ Successfully updated documentation file.'.format(datetime.today()))
                else :
                    print('{} --- ⚠️ Documentation file uploaded but metadata could not be updated.'.format(datetime.today()))
        
        else :
            print('{} -- ❌ No consolidation dataset ID for this schema.'.format(datetime.today()))
            
    else :
        print('{} -- ❌ No reference table for this schema.'.format(datetime.today()))

#Reopening config file to update config_dict (in case it has to be reused right after)
with open(config_path, 'r') as f :
    config_dict = yaml.safe_load(f)

2021-10-12 12:49:50.867769 - ℹ️ STARTING SCHEMA: Archivistes75/registre_entrees
2021-10-12 12:49:50.867955 -- ❌ No reference table for this schema.
2021-10-12 12:49:50.879817 - ℹ️ STARTING SCHEMA: CEREMA/schema-arrete-circulation-marchandises
2021-10-12 12:49:50.880168 -- ❌ No reference table for this schema.
2021-10-12 12:49:50.891719 - ℹ️ STARTING SCHEMA: MTES-MCT/acceslibre-schema
2021-10-12 12:49:50.892002 -- ❌ No reference table for this schema.
2021-10-12 12:49:50.904852 - ℹ️ STARTING SCHEMA: NaturalSolutions/schema-arbre
2021-10-12 12:49:50.905461 -- ❌ No reference table for this schema.
2021-10-12 12:49:50.917992 - ℹ️ STARTING SCHEMA: arsante/schema-dae
2021-10-12 12:49:50.918113 -- ❌ No reference table for this schema.
2021-10-12 12:49:50.929845 - ℹ️ STARTING SCHEMA: datakode/schema-pei
2021-10-12 12:49:50.930145 -- ❌ No reference table for this schema.
2021-10-12 12:49:50.943482 - ℹ️ STARTING SCHEMA: etalab/schema-decp-dpa
2021-10-12 12:49:50.943790 -- ❌ No reference table fo

## Consolidation Reports

### Report by schema

In [57]:
reports_list = []

for schema_name in schemas_report_dict.keys() :
    schema_report_dict = schemas_report_dict[schema_name]
    schema_report_dict['schema_name'] = schema_name
    reports_list += [schema_report_dict]
    
reports_df = pd.DataFrame(reports_list)

reports_df = reports_df[['schema_name'] + [col for col in reports_df.columns if col != 'schema_name']].rename(columns={'config_created':'new_config_created'}) #rename to drop at next launch

In [58]:
stats_df_list = []
for schema_name in config_dict.keys() :
    
    ref_table_path = os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_')))
    
    if os.path.isfile(ref_table_path) :
        df_ref = pd.read_csv(ref_table_path)
        df_ref['schema_name'] = schema_name
        df_ref['is_schema_version_updated'] = df_ref['is_schema_version_to_update'] & df_ref['resource_schema_update_success']
        df_ref['is_schema_added'] = df_ref['is_schema_to_add'] & df_ref['resource_schema_update_success']
        df_ref['is_schema_dropped'] = df_ref['is_schema_to_drop'] & df_ref['resource_schema_update_success']
        df_ref['resource_schema_update_success'] = False
        df_ref.to_csv(ref_table_path, index=False)
        stats_df_list += [df_ref[['schema_name', 'is_schema_version_to_update', 'is_schema_to_add', 'is_schema_to_drop', 'resource_schema_update_success', 'is_schema_version_updated','is_schema_added', 'is_schema_dropped']].fillna(False).groupby('schema_name').sum().reset_index()]

stats_df = pd.concat(stats_df_list).reset_index(drop=True)

In [59]:
stats_df.head()

Unnamed: 0,schema_name,is_schema_version_to_update,is_schema_to_add,is_schema_to_drop,resource_schema_update_success,is_schema_version_updated,is_schema_added,is_schema_dropped
0,etalab/schema-irve,119,0,297,0,119,0,0


In [60]:
reports_df = reports_df.merge(stats_df, on='schema_name', how='left')

In [61]:
reports_df.head()

Unnamed: 0,schema_name,nb_versions,new_config_created,nb_versions_to_drop_in_config,nb_resources_found_by_schema,nb_resources_found_by_tags,nb_resources_found_by_search_words,nb_valid_resources,consolidated_dataset_id,is_schema_version_to_update,is_schema_to_add,is_schema_to_drop,resource_schema_update_success,is_schema_version_updated,is_schema_added,is_schema_dropped
0,etalab/schema-irve,7,True,6.0,430.0,0.0,248.0,127.0,5448d3e0c751df01f85d0572,119.0,0.0,297.0,0.0,119.0,0.0,0.0
1,etalab/schema-decp-dpa,1,True,,,,,,,,,,,,,
2,scdl/catalogue,2,True,,,,,,,,,,,,,
3,scdl/deliberations,6,True,,,,,,,,,,,,,
4,scdl/equipements,2,True,,,,,,,,,,,,,


In [62]:
reports_df.to_excel(os.path.join(report_tables_path, 'report_by_schema_{}.xlsx'.format(consolidation_date_str)), index=False)

## Detailed reports (by schema and resource source)

In [63]:
%%time
for schema_name in config_dict.keys() :
    
    ref_table_path = os.path.join(ref_tables_path, 'ref_table_{}.csv'.format(schema_name.replace('/', '_')))
    
    if os.path.isfile(ref_table_path) :
        df_ref = pd.read_csv(ref_table_path)        
    
        df_ref['total_nb_resources'] = 1
        df_ref['error_type'].fillna('no-error', inplace=True)

        cols_to_sum = ['total_nb_resources']
        cols_to_sum += [col for col in df_ref.columns if col.startswith('is_')]
        df_report = df_ref.groupby(['resource_found_by', 'error_type']).agg({col:sum for col in cols_to_sum}).reset_index()

        df_report.to_excel(os.path.join(report_tables_path, 'report_table_{}.xlsx'.format(schema_name.replace('/', '_'))), index=False)

        print('{} - ✅ Report done for schema {}'.format(datetime.today(), schema_name))
        
    else :
        print('{} - ❌ No reference table for schema {}'.format(datetime.today(), schema_name))

2021-10-12 12:52:28.930177 - ❌ No reference table for schema Archivistes75/registre_entrees
2021-10-12 12:52:28.930636 - ❌ No reference table for schema CEREMA/schema-arrete-circulation-marchandises
2021-10-12 12:52:28.930711 - ❌ No reference table for schema MTES-MCT/acceslibre-schema
2021-10-12 12:52:28.930881 - ❌ No reference table for schema NaturalSolutions/schema-arbre
2021-10-12 12:52:28.930943 - ❌ No reference table for schema arsante/schema-dae
2021-10-12 12:52:28.931142 - ❌ No reference table for schema datakode/schema-pei
2021-10-12 12:52:28.931224 - ❌ No reference table for schema etalab/schema-decp-dpa
2021-10-12 12:52:28.931407 - ❌ No reference table for schema etalab/schema-hautes-remunerations
2021-10-12 12:52:28.931483 - ❌ No reference table for schema etalab/schema-inclusion-numerique
2021-10-12 12:52:28.967465 - ✅ Report done for schema etalab/schema-irve
2021-10-12 12:52:28.967582 - ❌ No reference table for schema etalab/schema-lieux-covoiturage
2021-10-12 12:52:28.