# Installation des libairies

In [1]:
#!pip install findspark
#!pip install pyspark
#!pip install streamlit
#!pip install nympy
#!pip install plotly

# Import des librairies

In [2]:
import findspark
findspark.init()
findspark.find()

'c:\\users\\antoine\\appdata\\local\\programs\\python\\python38-32\\lib\\site-packages\\pyspark'

In [38]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as spf
import pandas as pd
import numpy as np
#import streamlit as st
import plotly as plt

# Traitement ETL
## Création d'un SparkSession

In [4]:
spark = SparkSession \
    .builder \
    .appName("how to read csv file") \
    .getOrCreate()

## Fonction d'ouverture de fichier

In [5]:
def openDataFrame(data):
    df = spark.read.option("delimiter", ",") \
        .csv(f"data/raw-file/{data}.csv", header=True)
    df.printSchema()
    return df

## Liste des fichiers à traiter

In [6]:
fileName = ["dre_auvergne_rhone_alpes",
            "dre_bourgogne_franche_comte",
            "dre_bretagne",
            "dre_centre_val_de_loire",
            "dre_corse",
            "dre_france_metropolitaine",
            "dre_grand_est",
            "dre_hauts_de_france",
            "dre_ile_de_france",
            "dre_normandie",
            "dre_nouvelle_aquitaine",
            "dre_occitanie",
            "dre_pays_de_la_loire",
            "dre_provence_alpes_cote_d_azur",
           ]

### Enlève la limitation de l'affichage du dataframe

In [7]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

## Ouverture d'un fichier .csv et affichage de dataframe

In [8]:
df = openDataFrame(fileName[0])
df

root
 |-- PRODUCTION: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- Unité: string (nullable = true)
 |-- 2014: string (nullable = true)
 |-- 2015: string (nullable = true)
 |-- 2016: string (nullable = true)
 |-- 2017: string (nullable = true)
 |-- 2018: string (nullable = true)
 |-- 2019: string (nullable = true)



PRODUCTION,_c1,Unité,2014,2015,2016,2017,2018,2019
Production d'éner...,,,,,,,,
P1,Production de cha...,kt,0,0,0,0,0,0
P2,Production de pét...,kt,0,0,0,0,0,0
P3,Production de gaz...,GWh PCS,0,0,0,0,0,0
P4,Chaleur nucléaire...,GWh,266302,275368,227294,242398,243029,260090
P5,Énergies renouvel...,GWh,30532,28268,30385,25659,32400,29961
P7,Production totale...,ktep,0,0,0,0,0,0
P8,- production de...,ktep,0,0,0,0,0,0
P9,- production de...,ktep,0,0,0,0,0,0
P10,- production de...,ktep,0,0,0,0,0,0


## Transposition de l'affichage

In [9]:
df.show(3, vertical=True)

-RECORD 0--------------------------
 PRODUCTION | Production d'éner... 
 _c1        | null                 
 Unité      | null                 
 2014       | null                 
 2015       | null                 
 2016       | null                 
 2017       | null                 
 2018       | null                 
 2019       | null                 
-RECORD 1--------------------------
 PRODUCTION | P1                   
 _c1        | Production de cha... 
 Unité      | kt                   
 2014       | 0                    
 2015       | 0                    
 2016       | 0                    
 2017       | 0                    
 2018       | 0                    
 2019       | 0                    
-RECORD 2--------------------------
 PRODUCTION | P2                   
 _c1        | Production de pét... 
 Unité      | kt                   
 2014       | 0                    
 2015       | 0                    
 2016       | 0                    
 2017       | 0             

Affichage des colonnes

In [10]:
df.columns

['PRODUCTION', '_c1', 'Unité', '2014', '2015', '2016', '2017', '2018', '2019']

Affichage des données

In [11]:
df.printSchema()

