# Proposed algorithm for choosing PFRs

**This notebook is meant to be run on the master of a Spark cluster, for example with the command:**   

**or locally, for example with the command (for python3):**   

## Principle

An iterative principle:
1. for each person, we look at their preferred project(s), then determine every valid set of groups that can be built from those preferences.
2. if no solution is returned, we lower the maximum scores by one point (for example, every score of 50 becomes 49) and go back to step 1.
3. if at least one solution is returned, we are done. All that remains is to pick a solution sensibly...
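
The three steps above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: `lower_cap_until_solved`, `toy_solve`, the two people and their scores are all invented for the example, and the toy solver (one person per project) stands in for the real group-building step.

```python
from itertools import product

def lower_cap_until_solved(scores, solve, start_cap=50):
    """Lower the cap one point at a time until `solve` finds something.

    `scores` maps each person to a {project: points} dict; `solve` is a
    hypothetical callable that takes {person: set of preferred projects}
    and returns a (possibly empty) list of solutions.
    """
    for cap in range(start_cap, 0, -1):
        preferred = {}
        for person, notes in scores.items():
            # cap the scores, then keep every project tied for first place
            capped = {proj: min(pts, cap) for proj, pts in notes.items()}
            best = max(capped.values())
            preferred[person] = {p for p, pts in capped.items() if pts == best}
        solutions = solve(preferred)
        if solutions:
            return solutions, cap
    raise ValueError("no solution at any cap")

def toy_solve(preferred):
    """Toy stand-in: each project takes exactly one person."""
    people = sorted(preferred)
    options = [sorted(preferred[p]) for p in people]
    return [dict(zip(people, choice)) for choice in product(*options)
            if len(set(choice)) == len(choice)]

# invented scores: both people prefer project A until the cap reaches 40
scores = {'ana': {'A': 50, 'B': 40}, 'bob': {'A': 45, 'B': 10}}
solutions, cap = lower_cap_until_solved(scores, toy_solve)
print(solutions, cap)
```

In this toy run the bar has to drop to 40 before `ana` becomes indifferent between A and B, at which point she can take B and leave A to `bob`.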

This way we try to satisfy everyone as well as possible; if that is not possible, we *lower the bar a little* for everyone and start again. I find this better than maximising a global function, which could please the majority a great deal while leaving a few people very disappointed.

Moreover, this algorithm returns all of the (best) possibilities; there is no randomness. The implementation is (fairly) simple, but somewhat slow to run.

## Updates

#### New implementation

Principle of the new implementation:  
- we start from a matrix M of booleans (Mij == True <=> project_j is one of person_i's preferred projects);
- we can test whether the matrix represents an acceptable configuration (i.e. each person is assigned exactly one project and all groups have 4 people except one group of 3);
- if the matrix is not acceptable, we try to make it acceptable by transforming it (we turn one or more True values into False);
- we keep going, testing every possibility, until we reach either an acceptable matrix or a matrix that can no longer become acceptable (e.g. with a person assigned to 0 projects).
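
To make the acceptability test concrete, here is a minimal sketch on a toy matrix. The helper `is_acceptable` and the 7 x 3 example are assumptions made for illustration; it allows at most one group of 3 and ignores the groups of 5 introduced later in the notebook.

```python
import numpy as np

def is_acceptable(M, group_size=4):
    """True if every person has exactly one project and every non-empty
    project has `group_size` people, with at most one smaller by one."""
    per_person = M.sum(axis=1)    # number of projects still open per person
    per_project = M.sum(axis=0)   # number of people still on each project
    one_project_each = np.all(per_person == 1)
    sizes_ok = np.all(np.isin(per_project, [0, group_size - 1, group_size]))
    at_most_one_small = np.sum(per_project == group_size - 1) <= 1
    return bool(one_project_each and sizes_ok and at_most_one_small)

# 7 people, 3 projects: a group of 4, a group of 3, and an empty project
M = np.zeros((7, 3), dtype=bool)
M[:4, 0] = True
M[4:, 1] = True
ok = is_acceptable(M)

# giving person 0 a second open project makes the matrix undecided
M2 = M.copy()
M2[0, 1] = True
ambiguous = is_acceptable(M2)
print(ok, ambiguous)
```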

Since the set of possibilities is very large, we try to make smart transformations and to detect as early as possible the configurations that cannot lead to an acceptable matrix, whatever transformations are applied afterwards.

#### Code change

I hard-coded the group sizes (one group of 3 and seven groups of 4), which speeds up the operations.

#### Following Stephan's email...

I added the possibility of having three groups of 5 on the BNP, SACEM, IPSEN and SFR projects.

#### Fixes

I removed the recursive calls in the function algo, which could cause stack overflows.

## Algorithm

In [7]:
import pandas as pd
import numpy as np
from copy import deepcopy

class DataError(Exception):
    pass

Utility functions.

In [8]:
def make_M(df, cut):
    '''Transform the initial dataframe into a boolean matrix M such that:
    Mij = True iff project_j is one of person_i's preferred projects
    Parameters:
    -----------
    df : the initial dataframe
    cut : the value at which a project is considered to be a 'preferred' project
    Returns:
    --------
    A n x p boolean Numpy array.
    '''
    val = df.values.copy()
    val[val > cut] = cut
    max_lines = val.max(axis=1)
    M = (val.T == max_lines).T
    return M
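
As an illustration of how the cut changes the preference matrix, here is a small sketch on invented scores. The helper `preferred`, the names and the numbers are assumptions made for the example; it uses `keepdims` instead of the double transpose, which should give the same row-wise comparison.

```python
import numpy as np
import pandas as pd

# invented scores for two people and three projects
df = pd.DataFrame({'A': [50, 30], 'B': [40, 30], 'C': [10, 40]},
                  index=['ana', 'bob'])

def preferred(df, cut):
    """Boolean matrix: True where a capped score equals the row maximum."""
    capped = np.minimum(df.values, cut)
    return capped == capped.max(axis=1, keepdims=True)

at_50 = preferred(df, 50)   # ana only prefers A
at_40 = preferred(df, 40)   # ana now ties A and B; bob is unchanged
print(at_50)
print(at_40)
```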

In [9]:
def clean_and_check_M(M, projects_5):
    '''Remove (in-place) some impossible choices (the more and the faster we can find them the better).
    Then test for (in)validity
    Current cleaning ideas:
    (i) removing (setting the column to False) projects with strictly fewer than 3 people on them
    (ii) removing people from projects that are full (i.e. already 4, or 5 for authorized projs,
         people sure to be on them) ;
         may need several passes
    Current checking for invalidity ideas:
    (i) there exists i such that sum_j Mij == 0
    (ii) there is a project with more than 4 people (5 for authorized projs) sure to be on it
    (iii) there is more than one project with exactly 3 people on it (check on totally defined projects)
    Current checking for validity ideas:
    (i) sum_j Mij == 1 for all i
    (ii) [sum_i Mij for i in 1..n] contains only 0s, 4s, 5s and at most one 3.
    Parameters:
    -----------
    M : numpy boolean array to be cleaned
    projects_5 : a p Numpy boolean array with True where the project can hold a team of 5
    Returns:
    --------
    -1 if invalid
    +1 if valid
     0 if we do not know and must continue our search
    '''
    # Cleaning..
    # .. (i)
    sum_cols = M.sum(axis=0)
    M[:, sum_cols < 3] = False
    # .. (ii)
    flag = True
    while flag:
        # recomputed on every pass: removing choices can leave a person with a single project
        pers_sure = M.sum(axis=1) == 1
        min_pers_on_projects = M[pers_sure].sum(axis=0)
        full_projs = np.logical_and(min_pers_on_projects >= 4,
                                    np.logical_or(np.invert(projects_5),
                                                  min_pers_on_projects >= 5))
        sub_M = M[np.ix_(np.invert(pers_sure), full_projs)]
        flag = np.any(sub_M) # if there is something to change
        #print("flag is {}...".format(flag))
        if flag:
            M[np.ix_(np.invert(pers_sure), full_projs)] = False
            
    # Checking... 
    sum_lines = M.sum(axis=1)
    sum_cols = M.sum(axis=0)
    pers_sure = sum_lines == 1
    min_pers_on_projects = M[pers_sure].sum(axis=0)
    # projects already defined = projects which are the only choice of the people on it
    # i.e. projects that will not be reduced since it would make a person project-less
    already_defined_projects = min_pers_on_projects == sum_cols
    # .. invalid if any of the following holds:
    # (i) someone has no project left
    # (ii) a project exceeds its maximum size
    # (iii) more than one fully defined project has exactly three people on it
    if np.any(sum_lines == 0) or \
    np.any(np.logical_and(min_pers_on_projects > 4,
           np.logical_or(np.invert(projects_5),
                         min_pers_on_projects > 5))) or \
    sum(sum_cols[already_defined_projects] == 3) > 1:
        return -1
    
    # .. if valid
    if np.all(pers_sure) and \
    (not [1 for s in sum_cols if s not in [0, 4, 5, 3]]) and \
    (np.sum(sum_cols == 3) <= 1):
        return 1
    
    # If we do not know...
    return 0

In [10]:
def next_M(M):
    '''Yields copies of the matrix M, each of which corresponds to a step towards
    a solution
    Best ideas so far : 
    (i) we pick the person with the fewest preferred projects (k > 1) and
        create k different possible matrices
    '''
    sum_lines = M.sum(axis=1)
    sum_lines[sum_lines == 1] = 32768 # arbitrary large value, must be larger than the number of projects
    pers = np.argmin(sum_lines)
    for j, proj in enumerate(M[pers]):
        if proj:
            new_M = M.copy()
            new_M[pers] = 0
            new_M[pers, j] = True
            yield new_M
    # Can we ensure that the cleaned versions will be different matrices?
    # -> In that case, yes, since the cleaned matrices will either be different or invalid
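
The branching step can be followed on a toy matrix. This is a sketch with invented values, written inline rather than calling `next_M`, and it uses "number of projects + 1" as the masking value where the notebook uses 32768.

```python
import numpy as np

M = np.array([[True, False, False],    # already decided
              [True, True,  True],     # three open choices
              [False, True, True]])    # two open choices: branch here
counts = M.sum(axis=1)
# mask people who are already decided so that argmin skips them
candidates = np.where(counts > 1, counts, M.shape[1] + 1)
person = int(np.argmin(candidates))

branches = []
for j in np.flatnonzero(M[person]):
    child = M.copy()
    child[person] = False      # clear the row...
    child[person, j] = True    # ...and commit this person to project j
    branches.append(child)

print(person, len(branches))
```

Branching on the person with the fewest open choices keeps the number of children per step as small as possible.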
    

In [11]:
def clean_check_generate(M, p5_broad):
    '''Function to be mapped over the RDD'''
    check = clean_and_check_M(M, p5_broad.value)
    # if the matrix is valid
    if check == 1:
        return [(True, M)]
    # if the matrix may be valid but need further investigation
    elif check == 0:
        return [(False, m) for m in next_M(M)]
    # if the matrix is invalid
    else:
        return []

Main function of the algorithm, which explores the tree of possibilities.

In [12]:
def make_groups(M, projects_5):
    '''Returns a list of acceptable assignments (relies on the global SparkContext `sc`)
    
    Parameters:
    -----------
    M : an nxp Numpy boolean array representing preferred projects per persons
    projects_5 : a p Numpy boolean array with True where the project can hold a team of 5
    Returns:
    --------
    a list of valid numpy arrays 
    '''
    sols = []
    # We make an RDD
    Ms = sc.parallelize([M])
    p5_broad = sc.broadcast(projects_5)
    # function of 1 parameter to be passed in a flatmap later
    f = lambda m: clean_check_generate(m, p5_broad)
    # keeps track of length of the RDD (without having to call an expensive 'count')
    count = 1
    while count > 0:
        print(" New iteration, ToCheck={}, Found={}.".format(count, len(sols)))
        count = 0
        #print('We check \n {}'.format(m))
        NewMs = Ms.flatMap(f)
        for i, ms in NewMs.groupByKey().collect():
            if not i:
                Ms = sc.parallelize(ms)
                count += len(ms)
            else:
                sols = sols + ms.data
    return sols
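
The level-by-level exploration can also be sketched without Spark, which may help when reading the loop above. This is a plain-Python illustration, not the notebook's implementation: `explore` and `toy_step` are hypothetical names, and `toy_step` follows the same contract as `clean_check_generate` (a solution returns `[(True, node)]`, an undecided node returns its children flagged `False`, an invalid node returns `[]`).

```python
def explore(root, step):
    """Breadth-first exploration of the tree rooted at `root`."""
    solutions, frontier = [], [root]
    while frontier:
        next_frontier = []
        for node in frontier:
            for is_solution, child in step(node):
                # solutions are set aside, everything else is checked next round
                (solutions if is_solution else next_frontier).append(child)
        frontier = next_frontier
    return solutions

def toy_step(bits):
    """Toy problem: find every 3-bit tuple containing exactly two ones."""
    if sum(bits) > 2:
        return []                                  # invalid: prune
    if len(bits) == 3:
        return [(True, bits)] if sum(bits) == 2 else []
    return [(False, bits + (0,)), (False, bits + (1,))]

found = explore((), toy_step)
print(found)
```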

Utility function that caps the values at a given value:

In [13]:
def haircut(dat, n):
    ''' Caps all numbers at n'''
    # vectorised cap; same result as min(n, x) cell by cell for non-missing values
    dat_cap = dat.clip(upper=n)
    return dat_cap

The complete algorithm:

In [23]:
def algo(dat, projects_5, cap=50):
    '''Main algorithm (iterative): lowers the cap until a solution is found'''
    sol = []
    # the `cap > 0` guard is there to prevent an infinite loop;
    # if the data is correct, it should never be reached
    while not sol and cap > 0:
        print("Run for cap={}...".format(cap))
        m = make_M(dat, cap)
        sol = make_groups(m, projects_5)
        cap -= 1
    if not sol:
        raise DataError('Bad data, no solution is possible')
    # cap was decremented once after the successful run
    return sol, cap + 1

---

### With our data:

Importing and cleaning the data.

In [24]:
import requests
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
r = requests.get('https://docs.google.com/spreadsheets/d/1hUWvO8wyEJL-_SkhgpdrwdYiDL7XQdrbTkcp21py8Lw/export?format=csv&id=1hUWvO8wyEJL-_SkhgpdrwdYiDL7XQdrbTkcp21py8Lw&gid=0')

In [25]:
dat_bgd = pd.read_csv(StringIO(r.text), skiprows=1, index_col='Nom')
dat_bgd = dat_bgd.loc[:, 'Clustaar':'Plume Labs']
dat_bgd = dat_bgd.fillna(0)
dat_bgd = dat_bgd.loc[[i for i in dat_bgd.index if isinstance(i, str)], :]

Projects where a group of 5 people is allowed:

In [26]:
projects_5 = pd.Series(data=np.zeros(len(dat_bgd.columns), dtype=bool), index=dat_bgd.columns)
projects_5['BNP'] = True
projects_5['SACEM'] = True
projects_5['IPSEN'] = True
projects_5['SFR 1'] = True
projects_5['SFR 2'] = True

Check that there are indeed 31 people:

In [27]:
n, p = dat_bgd.shape
print('{} personnes ont répondu.'.format(n))
if n < 31:
    print('Il faut ajouter {} personne(s).'.format(31-n))

30 personnes ont répondu.
Il faut ajouter 1 personne(s).


Cynthia is missing (there must be 31 people, otherwise the algorithm will find nothing!):

In [28]:
n, p = dat_bgd.shape
if n < 31:
    cv = pd.Series(name='Cynthia VARIEUX', index=dat_bgd.columns, data=[100./15] * 15)
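    # DataFrame.append was removed in pandas 2.0; concat adds the Series as a new row
    dat_bgd = pd.concat([dat_bgd, cv.to_frame().T])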

So that the scores sum to 100 and no project receives more than 50 points, we redistribute:
* the missing points across all projects (those not already at 50);
* the points above 50 across the other projects.

Several passes may be needed.
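
A worked example on a single row shows why more than one pass can be needed. This is a sketch with invented scores; `normalise` is a hypothetical one-row version of the same cap-and-redistribute idea, not the notebook's code.

```python
import numpy as np

def normalise(row, cap=50.0, total=100.0, tol=1e-3):
    """Cap at `cap`, then spread the missing points over the cells below the
    cap; repeat until the row sums to `total` with no cell above `cap`."""
    row = np.asarray(row, dtype=float).copy()
    passes = 0
    while abs(row.sum() - total) > tol or np.any(row > cap):
        row = np.minimum(row, cap)
        below = row < cap
        row[below] += (total - row.sum()) / below.sum()
        passes += 1
    return row, passes

# 89 points given: the first pass adds 2.75 everywhere, which pushes 49 to 51.75;
# the second pass cuts it back to 50 and spreads the 1.75 over the other cells
row, passes = normalise([40, 49, 0, 0])
print(row, passes)
```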

In [29]:
def is_bad(df):
    # rows whose scores do not sum to 100 (within a small tolerance)
    bad_sums = np.abs(df.sum(axis=1) - 100) > 0.001
    # cells above the 50-point limit
    bad_cells = df > 50
    return bad_sums.any() or bad_cells.values.any()

while is_bad(dat_bgd):
    # cap at 50:
    bad_cells = dat_bgd > 50
    dat_bgd[bad_cells] = 50
    # redistribute the missing points (excluding cells already at 50)
    pts_a_repartir = 100 - dat_bgd.sum(axis=1)
    non_max_cells = dat_bgd < 50
    denominator = non_max_cells.sum(axis=1)
    to_add = (non_max_cells.T * np.array(pts_a_repartir / denominator)).T
    dat_bgd = dat_bgd + to_add

Running the algorithm (beware, this can take a long time).

In [30]:
import datetime

In [31]:
t0 = datetime.datetime.now()
sols, cut = algo(dat_bgd, projects_5)
t1 = datetime.datetime.now()
print('elapsed time = {}'.format(t1-t0))

Run for cap=50...
 New iteration, ToCheck=1, Found=0.
Run for cap=49...
 New iteration, ToCheck=1, Found=0.
Run for cap=48...
 New iteration, ToCheck=1, Found=0.
Run for cap=47...
 New iteration, ToCheck=1, Found=0.
Run for cap=46...
 New iteration, ToCheck=1, Found=0.
Run for cap=45...
 New iteration, ToCheck=1, Found=0.
Run for cap=44...
 New iteration, ToCheck=1, Found=0.
Run for cap=43...
 New iteration, ToCheck=1, Found=0.
Run for cap=42...
 New iteration, ToCheck=1, Found=0.
Run for cap=41...
 New iteration, ToCheck=1, Found=0.
Run for cap=40...
 New iteration, ToCheck=1, Found=0.
Run for cap=39...
 New iteration, ToCheck=1, Found=0.
Run for cap=38...
 New iteration, ToCheck=1, Found=0.
Run for cap=37...
 New iteration, ToCheck=1, Found=0.
Run for cap=36...
 New iteration, ToCheck=1, Found=0.
Run for cap=35...
 New iteration, ToCheck=1, Found=0.
Run for cap=34...
 New iteration, ToCheck=1, Found=0.
Run for cap=33...
 New iteration, ToCheck=1, Found=0.
Run for cap=32...
 New itera

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 351.0 failed 1 times, most recent failure: Lost task 0.0 in stage 351.0 (TID 1404, localhost): java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [20]:
def convert_M_df(M, df):
    sol_df = pd.DataFrame(data=M, columns=df.columns, index=df.index)
    # idxmax returns the column label of the True cell (argmax now returns a position)
    sol_serie = sol_df.apply(lambda s: s.idxmax(), axis=1)
    return sol_serie

def convert_sols_df(sols, df):
    sol = pd.DataFrame(data=[convert_M_df(M, df) for M in sols])
    return sol

In [21]:
final_tab = convert_sols_df(sols, dat_bgd)
#final_tab

In [22]:
final_tab.to_csv('sols.csv')

In [23]:
def from_row(row):
    d = {proj: [] for proj in dat_bgd.columns}
    for name, proj in zip(row.index, row):
        d[proj].append(name)
    return d

def extract_teams(final_tab):
    for row in final_tab.index:
        print("\nSolution {}:".format(row))
        d = from_row(final_tab.loc[row])
        for proj in dat_bgd.columns:
            print("Projet {:15s}: {}".format(proj, ', '.join(d[proj])))

In [24]:
#extract_teams(final_tab)

Write all the above information into a csv file (to be imported in a spreadsheet)

In [25]:
with open('output.csv', 'w') as f:
    f.write('Output de l\'algorithme (base de discussion).\n'
            'Date de génération {}.\n'.format(t0))
    f.write('Le cut a été effectué à {} points.\n'.format(cut))
    f.write('Source : https://github.com/rachidalili/MS-BGD2015/blob/master/guillaume-mohr/algoPFR/AlgoPFR-New-SPARK.ipynb\n'
            'Note 1 : les personnes n\'ayant pas rempli le tableau sont ajoutées '
            'et sont supposées affecter un poids égal à chaque projet.\n'
            'Note 2 : les personnes n\'ayant pas affecté 100 points sont supposées '
            'affecter leur restant de points de manière équitable entre tous les '
            'projets (en respectant la limite de 50 points max par projet).\n'
            '"Note 3 : un seul groupe de 3 personnes permis, groupes de 5 personnes permis sur les'
            ' projets IPSEN / SACEM / SFR 1 / SFR 2 / BNP"\n\n'
            'TABLEAU RECAPITULATIF DES SOLUTIONS\n')
    f.write(final_tab.to_csv(None))
    f.write('\n\nDETAILS DES SOLUTIONS')
    for row in final_tab.index:
        f.write('\nSOLUTION N°{}\n'.format(row))
        f.write('Projet,Equipe\n')
        d = from_row(final_tab.loc[row])
        for proj in dat_bgd.columns:
            f.write('{},{}\n'.format(proj, ','.join(d[proj])))          