In [None]:
project_name = "proj_1060_labor_division"
permissions = "fulldata" # please use the readonly-v3 cluster

In [None]:
%run /Snippets/header_008

In [None]:
# Read a Parquet file using Spark
# Specify the file path as "/Projects/proj_1060_labor_division/df_final.parquet"
df_final = spark.read.parquet("/Projects/proj_1060_labor_division/df_final.parquet")

In [None]:
'''
Generate a description analysis dataset
'''

from pyspark.sql.functions import split, explode, col, count, max, udf, min, collect_list
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window

# Drop the "source" and "issn" columns from the DataFrame
description_df = df_final.drop("source").drop("issn")

# Split the "credit" column into individual elements and aggregate by "doi", "year", "subjareas", "aunum", "auid", "surname", "position"
description_df = description_df.withColumn("credit", explode(split(col("credit"), ", ")))
description_df = description_df.groupBy("doi", "year", "subjareas", "aunum", "auid", "surname", "position").agg(collect_list("credit").alias("credit"))

# Calculate the row count for each "doi"
window_spec = Window.partitionBy("doi")
description_df = description_df.withColumn("row_count", count("doi").over(window_spec))

# Calculate the maximum "position" for each "doi" and create a new column
position_maxd = description_df.groupBy("doi").agg(max("position").alias("position_max"))
description_df = description_df.join(position_maxd, ["doi"], "left_outer")

# Calculate the minimum "position" for each "doi" and create a new column
position_mind = description_df.groupBy("doi").agg(min("position").alias("position_min"))
description_df = description_df.join(position_mind, ["doi"], "left_outer")


# Define a UDF (User Defined Function) to determine "discipline" based on elements in "subjareas"
def determine_discipline(subjareas):
    health_sciences_keywords = ["MEDI", "NURS", "VETE", "DENT", "HEAL"]
    life_sciences_keywords = ["AGRI", "BIOC", "IMMU", "NEUR", "PHAR"]
    physical_sciences_keywords = ["CENG", "CHEM", "COMP", "EART", "ENER", "ENGI", "ENVI", "MATE", "MATH", "PHYS"]
    social_sciences_keywords = ["ARTS", "BUSI", "DECI", "ECON", "PSYC", "SOCI"]
    multidisciplinary_keywords = ["MULT"]

    discipline = []
    # Check for keywords in "subjareas" to determine the corresponding discipline
    if any(keyword in subjareas for keyword in health_sciences_keywords):
        discipline.append("Health Sciences")
    if any(keyword in subjareas for keyword in life_sciences_keywords):
        discipline.append("Life Sciences")
    if any(keyword in subjareas for keyword in physical_sciences_keywords):
        discipline.append("Physical Sciences")
    if any(keyword in subjareas for keyword in social_sciences_keywords):
        discipline.append("Social Sciences")
    if any(keyword in subjareas for keyword in multidisciplinary_keywords):
        discipline.append("Multidisciplinary")

    return discipline

# Register the UDF with Spark
determine_discipline_udf = udf(determine_discipline, ArrayType(StringType()))

# Apply the UDF to add a new "discipline" column to the DataFrame
description_df = description_df.withColumn("discipline", determine_discipline_udf(col("subjareas")))

In [None]:
'''
Data cleaning
'''

from pyspark.sql.functions import col, split, expr, regexp_replace, countDistinct, collect_set, explode, size
from pyspark.sql.types import ArrayType, StringType

# Replace "Formal Analysis" with "Formal analysis" in the "credit" column
description_df = description_df.withColumn("credit", expr("transform(credit, x -> IF(x = 'Formal Analysis', 'Formal analysis', x))"))

# Remove elements not belonging to the CRediT14 categories
# Define the list of allowed elements (remove elements not in CRediT14)
allowed_elements = [
    "Conceptualization", "Data curation", "Formal analysis", "Funding acquisition",
    "Investigation", "Methodology", "Project administration", "Resources", "Software",
    "Supervision", "Validation", "Visualization", "Writing – original draft", "Writing – review & editing"
]

