# DataFrame 

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark-sql-module

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder \
    .appName('DataFrame_2') \
    .master('local[*]') \
    .getOrCreate()

In [8]:
#https://github.com/apache/spark
data_path = ''
people = spark.read.json(data_path+'people.json')
employees = spark.read.json(data_path+'employees.json')
people_txt = spark.read.option("inferSchema", "true").csv(data_path+'people.txt')
people_txt = people_txt.withColumnRenamed('_c0', 'name').withColumnRenamed('_c1', 'age')

In [9]:
people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [10]:
employees.show()

+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+



In [11]:
people_txt.show()

+-------+----+
|   name| age|
+-------+----+
|Michael|29.0|
|   Andy|30.0|
| Justin|19.0|
+-------+----+



#### join
inner (domyslny)

In [12]:
people.join(other=employees, on='name', how='inner').show()

+-------+----+------+
|   name| age|salary|
+-------+----+------+
|Michael|null|  3000|
|   Andy|  30|  4500|
| Justin|  19|  3500|
+-------+----+------+



Uwaga ogolna
Join to stosunkowo popularna, ale kosztowna operacja.
W sytuacji, kiedy jeden z laczonych DataFramow jest znacznie mniejszy (w szczegolnosci na tyle maly, ze w calosci miesci sie w pamieci), zaleca sie zastosowanie broadcast hash join.
(Mala tabela zostanie zebrana do pamieci i wyslana do kazdego noda).
W niektorych przypadkach optymalizator sam za nas zdecyduje o zastosowaniu broadcast hash join. Jednak SparkSQL wyjatkowo tutaj daje nam mozliwosc wymuszenia tej operacji wprost w kodzie. 
from pyspark.sql.functions import broadcast

> **TODO**: Znajdź ile zarabia najmlodsza osoba spośród (people, people_txt)?

In [19]:
people_txt.join(other=employees, on='name', how='inner').select('salary').orderBy(people_txt.age.asc()).\
show()

+------+
|salary|
+------+
|  3500|
|  3000|
|  4500|
+------+



In [21]:
columns_order=people.columns
joined_df=people.union(people_txt.select(columns_order)).\
join(employees,'name').dropna().orderBy('age')
joined_df.select('salary').first()[0]

3500

> **TODO**: Dla kazdego pracownika (employees), dla ktorego mamy informacje o wieku (people, people_txt) dodaj do pensji 0.1% za kazdy rok zycia. Zsumuj koszt takiego 'bonusu urodzinowego' dla pracodawcy. 

In [32]:
columns_order=people.columns
joined_df=people.union(people_txt.select(columns_order)).join(employees,'name').dropna()
joined_df=joined_df.drop_duplicates()
joined_df=joined_df.withColumn(colName = 'bonusSalary', col = joined_df.salary*(1+0.001*joined_df.age))
joined_df=joined_df.withColumn(colName = 'bonus', col = joined_df.salary*(1+0.001*joined_df.age)-joined_df.salary)
joined_df.show()

+-------+----+------+------------------+-----------------+
|   name| age|salary|       bonusSalary|            bonus|
+-------+----+------+------------------+-----------------+
|Michael|29.0|  3000|3086.9999999999995|86.99999999999955|
|   Andy|30.0|  4500|            4635.0|            135.0|
| Justin|19.0|  3500|3566.4999999999995|66.49999999999955|
+-------+----+------+------------------+-----------------+



In [38]:
joined_df.select(f.sum('bonus')).show()

+-----------------+
|       sum(bonus)|
+-----------------+
|288.4999999999991|
+-----------------+



#### groupBy

In [39]:
people.groupBy()

<pyspark.sql.group.GroupedData at 0x2364f3b3b20>

 Przez GroupedData mamy dostep do takich funkcji jak:<br>
 avg, max, min, sum, count, agg <br>
 (dla wygody, do funkcji 'agg' mamy tez dostep bezposrednio na DataFrame)

In [40]:
people.groupBy().max('age').show()

+--------+
|max(age)|
+--------+
|      30|
+--------+



In [41]:
people.groupBy('name').count().show()

+-------+-----+
|   name|count|
+-------+-----+
|Michael|    1|
|   Andy|    1|
| Justin|    1|
+-------+-----+



In [42]:
#from pyspark.sql import functions as f
people.groupBy('name').agg(f.min('age').alias('min_age'), f.max('age').alias('max_age'), f.count('name').alias('n_people')).show()

