# Scholar to parquet notebook


Notebook for dealing with Parquet problems in Longevity Genie.




In [1]:
// Report the largest heap the JVM running this kernel is allowed to grow to.
val runtime = Runtime.getRuntime
// Bytes -> whole megabytes in a single integer division.
val maxMemory = runtime.maxMemory / (1024L * 1024L)
println(s"Max memory: ${maxMemory} MB")

Max memory: 36864 MB


In [2]:
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.parser.decode
import io.circe.syntax._
import io.circe._


In [3]:
import org.apache.spark.{io => spark_io}
import org.apache.spark.sql.types._
import scala.reflect.runtime.universe._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import group.research.aging.spark.extensions._
import group.research.aging.spark.extensions.functions._
import kernel.display.html

In [4]:
import better.files._
import java.io.{File => JFile}

Load papers again

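It is assumed that the raw S2ORC paper files are in /data/papers/s2orc/original (the `input` folder defined below); the converted files are written to /data/papers/s2orc/pubmed.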




In [6]:
// Root of the data volume.
val data: File = File("/data")
// Folder holding everything related to the S2ORC papers.
val s2orc: File = data / "papers" / "s2orc"
// Files that are read by the conversion below.
val input: File = s2orc / "original"
// Folder that receives the converted parquet files.
val pubmed: File = s2orc / "pubmed"

In [7]:
// Only files (not sub-folders) of `input` whose name starts with this prefix are processed.
val prefix = "20230714"
val paper_files = input.children
  .filter(f => f.name.startsWith(prefix) && !f.isDirectory())
  .toList
// Print the selection, one path per line.
paper_files.foreach(println)

/data/papers/s2orc/20230714_111942_00012_e64uq_a82fd724-5f8b-4dcf-9f59-d59531c59000
/data/papers/s2orc/20230714_111942_00012_e64uq_2847f4cb-dccf-4a0a-aca5-e405eb86c060
/data/papers/s2orc/20230714_111942_00012_e64uq_41a86bf3-8d2c-44b5-a5ec-d845c5e39d88
/data/papers/s2orc/20230714_111942_00012_e64uq_c50619fc-2409-4671-ab21-78f22cc66e94
/data/papers/s2orc/20230714_111942_00012_e64uq_1945e5da-e874-4e2d-8169-a447fc56bc6d
/data/papers/s2orc/20230714_111942_00012_e64uq_061ba37d-7776-4179-ae0f-a97563a170e4
/data/papers/s2orc/20230714_111942_00012_e64uq_17aa256c-f2c7-490e-a539-7ec5c767dd2b
/data/papers/s2orc/20230714_111942_00012_e64uq_87b926e2-99f6-490e-9966-c0a5d7b70fcd
/data/papers/s2orc/20230714_111942_00012_e64uq_ad89e368-674d-4fd5-aa29-4a8a0c32b545
/data/papers/s2orc/20230714_111942_00012_e64uq_b76c9370-94d3-43fa-8690-8d5fa592500d
/data/papers/s2orc/20230714_111942_00012_e64uq_11491525-0295-4d6d-9119-4afe180d474d
/data/papers/s2orc/20230714_111942_00012_e64uq_9710ad0e-0abb-4a1b-a8d8-2d0a9

In [22]:
// Take the first selected file and let Spark infer its JSON schema, to inspect the nesting.
val t: File = paper_files.head
val tt: DataFrame = spark.read.json(t.pathAsString)
// Last expression of the cell, so the kernel displays the inferred schema.
tt

[content: struct<annotations: struct<abstract: string, author: string ... 21 more fields>, source: struct<oainfo: struct<license: string, openaccessurl: string ... 1 more field>, pdfsha: string ... 1 more field> ... 1 more field>, corpusid: bigint ... 2 …

Converting papers to the proper format




In [10]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

/**
 * Projects the nested paper records onto a flat set of columns.
 *
 * `corpusid` and `updated` are kept under their own names. Every selected nested field is
 * renamed to its path joined with underscores, except that fields below `content.annotations`
 * drop the leading `content` (e.g. `content.annotations.title` becomes `annotations_title`).
 *
 * @param df data frame with the nested `content` and `externalids` structs
 * @return data frame with one top-level column per selected field, in a fixed order
 */
def flattenDataFrame(df: DataFrame): DataFrame = {
  // Selects `parent.leaf` for every leaf and aliases it as `aliasPrefix_leaf`, keeping leaf order.
  def nested(parent: String, aliasPrefix: String)(leaves: String*): Seq[Column] =
    leaves.map(leaf => col(s"$parent.$leaf").alias(s"${aliasPrefix}_$leaf"))

  val identity = Seq("corpusid", "updated").map(name => col(name))

  val openAccess =
    nested("content.source.oainfo", "content_source_oainfo")("license", "openaccessurl", "status")

  val source = nested("content.source", "content_source")("pdfsha", "pdfurls")

  val externalIds = nested("externalids", "externalids")(
    "acl", "arxiv", "dblp", "doi", "mag", "pubmed", "pubmedcentral"
  )

  val text = nested("content", "content")("text")

  val annotations = nested("content.annotations", "annotations")(
    "abstract",
    "author", "authoraffiliation", "authorfirstname", "authorlastname",
    "bibauthor", "bibauthorfirstname", "bibauthorlastname",
    "bibentry", "bibref", "bibtitle", "bibvenue",
    "figure", "figurecaption", "figureref",
    "formula", "paragraph", "publisher", "sectionheader",
    "table", "tableref",
    "title", "venue"
  )

  df.select(identity ++ openAccess ++ source ++ externalIds ++ text ++ annotations: _*)
}


Extracting annotations

=================




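Let's make a UDF that takes the JSON list of spans stored in an annotation column and returns the corresponding substrings of the paper text.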

In [14]:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame}

/**
 * Character range inside a text: `start` is inclusive and `end` is exclusive, matching the
 * arguments of `String.substring(start, end)` that it is used with below.
 */
case class Span(start: Int, end: Int)

/**
 * UDF that turns a JSON-encoded list of spans into the substrings of `text` they point at.
 *
 * Arguments of the wrapped function:
 *  - `jsonStr`: JSON array of objects with integer `start` and `end` fields;
 *  - `text`: the string the spans refer to.
 *
 * The result has one entry per decoded span, in the same order. A span that does not fit the
 * text (negative start, start after end, or end past the text) yields an empty string instead
 * of failing the task. A `jsonStr` that is null or cannot be decoded yields an empty list; a
 * null `text` is treated as an empty string.
 */
val getSubstrings: UserDefinedFunction = udf {
  (jsonStr: String, text: String) =>
    // Both arguments can be null when the corresponding column value is missing.
    val body = Option(text).getOrElse("")
    val spans = Option(jsonStr)
      .flatMap(raw => decode[List[Span]](raw).toOption)
      .getOrElse(List.empty[Span])

    spans.map { span =>
      // `start <= end` is required too: substring throws when the range is reversed.
      val fitsText = span.start >= 0 && span.start <= span.end && span.end <= body.length
      if (fitsText) body.substring(span.start, span.end) else ""
    }
}

// Register under a SQL-visible name; the returned handle is what the DataFrame code calls.
val udfGetSubstrings = spark.udf.register("getSubstrings", getSubstrings)

In [15]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

/**
 * Replaces every column whose name starts with `annotations_` by the result of
 * `udfGetSubstrings` applied to that column and to `content_text`.
 *
 * @param df flattened data frame that contains a `content_text` column
 * @return data frame with the same column names, the annotation columns being overwritten
 */
def with_extracted_annotations(df: DataFrame): DataFrame = {
  val fullText = col("content_text")

  df.columns
    .filter(_.startsWith("annotations_"))
    .foldLeft(df) { (acc, name) =>
      // Same name as an existing column, so the column is overwritten rather than added.
      acc.withColumn(name, udfGetSubstrings(col(name), fullText))
    }
}

Storing everything in the proper format




In [18]:
// Convert each selected input file into a parquet file of the same name under `pubmed`.
paper_files.foreach { source =>
    val destination = (pubmed / s"${source.name}.parquet").pathAsString
    // Keep only the records that carry a PubMed id.
    val withPubmedId = spark.read.json(source.pathAsString).where(col("externalids.pubmed").isNotNull)
    // Flatten the nested structs, then swap the annotation spans for the text they cover.
    val extracted = with_extracted_annotations(flattenDataFrame(withPubmedId))
    println(s"started to write $destination")
    extracted.writeParquet(destination, true)
}

started to write /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_a82fd724-5f8b-4dcf-9f59-d59531c59000.parquet
parts of /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_a82fd724-5f8b-4dcf-9f59-d59531c59000.parquet merged!
started to write /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_2847f4cb-dccf-4a0a-aca5-e405eb86c060.parquet
parts of /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_2847f4cb-dccf-4a0a-aca5-e405eb86c060.parquet merged!
started to write /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_41a86bf3-8d2c-44b5-a5ec-d845c5e39d88.parquet
parts of /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_41a86bf3-8d2c-44b5-a5ec-d845c5e39d88.parquet merged!
started to write /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_c50619fc-2409-4671-ab21-78f22cc66e94.parquet
parts of /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_c50619fc-2409-4671-ab21-78f22cc66e94.parquet merged!
started to write /data/papers/s2orc/pubmed/20230714_111942_00012_e64uq_1

Loading resulting parquet files into memory