# Extraction

In [1]:
import json
import nltk
import dash
import spacy
import pycountry
import pandas as pd
import seaborn as sns
import networkx as nx
from dash import dcc, html
import plotly.express as px
import matplotlib.pyplot as plt
import plotly.graph_objects as go

from pymongo import MongoClient, ASCENDING
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, regexp_replace , regexp_extract, to_date, count ,udf , year, month ,explode, split ,trim, lower, when
from pyspark.sql.types import StringType ,DateType ,ArrayType
from pyspark.sql import functions as F

from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

import os

In [2]:
# Set JAVA_HOME for Java 8
os.environ['JAVA_HOME'] = 'C:\\Program Files\\Java\\jdk1.8.0_271' 
os.environ['PATH'] += os.pathsep + os.path.join(os.environ['JAVA_HOME'], 'bin')

# Set HADOOP_HOME and hadoop.home.dir environment variables
os.environ['HADOOP_HOME'] = 'C:\\Users\\maroua\\Desktop\\S5\\BusinessIntelligence\\hadoop-3.4.0'
os.environ['hadoop.home.dir'] = 'C:\\Users\\maroua\\Desktop\\S5\\BusinessIntelligence\\hadoop-3.4.0'
os.environ['PATH'] += os.pathsep + os.path.join(os.environ['HADOOP_HOME'], 'bin')
os.environ['PYSPARK_PYTHON'] = 'C:\\Users\\maroua\\AppData\\Local\\Programs\\Python\\Python311\\python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:\\Users\\maroua\\AppData\\Local\\Programs\\Python\\Python311\\python.exe'


## Connect to MongoDB

In [3]:
def export_collection_to_json(collection_name, file_name):
    collection = db[collection_name]
    documents = collection.find()
    docs = list(documents)

    with open(file_name, 'w') as file:
        json.dump([doc for doc in docs], file, default=str)

client = MongoClient("mongodb+srv://marouaamal2002:OpHSXGS8MWWZbJnq@clusterbi.zruer.mongodb.net/?retryWrites=true&w=majority&appName=ClusterBI")
db = client["Scrapping"]

# Export data forw each collection
export_collection_to_json("merged", "DB/merged.json")

In [4]:
# Créer une session Spark
spark = SparkSession.builder \
    .appName("ETL Pipeline") \
    .getOrCreate()

# Lire les fichiers JSON
merged_df = spark.read.json("DB/Merged.json")


# Afficher les DataFrames pour vérifier
merged_df.show()


+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+-----------+
|             Date|Day|Downloads|    Month|Year|                 _id|            abstract|             authors|authors_with_affiliations|citations|           countries|                 doi|            keywords|           locations|           publisher|               title|topic|            type|        universities|    website|
+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+-----------+
|     19 J

In [5]:
# Fetch data from MongoDB
# data = [{**doc, "_id": str(doc["_id"])} for doc in collection.find()]

# Convert MongoDB data to Pandas DataFrame
df = merged_df.toPandas()

In [6]:

df.columns

Index(['Date', 'Day', 'Downloads', 'Month', 'Year', '_id', 'abstract',
       'authors', 'authors_with_affiliations', 'citations', 'countries', 'doi',
       'keywords', 'locations', 'publisher', 'title', 'topic', 'type',
       'universities', 'website'],
      dtype='object')

In [7]:
# Check for any columns that contain non-iterable types where arrays are expected
for column in df.columns:
    print(f"Column: {column}")
    print(df[column].apply(type).value_counts())


