deduplicating analyses

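The script below merges duplicate analyses within Neurosynth-sourced studies: for each study, every numerically named analysis is paired with the string-named analysis that shares the most coordinate points; the string analysis's annotations, name, and description are copied onto the numeric one, and the string-named duplicate is then deleted.
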
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, func, exists
from sqlalchemy.orm import relationship, sessionmaker, aliased, declarative_base

Base = declarative_base()

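# Simplified ORM models for the tables this script touches; only the columns
# needed for the merge are declared here.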
class BaseStudy(Base):
    __tablename__ = 'base_study'
    id = Column(Integer, primary_key=True)
    studies = relationship('Study', back_populates='base_study')

class Study(Base):
    __tablename__ = 'study'
    id = Column(Integer, primary_key=True)
    base_study_id = Column(Integer, ForeignKey('base_study.id'))
    source = Column(String)  # Assuming the source column exists
    base_study = relationship('BaseStudy', back_populates='studies')
    analyses = relationship('Analysis', back_populates='study')

class Analysis(Base):
    __tablename__ = 'analysis'
    id = Column(Integer, primary_key=True)
    study_id = Column(Integer, ForeignKey('study.id'))
    name = Column(String)  # Assuming the name column exists
    description = Column(String)  # Assuming the description column exists
    study = relationship('Study', back_populates='analyses')
    points = relationship('Point', back_populates='analysis')
    annotations = relationship('Annotation', secondary='annotation_analysis', back_populates='analyses')

class Point(Base):
    __tablename__ = 'point'
    id = Column(Integer, primary_key=True)
    analysis_id = Column(Integer, ForeignKey('analysis.id'))
    x = Column(Float)
    y = Column(Float)
    z = Column(Float)
    analysis = relationship('Analysis', back_populates='points')

class Annotation(Base):
    __tablename__ = 'annotation'
    id = Column(Integer, primary_key=True)
    text = Column(String)  # Assuming the annotation text column exists
    analyses = relationship('Analysis', secondary='annotation_analysis', back_populates='annotations')

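# The association table is mapped as its own class so rows can be inserted
# directly, while the relationships above reference it via `secondary`.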
class AnnotationAnalysis(Base):
    __tablename__ = 'annotation_analysis'
    annotation_id = Column(Integer, ForeignKey('annotation.id'), primary_key=True)
    analysis_id = Column(Integer, ForeignKey('analysis.id'), primary_key=True)

# Create an engine and session
engine = create_engine('postgresql://username:password@localhost:5432/mydatabase')
Session = sessionmaker(bind=engine)
session = Session()

# Regular expression patterns, matched server-side with PostgreSQL's `~` operator:
# analysis names made up entirely of digits vs. names containing at least one letter
numeric_pattern = r'^\d+$'
string_pattern = r'[A-Za-z]'

# Count how many points two analyses share (matched on exact x/y/z coordinates)
def get_common_points_count(session, analysis1_id, analysis2_id):
    point_alias = aliased(Point)
    common_points_count = (
        session.query(func.count(Point.id))
        .join(
            point_alias,
            (Point.x == point_alias.x)
            & (Point.y == point_alias.y)
            & (Point.z == point_alias.z),
        )
        .filter(
            Point.analysis_id == analysis1_id,
            point_alias.analysis_id == analysis2_id,
        )
        .scalar()
    )
    return common_points_count

# Dictionary to hold the results
study_analysis_pairs = {}

# Retrieve the studies that came from the neurosynth source
studies = session.query(Study).filter(Study.source == 'neurosynth').all()

for study in studies:
    # Split the study's analyses into numeric-named and string-named groups
    numeric_analyses = session.query(Analysis).filter(
        Analysis.study_id == study.id, Analysis.name.op('~')(numeric_pattern)
    ).all()
    string_analyses = session.query(Analysis).filter(
        Analysis.study_id == study.id, Analysis.name.op('~')(string_pattern)
    ).all()

    # List to hold the pairs of numeric and string analyses
    pairs = []

    # Find and store the best matches based on common points
    for numeric_analysis in numeric_analyses:
        best_match = None
        max_common_points = 0
        for string_analysis in string_analyses:
            common_points_count = get_common_points_count(session, numeric_analysis.id, string_analysis.id)
            if common_points_count > max_common_points:
                max_common_points = common_points_count
                best_match = string_analysis

        if best_match:
            # Record the pair of original names now: the numeric analysis takes over
            # the string analysis's name below, and the string analysis is deleted
            # (its attributes expire once the session commits)
            pairs.append((numeric_analysis.name, best_match.name))

            # Move annotations from the string analysis to the numeric analysis
            annotations = (
                session.query(Annotation)
                .join(AnnotationAnalysis)
                .filter(AnnotationAnalysis.analysis_id == best_match.id)
                .all()
            )
            for annotation in annotations:
                # Check if the numeric analysis already has this annotation
                exists_query = session.query(
                    exists().where(
                        AnnotationAnalysis.annotation_id == annotation.id,
                        AnnotationAnalysis.analysis_id == numeric_analysis.id
                    )
                ).scalar()

                if not exists_query:
                    # Update AnnotationAnalysis table
                    annotation_analysis_entry = AnnotationAnalysis(annotation_id=annotation.id, analysis_id=numeric_analysis.id)
                    session.add(annotation_analysis_entry)
            
            # Copy name and description
            numeric_analysis.name = best_match.name
            numeric_analysis.description = best_match.description
            
            # Delete the string analysis
            session.delete(best_match)
            
            # Commit the merge for this pair
            session.commit()

    # Store the pairs in the dictionary
    if pairs:
        study_analysis_pairs[study.id] = pairs

# Print the results
for study_id, pairs in study_analysis_pairs.items():
    print(f"Study ID: {study_id}")
    for numeric_name, string_name in pairs:
        print(f"  Numeric Analysis: {numeric_name}, String Analysis: {string_name}")

session.close()
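
Each merge is committed as soon as it is found, and the final printout lists, per study, the original numeric analysis name alongside the string-named analysis it was merged with.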