# In this notebook, we will search some job offers from the french employment agency

Pre-requise: Create an account for the french employment 

As documented on their website you need to:

- Step1: Go to [pole-emploi](https://pole-emploi.io/create),create a new account and login
- Step2: After login, Go to "mon space"->"cree une application" and agree to the terms of use. You need to provide three information
    - Nom de l'application: You put a name of application that will consume this api e.g. toto
    - URL d'accès: You put the url of your application. e.g. https://datalab.sspcloud.fr/home
    - Description de votre application: Put the app description
  After the creation, you should see the api ID and secret on the page 
- Step3: You will see a list of apis. Select the api that you are intrested in (e.g Infotravail, offre d'emploi) by clicking DEMANDE D'ACCÈS and selecting your application.
 
For more information about the Pole-Emploi API subscription, read its [documentation](https://pole-emploi.io/data/api).

In [61]:
from offres_emploi import Api

from offres_emploi.utils import dt_to_str_iso
import datetime
import pprint as pp
import pandas as pd
import os
import pyarrow.parquet as pq
import s3fs
from pyarrow import fs
import pyarrow as pa


# Get job offers by using key word

## Step 1 Build api client 

In [49]:
client = Api(client_id="changMe", 
            client_secret="changeMe")

## Step 2 Set up search parameters

In [50]:
start_dt = datetime.datetime(2020, 12, 1, 12, 30)
end_dt = datetime.datetime.today()
keyword="data"
params = {
    "motsCles": keyword,
    'minCreationDate': dt_to_str_iso(start_dt),
    'maxCreationDate': dt_to_str_iso(end_dt),
    # add filter to filter job by department 
    # 'department':'973',
}

## Step 3 Get job offers based on search filter

In [51]:
search_on_big_data = client.search(params=params)

# Get the first element of the result
pp.pprint(search_on_big_data["resultats"][0])

Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCreationDate': '2021-11-30T14:30:10Z'}
Token has not been requested yet. Requesting token
Now requesting token
{'accessibleTH': False,
 'alternance': False,
 'appellationlibelle': 'Data scientist',
 'competences': [{'code': '109527',
                  'exigence': 'S',
                  'libelle': 'Adapter les outils de traitement statistique de '
                             'données'},
                 {'code': '109528',
                  'exigence': 'S',
                  'libelle': "Rédiger l'information produite (études, "
                             'synthèses, rapports, bulletins, ...) et établir '
                             'des prévisions, des évaluations, des '
                             'recommandations, des perspectives, ...'},
                 {'code': '109529',
                  'exigence': 'S',
                  'libelle': 'Présenter et diffuser les résultats des études '
 

In [52]:
# analyze the response
print(f"Response type: {type(search_on_big_data)}")
# We can see response is a dict
print(f"It contains Keys: {search_on_big_data.keys()}")
print(search_on_big_data["Content-Range"])
print(search_on_big_data["filtresPossibles"])
print(type(search_on_big_data["resultats"]))


Response type: <class 'dict'>
It contains Keys: dict_keys(['resultats', 'filtresPossibles', 'Content-Range'])
{'first_index': '0', 'last_index': '149', 'max_results': '1040'}
[{'filtre': 'typeContrat', 'agregation': [{'valeurPossible': 'CDD', 'nbResultats': 88}, {'valeurPossible': 'CDI', 'nbResultats': 919}, {'valeurPossible': 'MIS', 'nbResultats': 33}]}, {'filtre': 'experience', 'agregation': [{'valeurPossible': '0', 'nbResultats': 184}, {'valeurPossible': '1', 'nbResultats': 340}, {'valeurPossible': '2', 'nbResultats': 393}, {'valeurPossible': '3', 'nbResultats': 123}]}, {'filtre': 'qualification', 'agregation': [{'valeurPossible': '0', 'nbResultats': 65}, {'valeurPossible': '9', 'nbResultats': 214}, {'valeurPossible': 'X', 'nbResultats': 761}]}, {'filtre': 'natureContrat', 'agregation': [{'valeurPossible': 'E1', 'nbResultats': 992}, {'valeurPossible': 'E2', 'nbResultats': 37}, {'valeurPossible': 'FS', 'nbResultats': 10}, {'valeurPossible': 'FV', 'nbResultats': 1}]}]
<class 'list'>


You can notice the reponse is a dictionary that has three keys:
- resultats(list): This list contains all job offers that in the range  
- filtresPossibles(dict): Stats pre-calculated, for instance in a list of resultats it has 88 CDD, and 920 CDI. 
- Content-Range(dict): Note the api will send back 1150 row at max (if it exists that much). And the 1150 row is organized by a pagination of 150 rows. And in one reponse it only has the row specified in the range. So we need to use **Range** paramètre such as 0-149, 150-299 etc. to get all the results 

As a result, we need a function to loop over all possible range to get all possible rows

In [53]:
def get_search_result_by_range(client, params:dict, range_min:int, range_max:int):
    # add range to filter
    range_str=f"{str(range_min)}-{str(range_max)}"
    params["range"]=range_str
    response=client.search(params=params)
    return response["resultats"]

def get_all_search_result(client, params:dict):
    response=client.search(params=params)
    total_response_num=int(response["Content-Range"]["max_results"])
    total=[]
    index=0
    while total_response_num>150:
        total_response_num=total_response_num-150
        tmp=get_search_result_by_range(client, params, index, index+149)
        index=index+150
        for item in tmp:
            total.append(item)
    return total

In [54]:
total=get_all_search_result(client,params)

Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCreationDate': '2021-11-30T14:30:10Z'}
Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCreationDate': '2021-11-30T14:30:10Z', 'range': '0-149'}
Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCreationDate': '2021-11-30T14:30:10Z', 'range': '150-299'}
Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCreationDate': '2021-11-30T14:30:10Z', 'range': '300-449'}
Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCreationDate': '2021-11-30T14:30:10Z', 'range': '450-599'}
Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCreationDate': '2021-11-30T14:30:10Z', 'range': '600-749'}
Making request with params {'motsCles': 'data', 'minCreationDate': '2020-12-01T12:30:00Z', 'maxCre

In [55]:
print(len(total))
pp.pprint(total[897])

898
{'accessibleTH': False,
 'alternance': True,
 'appellationlibelle': 'Data scientist',
 'dateActualisation': '2021-09-27T12:22:00.000Z',
 'dateCreation': '2021-09-27T12:22:00.000Z',
 'description': 'Mener les études préalables au développement des actions '
                'marketing (étude de marché, positionnement concurrence, '
                'bonnes pratiques sur le territoire et hors territoire). '
                "Assurer le suivi et l'analyse des offres ainsi que des "
                'résultats commerciaux tous marchés. Mettre en œuvre les '
                'modèles CRM analytiques et opérationnels liés aux actions à '
                "mener, sur l'ensemble des canaux de communication. Assurer le "
                "suivi et l'analyse des résultats commerciaux sur tous les "
                "marchés. Participer à l'élaboration du Plan de Développement "
                'Annuel. Participer à la mise en oeuvre des outils marketing '
                "(scores, segmentations, sui

# Step 4 Generate data frame

In [56]:
# Here we use json normalize to convert json file to a pandas dataframe
df = pd.json_normalize(total)

In [57]:
df.head()

Unnamed: 0,id,intitule,description,dateCreation,dateActualisation,romeCode,romeLibelle,appellationlibelle,typeContrat,typeContratLibelle,...,contact.coordonnees3,agence.courriel,experienceCommentaire,contact.urlPostulation,deplacementCode,deplacementLibelle,salaire.complement2,contact.commentaire,complementExercice,conditionExercice
0,124GJFH,Ingénieur Data Science (H/F),Crise sanitaire : L'employeur garantit une pro...,2021-11-30T13:40:44.000Z,2021-11-30T13:40:46.000Z,M1403,Études et prospectives socio-économiques,Data scientist,CDI,Contrat à durée indéterminée,...,,,,,,,,,,
1,5762522,Data Analyst H/F,"Créé en 2009, INELYS s'est organisé par pôles...",2021-11-30T12:42:59.000Z,2021-11-30T12:42:59.000Z,M1403,Études et prospectives socio-économiques,Data analyst,CDI,Contrat à durée indéterminée,...,,,,,,,,,,
2,5762422,Manager de Projet Data hf H/F,"Votre missionBadenoch + Clark, cabinet de cons...",2021-11-30T12:42:36.000Z,2021-11-30T12:42:36.000Z,M1802,Expertise et support en systèmes d'information,Data manager,CDD,Contrat à durée déterminée - 8 Mois,...,,,,,,,,,,
3,5762278,Data Analyst ESGRSE HF H/F,Description du poste Rattaché à la Direction G...,2021-11-30T12:42:08.000Z,2021-11-30T12:42:08.000Z,M1403,Études et prospectives socio-économiques,Data analyst,CDD,Contrat à durée déterminée - 8 Mois,...,,,,,,,,,,
4,5761328,Stage Data Analyst H/F,TRANSITIONS est une agence de conseil en dével...,2021-11-30T12:39:12.000Z,2021-11-30T12:39:12.000Z,M1403,Études et prospectives socio-économiques,Data analyst,CDD,Contrat à durée déterminée - 8 Mois,...,,,,,,,,,,


# Step 5 Write data frame to S3 as parquet

We have the data frame, now we want to save the data frame on s3. We want to save the data frame in format **parquet**. Because it has an integrated schema.

### 5.1 Configure s3 connection

Here we will set the s3 credential and the output path of the parquet file. As we will generate a parquet file each day. We would like to have the generation date inside the file name.


In [58]:
endpoint = os.environ['AWS_S3_ENDPOINT']
bucket = "pengfei"
current_date=datetime.date.today().strftime("%d-%m-%Y")
output_path = f"diffusion/demo_prod/job_offer_{current_date}"

### 5.2 Write df to s3 as parquet file

In [62]:
# This function write a pandas dataframe to s3 in parquet format
def write_df_to_s3(df, endpoint, bucket_name, path):
    # Convert pandas df to Arrow table
    table = pa.Table.from_pandas(df)
    url = f"https://{endpoint}"
    fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': url})
    file_uri = f"{bucket_name}/{path}"
    pq.write_to_dataset(table, root_path=file_uri, filesystem=fs)

In [63]:
write_df_to_s3(df,endpoint,bucket,output_path)

## 6. Test the output parquet file

In [64]:
# This function read a parquet file and return a arrow table
def read_parquet_from_s3(endpoint: str, bucket_name, path):
    url = f"https://{endpoint}"
    fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': url})
    file_uri = f"{bucket_name}/{path}"
    str_info = fs.info(file_uri)
    print(f"input file metadata: {str_info}")
    dataset = pq.ParquetDataset(file_uri, filesystem=fs)
    table = dataset.read()
    return table

In [65]:
arrow_table=read_parquet_from_s3(endpoint,bucket,output_path)

# Convert back to pandas
df_new = arrow_table.to_pandas()
df_new.head()

input file metadata: {'Key': 'pengfei/diffusion/demo_prod/job_offer_30-11-2021', 'name': 'pengfei/diffusion/demo_prod/job_offer_30-11-2021', 'type': 'directory', 'Size': 0, 'size': 0, 'StorageClass': 'DIRECTORY'}


Unnamed: 0,id,intitule,description,dateCreation,dateActualisation,romeCode,romeLibelle,appellationlibelle,typeContrat,typeContratLibelle,...,contact.coordonnees3,agence.courriel,experienceCommentaire,contact.urlPostulation,deplacementCode,deplacementLibelle,salaire.complement2,contact.commentaire,complementExercice,conditionExercice
0,124GJFH,Ingénieur Data Science (H/F),Crise sanitaire : L'employeur garantit une pro...,2021-11-30T13:40:44.000Z,2021-11-30T13:40:46.000Z,M1403,Études et prospectives socio-économiques,Data scientist,CDI,Contrat à durée indéterminée,...,,,,,,,,,,
1,5762522,Data Analyst H/F,"Créé en 2009, INELYS s'est organisé par pôles...",2021-11-30T12:42:59.000Z,2021-11-30T12:42:59.000Z,M1403,Études et prospectives socio-économiques,Data analyst,CDI,Contrat à durée indéterminée,...,,,,,,,,,,
2,5762422,Manager de Projet Data hf H/F,"Votre missionBadenoch + Clark, cabinet de cons...",2021-11-30T12:42:36.000Z,2021-11-30T12:42:36.000Z,M1802,Expertise et support en systèmes d'information,Data manager,CDD,Contrat à durée déterminée - 8 Mois,...,,,,,,,,,,
3,5762278,Data Analyst ESGRSE HF H/F,Description du poste Rattaché à la Direction G...,2021-11-30T12:42:08.000Z,2021-11-30T12:42:08.000Z,M1403,Études et prospectives socio-économiques,Data analyst,CDD,Contrat à durée déterminée - 8 Mois,...,,,,,,,,,,
4,5761328,Stage Data Analyst H/F,TRANSITIONS est une agence de conseil en dével...,2021-11-30T12:39:12.000Z,2021-11-30T12:39:12.000Z,M1403,Études et prospectives socio-économiques,Data analyst,CDD,Contrat à durée déterminée - 8 Mois,...,,,,,,,,,,
