# STEP 1: CONVERTING DATA INTO PARQUET

In [1]:
import os
import re
import json
import pandas as pd
from datetime import datetime
from tqdm import tqdm  # For nicer progress bars (optional)
# pip install tqdm if you don't have it already

def flatten_article(article, file_date_str):
    """
    Flatten a single article's nested fields (abstract_sections, authors,
    mesh_terms, keywords) into a dictionary of simple columns.

    Parameters:
    -----------
    - article       : dict => One article record as loaded from the JSON file
    - file_date_str : str or None => Date derived from the file name,
                                     stored unchanged in the "date" column

    Returns:
    --------
    dict with the keys uid, title, journal, pubdate, abstract, authors,
    affiliations, mesh_terms, keywords, coi_statement, date.

    List-valued fields that are missing or explicitly None are treated as
    empty lists, so they produce "" (or None for the abstract) instead of
    raising a TypeError.
    """
    # 1) Abstract: one paragraph per section, "Label: text" when a label exists
    paragraphs = []
    for section in article.get("abstract_sections") or []:
        text = (section.get("text") or "").strip()
        if not text:
            # Skip sections with no usable text
            continue
        label = section.get("label")
        # Without a label, emit the bare text rather than ": text" / "None: text"
        paragraphs.append(f"{label}: {text}" if label else text)
    # An abstract with no usable section is None, same as a missing abstract
    abstract = "\n\n".join(paragraphs).strip() or None

    # 2) Authors and their affiliations (entries without a value are skipped)
    authors_list = article.get("authors") or []
    authors = "; ".join(a["name"] for a in authors_list if a.get("name"))
    affiliations = "; ".join(
        a["affiliation"] for a in authors_list if a.get("affiliation")
    )

    # 3) Mesh terms
    mesh_list = article.get("mesh_terms") or []
    mesh_terms = "; ".join(
        m["descriptor"] for m in mesh_list if m.get("descriptor")
    )

    # 4) Keywords: a None entry is kept as an empty string, as before
    keywords_list = article.get("keywords") or []
    keywords_str = "; ".join(k if k is not None else "" for k in keywords_list)

    # 5) Build the flattened record
    record = {
        "uid": article.get("uid"),
        "title": article.get("title"),
        "journal": article.get("journal"),
        "pubdate": article.get("pubdate"),    # e.g. "1996-Aug-01"
        "abstract": abstract,
        "authors": authors,
        "affiliations": affiliations,
        "mesh_terms": mesh_terms,
        "keywords": keywords_str,
        "coi_statement": article.get("coi_statement"),
        # Add the date derived from filename/folder
        "date": file_date_str,
    }
    return record

def get_year_month_from_filename(filename):
    """
    Pull the year and month out of a name such as 'results_1996_10.json'.

    Returns a (year, month) tuple of strings, or (None, None) when the
    'results_YYYY_MM.json' pattern does not occur anywhere in the name.
    """
    found = re.search(r"results_(\d{4})_(\d{2})\.json", filename)
    if found is None:
        return None, None
    # Group 1 is the four-digit year, group 2 the two-digit month
    year, month = found.groups()
    return year, month

