# Dokumentacja PySpark

Strona z opisem funkcjonalności: https://spark.apache.org/docs/latest/api/python/index.html

Link do dokumentacji: https://spark.apache.org/docs/latest/api/python/getting_started/index.html


### Apache Spark

Apache Spark to szybki i uniwersalny silnik przetwarzania danych, który umożliwia łatwe przetwarzanie dużych zbiorów danych równolegle. PySpark jest interfejsem Pythona do Sparka, co pozwala na korzystanie z jego możliwości w środowisku Pythona.

Rozproszenie środowiska w PySpark odnosi się do zdolności do równoczesnego przetwarzania dużych zbiorów danych na wielu węzłach klastra. Spark automatycznie dzieli dane na części i przetwarza je równolegle na różnych węzłach, co przyspiesza przetwarzanie w porównaniu do tradycyjnych podejść jednowęzłowych.

### Jak jest to realizowane?

* RDD (Resilient Distributed Datasets):

Podstawowa abstrakcja danych w Sparku, reprezentuje podzbiór danych równocześnie na wielu węzłach klastra. Każdy RDD jest podzielony na wiele partycji, które są przetwarzane niezależnie.

```
from pyspark import SparkContext

sc = SparkContext("local", "ExampleApp")
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data, 2)  # Drugi argument to liczba partycji
```

* DataFrame:


Abstrakcja danych, bardziej wysokopoziomowa niż RDD, umożliwiająca pracę z danymi w formie tabelarycznej. Działa na zasadzie partycji podobnie jak RDD, ale dostarcza bardziej rozbudowanego interfejsu.

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExampleApp").getOrCreate()
df = spark.createDataFrame([(1, "John"), (2, "Jane"), (3, "Doe")], ["id", "name"])
```

* Zadania Transformacji i Akcji:

Transformacje definiują nowe RDD/DataFrame na podstawie istniejących danych.
Akcje wykonują operacje na danych i zwracają wynik do lokalnego środowiska.

### O czym należy pamiętać?

* Liczba Partycji: Optymalna liczba partycji zależy od wielkości danych, dostępnych zasobów klastra i rodzaju operacji. Zbyt mała liczba partycji może prowadzić do niewydajnego przetwarzania, podczas gdy zbyt duża liczba partycji może prowadzić do nadmiernego narzutu komunikacyjnego.

* Równomierny Rozkład Danych: Należy pamietać, by zapewnić, aby dane były równomiernie rozdzielone między partycje. Nierównomierność może prowadzić do wydłużonego czasu przetwarzania z powodu pracy "węża gardłowego" na jednej z partycji.

* Optymalizacja Operacji: Należy unikać zbierania dużych ilości danych do lokalnego środowiska (`collect()`), jeśli to możliwe, ponieważ może to prowadzić do przeciążenia pamięci na lokalnym środowisku.

* Użycie Funkcji Spark: Warto korzystać z wbudowanych funkcji Spark, takich jak operacje map, filter, reduce, które są optymalizowane pod kątem równoległego przetwarzania.

```
# Przykład operacji MapReduce
lines = sc.textFile("path/to/textfile.txt")
word_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
```

1. Zasoby Klastra:

Monitoruj dostępność zasobów klastra, takich jak liczba rdzeni CPU, dostępna pamięć, aby zoptymalizować równoczesność przetwarzania.

2. Optymalizacja Planu Wykonawczego:

Spark automatycznie optymalizuje plany wykonawcze, ale można ręcznie dostosować niektóre ustawienia, takie jak partycje i cache, aby zoptymalizować wydajność.

```
# Przykład ręcznej optymalizacji liczby partycji
rdd = rdd.repartition(4)
```

Zrozumienie i odpowiednie uwzględnienie tych aspektów jest kluczowe dla efektywnego korzystania z rozproszonego środowiska w PySpark.

## Funkcje PySpark:
1. RDD (Resilient Distributed Datasets):

Podstawowa abstrakcja danych w Sparku.
Elastyczne, odpornie na awarie struktury danych, które można łatwo dystrybuować na wiele węzłów.

In [None]:
from pyspark import SparkContext

sc = SparkContext("local", "MyApp")
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

2. DataFrame:

Struktura danych podobna do tabeli w bazie danych.
Umożliwia łatwe przetwarzanie danych przy użyciu SQL queries.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").getOrCreate()
df = spark.createDataFrame([(1, "John"), (2, "Jane")], ["id", "name"])


Metoda `collect()` w PySpark jest używana do zebrania wszystkich elementów DataFrame z rozproszonego środowiska do lokalnego środowiska (driver program). Jednak, warto zauważyć, że ta operacja może być kosztowna, ponieważ wymaga przesłania wszystkich danych z klastra do jednego punktu, co może być wydajnościowo kosztowne w przypadku dużych zbiorów danych.

In [None]:
# Składnia
data = df.collect()

# Utwórz sesję Spark
spark = SparkSession.builder.appName("CollectExample").getOrCreate()

# Przykładowe dane
data = [(1, "John"), (2, "Jane"), (3, "Doe")]
columns = ["id", "name"]

# Utwórz DataFrame
df = spark.createDataFrame(data, columns)

# Wykonaj operację collect
collected_data = df.collect()

# Wyświetl zebrane dane
for row in collected_data:
    print(row)


W powyższym przykładzie, `collect()` zebrał dane DataFrame `df` do listy obiektów `Row`, a następnie te dane zostały wypisane. Dla dużych zbiorów danych `collect()` może prowadzić do przeciążenia pamięci na lokalnym środowisku, więc zaleca się ostrożność przy jej używaniu. W praktyce, często lepszym podejściem jest wykonywanie operacji na rozproszonych danych w samym środowisku Spark bez konieczności ich zbierania do lokalnego środowiska.

3. Operacje Transformacji i Akcji:


Transformacje definiują nowe RDD/DataFrame na podstawie istniejących danych.
Akcje wykonują operacje na danych i zwracają wynik do lokalnego środowiska.

In [None]:
# Transformacja
mapped_rdd = rdd.map(lambda x: x * 2)

# Akcja
result = mapped_rdd.reduce(lambda x, y: x + y)


### Algorytmy Przetwarzania Danych:

1. Machine Learning:


PySpark oferuje bogaty zestaw algorytmów ML dla klasyfikacji, regresji, klastrowania, etc.

In [None]:
from pyspark.ml.classification import LogisticRegression

# Przykład klasyfikacji
lr = LogisticRegression(maxIter=10, regParam=0.01)
model = lr.fit(train_data)


2. GraphX:


Biblioteka do przetwarzania grafów, co jest użyteczne w analizie sieci społecznościowej, trasowania itp.

In [None]:
from pyspark.sql import SparkSession
from graphframes import GraphFrame

# Tworzenie grafu
vertices = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["id", "name"])
edges = spark.createDataFrame([(1, 2, "friend"), (2, 3, "follow")], ["src", "dst", "relationship"])

g = GraphFrame(vertices, edges)


### Analiza Danych:

1. SQL w PySpark:

Możliwość przetwarzania danych za pomocą zapytań SQL.

In [None]:
# Tworzenie tymczasowej tabeli
df.createOrReplaceTempView("my_table")

# Przykład zapytania SQL
result_df = spark.sql("SELECT id, name FROM my_table WHERE id > 1")


2. Wizualizacja Danych:


Możliwość korzystania z narzędzi do wizualizacji danych, takich jak Matplotlib, Seaborn.

In [None]:
import matplotlib.pyplot as plt

# Przykład wykresu
plt.scatter(df.select("feature1").collect(), df.select("feature2").collect())
plt.xlabel("Feature 1")
plt.ylabel("Feature 2")
plt.show()


PySpark to potężne narzędzie do przetwarzania danych, oferujące wiele funkcji i algorytmów. Dzięki elastycznym strukturom danych i obszernej funkcjonalności analizy danych, PySpark staje się niezastąpionym narzędziem w ekosystemie big data.

### Konfiguracja sesji PySpark

Personalizacja ustawień sesji PySpark odbywa się poprzez konfigurację SparkSession. Możesz dostosować różne parametry, takie jak liczba rdzeni, pamięć, ustawienia SQL, itp. Oto kilka przykładów różnych parametrów i jak można je konfigurować:

1. Liczba Rdzeni (Cores): `config("spark.cores.max", "4")`
Parametr "spark.cores.max" określa maksymalną liczbę rdzeni, które mogą być używane przez Spark. W powyższym przykładzie ustawiono maksymalną liczbę rdzeni na 4.

2. Ustawienia Pamięci:  `config("spark.executor.memory", "2g").config("spark.driver.memory", "1g")`
Parametry "spark.executor.memory" i "spark.driver.memory" pozwalają na ustawienie ilości dostępnej pamięci dla executorów i drivera, odpowiednio. W powyższym przykładzie ustawiono 2 GB pamięci dla executorów i 1 GB dla drivera.

3. Ustawienia SQL: `config("spark.sql.shuffle.partitions", "10").config("spark.sql.autoBroadcastJoinThreshold", "52428800")`
Parametr "spark.sql.shuffle.partitions" określa liczbę partycji do użycia w operacjach shuffle, a "spark.sql.autoBroadcastJoinThreshold" ustala maksymalny rozmiar tabeli do automatycznego rozgłaszania (broadcast join). W powyższym przykładzie ustawiono 10 partycji dla shuffle i próg automatycznego rozgłaszania na 50 MB.

4. Inne Ustawienia: `config("spark.streaming.stopGracefullyOnShutdown", "true").config("spark.executor.instances", "2")`
Parametr "spark.streaming.stopGracefullyOnShutdown" pozwala na kontrolowanie czy stopować strumieniowanie z gracji podczas zamykania aplikacji. Parametr "spark.executor.instances" ustawia liczbę instancji executorów w trybie rozproszonym.

5. Ustawienia Loggera: `config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:log4j.properties")`
Parametr "spark.driver.extraJavaOptions" pozwala na dostosowanie konfiguracji log4j dla drivera Sparka.

