In [37]:
import os
import time
import requests
from typing import Dict, Iterable, Any
from dotenv import load_dotenv

load_dotenv()
BASE = os.getenv("OPENALEX_BASE", "https://api.openalex.org")
CONTACT = os.getenv("CONTACT_EMAIL", "mhmoslemi2338@gmail.com")

def paginate(endpoint: str, params: Dict[str, Any], max_pages: int) -> Iterable[Dict]:
    params = dict(params)
    if CONTACT:
        params["mailto"] = CONTACT
    url = f"{BASE}/{endpoint}"
    cursor = "*"
    for _ in range(max_pages):
        qp = dict(params)
        qp["cursor"] = cursor
        r = requests.get(url, params=qp, timeout=60)
        r.raise_for_status()
        js = r.json()
        for item in js.get("results", []):
            yield item
        cursor = js.get("meta", {}).get("next_cursor")
        if not cursor:
            break
        time.sleep(0.5)  # rate limit politeness


In [1]:
import pandas as pd

In [23]:
import pandas as pd

# Load parquet files
works = pd.read_parquet("data/curated/works.parquet")
auths = pd.read_parquet("data/curated/authorships.parquet")

print("Works shape:", works.shape)
print("Authorships shape:", auths.shape)

# Merge them on work_id to get a combined view
# combined = auths.merge(works, on="work_id", how="left")

# print("Combined shape:", combined.shape)

works.columns




Works shape: (100000, 6)
Authorships shape: (538529, 9)


Index(['work_id', 'title', 'year', 'venue', 'cited_by_count', 'source_issn_l'], dtype='object')

In [5]:
# save as openalex_to_parquet.py
import json
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, List, Union

import pandas as pd


def _brace_split(s: str) -> List[str]:
    """Split concatenated JSON objects by brace counting (handles embedded strings/escapes)."""
    objs, depth, start = [], 0, None
    in_str = False
    esc = False
    for i, ch in enumerate(s):
        if in_str:
            if esc:
                esc = False
            elif ch == "\\":
                esc = True
            elif ch == '"':
                in_str = False
        else:
            if ch == '"':
                in_str = True
            elif ch == "{":
                if depth == 0:
                    start = i
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0 and start is not None:
                    objs.append(s[start : i + 1])
                    start = None
    return objs


def _read_works(path: Union[str, Path]) -> List[Dict[str, Any]]:
    """Read OpenAlex works from:
       - JSON array file
       - single JSON object
       - concatenated JSON objects in a single file.
    """
    p = Path(path)
    raw = p.read_text(encoding="utf-8").strip()

    # Try JSON array or single JSON object
    try:
        data = json.loads(raw)
        if isinstance(data, dict):  # single work
            return [data]
        if isinstance(data, list):
            return data
    except Exception:
        pass

    # Fallback: concatenated objects
    objs = _brace_split(raw)
    if not objs:
        raise ValueError("Could not parse input as JSON array/object or concatenated JSON objects.")
    out = []
    for obj in objs:
        out.append(json.loads(obj))
    return out


def _jdumps(x: Any) -> str:
    return json.dumps(x, ensure_ascii=False)





