# Ввод и вывод в Spark

Структура шага

**здесь нужно редактирование**

* аудио io_intro
    * по нему слайд "источника данных"
* скринкаст 1 про работу с файлами (5-7 минут)
* в материалах то, по чему шел скринкаст
* скринкаст 2 про работу с базами данных (5-7 минут)
* в материалах то, по чему шел скринкаст
* дополнительные материалы (фрагменты ноутбуков с комментариями)
* задачи для самопроверки

### Источники данных

![](ideal.png)

* базы данных (JDBC)
* файлы (HDFS)

#### С источниками данных можно выполнить операции 

* читать данные
* записывать данные

Все это работает максимально эффективно - используется параллелизм кластера.

**скринкаст 1**

### Работа с файлами

Работать с файлами можно локально (т.е. spark рабоает в `local mode`) и в кластере (`yarn mode`), нужны только данные. Мы будем работать с файлами локально, с базами данных - в кластере.

Стандартная "шапка" - настройка окружения на работу со spark

**Важно** обратите внимание - SPARK_HOME указывает на путь, куда установлен Spark на локальном компютере (поправьте под себя) 

In [1]:
import os
from pyspark.sql import SparkSession

os.environ["SPARK_HOME"] = "/home/mk/mk_win/projects/SparkEdu/lib/python3.5/site-packages/pyspark"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"

Создаем сессию, работаем в "локальном" режиме.

In [2]:
master = "local"
spark = SparkSession.builder.master(master).appName("spark_test").getOrCreate()

Загрузим наш файл со странами мира, на забываем - первая строка, заголовок (опция `header`)

In [3]:
# цепочка обработки при чтении

df = spark.read.format("csv") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .option("header","true") \
    .option("path", "data/countries of the world.csv") \
    .load()
# df.show(3)

Покажем первые 3 строки

In [4]:
df.show(3)

