# Spark SQL
Utowrzenie SparkSession

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as func

spark = SparkSession.builder.appName("SparkSQL").master("local[*]").getOrCreate()

### Metoda przetwarzająca linijki zbioru danych do ustrukturyzowanych danych SQL

In [None]:
def to_structured_table(line):
    fields = line.split(',')
    return Row(entity=str(fields[0]), code=str(fields[1]), year=int(fields[2]), life_expectancy=float(fields[3]))

### Wnioskowanie schematu i utworzenie tabeli Dataframe

In [None]:
lines = spark.sparkContext.textFile("life-expectancy.csv")
life_expectancies = lines.map(to_structured_table)

df = spark.createDataFrame(life_expectancies).cache()
df.createOrReplaceTempView("life_expectancies")

### Użycie zapytania SQL
Zapytania SQL mogą być uruchamiane na tabelach DataFrames.

In [None]:
life_exp_90 = spark.sql("SELECT * FROM life_expectancies WHERE year >= 1950 AND year <= 1960 AND entity = 'Poland'")

Zapytania SQL są wyższą abstrakcją zapytań RDD. SQL wspierają również normalne operacje RDD

In [None]:
for life_exp in life_exp_90.collect():
  print(life_exp)

# DataFrames
### Analiza danych
Wyświetlenie tabeli

In [None]:
df.select("entity", "code", "year", "life_expectancy").show()

Przefiltrowanie najlepszych wskaźników długości życia

In [None]:
df.filter(df.life_expectancy > 84).show()

In [None]:
df.groupBy("entity").max("life_expectancy").sort("entity").show()

In [None]:
max_life_df = df.groupBy("Entity").agg(func.round(func.max("life_expectancy"), 2)
                         .alias("max_life_expectancy")).sort("Entity")
max_life_df.filter(max_life_df.max_life_expectancy > 83).show()