# PyDPU documents

https://engineering.paypalcorp.com/confluence/display/RiskGRSDARTBO/MADMEn+Notebook+API+Tutorial#MADMEnNotebookAPITutorial-DriverFromHive

# get spark instance

In [None]:
from automation_utils.spark.session import get_spark

# Executor count for the session; the get_spark(...) call that follows uses it
# for the initial and minimum executor counts and doubles it for the maximum.
spark_instance_num = 128

# Spark properties handed to get_spark(config=...), filled in group by group.
spark_config = {}

# Memory overhead for the driver and for each executor.
spark_config["spark.driver.memoryOverhead"] = "8192"
spark_config["spark.executor.memoryOverhead"] = "8192"

# Executor heartbeat interval and network timeout.
spark_config["spark.executor.heartbeatInterval"] = "600s"
spark_config["spark.network.timeout"] = "2000s"

# Shuffle service and partitioning.
spark_config["spark.shuffle.service.enabled"] = "true"
spark_config["spark.sql.shuffle.partitions"] = "1000"
spark_config["spark.default.parallelism"] = "2400"

# HDFS access, driver result size limit and Arrow for PySpark.
spark_config["spark.yarn.access.hadoopFileSystems"] = "hdfs://horton"
spark_config["spark.driver.maxResultSize"] = "32g"
spark_config["spark.sql.execution.arrow.pyspark.enabled"] = "true"


In [None]:
# Start the session on the risk_gds_focus queue. The executor count starts at
# (and never drops below) spark_instance_num and may grow to twice that value.
spark = get_spark(
    app_name="guxia_debug",
    queue="risk_gds_focus",
    executor_instances=spark_instance_num,
    executor_instances_min=spark_instance_num,
    executor_instances_max=2 * spark_instance_num,
    executor_cores=1,
    driver_memory="16g",
    load_all_jars=True,
    auto_setup_ports=True,
    config=spark_config,
)

In [None]:
# spark v2
#
# Alternative session setup: dynamic allocation is switched off and the
# external shuffle service is disabled.

spark_config = {
    "spark.driver.memoryOverhead": "2048",
    "spark.executor.memoryOverhead": "2048",
    "spark.executor.heartbeatInterval": "1800s",
    "spark.network.timeout": "3000s",
    "spark.shuffle.service.enabled": "false",
    # NOTE(review): 500 here vs. executor_instances_max=1000 below while
    # dynamic_allocation=False -- confirm which value get_spark honours.
    "spark.executor.instances": "500",
    "spark.sql.shuffle.partitions": "2000",
    "spark.default.parallelism": "3000",
    "spark.yarn.access.hadoopFileSystems": "hdfs://horton",
    "spark.driver.maxResultSize": "8g",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
}

spark = get_spark(
    # Plain string: the original used an f-string with no placeholders.
    app_name="data_process",
    queue='risk_gds_focus',
    executor_memory='30g',
    dynamic_allocation=False,
    executor_instances_max=1000,
    driver_cores="4",
    config=spark_config,
    load_all_jars=True,
)

# use ModelResultReader to pull model result

In [None]:
from py_dpu import ModelResultReader
from py_dpu import save_pig
from model_automation.utils.rmr import run_cmd



# HDFS directory the pulled model result is written to (fill in before use).
data_hdfs_dir = ""

# Metadata columns selected from the model result alongside the variables.
meta_cols = [
    "jms_queue_name",
    "jms_message_name",
    "message_name",
    "message_id",
    "message_type",
    "service_name",
    "time_published",
    "time_consumed",
    "current_page",
    "total_pages",
    "app_metadata",
    "event_date",
    "event_time",
    "ATOM16Seg",
    "AW_ANALYZER_VERSION",
    "AW_MODEL_NAME",
    "AW_UUID",
    "BREAllInOneTrack",
    "IS_AUDIT_SKIPPED",
    "IS_AUDIT_TRACK",
    "RUCS_UNIQUE_ID",
    "SellerSeg",
    "account_number",
    "activity_id",
    "isDCC",
    "is_ucc",
    "corr_id_",
]

# Variable names to select, one per line between the quotes (empty by default).
variables = """
""".split()



- We usually use `BREAllInOneTrack` as the compute item.
- Set a small time window; otherwise the job may take too long to finish.
- Using the `sample` option is faster than using the `limit` option.

