In [None]:
import glob
import os
import sys

# Cluster-specific locations of the Spark distribution and of the Python
# interpreter used for both the driver and the executors.
SPARK_HOME = "/usr/lib/spark3"
PYSPARK_PYTHON = "/opt/conda/envs/dsenv/bin/python"
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ["SPARK_HOME"] = SPARK_HOME

# Make the PySpark sources bundled with the distribution importable.
# The py4j archive name embeds a version that changes between Spark releases,
# so it is discovered with a glob instead of being hard-coded.
PYSPARK_HOME = os.path.join(SPARK_HOME, "python/lib")
for py4j_zip in sorted(glob.glob(os.path.join(PYSPARK_HOME, "py4j-*-src.zip"))):
    sys.path.insert(0, py4j_zip)
sys.path.insert(0, os.path.join(PYSPARK_HOME, "pyspark.zip"))

# Apache Spark lost chapters. RDD, CLI, UI
# /opt/shared/
### 1. Смотрим в Spark UI
### 2. Исследуем Spark CLI
### 3. Пробегаемся по Spark RDD

# Dataframes, UI, Tuning

## Оптимизации Spark

### А какие у нас есть еще опции?
[Очень много](https://spark.apache.org/docs/3.3.0/configuration.html)

In [None]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [None]:
# Session settings: a fixed Spark UI port plus 2G of memory for the driver
# and for each executor.
conf = SparkConf()
conf.set('spark.ui.port', '14099')
conf.set('spark.driver.memory', '2G')
conf.set('spark.executor.memory', '2G')

In [None]:
# Settings held by the local SparkConf object, ordered by key.
for p in sorted(conf.getAll(), key=lambda pair: pair[0]):
    print(p)

In [None]:
for p in sorted(spark.sparkContext.getConf().getAll(), key=lambda p: p[0]):
    print(p)

In [None]:
spark = SparkSession.builder.config(conf=conf).appName("spark_next_part").getOrCreate()

### Кривые данные

In [None]:
# Three JSON records separated by "|"; the last one is deliberately malformed
# (its "b" key is never closed) to demonstrate corrupt-record handling.
data = """{"a": 1, "b":2, "c":3}|{"a": 1, "b":2, "c":3}|{"a": 1, "b, "c":10}""".split('|')

In [None]:
data

In [None]:
corruptDF = (spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(spark.sparkContext.parallelize(data))
)
corruptDF.show()

На клетке выше Спарк обработал испорченные данные и поместил их в специальную колонку, остальное успешно обработал.
Это поведение по умолчанию: опции `mode` и `columnNameOfCorruptRecord` здесь указаны явно только для наглядности, без них результат был бы тем же.

Есть три варианта обработки испорченных записей через опцию [`ParseMode`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala#L34):

| `ParseMode` | Поведение |
|-------------|----------|
| `PERMISSIVE` | Складывает все испорченные записи в специальную колонку "_corrupt_record" (по умолчанию) |
| `DROPMALFORMED` | Игнорирует все испорченные записи |
| `FAILFAST` | Выкидывает исключение на первой же неправильной записи |

Следующая клетка уже выкидывает испорченные данные:

In [None]:
corruptDF = (spark.read
  .option("mode", "DROPMALFORMED")
  .json(spark.sparkContext.parallelize(data))
)
corruptDF.show()

### Кэширование
Какие вообще у нас опции?


* `DataFrame.persist(..)`

In [None]:
pyspark.sql.DataFrame.persist?

In [None]:
pyspark.storagelevel.StorageLevel?

In [None]:
# `log` is not defined by any earlier cell of this notebook, so the caching
# demo fails with NameError on a fresh kernel. Build a stand-in DataFrame.
# NOTE(review): replace with the real log dataset if one was intended.
log = spark.range(0, 10_000_000).toDF("id")
log.count()

In [None]:
log.cache()

In [None]:
%%time
log.count()

In [None]:
log.unpersist()

In [None]:
spark.catalog.clearCache()

### Spark UI
* Список запланированных, исполняемых и исполненных этапов (stages) и задач (tasks)
* Информация о размере RDD и использовании памяти
* Информация об окружении
* Информация об экзекуторах
* Все задачи Spark SQL

### Давайте заглянем!

In [None]:
spark

## Apache Spark RDD

### Что такое RDD?
Resilient Distributed Dataset — основа основ Apache Spark, низкоуровневая базовая абстракция, в которую превратится ваш код на DataFrame API

Из чего состоит RDD
* Dependencies
* Partitions
* Functions
* (если определен) Partitioner
* (если определен) Locations

Transformations и Actions точно так же работают и в RDD!

### Зачем использовать RDD?
* Нужен функционал, которого не оказалось в датафреймах 
* У вас есть старая кодовая база с RDD
* Вам нужен контроль за данными и за распределенными переменными
* Вы хотите добиться высокой производительности (**не в Python!**)

In [None]:
spark.range(10).rdd

In [None]:
spark.range(10).toDF("id").rdd.map(lambda row: row[0])

Всегда можно конвертировать туда и обратно!

In [None]:
spark.range(10).rdd.toDF()

In [None]:
myCollection = "Big Data and Data Engineering for AI Masters"\
.split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)

In [None]:
words

In [None]:
words.setName("bigdata")

In [None]:
words.name()

#### Transformations

In [None]:
# distinct
words.distinct().count()

In [None]:
# filter
def startsWithD(word):
    """Tell whether ``word`` begins with a capital "D"."""
    beginsWithD = word.startswith("D")
    return beginsWithD

In [None]:
# The predicate can be passed directly; no wrapping lambda is needed.
words.filter(startsWithD).collect()

In [None]:
# map
words2 = words.map(lambda word: (word, word[0], word.startswith("D")))

In [None]:
words2.filter(lambda record: record[2]).take(5)

In [None]:
# flatMap
words.flatMap(lambda word: list(word)).take(7)

In [None]:
# sort
words.sortBy(lambda word: len(word) * -1).take(2)

In [None]:
# Random Splits

In [None]:
fiftyFiftySplit = words.randomSplit([0.5, 0.5])

In [None]:
fiftyFiftySplit[0].collect()

In [None]:
fiftyFiftySplit[1].collect()

#### Actions

In [None]:
# reduce
spark.sparkContext.parallelize(range(1, 21)).reduce(lambda x, y: x + y)

In [None]:
def wordLengthReducer(leftWord, rightWord):
    """Keep the longer of two words; when lengths are equal the right one wins."""
    return leftWord if len(leftWord) > len(rightWord) else rightWord

In [None]:
words.reduce(wordLengthReducer)

In [None]:
# count
words.count()

In [None]:
# countApprox
confidence = 0.95
timeoutMilliseconds = 100
words.countApprox(timeoutMilliseconds, confidence)

In [None]:
words.countApprox?

In [None]:
# countApproxDistinct
words.countApproxDistinct(0.05)

In [None]:
words.countApproxDistinct?

In [None]:
# countByValue
words.countByValue()

In [None]:
# first
words.first()

In [None]:
# max and min
spark.sparkContext.parallelize(range(1,20)).max()

In [None]:
spark.sparkContext.parallelize(range(1,20)).min()

In [None]:
# take
words.take(5)

In [None]:
words.takeOrdered(5)

In [None]:
words.top(5)

In [None]:
words.takeSample(True, 10, 42)

In [None]:
words.takeSample?

#### Caching

In [None]:
words.cache()

In [None]:
words.getStorageLevel()

In [None]:
# checkpointing
# Directory under which RDDs marked with .checkpoint() are saved.
spark.sparkContext.setCheckpointDir("/tmp/rdd_checkpoint")

In [None]:
words.checkpoint()

In [None]:
#piping
words.pipe("wc -l").collect()

In [None]:
words.getNumPartitions()

In [None]:
# mapPartitions
words.mapPartitions(lambda part: [1]).sum()

In [None]:
def indexedFunc(partitionIndex, withinPartIterator):
    """Label every element of a partition with the partition's index.

    Returns a list of strings of the form "partition: <index> => <element>".
    """
    labelled = []
    for element in withinPartIterator:
        labelled.append("partition: {} => {}".format(partitionIndex, element))
    return labelled
words.mapPartitionsWithIndex(indexedFunc).collect()

#### Key-Value RDDs
Многие методы подразумевают K-V структуру для выполнения.  
Эта группа выглядит как **"что-то"ByKey**

In [None]:
# keyBy
keyword = words.keyBy(lambda word: word.lower()[0])

In [None]:
keyword.take(5)

In [None]:
# Mapping over Values
keyword.mapValues(lambda word: word.upper()).collect()

In [None]:
# flatMapValues: the mapped value is iterated, so every upper-cased word
# is expanded into one (key, character) pair per letter.
keyword.flatMapValues(lambda word: word.upper()).collect()

In [None]:
# Извлекаем K-V
keyword.keys().collect()

In [None]:
keyword.values().collect()

In [None]:
# lookup
keyword.lookup("d")

In [None]:
# sampleByKey
import random

SAMPLE_SEED = 6
# Sort the collected characters and draw the per-key fractions from a seeded
# generator, so the fractions (and therefore the sample) are the same on
# every run; an unseeded random.random() made the result irreproducible.
distinctChars = sorted(
    words.flatMap(lambda word: list(word.lower())).distinct().collect()
)
fractionRng = random.Random(SAMPLE_SEED)
sampleMap = {c: fractionRng.random() for c in distinctChars}
words.map(lambda word: (word.lower()[0], word))\
.sampleByKey(True, sampleMap, SAMPLE_SEED).collect()

#### Aggregations

In [None]:
chars = words.flatMap(lambda word: word.lower())
KVcharacters = chars.map(lambda letter: (letter, 1))
def maxFunc(left, right):
    """Return the larger of the two arguments (``left`` when they compare equal)."""
    return right if right > left else left
def addFunc(left, right):
    """Return ``left + right``."""
    total = left + right
    return total
nums = spark.sparkContext.parallelize(range(1,31), 5)

In [None]:
# countByKey
KVcharacters.countByKey()

In [None]:
# groupByKey
from functools import reduce
KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1])))\
.collect()

