# MBIT School

## Master Big Data Cloud & Analytics 2019-2020

---

## Módulo 02: Ecosistema Spark
## Submódulo DC-02: Ecosistema Spark

---

### Práctica Apache Spark

#### Carlos Alfonsel Jaén [(carlos.alfonsel@mbitschool.com)](carlos.alfonsel@mbitschool.com)

---

#### 1. DESCRIPCIÓN DEL TRABAJO

Disponemos de un stream de AWS Kinesis que recibe datos sobre la meteorología de cada provincia española cada 15 minutos. El objetivo el trabajo será el de crear una libreta de Databricks que se conecte al stream y descargue, pre-procese y almacene los datos conforme a una serie de tareas.

#### 2. TAREAS

* Creación de la libreta de Databricks.

* Conexión al stream ***mbit-weather***.

* *Preprocesamiento del stream* para extraer los datos en formato json: definir esquema de datos.

* *Limpieza y aumentado*: pasar de Kelvin a Celsius los datos de la variable ***temperatura***. Pista: T(ºC) = T(K) - 273,15.

* *Agregación en tiempo real* del dataframe, mostrando en ventanas de una hora por municipio datos de las siguientes variables:
    * ***Temperatura media***.
    * ***Temperatura máxima***.
    * ***Temperatura mínima***.
    * ***Humedad media***.
    * ***Número de observaciones***.

* *Serialización*: utilizando las funciones ***month*** y ***dayofmonth***, añadir dos nuevas columnas al dataframe, almacenarlo en DBFS en formato ***parquet***, y particionarlo por estas dos nuevas columnas.

* *Analítica*: obtener dos vistas sobre el dataframe serializado.
    * Agrupación por días, obteniendo un dataframe agrupado por municipio y día del mes, y otro agrupado por municipio y mes, mostrando las siguientes variables:
        * ***Temperatura media***.
        * ***Temperatura máxima***.
        * ***Temperatura mínima***.
        * ***Humedad media***.
        * ***Número de elementos agregados***.
    * Mostrar los municipios con las temperaturas más bajas.
    * Mostrar los municipios con las temperaturas más altas.

* *Analítica FreeStyle*: ejecutar el stream durante varios días para recopilar nuevos datos y poblar el fichero serializado.

##### 1. Creación, Configuración y Limpieza

Como hemos visto durante las clases, se ejecutan estos comandos para optimizar y limpiar los ficheros intermedios y no saturar la instancia gratuita de Databricks.

In [5]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [6]:
%fs rm -r /local_disk0/tmp

##### 2. Conexión al Stream

Nos vamos a conectar al stream ***mbit-weather*** implementado en AWS Kinesis, que recibe cada 15 minutos datos meteorológicos de cada provincia española. Lo primero es autenticarnos con las claves proporcionadas para AWS.

In [8]:
ACCESS_KEY = 'AKIAQAJ3N3G5QURMQJ6K'
SECRET_KEY = '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'

In [9]:
kinesis_stream = spark \
  .readStream \
  .format("kinesis") \
  .option("streamName", "mbit-weather") \
  .option("initialPosition", "earliest") \
  .option("region", "eu-west-1") \
  .option("awsAccessKey", ACCESS_KEY) \
  .option("awsSecretKey", SECRET_KEY) \
  .load()


Una vez conectados al stream, vemos qué tipo de datos nos está devolviendo: al estar codificados, es necesario extraer el valor de lo que estamos introduciendo en el stream, proporcionando el esquema interno del tipo de datos, que podemos obtener del enunciado de la práctica.

In [11]:
display(kinesis_stream)