# Define UDF to filter out disallowed elements
def filter_elements(CRediT_list):
    return [elem for elem in CRediT_list if elem in allowed_elements]
# Register the UDF with Spark
spark.udf.register("filter_elements", filter_elements)
# Apply the UDF to filter the "credit" column
description_df = description_df.withColumn("credit", expr("filter_elements(credit)"))

# Converts the credit column format from a string to a list for subsequent analysis
# Remove rows where "credit" field is "[]"
description_df = description_df.filter(col("credit") != "[]")
# Remove square brackets "[" and "]" from the "credit" column
description_df = description_df.withColumn("credit", regexp_replace("credit", "[\\[\\]]", ""))
# Parse the "credit" column into a list
description_df = description_df.withColumn("credit", split(col("credit"), ", "))

# Define UDF to remove duplicate elements
def remove_duplicates(arr):
    return list(set(arr))
remove_duplicates_udf = udf(remove_duplicates, ArrayType(StringType()))
description_df = description_df.withColumn("credit", remove_duplicates_udf(col("credit")))



# Calculate the number of elements in the "credit" list and add a new column to store the result. This represents the number of credits per author involved in a publication.
description_df = description_df.withColumn("au_credit_num", size(description_df["credit"]))

# Generate a new column "paper_credit" listing unique elements for each "Eid". This represents the number of credits that are all used in a publication.
# Explode the "credit" column into separate rows
credit_explode_df = description_df.withColumn("credit_exploded", explode(description_df["credit"])) 
# Aggregate to collect unique credits for each "doi"
paper_credit = credit_explode_df.groupBy("year", "doi").agg(collect_set("credit_exploded").alias("paper_credit"))
# Count distinct "CRediT" elements for each "doi" and generate a new column "paper_credit_num"
paper_credit_num = credit_explode_df.groupBy("year", "doi").agg(countDistinct("credit_exploded").alias("paper_credit_num"))

# Merge the results back into the main DataFrame
description_df = description_df.join(paper_credit, ["year", "doi"], "left_outer").join(paper_credit_num, ["year", "doi"], "left_outer")

In [None]:
'''
number of contributions per article by fields
'''

from pyspark.sql.functions import explode, count, collect_list, struct, avg

# Calculate the average "au_credit_num" per group of ["doi"] (i.e., the average number of contributions per article)
paper_au_mean_num = description_df.groupBy("doi").agg(avg("au_credit_num").alias("paper_au_credit_avg"))
# Join the average contributions back to the DataFrame
figure3a_df = description_df.join(paper_au_mean_num, ["doi"], "left_outer")

# Aggregate data into paper-level data
paper_df = (
    figure3a_df.groupBy("doi", "year", 'subjareas', 'discipline', 'row_count', 'position_max', 'position_min', 'paper_credit', 'paper_credit_num', 'paper_au_credit_avg')
      .agg(collect_list(struct("auid", "surname", "position", "credit", "au_credit_num")).alias("Au"))
)

# Calculate the count of each discipline
# Use explode to expand the discipline array
paper_df_exploded = paper_df.select("doi", "paper_au_credit_avg", explode("discipline").alias("discipline"))
discipline_counts_df = paper_df_exploded.groupBy("discipline").agg(count("discipline").alias("count"))
# Display the counts of each discipline without truncation
discipline_counts_df.display(truncate=False)

# Select different paper_au_credit_avg counts for each discipline
average_df = paper_df_exploded.select("discipline", "paper_au_credit_avg")

| Discipline         | Paper Count |
|--------------------|-------------|
| Health Sciences     | 73720       |
| Life Sciences       | 164321      |
| Physical Sciences   | 503141      |
| Multidisciplinary   | 69542       |
| Social Sciences     | 69915      |


In [None]:
from pyspark.sql.functions import when, col

