# Library imports, configurations

In [3]:
import os
import re
import s3fs
import pandas as pd
import requests
import json
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', None)
from unidecode import unidecode
from wordcloud import WordCloud
import nltk
nltk.download('punkt')

from stop_words import get_stop_words
import unicodedata

from nltk.tokenize import word_tokenize
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch_dsl import connections
from bs4 import BeautifulSoup as bs

[nltk_data] Downloading package punkt to /home/onyxia/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [11]:
# Create filesystem object
S3_ENDPOINT_URL = "https://" + os.environ["AWS_S3_ENDPOINT"]
fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': S3_ENDPOINT_URL})

In [12]:
BUCKET = "jplaton/diffusion"
fs.ls(BUCKET)

['jplaton/diffusion/.keep',
 'jplaton/diffusion/ted',
 'jplaton/diffusion/visio_mel']

In [13]:
BUCKET_CLEM = "clementg/diffusion"
fs.ls(BUCKET_CLEM)

['clementg/diffusion/.keep',
 'clementg/diffusion/Fiche de Poste DINUM - francetransfert-3595970061- pdf.csv']

# Functions

In [14]:
# Suppression des accents

def remove_accent(s):

    s = unidecode(str(s))
    return(s)

# Suppression de la ponctuation

def remove_punctuation(s):
    # On retire la ponctuation
 
    s = re.sub(r'[^A-Za-z0-9]',' ',str(s))
    return(s)

# Réduction de la casse

def tolower(s):
    s = ' '.join(str(s).lower().split())
    return(s)


# Sur cette base, on crée une petite fonction qui retire les stop words
stopwords = get_stop_words('french')   
stopwords = [unicodedata.normalize('NFKD',m).encode('ASCII', 'ignore').decode() for m in stopwords]

def remove_stopwords(s):
    s = [w for w in word_tokenize(s) if w not in stopwords]
    s = ' '.join(s)
    return(s)

def remove_accent_from_df(df,list_cols):
    df_c =  df.copy()
    df_c[list_cols] = df_c[list_cols].map(remove_accent)
    return df_c

def remove_punctuation_from_df(df,list_cols):
    df_c =  df.copy()
    df_c[list_cols] = df_c[list_cols].map(remove_punctuation)
    return df_c

def tolower_df(df,list_cols):
    df_c =  df.copy()
    df_c[list_cols] = df_c[list_cols].map(tolower)
    return df_c

# Scraping de annuaire service public pour aller chercher les services à aprtir des ids
def get_service_from_scraping(service_id):
    try:
        url=f'https://lannuaire.service-public.fr/gouvernement/{service_id}'
        response = requests.get(url)
        html = response.content
        soup = bs(html, "lxml")
        return soup.title.get_text().replace(' - Annuaire | Service-public.fr', '')
    except:
        return ''

# DATA Download

In [15]:
FILE_PATH_TED = "ted"
FILE_PATH_TED_S3 = BUCKET + "/" + FILE_PATH_TED

with fs.open(FILE_PATH_TED_S3+"/offres-datagouv-20230409.csv", mode="rb") as file_in:
    offres_2023_df = pd.read_csv(file_in, sep=";")

with fs.open(FILE_PATH_TED_S3+"/offres-datagouv-20221225.csv", mode="rb") as file_in:   
    offres_2022_df = pd.read_csv(file_in, sep=";")

with fs.open(FILE_PATH_TED_S3+"/Bulloterie_10_Saison_2.xlsx", mode="rb") as file_in:  
    bulloterie_df =  pd.read_excel(file_in, sheet_name=0,header=[0,1])


  offres_2023_df = pd.read_csv(file_in, sep=";")
  offres_2022_df = pd.read_csv(file_in, sep=";")


In [17]:
with fs.open(BUCKET_CLEM+"/Fiche de Poste DINUM - francetransfert-3595970061- pdf.csv", mode="rb") as file_in:
    fdp_dinum_df = pd.read_csv(file_in, sep=";",encoding='Windows-1252')

