In [None]:
# LAMBDA PYTHON IMPORT
import os
import sys
sys.path.insert(0, os.path.abspath('./code/'))

# PYSPARK IMPORT
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()

# CUSTOM IMPORT
import entryPoint

In [None]:
# IMPORT DATASET
path = "./data/subventions-accordees-et-refusees.csv"
df = spark.read.format('csv').options(header=True, inferSchema=True, sep =';').load(path)

newColumns = ['Num', 'Annee', 'Collectivite', 'Benificiaire', 'Siret', 'Objet', 'Montant', 'Direction', 'Nature', 'Secteur']
df = df.toDF(*newColumns)

In [None]:
# Récupération des secteurs

secteurs = entryPoint.getAllSecteur(df)

for secteur in secteurs:
    print(secteur)

In [None]:
# Récupérer uniquement un `Secteur` spécifique

entryPoint.selectSecteur(df, 'Vie et animation locale').take(10)

In [None]:
# Permet d'obtenir le montant total des 5 directions qui subventionnent le plus les associations.

entryPoint.getAmountSubventionByDirection(df)

In [None]:
# Permet d'obtenir le nombre total de subvention accordées par les directions.

entryPoint.getCountAcceptedSubventionByDirection(df)

In [None]:
def myFlatMap(words, amount):
    '''
        Transforme en lowercase, supprime les doublons, retourne (mot, montant)
    '''
    words = map(lambda word: word.lower(), words)
    words = set(words)
    words = map(lambda word: (word, amount), words)
    
    return words

def myReducer(accumulator, currentValue):
    '''
        Convertie et additionne les deux valeurs
    '''
    try:
        accumulator = int(accumulator)
    except:
        accumulator = 0
    try:
        currentValue = int(currentValue)
    except:
        currentValue = 0
    
    return accumulator + currentValue
        

def getMontantByWord(df):
    '''
        Récupérer le `Montant` en fonction des mots clée présent dans `Objet`
    '''
    bannedWords = ('de', 'et', 'la', '-', 'du', 'des', 'pour', 'en', 'dans', 'le', 'à', 'les', 'au', 'aux', 'sur', \
                'd\'un', 'd\'une', ':', 'par', 'avec', '')

    dfObjectWordMontant = df.select("Objet", "Montant")
    return dfObjectWordMontant.rdd.map(lambda row: [str(row[0]).split(' '), row[1]]) \
            .flatMap(lambda row: myFlatMap(row[0], row[1])) \
            .reduceByKey(lambda accumulator, currentValue : myReducer(accumulator, currentValue)) \
            .toDF() \
            .where(f"_1 NOT IN {bannedWords}") \
            .sort(desc('_2'))

In [None]:
# Récupérer le `Montant` en fonction des mots clée présent dans `Objet`

getMontantByWord(df).show(20)

In [None]:
# Montant des subventions accordées par bénéficiaires

entryPoint.subvention_by_actor(df)

In [None]:
# Demandes d'argent les plus importantes, tous bénéficiaires confondus, toutes années confondues

entryPoint.subvention_by_actor_ordered(df)

In [None]:
# Nombre de demandes par année, par bénéficiaire

entryPoint.requests_by_actors_by_year(df)

In [None]:
# Nombre total de bénéficiaires

entryPoint.total_of_actors(df)

In [None]:
# Bénéficiaires ayant fait le plus de demandes, par année

entryPoint.max_requests_by_actors_by_year(df)

In [None]:
# Bénéficiaire ayant touché le plus d'argent, par année

entryPoint.max_amount_by_actors_by_year(df)