Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature to support OpenTelemetry logs to be ingested on Parseable #657

Merged
merged 1 commit into from
Feb 15, 2024

Conversation

nikhilsinhaparseable
Copy link
Contributor

feature to support OpenTelemetry logs to be ingested on Parseable

Fixes #312

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Copy link
Contributor

@Eshanatnight Eshanatnight left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a partial implementation of the flatten function

use bytes::Bytes;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::common::v1::any_value::Value as OtlpValue;
use opentelemetry_proto::tonic::common::v1::{AnyValue as OtlpAnyValue, ArrayValue as OtlpArrayValue};
use opentelemetry_proto::tonic::logs::v1::LogRecordFlags;
use serde_json::{Number as JsonNumber, Value as JsonValue};
use std::collections::BTreeMap;

pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, JsonValue>> {
    let mut vec_otel_json: Vec<BTreeMap<String, JsonValue>> = Vec::new();
    let body_str = std::str::from_utf8(body).unwrap();

    let message: ExportLogsServiceRequest = serde_json::from_slice(body).unwrap();

    // deal with the resource logs ->> arr
    for resource_log in message.resource_logs {
        let mut json = BTreeMap::new();

        // deal with the resource ->> object
        if let Some(resource) = resource_log.resource {
            // attributes ->> arr
            resource.attributes.iter().for_each(|attr| {
                json.append(&mut collect_json_from_values(
                    &attr.value,
                    &format!("resource_{}", attr.key),
                ))
            });

            if resource.dropped_attributes_count > 0 {
                json.insert(
                    "resource_dropped_attributes_count".to_string(),
                    JsonValue::Number(JsonNumber::from(resource.dropped_attributes_count)),
                );
            }
        }

        // deal with the scope logs ->> arr
        for sl in resource_log.scope_logs {
            // deal with instrumentation scope ->> object
            if let Some(inst_scope) = sl.scope {
                // deal with name
                if !inst_scope.name.is_empty() {
                    json.insert(
                        "instrumentation_scope_name".to_owned(),
                        JsonValue::String(inst_scope.name.to_string()),
                    );
                }

                // deal with version
                if !inst_scope.version.is_empty() {
                    json.insert(
                        "instrumentation_scope_version".to_owned(),
                        JsonValue::String(inst_scope.version.to_string()),
                    );
                }

                // deal with the attributes ->> arr
                for attr in inst_scope.attributes {
                    json.append(&mut collect_json_from_values(
                        &attr.value,
                        &format!("instrumentation_scope_{}", attr.key),
                    ));
                }

                // deal with dropped attributes count
                if inst_scope.dropped_attributes_count > 0 {
                    json.insert(
                        "instrumentation_scope_dropped_attributes_count".to_owned(),
                        JsonValue::Number(JsonNumber::from(inst_scope.dropped_attributes_count)),
                    );
                }
            }

            // deal with the log records ->> arr
            for lr in sl.log_records {
                let mut s = BTreeMap::new();

                // deal with time_unix_nano -->> number
                if !lr.time_unix_nano > 0 {
                    s.insert(
                        "time_unix_nano".to_owned(),
                        JsonValue::String(lr.time_unix_nano.to_string()),
                    );
                }

                // deal with observed_time_unix_nano -->>number
                if !lr.observed_time_unix_nano > 0 {
                    s.insert(
                        "observed_time_unix_nano".to_owned(),
                        JsonValue::String(lr.observed_time_unix_nano.to_string()),
                    );
                }

                // deal with severity_number -->> number
                if lr.severity_number > 0 {
                    s.insert(
                        "severity_number".to_owned(),
                        JsonValue::Number(JsonNumber::from(lr.severity_number)),
                    );
                }

                // deal with severity_text -->> string
                if !lr.severity_text.is_empty() {
                    s.insert(
                        "severity_text".to_owned(),
                        JsonValue::String(lr.severity_text.to_string()),
                    );
                } else {
                    s.insert(
                        "severity_text".to_owned(),
                        JsonValue::String(lr.severity_number().as_str_name().to_owned()),
                    );
                }

                // deal with body -->> object / AnyValue
                if let Some(body) = lr.body {
                    s.append(&mut collect_json_from_values(
                        &Some(body),
                        &"body".to_string(),
                    ));
                }

                // deal with attributes ->> arr
                for attr in lr.attributes {
                    s.append(&mut collect_json_from_values(
                        &attr.value,
                        &format!("log_record_{}", attr.key),
                    ));
                }

                // deal with dropped attributes count
                if lr.dropped_attributes_count > 0 {
                    s.insert(
                        "log_record_dropped_attributes_count".to_owned(),
                        JsonValue::Number(JsonNumber::from(lr.dropped_attributes_count)),
                    );
                }

                if lr.flags > 0 {
                    s.insert(
                        "flags_number".to_owned(),
                        JsonValue::Number(JsonNumber::from(lr.flags)),
                    );
                    s.insert(
                        "flags_string".to_owned(),
                        JsonValue::String(LogRecordFlags::TraceFlagsMask.as_str_name().to_owned()),
                    );
                }

                if !lr.span_id.is_empty() {
                    s.insert(
                        "span_id".to_owned(),
                        JsonValue::String(String::from_utf8(lr.span_id).unwrap()),
                    );
                }

                if !lr.trace_id.is_empty() {
                    s.insert(
                        "trace_id".to_owned(),
                        JsonValue::String(String::from_utf8(lr.trace_id).unwrap()),
                    );
                }

                json.append(&mut s);
                vec_otel_json.push(json.clone());
            }

            if !sl.schema_url.is_empty() {
                json.insert(
                    "scope_log_schema_url".to_string(),
                    JsonValue::String(sl.schema_url),
                );
            }
        }

        if !resource_log.schema_url.is_empty() {
            json.insert(
                "resource_schema_url".to_string(),
                JsonValue::String(resource_log.schema_url),
            );
        }
    }

    vec_otel_json
}