# Define the conditions for categorizing the ranges
conditions = [
    (col("paper_au_credit_avg") >= 1) & (col("paper_au_credit_avg") < 2),
    (col("paper_au_credit_avg") >= 2) & (col("paper_au_credit_avg") < 3),
    (col("paper_au_credit_avg") >= 3) & (col("paper_au_credit_avg") < 4),
    (col("paper_au_credit_avg") >= 4) & (col("paper_au_credit_avg") < 5),
    (col("paper_au_credit_avg") >= 5) & (col("paper_au_credit_avg") < 6),
    (col("paper_au_credit_avg") >= 6) & (col("paper_au_credit_avg") < 7),
    (col("paper_au_credit_avg") >= 7) & (col("paper_au_credit_avg") < 8),
    (col("paper_au_credit_avg") >= 8)
]

# Define the labels for the ranges
labels = ["[1,2)", "[2,3)", "[3,4)", "[4,5)", "[5,6)", "[6,7)", "[7,8)", "[8,14]"]

# Add a new column to categorize "paper_au_credit_avg" into different ranges based on the conditions
average_df = average_df.withColumn("range", 
                                   when(conditions[0], labels[0])
                                   .when(conditions[1], labels[1])
                                   .when(conditions[2], labels[2])
                                   .when(conditions[3], labels[3])
                                   .when(conditions[4], labels[4])
                                   .when(conditions[5], labels[5])
                                   .when(conditions[6], labels[6])
                                   .when(conditions[7], labels[7])
                                   .otherwise(None))

# Use groupBy and agg to count the occurrences of each range for different "discipline"
result_3a = average_df.groupBy("discipline", "range").count()

# Calculate the percentage of counts for each discipline in relation to total counts
result_3a = result_3a.withColumn("percentage", 
                           when(result_3a["discipline"] == "Health Sciences", result_3a["count"]/73720)
                           .otherwise(
                               when(result_3a["discipline"] == "Life Sciences", result_3a["count"]/164321)
                               .otherwise(
                                   when(result_3a["discipline"] == "Physical Sciences", result_3a["count"]/503141)
                                   .otherwise(
                                       when(result_3a["discipline"] == "Multidisciplinary", result_3a["count"]/69542)
                                       .otherwise(
                                           when(result_3a["discipline"] == "Social Sciences", result_3a["count"]/69915)
                                           .otherwise(0)
                                       )
                                   )
                               )
                           )
                          )

# Display the results
result_3a.display()

In [None]:
from pyspark.sql.functions import collect_list, struct, explode, avg

# Select relevant columns from the description_df DataFrame
figure3b_df = description_df.select("doi", "subjareas", "paper_credit", "row_count", "auid", "credit")

# Explode the "credit" column into separate rows
credit_explode_df2 = figure3b_df.withColumn("credit_exploded", explode(description_df["credit"])) 

# Use collect_list to create a list of credits, naming it "paper_credit_3b"
paper_credit_3b = credit_explode_df2.groupBy("doi").agg(collect_list("credit_exploded").alias("paper_credit_3b"))

# Join the new "paper_credit_3b" back to the original DataFrame
figure3b_df = figure3b_df.join(paper_credit_3b, ["doi"], "left_outer")

# Merge "auid" and "CRediT" columns
figure3b_df = (
    figure3b_df.groupBy("doi", 'subjareas', 'row_count', 'paper_credit_3b')
      .agg(collect_list(struct("auid", "credit")).alias("Au"))
).drop("Au")

# Explode the "paper_credit_3b" array into separate rows
exploded_df = figure3b_df.withColumn("paper_credit_3b", explode("paper_credit_3b"))

# Count occurrences of different elements
result_3b = exploded_df.groupBy("doi", "subjareas", "row_count", "paper_credit_3b").count()
# Calculate the percentage of each credit occurrence relative to the total row count
result_3b = result_3b.withColumn("percentage", result_3b["count"] / result_3b["row_count"])

# Explode the "subjareas" column into separate rows
exploded_df3 = result_3b.withColumn("subjareas", explode("subjareas"))

# Calculate the average percentage of occurrences for each "subjareas" and "paper_credit_3b"
data_3b = exploded_df3.groupBy("subjareas", "paper_credit_3b").avg("percentage")

