# 1. Demo Data Pipeline in Notebook
This phase includes:
- Hadoop - Spark Input and Output
- Bronze
- Silver

## Import libraries

In [7]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, 
                               IntegerType, FloatType, ArrayType, 
                               DateType, ByteType, TimestampType)
from pyspark.sql.functions import (col, split, regexp_replace, to_date,
                                   monotonically_increasing_id, concat, 
                                   lit, substring, rtrim, ltrim, when, 
                                   length, explode, explode_outer, arrays_zip)
from pyspark.sql import DataFrame
from contextlib import contextmanager

## 1.1. Hadoop - Spark Input and Output
I create functions for:
- Creating SparkSession(default, snowflake configuration)
- Handling Hadoop input and output

In [8]:
#create spark Session
@contextmanager
def get_sparkSession(appName: str, master: str = 'local'):
    conf = SparkConf()
    conf.setAppName(appName)
    conf.setMaster(master)
    conf.set("spark.executor.memory", "2g") \
        .set("spark.executor.cores", "2")
    
    spark = SparkSession.builder.config(conf = conf).getOrCreate()
    
    print(f"Successfully create SparkSession with app name: {appName}, master: {master}\n")
    try:
        yield spark
    finally:
        spark.stop()
        print("Spark Session has stopped!")

#input HDFS function
def upload_HDFS(dataFrame: DataFrame, table_name: str, HDFS_path: str) -> None:
    print(f'''Starting upload file "{table_name}" into {HDFS_path}...''')
    #check types of parameters
    if not isinstance(dataFrame, DataFrame):
        raise TypeError("data must be a DataFrame!")
    if not isinstance(table_name, str):
        raise TypeError("table name must be a string!")
    if not HDFS_path.startswith("hdfs://namenode:9000/"):
        raise TypeError('HDFS path must start with "hdfs://namenode:9000/"')
    
    #upload data
    dataFrame.write.parquet(HDFS_path, mode = 'overwrite')
    print("========================================================")
    print(f'''Successfully upload "{table_name}" into {HDFS_path}.''')
    print("========================================================")

#read file from HDFS function
def read_HDFS(spark: SparkSession, HDFS_path: str) -> DataFrame:
    print(f"Starting read file from {HDFS_path}.")
    #check parameters
    if not isinstance(spark, SparkSession):
        raise TypeError("spark must be a Spark Session!")
    if not HDFS_path.startswith("hdfs://namenode:9000/"):
        raise TypeError('HDFS path must start with "hdfs://namenode:9000/"')
    
    #read file
    data = spark.read.parquet(HDFS_path, header = True)
    return data

#set spark connection with snowflake data warehouse
@contextmanager
def get_snowflake_sparkSession(appName: str, master: str = 'local'):
    conf = SparkConf()
    conf.setAppName(appName)
    conf.setMaster(master)
    conf.set("spark.executor.memory", "2g") \
        .set("spark.executor.cores", "2") \
        .set("spark.jars","/opt/jars/snowflake-jdbc-3.19.0.jar, \
                           /opt/jars/spark-snowflake_2.12-2.12.0-spark_3.4.jar")
    
    spark = SparkSession.builder.config(conf = conf).getOrCreate()

    print(f"Successfully create SparkSession for Snowflake with app name: {appName}, master: {master}\n")
    try:
        yield spark
    finally:
        spark.stop()
        print("Spark Sesion has stopped!")

#default config for snowflake
sfOptions_default = {
    "sfURL": "https://ae58556.ap-southeast-1.snowflakecomputing.com",
    "sfUser": "HUYNHTHUAN",
    "sfPassword": "********", #hide password
    "sfDatabase": "OLYMPICS_DB",
    "sfSchema": "OLYMPICS_SCHEMA",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ACCOUNTADMIN"
}

#load data from hdfs to snowflake data warehouse
def load_snowflake(dataFrame: DataFrame, table_name: str, sfOptions: dict = sfOptions_default):
    print(f'''Starting upload {table_name} into snowflake...''')
    #check parameters
    if not isinstance(dataFrame, DataFrame):
        raise TypeError("data must be a DataFrame!")
    if not isinstance(table_name, str):
        raise TypeError("table name must be a string!")
    
    #upload data
    dataFrame.write \
        .format("snowflake") \
        .options(**sfOptions) \
        .option("dbtable", table_name) \
        .mode("overwrite") \
        .save()
    print("========================================================")
    print(f'''Successfully upload "{table_name}" into SnowFlake.''')
    print("========================================================")

## 1.2. Bronze
In this phase, I will
- Create schemas for nine tables. (1.2.1)
- Apply the appropriate schema to each table and load all the data into HDFS as the Bronze layer. (1.2.2)

### 1.2.1. Creating Schemas

