In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# (Optional) Ask for a larger JVM heap through the submit arguments.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 12g --executor-memory 12g pyspark-shell"

# Create the SparkSession with more memory and adaptive query execution.
spark = (
    SparkSession.builder
    .master("local[*]")  # use every available core
    .appName("DNA_Methylation_All_Parquet")
    # More RAM
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "12g")
    .config("spark.memory.fraction", "0.8")        # share of the heap for execution + storage
    .config("spark.memory.storageFraction", "0.3") # part of that share reserved for cached data
    # Threshold above which blocks read from disk are memory-mapped
    .config("spark.storage.memoryMapThreshold", "2m")
    # Reduce OOM risk through adaptive query execution
    .config("spark.sql.adaptive.enabled", "true")
    # Replaces 'spark.sql.adaptive.shuffle.targetPostShuffleInputSize', which
    # Spark reports as deprecated since v3.0 (see the SQLConf warning it logs).
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
    # De-duplicate map keys (e.g. for map_from_arrays): the last value wins
    .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
    # Makes debugging easier
    .config("spark.ui.showConsoleProgress", "true")
    .getOrCreate()
)

# Path to the directory holding the dataset files
dataset_path = "/home/mw/.cache/kagglehub/datasets/martininf1n1ty/dna-methylation-adnotated/versions/1"

# Read every Parquet file under dataset_path, recursing into sub-directories
merged_df = (
    spark.read
    .option("recursiveFileLookup", "true")
    .parquet(dataset_path)
)

# Add a column derived from the source file name: the base name without the
# ".parquet" suffix (e.g. chromosome = chr1, chr2, ...)
merged_df = merged_df.withColumn(
    "chromosome",
    F.regexp_extract(F.input_file_name(), r"([^/]+)\.parquet$", 1)
)

# (For safety) repartition by chromosome so the data is not loaded into a
# single partition
merged_df = merged_df.repartition(16, "chromosome")

# Inspect the result (the row count and sample rows are left disabled)
#print(f"Liczba wierszy: {merged_df.count()}")
#merged_df.show(5, truncate=False)
merged_df.printSchema()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/05 11:20:24 WARN Utils: Your hostname, mw-OptiPlex-7050, resolves to a loopback address: 127.0.1.1; using 10.2.6.194 instead (on interface enp0s31f6)
25/11/05 11:20:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/05 11:20:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/05 11:20:29 WARN SQLConf: The SQL config 'spark.sql.adaptive.shuffle.targetPostShuffleInputSize' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.adaptive.advisoryPartitionSizeInBytes' instead of it.
25/11/05 11:20:29 WARN SQLConf: The SQL config 'spark.sql.adaptive.shuffle.target

root
 |-- case_barcode: string (nullable = true)
 |-- probe_id_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- beta_values: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- chromosome: string (nullable = false)



In [3]:
# Unnest the two parallel arrays in lock-step: arrays_zip pairs element i of
# probe_id_ids with element i of beta_values, and explode emits one row per
# pair. The pair's struct fields keep the names of the zipped columns.
exploded_df = (
    merged_df
    .select(
        "case_barcode",
        "chromosome",
        F.explode(F.arrays_zip("probe_id_ids", "beta_values")).alias("pair"),
    )
    .select(
        "case_barcode",
        "chromosome",
        F.col("pair.probe_id_ids").alias("probe_id_id"),
        F.col("pair.beta_values").alias("beta_value"),
    )
)

# Print the schema of the long-format frame (sample rows left disabled)
#exploded_df.show(5, truncate=False)
exploded_df.printSchema()

root
 |-- case_barcode: string (nullable = true)
 |-- chromosome: string (nullable = false)
 |-- probe_id_id: string (nullable = true)
 |-- beta_value: double (nullable = true)



In [4]:
# Preview the exploded rows using show()'s defaults, spelled out explicitly.
exploded_df.show(n=20, truncate=True)



