# Spark Database Workloads
Tested only with __postgresql__

In [1]:
'''
    WARNING CONTROL to display or ignore all warnings
'''
import traceback
import warnings

# Global warning filter for this kernel; switch between 'default' and 'ignore'.
warnings.simplefilter('ignore')

''' Set debug flag to view extended error messages; else set it to False to turn off debugging mode '''
# Notebook-wide flag read by the cells that follow.
debug = True


## Instantiate Classes

In [14]:
import os
import sys

# Take the absolute path of the parent of the current working directory and
# add the portion that precedes the first 'rezaware/' segment to sys.path,
# so that the `rezaware` package import below can be resolved.
# If 'rezaware/' does not occur in the path, the whole path is inserted.
proj_dir = os.path.abspath(os.pardir)
sys.path.insert(1,proj_dir.split('rezaware/')[0])
from rezaware.modules.etl.loader import sparkDBwls as db

''' restart initiate classes '''
# In debug mode, reload the module so that edits made to sparkDBwls on disk
# are picked up without restarting the kernel.
if debug:
    import importlib
    db = importlib.reload(db)

# Description string passed to the class constructor and echoed in the
# confirmation message printed at the end of this cell.
__desc__ = "read and write files from and to postgresql database"
clsSDB = db.SQLWorkLoads(desc=__desc__)
# NOTE(review): `.stop` below is an attribute reference, not a call, so this
# statement does not stop anything. Later cells keep using `clsSDB`, so
# confirm how the class manages its session before changing it to `.stop()`.
if clsSDB.session:
    clsSDB._session.stop
print("\n%s class initialization and load complete!" % __desc__)

All functional SPARKDBWLS-libraries in LOADER-package of ETL-module imported successfully!

read and write files from and to postgresql database class initialization and load complete!


## Load data from DB using SQL query

In [24]:
# Query parameters: bounds of the portfolio_date window (BETWEEN is inclusive)
# and the asset name to filter on.
_from_date = '2023-04-24'
_to_date = '2023-04-29'
_asset = 'bitcoin'

# Alternative query against warehouse.mcap_past, kept for reference:
# _query = "select * from warehouse.mcap_past "+\
#         f"where mcap_date >= '{_from_date}' and "+\
#         f"mcap_date <= '{_to_date}'"

# Adjacent string literals inside the parentheses are concatenated by Python,
# so no `+` or trailing backslash is needed. Only the literals that
# interpolate a variable carry the f prefix.
_query = (
    "SELECT * FROM warehouse.weighted_portfolio wmp "
    "WHERE deactivate_dt IS NULL AND wmp.mcap_value > 1000000 "
    f"AND wmp.portfolio_date BETWEEN '{_from_date}' AND '{_to_date}' "
    f"AND wmp.asset_name = '{_asset}' "
)
# Variant that additionally filters on uuid (requires `_uuid` to be defined):
#         f"AND wmp.asset_name = '{_asset}' AND uuid='{_uuid}'"

# Run the query through the loader class; the table/column/bound arguments
# are passed as empty/None because the full SELECT statement is supplied.
_mcap_sdb=clsSDB.read_data_from_table(
    select=_query,
    db_table="",
    db_column="",
    lower_bound=None,
    upper_bound=None,
#     **_kwargs
)

if _mcap_sdb.count() > 0:
    # DataFrame.show() prints the rows itself and returns None; wrapping it
    # in print() would emit a stray "None" after the table.
    _mcap_sdb.show(n=3,vertical=True)
else:
    print("Returned empty data set")

-RECORD 0------------------------------
 portfolio_pk   | 35                   
 alt_id         | 64e154f975be45af9... 
 asset_grp_id   | 64e7e5f33f9408281... 
 asset_name     | bitcoin              
 portfolio_date | 2023-04-26 00:00:00  
 mcap_past_pk   | 138937               
 mcap_value     | 577778864974.2459... 
 mcap_ror       | 0.1183025752248149   
 mcap_weight    | 0.5491878743874402   
 created_dt     | 2023-08-28 08:30:... 
 created_by     | farmraider           
 created_proc   | mining_crypto_etp... 
 modified_dt    | 2023-08-28 09:03:... 
 modified_by    | farmraider           
 modified_proc  | rezaware_etl_load... 
 deactivate_dt  | 2023-08-28 08:52:... 
