In [1]:
import json, os

import glob
import pandas as pd

import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf, isnan
from pyspark.sql.types import StructType, ArrayType, StringType
from pyspark.sql import SparkSession
from IPython.display import JSON as pretty_print

import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 100)

# Loading Dataset

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
dataset_folder = "./diamondoa/" #change with your current dataset folder

In [4]:
with open(f"{dataset_folder}/affiliations-schema.json", "r") as f:
    affiliations_schema = json.load(f)
with open(f"{dataset_folder}/citations-schema.json", "r") as f:
    citations_schema = json.load(f)
with open(f"{dataset_folder}/explodedpublications-schema.json", "r") as f:
    explodedpublications_schema = json.load(f)
with open(f"{dataset_folder}/journals-schema.json", "r") as f:
    journals_schema = json.load(f)
with open(f"{dataset_folder}/publications-schema.json", "r") as f:
    publications_schema = json.load(f)

In [5]:
affiliations = spark.read.schema(StructType.fromJson(affiliations_schema)).json(dataset_folder + 'affiliations.json')
citations = spark.read.schema(StructType.fromJson(citations_schema)).json(dataset_folder + 'citations.json')
explodedpublications = spark.read.schema(StructType.fromJson(explodedpublications_schema)).json(dataset_folder + 'explodedpublications.json')
journals = spark.read.schema(StructType.fromJson(journals_schema)).json(dataset_folder + 'journals.json')
publications = spark.read.schema(StructType.fromJson(publications_schema)).json(dataset_folder + 'publications.json')

In [6]:
affiliations.createOrReplaceTempView("affiliations")
citations.createOrReplaceTempView("citations")
explodedpublications.createOrReplaceTempView("explodedpublications")
journals.createOrReplaceTempView("journals")
publications.createOrReplaceTempView("publications")

## Filtering for diamond journals in the Dataset 

In [7]:
with open("diamond-issns.json", "r") as f:
    diamond_journals = json.load(f)

In [8]:
df = spark.table("journals").filter(col("eissn").isin(diamond_journals) | col("issn").isin(diamond_journals))

In [9]:
df.createOrReplaceTempView("Diamond")

## Importing ROAD

In [10]:
road = pd.read_excel("./input/diamond_subjects.xlsx")

In [11]:
def map_to_fos(row):
    subjects = [row['FoS']]  # Start with subj1
    if pd.notna(row['Fos2']):  # Check for None (null in Spark)
        subjects.append(row['Fos2'])
    return subjects

In [12]:
road['road_subjects'] = road.apply(map_to_fos, axis=1)

In [13]:
spark.createDataFrame(road).createOrReplaceTempView("road")

In [14]:
spark.sql("""
select distinct d.*, r.road_subjects as road_fos
from diamond d
left join road r
on d.issn = r.issn or d.eissn = r.eissn
""").createOrReplaceTempView("diamond")

## Creating Tables for Diamond Articles

In [17]:
q = """
select p.*, d.issn, d.eissn, d.road_fos
from publications p 
join diamond d
on p.jourid = d.id
where extract(year from p.publicationDate) >= 2000 and extract(year from p.publicationDate) <= 2024
"""
spark.sql(q).createOrReplaceTempView("diamondPubs")

In [18]:
q = """
select c.*
from citations c 
join diamond d
on c.citedJournal = d.id
"""
spark.sql(q).createOrReplaceTempView("diamondCits")

## Scholarly Discipline representation

In [19]:
query = """
select subject, count(distinct id) as n_pubs 
from diamondpubs
lateral view explode(road_fos) s as subject
where extract(year from publicationDate) >= 2000 and extract(year from publicationDate) < 2025
group by subject
order by n_pubs desc
"""
spark.sql(query).toPandas()

Unnamed: 0,subject,n_pubs
0,05 social sciences,33558
1,06 humanities and the arts,25096
2,01 natural sciences,7103
3,03 medical and health sciences,4023
4,04 agricultural and veterinary sciences,3864
5,02 engineering and technology,1342


In [20]:
query = """
     select extract(year from publicationDate) as year, subject, count(*) as subject_count
     from diamondPubs 
     lateral view explode (road_fos) f as subject
     where extract(year from publicationDate) >= 2000 and extract(year from publicationDate) <= 2025
     group by year, subject
     order by subject, year asc
"""
spark.sql(query).toPandas()

