In [1]:
# Global Imports 
import oci
import os.path
import sys
import json
import pprint
import re
from collections import Counter 
import ipaddr #pip3 install ipaddr

state_json = None


In [2]:
def extract_value_by_field(obj, key):
    """Collect every non-container value stored under `key` in nested JSON.

    Dicts and lists are walked depth-first in iteration order. A dict value
    that is itself a dict or list is always descended into (never collected),
    so only scalar values found under `key` end up in the result.

    Returns:
        list: the matching values, in traversal order.
    """

    def _walk(node):
        """Yield matching scalar values found beneath `node`."""
        if isinstance(node, dict):
            for name, value in node.items():
                if isinstance(value, (dict, list)):
                    yield from _walk(value)
                elif name == key:
                    yield value
        elif isinstance(node, list):
            for element in node:
                yield from _walk(element)

    return list(_walk(obj))


In [54]:
# Logger
def local_logger(value):
    """Emit `value` on standard output (single logging hook for this notebook)."""
    print(value)

In [56]:
from pathlib import Path

# Check if file exist
def if_input_file_exist():
    """Report whether `hub_spokes.json` is a regular file in the working directory.

    Logs a three-line banner describing the outcome.

    Returns:
        bool: True when the file exists, False otherwise.
    """
    banner = "===================="
    found = Path('hub_spokes.json').is_file()
    if found:
        message = "Input file hub_spokes.json exists !"
    else:
        message = "Input file hub_spokes.json does not exists !"
    for line in (banner, message, banner):
        local_logger(line)
    return found

Valid JSON file : {'hub': {'vcn': {'cidr': '10.0.0.0/16', 'name': 'hub-vcn', 'drg_name': 'HubDRG', 'compartment_id': 'ocid1.compartment.oc1..aaaaaaaad7ngb2s3armaa6qdkglst4nxnvvdrjjggq5afhfqxqgiyyoyluiq', 'dns_name': 'hubdns', 'subnets': [{'cidr': '10.0.0.0/24', 'name': 'hub-subnet', 'dns_name': 'hubsubnet1', 'compartment_id': 'ocid1.compartment.oc1..aaaaaaaad7ngb2s3armaa6qdkglst4nxnvvdrjjggq5afhfqxqgiyyoyluiq', 'ispublic': 'False'}]}}, 'spokes': [{'vcn': {'cidr': '192.168.0.0/24', 'name': 'spoke1', 'compartment_id': 'ocid1.compartment.oc1..aaaaaaaad7ngb2s3armaa6qdkglst4nxnvvdrjjggq5afhfqxqgiyyoyluiq', 'dns_name': 'spoke1', 'subnets': []}}, {'vcn': {'cidr': '192.168.1.0/24', 'name': 'spoke2', 'compartment_id': 'ocid1.compartment.oc1..aaaaaaaad7ngb2s3armaa6qdkglst4nxnvvdrjjggq5afhfqxqgiyyoyluiq', 'dns_name': 'spoke2', 'subnets': []}}]}


True

In [57]:
# Global configs
# Load the OCI SDK configuration with no explicit path/profile
# (presumably the SDK's default config file — confirm for your environment).
config = oci.config.from_file()
# Service clients built from that configuration; the wrapper functions in this
# notebook read these module-level names directly.
identity_client = oci.identity.IdentityClient(config)
virtual_network_client = oci.core.VirtualNetworkClient(config)
# Composite operations wrap virtual_network_client and provide the
# "*_and_wait_for_state" helpers used by the create/delete wrappers.
virtual_network_composite_operations = oci.core.VirtualNetworkClientCompositeOperations(virtual_network_client)

# NOTE(review): tenancy OCID is hard-coded; consider reading it from the loaded
# config or an environment variable instead.
tenancy_ocid='ocid1.tenancy.oc1..aaaaaaaayn6s77e3y4hgz56uzwvay3jrpekafo6ycr5pr5xdsg3gahcygx7a'

# Module-level lists.
# NOTE(review): the functions visible in this notebook assign to same-named
# local variables rather than mutating these lists — confirm they are still needed.
spoke_vcns = []
hub_subnets = []
spoke_subnets = []

