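### APE_CRETE Gold Layer Transformations

This notebook runs the gold-layer Spark SQL aggregation configured in `agg_config` against the silver Iceberg catalog stored on MinIO, and writes the aggregated table to Db2.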

### Install and Imports

In [1]:
# install a spark version that is compatable with the iceberg catalog
!pip install pyspark==3.5.3



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import functions
from typing import Literal
import os
from ibm_watson_studio_lib import access_project_or_space

### Connect to Project and Download the Aggregation Config File

In [3]:
# Project credentials for ibm_watson_studio_lib. The project token is a secret, so it is read
# from the environment instead of being committed with the notebook.
# NOTE(review): the token that used to be hardcoded here has been exposed and should be revoked/rotated.
# NOTE(review): the env var names PROJECT_ID / PROJECT_TOKEN are assumptions — align with the runtime setup.
credentials_dic = {
    "project_id": os.environ.get("PROJECT_ID", "1b80238b-ad9a-42c3-89a3-1a2a6c2c7013"),
    "token": os.environ["PROJECT_TOKEN"],
}
wslib = access_project_or_space(params=credentials_dic)

In [4]:
# Download the project asset "ast_agg_config" to the local file agg_config.py so the next cell can %run it.
wslib.download_file("ast_agg_config", "agg_config.py")

{'file_name': 'agg_config.py', 'summary': ['loaded data', 'saved to file']}

In [5]:
# Execute the downloaded config script. Its top-level names become available in the notebook
# namespace, including `agg_config`, which is indexed by the gold table name and whose value
# is passed to spark.sql() further down.
# NOTE(review): this executes a file fetched from project storage — only run trusted assets.
%run agg_config.py

In [8]:
# Inspect the MinIO connection settings. The keys are masked so they are never persisted in the
# notebook output. The connection is fetched directly because `minio_credentials` is only
# assigned in a later cell, so referencing it here would fail on a fresh kernel.
{
    key: ("****" if key in ("access_key", "secret_key") else value)
    for key, value in wslib.get_connection("poc_silver").items()
}

{'disable_chunked_encoding': 'false',
 'bucket': 'poc-cosmos-silver',
 'secret_key': '<redacted>',
 'trust_all_ssl_cert': 'false',
 'enable_global_bucket_access': 'true',
 'enable_path_style_access': 'true',
 'access_key': '<redacted>',
 'list_objects_api_version': 'v1',
 'region': 'eu-de',
 'url': 'http://eu-de.services.cloud.techzone.ibm.com:20271',
 '.': {'name': 'poc_silver',
  'description': None,
  'asset_id': '1341fcda-db2a-45b6-9ec4-60b7affc1136',
  'asset_type': 'connection'}}

### Define Tables, Schemas, Credentials

In [18]:
def _require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty. Without this check a missing setting
            would silently become the string "None" inside catalog and table names.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name!r} is not set")
    return value


#MinIO credentials
minio_credentials = wslib.get_connection("poc_silver")

#Silver layer config
SILVER_CATALOG = _require_env("SILVER_CATALOG")  # e.g. "poc_silver"; names the Iceberg catalog
SILVER_BUCKET = minio_credentials["bucket"]
# NOTE(review): SILVER_SCHEMA / SILVER_TABLE are not referenced by the visible cells; left optional.
SILVER_SCHEMA = os.environ.get("SILVER_SCHEMA")  # e.g. "ape_crete_slv"
SILVER_TABLE = os.environ.get("SILVER_TABLE")

#Gold layer config
GOLD_SCHEMA = _require_env("GOLD_SCHEMA")  # e.g. "ape_crete_gold"; Db2 schema of the output table
GOLD_TABLE_NAME = _require_env("GOLD_TABLE")  # Db2 table name and key into agg_config

#Db2 credentials
db2_credentials = wslib.get_connection("db2")

# JDBC URL for Db2 (JCC syntax: properties follow the database name after ':' and each ends with ';')
JDBC_URL = (
    f"jdbc:db2://{db2_credentials['host']}:{db2_credentials['port']}"
    f"/{db2_credentials['database']}:sslConnection=true;"
)

# Connection properties for Db2
CONNECTION_PROPERTIES = {
    "user": db2_credentials["username"],
    "password": db2_credentials["password"],
    "driver": "com.ibm.db2.jcc.DB2Driver",
}

### Initialize a Spark Session

In [19]:
# Spark session with an Iceberg "hadoop" catalog named SILVER_CATALOG, whose warehouse is the silver
# MinIO bucket (accessed through s3a), plus the Db2 JCC driver used for the gold-layer writes.
# The iceberg-spark-runtime artifact must match the Spark minor version (pyspark 3.5.x -> runtime-3.5).
# spark.jars.packages only takes effect if no SparkContext exists yet, so run this on a fresh kernel.
# NOTE(review): the app name says bronze->gold although this notebook reads the silver layer.
spark = (
    SparkSession.builder
    .appName("poc_bronze_gold")
    .config(f"spark.sql.catalog.{SILVER_CATALOG}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{SILVER_CATALOG}.type", "hadoop")
    .config(f"spark.sql.catalog.{SILVER_CATALOG}.warehouse", f"s3a://{SILVER_BUCKET}")
    .config("spark.hadoop.fs.s3a.endpoint", minio_credentials["url"])
    .config("spark.hadoop.fs.s3a.access.key", minio_credentials["access_key"])
    .config("spark.hadoop.fs.s3a.secret.key", minio_credentials["secret_key"])
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.sql.session.timeZone", "UTC")
    # The MinIO connection URL is plain http://, so SSL is disabled for s3a.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config(
        "spark.jars.packages",
        ",".join([
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0",
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "software.amazon.awssdk:s3:2.20.40",
            "com.ibm.db2:jcc:11.5.8.0",
        ]),
    )
    .enableHiveSupport()
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

### Define DB2 Read and Write Functions

In [20]:
# Read data from a DB2 table
def read_db2_table(table_name) -> DataFrame:
    """Load a whole Db2 table into a Spark DataFrame over JDBC.

    Args:
        table_name: table identifier handed to Spark's JDBC reader, e.g. "SCHEMA.TABLE".

    Returns:
        A DataFrame backed by the Db2 table.
    """
    return spark.read.jdbc(url=JDBC_URL, table=table_name, properties=CONNECTION_PROPERTIES)

# Write data to a DB2 table
def write_to_db2(
    dataframe: DataFrame,
    table_name,
    mode: Literal["overwrite", "append"] = "overwrite",
):
    """Write ``dataframe`` to the Db2 table ``table_name`` over JDBC.

    Args:
        dataframe: the Spark DataFrame to persist.
        table_name: target table, e.g. "SCHEMA.TABLE".
        mode: Spark save mode, "overwrite" or "append". Defaults to "overwrite".

    The ``truncate`` option makes "overwrite" truncate the existing table instead of
    dropping and recreating it, so the Db2 table definition is kept. It does not
    affect "append".
    """
    # `mode` is annotated with Literal and given a real default. Writing `mode=Literal[...]`
    # would make the typing object itself the default, which Spark rejects as a save mode.
    dataframe.write.option("truncate", "true").jdbc(
        url=JDBC_URL,
        table=table_name,
        mode=mode,
        properties=CONNECTION_PROPERTIES,
    )

# Read data from a DB2 table with a statement
def read_db2_query(query: str) -> DataFrame:
    """Run a Db2 SELECT and return its result set as a Spark DataFrame.

    Spark's JDBC reader needs something usable in a FROM clause, so the statement is
    embedded as a derived table aliased ``subq``. Because of this it should not end with
    a semicolon.

    Args:
        query: the SELECT statement to run.

    Returns:
        DataFrame holding the query's result set.
    """
    subquery = "({}) AS subq".format(query)
    return spark.read.jdbc(
        url=JDBC_URL,
        properties=CONNECTION_PROPERTIES,
        table=subquery,
    )

### Perform Aggregations for the Gold Layer & Load as a DataFrame

In [26]:
# Look up this gold table's Spark SQL aggregation in agg_config (loaded by %run agg_config.py) and run it.
# Fail with a descriptive message if the config has no entry for GOLD_TABLE_NAME.
if GOLD_TABLE_NAME not in agg_config:
    raise KeyError(
        f"agg_config has no aggregation for {GOLD_TABLE_NAME!r}; "
        f"available keys: {list(agg_config)}"
    )
gold_table = spark.sql(agg_config[GOLD_TABLE_NAME])

### Validate Aggregations

In [27]:
# Preview the first five aggregated rows without truncating wide column values.
gold_table.show(n=5, truncate=False)

                                                                                

+-------------+--------------+------------------+----------------------+--------------------+----------+---------------+-----------+---------------+------------------+----------------------+------------+
|accuracyclass|total_counters|defective_counters|nonfunctional_counters|avg_currtransffactor|avg_factor|avg_lm_maxpower|sum_lm_real|sum_lm_reactive|sum_replacedrealen|sum_replacedreactiveen|last_lm_date|
+-------------+--------------+------------------+----------------------+--------------------+----------+---------------+-----------+---------------+------------------+----------------------+------------+
|0.5          |1             |0                 |0                     |76.0                |1.0       |180.2000000    |172.10     |50.40          |13.15             |6.75                  |2022-08-10  |
|0.5S         |1             |0                 |0                     |45.0                |1.0       |150.7500000    |120.20     |40.75          |10.34             |5.62             

In [29]:
# Print column names, types and nullability of the aggregated DataFrame before writing it to Db2.
gold_table.printSchema()

root
 |-- accuracyclass: string (nullable = true)
 |-- total_counters: long (nullable = false)
 |-- defective_counters: long (nullable = true)
 |-- nonfunctional_counters: long (nullable = true)
 |-- avg_currtransffactor: double (nullable = true)
 |-- avg_factor: double (nullable = true)
 |-- avg_lm_maxpower: decimal(12,7) (nullable = true)
 |-- sum_lm_real: decimal(20,2) (nullable = true)
 |-- sum_lm_reactive: decimal(20,2) (nullable = true)
 |-- sum_replacedrealen: decimal(22,2) (nullable = true)
 |-- sum_replacedreactiveen: decimal(22,2) (nullable = true)
 |-- last_lm_date: date (nullable = true)



### Write to DB2

In [30]:
# Persist the aggregated gold table to Db2 as <GOLD_SCHEMA>.<GOLD_TABLE_NAME>.
# Pass mode="append" instead to add rows rather than replace the current contents.
write_to_db2(
    dataframe=gold_table,
    table_name=f"{GOLD_SCHEMA}.{GOLD_TABLE_NAME}",
    mode="overwrite",
)

                                                                                

In [31]:
# Read the gold table back from Db2 to eyeball the write (first 20 rows, untruncated).
read_db2_table(table_name=f"{GOLD_SCHEMA}.{GOLD_TABLE_NAME}").show(n=20, truncate=False)

[Stage 14:>                                                         (0 + 1) / 1]

+-------------+--------------+------------------+----------------------+--------------------+----------+---------------+-----------+---------------+------------------+----------------------+------------+
|ACCURACYCLASS|TOTAL_COUNTERS|DEFECTIVE_COUNTERS|NONFUNCTIONAL_COUNTERS|AVG_CURRTRANSFFACTOR|AVG_FACTOR|AVG_LM_MAXPOWER|SUM_LM_REAL|SUM_LM_REACTIVE|SUM_REPLACEDREALEN|SUM_REPLACEDREACTIVEEN|LAST_LM_DATE|
+-------------+--------------+------------------+----------------------+--------------------+----------+---------------+-----------+---------------+------------------+----------------------+------------+
|0.5          |1             |0                 |0                     |76.000              |1.000     |180.200        |172.10     |50.40          |13.15             |6.75                  |2022-08-10  |
|0.5S         |1             |0                 |0                     |45.000              |1.000     |150.750        |120.20     |40.75          |10.34             |5.62             

                                                                                