Unnamed: 0,year,subject,subject_count
0,2000,01 natural sciences,131
1,2001,01 natural sciences,78
2,2002,01 natural sciences,111
3,2003,01 natural sciences,304
4,2004,01 natural sciences,184
...,...,...,...
143,2020,06 humanities and the arts,3262
144,2021,06 humanities and the arts,2901
145,2022,06 humanities and the arts,3145
146,2023,06 humanities and the arts,2983


In [21]:
query = """
with jourcount as (
select jourid, count(distinct id) as pub_count
from diamondpubs
where extract(year from publicationDate) >= 2000
group by jourid
)
select subject, d.jourid, d.issn, d.eissn,  count(distinct d.id) as subject_count, j.pub_count as tot_pubs
from diamondpubs d
join jourcount j on j.jourid = d.jourid 
lateral view explode(d.road_fos) f as subject
where extract(year from d.publicationDate) >= 2000
group by subject, d.jourid, d.issn, d.eissn, tot_pubs
order by subject_count desc
"""
spark.sql(query).toPandas()

Unnamed: 0,subject,jourid,issn,eissn,subject_count,tot_pubs
0,05 social sciences,rivisteunimi::5ad13c233dd15674b2227e64bad092d4,2035-7680,2035-7680,1635,1635
1,06 humanities and the arts,rivisteunimi::5ad13c233dd15674b2227e64bad092d4,2035-7680,2035-7680,1635,1635
2,05 social sciences,unicaopenjou::7674172ec24021dd7ef4dd2836a1e18d,2039-6597,2039-6597,1614,1614
3,06 humanities and the arts,unicaopenjou::7674172ec24021dd7ef4dd2836a1e18d,2039-6597,2039-6597,1614,1614
4,03 medical and health sciences,doajarticles::a7f4d708768f9b1017dabcf9040fbf79,,2160-9381,1494,1494
...,...,...,...,...,...,...
252,05 social sciences,doajarticles::7f0b795696ea9b8404b031f9844d13e4,,2283-5652,2,2
253,06 humanities and the arts,doajarticles::7f0b795696ea9b8404b031f9844d13e4,,2283-5652,2,2
254,06 humanities and the arts,doajarticles::6799290dcba38e45c05bab5c280f3a4e,,2384-9568,1,1
255,05 social sciences,doajarticles::6799290dcba38e45c05bab5c280f3a4e,,2384-9568,1,1


## International Relevance of the journal

In [22]:
q = """
select p.*, d.officialname, d.organizationsCountries, d.organizationsNames, country
from diamondpubs p
join diamond d on p.jourid = d.id
lateral view outer explode(p.countries) c as country
"""
d = spark.sql(q).toJSON().collect()
#spark.sql(q).toPandas()

In [23]:
journals = {}
for k in d:
    jour = json.loads(k)
    jourid, journame = jour['jourid'], jour['officialname']
    org_country = ''
    if 'organizationsCountries' in jour:
        org_country = jour['organizationsCountries'][0] if not jour['organizationsCountries'] is None else ''
    if not jourid in journals: journals[jourid] = {'countries':{}, 'n_pubs': 0, 'org_diff': 0}
    if not 'publisher country' in journals[jourid]: journals[jourid]['publisher country'] = org_country
    if not 'name' in journals[jourid]: journals[jourid]['name'] = journame
    journals[jourid]['n_pubs'] += 1
    if 'country' in jour:
        country_diff = org_country not in jour['country']
        for c in jour['country']:
            if not c in journals[jourid]['countries']: journals[jourid]['countries'][c] = 0
            journals[jourid]['countries'][c] += 1
        if country_diff: journals[jourid]['org_diff'] += 1

In [24]:
l = []
for k,v in journals.items():
    d = {
        'id': k,
        'name': v['name'],
        'publisher country': v['publisher country'],
        '#pubs': v['n_pubs'],
        '#pubs where all authors come from different country': v['org_diff'] 
    }
    for code, pubs in v['countries'].items():
        d[code] = pubs
    l.append(d)

In [25]:
df = pd.DataFrame(l)
df.head()

Unnamed: 0,id,name,publisher country,#pubs,#pubs where all authors come from different country,code,label,provenance
0,doajarticles::e164fe214e074492c52bedc156ee0f52,Rassegna Iberistica,IT,561,81,81.0,81.0,81.0
1,doajarticles::a65d35d323812be48e58e921cc7c2f93,Axon,IT,2456,824,824.0,824.0,824.0
2,doajarticles::db4f5d69fb72f19575b9db98960853d1,Moneta e Credito,IT,568,56,56.0,56.0,56.0
3,doajarticles::07f4e9f39c7789481c062e9950c88ebe,Advances in Horticultural Science,IT,565,190,190.0,190.0,190.0
4,doajarticles::c15fdf0d1f5e215bdb1722685df301d3,European Journal of Spatial Development,IT,1128,352,352.0,352.0,352.0