root
 |-- PRODUCTION: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- Unité: string (nullable = true)
 |-- 2014: string (nullable = true)
 |-- 2015: string (nullable = true)
 |-- 2016: string (nullable = true)
 |-- 2017: string (nullable = true)
 |-- 2018: string (nullable = true)
 |-- 2019: string (nullable = true)



La deuxième colonne ne porte pas de nom ..
Nous allons y remèdier maintenant 😉

In [20]:
# def withColumnRenamed(existingName: String, newName: String): DataFrame
df = df.withColumnRenamed("_c1","Intitulé énergie")
df

PRODUCTION,Intitulé énergie,Unité,2014,2015,2016,2017,2018,2019
Production d'éner...,,,,,,,,
P1,Production de cha...,kt,0,0,0,0,0,0
P2,Production de pét...,kt,0,0,0,0,0,0
P3,Production de gaz...,GWh PCS,0,0,0,0,0,0
P4,Chaleur nucléaire...,GWh,266302,275368,227294,242398,243029,260090
P5,Énergies renouvel...,GWh,30532,28268,30385,25659,32400,29961
P7,Production totale...,ktep,0,0,0,0,0,0
P8,- production de...,ktep,0,0,0,0,0,0
P9,- production de...,ktep,0,0,0,0,0,0
P10,- production de...,ktep,0,0,0,0,0,0


Sommaire des données

In [21]:
df.select('2014', '2015').describe().show()

+-------+-----------------+-----------------+
|summary|             2014|             2015|
+-------+-----------------+-----------------+
|  count|              123|              123|
|   mean|7200.949367088608|7521.220779220779|
| stddev|32936.38346909151|34321.52734326226|
|    min|                0|                0|
|    max|  Secret primaire|  Secret primaire|
+-------+-----------------+-----------------+



In [22]:
# Permière version des liste

list_production = [
    'P1','P2','P3','P4','P5','P7','P8','P9','P10','P11','P12','P13','P14',
    'E1','E2','E4','E6','E7','E8','E9','E10','E11','E12','E13','E14','E15','E16','E17','E18','E19','E20','E21','E22','E23','E24','E25','E26','E27','E28'
]

list_consomation = [
    'C1','CI1','CI3','CI3b','CI4','CI9','CI11','CI5','CI6','CI7','CI8',
    'CTR1','CTR2','CTR3','CTR4','CTR5','CTR6','CTR7','CTR8','CTR9','CTR10','CTR11',
    'CR1','CR7','CR2','CR3','CR4','CR5','CR6','CR8',
    'CTE1','CTE7','CTE2','CTE3','CTE4','CTE5','CTE6','CTE8',
    'CA1','CA7','CA2','CA21','CA3','CA4','CA5','CA6','CA8',
    'C2','C3','C4','C13','C14'
]

# Penser à passer par une tranposition puis pour un passae d'indexe

In [51]:
df_production = df.filter(df.PRODUCTION.isin(list_production))
df_production.show()

## Liste de test non concluant
#subset_df = df.filter(df.PRODUCTION.isin(list_production))
#df_production = spark.createDataFrame(subset_df, df.columns)
#df_production = df.withColumn(df.columns[0], spf.when(df.PRODUCTION.isin(list_production)))
#df_production.show

+----------+--------------------+-------+------+------+------+------+------+------+
|PRODUCTION|    Intitulé énergie|  Unité|  2014|  2015|  2016|  2017|  2018|  2019|
+----------+--------------------+-------+------+------+------+------+------+------+
|        P1|Production de cha...|     kt|     0|     0|     0|     0|     0|     0|
|        P2|Production de pét...|     kt|     0|     0|     0|     0|     0|     0|
|        P3|Production de gaz...|GWh PCS|     0|     0|     0|     0|     0|     0|
|        P4|Chaleur nucléaire...|    GWh|266302|275368|227294|242398|243029|260090|
|        P5|Énergies renouvel...|    GWh| 30532| 28268| 30385| 25659| 32400| 29961|
|        P7|Production totale...|   ktep|     0|     0|     0|     0|     0|     0|
|        P8|  - production de...|   ktep|     0|     0|     0|     0|     0|     0|
|        P9|  - production de...|   ktep|     0|     0|     0|     0|     0|     0|
|       P10|  - production de...|   ktep|     0|     0|     0|     0|     0|

