Permalink
Browse files

adding a simple static file server

  • Loading branch information...
1 parent 05145b5 commit 8705c6fd89427c9e851e3738453d076f5fc1e022 @joewalker joewalker committed Nov 17, 2010
Showing with 242 additions and 0 deletions.
  1. +242 −0 static.py
View
242 static.py
@@ -0,0 +1,242 @@
+#!/usr/bin/env python2.4
+"""static - A stupidly simple WSGI way to serve static (or mixed) content.
+
+(See the docstrings of the various functions and classes.)
+
+Copyright (C) 2006-2009 Luke Arno - http://lukearno.com/
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the GNU Lesser General Public
+License as published by the Free Software Foundation; either
+version 2.1 of the License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public
+License along with this library; if not, write to:
+
+The Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor,
+Boston, MA 02110-1301, USA.
+
+Luke Arno can be found at http://lukearno.com/
+
+"""
+
+import mimetypes
+import rfc822
+import time
+import string
+import sys
+from os import path, stat, getcwd
+from wsgiref import util
+from wsgiref.headers import Headers
+from wsgiref.simple_server import make_server
+from optparse import OptionParser
+
+try: from pkg_resources import resource_filename, Requirement
+except: pass
+
+try: import kid
+except: pass
+
+
+class MagicError(Exception): pass
+
+
+class StatusApp:
+ """Used by WSGI apps to return some HTTP status."""
+
+ def __init__(self, status, message=None):
+ self.status = status
+ if message is None:
+ self.message = status
+ else:
+ self.message = message
+
+ def __call__(self, environ, start_response, headers=[]):
+ if self.message:
+ Headers(headers).add_header('Content-type', 'text/plain')
+ start_response(self.status, headers)
+ if environ['REQUEST_METHOD'] == 'HEAD':
+ return [""]
+ else:
+ return [self.message]
+
+
+class Cling(object):
+ """A stupidly simple way to serve static content via WSGI.
+
+ Serve the file of the same path as PATH_INFO in self.datadir.
+
+ Look up the Content-type in self.content_types by extension
+ or use 'text/plain' if the extension is not found.
+
+ Serve up the contents of the file or delegate to self.not_found.
+ """
+
+ block_size = 16 * 4096
+ index_file = 'index.html'
+ not_found = StatusApp('404 Not Found')
+ not_modified = StatusApp('304 Not Modified', "")
+ moved_permanently = StatusApp('301 Moved Permanently')
+ method_not_allowed = StatusApp('405 Method Not Allowed')
+
+ def __init__(self, root, **kw):
+ """Just set the root and any other attribs passes via **kw."""
+ self.root = root
+ for k, v in kw.iteritems():
+ setattr(self, k, v)
+
+ def __call__(self, environ, start_response):
+ """Respond to a request when called in the usual WSGI way."""
+ if environ['REQUEST_METHOD'] not in ('GET', 'HEAD'):
+ headers = [('Allow', 'GET, HEAD')]
+ return self.method_not_allowed(environ, start_response, headers)
+ path_info = environ.get('PATH_INFO', '')
+ full_path = self._full_path(path_info)
+ if not self._is_under_root(full_path):
+ return self.not_found(environ, start_response)
+ if path.isdir(full_path):
+ if full_path[-1] <> '/' or full_path == self.root:
+ location = util.request_uri(environ, include_query=False) + '/'
+ if environ.get('QUERY_STRING'):
+ location += '?' + environ.get('QUERY_STRING')
+ headers = [('Location', location)]
+ return self.moved_permanently(environ, start_response, headers)
+ else:
+ full_path = self._full_path(path_info + self.index_file)
+ content_type = self._guess_type(full_path)
+ try:
+ etag, last_modified = self._conditions(full_path, environ)
+ headers = [('Date', rfc822.formatdate(time.time())),
+ ('Last-Modified', last_modified),
+ ('ETag', etag)]
+ if_modified = environ.get('HTTP_IF_MODIFIED_SINCE')
+ if if_modified and (rfc822.parsedate(if_modified)
+ >= rfc822.parsedate(last_modified)):
+ return self.not_modified(environ, start_response, headers)
+ if_none = environ.get('HTTP_IF_NONE_MATCH')
+ if if_none and (if_none == '*' or etag in if_none):
+ return self.not_modified(environ, start_response, headers)
+ file_like = self._file_like(full_path)
+ headers.append(('Content-Type', content_type))
+ start_response("200 OK", headers)
+ if environ['REQUEST_METHOD'] == 'GET':
+ return self._body(full_path, environ, file_like)
+ else:
+ return ['']
+ except (IOError, OSError), e:
+ print e
+ return self.not_found(environ, start_response)
+
+ def _full_path(self, path_info):
+ """Return the full path from which to read."""
+ return self.root + path_info
+
+ def _is_under_root(self, full_path):
+ """Guard against arbitrary file retrieval."""
+ if (path.abspath(full_path) + path.sep)\
+ .startswith(path.abspath(self.root) + path.sep):
+ return True
+ else:
+ return False
+
+ def _guess_type(self, full_path):
+ """Guess the mime type using the mimetypes module."""
+ return mimetypes.guess_type(full_path)[0] or 'text/plain'
+
+ def _conditions(self, full_path, environ):
+ """Return a tuple of etag, last_modified by mtime from stat."""
+ mtime = stat(full_path).st_mtime
+ return str(mtime), rfc822.formatdate(mtime)
+
+ def _file_like(self, full_path):
+ """Return the appropriate file object."""
+ return open(full_path, 'rb')
+
+ def _body(self, full_path, environ, file_like):
+ """Return an iterator over the body of the response."""
+ way_to_send = environ.get('wsgi.file_wrapper', iter_and_close)
+ return way_to_send(file_like, self.block_size)
+
+
+def iter_and_close(file_like, block_size):
+ """Yield file contents by block then close the file."""
+ while 1:
+ try:
+ block = file_like.read(block_size)
+ if block: yield block
+ else: raise StopIteration
+ except StopIteration, si:
+ file_like.close()
+ return
+
+
+def cling_wrap(package_name, dir_name, **kw):
+ """Return a Cling that serves from the given package and dir_name.
+
+ This uses pkg_resources.resource_filename which is not the
+ recommended way, since it extracts the files.
+
+ I think this works fine unless you have some _very_ serious
+ requirements for static content, in which case you probably
+ shouldn't be serving it through a WSGI app, IMHO. YMMV.
+ """
+ resource = Requirement.parse(package_name)
+ return Cling(resource_filename(resource, dir_name), **kw)
+
+
+def command():
+ parser = OptionParser(usage="%prog DIR [HOST][:][PORT]",
+ version="static 0.3.6")
+ options, args = parser.parse_args()
+ if len(args) in (1, 2):
+ if len(args) == 2:
+ parts = args[1].split(":")
+ if len(parts) == 1:
+ host = parts[0]
+ port = None
+ elif len(parts) == 2:
+ host, port = parts
+ else:
+ sys.exit("Invalid host:port specification.")
+ elif len(args) == 1:
+ host, port = None, None
+ if not host:
+ host = '0.0.0.0'
+ if not port:
+ port = 9999
+ try:
+ port = int(port)
+ except:
+ sys.exit("Invalid host:port specification.")
+ app = Cling(args[0])
+ try:
+ make_server(host, port, app).serve_forever()
+ except KeyboardInterrupt, ki:
+ print "Cio, baby!"
+ except:
+ sys.exit("Problem initializing server.")
+ else:
+ parser.print_help(sys.stderr)
+ sys.exit(1)
+
+
+def test():
+ from wsgiref.validate import validator
+ app = Cling(getcwd())
+ try:
+ print "Serving " + getcwd() + " to http://localhost:9999"
+ make_server('localhost', 9999, validator(app)).serve_forever()
+ except KeyboardInterrupt, ki:
+ print ""
+ print "Ciao, baby!"
+
+
+if __name__ == '__main__':
+ test()
+

0 comments on commit 8705c6f

Please sign in to comment.