# TBD -loading data, sparkSQL, DF API


## Wczytanie danych

Bedziemy wykorzystywać dane z ankiety StackOverflow z 2020.

https://insights.stackoverflow.com/survey/

Dane sa dostepne na google drive. Skorzystamy z modułu GoogleDriveDownloader, ktory pozwala pobrac dokument o podanym id.


In [None]:
pip install googledrivedownloader

In [None]:
from google_drive_downloader import GoogleDriveDownloader as gdd

In [None]:
from pathlib import Path
path_dir = str(Path.home()) + "/data/2020/"  # ustawmy sciezke na HOME/data/2020
archive_dir = path_dir + "survey.zip"        # plik zapiszemy pod nazwa survey.zip

In [None]:
path_dir

In [None]:
archive_dir

In [None]:
# sciagniecie pliku we wskazane miejsce
gdd.download_file_from_google_drive(file_id='1dfGerWeWkcyQ9GX9x20rdSGj7WtEpzBB',
                                    dest_path=archive_dir,
                                    unzip=True)

##  Podłączenie do sesji Spark


#### WAZNE
jesli w poprzednim notatniku masz aktywną sesję Spark zakończ ją (w poprzednim notatniku) poleceniem spark.stop()

In [None]:
from pyspark.sql import SparkSession
#spark.stop()
spark = SparkSession \
.builder \
.config("spark.executor.instances", "1")\
.config('spark.driver.memory','1g')\
.config('spark.executor.memory', '1g') \
.getOrCreate()

## Dostęp do danych

In [None]:
import os
user_name = 'jovyan'
print(user_name)

In [None]:
# ścieżka dostępu do pliku
csv_path = 'file:////home/jovyan/data/2020/survey_results_public.csv'

In [None]:
csv_path

## Spark SQL

Platforma Apache Spark posiada komponent Spark SQL, który pozwala traktować dane jak tabele w bazie danych. Można zakładać swoje schematy baz danych oraz korzystać z języka SQL.

In [None]:
table_name = "survey_2020"                               # nazwa tabeli ktora bedziemy chcieli stworzyc

In [None]:
spark.sql(f'DROP TABLE IF EXISTS {table_name}')       # usun te tabele jesli istniala wczesniej 

# stworz tabele korzystajac z danych we wskazanej lokalizacji
spark.sql(f'CREATE TABLE IF NOT EXISTS {table_name} \
          USING csv \
          OPTIONS (HEADER true, INFERSCHEMA true, NULLVALUE "NA") \
          LOCATION "{csv_path}"')

## Weryfikacja danych 
Sprawdzmy zaczytane dane.

In [None]:
spark.sql(f"describe {table_name}").show(100, truncate=False) # niepoprawne typy danych... "NA" 

In [None]:
# sprawdzenie liczności tabeli
spark.sql(f"select count(*) from {table_name}").show()

## Podgląd danych

## Dataframe spark vs pandas

Moduł Pandas jest biblioteką Pythonową do manipulacji danymi. W szczegolnosci w pandas mozemy stworzyc ramki danych i wykonywac na niej analize, agregacje oraz wizualizacje danych. 
Przy nieduzych zbiorach danych i prostych operacjach to doskonała biblioteka. Jednak kiedy zbior danych sie rozrasta lub kiedy wymagane sa zlozone transformacje to operacje moga byc wolne.

Operacje na rozproszonych danych sa szybsze. Ale tu takze napotykamy ograniczenia np trudność w wizualizacji danych.

In [None]:
import pandas as pd

In [None]:
spark.sql(f"select * from {table_name} limit 10").toPandas()

**Ważne** 

Metoda toPandas() na ramce pyspark, konwertuje ramkę pyspark do ramki pandas. Wykonuje akcje pobrania (collect) wszystkich danych z executorów (z JVM) i transfer do  programu sterujacego (driver) i konwersje do typu Pythonowego w notatniku. Ze względu na ograniczenia pamięciowe w programie sterującym należy to wykonywać na podzbiorach danych.

