In [None]:
# !pip install pygsheets

In [1]:
import pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pygsheets
import pandas as pd

In [2]:
def google_sheets(file, data):
    """
    Fonction pour l'export vers googlesheets
    """
    #authentification
    gc = pygsheets.authorize(service_file= '/home/jovyan/auth/biofitec-445d3a2a47d2_sheets.json')
    
    # on ouvre de fichier excel
    sh = gc.open(file)
    
    # on selectionne la première feuille
    wks = sh[0]
    
    # on ecrit 
    wks.set_dataframe(data.toPandas(), (1,1))
    

### I - Connexion

In [3]:
# connexion du driver
spark = SparkSession.\
        builder.\
        appName("pyspark-notebook2").\
        master("spark://172.17.0.1:8077").\
        config("spark.executor.memory", "1g").\
        config("spark.mongodb.input.uri","mongodb://172.17.0.1:28117,172.17.0.1:28118,172.17.0.1:28119/BIO.departements?replicaSet=rs0").\
        config("spark.mongodb.output.uri","mongodb://172.17.0.1:28117,172.17.0.1:28118,172.17.0.1:28119/BIO.departements?replicaSet=rs0").\
        config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
        getOrCreate()

### II - Récupération des données

In [4]:
df = spark.read.format("mongo").load()

### III - Magasins Bio

In [5]:
df.createOrReplaceTempView("mags")

In [6]:
magsDF = spark.sql("select departement, size(communes) as communes, magasins.* from mags")

In [7]:
magsDF = magsDF.fillna(0)

In [8]:
magsDF.show()

+--------------------+--------+----------+-------+--------+----------+-------------+-------------------+---------+-------+
|         departement|communes|BIO C' BON|BIOCOOP|BIOMONDE|L'EAU VIVE|LA VIE CLAIRE|LE GRAND PANIER BIO|NATURALIA|NATUREO|
+--------------------+--------+----------+-------+--------+----------+-------------+-------------------+---------+-------+
|                 AIN|     410|         0|      3|       0|         0|            4|                  1|        0|      0|
|               AISNE|     805|         0|      2|       1|         0|            3|                  0|        0|      0|
|              ALLIER|     318|         0|      2|       1|         0|            5|                  0|        0|      0|
|ALPES-DE-HAUTE-PR...|     199|         0|      5|       0|         0|            3|                  0|        1|      0|
|        HAUTES-ALPES|     168|         0|      8|       1|         1|            1|                  0|        0|      0|
|     ALPES-MARI

In [9]:
#google_sheets(file= "mags_bio_departs", data = magsDF)

### IV - Surfaces

In [10]:
df.createOrReplaceTempView("surface")

In [11]:
annee = ["surface_2008", "surface_2009", "surface_2010", "surface_2011", "surface_2012", "surface_2013", 
         "surface_2014", "surface_2015", "surface_2016", "surface_2017", "surface_2018", "surface_2019"]

type_surface = ["Surface_BIO", "Surface_C1", "Surface_C2", "Surface_C3"]

def queries(annees, surfaces):
    list_df = []
    for year in annees:
        list_df.append(spark.sql("select departement, {}.Annee, {}.{}.* from surface".format(year, year, surfaces)))
        
    return list_df    

def append_dfs(df1,df2): 
    list1 = df1.columns 
    list2 = df2.columns 
    for col in list2: 
        if(col not in list1): 
            df1 = df1.withColumn(col, lit(None))
            
    for col in list1: 
        if(col not in list2): 
            df2 = df2.withColumn(col, lit(None)) 
            
    return df1.unionByName(df2)

def union_DF(annees, surfaces):
    list_DF = queries(annees, surfaces)
    df = list_DF[0]
    for row in list_DF[1:]:
        df = append_dfs(df, row)
    return df

#### 1 - Surfaces BIO

In [12]:
surfaceBIO = union_DF(annee, "Surface_BIO")

surfaceBIO = surfaceBIO.where(col("Annee").isNotNull())
surfaceBIO = surfaceBIO.fillna(0)

In [14]:
#google_sheets(file= "surface_bio_departs", data = surfaceBIO)

#### 2 - Surfaces C1

In [15]:
surfaceC1 = union_DF(annee, "Surface_C1")

surfaceC1 = surfaceC1.where(col("Annee").isNotNull())
surfaceC1 = surfaceC1.fillna(0)

In [16]:
#google_sheets(file= "surface_C1_departs", data = surfaceC1)

#### 3 - Surfaces C2

In [17]:
surfaceC2 = union_DF(annee, "Surface_C2")

surfaceC2 = surfaceC2.where(col("Annee").isNotNull())
surfaceC2 = surfaceC2.fillna(0)

In [18]:
#google_sheets(file= "surface_C2_departs", data = surfaceC2)

#### 4 - Surfaces C3

In [19]:
surfaceC3 = union_DF(annee, "Surface_C3")

surfaceC3 = surfaceC3.where(col("Annee").isNotNull())
surfaceC3 = surfaceC3.fillna(0)

In [20]:
#google_sheets(file= "surface_C3_departs", data = surfaceC3)