# [o3]- Proyecto Ozono - ETL_Contaminación (2014-202004) - v3

    0. Inicializacion
    1. Datos
        1.0 Carga ficheros AIRE ( 2014-2020) - URL
        1.1 FORMATO:  [ESTACION,MAGNITUD,ANO,MES,DATO_DIA1,VALIDO,DATO_DIA2,VALIDO...]
        1.2 Creacion nuevo esquema
        1.3 Creacion nuevo DF vacio
        1.4 Nuevo DF -> [ESTACION,MAGNITUD,ANO,MES,DIA,FECHA,VALOR,VALIDO]
        1.5 Columna FECHA -> 20140101
        1.6 Colocar columnas -> [ESTACION,MAGNITUD, ANO,MES,DIA, FECHA, VALOR,VALIDO]
        1.7 VALOR ? VALIDO -> VALOR VALIDADO
        1.8 PIVOT Magnitudes
        1.9 Limpiar: Meses 31 días + Meses 30 días + Febreros
    2. Formato
    3. Ordenar
    4. Exportar
        
    

# [0] - Inicialización

In [1]:
from __future__ import print_function
import findspark
findspark.init('/home/rulicering/BigData/spark-2.4.5-bin-hadoop2.7')
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,StringType,IntegerType,FloatType,StructType
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F
import pandas as pd
import datetime
import requests

In [2]:
spark = SparkSession.builder.appName('contaminacion').getOrCreate()

# [1] - Datos

## [1.0] - Carga ficheros AIRE ( 2014-2020) - URL

In [3]:
anos= [2014,2015,2016,2017,2018,2019,2020]
urls =["https://datos.madrid.es/egob/catalogo/201410-10306578-calidad-aire-diario.csv",
       "https://datos.madrid.es/egob/catalogo/201410-10306576-calidad-aire-diario.csv",
       "https://datos.madrid.es/egob/catalogo/201410-10306574-calidad-aire-diario.csv",
       "https://datos.madrid.es/egob/catalogo/201410-7775098-calidad-aire-diario.csv",
       "https://datos.madrid.es/egob/catalogo/201410-7775096-calidad-aire-diario.csv",
       "https://datos.madrid.es/egob/catalogo/201410-10306606-calidad-aire-diario.csv",
       "https://datos.madrid.es/egob/catalogo/201410-10306609-calidad-aire-diario.csv"  
]

In [4]:
def pd_read_to_df(url):
    pdf = pd.read_csv(url,sep=';')
    df = spark.createDataFrame(pdf)
    return df

In [5]:
lista = [pd_read_to_df(urls[i])for i in range(len(anos))]

In [6]:
df = lista[0]
for i in range(1,len(lista)):
    df=df.union(lista[i])

## [1.1] - FORMATO:  [ESTACION,MAGNITUD,ANO,MES,DATO_DIA1,VALIDO,DATO_DIA2,VALIDO...]

In [7]:
#Eliminamos columnas no esenciales
df = df.select('ESTACION','MAGNITUD','ANO','MES','D01','V01','D02','V02','D03','V03','D04','V04','D05','V05','D06','V06','D07','V07','D08','V08','D09','V09','D10','V10','D11','V11','D12','V12','D13','V13','D14','V14','D15','V15','D16','V16','D17','V17','D18','V18','D19','V19','D20','V20','D21','V21','D22','V22','D23','V23','D24','V24','D25','V25','D26','V26','D27','V27','D28','V28','D29','V29','D30','V30','D31','V31')

## [1.2] - Creacion nuevo esquema

In [8]:
data_schema = [StructField('ESTACION',IntegerType(), False), 
              StructField('MAGNITUD',IntegerType(), False),
              StructField('ANO',IntegerType(), False),
              StructField('MES',IntegerType(), False),
              StructField('VALOR',FloatType(), True),
              StructField('VALIDO',IntegerType(), False),
              StructField('DIA',IntegerType(), False)]
struct = StructType(fields = data_schema)

## [1.3] - Creacion nuevo DF vacio

In [9]:
df_v1 = spark.createDataFrame(spark.sparkContext.emptyRDD(),struct)

## [1.4] - Nuevo DF -> [ESTACION,MAGNITUD,ANO,MES,DIA,FECHA,VALOR,VALIDO]

In [10]:
for i in range(1,32): #Días  
    valor = 'D%02d' % i
    valido = 'V%02d' % i
    df_v1 = df_v1.union(df.select("ESTACION","MAGNITUD","ANO","MES",valor,valido).withColumn('DIA', F.lit(i)))

df = df_v1


## [1.5] - Columna FECHA -> 20140101

In [11]:
df = df.withColumn("FECHA",df["ANO"]*10000 + df["MES"]*100 + df["DIA"])

## [1.6] -Colocar columnas ->
### [ESTACION, MAGNITUD, ANO,MES,DIA, FECHA, VALOR,VALIDO]

In [12]:
#Colocar las columnas
cols = df.columns
cols = cols[:4] + cols[-2:] + cols[-4:-2]
df= df[cols]

## [1.7] - VALOR ? VALIDO -> VALOR VALIDADO

In [13]:
df = df.withColumn("VALOR VALIDADO",F.when(F.col("VALIDO")== 'N',None).otherwise(F.col("VALOR")) )
df = df.select("ESTACION","MAGNITUD","ANO","MES","DIA","FECHA",df["VALOR VALIDADO"].alias("VALOR"))

## [1.8] - PIVOT Magnitudes

In [14]:
df = df.groupBy('ESTACION','ANO', 'MES', 'DIA',"FECHA").pivot("MAGNITUD").sum("VALOR").orderBy("FECHA")

## [1.9] - Limpiar: Meses 31 días + Meses 30 días + Febreros

In [15]:
#Esto se puede hacer en el paso 6
df_31 = df.filter(df["MES"].isin([1,3,5,7,8,10,12]))
df_30 = df.filter((df["MES"].isin([4,6,9,11])) & (df["DIA"] <31))
df_feb = df.filter((df["MES"] == 2) & (df["DIA"] <30)) #Para no excluir los bisiestos
df = df_31.union(df_30).union(df_feb)

# [2] - Formato

In [16]:
df = df.withColumn("ESTACION",df["ESTACION"].cast(StringType()))

In [17]:
pd = df.toPandas()
pd = pd.rename(columns={"ESTACION":"CODIGO_CORTO"})
pd_final = pd

# [3] - Ordenar 

In [18]:
pd_final = pd_final.sort_values(by=["ANO","MES","DIA","FECHA","CODIGO_CORTO"])

# [4] - Exportar

In [19]:
#pd_final.head(10)

In [21]:
#Versiones
hoy = datetime.date.today().strftime("%Y-%m-%d")
pd_final.to_csv("/home/rulicering/Datos_Proyecto_Ozono/Procesado/Contaminacion/BackUp/Contaminacion_diaria_2014-202004-" + hoy +".csv")

In [22]:
pd_final.to_csv("/home/rulicering/Datos_Proyecto_Ozono/Procesado/Contaminacion/Contaminacion_diaria_2014-202004.csv")