huh?

commit 99aef3117da2777702af3e2fb4423be4540e83c4 1 parent b1f6e55
Jacob authored August 18, 2010
32  bin/bootstrap.sh
#!/usr/bin/env bash

# A URL directory with the scripts you'd like to stuff into the machine
REMOTE_FILE_URL_BASE="http://github.com/infochimps/wukong"

# echo "`date` Broaden the apt universe"
# sudo bash -c 'echo "deb http://ftp.us.debian.org/debian  lenny  multiverse restricted universe" >> /etc/apt/sources.list.d/multiverse.list'

# Do a non-interactive apt-get so the user is never prompted for input
export DEBIAN_FRONTEND=noninteractive

# Update the package index and bring the basic system files up to the newest versions
echo "`date` Apt update"
sudo apt-get -y update
sudo dpkg --configure -a
echo "`date` Apt upgrade, could take a while"
sudo apt-get -y safe-upgrade
echo "`date` Apt install"
sudo apt-get -f install

echo "`date` Installing base packages"
# libopenssl-ruby1.8 ssl-cert
sudo apt-get install -y unzip build-essential git-core ruby ruby1.8-dev rubygems ri irb wget zlib1g-dev libxml2-dev
echo "`date` Unchaining rubygems from the tyranny of Ubuntu"
sudo gem install --no-rdoc --no-ri rubygems-update --version=1.3.7 ; sudo /var/lib/gems/1.8/bin/update_rubygems ; sudo gem update --no-rdoc --no-ri --system ; gem --version

echo "`date` Installing wukong gems"
sudo gem install --no-rdoc --no-ri addressable extlib htmlentities configliere yard wukong right_aws uuidtools cheat
sudo gem list

echo "`date` Wukong bootstrap complete"
true
48  config/emr-example.yaml
#
# Elastic MapReduce config in wukong
#

#
# Infrastructure options
#

# == Fill all your information into yet another file with your amazon key
:emr_credentials_file:          ~/.wukong/credentials.json
#
# == Use the credentials file, set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY env vars, or enter them here:
# :access_key:                  ASDFAHKHASDF
# :secret_access_key:           ADSGHASDFJASDFASDF
#
# == Path to your keypair file
# :key_pair_file:               ~/.wukong/keypairs/gibbon.pem
# == Keypair will be named after your file, or force the name:
# :key_pair:                    ~

# == Path to the Amazon elastic-mapreduce runner. Get a copy from
#    http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
:emr_runner:                    ~/ics/hadoop/elastic-mapreduce/elastic-mapreduce

#
# Cluster Config
#
:num_instances:                 1
:instance_type:                 m2.xlarge
:master_instance_type:          ~
:hadoop_version:                '0.20'
# :availability_zone:           us-east-1b

#
# Running and reporting options
#
:alive:                         true
:enable_debugging:              true
:emr_runner_verbose:            true
:emr_runner_debug:              ~
:step_action:                   CANCEL_AND_WAIT         # CANCEL_AND_WAIT, TERMINATE_JOB_FLOW or CONTINUE

#
# Remote Paths
#
:emr_root:                      s3n://emr.infinitemonkeys.info

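A copy of this file placed at ~/.wukong/emr.yaml is read at startup by lib/wukong/script/emr_command.rb (later in this commit):

    # from lib/wukong/script/emr_command.rb
    Settings.read(File.expand_path('~/.wukong/emr.yaml'))
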
377  docpages/README-elastic_map_reduce.textile
h2. Questions

* Can I access an EC2 resource (e.g. a Cassandra cluster)?


h2. Setup

* download from http://developer.amazonwebservices.com/connect/entry.jspa?externalID=2264&categoryID=273
* wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
* unzip elastic-mapreduce-ruby.zip
* cd elastic-mapreduce-ruby
* ln -nfs ~/.wukong/credentials.json
* put your keypair in ~/.wukong/keypairs/WHATEVER.pem

Your ~/.wukong/credentials.json should look like:

  {
    "access-id":     "<insert your aws access id here>",
    "private-key":   "<insert your aws secret access key here>",
    "key-pair":      "WHATEVER",
    "key-pair-file": "~/.wukong/keypairs/WHATEVER.pem",
    "log-uri":       "s3n://yourmom/emr/logs"
  }

h4. Paths

  LogUri              s3    s3n://yourmom/emr/logs
  step log files      s3    {log_uri}/Steps/{step}/{syslog,stdout,controller,stderr}
  Script              s3    s3://yourmom/emr/scripts/path/to/script
  Wukong              s3    s3://s3scripts.infochimps.org/wukong/current/....
  Input               s3    s3n://yourmom/data/wordcount/input
  Output              s3    s3n://yourmom/data/wordcount/output
  Bootstrap Scripts   s3    s3://elasticmapreduce/bootstrap-actions/{configure-hadoop,configure-daemons,run-if}

  Credentials         desk  elastic-mapreduce-ruby/credentials.json

  hadoop.tmp.dir      inst  /mnt/var/lib/hadoop/tmp
  local hdfs          inst  /mnt/var/lib/hadoop/dfs
  your home dir       inst  /home/hadoop                      (small space)
  Job Settings        inst  /mnt/var/lib/info/job-flow.json
  Instance Settings   inst  /mnt/var/lib/info/instance.json


h4. Launching emr tasks in wukong

* Uses configliere to get your credentials, log_uri, emr_root, script_path
* Uploads script phases:
    s3://emr_root/scripts/:script_path/script_name-datetime-mapper.rb
    s3://emr_root/scripts/:script_path/script_name-datetime-reducer.rb
** You can use the following symbols to assemble the path:
    :emr_root, :script_name, :script_path, :username, :date, :datetime, :phase, :rand, :pid, :hostname, :keypair
  The values for :emr_root and :script_path are taken from configliere.
  If :script_path is missing, scripts/:username is used.
  The same timestamp and random number are used for each phase.

* Uses elastic-mapreduce-ruby to launch the job

** Specify EMR options as --emr.{option}
** e.g. --emr.alive, --emr.num-instances (a hypothetical invocation is sketched below)

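A hypothetical invocation assembled from the flags named above — the script name, instance count, and input/output paths are placeholders, and the exact flag spellings are an assumption rather than something this commit pins down:

    ./examples/emr/elastic_mapreduce_example.rb \
      --emr.alive --emr.num-instances=2 \
      s3n://yourmom/data/wordcount/input s3n://yourmom/data/wordcount/output
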
reads ~/.wukong/emr.yaml

  common
  jobs / jobname

  name                  same as for hadoop name
  alive

  num_instances         .
  instance_type         .
  master_instance_type  .
  availability_zone     us-east-1b
  key_pair              job_handle
  key_pair_file         ~/.wukong/keypairs/{key_pair}.pem

  hadoop_version        0.20
  plain_output          Return the job flow id from create step as simple text
  info                  JSON hash
  emr_root
  log_uri               emr_root/logs/:script_path/:script_name-:datetime

  --hadoop-version=0.20 --stream --enable_debugging --verbose --debug --alive
  --availability-zone AZ --key_pair KP --key_pair_file KPF --access_id EC2ID --private_key EC2PK
  --slave_instance_type m2.xlarge --master_instance_type m2.xlarge --num_instances NUM
  #
  --step_name
  --step_action         CANCEL_AND_WAIT, TERMINATE_JOB_FLOW or CONTINUE
  --jobflow             JOBFLOWID
  #
  --info                Settings.emr.info.to_json
  #
  --input               INPUT
  --output              OUTPUT
  --mapper              s3://emr_root/jobs/:script_path/script_name-datetime-mapper.rb  (or class)
  --reducer             s3://emr_root/jobs/:script_path/script_name-datetime-reducer.rb (or class)
  --cache               s3n://emr_root/jobs/:script_path/cache/sample.py#sample.py
  --cache-archive       s3://s3scripts.infochimps.org/wukong/current/wukong.zip
  --cache-archive       s3n://emr_root/jobs/:script_path/cache/sample.jar
  --jobconf             whatever

...

also:

  --ssh
  --scp SRC --to DEST
  --terminate
  --logs
  --list
  --all

h4. Aggregate

http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/lib/aggregate/package-summary.html

  DoubleValueSum    sums up a sequence of double values.
  LongValueMax      maintain the maximum of a sequence of long values.
  LongValueMin      maintain the minimum of a sequence of long values.
  LongValueSum      sums up a sequence of long values.
  StringValueMax    maintain the biggest of a sequence of strings.
  StringValueMin    maintain the smallest of a sequence of strings.
  UniqValueCount    dedupes a sequence of objects.
  ValueHistogram    computes the histogram of a sequence of strings.

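For reference, a minimal streaming-mapper sketch (not part of this commit) for use with Hadoop streaming's stock aggregate reducer: each output line names the aggregator class, then the key, then a tab and the value.

    #!/usr/bin/env ruby
    # Word-count mapper for `-reducer aggregate`: emit "LongValueSum:<word>\t1"
    # lines; the aggregate reducer sums the counts for each word.
    STDIN.each_line do |line|
      line.split(/\s+/).reject{|w| w.empty? }.each do |word|
        puts "LongValueSum:#{word}\t1"
      end
    end
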
h2. Commands

    # create a job and run a mapper written in python and stored in Amazon S3
    elastic-mapreduce --create --enable_debugging \
      --stream \
      --mapper s3://elasticmapreduce/samples/wordcount/wordSplitter.py \
      --input  s3n://elasticmapreduce/samples/wordcount/input \
      --output s3n://mybucket/output_path \
      --log_uri s3n://mybucket/logs

    elastic-mapreduce --list           # list recently created job flows
    elastic-mapreduce --list --active  # list all running or starting job flows
    elastic-mapreduce --list --all     # list all job flows

h4. Bootstrap actions

    --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop
    --args "--site-config-file,s3://bucket/config.xml,-s,mapred.tasktracker.map.tasks.maximum=2"

    --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-daemons
    --args "--namenode-heap-size=2048,--namenode-opts=\"-XX:GCTimeRatio=19\""

You should recompile Cascading applications against Hadoop 0.20 so they can take advantage of the new features available in this version.
Hadoop 0.20 fully supports Pig scripts.
All Amazon Elastic MapReduce sample apps are compatible with Hadoop 0.20. The AWS Management Console supports only Hadoop 0.20, so samples will default to 0.20 once launched.

For Hadoop version 0.20, Hive version 0.5 and Pig version 0.6 are used. The version can be selected by setting HadoopVersion in JobFlowInstancesConfig.

h3. Pig

  REGISTER s3:///my-bucket/piggybank.jar

  Additional functions:

  http://developer.amazonwebservices.com/connect/entry.jspa?externalID=2730


h2. Hadoop and Cluster setup

h3. Data Compression

  Output Compression:           -jobconf mapred.output.compress=true        FileOutputFormat.setCompressOutput(conf, true);
  Intermediate Compression:     -jobconf mapred.compress.map.output=true    conf.setCompressMapOutput(true);

You can also use a bootstrap action to automatically compress all job outputs. Here is how to do that with the Ruby client:

   --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args "-s,mapred.output.compress=true"

Compressed input data: Hadoop automatically detects the .gz extension on file names and extracts the contents. You do not need to take any action to extract gzipped files.


===========================================================================


  $LOAD_PATH << File.dirname(__FILE__)
  require 'amazon/coral/elasticmapreduceclient'
  require 'amazon/retry_delegator'

  config = {
    :endpoint            => "https://elasticmapreduce.amazonaws.com",
    :ca_file             => File.join(File.dirname(__FILE__), "cacert.pem"),
    :aws_access_key      => my_access_id,
    :aws_secret_key      => my_secret_key,
    :signature_algorithm => :V2
  }
  client = Amazon::Coral::ElasticMapReduceClient.new_aws_query(config)

  is_retryable_error_response = Proc.new do |response|
    if response == nil
      false
    else
      ret = false
      if response['Error']
        ret ||= ['InternalFailure', 'Throttling', 'ServiceUnavailable', 'Timeout'].include?(response['Error']['Code'])
      end
      ret
    end
  end

  client = Amazon::RetryDelegator.new(client, :retry_if => is_retryable_error_response)

  puts client.DescribeJobFlows.inspect
  puts client.DescribeJobFlows('JobFlowId' => 'j-ABAYAS1019012').inspect

h3. Example job-flow.json and instance.json

job-flow.json {"jobFlowId":"j-1UVPY9PQ3XAXE","jobFlowCreationInstant":1271711181000,
  "instanceCount":4,"masterInstanceId":"i-f987ee92","masterPrivateDnsName":
  "localhost","masterInstanceType":"m1.small","slaveInstanceType":
  "m1.small","hadoopVersion":"0.18"}

instance.json {"isMaster":true,"isRunningNameNode":true,"isRunningDataNode":true,
  "isRunningJobTracker":false,"isRunningTaskTracker":false}

h3. Configuration

h4. Configure Hadoop

  Location: s3://elasticmapreduce/bootstrap-actions/configure-hadoop

  -<f>, --<file>-key-value
    Key/value pair that will be merged into the specified config file.

  -<F>, --<file>-config-file
    Config file in Amazon S3 or locally that will be merged with the specified config file.

  Acceptable config files:
  s/S  site     hadoop-site.xml
  d/D  default  hadoop-default.xml
  c/C  core     core-site.xml
  h/H  hdfs     hdfs-site.xml
  m/M  mapred   mapred-site.xml


  Example Usage:

  elastic-mapreduce --create \
    --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
    --args "--site-config-file,s3://bucket/config.xml,-s,mapred.tasktracker.map.tasks.maximum=2"


  Specify no reducers:
    --mapred-key-value mapred.reduce.tasks=0


  -cacheFile      -files      Comma-separated URIs
  -cacheArchive   -archives   Comma-separated URIs
  -jobconf        -D          key=value

h4. Run If

Location: s3://elasticmapreduce/bootstrap-actions/run-if <JSON path>[!]=<value> <command> [args...]

  JSON path       A path in the instance config or job flow config for the key we should look up.
  Value           The value we expect to find.
  Command         The command to run if the value is what we expect (or not what we expect in the case of !=). This can be a path in S3 or a local command.
  Args            Arguments to pass to the command as it runs.

  elastic-mapreduce --create --alive \
    --bootstrap-action s3://elasticmapreduce/bootstrap-actions/run-if \
    --args "instance.isMaster=true,echo,Running,on,master,node"

h4. Configure Daemons

  --<daemon>-heap-size  	Set the heap size in megabytes for the specified daemon.
  --<daemon>-opts       	Set additional Java options for the specified daemon.
  --replace             	Replace the existing hadoop-user-env.sh file if it exists.

  <daemon> is one of: namenode, datanode, jobtracker, tasktracker, client

  elastic-mapreduce --create --alive \
    --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-daemons \
    --args "--namenode-heap-size=2048,--namenode-opts=\"-XX:GCTimeRatio=19\""


h2. Command Line


  Creating Job Flows
        --create                     Create a new job flow
        --name NAME                  Name of the job flow
        --alive                      Create a job flow that stays running even though it has executed all its steps
        --num-instances NUM          Number of instances in the job flow
        --instance-type TYPE         The type of the instances to launch
        --slave-instance-type TYPE   The type of the slave instances to launch
        --master-instance-type TYPE  The type of the master instance to launch
        --key-pair KEY_PAIR          The name of your Amazon EC2 Keypair
        --key-pair-file FILE_PATH    Path to your local pem file for your EC2 key pair
        --log-uri LOG_URI            Location in S3 to store logs from the job flow, e.g. s3n://mybucket/logs
        --availability-zone A_Z      Specify the Availability Zone in which to launch the job flow
        --info INFO                  Specify additional info in JSON
        --hadoop-version VERSION     Specify the Hadoop version to install
        --plain-output               Return the job flow id from the create step as simple text

  Adding Jar Steps to Job Flows
        --jar JAR                    Add a step that executes a jar
        --wait-for-step              Wait for the step to finish
        --main-class MAIN_CLASS      Specify the main class for the JAR

  Adding Streaming Steps to Job Flows
        --stream                     Add a step that performs hadoop streaming
        --input INPUT                Input to the steps, e.g. s3n://mybucket/input
        --output OUTPUT              The output to the steps, e.g. s3n://mybucket/output
        --mapper MAPPER              The mapper program or class
        --cache CACHE_FILE           A file to load into the cache, e.g. s3n://mybucket/sample.py#sample.py
        --cache-archive CACHE_FILE   A file to unpack into the cache, e.g. s3n://mybucket/sample.jar
        --jobconf KEY=VALUE          Specify jobconf arguments to pass to streaming, e.g. mapred.task.timeout=800000
        --reducer REDUCER            The reducer program or class

  Job Flow Debugging Options
        --enable-debugging           Enable job flow debugging (you must be signed up to SimpleDB for this to work)

  Adding Pig steps to job flows
        --pig-script                 Add a step that runs a Pig script
        --pig-interactive            Add a step that sets up the job flow for an interactive (via SSH) pig session

  Configuring Hive on a JobFlow
        --hive-site HIVE_SITE        Override Hive configuration with configuration from HIVE_SITE
        --hive-script                Add a step that runs a Hive script
        --hive-interactive           Add a step that sets up the job flow for an interactive (via SSH) hive session

  Adding Steps from a JSON File to Job Flows
        --json FILE                  Add a sequence of steps stored in a json file
        --param VARIABLE=VALUE       Substitute <variable> with value in the json file

  Contacting the Master Node
        --no-wait                    Don't wait for the Master node to start before executing scp or ssh
        --ssh [COMMAND]              SSH to the master node and optionally run a command
        --logs                       Display the step logs for the last executed step
        --scp SRC                    Copy a file to the master node
        --to DEST                    The destination to scp a file to

  Settings common to all step types
        --step-name STEP_NAME        Set name for the step
        --step-action STEP_ACTION    Action to take when the step finishes. One of CANCEL_AND_WAIT, TERMINATE_JOB_FLOW or CONTINUE
        --arg ARG                    Specify an argument to a bootstrap action, jar, streaming, pig-script or hive-script step
        --args ARGS                  Specify a comma-separated list of arguments, e.g. --args 1,2,3 would pass three arguments

  Specifying Bootstrap Actions
        --bootstrap-action SCRIPT    Run a bootstrap action script on all instances
        --bootstrap-name NAME        Set the name of the bootstrap action
  Note: --arg and --args are used to pass arguments to bootstrap actions

  Listing and Describing Job Flows
        --list                       List all job flows created in the last 2 days
        --describe                   Dump a JSON description of the supplied job flows
        --active                     List running, starting or shutting down job flows
        --all                        List all job flows in the last 2 months
        --nosteps                    Do not list steps when listing jobs
        --state STATE                List job flows in STATE
    -n, --max-results MAX_RESULTS    Maximum number of results to list

  Terminating Job Flows
        --terminate                  Terminate the job flow

  Common Options
    -j, --jobflow JOB_FLOW_ID
        --job-flow-id
    -c, --credentials CRED_FILE      File containing access-id and private-key
    -a, --access-id ACCESS-ID        AWS Access Id
    -k, --private-key PRIVATE-KEY    AWS Private Key
    -v, --verbose                    Turn on verbose logging of program interaction

  Uncommon Options
        --debug                      Print stack traces when exceptions occur
        --endpoint ENDPOINT          Specify the webservice endpoint to talk to
        --region REGION              The region to use for the endpoint
        --apps-path APPS_PATH        Specify the s3:// path to the base of the EMR public bucket to use, e.g. s3://us-east-1.elasticmapreduce
        --beta-path BETA_PATH        Specify the s3:// path to the base of the EMR public bucket to use for beta apps, e.g. s3://beta.elasticmapreduce
        --version                    Print a version string
    -h, --help                       Show help message
56  docpages/avro/avro_notes.textile
* Spec: http://avro.apache.org/docs/current/spec.html
* Jira: https://issues.apache.org/jira/browse/AVRO
* Wiki: https://cwiki.apache.org/confluence/display/AVRO/Index

* http://github.com/phunt/avro-rpc-quickstart

* http://lucene.apache.org/java/2_4_0/fileformats.html#VInt -- types
* http://code.google.com/apis/protocolbuffers/docs/encoding.html#types -- a good reference
* Avro + Eventlet (Python evented code): http://unethicalblogger.com/node/282


Cassandra + Avro

* Make bulk loading into Cassandra less crappy, more pluggable: https://issues.apache.org/jira/browse/CASSANDRA-1278
* Refactor Streaming: https://issues.apache.org/jira/browse/CASSANDRA-1189
* Increment Counters: https://issues.apache.org/jira/browse/CASSANDRA-1072

== From hammer's avro tools:

  21
+#! /usr/bin/env python
  22
+
  23
+import sys
  24
+from avro import schema
  25
+from avro.genericio import DatumReader
  26
+from avro.io import DataFileReader
  27
+
  28
+if __name__ == "__main__":
  29
+  if len(sys.argv) < 2:
  30
+    print "Need to at least specify an Avro file."
  31
+  outfile_name = sys.argv[1]
  32
+
  33
+  message_schema = None
  34
+  if len(sys.argv) > 2:
  35
+    message_schema = schema.parse(schema.parse(sys.argv[2].encode("utf-8")))
  36
+
  37
+  r = file(outfile_name, 'r')
  38
+  dr = DatumReader(expected = message_schema)
  39
+  dfr = DataFileReader(r, dr)
  40
+  for record in dfr:
  41
+    print record
  42
+  dfr.close()
  43
+
  44
+from binascii import hexlify
  45
+
  46
+def avro_hexlify(reader):
  47
+  """Return the hex value, as a string, of a binary-encoded int or long."""
  48
+  bytes = []
  49
+  current_byte = reader.read(1)
  50
+  bytes.append(hexlify(current_byte))
  51
+  while (ord(current_byte) & 0x80) != 0:
  52
+    current_byte = reader.read(1)
  53
+    bytes.append(hexlify(current_byte))
  54
+  return ' '.join(bytes)
  55
+
  56
+
19  docpages/avro/tethering.textile
The idea is that folks would somehow write a Ruby application.  On startup, it starts a server speaking InputProtocol:

http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/mapred/tether/InputProtocol.avpr?view=markup

Then it gets a port from the environment variable AVRO_TETHER_OUTPUT_PORT.  It connects to this port, using OutputProtocol, and uses the configure() message to send the port of its input server to its parent:

http://svn.apache.org/viewvc/avro/trunk/share/schemas/org/apache/avro/mapred/tether/OutputProtocol.avpr?view=markup

The meat of maps and reduces consists of the parent sending inputs to the child with the input() message, and the child sending outputs back to the parent with the output() message.

If it helps any, there's a Java implementation of the child, including a demo WordCount application:

http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/mapred/tether/

One nit, should you choose to accept this task, is that Avro's Ruby RPC stuff will need to be enhanced.  It doesn't yet support request-only messages.  I can probably cajole someone to help with this, and there's a workaround for debugging (switch things to HTTP).

http://svn.apache.org/viewvc/avro/trunk/lang/ruby/lib/avro/ipc.rb?view=markup

26  docpages/pig/commandline_params.txt
Apache Pig version 0.7.0 (r941408)
compiled May 05 2010, 11:15:55

USAGE: Pig [options] [-] : Run interactively in grunt shell.
       Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).
       Pig [options] [-f[ile]] file : Run cmds found in file.
  options include:
    -4, -log4jconf log4j configuration file, overrides log conf
    -b, -brief brief logging (no timestamps)
    -c, -cluster clustername, kryptonite is default
    -d, -debug debug level, INFO is default
    -e, -execute commands to execute (within quotes)
    -f, -file path to the script to execute
    -h, -help display this message
    -i, -version display version information
    -j, -jar jarfile load jarfile
    -l, -logfile path to client side log file; current working directory is default
    -m, -param_file path to the parameter file
    -p, -param key value pair of the form param=val
    -r, -dryrun
    -t, -optimizer_off optimizer rule name, turn optimizer off for this rule; use all to turn all rules off, optimizer is turned on by default
    -v, -verbose print all error messages to screen
    -w, -warning turn warning on; also turns warning aggregation off
    -x, -exectype local|mapreduce, mapreduce is default
    -F, -stop_on_failure aborts execution on the first failed job; off by default
    -M, -no_multiquery turn multiquery optimization off; multiquery is on by default
18  examples/emr/elastic_mapreduce_example.rb
#!/usr/bin/env ruby
Dir[File.dirname(__FILE__)+'/vendor/**/lib'].each{|dir| $: << dir }
require 'rubygems'
require 'wukong'

class FooStreamer < Wukong::Streamer::LineStreamer
  def initialize *args
    super *args
    @line_no = 0
  end

  def process *args
    yield ["%5d" % @line_no, *args]
    @line_no += 1
  end
end

Wukong::Script.new(FooStreamer, FooStreamer).run
73  lib/wukong/filename_pattern.rb
module Wukong
    class FilenamePattern
      # the filename pattern, e.g. 'ripd/:handle/:date/:handle+:timestamp-:pid-:hostname.tsv'
      attr_accessor :pattern
      # custom token replacements
      attr_accessor :token_val_defaults

      DEFAULT_PATTERN_STR = ":dest_dir/:handle_prefix/:handle/:date/:handle:timestamp-:pid-:hostname.tsv"

      def initialize pattern, token_val_defaults={}
        self.pattern            = pattern
        self.token_val_defaults = token_val_defaults
      end

      #
      # walk through pattern, replacing tokens (eg :time or :pid) with the
      # corresponding value.
      #
      def make token_vals={}
        token_vals = token_val_defaults.merge token_vals
        token_vals[:timestamp] ||= Time.now.utc.strftime("%Y%m%d%H%M%S")
        # CHH_NOTE: The following is broken for patterns that need a ":" or
        # patterns that need text following a token with no special chars in
        # between.
        val = pattern.gsub(/:(\w+)/){ replace($1, token_vals) }
        val
      end

      def to_s token_vals={}
        make token_vals
      end

      #
      # substitute for token
      #
      def replace token, token_vals
        token = token.to_sym
        return token_vals[token] if token_vals.include? token
        case token
        when :pid           then pid
        when :hostname      then hostname
        when :handle        then token_vals[:handle]
        when :handle_prefix then token_vals[:handle].to_s[0..5]
        when :timestamp     then token_vals[:timestamp]
        when :date          then token_vals[:timestamp][ 0..7]
        when :time          then token_vals[:timestamp][ 8..13]
        when :hour          then token_vals[:timestamp][ 8..9]
        when :h4            then "%0.2d" % (( token_vals[:timestamp][8..9].to_i / 4 ) * 4)
        when :min           then token_vals[:timestamp][10..11]
        when :sec           then token_vals[:timestamp][12..13]
        when :s10           then "%0.2d" % (( token_vals[:timestamp][12..13].to_i / 10 ) * 10)
        else
          raise "Don't know how to encode token #{token} #{token_vals[token]}"
        end
      end

      # Memoized: the hostname for the machine running this script.
      def hostname
        @hostname ||= ENV['HOSTNAME'] || `hostname`.delete("\n")
      end
      # Memoized: the Process ID for this invocation.
      def pid
        @pid      ||= Process.pid
      end

      # Characters deemed safe in a filename
      SAFE_CHARS = 'a-zA-Z0-9_\-\.\+\/\;'
      def self.sanitize str
        str.gsub(%r{[^#{SAFE_CHARS}]+}, '-')
      end

    end
end
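
A minimal usage sketch for FilenamePattern (not part of this commit; the pattern comes from the comment above, and the handle is a placeholder):

    require 'wukong/filename_pattern'

    # :handle is supplied as a default; :timestamp, :pid and :hostname are filled in by #make.
    pattern = Wukong::FilenamePattern.new(
      'ripd/:handle/:date/:handle+:timestamp-:pid-:hostname.tsv',
      :handle => 'twitter_search')
    puts pattern.make  # => e.g. "ripd/twitter_search/20100818/twitter_search+20100818123456-1234-myhost.tsv"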
5  lib/wukong/script/avro_command.rb


#
# Placeholder -- avro will go here
#
134  lib/wukong/script/emr_command.rb
require 'right_aws'
require 'configliere/config_block'
Settings.read(File.expand_path('~/.wukong/emr.yaml'))
Settings.define :emr_credentials_file, :description => 'A .json file holding your AWS access credentials. See http://bit.ly/emr_credentials_file for format'
Settings.define :access_key,           :description => 'AWS Access key',        :env_var => 'AWS_ACCESS_KEY_ID'
Settings.define :secret_access_key,    :description => 'AWS Secret Access key', :env_var => 'AWS_SECRET_ACCESS_KEY'
Settings.define :emr_runner,           :description => 'Path to the elastic-mapreduce command (~ etc will be expanded)'
Settings.define :emr_root,             :description => 'S3 url to use as the base for Elastic MapReduce storage'
Settings.define :key_pair_file,        :description => 'AWS Key pair file', :finally => lambda{ Settings.key_pair_file = File.expand_path(Settings.key_pair_file.to_s) if Settings.key_pair_file }
Settings.define :key_pair,             :description => "AWS Key pair name. If not specified, it's taken from key_pair_file's basename", :finally => lambda{ Settings.key_pair ||= File.basename(Settings.key_pair_file.to_s, '.pem') if Settings.key_pair_file }
Settings.define :instance_type,        :description => 'AWS instance type to use', :default => 'm1.small'
Settings.define :master_instance_type, :description => 'Overrides the instance type for the master node', :finally => lambda{ Settings.master_instance_type ||= Settings.instance_type }
Settings.define :jobflow
module Wukong
  #
  # EMR Options
  #
  module EmrCommand

    def execute_emr_workflow
      copy_script_to_cloud
      execute_emr_runner
    end

    def copy_script_to_cloud
      Log.info "  Copying this script to the cloud."
      S3Util.store(this_script_filename, mapper_s3_uri)
      S3Util.store(this_script_filename, reducer_s3_uri)
      S3Util.store(File.expand_path('~/ics/wukong/bin/bootstrap.sh'), bootstrap_s3_uri)
    end

    def execute_emr_runner
      command_args = []
      command_args << Settings.dashed_flags(:hadoop_version, :enable_debugging, :step_action, [:emr_runner_verbose, :verbose], [:emr_runner_debug, :debug]).join(' ')
      command_args += emr_credentials
      if Settings.jobflow
        command_args << Settings.dashed_flag_for(:jobflow)
      else
        command_args << Settings.dashed_flag_for(:alive)
        command_args << "--create --name=#{job_name}"
        command_args << Settings.dashed_flags(:num_instances, [:instance_type, :slave_instance_type], :master_instance_type).join(' ')
      end
      command_args += [
        "--bootstrap-action=#{bootstrap_s3_uri}",
        "--log-uri=#{log_s3_uri}",
        "--stream",
        "--mapper=#{mapper_s3_uri} ",
        "--reducer=#{reducer_s3_uri} ",
        "--input=#{input_paths} --output=#{output_path}",
        # to specify zero reducers:
        # "--arg '-D mapred.reduce.tasks=0'"
      ]
      Log.info 'Follow along at http://localhost:9000/job'
      execute_command!( File.expand_path(Settings.emr_runner), *command_args )
    end

    def emr_ship_jars
      S3Util.store(File.expand_path('/tmp/wukong-libs.jar'), wukong_libs_s3_uri)
      # "--cache-archive=#{wukong_libs_s3_uri}#vendor",
    end

    def emr_credentials
      command_args = []
      if Settings.emr_credentials_file
        command_args << "--credentials #{File.expand_path(Settings.emr_credentials_file)}"
      else
        command_args << %Q{--access-id #{Settings.access_key} --private-key #{Settings.secret_access_key} }
      end
      command_args << Settings.dashed_flags(:availability_zone, :key_pair, :key_pair_file).join(' ')
      command_args
    end

    # A short name for this job
    def job_handle
      File.basename($0,'.rb')
    end

    def mapper_s3_uri
      emr_s3_path(job_handle+'-mapper.rb')
    end
    def reducer_s3_uri
      emr_s3_path(job_handle+'-reducer.rb')
    end
    def log_s3_uri
      emr_s3_path('log', job_handle)
    end
    def bootstrap_s3_uri
      emr_s3_path('bin', "bootstrap-#{job_handle}.sh")
    end
    def wukong_libs_s3_uri
      emr_s3_path('bin', "wukong-libs.jar")
    end

    def emr_s3_path *path_segs
      File.join(Settings.emr_root, path_segs.flatten.compact)
    end

    module ClassMethods

      # Standard hack to create ClassMethods-on-include
      def self.included base
        base.class_eval do
          extend ClassMethods
        end
      end
    end

    class S3Util
      # class methods
      class << self
        def s3
          @s3 ||= RightAws::S3Interface.new(
            Settings.access_key, Settings.secret_access_key,
            {:multi_thread => true, :logger => Log})
        end

        def bucket_and_path_from_uri uri
          uri =~ %r{^s3\w*://([\w\.\-]+)\W*(.*)} and return([$1, $2])
        end

        def store filename, uri
          Log.debug "    #{filename} => #{uri}"
          dest_bucket, dest_key = bucket_and_path_from_uri(uri)
          contents = File.open(filename)
          s3.store_object(:bucket => dest_bucket, :key => dest_key, :data => contents)
        end

      end
    end
  end
  Script.class_eval do
    include EmrCommand
  end
end
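
For reference, a minimal sketch of how the S3Util helper gets used (the local path and destination URI are placeholders; Settings.access_key and Settings.secret_access_key must already be resolved):

    # copy_script_to_cloud does essentially this for the mapper, reducer and bootstrap script
    Wukong::EmrCommand::S3Util.store(
      File.expand_path('~/ics/wukong/bin/bootstrap.sh'),
      's3://emr.infinitemonkeys.info/bin/bootstrap-my_script.sh')
    # bucket_and_path_from_uri splits that URI into ["emr.infinitemonkeys.info", "bin/bootstrap-my_script.sh"]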
36  lib/wukong/store/chh_chunked_flat_file_store.rb
module Wukong
  module Store
    class ChhChunkedFlatFileStore < Wukong::Store::FlatFileStore
      attr_accessor :filename_pattern, :handle, :rootdir

      # Move to configliere
      Settings.define :chunk_file_pattern, :default => ":rootdir/:date/:handle:timestamp-:pid.tsv", :description => "The pattern for chunked files."
      Settings.define :chunk_file_rootdir, :default => nil, :description => "The root directory for the chunked files."

      # Note that filemode is inherited from flat_file

      def initialize options={}
        # super wants a :filename in the options or it will fail. We need to get the initial filename
        # set up before we call super, so we need all of the parts of the pattern set up.
        self.rootdir          = options[:rootdir]   || Settings[:chunk_file_rootdir]
        self.handle           = options[:handle]
        pattern               = options[:pattern]   || Settings[:chunk_file_pattern]
        self.filename_pattern = FilenamePattern.new(pattern, :handle => handle, :rootdir => self.rootdir)
        options[:filename]    = filename_pattern.make()

        super options

        self.mkdir!
      end

      def new_chunk
        new_filename = filename_pattern.make()
        Log.info "Rotating chunked file #{filename} into #{new_filename}"
        self.close
        @filename = new_filename
        self.mkdir!
      end

    end
  end
end
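
A hypothetical usage sketch (the rootdir and handle are placeholders; the actual file handling is inherited from FlatFileStore):

    # Filenames come from Settings[:chunk_file_pattern]; new_chunk rolls over to a fresh file.
    store = Wukong::Store::ChhChunkedFlatFileStore.new(
      :rootdir => '/data/ripd', :handle => 'twitter_search')
    # ... write records via the inherited FlatFileStore interface, then rotate:
    store.new_chunk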
