# Data extraction, cleaning, summarization, and scoring

*Matthew Fecco*



# PySpark setup and UDF functions

In [None]:
%pip install --upgrade pyspark
%pip install --upgrade torch
%pip install --upgrade transformers
%pip install --upgrade pandas
%pip install --upgrade nltk
%pip install --upgrade rouge-score
%pip install --upgrade bert_score
%pip install --upgrade wordcloud


In [None]:
import pyspark as sp
from pyspark.sql import SparkSession


# Initialize (or reuse) a SparkSession so the pyspark.sql DataFrame API is available.
# NOTE: `sc` holds a SparkSession here, not a SparkContext.
sc = SparkSession.builder.getOrCreate()


In [None]:
# If using colab
# from google.colab import drive

# # Mount Google Drive
# drive.mount('/content/drive')



# Path to the raw notes CSV, relative to the notebook's working directory
raw_file = '../NOTEEVENTS.csv'

# Load raw dataset csv.
# multiLine=True allows a quoted field to span several lines; quote and escape are both '"'.
# inferSchema=True asks Spark to infer the column types from the data.
df = sc.read.csv(raw_file, header=True, multiLine=True, quote='"', escape='"', inferSchema=True)

In [None]:
# Function to perform segmented summarizations using huggingface pipeline


# StringType is defined in pyspark.sql.types; importing it from pyspark.sql.functions
# only works while that module happens to re-export the name.
from pyspark.sql.types import StringType
from transformers import pipeline

# Hugging Face model ids that can be passed to the summarization pipeline
med_T5_model = "Falconsai/medical_summarization"
cnn_Bart_model = "facebook/bart-large-cnn"
google_pegasus_model = "google/pegasus-xsum"

# Pipeline called by summarize_notes; change the model id here to summarize with another model
summarizer = pipeline(task = "summarization", model= google_pegasus_model)

def summarize_notes(text):
  """Summarize `text` piece by piece with the module-level `summarizer`.

  The text is cut into consecutive 1024-character slices. Each slice is
  summarized on its own and the slice summaries are concatenated, each one
  followed by a single space. A slice whose summarization raises is reported
  with print() and skipped.

  Returns "" when `text` is None or empty.
  """
  if text is None:
    return ""

  chunk_chars = 1024  # slice width in characters, used to stay under the model's input limit
  pieces = []

  for chunk_no, start in enumerate(range(0, len(text), chunk_chars)):
    segment = text[start:start + chunk_chars]
    try:
      result = summarizer(segment, max_length=100, min_length=50, do_sample=False)
      pieces.append(result[0]['summary_text'] + " ")
    except Exception as e:
      # best effort: report the failing slice and keep going with the next one
      print(f"Error summarizing chunk {chunk_no}: {e}")

  return "".join(pieces)

# Tried to use this function in pyspark setup, but the pipeline calls overloaded pyspark helpers
# Spark UDF wrapper around summarize_notes that returns a string column
summarize_notes_udf = sp.sql.functions.udf(summarize_notes, StringType())

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def filter_discharge(report):
  """Return True when a report mentions at least three wound-related keywords.

  Matching is case-insensitive substring search, and every keyword in the list
  counts separately: a report containing "cuts" scores for both 'cut' and 'cuts'.

  Args:
    report: report text, or None.

  Returns:
    bool: True if three or more keywords are found, False otherwise
    (including when `report` is None).
  """
  if report is None:
    return False
  keyWords = ('laceration', 'lacerations', 'abrasion', 'abrasions','burns', 'burn', 'necrotic tissue',
              'granular tissue', 'granulation tissue', 'cut', 'cuts', 'bandage', 'degloving', 'wound','wounds', 'fractures')

  # lower-case once instead of once per keyword
  report_lower = report.lower()
  hits = sum(1 for word in keyWords if word in report_lower)

  return hits >= 3

# Spark UDF wrapper around filter_discharge that returns a boolean column
filter_discharge_udf = sp.sql.functions.udf(filter_discharge, BooleanType())

In [None]:
import re

def clean_text(text):
  """Replace every square-bracketed span in `text` with a single space.

  A span is the shortest run from '[' to the next ']' on the same line
  ('.' does not match newlines, so brackets split across lines are left alone).

  Returns "" when `text` is None.
  """
  if text is None:
    return ""

  bracketed_span = re.compile(r'\[(.*?)\]')
  return bracketed_span.sub(' ', text)

