# Citibike Data Pipeline

This notebook implements a workflow to download, extract, and convert Citibike ZIP files from an S3 bucket into a cleansed Parquet file. The pipeline follows a medallion architecture:
- **01raw:** Contains the raw ZIP files and their extracted CSV files.
- **02cleansed:** Contains the final cleansed Parquet file.

Before downloading new data, the raw and cleansed folders are cleared so that only the selected month's data remains.

## Process

1. **Fetch the list of ZIP file links**  
   Retrieve the list of available ZIP files from the base URL.

2. **Determine the target ZIP file**
   - If a specific timestamp is provided, look for it in the filename.
   - Otherwise, select the latest available file.

3. **Check if the target file is already present**
   - If both the ZIP file (in `01raw`) and the Parquet file (in `02cleansed`) already exist and `force_download` is `False`, skip the download.
   - Otherwise, clear the `01raw` and `02cleansed` folders so that only the new data remains.

4. **Download the ZIP file**  
   Save the file into the `raw` folder.

5. **Extract its contents**  
   Extract the ZIP file into a designated subfolder.

6. **Process the extracted CSV file**
   - Locate the CSV file in the extracted contents.
   - Convert it to a Parquet file and save it in the `cleansed` folder.
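Step 3's skip rule depends on both outputs being present, not just the ZIP. A minimal sketch of that decision as a standalone function (`needs_refresh` is a hypothetical name, not part of the notebook), exercised in a temporary directory with illustrative file names:

```python
import tempfile
from pathlib import Path


def needs_refresh(zip_path, parquet_path, force_download=False):
    """Decide whether step 3 should clear the folders and fetch new data.

    Hypothetical helper: the cached result is reused only when both the raw
    ZIP and the cleansed Parquet file exist and no forced download was requested.
    """
    if force_download:
        return True
    return not (Path(zip_path).exists() and Path(parquet_path).exists())


# Usage example in a throwaway directory (file names are illustrative only).
with tempfile.TemporaryDirectory() as tmp:
    zip_path = Path(tmp, "202401-citibike-tripdata.csv.zip")
    parquet_path = Path(tmp, "cleaned_data.parquet")
    print(needs_refresh(zip_path, parquet_path))  # True: nothing cached yet
    zip_path.touch()
    parquet_path.touch()
    print(needs_refresh(zip_path, parquet_path))  # False: both files present
    print(needs_refresh(zip_path, parquet_path, force_download=True))  # True
```

Checking the Parquet file as well as the ZIP means a run that downloaded the archive but failed during conversion is retried rather than skipped.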


---
## Step 1: Imports

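```python
import os
import re
import time
import logging
import zipfile

import pandas as pd  # DataFrame.to_parquet also needs pyarrow or fastparquet installed
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Show INFO messages; the root logger only reports WARNING and above by default.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
```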

## Step 2: Define a Helper Function to Clear Folders

The `clear_folder` function deletes all files and subdirectories within a specified folder. This is necessary to ensure that only one month of data is stored at a time.


```python
import shutil


def clear_folder(folder):
    """
    Remove all files and subdirectories in the given folder.

    The folder itself is kept. We use this to empty the raw and cleansed
    folders before a new download, ensuring that only the new month's data
    remains.
    """
    if not os.path.exists(folder):
        return
    for filename in os.listdir(folder):
        file_path = os.path.join(folder, filename)
        try:
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.unlink(file_path)  # Remove the file or link
            elif os.path.isdir(file_path):
                shutil.rmtree(file_path)  # Recursively delete directories
        except Exception as e:
            logging.error(f"Failed to delete {file_path}. Reason: {e}")
```
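For comparison, the same clearing behavior can be written with `pathlib`. This is a sketch with a hypothetical name (`clear_folder_contents`), exercised in a temporary directory so nothing real is deleted:

```python
import shutil
import tempfile
from pathlib import Path


def clear_folder_contents(folder):
    """Pathlib sketch of clear_folder: empty `folder` but keep the folder itself."""
    folder = Path(folder)
    if not folder.exists():
        return
    for entry in folder.iterdir():
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)  # Recurse into real subdirectories
        else:
            entry.unlink()  # Plain files and symlinks (even ones pointing at directories)


# Build a small fake raw layer and clear it.
with tempfile.TemporaryDirectory() as tmp:
    raw = Path(tmp, "01raw")
    (raw / "csv_files").mkdir(parents=True)
    (raw / "old.zip").write_bytes(b"stale")
    (raw / "csv_files" / "old.csv").write_text("a,b\n")
    clear_folder_contents(raw)
    print(raw.exists(), list(raw.iterdir()))  # True []
```

Like `clear_folder`, symlinks are removed rather than followed, so a link that points outside the data folder never causes its target to be deleted.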


## Step 3: Define Core Functions

Core functions that:
- **get_zip_links:** Use Selenium and BeautifulSoup to fetch all ZIP file links from the given URL.
- **extract_latest_file:** Choose the latest ZIP file based on the date found in the filename.
- **download_file:** Download a file from a URL (if it doesn't already exist).
- **extract_zip_file:** Extract the contents of the ZIP file.
- **convert_csv_to_parquet:** Convert the extracted CSV file into a Parquet file.
- **get_newest_data:** Tie all the above steps together into a full workflow (defined in Step 4).


```python
def get_zip_links(base_url, selenium_options=None):
    """
    Fetch all ZIP file links from the given URL using Selenium.

    We load the page using Selenium (to handle dynamic content) and then use
    BeautifulSoup to find links ending with '.zip'. If a link is relative,
    we prepend the S3 bucket URL.
    """
    options = selenium_options or webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')

    driver = None
    try:
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
        driver.get(base_url)
        time.sleep(3)  # Fixed wait so the page's scripts can render the link list
        page_source = driver.page_source
    except Exception as e:
        logging.error(f"Error while fetching the page: {e}")
        return []
    finally:
        if driver is not None:
            driver.quit()  # Always release the browser, even if loading the page failed

    soup = BeautifulSoup(page_source, 'html.parser')
    links = [a['href'] for a in soup.find_all('a', href=True) if a['href'].endswith('.zip')]
    return [link if link.startswith("http") else f"https://s3.amazonaws.com/tripdata/{link}" for link in links]
```
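Selenium is used here because the index page renders its links dynamically. If the bucket also allows anonymous listing requests (an assumption worth verifying, since it depends on the bucket's policy), the same ZIP keys could be read from the standard S3 `ListBucketResult` XML without a browser. The sketch below only parses such a document; `zip_links_from_listing` is a hypothetical helper, and the sample XML is hand-written, not captured from the bucket:

```python
import xml.etree.ElementTree as ET

# Prefix reused from get_zip_links; anonymous listing access is an assumption.
BUCKET_URL = "https://s3.amazonaws.com/tripdata/"
S3_NS = {"s3": "http://s3.amazonaws.com/doc/2006-03-01/"}


def zip_links_from_listing(xml_text, bucket_url=BUCKET_URL):
    """Return absolute URLs for every key ending in '.zip' in an S3 ListBucketResult document."""
    root = ET.fromstring(xml_text)
    keys = [element.text for element in root.findall("s3:Contents/s3:Key", S3_NS)]
    return [bucket_url + key for key in keys if key and key.endswith(".zip")]


# Hand-written sample listing for illustration; not the bucket's real contents.
SAMPLE_LISTING = """<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Name>tripdata</Name>
  <IsTruncated>false</IsTruncated>
  <Contents><Key>202401-citibike-tripdata.csv.zip</Key></Contents>
  <Contents><Key>index.html</Key></Contents>
</ListBucketResult>"""

print(zip_links_from_listing(SAMPLE_LISTING))
```

S3 returns at most 1,000 keys per listing response; a complete version would check `IsTruncated` and request the following pages.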

```python
def extract_latest_file(links):
    """
    From a list of ZIP file links, select the one with the most recent date.

    This function uses a regular expression to extract a date from each link,
    sorts the links by date (in descending order), and returns the first (latest) link.
    """
    def extract_date(link):
        match = re.search(r"(\d{4}(?:\d{2}){1,2})", link)
        return match.group(1) if match else None

    links_with_dates = [(link, extract_date(link)) for link in links]
    links_with_dates = [item for item in links_with_dates if item[1]]
    if not links_with_dates:
        logging.warning("No valid dates found in the file links.")
        return None

    links_with_dates.sort(key=lambda x: x[1], reverse=True)
    return links_with_dates[0][0]
```
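The date pattern accepts `YYYYMM` or `YYYYMMDD`, so links that carry only a year are ignored, and the extracted keys are compared as plain strings. The snippet below reproduces that key on a few illustrative links modeled on the bucket's naming pattern (not a real listing); `date_key` is a hypothetical standalone copy of the nested `extract_date`:

```python
import re

# Same pattern as extract_latest_file: YYYY followed by MM or MMDD.
DATE_PATTERN = re.compile(r"(\d{4}(?:\d{2}){1,2})")


def date_key(link):
    """Return the YYYYMM / YYYYMMDD string found in a link, or None."""
    match = DATE_PATTERN.search(link)
    return match.group(1) if match else None


# Illustrative links modeled on the bucket's naming pattern (not a real listing).
links = [
    "https://s3.amazonaws.com/tripdata/2013-citibike-tripdata.zip",
    "https://s3.amazonaws.com/tripdata/202312-citibike-tripdata.csv.zip",
    "https://s3.amazonaws.com/tripdata/202401-citibike-tripdata.csv.zip",
]

for link in links:
    print(link.rsplit("/", 1)[-1], "->", date_key(link))

# String comparison orders correctly because every key starts with YYYYMM.
dated = [(link, date_key(link)) for link in links if date_key(link)]
latest = max(dated, key=lambda item: item[1])[0]
print("latest:", latest)
```

If two links carry the same date (for example a regionally prefixed file alongside the main one), both `max` here and the stable descending sort in `extract_latest_file` return whichever was listed first.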

```python
def download_file(url, folder, timeout=60):
    """
    Download a file from a given URL and save it into the specified folder.

    If the file already exists, the download is skipped. A partially written
    file is removed on failure so that a later run does not mistake it for a
    complete download. `timeout` (seconds) stops the request from hanging forever.
    """
    os.makedirs(folder, exist_ok=True)
    filename = os.path.join(folder, url.split("/")[-1])

    if os.path.exists(filename):
        logging.info(f"File already exists: {filename}. Skipping download.")
        return filename

    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(filename, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1 MiB chunks
                    file.write(chunk)
        logging.info(f"Downloaded: {filename}")
        return filename
    except Exception as e:
        logging.error(f"Failed to download {url}: {e}")
        if os.path.exists(filename):
            os.remove(filename)  # Drop the incomplete file
        return None
```
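Because `download_file` skips any file that already exists, an interruption that leaves a truncated ZIP behind (for example, the process being killed) could be reused by a later call. A common remedy is to write to a temporary file in the same folder and rename it into place only after the last chunk arrives. The sketch below (hypothetical helper `write_chunks_atomically`) isolates that step from the HTTP request so it runs offline:

```python
import os
import tempfile


def write_chunks_atomically(chunks, destination):
    """Write an iterable of byte chunks to `destination` via a temporary file.

    The final name only appears once every chunk has been written, so an
    interrupted download never leaves a truncated file under the real name.
    """
    directory = os.path.dirname(os.path.abspath(destination))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as handle:
            for chunk in chunks:
                if chunk:  # Skip empty keep-alive chunks
                    handle.write(chunk)
        os.replace(tmp_path, destination)  # Rename into place only when complete
    except BaseException:
        os.remove(tmp_path)  # Clean up the partial temporary file, then re-raise
        raise
    return destination


# Offline usage: in the pipeline the chunks would come from response.iter_content(...).
with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "example.zip")
    write_chunks_atomically([b"PK", b"", b"data"], target)
    print(os.path.getsize(target))  # 6
```

The temporary file is created in the destination's own folder because `os.replace` is only an atomic rename when source and destination are on the same filesystem.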

```python
def extract_zip_file(zip_path, extract_to):
    """
    Extract a ZIP file to the specified folder.

    Returns a list of file paths that were extracted.
    """
    os.makedirs(extract_to, exist_ok=True)
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
            logging.info(f"Extracted {zip_path} to {extract_to}")
            return [os.path.join(extract_to, file) for file in zip_ref.namelist()]
    except Exception as e:
        logging.error(f"Failed to extract {zip_path}: {e}")
        return []
```
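Before extracting, it can help to confirm that an archive is intact. Python's `zipfile` provides `ZipFile.testzip()`, which reads every member and returns the name of the first one with a bad CRC, or `None`; a truncated file usually fails earlier, when opening, with `zipfile.BadZipFile`. A sketch with a hypothetical helper name (`is_valid_zip`):

```python
import os
import tempfile
import zipfile


def is_valid_zip(path):
    """Return True if `path` is a readable ZIP archive whose members pass their CRC checks."""
    try:
        with zipfile.ZipFile(path) as archive:
            return archive.testzip() is None  # testzip() names the first bad member, or None
    except (zipfile.BadZipFile, OSError):
        return False  # Not a ZIP, truncated, or unreadable


# Build one good archive and one truncated copy of it.
with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "good.zip")
    with zipfile.ZipFile(good, "w") as zf:
        zf.writestr("data.csv", "ride_id,rideable_type\nA1,classic_bike\n")
    with open(good, "rb") as fh:
        data = fh.read()
    bad = os.path.join(tmp, "bad.zip")
    with open(bad, "wb") as fh:
        fh.write(data[: len(data) // 2])
    print(is_valid_zip(good), is_valid_zip(bad))  # True False
```

`testzip()` decompresses every member, so on a large monthly archive it costs roughly as much I/O as the extraction itself.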

```python
def convert_csv_to_parquet(csv_file, parquet_path):
    """
    Read a CSV file and convert it to Parquet format.

    All columns are first read as strings to handle mixed types. Each column
    whose name ends with '_id' is converted to numeric only if every non-null
    value parses as a number; otherwise it stays a string, so alphanumeric IDs
    are not replaced with NaN. The resulting DataFrame is saved as Parquet.
    """
    try:
        df = pd.read_csv(csv_file, dtype=str, low_memory=False)
        logging.info(f"Loaded CSV file: {csv_file}")

        # Convert '_id' columns to numeric only when no values would be lost
        for col in df.columns:
            if not col.endswith("_id"):
                continue
            converted = pd.to_numeric(df[col], errors='coerce')
            if (converted.isna() & df[col].notna()).any():
                logging.warning(f"Keeping {col} as string: it contains non-numeric values")
            else:
                df[col] = converted

        df.to_parquet(parquet_path, index=False)
        logging.info(f"Converted {csv_file} to Parquet: {parquet_path}")
        return parquet_path

    except Exception as e:
        logging.error(f"Failed to convert {csv_file} to Parquet: {e}")
        return None
```
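`pd.to_numeric(..., errors='coerce')` turns every unparseable value into NaN, so converting all `_id` columns unconditionally can silently erase identifiers that are not purely numeric (in recent Citibike files `ride_id` appears to hold alphanumeric strings). A minimal sketch of a lossless check, using a hypothetical helper `to_numeric_if_lossless` and made-up sample values:

```python
import pandas as pd


def to_numeric_if_lossless(series):
    """Convert a string column to numbers only if no non-null value would be lost."""
    converted = pd.to_numeric(series, errors="coerce")
    # Values that were present before but became NaN could not be parsed.
    lost = converted.isna() & series.notna()
    return series if lost.any() else converted


# Made-up sample rows for illustration.
df = pd.DataFrame({
    "ride_id": ["A1B2C3D4", "9F8E7D6C"],
    "start_station_id": ["6140.05", "5905.14"],
})
print(pd.to_numeric(df["ride_id"], errors="coerce").tolist())  # [nan, nan]

for col in df.columns:
    if col.endswith("_id"):
        df[col] = to_numeric_if_lossless(df[col])
print(df.dtypes)
```

Here `ride_id` keeps its original strings while `start_station_id` becomes a float column; a column is converted all-or-nothing so that mixed values never end up partly NaN.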


## Step 4: Execution Function
1. **Fetch the list of ZIP file links**  
2. **Determine the target ZIP file**
3. **Check if the target file is already present**
4. **Download the ZIP file** 
5. **Extract its contents**  
6. **Process the extracted CSV file**

```python
def get_newest_data(
    base_url,
    raw_folder="01raw",
    extract_subfolder="csv_files",
    cleaned_folder="02cleansed",
    parquet_file="cleaned_data.parquet",
    timestamp=None,         # Use "latest" or None for the newest file; otherwise, provide a specific timestamp string
    force_download=False    # If True, force a new download even if data exists
):
    """
    Run the full pipeline and return the path of the Parquet file, or None on failure.

    1. Fetch the list of ZIP file links from the base URL.
    2. Determine the target ZIP file:
       - If a specific timestamp is provided, look for it in the filename.
       - Otherwise, select the latest available file.
    3. Check if the target file is already present:
       - If both the ZIP and the Parquet file exist and force_download is False, skip the download.
       - Otherwise, clear the raw and cleansed folders so only the new data remains.
    4. Download the ZIP file into the raw folder.
    5. Extract its contents into a designated subfolder.
    6. Find the CSV file in the extracted files and convert it to a Parquet file in the cleansed folder.
    """
    # Ensure that the raw and cleansed folders exist
    os.makedirs(raw_folder, exist_ok=True)
    os.makedirs(cleaned_folder, exist_ok=True)
    extract_folder = os.path.join(raw_folder, extract_subfolder)
    os.makedirs(extract_folder, exist_ok=True)

    logging.info("Fetching ZIP file links...")
    zip_links = get_zip_links(base_url)
    if not zip_links:
        logging.error("No ZIP files found.")
        return None

    # Select the target ZIP file based on the timestamp (or default to latest)
    if timestamp and timestamp.lower() != "latest":
        target_link = None
        for link in zip_links:
            if timestamp in link.split("/")[-1]:  # Match against the filename only
                target_link = link
                break
        if not target_link:
            logging.error(f"No file found with timestamp {timestamp}.")
            return None
    else:
        target_link = extract_latest_file(zip_links)
        if not target_link:
            logging.error("Could not determine the latest file.")
            return None

    logging.info(f"Target ZIP file: {target_link.split('/')[-1]}")

    # Define local file paths for the ZIP and final Parquet file
    local_zip_path = os.path.join(raw_folder, target_link.split("/")[-1])
    parquet_path = os.path.join(cleaned_folder, parquet_file)

    # If data already exists (and force_download is False), skip download;
    # otherwise, clear the folders so that only the new data will remain.
    if not force_download and os.path.exists(local_zip_path) and os.path.exists(parquet_path):
        logging.info("Data already downloaded and cleansed. Skipping download.")
        return parquet_path
    else:
        logging.info("Clearing raw and cleansed folders for new data download...")
        clear_folder(raw_folder)
        clear_folder(cleaned_folder)
        # Recreate the extraction folder after clearing the raw folder
        os.makedirs(extract_folder, exist_ok=True)

    # Download the ZIP file
    downloaded_file = download_file(target_link, raw_folder)
    if not downloaded_file:
        return None

    # Extract the ZIP file into the extraction folder
    extracted_files = extract_zip_file(downloaded_file, extract_to=extract_folder)
    if not extracted_files:
        return None

    # Convert the first data CSV found in the extracted files to Parquet.
    # macOS metadata entries (the __MACOSX folder and '._' files) also end in
    # '.csv' but are not data, so they are skipped.
    for file in extracted_files:
        name = os.path.basename(file)
        if "__MACOSX" in file or name.startswith("._"):
            continue
        if name.endswith(".csv"):
            logging.info(f"Converting {file} to Parquet...")
            return convert_csv_to_parquet(file, parquet_path)

    logging.error("No CSV file found in the extracted files.")
    return None
```
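`get_newest_data` converts only the first data CSV it encounters. If an archive ever contains several data CSVs (for example, one month split across numbered parts; the file names below are hypothetical), the remaining parts are ignored. A sketch of a helper (hypothetical name `pick_csv_files`) that returns all data CSVs in a stable order while skipping macOS metadata, so a caller could convert or concatenate each one:

```python
def pick_csv_files(paths):
    """Return the real CSV files from a list of extracted paths, sorted by path.

    Skips macOS archive metadata (the __MACOSX folder and '._' resource-fork
    files), which also end in '.csv' but are not data, and directory entries.
    """
    csv_files = []
    for path in paths:
        parts = path.replace("\\", "/").split("/")
        name = parts[-1]  # Empty for directory entries such as "folder/"
        if "__MACOSX" in parts or name.startswith("._") or not name.endswith(".csv"):
            continue
        csv_files.append(path)
    return sorted(csv_files)


# Hypothetical extraction result, including metadata and a directory entry.
extracted = [
    "01raw/csv_files/__MACOSX/._202401-citibike-tripdata.csv",
    "01raw/csv_files/202401-citibike-tripdata_2.csv",
    "01raw/csv_files/202401-citibike-tripdata_1.csv",
    "01raw/csv_files/",
]
print(pick_csv_files(extracted))
```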


## Conclusion

The notebook organizes the Citibike data into two layers:
- **01raw:** Holds the raw ZIP and CSV files (cleared before each new download).
- **02cleansed:** Contains the final cleansed Parquet file (also cleared before each new download).

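Each time `get_newest_data` runs, it looks for the newest file (or the one matching `timestamp`). Unless that file has already been downloaded and converted (and `force_download` is `False`), it clears the previous data and then downloads, extracts, and converts the new file.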