-RECORD 1------------------------------
 portfolio_pk   | 29                   
 alt_id         | 64e154f975be45af9... 
 asset_grp_id   | 64e7e5f33f9408281... 
 asset_name     | bitcoin              
 portfolio_date | 2023-04-26 00:00:00  
 mcap_past_pk   | 138937               
 mcap_value     | 577778864974.2459... 


## Replace modify attributes with Nulls
* Set `modified_proc` & `modified_by` values to Null
* Drop the `modified_dt` column

In [25]:
from datetime import date, datetime, timedelta
from pyspark.sql import functions as F

# Keep only the rows whose portfolio_pk is 35 and overwrite deactivate_dt
# with the timestamp taken when this cell runs. The chain is wrapped in
# parentheses so that steps can be commented in or out without having to
# maintain trailing backslashes.
_upsert_sdf = (
    _mcap_sdb
    .filter(F.col('portfolio_pk').isin([35]))
    .withColumn('deactivate_dt', F.lit(datetime.now()))
    # .withColumn('modified_proc',F.lit(None))
)
# _upsert_sdf = _upsert_sdf.drop(F.col('modified_dt'))
_upsert_sdf.show(n=3,vertical=True)

-RECORD 0------------------------------
 portfolio_pk   | 35                   
 alt_id         | 64e154f975be45af9... 
 asset_grp_id   | 64e7e5f33f9408281... 
 asset_name     | bitcoin              
 portfolio_date | 2023-04-26 00:00:00  
 mcap_past_pk   | 138937               
 mcap_value     | 577778864974.2459... 
 mcap_ror       | 0.1183025752248149   
 mcap_weight    | 0.5491878743874402   
 created_dt     | 2023-08-28 08:30:... 
 created_by     | farmraider           
 created_proc   | mining_crypto_etp... 
 modified_dt    | 2023-08-28 09:03:... 
 modified_by    | farmraider           
 modified_proc  | rezaware_etl_load... 
 deactivate_dt  | 2023-08-28 09:16:... 



## Upsert table to verify modify values are auto-added
* Load data again to verify that `modified_dt` is added and `modified_by` & `modified_proc` have values

In [26]:
# Target database and table, plus the column(s) passed as unique_keys.
_db_name = 'tip'
_tbl_name = 'weighted_portfolio'
_pk = ['portfolio_pk']

# Columns handed to the upsert through OMITCOLS (excluded from update).
_cols_not_for_update = [
    'asset_name',
    'portfolio_date',
    'created_dt',
    'created_by',
    'created_proc',
]

_options = {
    "BATCHSIZE": 1000,                  # batch size to partition the dataframe
    "PARTITIONS": 1,                    # number of parallel clusters to run
    "OMITCOLS": _cols_not_for_update,   # columns to be excluded from update
}

# The value returned by the upsert is used below as the record count.
_records = clsSDB.upsert_sdf_to_table(
    save_sdf=_upsert_sdf,
    db_name=_db_name,
    db_table=_tbl_name,
    unique_keys=_pk,
    **_options,
)

print("Upserted %d records" % _records)

Validating upsert attributes and parameters ...
Wait a moment, writing data to postgresql tip database ...


SQLWorkLoads @staticmethod <batch_and_upsert> PSQL connection set with <class 'psycopg2.extensions.cursor'> and connection <connection object at 0x7fe5991fa2c0; dsn: 'user=farmraider password=xxx dbname=tip host=127.0.0.1 port=5432', closed: 0>
SQLWorkLoads @staticmethod <batch_and_upsert> PSQL connection set with <class 'psycopg2.extensions.cursor'> and connection <connection object at 0x7fe5991fa2c0; dsn: 'user=farmraider password=xxx dbname=tip host=127.0.0.1 port=5432', closed: 0>


Upserted 1 records
