In [None]:
import os
import boto3

# Create a boto3 session.
# Credentials are deliberately NOT written into the notebook: when no keys are
# passed explicitly, boto3 resolves them through its standard credential chain
# (the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, the
# shared ~/.aws/credentials file, ...). Configure one of those before running.
session = boto3.session.Session()

# Create an S3 client
s3 = session.client('s3')

# Name of the S3 bucket every download in this notebook reads from
bucket_name = 'arxiv'

In [None]:
def download_arxiv_file(origin_path, target_path=None, verbose=True):
    """Download one object from the bucket into the local ``./downloaded/`` tree.

    Args:
        origin_path: Key of the object inside ``bucket_name``.
        target_path: Destination path, relative to ``./downloaded/``.
            Defaults to ``origin_path``.
        verbose: When true, print a message on success and when the file is
            already present. Errors are printed regardless of this flag.

    Returns:
        None. A failed download is reported on stdout instead of being raised,
        so a long batch of downloads keeps going.
    """
    if target_path is None:
        target_path = origin_path
    target_path = './downloaded/' + target_path

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(os.path.dirname(target_path), exist_ok=True)

    # Skip files that are already present locally.
    if os.path.exists(target_path):
        if verbose:
            print(f'🐤 {origin_path} already downloaded to {target_path}')
        return

    try:
        # RequestPayer=requester: the request is sent as a requester-pays
        # request, i.e. the caller's AWS account is charged for the transfer.
        s3.download_file(bucket_name, origin_path, target_path, ExtraArgs={'RequestPayer': 'requester'})
        if verbose:
            print(f'🥳 Download ({origin_path}) successfully.')
    except Exception as e:
        # Name the failing key: callers may run with verbose=False, in which
        # case this message is the only trace of which download went wrong.
        print(f"❗️ An error occurred while downloading {origin_path}: {e}")

In [None]:
# Fetch the PDF manifest first, then the source manifest.
for manifest_key in ('pdf/arXiv_pdf_manifest.xml', 'src/arXiv_src_manifest.xml'):
    download_arxiv_file(manifest_key)

# Process PDF & SRC manifest files

In [None]:
import xmltodict

# Parse both manifests into nested dicts.
# - The files are opened in binary mode so the XML parser decodes them itself
#   instead of relying on the platform's default text encoding.
# - xmltodict returns a bare dict (not a list) for an element that occurs only
#   once; force_list keeps 'file' a list so len() and list concatenation below
#   behave the same regardless of how many entries a manifest holds.
with open('./downloaded/pdf/arXiv_pdf_manifest.xml', 'rb') as manifest_pdf_file:
    manifest_pdf_dict = xmltodict.parse(manifest_pdf_file, force_list=('file',))

with open('./downloaded/src/arXiv_src_manifest.xml', 'rb') as manifest_src_file:
    manifest_src_dict = xmltodict.parse(manifest_src_file, force_list=('file',))

# One entry per archive listed in the respective manifest
manifest_pdf_files = manifest_pdf_dict['arXivPDF']['file']
manifest_src_files = manifest_src_dict['arXivSRC']['file']

len(manifest_pdf_files), len(manifest_src_files)

In [None]:
# Concatenate the PDF and SRC manifest entries into one list
# (assumes both operands are lists; `+` would fail on dicts).
manifest_files = manifest_pdf_files + manifest_src_files
# Display the combined entry count as the cell output.
len(manifest_files)

In [None]:
# 4-digit date tokens (as they appear in the archive file names) to sample
sample_dates = [
    '0001',
    '0002',
    '0003',
    '0004',
    '0005',
    '0006',
    '0007',
    '0008',
    '0009',
    '0010',
    '0011',
    '0012',
    '0501',
    '1001',
    '1501',
    '2001',
]

# Group manifest entries by date token. The token is matched together with its
# surrounding underscores ('_0009_') rather than as a bare substring, so it can
# only hit the date field of the file name and never digits that merely happen
# to appear elsewhere (e.g. in the trailing sequence number).
sample_files = {
    date: [item for item in manifest_files if f'_{date}_' in item['filename']]
    for date in sample_dates
}

# Flatten to a sorted list of file names to download
sample_filenames = sorted(
    item['filename'] for items in sample_files.values() for item in items
)
sample_filenames

In [None]:
# These archives might not be listed in the manifest file, so they are
# requested explicitly here.
# NOTE: this replaces the download list built above. Comment the block out
# to keep the manifest-derived list instead.
sample_filenames = [
    f'pdf/arXiv_pdf_{month}_{part}.tar'
    for month in ('0009', '0010')
    for part in ('001', '002')
]

In [None]:
from tqdm.notebook import tqdm
import time