partitionKey,data,stream,shardId,sequenceNumber,approximateArrivalTimestamp
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTMxMCwgIm5hbWUiOiAiQWxtZXJpYSIsICJsb24iOiAtMi40MywgImxhdCI6IDM2Ljc3LCAid2VhdGhlciI6IHsiY2xvdWRzIjogMzUsICJyYWluIjogMCwgIndpbmRfc3BlZWQiOiAyLjE= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681341064329237980673152856883202,2020-02-03T14:15:10.754+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTA1OCwgIm5hbWUiOiAiU2FsYW1hbmNhIiwgImxvbiI6IC01LjY4LCAibGF0IjogNDAuOTYsICJ3ZWF0aGVyIjogeyJjbG91ZHMiOiA1MiwgInJhaW4iOiAwLCAid2luZF9zcGVlZCI6IDM= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681343950035169400793061600002050,2020-02-03T14:15:11.843+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTIwOCwgIm5hbWUiOiAiU2FsYW1hbmNhIiwgImxvbiI6IC01LjY1LCAibGF0IjogNDAuOTcsICJ3ZWF0aGVyIjogeyJjbG91ZHMiOiA1MiwgInJhaW4iOiAwLCAid2luZF9zcGVlZCI6IDM= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681346849039284836673891264888834,2020-02-03T14:15:12.907+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTMxMywgIm5hbWUiOiAiTHVnbyIsICJsb24iOiAtNy41LCAibGF0IjogNDMuMCwgIndlYXRoZXIiOiB7ImNsb3VkcyI6IDAsICJyYWluIjogMCwgIndpbmRfc3BlZWQiOiA3LjYsICJodW0= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681349821787875269047169306329090,2020-02-03T14:15:13.987+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTMxNSwgIm5hbWUiOiAiVmFsZW5jaWEiLCAibG9uIjogLTAuMzUsICJsYXQiOiAzOS40NiwgIndlYXRoZXIiOiB7ImNsb3VkcyI6IDgzLCAicmFpbiI6IDAsICJ3aW5kX3NwZWVkIjogMi4= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681352978293190282844013183631362,2020-02-03T14:15:15.150+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTMxNiwgIm5hbWUiOiAiQ3VlbmNhIiwgImxvbiI6IC0yLjE0LCAibGF0IjogNDAuMDcsICJ3ZWF0aGVyIjogeyJjbG91ZHMiOiA5LCAicmFpbiI6IDAsICJ3aW5kX3NwZWVkIjogMi4yNiw= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681355727390504086510825184952322,2020-02-03T14:15:16.221+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTEwNSwgIm5hbWUiOiAiUGFsbWEiLCAibG9uIjogMi42NSwgImxhdCI6IDM5LjU3LCAid2VhdGhlciI6IHsiY2xvdWRzIjogNDEsICJyYWluIjogMCwgIndpbmRfc3BlZWQiOiAyLjYsICI= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681358343505977732568427968593922,2020-02-03T14:15:17.283+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTMxOCwgIm5hbWUiOiAiQ2l1ZGFkIFJlYWwiLCAibG9uIjogLTMuOTgsICJsYXQiOiAzOC45NiwgIndlYXRoZXIiOiB7ImNsb3VkcyI6IDkwLCAicmFpbiI6IDAsICJ3aW5kX3NwZWVkIjo= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681361420222188651799746315288578,2020-02-03T14:15:18.373+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTMxOSwgIm5hbWUiOiAiQXZpbGEiLCAibG9uIjogLTQuNjYsICJsYXQiOiA0MC42NywgIndlYXRoZXIiOiB7ImNsb3VkcyI6IDI1LCAicmFpbiI6IDAsICJ3aW5kX3NwZWVkIjogMi4yNCw= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681364036337662297857349098930178,2020-02-03T14:15:19.437+0000
0,eyJjcmVhdGVkX2F0IjogMTU4MDczOTMyMCwgIm5hbWUiOiAiVG9sZWRvIiwgImxvbiI6IC00LjAxLCAibGF0IjogMzkuODcsICJ3ZWF0aGVyIjogeyJjbG91ZHMiOiA2MCwgInJhaW4iOiAwLCAid2luZF9zcGVlZCI6IDAuNDk= (truncated),mbit-weather,shardId-000000000000,49603611454388849041190681366770927866266148611003777026,2020-02-03T14:15:20.571+0000


##### 3. Definir el Esquema de los Datos

Para poder extraer el valor de los datos, primero proporcionamos el esquema del tipo de datos interno, de acuerdo al enunciado de la práctica (página 4). Para solucionar el "problema" de la anidación de ciertas variables, se ha decidido crear un esquema ***weather_schema*** con formato ***Struct***, que se le pasa como argumento ***StructType*** a la variable ***weather*** en la definición del esquema ***df_schema*** a proporcionar al stream de tipo ***json*** ingestado desde Kinesis.

In [13]:
from pyspark.sql.types import StructType, TimestampType, StringType, DoubleType

weather_schema = (
  StructType()
  .add("temperature", DoubleType())
  .add("rain", DoubleType())
  .add("wind_speed", DoubleType())
  .add("clouds", DoubleType())
  .add("humidity", DoubleType())
  .add("status", StringType())
)

df_schema = (
  StructType()
  .add("created_at", TimestampType())
  .add("name", StringType())
  .add("lat", DoubleType())
  .add("lon", DoubleType())
  .add("weather", StructType(weather_schema))
)


Una vez creado el esquema podemos transformar la cadena codificada que estamos recibiendo en un Dataframe llamado ***json_stream*** con los campos que contienen los objetos.

In [15]:
from pyspark.sql.functions import from_json

json_stream = kinesis_stream \
  .selectExpr("cast (data as STRING) jsonData") \
  .select(from_json("jsonData", df_schema).alias("views")) \

json_stream.printSchema()


In [16]:
display(json_stream)

