In [1]:
import ndcctools.taskvine as vine
import json
import os

In [2]:
# List the parquet shards that the next cell reads and concatenates.
!ls data/parquet/3split

group_1.parquet  group_2.parquet  group_3.parquet


In [3]:
import pandas as pd
directory = 'data/parquet/3split'
# Collect the shard paths in sorted order so the row order of `df` does not
# depend on os.listdir (whose order is arbitrary). Then read every shard and
# concatenate once: concatenating inside a loop re-copies the growing frame on
# each iteration, and concatenating onto an empty DataFrame is deprecated in
# recent pandas.
parquet_paths = sorted(
    os.path.join(directory, name)
    for name in os.listdir(directory)
    if name.endswith('.parquet')
)
if not parquet_paths:
    # Fail here instead of producing a column-less frame that breaks later cells.
    raise FileNotFoundError(f"No .parquet files found in {directory!r}")

df = pd.concat(
    [pd.read_parquet(path) for path in parquet_paths],
    ignore_index=True,  # fresh 0..n-1 RangeIndex, same as reset_index(drop=True)
)
df

                                               photo_url   latitude  \
0      http://inaturalist-open-data.s3.amazonaws.com/...  43.408011   
1      http://inaturalist-open-data.s3.amazonaws.com/...  56.031236   
2      http://inaturalist-open-data.s3.amazonaws.com/...  40.492557   
3      http://inaturalist-open-data.s3.amazonaws.com/...  42.302962   
4      http://inaturalist-open-data.s3.amazonaws.com/...  39.325331   
...                                                  ...        ...   
29995  http://inaturalist-open-data.s3.amazonaws.com/...  44.503589   
29996  http://inaturalist-open-data.s3.amazonaws.com/...  44.503589   
29997  http://inaturalist-open-data.s3.amazonaws.com/...  43.812790   
29998  http://inaturalist-open-data.s3.amazonaws.com/...  50.778121   
29999  http://inaturalist-open-data.s3.amazonaws.com/...  40.763373   

       longitude  taxon_id quality_grade observed_on  \
0     -80.504497   56061.0      research  2018-05-18   
1      37.836835   56061.0      res

In [5]:
import asyncio
import aiohttp
from urllib.parse import urlsplit, urlunsplit

async def estimate_image_size(session, url):
    """Return the size of the resource at ``url`` in kilobytes, or ``None``.

    Sends a HEAD request, which fetches headers only and no body, and reads
    the ``Content-Length`` header.

    Args:
        session: an open HTTP session whose ``head(url, **kwargs)`` returns an
            async context manager yielding a response with ``status`` and
            ``headers`` (e.g. an ``aiohttp.ClientSession``).
        url: the URL to probe.

    Returns:
        ``Content-Length / 1024`` as a float when the final response status is
        200 and the header is present. Otherwise, or on any error, ``None``.
    """
    try:
        # aiohttp's head() does not follow redirects by default (unlike get()).
        # Follow them so that a 3xx hop to the real object still yields its size
        # instead of being reported as "unknown".
        async with session.head(url, allow_redirects=True) as response:
            if response.status != 200 or 'Content-Length' not in response.headers:
                return None
            return int(response.headers['Content-Length']) / 1024
    except Exception:
        # Deliberately best-effort. Any failure (connection error, timeout,
        # malformed header) is reported as "size unknown", so the caller can try
        # alternative URLs instead of aborting the whole batch.
        return None

