In [None]:
#@title Install + Import Libraries {display-mode: "form"}
!pip install panapi
!pip install git+https://github.com/PaloAltoNetworks/prisma-sase-sdk-python.git

import panapi
import prisma_sase
from google.colab import files
from google.colab import data_table
import io
import requests
import json
import os

In [None]:
#@title Service Account Details {display-mode: "form"}
#@markdown =======================================

#@markdown #Service Account Parameters
#@markdown =======================================


#@markdown ##Source Tenant
#@markdown ---
source_client_id = "sdkuser_store@12345.iam.panserviceaccount.com" #@param {type:"string"}
source_client_secret = "723456-122334-1234-abcd-1223440404" #@param {type:"string"}
source_tsgid = "12345" #@param {type:"string"}
#@markdown ---

#@markdown ##Destination Tenant
#@markdown ---
destination_client_id = "sdkuser_warehouse@56789.iam.panserviceaccount.com" #@param {type:"string"}
destination_client_secret = "1234566-abcd-1234-8765-1223440405" #@param {type:"string"}
destination_tsgid = "56789" #@param {type:"string"}
#@markdown ---

# NOTE(review): the values above look like sample placeholders. Avoid saving
# real client secrets in a shared copy of this notebook.


# Begin code

def is_blank(value):
  """Return True when ``value`` is None, empty, or only whitespace.

  The form fields above are assigned from string literals, so a field that
  was left empty is "" rather than None; both cases count as missing here.
  """
  return value is None or not str(value).strip()


# Report (but do not abort on) missing parameters for each tenant.
if any(is_blank(value) for value in (source_client_id, source_client_secret, source_tsgid)):
  print("ERR: Please provide all the source parameters for a successful execution")

if any(is_blank(value) for value in (destination_client_id, destination_client_secret, destination_tsgid)):
  print("ERR: Please provide all the destination parameters for a successful execution")

#
# PAN API SDK Instantiation (disabled: everything in this section is commented out and does not run)
#

# token_url = "https://auth.apps.paloaltonetworks.com/am/oauth2/access_token"
# #
# # Source
# #
# if source_tsgid is not None:
#   source_scope = "tsg_id:{}".format(source_tsgid)
# else:
#   print("ERR: Please provide a valid TSG ID for the source tenant")


# #
# # Destination
# #
# if destination_tsgid is not None:
#   destination_scope = "tsg_id:{}".format(destination_tsgid)
# else:
#   print("ERR: Please provide a valid TSG ID for the destination tenant")
# session = panapi.PanApiSession()
# session.authenticate(client_id=cid, client_secret=csecret, scope=scope, token_url=token_url)


#
# Prisma SASE SDK Instantiation (Source tenant)
#
# TLS certificate verification stays enabled: the login call below sends the
# client secret, so the endpoint's certificate should be validated.
sdk_source = prisma_sase.API(ssl_verify=True)
sdk_source.interactive.login_secret(client_id=source_client_id, client_secret=source_client_secret, tsg_id=source_tsgid)


#
# Prisma SASE SDK Instantiation (Destination tenant)
#
sdk_destination = prisma_sase.API(ssl_verify=True)
sdk_destination.interactive.login_secret(client_id=destination_client_id, client_secret=destination_client_secret,
                                 tsg_id=destination_tsgid)

#
# Global values
#
# Both security-rule list URLs target the "Shared" folder and differ only in
# the rulebase position ("pre" or "post").
_RULES_URL_TEMPLATE = "https://api.sase.paloaltonetworks.com/sse/config/v1/security-rules?folder=Shared&position={}"
url_rules_pre = _RULES_URL_TEMPLATE.format("pre")
url_rules_post = _RULES_URL_TEMPLATE.format("post")


In [None]:
#@title Rule Selector
#@markdown ---

#@markdown #Rule Parameters
#@markdown ---


rule_name = 'Contractor-allow'  #@param {type: "string"}
description = "Allow access to private apps" #@param {type: "string"}
user_group = 'Contractors-PROJ-SSE'  #@param {type: "string"}
application = "microsoft-outlook" #@param ["any","Alvisofin Peoplesoft", "microsoft-outlook", "microsoft-excel","microsoft-azure-cdn", "microsoft-onenote-base", "ms-teams", "sharepoint-online-editing", "sharepoint-online"] {allow-input: true}
action = "allow"  #@param ['allow', 'deny', 'drop']
#@markdown ---


#
# Rule Creation
#

# The rule's source_user is the distinguished name of the selected group:
# the user_group form value becomes the CN, followed by this fixed suffix.
USER_GROUP_DN_SUFFIX = "DC=prismasdwan,DC=onmicrosoft,DC=com"
source_user_dn = "CN={},{}".format(user_group, USER_GROUP_DN_SUFFIX)

# Everything not exposed in the form is fixed: any zone/address/HIP/category,
# application-default service, and the "best-practice" profile group.
payload = {
  "name": rule_name,
  "folder": "Shared",
  "action": action,
  "source_hip": ["any"],
  "destination_hip": ["any"],
  "from": ["any"],
  "to": ["any"],
  "destination": ["any"],
  "description": description,
  "category": ["any"],
  "service": ["application-default"],
  "log_setting": "Cortex Data Lake",
  "negate_source": False,
  "source_user": [source_user_dn],
  "application": [application],
  "profile-setting": {"group": ["best-practice"]},
  "source": ["any"],
}

# The rule is created in the destination tenant's pre-rulebase.
resp = sdk_destination.rest_call(url=url_rules_pre, method="POST", data=payload)
if resp.cgx_status:
  print("INFO: Rule {}[{}] successfully created!".format(rule_name, resp.cgx_content.get("id")))
else:
  print("ERR: Rule {} could not be created".format(rule_name))
  prisma_sase.jd_detailed(resp)


In [None]:
#@title Clone Rules
#@markdown ---

#@markdown #Clone Rules
#@markdown ---

# These flags are strings, not booleans: a rulebase is cloned only when its
# flag equals "True" exactly. Because allow-input is enabled, any other text
# typed into the field (e.g. "true") disables cloning for that rulebase.
# The Delete Rules cell assigns the same two names, so the most recently run
# cell determines their current values.
prerules = "True" #@param ["True", "False"]{allow-input: true}
postrules = "True" #@param ["True", "False"]{allow-input: true}

#@markdown ---

# Fields that are carried over when a rule is copied; every other key of a
# retrieved rule is dropped by clean_rule().
RULE_KEYS = [
  'name', 'folder', 'action',
  'source_hip', 'destination_hip',
  'from', 'to', 'destination',
  'description', 'category', 'service',
  'log_setting', 'negate_source', 'source_user',
  'application', 'profile-setting', 'source',
]


def clean_rule(x):
  """Return a new dict holding only the entries of ``x`` whose key is in RULE_KEYS.

  The input mapping is not modified, and the surviving keys keep the order
  they have in ``x``.
  """
  return {field: value for field, value in x.items() if field in RULE_KEYS}


#
# Retrieve Pre/Post Rules from source
# Copy Pre/Post Rules to Destination
#

def fetch_rules(sdk, url, error_message):
  """GET the rules at ``url`` and return them as ``{rule name: cleaned rule}``.

  Each rule is passed through clean_rule() before being stored. On a failed
  request ``error_message`` and the response details are printed and an empty
  dict is returned. A response without a "data" list also yields an empty
  dict instead of raising.
  """
  resp = sdk.rest_call(url=url, method="GET")
  if not resp.cgx_status:
    print(error_message)
    prisma_sase.jd_detailed(resp)
    return {}

  # "data" may be absent or None; treat that as "no rules".
  retrieved = resp.cgx_content.get("data") or []
  rules_by_name = {}
  for rule in retrieved:
    newrule = clean_rule(rule)
    rules_by_name[newrule["name"]] = newrule
  return rules_by_name


def copy_rules(sdk, url, rules_by_name):
  """POST every rule in ``rules_by_name`` to ``url``, printing one result line per rule."""
  for rule in rules_by_name.values():
    resp = sdk.rest_call(url=url, data=rule, method="POST")
    if resp.cgx_status:
      print("\tSUCCESS: {} created".format(rule["name"]))
    else:
      print("ERR: Could not create rule")
      prisma_sase.jd_detailed(resp)


if prerules == "True":
  print("INFO: Retrieving Pre-Rules from Source Tenant")
  xfer_rules_pre = fetch_rules(sdk_source, url_rules_pre, "ERR: Could not retrieve rules")

  print("INFO: Copying Pre Rules")
  copy_rules(sdk_destination, url_rules_pre, xfer_rules_pre)
else:
  print("INFO: Pre-Rules not selected. Not cloning Pre-rules")

if postrules == "True":
  print("INFO: Retrieving Post-Rules from Source Tenant")
  xfer_rules_post = fetch_rules(sdk_source, url_rules_post, "ERR: Could not retrieve Post rules")

  print("INFO: Copying Post Rules")
  copy_rules(sdk_destination, url_rules_post, xfer_rules_post)
else:
  print("INFO: Post-Rules not selected. Not cloning Post-rules")


In [None]:
#@title Delete Rules {display-mode: "form"}

#@markdown #Delete Rules
#@markdown ---
tenant = "Destination" #@param ["Source", "Destination"] {allow-input: false}
prerules = "True" #@param ["True", "False"]{allow-input: true}
postrules = "True" #@param ["True", "False"]{allow-input: true}
#@markdown ---

# Pick the authenticated SDK session that matches the selected tenant.
if tenant == "Source":
  sdk = sdk_source
elif tenant == "Destination":
  sdk = sdk_destination
else:
  print("ERR: Invalid tenant type. Please select Source or Destination")
  sdk = None


#
# Delete Rules
#

# (label, list URL) pairs for the rulebases selected in the form. Carrying the
# label explicitly avoids guessing the position from the URL text.
delete_targets = []
if prerules == "True":
  delete_targets.append(("Pre", url_rules_pre))
else:
  print("INFO: Skipping Deletion of Pre Rules from {} tenant".format(tenant))

if postrules == "True":
  delete_targets.append(("Post", url_rules_post))
else:
  print("INFO: Skipping Deletion of Post Rules from {} tenant".format(tenant))

if sdk is None:
  # The invalid tenant was already reported above; without a session there is
  # nothing to query or delete.
  delete_targets = []


for position, list_url in delete_targets:
  print("INFO: Deleting {} Rules from {} tenant".format(position, tenant))

  resp = sdk.rest_call(url=list_url, method="GET")
  if not resp.cgx_status:
    print("ERR: Could not retrieve rules")
    prisma_sase.jd_detailed(resp)
    continue

  # "data" may be absent or None; treat that as "no rules to delete".
  for rule in resp.cgx_content.get("data") or []:
    rule_url = "https://api.sase.paloaltonetworks.com/sse/config/v1/security-rules/{}".format(rule["id"])
    delete_resp = sdk.rest_call(url=rule_url, method="DELETE")
    if delete_resp.cgx_status:
      print("\tSUCCESS: {}[{}] deleted".format(rule["name"], rule["id"]))
    else:
      print("\tERR: Could not delete rule")
      prisma_sase.jd_detailed(delete_resp)