def _flatten_work(w: Dict[str, Any]) -> Dict[str, Any]:
    pl = (w.get("primary_location") or {}) or {}
    src = (pl.get("source") or {}) or {}
    oa  = (w.get("open_access") or {}) or {}
    return {
        # core
        "work_id": w.get("id"),
        "title": w.get("title"),
        "publication_year": w.get("publication_year"),
        "cited_by_count": w.get("cited_by_count"),
        # primary location
        # "pl_is_oa": pl.get("is_oa"),
        # "pl_landing_page_url": pl.get("landing_page_url"),
        # "pl_pdf_url": pl.get("pdf_url"),
        # "pl_version": pl.get("version"),
        # "pl_is_accepted": pl.get("is_accepted"),
        # "pl_is_published": pl.get("is_published"),
        # source
        "source_id": src.get("id"),
        "source_display_name": src.get("display_name"),
        "source_type": src.get("type"),
        "source_issn_l": src.get("issn_l"),
        # "source_issn": _jdumps(src.get("issn", [])),
        # "source_is_oa": src.get("is_oa"),
        # "source_is_in_doaj": src.get("is_in_doaj"),
        # "source_is_indexed_in_scopus": src.get("is_indexed_in_scopus"),
        # "source_is_core": src.get("is_core"),
        # "source_host_org": src.get("host_organization"),
        # "source_host_org_name": src.get("host_organization_name"),
        # "source_host_org_lineage": _jdumps(src.get("host_organization_lineage", [])),
        # "source_host_org_lineage_names": _jdumps(src.get("host_organization_lineage_names", [])),
        # OA summary
        # "oa_is_oa": oa.get("is_oa"),
        # "oa_status": oa.get("oa_status"),
        # "oa_url": oa.get("oa_url"),
        # "oa_any_repository_has_fulltext": oa.get("any_repository_has_fulltext"),
        # raw blobs (so nothing is lost)
        # "raw_primary_location": _jdumps(pl),
        # "raw_open_access": _jdumps(oa),
    }


