# DataFrame

http://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#pyspark-sql-module

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder \
    .appName('DataFrame') \
    .master('local[*]') \
    .getOrCreate()

## Tworzenie DataFrame'u

### Plik

In [None]:
data_path = './SparkSQLdata/'

In [None]:
# json
people = spark.read.json(data_path+'people.json')

In [None]:
people.show()

In [None]:
# csv
people_txt = spark.read.option("inferSchema", "true").csv(data_path+'people.txt')

In [None]:
people_txt.show()

### Kolekcja Row'ów

In [None]:
newPerson1 = Row(name='Greg', age=32)

In [None]:
newPerson1

In [None]:
newPerson1.name

In [None]:
newPerson1.age

In [None]:
newPerson1['age']

In [None]:
'age' in newPerson1

In [None]:
newPerson = Row("age", "name")

In [None]:
newPerson2 = newPerson(24, 'Alice')

In [None]:
newPerson2

In [None]:
newPerson3 = newPerson(None, None)
newPerson4 = newPerson(33, None)
newPerson5 = newPerson(None, 'Peter')
newPerson6 = newPerson(32, 'Peter')
newPerson7 = newPerson(40, 'Greg')

In [None]:
newPeopleDF = spark.createDataFrame([newPerson1, newPerson2, newPerson3, newPerson4, 
                                     newPerson5, newPerson6, newPerson7])

In [None]:
newPeopleDF.show()

### Inne lokalne kolekcje

Typy danych: http://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#module-pyspark.sql.types

Kilka podstawowych: IntegerType, DoubleType, FloatType, StringType, BooleanType, NullType

In [None]:
from pyspark.sql.types import IntegerType, StringType, StructType, StructField

In [None]:
# definicja schematu
# StructType ~ Row
schema = StructType([StructField("V1", IntegerType()), StructField("V2", StringType())])

In [None]:
# lokalna kolekcja - lista list
df = spark.createDataFrame([[1,2],[3,4]], schema)

In [None]:
df.show()

### RDD

Przechodzenie RDD <-> DF

In [None]:
type(people)

In [None]:
people.show()

In [None]:
people.collect()

DF -> RDD

In [None]:
type(people.rdd)

In [None]:
people.rdd.collect()

In [None]:
people.rdd.map(tuple).collect()

RDD -> DF

In [None]:
people.rdd.toDF().show()

In [None]:
people.rdd.map(tuple).toDF().show()

In [None]:
sc = spark.sparkContext

In [None]:
sc.parallelize([(x, x+1) for x in range(5)]).toDF().show()

In [None]:
# do toDF można podać schemat
schema = StructType([StructField("A", IntegerType()), StructField("B", StringType())])
sc.parallelize([(x, x+1) for x in range(5)]).toDF(schema).show()

In [None]:
sc.parallelize([(x, x+1) for x in range(5)]).toDF(schema).collect()[0]

> **TODO**: Na dwa sposoby stwórz DataFrame z 3 wierszami i 3 kolumnami - dwie typu string, jedna numeryczna

> **TODO**: Z podanego RDD utwórz DataFrame z nazwanymi kolumnami `name` i `age` oraz odpowiednimi typami (string i int)

In [None]:
myRDD = sc.textFile(data_path+"people.txt")
myRDD.collect()

****

## Praca z DataFrame'ami

### Ogolne wiadomosci na temat danych
printSchema, show, columns, dtypes <br>
Znane z RDD np: count, take, head

In [None]:
people.printSchema()

In [None]:
people_txt.printSchema()

In [None]:
people_txt.show(1)

In [None]:
people.show(1)

Liczba wierszy

In [None]:
people.count()

Lista kolumn

In [None]:
people.columns

Lista kolumn wraz z typami danych

In [None]:
newPeopleDF.dtypes

### Odwolania do poszczegolnych kolumn

In [None]:
people.age

In [None]:
people['age']

In [None]:
people[0]