def convert_jsons_to_parquet(
    input_folder: str,
    output_folder: str,
    start_year: int = 1994,
    end_year: int = 2024,
    one_big_file: bool = True,
):
    """
    Flatten every results_YYYY_MM.json found in input_folder and write the
    result as Parquet into output_folder.

    Parameters:
    -----------
    - input_folder  : str  => Folder holding the results_YYYY_MM.json files
    - output_folder : str  => Destination folder (created if missing)
    - start_year, end_year : int => Inclusive range of years to look for
    - one_big_file : bool  => True  = a single all_results.parquet,
                              False = one results_YYYY_MM.parquet per input file

    Returns None. Prints a message and returns early when no input file
    exists in the requested range.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Probe every year/month combination in the range, keep the ones on disk
    candidates = (
        os.path.join(input_folder, f"results_{yr}_{mo:02d}.json")
        for yr in range(start_year, end_year + 1)
        for mo in range(1, 13)
    )
    file_paths = [path for path in candidates if os.path.exists(path)]

    if not file_paths:
        print("No files found in the specified range. Aborting.")
        return

    print(f"Found {len(file_paths)} files to process from {start_year} to {end_year}.")

    # Records from all files; only filled when one_big_file is True
    combined = []

    for json_path in tqdm(file_paths, desc="Converting JSON to Parquet"):
        year_str, month_str = get_year_month_from_filename(
            os.path.basename(json_path)
        )

        # First day of the file's month, e.g. "1996-10-01"
        date_str = None
        if year_str and month_str:
            date_str = f"{year_str}-{month_str}-01"

        # Each file is iterated as a sequence of article dicts
        with open(json_path, "r", encoding="utf-8") as handle:
            articles = json.load(handle)

        flattened = [flatten_article(item, date_str) for item in articles]

        if one_big_file:
            # Keep everything in memory until all files are read
            combined.extend(flattened)
            continue

        # Per-month output, e.g. <output_folder>/results_1996_10.parquet
        monthly_path = os.path.join(
            output_folder, f"results_{year_str}_{month_str}.parquet"
        )
        pd.DataFrame(flattened).to_parquet(monthly_path, index=False)

    if one_big_file:
        print("\nConverting all records into one DataFrame and saving to Parquet...")
        combined_path = os.path.join(output_folder, "all_results.parquet")
        pd.DataFrame(combined).to_parquet(combined_path, index=False)
        print(f"Saved to {combined_path}.")

    print("Done converting JSON to Parquet.")


# Example usage:
if __name__ == "__main__":
    # Input: folder with results_YYYY_MM.json; output: folder for the Parquet files
    results_folder = "data/results"
    output_folder = "data/parquet_output"
    convert_jsons_to_parquet(
        results_folder,
        output_folder,
        start_year=1994,
        end_year=2024,
        one_big_file=True,  # set to False to write one Parquet file per month
    )


Found 370 files to process from 1994 to 2024.


Converting JSON to Parquet: 100%|██████████| 370/370 [10:07<00:00,  1.64s/it]



Converting all records into one DataFrame and saving to Parquet...
Saved to data/parquet_output\all_results.parquet.
Done converting JSON to Parquet.


In [19]:
# data check (3 min)
import pandas as pd

# Load the combined Parquet file and preview its first rows.
df = pd.read_parquet("data/parquet_output/all_results.parquet")
df.head()

Unnamed: 0,uid,title,journal,pubdate,abstract,authors,affiliations,mesh_terms,keywords,coi_statement,date
0,10150804,Evaluation of survival in medically treated pa...,"Journal of insurance medicine (New York, N.Y.)",None-01-01,General: Articles published in medical journal...,Iacovino J R,"New York Life Insurance Company, New York, USA.",Adolescent; Adult; Age Distribution; Chelation...,,,1994-01-01
1,9061841,Cohort versus cross-sectional design in large ...,Statistics in medicine,1994-Jan-15,General: In planning large longitudinal field ...,Feldman H A; McKinlay S M,"New England Research Institute, Inc., Watertow...",Analysis of Variance; Cluster Analysis; Cohort...,,,1994-01-01
2,9061840,Network analytic methods for epidemiological r...,Statistics in medicine,1994-Jan-15,General: The authors measure the efficacy of t...,Altmann M; Wee B C; Willard K; Peterson D; Gat...,"Division of Health Computer Sciences, Universi...",Analysis of Variance; Communicable Disease Con...,,,1994-01-01
3,9061838,"Estimating age, period and cohort effects usin...",Statistics in medicine,1994-Jan-15,General: To understand cancer aetiology better...,Holford T R; Zhang Z; McKay L A,"Department of Epidemiology and Public Health, ...","Age Factors; Cell Transformation, Neoplastic; ...",,,1994-01-01
4,16353609,A perspective on the hormonal abnormalities of...,Obesity research,1994-Jan-01,General: Studies in our laboratory and elsewhe...,Zumoff B; Strain G W,"Division of Endocrinology and Metabolism, Depa...",Adolescent; Adult; Estradiol; Estrone; Female;...,,,1994-01-01


# STEP 2: REPAIRING DATE + REMOVING DUPLICATES

In [14]:
import os
import re
import json
import pandas as pd
from datetime import datetime
from tqdm import tqdm

def parse_pubdate(pubdate_str: str):
    """
    Convert a pubdate like '1994-Jan-15', '1994-Jan' or '1994' into 'YYYY-MM-DD'.

    Numeric months ('1994-03-15', '1994-03') are accepted as well. A missing
    day defaults to 01 and a missing month to January.

    Returns None when the value is not a string (None, NaN, ...), is empty,
    contains 'None' or 'NA' (e.g. 'None-01-01'), or matches no known format.
    """
    # Non-string values (None, a float NaN, numbers) cannot be parsed;
    # `'None' in value` would raise TypeError on them.
    if not isinstance(pubdate_str, str):
        return None

    pubdate_str = pubdate_str.strip()
    if not pubdate_str:
        return None

    # Placeholders such as 'None-01-01', 'N/A' or 'NA' are not real dates
    if 'None' in pubdate_str or 'NA' in pubdate_str:
        return None

    # Most specific format first.
    # NOTE: %b matches the month abbreviations of the current locale.
    for fmt in ("%Y-%b-%d", "%Y-%m-%d", "%Y-%b", "%Y-%m", "%Y"):
        try:
            dt = datetime.strptime(pubdate_str, fmt)
        except ValueError:
            # Wrong format or impossible date (e.g. Feb 30): try the next one
            continue
        return dt.date().isoformat()

    # Nothing matched
    return None

def create_parsed_date_column(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a 'parsed_date' column to df and return df.

    For each row:
      - if 'pubdate' can be parsed by parse_pubdate, use the parsed value
      - otherwise fall back to the file-based 'date' column

    The column is added to the DataFrame that was passed in (in place);
    the same object is returned so callers can keep writing
    `df = create_parsed_date_column(df)`.
    """
    # Element-wise parse of the pubdate column; unparseable values are missing
    parsed = df["pubdate"].apply(parse_pubdate)

    # Column-wise fallback instead of a per-row df.apply(axis=1):
    # keep the parsed value where present, else take the file-based date.
    # notna() treats both None and NaN as missing.
    df["parsed_date"] = parsed.where(parsed.notna(), df["date"])

    return df

