<?xml version="1.0" encoding="UTF-8"?>
<transformation>
<info>
<name>t_read_dynamodb</name>
<description/>
<extended_description/>
<trans_version/>
<trans_type>Normal</trans_type>
<trans_status>0</trans_status>
<directory>/</directory>
<parameters>
<parameter>
<name>AWS_REGION</name>
<default_value>us-east-2</default_value>
<description>Default AWS region; can be overridden at run time, or supplied on the incoming row instead</description>
</parameter>
</parameters>
<log>
<trans-log-table>
<connection/>
<schema/>
<table/>
<size_limit_lines/>
<interval/>
<timeout_days/>
<field>
<id>ID_BATCH</id>
<enabled>Y</enabled>
<name>ID_BATCH</name>
</field>
<field>
<id>CHANNEL_ID</id>
<enabled>Y</enabled>
<name>CHANNEL_ID</name>
</field>
<field>
<id>TRANSNAME</id>
<enabled>Y</enabled>
<name>TRANSNAME</name>
</field>
<field>
<id>STATUS</id>
<enabled>Y</enabled>
<name>STATUS</name>
</field>
<field>
<id>LINES_READ</id>
<enabled>Y</enabled>
<name>LINES_READ</name>
<subject/>
</field>
<field>
<id>LINES_WRITTEN</id>
<enabled>Y</enabled>
<name>LINES_WRITTEN</name>
<subject/>
</field>
<field>
<id>LINES_UPDATED</id>
<enabled>Y</enabled>
<name>LINES_UPDATED</name>
<subject/>
</field>
<field>
<id>LINES_INPUT</id>
<enabled>Y</enabled>
<name>LINES_INPUT</name>
<subject/>
</field>
<field>
<id>LINES_OUTPUT</id>
<enabled>Y</enabled>
<name>LINES_OUTPUT</name>
<subject/>
</field>
<field>
<id>LINES_REJECTED</id>
<enabled>Y</enabled>
<name>LINES_REJECTED</name>
<subject/>
</field>
<field>
<id>ERRORS</id>
<enabled>Y</enabled>
<name>ERRORS</name>
</field>
<field>
<id>STARTDATE</id>
<enabled>Y</enabled>
<name>STARTDATE</name>
</field>
<field>
<id>ENDDATE</id>
<enabled>Y</enabled>
<name>ENDDATE</name>
</field>
<field>
<id>LOGDATE</id>
<enabled>Y</enabled>
<name>LOGDATE</name>
</field>
<field>
<id>DEPDATE</id>
<enabled>Y</enabled>
<name>DEPDATE</name>
</field>
<field>
<id>REPLAYDATE</id>
<enabled>Y</enabled>
<name>REPLAYDATE</name>
</field>
<field>
<id>LOG_FIELD</id>
<enabled>Y</enabled>
<name>LOG_FIELD</name>
</field>
<field>
<id>EXECUTING_SERVER</id>
<enabled>N</enabled>
<name>EXECUTING_SERVER</name>
</field>
<field>
<id>EXECUTING_USER</id>
<enabled>N</enabled>
<name>EXECUTING_USER</name>
</field>
<field>
<id>CLIENT</id>
<enabled>N</enabled>
<name>CLIENT</name>
</field>
</trans-log-table>
<perf-log-table>
<connection/>
<schema/>
<table/>
<interval/>
<timeout_days/>
<field>
<id>ID_BATCH</id>
<enabled>Y</enabled>
<name>ID_BATCH</name>
</field>
<field>
<id>SEQ_NR</id>
<enabled>Y</enabled>
<name>SEQ_NR</name>
</field>
<field>
<id>LOGDATE</id>
<enabled>Y</enabled>
<name>LOGDATE</name>
</field>
<field>
<id>TRANSNAME</id>
<enabled>Y</enabled>
<name>TRANSNAME</name>
</field>
<field>
<id>STEPNAME</id>
<enabled>Y</enabled>
<name>STEPNAME</name>
</field>
<field>
<id>STEP_COPY</id>
<enabled>Y</enabled>
<name>STEP_COPY</name>
</field>
<field>
<id>LINES_READ</id>
<enabled>Y</enabled>
<name>LINES_READ</name>
</field>
<field>
<id>LINES_WRITTEN</id>
<enabled>Y</enabled>
<name>LINES_WRITTEN</name>
</field>
<field>
<id>LINES_UPDATED</id>
<enabled>Y</enabled>
<name>LINES_UPDATED</name>
</field>
<field>
<id>LINES_INPUT</id>
<enabled>Y</enabled>
<name>LINES_INPUT</name>
</field>
<field>
<id>LINES_OUTPUT</id>
<enabled>Y</enabled>
<name>LINES_OUTPUT</name>
</field>
<field>
<id>LINES_REJECTED</id>
<enabled>Y</enabled>
<name>LINES_REJECTED</name>
</field>
<field>
<id>ERRORS</id>
<enabled>Y</enabled>
<name>ERRORS</name>
</field>
<field>
<id>INPUT_BUFFER_ROWS</id>
<enabled>Y</enabled>
<name>INPUT_BUFFER_ROWS</name>
</field>
<field>
<id>OUTPUT_BUFFER_ROWS</id>
<enabled>Y</enabled>
<name>OUTPUT_BUFFER_ROWS</name>
</field>
</perf-log-table>
<channel-log-table>
<connection/>
<schema/>
<table/>
<timeout_days/>
<field>
<id>ID_BATCH</id>
<enabled>Y</enabled>
<name>ID_BATCH</name>
</field>
<field>
<id>CHANNEL_ID</id>
<enabled>Y</enabled>
<name>CHANNEL_ID</name>
</field>
<field>
<id>LOG_DATE</id>
<enabled>Y</enabled>
<name>LOG_DATE</name>
</field>
<field>
<id>LOGGING_OBJECT_TYPE</id>
<enabled>Y</enabled>
<name>LOGGING_OBJECT_TYPE</name>
</field>
<field>
<id>OBJECT_NAME</id>
<enabled>Y</enabled>
<name>OBJECT_NAME</name>
</field>
<field>
<id>OBJECT_COPY</id>
<enabled>Y</enabled>
<name>OBJECT_COPY</name>
</field>
<field>
<id>REPOSITORY_DIRECTORY</id>
<enabled>Y</enabled>
<name>REPOSITORY_DIRECTORY</name>
</field>
<field>
<id>FILENAME</id>
<enabled>Y</enabled>
<name>FILENAME</name>
</field>
<field>
<id>OBJECT_ID</id>
<enabled>Y</enabled>
<name>OBJECT_ID</name>
</field>
<field>
<id>OBJECT_REVISION</id>
<enabled>Y</enabled>
<name>OBJECT_REVISION</name>
</field>
<field>
<id>PARENT_CHANNEL_ID</id>
<enabled>Y</enabled>
<name>PARENT_CHANNEL_ID</name>
</field>
<field>
<id>ROOT_CHANNEL_ID</id>
<enabled>Y</enabled>
<name>ROOT_CHANNEL_ID</name>
</field>
</channel-log-table>
<step-log-table>
<connection/>
<schema/>
<table/>
<timeout_days/>
<field>
<id>ID_BATCH</id>
<enabled>Y</enabled>
<name>ID_BATCH</name>
</field>
<field>
<id>CHANNEL_ID</id>
<enabled>Y</enabled>
<name>CHANNEL_ID</name>
</field>
<field>
<id>LOG_DATE</id>
<enabled>Y</enabled>
<name>LOG_DATE</name>
</field>
<field>
<id>TRANSNAME</id>
<enabled>Y</enabled>
<name>TRANSNAME</name>
</field>
<field>
<id>STEPNAME</id>
<enabled>Y</enabled>
<name>STEPNAME</name>
</field>
<field>
<id>STEP_COPY</id>
<enabled>Y</enabled>
<name>STEP_COPY</name>
</field>
<field>
<id>LINES_READ</id>
<enabled>Y</enabled>
<name>LINES_READ</name>
</field>
<field>
<id>LINES_WRITTEN</id>
<enabled>Y</enabled>
<name>LINES_WRITTEN</name>
</field>
<field>
<id>LINES_UPDATED</id>
<enabled>Y</enabled>
<name>LINES_UPDATED</name>
</field>
<field>
<id>LINES_INPUT</id>
<enabled>Y</enabled>
<name>LINES_INPUT</name>
</field>
<field>
<id>LINES_OUTPUT</id>
<enabled>Y</enabled>
<name>LINES_OUTPUT</name>
</field>
<field>
<id>LINES_REJECTED</id>
<enabled>Y</enabled>
<name>LINES_REJECTED</name>
</field>
<field>
<id>ERRORS</id>
<enabled>Y</enabled>
<name>ERRORS</name>
</field>
<field>
<id>LOG_FIELD</id>
<enabled>N</enabled>
<name>LOG_FIELD</name>
</field>
</step-log-table>
<metrics-log-table>
<connection/>
<schema/>
<table/>
<timeout_days/>
<field>
<id>ID_BATCH</id>
<enabled>Y</enabled>
<name>ID_BATCH</name>
</field>
<field>
<id>CHANNEL_ID</id>
<enabled>Y</enabled>
<name>CHANNEL_ID</name>
</field>
<field>
<id>LOG_DATE</id>
<enabled>Y</enabled>
<name>LOG_DATE</name>
</field>
<field>
<id>METRICS_DATE</id>
<enabled>Y</enabled>
<name>METRICS_DATE</name>
</field>
<field>
<id>METRICS_CODE</id>
<enabled>Y</enabled>
<name>METRICS_CODE</name>
</field>
<field>
<id>METRICS_DESCRIPTION</id>
<enabled>Y</enabled>
<name>METRICS_DESCRIPTION</name>
</field>
<field>
<id>METRICS_SUBJECT</id>
<enabled>Y</enabled>
<name>METRICS_SUBJECT</name>
</field>
<field>
<id>METRICS_TYPE</id>
<enabled>Y</enabled>
<name>METRICS_TYPE</name>
</field>
<field>
<id>METRICS_VALUE</id>
<enabled>Y</enabled>
<name>METRICS_VALUE</name>
</field>
</metrics-log-table>
</log>
<maxdate>
<connection/>
<table/>
<field/>
<offset>0.0</offset>
<maxdiff>0.0</maxdiff>
</maxdate>
<size_rowset>10000</size_rowset>
<sleep_time_empty>50</sleep_time_empty>
<sleep_time_full>50</sleep_time_full>
<unique_connections>N</unique_connections>
<feedback_shown>Y</feedback_shown>
<feedback_size>50000</feedback_size>
<using_thread_priorities>Y</using_thread_priorities>
<shared_objects_file/>
<capture_step_performance>N</capture_step_performance>
<step_performance_capturing_delay>1000</step_performance_capturing_delay>
<step_performance_capturing_size_limit>100</step_performance_capturing_size_limit>
<dependencies>
</dependencies>
<partitionschemas>
</partitionschemas>
<slaveservers>
</slaveservers>
<clusterschemas>
</clusterschemas>
<created_user>-</created_user>
<created_date>2019/03/24 09:44:52.728</created_date>
<modified_user>-</modified_user>
<modified_date>2019/03/24 09:44:52.728</modified_date>
<key_for_session_key>H4sIAAAAAAAAAAMAAAAAAAAAAAA=</key_for_session_key>
<is_key_private>N</is_key_private>
</info>
<notepads>
<notepad>
<note>parameters:
- AWS_REGION</note>
<xloc>16</xloc>
<yloc>16</yloc>
<width>89</width>
<heigth>42</heigth>
<fontname>Segoe UI</fontname>
<fontsize>9</fontsize>
<fontbold>N</fontbold>
<fontitalic>N</fontitalic>
<fontcolorred>0</fontcolorred>
<fontcolorgreen>0</fontcolorgreen>
<fontcolorblue>0</fontcolorblue>
<backgroundcolorred>255</backgroundcolorred>
<backgroundcolorgreen>205</backgroundcolorgreen>
<backgroundcolorblue>112</backgroundcolorblue>
<bordercolorred>100</bordercolorred>
<bordercolorgreen>100</bordercolorgreen>
<bordercolorblue>100</bordercolorblue>
<drawshadow>Y</drawshadow>
</notepad>
<notepad>
<note>instead of searching for one server at a time,
batch them into a single request, tokenize, and putRow</note>
<xloc>320</xloc>
<yloc>240</yloc>
<width>277</width>
<heigth>42</heigth>
<fontname>Segoe UI</fontname>
<fontsize>9</fontsize>
<fontbold>N</fontbold>
<fontitalic>N</fontitalic>
<fontcolorred>0</fontcolorred>
<fontcolorgreen>0</fontcolorgreen>
<fontcolorblue>0</fontcolorblue>
<backgroundcolorred>255</backgroundcolorred>
<backgroundcolorgreen>205</backgroundcolorgreen>
<backgroundcolorblue>112</backgroundcolorblue>
<bordercolorred>100</bordercolorred>
<bordercolorgreen>100</bordercolorgreen>
<bordercolorblue>100</bordercolorblue>
<drawshadow>Y</drawshadow>
</notepad>
<notepad>
<note>Reference:
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/QueryingJavaDocumentAPI.html
REQUIRES an IAM role or a valid ~/.aws/config and ~/.aws/credentials</note>
<xloc>160</xloc>
<yloc>16</yloc>
<width>573</width>
<heigth>73</heigth>
<fontname>Segoe UI</fontname>
<fontsize>9</fontsize>
<fontbold>N</fontbold>
<fontitalic>N</fontitalic>
<fontcolorred>0</fontcolorred>
<fontcolorgreen>0</fontcolorgreen>
<fontcolorblue>0</fontcolorblue>
<backgroundcolorred>255</backgroundcolorred>
<backgroundcolorgreen>205</backgroundcolorgreen>
<backgroundcolorblue>112</backgroundcolorblue>
<bordercolorred>100</bordercolorred>
<bordercolorgreen>100</bordercolorgreen>
<bordercolorblue>100</bordercolorblue>
<drawshadow>Y</drawshadow>
</notepad>
</notepads>
<order>
<hop>
<from>Data Grid: server_names</from>
<to>Get variables: AWS_REGION</to>
<enabled>Y</enabled>
</hop>
<hop>
<from>Get variables: AWS_REGION</from>
<to>mgb:server_names</to>
<enabled>N</enabled>
</hop>
<hop>
<from>mgb:server_names</from>
<to>udjc:batch-get-item</to>
<enabled>N</enabled>
</hop>
<hop>
<from>Get variables: AWS_REGION</from>
<to>udjc:batch-get-item</to>
<enabled>Y</enabled>
</hop>
</order>
<step>
<name>Data Grid: server_names</name>
<type>DataGrid</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<fields>
<field>
<name>lkp_server_name</name>
<type>String</type>
<format/>
<currency/>
<decimal/>
<group/>
<length>-1</length>
<precision>-1</precision>
<set_empty_string>N</set_empty_string>
</field>
</fields>
<data>
<line>
<item>DWH_DEFAULT</item>
</line>
<line>
<item>DWH_TENANT2</item>
</line>
<line>
<item>DWH_TENANT_DOESNOTEXIST</item>
</line>
</data>
<attributes/>
<cluster_schema/>
<remotesteps>
<input>
</input>
<output>
</output>
</remotesteps>
<GUI>
<xloc>80</xloc>
<yloc>128</yloc>
<draw>Y</draw>
</GUI>
</step>
<step>
<name>Get variables: AWS_REGION</name>
<type>GetVariable</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<fields>
<field>
<name>AWS_REGION</name>
<variable>${AWS_REGION}</variable>
<type>String</type>
<format/>
<currency/>
<decimal/>
<group/>
<length>-1</length>
<precision>-1</precision>
<trim_type>none</trim_type>
</field>
</fields>
<attributes/>
<cluster_schema/>
<remotesteps>
<input>
</input>
<output>
</output>
</remotesteps>
<GUI>
<xloc>272</xloc>
<yloc>128</yloc>
<draw>Y</draw>
</GUI>
</step>
<step>
<!-- Memory Group By: concatenates lkp_server_name values (CONCAT_COMMA) into a
     single server_names field per AWS_REGION group. The subject must name a field
     that exists upstream; the Data Grid step emits lkp_server_name.
     NOTE(review): both hops touching this step are currently disabled. -->
