# Apache Spark deliberable tasks

#### Group: Alba Mas and Maria Cobo

Use microarray expression experiment dataset file “expression.csv”, with the following elements:

0. genBank Id
1. targetId
2. Species
3. experiment number
4. expression level

Please answer the following questions in your report/notebook providing:

- A short explanation of the Spark operations used.
- The source code of the query.
- All the results obtained.

In [1]:
#!pip install pyspark

In [2]:
# Setting up the environment
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import min
from pyspark.sql.functions import col
import os

working_dir = os.getcwd()
#print(working_dir)

expression_path = working_dir + "/Delivery_datasets/expression-2023.csv"
gene_desc_path = working_dir + "/Delivery_datasets/geneDescriptions.csv"

In [3]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.master("local[*]").appName("MicroarrayAnalysis").getOrCreate()

# Define file paths
working_dir = os.getcwd()
expression_path = working_dir + "/Delivery_datasets/expression-2023.csv"

# Property used to format output tables better
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
# spark

24/12/02 17:25:34 WARN Utils: Your hostname, maria resolves to a loopback address: 127.0.1.1; using 192.168.68.117 instead (on interface wlp1s0)
24/12/02 17:25:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/02 17:25:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/02 17:25:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Initialize SparkSession
spark = SparkSession.builder.master("local[*]").appName("MicroarrayAnalysis").getOrCreate()

# Define file paths
working_dir = os.getcwd()
expression_path = working_dir + "/Delivery_datasets/expression-2023.csv"

# Load the dataset
expression_df = spark.read.csv(expression_path, header=True, inferSchema=True)

#### EXERCICES:

You should use Spark DataFrames Python API to propose solutions to the questions:

**1. Get a list of all targets where species is Mus Musculus ('Mm') (1 point)**

In [5]:
# Filter targets where species is 'Mm'
mus_musculus_targets = expression_df.filter(col("SPECIES") == "Mm").select("targetId").distinct()

# Show the results
mus_musculus_targets.show()

+------------+
|    targetId|
+------------+
|     1235_at|
|U95_32123_at|
|     1234_at|
|     1231_at|
+------------+



**Results**: 

**2. Get all information on specific genBank Id "A00142" (1 p)**

In [6]:
# Filter rows where genBank Id is "A00142"
genbank_info = expression_df.filter(col("GENBANKID") == "A00142")

# Show all information for the specific genBank Id
genbank_info.show()

+---------+--------+-------+----------+----------+
|GENBANKID|TARGETID|SPECIES|EXPERIMENT|EXPRESSION|
+---------+--------+-------+----------+----------+
|   A00142|31325_at|     Hs|         1|       191|
|   A00142|31325_at|     Hs|         2|       101|
|   A00142|31325_at|     Hs|         4|        51|
|   A00142|31325_at|     Hs|         5|        71|
|   A00142|31325_at|     Hs|         6|        31|
+---------+--------+-------+----------+----------+



**Results**: 

**3. Get the list of unique genBank Ids from the experiment data (1 p)**

In [7]:
# Get the list of unique genBank Ids
unique_genbank_ids = expression_df.select("GENBANKID").distinct()

# Show the results
unique_genbank_ids.show()

+---------+
|GENBANKID|
+---------+
|   X70393|
|   L02870|
|   A12346|
|   A00142|
|   S75295|
|   A12348|
|   A12027|
|   A12349|
|   A22127|
|   A06977|
|   A12345|
|   M18228|
|   A00146|
| AI846313|
|   A12347|
|   A03911|
|   A22125|
|   A22124|
|   A22126|
|   A22123|
+---------+



**Results**: 

**4. Get all information on those samples with expression value between, but not equal to, 80 and 100. If you include 80 and 100 in your comparison, do you see any change in the results obtained? (1 p)**

In [8]:
# Filter for expression value between 80 and 100 (exclusive)
exclusive_filter = expression_df.filter((col("EXPRESSION") > 80) & (col("EXPRESSION") < 100))

# Filter for expression value between 80 and 100 (inclusive)
inclusive_filter = expression_df.filter((col("EXPRESSION") >= 80) & (col("EXPRESSION") <= 100))

# Show results for exclusive filter
print("Results for expression level between 80 and 100 (exclusive):")
exclusive_filter.show()

# Show results for inclusive filter
print("Results for expression level between 80 and 100 (inclusive):")
inclusive_filter.show()