### Warto pamiętać:
* Parametry można łączyć w jednym łańcuchu konfiguracji, używając metody config wielokrotnie.
* Bardzo ważne jest dostosowanie tych parametrów do wymagań konkretnego zadania lub aplikacji w celu uzyskania optymalnej wydajności.

# Instalacja frameworka PySpark w środowisku

In [7]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

Ustawianie ścieżki do folderu Spark

In [8]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

Import bibliotek

In [10]:
import findspark
findspark.init()
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Tworzenie sesji Spark

W PySpark, sesja jest głównym punktem wejścia dla programów opartych na Sparku, umożliwiając interakcję z rozproszonymi danymi. Sesję PySpark można utworzyć przy użyciu obiektu `SparkSession`, który łączy różne konteksty pracy w jedno środowisko. Tworzenie sesji jest ważne, ponieważ dostarcza środowisko wykonawcze dla aplikacji PySpark oraz konfiguruje różne ustawienia, takie jak ustawienia klastra, ustawienia aplikacji itp.

In [11]:
# Utwórz obiekt SparkSession
session = SparkSession.builder.appName('myApp').getOrCreate() # creating a session

session

Wartością zwracaną przez `getOrCreate()` jest istniejąca sesja Spark, jeśli już istnieje, lub nowo utworzona sesja, jeśli nie. Dzięki temu możemy bezpiecznie korzystać z jednej sesji w naszej aplikacji.

### Dlaczego Tworzymy Sesję PySpark?

* Środowisko Wykonawcze: Sesja dostarcza środowisko wykonawcze dla aplikacji PySpark, zarządzając dostępem do zasobów klastra.

* Konfiguracje Aplikacji: Pozwala na dostosowanie konfiguracji aplikacji, takich jak liczba rdzeni, ilość pamięci itp.

* Ustawienia Aplikacji: Możemy dostarczyć unikalne nazwy aplikacji, co ułatwia identyfikację zadań w klastrze.

* Integracja z Biblioteką Spark SQL: Umożliwia korzystanie z funkcji Spark SQL, takich jak przetwarzanie danych za pomocą SQL, tworzenie DataFrame itp.

* Optymalizacja Wydajności: Sesja pozwala na dostosowanie ustawień dla optymalizacji wydajności w zależności od specyfiki aplikacji.


Przykład wykorzystania sesji PySpark w kontekście wczytywania danych:

In [None]:
# Wczytaj dane do DataFrame
df = spark.read.csv("path/to/data.csv", header=True, inferSchema=True)

# Wykonaj operacje na danych
result = df.groupBy("column").agg({"other_column": "mean"})

# Wyświetl wyniki
result.show()


W powyższym przykładzie, spark to obiekt sesji, który umożliwia wczytywanie danych, przetwarzanie ich za pomocą operacji na DataFrame, a następnie wyświetlanie wyników. Sesja jest kluczowym elementem do efektywnego korzystania z PySpark w analizie danych na dużą skalę.

### Pobranie plików i załadowanie ich do środowiska

Dane z linku: https://github.com/datasciencedojo/datasets/blob/master/titanic.csv

Plik pobrać i umieścić w swojej chmurze google by uzyskać dostęp do pliku w Jupter Notebook ColabResearch. Po lewej stronie znajduje się ikona folderu, na którą należy nacisnąć, a następnie wybrać pierwszą ikonę na górze po lewej stronie, za pomocą której można umieścić w obszarze roboczym pobrany plik.


Innym sposobem na upload plików jest wykorzystanie poniższej instrukcji, gdzie po uruchomieniu komórki należy nacisnąć wybierz pliki i wskazać na pobrany plik. Załaduje się on do tymczasowego obszaru roboczego, do którego będziemy mieli dostęp

In [None]:
from google.colab import files
files.upload()

Saving titanic.csv to titanic (1).csv


