# OpenAIRE Community Data Dump Handling: Extraction, Tranformation and Enrichment

In this notebook we will start with one of the [OpenAIRE Community Subgraphs](https://graph.openaire.eu/docs/downloads/subgraphs) to enrich that informatino for further analysis.

This data process will extract an [OpenAIRE community data dump from Zenodo](https://doi.org/10.5281/zenodo.3974604), transforms it in to a portable file format .parquet (and updatable with changes for time seires analysis), that can be used to query with DuckDB, to enrich this with additional data  (also in .parquet, for join queries).

This additional data can be societal impact data from [Altmetric.com](https://details-page-api-docs.altmetric.com/) or [Overton.io](https://app.overton.io/swagger.php), Gender data using [genderize.io](https://genderize.io/documentation), sdg classification using [aurora-sdg](https://aurora-universities.eu/sdg-research/sdg-api/)

This script needs to be written in a way so that it can run every month using  the latest data.

## Processing steps

* the folder ./data/ is put in .gitignore to prevent that bulk datais sent to a code repository. So make sure that folder exists, and mkdir if not exists. 
* The script downloads the lastest Data Dump Tar file from one selected community. See https://doi.org/10.5281/zenodo.3974604 for the latest list. In our case the Aurora tar file. https://zenodo.org/records/14887484/files/aurora.tar?download=1
  * Use the json record of zenodo to get to the latest record, and fetch the download link of the aurora.tar file. for example : https://zenodo.org/records/14887484/export/json or https://zenodo.org/api/records/14887484/versions/latest 
  Make the tar filename a variable, so it can be used for multiple community dumps.
  Download the tar file in a target folder ./data/{filename+timestamp}/ where a subfolder is created using the filename and the timestamp. Make this also as a  variable to use later on.
* Extract the tar file, to the compressed .json.gz files and put these in target folder ./data/{filename+timestamp}/01-extracted/
* Transform the compressed .json.gz files into a single .parquet file in target folder ./data/{filename+timestamp}/02-transformed/
Use instructions in sections "Processing JSON files with DuckDB" and "Full dataset, bit by bit" and "Splitting and Processing JSON Files in Batches" https://github.com/mosart/OpenAIRE-tools/blob/main/duckdb-querying.ipynb to start with. (be aware of error messages, and fix the issues to get all the data in)
* Extract the SQL schema (schema-datadump.sql) from the .parquet file and put it in target folder ./data/{filename+timestamp}/02-transformed/ This is needed for further processing of the records with DuckDB later on.
Use instructions in section "Extracting Schema from Parquet File" https://github.com/mosart/OpenAIRE-tools/blob/main/duckdb-querying.ipynb to start with.
* Query to get all identifiers: openaire id, doi, isbn, hdl, etc.
* **Get Altmetric data:**
* Extract the Altmetric data using the Identifiers. put that in target folder ./data/{filename+timestamp}/03-altmetric-extracted/
* Transform the Altmetric data to a single .parquet file, with the identifiers. put that in target folder ./data/{filename+timestamp}/04-altmetric-transformed/ This way duckDB can make a join when querying over multiple parquet files.
* Extract the SQL schema (schema-altmetric.sql) from the .parquet file and put it in target folder ./data/{filename+timestamp}/04-altmetric-transformed/
* **Get Overton data:** Repeat the altmetric steps, bun than for Overton.
* **Get Gender data** query for the Author names and country codes, and run them over the gerderize api
* **Get SDG data** query for the abstracts, and run abstracs larger than 100 tokens over the aurora-SDG api.

ss

## Step 1 : Get the latest Community Dump File

In [14]:
import requests
import json

# Fetch the JSON data from the URL
url = "https://zenodo.org/api/records/14887484/versions/latest"
response = requests.get(url)
data = response.json()

# Extract the files information
files = data.get("files", [])

# Create a list of dictionaries for the .tar files
tar_files = []
for file in files:
    if file["key"].endswith(".tar"):
        tar_files.append({
            "filename": file["key"],
            "size": f"{file['size'] / (1024**3):.2f} GB",  # Convert bytes to GB
            "downloadlink": file["links"]["self"],
            "checksum": file["checksum"]
        })

# print the tar files
# If no tar files found, print a message
if not tar_files:
    print("No .tar files found in the dataset.")
else:
    print(f"Found {len(tar_files)} .tar files in the dataset.")
    print("Details of .tar files:")
    print(tar_files)

# get and print the publication date
publication_date = data.get("metadata", {}).get("publication_date", "Unknown")
print(f"Publication date: {publication_date}")
# get and print the DOI
doi = data.get("doi", "Unknown")
print(f"DOI: {doi}")
# get and print the title
title = data.get("title", "Unknown")
print(f"Title: {title}")

Found 37 .tar files in the dataset.
Details of .tar files:
[{'filename': 'energy-planning_1.tar', 'size': '6.99 GB', 'downloadlink': 'https://zenodo.org/api/records/14887484/files/energy-planning_1.tar/content', 'checksum': 'md5:0a2f551db46a9e629bb1d0a0098ae5cd'}, {'filename': 'edih-adria_1.tar', 'size': '5.86 GB', 'downloadlink': 'https://zenodo.org/api/records/14887484/files/edih-adria_1.tar/content', 'checksum': 'md5:23559bed5a9023398b431777bdc8a126'}, {'filename': 'uarctic_1.tar', 'size': '9.75 GB', 'downloadlink': 'https://zenodo.org/api/records/14887484/files/uarctic_1.tar/content', 'checksum': 'md5:302e3844ebd041c5f4ed94505eb9a285'}, {'filename': 'netherlands_1.tar', 'size': '3.91 GB', 'downloadlink': 'https://zenodo.org/api/records/14887484/files/netherlands_1.tar/content', 'checksum': 'md5:d1416c058b3961483aac340750ea8726'}, {'filename': 'knowmad_1.tar', 'size': '10.08 GB', 'downloadlink': 'https://zenodo.org/api/records/14887484/files/knowmad_1.tar/content', 'checksum': 'md5:

In [2]:
# Create a DataFrame to hold the tar files information for later use.

import pandas as pd

# Convert the list of dictionaries to a DataFrame
df_tar_files = pd.DataFrame(tar_files)

# Sort the DataFrame by filename alphabetically
df_tar_files = df_tar_files.sort_values(by='filename')

# Print the DataFrame
print(df_tar_files)

                      filename      size  \
5              argo-france.tar   0.00 GB   
8                   aurora.tar   1.73 GB   
22                  beopen.tar   0.20 GB   
6                   civica.tar   0.23 GB   
7                 covid-19.tar   2.03 GB   
23                  dariah.tar   0.02 GB   
9                    dh-ch.tar   1.16 GB   
11                     dth.tar   0.01 GB   
1             edih-adria_1.tar   5.86 GB   
12                  egrise.tar   0.02 GB   
25               elixir-gr.tar   0.01 GB   
0        energy-planning_1.tar   6.99 GB   
27                enermaps.tar   1.59 GB   
24              eu-conexus.tar   0.18 GB   
26                     eut.tar   0.21 GB   
15                 eutopia.tar   1.60 GB   
28                 forthem.tar   0.91 GB   
10        heritage-science.tar   0.03 GB   
29                   inria.tar   0.27 GB   
14               iperionhs.tar   0.00 GB   
4                knowmad_1.tar  10.08 GB   
19               knowmad_2.tar  

In [3]:
import signal

# Function to handle timeout
def timeout_handler(signum, frame):
    raise TimeoutError

# Set the timeout handler for the input
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(10)  # Set the timeout to 10 seconds

try:
    # Ask the user to select a tar file by its index
    print("Available tar files:")
    print(df_tar_files[['filename', 'size']].reset_index())
    selected_index = int(input("Enter the index of the tar file you want to download: "))
except TimeoutError:
    print("No response received. Defaulting to index 1.")
    selected_index = 1
finally:
    signal.alarm(0)  # Disable the alarm

# Get the selected tar file's download link and checksum
selected_file = df_tar_files.iloc[selected_index]
downloadlink = selected_file['downloadlink']
checksum = selected_file['checksum']

print(f"Selected file: {selected_file['filename']}")
print(f"Download link: {downloadlink}")
print(f"Checksum: {checksum}")

Available tar files:
    index                    filename      size
0       5             argo-france.tar   0.00 GB
1       8                  aurora.tar   1.73 GB
2      22                  beopen.tar   0.20 GB
3       6                  civica.tar   0.23 GB
4       7                covid-19.tar   2.03 GB
5      23                  dariah.tar   0.02 GB
6       9                   dh-ch.tar   1.16 GB
7      11                     dth.tar   0.01 GB
8       1            edih-adria_1.tar   5.86 GB
9      12                  egrise.tar   0.02 GB
10     25               elixir-gr.tar   0.01 GB
11      0       energy-planning_1.tar   6.99 GB
12     27                enermaps.tar   1.59 GB
13     24              eu-conexus.tar   0.18 GB
14     26                     eut.tar   0.21 GB
15     15                 eutopia.tar   1.60 GB
16     28                 forthem.tar   0.91 GB
17     10        heritage-science.tar   0.03 GB
18     29                   inria.tar   0.27 GB
19     14          

In [4]:
# Path Variables

# Path to save the downloaded tar file using file_name variable
download_path = f"./data/01_input/{selected_file['filename']}"

# Path to save the extracted files
extraction_path = "./data/02_extracted"

print(f"Download Path File: {download_path}")
print(f"Extraction Path Folder: {extraction_path}")

Download Path File: ./data/01_input/aurora.tar
Extraction Path Folder: ./data/02_extracted


### Download the tar file

In [None]:
import os

# Ensure the directory for the download path exists
os.makedirs(os.path.dirname(download_path), exist_ok=True)

# Check if the file already exists
if not os.path.exists(download_path):
    # Get the file size in bytes
    file_size_bytes = int(selected_file['size'].split()[0]) * (1024**3)  # Convert GB to bytes
    print(f"Downloading file: {selected_file['filename']} ({selected_file['size']})")
    
    # Estimate download duration assuming an average speed of 10 MB/s
    avg_speed = 10 * (1024**2)  # 10 MB/s in bytes
    estimated_duration = file_size_bytes / avg_speed
    print(f"Estimated download time: {estimated_duration:.2f} seconds")
    
    # Download the selected tar file
    response = requests.get(downloadlink, stream=True)
    with open(download_path, 'wb') as file:
        for chunk in response.iter_content(chunk_size=8192):
            file.write(chunk)
else:
    print(f"File already exists: {download_path}")

print(f"Download complete: {download_path}")


File already exists: ./data/01_input/aurora.tar
Download complete: ./data/01_input/aurora.tar


In [6]:
import hashlib

# Function to calculate the checksum of a file
def calculate_checksum(file_path, algorithm):
    hash_func = hashlib.new(algorithm)
    with open(file_path, 'rb') as f:
        while chunk := f.read(8192):
            hash_func.update(chunk)
    return hash_func.hexdigest()

# Extract the checksum algorithm and value
checksum_parts = checksum.split(':', 1)
checksum_algorithm = checksum_parts[0]
expected_checksum = checksum_parts[1]

# Calculate the checksum of the downloaded file
calculated_checksum = calculate_checksum(download_path, algorithm=checksum_algorithm)

# Compare the calculated checksum with the provided checksum
if calculated_checksum == expected_checksum:
    print("Checksum verification passed.")
else:
    print("Checksum verification failed.")
    print(f"Expected: {expected_checksum}")
    print(f"Calculated: {calculated_checksum}")

Checksum verification passed.


## Step 2: Extract the tar file

In [7]:
import os
import tarfile

In [8]:

# Check if the extraction directory already exists and contains files
if os.path.exists(extraction_path) and os.listdir(extraction_path):
    print("The tar file has already been extracted.")
else:
    # Create the directory if it doesn't exist
    os.makedirs(extraction_path, exist_ok=True)

    # Extract the tar file
    with tarfile.open(download_path, 'r') as tar:
        tar.extractall(path=extraction_path)

    print("Extraction complete.")
    

The tar file has already been extracted.


In [11]:
# List the extracted files
extracted_files = os.listdir(extraction_path)

# count he number of files in the extracted folder
num_files = len(extracted_files)
print(f"Number of files: {num_files}")

# print the first 5 files
print("First 5 files:")
for file in extracted_files[:5]:
    print(file) 

# print the added subdirectories
subdirectories = [file for file in extracted_files if os.path.isdir(os.path.join(extraction_path, file))]
print("Subdirectories:")
for subdirectory in subdirectories:
    print(subdirectory)

# print the latest added subdirectory based on date modified
latest_subdirectory = sorted(subdirectories, key=lambda x: os.path.getmtime(os.path.join(extraction_path, x)))[-1]
print(f"Latest subdirectory: {latest_subdirectory}")

# make varable for the path to the latest subdirectory
latest_extraction_path = os.path.join(extraction_path, latest_subdirectory)

# print the path of the latest extraction path
print(f"Latest extraction path: {latest_extraction_path}")

# make a DataFrame for the extracted files
df_extracted_files = pd.DataFrame({
    'filename': extracted_files,
    'is_directory': [os.path.isdir(os.path.join(extraction_path, f)) for f in extracted_files]
})
# Sort the DataFrame by filename
df_extracted_files = df_extracted_files.sort_values(by='filename')
# Print the DataFrame
print(df_extracted_files)

Number of files: 1
First 5 files:
aurora
Subdirectories:
aurora
Latest subdirectory: aurora
Latest extraction path: ./data/02_extracted/aurora
  filename  is_directory
0   aurora          True


## Step 3: Get a data sample to generate parquetfile and the SQL schema
We do this before we process the bulk of the data.

In [10]:
import duckdb

# Ensure the target directory exists
os.makedirs("./data/03_transformed", exist_ok=True)

# Define the target output file path
output_file = f"./data/03_transformed/{latest_subdirectory}-sample.parquet"

#print the output file path
print(f"Output file path: {output_file}")

Output file path: ./data/03_transformed/aurora-sample.parquet


The code below was my original starting point but it failed:

In [None]:
# Connect to an in-memory DuckDB database
con = duckdb.connect()

# Use DuckDB to process the extracted JSON files and save a sample of 1 rows as a Parquet file
con.sql(f'''
    COPY (
        SELECT unnest(items, max_depth:=2)
        FROM (
            SELECT *
            FROM read_json('{latest_extraction_path}/*.json.gz', sample_size=1, union_by_name=true)
        )
    )
    TO '{output_file}' (FORMAT parquet, COMPRESSION gzip)
''')

print(f"Transformed data saved to: {output_file}")

# Close the DuckDB connection
con.close()

this is my next itteration based on debugging

In [11]:
import os

# Connect to an in-memory DuckDB database
con = duckdb.connect()

# Get the list of gzipped JSON files in the latest_extraction_path
gz_files = [file for file in os.listdir(latest_extraction_path) if file.endswith(".json.gz")]

# Create a temporary table to store data from valid files
con.execute("CREATE OR REPLACE TEMP TABLE temp_table AS SELECT NULL AS dummy_column WHERE FALSE")

# Process each file individually
for gz_file in gz_files:
    gz_file_path = os.path.join(latest_extraction_path, gz_file)
    try:
        # Attempt to read the JSON file and append its data to the temporary table
        con.execute(f"""
            INSERT INTO temp_table
            SELECT *
            FROM read_json_auto('{gz_file_path}', compression='gzip', union_by_name=true)
        """)
        print(f"Successfully processed: {gz_file}")
    except Exception as e:
        # Log the error and skip the file
        print(f"Error processing file {gz_file}: {e}")

# Save the combined data from valid files to a Parquet file
con.execute(f"""
    COPY temp_table
    TO '{output_file}' (FORMAT parquet, COMPRESSION gzip)
""")

print(f"Transformed data saved to: {output_file}")

# Close the DuckDB connection
con.close()

Error processing file part-00000-7a70885f-56f2-4cc2-b836-a5bd99ab23c3-c000.json.gz: Binder Error: table temp_table has 1 columns but 32 values were supplied
Error processing file part-00001-7a70885f-56f2-4cc2-b836-a5bd99ab23c3-c000.json.gz: Binder Error: table temp_table has 1 columns but 32 values were supplied
Error processing file part-00002-7a70885f-56f2-4cc2-b836-a5bd99ab23c3-c000.json.gz: Binder Error: table temp_table has 1 columns but 32 values were supplied
Error processing file part-00003-7a70885f-56f2-4cc2-b836-a5bd99ab23c3-c000.json.gz: Binder Error: table temp_table has 1 columns but 32 values were supplied
Error processing file part-00004-7a70885f-56f2-4cc2-b836-a5bd99ab23c3-c000.json.gz: Binder Error: table temp_table has 1 columns but 32 values were supplied
Error processing file part-00005-7a70885f-56f2-4cc2-b836-a5bd99ab23c3-c000.json.gz: Binder Error: table temp_table has 1 columns but 32 values were supplied
Error processing file part-00006-7a70885f-56f2-4cc2-b836-a

KeyboardInterrupt: 

In [None]:
# Connect to an in-memory DuckDB database
con = duckdb.connect()

# Use DuckDB to process the extracted JSON files and save a sample of 100 rows as a Parquet file
con.sql(f'''
    COPY (
        SELECT *
        FROM read_json_auto('{latest_extraction_path}/*.json.gz', sample_size=1, compression='gzip', union_by_name=true)
    )
    TO '{output_file}' (FORMAT parquet, COMPRESSION gzip)
''')

print(f"Transformed data saved to: {output_file}")

# Close the DuckDB connection
con.close()

In [None]:
# Ensure the target directory exists
os.makedirs("./data/03_transformed", exist_ok=True)

# Define the target output file path using the latest_subdirectory variable
output_file = f"./data/03_transformed/{latest_subdirectory}-sample.parquet"

# Connect to an in-memory DuckDB database
con = duckdb.connect()

# Get the first 5 gzipped JSON files from the latest_extraction_path
gz_files = [file for file in os.listdir(latest_extraction_path) if file.endswith(".gz")][:5]

# Use DuckDB to process the selected JSON files and save as a Parquet file
for gz_file in gz_files:
    gz_file_path = os.path.join(latest_extraction_path, gz_file)
    con.sql(f'''
        COPY (
            SELECT unnest(items, max_depth:=2)
            FROM (
                SELECT *
                FROM read_json_auto('{gz_file_path}', compression='gzip')
            )
        )
        TO '{output_file}' (FORMAT parquet, COMPRESSION gzip)
    ''')

print(f"Transformed sample data saved to: {output_file}")

# Close the DuckDB connection
con.close()

### I WAS HERE REVIEWING THE CODE everything below was not checked

In [None]:
import duckdb
import os

# Connect to an in-memory DuckDB database
con = duckdb.connect()

# Get the first 3 gzipped JSON files from the latest_extraction_path
gz_files = [file for file in os.listdir(latest_extraction_path) if file.endswith(".gz")][:3]

# Load the gzipped JSON files directly into DuckDB
for gz_file in gz_files:
    gz_file_path = os.path.join(latest_extraction_path, gz_file)
    con.execute("CREATE OR REPLACE TEMP TABLE temp_table AS SELECT * FROM read_json_auto(?, compression='gzip')", [gz_file_path])

# Generate the SQL schema from the temporary table
schema = con.execute("DESCRIBE temp_table").fetchall()

# Print the schema
print("SQL Schema:")
for column in schema:
    print(f"{column[0]}: {column[1]}")

# Close the DuckDB connection
con.close()