In [16]:
with fs.open(FILE_PATH_TED_S3+"/dila_refOrga_admin_Etat_fr_20230505.json", mode="rb") as file_in:  
    orga =  json.load(file_in)

### Traitement référentiel services

In [9]:
orga_df = pd.json_normalize(orga,record_path='service')

In [None]:
orga_df['services_fils'] = orga_df['hierarchie'].map(lambda x: [x[i]['service'] for i in range(len(x))])

In [94]:
services_clean = ( orga_df
    .pipe(remove_accent_from_df,['nom','type_organisme','sigle','services_fils'])
    .pipe(remove_punctuation_from_df,['nom','type_organisme','sigle','services_fils'])
    .pipe(tolower_df,['nom','type_organisme','sigle','services_fils'])
)

In [95]:
services_clean = services_clean[['nom','type_organisme','sigle']] #,'services_fils' #services_clean[services_clean['services_fils'].map(len)==1]

In [96]:
services_clean.head(5)

Unnamed: 0,nom,type_organisme,sigle
0,bureau de coordination strategique,administration centrale ou ministere,
1,direction de l information legale et administrative,administration centrale ou ministere,dila
2,bureau de la simplification et de la qualite de service,administration centrale ou ministere,
3,service de la protection sdlp,administration centrale ou ministere,
4,service national d accueil telephonique de l enfance en danger snated,groupement d interet public,


### Traitement offres

In [18]:
offres_df = pd.concat([offres_2023_df,offres_2022_df])

In [19]:
offres_df.columns = offres_df.columns.map(remove_accent).map(remove_punctuation).map(tolower).map(lambda s: s.replace(' ', '_'))

 offres_df[offres_df['metier'].str.contains('scientist|engineer',na=False)].head(2)

In [20]:
offres_df_clean = ( offres_df
    .pipe(remove_accent_from_df,['organisme_de_rattachement','metier','intitule_du_poste','localisation_du_poste'])
    .pipe(remove_punctuation_from_df,['organisme_de_rattachement','metier','intitule_du_poste','localisation_du_poste'])
    .pipe(tolower_df,['organisme_de_rattachement','metier','intitule_du_poste','localisation_du_poste'])
)

In [21]:
offres_df_clean = offres_df_clean[['organisme_de_rattachement','metier','intitule_du_poste','localisation_du_poste']] #offres_df_clean[offres_df_clean['metier'].str.contains("ux designer",na=False)|offres_df_clean['intitule_du_poste'].str.contains("data designer",na=False)].head(5)

In [22]:
offres_df_clean.head(5)

Unnamed: 0,organisme_de_rattachement,metier,intitule_du_poste,localisation_du_poste
0,structures de cooperation territoriale,chargee charge de politique environnementale,geomaticien ne ecologue syndicat mixte de gestion du conservatoire botanique,haute loire 43
1,structures de cooperation territoriale,chargee charge de politique environnementale,charge e mission flore habitats naturels vegetations agro pastorales,haute loire 43
2,structures de cooperation territoriale,chargee charge de politique environnementale,charge e de missions scientifiques et techniques,haute loire 43
3,structures de cooperation territoriale,plombiere operatrice chauffagiste plombier operateur chauffagiste en froid et ventilation,agent de maintenance des batiments communaute de communes lacq orthez cclo,pyrenees atlantiques 64
4,centres communaux d action sociale,educatrice educateur de jeunes enfants,puericulteur rice educateur rice de jeunes enfants c c a s de fleurance,gers 32


### Traitement Fiche de poste DNUM

In [14]:
# Traitement des colonnes
fdp_dinum_df.columns = fdp_dinum_df.columns.map(remove_accent).map(remove_punctuation).map(tolower).map(lambda s: s.replace(' ', '_'))