Column: Date
Date
<class 'str'>    6299
Name: count, dtype: int64
Column: Day
Day
<class 'str'>    6299
Name: count, dtype: int64
Column: Downloads
Downloads
<class 'float'>    6299
Name: count, dtype: int64
Column: Month
Month
<class 'str'>    6299
Name: count, dtype: int64
Column: Year
Year
<class 'str'>    6299
Name: count, dtype: int64
Column: _id
_id
<class 'str'>    6299
Name: count, dtype: int64
Column: abstract
abstract
<class 'str'>    6299
Name: count, dtype: int64
Column: authors
authors
<class 'list'>    6299
Name: count, dtype: int64
Column: authors_with_affiliations
authors_with_affiliations
<class 'list'>    6299
Name: count, dtype: int64
Column: citations
citations
<class 'float'>    6299
Name: count, dtype: int64
Column: countries
countries
<class 'list'>    6299
Name: count, dtype: int64
Column: doi
doi
<class 'str'>    6299
Name: count, dtype: int64
Column: keywords
keywords
<class 'list'>    6299
Name: count, dtype: int64
Column: locations
locations
<class 'NoneType

In [8]:

# # Step 1: Clean the `locations` column to handle mixed types (float and list)
# df['locations'] = df['locations'].apply(lambda x: [x] if isinstance(x, float) else x)

# # Step 2: Clean `Day`, `Year`, and `Downloads` columns to ensure consistent types
# df['Day'] = pd.to_numeric(df['Day'], errors='coerce').fillna(0).astype(int)
# df['Year'] = pd.to_numeric(df['Year'], errors='coerce').fillna(0).astype(int)
# df['Downloads'] = pd.to_numeric(df['Downloads'], errors='coerce').fillna(0).astype(int)

# Step 3: Convert Pandas DataFrame to Spark DataFrame without specifying the schema
spark_merged = spark.createDataFrame(df)

