## Przygotowanie sesji Spark
Zainicjowanie sesji Spark oraz stworzenie schematu bazy danych z której będziemy korzystać.

In [None]:
%env PYSPARK_SUBMIT_ARGS=--master local[*]  --jars /tmp/gcs-connector-hadoop2-1.9.17-shaded.jar,/tmp/google-cloud-nio-0.120.0-alpha-shaded.jar --conf spark.driver.port=29010 --conf spark.blockManager.port=29011  --conf spark.driver.host=jupyter-service- --conf spark.executorEnv.PYSPARK_PYTHON=python3 --packages org.biodatageeks:sequila_2.11:0.5.20,io.projectglow:glow-spark2_2.11:0.6.0,org.biodatageeks:seqtender_2.11:0.3.7 pyspark-shell


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("local[2]") \
.config("spark.driver.host", "localhost") \
.config('spark.driver.memory','1g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()

Pobranie sesji Spark jest proste dla programisty korzystającego z notatnika, wymaga podania tylko kilku parametrów, ale faktyczna konfiguracja jest bardziej rozbudowana

In [None]:
import os
args = os.environ['PYSPARK_SUBMIT_ARGS'].replace("  ", "\n")
print(args)

Mamy pobraną sesję sparkową. Powstały dodatkowe pody gotowe na realizację obliczeń. A jak zwolnić te zasoby?

In [None]:
spark.stop()

Ponownie pobieramy sesję Spark. Będziemy z niej korzystać. Po zakończonej pracy należy pamiętać o zastopowaniu sesji.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.config('spark.driver.memory','1g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()

## Odczyt danych
Korzystając z sesji Spark można odczytać dane zapisane w lokalizacji dostępnej dla executorów (procesów obliczeniowych) koordynowanych przez Spark.
Konieczne jest podanie ścieżki dostępowej do pliku i formatu danych (nie jest to jednoznazne z rozszerzeniem pliku).
Dystrybucja Spark udostępnia kilka tzw Data Sources, które odczytują i zapisują dane w określonych formatach (CSV, formaty kolumnowe: parquet/orc).
Data Sources zgodne z opracowanym interfejsem można samodzielnie tworzyć. Na dzisiejszych zajęciach będziemy korzystać z takich zdefiniowanych DS:
* FASTQDataSource
* BAMDataSource
* VCFDataSource

**UWAGA - powyższe sposoby odczytu plików nie są częścią głównej dystrybucji Spark. Wymagana jest dodatkowa konfiguracja.**

In [None]:
import os                               # moduł OS języka Python
user_name = os.environ.get('USER')      # pobieramy zmienną środowiskową USER
bucket = f"gs://edugen-common-data2" # konstruujemy sciezke dostepowa do pliku
print(bucket)

In [None]:
!gsutil ls gs://edugen-common-data2

In [None]:
! mkdir -p  data/fastq/

In [None]:
!gsutil cp gs://edugen-common-data2/fastq/* data/fastq/

---

`Co oznacza f przed cudzysłowem?`

`Czy jest różnica między stosowaniem apostrofu i cudzysłowu przy definicji zmiennych przechowywujących łańcuchy znaków?`

In [None]:
reads_path = f"data/fastq/*"  # * oznacza wszystkie pliki we wskazanej lokalizacji. Można podać konkretny plik
fastq_all = spark.read.load(reads_path, format="org.biodatageeks.sequila.datasources.FASTQ.FASTQDataSource")
fastq_all = fastq_all.select("sample_id","seq", "qual")

### Weryfikacja danych

In [None]:
type(fastq_all)      # jaki jest typ danych utworzonej zmiennej?

In [None]:
fastq_all.printSchema() # jaki jest schemat danych?

In [None]:
len(fastq_all.columns)           # wymiary (liczba kolumn)

In [None]:
fastq_all.count()               # wymiary (liczba wierszy)

In [None]:
fastq_all.explain(True)              #  plan wykonania

DataFrame jest abstrakcją nad innym typem danych (RDD), który jest podstawową rozproszoną strukturą danych. Poprzez DF możemy dostać się do rdd i zweryfikować na przykład liczbę partycji danych.

In [None]:
fastq_all.rdd.getNumPartitions() # liczba partycji (bloków danych)

### Podgląd danych

In [None]:
fastq_all.show()

In [None]:
fastq_all.show(5)  # pierwsze 5 wierszy

In [None]:
fastq_all.show(truncate=False) # bez skracania zawartości kolumn

Widok "szerokich" tabel jest nieczytelny, w kolejnych częściach warsztatów zaradzimy temu korzystając z dodatkowej biblioteki.

## Dostęp do wybranych danych

#### Operacja projekcji (SELECT)

In [None]:
fastq_all.select("sample_id").show()

`Czy operacja select (sample_id) wpłynęła na oryginalny data frame fastq_all?`

In [None]:
fastq_all.printSchema()
fastq_all.show()

Jeśli chcemy zachować wynik działania transformacji (w celu późniejszego wykorzystania) trzeba wynik zachować w zmiennej.

In [None]:
fastq_sample_only = fastq_all.select("sample_id")

In [None]:
fastq_sample_only.printSchema()
fastq_sample_only.show()

In [None]:
fastq_all.select("sample_id","seq").show()

Powiedzmy, że interesują nas wszystkie kolumny poza qual. Jak to zrobic? Można wylistować wszystkie kolumny poza qual, ale to uciazliwe. Mozna skorzystac z operacji drop.

In [None]:
fastq_no_qual=fastq_all.drop('qual')

`Czy operacja usunięcia kolumny qual wpłynęła na oryginalny data frame fastq_all?`

In [None]:
fastq_no_qual.printSchema()
fastq_no_qual.show()

## Wartości unikalne
Jeśli chcemy uzyskać unikalne wartości w określonych kolumnach korzystamy z metody distinct().
Operacje na DF można łańcuchowo łączyć, zatem na wyniku działania select() można wywołać kolejne metody.

In [None]:
fastq_all.select('sample_id').distinct().show()

In [None]:
fastq_all.count()

# Sortowanie

Do sortowania służy metoda orderBy. Domyślne sortowanie jest rosnące.

In [None]:
fastq_all.select('sample_id').distinct().orderBy('sample_id').show()

In [None]:
fastq_all.select('sample_id').distinct().orderBy('sample_id', ascending=False).show()

In [None]:
fastq_all.orderBy('sample_id', 'seq', ascending=False).show()  # kierunek sortowania jest wspólny dla listy kolumn

In [None]:
fastq_all.orderBy('sample_id', ascending=False).orderBy('seq', ascending=True).show() # sortowanie malejace i rosnące na dwóch roznych kolumnach

### Filtrowanie wynikow
Nasz zbiór danych posiada odczyty z 3 próbek. Ograniczmy się do wybranych próbek.

In [None]:
fastq_mother = fastq_all.filter("sample_id = 'mother'")

`Czy już odbył się odczyt danych z fastq?`

In [None]:
fastq_mother.show()

In [None]:
fastq_mother.select('sample_id').distinct().show()

In [None]:
fastq_mother.count()

Warunki można łączyć spójnikami logicznymi.
Można używać
* operatorów arytmetycznych (=, !=, >, >=, <, <=)
* przynależności do zbioru (IN/NOT IN)
* porównania znaków (LIKE/NOT LIKE)
* przyrównania do wartości NULL (IS NULL/ IS NOT NULL)

Przy korzystaniu z LIKE można użyć % jako oznaczenie dowolnego ciągu znaków.

Konstrukcja warunku w metodzie filter() jak taka jak w klauzuli WHERE W SQL.

Pokaż odczyty spełniające warunek ze nazwa instrumentu jest pusta, run_id jest >=0 a odczyt zaczyna sie od liter GCA. Pokaz tylko kolumny z filtra oraz nazwe probki

In [None]:
fastq_mother.filter('seq LIKE "GCA%"').select('sample_id',  'seq').show()

In [None]:
fastq_mother.select('sample_id', 'seq').filter('seq LIKE "GCA%"').show() # kolejnosc select i filter bez znaczenia

<div class="alert alert-block alert-warning">

<b>Zadanie 2_2:</b>Napisz polecenie które policzy ile jest rekordów dla próbki syna które spełniają warunki, że sekwencja odczytu konczy się na TGG a qual zaczyna się od ==. </div>



## Używanie funkcji, kolumny wyliczane

Dostępne są funkcje skalarne (przykład: ROUND, UPPER, CURRENT_DATE) oraz agregujące (MIN, MAX, AVG, SUM, COUNT).
Niektóre funkcje są dostępne "od razu" bez dodatkowych poleceń import.
Lista funkcji znajduje się : https://spark.apache.org/docs/latest/api/sql/index.html

In [None]:
fastq_all.selectExpr("*").show()  # pokaż wszystkie kolumny tego DF

Dodanie dwóch dodatkowych kolumn wyliczanych

In [None]:
fastq_all.selectExpr("*", "length(seq) as len_seq", "length(qual) as len_qual" ).show() ## dodanie dwóch kolumn wyliczanych przy uzyciu funkcji LENGTH

Alias - nadanie kolumnie lub kolumnie wyliczanej nazwy

In [None]:
extended_fastq = fastq_all.selectExpr("*", "length(seq) as len_s", "length(qual) as len_q" ) # AS alias

In [None]:
extended_fastq.printSchema()

Dodanie nowej kolumny, dla każdego wiersza zostanie dodana wartość zwracana przez funkcję current_date()

In [None]:
from pyspark.sql.functions import current_date
extended_fastq.withColumn ("date", current_date()).show()

Dodanie kolumny o stałej wartości dla każdej wartości wymaga wykorzystania funkcji lit (), która przekształci stała wartość w kolumnę.

In [None]:
from pyspark.sql.functions import lit

extended_fastq.withColumn("imported_by", lit(user_name)).withColumn("format", lit('FASTQ')).show()

<div class="alert alert-block alert-warning">

<b>Zadanie 2_3:</b>

Napisz polecenie które stworzy ramkę danych zawierającą sklejenie wartości dwóch kolumn (sample_id) oraz daty eksperymentu (dodaj kolumne z wartościami 2019-01-15) . W wynikach chcemy mieć tylko dane matki i ojca. Kolumny wynikowe: nazwa próbki, seq, qual, data eksperymentu oraz scalona nazwa probki oraz data eksperymenty (np father-2019-01-15). Posortuj po nazwie próbki. Pokaż schemat ramki. Upewnij się, że data eksperymentu jest typu date.

* Zwróć uwagę na potrzebę konwersji ciągu znaków na datę
</div>


### Instrukcje warunkowe przy kolumnach wyliczanych

In [None]:
fastq_dates=fastq_all.selectExpr('*', 'if(sample_id = "son",to_date("2018-11-10"), to_date("2019-01-15")) as experiment_date')

<div class="alert alert-block alert-warning">
<b>Zadanie 2_4:</b> Napisz polecenia, które zweryfikuje czy daty eksperymentów zostały dodane poprawnie </div>


## Grupowanie

In [None]:
fastq_all.groupBy("sample_id").show()

In [None]:
type(fastq_all.groupBy("sample_id")) # to nie jest DF

In [None]:
sample_count=fastq_all.groupBy("sample_id").count()

In [None]:
type(sample_count)

In [None]:
sample_count.show()

In [None]:
sample_count.orderBy("count").show()

<div class="alert alert-block alert-warning">
<b>Zadanie 2_5:</b> Napisz funkcje, ktora znajdzie rozklady jakosci dla 1, 2, 3 i 4 pozycji odczytu.
*Nastepnie zaprezentuj wyniki w postaci serii histogramów </div>

### Mapowanie do genomu referencyjnego

UWAGA - na dzisiejszych zajęciach ten kod nie będzie uruchamiany

Wykonamy mapowanie do genomu referencyjnych korzystając z rozproszenia danych miedzy procesy obliczeniowe sparka.

Przygotowanie ścieżek do plików.

In [None]:
#import os
#user_name = os.environ.get('USER')
#bucket = f"gs://edugen-lab-{user_name}"

#reads_file_path = f"{bucket}/fastq/mother.fastq"
#ref_path = "/mnt/data/mapping/ref/ref.fasta"

Konstruujemy komendę, która będzie uruchamiana na procesach obliczeniowych. Potrzebne narzędzia muszą być dostępne na węzłach obliczeniowych.

In [None]:
#command = f'bwa mem -p {ref_path} - | samtools fixmate -m - - | samtools sort  | samtools markdup -r -S - -  | samtools addreplacerg  -r "ID:S1" -r "SM:S1"  -r "PL:ILLUMINA" - | samtools view -b -'

Żeby wykonać rozproszone obliczenia na danych genomicznych nalezy wykorzystac dodatkową bibliotekę.

In [None]:
#from pyseqtender import SeqTenderAlignment

#seq_aligner = SeqTenderAlignment(spark, reads_file_path, command)
#alignments_rdd = seq_aligner.pipe_reads()

Zapisujemy plik na kubełek.

In [None]:
#bam_file_path = f"{bucket}/bam/mother10.bam"
#seq_aligner.save_reads(bam_file_path, alignments_rdd)

In [None]:
#!gsutil ls gs://edugen-lab-$USER/bam

<div class="alert alert-block alert-warning">
<b>Zadanie 2_6:</b> Na podstawie notatników z zajęć z genomiki wyświetl fragment pliku BAM w widge'cie IGV.  </div>



Kończymy notatniki, należy zamknąć sesję.

In [None]:
spark.stop()