def check_duplicates(df: pd.DataFrame):
    """
    Report how many rows share a 'uid' with another row.

    Only prints a summary (plus up to ten example UIDs when duplicates
    exist); the DataFrame is not modified and nothing is returned.
    """
    uid_col = df["uid"]
    n_rows = len(df)
    n_unique = uid_col.nunique()
    n_dups = n_rows - n_unique

    for line in (
        f"Total rows: {n_rows}",
        f"Unique UIDs: {n_unique}",
        f"Duplicate rows (by uid): {n_dups}",
    ):
        print(line)

    if n_dups <= 0:
        return

    print("\nExample duplicates:")
    # UIDs of rows flagged as repeats of an earlier row, each listed once
    repeated = uid_col[uid_col.duplicated()].unique()
    print(repeated[:10])  # first 10 are enough for inspection

def clean_parquet_file(input_parquet: str, output_parquet: str):
    """
    Read one Parquet file, add 'parsed_date', report duplicates, drop
    duplicate UIDs and write the result to output_parquet.
    """
    print(f"\nReading {input_parquet}...")
    frame = pd.read_parquet(input_parquet)
    print(f"Loaded {len(frame)} rows from {input_parquet}.")

    # Add the parsed date column, then print duplicate statistics
    frame = create_parsed_date_column(frame)
    check_duplicates(frame)

    # Keep only the last row seen for each uid
    deduped = frame.drop_duplicates(subset=["uid"], keep="last")

    deduped.to_parquet(output_parquet, index=False)
    print(f"Saved cleaned data to {output_parquet}")

if __name__ == "__main__":
    # Clean every Parquet file found in input_folder (a single big file or
    # several monthly files) and write the result into output_folder.
    import glob
    input_folder = "data/parquet_output"
    output_folder = "data/cleaned_parquet/final"
    os.makedirs(output_folder, exist_ok=True)

    # Sorted so the processing order is deterministic
    parquet_files = sorted(glob.glob(os.path.join(input_folder, "*.parquet")))

    for infile in parquet_files:
        if len(parquet_files) == 1:
            # Single input: keep the established name PubMedAbstracts_final.parquet
            fname = "final"
        else:
            # Several inputs: name each output after its input so that later
            # files do not overwrite the output of earlier ones
            fname = os.path.splitext(os.path.basename(infile))[0]
        outfile = os.path.join(output_folder, f"PubMedAbstracts_{fname}.parquet")
        clean_parquet_file(infile, outfile)



