# Reader & Writer
##### Objectives
1. Read from CSV
1. Read from JSON
1. Write DataFrames to files
1. Write DataFrames to tables

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">DataFrameReader</a>: **`csv`**, **`json`**, **`option`**, **`schema`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">DataFrameWriter</a>: **`mode`**, **`option`**, **`parquet`**, **`format`**, **`saveAsTable`**
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html#pyspark.sql.types.StructType" target="_blank">StructType</a>: **`toDDL`**

##### Spark Types
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types" target="_blank">Types</a>: **`ArrayType`**, **`DoubleType`**, **`IntegerType`**, **`LongType`**, **`StringType`**, **`StructType`**, **`StructField`**

In [None]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=230024f4f598e9258e19432d1c8d1c201f85dd6c46b567e48264ee1d95b1d6bf
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

spark = SparkSession.builder.master('local[*]').appName('reader-writer').getOrCreate()
sc = SparkContext.getOrCreate()

## DataFrameReader
Interface used to load a DataFrame from external storage systems.

**`spark.read.parquet("path/to/files")`**

DataFrameReader is accessible through the **`read`** attribute of SparkSession. This class includes methods for loading DataFrames from different external storage systems.


### Read from CSV files
Read from a CSV file with the **`csv`** method of DataFrameReader and the following options:

Comma separator, use the first line as the header, infer the schema


In [None]:
df = (spark
  .read
  .option('sep', ',')
  .option('header', True)
  .option('inferSchema', True)
  .csv('/content/sample_data/california_housing_test.csv')
  )

In [None]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
df.schema

StructType([StructField('longitude', DoubleType(), True), StructField('latitude', DoubleType(), True), StructField('housing_median_age', DoubleType(), True), StructField('total_rooms', DoubleType(), True), StructField('total_bedrooms', DoubleType(), True), StructField('population', DoubleType(), True), StructField('households', DoubleType(), True), StructField('median_income', DoubleType(), True), StructField('median_house_value', DoubleType(), True)])

In [None]:
df2 = (spark
       .read
       .csv('/content/sample_data/california_housing_test.csv', sep=',', header=True, inferSchema=True)
)

df2.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
from pyspark.sql.types import DoubleType, StructType, StructField

user_defined_schema = StructType([
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("housing_median_age", DoubleType(), True),
    StructField("total_rooms", DoubleType(), True),
    StructField("total_bedrooms", DoubleType(), True),
    StructField("population", DoubleType(), True),
    StructField("households", DoubleType(), True),
    StructField("median_income", DoubleType(), True),
    StructField("median_house_value", DoubleType(), True)
])

In [None]:
df3 = (
    spark
    .read
    .option('sep', ',')
    .option('header', True)
    .schema(user_defined_schema)
    .csv('/content/sample_data/california_housing_test.csv')
)

df3.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
ddl_schema = "longitude double, latitude double, housing_median_age double, total_rooms double, total_bedrooms double, population double, households double, median_income double, median_house_value double"

df4 = (
    spark
    .read
    .option('sep', ',')
    .option('header', True)
    .schema(ddl_schema)
    .csv('/content/sample_data/california_housing_test.csv')
)

df4.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



### Read from JSON files

In [None]:
df5 = (
    spark
    .read
    .option('inferSchema', True)
    .json('/content/sample_data/anscombe.json')
)

df5.show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  NULL|NULL| NULL|              [|
|     I|10.0| 8.04|           NULL|
|     I| 8.0| 6.95|           NULL|
|     I|13.0| 7.58|           NULL|
|     I| 9.0| 8.81|           NULL|
|     I|11.0| 8.33|           NULL|
|     I|14.0| 9.96|           NULL|
|     I| 6.0| 7.24|           NULL|
|     I| 4.0| 4.26|           NULL|
|     I|12.0|10.84|           NULL|
|     I| 7.0| 4.81|           NULL|
|     I| 5.0| 5.68|           NULL|
|    II|10.0| 9.14|           NULL|
|    II| 8.0| 8.14|           NULL|
|    II|13.0| 8.74|           NULL|
|    II| 9.0| 8.77|           NULL|
|    II|11.0| 9.26|           NULL|
|    II|14.0|  8.1|           NULL|
|    II| 6.0| 6.13|           NULL|
|    II| 4.0|  3.1|           NULL|
+------+----+-----+---------------+
only showing top 20 rows



## DataFrameWriter

Interface used to write a DataFrame to external storage systems.

In [None]:
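# "if not exists" lets the cell be re-run without failing when the database already exists
spark.sql('create database if not exists db_dfw')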

DataFrame[]

In [None]:
spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|   db_dfw|
|  default|
+---------+



In [None]:
df5.write.mode('overwrite').saveAsTable('db_dfw.anscombe')

In [None]:
spark.sql('select * from db_dfw.anscombe').show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  NULL|NULL| NULL|              [|
|     I|10.0| 8.04|           NULL|
|     I| 8.0| 6.95|           NULL|
|     I|13.0| 7.58|           NULL|
|     I| 9.0| 8.81|           NULL|
|     I|11.0| 8.33|           NULL|
|     I|14.0| 9.96|           NULL|
|     I| 6.0| 7.24|           NULL|
|     I| 4.0| 4.26|           NULL|
|     I|12.0|10.84|           NULL|
|     I| 7.0| 4.81|           NULL|
|     I| 5.0| 5.68|           NULL|
|    II|10.0| 9.14|           NULL|
|    II| 8.0| 8.14|           NULL|
|    II|13.0| 8.74|           NULL|
|    II| 9.0| 8.77|           NULL|
|    II|11.0| 9.26|           NULL|
|    II|14.0|  8.1|           NULL|
|    II| 6.0| 6.13|           NULL|
|    II| 4.0|  3.1|           NULL|
+------+----+-----+---------------+
only showing top 20 rows