**DataFrame.collect() collects the distributed data to the driver side as the local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit in the driver side because it collects all the data from executors to the driver side.**

**Note that DataFrame.toPandas() results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data.**

In [None]:
dist_df = spark.sql(f"select * from {table_name} LIMIT 10")
local_df = spark.sql(f"select * from {table_name} LIMIT 10").toPandas()

In [None]:
type(dist_df)  # dataframe Sparkowy ("przepis na dane, rozproszony, leniwy")

In [None]:
type(local_df)  # dataframe Pandasowy (lokalny, sciągnięty do pamięci operacyjnej)

In [None]:
#dist_df.show()

In [None]:
local_df

In [None]:
pd.set_option('display.max_columns', None)    # pokazuj wszystkie kolumny
#pd.reset_option('max_columns')

## Przykład wykorzystania Spark'a do wizualizacji
### Narysuj histogram wieku respondentów

In [None]:
# przygotowanie danych
# przycinamy dane tylko do zakresu ktory jest potrzebny do realizacji polecenia
ages = spark.sql(f"SELECT CAST (Age AS INT) \
                    FROM {table_name} \
                    WHERE age IS NOT NULL \
                    AND age BETWEEN 10 AND 80").toPandas()

In [None]:
ages

In [None]:
ages.hist("Age", bins=10)
plt.show()

## Spark DF API

In [None]:
df = spark.read.csv(csv_path, inferSchema=True, header="true", nullValue='NA', nanValue='NA',emptyValue='NA')

In [None]:
type (df)                 # jaki jest typ danych

In [None]:
df.describe()

In [None]:
df.explain()              # fizyczny plan wykonania

In [None]:
df.explain(True)          # logiczny i fizyczny plan wykonania

In [None]:
df.rdd.getNumPartitions() # liczba partycji (bloków danych)

In [None]:
df.printSchema()          # schemat danych

In [None]:
df.count()                # wymiary (liczba wierszy)

In [None]:
len(df.columns)           # wymiary (liczba kolumn)

In [None]:
df.describe().show()      # pokaż podsumowanie danych w tabeli

In [None]:
df.describe("Age").show() # pokaż podsumowanie danych w kolumnie 

In [None]:
df.show()                    # pokaż 20 wierszy danych 

In [None]:
df.show(10, truncate=False) # pokaż 10 wierszy, nie skracaj danych

In [None]:
df.select("Age").show()  # pokaż tylko jedną kolumne

In [None]:
df.select("Country", "Age", "ConvertedComp").show() # pokaż wybrane kolumny 

In [None]:
df_country = df.select(df.Country).distinct() # stworz nowy DF zawierajacy tylko unikalne wartosci nazw krajow

In [None]:
df_country.count()

In [None]:
df.filter("Country = 'Spain'")

In [None]:
df.filter("Country = 'Spain' and Age > 50" ).toPandas()

### Grupowanie i funkcje agregujące

In [None]:
from pyspark.sql.functions import *
df.groupBy("Country").count().show()   # dokonaj grupowania po Country i zlicz liczbe respondentow

In [None]:
df.groupBy("Country").avg("Age").show() # dokonaj grupowania po Contry i policz sredni wiek

In [None]:
df.withColumn ("Bonus", col("ConvertedComp") + 1000).filter("ConvertedComp is not NULL").toPandas() # dodaj kolumne

In [None]:
df.drop("Age").show()  # usun kolumne Chromosome. 
# Czy DF została pozbawiona kolumny na trwale?

### Zapis wyników

In [None]:
df2 = df.withColumn ("Bonus", col("ConvertedComp") + 1000).filter("ConvertedComp is not NULL")   # dodaj kolumne  i zapisz do nowego DF

In [None]:
output_path = 'file:///home/jovyan/data/out.csv'
df2.write.format("csv").mode("overwrite").save(output_path)

In [None]:
spark.stop()