# Display the resulting DataFrame
data_3b.display()

In [None]:
# All fields data

In [None]:
from itertools import combinations
from pyspark.sql.functions import array_contains
from pyspark.sql import Row

# Select relevant columns from the description_df DataFrame
df_figure4 = description_df.select("discipline", "credit")

# Define the list of all possible elements
elements = ["Conceptualization", "Data curation", "Formal analysis", "Funding acquisition", 
            "Investigation", "Methodology", "Project administration", "Resources", 
            "Software", "Supervision", "Validation", "Visualization", 
            "Writing – original draft", "Writing – review & editing"]

# Generate all combinations of the elements taken two at a time
element_combinations = list(combinations(elements, 2))

# Create an empty result list
figure4_result_df = []

# Iterate through each combination to calculate counts
for combination in element_combinations:
    element1, element2 = combination
    # Count the number of records containing either of the two elements
    count_either = df_figure4.filter((array_contains("credit", element1)) | (array_contains("credit", element2))).count()
    # Count the number of records containing both elements
    count_both = df_figure4.filter(array_contains("credit", element1) & array_contains("credit", element2)).count()
    # Append the results to the list
    figure4_result_df.append((element1, element2, count_either, count_both))

# Create a result DataFrame from the result list
figure4_result_df = spark.createDataFrame(figure4_result_df, schema=["element1", "element2", "count of either", "count of both"])
# Calculate the percentage of occurrences for both elements
figure4_result_df = figure4_result_df.withColumn("percentage", figure4_result_df["count of both"] / figure4_result_df["count of either"])
# Display the resulting DataFrame
figure4_result_df.display()

In [None]:
# Each decipline

In [None]:
from pyspark.sql.functions import explode
from itertools import combinations
from pyspark.sql.functions import array_contains

# Explode the discipline and credit fields from the description_df DataFrame
figure4_explode = description_df.select(explode("discipline").alias("discipline"), "credit")

# Define a list of all possible CRediT elements
elements = ["Conceptualization", "Data curation", "Formal analysis", "Funding acquisition", 
            "Investigation", "Methodology", "Project administration", "Resources", 
            "Software", "Supervision", "Validation", "Visualization", 
            "Writing – original draft", "Writing – review & editing"]

# Generate all possible pairs of CRediT elements using combinations
element_combinations = list(combinations(elements, 2))


# Analyze Health Sciences data
print("Health Sciences data:")
# Filter data for the Health Sciences discipline
health_data = figure4_explode.filter(figure4_explode["discipline"] == "Health Sciences")
# Create an empty list to store results for Health Sciences
figure4_health = []
# Iterate over each combination of elements
for combination in element_combinations:
    element1, element2 = combination
    # Count how many times either of the two elements appears in the data
    count_either = health_data.filter((array_contains("credit", element1)) | (array_contains("credit", element2))).count()
    # Count how many times both elements appear simultaneously in the data
    count_both = health_data.filter(array_contains("credit", element1) & array_contains("credit", element2)).count()
    # Append the results to the list
    figure4_health.append((element1, element2, count_either, count_both))
# Create a DataFrame from the results for Health Sciences
figure4_health = spark.createDataFrame(figure4_health, schema=["element1", "element2", "count of either", "count of both"])
# Calculate the percentage of occurrences where both elements are present
figure4_health = figure4_health.withColumn("percentage", figure4_health["count of both"] / figure4_health["count of either"])
# Display the results for Health Sciences
figure4_health.display(truncate=False)


# Analyze Life Sciences data
print("Life Sciences data:")
# Filter data for the Life Sciences discipline
life_data = figure4_explode.filter(figure4_explode["discipline"] == "Life Sciences")
# Create an empty list to store results for Life Sciences
figure4_life = []
# Iterate over each combination of elements
for combination in element_combinations:
    element1, element2 = combination
    # Count how many times either of the two elements appears in the data
    count_either = life_data.filter((array_contains("credit", element1)) | (array_contains("credit", element2))).count()
    # Count how many times both elements appear simultaneously in the data
    count_both = life_data.filter(array_contains("credit", element1) & array_contains("credit", element2)).count()
    # Append the results to the list
    figure4_life.append((element1, element2, count_either, count_both))
