Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
  • 9 commits
  • 3 files changed
  • 0 commit comments
  • 1 contributor
Showing with 154 additions and 61 deletions.
  1. +121 −56 liveweb/arc_proxy.py
  2. +26 −4 liveweb/filetools.py
  3. +7 −1 liveweb/webapp.py
View
177 liveweb/arc_proxy.py
@@ -7,12 +7,15 @@
import httplib
import logging
import os
+import random
import socket
import struct
import urllib
import urlparse
import random
import string
+from cStringIO import StringIO
+
import redis
from warc import arc
@@ -20,18 +23,6 @@
from .errors import BadURL, ConnectionFailure
from . import config
-def get_ip_address(ifname):
- """Return the IP address of the requested interface.
- This was obtained from
-
- http://code.activestate.com/recipes/439094-get-the-ip-address-associated-with-a-network-inter/
- """
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- return socket.inet_ntoa(fcntl.ioctl(
- s.fileno(),
- 0x8915, # SIOCGIFADDR
- struct.pack('256s', ifname[:15])
- )[20:24])
def get_storage_location(url):
"""This function is to be used to spread the record writes across
@@ -43,24 +34,27 @@ def get_storage_location(url):
def random_string(length):
return "".join(random.choice(string.letters) for i in range(length))
-def write_arc_file(url, arc_record):
- # XXX-Anand: Why is url passed as argument?
- # Can't we get it from the arc_record
-
+def get_arc_file_name(url):
location = get_storage_location(url)
- # TODO: Need to understand what this format is.
- # alexa-web-20110904174118-00038/51_23_20110804161301_crawl103.arc.gz
now = datetime.datetime.now()
+
arc_file_name = location + "/liveweb-%s-%s.arc.gz" % (now.strftime("%Y%m%d%H%M%S"), random_string(5))
-
- outfile = gzip.GzipFile(arc_file_name + ".tmp", "wb")
+
+ return arc_file_name
+
+def write_arc_file(arc_file_name, arc_record):
+ # TODO: Need to understand what this format is.
+ # alexa-web-20110904174118-00038/51_23_20110804161301_crawl103.arc.gz
+
+ fp = open(arc_file_name + ".tmp", "wb")
+ outfile = gzip.GzipFile(filename = "", fileobj = fp)
arc_record.write_to(outfile)
outfile.close()
os.rename(arc_file_name + ".tmp", arc_file_name)
file_size = os.stat(arc_file_name).st_size
- return file_size, arc_file_name
+ return file_size
def decompose_url(url):
@@ -70,6 +64,7 @@ def decompose_url(url):
TODO: This logic might belong in the web app rather than the
TODO: arc_proxy module. It'll have to be done for WARCs too.
"""
+
scheme, netloc, path, query, fragment, = urlparse.urlsplit(url)
if not netloc: # This will happen if there are issues with URLs like www.google.com
scheme, netloc, path, query, fragment, = urlparse.urlsplit("http://"+url)
@@ -82,17 +77,14 @@ def decompose_url(url):
resource = "/"
return netloc, resource
-def retrieve_url(url):
+def establish_connection(url):
"""
- Fetches the given url using an HTTP GET and returns the complete
- HTTP transaction.
+ Establishes an HTTP connection to the given URL and returns the
+ HTTPResponse Object.
This uses the spyfile class to get the actual transaction without
any modifications made by httplib.
- Returns the HTTPResponse Object, the actual data sent back on the
- line and the ip address of the remote host
-
"""
server, resource = decompose_url(url)
logging.debug("Attempting to fetch '%s' from '%s'", resource, server)
@@ -110,16 +102,8 @@ def retrieve_url(url):
conn.request("GET", resource, headers=headers)
except socket.gaierror:
raise ConnectionFailure()
-
- # In some cases conn.sock is becoming None after reading the data.
- # Getting the remoteaddr early to avoid trouble.
- remoteaddr = conn.sock.getpeername()[0]
-
- response = conn.getresponse()
- fp = response.fp
- response.read()
- line_data = fp.buf.getvalue() # TODO: Stream this data back instead of this one shot read.
- return response, line_data, remoteaddr
+
+ return conn
def get(url):
"""Returns the content of the URL as an ARC record.
@@ -134,16 +118,14 @@ def get(url):
content = cache.get(url)
if content is None:
logging.info("cache miss: %s", url)
- size, arc_file_name = live_fetch(url)
+ size, arc_file_handle = live_fetch(url)
# too big to cache, just return the file from disk
if size > config.max_cacheable_size:
logging.info("too large to cache: %d", size)
- return size, open(arc_file_name)
+ return size, arc_file_handle
- # TODO: ideally live_fetch should give us a file object, it can be
- # either StringIO or real file depending on the size
- content = open(arc_file_name).read()
+ content = arc_file_handle.read()
cache.setex(url, config.expire_time, content)
else:
logging.info("cache hit: %s", url)
@@ -152,22 +134,105 @@ def get(url):
cache.expire(url, config.expire_time)
return len(content), content
+
def live_fetch(url):
"""Downloads the content of the URL from web and returns it as an ARC
record.
+
+    This will attempt to download the file into memory and write it to
+ disk.
+
+ However, if it finds that the file is larger than 10MB, it will
+ resort to streaming the data straight onto disk in a temporary
+ file and then process the arc file at the end. This will require
+ double the I/O but will be sufficiently rare to justify this
+ approach.
+
+ Cf. http://www.optimizationweek.com/reviews/average-web-page/
+
+
"""
- http_response, payload, remote_ip_address = retrieve_url(url)
- headers = http_response.getheaders()
- content_type = http_response.getheader('content-type',"application/octet-stream").split(';')[0]
-
- headers = dict(url = url,
- ip_address = remote_ip_address,
- date = datetime.datetime.utcnow(),
- content_type = content_type,
- length = len(payload)
- )
- arc_record = arc.ARCRecord(headers = headers, payload = payload)
-
- size, file_name = write_arc_file(url, arc_record)
- return size, file_name
+ initial_chunk_size = 10 * 1024 * 1024 # 10 MB
+
+ conn = establish_connection(url)
+ response = conn.getresponse()
+ remote_ip = conn.sock.getpeername()[0]
+ spyfile = response.fp
+ response.read(initial_chunk_size)
+
+
+ initial_data = spyfile.buf.getvalue()
+ data_length = len(initial_data)
+
+ arc_file_name = get_arc_file_name(url)
+
+ if data_length < initial_chunk_size: # We've read out the whole data
+ # Create regular arc file here
+ arc_record = arc.ARCRecord(headers = dict(url = url,
+ date = datetime.datetime.utcnow(),
+ content_type = response.getheader("content-type","application/octet-stream"),
+ ip_address = remote_ip,
+ length = data_length),
+ payload = initial_data,
+ version = 1)
+
+ size = write_arc_file(arc_file_name, arc_record)
+ # This is an optimisation to return the in memory payload so
+ # that we don't have to read it off the disk again. This
+ # takes the arc_record we've created, writes it to a StringIO
+ # (compressed_stream) via a GzipFile so that it's compressed
+ # and then returns a handle to compressed_stream.
+ spyfile.buf.seek(0)
+ compressed_stream = StringIO()
+
+ compressed_file = gzip.GzipFile(fileobj = compressed_stream, mode = "w")
+ arc_record.write_to(compressed_file)
+ compressed_file.close()
+
+ compressed_stream.seek(0)
+ arc_file_handle = compressed_stream
+ else:
+ # TODO: This block probably needs to be moved off to multiple functions
+ payload_file_name = arc_file_name + ".tmp.payload"
+ payload_file = open(payload_file_name, "wb")
+
+ data_length = response.getheader("content-length","XXX") # We won't have the size for streaming data.
+
+ # First write out the header (as much we have anyway)
+ arc_header = arc.ARCHeader(url = url,
+ date = datetime.datetime.utcnow(),
+ content_type = response.getheader("content-type", "application/octet-stream"),
+ ip_address = remote_ip,
+ length = data_length)
+
+ # Now deal with the payload
+ # Now first write the payload which we've already read into the file.
+ payload_file.write(initial_data)
+ # Then stream in the rest of the payload by changing the spy file
+ spyfile.change_spy(payload_file)
+ response.read()
+ payload_file.close()
+
+ payload_size = os.stat(payload_file_name).st_size
+
+ # Fix the content-length in the header if necessary
+ if arc_header['length'] == "XXX":
+ arc_header['length'] = payload_size
+
+ # Create the actual file
+ f = open(arc_file_name + ".tmp", "wb")
+ arc_file = gzip.GzipFile(filename = "" , fileobj = f)
+ payload = open(payload_file_name, "rb") #Reopen for read
+ # TODO: Write one file into another?
+ arc_record = arc.ARCRecord(header = arc_header, payload = payload, version = 1)
+ arc_record.write_to(arc_file)
+ arc_file.close()
+
+ os.unlink(payload_file_name)
+ os.rename(arc_file_name + ".tmp", arc_file_name)
+
+ size = os.stat(arc_file_name).st_size
+ arc_file_handle = open(arc_file_name, "rb")
+
+ return size, arc_file_handle
View
30 liveweb/filetools.py
@@ -1,19 +1,35 @@
from cStringIO import StringIO
import httplib
-def spy(fileobj):
+def spy(fileobj, spyobj = None):
    """Returns a new file wrapper that records the contents of a file