# Spark UDF wrapper around clean_text that returns a string column
clean_text_udf = sp.sql.functions.udf(clean_text, StringType())

In [None]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from pyspark.sql.types import FloatType

def bleu_scoring(human_report, model_summary):
  """Sentence-level BLEU of a model summary against the human report.

  Both texts are tokenized on whitespace and scored with NLTK's sentence_bleu
  using SmoothingFunction().method4.

  Args:
    human_report: reference text.
    model_summary: hypothesis text.

  Returns:
    float: the BLEU score. 0.0 is returned when either input is not a string
    (None, or the NaN pandas produces for an empty CSV cell) or has no tokens.
  """
  if not isinstance(human_report, str) or not isinstance(model_summary, str):
    return 0.0

  reference_tokens = human_report.split()
  hypothesis_tokens = model_summary.split()
  if not reference_tokens or not hypothesis_tokens:
    return 0.0

  smoothing = SmoothingFunction().method4

  bleu = sentence_bleu(references=[reference_tokens], hypothesis=hypothesis_tokens, smoothing_function=smoothing)

  # sentence_bleu can hand back the int 0; cast so a FloatType UDF always receives a float
  return float(bleu)

# Spark UDF wrapper around bleu_scoring that returns a float column
bleu_scoring_udf = sp.sql.functions.udf(bleu_scoring, FloatType())

In [None]:
from rouge_score.rouge_scorer import RougeScorer
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def rouge_scoring(human_report, model_summary):
  """ROUGE-1 F-measure of a model summary against the human report.

  A RougeScorer for 'rouge1' with stemming enabled is built on every call; the
  human report is passed as the target and the model summary as the prediction.

  Returns 0.0 when either argument is None.
  """
  if human_report is not None and model_summary is not None:
    rouge1 = RougeScorer(['rouge1'], use_stemmer=True).score(human_report, model_summary)['rouge1']
    return rouge1.fmeasure

  return 0.0

# Spark UDF wrapper around rouge_scoring that returns a float column
rouge_scoring_udf = sp.sql.functions.udf(rouge_scoring, FloatType())

# Feature Extraction, Cleaning, and Summarization

In [None]:
# Isolate all discharge summaries, filter, and clean them

# keep only rows whose CATEGORY is 'Discharge summary'
discharge_reports = df.where(df.CATEGORY == 'Discharge summary')
# keep reports for which filter_discharge returns True (three or more wound-related keyword hits)
filtered_reports = discharge_reports.filter(filter_discharge_udf(discharge_reports.TEXT))
# add a CLEANED column holding TEXT with square-bracketed spans replaced by spaces
cleaned_filtered_reports = filtered_reports.withColumn('CLEANED', clean_text_udf(filtered_reports.TEXT))


In [None]:
# Collect up to 1024 cleaned and filtered reports to the driver for non-parallelized summarization
sampled_reports = cleaned_filtered_reports.select('CLEANED').limit(1024).collect()

In [None]:
import pandas as pd
# Persist the sampled reports so repeated tests run on the same set of reports

# convert the collected rows to a plain list of CLEANED strings
sampled_reports_list = [report['CLEANED'] for report in sampled_reports] 

# save for repeated tests since pyspark collect is non-deterministic
sampled_reports_df = pd.DataFrame({"REPORT": sampled_reports_list})
sampled_reports_df.to_csv('raw_reports.csv', index=False)


In [None]:
from pathlib import Path

def log_summaries(raw_reports, logfile: str):
  """Summarize each report and append the results to a CSV log.

  Every report is passed to summarize_notes; if that call raises, the error is
  printed and an empty summary is recorded so reports and summaries stay aligned.

  Args:
    raw_reports: reports to summarize.
    logfile: CSV path to append to. The header row is written only when the
      file does not exist yet.

  Returns:
    pandas DataFrame with columns REPORT and SUMMARY for this call's reports.
  """
  def _summarize_or_blank(report):
    try:
      return summarize_notes(report)
    except Exception as e:
      print(f"Summarization failed: {e}")
      return ""

  summaries = [_summarize_or_blank(report) for report in raw_reports]

  # a header is needed only for the first write to a fresh file
  write_header = not Path(logfile).exists()

  # frame used both for logging and as the return value
  frame = pd.DataFrame({'REPORT': raw_reports, 'SUMMARY': summaries})
  frame.to_csv(logfile, index=False, mode='a', header=write_header)

  return frame

