In [0]:
# =====================================================
# KOMPLETNY PROJEKT: ZADANIE 1, 2 i 3
# =====================================================

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, floor, collect_list, regexp_extract, length
from pyspark.sql.window import Window

# ==============================
# 1. SparkSession
# ==============================
spark = SparkSession.builder.appName("FASTQ_Complex_Analysis").getOrCreate()

# ==============================
# 2. Ścieżki do FASTQ
# ==============================
fastq_dir = "/Volumes/databrics_2/default/fastq"
files = ["SRR16356246_1.fastq", "SRR16356246_2.fastq"]
paths = [f"{fastq_dir}/{file}" for file in files]

# ==============================
# 3. Wczytanie plików jako tekst
# ==============================
raw_df = spark.read.text(paths)

# ==============================
# 4. Numerowanie wierszy
# ==============================
window = Window.orderBy("value")
df_indexed = raw_df.withColumn("row_num", row_number().over(window))

# ==============================
# 5. Grupowanie co 4 linie (FASTQ)
# ==============================
df_indexed = df_indexed.withColumn("group_id", floor((col("row_num") - 1) / 4))

fastq_grouped = df_indexed.groupBy("group_id") \
    .agg(collect_list("value").alias("lines"))

# ==============================
# 6. Mapowanie na kolumny FASTQ
# ==============================
fastq_df = fastq_grouped.select(
    col("lines").getItem(0).alias("header"),
    col("lines").getItem(1).alias("sequence"),
    col("lines").getItem(2).alias("plus"),
    col("lines").getItem(3).alias("quality")
)

fastq_df.show(5, truncate=False)

# ==============================
# ZADANIE 1: WALIDACJA DŁUGOŚCI
# ==============================
df_with_lengths = fastq_df.withColumn(
    "declared_length",
    regexp_extract(col("header"), r"length=(\d+)", 1).cast("int")
).withColumn(
    "actual_length",
    length(col("sequence"))
)

invalid_records = df_with_lengths.filter(
    col("declared_length") != col("actual_length")
)

invalid_count = invalid_records.count()
print("Zadanie 1 - liczba niespójnych rekordów:", invalid_count)

# ==============================
# ZADANIE 2: WSTĘPNA ANALIZA JAKOŚCI
# ==============================
df_quality = fastq_df.withColumn(
    "low_quality",
    col("quality").contains("#")
)

low_quality_reads = df_quality.filter(col("low_quality") == True)

low_quality_count = low_quality_reads.count()
print("Zadanie 2 - liczba odczytów o bardzo niskiej jakości (znak '#'):", low_quality_count)

# ==============================
# ZADANIE 3: KOMPLEKSOWA ANALIZA + CACHE
# ==============================
complex_df = fastq_df.withColumn(
    "has_low_quality",
    col("quality").contains("#")
).withColumn(
    "seq_length",
    length(col("sequence"))
)

complex_grouped = complex_df.groupBy(
    "has_low_quality",
    "seq_length"
).count().orderBy("has_low_quality", "seq_length")

# Cache wyniku - removed .cache() as it's not supported on serverless
complex_grouped_cached = complex_grouped

# Akcja 1: count
total_groups = complex_grouped_cached.count()
print("Zadanie 3 - liczba grup połączenia długości i niskiej jakości:", total_groups)

# Akcja 2: count unikalnych długości
distinct_lengths = complex_grouped_cached.select("seq_length").distinct().count()
print("Zadanie 3 - liczba unikalnych długości sekwencji:", distinct_lengths)

# Akcja 3: pokazanie pierwszych 10 wierszy
complex_grouped_cached.show(10, truncate=False)

# ==============================
# KOMENTARZE ANALITYCZNE
# ==============================

# Zadanie 1:
# - Operacje regexp_extract + length → droższe
# - Liczba Jobów = 1 (akacja .count())
# - Stage'y: narrow transformations
# - Taski = liczba partycji

# Zadanie 2:
# - Operacja .contains("#") → szybka
# - Stage'y mogą być oznaczone jako "skipped", jeśli fastq_df było cache'owane
# - Locality Level tasków = PROCESS_LOCAL
# - Liczba Records/task ≈ liczba rekordów w partycji

# Zadanie 3:
# - groupBy + count → shuffle → Stage 1: shuffle write, Stage 2: shuffle read
# - Cache → kolejne akcje (distinct, show) nie liczą od zera, Stage'y mogą być "skipped"
# - Efektywność cache: pierwszy Job pełny czas, kolejne Joby bardzo szybkie
# - Locality Level: PROCESS_LOCAL, jeśli niektóre taski NODE_LOCAL → dane w pamięci innego executor'a



+------+--------+----+-------+
|header|sequence|plus|quality|
+------+--------+----+-------+
+------+--------+----+-------+

Zadanie 1 - liczba niespójnych rekordów: 0
Zadanie 2 - liczba odczytów o bardzo niskiej jakości (znak '#'): 0
Zadanie 3 - liczba grup połączenia długości i niskiej jakości: 0
Zadanie 3 - liczba unikalnych długości sekwencji: 0
+---------------+----------+-----+
|has_low_quality|seq_length|count|
+---------------+----------+-----+
+---------------+----------+-----+