### Dodanie/usuniecie kolumny
withColumn, drop

In [None]:
people.withColumn(colName = 'ageNextYear', col = people.age +1).show()

In [None]:
people.drop('age').show()

### Zmiany nazwy kolumny

In [None]:
people_txt = people_txt.withColumnRenamed('_c0', 'name').withColumnRenamed('_c1', 'age')

In [None]:
people_txt.show()

### Podstawowe statystyki kolumn w DataFrame'ie.
describe

In [None]:
people.describe().show()

In [None]:
people.describe('age').show()

### Braki danych
isNull, isNotNull<br>
fillna, dropna, replace

In [None]:
newPeopleDF.show()

In [None]:
newPeopleDF.filter(newPeopleDF.age.isNull()).show()

In [None]:
newPeopleDF.filter(newPeopleDF.age.isNotNull()).show()

In [None]:
newPeopleDF.fillna(-1).show()

In [None]:
newPeopleDF.fillna({'age':-1, 'name':'unknown'}).show()

In [None]:
newPeopleDF.fillna({'name':'unknown'}).replace('unknown', 'NN').show()

In [None]:
newPeopleDF.dropna().show()

In [None]:
newPeopleDF.dropna(subset='age').show()

### Funkcje wprost ze skladni SQL 
select, where (alias filter), orderBy

In [None]:
people.show()

In [None]:
people.\
select('name', 'age').\
where(people.name.like('%n%')).\
orderBy(people.age.asc()).\
show()
# select: wyswietl kolumny 'name' i 'age'
# where: uwzglednij tylko imiona (name) zaweirajace litere 'n'
# orderBy: zbior posortuj rosnaco po kolumnie 'age'

In [None]:
# gdy chcemy zobaczyć wszystkie kolumny select jest zbędny
people.\
where(people.name.like('%n%')).\
orderBy(people.age.asc()).\
show()

> **TODO**: Wyswietl imiona ludzi ze zbioru `people` starszych niz 29 lat. Wyniki posortuj alfabetycznie po kolumnie name

### Operacje na zbiorach
union - dziala jak UNION ALL w SQL. <br>
intersect (INTERSECT z SQLa), subtract (EXCEPT z SQLa)

In [None]:
people.show()

In [None]:
people_txt.show()

In [None]:
people.union(people_txt).show()

In [None]:
people.union(people_txt.select(people.columns)).show()

###############################################################################################################

# Zadania 1

****
Przygotowanie danych

In [None]:
data_path = './SparkSQLdata/'

In [None]:
people = spark.read.json(data_path+'people.json')
employees = spark.read.json(data_path+'employees.json')
people_txt = spark.read.option("inferSchema", "true").csv(data_path+'people.txt')

In [None]:
newPerson1 = Row(name=u'Greg', age=32)
newPerson = Row("age", "name")
newPerson2 = newPerson(24, 'Alice')
newPerson3 = newPerson(None, None)
newPerson4 = newPerson(33, None)
newPerson5 = newPerson(None, 'Peter')
newPerson6 = newPerson(32, 'Peter')
newPerson7 = newPerson(40, 'Greg')

In [None]:
newPeopleDF = spark.createDataFrame([newPerson1, newPerson2, newPerson3, newPerson4, 
                                     newPerson5, newPerson6, newPerson7])

In [None]:
people_txt = people_txt.withColumnRenamed('_c0', 'name').withColumnRenamed('_c1', 'age')

In [None]:
allPeople = spark.read.parquet(data_path+'allPeople.parquet')

****

1. Wyswietl imiona ludzi ze zbioru `people`, o ktorych wielku nie mamy informacji.

2. Na koniec zbioru `people` doklej zbiór `newPeopleDF` oraz `people_txt`. Tak utworzony DataFrame nazwij `allPeople`

3. Wybierz ze zbioru `allPeople` te wiersze, ktore wystepuja rowniez w zbiorze `people`.