# Create a DataFrame from the results for Life Sciences
figure4_life = spark.createDataFrame(figure4_life, schema=["element1", "element2", "count of either", "count of both"])
# Calculate the percentage of occurrences where both elements are present
figure4_life = figure4_life.withColumn("percentage", figure4_life["count of both"] / figure4_life["count of either"])
# Display the results for Life Sciences
figure4_life.display(truncate=False)


# Analyze Multidisciplinary data
print("Multidisciplinary data:")
# Filter data for the Multidisciplinary discipline
multi_data = figure4_explode.filter(figure4_explode["discipline"] == "Multidisciplinary")
# Create an empty list to store results for Multidisciplinary
figure4_multi = []
# Iterate over each combination of elements
for combination in element_combinations:
    element1, element2 = combination
    # Count how many times either of the two elements appears in the data
    count_either = multi_data.filter((array_contains("credit", element1)) | (array_contains("credit", element2))).count()
    # Count how many times both elements appear simultaneously in the data
    count_both = multi_data.filter(array_contains("credit", element1) & array_contains("credit", element2)).count()
    # Append the results to the list
    figure4_multi.append((element1, element2, count_either, count_both))
# Create a DataFrame from the results for Multidisciplinary
figure4_multi = spark.createDataFrame(figure4_multi, schema=["element1", "element2", "count of either", "count of both"])
# Calculate the percentage of occurrences where both elements are present
figure4_multi = figure4_multi.withColumn("percentage", figure4_multi["count of both"] / figure4_multi["count of either"])
# Display the results for Multidisciplinary
figure4_multi.display(truncate=False)


# Analyze Physical Sciences data
print("Physical Sciences data:")
# Filter data for the Physical Sciences discipline
physical_data = figure4_explode.filter(figure4_explode["discipline"] == "Physical Sciences")
# Create an empty list to store results for Physical Sciences
figure4_physical = []
# Iterate over each combination of elements
for combination in element_combinations:
    element1, element2 = combination
    # Count how many times either of the two elements appears in the data
    count_either = physical_data.filter((array_contains("credit", element1)) | (array_contains("credit", element2))).count()
    # Count how many times both elements appear simultaneously in the data
    count_both = physical_data.filter(array_contains("credit", element1) & array_contains("credit", element2)).count()
    # Append the results to the list
    figure4_physical.append((element1, element2, count_either, count_both))
# Create a DataFrame from the results for Physical Sciences
figure4_physical = spark.createDataFrame(figure4_physical, schema=["element1", "element2", "count of either", "count of both"])
# Calculate the percentage of occurrences where both elements are present
figure4_physical = figure4_physical.withColumn("percentage", figure4_physical["count of both"] / figure4_physical["count of either"])
# Display the results for Physical Sciences
figure4_physical.display(truncate=False)


# Analyze Social Sciences data
print("Social Sciences data:")
# Filter data for the Social Sciences discipline
social_data = figure4_explode.filter(figure4_explode["discipline"] == "Social Sciences")
# Create an empty list to store results for Social Sciences
figure4_social = []
# Iterate over each combination of elements
for combination in element_combinations:
    element1, element2 = combination
    # Count how many times either of the two elements appears in the data
    count_either = social_data.filter((array_contains("credit", element1)) | (array_contains("credit", element2))).count()
    # Count how many times both elements appear simultaneously in the data
    count_both = social_data.filter(array_contains("credit", element1) & array_contains("credit", element2)).count()
    # Append the results to the list
    figure4_social.append((element1, element2, count_either, count_both))