In [9]:
def get_schema(table_name):
    '''
        Create schema for athletes table
    '''
    #list of columns containing string type
    cols = ['code', 'name', 'name_short', 'name_tv',
            'gender', 'function', 'country_code',
            'country', 'country_full', 'nationality', 
            'nationality_full', 'nationality_code']
    
    cols2 = ['birth_place', 'birth_country', 'residence_place', 
             'residence_country', 'nickname', 'hobbies', 'occupation', 
             'education', 'family']
    
    cols3 = ['coach', 'reason', 'hero', 'influence', 'philosophy', 
             'sporting_relatives', 'ritual', 'other_sports']
    
    #create schema for athletes table
    athletes_schema  = [StructField(col, StringType(), True) for col in cols]
    athletes_schema += [StructField('height', IntegerType(), True),
                        StructField('weight', IntegerType(), True),
                        StructField('disciplines', ArrayType(StringType(),True), True),
                        StructField('events', ArrayType(StringType(),True)),
                        StructField('birth_date', DateType(), True)]
    athletes_schema += [StructField(col, StringType(), True) for col in cols2]
    athletes_schema += [StructField('lang', ArrayType(StringType(),True),True)]
    athletes_schema += [StructField(col, StringType(), True) for col in cols3]
    athletes_schema  =  StructType(athletes_schema)


    '''
        Create schema for events table
    '''
    #events table has a suitable schema, so ignore it
    events_schema = None
    

    '''
        Create schema for medallists table 
    '''
    medallists_schema = [StructField("medal_date", DateType(), True),
                         StructField("medal_type", StringType(), True),
                         StructField("medal_code", ByteType(), True)]
    
    #list of columns containing string type
    cols = ['name', 'gender', 'country', 'country_code', 'nationality','team', 
            'team_gender', 'discipline', 'event', 'event_type', 'url_event']
    
    medallists_schema += [StructField(col, StringType(), True) for col in cols]
    medallists_schema += [StructField("birth_date", DateType(), True),
                          StructField("code", StringType(), True)]
    medallists_schema  =  StructType(medallists_schema)


    '''
        Create schema for medals table
    '''
    medals_schema = [StructField("medal_type", StringType(),True),
                     StructField("medal_code", ByteType(),True),
                     StructField("medal_date", DateType(), True)]
    
    #list of columns containing string type
    cols = ['name', 'country_code', 'gender', 'discipline',
             'event', 'event_type', 'url_event', 'code']
    
    medals_schema += [StructField(col, StringType(), True) for col in cols]
    medals_schema  = StructType(medals_schema)


    '''
        Create schema for schedules table
    '''
    schedules_schema = [StructField("start_date", TimestampType(), True),
                        StructField("end_date", TimestampType(), True),
                        StructField("day", DateType(), True),
                        StructField("status", StringType(), True),
                        StructField("discipline", StringType(), True),
                        StructField("discipline_code", StringType(), True),
                        StructField("event", StringType(), True),
                        StructField("event_medal", IntegerType(), True)]
    
    #list of columns containing string type 
    cols = ['phase', 'gender', 'event_type', 'venue', 
            'venue_code', 'location_description', 'location_code']
    
    schedules_schema += [StructField(col, StringType(), False) for col in cols]
    schedules_schema += [StructField("url", StringType(), True)]
    schedules_schema  = StructType(schedules_schema)


    '''
        Create schema for schedules_preliminary schemas

    '''
    schedules_pre_schema = [StructField("date_start_utc", TimestampType(), True),
                            StructField("date_end_utc", TimestampType(), True),
                            StructField("estimated", StringType(), True),
                            StructField("estimated_start", StringType(), True),
                            StructField("start_text", StringType(), True),
                            StructField("medal", IntegerType(), True),
                            StructField("venue_code", StringType(), True),
                            StructField("description", StringType(), True)]
    
    #list of columns containing string type - can be nullable
    cols = ['venue_code_other', 'discription_other', 
            'team_1_code', 'team_1', 'team_2_code', 'team_2']
    
    schedules_pre_schema += [StructField(col, StringType(), True) for col in cols]
    schedules_pre_schema += [StructField("tag", StringType(), True),
                             StructField("sport", StringType(), True),
                             StructField("sport_code", StringType(), True),
                             StructField("sport_url", StringType(), True)]
    schedules_pre_schema  = StructType(schedules_pre_schema)


    '''
        Create schema for teams table
    '''
    #list of column containing string type
    cols = ['code', 'team', 'team_gender', 'country', 'country_full', 
            'country_code', 'discipline', 'disciplines_code', 'events']
    
    teams_schema  = [StructField(col, StringType(), True) for col in cols]
    teams_schema += [StructField("athletes", ArrayType(StringType(),True)),
                     StructField("coaches", ArrayType(StringType(),True)),
                     StructField("athletes_codes",ArrayType(StringType(),True)),
                     StructField("num_athletes",IntegerType(),True),
                     StructField("coaches_codes",ArrayType(StringType(),True)),
                     StructField("num_coaches",IntegerType(),True)]
    teams_schema = StructType(teams_schema)


    '''
        Create schema for torch_route table
    '''
    torch_route_schema = [StructField("title", StringType(), True),
                          StructField("city", StringType(), True),
                          StructField("date_start", TimestampType(), True),
                          StructField("date_end", TimestampType(), True),
                          StructField("tag", StringType(), True),
                          StructField("url", StringType(), True),
                          StructField("stage_number", IntegerType(), True)]
    torch_route_schema = StructType(torch_route_schema)


    '''
        Create schema for venues table
    '''
    venues_schema = [StructField("venue", StringType(), True),
                     StructField("sports", ArrayType(StringType(),True),True),
                     StructField("date_start", TimestampType(), True),
                     StructField("date_end", TimestampType(), True),
                     StructField("tag", StringType(), True),
                     StructField("url", StringType(), True)]
    venues_schema = StructType(venues_schema)


    #create dict for mapping schema
    schema = {
        'athletes' : athletes_schema,
        'events' : events_schema,
        'medallists' : medallists_schema,
        'medals' : medals_schema,
        'schedules_preliminary' : schedules_pre_schema,
        'schedules' : schedules_schema,
        'teams' : teams_schema,
        'torch_route' : torch_route_schema,
        'venues' : venues_schema
    }

    return schema[table_name]

### 1.2.2. Applying Schemas and Loading Data into HDFS

In [11]:
#task
def bronze_task(tables: list, HDFS_path: str):
    with get_sparkSession("bronze_task_spark", "local") as spark:
        df = None
        for table_name in tables:
            '''
                Note that we need to preprocess data for athletes,
                teams and venues table before applying the schema
            '''
            if table_name == 'athletes':
                #read data from csv file
                data = spark.read.csv("/opt/data/athletes.csv", header = True)

                #replace unnecessary character 
                data = data.withColumn("disciplines", regexp_replace("disciplines", "[\[\]']", "")) \
                           .withColumn("events", regexp_replace("events","[\[\]']","")) 
                
                data = data.withColumn("disciplines", split(data["disciplines"],",")) \
                           .withColumn("events", split(data["events"],",")) \
                           .withColumn("height", col("height").cast("int")) \
                           .withColumn("weight", col("weight").cast("int")) \
                           .withColumn("birth_date", to_date(col("birth_date"), "yyyy-mm-dd")) \
                           .withColumn("lang", split(data["lang"],","))
                
                #create dataFrame
                df = spark.createDataFrame(data.rdd, schema = get_schema(table_name))


            elif table_name == "teams":
                #read data from csv file
                data = spark.read.csv("/opt/data/teams.csv", header = True)

                #replace unnecessary characters
                data = data.withColumn("athletes", regexp_replace("athletes","[\[\]']","")) \
                           .withColumn("coaches", regexp_replace("coaches","[\[\]']","")) \
                           .withColumn("athletes_codes", regexp_replace("athletes_codes","[\[\]']","")) \
                           .withColumn("coaches_codes", regexp_replace("coaches_codes","[\[\]']",""))
                
                #transform data type
                data = data.withColumn("athletes", split(data["athletes"],",")) \
                           .withColumn("coaches", split(data["coaches"],",")) \
                           .withColumn("athletes_codes", split(data["athletes_codes"],",")) \
                           .withColumn("coaches_codes", split(data["coaches_codes"],",")) \
                           .withColumn("num_athletes", col("num_athletes").cast("int")) \
                           .withColumn("num_coaches", col("num_coaches").cast("int"))
                 
                #create dataFrame
                df = spark.createDataFrame(data.rdd, schema = get_schema(table_name))


            elif table_name == 'venues':
                #read data from csv file
                data = spark.read.csv("/opt/data/venues.csv", header = True)

                #replace unnecessary characters
                data = data.withColumn("sports", regexp_replace("sports","[\[\]']",""))

                data = data.withColumn("sports", split(data["sports"],",")) \
                           .withColumn("date_start", col("date_start").cast("timestamp")) \
                           .withColumn("date_end", col("date_end").cast("timestamp"))
                
                #create dataFrame
                df = spark.createDataFrame(data.rdd, schema = get_schema(table_name)) 


            else:
                df = spark.read.csv(f"/opt/data/{table_name}.csv",header = True, schema = get_schema(table_name))


            #After reading csv files or preprocessing, we load data into HDFS
            upload_HDFS(df, table_name, HDFS_path + f'/{table_name}/')

