# DNA Base Sequence Content Analysis

This notebook demonstrates how to analyze and visualize the distribution of nucleotide bases in DNA sequence data using Polars and matplotlib.

## 0. Setup Environment

Before we begin our analysis, we should clean up any temporary files created by DataFusion/Apache Arrow from previous runs. These temporary catalogs can accumulate over time as they are created each time we run queries.

In [None]:
import os
import shutil

def cleanTMP():
    tmp_path = os.path.join(os.getcwd(), 'tmp')
    if os.path.exists(tmp_path):
        print(f"Usuwanie folderu tymczasowego: {tmp_path}")
        shutil.rmtree(tmp_path, ignore_errors=True)
        print("Folder tymczasowy usunięty.")
    else:
        print("Folder tymczasowy nie istnieje - brak potrzeby czyszczenia.")

cleanTMP()

## 1. Create Sample DNA Sequence Data

Let's import the necessary libraries for our analysis.

In [None]:
import polars as pl
import matplotlib.pyplot as plt
import polars_bio as pb
from polars_bio.quality_control_op import base_sequence_content
from polars_bio.quality_control_viz import plot_base_content

# Set matplotlib style for better visualizations
plt.style.use('ggplot')
%matplotlib inline

Now we'll create a simple example dataset with DNA sequences.

In [None]:
short_sequences = pl.DataFrame({
    "sequence": ["ATGC", "AAGC", "ATTC", "GTCC"]
})

## 2. Analyze Base Sequence Content

Now we'll use the `base_sequence_content` function to analyze the distribution of bases at each position.

In [None]:
result = base_sequence_content(short_sequences)
print(result)

## 3. Visualize Base Distribution

Let's visualize the distribution of bases at each position using our custom plotting function.

In [None]:
plot_base_content(result, figsize=(10, 6))

## 4. Creating More Realistic Data

Generate a more realistic dataset with longer sequences to better visualize base content distribution.

In [None]:
import random

def generate_dna(length, n_freq=0.05):
    bases = ['A', 'C', 'G', 'T']
    sequence = []
    for _ in range(length):
        if random.random() < n_freq:
            sequence.append('N')
        else:
            sequence.append(random.choice(bases))
    return ''.join(sequence)

random.seed(42)
num_sequences = 100
seq_length = 100

sequences = [generate_dna(seq_length) for _ in range(num_sequences)]
df_sequences = pl.DataFrame({"sequence": sequences})

df_sequences.head()

Analyze base content on our larger dataset

In [None]:
result_large = base_sequence_content(df_sequences)
result_large.head()

Plot the base content distribution for our larger dataset

In [None]:
plot_base_content(
  result_large,
  figsize=(12, 7),
  title='Base Distribution Across Sequence Positions'
)

# 5. Processing Real FASTQ Data

5.1 Tesing with FASTQ file directly using file path

In [None]:
fastq_path = "../tests/data/quality_control/example.fastq"

# pb.ctx.set_option("datafusion.execution.target_partitions", "8")

fastq_results = pb.base_sequence_content(fastq_path)

print(f"Processed FASTQ file directly from path: {fastq_path}")
print("Base content analysis results (first 10 positions):")
print(fastq_results.head(10))

max_a = fastq_results["a_count"].max() if "a_count" in fastq_results.columns else 0
max_t = fastq_results["t_count"].max() if "t_count" in fastq_results.columns else 0
max_g = fastq_results["g_count"].max() if "g_count" in fastq_results.columns else 0
max_c = fastq_results["c_count"].max() if "c_count" in fastq_results.columns else 0
max_n = fastq_results["n_count"].max() if "n_count" in fastq_results.columns else 0

print(f"\nMaximum counts - A: {max_a}, T: {max_t}, G: {max_g}, C: {max_c}, N: {max_n}")

plot_base_content(
    fastq_results, 
    figsize=(14, 8), 
    title='Base Distribution in FASTQ Sequences'
)

5.2 Testing with LazyFrame Input

In [None]:
lazy_sequences = df_sequences.lazy()
print(f"Type of input: {type(lazy_sequences)}")

result_lazy = base_sequence_content(lazy_sequences)
print("Result from LazyFrame input:")
print(result_lazy.head())

plot_base_content(
    result_lazy,
    figsize=(12, 7),
    title='Base Distribution (From LazyFrame Input)'
)

5.3 Testing with Pandas DataFrame Input

In [None]:
import pandas as pd
pandas_sequences = df_sequences.to_pandas()
print(f"Type of input: {type(pandas_sequences)}")

result_pandas = base_sequence_content(pandas_sequences)
print("Result from pandas DataFrame input:")
print(result_pandas.head())

plot_base_content(
    result_pandas,
    figsize=(12, 7),
    title='Base Distribution (From Pandas DataFrame Input)'
)

5.4 Testing with Parquet file Input

In [None]:
parquet_file_path = "tmp.parquet"
df_sequences.write_parquet(parquet_file_path)

result_parquet = base_sequence_content(parquet_file_path)
print("Result from Parquet file input:")
print(result_parquet.head())

plot_base_content(
    result_parquet,
    figsize=(12, 7),
    title="Base Distribution (From Parquet file input)"
)

# 6. Comparison fastq-rc library with our implementation

⚠️ **CAUTION**: The performance comparison in the next cell is designed to work only on Linux-based systems. The benchmark uses `subprocess.run()` with shell commands to execute the `fqc` tool, which may not be available or may behave differently on other operating systems like Windows or macOS.

In [None]:
import time
import subprocess

test_fastq_path = "../tests/data/quality_control/example.fastq"

def measure_time(description, func, *args, **kwargs):
    print(f"Uruchamiam: {description}")
    start_time = time.time()
    result = func(*args, **kwargs)
    end_time = time.time()
    elapsed = end_time - start_time
    print(f"Czas wykonania: {elapsed:.4f}s")
    print("-----------------------------")
    return result

def run_fqc():
    cmd = f"fqc -q {test_fastq_path} > /dev/null"
    try:
        subprocess.run(cmd, shell=True, check=True)
        return True
    except Exception as e:
        print(f"Error running fqc: {e}")
        return False

def run_polars_bio_analysis():
    try:
        pb.base_sequence_content(test_fastq_path)
        return True
    except Exception as e:
        print(f"Error running base_sequence_content: {e}")
        return False

measure_time(f"fqc implementation", run_fqc)
measure_time("polars-bio implementation", run_polars_bio_analysis)
print("Comparison complete. Review execution times above.")

# 7. Clean up tmp files

Clean up temporary files created during the analysis

In [None]:
cleanTMP()