Checkpoint for file loads and summarization outside of pyspark environment

In [None]:
# load raw_reports if needed (restores the sampled reports without an active SparkSession)
import pandas as pd

# CSV written by the earlier sampling cell
csv_file_path = 'raw_reports.csv'


sampled_reports_df = pd.read_csv(csv_file_path)

# plain Python list of the REPORT column
sampled_reports_list = sampled_reports_df['REPORT'].tolist()

In [None]:
# implementation for MapReduced method of summarization. Requires active SparkSession

# specify output file (must be set before running this cell)
outputFile = ''

# Fail fast: with an empty path the error would only appear at to_csv, after the
# whole summarization job had already run.
if not outputFile:
  raise ValueError("Set outputFile to the CSV path for the Spark-generated summaries before running this cell")

spark_df = sc.createDataFrame(sampled_reports_df)

# add a SUMMARY column by applying summarize_notes to every REPORT
summaries = spark_df.withColumn('SUMMARY', summarize_notes_udf(spark_df.REPORT))

# bring the results back to the driver as a pandas DataFrame and save them
summary_df = summaries.toPandas()

summary_df.to_csv(outputFile, index=False)

In [None]:

# set output file path
# NOTE(review): `summarizer` is built from google_pegasus_model in the setup cell;
# confirm the configured model matches this file name before running.
output_path = 'Bart_sum.csv'

# batching for colab execution: log_summaries appends each batch to output_path as
# soon as it is summarized, so finished batches survive an interrupted session

l = len(sampled_reports_list)

batch_size = 10

for i in range(0, l, batch_size):
  batch = sampled_reports_list[i:i+batch_size]
  log_summaries(batch, output_path)
  # cap the upper bound so the final, possibly shorter, batch is reported correctly
  print(f"batch {i}-{min(i+batch_size, l)} complete ")

# Setup for Gemini 2.0 Flash summary

In [None]:
def set_prompt(report):
  """Build the summarization prompt for one discharge report.

  The prompt is an instruction section followed by a discharge-report section
  that contains `report` verbatim.
  """
  instruction = "### Instruction ###\n  Please summarize the following discharge report.\n"
  report_section = f"\n  ### Discharge Report ###\n  {report}\n"
  return instruction + report_section + "\n  "

In [None]:
import time
from google.colab import userdata
import google.generativeai as genai
import pandas as pd

# Get Gemini API key from the Colab secret named 'gemini'

api_key = userdata.get('gemini')
client = genai.configure(api_key=api_key)
model = genai.GenerativeModel("gemini-2.0-flash")

# Crude rate limiting: sleep PAUSE_SECONDS after every REQUESTS_PER_PAUSE requests
REQUESTS_PER_PAUSE = 15
PAUSE_SECONDS = 60

summaries =[]
batch = 0
for report in sampled_reports_list:
  batch += 1
  prompt = set_prompt(report)
  try:
    response = model.generate_content(prompt)
    summaries.append(response.text)
  except Exception as e:
    # A single failed request must not discard the summaries already collected;
    # record an empty summary so REPORT and SUMMARY stay aligned row by row.
    print(f"Gemini summarization failed for report {batch}: {e}")
    summaries.append("")
  if batch % REQUESTS_PER_PAUSE == 0:
    time.sleep(PAUSE_SECONDS)


# one row per report, in the same order as sampled_reports_list
data = {'REPORT': sampled_reports_list,
        'SUMMARY': summaries}
df = pd.DataFrame(data)

df.to_csv('gemini_sum.csv', index=False)


# BLEU and ROUGE Scoring

In [None]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge_score import rouge_scorer
import pandas as pd

from pathlib import Path

# Calculate Bleu and Rouge score for summarization quality using MapReduce using active SparkSession
raw_df = pd.read_csv('Summaries/Pegasus_sum.csv')

# Empty CSV cells are read back as NaN; replace them with "" so both text columns
# contain only strings before they are handed to Spark and the scoring UDFs.
df = raw_df.rename(columns={'REPORT': 'CLEANED'}).fillna('')
spark_df = sc.createDataFrame(df)