views
"List(2020-02-03T14:15:10.000+0000, Almeria, 36.77, -2.43, List(291.28, 0.0, 2.1, 35.0, 68.0, Clouds))"
"List(2020-02-03T14:10:58.000+0000, Salamanca, 40.96, -5.68, List(291.33, 0.0, 3.1, 52.0, 48.0, Clouds))"
"List(2020-02-03T14:13:28.000+0000, Salamanca, 40.97, -5.65, List(291.55, 0.0, 3.1, 52.0, 48.0, Clouds))"
"List(2020-02-03T14:15:13.000+0000, Lugo, 43.0, -7.5, List(294.45, 0.0, 7.6, 0.0, 53.0, Clear))"
"List(2020-02-03T14:15:15.000+0000, Valencia, 39.46, -0.35, List(299.46, 0.0, 2.1, 83.0, 32.0, Clouds))"
"List(2020-02-03T14:15:16.000+0000, Cuenca, 40.07, -2.14, List(292.57, 0.0, 2.26, 9.0, 35.0, Clear))"
"List(2020-02-03T14:11:45.000+0000, Palma, 39.57, 2.65, List(294.89, 0.0, 2.6, 41.0, 72.0, Clouds))"
"List(2020-02-03T14:15:18.000+0000, Ciudad Real, 38.96, -3.98, List(283.15, 0.0, 0.5, 90.0, 100.0, Mist))"
"List(2020-02-03T14:15:19.000+0000, Avila, 40.67, -4.66, List(291.71, 0.0, 2.24, 25.0, 37.0, Clouds))"
"List(2020-02-03T14:15:20.000+0000, Toledo, 39.87, -4.01, List(290.88, 0.0, 0.49, 60.0, 67.0, Clouds))"


Lo siguiente que hacemos es ***aplanar*** el esquema del Dataframe, y lo renombramos como ***df_weather***.

In [18]:
df_weather = json_stream \
  .select("views.*") \
  .select("created_at", "name", "lat", "lon", "weather.*")

df_weather.printSchema()


In [19]:
display(df_weather)

created_at,name,lat,lon,temperature,rain,wind_speed,clouds,humidity,status
2020-02-03T14:15:10.000+0000,Almeria,36.77,-2.43,291.28,0.0,2.1,35.0,68.0,Clouds
2020-02-03T14:10:58.000+0000,Salamanca,40.96,-5.68,291.33,0.0,3.1,52.0,48.0,Clouds
2020-02-03T14:13:28.000+0000,Salamanca,40.97,-5.65,291.55,0.0,3.1,52.0,48.0,Clouds
2020-02-03T14:15:13.000+0000,Lugo,43.0,-7.5,294.45,0.0,7.6,0.0,53.0,Clear
2020-02-03T14:15:15.000+0000,Valencia,39.46,-0.35,299.46,0.0,2.1,83.0,32.0,Clouds
2020-02-03T14:15:16.000+0000,Cuenca,40.07,-2.14,292.57,0.0,2.26,9.0,35.0,Clear
2020-02-03T14:11:45.000+0000,Palma,39.57,2.65,294.89,0.0,2.6,41.0,72.0,Clouds
2020-02-03T14:15:18.000+0000,Ciudad Real,38.96,-3.98,283.15,0.0,0.5,90.0,100.0,Mist
2020-02-03T14:15:19.000+0000,Avila,40.67,-4.66,291.71,0.0,2.24,25.0,37.0,Clouds
2020-02-03T14:15:20.000+0000,Toledo,39.87,-4.01,290.88,0.0,0.49,60.0,67.0,Clouds


##### 4. Limpieza y Aumentado

Para transformar la columna ***temperatura*** del Dataframe ***df_weather*** de manera que pase a mostrar el resultado en grados Celsius en vez de Kelvin, utilizamos la siguiente instrucción, importando las funciones ***col*** y ***round***, y lo guardamos en un nuevo Dataframe llamado ***df_celsius***.

In [21]:
from pyspark.sql.functions import col, round

df_celsius = df_weather \
  .select("created_at", "name", "lat", "lon", round(col("temperature") - 273.15, 2).alias("temp_celsius"), "rain", "wind_speed", "clouds", "humidity", "status") \

df_celsius.printSchema()


In [22]:
display(df_celsius)