async def try_different_extensions(session, url):
    """Return the size of ``url`` in kilobytes, retrying with other image extensions.

    The URL is first probed as given. If that yields no size, the extension of
    the final path segment is replaced by each of ``.JPG``, ``.jpeg``, ``.JPEG``
    and ``.jpg`` in turn. The extension the URL already has is skipped. The
    scheme, host, query and fragment are kept unchanged.

    Args:
        session: HTTP session passed through to ``estimate_image_size``.
        url: the image URL to size.

    Returns:
        The first size found, or ``0`` when no variant could be sized. The
        value 0 is used rather than ``None`` so that callers can ``sum()`` the
        results directly.
    """
    size_kb = await estimate_image_size(session, url)
    if size_kb is not None:
        return size_kb

    url_parts = urlsplit(url)
    # splitext only inspects the final path segment, so a dot in a directory
    # name is not mistaken for the extension. A path without an extension gives
    # current_ext == '', and the candidates are appended to it. (Unpacking
    # path.rsplit('.', 1) raised ValueError in that case, and the exception
    # aborted the whole asyncio.gather batch.) The split is computed once,
    # outside the loop.
    base, current_ext = os.path.splitext(url_parts.path)

    for ext in ['.JPG', '.jpeg', '.JPEG', '.jpg']:
        if ext == current_ext:
            continue  # identical to the URL that just failed
        new_url = urlunsplit((url_parts.scheme, url_parts.netloc, f"{base}{ext}", url_parts.query, url_parts.fragment))
        print(f"Retrying with {new_url}...")

        size_kb = await estimate_image_size(session, new_url)
        if size_kb is not None:
            return size_kb

    # None of the variants could be sized.
    print(f"Could not determine size for {url}")
    return 0

async def get_total_size(urls):
    """Size every URL concurrently and return the total in three units.

    One shared ``aiohttp.ClientSession`` is opened for the whole batch, and
    ``try_different_extensions`` is scheduled for every URL at once through
    ``asyncio.gather``. Per-URL results are summed as returned.

    Args:
        urls: iterable of image URLs.

    Returns:
        Tuple ``(total_kb, total_mb, total_gb)``; each unit is 1024 times the next.
    """
    async with aiohttp.ClientSession() as session:
        per_url_kb = await asyncio.gather(
            *(try_different_extensions(session, u) for u in urls)
        )

    kb = sum(per_url_kb)
    mb = kb / 1024
    gb = mb / 1024
    return kb, mb, gb

# Driver coroutine: await the size survey and report the total.
async def run(urls):
    """Compute the combined size of ``urls`` and print it in MB and GB.

    Args:
        urls: iterable of image URLs, forwarded unchanged to ``get_total_size``.
    """
    _, megabytes, gigabytes = await get_total_size(urls)
    print(f"Total Size: {megabytes:.2f} MB ({gigabytes:.2f} GB) ")

# Now call run with await
await run(df['photo_url'])

Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/962730/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/97514371/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/822089/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/817765/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/3523999/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/3509162/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/3522469/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/840140/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/2708394/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/341495/large.JPG...
Retrying with http://inaturalist-open-data.s3.amazonaws.com/photos/837858/large.JPG...
Retrying with http://inaturalist-open

In [6]:
import subprocess

