In [10]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, array
from pyspark.sql.functions import col, last
from pyspark.sql.window import Window
from pyspark.sql.functions import count, avg, collect_list, countDistinct

# Read the CSV file into a DataFrame

spark = SparkSession.builder.appName("Maie").getOrCreate()

# Read the file into a DataFrame with the defined schema and drop malformed/invalid lines
df = spark.read.csv("E:\MetObjects.csv", header=True, inferSchema=True)


# Define the schema for the DataFrame
schema = StructType([
    StructField("ObjectID", IntegerType(), True),
    StructField("Is Highlight", StringType(), True),
    StructField("Is Timeline Work", StringType(), True),
    StructField("Is Public Domain", StringType(), True),
    StructField("Object ID", IntegerType(), True),
    StructField("Gallery Number", IntegerType(), True),
    StructField("Department", StringType(), True),
    StructField("AccessionYear", IntegerType(), True),
    StructField("Object Name", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("Culture", StringType(), True),
    StructField("Period", StringType(), True),
    StructField("Dynasty", StringType(), True),
    StructField("Reign", StringType(), True),
    StructField("Portfolio", StringType(), True),
    StructField("Constituent ID", IntegerType(), True),
    StructField("Artist Role", StringType(), True),
    StructField("Artist Prefix", StringType(), True),
    StructField("Artist Display Name", StringType(), True),
    StructField("Artist Display Bio", StringType(), True),
    StructField("Artist Suffix", StringType(), True),
    StructField("Artist Alpha Sort", StringType(), True),
    StructField("Artist Nationality", StringType(), True),
    StructField("Artist Begin Date", IntegerType(), True),
    StructField("Artist End Date", IntegerType(), True),
    StructField("Artist Gender", StringType(), True),
    StructField("Artist ULAN URL", StringType(), True),
    StructField("Artist Wikidata URL", StringType(), True),
    StructField("Object Date", IntegerType(), True),
    StructField("Object Begin Date", IntegerType(), True),
    StructField("Object End Date", IntegerType(), True),
    StructField("Medium", StringType(), True),
    StructField("Dimensions", StringType(), True),
    StructField("Credit Line", StringType(), True),
    StructField("Geography Type", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("County", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("Subregion", StringType(), True),
    StructField("Locale", StringType(), True),
    StructField("Locus", StringType(), True),
    StructField("Excavation", StringType(), True),
    StructField("River", StringType(), True),
    StructField("Classification", StringType(), True),
    StructField("RightsAndReproduction", StringType(), True),
    StructField("LinkResource", StringType(), True),
    StructField("ObjectWikidata_URL", StringType(), True),
    StructField("MetadataDate", StringType(), True),
    StructField("Repository", StringType(), True),
    StructField("Tags", StringType(), True)
])
df = spark.read.csv("E:\MetObjects.csv", header=True, schema=schema, mode="DROPMALFORMED")


# Write the malformed records to a different path
df_bad_records = spark.read.csv("E:\MetObjects.csv", header=True, schema=schema, mode="PERMISSIVE")

# Split the "Country" column on "/" or "or" and store the split values in an array
df = df.withColumn("Country_array", array(*[split("Country", "\/") , split("Country", " or ")]))
from pyspark.sql.functions import col, when, last


# Fill null values in the "ConstituentID" column based on the previous non-null value (backward fill) or the next non-null value (forward fill)
df = df.withColumn("Constituent ID", when(col("Constituent ID").isNull(), None).otherwise(col("Constituent ID")))
df = df.withColumn("Constituent ID", last("Constituent ID", True).over(Window.partitionBy("Title").orderBy("Object ID")))
df = df.fillna({"Constituent ID": -1}, subset=["Constituent ID"])
df = df.withColumn("Constituent ID", when(col("Constituent ID") == -1, None).otherwise(col("Constituent ID")))


# Group the data by country and perform aggregation
country_agg_df = df.groupBy("Country").agg(
    count("Object ID").alias("Artworks"),
    countDistinct("Constituent ID").alias("Artists"),
   collect_list("Constituent ID").alias("Unique_ConstituentIDs")
)

# Show the resulting DataFrame
country_agg_df.show()
df.show(10)
spark.stop()

+--------------------+--------+-------+---------------------+
|             Country|Artworks|Artists|Unique_ConstituentIDs|
+--------------------+--------+-------+---------------------+
|                null|  357382|  17724| [107, 107, 107, 1...|
|     dark blue-green|       1|      1|              [13692]|
|             trailed|       1|      1|              [13689]|
|"Lisez et propage...|       1|      0|                   []|
|                1774|       2|      0|                   []|
|                1775|       1|      0|                   []|
|                1779|       1|      0|                   []|
|         Afghanistan|      35|      5| [946, 2522, 15708...|
|            Alamania|       3|      0|                   []|
|     Alamania|France|       1|      0|                   []|
|Alamania|Northern...|       3|      0|                   []|
|Alamania|Northern...|       1|      0|                   []|
|             Algeria|      49|      9| [2016, 2045, 2045...|
|  Alger