+------------+----------+-----------+------------------+
|case_barcode|chromosome|probe_id_id|        beta_value|
+------------+----------+-----------+------------------+
|TCGA-BR-4276|     chr11| cg00080012|0.0120308692210344|
|TCGA-BR-4276|     chr11| cg00080012|0.0127937002970355|
|TCGA-BR-4276|     chr11| cg00146096|0.0123743793715371|
|TCGA-BR-4276|     chr11| cg00146096|0.0186264167163925|
|TCGA-BR-4276|     chr11| cg00554702| 0.105780696728702|
|TCGA-BR-4276|     chr11| cg00554702|   0.1195274389606|
|TCGA-BR-4276|     chr11| cg00858899|0.0312394159831404|
|TCGA-BR-4276|     chr11| cg00858899|0.0325718500447315|
|TCGA-BR-4276|     chr11| cg00901652|0.0742616877956292|
|TCGA-BR-4276|     chr11| cg00901652|0.0474890419650844|
|TCGA-BR-4276|     chr11| cg00908551|0.0686619257058537|
|TCGA-BR-4276|     chr11| cg00908551|0.0452899832671019|
|TCGA-BR-4276|     chr11| cg00953256| 0.643713811274543|
|TCGA-BR-4276|     chr11| cg00953256| 0.310383809463714|
|TCGA-BR-4276|     chr11| cg011

                                                                                

In [None]:
from pyspark.sql import functions as F

# Step 2: for every (chromosome, probe) pair, count the distinct patients
# (case_barcode values) that have a row for that probe.
# countDistinct already de-duplicates case_barcode within each group, so a
# separate .distinct() pass beforehand (and its extra shuffle) is unnecessary.
probe_counts = (
    exploded_df
    .groupBy("chromosome", "probe_id_id")
    .agg(F.countDistinct("case_barcode").alias("n_patients_with_probe"))
)

# Minimum number of patients a probe must appear in to be kept.
# NOTE(review): the name suggests this is the first quartile of
# n_patients_with_probe -- confirm where 8894 comes from.
first_quantile = 8894

# Step 3: keep the probes seen in at least `first_quantile` patients.
# This is a threshold filter; it does not require presence in every patient.
common_probes = probe_counts.filter(F.col("n_patients_with_probe") >= first_quantile)

#common_probes.show()



In [None]:
#probe_counts.coalesce(1).write.option("header","true").csv("probe_counts_out")

25/11/05 11:40:10 WARN TaskMemoryManager: Failed to allocate a page (536870912 bytes), try again.
25/11/05 11:43:00 WARN TaskMemoryManager: Failed to allocate a page (2147483648 bytes), try again.
25/11/05 11:43:00 WARN TaskMemoryManager: Failed to allocate a page (2147483648 bytes), try again.
25/11/05 11:43:00 WARN TaskMemoryManager: Failed to allocate a page (1532228038 bytes), try again.
                                                                                

In [23]:
# Step 4: keep only the measurements whose (chromosome, probe) pair is listed
# in common_probes.
# A left join would keep every row of exploded_df (the right side contributes
# only the join keys), so nothing would be filtered out. A left-semi join
# keeps exactly the left rows that have a match and adds no right-side columns.
filtered_df = exploded_df.join(
    common_probes.select("chromosome", "probe_id_id"),
    on=["chromosome", "probe_id_id"],
    how="left_semi",
)

# Step 5: preview and schema
filtered_df.show(5, truncate=False)
filtered_df.printSchema()



+----------+-----------+------------+------------------+
|chromosome|probe_id_id|case_barcode|beta_value        |
+----------+-----------+------------+------------------+
|chr11     |cg00080012 |TCGA-BR-4276|0.0120308692210344|
|chr11     |cg00080012 |TCGA-BR-4276|0.0127937002970355|
|chr11     |cg00146096 |TCGA-BR-4276|0.0123743793715371|
|chr11     |cg00146096 |TCGA-BR-4276|0.0186264167163925|
|chr11     |cg00554702 |TCGA-BR-4276|0.105780696728702 |
+----------+-----------+------------+------------------+
only showing top 5 rows
root
 |-- chromosome: string (nullable = false)
 |-- probe_id_id: string (nullable = true)
 |-- case_barcode: string (nullable = true)
 |-- beta_value: double (nullable = true)



                                                                                

In [None]:
# Distinct (patient, chromosome) pairs present in the exploded data.
patients_per_chr = exploded_df.select("case_barcode", "chromosome").distinct()

# Distinct (chromosome, probe) pairs present in the exploded data.
probes_per_chr = exploded_df.select("chromosome", "probe_id_id").distinct()

# Per-chromosome cross product: each patient seen on a chromosome is paired
# with every probe seen on that chromosome.
full_grid = patients_per_chr.join(probes_per_chr, on="chromosome", how="inner")

[Stage 35:===>             (5 + 8) / 23][Stage 36:>                (0 + 0) / 23]

In [32]:
# Attach the measured beta_value to every (patient, chromosome, probe) cell of
# the grid; because this is a left join, grid cells without a measurement are
# kept with a null beta_value.
exploded_df_with_nans = full_grid.join(
    exploded_df.select("case_barcode", "chromosome", "probe_id_id", "beta_value"),
    ["case_barcode", "chromosome", "probe_id_id"],
    "left",
)