In [None]:

# spark instance (placeholder: assign your Spark session here before running)
spark = None

# With an empty data_hdfs_dir the cleanup command below would expand to
# `hadoop fs -rm -skipTrash /*`, so refuse to continue until it is set.
if not data_hdfs_dir.strip():
    raise ValueError('data_hdfs_dir is empty; set it to the HDFS output directory first')

# Best-effort removal of files left by a previous run. A failure is printed
# instead of silently swallowed, but it does not stop the pull.
try:
    run_cmd(f"hadoop fs -rm -skipTrash {data_hdfs_dir}/*")
except Exception as e:
    print(e)

# Read one hour of BREAllInOneTrack results for the ConsolidatedFunding
# checkpoint, with the 'sample' option set to '0.01'.
mreader = (
    ModelResultReader(spark,
                      checkpoint='ConsolidatedFunding',
                      computeItem='BREAllInOneTrack')
    .timeFilter('2023-05-07 00:00:00 to 2023-05-07 01:00:00')
    .option('sample', '0.01')
)

ret_df = mreader.load()

# Keep only the metadata columns plus the requested variables.
audit_data = ret_df.select(meta_cols + variables)

audit_data.printSchema()

# Write the result as parquet from at most 4 partitions, replacing old output.
audit_data.coalesce(4).write.mode('overwrite').parquet(data_hdfs_dir)

print(f'data saved to {data_hdfs_dir}')


# pull madmen data

- Delete the output directory first; otherwise MADMEn will load the previous data instead of generating new data for you.
- The API joins the driver with the MADMEn data using an inner join, so you can check the record count after pulling the variables to see the coverage.


In [None]:
from datetime import datetime

from py_dpu import loadByDriver
from py_dpu import HdfsManager
from py_dpu import estimate_partition_num


def madmen_pull_variable(spark,
                         variables,
                         output_dir,
                         driver_df,
                         checkpoint,
                         driver_start_dt,
                         driver_end_dt,
                         driver_date_col,
                         driver_key,
                        ):
    """
    Pull MADMEn variables for the driver rows that fall inside a date range.

    The driver is filtered to ``driver_date_col between driver_start_dt and
    driver_end_dt``, passed to ``loadByDriver``, and the returned dataframe is
    written as parquet. Two directories are used under ``output_dir``:

    - ``<start>_<end>``: passed to ``loadByDriver`` as ``outputPath``; it is
      deleted before the pull.
    - ``<start>_<end>_parquet``: the same dataframe written in parquet format
      (overwritten if it already exists).

    ``<start>`` and ``<end>`` are the range bounds formatted as ``YYYYMMDD``.
    Record counts before and after the pull are printed.

    :param spark: spark instance.
    :param variables: variables you want to pull.
    :param output_dir: parent directory for the two output directories above.
    :param driver_df: driver dataframe.
    :param checkpoint: checkpoint name.
    :param driver_start_dt: start date of the data range, ``YYYY-MM-DD``.
    :param driver_end_dt: end date of the data range (inclusive), ``YYYY-MM-DD``.
    :param driver_date_col: date column name in driver.
    :param driver_key: key in driver to pull madmen, usually it's txn id.
    :raises ValueError: if a date is not in ``YYYY-MM-DD`` format, or if
        ``driver_end_dt`` is earlier than ``driver_start_dt``.
    """

    date_fmt = '%Y-%m-%d'
    start_dt = datetime.strptime(driver_start_dt, date_fmt)
    end_dt = datetime.strptime(driver_end_dt, date_fmt)

    # A reversed range would give a non-positive num_day, which is later used
    # as groupSize and as the row limit for show(); fail early instead.
    if end_dt < start_dt:
        raise ValueError(
            f"driver_end_dt ({driver_end_dt}) is earlier than "
            f"driver_start_dt ({driver_start_dt})"
        )

    # Both bounds are inclusive.
    num_day = (end_dt - start_dt).days + 1

    filter_expr = f"{driver_date_col} between '{driver_start_dt}' and '{driver_end_dt}'"
    print(f'filter expression: {filter_expr}')
    data = driver_df.filter(filter_expr)
    driver_rec_num = data.count()
    print(f'filtered data num: {driver_rec_num}')

    range_tag = f"{start_dt.strftime('%Y%m%d')}_{end_dt.strftime('%Y%m%d')}"
    output_data_dir = f"{output_dir}/{range_tag}"
    output_parquet_dir = f"{output_dir}/{range_tag}_parquet"

    # Remove any earlier loadByDriver output so it cannot be reused.
    hdfs = HdfsManager(spark)
    hdfs.delete(path=output_data_dir)

    print(f'output dir: {output_parquet_dir}')

    time_window = f"{driver_start_dt} to {driver_end_dt}"
    print(f"time window: {time_window}")

    df = loadByDriver(spark,
                      checkpoint=checkpoint,
                      driverSet=data,
                      time=time_window,
                      variables=variables,
                      dateColumn=driver_date_col,
                      driverKeys=[driver_key],
                      madmenKeys=['transaction'],
                      outputPath=output_data_dir,
                      groupSize=num_day)

    # Per-day record counts of the pulled data, for comparison with the
    # filtered driver count printed above.
    print('pulled rec num')
    pulled_counts = df.groupBy(driver_date_col).count().sort(driver_date_col)
    pulled_counts.show(num_day)

    # NOTE(review): 1024 is presumably a target partition size -- confirm
    # against estimate_partition_num.
    partition_num = estimate_partition_num(spark, output_data_dir, 1024)
    df.coalesce(partition_num).write.mode('overwrite').parquet(output_parquet_dir)
    print(f'data saved to {output_parquet_dir}')