+-------+-------+-------+--------+
|   name|min_age|max_age|n_people|
+-------+-------+-------+--------+
|Michael|   null|   null|       1|
|   Andy|     30|     30|       1|
| Justin|     19|     19|       1|
+-------+-------+-------+--------+



In [43]:
people.agg(f.min('age'), f.max('age'), f.count('name')).show()

+--------+--------+-----------+
|min(age)|max(age)|count(name)|
+--------+--------+-----------+
|      19|      30|          3|
+--------+--------+-----------+



> **TODO**: Ile jest unikatowych (wystepujacych tylko 1 raz) imion w polaczonych zbiorach people oraz people_txt? (allPeople)

In [52]:
columns_order=people.columns
countedNames = people.union(people_txt.select(columns_order)).dropna(subset='name').groupBy('name').count()
countedNames.show()

+-------+-----+
|   name|count|
+-------+-----+
|Michael|    2|
|   Andy|    2|
| Justin|    2|
+-------+-----+



In [53]:
countedNames.where(countedNames['count']==1).count()

0

> **TODO**: Ile lat maja osoby, ktorych imiona wystepuja tylko raz w polaczonych zbiorach people oraz people_txt?

****

In [None]:
# Wygenerujmy nowy DataFrame

In [54]:
import random

In [55]:
years = 10
names = ['Alice', 'Betty', 'Chris', 'Dan', 'Greg']
unique_names_count = len(names)
names = sorted(names*years)
year = [y for y in range(2000, 2000+years)]*len(names)
starting_salary = [round(random.gauss(4000, 1000),2) for i in range(unique_names_count)]
salary = [0 for i in range(years*unique_names_count)]
salary[::years] = starting_salary
for n in range(unique_names_count):
    for y in range(years-1):
        index = (years*n+1)+y
        #print(index, salary[index-1])
        salary[index] = round(salary[index-1]*(1+random.gauss(0.1,0.09)),2)

In [56]:
salaryHistory = spark.createDataFrame([Row(name=n, year=y, salary=s) for n,y,s in zip(names, year, salary)])

In [57]:
salaryHistory = salaryHistory.filter((salaryHistory['name'] != 'Greg') | (salaryHistory['year'] != 2006))

In [58]:
salaryHistory = salaryHistory.union(spark.createDataFrame([Row('Alice', 3000, 2000)]))

In [59]:
salaryHistory.collect()