if __name__ == '__main__':
    #List all tables
    tables = ['athletes', 'events', 'medallists', 
            'medals', 'schedules_preliminary', 
            'schedules', 'teams', 'torch_route', 'venues']
    
    #HDFS path
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/"
    
    print("=========================Bronze task starts!========================")

    bronze_task(tables, HDFS_path)
    
    print("========================Bronze task finishes!========================")

Successfully create SparkSession with app name: bronze_task_spark, master: local

Starting upload file "athletes" into hdfs://namenode:9000/datalake/bronze_storage//athletes/...


                                                                                

Successfully upload "athletes" into hdfs://namenode:9000/datalake/bronze_storage//athletes/.
Starting upload file "events" into hdfs://namenode:9000/datalake/bronze_storage//events/...


                                                                                

Successfully upload "events" into hdfs://namenode:9000/datalake/bronze_storage//events/.
Starting upload file "medallists" into hdfs://namenode:9000/datalake/bronze_storage//medallists/...


                                                                                

Successfully upload "medallists" into hdfs://namenode:9000/datalake/bronze_storage//medallists/.
Starting upload file "medals" into hdfs://namenode:9000/datalake/bronze_storage//medals/...


                                                                                

Successfully upload "medals" into hdfs://namenode:9000/datalake/bronze_storage//medals/.
Starting upload file "schedules_preliminary" into hdfs://namenode:9000/datalake/bronze_storage//schedules_preliminary/...


                                                                                

Successfully upload "schedules_preliminary" into hdfs://namenode:9000/datalake/bronze_storage//schedules_preliminary/.
Starting upload file "schedules" into hdfs://namenode:9000/datalake/bronze_storage//schedules/...


                                                                                

Successfully upload "schedules" into hdfs://namenode:9000/datalake/bronze_storage//schedules/.
Starting upload file "teams" into hdfs://namenode:9000/datalake/bronze_storage//teams/...


                                                                                

Successfully upload "teams" into hdfs://namenode:9000/datalake/bronze_storage//teams/.
Starting upload file "torch_route" into hdfs://namenode:9000/datalake/bronze_storage//torch_route/...
Successfully upload "torch_route" into hdfs://namenode:9000/datalake/bronze_storage//torch_route/.


                                                                                

Starting upload file "venues" into hdfs://namenode:9000/datalake/bronze_storage//venues/...


                                                                                

Successfully upload "venues" into hdfs://namenode:9000/datalake/bronze_storage//venues/.
Spark Sesion has stopped!


## 1.3 Silver
In this phase, I will handle each table individually, including:
- Creating a SilverLayer class to process and transform data, which has six main functions (1.3.1):
    - Drop N/A columns
    - Drop duplicates
    - Drop unnecessary columns
    - Rename columns
    - Handle nested structures
    - Handle missing values
- Using the SilverLayer class to process all tables. (1.3.2)


### 1.3.1. Creating SilverLayer Class

In [12]:
#create silver clean classs
class Silverlayer:
    #init
    def __init__(self, df: DataFrame, 
                 dropna_columns: list = None,
                 columns_dropDuplicates: list = None,
                 columns_drop: list = None, 
                 columns_rename: dict = None, 
                 nested_columns: list = None,
                 missval_columns: dict = None,
                 ):
        
        '''
            Initializes processing task
        '''
        #firstly, check all types of parameters 
        if df is not None and not isinstance(df, DataFrame):
            raise TypeError("data must be a DataFrame")
        if columns_dropDuplicates is not None and not isinstance(columns_dropDuplicates, list):
            raise TypeError("columns_dropDuplicates must be a list")
        if columns_drop is not None and not isinstance(columns_drop, list):
            raise TypeError("columns_drop must be a list")
        if columns_rename is not None and not isinstance(columns_rename, dict):
            raise TypeError("columns_rename must be a dict")
        if nested_columns is not None and not isinstance(nested_columns, list):
            raise TypeError("nested_columns must be a list")
        if missval_columns is not None and not isinstance(missval_columns, dict):
            raise TypeError("columns_null must be a dict")
        if dropna_columns is not None and not isinstance(dropna_columns, list):
            raise TypeError("dropna_columns must be a list")
        
        #data frame
        self.df = df

        #list of columns to apply the drop duplicate function
        self.columns_dropDuplicates = columns_dropDuplicates

        #list of columns to drop
        self.columns_drop = columns_drop

        #dict containing old name & new name
        self.columns_rename = columns_rename

        #list of columns that need to handle nested structures
        self.nested_columns = nested_columns

        #dict containing columns to check & a value to apply for nulls
        self.missval_columns = missval_columns 

        #list of columns to apply drop na function
        self.dropna_columns = dropna_columns

    def drop_na(self, df: DataFrame, dropna_columns: list):
        '''
            Drop rows containing na values
        '''
        self.df = df.dropna(how = 'all', subset = dropna_columns)

    def drop_duplicates(self, df: DataFrame, columns_dropDuplicates: list):
        '''
            Drop duplicates based on specified columns
        '''
        self.df = df.dropDuplicates(columns_dropDuplicates)

    def drop_columns(self, df: DataFrame, columns_drop: list):
        '''
            Drop unnecessary columns
        '''
        self.df = df.drop(*columns_drop)

    def rename_columns(self, columns_rename: dict):
        '''
            Rename columns
        '''
        for old_name, new_name in columns_rename.items():
            self.df = self.df.withColumnRenamed(old_name, new_name)

    def handle_nested(self, nested_columns: list):
        '''
            Handle nested columns 
        '''
        for col in nested_columns:
            self.df = self.df.withColumn(col, explode_outer(col))
            self.df = self.df.withColumn(col, ltrim(col))

    def handle_missing(self, missval_columns: dict):
        '''
            Handle missing values
        '''
        for col, value in missval_columns.items():
            self.df = self.df.fillna(value = value, subset = col)

    def process(self) -> DataFrame:
        '''
            Process based on all parameters
        '''
        self.drop_na(self.df, self.dropna_columns)
        self.drop_duplicates(self.df, self.columns_dropDuplicates)

        if self.columns_drop:
            self.drop_columns(self.df, self.columns_drop)
        
        if self.columns_rename:
            self.rename_columns(self.columns_rename)

        if self.nested_columns:
            self.handle_nested(self.nested_columns)
        
        if self.missval_columns:
            self.handle_missing(self.missval_columns)

        return self.df