In [None]:
# reduceByKey
KVcharacters.reduceByKey(addFunc).collect()

In [None]:
# aggregate
nums.aggregate(0, maxFunc, addFunc)

In [None]:
#treeAggregate
depth = 3
nums.treeAggregate(0, maxFunc, addFunc, depth)

In [None]:
nums.treeAggregate?

In [None]:
# aggregateByKey
KVcharacters.aggregateByKey(0, addFunc, maxFunc).collect()

In [None]:
# combineByKey
def valToCombiner(value):
    """Start a combiner: wrap the first value seen for a key in a list."""
    combiner = [value]
    return combiner
def mergeValuesFunc(vals, valToAppend):
    """Add one more value to an existing combiner list in place and return it."""
    vals.extend((valToAppend,))
    return vals
def mergeCombinerFunc(vals1, vals2):
    """Concatenate two combiner lists into a new list."""
    merged = vals1 + vals2
    return merged
outputPartitions = 6
KVcharacters.combineByKey(valToCombiner, mergeValuesFunc, mergeCombinerFunc, outputPartitions)\
.collect()

In [None]:
# foldByKey
KVcharacters.foldByKey(0, addFunc).collect()

#### Joins

In [None]:
# joins
distinctChars = words.flatMap(lambda word: word.lower()).distinct()
keyedChars = distinctChars.map(lambda c: (c, random.random()))
outputPartitions = 10
# Only the last expression of a cell is displayed, so both counts are
# returned together: the default join and the one with an explicit number
# of output partitions.
(
    KVcharacters.join(keyedChars).count(),
    KVcharacters.join(keyedChars, outputPartitions).count(),
)

