Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub Sub Timeout Panic #45

Closed
ragyabraham opened this issue Aug 29, 2022 · 13 comments
Closed

Pub Sub Timeout Panic #45

ragyabraham opened this issue Aug 29, 2022 · 13 comments

Comments

@ragyabraham
Copy link

I am currently doing some load testing comparing my previous app, built in python, and this new application built in rust using your google-cloud-rust repo and with the rust application, Im getting the below error when sending a high volume of requests to pubsub.
thread 'actix-rt|system:0|arbiter:0' panicked at 'calledResult::unwrap()on anErrvalue: GAX(Auth(HttpError(reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Ipv4(169.xxx.xxx.xxx)), port: None, path: "/computeMetadata/v1/instance/service-accounts/default/token", query: Some("scopes%3Dhttps%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform%2Chttps%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpubsub.data"), fragment: None }, source: TimedOut })))'

the same number of requests are sent to the python app, which is forwarding the request to the same pubsub service in GCP, and I do not see any failures/errors on the python app.

Any chance you could help me understand why this is happening and how I can fix it.

Cheers

@yoshidan
Copy link
Owner

This may be due to a large number of token acquisition requests occurring at the same time due to the short scope of the lock.

https://github.com/yoshidan/google-cloud-rust/blob/main/foundation/auth/src/token_source/reuse_token_source.rs#L34
https://github.com/yoshidan/google-cloud-rust/blob/main/foundation/auth/src/token_source/compute_token_source.rs#L36

Like the Go library, locking from token expiration check to acquisition may solve the problem.

@ragyabraham
Copy link
Author

Ok, that makes sense. I'll give this a go and create a pull request once I have something meaningful

@yoshidan
Copy link
Owner

I merged the improvement and published google-cloud-pubsub v0.6.0.
Please try it out.

@ragyabraham
Copy link
Author

Amazing! Thank you, will deploy the new version today and test it. I will let you know of the outcome

@ragyabraham
Copy link
Author

ragyabraham commented Sep 27, 2022

Hi @yoshidan I'm still seeing the below errors with high-volume streams

Error creating client: GAX(TonicTransport(tonic::transport::Error(Transport, hyper::Error(Connect, Custom { kind: UnexpectedEof, error: "tls handshake eof" }))))

Error creating client: GAX(Auth(HttpError(reqwest::Error { kind: Request, url: Url { scheme: "http", cannot_be_a_base: false, username: "", password: None, host: Some(Ipv4(169.XXX.XXX.XX)), port: None, path: "/computeMetadata/v1/instance/service-accounts/default/token", query: Some("scopes%3Dhttps%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform%2Chttps%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpubsub.data"), fragment: None }, source: TimedOut })))

@yoshidan
Copy link
Owner

Does it happen under low load?
How much load do you get that error? I will try to reproduce it in my environment.
Also, please tell us the environment where you are running the load test, such as GKE or Cloud Run.

@ragyabraham
Copy link
Author

It does happen under load. I'm using locust to load test.
I'm simulating 1000 users with a spawn rate of 10 users/s
This is running in GKE

@ragyabraham
Copy link
Author

I was hoping to use this in production today as we have a client onboarding but I might need to revert back to python for now. Unless we can sort this out in the next couple of hours

@yoshidan
Copy link
Owner

The token refresh request to the metadata server is only executed once per hour; if it occurs within an hour, are you creating a google_cloud_spanner::client::Client for each user request?

@ragyabraham
Copy link
Author

ragyabraham commented Sep 27, 2022

Ah, that might actually be the case. The below code would be called on every request. Could you explain how I can reuse the client? @yoshidan

use gcp_auth::{AuthenticationManager, CustomServiceAccount};
use google_cloud_gax::grpc::Status;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::client::Client;
use std::env;
use std::path::PathBuf;
use tokio::task::JoinHandle;

