In [1]:
import os
import sys
import json
import base64
import gzip
import logging
from collections import defaultdict
from io import BufferedReader
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union, cast
from pathlib import Path

# Third-party imports
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import plotly.graph_objects as go
from PIL import Image
from importlib_metadata import EntryPoint

# MultiQC imports
import multiqc
from multiqc import BaseMultiqcModule, config
from multiqc.modules.custom_content import custom_content
from multiqc.plots import table, box
from multiqc.plots.plotly import heatmap
from multiqc.plots.plotly.heatmap import ElemT, HeatmapConfig
from multiqc.types import Anchor, LoadedFileDict, ModuleId, SectionId

In [13]:
from multiqc import config

In [None]:
config.load_config_file('config.yaml')

In [None]:
input_path = 'revio_run_dir'
multiqc.parse_logs(input_path)
module = multiqc.BaseMultiqcModule(
    name = 'revio demux report',
    anchor = 'revio_demux_report',
    info = 'revio demux report analysis results'
)

In [3]:
REPORTS_PATH = os.path.join(input_path, 'reports')
def read_json(file_path):
    with open(file_path, 'r') as f:
        return json.load(f)

def read_gzipped_json(file_path):
    with gzip.open(file_path, 'rt') as f:
        return json.load(f)

def parse_barcodes_report(file_path):
    with open(file_path) as f:
        data = json.load(f)
    parsed_data = {}
    for attr in data['attributes']:
        key = attr['id'].replace('ccs_demux_stats.', '')
        parsed_data[key] = attr['value']
    return parsed_data


In [4]:
summarized_data = defaultdict(dict)
barcodes_path = os.path.join(REPORTS_PATH, 'barcodes.report.json')

for s in multiqc.list_samples():
    data = parse_barcodes_report(barcodes_path)
    if data:
        summarized_data[s].update({
            "number_of_ccs_reads": data.get("number_of_ccs_reads"),
            "mean_ccs_readlength": data.get("mean_ccs_readlength"),
            "median_ccs_readlength": data.get("median_ccs_readlength"),
            "ccs_readlength_n50": data.get("ccs_readlength_n50"),
            "median_qv": data.get("median_qv")
        })

summarized_data = {
    s: {k: v for k, v in d.items() if v is not None}
    for s, d in summarized_data.items()
}
summarized_data = {s: d for s, d in summarized_data.items() if d}


In [5]:
plot = table.plot(
    data=summarized_data,
    pconfig={
        "id": "ccs_general_stats",
        "title": "CCS General Statistics",
    },
)
module.add_section(
    name="CCS General Statistics",
    anchor="ccs_general_stats",
    description="summary of ccs general statistics",
    plot=plot,
)


In [6]:
loading_data = read_json(os.path.join(REPORTS_PATH, 'loading.report.json'))

# 테이블 데이터 준비
data = {}
collection_context = loading_data['tables'][0]['columns'][0]['values'][0]
data[collection_context] = {
    'Total ZMWs': loading_data['attributes'][0]['value'],
    'P0 Count': loading_data['attributes'][1]['value'],
    'P0 (%)': loading_data['tables'][0]['columns'][3]['values'][0],
    'P1 Count': loading_data['attributes'][2]['value'],
    'P1 (%)': loading_data['tables'][0]['columns'][5]['values'][0],
    'P2 Count': loading_data['attributes'][3]['value'],
    'P2 (%)': loading_data['tables'][0]['columns'][7]['values'][0],
    'Loading Type': loading_data['tables'][0]['columns'][8]['values'][0]
}

# 테이블 설정
config = {
    'namespace': 'Loading Stats',
    'id': 'loading_stats',
    'table_title': 'Loading Statistics',
    'col1_header': 'Collection Context',
    'sortRows': False,
    'no_beeswarm': True,
    'format': {
        'Total ZMWs': '{:,.0f}',
        'P0 Count': '{:,.0f}',
        'P0 (%)': '{:.3f}%',
        'P1 Count': '{:,.0f}',
        'P1 (%)': '{:.3f}%',
        'P2 Count': '{:,.0f}',
        'P2 (%)': '{:.3f}%'
    }
}