## Journal Active in the last two years

In [26]:
q = """
select count(distinct jourid) as n_journals, count(*) as n_pubs from diamondpubs
where extract(year from publicationDate) > 2022
"""
spark.sql(q).toPandas()

Unnamed: 0,n_journals,n_pubs
0,131,10390


## Total Pubs DOA ITA vs ITA

In [17]:
df = spark.sql("select extract(year from publicationDate) as year, count(distinct jourid) as n_journals, count(*) as c from diamondpubs group by year").toPandas()
diamond_pubs = df.sort_values(by="year", ascending=True)[df['year'].apply(lambda x: 2000 <= x <= 2025)]

  diamond_pubs = df.sort_values(by="year", ascending=True)[df['year'].apply(lambda x: 2000 <= x <= 2025)]


In [18]:
df = spark.sql("select extract(year from publicationDate) as year, count(*) as c from explodedPublications group by year").toPandas()
all_pubs = df.sort_values(by="year", ascending=True)[df['year'].apply(lambda x: 2000 <= x <= 2025)]

  all_pubs = df.sort_values(by="year", ascending=True)[df['year'].apply(lambda x: 2000 <= x <= 2025)]


In [19]:
df1 = diamond_pubs.merge(all_pubs,on='year', how='inner')
df1 = df1.rename(columns={'c_x':'diamond_pubs', 'c_y': 'all_italian_pubs'})
df1['percentage'] = (df1['diamond_pubs']/df1['all_italian_pubs'])*100
df1.to_excel("results/3C.xlsx", index=None)

# Impact 

## Journal in "A" Class of ANVUR journal classification

In [83]:
anvur_df = pd.read_excel("anvur_intersection.xlsx")

In [90]:
m1 = subj_diamond_res.merge(anvur_df, left_on="issn", right_on="ISSN", how="inner")
m2 = subj_diamond_res.merge(anvur_df, left_on="eissn", right_on="ISSN", how="inner")

In [93]:
munion = pd.concat([m1, m2])

In [95]:
munion.to_excel("anvur_journals.xlsx")

## B.

## Citation Distribution

In [14]:
df = spark.sql("""
    select extract(year from p.publicationDate) as year, count(distinct c.citing)
    from diamondCits c
    join diamondPubs p
    on c.cited = p.id
    group by year
""").toPandas()

diamond_cits = df.sort_values(by="year", ascending=True)[df['year'].apply(lambda x: 2000 <= x <= 2025)]

  diamond_cits = df.sort_values(by="year", ascending=True)[df['year'].apply(lambda x: 2000 <= x <= 2025)]


In [16]:
diamond_cits.show()

## Citation per Scholarly Discipline

In [140]:
df1 = spark.sql("""
    select subject, count(distinct p.id) as n_pubs, count(c.citing) as n_citations
    from diamondPubs p
    left join diamondCits c
    on c.cited = p.id
    lateral view explode(p.fos) s as subject
    group by subject
    order by n_pubs desc
""").toPandas()

## Citation Origin

In [150]:
countries = {}
for k in res:
    cit = json.loads(k)
    if 'resultCountries' in cit:
        for country in cit['resultCountries']:
            code = country['code']
            label = country['label']
            if not code in countries: countries[code] = {'label': label, 'cit_count': 0}
            countries[code]['cit_count'] += 1

In [151]:
countries = dict(sorted(countries.items(), key=lambda x: x[1]['cit_count'], reverse=True))

In [154]:
l = []
for k,v in countries.items():
    l.append({
        'code': k,
        'label': v['label'],
        '# citations provided': v['cit_count']
    })
    
df2 = pd.DataFrame(l)

Unnamed: 0,code,label,# citations provided
0,IT,Italy,23124
1,FR,France,4011
2,GB,United Kingdom,2961
3,ES,Spain,2715
4,NL,Netherlands,2126
...,...,...,...
71,EG,Egypt,1
72,ET,Ethiopia,1
73,BD,Bangladesh,1
74,LIE,LIE,1


In [None]:
excel = pd.ExcelWriter("4-Impact.xlsx")

In [None]:
diamond_cits.to_excel(excel, sheet_name="4-1", index=None)
df2.to_excel(excel, sheet_name="4-3", index=None)
df1.to_excel(excel, sheet_name="4-4", index=None)

In [None]:
excel.save()

  excel.save()
