In [0]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pyspark.sql import SparkSession
# Start a Spark session for this notebook, or attach to one that already exists.
spark = (
    SparkSession.builder
    .appName("Read Online CSV")
    .getOrCreate()
)

# Location of the source CSV file
csv_url = "https://media.githubusercontent.com/media/metmuseum/openaccess/master/MetObjects.csv"

# Fetch the CSV with pandas and load it into a pandas DataFrame
df = pd.read_csv(csv_url, low_memory=False)




In [0]:
# Split a '|'-delimited cell into a list of cleaned values (missing entries become None)
def get_unique_values(row):
    """Split a pipe-delimited string into a list of stripped values.

    Despite the name, values are NOT de-duplicated: exactly one entry is
    produced per '|'-separated token, in the original order.

    Args:
        row: The cell value. Normally a string; the literal ``'nan'`` (what
            ``astype(str)`` produces for a missing value) is treated as
            missing. ``None`` and float NaN are accepted as missing too.

    Returns:
        list: One element per token. Blank tokens and the literal ``'nan'``
        become ``None``; every other token is returned stripped of
        surrounding whitespace. A wholly missing cell yields ``[None]``.
    """
    # Missing cell: a real None/NaN, or the 'nan' string left by a str cast.
    if row is None or (isinstance(row, float) and row != row) or row == 'nan':
        return [None]

    values = []
    for token in row.split('|'):
        cleaned = token.strip()
        # A 'nan' token used to be kept as the string 'nan' AND followed by an
        # extra None, producing two entries for one token. Emit a single None.
        values.append(cleaned if cleaned and cleaned != 'nan' else None)
    return values

# Work on a copy so the frame loaded from the CSV is left untouched.
df_pandas = df.copy()

# Each of these columns holds '|'-delimited text: cast every cell to str,
# then split it into a list with get_unique_values.
for column in ('Artist Role', 'Artist Display Name', 'Artist Gender'):
    as_text = df_pandas[column].astype(str)
    df_pandas[column] = as_text.apply(get_unique_values)

# NOTE(review): how replace() treats an explicit None value has changed across
# pandas versions — confirm this does what is intended on the version in use.
df_pandas = df_pandas.replace('nan', None)




In [0]:
# Medium Table
# One row per distinct 'Medium' value, numbered 0..n-1 in order of first appearance.
distinct_media = df_pandas['Medium'].unique()
medium = {
    'Material': distinct_media,
    'ID': list(range(len(distinct_media))),
}
df_medium_pd = pd.DataFrame(medium)
df_medium = spark.createDataFrame(df_medium_pd)

# Artist Table
# Turn the per-object name and gender lists into one row per list entry.
df_exploded_gender = df_pandas.explode('Artist Gender')
df_exploded_name = df_pandas.explode('Artist Display Name')

# NOTE(review): presumably this relies on both exploded frames having the same
# index, i.e. each object's name and gender lists being equally long — verify.
df_exploded_name['Gender'] = df_exploded_gender['Artist Gender']

# Every gender value other than 'Female' (missing ones included) is set to 'Male'...
not_female = df_exploded_name['Gender'] != 'Female'
df_exploded_name.loc[not_female, 'Gender'] = 'Male'
# ...except on rows without an artist name, where the gender is cleared.
name_missing = df_exploded_name['Artist Display Name'].isnull()
df_exploded_name.loc[name_missing, 'Gender'] = None

# Keep the first row seen for each artist name, drop incomplete rows,
# then number the remaining artists 0..n-1.
df_artist_pd = (
    df_exploded_name[['Gender', 'Artist Display Name']]
    .drop_duplicates(subset=['Artist Display Name'])
    .dropna()
)
df_artist_pd.loc[:, 'ID'] = list(range(len(df_artist_pd)))
df_artist = spark.createDataFrame(df_artist_pd)

# Create Table
# Link table between artworks and artists: one row per (object, artist) pair.
# A dedicated name is used for the intermediate frame instead of reassigning
# `df`, so this section no longer overwrites another frame and can be re-run.
creates_pairs = (
    df_pandas[['Artist Display Name', 'Object ID']]
    .explode('Artist Display Name')
    .dropna()
)