In [52]:
# Convert response to dict
def convert_response_to_dict(oci_response):
    """Return the `.data` payload of an OCI response, converted via `oci.util.to_dict`."""
    payload = oci_response.data
    return oci.util.to_dict(payload)

# Check for existing compartments
def check_compartment_exist(compartment_id, client):
    """Tell whether the compartment identified by `compartment_id` is ACTIVE.

    Args:
        compartment_id: OCID passed to `client.get_compartment`.
        client: object exposing `get_compartment` (an identity client).

    Returns:
        bool: True when the compartment's `lifecycle_state` equals "ACTIVE".
    """
    response = client.get_compartment(
        compartment_id=compartment_id,
        retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY,
    )
    lifecycle_state = convert_response_to_dict(response)["lifecycle_state"]
    return lifecycle_state == "ACTIVE"

def get_all_compartments():
    """List every accessible compartment under the tenancy, including sub-compartments.

    Uses the SDK pagination helper so all pages are fetched, then converts the
    aggregated response data with `convert_response_to_dict`.
    """
    query = {
        "compartment_id": tenancy_ocid,
        "limit": 200,
        "access_level": "ACCESSIBLE",
        "compartment_id_in_subtree": True,
        "retry_strategy": oci.retry.DEFAULT_RETRY_STRATEGY,
    }
    all_pages = oci.pagination.list_call_get_all_results(
        identity_client.list_compartments,
        **query
    )
    return convert_response_to_dict(all_pages)



<pyjsonq.query.JsonQ object at 0x108627d68>


In [21]:
# Create VCN wrapper
def create_vcn(cidr_block, vcn_name, compartment_id, dns_label):
    """Create a VCN and block until it reaches the AVAILABLE lifecycle state.

    Args:
        cidr_block: CIDR assigned to the VCN.
        vcn_name: display name of the VCN.
        compartment_id: OCID of the compartment that will own the VCN.
        dns_label: DNS label of the VCN.

    Returns:
        The created VCN model (the `.data` of the composite-operation response).
    """
    details = oci.core.models.CreateVcnDetails(
        compartment_id=compartment_id,
        display_name=vcn_name,
        cidr_block=cidr_block,
        dns_label=dns_label,
    )
    available = oci.core.models.Vcn.LIFECYCLE_STATE_AVAILABLE
    vcn = virtual_network_composite_operations.create_vcn_and_wait_for_state(
        details,
        wait_for_states=[available],
    ).data

    # Same five log lines as before: title, rule, OCID, full model, blank line.
    for line in ('Created VCN',
                 '=========================',
                 '{}'.format(vcn.id),
                 '{}'.format(vcn),
                 '\n'):
        local_logger(line)

    return vcn

In [22]:
# Create DRG wrapper and attach it to VCN
def create_attach_drg(virtual_network_client, compartment_id, drg_name, vcn_ocid):
    """Create a DRG in `compartment_id` and attach it to the VCN `vcn_ocid`.

    Args:
        virtual_network_client: client used for the DRG and attachment calls.
        compartment_id: OCID of the compartment that will own the DRG.
        drg_name: display name used for both the DRG and its attachment.
        vcn_ocid: OCID of the VCN the DRG is attached to.

    Returns:
        The DRG attachment model once it reports the ATTACHED state.

    Raises:
        ValueError: if `drg_name` contains a disallowed special character.
            (Previously this path skipped DRG creation and then failed with an
            unbound `drg` variable when building the attachment.)
    """
    # Fail before any API call so nothing is half-created for an invalid name.
    if not special_char_regex_validation(drg_name):
        raise ValueError(
            'DRG name {!r} contains special characters; DRG was not created'.format(drg_name)
        )

    # Create new DRG in compartment
    drg_result = virtual_network_client.create_drg(
        oci.core.models.CreateDrgDetails(
            compartment_id=compartment_id,
            display_name=drg_name
        )
    )
    # Poll the DRG until its lifecycle_state becomes AVAILABLE.
    drg = oci.wait_until(
        virtual_network_client,
        virtual_network_client.get_drg(drg_result.data.id),
        'lifecycle_state',
        'AVAILABLE'
    ).data
    local_logger('Created DRG')
    local_logger('===============')
    local_logger(drg)
    local_logger('\n')

    # Attach newly created DRG to VCN using VCN OCID
    drg_attach_result = virtual_network_client.create_drg_attachment(
        oci.core.models.CreateDrgAttachmentDetails(
            display_name=drg_name,
            vcn_id=vcn_ocid,
            drg_id=drg.id
        )
    )
    # Poll the attachment until its lifecycle_state becomes ATTACHED.
    drg_attachment = oci.wait_until(
        virtual_network_client,
        virtual_network_client.get_drg_attachment(drg_attach_result.data.id),
        'lifecycle_state',
        'ATTACHED'
    ).data
    local_logger('Created DRG Attachment')
    local_logger('=========================')
    local_logger(drg_attachment)
    local_logger('\n')

    return drg_attachment

