In [1]:
from pyspark.context import SparkContext
sc = SparkContext(master='local[*]', appName='Spark_App')

In [2]:
sc

In [3]:
# Spark UI http://localhost:4040

> Wzorzec map-reduce to sposób takiego organizowania przetwarzania, aby wykorzystać potencjał wielu maszyn w klastrze i jednocześnie zgrupować możliwie jak najwięcej przetwarzania i przetwarzanych danych w tych samych miejscach.

## Utworzenie nowego RDD

Główna abstrakcja Spark zapewnia odporny rozproszony zestaw danych RDD (Resilient Distributed Datasets), który jest zbiorem elementów podzielonych między węzły klastra, na których można operować równolegle.

Kolekcja rozproszona:
RDD wykorzystuje operacje MapReduce, które są powszechnie stosowane do przetwarzania i generowania dużych zestawów danych za pomocą równoległego, rozproszonego algorytmu w klastrze. Umożliwia użytkownikom pisanie obliczeń równoległych przy użyciu zestawu operatorów wysokiego poziomu, bez martwienia się o rozkład pracy i odporność na uszkodzenia.

Niezmienne: RDD złożone ze zbioru rekordów podzielonych na partycje. Partycja jest podstawową jednostką równoległości w RDD, a każda partycja jest jednym logicznym podziałem danych, który jest niezmienny i utworzony przez pewne transformacje istniejących partycji. Niezmienność pomaga osiągnąć spójność obliczeń.

Odporny na błędy: W przypadku utraty części partycji RDD, możemy odtworzyć transformację na tej partycji w linii, aby uzyskać to samo obliczenie, a nie przeprowadzanie replikacji danych w wielu węzłach. Ta cecha jest największą zaletą RDD, ponieważ oszczędza wiele wysiłku w zarządzaniu danymi i replikacji, a tym samym zapewnia szybsze obliczenia.

Leniwe oceny: Wszystkie transformacje w Spark są leniwe, ponieważ nie obliczają od razu swoich wyników. Zamiast tego pamiętają po prostu transformacje zastosowane do jakiegoś podstawowego zestawu danych. Transformacje są obliczane tylko wtedy, gdy akcja wymaga zwrócenia wyniku do programu sterownika.

Transformacje funkcjonalne: RDD obsługują dwa typy operacji: transformacje, które tworzą nowy zestaw danych z istniejącego, oraz akcje, które zwracają wartość do sterownika program po uruchomieniu obliczeń na zbiorze danych.

Formaty przetwarzania danych:
Może łatwo i wydajnie przetwarzać dane zarówno ustrukturyzowane, jak i nieustrukturyzowane.

Obsługiwane języki programowania:
Interfejs API RDD jest dostępny w Javie, Scali, Python i R.

### TRANSFORMACJE

In [4]:
nums = sc.parallelize([1,2,3,4,5])
squared = nums.map(lambda x: x * x)
results = squared.collect()
for result in results:
    print(result)

1
4
9
16
25


In [5]:
nums = sc.parallelize([1,2,2,3,4,4,5])
distinct = nums.distinct()
print(distinct.collect())

[2, 4, 1, 3, 5]


In [6]:
rdd1 = sc.parallelize([1,2,3,4,5])
rdd2 = sc.parallelize([4,5,6,7])

union = rdd1.union(rdd2)
print(union.collect())

intersection = rdd1.intersection(rdd2)
print(intersection.collect())

subtract = rdd1.subtract(rdd2)
print(subtract.collect())

cartesian = rdd1.cartesian(rdd2)
print(cartesian.collect())

[1, 2, 3, 4, 5, 4, 5, 6, 7]
[4, 5]
[1, 2, 3]
[(1, 4), (1, 5), (2, 4), (2, 5), (1, 6), (1, 7), (2, 6), (2, 7), (3, 4), (3, 5), (4, 4), (4, 5), (5, 4), (5, 5), (3, 6), (3, 7), (4, 6), (4, 7), (5, 6), (5, 7)]