Results for expression level between 80 and 100 (exclusive):
+---------+------------+-------+----------+----------+
|GENBANKID|    TARGETID|SPECIES|EXPERIMENT|EXPRESSION|
+---------+------------+-------+----------+----------+
|   M18228|     1235_at|     Mm|         1|        90|
|   A00146|    31324_at|     Hs|         3|        91|
|   A03911|    31356_at|     Hs|         3|        99|
|   A12347|     3159_at|     Hs|         3|        86|
|   S75295|U95_40474_at|     Hs|         1|        99|
+---------+------------+-------+----------+----------+

Results for expression level between 80 and 100 (inclusive):
+---------+------------+-------+----------+----------+
|GENBANKID|    TARGETID|SPECIES|EXPERIMENT|EXPRESSION|
+---------+------------+-------+----------+----------+
|   M18228|     1235_at|     Mm|         1|        90|
|   A00146|    31324_at|     Hs|         3|        91|
|   A03911|    31356_at|     Hs|         3|        99|
|   A12027|     3156_at|     Hs|         2|       10

**Results**: 

**5. Count the number of experiments where each genBank id has a sample. Each line from the answer must contain a gene id and a specific number of experiments. If gene A is found twice in experiment number 1, it should be counted only once (1 p)**

In [9]:
# Group by genBank Id and count unique experiments
experiment_count = (
    expression_df.select("GENBANKID", "EXPERIMENT")
    .distinct()  # Ensure no duplicates for the same genBank Id and experiment
    .groupBy("GENBANKID")  # Group by genBank Id
    .count()  # Count unique experiments per genBank Id
    .alias("EXPERIMENT_COUNT")  # Rename the count column
)

# Show the results
experiment_count.show()

+---------+-----+
|GENBANKID|count|
+---------+-----+
|   X70393|    1|
|   L02870|    1|
|   A12346|    1|
|   A00142|    5|
|   S75295|    1|
|   A12348|    1|
|   A12349|    1|
|   A12027|    1|
|   A22127|    1|
|   A06977|    1|
|   A12345|    1|
|   M18228|    1|
|   A00146|    1|
| AI846313|    1|
|   A12347|    1|
|   A03911|    1|
|   A22125|    1|
|   A22124|    1|
|   A22126|    1|
|   A22123|    1|
+---------+-----+



**Results**: 

**6. Get all experiment Ids from the dataset together with its minimum expression level. Sort the list by expression level. Please provide all results obtained in your answer (1 p)**

In [10]:
# Group by experiment number and calculate the minimum expression level
experiment_min_expr = (
    expression_df.groupBy("EXPERIMENT")
    .agg(min("EXPRESSION").alias("MIN EXPRESSION LEVEL"))
    .orderBy("MIN EXPRESSION LEVEL")  # Sort by minimum expression level
)

# Show the results
experiment_min_expr.show()

+----------+--------------------+
|EXPERIMENT|MIN EXPRESSION LEVEL|
+----------+--------------------+
|         1|                   0|
|         3|                   0|
|         2|                   0|
|         6|                  31|
|         4|                  51|
|         5|                  71|
+----------+--------------------+



**Results**: 

**7. Get all expression levels from albumin genes. You must use additional geneDescriptions.csv dataset file with columns "gene_id" and "description" to design a join operation. Show genes, expression levels and descriptions in your final result. Please provide all results obtained in your answer (1 p)**

In [11]:
from pyspark.sql.functions import col

# Load the expression and gene descriptions datasets
expression_df = spark.read.csv(expression_path, header=True, inferSchema=True)
gene_desc_path = working_dir + "/Delivery_datasets/geneDescriptions.csv"
gene_desc_df = spark.read.csv(gene_desc_path, header=True, inferSchema=True)

# Join the datasets on TARGETID (expression dataset) and GENE_ID (gene descriptions dataset)
joined_df = expression_df.join(gene_desc_df, expression_df["GENBANKID"] == gene_desc_df["GENE_ID"])

# Filter for albumin genes (case-insensitive search in DESCRIPTION)
albumin_genes_df = joined_df.filter(joined_df["DESCRIPTION"].rlike("albumin"))

# Select the relevant columns: GENE_ID, EXPRESSION, and DESCRIPTION
result_df = albumin_genes_df.select("GENE_ID", "EXPRESSION", "DESCRIPTION")

# Show the results
result_df.show(truncate=False)

+-------+----------+-----------+
|GENE_ID|EXPRESSION|DESCRIPTION|
+-------+----------+-----------+
|A00142 |31        |albumin    |
|A00142 |71        |albumin    |
|A00142 |51        |albumin    |
|A00142 |101       |albumin    |
|A00142 |191       |albumin    |
+-------+----------+-----------+



**Results**: 