Lectura y escritura de ficheros
===============================

### Sistemas de ficheros soportados
-   Igual que Hadoop, Spark soporta diferentes filesystems: local, HDFS, Amazon S3

    -   En general, soporta cualquier fuente de datos que se pueda leer con Hadoop

-   También, acceso a bases de datos relacionales o noSQL

    -   MySQL, Postgres, etc. mediante JDBC
    -   Apache Hive, HBase, Cassandra o Elasticsearch

### Formatos de fichero soportados

-   Spark puede acceder a diferentes tipos de ficheros:

    -   Texto plano, CSV, ficheros sequence, JSON, *protocol buffers* y *object files*
        -   Soporta ficheros comprimidos
    -   Veremos el acceso a algunos tipos en esta clase, y dejaremos otros para más adelante 


#### Ejemplos con ficheros de texto

En el directorio `../datos/libros` hay un conjunto de ficheros de texto comprimidos.

### Funciones de lectura y escritura con ficheros de texto


- `sc.textFile(nombrefichero/directorio)` Crea un RDD a partir las líneas de uno o varios ficheros de texto
    - Si se especifica un directorio, se leen todos los ficheros del mismo, creando una partición por fichero
    - Los ficheros pueden estar comprimidos, en diferentes formatos (gz, bz2,...)
    - Pueden especificarse comodines en los nombres de los ficheros
- `sc.wholeTextFiles(nombrefichero/directorio)` Lee ficheros y devuelve un RDD clave/valor
    - clave: path completo al fichero
    - valor: el texto completo del fichero
- `rdd.saveAsTextFile(directorio_salida)` Almacena el RDD en formato texto en el directorio indicado
    - Crea un fichero por partición del rdd


In [5]:
# Lee todos los ficheros del directorio
# y crea un RDD con las líneas
lineas = sc.textFile("../datos/libros")

# Se crea una partición por fichero de entrada
print("Número de particiones del RDD lineas = {0}".format(
       lineas.getNumPartitions()))

In [6]:
# Obtén las palabras usando el método split (split usa un espacio como delimitador por defecto)
palabras = lineas.flatMap(lambda x: x.split())
print("Número de particiones del RDD palabras = {0}".format(
       palabras.getNumPartitions()))

In [7]:
# Reparticiono el RDD en 4 particiones       
palabras2 = palabras.coalesce(4)
print("Número de particiones del RDD palabras2 = {0}".format(
       palabras2.getNumPartitions()))

In [8]:
# Toma una muestra aleatoria de palabras
print(palabras2.takeSample(False, 10))

In [9]:
# Salva el RDD palabras en varios ficheros de salida
# (un fichero por partición)
palabras2.saveAsTextFile("file:///tmp/salidatxt")

In [11]:
# Lee los ficheros y devuelve un RDD clave/valor
# clave->nombre fichero, valor->fichero completo
prdd = sc.wholeTextFiles("../datos/libros/p*.gz")
print("Número de particiones del RDD prdd = {0}\n".format(
       prdd.getNumPartitions()))

In [12]:
# Obtiene un lista clave/valor
# clave->nombre fichero, valor->numero de palabras
lista = prdd.mapValues(lambda x: len(x.split())).collect()

for libro in lista:
    print("El fichero {0:14s} tiene {1:6d} palabras".format(
           libro[0].split("/")[-1], libro[1]))

### Ficheros Sequence
Ficheros clave/valor usados en Hadoop

-   Sus elementos implementan la interfaz [`Writable`](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Writable.html)

In [14]:
rdd = sc.parallelize([("a",2), ("b",5), ("a",8)], 2)

# Salvamos el RDD clave valor como fichero de secuencias
rdd.saveAsSequenceFile("file:///tmp/sequenceoutdir2")

In [16]:
# Lo leemos en otro RDD
rdd2 = sc.sequenceFile("file:///tmp/sequenceoutdir2", 
                       "org.apache.hadoop.io.Text", 
                       "org.apache.hadoop.io.IntWritable")
                       
print("Contenido del RDD {0}".format(rdd2.collect()))

### Formatos de entrada/salida de Hadoop
Spark puede interactuar con cualquier formato de fichero soportado por Hadoop 
- Soporta las APIs “vieja” y “nueva”
- Permite acceder a otros tipos de almacenamiento (no fichero), p.e. HBase o MongoDB, a través de `saveAsHadoopDataSet` y/o `saveAsNewAPIHadoopDataSet`


In [18]:
# Salvamos el RDD clave/valor como fichero de texto Hadoop (TextOutputFormat)
rdd.saveAsNewAPIHadoopFile("file:///tmp/hadoopfileoutdir", 
                            "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat",
                            "org.apache.hadoop.io.Text",
                            "org.apache.hadoop.io.IntWritable")
                            

In [20]:
# Lo leemos como fichero clave-valor Hadoop (KeyValueTextInputFormat)
rdd3 = sc.newAPIHadoopFile("file:///tmp/hadoopfileoutdir", 
                          "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat",
                          "org.apache.hadoop.io.Text",
                          "org.apache.hadoop.io.IntWritable")
                          
print("Contenido del RDD {0}".format(rdd3.collect()))

### Tarea

A partir del fichero apat63_99.txt, crear un conjunto de ficheros secuencia, que se almacenarán en el directorio apat63_99-seq. En estos ficheros, la clave tiene que ser el país (campo "COUNTRY") y el valor un string formado por el número de patente (campo "PATENT") y el año de concesión (campo "GYEAR") separados por una coma. Una línea de esto ficheros será, por ejemplo:

> BE &nbsp;&nbsp;&nbsp;&nbsp; 3070801,1963

### Tarea

Escribir un programa Scala Spark que, a partir de los ficheros cite75_99.txt y apat63_99-seq, obtenga, para cada patente, el país, el año y el número de citas.

Utilizar un *full outer join* para unir, por el campo común (el número de patente) los RDDs asociados a ambos ficheros.
