In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pymongo import MongoClient
from pyspark import SparkConf
from contextlib import contextmanager
import pyspark.sql

""" Context manager for creating Spark Session. """
@contextmanager
def get_sparkSession(appName: str, master: str = 'local'):
    #declare sparkconf
    conf = SparkConf()

    #set config
    conf = conf.setAppName(appName) \
               .setMaster(master) \
               .set("spark.executor.memory", "4g") \
               .set("spark.executor.cores", "2") \
               .set("spark.sql.shuffle.partitions", "4") \
               .set("spark.sql.legacy.timeParserPolicy", "LEGACY") \
               .set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0") \
            #    .set("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.4")
    
    #create Spark Session
    spark = SparkSession.builder.config(conf = conf).getOrCreate()

    print(f"Successfully created Spark Session with app name: {appName} and master: {master}!")

    #yield spark
    try:
        yield spark

    finally:
        #must stop Spark Session
        spark.stop()
        print("Successfully stopped Spark Session!")


""" Read data from mongoDB. """
def read_mongoDB(spark: SparkSession, database_name: str, collection_name: str, query: dict = None,
                 schema: StructType = None, username: str = 'huynhthuan', password: str = 'password', 
                 host: str = 'mongo', port: str = 27017) -> pyspark.sql.DataFrame:
    
    #check params
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a SparkSession!")
    
    if query is not None and not isinstance(query, dict):
        raise TypeError("query must be a dict!")
    
    if schema is not None and not isinstance(schema, StructType):
        raise TypeError("schema must be a StructType!")
    
    #uri mongoDB 
    uri = f"mongodb://{username}:{password}@{host}:{port}/{database_name}.{collection_name}?authSource=admin"

    print(f"Starting to read data from database '{database_name}' and collection '{collection_name}'...")
  
    #read data
    try:
        data = spark.read.format('mongodb') \
                         .option("spark.mongodb.read.connection.uri", uri) \
                         .option('header', 'true')
        
        data = data.schema(schema).load() if schema is not None else data.load()

        return data 
    
    except Exception as e:
        print(f"An error occurred while reading data from mongoDB: {e}")


""" Read data from HDFS. """
def read_HDFS(spark: SparkSession, HDFS_dir: str, file_type: str) -> pyspark.sql.DataFrame:
    #check params
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a SparkSession!")
    
    #set HDFS path
    HDFS_path = f"hdfs://namenode:9000/datalake/{HDFS_dir}"

    print(f"Starting to read data from {HDFS_path}...")

    #read data
    try:
        data = spark.read.format(file_type).option('header', 'true').load(HDFS_path)
        #return data
        return data
    
    except Exception as e:
        print(f"An error occurred while reading data from HDFS: {e}")


""" Write data into HDFS. """
def write_HDFS(spark: SparkSession, data: pyspark.sql.DataFrame, direct: str, 
               file_type: str, mode: str = 'overwrite', partition: str = None):
    #check params
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a SparkSession!")
    
    if not isinstance(data, pyspark.sql.DataFrame):
        raise TypeError("data must be a DataFrame!")

    #set HDFS path  
    HDFS_path = f"hdfs://namenode:9000/datalake/{direct}"
    table_name = direct.split('/')[-1]

    print(f"Starting to upload '{table_name}' into {HDFS_path}...")
    
    #write data
    try:
        if partition is not None:
            data.write.format(file_type) \
                      .option('header', 'true') \
                      .mode(mode) \
                      .partitionBy('Execution_date') \
                      .save(HDFS_path)
        else:
            data.write.format(file_type) \
                      .option('header', 'true') \
                      .mode(mode) \
                      .save(HDFS_path)
        
        print(f"Successfully uploaded '{table_name}' into HDFS.")

    except Exception as e:
        print(f"An error occurred while upload data into HDFS: {e}")

