In [1]:
from wc_simd.utility import spark_path

In [2]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("test_pyspark") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .config("spark.sql.orc.enableVectorizedReader", "false") \
    .config("spark.sql.parquet.columnarReaderBatchSize", "1024") \
    .config("spark.sql.orc.columnarReaderBatchSize", "1024") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/25 12:02:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
#spark.stop()

# Read and Partition Works JSON

In [3]:
df = spark.read.json(spark_path("../data/imports/works.json.gz"))

                                                                                

In [4]:
# Assume df is your existing PySpark DataFrame
n = 16  # Number of partitions you want

# Partition the DataFrame
df_repartitioned = df.repartition(n)

df_repartitioned.write.parquet(
    spark_path("../data/imports/works.parquet"),
    mode="overwrite")

25/04/24 16:00:47 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [3]:
df_repartitioned = spark.read.parquet(spark_path("../data/imports/works.parquet"))


In [4]:
def count_in_partition(index, iterator):
    yield (index, sum(1 for _ in iterator))


partition_counts = df_repartitioned.rdd.mapPartitionsWithIndex(
    count_in_partition).collect()

for index, count in partition_counts:
    print(f"Partition {index}: {count} rows")



Partition 0: 72966 rows
Partition 1: 72966 rows
Partition 2: 72965 rows
Partition 3: 72966 rows
Partition 4: 72965 rows
Partition 5: 72965 rows
Partition 6: 72966 rows
Partition 7: 72965 rows
Partition 8: 72965 rows
Partition 9: 72965 rows
Partition 10: 72965 rows
Partition 11: 72965 rows
Partition 12: 72965 rows
Partition 13: 72965 rows
Partition 14: 72965 rows
Partition 15: 72965 rows


                                                                                

In [10]:
df_repartitioned.printSchema()

root
 |-- alternativeTitles: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- availabilities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- label: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- contributors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- agent: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- identifiers: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- identifierType: struct (nullable = true)
 |    |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |    |-- label: string (nullable = true)
 |    |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |    |-- val

In [11]:
df_repartitioned.show()

                                                                                

+--------------------+--------------------+--------------------+-----------+----------------+--------------------+-----------+--------+-------+---------------+--------------------+--------------------+--------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+----+--------------------+
|   alternativeTitles|      availabilities|        contributors|createdDate|currentFrequency|         description|designation|duration|edition|formerFrequency|              genres|            holdings|      id|         identifiers|             images|               items|           languages|           lettering|               notes|              partOf|parts| physicalDescription|          precededBy|          production|referenceNumber|       

# Save Works table

In [5]:
df_repartitioned.write.saveAsTable("works")

25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.repl.dump.metadata.only.for.external.table does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.druid.rollup does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.repl.retain.prev.dump.dir does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.log.explain.output.to.console does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.server2.wm.delayed.move.timeout does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.privilege.synchronizer does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.metastore.wm.default.pool.size does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.llap.task.scheduler.preempt.independent does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.llap.io.cvb.memory.consumption. does not exist
25/04/25 11:42:04 WARN HiveConf: HiveConf of name hive.llap.output.format.arrow does not exist
25/04

# Test works table with SQL

In [6]:
spark.sql("SELECT * FROM works").show()

+--------------------+--------------------+--------------------+-----------+----------------+--------------------+-----------+--------+--------------------+---------------+--------------------+--------------------+--------+--------------------+-------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+
|   alternativeTitles|      availabilities|        contributors|createdDate|currentFrequency|         description|designation|duration|             edition|formerFrequency|              genres|            holdings|      id|         identifiers|             images|               items|           languages|lettering|               notes|              partOf|               parts| physicalDescription|          precededBy|  

In [7]:
spark.sql("SELECT col.* FROM (SELECT EXPLODE(availabilities) FROM works)").show(5)

+-------------+-------------+------------+
|           id|        label|        type|
+-------------+-------------+------------+
|closed-stores|Closed stores|Availability|
|closed-stores|Closed stores|Availability|
|       online|       Online|Availability|
|closed-stores|Closed stores|Availability|
|       online|       Online|Availability|
+-------------+-------------+------------+
only showing top 5 rows



In [8]:
spark.sql("SELECT col.* FROM (SELECT EXPLODE(production) FROM works)").show(5)

