In [1]:
""" LIST SIP OUTBOUND TRUNKS """
from dotenv import load_dotenv 
from livekit import api
from livekit.protocol.sip import ListSIPOutboundTrunkRequest

load_dotenv()

async def main():
    livekit_api = api.LiveKitAPI()

    trunks = await livekit_api.sip.list_sip_outbound_trunk(
        ListSIPOutboundTrunkRequest()
    )
    print(f"{trunks}")

    await livekit_api.aclose()

(await main())

items {
  sip_trunk_id: "ST_q8fzfH63TurF"
  name: "My outbound trunk"
  address: "flowon.pstn.twilio.com"
  numbers: "+447723180506"
  auth_username: "flowon_outbound"
  auth_password: "9gHQBC@JNHe@VrJ"
}



In [None]:
""" CREATE SIP OUTBOUND TRUNK """
from livekit import api 
from livekit.protocol.sip import CreateSIPOutboundTrunkRequest, SIPOutboundTrunkInfo

async def main(twilio_number: str):
    livekit_api = api.LiveKitAPI()

    trunk = SIPOutboundTrunkInfo(
    name = f"trunk_{twilio_number}",
    numbers = [twilio_number],
    address = "sip.flowon.ai",
    )
  
    request = CreateSIPOutboundTrunkRequest(
        trunk = trunk
    )
    
    trunk = await livekit_api.sip.create_sip_inbound_trunk(request)
    
    await livekit_api.aclose()

twilio_number = "+447723180506"
await main(twilio_number)

In [None]:
""" LIST SIP INBOUND TRUNKS """
from dotenv import load_dotenv 
from livekit import api
from livekit.protocol.sip import ListSIPInboundTrunkRequest

load_dotenv()

async def main():
  livekit_api = api.LiveKitAPI()

  rules = await livekit_api.sip.list_sip_inbound_trunk(
    ListSIPInboundTrunkRequest()
  )
  print(f"{rules}")

  await livekit_api.aclose()

(await main())

In [None]:
""" CREATE SIP INBOUND TRUNK """
from livekit import api 
from livekit.protocol.sip import CreateSIPInboundTrunkRequest, SIPInboundTrunkInfo

async def main(twilio_number: str):
    livekit_api = api.LiveKitAPI()

    trunk = SIPInboundTrunkInfo(
    name = f"trunk_{twilio_number}",
    numbers = [twilio_number],
    )
  
    request = CreateSIPInboundTrunkRequest(
        trunk = trunk
    )
    
    trunk = await livekit_api.sip.create_sip_inbound_trunk(request)
    
    await livekit_api.aclose()

twilio_number = "+447723180506"
await main(twilio_number)

In [None]:
""" DELETE SIP INBOUND TRUNK """
from livekit import api
from livekit.protocol.sip import DeleteSIPTrunkRequest 
async def delete_sip_trunk(trunk_id: str):
    livekit_api = api.LiveKitAPI()
    
    try:
        await livekit_api.sip.delete_sip_trunk(DeleteSIPTrunkRequest(sip_trunk_id=trunk_id))
        print(f"Successfully deleted SIP trunk with ID: {trunk_id}")
    except Exception as e:
        print(f"Error deleting SIP trunk: {str(e)}")
    finally:
        await livekit_api.aclose()

# Example usage:
trunk_id = "ST_WCmfQowTZB2J"  # Replace with actual trunk ID
await delete_sip_trunk(trunk_id)

In [None]:
""" CREATE SIP DISPATCH RULE """
from dotenv import load_dotenv 
from livekit import api 
from livekit.protocol.sip import (
    CreateSIPDispatchRuleRequest,
    SIPDispatchRule,
    SIPDispatchRuleIndividual
)

load_dotenv()

async def main(twilio_number: str, trunk_id: str):

  livekit_api = api.LiveKitAPI()

  # Create a dispatch rule to place each caller in a separate room
  rule = SIPDispatchRule(
    dispatch_rule_individual = SIPDispatchRuleIndividual(
      room_prefix = f'call-_{twilio_number}_',
      pin = ''
    )   
  )

  request = CreateSIPDispatchRuleRequest(
    rule = rule,
    name = f'rule_{twilio_number}_{trunk_id}',
    trunk_ids = [ 
      trunk_id,
    ],  
    hide_phone_number = False
  )

  try:
    dispatchRule = await livekit_api.sip.create_sip_dispatch_rule(request)
    print(f"Successfully created {dispatchRule}")
  except api.twirp_client.TwirpError as e:
    print(f"{e.code} error: {e.message}")

  await livekit_api.aclose()

twilio_number = "+13614281772"
trunk_id = "ST_E4GjkXqaCyqr"
(await main(twilio_number, trunk_id))

In [None]:
""" LIST SIP DISPATCH RULES """
from dotenv import load_dotenv
from livekit import api
from livekit.protocol.sip import ListSIPDispatchRuleRequest, DeleteSIPDispatchRuleRequest

load_dotenv()


async def main():
  livekit_api = api.LiveKitAPI()

  rules = await livekit_api.sip.list_sip_dispatch_rule(
    ListSIPDispatchRuleRequest()
  )
  print(f"{rules}")

  await livekit_api.aclose()

(await main())

In [None]:
""" DELETE SIP DISPATCH RULE """
from dotenv import load_dotenv
from livekit import api
from livekit.protocol.sip import ListSIPDispatchRuleRequest, DeleteSIPDispatchRuleRequest

load_dotenv()


async def main(dispatch_rule_id: str):
    livekit_api = api.LiveKitAPI()

    # First get all rules
    rules = await livekit_api.sip.list_sip_dispatch_rule(
        ListSIPDispatchRuleRequest()
    )
    print(f"Found rules: {rules}")

    await livekit_api.sip.delete_sip_dispatch_rule(
        DeleteSIPDispatchRuleRequest(
            sip_dispatch_rule_id=dispatch_rule_id
        )
    )
    print(f"Deleted rule with ID: {dispatch_rule_id}")

    await livekit_api.aclose()

(await main("SDR_9oiituE5jeyG"))