{'titanic (1).csv': b'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked\n1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S\n2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C\n3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S\n4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S\n5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S\n6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q\n7,0,1,"McCarthy, Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S\n8,0,3,"Palsson, Master. Gosta Leonard",male,2,3,1,349909,21.075,,S\n9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S\n10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C\n11,1,3,"Sandstrom, Miss. Marguerite Rut",female,4,1,1,PP 9549,16.7,G6,S\n12,1,1,"Bonnell, Miss. Elizabeth",female,58,0,0,113783,26.55,C103,S\n13,0,3,"Saundercock

Wczytanie danych

In [None]:
data = session.read.csv('titanic.csv') # reading the dataset through the given path
data

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string]

In [None]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|        _c0|     _c1|   _c2|                 _c3|   _c4| _c5|  _c6|  _c7|             _c8|    _c9| _c10|    _c11|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|      

In [None]:
data = session.read.option('header', 'true').csv('titanic.csv')

In [None]:
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [None]:
data.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
data = session.read.csv('titanic.csv',inferSchema=True, header =True)

data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### Przykład tworzenia Dataframe Pyspark z Dataframe Pandas

In [None]:
from datetime import datetime, date

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = session.createDataFrame(pandas_df)
df.show()

  for column, series in pdf.iteritems():


+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



Dane mogą być również wyświetlane w sposób wertykalny

In [None]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



Zapytania do bazy o wartości statystyczne

1.•	Obliczyć ile osób przeżyło

In [None]:
data.filter(data['Survived'] == 1).count()

342

•	Obliczyć procent osób które przeżyły

In [None]:
data.filter(data['Survived'] == 1).count() / data.count() * 100

38.38383838383838

•	Obliczyć średni wiek pasażerów

In [None]:
a = data.agg({"Age": 'avg'})
a.show()

+-----------------+
|         avg(Age)|
+-----------------+
|29.69911764705882|
+-----------------+



•	Obliczyć procentowo i liczbowo ilość pasażerów korzystających z różnych klas podróży (Pclass)

In [None]:
data_all = data.count()
grouped_data = data.groupBy("Pclass").count()
grouped_data.show()

grouped_data.withColumn('pclass %', grouped_data['count'] / data_all * 100).show()

+------+-----+
|Pclass|count|
+------+-----+
|     1|  216|
|     3|  491|
|     2|  184|
+------+-----+

+------+-----+------------------+
|Pclass|count|          pclass %|
+------+-----+------------------+
|     1|  216|24.242424242424242|
|     3|  491|55.106621773288445|
|     2|  184| 20.65095398428732|
+------+-----+------------------+



Czy więcej było kobiet czy mężczyzn na statku

In [None]:
data_sex = data.groupBy("Sex").count()
data_sex.show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



Mediana i odchylenie standardowe

In [None]:
import numpy as np
my_list = data.select(collect_list('Fare')).first()[0]

In [None]:
np.median(my_list)

14.4542

In [None]:
np.std(my_list)

49.6655344447741

In [None]:
a = data.agg({"Fare": 'std'})
a.show()

+-----------------+
|     stddev(Fare)|
+-----------------+
|49.69342859718089|
+-----------------+



Ile osób najwięcej znajdowało się w jednej kabinie

In [None]:
grouped_cabin = data.groupBy("Cabin").count()
grouped_cabin.sort('count', ascending=False).show()

+-----------+-----+
|      Cabin|count|
+-----------+-----+
|       null|  687|
|C23 C25 C27|    4|
|         G6|    4|
|    B96 B98|    4|
|          D|    3|
|       E101|    3|
|    C22 C26|    3|
|         F2|    3|
|        F33|    3|
|        E33|    2|
|    B58 B60|    2|
|       C126|    2|
|        C65|    2|
|         E8|    2|
|        B22|    2|
|        D35|    2|
|        E44|    2|
|        D20|    2|
|        C78|    2|
|       C123|    2|
+-----------+-----+
only showing top 20 rows



Wybór kolumn

In [None]:
data.select('PassengerId').show(10)

+-----------+
|PassengerId|
+-----------+
|          1|
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
|         10|
+-----------+
only showing top 10 rows



In [None]:
data.select('Name', 'Age').show(10)

+--------------------+----+
|                Name| Age|
+--------------------+----+
|Braund, Mr. Owen ...|22.0|
|Cumings, Mrs. Joh...|38.0|
|Heikkinen, Miss. ...|26.0|
|Futrelle, Mrs. Ja...|35.0|
|Allen, Mr. Willia...|35.0|
|    Moran, Mr. James|null|
|McCarthy, Mr. Tim...|54.0|
|Palsson, Master. ...| 2.0|
|Johnson, Mrs. Osc...|27.0|
|Nasser, Mrs. Nich...|14.0|
+--------------------+----+
only showing top 10 rows



Dodawanie nowej kolumny

In [None]:
# adding columns in dataframe
data = data.withColumn('Age_after_3_y', data['Age']+3)
data.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Age_after_3_y|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|         25.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|         41.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|         29.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|         38.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.0

In [None]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Age_after_3_y: double (nullable = true)



Usuwanie kolumn

In [None]:
# dropping the columns
data = data.drop('Age_after_3_y')
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

Zmiana nazwy kolumn

In [None]:
# renaming the columns
data = data.withColumnRenamed('Fare', 'Price')
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|  Price|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

Usuwanie pustych wierszy

In [None]:
data = data.na.drop(how = 'any', thresh = 2)
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|  Price|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

Filtrowanie operacji

In [None]:
data = data.filter(data['Survived'] == 1)
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Price|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|    53.1| C123|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742| 11.1333| null|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736| 30.0708| null|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    

In [None]:
data = data.filter((data['Survived'] == 1) & (data['Pclass'] == 3))
data.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|  Price|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| null|       S|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|         PP 9549|   16.7|   G6|       S|
|         20|       1|     3|Masselmani, Mrs. ...|female|null|    0|    0|            2649|  7.225| null|       C|
|         23|       1|     3|"McGowan, Miss. A...|female|15.0|    0|    0|          330923| 8.0292| null|       Q|
|         26|       1|     3|Asplund, Mrs. Car...|female|38.0|    1|    5|      

Tworzenie pandas-on-Spark DataFrame z Spark DataFrame.

`psdf = sdf.pandas_api()`

W przypadku DataFrame, metoda plot() jest wygodnym sposobem na wykreślenie wszystkich kolumn z etykietami:


In [None]:
# import pyspark.pandas as ps

# pser = pd.Series(np.random.randn(1000),
#                  index=pd.date_range('1/1/2000', periods=1000))
# psser = ps.Series(pser)
# psser = psser.cummax()
# psser.plot()

# pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
#                    columns=['A', 'B', 'C', 'D'])
# psdf = ps.from_pandas(pdf)
# psdf = psdf.cummax()
# psdf.plot()

# SQL w PySpark

In [20]:
from datetime import date, timedelta, datetime
import time

Tworzenie nowej sesji PySpark z odpowiednimi parametrami konfiguracyjnymi

In [25]:
sc = SparkSession.builder.appName("PySparkSQL")\
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.driver.maxResultSize","5g") \
    .config("spark.sql.execution.arrow.enabled", "true")\
    .getOrCreate()

Pobranie plików i załadowanie ich do środowiska

