## Базовые операции с RDD

In [None]:
# Small in-memory list of city names distributed as an RDD for the examples below.
dataset = ['Moscow', 'Madrid', 'Paris', 'Berlin', 'Barcelona', 'Cairo', 'Perm']
rdd = spark.sparkContext.parallelize(dataset)
rdd

In [None]:
# Action: return the first element of the RDD built from `dataset` above.
rdd.first()

In [None]:
# Action: return the first two elements of the RDD as a Python list.
rdd.take(2)

In [None]:
# Lower-case every city (a lazy transformation); first() triggers the
# computation and returns a single element.
(
    rdd
    .map(lambda city: city.lower())
    .first()
)

In [None]:
# collect() brings every lower-cased city name back to the driver as a list.
(
    rdd
    .map(lambda city: city.lower())
    .collect()
)

In [None]:
# Keep only the cities whose lower-cased name starts with 'm'.
(
    rdd
    .map(lambda city: city.lower())
    .filter(lambda city: city.startswith('m'))
    .collect()
)

In [None]:
# Same filter as above, then prefix every surviving name with 'I love '.
(
    rdd
    .map(lambda city: city.lower())
    .filter(lambda city: city.startswith('m'))
    .map(lambda city: 'I love ' + city)
    .collect()
)

In [None]:
# Total number of characters across the 'I love <city>' strings.
# `len` is passed directly instead of wrapping it in a lambda.
# sum() returns 0 when the filter keeps nothing, whereas reduce() raises
# ValueError on an empty RDD; for a non-empty RDD the result is the same.
(
    rdd
    .map(lambda city: city.lower())
    .filter(lambda city: city.startswith('m'))
    .map(lambda city: 'I love ' + city)
    .map(len)
    .sum()
)

In [None]:
def first_letter(input):
    """Return the first element of *input* (its initial character for a string).

    Raises IndexError when *input* is empty.
    """
    # NOTE: the parameter name shadows the builtin ``input``; it is kept
    # unchanged so that any keyword callers continue to work.
    head = input[0]
    return head

# Pass the function itself; wrapping it as `lambda x: first_letter(x)` adds nothing.
rdd.map(first_letter).collect()

## Функции PairRDD

In [None]:
import json
from pprint import pprint

In [None]:
# Load the file as an RDD of strings, one element per line, and peek at the first one.
rdd = sc.textFile(name='../datasets/data1.json')
rdd.first()

In [None]:
# Decode every line as JSON. json.loads is passed directly instead of through a
# redundant lambda; first() only needs the first record to succeed.
parsed = rdd.map(json.loads)
parsed.first()

In [None]:
# fails
# Intentional failure demo. first() only needs the first record, so it can
# succeed while count() has to evaluate every record and hits a bad one.
# NOTE(review): presumably some records lack a 'population' key (KeyError) or
# are not valid JSON at all. The next two cells handle exactly those cases;
# confirm against data1.json.
cities = parsed.map(lambda x: (x['name'], x['population']))
cities.first()
cities.count()

In [None]:
# fails
# Intentional failure demo: defaulting a missing 'population' to 0 is not enough.
# NOTE(review): presumably some lines of data1.json are not valid JSON, so the
# json.loads step inside `parsed` raises JSONDecodeError. The parse_json cell
# below catches exactly that exception; confirm against the data.
cities = parsed.map(lambda x: (x['name'], x.get('population', 0)))
cities.count()

In [None]:
def parse_json(data):
    """Parse one JSON line into a ``(name, population)`` pair.

    Parameters
    ----------
    data : str
        A single line of the input file.

    Returns
    -------
    tuple
        ``(name, population)``, where a missing ``population`` defaults to 0.
        ``(None, None)`` when the line is not valid JSON, is not a JSON
        object, or has no ``name`` field. A single bad record can then be
        filtered out downstream instead of failing the whole Spark job.
    """
    try:
        parsed = json.loads(data)
    except json.decoder.JSONDecodeError:
        return (None, None)
    # Valid JSON that is not an object (e.g. a bare list or number), or an
    # object without a name, cannot be keyed by city name. Indexing such a
    # value would raise and abort the job.
    if not isinstance(parsed, dict) or 'name' not in parsed:
        return (None, None)
    return (parsed['name'], parsed.get('population', 0))

# Parse every line tolerantly: bad records become (None, None) instead of
# failing the job. parse_json is passed directly rather than through a lambda.
# Only the last expression of a cell is displayed, so the count is printed
# explicitly instead of being computed and discarded.
cities = rdd.map(parse_json)
print(cities.count())
cities.first()