4. Wybierz ze zbioru `allPeople` te imiona, ktore NIE wystepuja w zbiorze `people_txt`.

5. Usun ze zbioru `allPeople` wiersze, ktore zawieraja same braki danych.

6. Do zbioru `allPeople` dodaj kolumne 'age' zawierajaca wiek powiekszony o 1. Zmien nazwe oryginalnej kolumny 'age' na 'starting_age'.

7. Ile unikatowych rekordow znajduje sie w zbiorze `allPeople`?

8. **⋆** Do zbioru `employees` dodaj kolumny: z pensjami po 0-10% podwyżce (dla każdego losowo z rozkładu jednostajnego) oraz z różnicą pomiędzy pensją przed i po podwyżce. Zbiór posortuj alfabetycznie.

In [None]:
from pyspark.sql import functions as f

###############################################################################################################

## Praca z DataFrame'ami

In [None]:
data_path = './SparkSQLdata/'
people = spark.read.json(data_path+'people.json')
employees = spark.read.json(data_path+'employees.json')
people_txt = spark.read.option("inferSchema", "true").csv(data_path+'people.txt')
newPeople = spark.read.parquet(data_path+'newPeople.parquet')
people_txt = people_txt.withColumnRenamed('_c0', 'name').withColumnRenamed('_c1', 'age')

### Funkcje SparkSQL

In [None]:
from pyspark.sql import functions as f

In [None]:
people.select(f.min("age")).show()

In [None]:
people.select(f.min("age").alias("min_age")).show()

In [None]:
people.withColumn("random", f.randn(42)).show()

In [None]:
people.withColumn("random", f.exp("age")).show()

### join
inner (domyslny)

In [None]:
people.join(other=employees, on='name', how='inner').show()

<b> Uwaga ogólna </b><br>
Join to stosunkowo popularna, ale kosztowna operacja.<br>
W sytuacji, kiedy jeden z łaczonych DataFramow jest znacznie mniejszy (w szczegolnosci na tyle mały, że w całości mieści się w pamięci), zaleca sie zastosowanie <i>broadcast hash join</i>.<br>
(Mała tabela zostanie zebrana do pamięci i wysłana do każdego noda).<br>
W niektórych przypadkach optymalizator sam za nas zdecyduje o zastosowaniu <i>broadcast hash join</i>.

In [None]:
from pyspark.sql.functions import broadcast
newPeople.join(broadcast(spark.createDataFrame([Row(age=20, name='Greg')])), on='name').show()

> **TODO**: Spośród osób (`people`, `newPeople`, `people_txt`), dla których mamy informacje o zarobkach (zbiór `employees`). Ile zarabia najmłodsza osoba?

> **TODO**: Dla każdego pracownika (`employees`), dla którego mamy informacje o wieku (`people`, `newPeople`, `people_txt`). Dodaj do pensji 0.1% za każdy rok życia. Oblicz koszt takiego 'bonusu urodzinowego' dla pracodawcy. 

### groupBy

In [None]:
newPeople.groupBy()

 Przez GroupedData mamy dostep do takich funkcji jak:<br>
 avg, max, min, sum, count, agg <br>
 (dla wygody, do funkcji 'agg' mamy tez dostep bezposrednio na DataFrame)

In [None]:
newPeople.groupBy().max().show()

In [None]:
newPeople.groupBy('name').count().show()

In [None]:
newPeople.groupBy('name').agg(f.min('age').alias('min_age'), f.max('age').alias('max_age'),\
                              f.count('name').alias('n_people')).show()

In [None]:
newPeople.agg(f.min('age'), f.max('age'), f.count('name')).show()

> **TODO**: Ile jest unikatowych (występujących tylko 1 raz) imion w połączonych zbiorach `people`, `newPeople` oraz `people_txt`?

> **TODO**: Ile lat mają osoby, których imiona występują tylko raz w połączonych zbiorach `people`, `newPeople` oraz `people_txt`?

