In [1]:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
 .builder
 .appName("ScalaChapter3")
 .getOrCreate()

Intitializing Scala interpreter ...

Spark Web UI available at http://L2108017.bosonit.local:4045
SparkContext available as 'sc' (version = 3.0.3, master = local[*], app id = local-1634122443701)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@533acfd8


In [2]:
import org.apache.spark.sql.types._

// Crear el esquema para leer el csv
val fireSchema = StructType(Array(StructField("CallNumber", IntegerType, true),
 StructField("UnitID", StringType, true),
 StructField("IncidentNumber", IntegerType, true),
 StructField("CallType", StringType, true), 
 StructField("CallDate", StringType, true), 
 StructField("WatchDate", StringType, true),
 StructField("CallFinalDisposition", StringType, true),
 StructField("AvailableDtTm", StringType, true),
 StructField("Address", StringType, true), 
 StructField("City", StringType, true), 
 StructField("Zipcode", IntegerType, true), 
 StructField("Battalion", StringType, true), 
 StructField("StationArea", StringType, true), 
 StructField("Box", StringType, true), 
 StructField("OriginalPriority", StringType, true), 
 StructField("Priority", StringType, true), 
 StructField("FinalPriority", IntegerType, true), 
 StructField("ALSUnit", BooleanType, true), 
 StructField("CallTypeGroup", StringType, true),
 StructField("NumAlarms", IntegerType, true),
 StructField("UnitType", StringType, true),
 StructField("UnitSequenceInCallDispatch", IntegerType, true),
 StructField("FirePreventionDistrict", StringType, true),
 StructField("SupervisorDistrict", StringType, true),
 StructField("Neighborhood", StringType, true),
 StructField("Location", StringType, true),
 StructField("RowID", StringType, true),
 StructField("Delay", FloatType, true)))

// Ruta del fichero
val sFireFile="C:/Users/alvaro.romero/Big_Data/LearningSparkV2-master/chapter3/data/sf-fire-calls.csv"

// Leer el fichero en un DataFrame usando el esquema que hemos definido y dicendo que tiene cabecera el fichero
val fireDF= spark.read.schema(fireSchema)
.option("header","true")
.csv(sFireFile)

import org.apache.spark.sql.types._
fireSchema: org.apache.spark.sql.types.StructType = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,IntegerType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true), ...


In [3]:
// Crear un DataFrame infiriendo el esquema
val sampleDF = spark
.read
.option("samplingRatio", 0.001)
.option("header", true)
.csv("C:/Users/alvaro.romero/Big_Data/LearningSparkV2-master/chapter3/data/sf-fire-calls.csv")


sampleDF: org.apache.spark.sql.DataFrame = [CallNumber: string, UnitID: string ... 26 more fields]


In [4]:
// Crear un nuevo DataFrame cambiando el nombre de la columna Delay por ResponseDelayedMins
val new_fire_df =fireDF.withColumnRenamed("Delay", "ResponseDelayedMins")

// Mostrar 5 columnas ResponseDelayedMins > 5
new_fire_df.select("ResponseDelayedMins")
.where(col("ResponseDelayedMins")>5)
.show(5, false)

+-------------------+
|ResponseDelayedMins|
+-------------------+
|5.35               |
|6.25               |
|5.2                |
|5.6                |
|7.25               |
+-------------------+
only showing top 5 rows



new_fire_df: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [4]:
// Crear nuevo DataFrame añadiendo las columnas IncidentDate, OnWatchDate y AvailableDTS y borrando CallDate, WatchDate y AvailableDtTm
val fire_ts_df = new_fire_df
              .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
              .drop("CallDate")
              .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy"))
              .drop("WatchDate")
              .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a"))
              .drop("AvailableDtTm")

fire_ts_df: org.apache.spark.sql.DataFrame = [CallNumber: int, UnitID: string ... 26 more fields]


In [5]:
fire_ts_df
.select("IncidentDate", "OnWatchDate", "AvailableDtTS")
.show(5,false)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [6]:
fire_ts_df
.select(year($"IncidentDate"))
.distinct()
.orderBy(year($"IncidentDate"))
.show()

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [7]:
fire_ts_df
.select("CallType")
.where(col("CallType").isNotNull)
.groupBy("CallType")
.count()
.orderBy(desc("count"))
.show(10,false)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [8]:
fireDF
.select("CallType")
.where(col("CallType").isNotNull)
.distinct()
.show(30,false)