//traverse through Value by calling function ollect_json_from_any_value
fn collect_json_from_values(
    values: &Option<OtlpAnyValue>,
    key: &String,
) -> BTreeMap<String, JsonValue> {
	todo!()
}

fn collect_json_from_any_value(
    key: &str,
    value: &OtlpValue,
) -> BTreeMap<String, serde_json::Value> {
    let mut value_json = BTreeMap::new();

    match value {
        OtlpValue::ArrayValue(OtlpArrayValue { values }) => {
            Some(
                values.into_iter().flat_map(to_json_value_from_primitive_any_value).collect(),
            )
        },


        OtlpValue::StringValue(_) => todo!(),
        OtlpValue::BoolValue(_) => todo!(),
        OtlpValue::IntValue(_) => todo!(),
        OtlpValue::DoubleValue(_) => todo!(),
        OtlpValue::KvlistValue(_) => todo!(),
        OtlpValue::BytesValue(_) => todo!(),
    };

    // //ArrayValue is a vector of AnyValue
    // //traverse by recursively calling the same function
    // if value.array_val.is_some() {
    //     let array_val = value.array_val.as_ref().unwrap();
    //     let values = &array_val.values;

    //     for value in values {
    //         let value = &value.value;
    //         value_json = collect_json_from_any_value(key, value.clone());
    //     }
    // }

    // //KeyValueList is a vector of KeyValue
    // //traverse through each element in the vector
    // if value.kv_list_val.is_some() {
    //     let kv_list_val = value.kv_list_val.unwrap();
    //     for key_value in kv_list_val.values {
    //         let value = key_value.value;
    //         value_json = collect_json_from_values(&value, key);
    //     }
    // }
    // if value.bytes_val.is_some() {
    //     value_json.insert(
    //         key.to_string(),
    //         Value::String(value.bytes_val.as_ref().unwrap().to_owned()),
    //     );
    // }

    value_json
}

fn to_json_value_from_primitive_any_value(any_value: &OtlpAnyValue) -> Option<JsonValue> {
    match any_value.value {
        Some(OtlpValue::BoolValue(val)) => Some(JsonValue::Bool(val)),
        Some(OtlpValue::DoubleValue(val)) => JsonNumber::from_f64(val).map(JsonValue::Number),
        Some(OtlpValue::IntValue(val)) => Some(JsonValue::Number(JsonNumber::from(val))),
        Some(OtlpValue::StringValue(val)) => Some(JsonValue::String(val)),
        _ => None,
    }
}

let body_str = std::str::from_utf8(body).unwrap();

let message: LogsData = serde_json::from_str(body_str).unwrap();
for records in message.resource_logs.iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling .iter() on Option types makes no sense. There are too many loops that are not needed.

We can just use opentelemetry-proto
we wont have to deal with the proto file to rust structs generation. And as a added bonus it will get much less complicated.

@Eshanatnight
Copy link
Contributor

Here is a partial implementation of the flatten function

I don't think the util functions collect_json_from_any_value and collect_json_from_values need to return a BTreeMap I think it can just return a serde_json::Value

@nitisht nitisht merged commit 32f693e into parseablehq:main Feb 15, 2024
5 of 6 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Feb 15, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feature: support OpenTelemetry Protocol (OTLP)
3 participants