# Create a DataFrame from the results for Social Sciences
figure4_social = spark.createDataFrame(figure4_social, schema=["element1", "element2", "count of either", "count of both"])
# Calculate the percentage of occurrences where both elements are present
figure4_social = figure4_social.withColumn("percentage", figure4_social["count of both"] / figure4_social["count of either"])
# Display the results for Social Sciences
figure4_social.display(truncate=False)


In [None]:
'''
Calculate the number of authors for each position. Includes full data and individual disciplines.
'''

from pyspark.sql.functions import max, when, explode

# Select relevant columns from description_df to create figure5_df
figure5_df = description_df.select("doi", "auid", "position", "credit", "row_count", "position_max", "position_min", "discipline")

# Create a new column "position2" to categorize authorship based on their position
figure5_df = figure5_df.withColumn("position2",
                                      when(figure5_df["position"] == figure5_df["position_min"], "First author")  # Label as "First author" if position is minimum
                                      .when(figure5_df["position"] == figure5_df["position_max"], "Last author")   # Label as "Last author" if position is maximum
                                      .otherwise("Middle author"))  # Label as "Middle author" for all other positions

# Display the modified DataFrame
figure5_df.display()

# Count the number of authors for each position category in the total dataset
print("total data:")
figure5_df.groupBy("position2").count().display()

# Explode the discipline column and create a new DataFrame for further analysis
explode_figure5 = figure5_df.select(explode("discipline").alias("discipline"), "position2", "credit")

# Count the number of authors in each position category for Health Sciences
print("Health Sciences data:")
explode_figure5.filter(explode_figure5["discipline"] == "Health Sciences").groupBy("position2").count().display()

# Count the number of authors in each position category for Life Sciences
print("Life Sciences data:")
explode_figure5.filter(explode_figure5["discipline"] == "Life Sciences").groupBy("position2").count().display()

# Count the number of authors in each position category for Multidisciplinary
print("Multidisciplinary data:")
explode_figure5.filter(explode_figure5["discipline"] == "Multidisciplinary").groupBy("position2").count().display()

# Count the number of authors in each position category for Physical Sciences
print("Physical Sciences data:")
explode_figure5.filter(explode_figure5["discipline"] == "Physical Sciences").groupBy("position2").count().display()

# Count the number of authors in each position category for Social Sciences
print("Social Sciences data:")
explode_figure5.filter(explode_figure5["discipline"] == "Social Sciences").groupBy("position2").count().display()


# Total Data

| Position       | Author Count |
|----------------|--------------|
| Middle author  | 2,590,052    |
| First author   | 714,671      |
| Last author    | 701,041      |

# Health Sciences Data

| Position       | Author Count |
|----------------|--------------|
| Middle author  | 364,654      |
| First author   | 73,723       |
| Last author    | 72,821       |

# Life Sciences Data

| Position       | Author Count |
|----------------|--------------|
| Middle author  | 713,629      |
| First author   | 164,330      |
| Last author    | 162,339      |

# Multidisciplinary Data

| Position       | Author Count |
|----------------|--------------|
| Middle author  | 305,681      |
| First author   | 69,564       |
| Last author    | 68,416       |

# Physical Sciences Data

| Position       | Author Count |
|----------------|--------------|
| Middle author  | 1,654,716    |
| First author   | 503,129      |
| Last author    | 493,423      |

# Social Sciences Data

| Position       | Author Count |
|----------------|--------------|
| Middle author  | 160,627      |
| First author   | 69,912       |
| Last author    | 67,060       |


In [None]:
from pyspark.sql.functions import explode, when

# Explode the 'credit' column to create a new DataFrame with individual credits for each author position
explode_position = figure5_df.select("position2", explode("credit").alias("CRediT"))

# Group by author position and credit, counting the occurrences of each combination
position_credit = explode_position.groupBy("position2", "CRediT").count()

# Calculate the percentage of authors for each position based on predefined total counts
position_credit = position_credit.withColumn("percentage",
                                      when(position_credit["position2"] == "First author", position_credit["count"] / 714671)  # Total for First authors
                                      .when(position_credit["position2"] == "Middle author", position_credit["count"] / 2590052)  # Total for Middle authors
                                      .when(position_credit["position2"] == "Last author", position_credit["count"] / 701041))  # Total for Last authors

