# Queries on Ontology #2

## Setup

In [1]:
# initialize findspark
import findspark
findspark.init()

In [2]:
# import pyspark
import pyspark
from pyspark.sql import SparkSession
pyspark.__version__

'3.5.4'

In [3]:
# Create a spark session

spark = (
    SparkSession.builder.master("local[1]")
    .appName("Analyzing Movielens Data")
    .getOrCreate()
)

In [4]:
from pyspark.sql.types import *

In [5]:
# import sparkSQL functions
import pyspark.sql.functions as F

## Define Schema and Load Data

In [6]:
data_path = "./data/"

### Schema: songs

In [7]:
# load artistIDs, genreIDs, awardIDs as Strings and the convert into arrays later
# this is because the CSV format does not support arrays
schema_songs = StructType(
    [
        StructField("id", StringType(), False),
        StructField("title", StringType(), False),
        StructField("duration", IntegerType(), True),
        StructField("releaseDate", StringType(), True),
        StructField("artistIDs", StringType(), True),
        StructField("albumID", StringType(), True),
        StructField("genreIDs", StringType(), True),
        StructField("awardIDs", StringType(), True),
    ]
)

### Load Data: songs

In [8]:
songs = (
    spark.read.format("csv")
    .option("encoding", "UTF-8")
    .option("header", True)
    .option("sep", ",")
    .option("escape", '"')
    .schema(schema_songs)
    .load(data_path+"songs.csv")
)

In [9]:
# before: artistIDs, genreIDs, awardIDs are strings
songs.show()

+-------+--------------------+--------+-----------+--------------------+--------+--------------------+------------+
|     id|               title|duration|releaseDate|           artistIDs| albumID|            genreIDs|    awardIDs|
+-------+--------------------+--------+-----------+--------------------+--------+--------------------+------------+
| song_0|Re-engineered ana...|     402| 1983-10-03|['artist_97', 'ar...|album_94|['genre_3', 'genr...|          []|
| song_1|Extended user-fac...|     405| 1994-11-25|['artist_19', 'ar...|album_93|         ['genre_4']|          []|
| song_2|Realigned dedicat...|     419| 2000-06-25|       ['artist_38']|    NULL|         ['genre_9']|          []|
| song_3|Versatile object-...|     391| 1999-06-09|['artist_36', 'ar...|album_80|         ['genre_9']|          []|
| song_4|Object-based stab...|     404| 2022-08-26|       ['artist_78']|album_35|['genre_8', 'genr...|          []|
| song_5|Quality-focused t...|     146| 1999-02-04|['artist_30', 'ar...|

In [10]:
# convert artistIDs, genreIDs, awardIDs into arrays
# once you remove the surrounding double quotes, which was addressed by specifying " as the escape char during file load
# the Array encoded as string is a fully qualified json object, so we can use from_json() to hack this. 
# smart eh?

songs = songs.withColumn("artistIDs", F.from_json("artistIDs", ArrayType(StringType()))) \
.withColumn("genreIDs", F.from_json("genreIDs", ArrayType(StringType()))) \
.withColumn("awardIDs", F.from_json("awardIDs", ArrayType(StringType())))

In [11]:
# after: artistIDs, genreIDs, awardIDs are arrays.
# Molto Bene!
songs.show()

+-------+--------------------+--------+-----------+--------------------+--------+-------------------+----------+
|     id|               title|duration|releaseDate|           artistIDs| albumID|           genreIDs|  awardIDs|
+-------+--------------------+--------+-----------+--------------------+--------+-------------------+----------+
| song_0|Re-engineered ana...|     402| 1983-10-03|[artist_97, artis...|album_94| [genre_3, genre_4]|        []|
| song_1|Extended user-fac...|     405| 1994-11-25|[artist_19, artis...|album_93|          [genre_4]|        []|
| song_2|Realigned dedicat...|     419| 2000-06-25|         [artist_38]|    NULL|          [genre_9]|        []|
| song_3|Versatile object-...|     391| 1999-06-09|[artist_36, artis...|album_80|          [genre_9]|        []|
| song_4|Object-based stab...|     404| 2022-08-26|         [artist_78]|album_35| [genre_8, genre_9]|        []|
| song_5|Quality-focused t...|     146| 1999-02-04|[artist_30, artis...|album_30|         [genre

### Schema: artists

In [12]:
#
schema_artists = StructType(
    [
        StructField("id", StringType(), False),
        StructField("name", StringType(), False),
        StructField("birthDate", StringType(), True),
        StructField("nationality", StringType(), True),
        StructField("labelID", StringType(), True)
    ]
)

### Load Data: artists

In [13]:
artists = (
    spark.read.format("csv")
    .option("encoding", "UTF-8")
    .option("header", True)
    .option("sep", ",")
    .option("escape", '"')
    .schema(schema_artists)
    .load(data_path+"artists.csv")
)

### Schema: albums

In [14]:
#
schema_albums = StructType(
    [
        StructField("id", StringType(), False),
        StructField("title", StringType(), False),
        StructField("releaseYear", IntegerType(), True),
        StructField("genreIDs", StringType(), True)
    ]
)

### Load Data: albums

In [15]:
albums = (
    spark.read.format("csv")
    .option("encoding", "UTF-8")
    .option("header", True)
    .option("sep", ",")
    .option("escape", '"')
    .schema(schema_albums)
    .load(data_path+"albums.csv")
)

### Schema: labels

In [16]:
#
schema_labels = StructType(
    [
        StructField("id", StringType(), False),
        StructField("labelName", StringType(), False),
        StructField("location", StringType(), True)
    ]
)

### Load Data: labels

In [17]:
labels = (
    spark.read.format("csv")
    .option("encoding", "UTF-8")
    .option("header", True)
    .option("sep", ",")
    .option("escape", '"')
    .schema(schema_labels)
    .load(data_path+"labels.csv")
)

### Schema: genres

In [18]:
#
schema_genres = StructType(
    [
        StructField("id", StringType(), False),
        StructField("name", StringType(), False),
        StructField("description", StringType(), True)
    ]
)

### Load Data: genres

In [19]:
genres = (
    spark.read.format("csv")
    .option("encoding", "UTF-8")
    .option("header", True)
    .option("sep", ",")
    .option("escape", '"')
    .schema(schema_genres)
    .load(data_path+"genres.csv")
)

### Schema: awards

In [20]:
#
schema_awards = StructType(
    [
        StructField("id", StringType(), False),
        StructField("awardName", StringType(), False),
        StructField("year", IntegerType(), True),
        StructField("awardingBody", StringType(), True)
    ]
)

### Load Data: awards

In [21]:
awards = (
    spark.read.format("csv")
    .option("encoding", "UTF-8")
    .option("header", True)
    .option("sep", ",")
    .option("escape", '"')
    .schema(schema_awards)
    .load(data_path+"awards.csv")
)

## Asking Questions

Here are a few questions we can now as from this ontology:
1. Find songs by genre
1. List artists by label location
1. Filter albums by release year
1. Discover songs with awards in certain years
1. Check which artists perform a given song
1. Find songs on a given album
1. List songs by a given artist
1. Aggregate how many songs each label’s artists have performed
1. Calculate average song duration by genre
1. Find artists without a label

In [23]:
# # clear cache
spark.catalog.clearCache()
# # stop spark
spark.stop()