created_at,name,lat,lon,temp_celsius,rain,wind_speed,clouds,humidity,status
2020-02-03T14:15:10.000+0000,Almeria,36.77,-2.43,18.13,0.0,2.1,35.0,68.0,Clouds
2020-02-03T14:10:58.000+0000,Salamanca,40.96,-5.68,18.18,0.0,3.1,52.0,48.0,Clouds
2020-02-03T14:13:28.000+0000,Salamanca,40.97,-5.65,18.4,0.0,3.1,52.0,48.0,Clouds
2020-02-03T14:15:13.000+0000,Lugo,43.0,-7.5,21.3,0.0,7.6,0.0,53.0,Clear
2020-02-03T14:15:15.000+0000,Valencia,39.46,-0.35,26.31,0.0,2.1,83.0,32.0,Clouds
2020-02-03T14:15:16.000+0000,Cuenca,40.07,-2.14,19.42,0.0,2.26,9.0,35.0,Clear
2020-02-03T14:11:45.000+0000,Palma,39.57,2.65,21.74,0.0,2.6,41.0,72.0,Clouds
2020-02-03T14:15:18.000+0000,Ciudad Real,38.96,-3.98,10.0,0.0,0.5,90.0,100.0,Mist
2020-02-03T14:15:19.000+0000,Avila,40.67,-4.66,18.56,0.0,2.24,25.0,37.0,Clouds
2020-02-03T14:15:20.000+0000,Toledo,39.87,-4.01,17.73,0.0,0.49,60.0,67.0,Clouds


##### 5. Agregación en Tiempo Real

Se configura una agregación del Dataframe ***df_celsius*** para que muestre agrupado por municipio y en ventanas de una hora los datos de ***temp_media***, ***temp_max***, ***temp_min***, ***humedad_media***, y ***num_observaciones***. Para ello, importamos las funciones ***avg***, ***col***, ***count***, ***max***, ***min***, ***round***, y ***window***.

In [24]:
from pyspark.sql.functions import avg, col, count, max, min, round, window
import pyspark.sql.functions as f

display(
  df_celsius.groupBy("name", window(col("created_at"), "1 hour")) \
    .agg(
      f.round(avg("temp_celsius"), 2).alias("temp_media"), 
      f.max("temp_celsius").alias("temp_max"),
      f.min("temp_celsius").alias("temp_min"),
      f.round(avg("humidity"), 2).alias("humedad_media"),
      f.count("name").alias("num_observaciones")
  )
)


name,window,temp_media,temp_max,temp_min,humedad_media,num_observaciones
Murcia,"List(2020-02-04T09:00:00.000+0000, 2020-02-04T10:00:00.000+0000)",18.12,19.42,17.08,70.0,17
Merida,"List(2020-02-03T20:00:00.000+0000, 2020-02-03T21:00:00.000+0000)",14.48,14.59,14.28,87.0,3
Melilla,"List(2020-02-03T15:00:00.000+0000, 2020-02-03T16:00:00.000+0000)",19.91,20.22,19.59,80.64,11
Cuenca,"List(2020-02-03T23:00:00.000+0000, 2020-02-04T00:00:00.000+0000)",5.5,6.58,4.65,58.5,8
Melilla,"List(2020-02-03T16:00:00.000+0000, 2020-02-03T17:00:00.000+0000)",19.39,20.19,18.8,77.0,14
Palma,"List(2020-02-03T23:00:00.000+0000, 2020-02-04T00:00:00.000+0000)",11.94,12.1,11.75,94.75,4
Barcelona,"List(2020-02-04T12:00:00.000+0000, 2020-02-04T13:00:00.000+0000)",18.92,19.02,18.81,50.71,7
Seville,"List(2020-02-04T04:00:00.000+0000, 2020-02-04T05:00:00.000+0000)",12.19,12.28,12.1,93.5,4
Logrono,"List(2020-02-04T13:00:00.000+0000, 2020-02-04T14:00:00.000+0000)",9.13,9.4,8.73,89.4,5
Huelva,"List(2020-02-04T10:00:00.000+0000, 2020-02-04T11:00:00.000+0000)",16.03,17.22,14.44,65.0,7


Lo mismo que la ***agregación*** anterior, pero cambiando el orden del ***groupBy***, primero ordenando por ventanas de una hora, y luego agrupando por municipio.

In [26]:
from pyspark.sql.functions import avg, col, count, max, min, round, window
import pyspark.sql.functions as f

display(
  df_celsius.groupBy(window(col("created_at"), "1 hour"), "name") \
    .agg(
      f.round(avg("temp_celsius"), 2).alias("temp_media"), 
      f.max("temp_celsius").alias("temp_max"),
      f.min("temp_celsius").alias("temp_min"),
      f.round(avg("humidity"), 2).alias("humedad_media"),
      f.count("name").alias("num_observaciones")
  )
)