Dane z linku: https://www.kaggle.com/datasets/cmenca/new-york-times-hardcover-fiction-best-sellers

In [21]:
from google.colab import files
files.upload()

Output hidden; open in https://colab.research.google.com to view.

### Sposoby tworzenia ramki danych

In [26]:
#JSON
dataframe = sc.read.json('nyt2.json')

#TXT FILES
# dataframe_txt = sc.read.text('text_data.txt')

#CSV FILES
# dataframe_csv = sc.read.csv('csv_data.csv')

#PARQUET FILES
# dataframe_parquet = sc.read.load('parquet_data.parquet')

Wyświetlenie pierwszych dziesięciu rekordów

In [27]:
dataframe.show(10)

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|              author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+
|{5b4aa4ead3089013...|http://www.amazon...|       Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|
|{5b4aa4ead3089013...|http://www.amazon...|     Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}|{{1212883200000}}|Little, Brown| {2}|           {1}|           

Schemat pliku

In [28]:
dataframe.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- amazon_product_url: string (nullable = true)
 |-- author: string (nullable = true)
 |-- bestsellers_date: struct (nullable = true)
 |    |-- $date: struct (nullable = true)
 |    |    |-- $numberLong: string (nullable = true)
 |-- description: string (nullable = true)
 |-- price: struct (nullable = true)
 |    |-- $numberDouble: string (nullable = true)
 |    |-- $numberInt: string (nullable = true)
 |-- published_date: struct (nullable = true)
 |    |-- $date: struct (nullable = true)
 |    |    |-- $numberLong: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- rank: struct (nullable = true)
 |    |-- $numberInt: string (nullable = true)
 |-- rank_last_week: struct (nullable = true)
 |    |-- $numberInt: string (nullable = true)
 |-- title: string (nullable = true)
 |-- weeks_on_list: struct (nullable = true)
 |    |-- $numberInt: string (nullable = true)



Porównanie z klasycznymi ramkami danych w `pandas`

In [23]:
df = pd.read_json('nyt2.json', lines=True)
df.head(5)

Unnamed: 0,_id,bestsellers_date,published_date,amazon_product_url,author,description,price,publisher,title,rank,rank_last_week,weeks_on_list
0,{'$oid': '5b4aa4ead3089013507db18b'},{'$date': {'$numberLong': '1211587200000'}},{'$date': {'$numberLong': '1212883200000'}},http://www.amazon.com/Odd-Hours-Dean-Koontz/dp...,Dean R Koontz,"Odd Thomas, who can communicate with the dead,...",{'$numberInt': '27'},Bantam,ODD HOURS,{'$numberInt': '1'},{'$numberInt': '0'},{'$numberInt': '1'}
1,{'$oid': '5b4aa4ead3089013507db18c'},{'$date': {'$numberLong': '1211587200000'}},{'$date': {'$numberLong': '1212883200000'}},http://www.amazon.com/The-Host-Novel-Stephenie...,Stephenie Meyer,Aliens have taken control of the minds and bod...,{'$numberDouble': '25.99'},"Little, Brown",THE HOST,{'$numberInt': '2'},{'$numberInt': '1'},{'$numberInt': '3'}
2,{'$oid': '5b4aa4ead3089013507db18d'},{'$date': {'$numberLong': '1211587200000'}},{'$date': {'$numberLong': '1212883200000'}},http://www.amazon.com/Love-Youre-With-Emily-Gi...,Emily Giffin,A woman's happy marriage is shaken when she en...,{'$numberDouble': '24.95'},St. Martin's,LOVE THE ONE YOU'RE WITH,{'$numberInt': '3'},{'$numberInt': '2'},{'$numberInt': '2'}
3,{'$oid': '5b4aa4ead3089013507db18e'},{'$date': {'$numberLong': '1211587200000'}},{'$date': {'$numberLong': '1212883200000'}},http://www.amazon.com/The-Front-Garano-Patrici...,Patricia Cornwell,A Massachusetts state investigator and his tea...,{'$numberDouble': '22.95'},Putnam,THE FRONT,{'$numberInt': '4'},{'$numberInt': '0'},{'$numberInt': '1'}
4,{'$oid': '5b4aa4ead3089013507db18f'},{'$date': {'$numberLong': '1211587200000'}},{'$date': {'$numberLong': '1212883200000'}},http://www.amazon.com/Snuff-Chuck-Palahniuk/dp...,Chuck Palahniuk,An aging porn queens aims to cap her career by...,{'$numberDouble': '24.95'},Doubleday,SNUFF,{'$numberInt': '5'},{'$numberInt': '0'},{'$numberInt': '1'}


Usunięcie wartości zduplikowanych

In [30]:
dataframe_dropdup = dataframe.dropDuplicates()
dataframe_dropdup.show(10)

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+----------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|              author| bestsellers_date|         description|        price|   published_date|       publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+----------------+----+--------------+--------------------+-------------+
|{5b4aa4ead3089013...|http://www.amazon...|Clive Cussler wit...|{{1213401600000}}|Juan Cabrillo and...|{26.95, null}|{{1214697600000}}|          Putnam| {4}|           {3}|         PLAGUE SHIP|          {2}|
|{5b4aa4ead3089013...|http://www.amazon...|      Jeffery Deaver|{{1215820800000}}|Detectives Lincol...|    {null, 0}|{{1217116800000}}|Simon & Schuster|{20}|           

In [31]:
print(dataframe.count())
print(dataframe_dropdup.count())

10195
10195


Wyświetlenie kolumn z pliku

In [None]:
dataframe.columns

### Zapytania SQL

In [32]:
dataframe.select("author").show(10) #Show all entries in title column

+--------------------+
|              author|
+--------------------+
|       Dean R Koontz|
|     Stephenie Meyer|
|        Emily Giffin|
|   Patricia Cornwell|
|     Chuck Palahniuk|
|James Patterson a...|
|       John Sandford|
|       Jimmy Buffett|
|    Elizabeth George|
|      David Baldacci|
+--------------------+
only showing top 10 rows



In [33]:
dataframe.select("author", "title", "rank", "price").show(10)  #Show all entries in title, author, rank, price

+--------------------+--------------------+----+-------------+
|              author|               title|rank|        price|
+--------------------+--------------------+----+-------------+
|       Dean R Koontz|           ODD HOURS| {1}|   {null, 27}|
|     Stephenie Meyer|            THE HOST| {2}|{25.99, null}|
|        Emily Giffin|LOVE THE ONE YOU'...| {3}|{24.95, null}|
|   Patricia Cornwell|           THE FRONT| {4}|{22.95, null}|
|     Chuck Palahniuk|               SNUFF| {5}|{24.95, null}|
|James Patterson a...|SUNDAYS AT TIFFANY’S| {6}|{24.99, null}|
|       John Sandford|        PHANTOM PREY| {7}|{26.95, null}|
|       Jimmy Buffett|          SWINE NOT?| {8}|{21.99, null}|
|    Elizabeth George|     CARELESS IN RED| {9}|{27.95, null}|
|      David Baldacci|     THE WHOLE TRUTH|{10}|{26.99, null}|
+--------------------+--------------------+----+-------------+
only showing top 10 rows



Wykorzystanie klauzuli WHEN

In [34]:
# Show title and assign 0 or 1 depending on title

dataframe.select("title", when(dataframe.title != 'ODD HOURS', 1).otherwise(0)).show(10)

+--------------------+-----------------------------------------------------+
|               title|CASE WHEN (NOT (title = ODD HOURS)) THEN 1 ELSE 0 END|
+--------------------+-----------------------------------------------------+
|           ODD HOURS|                                                    0|
|            THE HOST|                                                    1|
|LOVE THE ONE YOU'...|                                                    1|
|           THE FRONT|                                                    1|
|               SNUFF|                                                    1|
|SUNDAYS AT TIFFANY’S|                                                    1|
|        PHANTOM PREY|                                                    1|
|          SWINE NOT?|                                                    1|
|     CARELESS IN RED|                                                    1|
|     THE WHOLE TRUTH|                                                    1|

Wykorzystanie klauzuli ISIN

In [35]:
# Show rows with specified authors if in the given options

dataframe[dataframe.author.isin("John Sandford","Emily Giffin")].show(5)
print(dataframe[dataframe.author.isin("John Sandford","Emily Giffin")].count())

+--------------------+--------------------+-------------+-----------------+--------------------+-------------+-----------------+------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|       author| bestsellers_date|         description|        price|   published_date|   publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+-------------+-----------------+--------------------+-------------+-----------------+------------+----+--------------+--------------------+-------------+
|{5b4aa4ead3089013...|http://www.amazon...| Emily Giffin|{{1211587200000}}|A woman's happy m...|{24.95, null}|{{1212883200000}}|St. Martin's| {3}|           {2}|LOVE THE ONE YOU'...|          {2}|
|{5b4aa4ead3089013...|http://www.amazon...|John Sandford|{{1211587200000}}|The Minneapolis d...|{26.95, null}|{{1212883200000}}|      Putnam| {7}|           {4}|        PHANTOM PREY|          {3}|
|{5b4aa4ead3089

Klauzula LIKE

In [36]:
# Show author and title is TRUE if title has " THE " word in titles

dataframe.select("author", "title", dataframe.title.like("% THE %")).show(15)

+--------------------+--------------------+------------------+
|              author|               title|title LIKE % THE %|
+--------------------+--------------------+------------------+
|       Dean R Koontz|           ODD HOURS|             false|
|     Stephenie Meyer|            THE HOST|             false|
|        Emily Giffin|LOVE THE ONE YOU'...|              true|
|   Patricia Cornwell|           THE FRONT|             false|
|     Chuck Palahniuk|               SNUFF|             false|
|James Patterson a...|SUNDAYS AT TIFFANY’S|             false|
|       John Sandford|        PHANTOM PREY|             false|
|       Jimmy Buffett|          SWINE NOT?|             false|
|    Elizabeth George|     CARELESS IN RED|             false|
|      David Baldacci|     THE WHOLE TRUTH|             false|
|        Troy Denning|          INVINCIBLE|             false|
|          James Frey|BRIGHT SHINY MORNING|             false|
|         Garth Stein|THE ART OF RACING...|            

In [37]:
dataframe.select("author", "title", dataframe.title.like("THE %")).show(15)

+--------------------+--------------------+----------------+
|              author|               title|title LIKE THE %|
+--------------------+--------------------+----------------+
|       Dean R Koontz|           ODD HOURS|           false|
|     Stephenie Meyer|            THE HOST|            true|
|        Emily Giffin|LOVE THE ONE YOU'...|           false|
|   Patricia Cornwell|           THE FRONT|            true|
|     Chuck Palahniuk|               SNUFF|           false|
|James Patterson a...|SUNDAYS AT TIFFANY’S|           false|
|       John Sandford|        PHANTOM PREY|           false|
|       Jimmy Buffett|          SWINE NOT?|           false|
|    Elizabeth George|     CARELESS IN RED|           false|
|      David Baldacci|     THE WHOLE TRUTH|            true|
|        Troy Denning|          INVINCIBLE|           false|
|          James Frey|BRIGHT SHINY MORNING|           false|
|         Garth Stein|THE ART OF RACING...|            true|
|     Debbie Macomber|  

Klauzule STARTS-WITH i ENDS-WITH

In [38]:
dataframe.select("author", "title", dataframe.title.startswith("THE")).show(5)

+-----------------+--------------------+----------------------+
|           author|               title|startswith(title, THE)|
+-----------------+--------------------+----------------------+
|    Dean R Koontz|           ODD HOURS|                 false|
|  Stephenie Meyer|            THE HOST|                  true|
|     Emily Giffin|LOVE THE ONE YOU'...|                 false|
|Patricia Cornwell|           THE FRONT|                  true|
|  Chuck Palahniuk|               SNUFF|                 false|
+-----------------+--------------------+----------------------+
only showing top 5 rows



In [39]:
dataframe.select("author", "title", dataframe.title.endswith("NT")).show(5)

+-----------------+--------------------+-------------------+
|           author|               title|endswith(title, NT)|
+-----------------+--------------------+-------------------+
|    Dean R Koontz|           ODD HOURS|              false|
|  Stephenie Meyer|            THE HOST|              false|
|     Emily Giffin|LOVE THE ONE YOU'...|              false|
|Patricia Cornwell|           THE FRONT|               true|
|  Chuck Palahniuk|               SNUFF|              false|
+-----------------+--------------------+-------------------+
only showing top 5 rows



Wyciąganie części tekstu z kolumny

In [40]:
dataframe.select(dataframe.author.substr(1, 6).alias("auth")).show()

+------+
|  auth|
+------+
|Dean R|
|Stephe|
|Emily |
|Patric|
|Chuck |
|James |
|John S|
|Jimmy |
|Elizab|
|David |
|Troy D|
|James |
|Garth |
|Debbie|
|Jeff S|
|Philli|
|Jhumpa|
|Joseph|
|John G|
|James |
+------+
only showing top 20 rows



Zmiana nazwy kolumny w zapytaniu

In [41]:
dataframe.select(dataframe.title.substr(1,100).alias("title")).show(5)

+--------------------+
|               title|
+--------------------+
|           ODD HOURS|
|            THE HOST|
|LOVE THE ONE YOU'...|
|           THE FRONT|
|               SNUFF|
+--------------------+
only showing top 5 rows



Wyświetlenie typów danych

In [42]:
# Returns dataframe column names and data types
dataframe.dtypes

[('_id', 'struct<$oid:string>'),
 ('amazon_product_url', 'string'),
 ('author', 'string'),
 ('bestsellers_date', 'struct<$date:struct<$numberLong:string>>'),
 ('description', 'string'),
 ('price', 'struct<$numberDouble:string,$numberInt:string>'),
 ('published_date', 'struct<$date:struct<$numberLong:string>>'),
 ('publisher', 'string'),
 ('rank', 'struct<$numberInt:string>'),
 ('rank_last_week', 'struct<$numberInt:string>'),
 ('title', 'string'),
 ('weeks_on_list', 'struct<$numberInt:string>')]

In [43]:
# Displays the content of dataframe
dataframe.show()

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+--------------------+----+--------------+--------------------+-------------+
|                 _id|  amazon_product_url|              author| bestsellers_date|         description|        price|   published_date|           publisher|rank|rank_last_week|               title|weeks_on_list|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+--------------------+----+--------------+--------------------+-------------+
|{5b4aa4ead3089013...|http://www.amazon...|       Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|              Bantam| {1}|           {0}|           ODD HOURS|          {1}|
|{5b4aa4ead3089013...|http://www.amazon...|     Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}|{{1212883200000}}|       Little, Br

Wyświetlenie pierwszych 5 wierszy

In [44]:
# Return first n rows
dataframe.head(5)

[Row(_id=Row($oid='5b4aa4ead3089013507db18b'), amazon_product_url='http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20', author='Dean R Koontz', bestsellers_date=Row($date=Row($numberLong='1211587200000')), description='Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.', price=Row($numberDouble=None, $numberInt='27'), published_date=Row($date=Row($numberLong='1212883200000')), publisher='Bantam', rank=Row($numberInt='1'), rank_last_week=Row($numberInt='0'), title='ODD HOURS', weeks_on_list=Row($numberInt='1')),
 Row(_id=Row($oid='5b4aa4ead3089013507db18c'), amazon_product_url='http://www.amazon.com/The-Host-Novel-Stephenie-Meyer/dp/0316218502?tag=NYTBS-20', author='Stephenie Meyer', bestsellers_date=Row($date=Row($numberLong='1211587200000')), description='Aliens have taken control of the minds and bodies of most humans, but one woman won’t surrender.', price=Row($numberDouble='25.99', $numberInt=None), published_date=

Wyświetlenie pierwszego wiersza

In [45]:
# Returns first row
dataframe.first()

Row(_id=Row($oid='5b4aa4ead3089013507db18b'), amazon_product_url='http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20', author='Dean R Koontz', bestsellers_date=Row($date=Row($numberLong='1211587200000')), description='Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.', price=Row($numberDouble=None, $numberInt='27'), published_date=Row($date=Row($numberLong='1212883200000')), publisher='Bantam', rank=Row($numberInt='1'), rank_last_week=Row($numberInt='0'), title='ODD HOURS', weeks_on_list=Row($numberInt='1'))

In [46]:
# Return first n rows
dataframe.take(5)

[Row(_id=Row($oid='5b4aa4ead3089013507db18b'), amazon_product_url='http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20', author='Dean R Koontz', bestsellers_date=Row($date=Row($numberLong='1211587200000')), description='Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.', price=Row($numberDouble=None, $numberInt='27'), published_date=Row($date=Row($numberLong='1212883200000')), publisher='Bantam', rank=Row($numberInt='1'), rank_last_week=Row($numberInt='0'), title='ODD HOURS', weeks_on_list=Row($numberInt='1')),
 Row(_id=Row($oid='5b4aa4ead3089013507db18c'), amazon_product_url='http://www.amazon.com/The-Host-Novel-Stephenie-Meyer/dp/0316218502?tag=NYTBS-20', author='Stephenie Meyer', bestsellers_date=Row($date=Row($numberLong='1211587200000')), description='Aliens have taken control of the minds and bodies of most humans, but one woman won’t surrender.', price=Row($numberDouble='25.99', $numberInt=None), published_date=

Opis wartości w tabeli

In [47]:
# Computes summary statistics
dataframe.describe().show()

+-------+--------------------+---------------+--------------------+---------+------------------+
|summary|  amazon_product_url|         author|         description|publisher|             title|
+-------+--------------------+---------------+--------------------+---------+------------------+
|  count|               10195|          10195|               10195|    10195|             10195|
|   mean|                null|           null|                null|     null|1877.7142857142858|
| stddev|                null|           null|                null|     null| 370.9760613506458|
|    min|http://www.amazon...|        AJ Finn|                    |      ACE|  10TH ANNIVERSARY|
|    max|https://www.amazo...|various authors|’Tis for the Rebe...|allantine|               ZOO|
+-------+--------------------+---------------+--------------------+---------+------------------+



Ilość elementów w tabeli, liczone po rekordach

In [48]:
# Counts the number of rows in dataframe
dataframe.count()

10195

In [49]:
# Counts the number of distinct rows in dataframe
dataframe.distinct().count()

10195

In [50]:
# Prints plans including physical and logical
dataframe.explain()

== Physical Plan ==
FileScan json [_id#31,amazon_product_url#32,author#33,bestsellers_date#34,description#35,price#36,published_date#37,publisher#38,rank#39,rank_last_week#40,title#41,weeks_on_list#42] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex[file:/content/nyt2.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_id:struct<$oid:string>,amazon_product_url:string,author:string,bestsellers_date:struct<$d...





Funkcja explain w PySpark służy do wyświetlenia planu wykonawczego dla zapytania DataFrame. Plan wykonawczy to szczegółowy opis tego, jak Spark planuje przetworzyć dane dla danej operacji. Pozwala to na zrozumienie kroków, jakie Spark podejmuje w celu wykonania zapytania i analizy potencjalnych problemów związanych z wydajnością.

### Rodzaje Planu Wykonawczego:
* Physical Plan (Fizyczny Plan): Fizyczny plan wykonawczy opisuje, jak Spark planuje faktyczne operacje przetwarzania danych w klastrze. Zawiera informacje o konkretnej implementacji i kolejności operacji.

* Optimized Plan (Zoptymalizowany Plan): Zoptymalizowany plan wykonawczy jest krokiem po planie fizycznym i obejmuje optymalizacje przeprowadzone przez katalizator Sparka. Może zawierać informacje o usunięciu zbędnych operacji lub przekształceniach.

### Co Pokazuje Plan Wykonawczy:
Plan wykonawczy zawiera informacje takie jak:

* Fazy Wykonawcze: (np. Scan, Filter, Aggregate).
* Operacje: (np. Project, Join, GroupBy).
* Typy Operacji: (np. SortMergeJoin, HashAggregate).
* Struktury Danych Używane w Operacjach: (np. DataFrame, RDD).
* Kroki Przetwarzania na Poszczególnych Węzłach Klastra.
* Schemat Kolumn.
* Wykorzystywane Indeksy.
* Informacje o Partycjonowaniu i Shufflingu.

### Interpretacja Planu Wykonawczego:
* Exchange hashpartitioning: Oznacza, że planuje operację wymiany danych między węzłami klastra.
* HashAggregate: Wskazuje na operację agregacji z wykorzystaniem funkcji skrótu (hash).
* Scan ExistingRDD: Oznacza, że dane są wczytywane z istniejącego RDD.


Dzięki explain można zrozumieć, w jaki sposób Spark przetwarza zapytanie, co jest szczególnie przydatne podczas optymalizacji wydajności w bardziej zaawansowanych scenariuszach.

# Dodawanie, aktualizowanie i usuwanie kolumn

Dodawanie kolumny

In [51]:
# Lit() is required while we are creating columns with exact values.

dataframe = dataframe.withColumn('new_column', lit('This is a new column'))
dataframe.show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|  amazon_product_url|           author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, 

Aktualizacja nazwy kolumny

In [52]:
# Update column 'amazon_product_url' with 'URL'

dataframe = dataframe.withColumnRenamed('amazon_product_url', 'URL')
dataframe.show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, 

Usuwanie kolumn - sposób pierwszy

In [53]:
dataframe_remove = dataframe.drop("publisher", "published_date").show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}| {2}|           {1}|            THE HOST|          {3}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  

Usuwanie kolumn - sposób drugi

In [54]:
dataframe_remove2 = dataframe.drop(dataframe.publisher).drop(dataframe.published_date).show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}| {2}|           {1}|            THE HOST|          {3}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  

Klauzula GROUPBY

In [55]:
dataframe.groupBy("rank_last_week")

<pyspark.sql.group.GroupedData at 0x7ce282509ae0>

In [56]:
# Group by author, count the books of the authors in the groups
dataframe.groupBy("rank_last_week").count().show()

+--------------+-----+
|rank_last_week|count|
+--------------+-----+
|           {7}|  470|
|          {10}|  363|
|          {15}|  123|
|          {11}|  327|
|           {4}|  524|
|           {3}|  520|
|          {12}|  289|
|           {6}|  497|
|          {13}|  246|
|           {8}|  460|
|          {14}|  171|
|          {16}|   53|
|           {0}| 4174|
|           {2}|  526|
|           {9}|  421|
|           {1}|  529|
|           {5}|  502|
+--------------+-----+



Uzupełnianie brakujących wartości

In [57]:
# Replace null values
dataframe.na.fill(50).show(20)

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+--------------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|              author| bestsellers_date|         description|        price|   published_date|           publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+-----------------+--------------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|       Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|              Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|     Stephenie Meyer|{{12

In [58]:
# Return new dataframe restricting rows with null values
dataframe.na.drop().show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|    Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, 

In [59]:
# Return new dataframe replacing one value with another
dataframe.na.replace('Dean R Koontz', 'Dean Koontz').show(5)

+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|           author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+-----------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|      Dean Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|  Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, 

Partycjonowanie

In [60]:
# Dataframe with 10 partitions

dataframe.repartition(10).rdd.getNumPartitions()

10

In [61]:
# Dataframe with 1 partition

dataframe.coalesce(1).rdd.getNumPartitions()

1

Liczba partycji w PySpark ma istotny wpływ na jakość i wydajność obliczeń. Partycje to podziały na fragmenty danych, które są przetwarzane równolegle na różnych węzłach klastra. Odpowiednie zarządzanie liczbą partycji jest kluczowe, ponieważ może wpływać na równomierność obciążenia klastra, szybkość przetwarzania, oraz zużycie zasobów.

Oto kilka aspektów, które należy wziąć pod uwagę:

1. Równomierność Obciążenia:
* Zalety: Większa liczba partycji może prowadzić do równomierniejszego rozkładu obciążenia między węzłami klastra. To oznacza, że każdy węzeł będzie miał mniej pracy do wykonania, co przyspiesza przetwarzanie.

* Wady: Zbyt duża liczba partycji może prowadzić do nadmiernego narzutu komunikacyjnego między węzłami, co może zrównoważyć korzyści z równomiernego obciążenia.

2. Wydajność Przetwarzania:
* Zalety: Większa liczba partycji może zwiększyć wydajność przetwarzania równoległego, zwłaszcza w przypadku operacji, które można efektywnie parallelizować.

* Wady: Zbyt duża liczba partycji może prowadzić do nadmiernego narzutu związanego z zarządzaniem partycjami oraz wymianą danych między węzłami.

3. Zużycie Pamięci:
* Zalety: Mniejsza liczba partycji może zmniejszyć zużycie pamięci, ponieważ dla każdej partycji wymagane są pewne zasoby.

* Wady: Zbyt mała liczba partycji może prowadzić do nierównomiernego rozkładu obciążenia oraz ograniczonej równoległości przetwarzania.

4. Zasoby Klastra:
* Zalety: Dostosowanie liczby partycji do dostępnych zasobów klastra może przynieść optymalne wyniki.

* Wady: Nieprawidłowe dostosowanie liczby partycji może prowadzić do nieefektywnego wykorzystania dostępnych zasobów.

Jak Wybrać Odpowiednią Liczbę Partycji:
* Zrozumienie Danych: Przemyśl, jak są rozproszone dane, aby dostosować liczbę partycji do specyfiki zbioru danych.
* Rozmiar Partycji: W przypadku dużych zbiorów danych warto dostosować rozmiar partycji do dostępnej pamięci na węzłach klastra.
* Narzut Komunikacyjny: Optymalizuj liczbę partycji tak, aby unikać nadmiernego narzutu komunikacyjnego.
* Monitorowanie Wydajności: Regularnie monitoruj wydajność aplikacji i dostosuj liczbę partycji w zależności od wyników.

Ostateczny wybór liczby partycji zależy od charakterystyki danych, dostępnych zasobów klastra oraz konkretnego rodzaju operacji, które są wykonywane. Warto eksperymentować i monitorować wydajność, aby dostosować liczbę partycji do konkretnych potrzeb.

### Wykonanie kodu SQL na ramce danych

Pierwszy krok to nadanie nazwy dla tabeli, by móc się do niej odnosić w zapytaniach SQL

In [62]:
# Registering a table
dataframe.registerTempTable("df")

Zapytanie podstawowe

In [63]:
sc.sql("select * from df").show(3)


+--------------------+--------------------+---------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|                 _id|                 URL|         author| bestsellers_date|         description|        price|   published_date|    publisher|rank|rank_last_week|               title|weeks_on_list|          new_column|
+--------------------+--------------------+---------------+-----------------+--------------------+-------------+-----------------+-------------+----+--------------+--------------------+-------------+--------------------+
|{5b4aa4ead3089013...|http://www.amazon...|  Dean R Koontz|{{1211587200000}}|Odd Thomas, who c...|   {null, 27}|{{1212883200000}}|       Bantam| {1}|           {0}|           ODD HOURS|          {1}|This is a new column|
|{5b4aa4ead3089013...|http://www.amazon...|Stephenie Meyer|{{1211587200000}}|Aliens have taken...|{25.99, null}|{{12

Zapytanie bardziej złożone

In [64]:
sc.sql("select \
               CASE WHEN description LIKE '%love%' THEN 'Love_Theme' \
               WHEN description LIKE '%hate%' THEN 'Hate_Theme' \
               WHEN description LIKE '%happy%' THEN 'Happiness_Theme' \
               WHEN description LIKE '%anger%' THEN 'Anger_Theme' \
               WHEN description LIKE '%horror%' THEN 'Horror_Theme' \
               WHEN description LIKE '%death%' THEN 'Criminal_Theme' \
               WHEN description LIKE '%detective%' THEN 'Mystery_Theme' \
               ELSE 'Other_Themes' \
               END Themes \
       from df").groupBy('Themes').count().show()

sc.sql("select \
        CASE WHEN description LIKE '%love%' THEN 'Love_Theme' \
        ELSE 'Other_Themes' \
        END Themes \
  from df").groupBy('Themes').count().show()

+---------------+-----+
|         Themes|count|
+---------------+-----+
|    Anger_Theme|  203|
|   Other_Themes| 8778|
|  Mystery_Theme|  454|
|     Hate_Theme|   23|
| Criminal_Theme|  305|
|   Horror_Theme|    6|
|Happiness_Theme|   34|
|     Love_Theme|  392|
+---------------+-----+

+------------+-----+
|      Themes|count|
+------------+-----+
|Other_Themes| 9803|
|  Love_Theme|  392|
+------------+-----+



### Konwersja struktur danych

In [65]:
# Converting dataframe into a RDD of string
dataframe.toJSON().first()

'{"_id":{"$oid":"5b4aa4ead3089013507db18b"},"URL":"http://www.amazon.com/Odd-Hours-Dean-Koontz/dp/0553807056?tag=NYTBS-20","author":"Dean R Koontz","bestsellers_date":{"$date":{"$numberLong":"1211587200000"}},"description":"Odd Thomas, who can communicate with the dead, confronts evil forces in a California coastal town.","price":{"$numberInt":"27"},"published_date":{"$date":{"$numberLong":"1212883200000"}},"publisher":"Bantam","rank":{"$numberInt":"1"},"rank_last_week":{"$numberInt":"0"},"title":"ODD HOURS","weeks_on_list":{"$numberInt":"1"},"new_column":"This is a new column"}'

In [66]:
# Obtaining contents of df as Pandas dataFrame
dataframe.toPandas()

  Nested StructType not supported in conversion to Arrow
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,_id,URL,author,bestsellers_date,description,price,published_date,publisher,rank,rank_last_week,title,weeks_on_list,new_column
0,"(5b4aa4ead3089013507db18b,)",http://www.amazon.com/Odd-Hours-Dean-Koontz/dp...,Dean R Koontz,"((1211587200000,),)","Odd Thomas, who can communicate with the dead,...","(None, 27)","((1212883200000,),)",Bantam,"(1,)","(0,)",ODD HOURS,"(1,)",This is a new column
1,"(5b4aa4ead3089013507db18c,)",http://www.amazon.com/The-Host-Novel-Stephenie...,Stephenie Meyer,"((1211587200000,),)",Aliens have taken control of the minds and bod...,"(25.99, None)","((1212883200000,),)","Little, Brown","(2,)","(1,)",THE HOST,"(3,)",This is a new column
2,"(5b4aa4ead3089013507db18d,)",http://www.amazon.com/Love-Youre-With-Emily-Gi...,Emily Giffin,"((1211587200000,),)",A woman's happy marriage is shaken when she en...,"(24.95, None)","((1212883200000,),)",St. Martin's,"(3,)","(2,)",LOVE THE ONE YOU'RE WITH,"(2,)",This is a new column
3,"(5b4aa4ead3089013507db18e,)",http://www.amazon.com/The-Front-Garano-Patrici...,Patricia Cornwell,"((1211587200000,),)",A Massachusetts state investigator and his tea...,"(22.95, None)","((1212883200000,),)",Putnam,"(4,)","(0,)",THE FRONT,"(1,)",This is a new column
4,"(5b4aa4ead3089013507db18f,)",http://www.amazon.com/Snuff-Chuck-Palahniuk/dp...,Chuck Palahniuk,"((1211587200000,),)",An aging porn queens aims to cap her career by...,"(24.95, None)","((1212883200000,),)",Doubleday,"(5,)","(0,)",SNUFF,"(1,)",This is a new column
...,...,...,...,...,...,...,...,...,...,...,...,...,...
10190,"(5b4aa4ead3089013507dd959,)",https://www.amazon.com/Clancy-Line-Sight-Jack-...,Mike Maden,"((1530921600000,),)",Jack Ryan Jr. risks his life to protect a woma...,"(None, 0)","((1532217600000,),)",Putnam,"(11,)","(6,)",TOM CLANCY LINE OF SIGHT,"(4,)",This is a new column
10191,"(5b4aa4ead3089013507dd95a,)",https://www.amazon.com/Something-Water-Novel-C...,Catherine Steadman,"((1530921600000,),)",A documentary filmmaker and an investment bank...,"(None, 0)","((1532217600000,),)",Ballantine,"(12,)","(11,)",SOMETHING IN THE WATER,"(5,)",This is a new column
10192,"(5b4aa4ead3089013507dd95b,)",https://www.amazon.com/Little-Fires-Everywhere...,Celeste Ng,"((1530921600000,),)",An artist upends a quiet town outside Cleveland.,"(None, 0)","((1532217600000,),)",Penguin Press,"(13,)","(12,)",LITTLE FIRES EVERYWHERE,"(41,)",This is a new column
10193,"(5b4aa4ead3089013507dd95c,)",https://www.amazon.com/Shelter-Place-Nora-Robe...,Nora Roberts,"((1530921600000,),)",Survivors of a mass shooting outside a mall in...,"(None, 0)","((1532217600000,),)",St. Martin's,"(14,)","(5,)",SHELTER IN PLACE,"(6,)",This is a new column


### Zapis do plików

In [None]:
# Write & Save File in .parquet format
dataframe.select("author", "title", "rank", "description")\
.write \
.save("Rankings_Descriptions.parquet")

In [None]:
# Write & Save File in .json format
dataframe.select("author", "title") \
.write \
.save("Authors_Titles.json",format="json")

### Kończenie sesji Spark

In [None]:
# End Spark Session
sc.stop()

### Lista zadań do wykonania na zbiorze danych titanic

1. Obliczyć ile osób przeżyło
2. Obliczyć procent osób które przeżyły
3. Obliczyć średni wiek pasażerów
4. Obliczyć procentowo i liczbowo ilość pasażerów korzystających z różnych klas podróży (Pclass)
5. Określić, czy więcej było kobiet czy mężczyzn na statku
6. Obliczyć medianę i odchylenie standardowe ceny biletu
7. Wskazać ile najwięcej osób znajdowało się w jednej kabinie na statku
8. Obliczyć wariancję ceny biletów oraz percentyle 10 i 90 (funkcje z pakietu numpy)
9. Wyszukać mężczyzn w przedziale wiekowym 20-40 i zliczyć ich ilość
10. Wyszukać kobiety mające mniej niż 20 i więcej niż 40 lat i zliczyć ich ilość
11. Wypisać nazwiska osób, których długość nazwiska jest parzysta
  * Pobrać imiona i nazwiska do listy
  * W pętli dla każdej osoby
      * Podzielić tekst za pomocą znaku przecinka – funkcja split
      * Zapisać 1 element z powstałej listy
      *	Sprawdzić czy jego długość jest parzysta
      * Jeśli tak, wypisać to nazwisko
12. Na podstawie zadania 11 policzyć ilość osób z imieniem William
13. 3 swoje zapytania do bazy o innej treści niż powyżej