""" Write data into SnowFlake Data Warehouse. """
def write_SnowFlake(spark: SparkSession, data: pyspark.sql.DataFrame, table_name: str):
    #check params
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a SparkSession!")
    
    if not isinstance(data, pyspark.sql.DataFrame):
        raise TypeError("data must be a DataFrame!")
    
    snowflake_connection_options = {
        "sfURL": "https://sl70006.southeast-asia.azure.snowflakecomputing.com",
        "sfUser": "HUYNHTHUAN", 
        "sfPassword": "Thuan123456",
        "sfWarehouse": "COMPUTE_WH",
        "sfDatabase": "SPOTIFY_MUSIC_DB" 
    }

    print(f"Starting to upload {table_name.split('.')[-1]} into SnowFlake...")
    try:
        data.write.format("snowflake") \
                .options(**snowflake_connection_options) \
                .option("dbtable", table_name) \
                .mode('overwrite') \
                .save()
        print(f"Successfully uploaded '{table_name}' into SnowFlake.")
    except Exception as e:
        print(f"An error occurred while upload data into HDFS: {e}")
    

In [2]:
from pyspark.sql.functions import lit

""" Load all csv files into mongoDB."""
def initial_load(Execution_date: str):
    with get_sparkSession(appName = "init_load") as spark:
        #uri
        uri_artist_name = "mongodb://huynhthuan:password@mongo:27017/music_database.artist_name_collection?authSource=admin"
        uri_artist = "mongodb://huynhthuan:password@mongo:27017/music_database.artist_collection?authSource=admin"
        uri_album = "mongodb://huynhthuan:password@mongo:27017/music_database.album_collection?authSource=admin"
        uri_track = "mongodb://huynhthuan:password@mongo:27017/music_database.track_collection?authSource=admin"
        uri_trackfeature = "mongodb://huynhthuan:password@mongo:27017/music_database.trackfeature_collection?authSource=admin"

        # read
        df_ArtistName = spark.read.option('header', 'true').csv("/opt/data/ArtistName.csv")
        df_ArtistName = df_ArtistName.withColumn('Execution_date', lit(Execution_date))
        
        df_Artist = spark.read.option('header', 'true').csv("/opt/data/Artist.csv")
        df_Artist = df_Artist.withColumn('Execution_date', lit(Execution_date))

        df_Album = spark.read.option('header', 'true').csv("/opt/data/Album.csv")
        df_Album = df_Album.withColumn('Execution_date', lit(Execution_date))

        df_Track = spark.read.option('header', 'true').csv("/opt/data/Track.csv")
        df_Track = df_Track.withColumn('Execution_date', lit(Execution_date))
        
        df_TrackFeature = spark.read.option('header', 'true').csv("/opt/data/TrackFeature.csv")
        df_TrackFeature = df_TrackFeature.withColumn('Execution_date', lit(Execution_date))

        #write
        try:
            print("Starting load csv files into MongoDB...")
            df_ArtistName.write.format('mongoDB') \
                            .option("spark.mongodb.write.connection.uri", uri_artist_name) \
                            .mode("overwrite") \
                            .save()
            
            df_Artist.write.format('mongoDB') \
                        .option("spark.mongodb.write.connection.uri", uri_artist) \
                        .mode("overwrite") \
                        .save()
            
            df_Album.write.format('mongoDB') \
                        .option("spark.mongodb.write.connection.uri", uri_album) \
                        .mode("overwrite") \
                        .save()
            
            df_Track.write.format('mongoDB') \
                        .option("spark.mongodb.write.connection.uri", uri_track) \
                        .mode("overwrite") \
                        .save()
            
            df_TrackFeature.write.format('mongoDB') \
                                .option("spark.mongodb.write.connection.uri", uri_trackfeature) \
                                .mode("overwrite") \
                                .save()
            print("Successfully uploaded data into mongoDB.")
        except Exception as e:
            print(f"An error occured while loading data: {e}")

if __name__ == "__main__":
    print("------------------------------- Initial load task starts! -------------------------------")
    initial_load("01-12-2004")
    print("------------------------------- Initial load task finished! -------------------------------")

------------------------------- Initial load task starts! -------------------------------
:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9cdb7693-043a-44ff-96bb-de2add6e49b7;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;10.4.0 in central
	found org.mongodb#mongodb-driver-sync;5.1.4 in central
	[5.1.4] org.mongodb#mongodb-driver-sync;[5.1.1,5.1.99)
	found org.mongodb#bson;5.1.4 in central
	found org.mongodb#mongodb-driver-core;5.1.4 in central
	found org.mongodb#bson-record-codec;5.1.4 in central
:: resolution report :: resolve 2083ms :: artifacts dl 8ms
	:: modules in use:
	org.mongodb#bson;5.1.4 from central in [default]
	org.mongodb#bson-record-codec;5.1.4 from central in [default]
	org.mongodb#mongodb-driver-core;5.1.4 from central in [default]
	org.mongodb#mongodb-driver-sync;5.1.4 from central in [default]
	org.mongodb.spark#mongo-spark-connector_