****

#### Nowy DataFrame

In [None]:
import random

In [None]:
random.seed(123)

years = 10
names = ['Alice', 'Betty', 'Chris', 'Dan', 'Greg']
unique_names_count = len(names)
names = sorted(names*years)
year = [y for y in range(2000, 2000+years)]*len(names)
starting_salary = [round(random.gauss(4000, 1000),2) for i in range(unique_names_count)]
salary = [0 for i in range(years*unique_names_count)]
salary[::years] = starting_salary
for n in range(unique_names_count):
    for y in range(years-1):
        index = (years*n+1)+y
        salary[index] = round(salary[index-1]*(1+random.gauss(0.1,0.09)),2)

In [None]:
salaryHistory = spark.createDataFrame([Row(name=n, year=y, salary=s) for n,y,s in zip(names, year, salary)])
salaryHistory = salaryHistory.filter((salaryHistory['name'] != 'Greg') | (salaryHistory['year'] != 2006))
salaryHistory = salaryHistory.union(spark.createDataFrame([Row('Alice', 3000, 2000)]))

In [None]:
salaryHistory.show()

****

> **TODO**: Ile razy powtarza się każde z imion w `salaryHistory`?

> **TODO**: Na podstawie `salaryHistory` stworz tabelę zależności średniej, minimalnej i maksymalnej pensji od roku. Posortuj lata malejąco. Pensje podaj z dokładnością do pełnych wartości.

### Window functions
**over**

Służy do obliczania agregowanych wartości w grupach definiowanych oknem (window).<br>
Zwraca wiele rekordow (tyle ile na wejsciu w grupie).

partitionBy - definiuje podział danych na okna<br>
orderBy - definiuje sortowanie wewnątrz każdego z okien<br>
Frame (rangeBetween/rowsBetween) - definiuje offset<br>

In [None]:
from pyspark.sql.window import Window

**partitionBy**

In [None]:
allPeople = spark.read.parquet(data_path+'allPeople.parquet')

In [None]:
# definicja 'okna'
myWindowSpec = Window.partitionBy('name')

In [None]:
# wywołanie funkcji na kazdym 'oknie'
allPeople.withColumn('nameCount', \
                     f.count(allPeople['name']).over(myWindowSpec)).show()

> **TODO**: Do zbioru `salaryHistory` dodaj kolumnę 'avgSalaryDiff', która będzie zawierała różnicę pomiedzy pensją z danego roku, a średnią pensją osoby na przestrzeni wszytskich lat.

**partitionBy + orderBy**

In [None]:
# przykład: rank
# - musimy zdefiniować dodatkowo sortowanie wewnątrz każdej z grup
# - zwraca lp dla kolejnych rekordów posortowanych według zadanych kolumn
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year'])
ranked = f.rank().over(windowSpec)
salaryHistory.withColumn('ranked', ranked).show()

In [None]:
salaryHistory_tmp = salaryHistory.filter(salaryHistory.name.isin('Alice', 'Greg'))

In [None]:
# rank, dense_rank, percent_rank, row_number
windowSpec = Window.partitionBy('name').orderBy('year')
ranked = (f.rank()).over(windowSpec)
dense_rank = (f.dense_rank()).over(windowSpec)
percent_rank = (f.percent_rank()).over(windowSpec)
row_number = (f.row_number()).over(windowSpec)
salaryHistory_tmp.withColumn('ranked', ranked).withColumn('dense_rank', dense_rank).\
withColumn('percent_rank', percent_rank).withColumn('row_number', row_number).\
show()

Inne warte uwagi funkcje: <br>
ntile, cume_dist, first, lag, lead

> **TODO**: Dla zbioru `salaryHistory`, porównaj pensje ludzi pomiedzy najwcześniejszym i najpóźniejszym rokiem ich pracy.

**partitionBy + orderBy + rangeBetween/rowsBetween**