+--------------------------------------------+
|CallType                                    |
+--------------------------------------------+
|Elevator / Escalator Rescue                 |
|Marine Fire                                 |
|Aircraft Emergency                          |
|Confined Space / Structure Collapse         |
|Administrative                              |
|Alarms                                      |
|Odor (Strange / Unknown)                    |
|Citizen Assist / Service Call               |
|HazMat                                      |
|Watercraft in Distress                      |
|Explosion                                   |
|Oil Spill                                   |
|Vehicle Fire                                |
|Suspicious Package                          |
|Extrication / Entrapped (Machinery, Vehicle)|
|Other                                       |
|Outside Fire                                |
|Traffic Collision                           |
|Assist Polic

In [9]:
fire_ts_df
.filter(year($"IncidentDate")===2018)
.groupBy(month($"IncidentDate"))
.count()
.orderBy(desc("count"))
.show(12)

+-------------------+-----+
|month(IncidentDate)|count|
+-------------------+-----+
|                 10| 1068|
|                  5| 1047|
|                  3| 1029|
|                  8| 1021|
|                  1| 1007|
|                  7|  974|
|                  6|  974|
|                  9|  951|
|                  4|  947|
|                  2|  919|
|                 11|  199|
+-------------------+-----+



In [10]:
fire_ts_df
.select("Neighborhood")
.where(year($"IncidentDate") === 2018)
.groupBy("Neighborhood")
.count()
.orderBy(desc("count"))
.show(50, false)

+------------------------------+-----+
|Neighborhood                  |count|
+------------------------------+-----+
|Tenderloin                    |1393 |
|South of Market               |1053 |
|Mission                       |913  |
|Financial District/South Beach|772  |
|Bayview Hunters Point         |522  |
|Western Addition              |352  |
|Sunset/Parkside               |346  |
|Nob Hill                      |295  |
|Hayes Valley                  |291  |
|Outer Richmond                |262  |
|Castro/Upper Market           |251  |
|North Beach                   |231  |
|Excelsior                     |212  |
|Potrero Hill                  |210  |
|West of Twin Peaks            |210  |
|Pacific Heights               |191  |
|Chinatown                     |191  |
|Marina                        |191  |
|Mission Bay                   |178  |
|Bernal Heights                |170  |
|Lakeshore                     |159  |
|Inner Sunset                  |154  |
|Russian Hill            

In [11]:
fire_ts_df
.select("Neighborhood", "ResponseDelayedMins")
.filter(year($"IncidentDate") === 2018)
.show(10, false)

+------------------------------+-------------------+
|Neighborhood                  |ResponseDelayedMins|
+------------------------------+-------------------+
|Presidio Heights              |2.8833334          |
|Mission Bay                   |6.3333335          |
|Chinatown                     |2.65               |
|Financial District/South Beach|3.5333333          |
|Tenderloin                    |1.1                |
|Bayview Hunters Point         |4.05               |
|Inner Richmond                |2.5666666          |
|Inner Sunset                  |1.4                |
|Sunset/Parkside               |2.6666667          |
|South of Market               |1.7666667          |
+------------------------------+-------------------+
only showing top 10 rows



In [12]:
fire_ts_df
.filter(year($"IncidentDate")===2018)
.groupBy(weekofyear($"IncidentDate"))
.count()
.orderBy(desc("count"))
.show(12)

+------------------------+-----+
|weekofyear(IncidentDate)|count|
+------------------------+-----+
|                      22|  259|
|                      40|  255|
|                      43|  250|
|                      25|  249|
|                       1|  246|
|                      44|  244|
|                      32|  243|
|                      13|  243|
|                      11|  240|
|                      18|  236|
|                       5|  236|
|                      23|  235|
+------------------------+-----+
only showing top 12 rows



In [13]:
fire_ts_df
.select("CallType", "ZipCode")
.where($"CallType".isNotNull)
.groupBy("CallType", "ZipCode")
.count()
.orderBy(desc("count"))
.show()

+----------------+-------+-----+
|        CallType|ZipCode|count|
+----------------+-------+-----+
|Medical Incident|  94102|16130|
|Medical Incident|  94103|14775|
|Medical Incident|  94110| 9995|
|Medical Incident|  94109| 9479|
|Medical Incident|  94124| 5885|
|Medical Incident|  94112| 5630|
|Medical Incident|  94115| 4785|
|Medical Incident|  94122| 4323|
|Medical Incident|  94107| 4284|
|Medical Incident|  94133| 3977|
|Medical Incident|  94117| 3522|
|Medical Incident|  94134| 3437|
|Medical Incident|  94114| 3225|
|Medical Incident|  94118| 3104|
|Medical Incident|  94121| 2953|
|Medical Incident|  94116| 2738|
|Medical Incident|  94132| 2594|
|  Structure Fire|  94110| 2267|
|Medical Incident|  94105| 2258|
|  Structure Fire|  94102| 2229|
+----------------+-------+-----+
only showing top 20 rows