Successfully created Spark Session with app name: init_load and master: local!
Starting load csv files into MongoDB...
An error occured while loading data: An error occurred while calling o77.save.
: com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongo:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: mongo}, caused by {java.net.UnknownHostException: mongo}}]
	at com.mongodb.internal.connection.BaseCluster.createAndLogTimeoutException(BaseCluster.java:392)
	at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:148)
	at com.mongodb.internal.connection.SingleServerCluster.selectServer(SingleServerCluster.java:46)
	at com.mongodb.internal.binding.ClusterBinding.getWriteConnectionSource(ClusterBinding.java:126)
	at com.mongodb.client.internal.ClientSessionBinding.getConnectionSource(ClientSessionBinding.java:

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, DateType, FloatType

""" Function for getting schemas. """
def get_schema(table_name: str) -> StructType:
    """ Artist schema. """
    artist_schema = [StructField('Artist_ID',      StringType(), True),
                     StructField('Artist_Name',    StringType(), True),
                     StructField('Genres',         ArrayType(StringType(), True), True),
                     StructField('Followers',      IntegerType(), True),
                     StructField('Popularity',     IntegerType(), True),
                     StructField('Artist_Image',   StringType(), True),
                     StructField('Artist_Type',    StringType(), True),
                     StructField('External_Url',   StringType(), True),
                     StructField('Href',           StringType(), True),
                     StructField('Artist_Uri',     StringType(), True),
                     StructField('Execution_date', DateType(), True)]
    #applying struct type
    artist_schema = StructType(artist_schema)
    
    """ Album schema. """
    album_schema = [StructField('Artist',               StringType(), True),
                    StructField('Artist_ID',            StringType(), True),
                    StructField('Album_ID',             StringType(), True),
                    StructField('Name',                 StringType(), True),
                    StructField('Type',                 StringType(), True),
                    StructField('Genres',               ArrayType(StringType(), True), True),
                    StructField('Label',                StringType(), True),
                    StructField('Popularity',           IntegerType(), True),
                    StructField('Available_Markets',    StringType(), True),
                    StructField('Release_Date',         DateType(), True),
                    StructField('ReleaseDatePrecision', StringType(), True),
                    StructField('TotalTracks',          IntegerType(), True),
                    StructField('Copyrights',           StringType(), True),
                    StructField('Restrictions',         StringType(), True),
                    StructField('External_URL',         StringType(), True),
                    StructField('Href',                 StringType(), True),
                    StructField('Image',                StringType(), True),
                    StructField('Uri',                  StringType(), True),
                    StructField('Execution_date',       DateType(), True)]
    #Applying struct type
    album_schema = StructType(album_schema)

    """ Track schema. """
    track_schema = [StructField("Artists",          StringType(), True),
                    StructField("Album_ID",         StringType(), True),
                    StructField("Album_Name",       StringType(), True),
                    StructField("Track_ID",         StringType(), True),
                    StructField("Name",             StringType(), True),
                    StructField("Track_Number",     IntegerType(), True),
                    StructField("Type",             StringType(), True),
                    StructField("AvailableMarkets", StringType(), True),
                    StructField("Disc_Number",      IntegerType(), True),
                    StructField("Duration_ms",      IntegerType(), True),
                    StructField("Explicit",         StringType(), True),
                    StructField("External_urls",    StringType(), True),
                    StructField("Href",             StringType(), True),
                    StructField("Restrictions",     StringType(), True),
                    StructField("Preview_url",      StringType(), True),
                    StructField("Uri",              StringType(), True),
                    StructField("Is_Local",         StringType(), True),
                    StructField('Execution_date',   StringType(), True)]
    #Applying struct type
    track_schema = StructType(track_schema)
    
    """ TrackFeature schema. """
    trackfeature_schema = [StructField("Track_ID",         StringType(), True),
                           StructField("Danceability",     FloatType(), True),
                           StructField("Energy",           FloatType(), True),
                           StructField("Key",              IntegerType(), True),
                           StructField("Loudness",         FloatType(), True),
                           StructField("Mode",             IntegerType(), True),
                           StructField("Speechiness",      FloatType(), True),
                           StructField("Acousticness",     FloatType(), True),
                           StructField("Instrumentalness", FloatType(), True),
                           StructField("Liveness",         FloatType(), True),
                           StructField("Valence",          FloatType(), True),
                           StructField("Tempo",            FloatType(), True),
                           StructField("Time_signature",   IntegerType(), True),
                           StructField("Track_href",       StringType(), True),
                           StructField("Type_Feature",     StringType(), True),
                           StructField("Analysis_Url",     StringType(), True),
                           StructField('Execution_date',   StringType(), True)]
    #Applying struct type
    trackfeature_schema = StructType(trackfeature_schema)

    #mapping
    mapping = {
        'artist': artist_schema,
        'album': album_schema,
        'track': track_schema,
        'trackfeature': trackfeature_schema
    }
    
    #return schema
    return mapping[table_name]

In [4]:
from pyspark.sql.functions import split, col, get_json_object, to_date, regexp_replace, length

""" Applying schemas and loading data from MongoDB into HDFS."""
def bronze_layer_processing(Execution_date: str):
    #get spark Session
    with get_sparkSession(appName = 'Bronze_task_spark') as spark:
        """------------------------ BRONZE ARTIST ------------------------"""
        artist_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'artist_collection')
        artist_data = artist_data.filter(artist_data['Execution_date'] == Execution_date)

        print("Starting bronze preprocessing for artist data...")
        #preprocessing before loading data
        try:
            artist_data = artist_data.withColumn('Genres', split(col('Genres'), ",")) \
                                     .withColumn('Followers', col('Followers').cast('int')) \
                                     .withColumn('Popularity', col('Popularity').cast('int')) \
                                     .withColumn('External_Url', get_json_object(col('External_Url'),'$.spotify')) \
                                     .withColumn('Execution_date', col('Execution_date').cast('date'))
            #reorder columns after reading 
            artist_data = artist_data.select('Artist_ID', 'Artist_Name', 'Genres', 
                                            'Followers', 'Popularity', 'Artist_Image', 
                                            'Artist_Type', 'External_Url', 'Href', 'Artist_Uri', 'Execution_date')
            #applying schema        
            artist_data = spark.createDataFrame(artist_data.rdd, schema = get_schema('artist'))

            print("Finished bronze preprocessing for artist data.")

            #upload data into HDFS
            write_HDFS(spark, data = artist_data, direct = 'bronze_data/bronze_artist', 
                       file_type = 'parquet', mode = "append", partition = 'Execution_date')
        except Exception as e:
            print(f"An error occurred while preprocessing bronze data: {e}")

        """------------------------ BRONE ALBUM ------------------------"""
        album_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'album_collection')
        album_data = album_data.filter(album_data['Execution_date'] == Execution_date)
        print("Starting bronze preprocessing for album data...")
        try:
            album_data = album_data.withColumn('Popularity', col('Popularity').cast('int')) \
                                   .withColumn('Genres', split(col('Genres'), ",")) \
                                   .withColumn('Release_Date', to_date('Release_Date', "MM/dd/yyyy")) \
                                   .withColumn('TotalTracks', col('TotalTracks').cast('int')) \
                                   .withColumn('Execution_date', col('Execution_date').cast('date'))
            #reorder columns after reading
            album_data = album_data.select('Artist', 'Artist_ID', 'Album_ID', 'Name', 'Type', 'Genres', 
                                        'Label', 'Popularity', 'Available_Markets', 'Release_Date', 
                                        'ReleaseDatePrecision', 'TotalTracks', 'Copyrights', 'Restrictions', 
                                        'External_URL', 'Href', 'Image', 'Uri', 'Execution_date')
            album_data = spark.createDataFrame(album_data.rdd, schema = get_schema('album'))
            print("Finished bronze preprocessing for album data.")
            #upload data into HDFS
            write_HDFS(spark, data = album_data, direct = 'bronze_data/bronze_album', 
                       file_type = 'parquet', mode = "append", partition = 'Execution_date')
        except Exception as e:
            print(f"An error occurred while preprocessing bronze data: {e}")


        """------------------------ BRONZE TRACK -------------------------"""
        track_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'track_collection', 
                                  schema = get_schema('track'))
        track_data = track_data.filter(track_data['Execution_date'] == Execution_date)
        track_data = track_data.withColumn('Execution_date', col('Execution_date').cast('date'))

        #upload data into HDFS
        write_HDFS(spark, data = track_data, direct = 'bronze_data/bronze_track', 
                   file_type = 'parquet', mode = "append", partition = 'Execution_date')


        """------------------------ BRONZE TRACK FEATURE ------------------------"""
        track_feature_data = read_mongoDB(spark, database_name = 'music_database', collection_name = 'trackfeature_collection', 
                                          schema = get_schema('trackfeature'))
        track_feature_data = track_feature_data.filter(track_feature_data['Execution_date'] == Execution_date)
        track_feature_data = track_feature_data.withColumn('Execution_date', col('Execution_date').cast('date'))
        
        #upload data into HDFS
        write_HDFS(spark, data = track_feature_data, direct = 'bronze_data/bronze_track_feature', 
                   file_type = 'parquet', mode = "append", partition = 'Execution_date')