window,name,temp_media,temp_max,temp_min,humedad_media,num_observaciones
"List(2020-02-03T16:00:00.000+0000, 2020-02-03T17:00:00.000+0000)",Pontevedra,19.36,19.55,19.27,56.57,7
"List(2020-02-04T10:00:00.000+0000, 2020-02-04T11:00:00.000+0000)",Seville,15.55,16.43,14.73,78.67,3
"List(2020-02-03T20:00:00.000+0000, 2020-02-03T21:00:00.000+0000)",Seville,15.84,16.62,15.43,78.67,3
"List(2020-02-03T22:00:00.000+0000, 2020-02-03T23:00:00.000+0000)",Cadiz,13.34,13.83,13.2,90.6,5
"List(2020-02-04T11:00:00.000+0000, 2020-02-04T12:00:00.000+0000)",Huesca,16.79,16.84,16.73,62.43,7
"List(2020-02-04T01:00:00.000+0000, 2020-02-04T02:00:00.000+0000)",Oviedo,12.19,13.08,10.81,91.0,9
"List(2020-02-04T09:00:00.000+0000, 2020-02-04T10:00:00.000+0000)",Pontevedra,11.61,11.78,10.96,100.0,7
"List(2020-02-03T19:00:00.000+0000, 2020-02-03T20:00:00.000+0000)",Zaragoza,11.79,11.88,11.7,93.0,9
"List(2020-02-04T04:00:00.000+0000, 2020-02-04T05:00:00.000+0000)",Burgos,3.0,3.0,3.0,100.0,7
"List(2020-02-03T16:00:00.000+0000, 2020-02-03T17:00:00.000+0000)",Huelva,19.53,22.22,15.4,55.63,8


##### 6. Serialización

Utilizando las funciones ***month*** y ***dayofmonth***, aumentamos el Dataframe ***df_celsius*** añadiendo dos columnas: ***month*** y ***day***. Estas columnas se extraen de la hora de inicio de la ventana.

Una vez configurado el nuevo Dataframe ***df_extended***, lo almacenamos en formato ***parquet*** particionado por estas dos nuevas columnas en el directorio ***/tmp/weather*** de DBFS.

In [28]:
from pyspark.sql.functions import dayofmonth, month

df_extended = df_celsius \
    .select(
      "created_at",
      "name", 
      "lat",
      "lon",
      "temp_celsius",
      "rain",
      "wind_speed",
      "clouds",
      "humidity",
      "status"
    ) \
    .withColumn("month", month(col("created_at"))) \
    .withColumn("day", dayofmonth(col("created_at")))

display(df_extended)


created_at,name,lat,lon,temp_celsius,rain,wind_speed,clouds,humidity,status,month,day
2020-02-03T14:15:10.000+0000,Almeria,36.77,-2.43,18.13,0.0,2.1,35.0,68.0,Clouds,2,3
2020-02-03T14:10:58.000+0000,Salamanca,40.96,-5.68,18.18,0.0,3.1,52.0,48.0,Clouds,2,3
2020-02-03T14:13:28.000+0000,Salamanca,40.97,-5.65,18.4,0.0,3.1,52.0,48.0,Clouds,2,3
2020-02-03T14:15:13.000+0000,Lugo,43.0,-7.5,21.3,0.0,7.6,0.0,53.0,Clear,2,3
2020-02-03T14:15:15.000+0000,Valencia,39.46,-0.35,26.31,0.0,2.1,83.0,32.0,Clouds,2,3
2020-02-03T14:15:16.000+0000,Cuenca,40.07,-2.14,19.42,0.0,2.26,9.0,35.0,Clear,2,3
2020-02-03T14:11:45.000+0000,Palma,39.57,2.65,21.74,0.0,2.6,41.0,72.0,Clouds,2,3
2020-02-03T14:15:18.000+0000,Ciudad Real,38.96,-3.98,10.0,0.0,0.5,90.0,100.0,Mist,2,3
2020-02-03T14:15:19.000+0000,Avila,40.67,-4.66,18.56,0.0,2.24,25.0,37.0,Clouds,2,3
2020-02-03T14:15:20.000+0000,Toledo,39.87,-4.01,17.73,0.0,0.49,60.0,67.0,Clouds,2,3


In [29]:
%fs rm -r /tmp/weather

In [30]:
%fs rm -r /tmp/checkpoints

In [31]:
df_extended.writeStream \
  .outputMode("append") \
  .format("parquet") \
  .option("path", "/tmp/weather/") \
  .partitionBy("month", "day") \
  .option("checkpointLocation", "/tmp/checkpoints") \
  .start()


In [32]:
%fs ls /tmp/weather

path,name,size
dbfs:/tmp/weather/_spark_metadata/,_spark_metadata/,0
dbfs:/tmp/weather/month=1/,month=1/,0
dbfs:/tmp/weather/month=2/,month=2/,0


In [33]:
%fs ls /tmp/weather/month=2

path,name,size
dbfs:/tmp/weather/month=2/day=2/,day=2/,0
dbfs:/tmp/weather/month=2/day=3/,day=3/,0
dbfs:/tmp/weather/month=2/day=4/,day=4/,0


