<a href="https://colab.research.google.com/github/jacksongoode/ia-remote-upload/blob/main/ia_remote_upload_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Install requirements
# Install the third-party packages imported by the script cell
# (internetarchive, tqdm), together with setuptools.
%pip install internetarchive setuptools tqdm

Please add the access key and secret key from your `ia.ini` file, which should be in your home directory once you have configured the `ia` utility.

The number of **workers** is the number of threads that process rows (downloading and uploading files) in parallel. The **id_type** determines how each identifier is generated from the CSV: it can be taken from the row's identifier column (`identifier`), computed as a hash of the row, i.e. the combination of all data within the row (`hash`), or generated randomly (`random`).

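After doing this, upload your CSV in the files section on the left and set **csv_path** below to the name of your CSV. The CSV needs a `file` column containing the URL of each file to upload (plus an `identifier` column when **id_type** is `identifier`); all other columns are sent as the item's metadata.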

In [None]:
# @title Script
# Internet Archive access/secret key pair; passed to every upload call.
access_key = "" # @param {type:"string"}
secret_key = "" # @param {type:"string"}
# Number of worker threads used to process CSV rows concurrently.
workers = 1 # @param {type:"integer"}
# How each item's identifier is chosen: "hash" (MD5 of the row's values),
# "identifier" (the row's identifier column) or "random" (8 random characters).
id_type = "hash" # @param ["hash", "identifier", "random"]
# Items whose identifier already exists are never uploaded again; this flag
# only selects whether that is logged at INFO (True) or ERROR (False) level.
skip = True # @param {type:"boolean"}
# Path of the CSV file to process.
csv_path = "test.csv" # @param {type:"string"}
"""
Uploads files to the Internet Archive based on a CSV containing file URLs and metadata.

The main entry point is process_csv(), which takes a CSV path, IA keys, number of workers
(concurrent downloads), ID type and handling existing uploads. First it reads the CSV, spawns
threads to process each row, downloads the file, uploads it to IA, and logs the result. Helper
functions handle the individual steps.
"""

import argparse
import configparser
import csv
import hashlib
import logging
import os
import random
import re
import string
import tempfile
import time
from urllib.parse import quote, urlparse

import requests
from internetarchive import delete, get_item, get_session, upload
from tqdm.contrib.concurrent import thread_map
from tqdm.notebook import tqdm


def configure_logging():
    """Reset the root logger to log INFO and above to ``log.txt`` and the console.

    Handlers already attached to the root logger are detached *and closed*
    first, so calling this repeatedly (for example when re-running the
    notebook cell) neither duplicates output nor leaks open ``log.txt``
    file handles.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Iterate over a copy: removeHandler() mutates logger.handlers.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        # Closing releases the underlying file of a previous FileHandler.
        handler.close()

    log_format = logging.Formatter(
        "%(asctime)s %(levelname)s: %(message)s", datefmt="%H:%M:%S"
    )

    file_handler = logging.FileHandler("log.txt")
    file_handler.setFormatter(log_format)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(log_format)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)


def load_session(config_file):
    """Return the session that ``get_session`` builds for ``config_file``."""
    session = get_session(config_file=config_file)
    return session


def create_identifier(id_type, row=None):
    """Return an item identifier for a CSV row.

    Args:
        id_type: ``"hash"`` derives the identifier from the row contents;
            any other value produces a random identifier.
        row: Mapping of column name to value for the row. Required when
            ``id_type`` is ``"hash"``, ignored otherwise.

    Returns:
        For ``"hash"``, the hexadecimal MD5 digest of all row values
        concatenated in order; otherwise 8 random ASCII letters and digits.

    Raises:
        ValueError: If ``id_type`` is ``"hash"`` and no row is given.
    """
    if id_type == "hash":
        if row is None:
            raise ValueError('A row is required when id_type is "hash".')
        row_str = "".join(str(value) for value in row.values())
        # MD5 is only a content fingerprint here, not a security measure.
        hash_object = hashlib.md5(row_str.encode(), usedforsecurity=False)
        return hash_object.hexdigest()

    # 62 ** 8 (about 2.2 * 10^14) possible identifiers
    return "".join(random.choices(string.ascii_letters + string.digits, k=8))


def clean_metadata_text(text):
    """Return ``text`` with every NUL character (``"\\x00"``) removed.

    ``None`` is returned as an empty string: ``csv.DictReader`` fills in
    ``None`` for cells missing from a row that is shorter than the header,
    and such a row should be cleaned like one with empty cells rather than
    crash the whole run.
    """
    if text is None:
        return ""
    return text.replace("\x00", "")


def clean_csv_data(csv_data):
    """Return a new list of row dicts with every value cleaned.

    Each row keeps its keys; each value is passed through
    ``clean_metadata_text``. The input rows are not modified.
    """
    return [
        {column: clean_metadata_text(value) for column, value in record.items()}
        for record in csv_data
    ]


def write_failed_url(url, file_path="failed.txt"):
    """Append ``url`` followed by a newline to the file at ``file_path``.

    The file is opened in append mode, so it is created when missing and
    earlier entries are kept.
    """
    with open(file_path, mode="a") as failed_log:
        failed_log.writelines((url, "\n"))


def download_file_with_progress(url, output_path):
    """Stream ``url`` into ``output_path`` while showing a byte progress bar.

    Args:
        url: Address fetched with an HTTP GET request (60 second timeout).
        output_path: Local path the response body is written to.

    Returns:
        True when the whole body was written. False when the request failed,
        the server answered with an HTTP error status, or writing to disk
        failed; the failure is logged, and a partially written file may be
        left at ``output_path`` for the caller to remove.
    """
    try:
        # The context manager releases the connection even on failure.
        with requests.get(url, stream=True, timeout=60) as response:
            # Without this check an error page (404, 500, ...) would be
            # saved as if it were the requested file.
            response.raise_for_status()
            total_size = int(response.headers.get("Content-Length", 0))

            with open(output_path, "wb") as f, tqdm(
                desc=output_path,
                total=total_size,
                unit="B",
                unit_scale=True,
                unit_divisor=1024,
            ) as bar:
                for data in response.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                    f.write(data)
                    bar.update(len(data))
    except (requests.RequestException, OSError) as e:
        logging.error(f"Failed to download {url}: {e}")
        return False

    return True


def encode_url(url):
    """Return ``scheme://netloc/path`` of ``url`` with the path percent-encoded.

    Characters that are not allowed in a URL path (spaces, non-ASCII
    characters, ...) are escaped, while ``/`` and ``+`` are kept. Escapes
    already present in the path (``%20``) are left untouched instead of being
    encoded a second time (``%2520``); a ``%`` that does not start a valid
    two-digit hex escape is encoded as ``%25``.

    NOTE: any parameters, query string or fragment of ``url`` are dropped.
    """
    parsed_url = urlparse(url)
    # Treat "%" as safe so existing escapes survive quoting ...
    path = quote(parsed_url.path, safe="/+%")
    # ... then escape only the stray "%" that is not part of a %XX sequence.
    path = re.sub(r"%(?![0-9A-Fa-f]{2})", "%25", path)
    return f"{parsed_url.scheme}://{parsed_url.netloc}{path}"


def upload_to_internet_archive(file_path, metadata, keys, identifier):
    """Upload one local file to the item ``identifier`` and report success.

    Args:
        file_path: Path of the local file to upload.
        metadata: Metadata mapping handed to ``upload``.
        keys: Mapping with the ``"access_key"`` and ``"secret_key"`` entries.
        identifier: Identifier of the target item.

    Returns:
        True when ``upload`` finished without raising, otherwise False. Any
        exception is logged as an error and not re-raised.
    """
    try:
        logging.info(f"Uploading {os.path.basename(file_path)} to {identifier}.")
        upload_options = {
            "files": [file_path],
            "metadata": metadata,
            "access_key": keys["access_key"],
            "secret_key": keys["secret_key"],
            "retries": 3,
            "retries_sleep": 5,
            "verify": True,
            "delete": True,
        }
        upload(identifier, **upload_options)
        logging.info(
            f"Successfully uploaded {os.path.basename(file_path)} to {identifier}."
        )
        succeeded = True
    except Exception as e:
        logging.error(f"Failed to upload {os.path.basename(file_path)}: {e}")
        succeeded = False
    return succeeded


def process_row(row, keys, sleep=3, id_type="hash", skip=True):
    """Download the file referenced by one CSV row and upload it.

    Args:
        row: Mapping of column name to value. ``row["file"]`` is the source
            URL; ``row["identifier"]`` is required only when ``id_type`` is
            ``"identifier"``. Every other column is sent as metadata.
        keys: Mapping with the ``"access_key"`` and ``"secret_key"`` entries.
        sleep: Upper bound, in seconds, of the random pause after the row.
        id_type: ``"identifier"`` uses the row's identifier column; any other
            value is delegated to ``create_identifier``.
        skip: When the item already exists it is never uploaded again; this
            flag only selects whether that is logged as info or as an error.

    Failed downloads and uploads are logged and their URL is appended to the
    failed-URL file; the temporary download file is always removed.
    """
    file_url = encode_url(row["file"])
    file_name = os.path.basename(file_url)

    if id_type == "identifier":
        identifier = row["identifier"]
    else:
        # The row is only used for "hash"; random identifiers ignore it.
        identifier = create_identifier(id_type, row)

    # Check if ID already exists. Log the computed identifier: hash/random
    # CSVs need not have an "identifier" column at all.
    if get_item(identifier).exists:
        if skip:
            logging.info(f"Skipping existing item: {identifier}")
        else:
            logging.error(f"Item already exists: {identifier}")
        return

    logging.info(f"Starting download for {file_name} from {file_url}")

    # Only reserve a unique path here; the download reopens it for writing.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=os.path.splitext(file_name)[1]
    ) as temp_file:
        local_file_path = temp_file.name

    # NOTE(review): the file is uploaded from the temporary path, so it
    # presumably appears in the item under the temporary file's name rather
    # than file_name - confirm against the internetarchive upload docs.
    try:
        if download_file_with_progress(file_url, local_file_path):
            logging.info(f"Downloaded {file_name} to {local_file_path}")

            metadata = {
                key: value
                for key, value in row.items()
                if key not in ("identifier", "file")
            }

            if not upload_to_internet_archive(
                local_file_path, metadata, keys, identifier
            ):
                write_failed_url(file_url)
        else:
            write_failed_url(file_url)
    except (requests.RequestException, OSError) as e:
        # A failing row must not abort the rest of the batch.
        logging.error(f"Failed to download {file_name}: {e}")
        write_failed_url(file_url)
    finally:
        # The file may already be gone after a successful upload.
        if os.path.exists(local_file_path):
            os.remove(local_file_path)
            logging.info(f"Removed local file {local_file_path}")

    time.sleep(random.uniform(0, sleep))


def process_csv(csv_path, keys, id_type="hash", skip=True, max_workers=3):
    """Read a CSV file and process all of its rows on a pool of threads.

    Args:
        csv_path: Path of the CSV file; the first line holds the column names.
        keys: Mapping with the ``"access_key"`` and ``"secret_key"`` entries.
        id_type: Identifier strategy forwarded to ``process_row``.
        skip: Forwarded to ``process_row``.
        max_workers: Maximum number of rows processed at the same time.
    """
    # "utf-8-sig" reads plain UTF-8 too, but strips a leading byte-order mark
    # (written by Excel, for instance). Left in place, the mark becomes part
    # of the first column name and lookups such as row["file"] fail.
    with open(csv_path, newline="", encoding="utf-8-sig") as csvfile:
        csv_data = list(csv.DictReader(csvfile))

    cleaned_data = clean_csv_data(csv_data)

    thread_map(
        lambda row: process_row(row, keys, id_type=id_type, skip=skip),
        cleaned_data,
        max_workers=max_workers,
    )


if __name__ == "__main__":
    # Set up logging first so every later step is recorded.
    configure_logging()

    # Run the batch with the values assigned at the top of this cell.
    process_csv(
        csv_path,
        dict(access_key=access_key, secret_key=secret_key),
        id_type=id_type,
        skip=skip,
        max_workers=workers,
    )