In [23]:
# Create a Local Peering Gateway between Hub and Spoke VCNs
def create_local_peering_gateway(lpg_name, compartment_id, vcn_ocid):
    """Create a Local Peering Gateway on a VCN and wait until it is AVAILABLE.

    Args:
        lpg_name: display name of the gateway.
        compartment_id: OCID of the compartment that will own the gateway.
        vcn_ocid: OCID of the VCN the gateway belongs to.

    Returns:
        The created LocalPeeringGateway model.

    Raises:
        ValueError: if `lpg_name` contains a disallowed special character.
            (Previously this path skipped creation and then failed on an
            unbound `create_lpg_response` variable.)
    """
    # Reject invalid names up front instead of falling through to unbound locals.
    if not special_char_regex_validation(lpg_name):
        raise ValueError(
            'LPG name {!r} contains special characters; LPG was not created'.format(lpg_name)
        )

    create_lpg_details = oci.core.models.CreateLocalPeeringGatewayDetails(
        compartment_id=compartment_id,
        display_name=lpg_name,
        vcn_id=vcn_ocid,
    )
    create_lpg_response = virtual_network_composite_operations.create_local_peering_gateway_and_wait_for_state(
        create_lpg_details,
        wait_for_states=[oci.core.models.LocalPeeringGateway.LIFECYCLE_STATE_AVAILABLE]
    )
    lpg = create_lpg_response.data

    local_logger('Created LPG')
    local_logger('=========================')
    local_logger('{}'.format(lpg.id))
    local_logger('{}'.format(lpg))
    local_logger('\n')

    return lpg
    

In [24]:
# Connect Local Peering Gateway between Hub and Spokes VCN
def connect_local_peering_gateway(source_lpg_id, peer_lpg_id):
    """Peer the LPG `source_lpg_id` with the LPG `peer_lpg_id`.

    Returns:
        The response of `connect_local_peering_gateways`.
    """
    peering_details = oci.core.models.ConnectLocalPeeringGatewaysDetails(
        peer_id=peer_lpg_id,
    )
    response = virtual_network_client.connect_local_peering_gateways(
        source_lpg_id,
        connect_local_peering_gateways_details=peering_details,
    )
    for line in ('=========================',
                 'Connected LPG',
                 '=========================',
                 '\n'):
        local_logger(line)
    return response