[Row(name='Alice', year=2000, salary=4711.77),
 Row(name='Alice', year=2001, salary=4133.36),
 Row(name='Alice', year=2002, salary=4815.05),
 Row(name='Alice', year=2003, salary=5488.72),
 Row(name='Alice', year=2004, salary=6044.3),
 Row(name='Alice', year=2005, salary=7361.99),
 Row(name='Alice', year=2006, salary=8419.89),
 Row(name='Alice', year=2007, salary=9677.49),
 Row(name='Alice', year=2008, salary=9427.13),
 Row(name='Alice', year=2009, salary=10582.08),
 Row(name='Betty', year=2000, salary=4093.11),
 Row(name='Betty', year=2001, salary=4578.24),
 Row(name='Betty', year=2002, salary=4761.09),
 Row(name='Betty', year=2003, salary=4969.06),
 Row(name='Betty', year=2004, salary=5631.39),
 Row(name='Betty', year=2005, salary=6335.63),
 Row(name='Betty', year=2006, salary=6568.63),
 Row(name='Betty', year=2007, salary=7427.03),
 Row(name='Betty', year=2008, salary=9309.8),
 Row(name='Betty', year=2009, salary=9157.87),
 Row(name='Chris', year=2000, salary=5667.09),
 Row(name='Chr

> **TODO**: Przyjrzyj sie nowemu zbiorowi danych salaryHistory.<br>
a. Zobacz schemat. <br>
b. Ile rekordow jest w calym zbiorze? <br>
c. Jaka jest najmniejsza i najwieksza pensja?<br>
d. Ile razy powtarza sie kazde z imion?<br>
e. Stworz tabele sredniej, minimalnej i maksymalnej pensji w zależności od roku. Posortuj lata malejaco. Pensje podaj z dokladnoscia do pelnych wartosci. <br>

In [60]:
salaryHistory.printSchema()

root
 |-- name: string (nullable = true)
 |-- year: long (nullable = true)
 |-- salary: double (nullable = true)



In [61]:
salaryHistory.describe().show()

+-------+-----+------------------+------------------+
|summary| name|              year|            salary|
+-------+-----+------------------+------------------+
|  count|   50|                50|                50|
|   mean| null|           2024.38| 6839.503800000001|
| stddev| null|140.81901686279477|2032.8277003368228|
|    min|Alice|              2000|            2000.0|
|    max| Greg|              3000|          10854.67|
+-------+-----+------------------+------------------+



In [62]:
salaryHistory.groupBy("name").count().show()

+-----+-----+
| name|count|
+-----+-----+
|Betty|   10|
|Alice|   11|
|Chris|   10|
|  Dan|   10|
| Greg|    9|
+-----+-----+



In [82]:
salaryHistory.groupBy('year').agg(f.round(f.avg('salary')).alias("AverageSalary"),         
f.round(f.min('salary')).alias("MinSalary"),                 
f.round(f.max('salary')).alias("MaxSalary")).\
orderBy(f.desc('year')).\
show()

+----+-------------+---------+---------+
|year|AverageSalary|MinSalary|MaxSalary|
+----+-------------+---------+---------+
|3000|       2000.0|   2000.0|   2000.0|
|2009|       9773.0|   8282.0|  10855.0|
|2008|       8972.0|   7618.0|   9862.0|
|2007|       8617.0|   6932.0|  10848.0|
|2006|       8083.0|   6569.0|   9649.0|
|2005|       6982.0|   6320.0|   8165.0|
|2004|       6269.0|   5631.0|   7394.0|
|2003|       5816.0|   4969.0|   6520.0|
|2002|       5433.0|   4761.0|   6190.0|
|2001|       4928.0|   4133.0|   5761.0|
|2000|       4738.0|   4093.0|   5667.0|
+----+-------------+---------+---------+



#### Window functions
over

Do obliczania agregowanych wartosci w grupach definiowanych oknem (window).<br>
Zwracaja wiele rekordow (tyle ile na wejsciu w grupie).

partitionBy - definiuje podzial danych na okna<br>
orderBy - definiuje sortowanie wewnatrz kazdego z okien<br>
Frame (rangeBetween/rowsBetween) - definiuje offset<br>

In [None]:
from pyspark.sql.window import Window
# from pyspark.sql import functions as f

partitionBy

In [None]:
# definicja 'okna'
myWindowSpec = Window.partitionBy(allPeople['name'])

Funkcje agregujące okien (aka funkcje okien lub agregacje okienkowe ) <br>
Funkcje okna działają w odniesieniu do grupy wierszy, nazywanej oknem, i obliczania wartości zwracanej dla każdego wiersza w oparciu o grupę wierszy. Funkcje okna są przydatne do przetwarzania zadań, takich jak Obliczanie średniej przenoszonej, obliczanie zbiorczej statystyki lub uzyskiwanie dostępu do wartości wierszy, w których podano względne położenie bieżącego wiersza.

In [None]:
# wywolanie funkcji na kazdym 'oknie'
allPeople.withColumn('nameCount', f.count(allPeople['name']).over(myWindowSpec)).show()

> **TODO**: Do zbioru salaryHistory dodaj kolumne 'avgSalaryDiff', ktora bedzie zawierala roznice pomiedzy pensja z danego roku, a srednia pensja osoby na przestrzeni wszytskich lat. 

partitionBy + orderBy

In [None]:
# np. rank
# - musimy zdefiniowac dodatkowo sortowanie wewnatrz kazdej z grup
# - zwraca lp dla kolejnych rekordow posortowanych wedlug zadanych kolumn
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year'])
ranked = (f.rank()).over(windowSpec)
salaryHistory.withColumn('ranked', ranked).show()

In [None]:
salaryHistory_tmp = salaryHistory.filter(salaryHistory.name.isin('Alice', 'Greg'))
salaryHistory_tmp.show()

> **TODO**: Dla zbioru salaryHistory, porownaj pensje ludzi pomiedzy najwczesniejszym a najpozniejszym rokiem ich pracy.  

partitionBy + orderBy + rangeBetween/rowsBetween

In [None]:
# np. srednia ruchoma
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).rangeBetween(-1,0)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()

In [None]:
# np. srednia ze wszystkich rekordow do aktualnego wlacznie
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).rowsBetween(Window.unboundedPreceding,Window.currentRow)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()

In [None]:
# podobny efekt uzyskamy ponizszym zapytaniam. Drobna roznica: rekordy w jednej grupie (imie, rok) nie zostana rozdzielone 
import sys
windowSpec = Window.partitionBy(salaryHistory['name']).orderBy(salaryHistory['year']).rangeBetween(-sys.maxsize,0)
movingAvg = (f.avg(salaryHistory['salary'])).over(windowSpec)
salaryHistory.withColumn('movingAvg', movingAvg).show()