Skip to content

Commit

Permalink
[PDI-6936] - Return Response Headers in HTTP POST/REST Client Steps
Browse files Browse the repository at this point in the history
  • Loading branch information
fbrissi authored and Elio Freitas committed Mar 18, 2016
1 parent a161744 commit b27396c
Show file tree
Hide file tree
Showing 16 changed files with 507 additions and 76 deletions.
55 changes: 41 additions & 14 deletions engine/src/org/pentaho/di/trans/steps/http/HTTP.java
Expand Up @@ -22,10 +22,13 @@

package org.pentaho.di.trans.steps.http;

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.UnknownHostException;


import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
Expand All @@ -34,6 +37,7 @@
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.util.URIUtil;
import org.json.simple.JSONObject;
import org.pentaho.di.cluster.SlaveConnectionManager;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
Expand Down Expand Up @@ -93,21 +97,21 @@ private Object[] callHttpService( RowMetaInterface rowMeta, Object[] rowData ) t

// Prepare HTTP get
//
HttpClient httpclient = SlaveConnectionManager.getInstance().createHttpClient();
HttpClient httpClient = SlaveConnectionManager.getInstance().createHttpClient();
HttpMethod method = new GetMethod( url );

// Set timeout
if ( data.realConnectionTimeout > -1 ) {
httpclient.getHttpConnectionManager().getParams().setConnectionTimeout( data.realConnectionTimeout );
httpClient.getHttpConnectionManager().getParams().setConnectionTimeout( data.realConnectionTimeout );
}
if ( data.realSocketTimeout > -1 ) {
httpclient.getHttpConnectionManager().getParams().setSoTimeout( data.realSocketTimeout );
httpClient.getHttpConnectionManager().getParams().setSoTimeout( data.realSocketTimeout );
}

if ( !Const.isEmpty( data.realHttpLogin ) ) {
httpclient.getParams().setAuthenticationPreemptive( true );
httpClient.getParams().setAuthenticationPreemptive( true );
Credentials defaultcreds = new UsernamePasswordCredentials( data.realHttpLogin, data.realHttpPassword );
httpclient.getState().setCredentials( AuthScope.ANY, defaultcreds );
httpClient.getState().setCredentials( AuthScope.ANY, defaultcreds );
}

HostConfiguration hostConfiguration = new HostConfiguration();
Expand Down Expand Up @@ -139,15 +143,15 @@ private Object[] callHttpService( RowMetaInterface rowMeta, Object[] rowData ) t
// used for calculating the responseTime
long startTime = System.currentTimeMillis();

int statusCode = httpclient.executeMethod( hostConfiguration, method );

int statusCode = requestStatusCode( method, hostConfiguration, httpClient );
// calculate the responseTime
long responseTime = System.currentTimeMillis() - startTime;
if ( log.isDetailed() ) {
log.logDetailed( BaseMessages.getString( PKG, "HTTP.Log.ResponseTime", responseTime, url ) );
}

String body = null;
String headerString = null;
// The status code
if ( isDebug() ) {
logDebug( BaseMessages.getString( PKG, "HTTP.Log.ResponseStatusCode", "" + statusCode ) );
Expand All @@ -161,6 +165,7 @@ private Object[] callHttpService( RowMetaInterface rowMeta, Object[] rowData ) t
if ( statusCode != 401 ) {
// guess encoding
//
Header[] headers = searchForHeaders( method );
String encoding = meta.getEncoding();

// Try to determine the encoding from the Content-Type value
Expand All @@ -171,17 +176,18 @@ private Object[] callHttpService( RowMetaInterface rowMeta, Object[] rowData ) t
encoding = contentType.replaceFirst( "^.*;\\s*charset\\s*=\\s*", "" ).replace( "\"", "" ).trim();
}
}
JSONObject json = new JSONObject();
for ( Header header : headers ) {
json.put( header.getName(), header.getValue() );
}
headerString = json.toJSONString();

if ( isDebug() ) {
log.logDebug( toString(), BaseMessages.getString( PKG, "HTTP.Log.ResponseHeaderEncoding", encoding ) );
}

// the response
if ( !Const.isEmpty( encoding ) ) {
inputStreamReader = new InputStreamReader( method.getResponseBodyAsStream(), encoding );
} else {
inputStreamReader = new InputStreamReader( method.getResponseBodyAsStream() );
}
inputStreamReader = openStream( encoding, method );

StringBuilder bodyBuffer = new StringBuilder();