### 1.3.2 Applying SilverLayer class to all tables

#### Athletes table

In [13]:
#athletes
def athletes_silver(spark: SparkSession, HDFS_load):
    '''
        Process athletes table
    '''
    #read file
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/athletes"
    df = read_HDFS(spark, HDFS_path)

    #process
    columns_drop = ['name_short', 'name_tv', 'hobbies', 'occupation', 'education', 
                    'family', 'coach', 'reason', 'hero', 'influence', 'philosophy', 
                    'sporting_relatives', 'ritual', 'other_sports']
    
    df_silver = Silverlayer(df = df, 
                            columns_drop    =  columns_drop,
                            columns_rename  = {'code':'athletes_id', 'name':'full_name', 'lang':'language'},
                            nested_columns  = ['disciplines', 'events', 'language'],
                            missval_columns = {'birth_place':'N/A', 
                                                'birth_country':'N/A', 
                                                'nickname': 'N/A',
                                                'residence_place':'N/A', 
                                                'residence_country':'N/A',
                                                'height':0, 'weight':0}).process()
    
    #upload hdfs
    upload_HDFS(df_silver, "athletes_silver", HDFS_load + '/athletes_silver/')

#### Events table

In [14]:
#events
def events_silver(spark: SparkSession, HDFS_load):
    '''
        Process events table
    '''
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/events"
    df = read_HDFS(spark, HDFS_path)

    #process
    df_silver = Silverlayer(df = df, 
                            columns_dropDuplicates = ['event', 'sport'],
                            columns_drop           = ['sport_url', 'tag'], 
                            columns_rename         = {'sport_code':'id_sport'}).process()
    
    #upload hdfs
    upload_HDFS(df_silver, 'events_silver', HDFS_load + '/events_silver/')

#### Medallists table

In [15]:
#medallists
def medallists_silver(spark: SparkSession, HDFS_load):
    '''
        Process medallists table
    '''
    #read file
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/medallists"
    df = read_HDFS(spark, HDFS_path)

    #process
    df_silver = Silverlayer(df = df, 
                            dropna_columns         = ['name'], 
                            columns_dropDuplicates = ['name', 'medal_type', 'discipline', 'event'],
                            columns_drop           = ['medal_code', 'url_event'],
                            columns_rename         = {'name':'full_name', 'code':'athletes_id'},
                            missval_columns        = {'team':'N/A','team_gender':'N/A', 'nationality':'N/A'}).process()
    
    #upload hdfs
    upload_HDFS(df_silver, 'medallists_silver', HDFS_load + '/medallists_silver/')

#### Medals table

In [16]:
#medals
def medals_silver(spark: SparkSession, HDFS_load):
    '''
        Process medals table
    '''
    #read file
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/medals"
    df = read_HDFS(spark, HDFS_path)

    #process
    df_silver = Silverlayer(df = df,
                            dropna_columns         = ['name'], 
                            columns_dropDuplicates = ['name', 'medal_type', 'discipline', 'event'],
                            columns_drop           = ['medal_code', 'url_event'],
                            columns_rename         = {'name':'full_name', 'code':'athletes_id'}).process()
    
    #upload hdfs
    upload_HDFS(df_silver, 'medals_silver', HDFS_load + '/medals_silver/')

#### Schedules Table
Due to a mismatch with data in the venues table, I will transform the data in the venue column of the schedules table after applying the SilverLayer class.

In [17]:
#schedules
def schedules_silver(spark: SparkSession, HDFS_load):
    '''
        Process schedules table
    '''
    #read file
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/schedules"
    df = read_HDFS(spark, HDFS_path)

    #process
    df_silver = Silverlayer(df = df,
                            dropna_columns         = ['event'],
                            columns_dropDuplicates = ['event', 'discipline', 'phase'],
                            columns_drop           = ['status', 'event_medal', 'url']).process()
    
    #preprocess to match name of venue before joining 
    df_silver = df_silver.withColumn('venue', regexp_replace("venue", "\\d", "")) \
                         .withColumn('venue', rtrim('venue')) \
                         .withColumn('venue_code', regexp_replace("venue_code", "\\d", "")) \
                         .withColumn('venue_code', rtrim('venue_code'))
    
    #process data to match venue
    df_silver = df_silver.withColumn('venue', when(col('venue') == 'Chateauroux Shooting Ctr', 'Chateauroux Shooting Centre') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'Nautical St - Flat water', 'Vaires-sur-Marne Nautical Stadium') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'BMX Stadium', 'Saint-Quentin-en-Yvelines BMX Stadium') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'Champ-de-Mars Arena', 'Champ de Mars Arena') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'Le Bourget Climbing Venue', 'Le Bourget Sport Climbing Venue') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'Nautical St - White water', 'Vaires-sur-Marne Nautical Stadium') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'Roland-Garros Stadium', 'Stade Roland-Garros') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'Le Golf National', 'Golf National') \
                         .otherwise(col('venue'))) \
                         .withColumn('venue', when(col('venue') == 'National Velodrome', 'Saint-Quentin-en-Yvelines Velodrome') \
                         .otherwise(col('venue')))
    
    #upload hdfs
    upload_HDFS(df_silver, 'schedules_silver', HDFS_load + '/schedules_silver/')

