Initial version.

commit ee5b25a10d0d760423571fbe3895345cb0644df5 0 parents
@umitanuki authored
39 META.json
@@ -0,0 +1,39 @@
+{
+ "name": "s3_fdw",
+ "abstract": "foreign-data wrapper for Amazon S3",
+ "description": "This module provides reading files located in Amazon S3 privately, using COPY mechanism.",
+ "version": "0.1.0",
+ "maintainer": "Hitoshi Harada <umi.tanuki@gmail.com>",
+ "license": "postgresql",
+ "provides": {
+ "s3_fdw": {
+ "abstract": "fdw for Amazon S3",
+ "version": "0.1.0",
+ "file": "s3_fdw.sql",
+ "docfile": "doc/s3_fdw.md"
+ }
+ },
+ "resources": {
+ "bugtracker": {
+ "web": "http://github.com/umitanuki/s3_fdw/issues/"
+ },
+ "repository": {
+ "url": "git://github.com/umitanuki/s3_fdw.git",
+ "web": "http://github.com/umitanuki/s3_fdw",
+ "type": "git"
+ }
+ },
+ "release_status": "unstable",
+ "meta-spec": {
+ "version": "1.0.0",
+ "url": "http://pgxn.org/meta/spec.txt"
+ },
+ "tags": [
+ "fdw",
+ "web",
+ "internet",
+ "amazon",
+ "cloud",
+ "bulkload"
+ ]
+}
23 Makefile
@@ -0,0 +1,23 @@
+
+MODULE_big = s3_fdw
+OBJS = s3_fdw.o connutil.o # copy_patched.o
+EXTENSION = $(MODULE_big)
+EXTVERSION = 0.1.0
+EXTSQL = $(MODULE_big)--$(EXTVERSION).sql
+DATA = $(EXTSQL)
+EXTRA_CLEAN += $(EXTSQL)
+SHLIB_LINK = -lcurl -lssl -lcrypto
+
+#DOCS = doc/$(MODULE_big).md
+REGRESS = $(MODULE_big)
+
+all: $(EXTSQL)
+
+$(EXTSQL): $(MODULE_big).sql
+ cp $< $@
+
+
+
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
37 README.md
@@ -0,0 +1,37 @@
+s3\_fdw
+=======
+
+s3\_fdw provides a foreign-data wrapper (FDW) for Amazon S3 files,
+using the built-in COPY format.
+
+To build it, just do this:
+
+ make
+ make install
+
+If you encounter an error such as:
+
+ make: pg_config: Command not found
+
+Be sure that you have `pg_config` installed and in your path. If you used a
+package management system such as RPM to install PostgreSQL, be sure that the
+`-devel` package is also installed. If necessary tell the build process where
+to find it:
+
+ env PG_CONFIG=/path/to/pg_config make && make installcheck && make install
+
+Once `make install` is done, connect to your database with psql or another
+client and type
+
+ CREATE EXTENSION s3_fdw;
+
+then the FDW is installed. With the FDW in place, create a server, a user
+mapping, and a foreign table. You'll need your Amazon S3 access key ID and
+secret access key to authenticate private access to your data; consult the
+AWS documentation for those keys. The access information is stored in the
+user mapping. Foreign tables store COPY options as well as the hostname,
+bucketname, and filename.
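+
+A minimal setup might look like this (a sketch only; the keys, bucket,
+file, and column layout are placeholders to adapt to your data):
+
+    CREATE SERVER amazon_s3 FOREIGN DATA WRAPPER s3_fdw;
+
+    CREATE USER MAPPING FOR CURRENT_USER SERVER amazon_s3
+      OPTIONS (accesskey 'your-access-key-id',
+               secretkey 'your-secret-access-key');
+
+    CREATE FOREIGN TABLE mylog(atime timestamp, message text)
+      SERVER amazon_s3
+      OPTIONS (hostname 's3-ap-northeast-1.amazonaws.com',
+               bucketname 'your-bucket',
+               filename 'your-file.txt',
+               delimiter E'\t');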
+
+Dependencies
+------------
+The `s3_fdw` extension depends on libcurl and OpenSSL. You need the
+developer packages for both installed on your system.
141 connutil.c
@@ -0,0 +1,141 @@
+#include "openssl/hmac.h"
+
+#include "postgres.h"
+#include "lib/stringinfo.h"
+
+#include "connutil.h"
+
+static char *sign_by_secretkey(char *input, char *secretkey);
+static int b64_encode(const uint8 *src, unsigned len, uint8 *dst);
+
+/*
+ * Constructs GMT-style string
+ */
+char *
+httpdate(time_t *timer)
+{
+ char *datestring;
+ time_t t;
+ struct tm *gt;
+
+ t = time(timer);
+ gt = gmtime(&t);
+ datestring = (char *) palloc0(256 * sizeof(char));
+ strftime(datestring, 256 * sizeof(char), "%a, %d %b %Y %H:%M:%S +0000", gt);
+ return datestring;
+}
+
+/*
+ * Construct signed string for the Authorization header,
+ * following the Amazon S3 REST API spec.
+ */
+char *
+s3_signature(char *method, char *datestring,
+ char *bucket, char *file, char *secretkey)
+{
+ size_t rs_size;
+ char *resource;
+ StringInfoData buf;
+
+ rs_size = strlen(bucket) + strlen(file) + 3; /* 3 = '/' + '/' + '\0' */
+ resource = (char *) palloc0(rs_size);
+
+ snprintf(resource, rs_size, "/%s/%s", bucket, file);
+ initStringInfo(&buf);
+ /*
+ * StringToSign = HTTP-Verb + "\n" +
+ * Content-MD5 + "\n" +
+ * Content-Type + "\n" +
+ * Date + "\n" +
+ * CanonicalizedAmzHeaders +
+ * CanonicalizedResource;
+ */
+ appendStringInfo(&buf, "%s\n", method);
+ appendStringInfo(&buf, "\n");
+ appendStringInfo(&buf, "\n");
+ appendStringInfo(&buf, "%s\n", datestring);
+// appendStringInfo(&buf, "");
+ appendStringInfo(&buf, "%s", resource);
+
+//elog(INFO, "StringToSign:%s", buf.data);
+ return sign_by_secretkey(buf.data, secretkey);
+}
+
+static char *
+sign_by_secretkey(char *input, char *secretkey)
+{
+ HMAC_CTX ctx;
+ /* an HMAC-SHA1 digest is 20 bytes; 256 is ample */
+ char result[256];
+ unsigned int len;
+ /* base64 enlarges the input by about one third; 256 is ample */
+ char b64_result[256];
+ int b64_len;
+
+ HMAC_CTX_init(&ctx);
+ HMAC_Init(&ctx, secretkey, strlen(secretkey), EVP_sha1());
+ HMAC_Update(&ctx, (unsigned char *) input, strlen(input));
+ HMAC_Final(&ctx, (unsigned char *) result, &len);
+ HMAC_CTX_cleanup(&ctx);
+
+ b64_len = b64_encode((unsigned char *) result, len, (unsigned char *) b64_result);
+ b64_result[b64_len] = '\0';
+
+ return pstrdup(b64_result);
+}
+
+/*
+ * BASE64 encoder - duplicated from the backend's encode.c :(
+ */
+
+static int
+b64_encode(const uint8 *src, unsigned len, uint8 *dst)
+{
+ static const unsigned char _base64[] =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+
+ uint8 *p,
+ *lend = dst + 76;
+ const uint8 *s,
+ *end = src + len;
+ int pos = 2;
+ unsigned long buf = 0;
+
+ s = src;
+ p = dst;
+
+ while (s < end)
+ {
+ buf |= *s << (pos << 3);
+ pos--;
+ s++;
+
+ /*
+ * write it out
+ */
+ if (pos < 0)
+ {
+ *p++ = _base64[(buf >> 18) & 0x3f];
+ *p++ = _base64[(buf >> 12) & 0x3f];
+ *p++ = _base64[(buf >> 6) & 0x3f];
+ *p++ = _base64[buf & 0x3f];
+
+ pos = 2;
+ buf = 0;
+ }
+ if (p >= lend)
+ {
+ *p++ = '\n';
+ lend = p + 76;
+ }
+ }
+ if (pos != 2)
+ {
+ *p++ = _base64[(buf >> 18) & 0x3f];
+ *p++ = _base64[(buf >> 12) & 0x3f];
+ *p++ = (pos == 0) ? _base64[(buf >> 6) & 0x3f] : '=';
+ *p++ = '=';
+ }
+
+ return p - dst;
+}
11 connutil.h
@@ -0,0 +1,11 @@
+#ifndef _S3_CONNUTIL_H_
+#define _S3_CONNUTIL_H_
+
+#include <time.h>
+
+extern char *httpdate(time_t *timer);
+extern char *s3_signature(char *method, char *datestring,
+ char *bucket, char *file, char *secretkey);
+
+
+#endif /* _S3_CONNUTIL_H_ */
84 doc/s3_fdw.md
@@ -0,0 +1,84 @@
+s3\_fdw
+=======
+
+Synopsis
+--------
+
+ db1=# CREATE EXTENSION s3_fdw;
+ CREATE EXTENSION
+
+ db1=# CREATE SERVER amazon_s3 FOREIGN DATA WRAPPER s3_fdw;
+ CREATE SERVER
+
+ db1=# CREATE USER MAPPING FOR CURRENT_USER SERVER amazon_s3
+ OPTIONS (
+ accesskey 'your-access-key-id',
+ secretkey 'your-secret-access-key'
+ );
+ CREATE USER MAPPING
+
+ db1=# CREATE FOREIGN TABLE log20110901(
+ atime timestamp,
+ method text, elapse int,
+ session text
+ ) SERVER amazon_s3
+ OPTIONS (
+ hostname 's3-ap-northeast-1.amazonaws.com',
+ bucketname 'umitanuki-dbtest',
+ filename 'log20110901.txt',
+ delimiter E'\t'
+ );
+ CREATE FOREIGN TABLE
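+
+Once defined, the foreign table can be queried like a regular table
+(a usage sketch against the table above; the query itself is illustrative):
+
+ db1=# SELECT count(*) FROM log20110901 WHERE method = 'GET';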
+
+Description
+-----------
+
+This module provides a foreign-data wrapper for Amazon S3 files.
+The procedure to set up a foreign table is shown above.
+First, run `create extension` for this module. Then run
+`create server` with any name you like and no options,
+since server options are not supported yet. After that, run
+`create user mapping` for the current user with the mandatory
+options `accesskey` and `secretkey`, which Amazon provides to you.
+
+Last, run `create foreign table` for your file. At the moment you
+need to define one table per file, as with file\_fdw in contrib.
+s3\_fdw supports all the COPY options that file\_fdw does, as
+well as these additional mandatory options:
+
+ - hostname
+ - bucketname
+ - filename
+
+Take the access URL of your S3 file, split it into these three
+options, and specify them separately, as in the example below.
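+
+For example, with the access URL from the synopsis above,
+
+ http://s3-ap-northeast-1.amazonaws.com/umitanuki-dbtest/log20110901.txt
+
+the three options become:
+
+ OPTIONS (
+     hostname 's3-ap-northeast-1.amazonaws.com',
+     bucketname 'umitanuki-dbtest',
+     filename 'log20110901.txt'
+ )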
+
+Roadmap
+-------
+
+ - gz file support
+ - bucket files bulk load
+ - normal URL option rather than split path
+ - windows support
+
+Caveat
+------
+
+This module is still under development. You may encounter
+unpredictable behavior when using it.
+
+In particular, s3\_fdw forks the backend and calls mkfifo to read
+and write in parallel, so it doesn't work on platforms where
+fork / mkfifo is unavailable.
+
+Support
+-------
+
+Go to http://github.com/umitanuki/s3_fdw and feel free to report
+any bugs or issues you find.
+
+Author
+------
+
+Hitoshi Harada
+
887 s3_fdw.c
@@ -0,0 +1,887 @@
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "curl/curl.h"
+
+#include "postgres.h"
+#include "fmgr.h"
+
+#include "access/reloptions.h"
+#include "catalog/pg_foreign_table.h"
+#include "catalog/pg_user_mapping.h"
+#include "commands/copy.h"
+#include "commands/defrem.h"
+#include "commands/explain.h"
+#include "foreign/fdwapi.h"
+#include "foreign/foreign.h"
+#include "lib/stringinfo.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "optimizer/cost.h"
+#include "postmaster/fork_process.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "utils/builtins.h"
+#include "utils/rel.h"
+//#include "utils/resowner.h"
+
+#include "connutil.h"
+
+PG_MODULE_MAGIC;
+
+
+/*
+ * Describes the valid options for objects that use this wrapper.
+ */
+struct S3FdwOption
+{
+ const char *optname;
+ Oid optcontext; /* Oid of catalog in which option may appear */
+};
+
+/*
+ * Valid options for s3_fdw.
+ * These options are based on the options for COPY FROM command.
+ *
+ * Note: if you add a new option here, make sure s3GetOptions() extracts
+ * it properly; it merges wrapper, server, user mapping and table options.
+ */
+static struct S3FdwOption valid_options[] = {
+ /* File options */
+ {"filename", ForeignTableRelationId},
+
+ {"bucketname", ForeignTableRelationId},
+ {"hostname", ForeignTableRelationId},
+
+ /* Format options */
+ /* oids option is not supported */
+ {"format", ForeignTableRelationId},
+ {"header", ForeignTableRelationId},
+ {"delimiter", ForeignTableRelationId},
+ {"quote", ForeignTableRelationId},
+ {"escape", ForeignTableRelationId},
+ {"null", ForeignTableRelationId},
+ {"encoding", ForeignTableRelationId},
+
+ {"accesskey", UserMappingRelationId},
+ {"secretkey", UserMappingRelationId},
+
+ /* Sentinel */
+ {NULL, InvalidOid}
+};
+
+/*
+ * FDW-specific information for ForeignScanState.fdw_state.
+ */
+typedef struct S3FdwExecutionState
+{
+ char *hostname;
+ char *bucketname;
+ char *filename; /* file to read */
+ char *accesskey;
+ char *secretkey;
+ List *copy_options; /* merged COPY options, excluding filename */
+ CopyState cstate; /* state of reading file */
+ char *datafn;
+} S3FdwExecutionState;
+
+/*
+ * Forked processes communicate via FIFOs, which are described
+ * in this struct. Some experiments tell us that it is a bad idea
+ * to re-open these FIFOs, so we prepare two files: one for a
+ * synchronizing flag, the other for data transfer.
+ */
+typedef struct s3_ipc_context
+{
+ char datafn[MAXPGPATH];
+ FILE *datafp;
+ char flagfn[MAXPGPATH];
+ FILE *flagfp;
+} s3_ipc_context;
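+
+/*
+ * Handshake sketch (as implemented by header_handler/body_handler below):
+ * the child writes the HTTP status code into flagfn as soon as the
+ * response header arrives, then streams the body into datafn; the parent
+ * blocks reading flagfn, checks the status, and hands datafn to the
+ * COPY machinery.
+ */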
+
+/*
+ * Function declarations
+ */
+PG_FUNCTION_INFO_V1(s3test);
+PG_FUNCTION_INFO_V1(s3_fdw_handler);
+PG_FUNCTION_INFO_V1(s3_fdw_validator);
+
+Datum s3test(PG_FUNCTION_ARGS);
+Datum s3_fdw_handler(PG_FUNCTION_ARGS);
+Datum s3_fdw_validator(PG_FUNCTION_ARGS);
+
+/*
+ * FDW callback routines
+ */
+static FdwPlan *s3PlanForeignScan(Oid foreigntableid,
+ PlannerInfo *root,
+ RelOptInfo *baserel);
+static void s3ExplainForeignScan(ForeignScanState *node, ExplainState *es);
+static void s3BeginForeignScan(ForeignScanState *node, int eflags);
+static TupleTableSlot *s3IterateForeignScan(ForeignScanState *node);
+static void s3ReScanForeignScan(ForeignScanState *node);
+static void s3EndForeignScan(ForeignScanState *node);
+
+/*
+ * Helper functions
+ */
+static bool is_valid_option(const char *option, Oid context);
+static void s3GetOptions(Oid foreigntableid, S3FdwExecutionState *state);
+static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
+ const char *filename,
+ Cost *startup_cost, Cost *total_cost);
+
+static size_t header_handler(void *buffer, size_t size, size_t nmemb, void *buf);
+static size_t body_handler(void *buffer, size_t size, size_t nmemb, void *userp);
+static size_t write_data_to_buf(void *buffer, size_t size, size_t nmemb, void *buf);
+
+static char *create_tempprefix(char *seed);
+//static void s3_resource_release(ResourceReleasePhase phase, bool isCommit, bool isTopLevel, void *arg);
+static void s3_on_exit(int code, Datum arg);
+
+void _PG_init(void);
+
+Datum
+s3test(PG_FUNCTION_ARGS)
+{
+ CURL *curl;
+ StringInfoData buf;
+ int sc;
+ char *url;
+ char tmp[1024];
+ struct curl_slist *slist;
+ char *datestring;
+ char *signature;
+
+ char *bucket = "umitanuki-dbtest";
+ char *file = "1.txt";
+ char *host = "s3-ap-northeast-1.amazonaws.com";
+
+ char *accesskey = "";
+ char *secretkey = "";
+
+ url = text_to_cstring(PG_GETARG_TEXT_P(0));
+
+ /* NB: the argument above is currently ignored; the URL is rebuilt
+  * from the hardcoded test values for now */
+ url = palloc0(1024);
+ snprintf(url, 1024, "http://%s/%s/%s", host, bucket, file);
+ datestring = httpdate(NULL);
+ signature = s3_signature("GET", datestring, bucket, file, secretkey);
+
+ slist = NULL;
+ snprintf(tmp, sizeof(tmp), "Date: %s", datestring);
+ slist = curl_slist_append(slist, tmp);
+ snprintf(tmp, sizeof(tmp), "Authorization: AWS %s:%s", accesskey, signature);
+ slist = curl_slist_append(slist, tmp);
+ initStringInfo(&buf);
+
+ curl = curl_easy_init();
+ curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data_to_buf);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, &buf);
+
+ sc = curl_easy_perform(curl);
+
+ curl_easy_cleanup(curl);
+
+ PG_RETURN_TEXT_P(cstring_to_text(buf.data));
+}
+
+/*
+ * Foreign-data wrapper handler function: return a struct with pointers
+ * to my callback routines.
+ */
+Datum
+s3_fdw_handler(PG_FUNCTION_ARGS)
+{
+ FdwRoutine *fdwroutine = makeNode(FdwRoutine);
+
+ fdwroutine->PlanForeignScan = s3PlanForeignScan;
+ fdwroutine->ExplainForeignScan = s3ExplainForeignScan;
+ fdwroutine->BeginForeignScan = s3BeginForeignScan;
+ fdwroutine->IterateForeignScan = s3IterateForeignScan;
+ fdwroutine->ReScanForeignScan = s3ReScanForeignScan;
+ fdwroutine->EndForeignScan = s3EndForeignScan;
+
+ PG_RETURN_POINTER(fdwroutine);
+}
+
+/*
+ * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
+ * USER MAPPING or FOREIGN TABLE that uses s3_fdw.
+ *
+ * Raise an ERROR if the option or its value is considered invalid.
+ */
+Datum
+s3_fdw_validator(PG_FUNCTION_ARGS)
+{
+ List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+ Oid catalog = PG_GETARG_OID(1);
+ char *filename = NULL,
+ *bucketname = NULL,
+ *hostname = NULL,
+ *accesskey = NULL,
+ *secretkey = NULL;
+ List *copy_options = NIL;
+ ListCell *cell;
+
+ /*
+ * Check that only options supported by s3_fdw, and allowed for the
+ * current object type, are given.
+ */
+ foreach(cell, options_list)
+ {
+ DefElem *def = (DefElem *) lfirst(cell);
+
+ if (!is_valid_option(def->defname, catalog))
+ {
+ struct S3FdwOption *opt;
+ StringInfoData buf;
+
+ /*
+ * Unknown option specified, complain about it. Provide a hint
+ * with list of valid options for the object.
+ */
+ initStringInfo(&buf);
+ for (opt = valid_options; opt->optname; opt++)
+ {
+ if (catalog == opt->optcontext)
+ appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
+ opt->optname);
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
+ errmsg("invalid option \"%s\"", def->defname),
+ errhint("Valid options in this context are: %s",
+ buf.data)));
+ }
+
+ /* Separate out filename, since ProcessCopyOptions won't allow it */
+ if (strcmp(def->defname, "filename") == 0)
+ {
+ if (filename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ filename = defGetString(def);
+ }
+ else if(strcmp(def->defname, "bucketname") == 0)
+ {
+ if (bucketname)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ bucketname = defGetString(def);
+ }
+ else if(strcmp(def->defname, "hostname") == 0)
+ {
+ if (hostname)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ hostname = defGetString(def);
+ }
+ else if(strcmp(def->defname, "accesskey") == 0)
+ {
+ if (accesskey)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ accesskey = defGetString(def);
+ }
+ else if(strcmp(def->defname, "secretkey") == 0)
+ {
+ if (secretkey)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ secretkey = defGetString(def);
+ }
+ else
+ copy_options = lappend(copy_options, def);
+ }
+
+ /*
+ * Now apply the core COPY code's validation logic for more checks.
+ */
+ ProcessCopyOptions(NULL, true, copy_options);
+
+ /*
+ * Hostname option is required for s3_fdw foreign tables.
+ */
+ if (catalog == ForeignTableRelationId && hostname == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
+ errmsg("hostname is required for s3_fdw foreign tables")));
+
+ /*
+ * Bucketname option is required for s3_fdw foreign tables.
+ */
+ if (catalog == ForeignTableRelationId && bucketname == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
+ errmsg("bucketname is required for s3_fdw foreign tables")));
+
+ /*
+ * Filename option is required for s3_fdw foreign tables.
+ */
+ if (catalog == ForeignTableRelationId && filename == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
+ errmsg("filename is required for s3_fdw foreign tables")));
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Check if the provided option is one of the valid options.
+ * context is the Oid of the catalog holding the object the option is for.
+ */
+static bool
+is_valid_option(const char *option, Oid context)
+{
+ struct S3FdwOption *opt;
+
+ for (opt = valid_options; opt->optname; opt++)
+ {
+ if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
+ return true;
+ }
+ return false;
+}
+
+/*
+ * Fetch the options for a s3_fdw foreign table.
+ *
+ * We have to separate out "filename", "bucketname" and "hostname"
+ * from the other options because they must not appear in the
+ * options list passed to the core COPY code.
+ */
+static void
+s3GetOptions(Oid foreigntableid, S3FdwExecutionState *state)
+{
+ ForeignTable *table;
+ ForeignServer *server;
+ ForeignDataWrapper *wrapper;
+ UserMapping *mapping;
+ List *options, *new_options;
+ ListCell *lc;
+
+ /*
+ * Extract options from FDW objects.
+ */
+ table = GetForeignTable(foreigntableid);
+ server = GetForeignServer(table->serverid);
+ wrapper = GetForeignDataWrapper(server->fdwid);
+ mapping = GetUserMapping(GetUserId(), table->serverid);
+
+ options = NIL;
+ options = list_concat(options, wrapper->options);
+ options = list_concat(options, server->options);
+ options = list_concat(options, mapping->options);
+ options = list_concat(options, table->options);
+
+ /*
+ * Separate out the host, bucket and filename.
+ */
+ state->hostname = NULL;
+ state->bucketname = NULL;
+ state->filename = NULL;
+ new_options = NIL;
+ foreach(lc, options)
+ {
+ DefElem *def = (DefElem *) lfirst(lc);
+
+ if (strcmp(def->defname, "hostname") == 0)
+ {
+ state->hostname = defGetString(def);
+ }
+ else if (strcmp(def->defname, "bucketname") == 0)
+ {
+ state->bucketname = defGetString(def);
+ }
+ else if (strcmp(def->defname, "filename") == 0)
+ {
+ state->filename = defGetString(def);
+ }
+ else if (strcmp(def->defname, "accesskey") == 0)
+ {
+ state->accesskey = defGetString(def);
+ }
+ else if (strcmp(def->defname, "secretkey") == 0)
+ {
+ state->secretkey = defGetString(def);
+ }
+ else
+ new_options = lappend(new_options, def);
+ }
+
+ /*
+ * The validator should have checked those mandatory options were
+ * included in the options, but check again, just in case.
+ */
+ if (state->hostname == NULL)
+ elog(ERROR, "hostname is required for s3_fdw foreign tables");
+ if (state->bucketname == NULL)
+ elog(ERROR, "bucketname is required for s3_fdw foreign tables");
+ if (state->filename == NULL)
+ elog(ERROR, "filename is required for s3_fdw foreign tables");
+
+ state->copy_options = new_options;
+}
+
+/*
+ * s3PlanForeignScan
+ * Create a FdwPlan for a scan on the foreign table
+ */
+static FdwPlan *
+s3PlanForeignScan(Oid foreigntableid,
+ PlannerInfo *root,
+ RelOptInfo *baserel)
+{
+ FdwPlan *fdwplan;
+ S3FdwExecutionState state;
+
+ /* Fetch options -- it's not clear what is needed here */
+ s3GetOptions(foreigntableid, &state);
+
+ /* Construct FdwPlan with cost estimates */
+ fdwplan = makeNode(FdwPlan);
+ estimate_costs(root, baserel, state.filename,
+ &fdwplan->startup_cost, &fdwplan->total_cost);
+ fdwplan->fdw_private = NIL; /* not used */
+
+ return fdwplan;
+}
+
+/*
+ * s3ExplainForeignScan
+ * Produce extra output for EXPLAIN
+ */
+static void
+s3ExplainForeignScan(ForeignScanState *node, ExplainState *es)
+{
+ S3FdwExecutionState state;
+ StringInfoData url;
+
+ initStringInfo(&url);
+
+ /* Fetch options */
+ s3GetOptions(RelationGetRelid(node->ss.ss_currentRelation), &state);
+ appendStringInfo(&url, "http://%s/%s/%s",
+ state.hostname, state.bucketname, state.filename);
+
+ ExplainPropertyText("Foreign URL", url.data, es);
+}
+
+/*
+ * s3BeginForeignScan
+ * Initiate access to the file by creating CopyState
+ */
+static void
+s3BeginForeignScan(ForeignScanState *node, int eflags)
+{
+ CopyState cstate;
+ S3FdwExecutionState *festate;
+ StringInfoData buf;
+ char *url, *datestring, *signature;
+ char *prefix;
+ pid_t pid;
+ s3_ipc_context ctx;
+
+ /*
+ * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
+ */
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ return;
+
+ festate = (S3FdwExecutionState *) palloc(sizeof(S3FdwExecutionState));
+ /* Fetch options of foreign table */
+ s3GetOptions(RelationGetRelid(node->ss.ss_currentRelation), festate);
+
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "http://%s/%s/%s",
+ festate->hostname, festate->bucketname, festate->filename);
+ url = pstrdup(buf.data);
+
+ datestring = httpdate(NULL);
+ signature = s3_signature("GET", datestring,
+ festate->bucketname, festate->filename, festate->secretkey);
+
+ prefix = create_tempprefix(signature);
+ snprintf(ctx.flagfn, sizeof(ctx.flagfn), "%s.flag", prefix);
+ snprintf(ctx.datafn, sizeof(ctx.datafn), "%s.data", prefix);
+// unlink(ctx.flagfn);
+// unlink(ctx.datafn);
+ if (mkfifo(ctx.flagfn, S_IRUSR | S_IWUSR) != 0)
+ elog(ERROR, "mkfifo failed(%d):%s", errno, ctx.flagfn);
+ if (mkfifo(ctx.datafn, S_IRUSR | S_IWUSR) != 0)
+ elog(ERROR, "mkfifo failed(%d):%s", errno, ctx.datafn);
+
+
+ /*
+ * Fork to maximize parallelism of input from HTTP and output to SQL.
+ * The spawned child process calls on_exit_reset() so that it can die
+ * immediately when it finishes its job.
+ */
+ pid = fork_process();
+ if (pid == 0) /* child */
+ {
+ struct curl_slist *slist;
+ CURL *curl;
+ int sc;
+
+ MyProcPid = getpid(); /* reset MyProcPid */
+
+ /*
+ * Reset the exit callbacks so the child doesn't clean up
+ * resources held by the parent process.
+ * The child dies silently when finishing its job.
+ */
+ */
+ on_exit_reset();
+
+ /*
+ * Set up request header list
+ */
+ slist = NULL;
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "Date: %s", datestring);
+ slist = curl_slist_append(slist, buf.data);
+ resetStringInfo(&buf);
+ appendStringInfo(&buf, "Authorization: AWS %s:%s",
+ festate->accesskey, signature);
+ slist = curl_slist_append(slist, buf.data);
+ /*
+ * Set up CURL instance.
+ */
+ curl = curl_easy_init();
+ curl_easy_setopt(curl, CURLOPT_HTTPHEADER, slist);
+ curl_easy_setopt(curl, CURLOPT_URL, url);
+ curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_handler);
+ curl_easy_setopt(curl, CURLOPT_HEADERDATA, &ctx);
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, body_handler);
+ curl_easy_setopt(curl, CURLOPT_WRITEDATA, &ctx);
+ sc = curl_easy_perform(curl);
+ if (ctx.datafp)
+ FreeFile(ctx.datafp);
+ if (sc != 0)
+ {
+ elog(NOTICE, "%s:curl_easy_perform = %d", url, sc);
+ unlink(ctx.datafn);
+ }
+ curl_slist_free_all(slist);
+ curl_easy_cleanup(curl);
+
+ proc_exit(0);
+ }
+ elog(DEBUG1, "child pid = %d", pid);
+
+ {
+ int status;
+ FILE *fp;
+
+ /*
+ * Opening the flag FIFO blocks until the child opens it for
+ * writing and reports the HTTP status code.
+ */
+ fp = AllocateFile(ctx.flagfn, PG_BINARY_R);
+ read(fileno(fp), &status, sizeof(int));
+ FreeFile(fp);
+ unlink(ctx.flagfn);
+ if (status != 200)
+ {
+ elog(ERROR, "bad input from API. Status code: %d", status);
+ }
+ }
+
+ /*
+ * Create CopyState from FDW options. We always acquire all columns, so
+ * as to match the expected ScanTupleSlot signature.
+ */
+ cstate = BeginCopyFrom(node->ss.ss_currentRelation,
+ ctx.datafn,
+ NIL,
+ festate->copy_options);
+
+ /*
+ * Save state in node->fdw_state. We must save enough information to call
+ * BeginCopyFrom() again.
+ */
+ festate->cstate = cstate;
+ festate->datafn = pstrdup(ctx.datafn);
+
+ node->fdw_state = (void *) festate;
+}
+
+/*
+ * s3IterateForeignScan
+ * Read next record from the data file and store it into the
+ * ScanTupleSlot as a virtual tuple
+ */
+static TupleTableSlot *
+s3IterateForeignScan(ForeignScanState *node)
+{
+ S3FdwExecutionState *festate = (S3FdwExecutionState *) node->fdw_state;
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ bool found;
+ ErrorContextCallback errcontext;
+
+ /* Set up callback to identify error line number. */
+ errcontext.callback = CopyFromErrorCallback;
+ errcontext.arg = (void *) festate->cstate;
+ errcontext.previous = error_context_stack;
+ error_context_stack = &errcontext;
+
+ /*
+ * The protocol for loading a virtual tuple into a slot is first
+ * ExecClearTuple, then fill the values/isnull arrays, then
+ * ExecStoreVirtualTuple. If we don't find another row in the file, we
+ * just skip the last step, leaving the slot empty as required.
+ *
+ * We can pass ExprContext = NULL because we read all columns from the
+ * file, so no need to evaluate default expressions.
+ *
+ * We can also pass tupleOid = NULL because we don't allow oids for
+ * foreign tables.
+ */
+ ExecClearTuple(slot);
+ found = NextCopyFrom(festate->cstate, NULL,
+ slot->tts_values, slot->tts_isnull,
+ NULL);
+ if (found)
+ ExecStoreVirtualTuple(slot);
+
+ /* Remove error callback. */
+ error_context_stack = errcontext.previous;
+
+ return slot;
+}
+
+/*
+ * s3EndForeignScan
+ * Finish scanning foreign table and dispose objects used for this scan
+ */
+static void
+s3EndForeignScan(ForeignScanState *node)
+{
+ S3FdwExecutionState *festate = (S3FdwExecutionState *) node->fdw_state;
+
+ /* if festate is NULL, we are in EXPLAIN; nothing to do */
+ if (festate)
+ {
+ EndCopyFrom(festate->cstate);
+ unlink(festate->datafn);
+ }
+}
+
+/*
+ * s3ReScanForeignScan
+ * Rescan table, possibly with new parameters
+ */
+static void
+s3ReScanForeignScan(ForeignScanState *node)
+{
+ S3FdwExecutionState *festate = (S3FdwExecutionState *) node->fdw_state;
+
+ EndCopyFrom(festate->cstate);
+
+ festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
+ festate->filename,
+ NIL,
+ festate->copy_options);
+}
+
+/*
+ * Estimate costs of scanning a foreign table.
+ */
+static void
+estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
+ const char *filename,
+ Cost *startup_cost, Cost *total_cost)
+{
+// struct stat stat_buf;
+ BlockNumber pages;
+// int tuple_width;
+ double ntuples;
+ double nrows;
+ Cost run_cost = 0;
+ Cost cpu_per_tuple;
+
+ /*
+ * Get size of the file. It might not be there at plan time, though, in
+ * which case we have to use a default estimate.
+ */
+// if (stat(filename, &stat_buf) < 0)
+// stat_buf.st_size = 10 * BLCKSZ;
+
+ /*
+ * Convert size to pages for use in I/O cost estimate below.
+ */
+// pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
+// if (pages < 1)
+// pages = 1;
+ pages = 10;
+
+ /*
+ * Estimate the number of tuples in the file. We back into this estimate
+ * using the planner's idea of the relation width; which is bogus if not
+ * all columns are being read, not to mention that the text representation
+ * of a row probably isn't the same size as its internal representation.
+ * FIXME later.
+ */
+// tuple_width = MAXALIGN(baserel->width) + MAXALIGN(sizeof(HeapTupleHeaderData));
+
+// ntuples = clamp_row_est((double) stat_buf.st_size / (double) tuple_width);
+ ntuples = 1000;
+
+ /*
+ * Now estimate the number of rows returned by the scan after applying the
+ * baserestrictinfo quals. This is pretty bogus too, since the planner
+ * will have no stats about the relation, but it's better than nothing.
+ */
+ nrows = ntuples *
+ clauselist_selectivity(root,
+ baserel->baserestrictinfo,
+ 0,
+ JOIN_INNER,
+ NULL);
+
+ nrows = clamp_row_est(nrows);
+
+ /* Save the output-rows estimate for the planner */
+ baserel->rows = nrows;
+
+ /*
+ * Now estimate costs. We estimate costs almost the same way as
+ * cost_seqscan(), thus assuming that I/O costs are equivalent to a
+ * regular table file of the same size. However, we take per-tuple CPU
+ * costs as 10x of a seqscan, to account for the cost of parsing records.
+ */
+ run_cost += seq_page_cost * pages;
+
+ *startup_cost = baserel->baserestrictcost.startup;
+ cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
+ run_cost += cpu_per_tuple * ntuples;
+ *total_cost = *startup_cost + run_cost;
+}
+
+static size_t
+header_handler(void *buffer, size_t size, size_t nmemb, void *userp)
+{
+ const char *HTTP_1_1 = "HTTP/1.1";
+ size_t segsize = size * nmemb;
+ s3_ipc_context *ctx = (s3_ipc_context *) userp;
+
+ if (strncmp(buffer, HTTP_1_1, strlen(HTTP_1_1)) == 0)
+ {
+ int status;
+
+ status = atoi((char *) buffer + strlen(HTTP_1_1) + 1);
+ ctx->flagfp = AllocateFile(ctx->flagfn, PG_BINARY_W);
+ write(fileno(ctx->flagfp), &status, sizeof(int));
+ FreeFile(ctx->flagfp);
+ if (status != 200)
+ {
+ ctx->datafp = NULL;
+ /* interrupt */
+ return 0;
+ }
+ /* on success, open the data FIFO for writing */
+ ctx->datafp = AllocateFile(ctx->datafn, PG_BINARY_W);
+ }
+
+ return segsize;
+}
+
+static size_t
+body_handler(void *buffer, size_t size, size_t nmemb, void *userp)
+{
+ size_t segsize = size * nmemb;
+ s3_ipc_context *ctx = (s3_ipc_context *) userp;
+
+ fwrite(buffer, size, nmemb, ctx->datafp);
+
+ return segsize;
+}
+
+static size_t
+write_data_to_buf(void *buffer, size_t size, size_t nmemb, void *userp)
+{
+ size_t segsize = size * nmemb;
+ StringInfo info = (StringInfo) userp;
+
+ appendBinaryStringInfo(info, (const char *) buffer, segsize);
+
+ return segsize;
+}
+
+static char *
+create_tempprefix(char *seed)
+{
+ char filename[MAXPGPATH], path[MAXPGPATH], *s;
+
+ snprintf(filename, sizeof(filename), "%u.%s", MyProcPid, seed);
+ /* the seed (a base64 signature) may contain '/'; make it filename-safe */
+ s = &filename[0];
+ while (*s)
+ {
+ if (*s == '/')
+ *s = '%';
+ s++;
+ }
+ mkdir("base/" PG_TEMP_FILES_DIR, S_IRWXU);
+ snprintf(path, sizeof(path), "base/%s/%s", PG_TEMP_FILES_DIR, filename);
+
+ return pstrdup(path);
+}
+
+static bool
+presuffix_test(char *base, char *prefix, char *suffix)
+{
+ int len, plen, slen;
+
+ len = strlen(base);
+ plen = strlen(prefix);
+ slen = strlen(suffix);
+ if (len < plen + slen)
+ return false;
+
+ return memcmp(base, prefix, plen) == 0 &&
+ memcmp(base + len - slen, suffix, slen) == 0;
+}
+
+/*
+ * Clean up this process's fifos on exit.
+ * We don't touch other processes' fifos since they may be
+ * in use right now. It might be better to delete fifos
+ * as soon as possible, but they don't consume disk space,
+ * so let's postpone it till exit, when it's safe to delete them.
+ */
+static void
+s3_on_exit(int code, Datum arg)
+{
+ char prefix[32];
+ char *dirname = "base/" PG_TEMP_FILES_DIR;
+ DIR *dir;
+ struct dirent *ent;
+
+ snprintf(prefix, sizeof(prefix), "%d.", MyProcPid);
+ dir = AllocateDir(dirname);
+ while ((ent = ReadDir(dir, dirname)) != NULL)
+ {
+ char path[MAXPGPATH];
+
+ if (presuffix_test(ent->d_name, prefix, ".data") ||
+ presuffix_test(ent->d_name, prefix, ".flag"))
+ {
+ /* d_name is relative to dirname; build the full path to unlink */
+ snprintf(path, sizeof(path), "%s/%s", dirname, ent->d_name);
+ unlink(path);
+ }
+ }
+
+ FreeDir(dir);
+}
+
+void
+_PG_init(void)
+{
+ on_proc_exit(s3_on_exit, (Datum) 0);
+// RegisterResourceReleaseCallback(s3_resource_release, NULL);
+}
5 s3_fdw.control
@@ -0,0 +1,5 @@
+# s3_fdw extension
+comment = 'foreign-data wrapper for Amazon S3'
+default_version = '0.1.0'
+module_pathname = '$libdir/s3_fdw'
+relocatable = true
11 s3_fdw.sql
@@ -0,0 +1,11 @@
+CREATE FUNCTION s3test(text) RETURNS text AS
+'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION s3_fdw_handler() RETURNS fdw_handler AS
+'MODULE_PATHNAME' LANGUAGE C;
+CREATE FUNCTION s3_fdw_validator(text[], oid) RETURNS void AS
+'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FOREIGN DATA WRAPPER s3_fdw
+ HANDLER s3_fdw_handler
+ VALIDATOR s3_fdw_validator;