if __name__ == "__main__":
    print("------------------------------- Bronze task starts! -------------------------------")
    bronze_layer_processing("01-12-2004")
    print("------------------------------ Bronze task finished! -------------------------------")

------------------------------- Bronze task starts! -------------------------------


24/12/16 13:43:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/16 13:43:49 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


Successfully created Spark Session with app name: Bronze_task_spark and master: local!
Starting to read data from database 'music_database' and collection 'artist_collection'...
An error occurred while reading data from mongoDB: An error occurred while calling o151.load.
: com.mongodb.MongoTimeoutException: Timed out while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongo:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketException: mongo}, caused by {java.net.UnknownHostException: mongo}}]
	at com.mongodb.internal.connection.BaseCluster.createAndLogTimeoutException(BaseCluster.java:392)
	at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:148)
	at com.mongodb.internal.connection.SingleServerCluster.selectServer(SingleServerCluster.java:46)
	at com.mongodb.internal.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:1

AttributeError: 'NoneType' object has no attribute 'filter'

In [5]:
import pyspark
from pyspark.sql.functions import explode_outer, ltrim

""" Create SilverLayer class to process data in the Silver layer. """
class SilverLayer:
    #init 
    def __init__(self, data: pyspark.sql.DataFrame, 
                 drop_columns: list = None, 
                 drop_null_columns: list = None,
                 fill_nulls_columns: dict = None,
                 duplicate_columns: list = None,
                 nested_columns: list = None,
                 rename_columns: dict = None,
                 ):
        
        #check valid params
        if data is not None and not isinstance(data, pyspark.sql.DataFrame):
            raise TypeError("data must be a DataFrame!")
        
        if drop_columns is not None and not isinstance(drop_columns, list):
            raise TypeError("drop_columns must be a list!")
        
        if drop_null_columns is not None and not isinstance(drop_null_columns, list):
            raise TypeError("drop_null_columns must be a list!")
        
        if fill_nulls_columns is not None and not isinstance(fill_nulls_columns, dict):
            raise TypeError("handle_nulls must be a dict!")
        
        if duplicate_columns is not None and not isinstance(duplicate_columns, list):
            raise TypeError("duplicate_columns must be a list!")
        
        if nested_columns is not None and not isinstance(nested_columns, list):
            raise TypeError("handle_nested must be a list!")
        
        if rename_columns is not None and not isinstance(rename_columns, dict):
            raise TypeError("rename_columns must be a dict!")
        """Initialize class attributes for data processing."""
        self._data = data
        self._drop_columns = drop_columns
        self._drop_null_columns = drop_null_columns
        self._fill_nulls_columns = fill_nulls_columns
        self._duplicate_columns = duplicate_columns
        self._nested_columns = nested_columns
        self._rename_columns = rename_columns


    """ Method to drop unnecessary columns. """
    def drop(self):
        self._data = self._data.drop(*self._drop_columns)

    
    """ Method to drop rows based on null values in each column. """
    def drop_null(self):
        self._data = self._data.dropna(subset = self._drop_null_columns, how = "all")

    
    """ Method to fill null values. """
    def fill_null(self):
        for column_list, value in self._fill_nulls_columns.items():
            self._data = self._data.fillna(value = value, subset = column_list)


    """ Method to rename columns. """
    def rename(self):
        for old_name, new_name in self._rename_columns.items():
            self._data = self._data.withColumnRenamed(old_name, new_name)


    """ Method to handle duplicates. """
    def handle_duplicate(self):
        self._data = self._data.dropDuplicates(self._duplicate_columns)


    """ Method to handle nested. """
    def handle_nested(self):
        for column in self._nested_columns:
            self._data = self._data.withColumn(column, explode_outer(column)) \
                                   .withColumn(column, ltrim(column))
    
    
    """ Main processing. """
    def process(self) -> pyspark.sql.DataFrame:
        #drop unnecessary columns
        if self._drop_columns:
            self.drop() 

        #drop rows contain null values for each col
        if self._drop_null_columns:
            self.drop_null()

        #fill null values
        if self._fill_nulls_columns:
            self.fill_null()
        
        #handle duplicate rows
        if self._duplicate_columns:
            self.handle_duplicate()

        #handle nested columns 
        if self._nested_columns:
            self.handle_nested()

        #rename columns
        if self._rename_columns:
            self.rename()

        return self._data