#### Schedules preliminary table

In [18]:
#schedules_preliminary
def schedules_pre_silver(spark: SparkSession, HDFS_load):
    '''
        Process schedules preliminary table
    '''
    #read file
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/schedules_preliminary"
    df = read_HDFS(spark, HDFS_path)

    #process
    df_silver = Silverlayer(df = df,
                            dropna_columns         = ['team_1_code', 'team_2_code', 'team_1', 'team_2'],
                            columns_dropDuplicates = ['team_1_code', 'team_2_code', 'sport'],
                            columns_drop           = ['estimated', 'estimated_start', 'start_text', 
                                                    'medal', 'sport_url', 'tag'],
                            missval_columns        = {'venue_code':'N/A', 'venue_code_other':'N/A',
                                                    'discription_other':'N/A'}).process()
    
    #upload hdfs
    upload_HDFS(df_silver, 'schedules_pre_silver', HDFS_load + '/schedules_pre_silver/')

#### Teams Table
To handle the nested structure in this table, we need to merge the athletes and athletes_code arrays, and then explode them to match the two related arrays.

In [19]:
#teams
def teams_silver(spark: SparkSession, HDFS_load):
    '''
        Process teams table
    '''
    #read file
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/teams"
    df = read_HDFS(spark, HDFS_path)

    #process
    #first, we need to merge array athletes and athletes_code to handle nested structure
    df = df.withColumn('athletes_id_merge', arrays_zip('athletes','athletes_codes'))
    
    df_silver = Silverlayer(df = df,
                            dropna_columns         = ['code', 'team'],
                            columns_drop           = ['coaches', 'coaches_codes', 'num_coaches', 
                                                      'athletes', 'athletes_codes'],
                            columns_rename         = {'events':'event', 'code':'team_id', 'team':'team_name'},
                            missval_columns        = {'event':'Default'}).process()
    
    #After silver processing, we process nested structure
    df_silver = df_silver.withColumn('athletes_id_merge', explode('athletes_id_merge'))

    df_silver = df_silver.withColumn('athletes', col('athletes_id_merge.athletes')) \
                         .withColumn('athletes_id', col('athletes_id_merge.athletes_codes'))
    
    df_silver = df_silver.withColumn('athletes', ltrim('athletes')) \
                         .withColumn('athletes_id', ltrim('athletes_id'))
    
    df_silver = df_silver.drop('athletes_id_merge')

    #upload hdfs
    upload_HDFS(df_silver, 'teams_silver', HDFS_load + '/teams_silver/')

#### Torch route table

In [20]:
#torch_route
def torch_route_silver(spark: SparkSession, HDFS_load):
    '''
        Process torch route table
    '''
    #read filey
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/torch_route"
    df = read_HDFS(spark, HDFS_path)

    #process
    df_silver = Silverlayer(df = df, 
                            dropna_columns         = ['title'],
                            columns_dropDuplicates = ['title'],
                            columns_drop           = ['tag', 'url'],
                            missval_columns        = {'city':'N/A', 'stage_number':0}).process()
    
    #upload hdfs
    upload_HDFS(df_silver, 'torch_route_silver', HDFS_load + '/torch_route_silver/')

#### Venues table

In [21]:
#venues
def venues_silver(spark: SparkSession, HDFS_load):
    '''
        Process venues table
    '''
    #read file
    HDFS_path = "hdfs://namenode:9000/datalake/bronze_storage/venues"
    df = read_HDFS(spark, HDFS_path)

    #process
    df_silver = Silverlayer(df = df,
                            dropna_columns         = ['venue'],
                            columns_dropDuplicates = ['venue'],
                            columns_drop           = ['tag', 'url'],
                            columns_rename         = {'sports':'sport'},
                            nested_columns         = ['sport'],
                            missval_columns        = {'date_start':'N/A', 'date_end':'N/A'}).process()
    
    #upload hdfs
    upload_HDFS(df_silver, 'venues_silver', HDFS_load + '/venues_silver/')

#### Running all functions 

In [22]:
def silver_task(spark, HDFS_load):
    athletes_silver(spark, HDFS_load)
    events_silver(spark, HDFS_load)
    medallists_silver(spark, HDFS_load)
    medals_silver(spark, HDFS_load)
    schedules_pre_silver(spark, HDFS_load)
    schedules_silver(spark, HDFS_load)
    teams_silver(spark, HDFS_load)
    torch_route_silver(spark, HDFS_load)
    venues_silver(spark, HDFS_load)

if __name__ == '__main__':
    #hdfs path 
    HDFS_load = "hdfs://namenode:9000/datalake/silver_storage"

    print("=========================Silver task starts!========================")

    with get_sparkSession("silver_task_spark") as spark:
        silver_task(spark, HDFS_load)
        
    print("========================Silver task finishes!========================")

Successfully create SparkSession with app name: silver_task_spark, master: local

Starting read file from hdfs://namenode:9000/datalake/bronze_storage/athletes.


                                                                                

Starting upload file "athletes_silver" into hdfs://namenode:9000/datalake/silver_storage/athletes_silver/...


                                                                                

Successfully upload "athletes_silver" into hdfs://namenode:9000/datalake/silver_storage/athletes_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/events.
Starting upload file "events_silver" into hdfs://namenode:9000/datalake/silver_storage/events_silver/...


                                                                                

Successfully upload "events_silver" into hdfs://namenode:9000/datalake/silver_storage/events_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/medallists.
Starting upload file "medallists_silver" into hdfs://namenode:9000/datalake/silver_storage/medallists_silver/...


                                                                                

Successfully upload "medallists_silver" into hdfs://namenode:9000/datalake/silver_storage/medallists_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/medals.
Starting upload file "medals_silver" into hdfs://namenode:9000/datalake/silver_storage/medals_silver/...


                                                                                

Successfully upload "medals_silver" into hdfs://namenode:9000/datalake/silver_storage/medals_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/schedules_preliminary.
Starting upload file "schedules_pre_silver" into hdfs://namenode:9000/datalake/silver_storage/schedules_pre_silver/...


                                                                                

Successfully upload "schedules_pre_silver" into hdfs://namenode:9000/datalake/silver_storage/schedules_pre_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/schedules.
Starting upload file "schedules_silver" into hdfs://namenode:9000/datalake/silver_storage/schedules_silver/...


                                                                                