In [None]:
# zips
numRange = spark.sparkContext.parallelize(range(8), 2)
words.zip(numRange).collect()

#### Controlling Partitions

In [None]:
# coalesce
words.coalesce(1).getNumPartitions()

In [None]:
# repartition
words.repartition(10).getNumPartitions()

#### Broadcast Variables

In [None]:
supplementalData = {"Big": 10000, "Data": -100, "Engineering": 400, "AI": 100}

In [None]:
suppBroadcast = spark.sparkContext.broadcast(supplementalData)

In [None]:
suppBroadcast.value

In [None]:
words.map(lambda word: (word, suppBroadcast.value.get(word, 0)))\
.sortBy(lambda wordPair: wordPair[1])\
.collect()

#### Accumulators

In [None]:
accBest = spark.sparkContext.accumulator(0)

In [None]:
# These type classes are not imported by any earlier cell, so without this
# import the cell fails with NameError.
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StructField,
    StructType,
    TimestampType,
)

# Explicit column types for the ratings CSV read in the next cell.
# NOTE(review): MovieLens ratings usually store `timestamp` as Unix epoch
# seconds; if that is the case here, TimestampType will not parse it —
# confirm against the file.
schema = StructType(fields=[
    StructField("userId", IntegerType()),
    StructField("movieId", IntegerType()),
    StructField("rating", FloatType()),
    StructField("timestamp", TimestampType()),
])

In [None]:
df = spark.read\
          .schema(schema)\
          .format("csv")\
          .option("sep", ",")\
          .option("header", "true")\
          .load("/datasets/movielens/ratings.csv")

In [None]:
def accBestFunc(movie_row):
    """Add 1 to the ``accBest`` accumulator for a row rated 4 or higher.

    Args:
        movie_row: a row (or mapping) exposing a "rating" entry.

    Rows whose rating is missing (None) are skipped: comparing None with a
    number raises TypeError, which would abort the whole ``foreach`` job.
    """
    rating = movie_row["rating"]
    if rating is not None and rating >= 4:
        accBest.add(1)

In [None]:
df.rdd.getNumPartitions()

In [None]:
# Run the accumulator update on every row; the function is passed directly.
df.foreach(accBestFunc)

In [None]:
accBest.value

In [None]:
df.count()

## Spark CLI

In [None]:
!spark3-submit --help

In [None]:
!cat ~/spark_example.py

In [None]:
!cat ~/spark_example.sh

In [None]:
! ~/spark_example.sh

In [None]:
spark.stop()