pub async fn publish(_message: String, _topic_id: String) {
    let local = dotenv::var("local").unwrap_or("none".to_string());
    if local == String::from("true") {
        println!("==Running Locally==");
        // Get current working directory as owned string
        let current_dir = env::current_dir()
            .expect("Could not get current working directory")
            .to_str()
            .expect("Could not convert to str")
            .to_owned();
        println!("{:?}/src/keys/dev-key.json", current_dir.to_owned());
        // Concatenate string to form full path to key
        let _path_to_key = current_dir + "/src/keys/dev-key.json";
        // Take ownership of String and convert it to a &str
        let path_to_key = _path_to_key.as_str();
        println!("Path to key: {:?}", path_to_key);
        // Set GOOGLE_APPLICATION_CREDENTIALS env variable if working in a local env
        env::set_var("GOOGLE_APPLICATION_CREDENTIALS", path_to_key);
        println!(
            "GOOGLE_APPLICATION_CREDENTIALS: {}",
            dotenv::var("GOOGLE_APPLICATION_CREDENTIALS").unwrap_or("none".to_string())
        );
        // `credentials_path` variable is the path for the credentials `.json` file.
        let credentials_path = PathBuf::from(path_to_key);
        let service_account = CustomServiceAccount::from_file(credentials_path)
            .expect("Could not find Service account");
        let authentication_manager = AuthenticationManager::from(service_account);
        let scopes = &["https://www.googleapis.com/auth/cloud-platform"];
        let _token = authentication_manager
            .get_token(scopes)
            .await
            .expect("Token stuffed");
    }
    let topic_id = _topic_id.as_str();
    println!("Publishing to topic ID: '{}'", topic_id);
    // Create pubsub client.
    let client = match Client::default().await {
        Ok(c) => c,
        Err(e) => {
            println!("Error creating client: {:?}", e);
            return;
        }
    };
    // Create topic if topic does not exists.
    let topic = client.topic(topic_id);
    // if !topic.exists(None, None).await {
    //     topic.create(None, None, None).await;
    // }
    match topic.exists(None, None).await {
        Ok(_b) => {
            println!("Topic already exists.");
        }
        Err(_e) => {
            match topic.create(None, None, None).await {
                Ok(_b) => {
                    println!("Topic created.");
                }
                Err(_e) => {
                    println!("Error creating topic: {:?}", _e);
                }
            }
        }
    }
    // Start publisher.
    let publisher = topic.new_publisher(None);

    // Publish message.
    let tasks: Vec<JoinHandle<Result<String, Status>>> = (0..1)
        .into_iter()
        .map(|_i| {
            let publisher = publisher.clone();
            // Take ownership of the message
            let message = _message.clone();
            // println!("My cloned message: {}", message);
            tokio::spawn(async move {
                let mut msg = PubsubMessage::default();
                msg.data = message.as_bytes().into();
                // Set ordering_key if needed (https://cloud.google.com/pubsub/docs/ordering)
                // msg.ordering_key = "order".into();
                let awaiter = publisher.publish(msg).await;
                // The get method blocks until a server-generated ID or an error is returned for the published message.
                awaiter.get(None).await
            })
        })
        .collect();

    // Wait for all publish task finish
    for task in tasks {
        let _message_id = match task.await {
            Ok(message_id) => message_id,
            Err(e) => {
                println!("Error could not publish message: {:?}", e);
                return;
            }
        };
        println!("Message ID: {:#?}", _message_id.unwrap_or("None".to_string()));
    }

    // Wait for publishers in topic finish.
    let mut publisher = publisher;
    publisher.shutdown().await;
}

@yoshidan
Copy link
Owner

If you are using actix-web the following code may be helpful.
The key is to use actix_web::web::Data to pass the Publisher's clone to each request.

main.rs

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {

    let pubsub_client = google_cloud_pubsub::client::Client::default().await?;
    let publisher = pubsub_client
        .topic("test-topic")
        .new_publisher(Some(PublisherConfig {
            flush_interval: Duration::from_millis(50),
            ..Default::default()
        }));
    let publisher_for_web = actix_web::web::Data::new(publisher.clone());
    let web_task = tokio::spawn(async move {
        let server = HttpServer::new(move || {
            App::new()
                .wrap(middleware::Logger::default())
                .app_data(publisher_for_web.clone()) // reuse publisher by actix-web
                .service(web::publish) // <- see web.rs
        })
        .bind(("0.0.0.0", 8100))?
        .run();
        tracing::info!("starting HTTP server at 0.0.0.0:8100");

        // Automatically shutdown gracefully.
        server.await.context("server error")
    });

    let _ = web_task.await;
    let mut publisher = publisher;
    let _ = publisher.shutdown();
    Ok(())
}

web.rs

#[get("/api/handle")]
pub async fn publish(publisher: web::Data<Publisher>) -> HttpResponse {
    let result = publisher
        .publish(PubsubMessage {
            data: "web_test".as_bytes().to_vec(),
            ..Default::default()
        })
        .await
        .get(None)
        .await;
    match result {
        Ok(id) => {
            tracing::info!("web published id={}", id);
            HttpResponse::Ok().body(id)
        }
        Err(e) => {
            tracing::error!("failed to web publish {:?}", e);
            HttpResponse::InternalServerError().body("publish error")
        }
    }
}

@ragyabraham
Copy link
Author

Awesome! Thank you for sending that through. I'll give it a go and run some more load testing and update you once i have results

@ragyabraham
Copy link
Author

Worked like a charm! Thank you so much for your help. Happy to close this one out

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants