FFHS DAS Data Science  
Semester Project FS20  
Iwan Imsand

# Data Acquisition ```tripdata```

# Introduction

This notebook acquires the required trip data.

Trip data here means the system data on individual rides that [Citi Bike NYC](https://www.citibikenyc.com/) makes available. The available data are described in more detail under [System Data](https://www.citibikenyc.com/system-data).

# Amazon S3 Bucket Helper

The data are provided via [Amazon Simple Storage Service (Amazon S3)](https://aws.amazon.com/s3/) in so-called [Amazon S3 buckets](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingBucket.html). This section defines a helper class that simplifies the later code and abstracts away the details of Amazon S3 buckets.

In [1]:
import os
from io import BytesIO
from zipfile import ZipFile

import boto3
import requests
from botocore import UNSIGNED
from botocore.config import Config


class S3Helper:
    
    
    def __init__(self):
        # see https://www.edureka.co/community/10528/is-there-any-way-to-use-boto3-anonymously
        self.s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED))        
    
    
    def bucket_items_iterator(self, bucket, key_filter=lambda key: key):
        """
        Generator that iterates over all objects in a given s3 bucket

        See http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.list_objects_v2
        for return data format
        :param bucket: name of s3 bucket
        :param key_filter: lambda function for filtering keys
        :return: dict of metadata for an object
        """

        paginator = self.s3_client.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket)

        for page in page_iterator:
            if page['KeyCount'] > 0:
                for item in page['Contents']:
                    if key_filter(item['Key']):
                        yield item


    def get_url(self, bucket, item):
        """
        Evaluates the download url for an item in the bucket content.

        :param bucket: name of s3 bucket
        :param item: item from the content of the s3 bucket
        :return: the download url for the given item
        """

        url = '{}/{}/{}'.format(self.s3_client.meta.endpoint_url, bucket, item['Key'])
        return url


    def zip_extract(self, url, path, file_filter=lambda file: file, filename_rename=lambda filename: filename):
        """
        Extracts the content of a zip file.

        :param url: the url
        :param path: the destination folder (will be created if not exists)
        :param file_filter: lambda function for filtering files in zip
        :param filename_rename: lambda function for renaming files when extracting
        """

        print('Downloading {}\n'.format(url))

        content = requests.get(url)
        zip_ref = ZipFile(BytesIO(content.content))
        zip_info_matches = [zip_info for zip_info in zip_ref.infolist() if file_filter(zip_info.filename)]

        try:
            for zip_info in zip_info_matches:
                try:
                    if not os.path.exists(path):
                        os.makedirs(path)
                    old_filename = zip_info.filename
                    new_filename = filename_rename(zip_info.filename)
                    print('Extracting {} from {} to {}/{}'.format(zip_info.filename, url, path, new_filename))
                    zip_info.filename = new_filename
                    zip_ref.extract(member=zip_info, path=path)
                except Exception as e:
                    print('ERROR for file {}/{}: {}'.format(url, old_filename, e))

        finally:
            zip_ref.close()


In [2]:
s3 = S3Helper()
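
To make the behaviour of `get_url` concrete, the following minimal sketch rebuilds the same path-style URL without touching the network. The helper name `build_url` and the hard-coded endpoint are assumptions for illustration; in the class above the endpoint comes from `s3_client.meta.endpoint_url`.

```python
def build_url(endpoint_url, bucket, key):
    """Compose a path-style download URL: <endpoint>/<bucket>/<key>."""
    return '{}/{}/{}'.format(endpoint_url.rstrip('/'), bucket, key)


# Assumed endpoint, matching the URLs that appear in this notebook's download log.
example_url = build_url('https://s3.amazonaws.com', 'tripdata', '201306-citibike-tripdata.zip')
print(example_url)
```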

# Download and Extract

From here on, all files are actually downloaded and extracted. To speed this up, the downloads are parallelized across multiple processes.
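
The underlying pattern is a map over a pool of workers. The sketch below is not the notebook's implementation: it swaps the process pool for a thread pool from `concurrent.futures`, which is often sufficient for I/O-bound downloads and easier to try in isolation. `fake_download` is a hypothetical stand-in for a real download function.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_download(key):
    """Hypothetical stand-in for a download; returns a label instead of fetching data."""
    return 'done: {}'.format(key)


keys = ['201306-citibike-tripdata.zip', '201307-citibike-tripdata.zip']

# map() distributes the keys across the workers and yields results in input order.
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_download, keys))

print(results)
```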

In [3]:
import multiprocessing as mp
import re

In [4]:
# extract all files to this path
path = './../data/citibike/tripdata/src'

# the name of the bucket, see https://s3.amazonaws.com/tripdata/index.html
bucket = 'tripdata'

# filter the files in the bucket and download only files matching the given regular expression
key_filter_regex = '^[0-9]{4}.*\\.zip' # All zip files whose names start with four digits.
#key_filter_regex = '^201910.*\\.zip' # Only the files for October 2019
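
As a quick check of what the key filter accepts, the sketch below applies the same pattern to a few sample keys. The key list is made up for illustration; only the first entry appears in this notebook's download log.

```python
import re

pattern = r'^[0-9]{4}.*\.zip'

# Sample keys; the last two are hypothetical examples of keys that should not match.
sample_keys = [
    '201306-citibike-tripdata.zip',
    'JC-201509-citibike-tripdata.csv.zip',
    'index.html',
]

# re.match anchors at the start of the string, so only keys beginning with four digits pass.
matching = [key for key in sample_keys if re.match(pattern, key)]
print(matching)
```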

In [5]:
def file_filter(filename):
    """
    Exclude files/folders from extraction. Some zip files contain files we want to ignore.
    
    :param filename: the filename to test
    """
    
    return not filename.startswith('__')
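
Archives created on macOS commonly carry a `__MACOSX/` folder with metadata files, which is presumably the kind of entry this filter is meant to drop. The sketch below builds a small archive in memory and applies the same rule; the entry names and the function name `keep` are assumptions for illustration.

```python
from io import BytesIO
from zipfile import ZipFile


def keep(filename):
    """Same rule as file_filter: skip entries whose names start with '__'."""
    return not filename.startswith('__')


# Build a small archive in memory; the entry names are made up for illustration.
buffer = BytesIO()
with ZipFile(buffer, 'w') as archive:
    archive.writestr('201306-citibike-tripdata.csv', 'tripduration\n')
    archive.writestr('__MACOSX/._201306-citibike-tripdata.csv', 'metadata')

# Re-open the archive for reading and list only the entries that pass the filter.
with ZipFile(buffer) as archive:
    kept = [info.filename for info in archive.infolist() if keep(info.filename)]

print(kept)
```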

In [6]:
def filename_rename(filename):
    """
    Renames a filename and aligns it to schema like '201910-citibike-tripdata.csv'
    
    :param filename: the filename to align to the schema
    """
    
    marker = 'citibike-tripdata'
    if re.match('^[0-9]{6}.*\\.csv', filename):
        return '{}-{}.csv'.format(filename[0:6], marker)
    elif re.match('^[0-9]{4}-[0-9]{2}.*\\.csv', filename):
        return '{}{}-{}.csv'.format(filename[0:4], filename[5:7], marker)
    else:
        # Fall back to the original name so that extraction does not fail on None.
        print('ERROR: No matching found for {}.'.format(filename))
        return filename
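
To illustrate the two naming variants handled here, the sketch below applies an equivalent, self-contained version to sample names. The function name `normalize_name` and the sample file names are assumptions for illustration; returning unmatched names unchanged is a choice made for this sketch.

```python
import re


def normalize_name(filename, marker='citibike-tripdata'):
    """Map 'YYYYMM...csv' and 'YYYY-MM...csv' to 'YYYYMM-<marker>.csv'."""
    if re.match(r'^[0-9]{6}.*\.csv', filename):
        return '{}-{}.csv'.format(filename[0:6], marker)
    if re.match(r'^[0-9]{4}-[0-9]{2}.*\.csv', filename):
        return '{}{}-{}.csv'.format(filename[0:4], filename[5:7], marker)
    return filename  # sketch-only fallback: leave unknown names untouched


print(normalize_name('201910-citibike-tripdata.csv'))
print(normalize_name('2013-07 - Citi Bike trip data.csv'))
```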

In [7]:
def download_and_extract(item):
    """
    Downloads and extracts an item in the bucket.
    
    :param item: the bucket item
    """
    url = s3.get_url(bucket=bucket, item=item)
    s3.zip_extract(url=url, path=path, file_filter=file_filter, filename_rename=filename_rename)

Using the functions defined above, the following cell now downloads and extracts everything.

**WARNING: The extracted data take up approx. 20 GB! <font color='red'>Do not run this on a slow system or one with little free disk space!</font>**
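
One way to guard against running out of space is to check the free disk space before starting. The following is a minimal sketch using `shutil.disk_usage` from the standard library; the helper name `has_free_space` and the checked path `'.'` are assumptions for illustration and are not part of this notebook.

```python
import shutil


def has_free_space(path, required_bytes):
    """Return True if the file system containing `path` has at least `required_bytes` free."""
    return shutil.disk_usage(path).free >= required_bytes


required = 20 * 1024 ** 3  # roughly 20 GB, the size quoted in the warning
print(has_free_space('.', required))
```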

In [8]:
items_iterator = s3.bucket_items_iterator(bucket=bucket, key_filter=lambda k: re.match(key_filter_regex, k))

# see https://www.machinelearningplus.com/python/parallel-processing-python/
# Create the pool before the try block so that `pool` always exists in `finally`.
pool = mp.Pool(mp.cpu_count())
try:
    pool.map(func=download_and_extract, iterable=items_iterator)
finally:
    pool.close()
    pool.join()
    
print("*** Completed! ***")

Downloading https://s3.amazonaws.com/tripdata/201308-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201307-201402-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201306-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201307-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201311-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201310-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201309-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201402-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201401-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201404-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201312-citibike-tripdata.zip
Downloading https://s3.amazonaws.com/tripdata/201405-citibike-tripdata.zip

Downloading https://s3.amazonaws.com/tripdata/201403-citibike-tripdata.zip

Downloading http