bleu_scores = spark_df.withColumn('Bleu Score', bleu_scoring_udf(spark_df.CLEANED, spark_df.SUMMARY))
scores = bleu_scores.withColumn('Rouge Score', rouge_scoring_udf(bleu_scores.CLEANED, bleu_scores.SUMMARY))


scores_df = scores.toPandas()


# Write to the path the BERTScore checkpoint cell reads ('Scores/Pegasus_sum_scores.csv').
# The previous target, 'Summaries/Pegasus_sum_scores', was in a different folder and had no extension.
scores_path = Path('Scores/Pegasus_sum_scores.csv')
scores_path.parent.mkdir(parents=True, exist_ok=True)
scores_df.to_csv(scores_path, index=False)


# BERT Score

In [None]:
%pip install bert_score
from bert_score import score
import pandas as pd

Checkpoint for loading csv files to conduct BERTscore metric

In [None]:
# CSV holding the CLEANED reports and their SUMMARY texts
logfile = 'Scores/Pegasus_sum_scores.csv'

df = pd.read_csv(logfile)

# Empty CSV cells load as NaN; use "" so every entry passed to the scorer is a string
human_reports = df['CLEANED'].fillna('').tolist()
model_summaries = df['SUMMARY'].fillna('').tolist()

# bert_score.score takes the candidates first and the references second. The model
# summaries are the candidates; passing them in the other order swaps precision and recall.
P, R, F1 = score(cands=model_summaries, refs=human_reports, lang='en', verbose=True)

In [None]:
# Collect the three BERTScore results into one table with columns F1, R and P
# NOTE(review): F1, R and P are presumably torch tensors returned by score(); confirm
# pandas writes them out as plain floats.
d = {'F1': F1, 'R': R, 'P':P}


df = pd.DataFrame(data = d)

df.to_csv("Scores/Pegasus_temp_BERT_score.csv", index=False)

# Prepare Docs and Embed

In [None]:
%pip install -qU langchain-huggingface
%pip install -qU langchain-community
%pip install faiss-cpu
%pip install huggingface_hub
%pip install -qU langchain_community pypdf

In [None]:
from langchain_community.document_loaders.csv_loader import CSVLoader
from langchain_core.documents import Document
import pandas as pd

# load raw document file
data_file = "raw_reports.csv"
# NOTE(review): df is not used again in this cell
df = pd.read_csv(data_file)



# loader = TextLoader(data_file, encoding='utf8')
# source_column="REPORT": presumably each row's REPORT value is stored in doc.metadata['source'] — confirm
loader = CSVLoader(data_file, source_column="REPORT")


docs = loader.load()

# build one Document per row from metadata['source'] for embedding; prefix '$' as a separator symbol for organization

summary_docs = [Document('$'+doc.metadata['source']) for doc in docs]

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Import HuggingFaceEmbeddings from langchain-huggingface, which the install cell above
# already provides, rather than from the deprecated langchain.embeddings path.
from langchain_huggingface import HuggingFaceEmbeddings

# split documents along separator symbol; `separators` expects a list of strings
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separators=['$'])
texts = text_splitter.split_documents(summary_docs)



In [None]:
import pandas as pd
from pathlib import Path


def embed_and_log(vector_store, texts, embeddings=None, logfile=None):
  """Embed documents, add them to a vector store, and optionally log them to CSV.

  Args:
    vector_store: store to receive the texts. If it has an `add_embeddings`
      method, the vectors computed here are stored as-is; otherwise `add_texts`
      is called as a fallback.
    texts: documents exposing `page_content`. A leading '$' separator symbol,
      when present, is removed before embedding.
    embeddings: object with `embed_documents(list_of_str)`. Defaults to
      HuggingFaceEmbeddings("sentence-transformers/all-mpnet-base-v2").
    logfile: optional CSV path to append to; the header row is written only
      when the file does not exist yet. Write errors are printed, not raised.

  Returns:
    pandas DataFrame with columns EMBEDDING and TEXT.
  """
  if embeddings is None:
    embedding_model = "sentence-transformers/all-mpnet-base-v2"
    embeddings = HuggingFaceEmbeddings(model_name=embedding_model)

  # Strip the separator symbol only when it is actually there, so a chunk that
  # does not start with '$' keeps its first character.
  raw_texts = [
      d.page_content[1:] if d.page_content.startswith('$') else d.page_content
      for d in texts
  ]

  # Generate embeddings
  embedding_vectors = embeddings.embed_documents(raw_texts)

  df = pd.DataFrame({'EMBEDDING': embedding_vectors, 'TEXT': raw_texts})

  # Add to vector_store, reusing the vectors computed above where the store allows it;
  # add_texts would embed the texts a second time with the store's own embedding function.
  if raw_texts:
    if hasattr(vector_store, 'add_embeddings'):
      vector_store.add_embeddings(text_embeddings=list(zip(raw_texts, embedding_vectors)))
    else:
      vector_store.add_texts(texts = raw_texts, embeddings=embedding_vectors)

  if logfile:
    logfile_path = Path(logfile)
    try:
      df.to_csv(logfile, index=False, mode='a', header=not logfile_path.exists())
    except IOError as e:
      print(f"Error writing to logfile: {e}")

  return df