In [49]:
df_consomation = df.filter(df.PRODUCTION.isin(list_consomation))
df_consomation.show()

+----------+--------------------+-------+------+------+------+------+------+------+
|PRODUCTION|    Intitulé énergie|  Unité|  2014|  2015|  2016|  2017|  2018|  2019|
+----------+--------------------+-------+------+------+------+------+------+------+
|        C1|Consommation fina...|   ktep| 17948| 18567| 18559| 18746| 18327| 18369|
|       CI1|           Industrie|   ktep|  3802|  3860|  3971|  3930|  3826|  3909|
|       CI3|             Charbon|   ktep|    60|    61|    63|   112|   114|   112|
|      CI3b|dont hauts fourneaux|   ktep|     0|     0|     0|     0|     0|     0|
|       CI4| Produits pétroliers|   ktep|   387|   356|   366|   362|   347|   350|
|       CI9|Energies renouvel...|   ktep|   156|   154|   164|   148|   160|   169|
|      CI11|Chaleur commercia...|   ktep|   215|   236|   231|   222|   181|   258|
|       CI5|Gaz naturel (unit...|GWh PCS|16 969|17 484|18 511|17 338|16 490|16 819|
|       CI6|         Gaz naturel|   ktep| 1 313| 1 353| 1 432| 1 342| 1 276|

## Sauvegardes 

###  /!\ En Chantier

In [52]:
def saveDataFrame(dataframe, file_name):
    dataframe.write.csv(f"data/refine-file/{file_name}.csv", header=True)
    return "file_name as been succesfully create"

In [53]:
saveDataFrame(df_production, f"production_{fileName}")

Py4JJavaError: An error occurred while calling o993.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:979)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 25) (DESKTOP-8KB7N7C.mshome.net executor driver): java.io.IOException: Mkdirs failed to create file:/C:/Users/Antoine/Documents/spark_nrg-master/data/refine-file/production_['dre_auvergne_rhone_alpes', 'dre_bourgogne_franche_comte', 'dre_bretagne', 'dre_centre_val_de_loire', 'dre_corse', 'dre_france_metropolitaine', 'dre_grand_est', 'dre_hauts_de_france', 'dre_ile_de_france', 'dre_normandie', 'dre_nouvelle_aquitaine', 'dre_occitanie', 'dre_pays_de_la_loire', 'dre_provence_alpes_cote_d_azur'].csv/_temporary/0/_temporary/attempt_202202040111283260659664240352794_0025_m_000000_25 (exists=false, cwd=file:/C:/Users/Antoine/Documents/spark_nrg-master)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:458)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:84)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 33 more
Caused by: java.io.IOException: Mkdirs failed to create file:/C:/Users/Antoine/Documents/spark_nrg-master/data/refine-file/production_['dre_auvergne_rhone_alpes', 'dre_bourgogne_franche_comte', 'dre_bretagne', 'dre_centre_val_de_loire', 'dre_corse', 'dre_france_metropolitaine', 'dre_grand_est', 'dre_hauts_de_france', 'dre_ile_de_france', 'dre_normandie', 'dre_nouvelle_aquitaine', 'dre_occitanie', 'dre_pays_de_la_loire', 'dre_provence_alpes_cote_d_azur'].csv/_temporary/0/_temporary/attempt_202202040111283260659664240352794_0025_m_000000_25 (exists=false, cwd=file:/C:/Users/Antoine/Documents/spark_nrg-master)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:458)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:84)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
saveDataFrame(df_consomation, f"consomation_{fileName}")