In [None]:
# fails
# Intentional failure demo. reduceByKey adds up the values of every key,
# including the (None, None) pairs that parse_json returns for unparseable lines.
# NOTE(review): presumably the job fails because two None values (or None and
# a number) under the key None get added, raising TypeError. Confirm that
# data1.json has more than one bad line.
population_agg = cities.reduceByKey(lambda x,y: x+y)
population_agg.collect()

In [None]:
# Drop pairs whose name or population is falsy before aggregating. That removes
# the (None, None) sentinels from parse_json, and also populations of 0.
# Then sum the populations per city name.
population_agg = (
    cities
    .filter(lambda pair: pair[0] and pair[1])
    .reduceByKey(lambda a, b: a + b)
)
# Printed explicitly: only the last expression of a cell is displayed.
print(population_agg.count())
population_agg.first()

In [None]:
# (city, total_population) pair with the largest total.
population_agg.max(key=lambda pair: pair[1])

In [None]:
# (city, total_population) pair with the smallest total.
population_agg.min(key=lambda pair: pair[1])

## Управление параллелизмом

In [None]:
# Reload the file and inspect how its lines are spread across partitions.
# glom() turns each partition into a list, so collect() returns one list per partition.
rdd = sc.textFile('../datasets/data1.json')
print(rdd.getNumPartitions())
rdd.glom().collect()

In [None]:
# Redistribute the lines into 10 partitions and show the resulting layout.
repartitioned = rdd.repartition(numPartitions=10)
repartitioned.glom().collect()

In [None]:
# coalesce(1) merges all partitions into one (by default without a full
# shuffle), so glom().collect() returns a single list holding every line.
rdd.coalesce(1).glom().collect()

In [None]:
def new_partitioner(data):
    """Assign a key to partition 0, 1 or 2 by taking its length modulo 3."""
    key_length = len(data)
    return key_length % 3

# Parse the lines, keep pairs with a truthy name and population, and spread
# them over 3 partitions chosen by new_partitioner. Each partition is sorted
# by key in ascending order. parse_json is passed directly (no redundant
# lambda), and the sort direction is named instead of a bare positional True.
custom_rep = (
    rdd
    .map(parse_json)
    .filter(lambda pair: pair[0] and pair[1])
    .repartitionAndSortWithinPartitions(3, new_partitioner, ascending=True)
)

custom_rep.glom().collect()

## Кеширование и персистентность

In [None]:
from time import time

In [None]:
# Baseline without caching. Each count() below is a separate action over an
# uncached RDD, so the even-number filter is recomputed for every chain.
rdd = sc.parallelize(range(10000000))
start = time()
rdd.filter(lambda n: n % 2 == 0).count()
(rdd.filter(lambda n: n % 2 == 0)
    .filter(lambda n: n % 3 == 0)
    .count())
(rdd.filter(lambda n: n % 2 == 0)
    .filter(lambda n: n % 3 == 0)
    .filter(lambda n: n % 7 == 0)
    .count())

end = time()
print('Took {}'.format(end - start))

In [None]:
# Same workload, but the even-number RDD is cached. The first count() fills
# the cache and the following actions reuse the cached partitions instead of
# recomputing the filter.
rdd = sc.parallelize(range(10000000))
start = time()
tmp = rdd.filter(lambda x: x % 2 == 0)
tmp.cache()
tmp.count()
tmp.filter(lambda x: x % 3 == 0).count()
tmp.filter(lambda x: x % 3 == 0).filter(lambda x: x % 7 == 0).count()
end = time()
# Release the cached partitions once the measurement is done (outside the
# timed section). Otherwise they keep occupying storage memory until the
# SparkContext stops.
tmp.unpersist()
print('Took {}'.format(end - start))

## Бродкасты

In [None]:
# Broadcast a small read-only lookup table, then map each fruit's first
# letter through it (letters missing from the table map to 'no idea').
simple_list = ["Apple", "Banana", "Orange", "Avocado"]
some_dict = dict(A='foo', B='bar', C='meow')
bc_dict = sc.broadcast(some_dict)
rdd = sc.parallelize(simple_list)
pair_rdd = rdd.map(lambda fruit: (fruit[0], len(fruit)))
mapped_rdd = pair_rdd.map(lambda pair: bc_dict.value.get(pair[0], 'no idea'))
mapped_rdd.take(2)

## Остановка SparkContext

In [None]:
# Do not run until you have finished working with this notebook
# Stopping the SparkContext shuts it down; the RDD cells above cannot run
# again until a new context is created.
sc.stop()