In [None]:
# przykład: średnia ruchoma
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year'])\
.rowsBetween(-1,1)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()

In [None]:
# przykład: średnia ze wszystkich rekordów do aktualnego włącznie
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).\
rowsBetween(Window.unboundedPreceding,Window.currentRow)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()

In [None]:
salaryHistory.withColumn('movingAvg', movingAvg).filter("name == 'Alice'").show()

In [None]:
# podobny efekt uzyskamy poniższym zapytaniemm. 
# Różnica: rekordy w jednej grupie (imię, rok) nie zostaną rozdzielone 
import sys
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).\
rangeBetween(-sys.maxsize,0)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()

In [None]:
salaryHistory.withColumn('movingAvg', movingAvg).filter("name == 'Alice'").show()

###############################################################################################################

# Zadania 2

1. Ile lat mają osoby, których imiona występują tylko raz w połączonych zbiorach `people`, `newPeople` oraz `people_txt`? Rozwiązując problem zastosuj window functions.

2. Czy komukolwiek obniżyła się pensja w stosunku do roku poprzedniego? <br>
a. Ile osób było kiedykolwiek w takiej sytuacji?<br>
b. Jaki jest rozkład częstości takich przypadków w zależności od roku?

3. Oblicz różnicę w pensjach w stosuku do <br>
a. najwyższej pensji danej osoby<br> 
b. drugiej najwyższej pensji danej osoby<br>

###############################################################################################################

### SQL

Spark wspiera ANSI SQL 2003 (SQL3)

Aby użyć zbioru danych w zapytaniu SQL musimy go najpierw zarejestrować.

In [None]:
salaryHistory.createOrReplaceTempView('salaryHistory')
# salaryHistory.registerTempTable('salaryHistorySQL') - wersja dla sparka 1.x
# view NIE jest 'persisted in memory', to nadal nie jest akcja

In [None]:
spark.sql('select * from salaryHistory limit 2').show()

In [None]:
type(spark.sql('select * from salaryHistory limit 2'))

In [None]:
spark.sql('select name, avg(salary) avg_sal from salaryHistory group by name').show()

In [None]:
spark.sql('select name, salary, avg(salary) over (partition by name) avg_sal from salaryHistory').show()

In [None]:
spark.catalog.dropTempView("salaryHistory")

### UDFs
Poza funkcjami dostępnymi w module pyspark.sql.functions, można też tworzyć własne (User Defined Functions) i używać ich w zapytaniach SparkSQL <br>
<i>Uwaga:</i> Po UDFy sięgamy w ostateczności - optymalizacja <br>
<i>Uwaga:</i> UDF napisany w Pythonie (lub R) bedzie mial gorszy performance niż UDF napisany w Scali lub Javie. Ale możemy nasz UDF napisać w Scali/Javie i zarejestrować do użycia w Pythonie. <br>
Blogpost o tym jak w pySparku używać UDF napisanej w scali: <br>
https://medium.com/@ingwbaa/using-scala-udfs-in-pyspark-b70033dd69b9 <br>
Blogpost o ulepszeniach w UDF (pandas UDF) dla pySparka od Sparku 2.3: <br>
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

In [None]:
from pyspark.sql.types import IntegerType, StringType, FloatType

In [None]:
def udfPower3(value):
    return(value**3)

In [None]:
udfPower3(3.14)

Aby stosować zdefiniowaną funkcję, musimy ja zarejestrować. (Driver roześle funkcje do wszystkich egzekutorów)

In [None]:
power3 = f.udf(udfPower3, FloatType())

In [None]:
salaryHistory.select(power3(salaryHistory.salary)).show()

### Spark wspiera Hive

Spark SQL wspiera HiveQL. <br>
Spark SQL wspiera rownież wczytawanie/zapisywanie danych z/do Apache Hive.<br>

Wiecej informacji na temat integracji z Hive:<br>
https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive <br>
https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables