In [42]:
%load_ext sagemaker_studio_analytics_extension.magics
# NOTE(review): the connect command below passes `--verify-certificate False`
# (presumably disabling TLS certificate verification for the EMR/Livy endpoint)
# and `--auth-type None`; confirm this is acceptable before reusing it outside
# a sandbox. Keep a `%` magic as the first line of this cell: under the
# SparkMagic kernel, cells that do not start with `%` are sent to the cluster.
%sm_analytics emr connect --verify-certificate False --cluster-id j-1OJGFWLF2O1O8 --auth-type None --language python  

The sagemaker_studio_analytics_extension.magics extension is already loaded. To reload it, use:
  %reload_ext sagemaker_studio_analytics_extension.magics
Successfully read emr cluster(j-1OJGFWLF2O1O8) details
Initiating EMR connection..
Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
12,application_1729270775885_0014,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.
{"namespace": "sagemaker-analytics", "cluster_id": "j-1OJGFWLF2O1O8", "error_message": null, "success": true, "service": "emr", "operation": "connect"}


# Connection requirements
- Start a SageMaker Studio notebook instance using the SparkAnalytics 3.0 image and the SparkMagic PySpark kernel.
- Start an EMR cluster.
- Connect the EMR cluster to the notebook instance (see the `%sm_analytics emr connect` cell above).

## 1. Reading data from the data lake (Hive tables)

In [3]:

import sys

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Identifiers for the PLTV v2 gclid job.
session_name = 'pltv_v2_gclid'
database = 'ml_revenue_relevance_pltv_cln_local'
table = 'pltv_v2_model_gclid_daily_cln'
# NOTE(review): `database` and `table` are not used below; the partition listing
# reads the hard-coded weekly table instead. Confirm which table is intended.

# Dynamic-partition settings, applied to the builder in this order.
hive_partition_conf = [
    ("hive.exec.dynamic.partition.mode", "nonstrict"),
    ("hive.exec.dynamic.partition", "true"),
    ("spark.sql.sources.partitionOverwriteMode", "dynamic"),
]
session_builder = SparkSession.builder.appName(session_name)
for conf_key, conf_value in hive_partition_conf:
    session_builder = session_builder.config(conf_key, conf_value)
# The SparkMagic kernel already provides a session named `spark`;
# getOrCreate() hands back the existing session rather than building a new one.
spark = session_builder.enableHiveSupport().getOrCreate()

partition_query = "show partitions ml_revenue_relevance_pltv_cln.pltv_v2_model_weekly_cln"
pltv_part = spark.sql(partition_query)
pltv_part.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|           partition|
+--------------------+
|year=2020/month=1...|
|year=2020/month=1...|
|year=2020/month=1...|
|year=2020/month=1...|
|year=2020/month=2...|
+--------------------+
only showing top 5 rows

In [9]:
%%sh
aws s3 ls s3://gd-revrelvnc-stage-pltv2/prod_doc/

                           PRE prod_model/
2022-11-04 23:12:15          0 
2023-01-27 04:44:34      75043 pltv_features-temp.csv
2023-09-07 19:32:04      78709 pltv_features.csv
2023-09-07 19:31:55      78709 pltv_features2.csv


## 2. Creating Spark DataFrames and reading data from S3

In [4]:
# Small in-memory employee table used to exercise Spark DataFrames and SQL views.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
data = [
    ('James', '', 'Smith', '1991-04-01', 'M', 3000),
    ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
    ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
    ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
    ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1),
]

df = spark.createDataFrame(data=data, schema=columns)
# Register a temp view so the %%sql cell below can query it as `emp_view`.
df.createOrReplaceTempView('emp_view')
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

In [5]:
%%sql
select * from emp_view

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…


Could not infer format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing is consistent and as-expected, please specify a format.


Could not infer format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing is consistent and as-expected, please specify a format.


Could not infer format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing is consistent and as-expected, please specify a format.


Could not infer format, so each element will be parsed individually, falling back to `dateutil`. To ensure parsing is consistent and as-expected, please specify a format.



VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [6]:
# One customer_type/local_date partition of the combined f2p feature table.
s3_stage = "s3://gd-revrelvnc-stage-pltv2/table/f2p_combine_features/"
s3_path = s3_stage + "customer_type=nru/local_date=2020-04-01/"

preview_cols = ['id', 'if_shopper', 'sample', 'weight', 'is_visit_flag', 'is_order_flag']
df = spark.read.parquet(s3_path)
df.select(preview_cols).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+------+------+-------------+-------------+
|       id|if_shopper|sample|weight|is_visit_flag|is_order_flag|
+---------+----------+------+------+-------------+-------------+
|192959394|         1|     1|   1.0|            0|            0|
|227325893|         1|     1|   1.0|            0|            0|
|158368339|         1|     1|   1.0|            0|            0|
|218681774|         1|     1|   1.0|            0|            0|
|255488264|         1|     1|   1.0|            0|            0|
|187798373|         1|     1|   1.0|            0|            0|
|215552069|         1|     1|   1.0|            0|            0|
|243202223|         1|     1|   1.0|            0|            0|
|147111025|         1|     1|   1.0|            0|            0|
|268373240|         1|     1|   1.0|            0|            0|
+---------+----------+------+------+-------------+-------------+
only showing top 10 rows

In [7]:
# Persist the preview columns to a scratch S3 prefix, replacing any previous output.
output_cols = ['id', 'if_shopper', 'sample', 'weight', 'is_visit_flag', 'is_order_flag']
(
    df.select(output_cols)
    .write
    .mode('overwrite')
    .parquet('s3://gd-revrelvnc-stage-pltv2/p2p_model/test3/')
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3. Accessing S3 with boto3

In [8]:
import boto3
import re
import numpy as np
import os

def get_config_file_name(s3_path, end_date=None, region_name='us-west-2', verbose=True):
    """Return the latest ``YYYY-MM-DD`` date found in object keys under an S3 prefix.

    Every object under ``s3_path`` is listed and the first ``YYYY-MM-DD``
    substring of its key is extracted (e.g.
    ``.../train_config/2022-09-01/train_config.yaml`` -> ``'2022-09-01'``).
    Keys that contain no date (such as a zero-byte "folder" marker for the
    prefix itself) are skipped instead of raising.

    Args:
        s3_path (str): S3 URI of the prefix to scan, e.g. ``s3://bucket/path/``.
        end_date (str | datetime.date, optional): If given, only dates on or
            before this one are considered. The comparison uses the ISO string
            form, ``str(end_date)``.
        region_name (str): AWS region for the S3 resource. Defaults to ``'us-west-2'``.
        verbose (bool): If True (the default), print each scanned object key.

    Returns:
        str: The latest matching date, e.g. ``'2022-09-01'``.

    Raises:
        IndexError: If no dated key (on or before ``end_date``) exists under
            the prefix. IndexError is kept for compatibility with the previous
            ``list[-1]`` failure mode, now with a descriptive message.
    """
    import boto3  # also imported at notebook level; local import keeps the function self-contained

    # "s3://bucket/some/prefix/" -> ("bucket", "some/prefix/")
    bucket_and_key = s3_path[len("s3://"):] if s3_path.startswith("s3://") else s3_path
    bucket_name, _, folder_path = bucket_and_key.partition("/")

    s3 = boto3.resource('s3', region_name=region_name)
    objects = s3.Bucket(bucket_name).objects.filter(Prefix=folder_path).all()

    dates = set()
    for obj in objects:
        if verbose:
            print(obj.key)
        match = re.search(r"\d{4}-\d{2}-\d{2}", obj.key)
        if match is None:
            continue  # e.g. the prefix's own folder-marker object
        dates.add(match.group(0))

    if end_date is not None:
        cutoff = str(end_date)  # accepts 'YYYY-MM-DD' strings and date objects alike
        dates = {d for d in dates if d <= cutoff}

    if not dates:
        suffix = f" on or before {end_date}" if end_date is not None else ""
        raise IndexError(f"No YYYY-MM-DD dated keys found under {s3_path!r}{suffix}")
    # ISO dates sort lexicographically in chronological order.
    return max(dates)

# Latest dated folder of the v6 adjust-model train config (each scanned key is printed).
get_config_file_name('s3://gd-revrelvnc-stage-pltv2/pltv_model/pltv_adjust_model_v6/config/train_config/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

pltv_model/pltv_adjust_model_v6/config/train_config/2022-09-01/normalizer.pkl
pltv_model/pltv_adjust_model_v6/config/train_config/2022-09-01/train_config.yaml
'2022-09-01'

## Saving files to the local filesystem of the connected EMR cluster

In [10]:
import pandas as pd

# Small demo frame, written relative to the working directory of the process
# running this cell (presumably the EMR driver under the SparkMagic kernel;
# see the `ls` output further down).
Biodata = {
    'Name': ['John', 'Emily', 'Mike', 'Lisa'],
    'Age': [28, 23, 35, 31],
    'Gender': ['M', 'F', 'M', 'F'],
}
df = pd.DataFrame.from_dict(Biodata)

csv_path = './Biodata.csv'
df.to_csv(csv_path, index=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
import yaml

# Nested demo mapping, written out as block-style (not inline/flow) YAML.
data = {
    'A': 'a',
    'B': {'C': 'c', 'D': 'd', 'E': 'e'},
}

yaml_path = './data.yml'
with open(yaml_path, 'w') as yaml_out:
    yaml.dump(data, yaml_out, default_flow_style=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
import yaml
# Round-trip check: read back the YAML written by the previous cell.
with open('data.yml', 'r') as yml_stream:
    yml_text = yml_stream.read()
prime_service = yaml.safe_load(yml_text)
prime_service

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'A': 'a', 'B': {'C': 'c', 'D': 'd', 'E': 'e'}}

In [26]:
import subprocess

# List the working directory of the process running this cell. `ls` is run
# directly from an argv list, so no intermediate shell is needed.
output = subprocess.check_output(["ls"])
listing = output.decode("utf-8")
print(listing)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Biodata.csv
data.yml

In [31]:
import boto3
from email import encoders
from email.mime.base import MIMEBase
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


def send_email(
    to_addr,
    from_addr="pltv_report@rnr.gdcorp.tools",
    subject="PLTV Report",
    body="",
    html_body_flag=False,
    file_list=None,
    region_name="us-west-2",
):
    """Send an email, optionally HTML and optionally with attachments, via AWS SES.

    Args:
        to_addr (str | list[str]): A single recipient address, or a list of addresses.
        from_addr (str): Sender address; also used as the SES ``Source``.
        subject (str): Subject line for the email.
        body (str): Email body.
        html_body_flag (bool): If True, ``body`` is sent as ``text/html``;
            otherwise it is sent as ``text/plain``.
        file_list (list[str] | None): Paths of files to attach. Each file is
            attached as ``application/octet-stream`` under its base file name.
        region_name (str): AWS region of the SES endpoint. Defaults to ``'us-west-2'``.

    Returns:
        dict: The SES ``send_raw_email`` response (includes ``MessageId``).
        The response is also printed.

    Raises:
        ValueError: If no (non-empty) recipient address is given.
    """
    import os
    import boto3  # also imported at notebook level; local import keeps the function self-contained

    candidates = [to_addr] if isinstance(to_addr, str) else list(to_addr)
    recipients = [addr for addr in candidates if addr]
    if not recipients:
        raise ValueError("send_email: at least one recipient address is required")

    # "mixed" rather than "alternative": the body and any attachments are
    # distinct parts that should all be shown, not alternative renderings of
    # one part (with "alternative", clients may display only the last part).
    msg = MIMEMultipart("mixed")
    msg["Subject"] = subject
    msg["To"] = ", ".join(recipients)
    msg["From"] = from_addr
    msg.preamble = """
        Your mail reader does not support the report format.
    """

    body_part = MIMEText(body, "html") if html_body_flag else MIMEText(body)
    msg.attach(body_part)

    for path in file_list or []:
        with open(path, "rb") as attachment:
            part = MIMEApplication(attachment.read())
        # Base name only, so recipients don't see local directory paths.
        part.add_header(
            "Content-Disposition", "attachment", filename=os.path.basename(path)
        )
        msg.attach(part)

    ses_client = boto3.client("ses", region_name=region_name)
    response = ses_client.send_raw_email(
        Source=from_addr,
        Destinations=recipients,
        RawMessage={"Data": msg.as_string()},
    )
    print(response)
    return response


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
%%sh
aws s3 ls s3://gd-revrelvnc-stage-pltv2/table/f2p_combine_features/customer_type=nru/local_date=2020-01-01/

                           PRE task=infer/


In [35]:
# Same demo employee rows as the earlier Spark example, collected to pandas
# so the frame can be rendered as HTML for the email report below.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
data = [
    ('James', '', 'Smith', '1991-04-01', 'M', 3000),
    ('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
    ('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
    ('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
    ('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1),
]

emp_spark_df = spark.createDataFrame(data=data, schema=columns)
df = emp_spark_df.toPandas()
df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  firstname middlename  lastname         dob gender  salary
0     James                Smith  1991-04-01      M    3000
1   Michael       Rose            2000-05-19      M    4000
2    Robert             Williams  1978-09-05      M    4000
3     Maria       Anne     Jones  1967-12-01      F    4000
4       Jen       Mary     Brown  1980-02-17      F      -1

In [37]:
# Email the pandas `df` built above as an HTML table, with no attachments.
# NOTE(review): the recipient is a hard-coded personal address; consider
# reading it from configuration or an environment variable before sharing.
subject = "pltv_report"
from_addr = "test_report@rnr.gdcorp.tools"
to_addr = "jli1@godaddy.com"
file_list = []  # no attachments
df_html = df.to_html()

send_email(
    to_addr,
    from_addr,
    subject,
    body=df_html,
    html_body_flag=True,
    file_list=file_list,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'MessageId': '01010192a0f2b62e-41083755-76c3-473d-9249-4e723f012788-000000', 'ResponseMetadata': {'RequestId': '26638938-2b7e-4fa7-8e42-713b3b26693d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'date': 'Fri, 18 Oct 2024 18:45:13 GMT', 'content-type': 'text/xml', 'content-length': '338', 'connection': 'keep-alive', 'x-amzn-requestid': '26638938-2b7e-4fa7-8e42-713b3b26693d'}, 'RetryAttempts': 0}}

In [40]:
%%sh
aws s3 ls

2024-05-24 17:16:51 cdk-cldinfra-assets-128519002737-us-west-2
2024-10-14 19:09:10 gd-access-revrelvnc-stage-logs-us-west-2
2024-10-14 19:06:18 gd-infra-logging-128519002737-us-west-2
2024-09-09 20:19:07 gd-revrelvnc-stage-applications
2024-09-09 20:19:07 gd-revrelvnc-stage-athena-results
2024-09-09 22:55:08 gd-revrelvnc-stage-backup
2022-05-18 16:23:05 gd-revrelvnc-stage-cdk-assets
2024-09-09 22:44:26 gd-revrelvnc-stage-cross-sell
2024-09-09 20:18:54 gd-revrelvnc-stage-data-lake
2024-09-21 04:50:50 gd-revrelvnc-stage-dnai-social-post-api
2024-10-14 19:09:10 gd-revrelvnc-stage-logging-us-west-2
2024-06-26 15:58:41 gd-revrelvnc-stage-mlflow
2024-09-09 21:40:22 gd-revrelvnc-stage-mmm-data
2023-12-12 20:23:58 gd-revrelvnc-stage-phd-events-us-west-2
2024-09-09 20:25:05 gd-revrelvnc-stage-pltv2
2024-09-09 21:33:06 gd-revrelvnc-stage-pricing
2023-10-18 21:46:09 gd-revrelvnc-stage-sem-bidding
2024-09-09 20:19:08 gd-revrelvnc-stage-success
2024-10-14 19:06:13 gd-server-access-revrelvnc-stage-u

In [41]:
# Load the p2p model training config from a local path on the process running
# this cell (presumably the EMR driver).
# NOTE(review): in the recorded run this path did not exist (FileNotFoundError);
# confirm where train_config.yaml is deployed, e.g. copy it down from S3 first.
train_config_path = "/home/hadoop/opt/p2p_model/config/train_config.yaml"
with open(train_config_path, 'r') as config_stream:
    data_loaded = yaml.safe_load(config_stream)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
[Errno 2] No such file or directory: '/home/hadoop/opt/p2p_model/config/train_config.yaml'
Traceback (most recent call last):
FileNotFoundError: [Errno 2] No such file or directory: '/home/hadoop/opt/p2p_model/config/train_config.yaml'



In [27]:
# NOTE(review): this output was produced at execution 27, before the failing
# load in In[41]. It is leftover kernel state: on a fresh kernel this cell
# raises NameError because the load above fails before assigning `data_loaded`.
data_loaded

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

{'CategoryFeatureCol': ['hvc_customer_tier', 'shopper_status', 'first_register_months'], 'IdCol': 'id', 'NumericalFeatureCol': ['l3q_product_group_domains_qty', 'lm_product_group_presence_and_commerce_qty', 'ly_product_group_business_applications_qty', 'l2q_product_group_presence_and_commerce_qty', 'rp_product_group_hosting', 'lm_product_unit_qty', 'l3q_product_group_hosting_qty', 'domain_portfolio_qty', 'lm_pattern_manage_mya_cnt', 'l3q_product_unit_qty', 'ly_product_unit_qty', 'lw_product_total_visit', 'recent_visit', 'l2q_product_category_other_hosting_qty', 'rp_product_group_presence_and_commerce', 'nq_product_category_presence_qty', 'rv_pattern_manage_website', 'lm_product_total_visit', 'lm_product_category_presence_qty', 'recent_dwelling', 'rp_product_group_domains', 'ly_product_group_hosting_qty', 'l3q_product_group_presence_and_commerce_qty', 'next_month_potential_gcr_usd_amt', 'rv_visit_name_registration', 'lq_product_category_presence_qty', 'rp_product_category_domain_registr

## 4. Testing `%%send_to_spark`: passing local variables to the Spark session

In [43]:
%%help

Magic,Example,Explanation
info,%%info,Outputs session information for the current Livy endpoint.
cleanup,%%cleanup -f,"Deletes all sessions for the current Livy endpoint, including this notebook's session. The force flag is mandatory."
delete,%%delete -f -s 0,Deletes a session by number for the current Livy endpoint. Cannot delete this kernel's session.
logs,%%logs,Outputs the current session's Livy logs.
configure,"%%configure -f {""executorMemory"": ""1000M"", ""executorCores"": 4}",Configure the session creation parameters. The force flag is mandatory if a session has already been  created and the session will be dropped and recreated. Look at Livy's POST /sessions Request Body for a list of valid parameters. Parameters must be passed in as a JSON string.
spark,%%spark -o df df = spark.read.parquet('...,"Executes spark commands.  Parameters:  -o VAR_NAME: The Spark dataframe of name VAR_NAME will be available in the %%local Python context as a  Pandas dataframe with the same name.  -m METHOD: Sample method, either take or sample.  -n MAXROWS: The maximum number of rows of a dataframe that will be pulled from Livy to Jupyter.  If this number is negative, then the number of rows will be unlimited.  -r FRACTION: Fraction used for sampling."
sql,%%sql -o tables -q SHOW TABLES,"Executes a SQL query against the variable sqlContext (Spark v1.x) or spark (Spark v2.x).  Parameters:  -o VAR_NAME: The result of the SQL query will be available in the %%local Python context as a  Pandas dataframe.  -q: The magic will return None instead of the dataframe (no visualization).  -m, -n, -r are the same as the %%spark parameters above."
local,%%local a = 1,All the code in subsequent lines will be executed locally. Code must be valid Python code.
send_to_spark,%%send_to_spark -i variable -t str -n var,"Sends a variable from local output to spark cluster.  Parameters:  -i VAR_NAME: Local Pandas DataFrame(or String) of name VAR_NAME will be available in the %%spark context as a Spark dataframe(or String) with the same name.  -t TYPE: Specifies the type of variable passed as -i. Available options are:  `str` for string and `df` for Pandas DataFrame. Optional, defaults to `str`.  -n NAME: Custom name of variable passed as -i. Optional, defaults to -i variable name.  -m MAXROWS: Maximum amount of Pandas rows that will be sent to Spark. Defaults to 2500."
pretty,%%pretty,"If the cell output is a dataframe, like df.show(), then it will pretty print the dataframe as an HTML table"


In [44]:
%%local
local_var = "This variable exists on Studio Notebook kernel"

In [45]:
%%local
local_var

'This variable exists on Studio Notebook kernel'

In [46]:
%%local
ls

2.demo_emr.ipynb     redshift.ipynb                        summary_demo.ipynb
Pyspark_local.ipynb  redshift_nc.sql
athena_v2_nc.sql     summary_demo-studio-create-emr.ipynb


In [47]:
%%send_to_spark -i local_var  -t str -n var_on_cluster

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'local_var' as 'var_on_cluster' to Spark kernel

In [48]:
# `var_on_cluster` exists in the Spark session only because the %%send_to_spark cell above created it.
var_on_cluster

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'This variable exists on Studio Notebook kernel'