In [34]:
%fs ls /tmp/weather/month=2/day=4

path,name,size
dbfs:/tmp/weather/month=2/day=4/part-00000-002990e4-4718-4efe-b256-9c22d07157a6.c000.snappy.parquet,part-00000-002990e4-4718-4efe-b256-9c22d07157a6.c000.snappy.parquet,2775
dbfs:/tmp/weather/month=2/day=4/part-00000-00b5b84c-13db-49ea-9b5b-d4c260e7311d.c000.snappy.parquet,part-00000-00b5b84c-13db-49ea-9b5b-d4c260e7311d.c000.snappy.parquet,2905
dbfs:/tmp/weather/month=2/day=4/part-00000-01362866-f53e-4379-bf50-a3e50a64ae9c.c000.snappy.parquet,part-00000-01362866-f53e-4379-bf50-a3e50a64ae9c.c000.snappy.parquet,2793
dbfs:/tmp/weather/month=2/day=4/part-00000-0145f53f-ccc5-4834-a03a-cb6b2b023f57.c000.snappy.parquet,part-00000-0145f53f-ccc5-4834-a03a-cb6b2b023f57.c000.snappy.parquet,2901
dbfs:/tmp/weather/month=2/day=4/part-00000-016e3e1d-84ab-40d0-98dc-6eeed372d309.c000.snappy.parquet,part-00000-016e3e1d-84ab-40d0-98dc-6eeed372d309.c000.snappy.parquet,2767
dbfs:/tmp/weather/month=2/day=4/part-00000-01c6cf40-cb72-49d0-9803-6f8edc4e73f6.c000.snappy.parquet,part-00000-01c6cf40-cb72-49d0-9803-6f8edc4e73f6.c000.snappy.parquet,2803
dbfs:/tmp/weather/month=2/day=4/part-00000-01f6a4cd-ecea-4fd5-9a77-acf40eb8867a.c000.snappy.parquet,part-00000-01f6a4cd-ecea-4fd5-9a77-acf40eb8867a.c000.snappy.parquet,2993
dbfs:/tmp/weather/month=2/day=4/part-00000-021b55e0-59d5-478b-bfdd-160429fa966a.c000.snappy.parquet,part-00000-021b55e0-59d5-478b-bfdd-160429fa966a.c000.snappy.parquet,5999
dbfs:/tmp/weather/month=2/day=4/part-00000-031a4432-b0a6-4745-8508-cca0efa03692.c000.snappy.parquet,part-00000-031a4432-b0a6-4745-8508-cca0efa03692.c000.snappy.parquet,2791
dbfs:/tmp/weather/month=2/day=4/part-00000-034ecf86-b779-4c40-b0ee-26eb8ecc5aa6.c000.snappy.parquet,part-00000-034ecf86-b779-4c40-b0ee-26eb8ecc5aa6.c000.snappy.parquet,2793


##### 7. Analítica

Para poder obtener las vistas solicitadas, lo primero es recuperar la muestra guardada en forma de Dataframe estático.

In [36]:
df_extended_static = spark.read.parquet("/tmp/weather/")
df_extended_static.show(100)


A partir del Dataframe ***df_extended_static*** vamos a obtener una vista del Dataframe agrupado por municipio (***name***) y día del mes (***day***), obteniendo las siguientes variables agregadas: ***temp_media***, ***temp_max***, ***temp_min***, ***humedad_media***, y ***num_elementos_agg***.

In [38]:
from pyspark.sql.functions import avg, col, count, max, min, round, window
import pyspark.sql.functions as f

display(
  df_extended_static \
  .groupBy("name", "day") \
  .agg(
    f.round(avg("temp_celsius"), 2).alias("temp_media"), 
    f.max("temp_celsius").alias("temp_max"),
    f.min("temp_celsius").alias("temp_min"),
    f.round(avg("humidity"), 2).alias("humedad_media"),
    f.count("name").alias("num_elementos_agg")
  )
)


name,day,temp_media,temp_max,temp_min,humedad_media,num_elementos_agg
Valencia,30,15.87,20.47,12.97,58.59,191
Burgos,30,8.1,9.0,7.0,92.85,192
Huelva,30,12.78,16.11,10.0,86.92,192
Guadalajara,30,9.5,11.53,8.29,87.1,192
Almeria,30,12.38,16.05,9.0,89.38,96
Palma,30,13.56,18.67,10.16,89.02,96
Segovia,30,9.16,11.15,7.49,87.34,192
Cadiz,30,12.54,18.55,8.25,91.13,96
Logrono,30,11.74,13.91,10.12,79.07,95
Avila,29,10.01,13.23,7.73,76.96,98