# Download every selected archive, showing progress in a notebook progress bar.
pbar = tqdm(sample_filenames, desc="Downloading files")
for filename in pbar:
    # Show the file currently being fetched in the bar's label.
    pbar.set_description(f"Downloading {filename}")
    # verbose=False silences the per-file success/skip messages; errors are
    # still printed by download_arxiv_file.
    download_arxiv_file(filename, verbose=False)

# Pre-process the downloaded data

In [None]:
import tarfile
import os
import re
import shutil

from tqdm.notebook import tqdm
import time

# Root of the organized output tree
arxiv_output_dir = './arxiv'
# Scratch directory that archives are unpacked into before being organized
temp_extraction_dir = './arxiv/temp_extracted_files'

# Function to extract tar files
def extract_tar(file_path, extraction_path=temp_extraction_dir):
    """Unpack a tar archive into its own sub-directory of ``extraction_path``.

    Args:
        file_path: Path of the tar archive to unpack.
        extraction_path: Directory under which the archive's sub-directory is
            created. Defaults to ``temp_extraction_dir``.

    Returns:
        The directory the archive was unpacked into:
        ``extraction_path/<archive file name up to its first dot>``.
    """
    archive_stem = os.path.basename(file_path).split('.')[0]
    temp_output_path = os.path.join(extraction_path, archive_stem)
    with tarfile.open(file_path, 'r') as tar:
        if hasattr(tarfile, 'data_filter'):
            # The 'data' extraction filter rejects members that would be
            # written outside the destination (absolute paths, '..', unsafe
            # links) and silences the Python 3.12+ DeprecationWarning for
            # calling extractall() without a filter.
            tar.extractall(path=temp_output_path, filter='data')
        else:
            # Older Python without extraction-filter support.
            tar.extractall(path=temp_output_path)
    return temp_output_path

# Function to organize files based on dates
def organize_by_date_and_type(filename, extracted_path, output_dir=None):
    """Move an unpacked archive's files into ``<output_dir>/<date>/<type>/``.

    The type (``pdf`` or ``src``) and the 4-digit date token are parsed from
    ``filename``. The files are taken from ``extracted_path/<date>``, so the
    unpacked archive must contain a directory named after the date token;
    ``os.listdir`` raises ``FileNotFoundError`` otherwise.

    Args:
        filename: Archive name such as ``pdf/arXiv_pdf_0009_001.tar``.
        extracted_path: Directory the archive was unpacked into.
        output_dir: Root of the organized tree. Defaults to the module-level
            ``arxiv_output_dir``.

    Returns:
        None. A ``filename`` that does not match the expected pattern is
        ignored and nothing is moved.
    """
    match = re.match(r"(pdf|src)/arXiv_.*_(\d{4})_.*\.tar", filename)
    if not match:
        return

    type_dir, date_dir = match.groups()
    if output_dir is None:
        output_dir = arxiv_output_dir
    target_dir = os.path.join(output_dir, date_dir, type_dir)

    # exist_ok: several archives of the same month can be processed
    # concurrently by parallel workers, so a separate exists() check followed
    # by makedirs() can fail with FileExistsError.
    os.makedirs(target_dir, exist_ok=True)

    temp_dir = os.path.join(extracted_path, date_dir)
    # Move the extracted files to the date and type directory
    for file in os.listdir(temp_dir):
        shutil.move(os.path.join(temp_dir, file), os.path.join(target_dir, file))

In [None]:
import os
import subprocess
from joblib import Parallel, delayed
from joblib_progress import joblib_progress

n_workers = os.cpu_count()
print("CPU:", n_workers)

# Make sure the output root exists and start from an empty scratch directory.
if not os.path.exists(arxiv_output_dir):
    os.makedirs(arxiv_output_dir)
if os.path.exists(temp_extraction_dir):
    shutil.rmtree(temp_extraction_dir)
os.makedirs(temp_extraction_dir)


def inner_func(filename):
    """Unpack one downloaded archive, file its contents, and drop the scratch copy.

    Returns the ``filename`` it was given.
    """
    archive_path = './downloaded/' + filename
    extracted_dir = extract_tar(archive_path)
    organize_by_date_and_type(filename, extracted_dir)
    shutil.rmtree(extracted_dir)
    return filename


# One lazily-built task per archive; pre_dispatch='1*n_jobs' keeps only one
# batch of tasks per worker queued ahead of time.
tasks = (delayed(inner_func)(f) for f in sample_filenames)
with joblib_progress("Processing...", total=len(sample_filenames)):
    Parallel(n_jobs=n_workers, pre_dispatch='1*n_jobs')(tasks)

print('Done!')

# Remove the now-unneeded scratch directory.
shutil.rmtree(temp_extraction_dir)

In [None]:
# Show the total on-disk size of the ./arxiv output directory (shell command).
!du -sh ./arxiv