In [14]:
fire_ts_df
.select("Neighborhood", "ZipCode")
.where((col("ZipCode")===94102) or (col("ZipCode")===94103))
.distinct()
.show(10,false)

+------------------------------+-------+
|Neighborhood                  |ZipCode|
+------------------------------+-------+
|Potrero Hill                  |94103  |
|Western Addition              |94102  |
|Tenderloin                    |94102  |
|Nob Hill                      |94102  |
|Castro/Upper Market           |94103  |
|South of Market               |94102  |
|South of Market               |94103  |
|Hayes Valley                  |94103  |
|Financial District/South Beach|94102  |
|Mission Bay                   |94103  |
+------------------------------+-------+
only showing top 10 rows



In [15]:
case class DeviceIoTData (battery_level: Long, c02_level: Long, 
    cca2: String, cca3: String, cn: String, device_id: Long, 
    device_name: String, humidity: Long, ip: String, latitude: Double,
    lcd: String, longitude: Double, scale:String, temp: Long, timestamp: Long)

defined class DeviceIoTData


In [16]:
case class DeviceTempByCountry(temp: Long, device_name: String, device_id: Long, cca3: String)

defined class DeviceTempByCountry


In [17]:
val ds = spark.read.json("C:/Users/alvaro.romero/Big_Data/LearningSparkV2-master/databricks-datasets/learning-spark-v2/iot-devices/iot_devices.json").as[DeviceIoTData]
ds.printSchema()

root
 |-- battery_level: long (nullable = true)
 |-- c02_level: long (nullable = true)
 |-- cca2: string (nullable = true)
 |-- cca3: string (nullable = true)
 |-- cn: string (nullable = true)
 |-- device_id: long (nullable = true)
 |-- device_name: string (nullable = true)
 |-- humidity: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- lcd: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- scale: string (nullable = true)
 |-- temp: long (nullable = true)
 |-- timestamp: long (nullable = true)



ds: org.apache.spark.sql.Dataset[DeviceIoTData] = [battery_level: bigint, c02_level: bigint ... 13 more fields]


In [18]:
ds.show(5,false)

+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|battery_level|c02_level|cca2|cca3|cn           |device_id|device_name          |humidity|ip           |latitude|lcd   |longitude|scale  |temp|timestamp    |
+-------------+---------+----+----+-------------+---------+---------------------+--------+-------------+--------+------+---------+-------+----+-------------+
|8            |868      |US  |USA |United States|1        |meter-gauge-1xbYRYcj |51      |68.161.225.1 |38.0    |green |-97.0    |Celsius|34  |1458444054093|
|7            |1473     |NO  |NOR |Norway       |2        |sensor-pad-2n2Pea    |70      |213.161.254.1|62.47   |red   |6.15     |Celsius|11  |1458444054119|
|2            |1556     |IT  |ITA |Italy        |3        |device-mac-36TWSKiT  |44      |88.36.5.1    |42.83   |red   |12.83    |Celsius|19  |1458444054120|
|6            |1080     |US  |USA |United States|4  

In [19]:
ds
.select($"battery_level", $"c02_level", $"device_name")
.where($"battery_level" < 8)
.sort($"c02_level")
.show(5, false)


+-------------+---------+---------------------------+
|battery_level|c02_level|device_name                |
+-------------+---------+---------------------------+
|1            |800      |sensor-pad-186516qVnGVN7jK |
|1            |800      |sensor-pad-190514UCQcNBzHH |
|7            |800      |sensor-pad-1864985T2DlE87SJ|
|4            |800      |meter-gauge-1855818YaYf    |
|3            |800      |sensor-pad-1886381NkVIS    |
+-------------+---------+---------------------------+
only showing top 5 rows



In [20]:
val newDS=ds.filter($"c02_level">1300).groupBy($"cn").avg().sort($"avg(c02_level)".desc)

newDS: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cn: string, avg(battery_level): double ... 7 more fields]


In [21]:
newDS.show(10,false)