<name>mgb:server_names</name>
<type>MemoryGroupBy</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<give_back_row>N</give_back_row>
<group>
<field>
<name>AWS_REGION</name>
</field>
</group>
<fields>
<field>
<aggregate>server_names</aggregate>
<subject>lkp_server_name</subject>
<type>CONCAT_COMMA</type>
<valuefield/>
</field>
</fields>
<attributes/>
<cluster_schema/>
<remotesteps>
<input>
</input>
<output>
</output>
</remotesteps>
<GUI>
<xloc>416</xloc>
<yloc>176</yloc>
<draw>Y</draw>
</GUI>
</step>
<step>
<name>udjc:batch-get-item</name>
<type>UserDefinedJavaClass</type>
<description/>
<distribute>Y</distribute>
<custom_distribution/>
<copies>1</copies>
<partitioning>
<method>none</method>
<schema_name/>
</partitioning>
<definitions>
<definition>
<class_type>TRANSFORM_CLASS</class_type>
<class_name>RowProcessor</class_name>
<class_source>import java.util.*;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.document.ItemCollection;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
import com.amazonaws.regions.*;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
AmazonDynamoDB client = AmazonDynamoDBClientBuilder.defaultClient(); //.build;
// .withRegion(Regions.US_WEST_2).build();
DynamoDB dynamoDB = new DynamoDB(client);
Table table = dynamoDB.getTable("servers");
String server_name = getInputRowMeta().getString(r, getParameter("SERVERNAME"), null );
QuerySpec spec = new QuerySpec()
.withKeyConditionExpression("server_name = :v_server_name")
.withValueMap(new ValueMap()
.withString(":v_server_name", server_name));
ItemCollection&lt;QueryOutcome> items = table.query(spec);
Iterator&lt;Item> iterator = items.iterator();
Item item = null;
while (iterator.hasNext()) {
item = (Item) iterator.next();
logBasic(item.toJSONPretty());
}
r = createOutputRow(r, data.outputRowMeta.size());
int index = getInputRowMeta().size();
r[index++] = item == null ? null : item.getStringSet("server_name");
r[index++] = item == null ? null : item.getStringSet("DB_HOST");
r[index++] = item == null ? null : item.getStringSet("DB_PORT");
r[index++] = item == null ? null : item.getStringSet("DB_SCHEMA");
r[index++] = item == null ? null : item.getStringSet("DB_USER");
r[index++] = item == null ? null : item.getStringSet("DB_PASS");
putRow(data.outputRowMeta, r);
return true;
}
/**
 * Step initialisation hook. It delegates entirely to the default UDJC
 * initialisation and returns that result unchanged.
 */
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
    return super.init(stepMetaInterface, stepDataInterface);
}
</class_source>
</definition>
</definitions>
<fields>
<field>
<field_name>server_name</field_name>
<field_type>String</field_type>
<field_length>-1</field_length>
<field_precision>-1</field_precision>
</field>
<field>
<field_name>DB_HOST</field_name>
<field_type>String</field_type>
<field_length>-1</field_length>
<field_precision>-1</field_precision>
</field>
<field>
<field_name>DB_PORT</field_name>
<field_type>String</field_type>
<field_length>-1</field_length>
<field_precision>-1</field_precision>
</field>
<field>
<field_name>DB_SCHEMA</field_name>
<field_type>String</field_type>
<field_length>-1</field_length>
<field_precision>-1</field_precision>
</field>
<field>
<field_name>DB_USER</field_name>
<field_type>String</field_type>
<field_length>-1</field_length>
<field_precision>-1</field_precision>
</field>
<field>
<field_name>DB_PASS</field_name>
<field_type>String</field_type>
<field_length>-1</field_length>
<field_precision>-1</field_precision>
</field>
</fields>
<clear_result_fields>N</clear_result_fields>
<info_steps/>
<target_steps/>
<usage_parameters>
<usage_parameter>
<parameter_tag>SERVERNAME</parameter_tag>
<parameter_value>lkp_server_name</parameter_value>
<parameter_description/>
</usage_parameter>
</usage_parameters>
<attributes/>
<cluster_schema/>
<remotesteps>
<input>
</input>
<output>
</output>
</remotesteps>
<GUI>
<xloc>544</xloc>
<yloc>128</yloc>
<draw>Y</draw>
</GUI>
</step>
<step_error_handling>
</step_error_handling>
<slave-step-copy-partition-distribution>
</slave-step-copy-partition-distribution>
<slave_transformation>N</slave_transformation>
<attributes/>
</transformation>