In [6]:
from pyspark.sql.functions import col, year

""" Processing silver artist data. """
def silver_artist_process(spark: SparkSession):
    #read bronze artist data
    bronze_artist = read_HDFS(spark, HDFS_dir = "bronze_data/bronze_artist", file_type = 'parquet')

    #applying SilverLayer class 
    silver_artist = SilverLayer(data = bronze_artist, 
                                drop_columns       = ['Artist_Type', 'Href', 'Artist_Uri', 'Execution_date'],
                                drop_null_columns  = ['Artist_ID'], 
                                fill_nulls_columns = {'Followers': 0,
                                                      'Popularity': 0},
                                duplicate_columns  = ['Artist_ID'],
                                nested_columns     = ['Genres'],
                                rename_columns     = {'Artist_ID': 'id',
                                                      'Artist_Name': 'name',
                                                      'Genres': 'genres',
                                                      'Followers': 'followers',
                                                      'Popularity': 'popularity',
                                                      'Artist_Image': 'link_image',
                                                      'External_Url': 'url'})
    
    #processing data
    print("Processing for 'silver_artist' ...")
    silver_artist = silver_artist.process()
    print("Finished processing for 'silver_artist'.")
    #load data into HDFS
    write_HDFS(spark, data = silver_artist, direct = "silver_data/silver_artist", file_type = 'parquet')


