## Prerequisites

In [1]:
#  Download the Riva Python API
#  $ ngc registry resource download-version "nvidia/riva/riva_quickstart:1.9.0-beta"
#  $ cd riva_quickstart_v1.9.0-beta

In [1]:
# Install riva-api python wheel locally
!mkdir -p ./resources
!cd ./resources/riva_quickstart_v1.9.0-beta && pip install riva_api-1.9.0b0-py3-none-any.whl

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Processing ./riva_api-1.9.0b0-py3-none-any.whl
Collecting grpcio-tools
  Downloading grpcio_tools-1.44.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.4 MB)
Collecting grpcio>=1.44.0
  Downloading grpcio-1.44.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.3 MB)
Installing collected packages: grpcio, grpcio-tools, riva-api
  Attempting uninstall: grpcio
    Found existing installation: grpcio 1.41.0
    Uninstalling grpcio-1.41.0:
      Successfully uninstalled grpcio-1.41.0
Successfully installed grpcio-1.44.0 grpcio-tools-1.44.0 riva-api-1.9.0b0


## What is Riva

NVIDIA **Riva** is a GPU-accelerated SDK for developing speech AI applications. Riva is designed to help you access conversational AI functionalities easily and quickly. With a few commands, you can access the high-performance services through API operations and try demos. Task-specific AI services and gRPC endpoints provide out-of-the-box, high-performance ASR, NLP, and TTS. All these AI services are trained with thousands of hours of public and internal datasets to reach high accuracy. You can start using the pretrained models or fine-tune them with your own dataset to further improve model performance. 

The Riva text-to-speech (TTS), or speech synthesis, skill generates human-like speech. It uses non-autoregressive models to deliver 12x higher performance on NVIDIA A100 GPUs compared with Tacotron 2 and WaveGlow models on NVIDIA V100 GPUs. The service also lets you create a natural-sounding custom voice for a brand or virtual assistant from 30 minutes of a voice actor's recordings, in about a day.

![riva capabilities](https://developer-blogs.nvidia.com/wp-content/uploads/2021/11/riva-services-capabilities-2.png)
![riva pipeline](https://developer-blogs.nvidia.com/wp-content/uploads/2021/11/riva-skills.png)

Riva services are exposed as API operations on `gRPC` endpoints, which hide the underlying complexity. The API server, running in a Docker container, exposes these gRPC operations and handles all incoming and outgoing speech and NLP data.

In [2]:
# required imports

import io
import librosa
from time import time
import numpy as np
import IPython.display as ipd
import grpc
import requests

# NLP proto
import riva_api.riva_nlp_pb2 as rnlp
import riva_api.riva_nlp_pb2_grpc as rnlp_srv

# ASR proto
import riva_api.riva_asr_pb2 as rasr
import riva_api.riva_asr_pb2_grpc as rasr_srv

# TTS proto
import riva_api.riva_tts_pb2 as rtts
import riva_api.riva_tts_pb2_grpc as rtts_srv
import riva_api.riva_audio_pb2 as ra


In [3]:
# Create Riva clients and connect to Riva Speech API server
channel = grpc.insecure_channel("riva-speech:50051")

# Service stubs for ASR, NLP, and TTS
riva_asr = rasr_srv.RivaSpeechRecognitionStub(channel)
riva_nlp = rnlp_srv.RivaLanguageUnderstandingStub(channel)
riva_tts = rtts_srv.RivaSpeechSynthesisStub(channel)

## Check Server status via Triton API

For conversational AI applications, it is crucial to keep latency below a given threshold, which favors executing each inference request as soon as it arrives. Saturating the GPUs and increasing throughput, on the other hand, requires larger batches, which means delaying inference until more requests have arrived and a bigger batch can be formed.
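
The trade-off between latency and batch size can be made concrete with a toy simulation. This is a minimal sketch and not how Triton's scheduler is implemented; the names `form_batches`, `worst_queue_delay`, `max_batch_size`, and `max_delay_ms` are hypothetical and chosen only for illustration.

```python
from typing import List


def form_batches(arrivals_ms: List[float], max_batch_size: int,
                 max_delay_ms: float) -> List[List[float]]:
    """Group request arrival times (in ms) into batches.

    A batch is closed when it is full, or when the next request arrives
    more than max_delay_ms after the first request in the batch.
    """
    batches: List[List[float]] = []
    current: List[float] = []
    for t in sorted(arrivals_ms):
        if current and (len(current) >= max_batch_size
                        or t - current[0] > max_delay_ms):
            batches.append(current)
            current = []
        current.append(t)
    if current:
        batches.append(current)
    return batches


def worst_queue_delay(batches: List[List[float]], max_batch_size: int,
                      max_delay_ms: float) -> float:
    """Longest time any request waits before its batch is dispatched."""
    worst = 0.0
    for batch in batches:
        # A full batch is dispatched when its last request arrives;
        # a partial batch is dispatched when the delay window expires.
        if len(batch) == max_batch_size:
            dispatch = batch[-1]
        else:
            dispatch = batch[0] + max_delay_ms
        worst = max(worst, dispatch - batch[0])
    return worst


arrivals = [0, 1, 2, 10, 11, 30]
for delay in (0, 5):
    batches = form_batches(arrivals, max_batch_size=4, max_delay_ms=delay)
    print(f"max_delay_ms={delay}: {len(batches)} batches, "
          f"worst wait {worst_queue_delay(batches, 4, delay)} ms")
```

With no delay window, every request runs alone (low latency, small batches). Allowing a short window produces fewer, larger batches at the cost of some queueing time.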

Riva uses NVIDIA **Triton Inference Server** to serve multiple models for efficient and robust resource allocation, as well as to achieve high performance in terms of high throughput, low latency, and high accuracy. The API server sends inference requests to NVIDIA Triton and receives the results.

**Triton Inference Server** provides a cloud and edge inferencing solution optimized for both CPUs and GPUs. Triton supports HTTP/REST and gRPC protocols that allow remote clients to request inferencing for any model being managed by the server. For edge deployments, Triton is available as a shared library with a C API that allows the full functionality of Triton to be included directly in an application.

![Triton](https://github.com/triton-inference-server/server/blob/main/docs/images/arch.jpg?raw=true)

For more details: https://github.com/triton-inference-server/server

In [4]:
# install tritonclient via pip
!pip install tritonclient

E0221 06:48:13.096669124     454 fork_posix.cc:70]           Fork support is only compatible with the epoll1 and poll polling strategies


Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting tritonclient
  Downloading tritonclient-2.18.0-py3-none-manylinux1_x86_64.whl (7.8 MB)
Collecting python-rapidjson>=0.9.1
  Downloading python_rapidjson-1.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
Installing collected packages: python-rapidjson, tritonclient
Successfully installed python-rapidjson-1.6 tritonclient-2.18.0


In [5]:
from tritonclient.grpc import service_pb2
from tritonclient.grpc import service_pb2_grpc

trt_channel = grpc.insecure_channel("riva-speech:8001")
grpc_stub = service_pb2_grpc.GRPCInferenceServiceStub(trt_channel)

try:
    request = service_pb2.ServerLiveRequest()
    response = grpc_stub.ServerLive(request)
    print("server {}".format(response))
except Exception as ex:
    print(ex)

request = service_pb2.ServerReadyRequest()
response = grpc_stub.ServerReady(request)
print("server {}".format(response))

server live: true

server ready: true
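
If the server is still starting when this notebook runs, a single liveness or readiness call may fail. One option is to wrap the check in a small polling loop. The helper below is a minimal sketch; `wait_until` and its parameters are hypothetical names, and the probe is passed in as a plain callable so the helper does not depend on any particular client.

```python
import time
from typing import Callable


def wait_until(probe: Callable[[], bool], timeout_s: float = 30.0,
               interval_s: float = 1.0) -> bool:
    """Call probe() until it returns True or timeout_s elapses.

    Exceptions raised by probe() (for example, a connection error while
    the server is still starting) are treated as "not ready yet".
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if probe():
                return True
        except Exception:
            pass  # treat any failure as "not ready" and retry
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

With the stub created in the cell above, the probe could be something like `lambda: grpc_stub.ServerReady(service_pb2.ServerReadyRequest()).ready`, assuming the response exposes the `ready` field shown in the printed output.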



In [6]:
request = service_pb2.RepositoryIndexRequest()
response = grpc_stub.RepositoryIndex(request)

print("num models: {}\n".format(len(response.models)))
print(response.models)

num models: 43

[name: "citrinet-1024-en-US-asr-offline"
version: "1"
state: "READY"
, name: "citrinet-1024-en-US-asr-offline-ctc-decoder-cpu-streaming-offline"
version: "1"
state: "READY"
, name: "citrinet-1024-en-US-asr-offline-feature-extractor-streaming-offline"
version: "1"
state: "READY"
, name: "citrinet-1024-en-US-asr-offline-voice-activity-detector-ctc-streaming-offline"
version: "1"
state: "READY"
, name: "citrinet-1024-en-US-asr-streaming"
version: "1"
state: "READY"
, name: "citrinet-1024-en-US-asr-streaming-ctc-decoder-cpu-streaming"
version: "1"
state: "READY"
, name: "citrinet-1024-en-US-asr-streaming-feature-extractor-streaming"
version: "1"
state: "READY"
, name: "citrinet-1024-en-US-asr-streaming-voice-activity-detector-ctc-streaming"
version: "1"
state: "READY"
, name: "fastpitch_hifigan_ensemble-ljspeech"
version: "1"
state: "READY"
, name: "fastpitch_hifigan_ensemble-woojin"
version: "1"
state: "READY"
, name: "intent_slot_detokenizer"
version: "1"
state: "READY"
,

In [7]:
[i for i in response.models if "asr" in i.name]

[name: "citrinet-1024-en-US-asr-offline"
 version: "1"
 state: "READY",
 name: "citrinet-1024-en-US-asr-offline-ctc-decoder-cpu-streaming-offline"
 version: "1"
 state: "READY",
 name: "citrinet-1024-en-US-asr-offline-feature-extractor-streaming-offline"
 version: "1"
 state: "READY",
 name: "citrinet-1024-en-US-asr-offline-voice-activity-detector-ctc-streaming-offline"
 version: "1"
 state: "READY",
 name: "citrinet-1024-en-US-asr-streaming"
 version: "1"
 state: "READY",
 name: "citrinet-1024-en-US-asr-streaming-ctc-decoder-cpu-streaming"
 version: "1"
 state: "READY",
 name: "citrinet-1024-en-US-asr-streaming-feature-extractor-streaming"
 version: "1"
 state: "READY",
 name: "citrinet-1024-en-US-asr-streaming-voice-activity-detector-ctc-streaming"
 version: "1"
 state: "READY",
 name: "riva-trt-citrinet-1024-en-US-asr-offline-am-streaming-offline"
 version: "1"
 state: "READY",
 name: "riva-trt-citrinet-1024-en-US-asr-streaming-am-streaming"
 version: "1"
 state: "READY"]

---

The examples below follow the Riva Speech API demo notebook: https://docs.nvidia.com/deeplearning/riva/user-guide/docs/notebooks/Riva_speech_API_demo.html

## 1. Core NLP Service Examples
- TransformText - map an input string to an output string
- ClassifyText - return a single label for the input string
- ClassifyTokens - return a label per input token

In [8]:
# Use the TextTransform API to run the punctuation model
req = rnlp.TextTransformRequest()
req.model.model_name = "riva_punctuation"
req.text.append("add punctuation to this sentence")
req.text.append("do you have any red nvidia shirts")
req.text.append("i need one cpu four gpus and lots of memory "
                "for my new computer it's going to be very cool")

nlp_resp = riva_nlp.TransformText(req)
print("TransformText Output:")
print("\n".join([f" {x}" for x in nlp_resp.text]))

TransformText Output:
 Add punctuation to this sentence.
 Do you have any red Nvidia shirts?
 I need one cpu, four gpus and lots of memory for my new computer. It's going to be very cool.


In [9]:
# Use the TokenClassification API to run a Named Entity Recognition (NER) model
# Note: the model configuration of the NER model indicates that the labels are
# in IOB format. Riva, subsequently, knows to:
#   a) ignore 'O' labels
#   b) Remove B- and I- prefixes from labels
#   c) Collapse sequences of B- I- ... I- tokens into a single token

req = rnlp.TokenClassRequest()
req.model.model_name = "riva_ner"     # If you have deployed a custom model with the domain_name 
                                        # parameter in ServiceMaker's `riva-build` command then you should use 
                                        # "riva_ner_<your_input_domain_name>" where <your_input_domain_name>
                                        # is the name you provided to the domain_name parameter.

req.text.append("Jensen Huang is the CEO of NVIDIA Corporation, "
                "located in Santa Clara, California")
resp = riva_nlp.ClassifyTokens(req)

print("Named Entities:")
for result in resp.results[0].results:
    print(f"  {result.token} ({result.label[0].class_name})")

Named Entities:
  jensen huang (PER)
  nvidia corporation (ORG)
  santa clara (LOC)
  california (LOC)
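
The IOB post-processing described in the comments of the cell above (ignore `O`, strip the `B-`/`I-` prefixes, collapse runs into one entity) can be sketched in plain Python. This is an illustration of the idea only, not Riva's implementation; `collapse_iob` is a hypothetical helper, and the per-token labels in the example are assumed for demonstration.

```python
from typing import List, Optional, Tuple


def collapse_iob(tokens: List[str], labels: List[str]) -> List[Tuple[str, str]]:
    """Collapse per-token IOB labels into (entity text, entity type) pairs."""
    entities: List[Tuple[str, str]] = []
    current_tokens: List[str] = []
    current_type: Optional[str] = None

    def flush() -> None:
        # Emit the entity collected so far, if any, and reset the state.
        nonlocal current_tokens, current_type
        if current_tokens:
            entities.append((" ".join(current_tokens), current_type))
        current_tokens, current_type = [], None

    for token, label in zip(tokens, labels):
        if label == "O":                       # a) ignore 'O' labels
            flush()
            continue
        prefix, _, entity_type = label.partition("-")   # b) strip B-/I-
        if prefix == "B" or entity_type != current_type:
            flush()                            # a new entity starts here
            current_type = entity_type
        current_tokens.append(token)           # c) collapse B- I- ... I-
    flush()
    return entities


tokens = ["jensen", "huang", "is", "the", "ceo", "of", "nvidia",
          "corporation", "located", "in", "santa", "clara", "california"]
labels = ["B-PER", "I-PER", "O", "O", "O", "O", "B-ORG",
          "I-ORG", "O", "O", "B-LOC", "I-LOC", "B-LOC"]
for text, entity_type in collapse_iob(tokens, labels):
    print(f"  {text} ({entity_type})")
```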


In [10]:
# Submit a TextClassRequest for text classification.
# Riva NLP comes with a default text_classification domain called "domain_misty" which consists of 
# 4 classes: meteorology, personality, weather and nomatch

request = rnlp.TextClassRequest()
request.model.model_name = "riva_text_classification_domain"       # If you have deployed a custom model  
                                        # with the `--domain_name` parameter in ServiceMaker's `riva-build` command 
                                        # then you should use "riva_text_classification_<your_input_domain_name>"
                                        # where <your_input_domain_name> is the name you provided to the 
                                        # domain_name parameter. In this case the domain_name is "domain"
request.text.append("Is it going to snow in Burlington, Vermont tomorrow night?")
request.text.append("What causes rain?")
request.text.append("What is your favorite season?")
ct_response = riva_nlp.ClassifyText(request)
print(ct_response)

results {
  labels {
    class_name: "weather"
    score: 0.9975590109825134
  }
}
results {
  labels {
    class_name: "meteorology"
    score: 0.984375
  }
}
results {
  labels {
    class_name: "personality"
    score: 0.984375
  }
}



## 2. ASR Examples
In this release, the Riva Speech API supports single-channel audio in the following formats: `.wav` (PCM), `.alaw`, `.mulaw`, and `.flac`.
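
Before sending a `.wav` file, it can help to confirm that it is mono PCM and to read its sample rate. The sketch below uses only Python's standard `wave` module, which reads PCM-encoded WAV files; `describe_wav` and `is_mono_pcm` are hypothetical helper names.

```python
import wave


def describe_wav(path: str) -> dict:
    """Return basic properties of a PCM .wav file."""
    with wave.open(path, "rb") as wav:
        return {
            "channels": wav.getnchannels(),
            "sample_rate_hz": wav.getframerate(),
            "sample_width_bytes": wav.getsampwidth(),
            "duration_s": wav.getnframes() / wav.getframerate(),
        }


def is_mono_pcm(path: str) -> bool:
    """True if the file can be read as PCM WAV and has a single channel."""
    try:
        return describe_wav(path)["channels"] == 1
    except wave.Error:
        # The wave module raises wave.Error for files it cannot parse.
        return False
```

For the sample used in this notebook, a call such as `describe_wav("./samples/en-US_sample.wav")` would report the values needed for `audio_channel_count` and `sample_rate_hertz`.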

In [11]:
# This example uses a .wav file with LINEAR_PCM encoding.
path = "./samples/en-US_sample.wav"
audio, sr = librosa.core.load(path, sr=None)
with io.open(path, 'rb') as fh:
    content = fh.read()
ipd.Audio(path)

In [12]:
# Set up an offline/batch recognition request
req = rasr.RecognizeRequest()
req.audio = content                                   # raw bytes
req.config.encoding = ra.AudioEncoding.LINEAR_PCM     # Supports LINEAR_PCM, FLAC, MULAW and ALAW audio encodings
req.config.sample_rate_hertz = sr                     # Audio will be resampled if necessary
req.config.language_code = "en-US"                    # Ignored, will route to correct model in future release
req.config.max_alternatives = 1                       # How many top-N hypotheses to return
req.config.enable_automatic_punctuation = True        # Add punctuation when end of VAD detected
req.config.audio_channel_count = 1                    # Mono channel
#req.config.model="citrinet-1024-en-US-asr-offline"   #  In the case where multiple models might be able to fulfill the client request, one model is selected at random. Y

response = riva_asr.Recognize(req)
asr_best_transcript = response.results[0].alternatives[0].transcript
print("ASR Transcript:", asr_best_transcript)

print("\n\nFull Response Message:")
print(response)

ASR Transcript: What is natural language processing? 


Full Response Message:
results {
  alternatives {
    transcript: "What is natural language processing? "
    confidence: 1.0
  }
  channel_tag: 1
  audio_processed: 4.1519999504089355
}



## 3. TTS Service Example

Subsequent releases will add features such as model registration, to support multiple languages and voices with the same API, and resampling to alternative sampling rates.

In [13]:
req = rtts.SynthesizeSpeechRequest()
req.text = "My name is Woojin from South Korea"
req.language_code = "en-US"                    # currently required to be "en-US"
req.encoding = ra.AudioEncoding.LINEAR_PCM     # Supports LINEAR_PCM, FLAC, MULAW and ALAW audio encodings
req.sample_rate_hz = 22050                     # ignored, audio returned will be 22.05 kHz
req.voice_name = "ljspeech"                    # ignored

resp = riva_tts.Synthesize(req)
audio_samples = np.frombuffer(resp.audio, dtype=np.float32)
ipd.Audio(audio_samples, rate=22050)
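
To keep the synthesized audio, the float samples can be written to a 16-bit PCM `.wav` file. The sketch below uses only the standard library and assumes, as the cell above does, that the samples are mono floats in the range [-1.0, 1.0] at 22050 Hz; `write_wav_from_float` is a hypothetical helper name.

```python
import struct
import wave
from typing import Iterable


def write_wav_from_float(samples: Iterable[float], path: str,
                         sample_rate_hz: int = 22050) -> int:
    """Write mono float samples as 16-bit PCM; return the frame count."""
    pcm = bytearray()
    count = 0
    for sample in samples:
        # Clip to [-1.0, 1.0] so out-of-range values do not overflow int16.
        clipped = max(-1.0, min(1.0, float(sample)))
        pcm += struct.pack("<h", int(round(clipped * 32767)))
        count += 1
    with wave.open(path, "wb") as wav:
        wav.setnchannels(1)          # mono
        wav.setsampwidth(2)          # 16-bit samples
        wav.setframerate(sample_rate_hz)
        wav.writeframes(bytes(pcm))
    return count
```

In this notebook the call would look like `write_wav_from_float(audio_samples, "tts_output.wav")`, where the output file name is only an example.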

## 4. Riva NLP Service Examples

The NLP Service contains higher-level/more application-specific NLP APIs. This
guide demonstrates how the AnalyzeIntent API can be used for queries across
both known and unknown domains.

In [14]:
# The AnalyzeIntent API can be used to query an Intent Slot classifier. The API can leverage a
# text classification model to classify the domain of the input query and then route to the 
# appropriate intent slot model.

# Let's first see an example where the domain is known. This skips execution of the domain classifier
# and proceeds directly to the intent/slot model for the requested domain.

req = rnlp.AnalyzeIntentRequest()
req.query = "How is the humidity in San Francisco?"
req.options.domain = "weather"  # The <domain_name> is appended to "riva_intent_" to look up a
                                # model named "riva_intent_<domain_name>". In this example, the model
                                # "riva_intent_weather" must be preloaded in the Riva server. To deploy a
                                # custom Joint Intent and Slot model, use the `--domain_name` parameter in
                                # ServiceMaker's `riva-build intent_slot` command.

resp = riva_nlp.AnalyzeIntent(req)
print(resp)

intent {
  class_name: "weather.humidity"
  score: 0.9833980202674866
}
slots {
  token: "san francisco"
  label {
    class_name: "weatherplace"
    score: 0.9821460247039795
  }
}
slots {
  token: "?"
  label {
    class_name: "weatherplace"
    score: 0.6485490202903748
  }
}
domain_str: "weather"
domain {
  class_name: "weather"
  score: 1.0
}



In [15]:
# Below is an example where the input domain is not provided.

req = rnlp.AnalyzeIntentRequest()
req.query = "Is it going to rain tomorrow?"

# The input query is first routed to a text classification model called "riva_text_classification_domain".
# The output class label of "riva_text_classification_domain" is appended to "riva_intent_"
# to select the Intent Slot model to execute for the input query.
# Note: The model "riva_text_classification_domain" must be loaded into the Riva server and have
# class labels that map to the corresponding intent slot models.

resp = riva_nlp.AnalyzeIntent(req)
print(resp)

intent {
  class_name: "weather.rainfall"
  score: 0.9663090109825134
}
slots {
  token: "tomorrow"
  label {
    class_name: "weatherforecastdaily"
    score: 0.5346879959106445
  }
}
slots {
  token: "?"
  label {
    class_name: "weatherplace"
    score: 0.6896839737892151
  }
}
domain_str: "weather"
domain {
  class_name: "weather"
  score: 0.9975590109825134
}
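
The routing described in the two cells above (use the given domain if present, otherwise classify the query, then prepend `riva_intent_`) can be summarized in a few lines. This is a sketch of the naming logic only, not the server's code; `resolve_intent_model` is a hypothetical helper and the classifier is a stand-in callable.

```python
from typing import Callable, Optional

INTENT_PREFIX = "riva_intent_"


def resolve_intent_model(query: str, domain: Optional[str],
                         classify_domain: Callable[[str], str]) -> str:
    """Return the intent/slot model name that would handle the query."""
    if not domain:
        # No domain supplied: ask the domain classifier first.
        domain = classify_domain(query)
    return INTENT_PREFIX + domain


# Stand-in for the text classification model; always answers "weather".
fake_classifier = lambda query: "weather"

print(resolve_intent_model("How is the humidity in San Francisco?",
                           "weather", fake_classifier))
print(resolve_intent_model("Is it going to rain tomorrow?",
                           None, fake_classifier))
```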



In [16]:
# Some weather Intent queries
queries = [
    "Is it currently cloudy in Tokyo?",
    "What is the annual rainfall in Pune?",
    "What is the humidity going to be tomorrow?"
]
for q in queries:
    req = rnlp.AnalyzeIntentRequest()
    req.query = q
    resp = riva_nlp.AnalyzeIntent(req)

    print(f"[{resp.intent.class_name}]\t{req.query}")

[weather.cloudy]	Is it currently cloudy in Tokyo?
[weather.rainfall]	What is the annual rainfall in Pune?
[weather.humidity]	What is the humidity going to be tomorrow?


In [17]:
# Demonstrate latency by calling repeatedly.
# NOTE: this is a synchronous API call, so request #N will not be sent until
# response #N-1 is returned. This means latency and throughput will be negatively
# impacted by long-distance & VPN connections

req = rnlp.TextTransformRequest()
req.text.append("i need one cpu four gpus and lots of memory for my new computer it's going to be very cool")

iterations = 10
# Demonstrate synchronous performance
start_time = time()
for _ in range(iterations):
    nlp_resp = riva_nlp.PunctuateText(req)
end_time = time()
print(f"Time to complete {iterations} synchronous requests: {end_time-start_time} sec.")

# Demonstrate async performance
start_time = time()
futures = []
for _ in range(iterations):
    futures.append(riva_nlp.PunctuateText.future(req))
for f in futures:
    f.result()
end_time = time()
print(f"Time to complete {iterations} asynchronous requests: {end_time-start_time} sec.\n")


Time to complete 10 synchronous requests: 0.03712058067321777 sec.
Time to complete 10 asynchronous requests: 0.019395828247070312 sec.
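
Total elapsed time hides the spread between individual requests. A per-request view can be obtained with a small timing helper such as the sketch below; `measure_latency` is a hypothetical name, and the call being timed is passed in as a zero-argument callable.

```python
import statistics
import time
from typing import Callable, Dict


def measure_latency(call: Callable[[], object],
                    iterations: int = 10) -> Dict[str, float]:
    """Time each synchronous call and summarize latency in milliseconds."""
    latencies_ms = []
    for _ in range(iterations):
        start = time.perf_counter()
        call()
        latencies_ms.append((time.perf_counter() - start) * 1000.0)
    return {
        "mean_ms": statistics.mean(latencies_ms),
        "median_ms": statistics.median(latencies_ms),
        "max_ms": max(latencies_ms),
    }
```

With the request from the cell above, this could be used as `measure_latency(lambda: riva_nlp.PunctuateText(req))`.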