int c;
Expand Down Expand Up @@ -217,6 +223,9 @@ private Object[] callHttpService( RowMetaInterface rowMeta, Object[] rowData ) t
if ( !Const.isEmpty( meta.getResponseTimeFieldName() ) ) {
newRow = RowDataUtil.addValueData( newRow, returnFieldsOffset, new Long( responseTime ) );
}
if ( !Const.isEmpty( meta.getResponseHeaderFieldName() ) ) {
newRow = RowDataUtil.addValueData( newRow, returnFieldsOffset, headerString.toString() );
}

} finally {
if ( inputStreamReader != null ) {
Expand All @@ -225,7 +234,7 @@ private Object[] callHttpService( RowMetaInterface rowMeta, Object[] rowData ) t
// Release current connection to the connection pool once you are done
method.releaseConnection();
if ( data.realcloseIdleConnectionsTime > -1 ) {
httpclient.getHttpConnectionManager().closeIdleConnections( data.realcloseIdleConnectionsTime );
httpClient.getHttpConnectionManager().closeIdleConnections( data.realcloseIdleConnectionsTime );
}
}
return newRow;
Expand Down Expand Up @@ -267,6 +276,24 @@ private String determineUrl( RowMetaInterface outputRowMeta, Object[] row ) thro
}
}

protected int requestStatusCode( HttpMethod method, HostConfiguration hostConfiguration, HttpClient httpClient ) throws IOException {
return httpClient.executeMethod( hostConfiguration, method );
}

protected InputStreamReader openStream( String encoding, HttpMethod method ) throws Exception {

if ( !Const.isEmpty( encoding ) ) {
return new InputStreamReader( method.getResponseBodyAsStream(), encoding );
} else {
return new InputStreamReader( method.getResponseBodyAsStream() );
}

}

protected Header[] searchForHeaders( HttpMethod method ) {
return method.getResponseHeaders();
}

public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
meta = (HTTPMeta) smi;
data = (HTTPData) sdi;
Expand Down
21 changes: 21 additions & 0 deletions engine/src/org/pentaho/di/trans/steps/http/HTTPMeta.java
Expand Up @@ -33,6 +33,7 @@
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMeta;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaInteger;
import org.pentaho.di.core.row.value.ValueMetaString;
Expand Down Expand Up @@ -101,6 +102,7 @@ public class HTTPMeta extends BaseStepMeta implements StepMetaInterface {

private String resultCodeFieldName;
private String responseTimeFieldName;
private String responseHeaderFieldName;

private String[] headerParameter;
private String[] headerField;
Expand Down Expand Up @@ -324,6 +326,7 @@ public void setDefault() {
fieldName = "result";
resultCodeFieldName = "";
responseTimeFieldName = "";
responseHeaderFieldName = "";
encoding = "UTF-8";
}

Expand All @@ -346,6 +349,13 @@ public void getFields( RowMetaInterface inputRowMeta, String name, RowMetaInterf
v.setOrigin( name );
inputRowMeta.addValueMeta( v );
}
String headerFieldName = space.environmentSubstitute( responseHeaderFieldName );
if ( !Const.isEmpty( headerFieldName ) ) {
ValueMetaInterface v =
new ValueMeta( headerFieldName, ValueMeta.TYPE_STRING );
v.setOrigin( name );
inputRowMeta.addValueMeta( v );
}
}

public String getXML() {
Expand Down Expand Up @@ -385,6 +395,7 @@ public String getXML() {
retval.append( " " ).append( XMLHandler.addTagValue( "name", fieldName ) );
retval.append( " " ).append( XMLHandler.addTagValue( "code", resultCodeFieldName ) );
retval.append( " " ).append( XMLHandler.addTagValue( "response_time", responseTimeFieldName ) );
retval.append( " " ).append( XMLHandler.addTagValue( "response_header", responseHeaderFieldName ) );
retval.append( " </result>" ).append( Const.CR );

return retval.toString();
Expand Down Expand Up @@ -429,6 +440,7 @@ private void readData( Node stepnode, List<? extends SharedObjectInterface> data
fieldName = XMLHandler.getTagValue( stepnode, "result", "name" );
resultCodeFieldName = XMLHandler.getTagValue( stepnode, "result", "code" );
responseTimeFieldName = XMLHandler.getTagValue( stepnode, "result", "response_time" );
responseHeaderFieldName = XMLHandler.getTagValue( stepnode, "result", "response_header" );
} catch ( Exception e ) {
throw new KettleXMLException( BaseMessages.getString( PKG, "HTTPMeta.Exception.UnableToReadStepInfo" ), e );
}
Expand Down Expand Up @@ -466,6 +478,7 @@ public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, Lis
fieldName = rep.getStepAttributeString( id_step, "result_name" );
resultCodeFieldName = rep.getStepAttributeString( id_step, "result_code" );
responseTimeFieldName = rep.getStepAttributeString( id_step, "response_time" );
responseHeaderFieldName = rep.getStepAttributeString( id_step, "response_header" );
} catch ( Exception e ) {
throw new KettleException(
BaseMessages.getString( PKG, "HTTPMeta.Exception.UnexpectedErrorReadingStepInfo" ), e );
Expand Down Expand Up @@ -500,6 +513,7 @@ public void saveRep( Repository rep, IMetaStore metaStore, ObjectId id_transform
rep.saveStepAttribute( id_transformation, id_step, "result_name", fieldName );
rep.saveStepAttribute( id_transformation, id_step, "result_code", resultCodeFieldName );
rep.saveStepAttribute( id_transformation, id_step, "response_time", responseTimeFieldName );
rep.saveStepAttribute( id_transformation, id_step, "response_header", responseHeaderFieldName );
} catch ( Exception e ) {
throw new KettleException( BaseMessages.getString( PKG, "HTTPMeta.Exception.UnableToSaveStepInfo" )
+ id_step, e );
Expand Down Expand Up @@ -670,5 +684,12 @@ public String getResponseTimeFieldName() {
public void setResponseTimeFieldName( String responseTimeFieldName ) {
this.responseTimeFieldName = responseTimeFieldName;
}
public String getResponseHeaderFieldName() {
return responseHeaderFieldName;
}

public void setResponseHeaderFieldName( String responseHeaderFieldName ) {
this.responseHeaderFieldName = responseHeaderFieldName;
}

}
Expand Up @@ -28,7 +28,8 @@ HTTPDialog.URL.Label=URL
HTTPDialog.GeneralTab.Title=General
HTTP.Log.NoField=URL field name is missing\!
HTTPDialog.Result.Label=Result field name
HTTPDialog.ResponseTime.Label=Response time (milliseconds) field name
HTTPDialog.ResponseTime.Label=Response time (milliseconds) field name
HTTPDialog.ResponseHeader.Label=Response header field name
HTTPMeta.CheckResult.MissingArguments=Missing arguments, not found in input from previous steps\:
HTTP.Log.ErrorFindingField=We can not find field [{0}] in the input stream\!
HTTPDialog.ResultType.Label=Result type
Expand Down
66 changes: 47 additions & 19 deletions engine/src/org/pentaho/di/trans/steps/httppost/HTTPPOST.java
Expand Up @@ -22,20 +22,25 @@

package org.pentaho.di.trans.steps.httppost;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.File;
import java.io.FileInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;


import java.net.UnknownHostException;

import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.InputStreamRequestEntity;
import org.apache.commons.httpclient.methods.PostMethod;
import org.json.simple.JSONObject;
import org.pentaho.di.cluster.SlaveConnectionManager;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
Expand Down Expand Up @@ -87,22 +92,22 @@ private Object[] callHTTPPOST( Object[] rowData ) throws KettleException {

// Prepare HTTP POST
//
HttpClient HTTPPOSTclient = SlaveConnectionManager.getInstance().createHttpClient();
HttpClient httpPostClient = SlaveConnectionManager.getInstance().createHttpClient();
PostMethod post = new PostMethod( data.realUrl );
// post.setFollowRedirects(false);

// Set timeout
if ( data.realConnectionTimeout > -1 ) {
HTTPPOSTclient.getHttpConnectionManager().getParams().setConnectionTimeout( data.realConnectionTimeout );
httpPostClient.getHttpConnectionManager().getParams().setConnectionTimeout( data.realConnectionTimeout );
}
if ( data.realSocketTimeout > -1 ) {
HTTPPOSTclient.getHttpConnectionManager().getParams().setSoTimeout( data.realSocketTimeout );
httpPostClient.getHttpConnectionManager().getParams().setSoTimeout( data.realSocketTimeout );
}

if ( !Const.isEmpty( data.realHttpLogin ) ) {
HTTPPOSTclient.getParams().setAuthenticationPreemptive( true );
httpPostClient.getParams().setAuthenticationPreemptive( true );
Credentials defaultcreds = new UsernamePasswordCredentials( data.realHttpLogin, data.realHttpPassword );
HTTPPOSTclient.getState().setCredentials( AuthScope.ANY, defaultcreds );
httpPostClient.getState().setCredentials( AuthScope.ANY, defaultcreds );
}

HostConfiguration hostConfiguration = new HostConfiguration();
Expand Down Expand Up @@ -202,7 +207,7 @@ private Object[] callHTTPPOST( Object[] rowData ) throws KettleException {
long startTime = System.currentTimeMillis();

// Execute the POST method
int statusCode = HTTPPOSTclient.executeMethod( hostConfiguration, post );
int statusCode = requestStatusCode( post, hostConfiguration, httpPostClient );

// calculate the responseTime
long responseTime = System.currentTimeMillis() - startTime;
Expand All @@ -216,13 +221,15 @@ private Object[] callHTTPPOST( Object[] rowData ) throws KettleException {
logDebug( BaseMessages.getString( PKG, "HTTPPOST.Log.ResponseCode", String.valueOf( statusCode ) ) );
}
String body = null;
String headerString = null;
if ( statusCode != -1 ) {
if ( statusCode == 204 ) {
body = "";
} else {
// if the response is not 401: HTTP Authentication required
if ( statusCode != 401 ) {

Header[] headers = searchForHeaders( post );
// Use request encoding if specified in component to avoid strange response encodings
// See PDI-3815
String encoding = data.realEncoding;
Expand All @@ -235,20 +242,15 @@ private Object[] callHTTPPOST( Object[] rowData ) throws KettleException {
encoding = contentType.replaceFirst( "^.*;\\s*charset\\s*=\\s*", "" ).replace( "\"", "" ).trim();
}
}
JSONObject json = new JSONObject();
for ( Header header : headers ) {
json.put( header.getName(), header.getValue() );
}
headerString = json.toJSONString();

// Get the response, but only specify encoding if we've got one
// otherwise the default charset ISO-8859-1 is used by HttpClient
if ( Const.isEmpty( encoding ) ) {
if ( isDebug() ) {
logDebug( BaseMessages.getString( PKG, "HTTPPOST.Log.Encoding", "ISO-8859-1" ) );
}
inputStreamReader = new InputStreamReader( post.getResponseBodyAsStream() );
} else {
if ( isDebug() ) {
logDebug( BaseMessages.getString( PKG, "HTTPPOST.Log.Encoding", encoding ) );
}
inputStreamReader = new InputStreamReader( post.getResponseBodyAsStream(), encoding );
}
inputStreamReader = openStream( encoding, post );

StringBuilder bodyBuffer = new StringBuilder();

Expand Down Expand Up @@ -284,14 +286,17 @@ private Object[] callHTTPPOST( Object[] rowData ) throws KettleException {
if ( !Const.isEmpty( meta.getResponseTimeFieldName() ) ) {
newRow = RowDataUtil.addValueData( newRow, returnFieldsOffset, new Long( responseTime ) );
}
if ( !Const.isEmpty( meta.getResponseHeaderFieldName() ) ) {
newRow = RowDataUtil.addValueData( newRow, returnFieldsOffset, headerString.toString() );
}
} finally {
if ( inputStreamReader != null ) {
inputStreamReader.close();
}
// Release current connection to the connection pool once you are done
post.releaseConnection();
if ( data.realcloseIdleConnectionsTime > -1 ) {
HTTPPOSTclient.getHttpConnectionManager().closeIdleConnections( data.realcloseIdleConnectionsTime );
httpPostClient.getHttpConnectionManager().closeIdleConnections( data.realcloseIdleConnectionsTime );
}
}
return newRow;
Expand All @@ -308,6 +313,29 @@ private Object[] callHTTPPOST( Object[] rowData ) throws KettleException {
}
}

protected int requestStatusCode( PostMethod post, HostConfiguration hostConfiguration, HttpClient httpPostClient ) throws IOException {
return httpPostClient.executeMethod( hostConfiguration, post );
}

protected InputStreamReader openStream( String encoding, PostMethod post ) throws Exception {
if ( Const.isEmpty( encoding ) ) {
if ( isDebug() ) {
logDebug( BaseMessages.getString( PKG, "HTTPPOST.Log.Encoding", "ISO-8859-1" ) );
}
return new InputStreamReader( post.getResponseBodyAsStream() );
} else {
if ( isDebug() ) {
logDebug( BaseMessages.getString( PKG, "HTTPPOST.Log.Encoding", encoding ) );
}
return new InputStreamReader( post.getResponseBodyAsStream(), encoding );
}

}

protected Header[] searchForHeaders( PostMethod post ) {
return post.getResponseHeaders();
}

public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
meta = (HTTPPOSTMeta) smi;
data = (HTTPPOSTData) sdi;
Expand Down

0 comments on commit b27396c

Please sign in to comment.