""" Processing silver album data. """
def silver_album_process(spark: SparkSession):
    #read bronze album data
    bronze_album = read_HDFS(spark, HDFS_dir = 'bronze_data/bronze_album', file_type = 'parquet')
    #applying Silver Layer class
    silver_album = SilverLayer(data = bronze_album,
                               drop_columns       = ['Genres', 'Available_Markets', 'Restrictions', 
                                                     'Href','Uri', 'Execution_date'],
                               drop_null_columns  = ['Album_ID'],
                               fill_nulls_columns = {'Popularity': 0,
                                                     'TotalTracks': 0},
                               duplicate_columns  = ['Album_ID'],
                               rename_columns     = {'Artist': 'artist',
                                                     'Artist_ID': 'artist_id',
                                                     'Album_ID': 'id',
                                                     'Name': 'name',
                                                     'Type': 'type',
                                                     'Label': 'label',
                                                     'Popularity': 'popularity',
                                                     'Release_Date': 'release_date',
                                                     'ReleaseDatePrecision': 'release_date_precision',
                                                     'TotalTracks': 'total_tracks',
                                                     'Copyrights': 'copyrights',
                                                     'External_URL': 'url',
                                                     'Image': 'link_image'})
    
    #processing data
    print("Processing for 'silver_album' ...")
    silver_album = silver_album.process()
    print("Finished processing for 'silver_album'.")
    #load data into HDFS
    write_HDFS(spark, data = silver_album, direct = 'silver_data/silver_album', file_type = 'parquet')