In [15]:
fdp_clean = ( fdp_dinum_df
    .pipe(remove_accent_from_df,['poste','intitule_du_service_demandeur_bureau_section','corps_grade','presentation','missions','competences'])
    .pipe(remove_punctuation_from_df,['poste','intitule_du_service_demandeur_bureau_section','corps_grade','presentation','missions','competences'])
    .pipe(tolower_df,['poste','intitule_du_service_demandeur_bureau_section','corps_grade','presentation','missions','competences'])
)

In [16]:
fdp_clean = fdp_clean[['poste','intitule_du_service_demandeur_bureau_section','corps_grade','presentation','missions','competences']] #fdp_clean.head(2)


# Insertion des données dans Elastic Search

In [4]:
# Paramétrage du client
es_client = connections.create_connection(hosts=['http://elasticsearch-master:9200/'])

In [5]:
# Fontion qui transforme un dataframe en document/index à stocker dans ElasticSearch
def doc_generator(df,index_name):
    df_iter = df.iterrows()
    for index, document in df_iter:
        yield {
                "_index": index_name,
                "_type": "_doc",
                "_source": document,
            }

In [68]:
# Indexation des offres
helpers.bulk(es_client, doc_generator(offres_df_clean,"offres"))

  helpers.bulk(es_client, doc_generator(offres_df_clean))
  helpers.bulk(es_client, doc_generator(offres_df_clean))


(406755, [])

In [73]:
# Indexation des services
helpers.bulk(es_client, doc_generator(services_clean,"services"))

  helpers.bulk(es_client, doc_generator(services_clean,"services"))
  helpers.bulk(es_client, doc_generator(services_clean,"services"))


(7636, [])

In [89]:
# Indexation des fiches de poste
helpers.bulk(es_client, doc_generator(fdp_clean,"fiches"))

  helpers.bulk(es_client, doc_generator(fdp_clean,"fiches"))
  helpers.bulk(es_client, doc_generator(fdp_clean,"fiches"))


(30, [])

In [19]:
from elasticsearch_dsl import Search
s = Search(index='offres').query('match',metier='charge')

In [20]:
r=s.execute()

  es.search(index=self._index, body=self.to_dict(), **self._params).body,


In [21]:
df_results = pd.DataFrame((d.to_dict() for d in s.scan()))

  for hit in scan(es, query=self.to_dict(), index=self._index, **self._params):


In [133]:
df_results

Unnamed: 0,organisme_de_rattachement,metier,intitule_du_poste,localisation_du_poste
0,universite paris saclay,chargee charge de controle interne budgetaire et comptable,gestionnaire des operations financieres h f,essonne
1,structures de cooperation territoriale,chargee charge de developpement territorial,che fe de projet petites villes de demain com com du civraisien en poitou,vienne
2,centres communaux d action sociale,chargee charge de gestion administrative et ou de paie,assistant charge du portage de repas et du logement h f,morbihan
3,conseils departementaux,chargee charge de gestion administrative et ou de paie,secretaire du service crip conseil departemental de la gironde,gironde
4,communes,chargee charge d amenagement des espaces verts et de production vegetale,agent d entretien des espaces verts h f le petit quevilly,seine maritime
...,...,...,...,...
153856,communes,chargee charge de l instruction de dossiers administratifs,instructeur instructrice des demandes autorisations droits des sols commune d aix en provence,bouches du rhone
153857,communes,chargee charge des d exploitation des equipements sportifs et ludiques,chef d equipe exploitation h f lyon,rhone
153858,direction des achats de l etat dae,chargee charge de conduite des politiques publiques,dae consultant interne etudes actions d optimisation des achats de l etat et de ses operateurs h f,paris
153859,communes,chargee charge de l instruction de dossiers administratifs,agent des formalites civiles commune d aix en provence,bouches du rhone


In [6]:
 # Recherche dans l'ensemble des champs le meilleur écho (le plus pertinent) #