In [25]:
# Create Subnet Wrapper
# availability_domain = 'VPLM:AP-MUMBAI-1-AD-1' == Omit this attribute to get regional subnet - best practice
def create_subnet(subnet_display_name, subnet_dns_name, vcn, public_subnet = False, cidr_block = None):
    """Create a regional subnet inside `vcn` and wait until it is AVAILABLE.

    Args:
        subnet_display_name: display name of the subnet.
        subnet_dns_name: DNS label of the subnet.
        vcn: VCN model; its `id`, `compartment_id` and `cidr_block` are read.
        public_subnet: True to allow public IPs on VNICs in the subnet,
            False (default) to prohibit them.
        cidr_block: CIDR for the subnet. When omitted the subnet takes the
            whole VCN CIDR, which is what this function always did before the
            parameter existed.

    Returns:
        The created Subnet model.
    """
    # Fall back to the VCN-wide CIDR so existing four-argument callers keep working.
    subnet_cidr = vcn.cidr_block if cidr_block is None else cidr_block

    result = virtual_network_client.create_subnet(
        oci.core.models.CreateSubnetDetails(
            compartment_id = vcn.compartment_id,
            display_name = subnet_display_name,
            vcn_id = vcn.id,
            cidr_block = subnet_cidr,
            dns_label = subnet_dns_name,
            # The SDK flag is the inverse of "public": prohibiting public IPs
            # is what makes a subnet private. The flag was previously passed
            # through un-negated, creating the opposite kind of subnet.
            prohibit_public_ip_on_vnic = not public_subnet
        )
    )
    # Poll the subnet until its lifecycle_state becomes AVAILABLE.
    get_subnet_response = oci.wait_until(
        virtual_network_client,
        virtual_network_client.get_subnet(result.data.id),
        'lifecycle_state',
        'AVAILABLE'
    )
    local_logger('Created Subnet')
    local_logger('=========================')
    local_logger('{}'.format(get_subnet_response.data.id))
    local_logger('\n')

    return get_subnet_response.data

In [26]:
# Special Characters Regex validation
def special_char_regex_validation(value):
    """Return True when `value` contains none of the disallowed special characters.

    Disallowed characters: @ ! # $ % ^ & * ( ) < > ? / | } { ~ : `

    Args:
        value: string to check (a resource name or DNS label).

    Returns:
        bool: True if no disallowed character occurs in `value`, else False.
    """
    # Raw string: the previous non-raw literal relied on the invalid escape
    # sequence '\|', which newer Python versions warn about. The compiled
    # pattern is unchanged.
    special_chars_regex = re.compile(r'[@!#$%^&*()<>?/\|}{~:`]')
    return special_chars_regex.search(value) is None
        
# IPV4 CIDR Notation Regex Validation        
def ipv4_regex_validation(value):
    """Return True when `value` is an IPv4 CIDR with a prefix length of /16 to /31.

    Each octet must be 0-255 without leading zeros. The prefix range follows
    the rule documented for this notebook's JSON validation (/16 is the
    largest network accepted, /31 the smallest); the earlier pattern also let
    /10-/15 through.

    Args:
        value: string such as '10.0.0.0/16'.

    Returns:
        bool: True if the whole string is a valid CIDR in that range.
    """
    octet = r'(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])'
    prefix_length = r'(?:1[6-9]|2[0-9]|3[01])'
    cidr_pattern = r'(?:' + octet + r'\.){3}' + octet + '/' + prefix_length
    # fullmatch (rather than ^...$ with search) also rejects a trailing newline.
    return re.fullmatch(cidr_pattern, value) is not None

In [27]:
# Json validation
""" 
Conditions checked 
1. VCN or subnet name or dns name cannot have special characters in both hub and spokes
2. VCN CIDR or Subnet CIDR for both Hub and Spokes validation within /16 (max) and /31 minimum with sanity check of CIDR established
3. No two VCN CIDR (be it hub or spoke) can be the same
4. Check if already a resource in the CIDR exist for either Hub or Spokes VCN
TODO : Conditions to be checked 
1. Given VCN CIDR should not overlap with on-premises CIDR
"""

