# DataFrame 

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark-sql-module

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as f

In [None]:
spark = SparkSession.builder \
    .appName('DataFrame_2') \
    .master('local[*]') \
    .getOrCreate()

In [None]:
#https://github.com/apache/spark
data_path = .../'
people = spark.read.json(data_path+'people.json')
employees = spark.read.json(data_path+'employees.json')
people_txt = spark.read.option("inferSchema", "true").csv(data_path+'people.txt')
people_txt = people_txt.withColumnRenamed('_c0', 'name').withColumnRenamed('_c1', 'age')

In [None]:
people.show()

In [None]:
employees.show()

In [None]:
people_txt.show()

#### join
inner (domyslny)

In [None]:
people.join(other=employees, on='name', how='inner').show()

Uwaga ogolna
Join to stosunkowo popularna, ale kosztowna operacja.
W sytuacji, kiedy jeden z laczonych DataFramow jest znacznie mniejszy (w szczegolnosci na tyle maly, ze w calosci miesci sie w pamieci), zaleca sie zastosowanie broadcast hash join.
(Mala tabela zostanie zebrana do pamieci i wyslana do kazdego noda).
W niektorych przypadkach optymalizator sam za nas zdecyduje o zastosowaniu broadcast hash join. Jednak SparkSQL wyjatkowo tutaj daje nam mozliwosc wymuszenia tej operacji wprost w kodzie. 
from pyspark.sql.functions import broadcast

> **TODO**: Znajdź ile zarabia najmlodsza osoba spośród (people, people_txt)?

> **TODO**: Dla kazdego pracownika (employees), dla ktorego mamy informacje o wieku (people, people_txt) dodaj do pensji 0.1% za kazdy rok zycia. Zsumuj koszt takiego 'bonusu urodzinowego' dla pracodawcy. 

#### groupBy

In [None]:
people.groupBy()

 Przez GroupedData mamy dostep do takich funkcji jak:<br>
 avg, max, min, sum, count, agg <br>
 (dla wygody, do funkcji 'agg' mamy tez dostep bezposrednio na DataFrame)

In [None]:
people.groupBy().max('age').show()

In [None]:
people.groupBy('name').count().show()

In [None]:
#from pyspark.sql import functions as f
people.groupBy('name').agg(f.min('age').alias('min_age'), f.max('age').alias('max_age'), f.count('name').alias('n_people')).show()

In [None]:
people.agg(f.min('age'), f.max('age'), f.count('name')).show()

> **TODO**: Ile jest unikatowych (wystepujacych tylko 1 raz) imion w polaczonych zbiorach people oraz people_txt? (allPeople)

> **TODO**: Ile lat maja osoby, ktorych imiona wystepuja tylko raz w polaczonych zbiorach people oraz people_txt?

****

In [None]:
# Wygenerujmy nowy DataFrame

In [None]:
import random

In [None]:
years = 10
names = ['Alice', 'Betty', 'Chris', 'Dan', 'Greg']
unique_names_count = len(names)
names = sorted(names*years)
year = [y for y in range(2000, 2000+years)]*len(names)
starting_salary = [round(random.gauss(4000, 1000),2) for i in range(unique_names_count)]
salary = [0 for i in range(years*unique_names_count)]
salary[::years] = starting_salary
for n in range(unique_names_count):
    for y in range(years-1):
        index = (years*n+1)+y
        #print(index, salary[index-1])
        salary[index] = round(salary[index-1]*(1+random.gauss(0.1,0.09)),2)

In [None]:
salaryHistory = spark.createDataFrame([Row(name=n, year=y, salary=s) for n,y,s in zip(names, year, salary)])

In [None]:
salaryHistory = salaryHistory.filter((salaryHistory['name'] != 'Greg') | (salaryHistory['year'] != 2006))

In [None]:
salaryHistory = salaryHistory.union(spark.createDataFrame([Row('Alice', 3000, 2000)]))

In [None]:
salaryHistory.collect()

> **TODO**: Przyjrzyj sie nowemu zbiorowi danych salaryHistory.<br>
a. Zobacz schemat. <br>
b. Ile rekordow jest w calym zbiorze? <br>
c. Jaka jest najmniejsza i najwieksza pensja?<br>
d. Ile razy powtarza sie kazde z imion?<br>
e. Stworz tabele sredniej, minimalnej i maksymalnej pensji w zależności od roku. Posortuj lata malejaco. Pensje podaj z dokladnoscia do pelnych wartosci. <br>

#### Window functions
over

Do obliczania agregowanych wartosci w grupach definiowanych oknem (window).<br>
Zwracaja wiele rekordow (tyle ile na wejsciu w grupie).

partitionBy - definiuje podzial danych na okna<br>
orderBy - definiuje sortowanie wewnatrz kazdego z okien<br>
Frame (rangeBetween/rowsBetween) - definiuje offset<br>

In [None]:
from pyspark.sql.window import Window
# from pyspark.sql import functions as f

partitionBy

In [None]:
# definicja 'okna'
myWindowSpec = Window.partitionBy(allPeople['name'])

Funkcje agregujące okien (aka funkcje okien lub agregacje okienkowe ) <br>
Funkcje okna działają w odniesieniu do grupy wierszy, nazywanej oknem, i obliczania wartości zwracanej dla każdego wiersza w oparciu o grupę wierszy. Funkcje okna są przydatne do przetwarzania zadań, takich jak Obliczanie średniej przenoszonej, obliczanie zbiorczej statystyki lub uzyskiwanie dostępu do wartości wierszy, w których podano względne położenie bieżącego wiersza.

In [None]:
# wywolanie funkcji na kazdym 'oknie'
allPeople.withColumn('nameCount', f.count(allPeople['name']).over(myWindowSpec)).show()

> **TODO**: Do zbioru salaryHistory dodaj kolumne 'avgSalaryDiff', ktora bedzie zawierala roznice pomiedzy pensja z danego roku, a srednia pensja osoby na przestrzeni wszytskich lat. 

partitionBy + orderBy

In [None]:
# np. rank
# - musimy zdefiniowac dodatkowo sortowanie wewnatrz kazdej z grup
# - zwraca lp dla kolejnych rekordow posortowanych wedlug zadanych kolumn
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year'])
ranked = (f.rank()).over(windowSpec)
salaryHistory.withColumn('ranked', ranked).show()

In [None]:
salaryHistory_tmp = salaryHistory.filter(salaryHistory.name.isin('Alice', 'Greg'))
salaryHistory_tmp.show()

> **TODO**: Dla zbioru salaryHistory, porownaj pensje ludzi pomiedzy najwczesniejszym a najpozniejszym rokiem ich pracy.  

partitionBy + orderBy + rangeBetween/rowsBetween

In [None]:
# np. srednia ruchoma
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).rangeBetween(-1,0)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()

In [None]:
# np. srednia ze wszystkich rekordow do aktualnego wlacznie
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).rowsBetween(Window.unboundedPreceding,Window.currentRow)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()

In [None]:
# podobny efekt uzyskamy ponizszym zapytaniam. Drobna roznica: rekordy w jednej grupie (imie, rok) nie zostana rozdzielone 
import sys
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).rangeBetween(-sys.maxsize,0)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()