Reading data/parquet_output\all_results.parquet...
Loaded 1460893 rows from data/parquet_output\all_results.parquet.
Total rows: 1460893
Unique UIDs: 1059761
Duplicate rows (by uid): 401132

Example duplicates:
['11250692' '11250688' '11094416' '11094415' '11094414' '10854520'
 '10770738' '10739772' '10739759' '10739758']
Saved cleaned data to data/cleaned_parquet/final\PubMedAbstracts_final.parquet


In [9]:
import pandas as pd

# Load the cleaned Parquet file and preview its first rows.
df = pd.read_parquet("data/cleaned_parquet/final/PubMedAbstracts_final.parquet")
df.head()

Unnamed: 0,uid,title,journal,pubdate,abstract,authors,affiliations,mesh_terms,keywords,coi_statement,date,parsed_date
0,10150804,Evaluation of survival in medically treated pa...,"Journal of insurance medicine (New York, N.Y.)",None-01-01,General: Articles published in medical journal...,Iacovino J R,"New York Life Insurance Company, New York, USA.",Adolescent; Adult; Age Distribution; Chelation...,,,1994-01-01,1994-01-01
1,9061841,Cohort versus cross-sectional design in large ...,Statistics in medicine,1994-Jan-15,General: In planning large longitudinal field ...,Feldman H A; McKinlay S M,"New England Research Institute, Inc., Watertow...",Analysis of Variance; Cluster Analysis; Cohort...,,,1994-01-01,1994-01-15
2,9061840,Network analytic methods for epidemiological r...,Statistics in medicine,1994-Jan-15,General: The authors measure the efficacy of t...,Altmann M; Wee B C; Willard K; Peterson D; Gat...,"Division of Health Computer Sciences, Universi...",Analysis of Variance; Communicable Disease Con...,,,1994-01-01,1994-01-15
3,9061838,"Estimating age, period and cohort effects usin...",Statistics in medicine,1994-Jan-15,General: To understand cancer aetiology better...,Holford T R; Zhang Z; McKay L A,"Department of Epidemiology and Public Health, ...","Age Factors; Cell Transformation, Neoplastic; ...",,,1994-01-01,1994-01-15
4,16353609,A perspective on the hormonal abnormalities of...,Obesity research,1994-Jan-01,General: Studies in our laboratory and elsewhe...,Zumoff B; Strain G W,"Division of Endocrinology and Metabolism, Depa...",Adolescent; Adult; Estradiol; Estrone; Female;...,,,1994-01-01,1994-01-01


In [10]:
# Number of rows in the DataFrame currently bound to df.
len(df)

1059761

# STEP 3: DEDUPLICATION CHECK (SAVING A NON-DEDUPLICATED COPY)

In [13]:
import os
import re
import json
import pandas as pd
from datetime import datetime
from tqdm import tqdm

def parse_pubdate(pubdate_str: str):
    """
    Convert a pubdate like '1994-Jan-15', '1994-Jan' or '1994' into 'YYYY-MM-DD'.

    Numeric months ('1994-03-15', '1994-03') are accepted as well. A missing
    day defaults to 01 and a missing month to January.

    Returns None when the value is not a string (None, NaN, ...), is empty,
    contains 'None' or 'NA' (e.g. 'None-01-01'), or matches no known format.
    """
    # Non-string values (None, a float NaN, numbers) cannot be parsed;
    # `'None' in value` would raise TypeError on them.
    if not isinstance(pubdate_str, str):
        return None

    pubdate_str = pubdate_str.strip()
    if not pubdate_str:
        return None

    # Placeholders such as 'None-01-01', 'N/A' or 'NA' are not real dates
    if 'None' in pubdate_str or 'NA' in pubdate_str:
        return None

    # Most specific format first.
    # NOTE: %b matches the month abbreviations of the current locale.
    for fmt in ("%Y-%b-%d", "%Y-%m-%d", "%Y-%b", "%Y-%m", "%Y"):
        try:
            dt = datetime.strptime(pubdate_str, fmt)
        except ValueError:
            # Wrong format or impossible date (e.g. Feb 30): try the next one
            continue
        return dt.date().isoformat()

    # Nothing matched
    return None