def json_validation(payload):
    """Validate the hub-and-spoke payload before any OCI resource is created.

    Checks, in order:
      1. Hub VCN name / DNS name contain no special characters and the hub
         CIDR passes `ipv4_regex_validation`.
      2. The same three fields for every hub subnet.
      3. The same three fields for every spoke VCN, then every spoke subnet.
      4. No two VCN CIDRs (hub or spokes) overlap.
      5. Within each spoke VCN, no two subnet CIDRs overlap.

    Args:
        payload: dict with `payload['hub']['vcn']` and `payload['spokes']`
            (a list of `{'vcn': {...}}`); every VCN dict carries 'name',
            'dns_name', 'cidr' and 'subnets', and every subnet dict carries
            'name', 'dns_name' and 'cidr'.

    Returns:
        bool: True only when every check passes. The first failing check is
        logged and False is returned immediately. A payload whose spokes have
        no subnets is valid.
    """
    hub_vcn = payload['hub']['vcn']
    spoke_vcn_entries = [spoke['vcn'] for spoke in payload['spokes']]

    # Hub VCN: name and CIDR problems are reported as separate messages.
    hub_names_ok = (special_char_regex_validation(hub_vcn['name'])
                    and special_char_regex_validation(hub_vcn['dns_name']))
    hub_cidr_ok = ipv4_regex_validation(hub_vcn['cidr'])
    if hub_names_ok:
        local_logger('Valid Hub values')
    else:
        local_logger('No special character allowed in Hub VCN name or Hub DNS name')
    if not hub_cidr_ok:
        local_logger('Hub VCN CIDR is invalid')
    if not (hub_names_ok and hub_cidr_ok):
        return False

    for hub_subnet in hub_vcn['subnets']:
        if not _validate_network_entry(hub_subnet,
                                       'Hub Subnets are valid !',
                                       'Hub Subnet CIDR is invalid !',
                                       'Hub Subnet Name or DNS Name is invalid !'):
            return False

    # All spoke VCNs are checked before any spoke subnet.
    for spoke_vcn in spoke_vcn_entries:
        if not _validate_network_entry(spoke_vcn,
                                       'Spokes VCN validation success !',
                                       'Spokes VCN CIDR range is invalid',
                                       'Spokes VCN Name or DNS Name is invalid'):
            return False

    for spoke_vcn in spoke_vcn_entries:
        for spoke_subnet in spoke_vcn['subnets']:
            if not _validate_network_entry(spoke_subnet,
                                           'Spoke Subnet CIDR validation success !',
                                           'Spoke Subnet CIDR is invalid !',
                                           'Spoke Subnet Name or DNS name is invalid'):
                return False

    # Every CIDR string has passed the regex by now, so parsing them is safe.
    vcn_networks = [ipaddr.IPNetwork(vcn['cidr'])
                    for vcn in [hub_vcn] + spoke_vcn_entries]
    if _any_networks_overlap(vcn_networks):
        local_logger('Hub Spoke VCN CIDRs seem to be overlapping ! Cannot Proceed !')
        return False

    for spoke_vcn in spoke_vcn_entries:
        subnet_networks = [ipaddr.IPNetwork(subnet['cidr'])
                           for subnet in spoke_vcn['subnets']]
        if _any_networks_overlap(subnet_networks):
            local_logger('Spoke Subnet CIDRs are overlapping! Cannot Proceed !')
            return False

    return True


def _validate_network_entry(entry, ok_message, bad_cidr_message, bad_name_message):
    """Check one VCN/subnet dict's 'cidr', 'name' and 'dns_name'; log and return the verdict."""
    if not ipv4_regex_validation(entry['cidr']):
        local_logger(bad_cidr_message)
        return False
    names_ok = (special_char_regex_validation(entry['name'])
                and special_char_regex_validation(entry['dns_name']))
    if not names_ok:
        local_logger(bad_name_message)
        return False
    local_logger(ok_message)
    return True


def _any_networks_overlap(networks):
    """Return True if any two distinct entries of `networks` overlap."""
    for first in range(len(networks)):
        # Comparing each unordered pair once is enough.
        for second in range(first + 1, len(networks)):
            if networks[first].overlaps(networks[second]):
                return True
    return False

In [28]:
# Quickstart code
def quickstart_hub_spokes():
    """Provision the hub-and-spoke topology described in ./hub_spokes.json.

    Steps:
      1. Load and validate the JSON payload.
      2. Create the hub VCN and its subnets.
      3. For every spoke: create the spoke VCN, one LPG on the hub and one on
         the spoke, peer the two LPGs, then create the spoke's subnets.
      4. Create a DRG and attach it to the hub VCN.

    Raises:
        ValueError: if the payload fails `json_validation`; nothing is created
            in that case.
    """
    with open('./hub_spokes.json') as hub_spokes:
        data = json.load(hub_spokes)

    # Stop before touching OCI when the payload is invalid; the validation
    # result used to be ignored.
    if not json_validation(data):
        raise ValueError('hub_spokes.json failed validation; no resources were created')

    # Log every compartment OCID referenced by the payload. This used to call
    # find_match_in_dictionary, which is not defined in this notebook.
    local_logger(extract_value_by_field(data, 'compartment_id'))

    # Create HUB VCN
    hub_vcn_details = data['hub']['vcn']
    hub_vcn = create_vcn(
        hub_vcn_details['cidr'],
        hub_vcn_details['name'],
        hub_vcn_details['compartment_id'],
        hub_vcn_details['dns_name'],
    )
    _create_configured_subnets(hub_vcn_details['subnets'], hub_vcn)

    # Create SPOKE VCNs, each peered with the hub through a pair of LPGs
    for spoke_entry in data['spokes']:
        spoke_vcn_details = spoke_entry['vcn']
        spoke_vcn = create_vcn(
            spoke_vcn_details['cidr'],
            spoke_vcn_details['name'],
            spoke_vcn_details['compartment_id'],
            spoke_vcn_details['dns_name'],
        )

        # Hub-side LPG, named after both ends of the peering
        hub_lpg = create_local_peering_gateway(
            '{}-LPG-{}'.format(hub_vcn_details['name'], spoke_vcn_details['name']),
            hub_vcn_details['compartment_id'],
            hub_vcn.id,
        )

        # Spoke-side LPG
        spoke_lpg = create_local_peering_gateway(
            '{}-LPG'.format(spoke_vcn_details['name']),
            spoke_vcn_details['compartment_id'],
            spoke_vcn.id,
        )
        connect_local_peering_gateway(str(hub_lpg.id), str(spoke_lpg.id))

        _create_configured_subnets(spoke_vcn_details['subnets'], spoke_vcn)

    # Create and attach DRG to Hub VCN
    create_attach_drg(
        virtual_network_client,
        hub_vcn_details['compartment_id'],
        hub_vcn_details['drg_name'],
        hub_vcn.id,
    )


def _create_configured_subnets(subnet_configs, vcn):
    """Create one subnet in `vcn` for each entry of `subnet_configs`."""
    # Iterate the configuration without appending created SDK objects back
    # into it, so the parsed JSON stays untouched.
    for subnet_config in subnet_configs:
        # 'ispublic' is stored as the string 'True'/'False'; a missing key means private.
        is_public = str(subnet_config.get('ispublic', 'False')).strip().lower() == 'true'
        # NOTE(review): the subnet's configured 'cidr' is not forwarded here;
        # confirm how create_subnet chooses the subnet CIDR.
        create_subnet(subnet_config['name'], subnet_config['dns_name'], vcn, is_public)


In [None]:
# Common block to run quickstarts
# Runs the whole provisioning flow of quickstart_hub_spokes(); executing this
# cell calls the OCI APIs through the wrapper functions defined in this notebook.
quickstart_hub_spokes()

In [None]:
# Delete created & tested VCNs (including subnets)
def delete_drg(virtual_network_composite_operations, drg):
    """Delete the DRG `drg` (by its `id`) and wait until it is TERMINATED.

    Logs a confirmation followed by the deleted OCID.
    """
    terminated_state = oci.core.models.Drg.LIFECYCLE_STATE_TERMINATED
    virtual_network_composite_operations.delete_drg_and_wait_for_state(
        drg.id,
        wait_for_states=[terminated_state],
    )
    for line in ('Deleted DRG', '========================='):
        local_logger(line)
    print('{}'.format(drg.id))
    print('\n')
    
def list_delete_lpg(virtual_network_client, virtual_network_composite_operations, compartment_id, vcn):
    """Delete every Local Peering Gateway that belongs to `vcn`.

    Args:
        virtual_network_client: client used to list and delete the gateways.
        virtual_network_composite_operations: unused; kept so existing callers
            keep working.
        compartment_id: OCID of the compartment to list gateways in.
        vcn: VCN model; only its `id` is read.
    """
    # Pass vcn_id by keyword: that works whether the installed SDK declares it
    # as a positional parameter or as an optional keyword argument.
    gateways = virtual_network_client.list_local_peering_gateways(
        compartment_id,
        vcn_id=vcn.id,
    ).data
    for gateway in gateways:
        delete_lpg(virtual_network_client, gateway)
    