# def _flatten_authorships(w: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
#     auths = (w.get("authorships") or []) or []
#     for a in auths:
#         author = (a.get("author") or {}) or {}
#         yield {
#             "work_id": w.get("id"),
#             "work_title": w.get("title"),
#             "author_position": a.get("author_position"),
#             # "is_corresponding": a.get("is_corresponding"),
#             "raw_author_name": a.get("raw_author_name"),
#             "raw_affiliation_strings": "; ".join((a.get("raw_affiliation_strings") or []) or []),
#             "author_id": author.get("id"),
#             "author_display_name": author.get("display_name"),
#             # "author_orcid": author.get("orcid"),
#             "countries": "; ".join((a.get("countries") or []) or []),
#             # preserve full affiliation data as JSON
#             "institutions_json": _jdumps(a.get("institutions", [])),
#             "affiliations_json": _jdumps(a.get("affiliations", [])),
#         }
def _flatten_authorships(w: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
    auths = (w.get("authorships") or []) or []
    for a in auths:
        author = (a.get("author") or {}) or {}

        # Extract institution names
        institutions = a.get("institutions", []) or []
        institution_names = "; ".join([inst.get("display_name", "") for inst in institutions if inst.get("display_name")])

        # Extract affiliation strings
        affils = a.get("affiliations", []) or []
        affiliation_strings = "; ".join([aff.get("raw_affiliation_string", "") for aff in affils if aff.get("raw_affiliation_string")])

        yield {
            "work_id": w.get("id"),
            "work_title": w.get("title"),
            "author_id": author.get("id"),
            "author_display_name": author.get("display_name"),
            "author_position": a.get("author_position"),
            "is_corresponding": a.get("is_corresponding"),
            # "raw_author_name": a.get("raw_author_name"),
            # "author_orcid": author.get("orcid"),
            "countries": "; ".join((a.get("countries") or []) or []),
            # new flattened fields
            "institution_names": institution_names,
            "affiliation_strings": affiliation_strings,
            # keep raw JSON in case
            # "institutions_json": json.dumps(institutions, ensure_ascii=False),
            # "affiliations_json": json.dumps(affils, ensure_ascii=False),
        }


def write_parquet(df: pd.DataFrame, path: Union[str, Path]) -> None:
    # Try pyarrow, then fastparquet
    last_err = None
    for eng in ("pyarrow", "fastparquet"):
        try:
            df.to_parquet(path, index=False, engine=eng)
            return
        except Exception as e:
            last_err = e
    raise RuntimeError(
        "Failed to write Parquet. Install one of the engines:\n"
        "  pip install pyarrow\n"
        "  # or\n"
        "  pip install fastparquet\n"
        f"Last error: {last_err}"
    )


def convert(input_path: Union[str, Path],
            works_out: Union[str, Path] = "works.parquet",
            authorships_out: Union[str, Path] = "authorships.parquet") -> None:
    works_raw = _read_works(input_path)

    works_rows = [_flatten_work(w) for w in works_raw]
    auth_rows = [row for w in works_raw for row in _flatten_authorships(w)]

    works_df = pd.DataFrame(works_rows)
    auth_df = pd.DataFrame(auth_rows)

    write_parquet(works_df, works_out)
    write_parquet(auth_df, authorships_out)
    print(f"Saved: {works_out}  ({len(works_df)} rows)")
    print(f"Saved: {authorships_out}  ({len(auth_df)} rows)")


if __name__ == "__main__":
    inp = '/Users/mohammad/Downloads/citation-bias-explorer/data/raw/works.jsonl'
    wout = sys.argv[2] if len(sys.argv) >= 3 else "works.parquet"
    aout = sys.argv[3] if len(sys.argv) >= 4 else "authorships.parquet"
    convert(inp, wout, aout)

inp = '/Users/mohammad/Downloads/citation-bias-explorer'
wout = "tmp/works.parquet"
aout = "tmp/authorships.parquet"
convert(inp, wout, aout)



# python openalex_to_parquet.py input.json works.parquet authorships.parquet



Saved: works.parquet  (100000 rows)
Saved: authorships.parquet  (538410 rows)


IsADirectoryError: [Errno 21] Is a directory: '/Users/mohammad/Downloads/citation-bias-explorer'

In [10]:
import pandas as pd

# Load parquet files
works = pd.read_parquet("data/curated/works.parquet")
auths = pd.read_parquet("data/curated/authorships.parquet")

print("Works shape:", works.shape)
print("Authorships shape:", auths.shape)

# Merge them on work_id to get a combined view
# combined = auths.merge(works, on="work_id", how="left")

# print("Combined shape:", combined.shape)

# auths.head().to_csv('tmp/works.csv', index=False)



Works shape: (100000, 12)
Authorships shape: (538529, 9)


In [2]:
import os
import json
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()
os.makedirs("data/raw", exist_ok=True)
out_path = "data/raw/works.jsonl"
from_year = os.getenv("FROM_YEAR")
per_page = int(os.getenv("WORKS_PER_PAGE"))
max_pages = int(os.getenv("WORKS_MAX_PAGES"))
filed = (os.getenv("CONCEPT_ID_FILED"))





params = {
    "filter": f"concepts.id:{filed},from_publication_date:{from_year}-01-01,type_crossref:journal-article",
    "per_page": per_page,
    "select": ",".join([
        "id","title","authorships",
        "primary_location","cited_by_count",
        "publication_year","open_access"
    ])
}



count = 0
with open(out_path, "w", encoding="utf-8") as f:
    for obj in tqdm(paginate("works", params=params, max_pages=max_pages)):
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")
        count += 1
print(f"Saved {count} works to {out_path}")


NameError: name 'paginate' is not defined

In [3]:
import requests

endpoint = "works"
CONTACT = os.getenv("CONTACT_EMAIL", "mhmoslemi2338@gmail.com")
BASE = os.getenv("OPENALEX_BASE", "https://api.openalex.org")

import time
i=0

params = dict(params)
if CONTACT:
    params["mailto"] = CONTACT
url = f"{BASE}/{endpoint}"
cursor = "*"
for _ in range(max_pages):
    i+=1
    qp = dict(params)
    qp["cursor"] = cursor
    r = requests.get(url, params=qp, timeout=60)
    r.raise_for_status()
    js = r.json()
    j=0
    for item in js.get("results", []):
        # print(item)
        # continue
        j+=1
        print(j,end = ', ')
    print()
    cursor = js.get("meta", {}).get("next_cursor")
    if not cursor:
        break
    time.sleep(0.5)  # rate limit politeness
    print(i,'page done')




1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 
1 page done


KeyboardInterrupt: 

In [None]:
qp

{'filter': 'concepts.id:C41008148,from_publication_date:2020-01-01,type_crossref:journal-article',
 'per_page': 200,
 'select': 'id,title,authorships,primary_location,cited_by_count,publication_year,open_access',
 'mailto': 'mhmoslemi2338@gmail.com',
 'cursor': 'IlsxMDAuMCwgMjAzMiwgJ2h0dHBzOi8vb3BlbmFsZXgub3JnL1c0Mzc4MjEyNTQ0J10i'}

In [None]:

    

def get_total_count(params):
    url = f"{BASE}/{endpoint}"
    
    r = requests.get(url, params=qp, timeout=60)

    r.raise_for_status()
    return r.json()["meta"]["count"]



total = get_total_count(params)
print(f"Total works available: {total}")    


Total works available: 9822607


In [3]:
# from pyspark.sql.types import *
!pip3 install pyspark

Collecting pyspark
  Downloading pyspark-4.0.1.tar.gz (434.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.2/434.2 MB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.9
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m203.0/203.0 KB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-4.0.1-py2.py3-none-any.whl size=434813815 sha256=9c18653f7b2ad40fc628b964c59ed8e5c4f13af9fdb30b7523ee810c986c40c0
  Stored in directory: /Users/mohammad/Library/Caches/pip/wheels/10/e6/6b/c50eb601fa827dd56a5272db5d5db360e559e527a80a665b1d
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.9 pyspark-4.0.1


In [4]:
from pyspark.sql.types import *


works_schema = StructType([
    StructField("id", StringType()),
    StructField("title", StringType()),
    StructField("authorships", ArrayType(StructType([
        StructField("author", StructType([
            StructField("id", StringType()),
            StructField("display_name", StringType())
        ])),
        StructField("institutions", ArrayType(StructType([
            StructField("id", StringType()),
            StructField("display_name", StringType()),
            StructField("country_code", StringType())
        ]))),
        StructField("author_position", StringType())
    ]))),
    StructField("host_venue", StructType([
        StructField("id", StringType()),
        StructField("display_name", StringType()),
        StructField("publisher", StringType())
    ])),
    StructField("cited_by_count", IntegerType()),
    StructField("publication_year", IntegerType()),
    StructField("open_access", StructType([
        StructField("is_oa", BooleanType())
    ]))
])

In [6]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode


def load_raw_works(spark):
    return spark.read.schema(works_schema).json("data/raw/works.jsonl")

def curate_works(df):
    return df.select(
        col("id").alias("work_id"),
        col("title"),
        col("publication_year").alias("year"),
        col("host_venue.display_name").alias("venue"),
        col("cited_by_count"),
        col("open_access.is_oa").alias("is_oa")
    )

def flatten_authorships(df):
    a = df.select(
        col("id").alias("work_id"),
        explode("authorships").alias("auth")
    )
    a = a.select(
        "work_id",
        col("auth.author.id").alias("author_id"),
        col("auth.author.display_name").alias("author_name"),
        col("auth.author_position").alias("author_pos"),
        explode("auth.institutions").alias("inst")
    )
    a = a.select(
        "work_id","author_id","author_name","author_pos",
        col("inst.id").alias("institution_id"),
        col("inst.display_name").alias("institution_name"),
        col("inst.country_code").alias("country_code"),
    )
    return a

spark = SparkSession.builder.appName("citation_bias_etl").getOrCreate()
df_raw = load_raw_works(spark).dropna(subset=["id"])
works = curate_works(df_raw)
auth = flatten_authorships(df_raw)

os.makedirs("data/curated", exist_ok=True)
works.write.mode("overwrite").parquet("data/curated/works.parquet")
auth.write.mode("overwrite").parquet("data/curated/authorships.parquet")
print("ETL complete.")
spark.stop()


25/09/16 20:20:24 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/09/16 20:20:27 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

ETL complete.


In [4]:
import os
print(os.environ.get("JAVA_HOME"))

/usr/lib/jvm/java-8-openjdk-amd64


In [1]:
import os
print(os.environ.get("JAVA_HOME"))

/opt/homebrew/Cellar/openjdk@17/17.0.16/libexec/openjdk.jdk/Contents/Home


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
print(spark.version)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/16 20:19:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


4.0.1