A partir del Dataframe ***df_extended_static*** vamos a obtener una vista del Dataframe agrupado por municipio (***name***) y mes (***month***), obteniendo las siguientes variables agregadas: ***temp_media***, ***temp_max***, ***temp_min***, ***humedad_media***, y ***num_elementos_agg***.

In [40]:
from pyspark.sql.functions import avg, col, count, max, min, round, window
import pyspark.sql.functions as f

display(
  df_extended_static \
  .groupBy("name", "month") \
  .agg(
    f.round(avg("temp_celsius"), 2).alias("temp_media"), 
    f.max("temp_celsius").alias("temp_max"),
    f.min("temp_celsius").alias("temp_min"),
    f.round(avg("humidity"), 2).alias("humedad_media"),
    f.count("name").alias("num_elementos_agg")
  )
)


name,month,temp_media,temp_max,temp_min,humedad_media,num_elementos_agg
Huesca,1,8.84,15.81,3.36,86.05,689
Murcia,1,14.58,21.06,8.07,79.6,1378
Pamplona,1,11.87,17.86,4.86,81.43,687
Seville,1,12.58,17.18,9.23,90.98,344
Ciudad Real,1,9.86,14.02,4.98,90.0,691
Alicante,1,14.84,20.68,9.49,63.31,688
Badajoz,1,12.6,16.21,8.33,88.94,685
Pontevedra,1,13.02,14.87,9.9,97.98,684
Almeria,2,14.44,21.2,8.0,75.72,186
Cuenca,2,8.99,20.05,2.9,61.33,369


Mostrar __los 10 municipios con las temperaturas más bajas__.

In [42]:
from pyspark.sql.functions import asc, min
import pyspark.sql.functions as f

df_coldest = df_extended_static \
    .cube("name") \
    .agg(f.min("temp_celsius").alias("temp_min")) \
    .select("name", "temp_min") \
    .orderBy(asc("temp_min")) \
    .limit(10)

display(df_coldest)


name,temp_min
Salamanca,1.0
Zamora,1.0
,1.0
Leon,1.65
Logrono,2.43
Granada,2.45
Cuenca,2.9
Burgos,3.0
Albacete,3.0
Soria,3.33


Mostrar __los 10 municipios con las temperaturas más altas__.

In [44]:
from pyspark.sql.functions import desc, max
import pyspark.sql.functions as f

df_hottest = df_extended_static \
    .cube("name") \
    .agg(f.max("temp_celsius").alias("temp_max")) \
    .select("name", "temp_max") \
    .orderBy(desc("temp_max")) \
    .limit(10)

display(df_hottest)


name,temp_max
,26.31
Valencia,26.31
Santander,26.04
Las Palmas de Gran Canaria,25.0
Santa Cruz de Tenerife,24.77
Seville,24.18
Murcia,23.93
Huelva,23.33
Alicante,23.23
Bilbao,23.21


##### 8. Analítica FreeStyle

Se ejecuta el stream durante varios días, y se lista la carpeta donde se almacenan los resultados. Se realizan asimismo otras consultas sencillas con los datos obtenidos: temperaturas máximas y mínimas, poblaciones con mayor cantidad de lluvia y humedad, datos ingestados más recientes, etc.

In [46]:
df_static = spark.read.parquet("/tmp/weather/")
df_static.show(100)


In [47]:
df_static = spark.read.parquet("/tmp/weather/month=2/day=4")
df_static.show(100)


In [48]:
## MOSTRAR LOS DATOS INGESTADOS MÁS RECIENTES

from pyspark.sql.functions import asc, avg, col, count, desc, max, min, round, window
import pyspark.sql.functions as f

display(
  df_static \
  .select("created_at", "name", "temp_celsius", "status") \
  .where("name is not null") \
  .orderBy(desc("created_at")) \
)


created_at,name,temp_celsius,status
2020-02-04T14:17:43.000+0000,Santa Cruz de Tenerife,22.36,Clouds
2020-02-04T14:17:40.000+0000,Zamora,14.79,Clear
2020-02-04T14:17:39.000+0000,Pontevedra,16.11,Clouds
2020-02-04T14:17:38.000+0000,Pontevedra,16.11,Clouds
2020-02-04T14:17:34.000+0000,Melilla,19.62,Clear
2020-02-04T14:17:33.000+0000,Cuenca,15.52,Clear
2020-02-04T14:17:31.000+0000,Guadalajara,17.14,Clear
2020-02-04T14:17:30.000+0000,Caceres,17.0,Clear
2020-02-04T14:17:29.000+0000,Huelva,26.11,Clear
2020-02-04T14:17:27.000+0000,Huelva,26.11,Clear


