# Punto de entrada

Vamos a crear un punto de entrada al API de dataframes y dataset.

In [1]:
from pyspark.sql import SparkSession


In [2]:
spark= SparkSession.builder.appName("Trabajando con Spark SQL").getOrCreate()

Lo primero que vamos a leer va a ser un fichero json que representa la tabla periodica y lo vamos a almacenar en un dataframe sobre el que vamos a ir realizando diferentes acciones como si se tratara de un RDD.


Nota: el formato json en spark SQL es un formato por línea , como si fuera un CSV, por lo tanto, hay que transformar el listado de objetos en un una fila por cada objeto.

In [4]:
import json
with open('PeriodicTableJSON.json') as data_file:    
    data = json.load(data_file)
    

    with open('PeriodicTableJSON.jsonl', 'w') as outfile:
        for entry in data:
            json.dump(entry, outfile)
            outfile.write('\n')

In [6]:
df = spark.read.json("PeriodicTableJSON.jsonl")

In [7]:
df.show()

+--------------------+-------------+-------+--------------------+-----+-------+--------------------+-------+----------+----------+--------------------+------+------+-----+--------------------+--------------------+--------------------+------+----+----+
|          appearance|  atomic_mass|   boil|            category|color|density|       discovered_by|   melt|molar_heat|      name|            named_by|number|period|phase|              source|        spectral_img|             summary|symbol|xpos|ypos|
+--------------------+-------------+-------+--------------------+-----+-------+--------------------+-------+----------+----------+--------------------+------+------+-----+--------------------+--------------------+--------------------+------+----+----+
|       colorless gas|        1.008| 20.271|   diatomic nonmetal| null|0.08988|     Henry Cavendish|  13.99|    28.836|  Hydrogen|   Antoine Lavoisier|     1|     1|  Gas|https://en.wikipe...|https://en.wikipe...|Hydrogen is a che...|     H|   

In [8]:
df.printSchema()

root
 |-- appearance: string (nullable = true)
 |-- atomic_mass: double (nullable = true)
 |-- boil: double (nullable = true)
 |-- category: string (nullable = true)
 |-- color: string (nullable = true)
 |-- density: double (nullable = true)
 |-- discovered_by: string (nullable = true)
 |-- melt: double (nullable = true)
 |-- molar_heat: double (nullable = true)
 |-- name: string (nullable = true)
 |-- named_by: string (nullable = true)
 |-- number: string (nullable = true)
 |-- period: long (nullable = true)
 |-- phase: string (nullable = true)
 |-- source: string (nullable = true)
 |-- spectral_img: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- xpos: long (nullable = true)
 |-- ypos: long (nullable = true)



In [9]:
df.select("name").show()

+----------+
|      name|
+----------+
|  Hydrogen|
|    Helium|
|   Lithium|
| Beryllium|
|     Boron|
|    Carbon|
|  Nitrogen|
|    Oxygen|
|  Fluorine|
|      Neon|
|    Sodium|
| Magnesium|
| Aluminium|
|   Silicon|
|Phosphorus|
|    Sulfur|
|  Chlorine|
|     Argon|
| Potassium|
|   Calcium|
+----------+
only showing top 20 rows



Seleccionamos los elementos químicos que tengan la masa atómica menor que 200 y mostramos los 10 primeros.

In [10]:
df.select(df['name'],df['atomic_mass']).filter(df['atomic_mass']<200).show(10)

+---------+-------------+
|     name|  atomic_mass|
+---------+-------------+
| Hydrogen|        1.008|
|   Helium|    4.0026022|
|  Lithium|         6.94|
|Beryllium|   9.01218315|
|    Boron|        10.81|
|   Carbon|       12.011|
| Nitrogen|       14.007|
|   Oxygen|       15.999|
| Fluorine|18.9984031636|
|     Neon|     20.17976|
+---------+-------------+
only showing top 10 rows



In [11]:
df.groupBy('phase').count().show()

+------+-----+
| phase|count|
+------+-----+
| Solid|  104|
|Liquid|    2|
|   Gas|   12|
+------+-----+



Ahora vamos a ver como a partir de un dataframe podemos generar una tabla temporal sobre la que ejecutaremos sentencias en SQL.

In [12]:
df.createGlobalTempView("chemistryTable")

In [13]:
spark.sql("select name from global_temp.chemistryTable").show(10)

+---------+
|     name|
+---------+
| Hydrogen|
|   Helium|
|  Lithium|
|Beryllium|
|    Boron|
|   Carbon|
| Nitrogen|
|   Oxygen|
| Fluorine|
|     Neon|
+---------+
only showing top 10 rows



Como ya hemos comentado en el post, python no permite construir estructuras de dataset. Para que te hagas una idea si vienes del mundo Java o Scala. La creación de dataset se basa en la definición de una clase y permite añadir objetos de esa clase. El resultado es una estructura en formato de tabla como el dataframe mostrado en nuestro caso.

## Infiriendo el esquema

En Spark SQL, existen dos formas de inferir el esquema un dataframe. Una es mediante reflexión y la otra es explicitamente con programación. A continuación vamos a ver ambos casos sobre un documento txt que contiene el elemento químico y su masa atómica.

In [15]:
from pyspark.sql import Row
sc = spark.sparkContext
lines=sc.textFile("Periodictable.txt")
parts= lines.map(lambda p: p.split(","))

elements= parts.map(lambda e: Row(name=e[0],atomic_mass=float(e[1])))
elements.count()

10

In [27]:
schemeElements=spark.createDataFrame(elements)
schemeElements.createOrReplaceTempView("elements")

In [28]:
lightElements=spark.sql("select name from elements where atomic_mass>0 and atomic_mass<21")

In [33]:
lightElemName=lightElements.rdd.map(lambda elem: "Name: "+elem.name).collect()
for name in lightElemName:
    print(name)

Name: Hydrogen
Name: Helium
Name: Lithium
Name: Beryllium
Name: Boron
Name: Carbon
Name: Nitrogen
Name: Oxygen
Name: Fluorine
Name: Neon


Ahora vamos a ver como se haría programáticamente.

In [38]:
from pyspark.sql.types import *

sc=spark.sparkContext

lines=sc.textFile("sql/Periodictable.txt")
parts=lines.map(lambda line: line.split(","))
elements= parts.map(lambda p: (p[0],p[1]))

schemeString="name atomicMass"

fields= [StructField(field_name,StringType(),True) for field_name in schemeString.split()]
scheme =StructType(fields)

schemeElements= spark.createDataFrame(elements,scheme)

schemeElements.createOrReplaceTempView("elements")

spark.sql("select name,atomicMass from elements").show()

schemeElements.printSchema()

+---------+-------------+
|     name|   atomicMass|
+---------+-------------+
| Hydrogen|        1.008|
|   Helium|    4.0026022|
|  Lithium|         6.94|
|Beryllium|   9.01218315|
|    Boron|        10.81|
|   Carbon|       12.011|
| Nitrogen|       14.007|
|   Oxygen|       15.999|
| Fluorine|18.9984031636|
|     Neon|     20.17976|
+---------+-------------+

root
 |-- name: string (nullable = true)
 |-- atomicMass: string (nullable = true)



## Data Source


Existen multitud de formatos disponible en Spark SQL (json,parquet,jdbc,orc,libsvm,csv,text,...) aunque el formato por defecto es parquet.
En este apartado vamos a ver el manejo de diferentes formatos de datos y la comunicación con Hive, Parquet y JDBC para guardar/recuperar información.