In [None]:
import json
import pandas as pd 

# Read the raw poll list from the JSON input file (UTF-8 encoded).
with open('../input_data/data_presidentielle_2022_nspp.json', mode='r', encoding="utf-8") as file:
    liste_sondages_nspp = json.loads(file.read())

In [None]:
# Flatten the nested poll JSON into one record per (poll, first-round hypothesis).
# Each record holds the poll metadata, the hypothesis label and the list of
# candidate scores (candidate name, party list, voting intentions).
first_tour_scores = []

for sondage in liste_sondages_nspp:
    # Poll-level fields, shared by every hypothesis of this poll.
    poll_info = {
        'nom_institut': sondage["nom_institut"],
        'debut_enquete': sondage["debut_enquete"],
        'fin_enquete': sondage["fin_enquete"],
        'echantillon': sondage["echantillon"],
    }
    for tour in sondage["tours"]:
        # Only the first round is of interest here; skip any other round.
        if tour['tour'] != 'Premier tour':
            continue
        for hypothesis in tour['hypotheses']:
            hypo_name = hypothesis['hypothese']
            scores = [
                {
                    'candidat': entry['candidat'],
                    'parti': entry['parti'],
                    'intentions': entry['intentions'],
                }
                for entry in hypothesis['candidats']
            ]
            first_tour_scores.append({**poll_info, 'hypothese': hypo_name, 'scores': scores})

first_tour_scores

In [None]:
# Start from the candidates (they are mapped to their parties in a later step):
# collect every distinct candidate name seen in any first-round hypothesis.
unique_candidates = {
    score["candidat"]
    for poll in first_tour_scores
    for score in poll['scores']
}
unique_candidates

In [None]:
# Build one wide row per hypothesis: poll metadata followed by one entry per
# known candidate, holding the voting intentions or None when the candidate
# was not tested in that hypothesis.
rows_candidates = []
for poll in first_tour_scores:
    row = {
        'Company': poll['nom_institut'],
        'collectPeriodFrom': poll['debut_enquete'],
        'collectPeriodTo': poll['fin_enquete'],
        'n': poll['echantillon'],
        'Hypothese': poll['hypothese'],
    }
    # Every candidate gets a placeholder first, so all rows share the same keys.
    row.update(dict.fromkeys(unique_candidates))
    for score in poll['scores']:
        name = score["candidat"]
        if name in unique_candidates:
            row[name] = score['intentions']
    rows_candidates.append(row)

rows_candidates

In [None]:
# Sanity check: for each poll hypothesis the candidates' intentions should add
# up to roughly 100%. Rows whose total is outside the open interval
# (lower_bound, upper_bound) are reported and dropped.
df_candidates = pd.DataFrame(rows_candidates)
candidate_columns = list(unique_candidates)
df_candidates['Total'] = df_candidates.loc[:, candidate_columns].sum(axis=1)

lower_bound, upper_bound = 97, 103

totals = df_candidates['Total']
is_outlier = (totals <= lower_bound) | (totals >= upper_bound)
is_plausible = (totals > lower_bound) & (totals < upper_bound)

outliers = df_candidates[is_outlier]
print("we removed", len(outliers), "outliers polls")

df_candidates = df_candidates[is_plausible]

df_candidates

In [None]:
# Map each candidate to a single party.
# - A candidate listed with exactly one party is mapped to that party.
# - A candidate listed with one of the known coalitions below is mapped to the
#   coalition's designated party.
# - Any other party list is left unmapped by the loop; such candidates must be
#   handled by the explicit assignments at the end of this cell.
map_people_party = {}

coalition_to_party = [
    (['LRM', 'Agir', 'Modem'], 'LRM'),
    (['Les Républicains', 'UDI'], 'Les Républicains'),
]

for poll in first_tour_scores:
    for score in poll['scores']:
        partis = score["parti"]
        if len(partis) == 1:
            map_people_party[score["candidat"]] = partis[0]
            continue
        for coalition, party in coalition_to_party:
            if partis == coalition:
                map_people_party[score["candidat"]] = party
                break

# Explicit assignments: they run after the loop, so they take precedence over
# whatever the loop derived for these candidates.
map_people_party['Arnaud Montebourg'] = 'Parti socialiste'
map_people_party['Christiane Taubira'] = 'Parti socialiste'
map_people_party['Jean-Luc Mélenchon'] = 'France insoumise'

map_people_party


In [None]:
# Aggregate candidate scores into one column per party: the scores of all
# candidates mapped to the same party are summed to give the party score.
#
# The first five columns are the poll metadata (Company, collectPeriodFrom,
# collectPeriodTo, n, Hypothese). `.copy()` makes the party frame independent
# of `df_candidates`; without it the column assignments below act on a slice of
# another frame and trigger pandas' SettingWithCopyWarning.
df_candidates_party = df_candidates.iloc[:, :5].copy()

# A candidate not tested in a hypothesis contributes 0 to the party score.
df_candidates = df_candidates.fillna(0)

for name, party in map_people_party.items():
    # Create the party column on first encounter, then accumulate into it.
    if party not in df_candidates_party.columns:
        df_candidates_party[party] = 0
    df_candidates_party[party] += df_candidates[name]

# Everything after the five metadata columns is a party column at this point.
df_candidates_party['Total'] = df_candidates_party.iloc[:, 5:].sum(axis=1)

# The party totals must still lie strictly inside the bounds used to filter the
# candidate totals; a failure here means some candidate's score was not
# carried over to any party (e.g. a candidate missing from map_people_party).
party_total_out_of_bounds = (
    (df_candidates_party['Total'] <= lower_bound)
    | (df_candidates_party['Total'] >= upper_bound)
)
assert len(df_candidates_party[party_total_out_of_bounds]) == 0, (
    "party totals fall outside the accepted bounds for some polls; "
    "check that every candidate is mapped to a party"
)
df_candidates_party

In [None]:
# Keep only the poll metadata and the biggest parties, then recompute the
# total over the retained party columns only.
meta_columns = [
    'Company',
    'collectPeriodFrom',
    'collectPeriodTo',
    'n',
    'Hypothese',
]
party_columns = [
    'Rassemblement national',
    'Reconquête',
    'Debout la France',  # TODO: open question - should we keep that one?
    'Les Républicains',
    'LRM',
    'EE-LV',
    'Parti socialiste',
    'France insoumise',
]
desired_columns = meta_columns + party_columns

df_candidates_party_small = df_candidates_party.loc[:, desired_columns].copy()
df_candidates_party_small['Total'] = df_candidates_party_small[party_columns].sum(axis=1)
df_candidates_party_small


In [None]:
# Replace the full party names by short labels.
short_party_labels = {
    'Rassemblement national': 'RN',
    'Reconquête': 'RCQ',
    'Debout la France': 'DLF',
    'Les Républicains': 'LR',
    'Parti socialiste': 'PS',
    'France insoumise': 'LFI',
    'EE-LV': 'EELV',
}

# The data has no publication date, so the end of the collection period is
# used as the publication date.
df_candidates_party_small = (
    df_candidates_party_small
    .rename(columns=short_party_labels)
    .assign(PublDate=lambda frame: frame['collectPeriodTo'])
)

df_candidates_party_small

In [None]:
# Write the prepared party-level poll table to CSV (UTF-8, without the index).
output_csv_path = '../output_data/df_nspp_prepared.csv'
df_candidates_party_small.to_csv(output_csv_path, index=False, encoding='utf-8')