## Vamos trabalhar com os dados abertos do ONS - Operador Nacional do Sistema Elétrico.

### Optei por trabalhar com os dados de Carga de Energia. Definição conforme site: [Dados de carga por subsistema numa data de referência em base diária.](https://dados.ons.org.br/dataset/carga-energia)

### As variáveis iniciais são:
- "id_subsistema
- nom_subsistema
- din_instante
- val_cargaeenergia (em MWmed)

### Nessa primeira etapa, iremos unir os arquivos individuais em um único Dataframe.

### Load das bibliotecas e definição do contexto spark.

In [1]:
import os
import pyspark

from pyspark import SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
sc = SparkContext.getOrCreate()

spark = (SparkSession
      .builder
      .getOrCreate()
     )

print('ApplicationID:', sc.applicationId)

ApplicationID: local-1630363732963


### Caminhos de Leitura e Escrita das Bases de dados

In [3]:
read_path = '../data/raw/'
read_path_list = [read_path + x for x in os.listdir(read_path)]
read_path_list.sort()

write_path = '../data/refined/01_bind'

### Criando lista de Dataframes, verificação se todas possuem a mesma "header" e quantidade de registros

In [4]:
dfs = [spark.read.csv(x, header= True, multiLine= True, sep= ";") for x in read_path_list]

In [5]:
# O método columns do dataframe retorna uma lista com o nome das colunas daquele df.
# Utilizando set, eliminamos strings duplicadas.
colunas_distintas = set()
for x in dfs:
    colunas_distintas = colunas_distintas.union(set(x.columns))

print(colunas_distintas)

{'id_subsistema', 'din_instante', 'val_cargaenergiamwmed', 'nom_subsistema'}


In [6]:
for i in range(len(dfs)):
    print(f'Base \'{read_path_list[i]}\': {dfs[i].count()} registros')

Base '../data/raw/CARGA_ENERGIA_2000.csv': 1464 registros
Base '../data/raw/CARGA_ENERGIA_2001.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2002.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2003.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2004.csv': 1464 registros
Base '../data/raw/CARGA_ENERGIA_2005.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2006.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2007.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2008.csv': 1464 registros
Base '../data/raw/CARGA_ENERGIA_2009.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2010.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2011.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2012.csv': 1464 registros
Base '../data/raw/CARGA_ENERGIA_2013.csv': 1461 registros
Base '../data/raw/CARGA_ENERGIA_2014.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2015.csv': 1460 registros
Base '../data/raw/CARGA_ENERGIA_2016.csv': 1464 registros
Base '../data/

### Como a base de 2013 apresentou um registro a mais que o padrão, vale uma investigação rápida. Uma forma de verificar se houve deslocamento de registro, por exemplo, consiste em agrupar os valores de uma variável categórica e observar se há algum resultado estranho para aquela categoria. "S" possui uma ocorrência a mais que os outros, mas não apareceu alguma data ou número, por exemplo.

In [7]:
dfs[-9].groupBy('id_subsistema').count().show()

+-------------+-----+
|id_subsistema|count|
+-------------+-----+
|           NE|  365|
|            N|  365|
|            S|  366|
|           SE|  365|
+-------------+-----+



### Para obter todas as informações em um único Dataframe, utilizaremos "union".

In [8]:
df = dfs[0]
for i in range(1,len(dfs)):
    df = df.union(dfs[i])
    
df.groupBy('id_subsistema').count().show()

+-------------+-----+
|id_subsistema|count|
+-------------+-----+
|           NE| 7912|
|            N| 7912|
|            S| 7913|
|           SE| 7912|
+-------------+-----+



### Por fim, a escrita da base em parquet.

In [9]:
df.write.parquet(write_path)

In [10]:
sc.stop()