# Transform data by using Spark

This Jupyter Notebook demonstrates how to use Apache Spark to transform and process data. It includes steps to set up the Spark session, create necessary directories, populate a list of files to be processed, convert these files to Parquet format, and read the transformed data for further analysis.

In [2]:
import os
from pyspark.sql.functions import lit
from lib.common_functions import *
from lib.configuration import *
from files.output.ad_works.silver.schemas.dimdate import dim_date_schema
from files.output.ad_works.silver.schemas.dimcurrency import dim_currency_schema

In [3]:
spark = get_spark_session('ad_works')
spark.active()

In [120]:

# checks if a directory specified by `data_lake_path` exists. 
data_lake_path = f'{ad_works_output_path}/silver'

if not os.path.exists(data_lake_path):
    os.makedirs(data_lake_path)
    print(f'{data_lake_path} created')
else:
    print(f'{data_lake_path} already exists')

/home/jovyan/code/files/output/ad_works/silver already exists


In [121]:
# Extract column names from the schema
dim_date_column_names = [field.name for field in dim_date_schema.fields]
dim_currency_column_names = [field.name for field in dim_currency_schema.fields]

print(dim_date_column_names)

['DateKey', 'FullDateAlternateKey', 'DayNumberOfWeek', 'EnglishDayNameOfWeek', 'SpanishDayNameOfWeek', 'FrenchDayNameOfWeek', 'DayNumberOfMonth', 'DayNumberOfYear', 'WeekNumberOfYear', 'EnglishMonthName', 'SpanishMonthName', 'FrenchMonthName', 'MonthNumberOfYear', 'CalendarQuarter', 'CalendarYear', 'CalendarSemester', 'FiscalQuarter', 'FiscalYear', 'FiscalSemester', 'CreatedDate', 'UpdatedDate', 'year', 'month', 'day']


In [122]:
local = [{"table_name": "dimdate", "pk": "DateKey", "schema":dim_date_schema, "column_names": dim_date_column_names},
         {"table_name": "dimcurrency", "pk": "CurrencyKey", "schema":dim_currency_schema, "column_names": dim_currency_column_names}
         ]
local