In [49]:
## DATOS AGREGADOS DE TEMPERATURA MEDIA, MÁXIMA Y MÍNIMA, HUMEDAD Y NÚMERO DE ELEMENTOS AGREGADOS, ELIMINANDO LOS NULOS

from pyspark.sql.functions import asc, avg, col, count, desc, max, min, round, window
import pyspark.sql.functions as f

display(
  df_static \
  .groupBy("name") \
  .agg(
    f.round(avg("temp_celsius"), 2).alias("temp_media"), 
    f.max("temp_celsius").alias("temp_max"),
    f.min("temp_celsius").alias("temp_min"),
    f.round(avg("humidity"), 2).alias("humedad_media"),
    f.count("name").alias("num_elementos_agg")
  )
  .where("name is not null") \
  .orderBy(asc("name")) \
)


name,temp_media,temp_max,temp_min,humedad_media,num_elementos_agg
Albacete,8.46,18.58,3.48,88.03,116
Alicante,17.49,25.69,12.86,49.72,115
Almeria,10.98,13.8,8.0,91.18,60
Avila,9.67,16.26,5.5,67.34,58
Badajoz,10.96,19.08,8.4,97.25,115
Barcelona,14.42,19.25,11.19,68.14,114
Bilbao,10.45,17.14,5.5,88.69,173
Burgos,6.76,12.1,3.0,94.87,116
Caceres,8.29,17.0,5.56,97.31,58
Cadiz,13.57,23.24,9.57,85.03,58


In [50]:
## MOSTRAR LOS 25 MUNICIPIOS CON TEMPERATURAS MÁS BAJAS -ELIMINANDO NULOS-

from pyspark.sql.functions import asc, desc, max, min
import pyspark.sql.functions as f

df_tempMin = df_static \
    .cube("name") \
    .agg(f.min("temp_celsius").alias("temp_min")) \
    .select("name", "temp_min") \
    .where("name is not null") \
    .orderBy(asc("temp_min")) \
    .limit(25)

display(df_tempMin)


name,temp_min
Leon,1.65
Zamora,1.73
Salamanca,1.78
Burgos,3.0
Albacete,3.48
Cuenca,3.55
Ciudad Real,3.9
Valladolid,4.15
Palencia,4.38
Soria,4.46


In [51]:
## MOSTRAR LOS 25 MUNICIPIOS CON TEMPERATURAS MÁS ALTAS -ELIMINANDO NULOS-

from pyspark.sql.functions import asc, desc, max, min
import pyspark.sql.functions as f

df_tempMax = df_static \
    .cube("name") \
    .agg(f.max("temp_celsius").alias("temp_max")) \
    .select("name", "temp_max") \
    .where("name is not null") \
    .orderBy(desc("temp_max")) \
    .limit(25)

display(df_tempMax)


name,temp_max
Valencia,26.5
Murcia,26.24
Huelva,26.11
Alicante,25.69
Santa Cruz de Tenerife,24.77
Las Palmas de Gran Canaria,23.94
Cadiz,23.24
Seville,22.77
Palma,21.99
Jaen,20.79


In [52]:
## MOSTRAR LOS 10 MUNICIPIOS CON % DE HUMEDAD MÁS BAJO -ELIMINANDO NULOS-

from pyspark.sql.functions import asc, desc, max, min
import pyspark.sql.functions as f

df_humidMin = df_static \
    .cube("name") \
    .agg(f.min("humidity").alias("humid_min")) \
    .select("name", "humid_min") \
    .where("name is not null") \
    .orderBy(asc("humid_min")) \
    .limit(10)

display(df_humidMin)


name,humid_min
Toledo,10.0
Cuenca,10.0
Santa Cruz de Tenerife,15.0
Valencia,20.0
Murcia,27.0
Alicante,28.0
Las Palmas de Gran Canaria,31.0
Huelva,31.0
Avila,33.0
Granada,39.0


In [53]:
## MOSTRAR LOS 10 MUNICIPIOS CON % DE LLUVIA MÁS ALTO -ELIMINANDO NULOS-

## RESULTADO ¿SORPRENDENTE?: EN ESPAÑA NO LLUEVE MUCHO.

from pyspark.sql.functions import asc, desc, max, min
import pyspark.sql.functions as f

df_rainMax = df_static \
    .cube("name") \
    .agg(f.max("rain").alias("rain_max")) \
    .select("name", "rain_max") \
    .where("name is not null") \
    .orderBy(desc("rain_max")) \
    .limit(10)

display(df_rainMax)


name,rain_max
Ciudad Real,0.0
Toledo,0.0
Tarragona,0.0
Logrono,0.0
Malaga,0.0
Bilbao,0.0
Granada,0.0
Alicante,0.0
Santander,0.0
Jaen,0.0


#### END OF SPARK STREAMING mbit-weather TEST