# ETL using Scala and Spark

#### Call data from CSV file

In [1]:
val bat_datos = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("data/baterias-datos.csv")

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.11:4042
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1595592847834)
SparkSession available as 'spark'


bat_datos: org.apache.spark.sql.DataFrame = [vcd: string, amp: string ... 4 more fields]


#### Show the data frame

In [2]:
bat_datos.show()

+-----+-----+-----+-----+---------+------+
|  vcd|  amp|  tmp|  imp|estado_id|estado|
+-----+-----+-----+-----+---------+------+
|14,30|68,52|13,83|39,19|        0|    ok|
|14,48|86,92|12,75|32,74|        0|    ok|
|14,18|80,65| 5,27|27,58|        0|    ok|
|13,68|56,30|13,96|17,91|        0|    ok|
|13,67|61,49| 7,71|26,40|        0|    ok|
|14,50|72,96|18,06|34,24|        0|    ok|
|14,33|85,43| 8,97|30,19|        0|    ok|
|14,18|55,65|14,16|34,74|        0|    ok|
|14,06|58,50|10,04|29,77|        0|    ok|
|14,28|59,43| 6,42|11,85|        0|    ok|
|14,47|79,23| 8,83|24,78|        0|    ok|
|13,83|89,96|10,35|14,94|        0|    ok|
|13,94|63,25|13,54|23,35|        0|    ok|
|13,91|64,80|17,63|24,68|        0|    ok|
|14,24|64,82|17,59|15,99|        0|    ok|
|14,43|70,65|24,36|22,19|        0|    ok|
|13,93|85,23|18,37|28,60|        0|    ok|
|14,01|79,98|10,49|29,21|        0|    ok|
|14,03|55,24|19,24|17,94|        0|    ok|
|14,14|61,81|15,11|30,77|        0|    ok|
+-----+----

#### Print data frame schema. All variables all string type

In [3]:
println(bat_datos.count())
bat_datos.printSchema()

595
root
 |-- vcd: string (nullable = true)
 |-- amp: string (nullable = true)
 |-- tmp: string (nullable = true)
 |-- imp: string (nullable = true)
 |-- estado_id: string (nullable = true)
 |-- estado: string (nullable = true)



#### Transform data types from string to float

In [4]:
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.FloatType;

var df_datos = bat_datos.withColumn("estado_id", $"estado_id".cast(IntegerType))

df_datos = df_datos.withColumn("vcd", regexp_replace($"vcd", ",", ".").cast("float"))
df_datos = df_datos.withColumn("amp", regexp_replace($"vcd", ",", ".").cast("float"))
df_datos = df_datos.withColumn("tmp", regexp_replace($"vcd", ",", ".").cast("float"))
df_datos = df_datos.withColumn("imp", regexp_replace($"vcd", ",", ".").cast("float"))

df_datos.printSchema()


root
 |-- vcd: float (nullable = true)
 |-- amp: float (nullable = true)
 |-- tmp: float (nullable = true)
 |-- imp: float (nullable = true)
 |-- estado_id: integer (nullable = true)
 |-- estado: string (nullable = true)



import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.FloatType
df_datos: org.apache.spark.sql.DataFrame = [vcd: float, amp: float ... 4 more fields]
df_datos: org.apache.spark.sql.DataFrame = [vcd: float, amp: float ... 4 more fields]
df_datos: org.apache.spark.sql.DataFrame = [vcd: float, amp: float ... 4 more fields]
df_datos: org.apache.spark.sql.DataFrame = [vcd: float, amp: float ... 4 more fields]
df_datos: org.apache.spark.sql.DataFrame = [vcd: float, amp: float ... 4 more fields]


#### Filter data by "estado_id", print result data frame and number of records

In [5]:
var new_df=df_datos.filter(
    $"estado_id" === 1
).select(
    $"amp",
    $"tmp",
    $"estado"
)

new_df.show()
print("Registros: "+new_df.count())

+-----+-----+------+
|  amp|  tmp|estado|
+-----+-----+------+
| 13.1| 13.1|alerta|
|13.38|13.38|alerta|
|13.28|13.28|alerta|
|13.32|13.32|alerta|
|13.27|13.27|alerta|
|13.33|13.33|alerta|
|13.39|13.39|alerta|
|13.13|13.13|alerta|
|13.26|13.26|alerta|
| 13.1| 13.1|alerta|
|13.38|13.38|alerta|
|13.07|13.07|alerta|
|13.45|13.45|alerta|
|13.37|13.37|alerta|
|13.01|13.01|alerta|
|13.26|13.26|alerta|
|13.24|13.24|alerta|
|13.04|13.04|alerta|
|13.35|13.35|alerta|
|13.33|13.33|alerta|
+-----+-----+------+
only showing top 20 rows

Registros: 200

new_df: org.apache.spark.sql.DataFrame = [amp: float, tmp: float ... 1 more field]