In [9]:
spark_merged.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Downloads: double (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors_with_affiliations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- university: string (nullable = true)
 |-- citations: double (nullable = true)
 |-- countries: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- doi: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- locations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pu

# Cleaning 

### Remove NULL values

In [10]:
# Count the null values in the specified columns
null_counts = spark_merged.filter(
    (F.col("publisher.name") == "") | (F.col("publisher.ISSN") == "N/A") | (F.col("publisher.Quartile") == "")
).count()

print(f"Number of null values in spark_merged: {null_counts}")

# Drop rows where ISSN is NULL
spark_merged = spark_merged.filter(F.col("publisher.ISSN").isNotNull())

# Drop rows with null values in the specified columns
spark_merged = spark_merged.filter(
    (F.col("publisher.name") != "") & (F.col("publisher.ISSN") != "N/A") & (F.col("publisher.Quartile") != "")
)

# Count the null values in the specified columns after dropping rows
null_counts_after = spark_merged.filter(
    (F.col("publisher.name") == "") | (F.col("publisher.ISSN") == "N/A") | (F.col("publisher.Quartile") == "")
).count()

print(f"Number of null values in spark_merged after dropping rows: {null_counts_after}")

Number of null values in spark_merged: 1485
Number of null values in spark_merged after dropping rows: 0


In [11]:
# Filter the spark_merged for specific values
date_not_found_df = spark_merged.filter(
    (F.col("Date") == "Date not found") |
    (F.col("Year") == "Year not found") |
    (F.col("Day") == "Day not found") |
    (F.col("Month") == "Month not found")
)

# Show the filtered DataFrame
date_not_found_df.show(truncate=False)

# Drop the filtered values from spark_merged
spark_merged = spark_merged.subtract(date_not_found_df)

+----+---+---------+-----+----+---+--------+-------+-------------------------+---------+---------+---+--------+---------+---------+-----+-----+----+------------+-------+
|Date|Day|Downloads|Month|Year|_id|abstract|authors|authors_with_affiliations|citations|countries|doi|keywords|locations|publisher|title|topic|type|universities|website|
+----+---+---------+-----+----+---+--------+-------+-------------------------+---------+---------+---+--------+---------+---------+-----+-----+----+------------+-------+
+----+---+---------+-----+----+---+--------+-------+-------------------------+---------+---------+---+--------+---------+---------+-----+-----+----+------------+-------+



In [12]:
# Remove rows with NaN values in the 'citations' column
spark_merged = spark_merged.dropna(subset=['citations'])

# Show the DataFrame after removing NaN values
spark_merged.show()

+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+-----------+
|             Date|Day|Downloads|    Month|Year|                 _id|            abstract|             authors|authors_with_affiliations|citations|           countries|                 doi|            keywords|           locations|           publisher|               title|topic|            type|        universities|    website|
+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+-----------+
|30 Septem

In [13]:
# Filter out rows where authors_with_affiliations is an empty list
spark_merged = spark_merged.filter(F.size(F.col('authors_with_affiliations')) > 0)

# Show the DataFrame after filtering
spark_merged.show()

+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|             Date|Day|Downloads|    Month|Year|                 _id|            abstract|             authors|authors_with_affiliations|citations|           countries|                 doi|            keywords|           locations|           publisher|               title|topic|            type|        universities|       website|
+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|

In [14]:
# Filter out rows where authors is an empty list
spark_merged = spark_merged.filter(F.size(F.col('authors')) > 0)

# Show the DataFrame after filtering
spark_merged.show()

+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|             Date|Day|Downloads|    Month|Year|                 _id|            abstract|             authors|authors_with_affiliations|citations|           countries|                 doi|            keywords|           locations|           publisher|               title|topic|            type|        universities|       website|
+-----------------+---+---------+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|

In [15]:
# Drop the 'Downloads' column from the spark_merged DataFrame
spark_merged = spark_merged.drop('Downloads')

# Show the DataFrame after dropping the column
spark_merged.show()

+-----------------+---+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|             Date|Day|    Month|Year|                 _id|            abstract|             authors|authors_with_affiliations|citations|           countries|                 doi|            keywords|           locations|           publisher|               title|topic|            type|        universities|       website|
+-----------------+---+---------+----+--------------------+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|30 September 2024| 30|Septembe

### Correct country names

In [16]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import pycountry

# Define a function to correct country names
def correct_country_names(countries):
    if countries is None:
        return []
    corrected_countries = []
    for country in countries:
        try:
            corrected_country = pycountry.countries.lookup(country).name
            corrected_countries.append(corrected_country)
        except LookupError:
            corrected_countries.append(country)  # Keep the original name if not found
    return corrected_countries

# Register the UDF
correct_country_names_udf = udf(correct_country_names, ArrayType(StringType()))

# Apply the UDF to the 'countries' column
spark_merged = spark_merged.withColumn('countries', correct_country_names_udf(spark_merged['countries']))

# Show the DataFrame after correcting country names
spark_merged.show(truncate=False)

+-----------------+---+---------+----+------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [17]:
from pyspark.sql.functions import col, explode, struct, udf
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
import pycountry

# Define a function to correct country names in authors_with_affiliations
def correct_affiliation_country(authors_with_affiliations):
    if authors_with_affiliations is None:
        return []
    corrected_affiliations = []
    for affiliation in authors_with_affiliations:
        corrected_affiliation = affiliation.asDict()
        try:
            corrected_affiliation['country'] = pycountry.countries.lookup(affiliation['country']).name
        except LookupError:
            pass  # Keep the original country name if not found
        corrected_affiliations.append(corrected_affiliation)
    return corrected_affiliations

# Define the schema for the corrected affiliations
affiliation_schema = ArrayType(StructType([
    StructField('author', StringType(), True),
    StructField('country', StringType(), True),
    StructField('location', StringType(), True),
    StructField('university', StringType(), True)
]))

# Register the UDF
correct_affiliation_country_udf = udf(correct_affiliation_country, affiliation_schema)

# Apply the UDF to the 'authors_with_affiliations' column
spark_merged = spark_merged.withColumn('authors_with_affiliations', correct_affiliation_country_udf(spark_merged['authors_with_affiliations']))

# Show the DataFrame after correcting country names
spark_merged.show(truncate=False)

+-----------------+---+---------+----+------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [18]:
# Define a regular expression pattern to identify email addresses
email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'

#
spark_merged = spark_merged.filter(~F.array_contains(F.col('countries'), email_pattern))

# Filter out rows where author_affiliation.country contains email addresses
spark_merged = spark_merged.withColumn(
    'authors_with_affiliations',
    F.expr(f"filter(authors_with_affiliations, x -> x.country not rlike '{email_pattern}')")
)

In [19]:
# Drop the '_id' column from the spark_merged DataFrame
spark_merged = spark_merged.drop('_id')

# Show the DataFrame after dropping the column
spark_merged.show()

+-----------------+---+---------+----+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|             Date|Day|    Month|Year|            abstract|             authors|authors_with_affiliations|citations|           countries|                 doi|            keywords|           locations|           publisher|               title|topic|            type|        universities|       website|
+-----------------+---+---------+----+--------------------+--------------------+-------------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+----------------+--------------------+--------------+
|30 September 2024| 30|September|2024|Cell-free (CF) ma...|[Yiyang Zhu, Jiay...|     [{Yiyang 

### Encode the dataframes

In [20]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re


# Define a UDF to clean and encode strings to UTF-8, retaining valid special characters
def clean_and_encode_string(s):
    if s is not None:
        # Remove unwanted characters while keeping valid special characters
        cleaned_string = re.sub(r'[^A-Za-zÀ-ÿ0-9\s\'-]', '', s)  # Keep letters, numbers, spaces, and some special characters
        cleaned_string = cleaned_string.replace('\n', '')  # Remove newline characters
        return cleaned_string.encode('utf-8', errors='ignore').decode('utf-8')
    return s

# Register the UDF
clean_and_encode_udf = udf(clean_and_encode_string, StringType())

# Function to apply the cleaning and encoding to all string columns in a Spark DataFrame
def clean_and_encode_spark_df(spark_df):
    for column in spark_df.columns:
        if dict(spark_df.dtypes)[column] == 'string':
            spark_df = spark_df.withColumn(column, clean_and_encode_udf(spark_df[column]))
    return spark_df

In [21]:
spark_merged = clean_and_encode_spark_df(spark_merged)

# Show the cleaned DataFrame
spark_merged.show(truncate=False)

+-----------------+---+---------+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
row_count = spark_merged.count()
print(f"Number of rows in spark_merged: {row_count}")

Number of rows in spark_merged: 3795


# Transform

In [23]:
spark_merged.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors_with_affiliations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- university: string (nullable = true)
 |-- citations: double (nullable = true)
 |-- countries: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- doi: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- locations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- publisher: struct (nullable = true)
 |    |-- ISSN: string (nullable = true)
 

## Get publishers

In [24]:
from pyspark.sql import functions as F


publishers_df = spark_merged.select(
    F.col("publisher.ISSN").alias("ISSN"),
    F.col("publisher.name").alias("Name"),
    F.col("publisher.Quartile").alias("Quartile")
).distinct()
publishers_df.show(truncate=False)

+------------------+--------------------------------------------------------------+--------+
|ISSN              |Name                                                          |Quartile|
+------------------+--------------------------------------------------------------+--------+
|19411189          |IEEE Reviews in Biomedical Engineering                        |Q1      |
|00018791, 10959084|Journal of Vocational Behavior                                |Q1      |
|23327790          |IEEE Transactions on Big Data                                 |Q1      |
|21682208          |IEEE Journal of Biomedical and Health Informatics             |Q1      |
|02628856          |Image and Vision Computing                                    |Q1      |
|23327731          |IEEE Transactions on Cognitive Communications and Networking  |Q1      |
|00016918, 18736297|Acta Psychologica                                             |Q1      |
|23299274          |IEEE/CAA Journal of Automatica Sinica             

## Get keywords

In [40]:

keywords_df = spark_merged.select(
    F.explode("keywords").alias("Keyword")
).withColumn(
    "KeywordID", F.monotonically_increasing_id()
).select(
    "KeywordID", "Keyword"
)

keywords_df.show(truncate=False)

+---------+-----------------------------------+
|KeywordID|Keyword                            |
+---------+-----------------------------------+
|0        |Precoding                          |
|1        |Uplink                             |
|2        |Channel estimation                 |
|3        |Wireless communication             |
|4        |Training                           |
|5        |Reconfigurable intelligent surfaces|
|6        |Symbols                            |
|7        |Reinforcement learning             |
|8        |Optimization                       |
|9        |Reflection coefficient             |
|10       |Pipelines                          |
|11       |Feature extraction                 |
|12       |Natural gas                        |
|13       |Leak detection                     |
|14       |Background noise                   |
|15       |Acoustics                          |
|16       |Training                           |
|17       |Vectors                      

## Get topic

In [26]:
topics_df = merged_df.select(
    F.col("topic").alias("Topic")
).distinct().withColumn(
    "TopicID", F.monotonically_increasing_id()
).select(
    "TopicID", "Topic"
)

topics_df.show(truncate=False)

+-------+------------+
|TopicID|Topic       |
+-------+------------+
|0      |Cryptography|
|1      |AI          |
|2      |IoT         |
|3      |Big Data    |
|4      |Blockchain  |
|5      |DevOps      |
+-------+------------+



## Get date

In [27]:
date_df = merged_df.select(
    F.col("Date").alias("PublicationDate"),
    F.col("Day"),
    F.col("Month"),
    F.col("Year")
).distinct().withColumn(
    "DateID", F.monotonically_increasing_id()
).select(
    "DateID", "PublicationDate", "Day", "Month", "Year"
)

date_df.show(truncate=False)

+------+----------------+---+--------+----+
|DateID|PublicationDate |Day|Month   |Year|
+------+----------------+---+--------+----+
|0     |01 July 2024    |1  |July    |2024|
|1     |9 January 2024  |9  |January |2024|
|2     |16 November 2023|16 |November|2023|
|3     |05 November 2023|5  |November|2023|
|4     |6 March 2024    |6  |March   |2024|
|5     |20 July 2021    |20 |July    |2021|
|6     |15 July 2023    |15 |July    |2023|
|7     |13 August 2021  |13 |August  |2021|
|8     |02 November 2023|2  |November|2023|
|9     |8 May 2023      |8  |May     |2023|
|10    |28 November 2016|28 |November|2016|
|11    |27 October 2024 |27 |October |2024|
|12    |8 October 2024  |8  |October |2024|
|13    |15 April 2024   |15 |April   |2024|
|14    |10 August 2024  |10 |August  |2024|
|15    |25 November 2023|25 |November|2023|
|16    |29 May 2019     |29 |May     |2019|
|17    |24 October 2016 |24 |October |2016|
|18    |09 August 2024  |9  |August  |2024|
|19    |21 December 2018|21 |Dec

## Get authors 

In [28]:
authors_df = spark_merged.select(
    F.explode("authors_with_affiliations").alias("author_affiliation")
).select(
    F.col("author_affiliation.author").alias("FullName"),
    F.col("author_affiliation.country").alias("Country"),
    F.col("author_affiliation.university").alias("University")
).withColumn(
    "AuthorID", F.monotonically_increasing_id()
).select(
    "AuthorID", "FullName", "Country", "University"
).distinct()


## Get articles

In [29]:
article_df = spark_merged.select(
    F.col("doi").alias("DOI"),
    F.col("title").alias("Title"),
    F.col("abstract").alias("Abstract"),
    F.col("citations").alias("Citations"),
    F.col("publisher.ISSN").alias("ISSN"),
    F.col("Date").alias("PublicationDate"),
    F.col("website").alias("Website"),
    F.col("topic").alias("Topic") 
).join(
    topics_df, 
    "Topic",  
    "left"
).join(
    date_df,
    "PublicationDate",
    "left"
).join(
    publishers_df,
    "ISSN",
    "left"
).select(
    "DOI", "Title", "Abstract", "Citations", "ISSN", "DateID", "Website", "TopicID"
)


## Get author_article_map

In [30]:
# Explode the authors_with_affiliations column to get individual author entries
author_article_map_df = spark_merged.select(
    F.col("doi").alias("DOI"),
    F.explode("authors_with_affiliations").alias("author_affiliation")
).select(
    "DOI",
    F.col("author_affiliation.author").alias("FullName")
).join(
    authors_df,
    "FullName",
    "inner"
).select(
    "DOI",
    "AuthorID"
)

## Get keyword_article_map

In [42]:
# Explode the keywords column to get individual keyword entries
keyword_article_df = spark_merged.select(
    F.col("doi").alias("DOI"),
    F.explode("keywords").alias("Keyword")
).join(
    keywords_df,
    "Keyword",
    "inner"
).select(
    "DOI",
    "KeywordID"
)



# Loading

In [32]:
print("authors")
authors_df.printSchema()
print("journaux_")
publishers_df.printSchema()
print("articles")
article_df.printSchema()
print("unique_keywords")
keywords_df.printSchema()
print("----Relation---")
print("author_article_mapping")
author_article_map_df.printSchema()
print("keywords_articles_mapping")
keyword_article_df.printSchema()
print("Date")
date_df.printSchema()

authors
root
 |-- AuthorID: long (nullable = false)
 |-- FullName: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- University: string (nullable = true)

journaux_
root
 |-- ISSN: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Quartile: string (nullable = true)

articles
root
 |-- DOI: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Citations: double (nullable = true)
 |-- ISSN: string (nullable = true)
 |-- DateID: long (nullable = true)
 |-- Website: string (nullable = true)
 |-- TopicID: long (nullable = true)

unique_keywords
root
 |-- KeywordID: long (nullable = false)
 |-- Keyword: string (nullable = true)

----Relation---
author_article_mapping
root
 |-- DOI: string (nullable = true)
 |-- AuthorID: long (nullable = false)

keywords_articles_mapping
root
 |-- DOI: string (nullable = true)
 |-- KeywordID: long (nullable = false)

Date
root
 |-- DateID: long (nullable = false)
 |-- Pub

In [50]:
print("unique_keywords")
article_df.printSchema()

unique_keywords
root
 |-- DOI: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Citations: double (nullable = true)
 |-- ISSN: string (nullable = true)
 |-- DateID: long (nullable = true)
 |-- Website: string (nullable = true)
 |-- TopicID: long (nullable = true)



## To pandas

In [47]:
# publishers_df_pd = publishers_df.toPandas()
# authors_df_pd = authors_df.toPandas()
# author_article_map_df_pd = author_article_map_df.toPandas()
# keywords_df_pd = keywords_df.toPandas()
# keywords_articles_mapping_df_pd = keyword_article_df.toPandas()
article_df_pd = article_df.toPandas()
# topics_df_pd = topics_df.toPandas()
# date_df_pd = date_df.toPandas()

## To CSV

In [48]:
# publishers_df_pd.to_csv('Tables/publishers.csv', index=False)
# authors_df_pd.to_csv('Tables/authors.csv', index=False)
# author_article_map_df_pd.to_csv('Tables/author_article_map.csv', index=False)
# keywords_df_pd.to_csv('Tables/keywords.csv', index=False)
# keywords_articles_mapping_df_pd.to_csv('Tables/keywords_articles_mapping.csv', index=False)
article_df_pd.to_csv('Tables/articles.csv', index=False)
# topics_df_pd.to_csv('Tables/topics.csv', index=False)
# date_df_pd.to_csv('Tables/dates.csv', index=False)

## To JSON

In [49]:
# publishers_df_pd.to_json('Tables/publishers.json', orient='records', lines=True)
# authors_df_pd.to_json('Tables/authors.json', orient='records', lines=True)
# author_article_map_df_pd.to_json('Tables/author_article_map.json', orient='records', lines=True)
# keywords_df_pd.to_json('Tables/keywords.json', orient='records', lines=True)
# keywords_articles_mapping_df_pd.to_json('Tables/keywords_articles_mapping.json', orient='records', lines=True)
article_df_pd.to_json('Tables/articles.json', orient='records', lines=True)
# topics_df_pd.to_json('Tables/topics.json', orient='records', lines=True)
# date_df_pd.to_json('Tables/dates.json', orient='records', lines=True)