### AKCJE

In [7]:
nums = sc.parallelize([1,2,3,4,5])
sum = nums.reduce(lambda x, y: x + y)
print(sum)

15


In [8]:
nums = sc.parallelize([1,2,3,4,5])
sum = nums.fold(0, lambda x, y: x + y)
print(sum)

15


In [9]:
nums = sc.parallelize([1,1,3,3,5])
def f(x): print(x)

nums.foreach(f)

In [10]:
import random

n = 10

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

inside(0)

True

In [11]:
sc.parallelize(range(0,n)).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [12]:
sc.parallelize(range(0,n)).filter(inside).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

In [13]:
sc.parallelize(range(0,n)).filter(inside).count()

10

In [14]:
n = 1000000

count = sc.parallelize(range(0,n)).filter(inside).count()
pi = 4*count/n

In [15]:
print("Pi is roughly %f" % (pi))

Pi is roughly 3.143196


## Utworzenie nowego DataFrame

DataFrame to rozproszony zbiór danych zorganizowany w nazwane kolumny. Jest to koncepcyjnie ekwiwalent tabeli w relacyjnej bazie danych lub ramce danych R/Python. Wraz z Dataframe firma Spark wprowadziła również optymalizator katalizatora, który wykorzystuje zaawansowane funkcje programowania do budowy rozszerzalnego optymalizatora zapytań.

Rozproszony zbiór obiektu wiersza: DataFrame to rozproszony zbiór danych zorganizowany w nazwane kolumny. Jest to koncepcyjnie odpowiednik tabeli w relacyjnej bazie danych, ale z bogatszymi optymalizacjami pod maską.

Przetwarzanie danych: Przetwarzanie ustrukturyzowanych i nieustrukturyzowanych formatów danych (Avro, CSV, wyszukiwanie elastyczne i Cassandra) oraz systemów pamięci masowej (HDFS, tabele gałęzi, MySQL itp. ). Może czytać i pisać ze wszystkich tych różnych źródeł danych.

Strukturyzując dane, DataFrames pozwala silnikowi Apache Spark (Catalyst Optimizer) udoskonalić wydajność zapytań. 

Zgodność gałęzi: Za pomocą Spark SQL można uruchamiać niezmodyfikowane zapytania Hive w istniejących magazynach Hive. Ponownie wykorzystuje interfejs Hive i interfejs MetaStore i zapewnia pełną zgodność z istniejącymi danymi Hive, zapytaniami i UDF.

Wolfram: Wolfram zapewnia fizyczny backend wykonawczy, który jawnie zarządza pamięcią i dynamicznie generuje kod bajtowy do oceny wyrażenia.

Obsługiwane języki programowania:
Interfejs API Dataframe jest dostępny w Javie, Scali, Python i R.

Wykonanie zapytań w Pythonie może być znacząco wolniejsze, z powodu potrzeby komunikacji między Java JVM i Py4J.

In [16]:
import pyspark
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)

In [17]:
# spark = SparkSession.builder\
#        .master("local[*]")\
#        .appName("Spark_App")\
#        .getOrCreate()

In [18]:
spark.sparkContext

In [19]:
rdd = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), 
     ('Amber', 9)])

df = spark.createDataFrame(rdd).toDF("name", "age")
df.show()

+------+---+
|  name|age|
+------+---+
| Amber| 22|
|Alfred| 23|
|  Skye|  4|
|Albert| 12|
| Amber|  9|
+------+---+



In [20]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



In [21]:
rdd = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])
df = rdd.toDF(["a","b","c"])
df.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+



Jak już użyliśmy `.collect()` do utworzenia RDD, możemy dostać się do danych obiektu, tak jak to robimy w Pythonie:
`data_heterogenous[1]['Porsche']`, dla którego Spark zwraca `100000`.
Metoda `.collect()` zwraca wszystkie elementy RDD do programu „driver”, gdzie jest szeregowany jako lista.

In [22]:
data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000},  ['Spain','visited', 4504]]).collect()