# move data from Teradata to HDFS

- NOTE: string columns loaded from Teradata contain extra spaces, so they are trimmed here.

In [None]:
from py_dpu import save_pig
from py_dpu.teradata import TeradataManager
from credentials import TD_USER, TD_PASSWORD

spark = None  # placeholder: assign your Spark session here before running
# Teradata accessor used by move_to_hdfs below; credentials come from the
# local `credentials` module.
td_mgr = TeradataManager(spark, TD_USER, TD_PASSWORD)

def move_to_hdfs(src_table, dt_column, dt_range, target_hdfs_dir):
    """
    Copy a date range of a Teradata table to HDFS in pig format.

    Rows are loaded through the module-level ``td_mgr``, every string column
    is trimmed, and the result is saved with ``save_pig`` from 8 partitions
    using ``\\x07`` as the field delimiter. Uses the module-level ``spark``.

    :param src_table: source Teradata table.
    :param dt_column: date column passed to ``load_by_date`` as ``pit_column``.
    :param dt_range: time range passed to ``load_by_date`` as ``time``.
    :param target_hdfs_dir: HDFS directory the data is saved to.
    """
    from pyspark.sql.functions import col, trim
    from pyspark.sql.types import StringType

    print(f'moving data from {src_table} to {target_hdfs_dir}, dt range: {dt_range}')

    df = td_mgr.load_by_date(
        time=dt_range,
        table=src_table,
        pit_column=dt_column,
    )

    # Trim all string columns in ONE projection. Calling withColumn once per
    # column adds a projection each time and can produce very large plans.
    projected = [
        trim(col(field.name)).alias(field.name)
        if isinstance(field.dataType, StringType)
        else col(field.name)
        for field in df.schema.fields
    ]
    df = df.select(*projected)

    save_pig(spark, df.coalesce(8), target_hdfs_dir, delimiter='\x07')
    print(f'finish saving driver to {target_hdfs_dir}')

# get data from hdfs to local

In [None]:
from automation_utils.common.hdfs import get_hdfs_to_local_csv

# Source directory on HDFS to download.
hdfs_data_dir = "hdfs://horton/apps/risk/det/guxia/ACDC_COMPONENT/task/addr_pattern/decline_tagging_20230717"
# Local CSV file the data is written to.
local_data_path = "data/decline_tagging_20230717/data.csv"

In [None]:
import os

# Create the local target directory. Skipped when local_data_path has no
# directory part, because os.makedirs('') raises FileNotFoundError.
local_data_dir = os.path.dirname(local_data_path)
if local_data_dir:
    os.makedirs(local_data_dir, exist_ok=True)
print(f'downloading data from {hdfs_data_dir}')

