get species nile data

In [None]:
import pandas as pd
import psycopg2
import os
import uuid
import logging
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Connection parameters
dbname = os.getenv("DB_NAME")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")

# Connect to the database
conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)

# Fetch species in pages of `chunk_size` rows and collect them as
# {species_id: binominal_name}.
chunk_size = 100
offset = 0
species_dict = {}
try:
    with conn.cursor() as cursor:
        while True:
            # species_id is added as a tie-breaker: LIMIT/OFFSET paging only
            # visits every row exactly once when the ordering is total.
            cursor.execute(
                "SELECT species_id, binominal_name FROM species "
                "ORDER BY iucn_taxon_id DESC, species_id LIMIT %s OFFSET %s",
                (chunk_size, offset),
            )
            data = cursor.fetchall()
            if not data:
                break
            # Each row is a (species_id, binominal_name) pair
            species_dict.update(data)
            # Advance by the rows actually returned so the log stays accurate
            # on the final, possibly partial, page.
            offset += len(data)
            logger.info("Fetched %d records", offset)
finally:
    # Release the connection even if a query fails part-way through
    conn.close()

# Print the dictionary
print(species_dict)

# Convert dictionary to DataFrame
df = pd.DataFrame(list(species_dict.items()), columns=['species_id', 'binominal_name'])

# Write DataFrame to CSV. The join step reads '../data/species_nile.csv', so
# the export must land in the same data directory as the countries export.
df.to_csv('../data/species_nile.csv', index=False)

get country nile data

In [None]:
import pandas as pd
import psycopg2
import os
import uuid
import logging
from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Connection parameters
dbname = os.getenv("DB_NAME")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")

# Connect to the database
conn = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)

# Fetch every (country_id, iso_alpha3) pair in a single query
try:
    with conn.cursor() as cursor:
        cursor.execute("SELECT country_id, iso_alpha3 FROM countries")
        data = cursor.fetchall()
finally:
    # Release the connection even if the query fails
    conn.close()

countries_dict = {country_id: iso_alpha3 for country_id, iso_alpha3 in data}
# Log once with the total instead of once per row
logger.info("Fetched %d country records", len(countries_dict))

# Print the dictionary
print(countries_dict)

# Convert dictionary to DataFrame
countries_df = pd.DataFrame(list(countries_dict.items()), columns=['country_id', 'iso_alpha3'])

# Write DataFrame to CSV
countries_df.to_csv('../data/countries_nile.csv', index=False)

join with countries_species.csv

In [33]:
# Species table (joined below on its 'binominal_name' column)
species_nile_df = pd.read_csv('../data/species_nile.csv')

# Country/species table (joined below on its 'binomial' column)
countries_species_df = pd.read_csv('../data/countries_species.csv')

# Two chained inner joins: species -> country/species rows by scientific name,
# then -> countries by ISO alpha-3 code. `countries_df` is expected to already
# be defined in the session (it is not created in this cell).
merged_df = (
    species_nile_df
    .merge(countries_species_df, how='inner', left_on='binominal_name', right_on='binomial')
    .merge(countries_df, how='inner', left_on='iso_a3', right_on='iso_alpha3')
)

# Persist the joined table for the chunking step
merged_df.to_csv('../data/merged_countries_species.csv', index=False)

  countries_species_df = pd.read_csv('../data/countries_species.csv')


write to temp files

In [1]:
import pandas as pd
import os
import logging
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor

# Configure logging: INFO level via basicConfig, plus a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Number of rows written to each temporary chunk file by create_temp_files
CHUNK_SIZE = 10000

