In [None]:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import os


# Base HDFS locations of the attribute description files and the feature data files.
attributes_base_path = "hdfs:///data/msd/audio/attributes/"
features_base_path = "hdfs:///data/msd/audio/features/"

# Dataset names. Each one has a "<name>.attributes.csv" file under attributes_base_path
# whose rows are (column name, declared type).
attribute_directories = [
    "msd-jmir-area-of-moments-all-v1.0",
    "msd-jmir-lpc-all-v1.0",
    "msd-jmir-methods-of-moments-all-v1.0",
    "msd-jmir-mfcc-all-v1.0",
    "msd-jmir-spectral-all-all-v1.0",
]

# Declared type name -> Spark type. Keys are lower-case because the lookup below
# normalises case and surrounding whitespace; anything unlisted falls back to StringType.
# NOTE(review): 'numeric' is treated as a synonym of 'real' on the assumption that the
# type names are ARFF-style — confirm against the attribute files.
type_mapping = {
    'real': DoubleType(),
    'numeric': DoubleType(),
    'string': StringType(),
}

# Dataset name -> StructType built from its attributes file.
attribute_schemas = {}

for attribute_dir in attribute_directories:
    attributes_path = os.path.join(attributes_base_path, attribute_dir + ".attributes.csv")

    # Both columns are used as text (names and type labels), so read them as plain
    # strings instead of paying for a schema-inference pass.
    attributes_df = spark.read.csv(attributes_path, header=False, inferSchema=False)

    # Collect names and types in a single job so each name stays paired with the type
    # from the same row (two separate collects are not guaranteed to align).
    attribute_rows = attributes_df.select("_c0", "_c1").collect()
    column_names = [row["_c0"] for row in attribute_rows]
    column_types = [row["_c1"] for row in attribute_rows]

    # One nullable StructField per attribute row.
    struct_fields = [
        StructField(name, type_mapping.get((data_type or "").strip().lower(), StringType()), True)
        for name, data_type in zip(column_names, column_types)
    ]

    attribute_schemas[attribute_dir] = StructType(struct_fields)

# Report every stored schema: a heading line, one "name: type" line per field,
# then a blank separator.
for directory_name in attribute_schemas:
    schema = attribute_schemas[directory_name]
    print(f"Schema for {directory_name}:")
    for field in schema.fields:
        field_name, field_type = field.name, field.dataType
        print(f"{field_name}: {field_type}")
    print("\n")

In [None]:
from pyspark.sql.functions import col
# Feature datasets to load; each name must also be a key of attribute_schemas.
feature_directories = [
    "msd-jmir-area-of-moments-all-v1.0",
    "msd-jmir-lpc-all-v1.0",
    "msd-jmir-methods-of-moments-all-v1.0",
    "msd-jmir-mfcc-all-v1.0",
    "msd-jmir-spectral-all-all-v1.0",
]

# Dataset name -> DataFrame of its features.
feature_dataframes = {}

for feature_dir in feature_directories:
    # Schema built earlier from the matching attributes file (KeyError if it is missing).
    feature_schema = attribute_schemas[feature_dir]

    # Same location as before, built from the shared base path instead of a second literal.
    features_path = f"{features_base_path}{feature_dir}.csv"

    # Apply the declared schema at read time: columns get their attribute names and
    # declared types directly, and Spark skips the extra pass over the data that
    # inferSchema=True needs. Only the columns described by the schema are kept.
    feature_files = spark.read.csv(features_path, header=False, schema=feature_schema)

    feature_dataframes[feature_dir] = feature_files
    
   

# Print the schema each loaded feature DataFrame actually carries (not the attribute
# schemas again), so a mismatch between declared and loaded types is visible here.
for directory_name, feature_df in feature_dataframes.items():
    schema = feature_df.schema
    print(f"Schema for {directory_name}:")
    for field in schema.fields:
        print(f"{field.name}: {field.dataType}")
    print("\n")

# The structured feature DataFrames are available as feature_dataframes[feature_dir]
# for any feature directory.

# Preview the first 5 rows of every feature DataFrame.
for feature_dir, feature_df in feature_dataframes.items():
    print(f"Top 5 rows for {feature_dir}:")
    feature_df.show(5)