table_plot = table.plot(
    data=data,
    pconfig=config
)

module.add_section(
    name="Loading Statistics",
    anchor="loading_stats_table",
    description="Summary of ZMW loading statistics including productivity metrics (P0, P1, P2) and loading type.",
    plot=table_plot
)

• deprecated field 'no_beeswarm'. Use 'no_violin' instead
• deprecated field 'sortRows'. Use 'sort_rows' instead
• deprecated field 'table_title'. Use 'title' instead
• unrecognized field 'format'. Available fields: id, title, anchor, height, width, square, logswitch, logswitch_active, logswitch_label, cpswitch, cpswitch_c_active, cpswitch_counts_label, cpswitch_percent_label, xlog, ylog, data_labels, xlab, ylab, xsuffix, ysuffix, tt_suffix, xlab_format, ylab_format, tt_label, x_decimals, y_decimals, tt_decimals, xmin, xmax, ymin, ymax, x_clipmin, x_clipmax, y_clipmin, y_clipmax, save_data_file, showlegend, x_minrange, y_minrange, x_bands, y_bands, x_lines, y_lines, namespace, save_file, raw_data_fn, defaultsort, sort_rows, only_defined_headers, col1_header, no_violin, scale, min, parse_numeric[0m


In [8]:
heatmap_data = read_gzipped_json(os.path.join(REPORTS_PATH, 'binned_readlength.json.gz'))
z_data = np.array(heatmap_data['data'][0]['z']).T.tolist()  # Convert to list after transpose
x_data = heatmap_data['data'][0]['y']  # Swap x and y labels
y_data = heatmap_data['data'][0]['x']

pconfig = {
    'id': 'readlength_barcode_heatmap',
    'title': 'Read Length Distribution By Barcode',
    'xlab': 'Read Length (bp)',  # Swapped x and y labels
    'ylab': 'Barcode Rank Order By Read Count',
    'min': 0,
    'square': False,
    'colstops': [
        [0, 'rgb(180,180,180)'],
        [0.001, 'rgb(82,82,255)'],
        [0.25, 'rgb(80,236,186)'],
        [0.5, 'rgb(255,255,100)'],
        [0.75, 'rgb(242,163,76)'],
        [1, 'rgb(255,80,80)']
    ],
}

heatmap_plot = heatmap.plot(
    z_data,  # List of lists after transpose
    HeatmapConfig.from_pconfig_dict(pconfig),
    x_data,  # Swapped x and y categories
    y_data
)

module.add_section(
    name="Read Length Distribution By Barcode",
    anchor="readlength_barcode_heatmap",
    description="Read Length Distribution By Barcode",
    plot=heatmap_plot
)


In [9]:
heatmap_data = read_gzipped_json(os.path.join(REPORTS_PATH, 'binned_bcqual.json.gz'))
z_data = np.array(heatmap_data['data'][0]['z']).T.tolist()  # Convert to list after transpose
x_data = heatmap_data['data'][0]['y']  # Swap x and y labels
y_data = heatmap_data['data'][0]['x']

pconfig = {
    'id': 'bcqual_barcode_heatmap',
    'title': 'Barcode Quality Distribution By Barcode',
    'xlab': 'Read Length (bp)',  # Swapped x and y labels
    'ylab': 'Barcode Rank Order By Read Count',
    'min': 0,
    'square': False,
    'colstops': [
        [0, 'rgb(180,180,180)'],
        [0.001, 'rgb(82,82,255)'],
        [0.25, 'rgb(80,236,186)'],
        [0.5, 'rgb(255,255,100)'],
        [0.75, 'rgb(242,163,76)'],
        [1, 'rgb(255,80,80)']
    ],
}

heatmap_plot = heatmap.plot(
    z_data,  # List of lists after transpose
    HeatmapConfig.from_pconfig_dict(pconfig),
    x_data,  # Swapped x and y categories
    y_data
)
module.add_section(
    name="Barcode Quality Distribution By Barcode",
    anchor="bcqual_barcode_heatmap",
    description="Barcode Quality Distribution By Barcode",
    plot=heatmap_plot
)

In [11]:
def get_plot_description(plot_name: str) -> str:
    """
    Get detailed description for each plot type
    """
    descriptions = {
        "ccs_npasses_hist": "Distribution of CCS passes showing the number of complete passes of the insert",
        "ccs_combined_readlength_hist_plot": "Combined read length distribution for all CCS reads",
        "readlength_qv_hist2d.hexbin": "2D histogram showing relationship between read length and quality scores",
        "m5c_detections": "Methylation (m5C) detection results across reads",
        "m5c_detections_hist": "Histogram of methylation (m5C) detection frequencies",
        "raw_read_length_plot": "Distribution of raw read lengths before processing",
        "readLenDist0": "Read length distribution for the dataset",
        "hexbin_length_plot": "Hexbin plot showing read length distributions",
        "readlength_plot": "Distribution of read lengths after processing",
        "concordance_plot": "Plot showing concordance between reads",
        "base_yield_plot": "Cumulative base yield across read lengths",
        "nreads": "Number of reads per sample/barcode",
        "nreads_histogram": "Histogram showing distribution of read counts",
        "bq_histogram": "Distribution of base quality scores",
        "readlength_histogram": "Detailed histogram of read lengths",
        "ccs_hifi_readlength_hist_plot": "Read length distribution for HiFi reads",
        "ccs_hifi_read_length_yield_plot": "Cumulative yield plot for HiFi reads",
        "ccs_all_readlength_hist_plot": "Read length distribution for all CCS reads",
        "ccs_accuracy_hist": "Distribution of CCS read accuracies",
        "readlength_distribution": "Overall distribution of read lengths across the dataset"
    }
    return descriptions.get(plot_name, "Analysis plot showing sequence data characteristics")

def load_image_file(file_path: str) -> Dict:
    """
    Load and encode image file for MultiQC report
    """
    file_path = Path(file_path)
    
    with open(file_path, 'rb') as f:
        image_string = base64.b64encode(cast(BufferedReader, f).read()).decode("utf-8")
        img_html = '<div class="mqc-custom-content-image"><img src="data:image/png;base64,{}" /></div>'.format(
            image_string
        )
    
    # Create parsed dictionary with detailed description
    plot_name = file_path.stem
    parsed_dict = {
        "id": plot_name,
        "plot_type": "image",
        "section_name": plot_name.replace("_", " ").replace("-", " ").replace(".", " ").title(),
        "description": get_plot_description(plot_name),
        "data": img_html,
    }
    
    return parsed_dict

def get_non_thumb_pngs(directory: str) -> List[Path]:
    """
    Get list of all non-thumbnail PNG files in directory
    """
    path = Path(directory)
    all_pngs = list(path.glob("*.png"))
    return [p for p in all_pngs if "thumb" not in p.name]

png_files = get_non_thumb_pngs(REPORTS_PATH)
for png_file in sorted(png_files):
    try:
        parsed_dict = load_image_file(str(png_file))
        module.add_section(
            name=parsed_dict["section_name"],
            anchor=parsed_dict["id"],
            description=parsed_dict["description"],
            content=parsed_dict["data"]
        )
    except Exception as e:
        print(f"Failed to process {png_file}: {str(e)}")


In [12]:
# Add module to report
multiqc.report.modules = [module]
multiqc.write_report(
    force=True,
    title="Revio Demux Report",
    filename="revio_demux_report",
)

[34m     update_config[0m | Report title: Revio Demux Report
[34m     write_results[0m | Data        : revio_demux_report_data   (overwritten)
[34m     write_results[0m | Report      : revio_demux_report.html   (overwritten)