fullsearch_easy = es_client.search(index = "services", # l'index dans lequel on cherche
                       q = "direction generale des finances publiques dgfip	data scientis ssi dpn dtnum bureau si mission service a l utilisateur data scientist expert h f", # notre requête textuelle
                              size = 1) # taille de l'ensemble les échos souhaités

  fullsearch_easy = es_client.search(index = "services", # l'index dans lequel on cherche


In [7]:
fullsearch_easy

ObjectApiResponse({'took': 14, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 6310, 'relation': 'eq'}, 'max_score': 27.02822, 'hits': [{'_index': 'services', '_type': '_doc', '_id': 'QbHPPYsB1My3OOFZX1kn', '_score': 27.02822, '_ignored': ['services_fils.keyword'], '_source': {'nom': 'direction generale des finances publiques dgfip', 'type_organisme': 'administration centrale ou ministere', 'sigle': '', 'services_fils': 'cabinet et communication cabinet international delegation aux relations internationales delegation a la transformation numerique dtnum mission de cooperation internationale mission strategie des relations avec les differents publics mission unification du recouvrement fiscal murf direction de l immobilier de l etat direction de la legislation fiscale service de la securite juridique et du controle fiscal sjcf service de la gestion fiscale gf service des gestions publiques locales des activites bancair

In [100]:
 # Recherche dans l'ensemble des champs le meilleur écho (le plus pertinent) #
fullsearch_scientist = es_client.search(index = "services", # l'index dans lequel on cherche
                       q = "centre hospitalier universitaire de tours	data scientist datascientist centre de donnees cliniques", # notre requête textuelle
                              size = 1) # taille de l'ensemble les échos souhaités

  fullsearch1 = es_client.search(index = "services", # l'index dans lequel on cherche


In [101]:
fullsearch_scientist

ObjectApiResponse({'took': 6, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 7636, 'relation': 'eq'}, 'max_score': 15.201317, 'hits': [{'_index': 'services', '_type': '_doc', '_id': 'DrFaKIsB1My3OOFZEzmY', '_score': 15.201317, '_source': {'copyright': 'direction de l information legale et administrative premiere ministre', 'nom': 'centre de valorisation des ressources humaines cvrh de tours', 'type_organisme': 'service a competence nationale', 'sigle': ''}}]}})

In [103]:
 # Recherche dans l'ensemble des champs le meilleur écho (le plus pertinent) #
fullsearch_engineer= es_client.search(index = "services", # l'index dans lequel on cherche
                       q = "service hydrographique et oceanographique de la marine	data engineer	ingenieur en traitement de l information conception developpement et experimentation d ia h f", # notre requête textuelle
                              size = 1) # taille de l'ensemble les échos souhaités

  fullsearch_engineer= es_client.search(index = "services", # l'index dans lequel on cherche


In [104]:
fullsearch_engineer

ObjectApiResponse({'took': 23, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 7636, 'relation': 'eq'}, 'max_score': 27.117067, 'hits': [{'_index': 'services', '_type': '_doc', '_id': 't7FaKIsB1My3OOFZFT--', '_score': 27.117067, '_source': {'copyright': 'direction de l information legale et administrative premiere ministre', 'nom': 'service hydrographique et oceanographique de la marine shom', 'type_organisme': 'etablissement public', 'sigle': ''}}]}})

In [106]:
fullsearch_designer = es_client.search(index = "services", # l'index dans lequel on cherche
                       q = "caisse des depots et consignations	architecte technique expert conception et architecture de donnees d entreprise data designer h f	", # notre requête textuelle
                              size = 1) # taille de l'ensemble les échos souhaités

  fullsearch_designer = es_client.search(index = "services", # l'index dans lequel on cherche


In [107]:
fullsearch_designer

ObjectApiResponse({'took': 17, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 7636, 'relation': 'eq'}, 'max_score': 21.853615, 'hits': [{'_index': 'services', '_type': '_doc', '_id': 'RbFaKIsB1My3OOFZEDBa', '_score': 21.853615, '_source': {'copyright': 'direction de l information legale et administrative premiere ministre', 'nom': 'caisse des depots et consignations cdc', 'type_organisme': 'juridiction', 'sigle': ''}}]}})

In [24]:
def get_services(es,query):
    fullsearch = es.search(index = "services", # l'index dans lequel on cherche
                       q = query, # notre requête textuelle
                       size = 1)
    return fullsearch['hits']['hits'][0]['_source']['services_fils']

In [25]:
def add_services(df, list_cols,es):
    df['service'] = df[list_cols].apply(lambda row: get_services(es,' '.join(row.values.astype(str))), axis=1)
    return df

In [9]:
get_services(es_client,"caisse des depots et consignations	architecte technique expert conception et architecture de donnees d entreprise data designer h f")

  fullsearch = es.search(index = "services", # l'index dans lequel on cherche


'caisse des depots et consignations cdc direction regionale antilles guyane caisse des depots et consignations cdc direction regionale auvergne rhone alpes caisse des depots et consignations cdc direction regionale bourgogne franche comte caisse des depots et consignations cdc direction regionale bretagne caisse des depots et consignations cdc direction regionale centre val de loire caisse des depots et consignations cdc direction regionale corse caisse des depots et consignations cdc direction regionale grand est caisse des depots et consignations cdc direction regionale hauts de france caisse des depots et consignations cdc direction regionale normandie caisse des depots et consignations cdc direction regionale nouvelle aquitaine caisse des depots et consignations cdc direction regionale pays de la loire caisse des depots et consignations cdc direction regionale provence alpes cote d azur caisse des depots et consignations cdc direction regionale reunion ocean indien caisse des depot

In [26]:
list_cols = ['organisme_de_rattachement','metier','intitule_du_poste','localisation_du_poste']
offres_df_temp = offres_df_clean[offres_df_clean['metier'].str.contains("data scientist",na=False)|offres_df_clean['intitule_du_poste'].str.contains("data scientist",na=False)]
offres_with_services =  offres_df_temp.head(50).pipe(add_services,list_cols,es_client)

  fullsearch = es.search(index = "services", # l'index dans lequel on cherche
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['service'] = df[list_cols].apply(lambda row: get_services(es,' '.join(row.values.astype(str))), axis=1)


In [148]:
list_cols = ['poste','intitule_du_service_demandeur_bureau_section','corps_grade'] #,'presentation','missions','competences'
fdp_temp = fdp_clean[fdp_clean['poste'].str.contains("data engineer",na=False)|fdp_clean['competences'].str.contains("data engineer",na=False)]
fdp_with_services =  fdp_temp.pipe(add_services,list_cols,es_client)

  fullsearch = es.search(index = "services", # l'index dans lequel on cherche
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['service'] = df[list_cols].apply(lambda row: get_services(es,' '.join(row.values.astype(str))), axis=1)


In [149]:
fdp_with_services

Unnamed: 0,poste,intitule_du_service_demandeur_bureau_section,corps_grade,presentation,missions,competences,service
24,ingenieur data f h,direction interministerielle du numerique dinum departement etalab,,la direction interministerielle du numerique dinum est un service de la premiere ministre place sous l autorite du ministre de la transformation et de la fonction publiques elle oriente anime soutient et coordonne les actions des administrations de l etat et celles des organismes places sous sa tutelle visant a ameliorer la qualite l efficacite l efficience et la fiabilite des services rendus par le systeme d information et de communication de l etat elle contribue a l ouverture des donnees publiques aux strategies d innovation ouverte et au travail avec les ecosystemes d innovation elle pilote la politique de mutualisation du systeme d information et de communication de l etat et peut intervenir dans la gouvernance des operations de mutualisation ainsi que dans la conception l elaboration et la mise en oeuvre de ces operations enfin elle soutient le developpement des competences de l etat et des organismes places sous sa tutelle dans le domaine du numerique en facilitant l emergence d une filiere professionnelle interministerielle du numerique rejoindre la dinum c est prendre part a un collectif de plus de 200 agents reunis autour d une meme ambition le numerique au service de l efficacite de l action publique la le titulaire du poste integre le departement etalab de la dinum etalab vise a valoriser et exploiter les donnees comme levier d efficacite de l action publique il coordonne la conception et la mise en oeuvre de la strategie de l etat dans le domaine de la donnee il promeut l action de l etat en matiere d ouverture de circulation et d exploitation des donnees publiques il accompagne les administrations dans l usage de la data science de l intelligence artificielle et diffuse une culture de l innovation ouverte au sein des administrations,au sein du datalab l ingenieur e data travaille avec un product owner des data scientists et un analyste de donnees pour developper des cas d usages mobilisant des donnees et a fort impact au sein d une equipe produit suivant la methode agile l agent a pour mission de mettre en place des outils de traitement de la donnee pour realiser des cas d usage il elle met en oeuvre des traitements sur les donnees manipulees dans le cadre des cas d usage portes par le datalab afin de favoriser et de faciliter la reutilisation de celles ci par son equipe vous serez amene e a utiliser et mettre en oeuvre des outils de traitements et de stockage de donnees bases de donnees pour faciliter leur mise a jour et leur mise a disposition vous serez sensible a la question de la securite et de l administration systeme et gererez cette problematique dans les infrastructures data que vous utiliserez a ce titre vous collaborerez etroitement avec le responsable datasecops f h de l equipe faciliter l interoperabilite des donnees en integrant des variables pivots permettant le croisement avec d autres donnees et vous assurer du respect de bonnes pratiques techniques permettant une meilleure exploitation des donnees pour cela vous integrerez les dimensions de completude d unicite de conformite de tracabilite et de coconformite de tracabilite et de coherence des donnees garantir les bonnes pratiques de developpements appliquees sur les donnees traitements montee en qualite calculs d indicateurs dans l optique de maximiser l impact de projet data a fort potentiel veiller a ce que les outils et traitements developpes dans le cadre du datalab soient transferables aux entites chargees de les executer en production dans les administrations lorsque les cas d usage data ont demontre leur impact et sont finalises mener une veille technologique active pour mobiliser les meilleures technologies au cours de la realisation de vos missions vous devriez etre en veille permanente pour detecter des projets a impact a valoriser ou a accompagner vous pourriez par ailleurs etre amene e a renforcer ponctuellement les autres agents du departement suivant l interet du service enfin en fonction de l evolution des priorites fixees a la dinum et au departement etalab vous pourrez etre amene e a prendre ponctuellement en charge d autres taches et projets,de formation superieure bac 5 vous disposez d une experience d au moins 5 ans en ingenierie de donnees vous etes un e expert e de la donnee a ce titre vous etes sensible a la question de la qualite des donnees et a leur accessibilite vous disposez egalement de tres bonnes connaissances dans la mise en place d outils de stockage de donnees minio elasticsearch postgres mongo etc de traitement de donnees apache arrow et spark python pandas polars etc d ordonnancement de flux airflow prefect luigi dbt etc bonnes connaissances en developpement backend notamment dans la mise en oeuvre d api et en administration systeme linux docker connaissance de l ecosysteme des outils libres connaissance du droit de la donnee et des enjeux juridiques lies au traitement des donnees sensibles bonne connaissance des donnees manipulees au sein de l administration serait un plus expertise dans les outils lies a l exploitation de la donnee data engineering solides competences en developpement python pandas polars arrow etc et en outils de developpement operationnel git ci cd docker airflow etc capacite a contribuer sur des logiciels libres force de proposition rigueur reactivite esprit analytique et de synthese,direction interministerielle du numerique dinum