def delete_lpg(virtual_network_client, lpg):
    """Request deletion of the Local Peering Gateway `lpg` and log its OCID.

    The delete call is issued without waiting for a lifecycle state.
    """
    virtual_network_client.delete_local_peering_gateway(lpg.id)
    for line in ('Deleted LPG', '========================='):
        local_logger(line)
    print('{}'.format(lpg.id))
    print('\n')

def delete_subnet(virtual_network_composite_operations, subnet):
    """Delete `subnet` (by its `id`) and wait until it is TERMINATED.

    Logs a confirmation followed by the deleted OCID.
    """
    terminated_state = oci.core.models.Subnet.LIFECYCLE_STATE_TERMINATED
    virtual_network_composite_operations.delete_subnet_and_wait_for_state(
        subnet.id,
        wait_for_states=[terminated_state],
    )
    for line in ('Deleted Subnet', '========================='):
        local_logger(line)
    print('{}'.format(subnet.id))
    print('\n')


def delete_vcn(virtual_network_composite_operations, vcn_id):
    """Delete the VCN with OCID `vcn_id` and wait until it is TERMINATED.

    Logs a confirmation followed by the deleted OCID.
    """
    terminated_state = oci.core.models.Vcn.LIFECYCLE_STATE_TERMINATED
    virtual_network_composite_operations.delete_vcn_and_wait_for_state(
        vcn_id,
        wait_for_states=[terminated_state],
    )
    for line in ('Deleted VCN', '========================='):
        local_logger(line)
    print('{}'.format(vcn_id))
    print('\n')

In [None]:
# Tear down the hub/spoke resources created by this notebook.
# NOTE(review): compartment OCID is hard-coded; confirm it before running,
# because this cell deletes resources.
compartment_id = "ocid1.compartment.oc1..aaaaaaaad7ngb2s3armaa6qdkglst4nxnvvdrjjggq5afhfqxqgiyyoyluiq"

# Fetch every page so VCNs beyond the first page are not missed.
vcns = oci.pagination.list_call_get_all_results(
    virtual_network_client.list_vcns,
    compartment_id,
)

for vcn in vcns.data:
    # Only touch VCNs whose display name marks them as hub/spoke networks;
    # LPGs of unrelated VCNs in the compartment used to be deleted as well.
    if "hub" not in vcn.display_name and "spoke" not in vcn.display_name:
        continue

    try:
        list_delete_lpg(virtual_network_client, virtual_network_composite_operations, compartment_id, vcn)

        # Delete every subnet of the VCN, not just the first one.
        subnets = oci.pagination.list_call_get_all_results(
            virtual_network_client.list_subnets,
            compartment_id,
            vcn_id=vcn.id,
        )
        for subnet in subnets.data:
            delete_subnet(virtual_network_composite_operations, subnet)

        # list_drg_attachments returns attachments, not DRGs: detach first,
        # then delete the DRG the attachment points to via its drg_id.
        drg_attachments = virtual_network_client.list_drg_attachments(
            compartment_id,
            vcn_id=vcn.id,
        )
        for drg_attachment in drg_attachments.data:
            virtual_network_composite_operations.delete_drg_attachment_and_wait_for_state(
                drg_attachment.id,
                wait_for_states=[oci.core.models.DrgAttachment.LIFECYCLE_STATE_DETACHED]
            )
            drg = virtual_network_client.get_drg(drg_attachment.drg_id).data
            delete_drg(virtual_network_composite_operations, drg)

        delete_vcn(virtual_network_composite_operations, vcn.id)
    except (oci.exceptions.ServiceError, oci.exceptions.CompositeOperationError) as error:
        # Best effort: report the failure and carry on with the next VCN
        # instead of silently swallowing every exception.
        local_logger('Cleanup failed for VCN {}: {}'.format(vcn.id, error))
