In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f 

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .appName("SMBUD_project") \
    .getOrCreate()

In [3]:
# Define article custom schema
schemaArticle = StructType([
	StructField('_id', StringType(), True),
	StructField('title', StringType(), True),
	StructField('authors',
		ArrayType(
		StructType([
			 StructField('idAuth', StringType(), True),
			 StructField('org', StringType(), True)
		]), True)
	),
	StructField('n_citation', IntegerType(), True), 
	StructField('abstract', StringType(), True), 
	StructField('doi', StringType(), True),
	StructField('keywords', ArrayType(StringType()), True),
	StructField('isbn', StringType(), True),
	StructField('page_start', StringType(), True),
	StructField('page_end', StringType(), True),
	StructField('year', IntegerType(), True),
	StructField('fos', ArrayType(StringType()), True),
	StructField('references', ArrayType(StringType()), True),
	StructField('venue',
		StructType([
			 StructField('raw', StringType(), True),
			 StructField('type', IntegerType(), True),
			 StructField('issue', StringType(), True),
			 StructField('volume', StringType(), True),
			 StructField('publisher', StringType(), True)
		])
	),
])

In [4]:
#we decided to use import from schema to explicitly show data structure
df_articles = spark.read.schema(schemaArticle).json("./dblp_sample_filtered_spark.json", multiLine=True)

df_articles = df_articles.withColumn('address', f.when(f.col('venue.raw') == 'ESA', 'ESA_conference').otherwise(f.col('venue.raw')))
df_articles = df_articles.withColumn("venue", f.col("venue").dropFields("raw"))
df_articles = df_articles.withColumn("venue", f.struct("venue.*", f.col("address").alias("raw"))) 
df_articles = df_articles.drop("address")

In [5]:
#issue, volume and publisher attributes inside venue are moved back in the root structure and removed from the inner struct
df_articles = df_articles.withColumn("issue", f.col("venue.issue")) \
						.withColumn("volume", f.col("venue.volume")) \
						.withColumn("publisher", f.col("venue.publisher")) \
						.withColumn("venue", f.col("venue").dropFields("issue", "volume", "publisher"))


In [6]:
#VENUES COLLECTION
#A new dataframe is created with attributes of venue and the _id of the article
#then it is all grouped by venue attributes and a list of the articles id for each venue is created
#finally we drop rows with null raw to delete inconsistent tuple
df_venues = df_articles.select("venue.raw", "venue.type", "_id") \
						.groupBy("raw", "type") \
						.agg(f.collect_list("_id").alias("artIds")) \
						.dropna(subset=["raw"])

In [7]:
#now we can keep only the raw attribute of the venue
df_articles = df_articles.withColumn("venue_raw", f.col("venue.raw")).drop("venue")

In [8]:
#we now add a generated field inside venues collection
#for each venue a random city is selected that should represent the place where the venue was held
citiesList = ["New York", "London", "Paris", "Berlin", "Madrid", "Rome", "Dublin", "Copenhagen", "Vienna", "Amsterdam", "Brussels", "Lisbon", "Prague", "Athens", "Budapest", "Warsaw", "Zurich", "Luxembourg", "Oslo", "Stockholm", "Helsinki", "Moscow", "Istanbul", "Kiev", "Minsk", "Belgrade", "Bucharest", "Sofia", "Tallinn", "Riga", "Vilnius", "Tbilisi", "Yerevan", "Baku", "Dubai", "Abu Dhabi", "Doha", "Manama", "Muscat", "Riyadh", "Jeddah", "Mecca", "Medina", "Kuala Lumpur", "Singapore", "Hong Kong", "Shanghai", "Beijing", "Tokyo", "Seoul", "Bangkok", "Manila"]
cities = f.array([f.lit(city) for city in citiesList])
df_venues = df_venues.withColumn("city", cities[(f.rand() * len(citiesList)).cast("int")])