def get_local_ip():
    """Return this host's IP addresses as reported by ``hostname -I``.

    Returns:
        The command's stdout with surrounding whitespace stripped, i.e. the
        host's addresses separated by spaces (e.g. ``"10.0.0.5 10.0.0.6"``).

    Raises:
        RuntimeError: if ``hostname`` cannot be executed, exits with a
            non-zero status (``-I`` is not supported by every ``hostname``
            implementation), or prints no addresses.
    """
    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        # Without it, that exception is never raised and a failed command just
        # yields empty stdout.
        result = subprocess.run(
            ['hostname', '-I'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"'hostname -I' failed with exit status {e.returncode}: {(e.stderr or '').strip()}"
        ) from e
    except OSError as e:
        # e.g. FileNotFoundError when no `hostname` binary is on PATH.
        raise RuntimeError(f"Could not run 'hostname -I': {e}") from e

    ip_addresses = result.stdout.strip()
    if not ip_addresses:
        raise RuntimeError("'hostname -I' reported no IP addresses")
    return ip_addresses

# Get and print the local IP addresses
local_ips = get_local_ip()
print(f"My local IP addresses are: {local_ips}")
if not local_ips.split():
    # Fail with a clear message instead of a bare IndexError on split()[0].
    raise RuntimeError("No local IP address found; cannot tell workers where the TaskVine manager is")

# The first reported address becomes the manager address that the workers
# connect back to (it is interpolated into the SLURM script).
# NOTE(review): `hostname -I` does not guarantee any output order, and this
# host reports several interfaces (including a link-local 169.254.x.x). Confirm
# that the first one is reachable from the compute nodes.
managerIp = local_ips.split()[0]
print(managerIp)

My local IP addresses are: 10.140.116.160 10.141.32.160 169.254.95.120 10.0.116.160
10.140.116.160


In [7]:
# TaskVine connection settings used by the worker-submission job.
# MANAGER_PORT must match the port the manager script (bin/TaskvineLDAWT.py)
# listens on; its log reports "Listening on port 9124".
MANAGER_PORT = 9124
N_WORKERS = 3

# Batch job that runs vine_submit_workers, which submits N_WORKERS workers as
# SLURM jobs (each with --cpus-per-task=4) pointed at managerIp:MANAGER_PORT.
# NOTE(review): confirm that vine_submit_workers' `-t` option accepts an
# HH:MM:SS value; it may expect a plain number of seconds.
slurm_script = f"""#!/bin/bash
#SBATCH --job-name=taskvine
#SBATCH --output=result.out
#SBATCH --account=nirav
#SBATCH --partition=standard
#SBATCH --nodes=1
#SBATCH --ntasks=2
#SBATCH --time=01:00:00

# Load any necessary modules
module load python/3.9
source ~/.bashrc
# Enable `conda activate` in this non-interactive batch shell. Running
# `conda init` here would rewrite ~/.bashrc on every job and only take
# effect in a new shell.
eval "$(conda shell.bash hook)"
conda activate taskvine

# Execute the Python script or command
vine_submit_workers -T slurm -p "--cpus-per-task=4 --time=1:00:00" -t 01:00:00 {managerIp} {MANAGER_PORT} {N_WORKERS}

"""

In [8]:
slurm_filename = "taskvine.slurm"

# Write the generated batch script to the working directory so the
# `sbatch` call in the next cell can read it.
with open(slurm_filename, mode="w") as fh:
    fh.write(slurm_script)

print(f"SLURM script saved as {slurm_filename}")

SLURM script saved as taskvine.slurm


In [9]:
%%bash
# Queue the SLURM job defined in taskvine.slurm, which submits the TaskVine
# workers. Then run the manager script in the foreground and time it; the
# workers connect back to the manager's listening port.
sbatch taskvine.slurm
time python bin/TaskvineLDAWT.py 

Submitted batch job 3432189
Listening on port 9124
Waiting for tasks to complete...
Task 2 completed with result   0%|          | 0/10000 [00:00<?, ?it/s]  0%|          | 1/10000 [00:00<2:17:24,  1.21it/s]  0%|          | 4/10000 [00:01<33:54,  4.91it/s]    0%|          | 11/10000 [00:01<11:39, 14.28it/s]  0%|          | 23/10000 [00:01<05:10, 32.12it/s]  0%|          | 34/10000 [00:01<03:34, 46.46it/s]  0%|          | 43/10000 [00:01<03:16, 50.58it/s]  1%|          | 51/10000 [00:01<02:55, 56.54it/s]  1%|          | 59/10000 [00:01<02:51, 58.04it/s]  1%|          | 70/10000 [00:01<02:24, 68.60it/s]  1%|          | 80/10000 [00:01<02:11, 75.69it/s]  1%|          | 89/10000 [00:02<02:16, 72.69it/s]  1%|          | 97/10000 [00:02<02:32, 64.88it/s]  1%|          | 108/10000 [00:02<02:12, 74.43it/s]  1%|          | 116/10000 [00:02<02:19, 70.82it/s]  1%|▏         | 133/10000 [00:02<01:43, 95.63it/s]  1%|▏         | 145/10000 [00:02<01:38, 100.27it/s]  2%|▏         | 156/


real	12m9.731s
user	0m0.336s
sys	0m12.426s