+------------------------------+------------------+--------------+--------------+-------------+-------------+------------------+---------+------------------+
|cn                            |avg(battery_level)|avg(c02_level)|avg(device_id)|avg(humidity)|avg(latitude)|avg(longitude)    |avg(temp)|avg(timestamp)    |
+------------------------------+------------------+--------------+--------------+-------------+-------------+------------------+---------+------------------+
|Solomon Islands               |3.0               |1588.0        |187433.0      |40.0         |-9.43        |159.95            |21.0     |1.458444060894E12 |
|Federated States of Micronesia|3.0               |1573.0        |78806.0       |55.0         |6.92         |158.25            |13.0     |1.45844405755E12  |
|Rwanda                        |2.5               |1560.5        |102085.0      |44.0         |-2.0         |30.0              |21.5     |1.458444058393E12 |
|British Indian Ocean Territory|7.0               |1

In [22]:
ds.filter($"temp" > 25 && $"humidity" > 75)
  .select("temp", "humidity", "cn")
  .groupBy($"cn")
  .avg()
  .sort($"avg(temp)".desc, $"avg(humidity)".desc).as("avg_humidity").show(10, false)

+----------------------+---------+-------------+
|cn                    |avg(temp)|avg(humidity)|
+----------------------+---------+-------------+
|Monaco                |34.0     |91.0         |
|Anguilla              |34.0     |83.0         |
|British Virgin Islands|34.0     |81.0         |
|Turkmenistan          |34.0     |80.0         |
|Suriname              |34.0     |79.0         |
|Gibraltar             |34.0     |78.0         |
|Liechtenstein         |34.0     |76.0         |
|Vanuatu               |33.5     |84.0         |
|Cameroon              |33.0     |91.0         |
|Fiji                  |33.0     |78.0         |
+----------------------+---------+-------------+
only showing top 10 rows



In [23]:
import org.apache.spark.sql.functions._ 

ds.select(min("temp"), max("temp"), min("humidity"), max("humidity"), min("c02_level"), max("c02_level"), min("battery_level"), max("battery_level")).show(10)

+---------+---------+-------------+-------------+--------------+--------------+------------------+------------------+
|min(temp)|max(temp)|min(humidity)|max(humidity)|min(c02_level)|max(c02_level)|min(battery_level)|max(battery_level)|
+---------+---------+-------------+-------------+--------------+--------------+------------------+------------------+
|       10|       34|           25|           99|           800|          1599|                 0|                 9|
+---------+---------+-------------+-------------+--------------+--------------+------------------+------------------+



import org.apache.spark.sql.functions._


In [None]:
/* 
El último parámetro de StructField(‘Delay’, FloatType(), True) significa que el campo “Delay” puede ser nulo.
En caso de ser False, no podría ser nulo. 
*/

In [None]:
/*
Los DataSets son colecciones de objetos fuertemente tipados y los DataFrames son DataSets sin tipado. 
En Scala, un DataFrame es un alias para un DataSet[Row], siendo Row un objeto JVM genérico sin tipado 
que puede contener diferentes tipos de campos. 
*/

In [15]:
// Guardando en parquet
fire_ts_df.write.format("parquet").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas/Parquet")

In [16]:
// Guardando en json
fire_ts_df.write.format("json").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas/JSON/")

In [17]:
// Guardando en csv
fire_ts_df.write.format("csv").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas/CSV/")

In [18]:
// Guardando en avro
fire_ts_df.write.format("avro").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas/AVRO/")

In [25]:
// Hay más de un fichero debido a las particiones.
// El comando getNumPartitions en el RDD sirve para obtener el número de particiones de un DataFrame
fire_ts_df.rdd.getNumPartitions.show()
// El comando coalesce en el RDD sirve para reducir el número de particiones de un DataFrame
// El comando partition en el RDD sirve para aumentar el número de particiones de un DataFrame
val prueba_fire_ts_df = fire_ts_df.coalesce(1)
prueba_fire_ts_df.rdd.getNumPartitions

prueba_fire_ts_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [CallNumber: int, UnitID: string ... 26 more fields]
res18: Int = 1


In [26]:
// Guardando en parquet
prueba_fire_ts_df.write.format("parquet").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas2/Parquet")

In [27]:
// Guardando en json
prueba_fire_ts_df.write.format("json").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas2/JSON/")

In [28]:
// Guardando en csv
prueba_fire_ts_df.write.format("csv").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas2/CSV/")

In [29]:
// Guardando en avro
prueba_fire_ts_df.write.format("avro").mode("overwrite").save("C:/Users/alvaro.romero/Big_Data/Ejercicios_Spark/Guardar_Tablas2/AVRO/")