In [145]:
import os
from pyspark.sql import SparkSession

In [146]:
# Create an entry point to the PySpark Application
spark = SparkSession.builder \
      .master("local") \
      .appName("Assignment3") \
      .getOrCreate()


In [147]:
path_of_the_directory= 'C:\\Users\\matte\\Desktop\\3-progetto\\dataset'

dataset = {}

for filename in os.listdir(path_of_the_directory):
    f = os.path.join(path_of_the_directory,filename)
    if os.path.isfile(f):

        # Load a DataFrame
        df = spark.read.option("header", True).option("delimiter", "|").option("inferSchema",True).csv(f)
        dataset[filename.split(".")[0]]=df;
        

In [148]:
for key,df in dataset.items():
    print("NAME OF THE KEY:" + str(key))
    # Print detected 
    df.printSchema()
    df.show(2)
    print("/------------------------------------------------------------/\n")

NAME OF THE KEY:authors
root
 |-- author_name: string (nullable = true)
 |-- orcid: string (nullable = true)
 |-- month_of_birth: integer (nullable = true)
 |-- year_of_birth: integer (nullable = true)
 |-- mail: string (nullable = true)

+--------------+-------------------+--------------+-------------+----------------+
|   author_name|              orcid|month_of_birth|year_of_birth|            mail|
+--------------+-------------------+--------------+-------------+----------------+
| Gilles Guette|9404-5450-4767-7840|             6|         1985|iwAaxsL@duck.com|
|Renaud Pacalet|2886-4313-9155-9063|            12|         1952|  mAgIX@duck.com|
+--------------+-------------------+--------------+-------------+----------------+
only showing top 2 rows

/------------------------------------------------------------/

NAME OF THE KEY:citations
root
 |-- document: string (nullable = true)
 |-- cite: string (nullable = true)

+--------------------+--------------------+
|            document|

In [149]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType
from pyspark.sql.functions import col, count, avg, countDistinct


Load the top-10 institution's country which has written at least 10 pubblication in the 90' and in order 

of the average number of pages of all its publications. 

In [150]:
dataset['publications'].join(dataset['write_relationship'], 
                            dataset['publications'].id == dataset['write_relationship'].pub_id, 
                            'inner') \
                        .drop('author_order', 'isbn', 'publisher', 'doc_type') \
                        .join(dataset['authors'], 
                            dataset['write_relationship'].author_name == dataset['authors'].author_name,
                            'inner') \
                        .drop('orcid', 'month_of_birth', 'year_of_birth', 'mail') \
                        .join(dataset['work_relationship'], 
                            dataset['authors'].author_name == dataset['work_relationship'].author_name, 
                            'inner') \
                        .join(dataset['institutions'], 
                            dataset['work_relationship'].university == dataset['institutions'].institution,
                            'inner') \
                        .drop('world_rank', 'institution', 'national_rank') \
                        .filter( (dataset['publications'].year >= '1990') & (dataset['publications'].year < '2000') ) \
                        .groupBy('country') \
                        .agg(avg('pages').alias('average number of pages'),
                            countDistinct('id').alias('different publication')) \
                        .filter( col('different publication') >= 10) \
                        .sort(col('average number of pages').desc()) \
                        .limit(10) \
                        .show()

+--------------+-----------------------+---------------------+
|       country|average number of pages|different publication|
+--------------+-----------------------+---------------------+
|        Canada|     63.275862068965516|                   29|
|         Japan|      62.24193548387097|                   59|
|        Sweden|     60.714285714285715|                   13|
|   South Korea|     58.888888888888886|                   26|
|         Spain|     47.142857142857146|                   19|
|        France|                 44.375|                   29|
|       Germany|      42.81666666666667|                   57|
|           USA|     31.541666666666668|                  218|
|United Kingdom|     30.097560975609756|                   77|
|         Italy|     29.775510204081634|                   46|
+--------------+-----------------------+---------------------+



Load 3 authors who have written a book when they were 50 years old

In [151]:
dataset['publications'].join(dataset['write_relationship'], 
                            dataset['publications'].id == dataset['write_relationship'].pub_id, 
                            'left') \
                        .drop('title', 'id', 'pages', 'author_order', 'isbn', 'publisher') \
                        .join(dataset['authors'], 
                            ['author_name']) \
                        .drop('orcid', 'month_of_birth', 'mail', 'pub_id') \
                        .withColumn('years difference', (col('year') - col('year_of_birth'))) \
                        .filter((col('years difference') == 50) & (col('doc_type') == 'book')) \
                        .limit(3) \
                        .show()

+---------------+----+--------+-------------+----------------+
|    author_name|year|doc_type|year_of_birth|years difference|
+---------------+----+--------+-------------+----------------+
|Roman Dementiev|2007|    book|         1957|              50|
|   Oscar Cordón|2020|    book|         1970|              50|
|   Philip S. Yu|2019|    book|         1969|              50|
+---------------+----+--------+-------------+----------------+



Removing a row

In [152]:
# Selected row
print("Before the remove\n")
dataset["publications"].filter(dataset["publications"].id == "https://d-nb.info/960448683").show()
 
# Removing the row
dataset["publications"] = dataset["publications"].where(dataset["publications"].id != "https://d-nb.info/960448683")

# After removing the row
print("\nAfter the remove\n")
dataset["publications"].filter(dataset["publications"].id == "https://d-nb.info/960448683").limit(3).show()



Before the remove

+--------------------+--------------------+----+-----+-----------------+---------+--------+
|                  id|               title|year|pages|             isbn|publisher|doc_type|
+--------------------+--------------------+----+-----+-----------------+---------+--------+
|https://d-nb.info...|Zwischen Organism...|2001|  239|978-3-8244-4433-5|      DUV|    book|
+--------------------+--------------------+----+-----+-----------------+---------+--------+


After the remove

+---+-----+----+-----+----+---------+--------+
| id|title|year|pages|isbn|publisher|doc_type|
+---+-----+----+-----+----+---------+--------+
+---+-----+----+-----+----+---------+--------+