def create_parsed_date_column(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add a 'parsed_date' column to df and return df.

    For each row:
      - if 'pubdate' can be parsed by parse_pubdate, use the parsed value
      - otherwise fall back to the file-based 'date' column

    The column is added to the DataFrame that was passed in (in place);
    the same object is returned so callers can keep writing
    `df = create_parsed_date_column(df)`.
    """
    # Element-wise parse of the pubdate column; unparseable values are missing
    parsed = df["pubdate"].apply(parse_pubdate)

    # Column-wise fallback instead of a per-row df.apply(axis=1):
    # keep the parsed value where present, else take the file-based date.
    # notna() treats both None and NaN as missing.
    df["parsed_date"] = parsed.where(parsed.notna(), df["date"])

    return df

def check_duplicates(df: pd.DataFrame):
    """
    Report how many rows share a 'uid' with another row.

    Only prints a summary (plus up to ten example UIDs when duplicates
    exist); the DataFrame is not modified and nothing is returned.
    """
    uid_col = df["uid"]
    n_rows = len(df)
    n_unique = uid_col.nunique()
    n_dups = n_rows - n_unique

    for line in (
        f"Total rows: {n_rows}",
        f"Unique UIDs: {n_unique}",
        f"Duplicate rows (by uid): {n_dups}",
    ):
        print(line)

    if n_dups <= 0:
        return

    print("\nExample duplicates:")
    # UIDs of rows flagged as repeats of an earlier row, each listed once
    repeated = uid_col[uid_col.duplicated()].unique()
    print(repeated[:10])  # first 10 are enough for inspection

def clean_parquet_file(input_parquet: str, output_parquet: str):
    """
    Read one Parquet file, add 'parsed_date', report duplicates and write
    the result to output_parquet.

    Duplicate rows are only reported here; they are NOT removed, so the
    output has as many rows as the input.
    """
    print(f"\nReading {input_parquet}...")
    frame = pd.read_parquet(input_parquet)
    print(f"Loaded {len(frame)} rows from {input_parquet}.")

    # Add the parsed date column, then print duplicate statistics
    frame = create_parsed_date_column(frame)
    check_duplicates(frame)

    # Deliberately no drop_duplicates step in this variant
    frame.to_parquet(output_parquet, index=False)
    print(f"Saved cleaned data to {output_parquet}")

if __name__ == "__main__":
    # Run the no-dedup cleaning over every Parquet file in input_folder;
    # each output keeps its input's file name with a "cleaned_nodedup_" prefix.
    import glob

    input_folder = "data/parquet_output"
    output_folder = "data/cleaned_parquet"
    os.makedirs(output_folder, exist_ok=True)

    parquet_files = glob.glob(os.path.join(input_folder, "*.parquet"))

    for infile in parquet_files:
        fname = os.path.basename(infile)
        outfile = os.path.join(output_folder, "cleaned_nodedup_" + fname)
        clean_parquet_file(infile, outfile)



Reading data/parquet_output\all_results.parquet...
Loaded 1460893 rows from data/parquet_output\all_results.parquet.
Total rows: 1460893
Unique UIDs: 1059761
Duplicate rows (by uid): 401132

Example duplicates:
['11250692' '11250688' '11094416' '11094415' '11094414' '10854520'
 '10770738' '10739772' '10739759' '10739758']
Saved cleaned data to data/cleaned_parquet\cleaned_nodedup_all_results.parquet


# STEP 4: DUPLICATES CHECK

In [6]:
import pandas as pd

# Purpose of this cell: compare deduplicating on 'uid' alone with
# deduplicating on several columns at once, by checking which rows each
# approach removes.

# 1. Load the dataset
parquet_path = "data/parquet_output/all_results.parquet"
df = pd.read_parquet(parquet_path)
print(f"Loaded {len(df)} rows from {parquet_path}.")

################################################################################
# 2. Deduplicate by UID ONLY
################################################################################
# keep="last": for each uid, the last occurrence is the row that survives
df_dedup_uid = df.drop_duplicates(subset=["uid"], keep="last")
rows_before = len(df)
rows_after_uid = len(df_dedup_uid)
removed_uid = rows_before - rows_after_uid

print("\n=== Dedup by [uid] only ===")
print(f"Rows before: {rows_before}")
print(f"Rows after:  {rows_after_uid}")
print(f"Rows removed: {removed_uid}")

# Which rows were removed in the UID-only approach?
# Rows whose index label is absent from the deduplicated frame.
# NOTE(review): this assumes df's index labels are unique -- confirm.
removed_mask_uid = ~df.index.isin(df_dedup_uid.index)
df_removed_uid = df[removed_mask_uid]
print(f"\nRows actually removed when deduplicating by [uid]: {len(df_removed_uid)}")


################################################################################
# 3. Deduplicate by MULTI-COLUMNS
################################################################################
# A row is a duplicate here only if ALL of these columns match another row
dup_columns = ["uid", "abstract", "title", "pubdate", "mesh_terms", "journal"]
df_dedup_multi = df.drop_duplicates(subset=dup_columns, keep="last")
rows_after_multi = len(df_dedup_multi)
removed_multi = rows_before - rows_after_multi

print(f"\n=== Dedup by {dup_columns} ===")
print(f"Rows before: {rows_before}")
print(f"Rows after:  {rows_after_multi}")
print(f"Rows removed: {removed_multi}")

# Which rows were removed in the multi-column approach?
removed_mask_multi = ~df.index.isin(df_dedup_multi.index)
df_removed_multi = df[removed_mask_multi]
print(f"\nRows actually removed when deduplicating by {dup_columns}: {len(df_removed_multi)}")


################################################################################
# 4. Compare the Removed Sets
################################################################################
# - Rows removed by UID dedup only
# - Rows removed by MULTI-COL dedup only
# - Rows removed by BOTH approaches

# Compare the two approaches through the index labels of the removed rows
uid_removed_indices = set(df_removed_uid.index)
multi_removed_indices = set(df_removed_multi.index)

# Removed by UID but not by multi-col
# (same uid, but at least one of the other compared columns differs)
uid_only = uid_removed_indices - multi_removed_indices
df_uid_only = df.loc[list(uid_only)]

# Removed by multi-col but not by UID
multi_only = multi_removed_indices - uid_removed_indices
df_multi_only = df.loc[list(multi_only)]

# Removed by BOTH
both = uid_removed_indices & multi_removed_indices
df_both = df.loc[list(both)]

print("\n=== Comparison of removed rows ===")
print(f"Removed only by [uid] dedup:        {len(df_uid_only)}")
print(f"Removed only by {dup_columns} dedup: {len(df_multi_only)}")
print(f"Removed by BOTH methods:            {len(df_both)}")

# Optional: show a few examples
# (rows come from set-derived lists, so their order is not guaranteed)
print("\nExample rows removed only by [uid] approach (first 5):")
print(df_uid_only.head(5))

print("\nExample rows removed only by multi-col approach (first 5):")
print(df_multi_only.head(5))

print("\nExample rows removed by BOTH (first 5):")
print(df_both.head(5))


Loaded 1460893 rows from data/parquet_output/all_results.parquet.

=== Dedup by [uid] only ===
Rows before: 1460893
Rows after:  1059761
Rows removed: 401132

Rows actually removed when deduplicating by [uid]: 401132

=== Dedup by ['uid', 'abstract', 'title', 'pubdate', 'mesh_terms', 'journal'] ===
Rows before: 1460893
Rows after:  1059761
Rows removed: 401132

Rows actually removed when deduplicating by ['uid', 'abstract', 'title', 'pubdate', 'mesh_terms', 'journal']: 401132

=== Comparison of removed rows ===
Removed only by [uid] dedup:        0
Removed only by ['uid', 'abstract', 'title', 'pubdate', 'mesh_terms', 'journal'] dedup: 0
Removed by BOTH methods:            401132

Example rows removed only by [uid] approach (first 5):
Empty DataFrame
Columns: [uid, title, journal, pubdate, abstract, authors, affiliations, mesh_terms, keywords, coi_statement, date]
Index: []

Example rows removed only by multi-col approach (first 5):
Empty DataFrame
Columns: [uid, title, journal, pubdate