In [None]:
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
import faiss


# load embedding model
embeddings = HuggingFaceEmbeddings(model_name = "sentence-transformers/all-mpnet-base-v2")

# create FAISS vector store
# embed a throwaway query only to learn the embedding vector length for the index
embedding_dim = len(embeddings.embed_query("Hello World"))
index = faiss.IndexFlatL2(embedding_dim)

# start from an empty docstore and an empty index-to-docstore-id mapping
vector_store = FAISS(
                     embedding_function = embeddings,
                     index=index,
                     docstore=InMemoryDocstore(),
                     index_to_docstore_id={},
)

# Generate embeddings and add to vector store
# Return df of raw_text and embedded vector value

log_df = embed_and_log(vector_store, texts, embeddings=embeddings, logfile="Embeddings/raw_reports_embeddings.csv")


# Lexical comparison using Word Clouds

In [3]:
import pandas as pd


# Load each model's summaries, plus the raw reports, for lexical comparison
bart_df = pd.read_csv("Summaries/Bart_sum.csv")

gemini_df = pd.read_csv("Summaries/gemini_sum.csv")

pegasus_df = pd.read_csv("Summaries/Pegasus_sum.csv")

T5_df = pd.read_csv("Summaries/T5_sum.csv")

raw_df = pd.read_csv("raw_reports.csv")

# rename the raw reports' single column to SUMMARY so all five frames share one column name
raw_df.columns = ['SUMMARY']

# labels and frames are paired by position
models = ['Bart', 'Gemini', 'Pegasus', 'T5', 'Raw']
data = [bart_df, gemini_df, pegasus_df, T5_df, raw_df]

# model label -> SUMMARY column of that model's frame
summaries_dict = {}

for model, df in zip(models, data):
    summaries_dict[model] = df['SUMMARY']



In [4]:
import re
from wordcloud import STOPWORDS


def clean_clouds(summary_list):
    """Flatten a collection of summaries into one cleaned, lowercase string.

    Args:
        summary_list: iterable of summary strings. Non-string entries (for
            example the NaN pandas yields for an empty CSV cell) are skipped
            instead of breaking the join.

    Returns:
        str: space-separated lowercase words with everything except ASCII
        letters and whitespace removed, and with wordcloud STOPWORDS and words
        of three characters or fewer dropped.
    """
    # join all strings to one, ignoring entries that are not text
    text = ' '.join(summary for summary in summary_list if isinstance(summary, str))
    # remove punctuation and digits, then normalize to all lowercase
    text = re.sub(r'[^A-Za-z\s]', '', text).lower()

    # drop common stopwords (the, is, was, ...) and short words or abbreviations
    stopwords = set(STOPWORDS)
    return ' '.join(word for word in text.split() if len(word) > 3 and word not in stopwords)


In [24]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt


def generate_cloud(text, model):
    """Draw and show a word cloud for one model's summaries.

    `text` is cleaned with clean_clouds before the cloud is generated; `model`
    is the label used in the figure title.
    """
    cloud_options = {
        'width': 800,
        'height': 400,
        'collocations': False,
        'background_color': 'black',
    }
    cloud = WordCloud(**cloud_options)
    rendered = cloud.generate(clean_clouds(text))

    plt.figure(figsize=(10, 5))
    plt.imshow(rendered, interpolation='bilinear')
    plt.axis('off')
    plt.title(model+ ' Word Cloud')
    plt.show()

In [None]:
# Render one word cloud per entry in summaries_dict, titled with the model label
for model in summaries_dict.keys():
    generate_cloud(summaries_dict[model], model)