
Logstash not able to read and process .csv file #40

Open

vitthaltambe-gep opened this issue Jun 16, 2023 · 6 comments

@vitthaltambe-gep
I am reading a .csv file from Azure Blob Storage and adding that data to Elasticsearch, but Logstash gets stuck after the lines below.

[2023-06-16T18:30:28,479][INFO ][logstash.inputs.azureblobstorage][main][181f6119611506bdf4dacc3ea01b35a1dee26ed795d3b801c7e8f3d7080a0e8c] learn json one of the attempts failed
[2023-06-16T18:30:28,479][INFO ][logstash.inputs.azureblobstorage][main][181f6119611506bdf4dacc3ea01b35a1dee26ed795d3b801c7e8f3d7080a0e8c] head will be: {"records":[ and tail is set to ]}

Below is the pipeline code:

input {
    azure_blob_storage {
        storageaccount => "xxx"
        access_key => "yyy"
        container => "zzz"
        #path_filters => ['**/invoices_2023.03.04_20.csv']
    }
}
filter {
    mutate {
        gsub => ["message", "\"\"", ""]
    }

    csv {
        separator => ","
        columns => ["uniqueId","vendorId","vendorNumber","vendorName","reference","documentDate","purchaseOrderNumber","currency","amount","dueDate","documentStatus","invoiceBlocked","paymentReference","clearingDate","paymentMethod","sourceSystem","documentType","companyCode","companyCodeName","companyCodeCountry","purchaseOrderItem","deliveryNoteNumber","fiscalYear","sapInvoiceNumber","invoicePendingWith"]
    }

    if [uniqueId] == "Unique Key" {
        drop { }
    }
    if [uniqueId] =~ /^\s*$/ {
        drop { }
    }

    mutate { remove_field => "message" }
    mutate { remove_field => "@version" }
    mutate { remove_field => "host" }
    mutate { add_field => { "docTypeCode" => "1019" }}
    # mutate { remove_field => "@timestamp" }
}
output {
    elasticsearch {
        hosts => "aaa:9243"
        user => "bbb"
        password => "ccc@1234"
        index => "invoicehub_test"
        document_type => "_doc"
        action => "index"
        document_id => "%{uniqueId}"
    }
    # stdout { codec => rubydebug { metadata => true} }
}

The CSV file contains this data:

Unique Key,Vendor Id,Vendor Number # VN,Vendor Name,Reference,Document Date,Purchase order number,Currency,Amount,Due Date,Document Status,Invoice blocked,Payment Reference Number,Payment/Clearing Date,Payment Method,Source System,Document Type,Company Code,Company Name,Company code country,Purchase order item,Delivery note number,FiscalYear,SAP Invoice Number,Invoice Pending with (email id)
AMP_000013530327,50148526,50148526,CARTUS RELOCATION CANADA LTD,2000073927CA,10/21/2019,,CAD,2041.85,11/21/2019,Pending Payment,,,,,AMP,Invoice,1552,Imperial Oil-DS Br,CA,,,2019,2019,
AMP_000013562803,783053,783053,CPS COMUNICACIONES SA,A001800009476,11/1/2019,,ARS,1103.52,12/1/2019,Pending Payment,,,,,AMP,Invoice,2399,ExxonMobil B.S.C Arg. SRL,AR,,,2019,2019,
AMP_000013562789,50115024,50115026,FARMERS ALLOY FABRICATING INC,7667,11/5/2019,4410760848,USD,-38940.48,12/5/2019,In Progress,,,,,AMP,Credit Note,944,EM Ref&Mktg (Div),US,,,0,0,wyman.w.hardison@exxonmobil.com

@janmg
Owner

janmg commented Jun 16, 2023

Can you set the codec for the input to 'line'? The default is JSON, which requires the plugin to learn the start and stop bytes, but for CSV that's not relevant. This plugin doesn't take into account that the first line is a header. I'm planning a new release soon to fix some other issues and can add some line-skipping logic then, but you can probably filter out the header in the filter stage.
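A minimal sketch of that suggestion, reusing the placeholder credentials from the first config and a shortened column list; the header drop assumes the first column of the header row is literally "Unique Key", as in the sample data:

input {
    azure_blob_storage {
        storageaccount => "xxx"
        access_key => "yyy"
        container => "zzz"
        # emit one event per line instead of the default JSON handling
        codec => "line"
    }
}
filter {
    csv {
        separator => ","
        columns => ["uniqueId", "vendorId", "vendorNumber", "vendorName"]
    }
    # the plugin does not skip the header row itself, so drop it here
    if [uniqueId] == "Unique Key" {
        drop { }
    }
}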

@vitthaltambe-gep
Author

How can I set the codec for the input to 'line'?
Can you please help me with that?

@janmg
Owner

janmg commented Jun 17, 2023

input {
    azure_blob_storage {
        codec => "csv"
        ...

The codec is a standard Logstash parameter; it defines how the content of an event is parsed. By default I set it to JSON, but you can set it to line or csv.

https://www.elastic.co/guide/en/logstash/current/plugins-codecs-csv.html
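Assuming the logstash-codec-csv plugin from the link above is installed, a sketch of configuring that codec inline on the input, so each line is parsed into named fields without a separate csv filter (placeholder credentials, shortened column list):

input {
    azure_blob_storage {
        storageaccount => "xxx"
        access_key => "yyy"
        container => "zzz"
        # parse each line into the named columns at the input stage
        codec => csv {
            separator => ","
            columns => ["uniqueId", "vendorId", "vendorNumber", "vendorName"]
        }
    }
}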

@vitthaltambe-gep
Author

vitthaltambe-gep commented Jun 19, 2023

I installed the plugin below and modified the pipeline, but it is still not working for me.

bin/logstash-plugin install logstash-codec-csv

I tried both line and csv as the codec.

The pipeline gets stuck after the lines below.

[2023-06-19T11:23:16,911][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2023-06-19T11:23:16,982][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-06-19T11:23:20,134][INFO ][logstash.inputs.azureblobstorage][main][3f2c68435ba035ec87614e80723b5194a8d29c6ead5ddbcddd54f218d46ed8cd] resuming from remote registry data/registry.dat

input {
    azure_blob_storage {
        storageaccount => "exxonpaymentstatusqc"
        access_key => "<redacted>"
        container => "qcexxonmobildatastore"
        codec => "line"
        #path_filters => ['**/invoices_2023.03.04_20.csv']
    }
}
filter {
    mutate {
        gsub => ["message", "\"\"", ""]
    }

    csv {
        separator => ","
        columns => ["uniqueId","vendorId","vendorNumber","vendorName","reference","documentDate","purchaseOrderNumber","currency","amount","dueDate","documentStatus","invoiceBlocked","paymentReference","clearingDate","paymentMethod","sourceSystem","documentType","companyCode","companyCodeName","companyCodeCountry","purchaseOrderItem","deliveryNoteNumber","fiscalYear","sapInvoiceNumber","invoicePendingWith"]
    }

    if [uniqueId] == "Unique Key" {
        drop { }
    }
    if [uniqueId] =~ /^\s*$/ {
        drop { }
    }

    mutate { remove_field => "message" }
    mutate { remove_field => "@version" }
    mutate { remove_field => "host" }
    mutate { add_field => { "docTypeCode" => "1019" }}

    mutate { remove_field => "@timestamp" }
}
output {
    elasticsearch {
        hosts => "https://7a9dd14c791d4615a146a805072c908b.eastus2.azure.elastic-cloud.com:9243"
        user => "qcappowner"
        password => "<redacted>"
        index => "invoicehub_urtexxonmobil_test"
        #document_type => "_doc"
        action => "index"
        document_id => "%{uniqueId}"
    }
    stdout { codec => rubydebug { metadata => true} }
}

@janmg
Owner

janmg commented Jun 19, 2023

Make sure to change the passwords and access_keys. You can set registry_create_policy => "start_over" or delete data/registry.dat, because files that have already been seen will not be processed again; by default only new or grown files are processed. To get more output when the plugin starts, you can set debug_until => 1000, which makes the plugin show more details for the first 1000 events/lines it processes.
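Put together, a sketch of those two settings on the input (placeholder credentials):

input {
    azure_blob_storage {
        storageaccount => "xxx"
        access_key => "yyy"
        container => "zzz"
        codec => "line"
        # reprocess all blobs instead of resuming from data/registry.dat
        registry_create_policy => "start_over"
        # show extra detail for the first 1000 processed events/lines
        debug_until => 1000
    }
}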

@vitthaltambe-gep
Author

The pipeline is not able to skip the header row in the CSV file and indexes the header names as values (see the screenshot). It is also processing only one record, which is the header row.

Do you have a sample pipeline I can refer to for reading data from a CSV file and adding it to Elasticsearch?

[screenshot omitted]

input {
    azure_blob_storage {
        storageaccount => "exxonpaymentstatusqc"
        access_key => "<redacted>"
        container => "qcexxonmobildatastore"
        codec => "csv"
        skip_learning => true
        registry_create_policy => "start_over"
        path_filters => ['**/*.csv']
    }
}
filter {
    mutate {
        gsub => ["message", "\"\"", ""]
    }

    csv {
        separator => ","
        columns => ["uniqueId","Vendor Id","Source","Vendor Name"]
    }

    if [uniqueId] == "Unique Key" {
        drop { }
    }
    if [uniqueId] =~ /^\s*$/ {
        drop { }
    }

    mutate { remove_field => "message" }
    mutate { remove_field => "@version" }
    mutate { remove_field => "host" }
    mutate { add_field => { "docTypeCode" => "1019" }}

    mutate { remove_field => "@timestamp" }
}
output {
    elasticsearch {
        hosts => "https://7a9dd14c791d4615a146a805072c908b.eastus2.azure.elastic-cloud.com:9243"
        user => "qcappowner"
        password => "<redacted>"
        index => "invoicehub_urtexxonmobil_test"
        #document_type => "_doc"
        action => "index"
        document_id => "%{uniqueId}"
    }
    stdout { codec => rubydebug { metadata => true} }
}

CSV data:
uniqueId,Vendor Id,Source,Vendor Name
AMP_000013562789,50115024,50115026,FARMERS ALLOY FABRICATING INC