as someone is reading from it.
"""
- return SpyFile(fileobj)
+ return SpyFile(fileobj, spyobj)
class SpyFile:
"""File wrapper to record the contents of a file as someone is
reading from it.
+
+ If the "spy" parameter is passed, it will be the stream to which
+ the read data is written.
+
+ SpyFile works like a "tee"
+
+ -------------
+ Actual client <--- SpyFileObject <--- Data Source
+ ____ ____
+ \ | /
+ |
+ V
+ spy
+ (spy object)
+
+
"""
- def __init__(self, fileobj):
+ def __init__(self, fileobj, spy = None):
self.fileobj = fileobj
- self.buf = StringIO()
+ self.buf = spy or StringIO()
def read(self, *a, **kw):
text = self.fileobj.read(*a, **kw)
@@ -27,6 +43,12 @@ def readline(self, *a, **kw):
def close(self):
self.fileobj.close()
+
+ def change_spy(self, fileobj):
+        "Changes the file which receives the spied upon data to fileobj"
+ self.buf.flush()
+ self.buf.close()
+ self.buf = fileobj
View
8 liveweb/webapp.py
@@ -7,6 +7,7 @@
from . import arc_proxy
from . import errors
+logging.basicConfig(level = logging.DEBUG)
class application:
@@ -18,7 +19,12 @@ def __init__(self, environ, start_response):
def parse_request(self):
self.method = self.environ['REQUEST_METHOD']
- self.url = self.environ['REQUEST_URI'] #TODO: Is this a valid environment variable always?
+ if 'REQUEST_URI' in self.environ: # This is for uwsgi
+ self.url = self.environ['REQUEST_URI'] #TODO: Is this a valid environment variable always?
+ if 'RAW_URI' in self.environ: # This is for gunicorn
+ self.url = self.environ['RAW_URI'] #TODO: Is this a valid environment variable always?
+
+
# Allow accessing the proxy using regular URL so that we can use
# tools like ab.

No commit comments for this range

Something went wrong with that request. Please try again.