In [9]:
# Create the schema for the DataFrame of Authors
schemaAuthors = StructType([
    StructField("_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("nationality", StringType(), True),
    StructField("articles", ArrayType(StringType(), True), True),
    StructField("bio", StringType(), True),
    StructField("email", StringType(), True),
    StructField("orcid", StringType(), True),
    StructField("dob", TimestampType(), True)
])

In [10]:
#AUTHORS COLLECTION
#We simply import from json with specified schema and the conversion from string to timestamp is applied
df_authors = spark.read.schema(schemaAuthors).json("./dblp_sample_reverted_filtered_spark.json", multiLine=True)
df_authors = df_authors.withColumn("dateofbirth", f.to_timestamp(df_authors["dob"], "yyyy-MM-dd'T'HH:mm:ss'Z'")) \
						.drop("dob") \
						.withColumnRenamed("dateofbirth", "dob")

In [11]:
df_articles.printSchema()
df_authors.printSchema()
df_venues.printSchema()
df_venues.show(10)

root
 |-- _id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- idAuth: string (nullable = true)
 |    |    |-- org: string (nullable = true)
 |-- n_citation: integer (nullable = true)
 |-- abstract: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- isbn: string (nullable = true)
 |-- page_start: string (nullable = true)
 |-- page_end: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- fos: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- issue: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- venue_raw: string (nullable = true)

root
 |-- _id: string (nullable = true)
 |-- name: stri

# Data creation

##### 1 - Insert new author Emanuele Della Valle

In [12]:
from pyspark.sql import Row
from datetime import datetime
import pyspark.sql.functions as f
from pyspark.sql.types import *

# Create a new Row object with the values for the new author
new_author = Row(
    _id="638db170ae9ea0d19fad7a79",              #??????????????????????????????????????
    name="Emanuele Delle Valle ",
    nationality="it",
    # Set values for any other required columns
    articles=[],
    bio="Emanuele Della Valle holds a PhD in Computer Science from the \
        Vrije Universiteit Amsterdam and a Master degree in Computer Science\
        and Engineering from Politecnico di Milano. He is associate professor\
        at the Department of Electronics, Information and Bioengineering of\
        the Politecnico di Milano.",
    email="emanuele.dellavalle@gmail.com ",
    orcid="0000-0002-5176 -5885",
    dob= datetime.strptime("March 7, 1975", "%B %d, %Y")  # Create a datetime object for the author's date of birth
)

# Add the new row to the DataFrame
df_authors = df_authors.union(spark.createDataFrame([new_author], schema = schemaAuthors))

In [13]:
df_authors.filter(f.col("_id") == "638db170ae9ea0d19fad7a79").show()

+--------------------+--------------------+-----------+--------+--------------------+--------------------+--------------------+-------------------+
|                 _id|                name|nationality|articles|                 bio|               email|               orcid|                dob|
+--------------------+--------------------+-----------+--------+--------------------+--------------------+--------------------+-------------------+
|638db170ae9ea0d19...|Emanuele Delle Va...|         it|      []|Emanuele Della Va...|emanuele.dellaval...|0000-0002-5176 -5885|1975-03-07 00:00:00|
+--------------------+--------------------+-----------+--------+--------------------+--------------------+--------------------+-------------------+



##### 2 - Insert new publication

In [14]:
new_authors =  [Row("638db170ae9ea0d19fad7a79", "Politecnico di Milano"), Row("638db170ae9ea0d19fad7a7a", "Politecnico di Milano")] #????????????????

new_article = Row(
    _id="638db237d794b76f45c77916",
    title="An extensive study of C-SMOTE, a Continuous Synthetic Minority Oversampling Technique for Evolving Data Streams",
    authors=new_authors,
    n_citation=3,
    abstract = "Streaming Machine Learning (SML) studies algorithms that update their models,\
        given an unbounded and often non-stationary flow of data performing a single pass. Online \
        class imbalance learning is a branch of SML that combines the challenges of both class imbalance\
        and concept drift. In this paper, we investigate the binary classification problem by rebalancing\
        an imbalanced stream of data in the presence of concept drift, accessing one sample at a time.",
    doi="10.1016/j.eswa.2022.116630",
    keywords=["Evolving Data Stream","Streaming","Concept drift","Balancing"],
    isbn="123-4-567-89012-3",
    page_start="39",
    page_end="46",
    year=2022,
    fos=["Computer Science","Stream Reasoning","Big Data"],
    references=["53e99fe4b7602d97028bf743","53e99fddb7602d97028bc085"],
    issue="1",
    volume="196",
    publisher="Elsevier",
    venue_raw="ESA"
)

# Add the new row to the DataFrame
df_articles = df_articles.union(spark.createDataFrame([new_article]))

In [15]:
df_articles.filter(f.col("_id") == "638db237d794b76f45c77916").show()

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+-----------------+----------+--------+----+--------------------+--------------------+-----+------+---------+---------+
|                 _id|               title|             authors|n_citation|            abstract|                 doi|            keywords|             isbn|page_start|page_end|year|                 fos|          references|issue|volume|publisher|venue_raw|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+-----------------+----------+--------+----+--------------------+--------------------+-----+------+---------+---------+
|638db237d794b76f4...|An extensive stud...|[{638db170ae9ea0d...|         3|Streaming Machine...|10.1016/j.eswa.20...|[Evolving Data St...|123-4-567-89012-3|        39|      46|2022|[Computer Science...|[53e99fe4b7602d97...|    1|

##### 3 - Insert new venue "ESA", assuming it is not present in the db yet

In [16]:
new_venue = Row(                  
    raw="ESA", 
    type=1,
    artIds=["638db237d794b76f45c77916"],
    city="Montreal"
)

# Add the new row to the DataFrame
df_venues = df_venues.union(spark.createDataFrame([new_venue]))

In [17]:
df_venues.filter(f.col("raw") == "ESA").show()

+---+----+--------------------+--------+
|raw|type|              artIds|    city|
+---+----+--------------------+--------+
|ESA|   1|[638db237d794b76f...|Montreal|
+---+----+--------------------+--------+



##### 4 - Adding the new article to both authors

In [18]:
#adding the new article to the new author

df_authors = df_authors.withColumn(
    "articles",
    f.when(f.col("_id") == "638db170ae9ea0d19fad7a79",
        f.array_union(df_authors.articles, f.array(f.lit("638db237d794b76f45c77916"))))\
    .when(f.col("_id") == "638db170ae9ea0d19fad7a7a",
        f.array_union(df_authors.articles, f.array(f.lit("638db237d794b76f45c77916"))))
    .otherwise(f.col("articles"))
)

In [19]:
df_authors.filter(f.col("_id") == "638db170ae9ea0d19fad7a79").show() #checking only Emanuele della Valle since the other author hasn't been inserted

+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+
|                 _id|                name|nationality|            articles|                 bio|               email|               orcid|                dob|
+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+
|638db170ae9ea0d19...|Emanuele Delle Va...|         it|[638db237d794b76f...|Emanuele Della Va...|emanuele.dellaval...|0000-0002-5176 -5885|1975-03-07 00:00:00|
+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+-------------------+



##### 5 - Incrementing n_citations by 1 of cited articles

In [20]:
#checking previous n_citation
df_articles.filter(f.col("_id") == "53e99fe4b7602d97028bf743").select("_id","n_citation").show()
df_articles.filter(f.col("_id") == "53e99fddb7602d97028bc085").select("_id","n_citation").show()

+--------------------+----------+
|                 _id|n_citation|
+--------------------+----------+
|53e99fe4b7602d970...|        12|
+--------------------+----------+

+--------------------+----------+
|                 _id|n_citation|
+--------------------+----------+
|53e99fddb7602d970...|         2|
+--------------------+----------+



In [21]:
#increment number of citations
df_articles = df_articles.withColumn(
    "n_citation",
    f.when(f.col("_id") == "53e99fe4b7602d97028bf743",
       df_articles.n_citation+1) \
    .when(f.col("_id") == "53e99fddb7602d97028bc085",
       df_articles.n_citation+1)   
    .otherwise(f.col("n_citation"))
)

In [22]:
#checking updated n_citation
df_articles.filter(f.col("_id") == "53e99fe4b7602d97028bf743").select("_id", "n_citation").show()
df_articles.filter(f.col("_id") == "53e99fddb7602d97028bc085").select("_id", "n_citation").show()

+--------------------+----------+
|                 _id|n_citation|
+--------------------+----------+
|53e99fe4b7602d970...|        13|
+--------------------+----------+

+--------------------+----------+
|                 _id|n_citation|
+--------------------+----------+
|53e99fddb7602d970...|         3|
+--------------------+----------+



# QUERIES

In [23]:
#WHERE+JOIN - QUERY 1
#Print the type of the venue of an article with a specific title
df_articles.join(df_venues, df_articles.venue_raw == df_venues.raw, "inner")\
           .filter(f.col("title") == "Locality Sensitive Outlier Detection: A ranking driven approach").select("title", "raw", "type").show()

+--------------------+----+----+
|               title| raw|type|
+--------------------+----+----+
|Locality Sensitiv...|ICDE|   0|
+--------------------+----+----+



In [24]:
#WHERE+LIMIT+LIKE - QUERY 2
#Articles whose title string contains "Machine Learning" - limit 3
df_articles.filter(f.col("title").like("%Machine Learning%")).limit(3).show()

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+-----------------+----------+--------+----+--------------------+----------+-----+------+---------+----------------+
|                 _id|               title|             authors|n_citation|            abstract|                 doi|            keywords|             isbn|page_start|page_end|year|                 fos|references|issue|volume|publisher|       venue_raw|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+-----------------+----------+--------+----+--------------------+----------+-----+------+---------+----------------+
|53e99fd6b7602d970...|Editorial: The Te...|[{53f48cc4dabfaea...|        11|                null|10.1023/A:1022840...|  [machine learning]|978-1-5904-9884-3|       141|     144|1986|[Terminology, Com...|        []|    2|     1|     null|Ma

In [33]:
#WHERE+IN+NESTED_QUERY - QUERY 3
#Find authors that has the same nationality of at least one of the authors of "Locality Sensitive Outlier Detection: A ranking driven approach" article

#Create the list of nationalities of the article's authors
nationalities_list = df_articles.filter(f.col("title") == "Locality Sensitive Outlier Detection: A ranking driven approach")\
                            .select(f.explode(df_articles.authors.idAuth).alias("idAuth"))\
                            .join(df_authors, on=f.col("idAuth") == df_authors._id)\
                            .select("nationality")\
                            .agg(f.collect_set("nationality")).collect()[0][0]
#find all the authors with the same nationalities of the authors of the initial article 
df_authors.filter(f.col("nationality")\
          .isin(nationalities_list)).limit(10).show()



+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+-----+-------------------+
|                 _id|                name|nationality|            articles|                 bio|               email|orcid|                dob|
+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+-----+-------------------+
|542a4c9fdabfae61d...|             Ye Wang|         dk|[53e99f86b7602d97...|Marcus J. Nadenau...|   Ye.Wang@gmail.com| null|1961-07-02 01:00:00|
|53f48bc5dabfaea7c...|Srinivasan Partha...|         jp|[53e99f86b7602d97...|Marian Codreanu (...|Srinivasan.Partha...| null|1969-06-06 01:00:00|
|53f44b6fdabfaec09...|   Shirish Tatikonda|         gr|[53e99f86b7602d97...|Dimitris Papadias...|Shirish.Tatikonda...| null|1950-01-10 01:00:00|
|53f7f79fdabfae90e...|      Moshe Zukerman|         jp|[53e99f86b7602d97...|Shugong Xu [SM] (...|Moshe.Zukerman@gm...| null|1984-0

In [35]:
#GROUP_BY+JOIN+AS - QUERY 4
#Print the 3 most frequent keywords of articles written by italian authors
df_italian = df_authors.filter(f.col("nationality") == "it")\
                       .select(f.explode("articles")).withColumnRenamed("col","articles")
df_italian = df_italian.groupby("articles").count() #Dummy count

df_keywords = df_italian.join(df_articles, df_italian.articles == df_articles._id, "inner")\
                        .select("articles", f.explode("keywords")).withColumnRenamed("col","keywords")\
                        .groupby("keywords")\
                        .agg(f.count("keywords").alias("n_occurences"))\
                        .sort("n_occurences", ascending=False)\
                        .limit(3).show()

+----------------+------------+
|        keywords|n_occurences|
+----------------+------------+
|     data mining|          27|
|computer science|          22|
|        internet|          17|
+----------------+------------+



In [27]:
#WHERE+GROUP_BY - QUERY 5
#Print the cities with more than 65 venues
df_venues \
    .groupby("city")\
    .count()\
    .filter(f.col("count") > 65)\
    .sort("count", ascending=False).show()

+----------+-----+
|      city|count|
+----------+-----+
|     Seoul|   73|
|     Tokyo|   71|
|    Berlin|   69|
|Luxembourg|   68|
|  Belgrade|   67|
|    Jeddah|   66|
|  Shanghai|   66|
+----------+-----+



In [28]:
#Query 6 GROUP BY +  HAVING + AS
#find the field of studies that appers more than 15 times

df_articles\
    .select("_id", "title", f.explode("fos")).withColumnRenamed("col", "fos")\
    .groupby("fos")\
    .agg(f.count("fos").alias("n_occurence"))\
    .filter(f.col("n_occurence") > 15).show()

+--------------------+-----------+
|                 fos|n_occurence|
+--------------------+-----------+
|         Computation|        113|
|            Test set|         16|
| Operations research|         61|
|         Game theory|         32|
|       Mixture model|         24|
|Bandwidth (signal...|         58|
|          Annotation|         31|
|Load balancing (c...|         29|
|         Source code|         42|
|     Word error rate|         16|
|     Cloud computing|         57|
|        Broadcasting|         25|
|Knowledge management|        203|
|           Test case|         17|
|          Web server|         16|
|    Nonlinear system|         72|
|User experience d...|         18|
|       Interpolation|         26|
|            Data Web|         21|
|Entropy (informat...|         27|
+--------------------+-----------+
only showing top 20 rows



In [29]:
#QUERY7 WHERE + GROUP BY + HAVING + AS
#Find all the volumes with at least 5 articles in this dataset published after 2000
df_articles\
    .filter(f.col("year") > 2000)\
    .groupby("venue_raw", "volume")\
    .agg(f.count("volume").alias("num_articles"))\
    .filter(f.col("num_articles") > 4)\
    .show()

+--------------------+------+------------+
|           venue_raw|volume|num_articles|
+--------------------+------+------------+
|Applied Mathemati...|   218|           5|
| Pattern Recognition|    45|           5|
|  Expert Syst. Appl.|    39|           5|
|Applied Mathemati...|   217|           5|
|  IEICE Transactions|  97-A|           5|
|  Expert Syst. Appl.|    37|           5|
+--------------------+------+------------+



In [30]:
#QUERY 10 WHERE, GROUP BY, HAVING, 2 JOINS
#Find all the authors that published on more than 2 Journals 

df_exploded_authors = df_authors.alias("auth")\
                        .select("auth._id","auth.name", f.explode("auth.articles").alias("article"))\
                        .join(df_articles.alias("art"), on=f.col("article") == df_articles._id)\
                        .select("auth._id","auth.name","art._id","art.venue_raw")\
                        .join(df_venues.alias("ven"), on=f.col("venue_raw") == df_venues.raw)\
                        .filter(f.col("type") == 1)\
                        .groupBy("auth._id")\
                        .agg(f.first("name").alias("name"),f.countDistinct("raw").alias("venue_count"),f.concat_ws(" - ",f.collect_set("raw")).alias("venues_list"))\
                        .filter(f.col("venue_count") > 2)\
                        .orderBy("venue_count", ascending=False).show(3,truncate=False)


+------------------------+-------------+-----------+-------------------------------------------------------------------------------------+
|_id                     |name         |venue_count|venues_list                                                                          |
+------------------------+-------------+-----------+-------------------------------------------------------------------------------------+
|54055740dabfae44f0803fbb|Naohiro Ishii|3          |Las Vegas, NV - Honolulu, HI - International Journal on Artificial Intelligence Tools|
+------------------------+-------------+-----------+-------------------------------------------------------------------------------------+