+------------+--------------------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------+-------+---------+---------+-----------+--------+-------+
|     Country|              Region|Population|Area (sq. mi.)|Pop. Density (per sq. mi.)|Coastline (coast/area ratio)|Net migration|Infant mortality (per 1000 births)|GDP ($ per capita)|Literacy (%)|Phones (per 1000)|Arable (%)|Crops (%)|Other (%)|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|
+------------+--------------------+----------+--------------+--------------------------+----------------------------+-------------+----------------------------------+------------------+------------+-----------------+----------+---------+---------+-------+---------+---------+-----------+--------+-------+
|Afghanistan |ASIA (EX. NEAR EA...|  31056997|        647500|                      48

Посмотреть схему файла можно с помощью метода `printSchema()`

In [5]:
df.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Area (sq. mi.): integer (nullable = true)
 |-- Pop. Density (per sq. mi.): string (nullable = true)
 |-- Coastline (coast/area ratio): string (nullable = true)
 |-- Net migration: string (nullable = true)
 |-- Infant mortality (per 1000 births): string (nullable = true)
 |-- GDP ($ per capita): integer (nullable = true)
 |-- Literacy (%): string (nullable = true)
 |-- Phones (per 1000): string (nullable = true)
 |-- Arable (%): string (nullable = true)
 |-- Crops (%): string (nullable = true)
 |-- Other (%): string (nullable = true)
 |-- Climate: string (nullable = true)
 |-- Birthrate: string (nullable = true)
 |-- Deathrate: string (nullable = true)
 |-- Agriculture: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Service: string (nullable = true)



Загрузка JSON файла (столицы) происходит полностью аналогично, меняются только формат файла и, возможно, набор опций (зависит от специфики файла).

In [7]:
# загрузим JSON
dfj = spark.read.format("json") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .option("path", "data/capital.json") \
    .load()
dfj.show(3)

+----------------+---------+-----+----------+----------+------+-------+------+---+------------+---------+------+--------+----------+---------+----+--------+----------+-----+--------+-----------+-----+------+---------+----------+--------+--------+-------------------+-----+---+--------+------+-------+---+--------+-----+--------+------+-----------+--------+------+-----------+-----+------------+------+--------+-------+-------+------+--------+------+-----+-----------+----------------+-------+------+------+--------+----------+------+-------------+-------+-----+-------+-----+--------+------+------+-----------+--------+----+-------+-------+--------+-----+----------+------+------------+-------+-------+-------------+-----+---------+----+------+-------+-----------+------+------+---------+--------------+-------+------+----------+---------+---+-----------+------+--------------+--------+-------+------+---------+--------------------+---------+------------+-------+------+---------+----+------------+--

Запись в файлы происходит так же просто (и универсально - нужно лишь указать формат результирующего файла).

In [8]:
# цепочка обработки при записи
df.write.format("csv") \
    .mode("overwrite") \
    .option("sep", "\t") \
    .save("data/new.csv")

In [9]:
# сохраним в виде JSON
df.write.format("json") \
    .mode("overwrite") \
    .save("data/new.json")

In [10]:
spark.stop()

### Комментарии

В-целом работать с файлами в Spark проще, чем без него (вспомним наш способ копирования файлов через внешние таблицы - там было гораздо больше действий). Общим действием является начало процесса - копирование файла в HDFS (см. ниже).

Важные моменты:

* при чтении файл должен быть в HDFS (для кластерного варианта), имя - директория
* при записи файл окажется в HDFS, имя - директория, заставить записать единичный файл - сложно...
* при записи есть проблема с заголовками (например, в CSV файлах) - они либо будут отсутствовать во всех файлах, либо присутствовать (что неудобно)
* mode - опция при чтении, метод - при записи...
* запятая в числах - фатально, преобразовывать до или после
* параллелизм не посмотреть - см. материалы
* опций много и они разные для разных видов файлов, см. материалы про основные из них
* при чтении можно указывать схему (`.schema(someSchema)`)

### Поддерживаются файлы

* CSV
* JSON
* Parquet
* ORC
* текстовые


**скринкаст 2**

### Работа с базами данных

Для работы с базой данных нам нужна

* база данных (поэтому выполняем в кластере)
* JDBC драйвера

особенности установки драйверов - см. материалы предыдущего шага.


С базами данных будем работать из кластера (в `cluster mode`), шапка стандартная, но чуть другая.

In [None]:
# настраиваем окружение для работы spark
import os

os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
#os.environ["PYSPARK_SUBMIT_ARGS"] = "--executor-memory 2G --num-executors 16 --executor-cores 2 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = """--driver-class-path /usr/share/java/postgresql.jar 
--jars /usr/share/java/postgresql.jar --executor-memory 600M --driver-memory 1G --num-executors 1 pyspark-shell"""

# добавляем модуль в путь
import sys
sys.path.append('/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python')

In [None]:
from pyspark.sql import SparkSession
master = "yarn"
spark = SparkSession.builder.master(master).appName("spark_test").enableHiveSupport().getOrCreate()

Чтение из базы данных с помощью Spark - это просто, указывается абсолютный минимум параметров.

In [None]:
# цепочка обработки при чтении
df = spark.read.format("jdbc") \
    .option("url","jdbc:postgresql://localhost:7432/scm") \
    .option("dbtable","hosts") \
    .option("user","cloudera-scm") \
    .option("password","2MhalIGcSp") \
    .load()

Запись в базу данных (Hive) происходит так же просто (как и запись в файл), фактически меняется один метод - вместо `save()` вызываем `saveAsTable()`

In [None]:
# цепочка обработки при записи
jdbcDF.write.format("parquet") \
    .mode('overwrite') \
    .option("compression","gzip") \
    .saveAsTable("sp_hosts")

### Комментарии

* см. предыдущий шаг про настройку JDBC
* вместо имени таблицы в опции `dbtable` может стоять `select` (см. документацию)
* схема данных переносится автоматически
* сохранять можно не только в Hive, в любую реляционную СУБД (для которой настроен JDBC доступ)
* обратите внимание на `enableHiveSupport()` при создании сессии - это нужно для того, чтобы Spark работал с теми же метаданными, что и Hive
* мы познакомимся с еще одним способом загрузки данных из реляционных СУБД, когда будем говорить про Spark SQL
* параллелизм посмотреть на таком кластере не получится, см. материалы ниже.

### Эффективная работа с базами данных и файлами (параллелизм)

Для эффективной (параллельной) работы с базами данных и файлами нужно

* наличие нескольких executor-ов
* разбиение dataframe на разделы (partitions)

**Executor-ы**

Их количество мы указываем в параметрах запуска Spark приложения (см. предыщущий шаг).

**Разделы**

Для файлов разделы "наследуются" из способа разбиения файла на split-ы (см. HDFS).

Для реляционных баз данных мы задаем количество партиций явно (см. параметр `numPartitions` ниже).

Кроме того, необходимо (как и в случае `sqoop`) "подсказать" Spark-у - как правильнее разбить исходную таблицу на разделы (правильнее = равномернее). Для этого служит набор опций, перечисленный ниже. Идеология ровно такая же, как и в случае `sqoop` - нужно указать примерный диапазон значений числовой колонки, по которой производится разбиение, само разделение обеспечит Spakr.

In [None]:

    .option("lowerBound", 712415) \
    .option("upperBound", 81792182) \
    .option("partitionColumn", "CONTRACT_ID") \
    .option("numPartitions", 100) \


При записи количество разделов будет совпадать с количеством файлов в директории с таблицей, с помощью вызова метода `repartition()` можно явно управлять этим количеством (вызов метода приведет к так называемому `shuffle` - передаче данных между узлами кластера.

**Проблема плохих полей в файлах**

Если при загрузке данных из файлов (например, CSV) в файле в полях присутствуют разделители (полей или строк), то загрузка закончится плохо (также, как и при работе с внешними файлами). Чтобы бороться с этим в Spark есть опции, позволяющие задать разделители полей и строк, а также позволяющие загружать "экранированные" кавычками строки.

**Вопросы для самопроверки**

* для каких файлов нужна явная схема
* как будут записаны файлы (директория)
* как задать присутствие заголовка в csv
* 