In [None]:
from pyspark.sql import SparkSession
# Attach to the Spark session already active in this kernel, or start one if none exists.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Load the three raw CSV datasets from S3.
#   header=True      -> the first row of each file supplies the column names
#   inferSchema=True -> Spark scans the data to guess column types
#
# FIX: usersDf used to read from the songs/ prefix (copy-paste of the line below),
# so the "users" table written later was just a second copy of the songs data.
# NOTE(review): the users/ prefix is inferred from the sibling songs/ and streams/
# prefixes - confirm that it is the actual location of the user data.
usersDf = spark.read.csv('s3://mui-input/users/', header=True, inferSchema=True)
songsDf = spark.read.csv('s3://mui-input/songs/', header=True, inferSchema=True)
streamsDf = spark.read.csv('s3://mui-input/streams/', header=True, inferSchema=True)

In [None]:
from pyspark.sql.functions import col, to_date
from pyspark.sql import functions as F, Window


def transform_data(spark, usersDf, songsDf, streamsDf):
    """Clean the raw stream and song frames; the user frame is passed through.

    Steps applied:
      * streams: add a ``report_date`` column (``listen_time`` converted with
        ``to_date``), then drop every row where ``user_id``, ``track_id`` or
        ``listen_time`` is null.
      * songs: cast ``duration_ms`` to ``long``.

    No null filling and no de-duplication is performed here.

    Args:
        spark: accepted for signature compatibility; not used by this function.
        usersDf: users DataFrame, returned unchanged.
        songsDf: songs DataFrame; its ``duration_ms`` column is re-cast.
        streamsDf: streams DataFrame; ``listen_time``, ``user_id`` and
            ``track_id`` are read.

    Returns:
        Tuple ``(usersDf, songsDf, streamsDf)`` holding the original users
        frame and the transformed songs and streams frames.
    """
    required_stream_fields = ['user_id', 'track_id', 'listen_time']

    # Derive the calendar date first, then discard rows missing a required field.
    cleaned_streams = (
        streamsDf
        .withColumn("report_date", to_date(streamsDf.listen_time))
        .dropna(subset=required_stream_fields)
    )

    # Replace duration_ms with its long-typed equivalent.
    typed_songs = songsDf.withColumn("duration_ms", col("duration_ms").cast("long"))

    return usersDf, typed_songs, cleaned_streams
        
# Run the cleaning step; the three names are rebound to the frames it returns.
usersDf, songsDf, streamsDf = transform_data(
    spark=spark,
    usersDf=usersDf,
    songsDf=songsDf,
    streamsDf=streamsDf,
)

In [None]:
def load_data(spark, usersDf, songsDf, streamsDf,
              db_name="music_insights",
              s3_output_path="s3://buckdemo2222/output/"):
    """Write the three frames to S3 as Parquet and register them as tables.

    Creates the database ``db_name`` (if it does not exist) located at
    ``s3_output_path``, then saves each frame under
    ``<s3_output_path>/<table>/`` and registers it as ``<db_name>.<table>``
    in the session's catalog (the printed messages refer to the Glue Catalog;
    which catalog is actually used depends on how the session is configured).
    Tables are written in the order songs, users, streams, each with
    ``mode("overwrite")``. Progress messages are printed for every table.

    Args:
        spark: session object used to run the ``CREATE DATABASE`` statement.
        usersDf: frame saved as the ``users`` table.
        songsDf: frame saved as the ``songs`` table.
        streamsDf: frame saved as the ``streams`` table.
        db_name: name of the target database.
        s3_output_path: root output location; a trailing slash is optional.

    Returns:
        None.
    """
    # Guarantee exactly one trailing slash so table paths are always "<root>/<table>/".
    output_root = s3_output_path.rstrip("/") + "/"

    # Create the database if it doesn't exist yet. The statement carries no
    # trailing ';': spark.sql() takes a single statement, and the Spark 2.x
    # parser rejects a terminator.
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION '{output_root}'")

    def save_as_table(df, table_name):
        """Write ``df`` as Parquet under the output root and register it in ``db_name``."""
        table_path = f"{output_root}{table_name}/"
        full_name = f"{db_name}.{table_name}"

        print(f"Writing table: {full_name}, {table_path}")
        (
            df.write
            .format("parquet")
            .mode("overwrite")
            .option("path", table_path)
            .saveAsTable(full_name)
        )
        print(f"Created table: {full_name}")

    for table_name, frame in (
        ("songs", songsDf),
        ("users", usersDf),
        ("streams", streamsDf),
    ):
        save_as_table(frame, table_name)

    print("\nRaw data successfully written to S3 and registered in Glue Catalog.")
    print(f"Verify with: spark.sql('SHOW TABLES IN {db_name}').show()")


# Persist the frames to S3 and register them as catalog tables.
load_data(
    spark=spark,
    usersDf=usersDf,
    songsDf=songsDf,
    streamsDf=streamsDf,
)