Successfully upload "schedules_silver" into hdfs://namenode:9000/datalake/silver_storage/schedules_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/teams.
Starting upload file "teams_silver" into hdfs://namenode:9000/datalake/silver_storage/teams_silver/...


                                                                                

Successfully upload "teams_silver" into hdfs://namenode:9000/datalake/silver_storage/teams_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/torch_route.
Starting upload file "torch_route_silver" into hdfs://namenode:9000/datalake/silver_storage/torch_route_silver/...


                                                                                

Successfully upload "torch_route_silver" into hdfs://namenode:9000/datalake/silver_storage/torch_route_silver/.
Starting read file from hdfs://namenode:9000/datalake/bronze_storage/venues.
Starting upload file "venues_silver" into hdfs://namenode:9000/datalake/silver_storage/venues_silver/...


                                                                                

Successfully upload "venues_silver" into hdfs://namenode:9000/datalake/silver_storage/venues_silver/.
Spark Sesion has stopped!


In [42]:
'''
    Check data quality of tables
'''
def check_data(spark: SparkSession, HDFS_path: str, table_name: str):
    '''
        Check data of all tables
    '''
    #read file
    df = read_HDFS(spark, HDFS_path + f'/{table_name}/')
    print(f'Checking data for table "{table_name}"...')
    df.show(5, truncate = False)

    #count the number of null values in each column of the table
    df_checkNull = {col:df.filter(df[col].isNull()).count() for col in df.columns}
    for col, num in df_checkNull.items():
        print(f'Number of null values in column "{col}": {num}')

    #print schema to ensure that all data doesn't have nested structure(array type of struct type)
    df.printSchema()

if __name__ == '__main__':
    #list all tables
    tables = ['athletes_silver', 'events_silver', 'medallists_silver', 
              'medals_silver', 'schedules_preliminary_silver', 'schedules_silver', 
              'teams_silver', 'torch_route_silver', 'venues_silver']
    #path
    HDFS_path = "hdfs://namenode:9000/datalake/silver_storage/"
    #call 
    with sparkSession("check_data_quality_spark", "local") as spark:
        for table in tables:
            check_data(spark, HDFS_path, table)



NameError: name 'sparkSession' is not defined

In [14]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import (monotonically_increasing_id, concat, 
                                   lit, substring, rtrim, when, length)

#dim table - medal 
def dim_medal(spark: SparkSession, HDFS_load):
    data = [(1,'Gold Medal'),(2,'Silver Medal'),(3,'Bronze Medal')]
    df_gold = spark.createDataFrame(data, schema = "medal_id int, medal_type string")
    #upload hdfs
    upload_HDFS(df_gold, 'dim_medal', HDFS_load + '/dim_medal/')