[{'table_name': 'dimdate',
  'pk': 'DateKey',
  'schema': StructType([StructField('DateKey', IntegerType(), True), StructField('FullDateAlternateKey', DateType(), True), StructField('DayNumberOfWeek', IntegerType(), True), StructField('EnglishDayNameOfWeek', StringType(), True), StructField('SpanishDayNameOfWeek', StringType(), True), StructField('FrenchDayNameOfWeek', StringType(), True), StructField('DayNumberOfMonth', IntegerType(), True), StructField('DayNumberOfYear', IntegerType(), True), StructField('WeekNumberOfYear', IntegerType(), True), StructField('EnglishMonthName', StringType(), True), StructField('SpanishMonthName', StringType(), True), StructField('FrenchMonthName', StringType(), True), StructField('MonthNumberOfYear', IntegerType(), True), StructField('CalendarQuarter', IntegerType(), True), StructField('CalendarYear', IntegerType(), True), StructField('CalendarSemester', IntegerType(), True), StructField('FiscalQuarter', IntegerType(), True), StructField('FiscalYear',

In [123]:
def check_table_exists(db_name,table_name):
    if spark.catalog.tableExists(f'{db_name}.{table_name}'):
        print(f"Table {db_name}.{table_name} exists")
        return True
    else:
        print(f"Table {db_name}.{table_name} does not exist")
        return False

In [124]:
def check_file_exists(csv_directory):  
    if os.path.exists(f'{csv_directory}.csv'):
        print(f'{csv_directory}.csv exists')
        return True
    else:
        print(f'{csv_directory}.csv does not exist')
        return False

In [125]:
max_created_date = spark.sql(f'SELECT MAX(CreatedDate) as max_date FROM stage_ad_works.dimdate').collect()[0]['max_date']
max_created_date

'2025-02-03'

In [126]:
def get_last_load_date(table_name):
    
    if check_table_exists('ad_works',table_name) == True:
        
        # Add max CreatedDate to a variable
        max_created_date = spark.sql(f'SELECT MAX(CreatedDate) as max_date FROM ad_works.{table_name}').collect()[0]['max_date']
        
        print(max_created_date)

        return max_created_date
    
    else:
        return '2025-01-01'

In [127]:
def get_stage_load_count(table_name):
    
    if check_table_exists('stage_ad_works',table_name) == True:
        
        # Add max CreatedDate to a variable
        record_count = spark.sql(f'SELECT COUNT(*) as record_count FROM stage_ad_works.{table_name}').collect()[0]['record_count']
        
        print(f'record count - stage_ad_works,{table_name}: {record_count}')

        return record_count
    
    else:
        print('0 records in stage table')
        return 0

In [128]:
# def create_ad_works_db_from_csv(df_datalake,table_name,csv_directory):
    
    # if check_table_exists(db_name,table_name) == False:
    #     # Create DATABASE ad_works if it does not exist
    #     spark.sql("CREATE DATABASE IF NOT EXISTS ad_works")
        
    #     print('ad_works database created')
        
    #     # Create table from df_datalake using schema dim_date_schema
    #     df_datalake.write.mode("overwrite").saveAsTable(f'ad_works.{table_name}', schema=dim_date_schema)    
    #     df_datalake.toPandas().to_csv(f'{csv_directory}.csv', header=True, index=False)
        
    #     print(f'{table_name} csv and table created')
        
    #     return True
    
    # else:
    #     print(f'{table_name} table already exists')
    #     return False

In [129]:
def create_ad_works_db_table(df_datalake,db_name,table_name,schema):
    
    if check_table_exists(db_name,table_name) == False:
        # Create DATABASE ad_works if it does not exist
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
        
        print(f'{db_name} database created')
        
        if db_name == 'ad_works':
            df_datalake.write.mode("overwrite").saveAsTable(f'{db_name}.{table_name}', schema=schema)
            spark.sql(f"TRUNCATE TABLE {db_name}.{table_name}")
            print(f'{db_name}.{table_name} table created')
        else:
            # Create table from df_datalake using schema dim_date_schema
            df_datalake.write.mode("overwrite").saveAsTable(f'{db_name}.{table_name}', schema=schema)
            print(f'{db_name}.{table_name} table created')
        
        return True
    
    else:
        print(f'{db_name}.{table_name} table already exists')
        return False

In [130]:
def create_csv_from_dwh(db_name,table_name,csv_directory):
    
    if check_table_exists(db_name, table_name):
        df_datalake = spark.sql(f'SELECT * FROM {db_name}.{table_name}')
        df_datalake.toPandas().to_csv(f'{csv_directory}.csv', header=True, index=False)
        print(f'{db_name}.{table_name} csv created')
    else:
        print(f'Table {db_name}.{table_name} does not exist')

In [131]:
parquet_directory = f'{ad_works_data_lake}/dimdate'

df_datalake = spark.read.parquet(parquet_directory)

df_datalake_filtered = df_datalake.filter(df_datalake['CreatedDate'] > get_last_load_date('dimdate'))
 


for old_col, new_col in zip(df_datalake_filtered.columns, table['column_names']):
    df_datalake_filtered = df_datalake_filtered.withColumnRenamed(old_col, new_col)
    
df_datalake_filtered.show(5)

Table ad_works.dimdate exists
2025-02-03
+-------+--------------------+---------------+--------------------+--------------------+-------------------+----------------+---------------+----------------+----------------+----------------+---------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+-----------+-----------+----+-----+---+
|DateKey|FullDateAlternateKey|DayNumberOfWeek|EnglishDayNameOfWeek|SpanishDayNameOfWeek|FrenchDayNameOfWeek|DayNumberOfMonth|DayNumberOfYear|WeekNumberOfYear|EnglishMonthName|SpanishMonthName|FrenchMonthName|MonthNumberOfYear|CalendarQuarter|CalendarYear|CalendarSemester|FiscalQuarter|FiscalYear|FiscalSemester|CreatedDate|UpdatedDate|year|month|day|
+-------+--------------------+---------------+--------------------+--------------------+-------------------+----------------+---------------+----------------+----------------+----------------+---------------+-----------------+---------------+------------+

In [133]:
for table in local:
    
    # Define the directory containing the parquet files
    parquet_directory = f'{ad_works_data_lake}/{table["table_name"]}'
    csv_directory = f'{ad_works_dwh_silver}/{table["table_name"]}'

    df_datalake = spark.read.parquet(parquet_directory)
    
    df_datalake_filtered = df_datalake.filter(df_datalake['CreatedDate'] > get_last_load_date(table["table_name"]))
    
    # Update df_datalake with column_names from local
    for old_col, new_col in zip(df_datalake_filtered.columns, table['column_names']):
        df_datalake_filtered = df_datalake_filtered.withColumnRenamed(old_col, new_col)

    create_ad_works_db_table(df_datalake_filtered,'stage_ad_works',table["table_name"],schema=["schema"])

    if get_stage_load_count(table["table_name"]) > 0:

        query = f'''
        SELECT * FROM (
            SELECT *, ROW_NUMBER() OVER (PARTITION BY {table["pk"]} ORDER BY UpdatedDate DESC) as row_num
            FROM stage_ad_works.{table["table_name"]}
        ) WHERE row_num = 1
        '''
        
        df_datalake_dedupe = spark.sql(query)
        df_datalake_dedupe = df_datalake_dedupe.drop('row_num')
        
        df_datalake_empty = df_datalake_filtered.filter(df_datalake_filtered['CreatedDate'] < '1900-01-01')        
        
        create_ad_works_db_table(df_datalake_empty,'ad_works',table["table_name"],schema=["schema"])
        
    #     # Perform the merge operation
    #     # merge_query = f"""
    #     # MERGE INTO ad_works.{table["table_name"]} AS main
    #     # USING {table["table_name"]}_temp AS temp
    #     # ON main.DateKey = temp.DateKey
    #     # WHEN MATCHED THEN
    #     #     UPDATE SET main.UpdatedDate = temp.UpdatedDate
    #     # WHEN NOT MATCHED THEN
    #     #     INSERT *
    #     # """
    #     # spark.sql(merge_query)
        
    #     # MERGE INTO TABLE is not supported temporarily.

    # Count of new records to be inserted
    new_records_count = spark.sql(f"""
        SELECT COUNT(*) as count
        FROM stage_ad_works.{table["table_name"]}
        WHERE {table["pk"]} NOT IN (SELECT {table["pk"]} FROM ad_works.{table["table_name"]})
    """).collect()[0]['count']
    print(f'New records to be inserted: {new_records_count}')

    # Count of existing records to be updated
    existing_records_count = spark.sql(f"""
        SELECT COUNT(*) as count
        FROM stage_ad_works.{table["table_name"]} AS temp
        JOIN ad_works.{table["table_name"]} AS existing
        ON temp.{table["pk"]} = existing.{table["pk"]}
        WHERE temp.UpdatedDate > existing.UpdatedDate
    """).collect()[0]['count']
    print(f'Existing records to be updated: {existing_records_count}')  
    
    # Insert new records
    df_new_records = spark.sql(f"""
        SELECT temp.*
        FROM stage_ad_works.{table["table_name"]} AS temp
        LEFT JOIN ad_works.{table["table_name"]} AS existing
        ON temp.{table["pk"]} = existing.{table["pk"]}
        WHERE existing.{table["pk"]} IS NULL
    """)
    df_new_records.write.mode("append").saveAsTable(f'ad_works.{table["table_name"]}')
        
    create_csv_from_dwh('ad_works',table["table_name"],csv_directory)

Table ad_works.dimdate exists
2025-02-03
Table stage_ad_works.dimdate exists
stage_ad_works.dimdate table already exists
Table stage_ad_works.dimdate exists
record count - stage_ad_works,dimdate: 1523
Table ad_works.dimdate exists
ad_works.dimdate table already exists
New records to be inserted: 0
Existing records to be updated: 0
Table ad_works.dimdate exists
ad_works.dimdate csv created
Table ad_works.dimcurrency does not exist
Table stage_ad_works.dimcurrency exists
stage_ad_works.dimcurrency table already exists
Table stage_ad_works.dimcurrency exists
record count - stage_ad_works,dimcurrency: 105
Table ad_works.dimcurrency does not exist
ad_works database created
ad_works.dimcurrency table created
New records to be inserted: 105
Existing records to be updated: 0
Table ad_works.dimcurrency exists
ad_works.dimcurrency csv created


In [None]:
#spark.sql("drop table ad_works.dimdate")

In [None]:
parquet_directory = 'spark-warehouse/ad_works.db/dimdate'

df_dimdate = spark.read.parquet(parquet_directory)
df_dimdate.show()



+--------+--------------------+---------------+--------------------+--------------------+-------------------+----------------+---------------+----------------+----------------+----------------+---------------+-----------------+---------------+------------+----------------+-------------+----------+--------------+-----------+-----------+----+-----+---+
| DateKey|FullDateAlternateKey|DayNumberOfWeek|EnglishDayNameOfWeek|SpanishDayNameOfWeek|FrenchDayNameOfWeek|DayNumberOfMonth|DayNumberOfYear|WeekNumberOfYear|EnglishMonthName|SpanishMonthName|FrenchMonthName|MonthNumberOfYear|CalendarQuarter|CalendarYear|CalendarSemester|FiscalQuarter|FiscalYear|FiscalSemester|CreatedDate|UpdatedDate|year|month|day|
+--------+--------------------+---------------+--------------------+--------------------+-------------------+----------------+---------------+----------------+----------------+----------------+---------------+-----------------+---------------+------------+----------------+-------------+-------

In [1]:
spark.sql("select * from ad_works.dimdate").show(500000)

NameError: name 'spark' is not defined

In [4]:
spark.sql("describe ad_works.dimdate").show(40)

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `ad_works`.`dimdate` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 9;
'DescribeRelation false, [col_name#0, data_type#1, comment#2]
+- 'UnresolvedTableOrView [ad_works, dimdate], DESCRIBE TABLE, true