In [23]:
print(data_heterogenous[1]['Porsche'])

100000


In [24]:
data = spark.sql('''select 'spark' as hello ''')
data.show(truncate = False)

+-----+
|hello|
+-----+
|spark|
+-----+



Utworzymy DataFrame przez wygenerowanie danych. W naszym przykładzie utworzymy `stringJSONRDD` RDD, a następnie przekonwertujemy to do formatu DataFrame. 

In [25]:
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [26]:
swimmersJSON = spark.read.json(stringJSONRDD)

Utworzymy również tabelę tymczasową przez użycie metody `createOrReplaceTempView`

In [27]:
swimmersJSON.createOrReplaceTempView('swimmersJSON')

Jak już wiemy, wiele operacji RDD to transformacje, które nie są wykonywane dopóki nie jest wykonana operacja „akcja” (action). 
Przykładowo, w `sc.parallelize` jest transformacją, która jest wykonana, gdy dokonujemy konwersji z RDD do DataFrame używając `spark.read.json` (akcja).

Używając DataFrame API, możemy wywołać metodę `show(<n>)`, która zwraca pierwsze n wierszy na konsolę (domyślnie 10 wierszy):

In [28]:
swimmersJSON.show(truncate = False)

+---+--------+---+-------+
|age|eyeColor|id |name   |
+---+--------+---+-------+
|19 |brown   |123|Katie  |
|22 |green   |234|Michael|
|23 |blue    |345|Simone |
+---+--------+---+-------+



Jak już utworzyliśmy `swimmersJSON` DataFrame, to możemy uruchomić DataFrame API, jak również zapytania SQL na tym DataFrame.

In [29]:
spark.sql("select * from swimmersJSON").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

In [30]:
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



W tym przypadku,  określamy schemat przez dostarczenie do Spark SQL typów danych (pyspark.sql.types) i wygenerowanie kilku przykładowych danych.

In [31]:
from pyspark.sql.types import *

stringCSVRDD = sc.parallelize([
(123, 'Katie', 19, 'brown'),
(234, 'Michael', 22, 'green'),
(345, 'Simone', 23, 'blue')
])

In [32]:
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])

Zastosowanie schematu do RDD i utworzenie DataFrame

In [33]:
swimmers = spark.createDataFrame(stringCSVRDD, schema)

Utworzenie tymczasowego widoku używają DataFrame

In [34]:
swimmers.createOrReplaceTempView("swimmers")

In [35]:
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



Aby uzyskać liczbę wierszy w DataFrame możemy użyć metody

In [36]:
swimmersJSON.count()

3

Aby uruchomić instrukcję filtrującą możemy użyć klauzulę `filter`. W poniższych poleceniach używamy również klauzuli `select`, aby określić zwracane kolumny:

In [37]:
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [38]:
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



Wywołajmy teraz te same zapytania, ale używając zapytań SQL w stosunku do tej samej DataFrame. Warto nadmienić, że DataFrame zawierająca dane pływaków jest dostępna z uwagi na wykonane wcześniej polecenie z metodą `.createOrReplaceTempView`.

In [39]:
spark.sql("select count(1) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [40]:
spark.sql("select id, age from swimmers where age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [41]:
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



Ważną uwagą, gdy pracujemy z Spark SQL i DataFrames, jest łatwość pracy z popularnymi formatami przechowywania danych *CSV*, *JSON*, zaś powszechnie stosowanym formatem przechowywania danych dla zapytań Spark SQL jest format pliku *Parquet*. 
Jest to format kolumnowy, który jest wspierany przez wiele innych systemów przetwarzania danych, a Spark SQL wspiera zarówno odczyt i zapis z/do plików *Parquet*, które automatycznie zachowują oryginalny schemat danych.

In [42]:
df = spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'")
df.write.parquet("mydata.parquet")

In [43]:
df_parquet = spark.read.parquet("mydata.parquet")
df_parquet.show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



In [44]:
sc.stop()