# The third argument is the path of the '.pig_header' file inside the HDFS
# data directory.
get_hdfs_to_local_csv(hdfs_data_dir,
                      local_data_path,
                      os.path.join(hdfs_data_dir, '.pig_header'))
print(f'data downloaded to {local_data_path}')

# upload local data to hdfs

In [None]:
from automation_utils.common.hdfs import get_hdfs_to_local_csv, upload_local_file_to_hdfs

# Local file to upload (fill in before running the upload cell).
local_data_path = ""
# Destination path on HDFS (fill in before running the upload cell).
hdfs_data_dir = ""



In [None]:
# Push the local file to HDFS, then report what was moved where.
upload_local_file_to_hdfs(local_file=local_data_path, hdfs_path=hdfs_data_dir)
print(f'finish upload data from {local_data_path} to {hdfs_data_dir}')



# pull context data


reference

- https://github.paypal.com/DART/eve-builder-util/blob/main/bizlog-pull/src/main/resources/pull-context-bizlog-example.ipynb

- The context schema path can be found in Citadel: go/citadel

- request.header.2 format: `correlation-id:5c92069e8250b`
- To extract a value from the k-v pairs, use an element expression such as: `"findInKVPairs(request.body.context.data_set.k_v_pairs, 'ucc_sender_email_address')"`

In [None]:
from bizlog_pull_py.bom_bizlog_tool import BomBizlogTool
from bizlog_pull_py.event_log_tool import EventLogTool
from bizlog_pull_py.context_log_tool import ContextLogTool


# Input: context log files to read (glob over one hour of data).
context_data_dir = "/sys/pp_dm/dm_hdp_batch/kafka_data/RISK/BLOGGING/riskunifiedcomputeserv/SC_riskunifiedcomputeserv/2023/11/01/12/*"

# Output location on HDFS for the pulled context data.
data_save_dir = "hdfs://horton/apps/risk/det/guxia/UCC23_V2/task/context_variable/context_2023110112"



In [None]:
spark = None  # placeholder: assign your Spark session here before running

contextTool = ContextLogTool(spark)

# Where to read the context logs from.
contextTool.contextPath(context_data_dir)
# Keep only records whose caller flow is DCC. (The original line was missing
# the closing quote and parenthesis, which made the cell a syntax error.)
contextTool.filter("request.body.context.caller_info.flow_name == 'DCC'")
# Fields to extract from each record.
contextTool.elements(
    'request.header.2', # correlation id
    'request.body.context.client_configuration.api',
    'request.body.context.client_configuration.integration_artifact',
    'request.body.context.caller_info.flow_name',
)


df = contextTool.pull()
df.printSchema()

# move data between GCP & HDFS

notes:
- A file from the source is not copied if it already exists in the destination, so existing destination files are never overwritten.

In [None]:
from model_automation.utils.gcp_utils.data_manipulate import move_data_gcp_hadoop
from model_automation.utils.rmr import run_cmd


# Source directory of the move (fill in before running the move cell).
src_data_dir = ""
# Destination directory of the move (fill in before running the move cell).
dst_data_dir = ""

In [None]:


# Clear the target folder before moving.
# The equivalent cleanup with gsutil (presumably for a GCS target) would be:
#     run_cmd(f'gsutil -m rm -r {dst_data_dir}')

# Best-effort removal of the HDFS target; a failure is printed and ignored.
try:
    run_cmd(f'hadoop fs -rm -r -skipTrash {dst_data_dir}')
except Exception as err:
    print(err)

# Recreate the (now empty) target directory.
run_cmd(f'hadoop fs -mkdir -p {dst_data_dir}')

move_data_gcp_hadoop(src=src_data_dir, dest=dst_data_dir, show_log=True, check=False)

print(f'finish moving data from {src_data_dir} to {dst_data_dir}')

# get columns from pig header file

In [None]:
from model_automation.utils.rmr import run_cmd


# HDFS data directory that contains the '.pig_header' file (fill in before use).
dev_data_dir = ""



In [None]:


import os

# Read the header file stored next to the data.
ret = run_cmd(f"hadoop fs -cat {os.path.join(dev_data_dir, '.pig_header')}", print_ret=False, get_ret=True)
# Column names are separated by '\x07'. Strip a trailing line break first so
# it does not end up inside the last column name.
columns = ret.rstrip('\r\n').split('\x07')