+--------------------+--------------------+--------+--------------------+-----------------+---------------+
|              agents|               dates|function|               label|           places|           type|
+--------------------+--------------------+--------+--------------------+-----------------+---------------+
|[{G.A. Valvassore...|    [{1549, Period}]|    NULL|Venice : G.A. Val...|[{Venice, Place}]|ProductionEvent|
|                  []|[{18th July 1923,...|    NULL|      18th July 1923|               []|ProductionEvent|
|                  []|    [{1949, Period}]|    NULL|                1949|               []|ProductionEvent|
|                  []|                  []|    NULL|                    |      [{, Place}]|ProductionEvent|
|                  []|[{1940-1975, Peri...|    NULL|           1940-1975|               []|ProductionEvent|
+--------------------+--------------------+--------+--------------------+-----------------+---------------+
only showing top 5 rows



In [9]:
spark.sql("SELECT col.* FROM (SELECT EXPLODE(agents) FROM (SELECT col.* FROM (SELECT EXPLODE(production) FROM works)))").show(5)

+--------------------+-----+
|               label| type|
+--------------------+-----+
|     G.A. Valvassore|Agent|
|          L. d'Houry|Agent|
|International Uni...|Agent|
|The Commonwealth ...|Agent|
|Oxford University...|Agent|
+--------------------+-----+
only showing top 5 rows



# Explode structs exploration

In [6]:
df_repartitioned.select("type").distinct().show()

+----------+
|      type|
+----------+
|   Section|
|      Work|
|Collection|
|    Series|
+----------+



In [7]:
from pyspark.sql.functions import explode

# Explode the 'availabilities' array to create one row per element
exploded_df = df_repartitioned.select(
    explode("availabilities").alias("availability"))

# Select all fields from the availability struct and get distinct rows
unique_availabilities = exploded_df.select("availability.*").distinct()

unique_availabilities.show(truncate=False)

+-------------+-------------+------------+
|id           |label        |type        |
+-------------+-------------+------------+
|closed-stores|Closed stores|Availability|
|online       |Online       |Availability|
|open-shelves |Open shelves |Availability|
+-------------+-------------+------------+



In [8]:
# Explode the 'genres' array to get one row per genre struct
exploded_genres = df_repartitioned.select(explode("genres").alias("genre"))

# Select the 'label' field from the genre struct and get distinct values
unique_genre_labels = exploded_genres.select("genre.label").distinct()

# Show the unique genre labels
unique_genre_labels_pandas = unique_genre_labels.toPandas()
unique_genre_labels_pandas

                                                                                

Unnamed: 0,label
0,Electronic journals
1,Envelopes
2,Platinum prints
3,Dictionaries - English
4,Online-Publikation
...,...
1512,Caricature
1513,Pictorial Work
1514,Strips (teksten)
1515,Therapeutics


# Read Images dataset

In [3]:
df_img = spark.read.json(spark_path("../data/imports/images.json.gz"))

                                                                                

In [4]:
df_img.write.saveAsTable("images", mode="overwrite")

                                                                                

In [10]:
df_img.printSchema()

root
 |-- aspectRatio: double (nullable = true)
 |-- averageColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- locations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- accessConditions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- method: struct (nullable = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- label: string (nullable = true)
 |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- status: struct (nullable = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- label: string (nullable = true)
 |    |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- credit: string (nullable = true)
 |    |    |-- license: struct (nullable = true)
 |    |    |    |-- id: string (nullab

In [11]:
df_img.show()

+-----------+------------+--------+--------------------+--------------------+--------------------+-----+
|aspectRatio|averageColor|      id|           locations|              source|           thumbnail| type|
+-----------+------------+--------+--------------------+--------------------+--------------------+-----+
|  1.4705882|     #363935|msg3gj54|[{[{{view-online,...|{[], [], nf7cjmsg...|{[{{view-online, ...|Image|
|      0.775|     #5f5b55|p4tj67k8|[{[{{view-online,...|{[], [{[{dhgr6mj5...|{[{{view-online, ...|Image|
|       0.77|     #93908f|m5xrbr5f|[{[{{view-online,...|{[], [{[{esk9t6nc...|{[{{view-online, ...|Image|
|      0.665|     #865f46|b864z9kp|[{[{{view-online,...|{[{{crk28fmh, Car...|{[{{view-online, ...|Image|
|     0.8175|     #858585|ts7gybyc|[{[{{view-online,...|{[], [], tfm3zjpu...|{[{{view-online, ...|Image|
|      0.665|     #757370|jkkhdjjz|[{[{{view-online,...|{[{{v4yhncct, Wil...|{[{{view-online, ...|Image|
|     0.6325|     #b8b2ae|emf3xqh5|[{[{{view-online,...

In [10]:
from pyspark.sql import functions as F

In [7]:
df_img.count()

                                                                                

128755

In [11]:
spark.table("images").where(F.col("aspectRatio") == 1.0).count()

480

In [5]:
import json

print(json.dumps(spark.table("images").first().asDict(), indent=4))

{
    "aspectRatio": 1.4705882,
    "averageColor": "#363935",
    "id": "msg3gj54",
    "locations": [
        [
            [
                [
                    [
                        "view-online",
                        "View online",
                        "AccessMethod"
                    ],
                    [
                        "open",
                        "Open",
                        "AccessStatus"
                    ],
                    "AccessCondition"
                ]
            ],
            "Wellcome Collection",
            [
                "cc-by",
                "Attribution 4.0 International (CC BY 4.0)",
                "License",
                "http://creativecommons.org/licenses/by/4.0/"
            ],
            [
                "iiif-image",
                "IIIF Image API",
                "LocationType"
            ],
            "DigitalLocation",
            "https://iiif.wellcomecollection.org/image/N0014023/info.json"
    