In [None]:
# Destination of the Parquet output (relative to the working directory).
output_path = "dna_methylation_common_probes.parquet"

(
    exploded_df_with_nans
    # Still produces a single output partition (one part file), but unlike
    # coalesce(1) it inserts a shuffle, so the upstream joins keep running in
    # parallel instead of being collapsed into one task.
    .repartition(1)
    .write
    .mode("overwrite")  # replace any previous output at output_path
    .parquet(output_path)
)

print(f"âœ… Wynik zapisany do: {output_path}")

In [None]:
# pandas is required by pd.read_csv below; the import was previously commented
# out, which raises NameError on a fresh kernel.
import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()

df_genes = pd.read_csv('https://raw.githubusercontent.com/marcin119a/data/refs/heads/main/joined.csv')
# Unique, non-null gene symbols as plain strings. They are passed to BigQuery
# as an array parameter instead of being spliced into the SQL text, so symbols
# containing quotes (or missing values) cannot break or alter the query.
gene_symbols = df_genes['Hugo Symbol'].dropna().astype(str).unique().tolist()
dataset = 'isb-cgc-bq.annotations.methylation_annotation_hg38_gdc_current'

# `genes` inside the SQL is the table's repeated column; @gene_symbols is the
# query parameter bound in job_config below.
query = f"""
SELECT DISTINCT
  g.symbol AS gene_symbol,
  CpG_probe_id,
  chromosome,
  position
FROM `{dataset}`,
UNNEST(genes) AS g
WHERE g.symbol IN UNNEST(@gene_symbols)
ORDER BY g.symbol
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter("gene_symbols", "STRING", gene_symbols),
    ]
)

query_job = client.query(query, job_config=job_config)
# Materialise the query result as a pandas DataFrame.
probles_for_onkodb = query_job.to_dataframe()


In [None]:
# probles_for_onkodb = spark.createDataFrame(probles_for_onkodb)


In [None]:
# Rebuild exploded_df from merged_df (same derivation as the earlier cell that
# defines it): arrays_zip pairs element i of probe_id_ids with element i of
# beta_values, and explode emits one row per pair.
exploded_df = (
    merged_df
    .select(
        "case_barcode",
        "chromosome",
        F.explode(F.arrays_zip("probe_id_ids", "beta_values")).alias("pair"),
    )
    .select(
        "case_barcode",
        "chromosome",
        F.col("pair.probe_id_ids").alias("probe_id_id"),
        F.col("pair.beta_values").alias("beta_value"),
    )
)

# Print the schema of the long-format frame (sample rows left disabled)
#exploded_df.show(5, truncate=False)
exploded_df.printSchema()

In [2]:
# Echo exploded_df as the cell's output. The cell that defines exploded_df
# must have been run in this kernel first; otherwise this raises NameError.
exploded_df

NameError: name 'exploded_df' is not defined

# Disabled: join every measurement with the gene annotation on chromosome.
# Previously only the first line was commented, which left the indented
# continuation lines as a syntax error; the whole statement is now commented.
# NOTE(review): probles_for_onkodb comes from query_job.to_dataframe() and the
# spark.createDataFrame conversion is commented out elsewhere in the notebook,
# so it would need converting to a Spark DataFrame before this join can run.
# joined_df = (
#     exploded_df
#     .join(probles_for_onkodb, on=["chromosome"], how="left")
# )


# Re-pack the long-format rows into one row per (case_barcode, chromosome):
# collect the (probe_id_id, beta_value) structs, sort them ascending (structs
# compare field by field, so probe_id_id is the primary sort key), then split
# the sorted structs back into two parallel arrays.
grouped_df = (
    exploded_df
    .groupBy("case_barcode", "chromosome")
    .agg(
        F.sort_array(
            F.collect_list(
                F.struct("probe_id_id", "beta_value")
            ),
            asc=True
        ).alias("sorted_pairs")
    )
    .select(
        "case_barcode",
        "chromosome",
        F.expr("transform(sorted_pairs, x -> x.probe_id_id)").alias("probe_id_ids"),
        F.expr("transform(sorted_pairs, x -> x.beta_value)").alias("beta_values")
    )
)

# show() and printSchema() print their output themselves and return None, so
# they are called directly rather than wrapped in print().
grouped_df.show(3, truncate=False)
grouped_df.printSchema()

# grouped_df.count()

# 