# Function to create temporary files with chunks of data
def create_temp_files(csv_file, chunk_size=None, file_dir='../data/species_countries/temp'):
    """Split a CSV file into numbered chunk files of at most ``chunk_size`` rows.

    Chunks are written as ``temp_chunk_<i>.csv`` (``i`` starting at 0) inside
    ``file_dir``, which is created if it does not exist.

    Args:
        csv_file: Path of the CSV file to split.
        chunk_size: Maximum rows per chunk file. Defaults to the module-level
            ``CHUNK_SIZE`` when omitted.
        file_dir: Directory that receives the chunk files.

    Returns:
        The number of chunk files written (0 when the CSV has no data rows).

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    df = pd.read_csv(csv_file)
    os.makedirs(file_dir, exist_ok=True)

    # Ceiling division: a row count that is an exact multiple of chunk_size
    # (or zero) must not produce a trailing header-only file.
    num_chunks = -(-len(df) // chunk_size)
    for i in range(num_chunks):
        start = i * chunk_size
        chunk_df = df.iloc[start:start + chunk_size]
        chunk_file = os.path.join(file_dir, f'temp_chunk_{i}.csv')
        chunk_df.to_csv(chunk_file, index=False)
    return num_chunks

# Path of the merged CSV to split (nothing is read here; create_temp_files loads it)
csv_file = '../data/merged_countries_species.csv'

# Create temporary files with chunks of data and log how many were written
num_chunks = create_temp_files(csv_file)
logger.info(f"{num_chunks} temporary files created with chunks of data.")

INFO:__main__:54 temporary files created with chunks of data.


parallel processing across files

In [None]:
import psycopg2
from psycopg2 import extras, pool
import os
from dotenv import load_dotenv
import uuid
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from pathlib import Path

# Load environment variables and return database connection parameters
def load_db_params():
    """Load the .env file and return the database connection settings.

    Returns:
        A dict with keys ``dbname``, ``user``, ``password``, ``host`` and
        ``port``; each value is the matching ``DB_*`` environment variable, or
        ``None`` when that variable is not set.
    """
    load_dotenv()
    env_names = {
        "dbname": "DB_NAME",
        "user": "DB_USER",
        "password": "DB_PASSWORD",
        "host": "DB_HOST",
        "port": "DB_PORT",
    }
    return {param: os.getenv(env_name) for param, env_name in env_names.items()}

# Log the name of processed files along with the number of records written
def log_processed_file(file_path, num_records_written, log_file_path='processed_files.log'):
    """Append one ``<file_path>: <n> records written`` line to the processed-files log.

    Args:
        file_path: Path of the file that was processed.
        num_records_written: Number of records written for that file.
        log_file_path: Log file to append to; created if it does not exist.
    """
    entry = f"{file_path}: {num_records_written} records written\n"
    with open(log_file_path, mode='a') as handle:
        handle.write(entry)

# Get the set of already processed files
def get_processed_files(log_file_path='processed_files.log'):
    """Return the set of file paths recorded in the processed-files log.

    Each log line has the form ``<file_path>: <n> records written``.

    Args:
        log_file_path: Path of the log file to read.

    Returns:
        A set of file-path strings; empty when the log file does not exist.
    """
    try:
        with open(log_file_path, 'r') as log_file:
            # Split on the LAST ': ' so a path that itself contains ': ' is
            # kept whole; blank lines are skipped rather than recorded as ''.
            return {
                line.strip().rsplit(': ', 1)[0]
                for line in log_file
                if line.strip()
            }
    except FileNotFoundError:
        return set()

# Log failed records to a CSV file
def log_failed_records(records, log_file_path='failed_records.csv'):
    """Append ``records`` to a CSV file, emitting the header only for a new file.

    Args:
        records: DataFrame of rows to append.
        log_file_path: CSV file to append to; created if it does not exist.
    """
    needs_header = not os.path.exists(log_file_path)
    records.to_csv(log_file_path, mode='a', header=needs_header, index=False)

# Process a single chunk of data from a CSV file
def process_and_insert_chunk(chunk, db_pool, file_path, failed_records):
    """Insert one chunk of rows into ``countries_species`` as a single transaction.

    Args:
        chunk: DataFrame providing the 'country_id', 'species_id',
            'datanam_area' and 'datanam_pct_area' columns.
        db_pool: Connection pool; one connection is borrowed for the insert and
            always returned afterwards.
        file_path: Source file of the chunk; used only in the failure message.
        failed_records: Collector for chunks that could not be inserted. Pass a
            list and the failed chunk is appended to it. Any other object
            (e.g. a DataFrame) is accepted but left untouched, because it
            cannot be extended in place; the failure is then visible only
            through the return value.

    Returns:
        The number of rows inserted, or 0 when the insert failed and was
        rolled back.
    """
    conn = db_pool.getconn()
    try:
        # One timestamp per chunk so 'created' and 'updated' always agree
        now = time.strftime('%Y-%m-%d %H:%M:%S')
        values = [
            # A fresh UUID becomes the country_species_id of each row
            (str(uuid.uuid4()), country_id, species_id, area, pct_area, now, now)
            for country_id, species_id, area, pct_area in zip(
                chunk['country_id'],
                chunk['species_id'],
                chunk['datanam_area'],
                chunk['datanam_pct_area'],
            )
        ]
        with conn.cursor() as cursor:
            extras.execute_values(cursor, """
                INSERT INTO countries_species (
                    country_species_id, country_id, species_id, country_habitat_range_area,
                    country_habitat_range_area_pct, created, updated
                ) VALUES %s;
            """, values)
        conn.commit()
        return len(chunk)  # Return number of records written
    except Exception as e:
        # Record the failure first so it is kept even if rollback itself raises
        if isinstance(failed_records, list):
            failed_records.append(chunk)
        print(f"Failed to insert records from {file_path}: {e}")
        conn.rollback()
        return 0  # Return 0 if insertion fails
    finally:
        db_pool.putconn(conn)


def _insert_chunk_with_retry(chunk, db_pool, file_path, failed_chunks, max_chunk_retries, wait_seconds):
    """Insert ``chunk``, retrying on failure; append it to ``failed_chunks`` once if every attempt fails."""
    attempts = max(1, max_chunk_retries)
    for attempt in range(1, attempts + 1):
        try:
            # Scratch collector: the chunk is recorded as failed only after
            # the final attempt, never once per attempt.
            written = process_and_insert_chunk(chunk, db_pool, file_path, [])
        except Exception as chunk_error:
            # Raised outside the insert's own handling, e.g. the pool could
            # not hand out a connection or the rollback failed.
            print(f"Failed to insert records from {file_path}: {chunk_error}")
            written = 0
        if written:
            return written
        if attempt < attempts:
            time.sleep(wait_seconds)  # Wait before retrying the chunk
    failed_chunks.append(chunk)
    return 0


# Process an entire file with retry logic
def process_file_with_retry(file_path, db_pool, max_file_retries=3, max_chunk_retries=3, wait_seconds=5):
    """Load one chunk file and insert its rows in batches of 100, with retries.

    Reading and preparing the file is retried up to ``max_file_retries`` times.
    Each 100-row batch is then inserted with up to ``max_chunk_retries``
    attempts of its own. Retries happen per batch rather than per file because
    re-running a whole file after some batches were committed would insert
    those rows a second time.

    The file is recorded in the processed-files log only when at least one
    record was written; batches that still failed are then appended to the
    failed-records CSV.

    Args:
        file_path: CSV file to process.
        db_pool: Connection pool handed to each batch insert.
        max_file_retries: Maximum attempts at reading/preparing the file.
        max_chunk_retries: Maximum insert attempts per batch.
        wait_seconds: Pause between consecutive attempts.
    """
    df_data = None
    for attempt in range(1, max_file_retries + 1):
        try:
            loaded = pd.read_csv(file_path)  # Read entire file
            loaded['country_id'] = loaded['country_id'].astype(str)  # Convert to string
            loaded['species_id'] = loaded['species_id'].astype(str)  # Convert to string
            loaded['datanam_area'] = loaded['datanam_area'].round(2)  # Limit to 2 decimal places
            loaded['datanam_pct_area'] = loaded['datanam_pct_area'].round(2)  # Limit to 2 decimal places
            df_data = loaded
            break
        except Exception as file_error:
            print(f"Error processing file {file_path}, attempt {attempt}: {file_error}")
            if attempt < max_file_retries:
                time.sleep(wait_seconds)  # Wait before retrying the file
    if df_data is None:
        print(f"Failed to process {file_path} after {max_file_retries} attempts.")
        return

    failed_chunks = []
    num_records_written = 0
    for start in range(0, len(df_data), 100):  # Batches of 100 rows
        num_records_written += _insert_chunk_with_retry(
            df_data[start:start + 100], db_pool, file_path,
            failed_chunks, max_chunk_retries, wait_seconds,
        )

    if num_records_written > 0:
        log_processed_file(str(file_path), num_records_written)  # Log the file as processed if records are written
        if failed_chunks:
            log_failed_records(pd.concat(failed_chunks, ignore_index=True))
        print(f"{file_path} processed successfully with retries. {num_records_written} records written.")
    else:
        # Nothing was committed, so the file stays out of the processed log
        # and a later run will pick it up again.
        print(f"No records written for {file_path}.")

# Parallel file processing with file and batch retries
if __name__ == "__main__":
    db_params = load_db_params()
    # Worker threads borrow connections concurrently, so the pool itself must
    # be thread-safe; SimpleConnectionPool is documented as not shareable
    # across threads. The constructor arguments are the same.
    db_pool = psycopg2.pool.ThreadedConnectionPool(1, 20, **db_params)

    try:
        data_directory = Path('../data/species_countries/temp')
        csv_files = [file for file in data_directory.glob('*.csv')]
        # Files already listed in the processed-files log are skipped
        processed_files = get_processed_files()

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = {
                executor.submit(process_file_with_retry, file_path, db_pool): file_path
                for file_path in csv_files
                if str(file_path) not in processed_files
            }

            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    # result() re-raises anything the worker did not handle;
                    # without this call such errors would vanish silently.
                    future.result()
                except Exception as error:
                    print(f"Unhandled error while processing {file_path}: {error}")
    finally:
        # Close every pooled connection even if scheduling fails
        db_pool.closeall()
    print("Processing complete.")