# Rename 'position2' to 'Author Position' for clarity
position_credit = position_credit.withColumnRenamed("position2", "Author Position")

# Display the total data with counts and percentages
print("total data:")
position_credit.display(truncate=False)

# Rename the exploded DataFrame column for consistency
explode_figure5 = explode_figure5.withColumnRenamed("position2", "Author Position")

# Analyze Health Sciences discipline data
print("Health Sciences data:")
health_explode = explode_figure5.filter(explode_figure5["discipline"] == "Health Sciences").select("Author Position", explode("credit").alias("CRediT")).groupBy("Author Position", "CRediT").count()
health_explode.withColumn("percentage",
        when(health_explode["Author Position"] == "First author", health_explode["count"] / 73723)  # Total for First authors in Health Sciences
        .when(health_explode["Author Position"] == "Middle author", health_explode["count"] / 364654)  # Total for Middle authors
        .when(health_explode["Author Position"] == "Last author", health_explode["count"] / 72821)).display()  # Total for Last authors

# Analyze Life Sciences discipline data
print("Life Sciences data:")
life_explode = explode_figure5.filter(explode_figure5["discipline"] == "Life Sciences").select("Author Position", explode("credit").alias("CRediT")).groupBy("Author Position", "CRediT").count()
life_explode.withColumn("percentage",
        when(life_explode["Author Position"] == "First author", life_explode["count"] / 164330)  # Total for First authors in Life Sciences
        .when(life_explode["Author Position"] == "Middle author", life_explode["count"] / 713629)  # Total for Middle authors
        .when(life_explode["Author Position"] == "Last author", life_explode["count"] / 162339)).display()  # Total for Last authors

# Analyze Multidisciplinary discipline data
print("Multidisciplinary data:")
Multidisciplinary_explode = explode_figure5.filter(explode_figure5["discipline"] == "Multidisciplinary").select("Author Position", explode("credit").alias("CRediT")).groupBy("Author Position", "CRediT").count()
Multidisciplinary_explode.withColumn("percentage",
        when(Multidisciplinary_explode["Author Position"] == "First author", Multidisciplinary_explode["count"] / 69564)  # Total for First authors in Multidisciplinary
        .when(Multidisciplinary_explode["Author Position"] == "Middle author", Multidisciplinary_explode["count"] / 305681)  # Total for Middle authors
        .when(Multidisciplinary_explode["Author Position"] == "Last author", Multidisciplinary_explode["count"] / 68416)).display()  # Total for Last authors

# Analyze Physical Sciences discipline data
print("Physical Sciences data:")
Physical_explode = explode_figure5.filter(explode_figure5["discipline"] == "Physical Sciences").select("Author Position", explode("credit").alias("CRediT")).groupBy("Author Position", "CRediT").count()
Physical_explode.withColumn("percentage",
        when(Physical_explode["Author Position"] == "First author", Physical_explode["count"] / 503129)  # Total for First authors in Physical Sciences
        .when(Physical_explode["Author Position"] == "Middle author", Physical_explode["count"] / 1654716)  # Total for Middle authors
        .when(Physical_explode["Author Position"] == "Last author", Physical_explode["count"] / 493423)).display()  # Total for Last authors

# Analyze Social Sciences discipline data
print("Social Sciences data:")
Social_explode = explode_figure5.filter(explode_figure5["discipline"] == "Social Sciences").select("Author Position", explode("credit").alias("CRediT")).groupBy("Author Position", "CRediT").count()
Social_explode.withColumn("percentage",
        when(Social_explode["Author Position"] == "First author", Social_explode["count"] / 69912)  # Total for First authors in Social Sciences
        .when(Social_explode["Author Position"] == "Middle author", Social_explode["count"] / 160627)  # Total for Middle authors
        .when(Social_explode["Author Position"] == "Last author", Social_explode["count"] / 67060)).display()  # Total for Last authors