#dim table - discipline
def dim_discipline(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_path = "hdfs://namenode:9000/datalake/silver_storage/schedules_silver"
    df = read_HDFS(spark, HDFS_path)
    #select
    df_gold = df.select('discipline_code', 'discipline').distinct() \
                .withColumnRenamed('discipline_code', 'discipline_id') \
                .withColumnRenamed('discipline', 'discipline_type')
    #upload hdfs
    upload_HDFS(df_gold, 'dim_discipline', HDFS_load + '/dim_discipline/')

#dim table - event
def dim_event(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_path = "hdfs://namenode:9000/datalake/silver_storage/events_silver"
    df = read_HDFS(spark, HDFS_path)
    df = df.select('event').distinct()
    #add column
    df = df.withColumn('event_id', monotonically_increasing_id())
    df = df.withColumn('event_id', concat(lit('ev'), col('event_id')))
    #rename
    df_gold = df.withColumnRenamed('event', 'event_type')
    #upload hdfs
    upload_HDFS(df_gold, 'dim_event', HDFS_load + '/dim_event/')
    
#dim table - country
def dim_country(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_athletes_path = "hdfs://namenode:9000/datalake/silver_storage/athletes_silver"
    HDFS_team_path = "hdfs://namenode:9000/datalake/silver_storage/teams_silver"
    df1 = read_HDFS(spark, HDFS_athletes_path)
    df2 = read_HDFS(spark, HDFS_team_path)
    #select
    df_nationality = df1.select('country_code', 'country')
    df_country = df1.select('nationality_code', 'nationality')
    df_team_country = df2.select('country_code', 'country')
    #union column
    df_gold = df_nationality.union(df_country).distinct()
    df_gold = df_gold.union(df_team_country).distinct()
    #rename
    df_gold = df_gold.withColumnRenamed('country_code', 'country_id') \
                     .withColumnRenamed('country', 'country_name')
    #upload hdfs
    upload_HDFS(df_gold, 'dim_country', HDFS_load + '/dim_country/')
    
#fact table - medallist
def fact_medallist(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_path = "hdfs://namenode:9000/datalake/silver_storage/medals_silver"
    HDFS_discipline_gold = "hdfs://namenode:9000/datalake/gold_storage/dim_discipline"
    HDFS_event_gold = "hdfs://namenode:9000/datalake/gold_storage/dim_event"
    df = read_HDFS(spark, HDFS_path)
    df_discipline = read_HDFS(spark, HDFS_discipline_gold)
    df_event = read_HDFS(spark, HDFS_event_gold)
    
    df = df.filter(length('athletes_id') == 7)
    df = df.select('athletes_id', 'medal_type', 'medal_date', 'discipline', 'event')
    #handle table individually
    df = df.withColumn('medallist_id', concat(col('athletes_id'),substring('medal_type',1,1))) \
           .withColumn('medal_type', regexp_replace('medal_type', 'Bronze Medal', '3')) \
           .withColumn('medal_type', regexp_replace('medal_type', 'Silver Medal', '2')) \
           .withColumn('medal_type', regexp_replace('medal_type', 'Gold Medal', '1')) \
           .withColumn('medal_type', col('medal_type').cast('int')) \
           .withColumnRenamed('medal_type', 'medal_id') \

    #joining table
    df = df.join(df_discipline, df['discipline'] == df_discipline['discipline_type'], how = 'left')
    df = df.join(df_event, df['event'] == df_event['event_type'], how = 'left')
    #select
    df_gold = df.select('medallist_id', 'athletes_id', 'medal_id', 'medal_date', 'discipline_id', 'event_id').distinct()
    #upload hdfs
    upload_HDFS(df_gold, 'fact_medallist', HDFS_load + '/fact_medallist/')

#dim table - athletes
def dim_athletes(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_athletes_path = "hdfs://namenode:9000/datalake/silver_storage/athletes_silver"
    df = read_HDFS(spark, HDFS_athletes_path)

    #select
    df = df.select('athletes_id', 'full_name', 'gender', 
                   'function', 'country_code', 'nationality_code', 
                   'height', 'weight', 'birth_date').distinct()
    #rename
    df_gold = df.withColumnRenamed('country_code', 'country_id') \
                .withColumnRenamed('nationality_code', 'nationality_id')
    #upload hdfs
    upload_HDFS(df_gold, 'dim_athletes', HDFS_load + '/dim_athletes/')

#fact table - medal_team
def fact_medal_team(spark: SparkSession, HDFS_load):
    HDFS_path = "hdfs://namenode:9000/datalake/silver_storage/medals_silver"
    HDFS_discipline_gold = "hdfs://namenode:9000/datalake/gold_storage/dim_discipline"
    HDFS_event_gold = "hdfs://namenode:9000/datalake/gold_storage/dim_event"
    df = read_HDFS(spark, HDFS_path)
    df_discipline = read_HDFS(spark, HDFS_discipline_gold)
    df_event = read_HDFS(spark, HDFS_event_gold)
    
    df = df.filter(length('athletes_id') > 7)
    df = df.select('athletes_id', 'medal_type', 'medal_date', 'discipline', 'event') \
           .withColumnRenamed('athletes_id', 'team_id')
    #handle table individually
    df = df.withColumn('medal_team_id', concat(col('team_id'),substring('medal_type',1,1))) \
           .withColumn('medal_type', regexp_replace('medal_type', 'Bronze Medal', '3')) \
           .withColumn('medal_type', regexp_replace('medal_type', 'Silver Medal', '2')) \
           .withColumn('medal_type', regexp_replace('medal_type', 'Gold Medal', '1')) \
           .withColumn('medal_type', col('medal_type').cast('int')) \
           .withColumnRenamed('medal_type', 'medal_id') \

    #joining table
    df = df.join(df_discipline, df['discipline'] == df_discipline['discipline_type'], how = 'left')
    df = df.join(df_event, df['event'] == df_event['event_type'], how = 'left')
    #select
    df_gold = df.select('medal_team_id', 'team_id', 'medal_id', 'medal_date', 'discipline_id', 'event_id').distinct()
    #upload hdfs
    upload_HDFS(df_gold, 'fact_medal_team', HDFS_load + '/fact_medal_team/')
    
#dim table - team
def dim_team(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_path = "hdfs://namenode:9000/datalake/silver_storage/teams_silver"
    df = read_HDFS(spark, HDFS_path)
    df_gold = df.select('team_id', 'team_name', 'team_gender').distinct()
    #upload hdfs
    upload_HDFS(df_gold, 'dim_team', HDFS_load + '/dim_team/')

#dim table - athletes-team (associating dimension table)
def dim_athletes_team(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_path = "hdfs://namenode:9000/datalake/silver_storage/teams_silver"
    df = read_HDFS(spark, HDFS_path)
    df_gold = df.select('athletes_id', 'team_id')
    upload_HDFS(df_gold, 'dim_athletes_team', HDFS_load + '/dim_athletes_team/')

#fact table - schedule & dim table - venue
def fact_schedule_dim_venue(spark: SparkSession, HDFS_load):
    #hdfs path
    HDFS_schedule_path = "hdfs://namenode:9000/datalake/silver_storage/schedules_silver"
    HDFS_venue_path = "hdfs://namenode:9000/datalake/silver_storage/venues_silver"
    HDFS_discipline_gold = "hdfs://namenode:9000/datalake/gold_storage/dim_discipline"
    HDFS_event_gold = "hdfs://namenode:9000/datalake/gold_storage/dim_event"

    df = read_HDFS(spark, HDFS_schedule_path)
    df_discipline = read_HDFS(spark, HDFS_discipline_gold)
    df_event = read_HDFS(spark, HDFS_event_gold)
    df_venue = read_HDFS(spark, HDFS_venue_path)
    
    '''
        Create fact - schedule table
    '''
    df = df.withColumn('schedule_id', monotonically_increasing_id())
    df = df.withColumn('schedule_id', concat(lit('sched'), col('schedule_id'))) \
    #drop column
    df = df.drop('event_type')
    #joining table
    df = df.join(df_discipline, df['discipline'] == df_discipline['discipline_type'], how = 'left')
    df = df.join(df_event, df['event'] == df_event['event_type'], how = 'left')
    #select
    df_gold_schedule = df.select('schedule_id', 'start_date', 'end_date', 
                        'gender', 'discipline_id', 'phase', 'venue_code', 'event_id') \
                .withColumnRenamed('venue_code', 'venue_id') \
    #upload hdfs
    upload_HDFS(df_gold_schedule, 'fact_schedule', HDFS_load + '/fact_schedule/')

    '''
        Create dim - venue table
    '''
    #get data
    df_venue_join = df.select('venue_code', 'venue') \
                      .withColumnRenamed('venue', 'venue_join')
    df_venue = df_venue.join(df_venue_join, df_venue['venue'] == df_venue_join['venue_join'], how = 'left')
    #select
    df_gold_venue = df_venue.select('venue_code', 'venue', 'date_start', 'date_end').distinct() \
                       .withColumnRenamed('venue_code', 'venue_id') \
                       .withColumnRenamed('venue', 'venue_name')
    #upload hdfs
    upload_HDFS(df_gold_venue, 'dim_venue', HDFS_load + '/dim_venue/')

#main call
def gold_task(spark, HDFS_load):
    #all func
    dim_medal(spark, HDFS_load)
    dim_discipline(spark, HDFS_load)
    dim_event(spark, HDFS_load)
    dim_country(spark, HDFS_load)
    fact_medallist(spark, HDFS_load)
    dim_athletes(spark, HDFS_load)
    fact_medal_team(spark, HDFS_load)
    fact_schedule_dim_venue(spark, HDFS_load)
    dim_team(spark, HDFS_load)
    dim_athletes_team(spark, HDFS_load)

if __name__ == '__main__':
    #hdfs path
    HDFS_load = "hdfs://namenode:9000/datalake/gold_storage"

    print("=========================Gold task starts!========================")
    with get_sparkSession("gold_task_spark", "local") as spark:
        gold_task(spark, HDFS_load)
    print("========================Gold task finishes!========================")

Successfully create SparkSession with app name: gold_task_spark, master: local

Starting upload file "dim_medal" into hdfs://namenode:9000/datalake/gold_storage/dim_medal/...


                                                                                

Successfully upload "dim_medal" into hdfs://namenode:9000/datalake/gold_storage/dim_medal/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/schedules_silver.
Starting upload file "dim_discipline" into hdfs://namenode:9000/datalake/gold_storage/dim_discipline/...


                                                                                

Successfully upload "dim_discipline" into hdfs://namenode:9000/datalake/gold_storage/dim_discipline/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/events_silver.
Starting upload file "dim_event" into hdfs://namenode:9000/datalake/gold_storage/dim_event/...


                                                                                

Successfully upload "dim_event" into hdfs://namenode:9000/datalake/gold_storage/dim_event/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/athletes_silver.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/teams_silver.
Starting upload file "dim_country" into hdfs://namenode:9000/datalake/gold_storage/dim_country/...


                                                                                

Successfully upload "dim_country" into hdfs://namenode:9000/datalake/gold_storage/dim_country/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/medals_silver.
Starting read file from hdfs://namenode:9000/datalake/gold_storage/dim_discipline.
Starting read file from hdfs://namenode:9000/datalake/gold_storage/dim_event.
Starting upload file "fact_medallist" into hdfs://namenode:9000/datalake/gold_storage/fact_medallist/...


                                                                                

Successfully upload "fact_medallist" into hdfs://namenode:9000/datalake/gold_storage/fact_medallist/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/athletes_silver.
Starting upload file "dim_athletes" into hdfs://namenode:9000/datalake/gold_storage/dim_athletes/...


                                                                                

Successfully upload "dim_athletes" into hdfs://namenode:9000/datalake/gold_storage/dim_athletes/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/medals_silver.
Starting read file from hdfs://namenode:9000/datalake/gold_storage/dim_discipline.
Starting read file from hdfs://namenode:9000/datalake/gold_storage/dim_event.
Starting upload file "fact_medal_team" into hdfs://namenode:9000/datalake/gold_storage/fact_medal_team/...


                                                                                

Successfully upload "fact_medal_team" into hdfs://namenode:9000/datalake/gold_storage/fact_medal_team/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/schedules_silver.
Starting read file from hdfs://namenode:9000/datalake/gold_storage/dim_discipline.
Starting read file from hdfs://namenode:9000/datalake/gold_storage/dim_event.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/venues_silver.
Starting upload file "fact_schedule" into hdfs://namenode:9000/datalake/gold_storage/fact_schedule/...


                                                                                

Successfully upload "fact_schedule" into hdfs://namenode:9000/datalake/gold_storage/fact_schedule/.
Starting upload file "dim_venue" into hdfs://namenode:9000/datalake/gold_storage/dim_venue/...
Successfully upload "dim_venue" into hdfs://namenode:9000/datalake/gold_storage/dim_venue/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/teams_silver.
Starting upload file "dim_team" into hdfs://namenode:9000/datalake/gold_storage/dim_team/...
Successfully upload "dim_team" into hdfs://namenode:9000/datalake/gold_storage/dim_team/.
Starting read file from hdfs://namenode:9000/datalake/silver_storage/teams_silver.
Starting upload file "dim_athletes_team" into hdfs://namenode:9000/datalake/gold_storage/dim_athletes_team/...
Successfully upload "dim_athletes_team" into hdfs://namenode:9000/datalake/gold_storage/dim_athletes_team/.


In [4]:
spark = SparkSession.builder \
        .appName("MySparkApp") \
        .master("local[*]")\
        .config('spark.jars','/opt/jars/snowflake-jdbc-3.19.0.jar, /opt/jars/spark-snowflake_2.12-2.12.0-spark_3.4.jar')\
        .getOrCreate()
sfOptions = {
    "sfURL": "https://ae58556.ap-southeast-1.snowflakecomputing.com",
    "sfUser": "HUYNHTHUAN",
    "sfPassword": "Thuan0355389551",
    "sfDatabase": "OLYMPICS_DB",
    "sfSchema": "OLYMPICS_SCHEMA",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ACCOUNTADMIN"
}
df = spark.read.parquet("hdfs://namenode:9000/datalake/gold_storage/dim_medal", header = True)
df.show()
df.write \
    .format("snowflake") \
    .options(**sfOptions) \
    .option("dbtable", "DIM_MEDAL") \
    .mode("overwrite") \
    .save()

                                                                                

+--------+------------+
|medal_id|  medal_type|
+--------+------------+
|       1|  Gold Medal|
|       2|Silver Medal|
|       3|Bronze Medal|
+--------+------------+



24/09/07 15:51:38 WARN ServerConnection$: JDBC 3.19.0 is being used. But the certified JDBC version 3.13.30 is recommended.
                                                                                

In [16]:
with get_sparkSession('check') as spark:
    HDFS_path = "hdfs://namenode:9000/datalake/gold_storage/dim_country"
    df = read_HDFS(spark, HDFS_path)
    df2 = read_HDFS(spark,"hdfs://namenode:9000/datalake/silver_storage/teams_silver")
    # #df.show()
    # #df2.select('country_code').show()
    # df2 = df2.join(df, df['country_id'] == df2['country_code'], how = 'left')
    # df2.select('country_code', 'country_id').filter(col('country_id').isNull()).show()
    df2.show()


Successfully create SparkSession with app name: check, master: local

Starting read file from hdfs://namenode:9000/datalake/gold_storage/dim_country.


                                                                                

Starting read file from hdfs://namenode:9000/datalake/silver_storage/teams_silver.


                                                                                

+-----------------+--------------------+-----------+-----------+--------------------+------------+----------+----------------+--------------------+------------+--------------------+-----------+
|          team_id|           team_name|team_gender|    country|        country_full|country_code|discipline|disciplines_code|               event|num_athletes|            athletes|athletes_id|
+-----------------+--------------------+-----------+-----------+--------------------+------------+----------+----------------+--------------------+------------+--------------------+-----------+
|ATHX4X400M--SUI01|         Switzerland|          X|Switzerland|         Switzerland|         SUI| Athletics|             ATH|4 x 400m Relay Mixed|           6|    DEVANTAY Charles|    1976491|
|ATHX4X400M--SUI01|         Switzerland|          X|Switzerland|         Switzerland|         SUI| Athletics|             ATH|4 x 400m Relay Mixed|           6|   PETRUCCIANI Ricky|    1977778|
|ATHX4X400M--SUI01|         Sw