# Look up each artist's ID by name, then keep only the two key columns.
df_creates_pd = (
    pd.merge(creates_pairs, df_artist_pd, on='Artist Display Name', how='left')
    .drop(columns=['Artist Display Name', 'Gender'])
    .rename(columns={'ID': 'Artist ID'})
)
df_creates = spark.createDataFrame(df_creates_pd)

# Artwork Table
# Columns carried over from the source data. 'Medium' is only needed for the
# lookup below and is replaced by the numeric MaterialID.
artwork_columns = [
    'Object ID', 'Gallery Number', 'Medium', 'Department', 'Culture',
    'Period', 'Country', 'Is Highlight', 'Artist Display Name',
]

# Select from df_pandas directly instead of reassigning `df`, so this section
# no longer overwrites another frame and can be re-run.
# pd.merge defaults to an inner join; each artwork's Medium text is matched
# to the Material column of the medium table to obtain its ID.
df_artwork_pd = (
    pd.merge(df_pandas[artwork_columns], df_medium_pd, left_on='Medium', right_on='Material')
    .drop(columns=['Material', 'Medium'])
    .rename(columns={'ID': 'MaterialID'})
)
df_artwork = spark.createDataFrame(df_artwork_pd)



In [0]:
# printSchema() writes the schema to stdout itself and returns None, so
# wrapping it in print() only adds a stray "None" line after each schema.
df_artwork.printSchema()
df_medium.printSchema()


root
 |-- Object ID: long (nullable = true)
 |-- Gallery Number: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Culture: string (nullable = true)
 |-- Period: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Is Highlight: boolean (nullable = true)
 |-- Artist Display Name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- MaterialID: long (nullable = true)

None
root
 |-- Material: string (nullable = true)
 |-- ID: long (nullable = true)

None


In [0]:
# printSchema() writes the schema to stdout itself and returns None, so
# wrapping it in print() only adds a stray "None" line after each schema.
df_artist.printSchema()
df_creates.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Artist Display Name: string (nullable = true)
 |-- ID: long (nullable = true)

None
root
 |-- Object ID: long (nullable = true)
 |-- Artist ID: long (nullable = true)

None


In [0]:
# Expose both DataFrames to Spark SQL as temporary views
df_artwork.createOrReplaceTempView("artwork")
df_medium.createOrReplaceTempView("medium")

# Ten materials with the most artworks, largest count first.
# The adjacent literals are concatenated into a single one-line query string.
top_materials_sql = (
    "SELECT medium.material, count(`Object ID`) "
    "FROM artwork JOIN medium ON medium.ID = artwork.MaterialID "
    "GROUP BY medium.material "
    "ORDER BY count(`Object ID`) DESC LIMIT 10"
)
result = spark.sql(top_materials_sql)

# Display the rows returned by the query
result.show()

+--------------------+----------------+
|            material|count(Object ID)|
+--------------------+----------------+
|          Terracotta|           23532|
|Commercial color ...|           17548|
|             Etching|           16851|
|           Engraving|           11433|
|Gelatin silver print|           10340|
|  Albumen photograph|            9653|
|                Silk|            8495|
|              Bronze|            7217|
|                null|            7120|
|               Glass|            6523|
+--------------------+----------------+



In [0]:
# Save the DataFrame as a table named "METObjects" using "overwrite" mode.
# NOTE(review): `df_spark` is not assigned in any cell visible in this notebook —
# confirm where it is defined; otherwise this cell raises NameError on a fresh kernel.
df_spark.write.mode("overwrite").saveAsTable("METObjects")




In [0]:
from pyspark import SparkContext, SparkConf
# Configuration carrying the "LoadData" application name
conf = SparkConf()
conf.setAppName("LoadData")

# Obtain a SparkContext via getOrCreate, passing the configuration above
sc = SparkContext.getOrCreate(conf=conf)

# Distribute the pandas rows as an RDD with one dict (column -> value) per row
rdd = sc.parallelize(df_pandas.to_dict(orient="records"))
print(type(rdd))


<class 'pyspark.rdd.RDD'>