""" Processing silver track data. """
def silver_track_process(spark: SparkSession):
    #read bronze track data
    bronze_track = read_HDFS(spark, HDFS_dir = 'bronze_data/bronze_track', file_type = 'parquet')
    #applying Silver Layer class
    silver_track = SilverLayer(data               = bronze_track,
                               drop_columns       = ['Artists', 'Type', 'AvailableMarkets', 'Href', 
                                                     'Uri', 'Is_Local', 'Execution_date'],
                               drop_null_columns  = ['Track_ID'],
                               fill_nulls_columns = {'Restrictions': 'None'},
                               duplicate_columns  = ['Track_ID'],
                               rename_columns     = {'Album_ID': 'album_id',
                                                     'Album_Name': 'album_name',
                                                     'Track_ID': 'id',
                                                     'Name': 'name',
                                                     'Track_Number': 'track_number',
                                                     'Disc_Number': 'disc_number',
                                                     'Duration_ms': 'duration_ms',
                                                     'Explicit': 'explicit',
                                                     'External_urls': 'url',
                                                     'Restrictions': 'restriction',
                                                     'Preview_url': 'preview'})
    
    #processing data
    print("Processing for 'silver_track' ...")
    silver_track = silver_track.process()
    print("Finished processing for 'silver_track'.")
    #load data into HDFS
    write_HDFS(spark, data = silver_track, direct = 'silver_data/silver_track', file_type = 'parquet')


""" Processing silver track feature data. """
def silver_track_feature_process(spark: SparkSession):
    #read silver track feature data
    bronze_track_feature = read_HDFS(spark, HDFS_dir = 'bronze_data/bronze_track_feature', file_type = 'parquet')
    #applying Silver Layer class
    silver_track_feature = SilverLayer(data              = bronze_track_feature,
                                       drop_columns      = ['Track_href', 'Type_Feature', 'Analysis_Url', 'Execution_date'],
                                       drop_null_columns = ['Track_ID'],
                                       duplicate_columns = ['Track_ID'],
                                       rename_columns    = {'Track_ID': 'id',
                                                            'Danceability': 'danceability',
                                                            'Energy': 'energy',
                                                            'Key': 'key',
                                                            'Loudness': 'loudness',
                                                            'Mode': 'mode',
                                                            'Speechiness': 'speechiness',
                                                            'Acousticness': 'acousticness',
                                                            'Instrumentalness': 'instrumentalness',
                                                            'Liveness': 'liveness',
                                                            'Valence': 'valence',
                                                            'Tempo': 'tempo',
                                                            'Time_signature': 'time_signature'})
    #processing data
    print("Processing for 'silver_track_feature' ...")
    silver_track_feature = silver_track_feature.process()
    print("Finished processing for 'silver_track_feature'.")
    #load data into HDFS
    write_HDFS(spark, data = silver_track_feature, direct = 'silver_data/silver_track_feature', file_type = 'parquet')


#main call
if __name__ == "__main__":

    with get_sparkSession("Silver_task_spark") as spark:
        print("------------------------------- Silver task starts! -------------------------------")
        print("Starting silver artist data processing...")
        silver_artist_process(spark)
        print("Starting silver album data processing...")
        silver_album_process(spark)
        print("Starting silver track data processing...")
        silver_track_process(spark)
        print("Starting silver track feature data processing...")
        silver_track_feature_process(spark)
        print("------------------------------ Silver task finished! -------------------------------")

24/12/16 13:25:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Successfully created Spark Session with app name: Silver_task_spark and master: local!
------------------------------- Silver task starts! -------------------------------
Starting silver artist data processing...
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_artist...
Processing for 'silver_artist' ...
Finished processing for 'silver_artist'.
Starting to upload 'silver_artist' into hdfs://namenode:9000/datalake/silver_data/silver_artist...


                                                                                

Successfully uploaded 'silver_artist' into HDFS.
Starting silver album data processing...
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_album...
Processing for 'silver_album' ...
Finished processing for 'silver_album'.
Starting to upload 'silver_album' into hdfs://namenode:9000/datalake/silver_data/silver_album...


                                                                                

Successfully uploaded 'silver_album' into HDFS.
Starting silver track data processing...
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_track...
Processing for 'silver_track' ...
Finished processing for 'silver_track'.
Starting to upload 'silver_track' into hdfs://namenode:9000/datalake/silver_data/silver_track...


                                                                                

Successfully uploaded 'silver_track' into HDFS.
Starting silver track feature data processing...
Starting to read data from hdfs://namenode:9000/datalake/bronze_data/bronze_track_feature...
Processing for 'silver_track_feature' ...
Finished processing for 'silver_track_feature'.
Starting to upload 'silver_track_feature' into hdfs://namenode:9000/datalake/silver_data/silver_track_feature...


                                                                                

Successfully uploaded 'silver_track_feature' into HDFS.
------------------------------ Silver task finished! -------------------------------
Successfully stopped Spark Session!
