In [1]:
import pandas as pd
import numpy as np
from geopy.geocoders import Nominatim
from openai import OpenAI
import requests
import re
from defines import coordinates_dict
import glob
import difflib
import pickle
from multiprocessing import Pool, cpu_count
import unicodedata
from ast import literal_eval
from statistics import mode
import os
from pathlib import Path
import shutil
from odf import text, teletype
from odf.opendocument import load

In [None]:
# Root of the raw input tree; with recursive=True the "**" component lets
# glob descend into every sub-folder.
directory_path = "./local_data/data/data_selection/**/"  # Replace with the path to your folder
csv_pattern = f"{directory_path}/*.csv"
odt_pattern = f"{directory_path}/*.odt"
all_csv_files_path = glob.glob(csv_pattern, recursive=True)
all_odt_files_path = glob.glob(odt_pattern, recursive=True)

# Report how many CSV files were discovered, then list every path.
csv_file_count = len(all_csv_files_path)
print(f"CSV files in the folder:{csv_file_count}")
for csv_file_path in all_csv_files_path:
    print(csv_file_path)

In [None]:
def rename_columns(df, first_column='Unnamed: 0'):
    """Return a copy of ``df`` in which ``first_column`` is called 'scientificName'.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose column should be renamed.
    first_column : str
        Current label of the column holding the names.
    """
    column_mapping = {first_column: 'scientificName'}
    renamed = df.rename(columns=column_mapping)
    return renamed

def remove_extra_space(text):
    """Collapse runs of spaces into one and trim surrounding whitespace.

    Parameters
    ----------
    text : Any
        Cell value; non-string values are converted with ``str``.

    Returns
    -------
    str or float
        The cleaned string. A float NaN is handed back unchanged.
    """
    # NaN is the only value that differs from itself. The check has to run
    # before str(): afterwards the value would be the literal string 'nan'
    # and the comparison could never be true.
    if isinstance(text, float) and text != text:
        return text
    return re.sub(' +', ' ', str(text)).strip()

# Clean and correct names
def remove_number(text):
    """Strip a leading enumeration marker from a name.

    Two prefixes are removed, in this order, each at most once:
    a run of digits, dots and asterisks (e.g. ``"12.*"``), then a single
    ASCII letter followed by ``")"`` (e.g. ``"a)"``).

    Parameters
    ----------
    text : Any
        Cell value; non-string values are converted with ``str``.

    Returns
    -------
    str or float
        The text without the marker. A float NaN is handed back unchanged.
    """
    # The NaN test must precede str(); on the string 'nan' it is never true.
    if isinstance(text, float) and text != text:
        return text
    text = str(text)
    text = re.sub(r'^[0-9.*]*', '', text, count=1)
    # [A-Za-z] instead of [aA-zZ]: the range A-z also spans the punctuation
    # characters [ \ ] ^ _ ` that sit between 'Z' and 'a' in ASCII.
    text = re.sub(r'^[A-Za-z]\)', '', text, count=1)
    return text

def replace_commas(text):
    """Normalise leading comma sequences into the ``,,`` ditto marker.

    Leading ``", ,"``, full-width ``"，，"``, ``",,"`` and a single ``","``
    each become ``" ,, "``; a leading ``", , , ,"`` becomes ``" ,, ,, "``.

    Parameters
    ----------
    text : Any
        Cell value; non-string values are converted with ``str``.

    Returns
    -------
    str or float
        The rewritten text. A float NaN is handed back unchanged.
    """
    # The NaN test must precede str(); on the string 'nan' it is never true.
    if isinstance(text, float) and text != text:
        return text
    text = str(text)
    # Longest pattern first. Every rule below leaves the string starting with
    # a space, so a rule placed after them can never match a leading comma;
    # with this rule last, ", , , ," was always consumed by "^, ," instead.
    text = re.sub('^, , , ,', ' ,, ,, ', text)
    text = re.sub('^, ,', ' ,, ', text)
    # Full-width commas are mapped to ASCII so the next rule picks them up.
    text = re.sub('^，，', ',,', text)
    text = re.sub('^,,', ' ,, ', text)
    text = re.sub('^,', ' ,, ', text)
    return text

def normalize_to_ascii(text):
    """Transliterate ``text`` to plain ASCII, dropping what cannot be mapped.

    The string is decomposed with NFKD so accented letters split into a base
    letter plus combining marks; everything outside ASCII is then discarded.

    Parameters
    ----------
    text : Any
        Value to transliterate.

    Returns
    -------
    Any
        The ASCII string, or ``text`` itself when it is not a ``str``.
    """
    try:
        decomposed = unicodedata.normalize('NFKD', text)
    except TypeError:
        # unicodedata.normalize only accepts str; pass other values through.
        # Catching TypeError only keeps KeyboardInterrupt and friends alive.
        return text
    return decomposed.encode('ascii', 'ignore').decode('ascii')
        
def remove_roman_numerals(text):
    """Drop a leading Roman numeral that is followed by a dot, then trim.

    The value is converted with ``str`` first. The numeral part of the pattern
    may be empty, so a bare leading ``"."`` is removed as well.
    """
    roman_prefix = re.compile(
        r'^((M{0,4})(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3}))\.'
    )
    without_prefix = roman_prefix.sub('', str(text))
    return without_prefix.strip()

def complete_species_name(scientificName_list, i):
    """Expand the ``,,`` ditto marker in entry ``i`` with the previous genus.

    Every ``,,`` in ``scientificName_list[i]`` is replaced by the first
    whitespace-separated word of ``scientificName_list[i-1]``. The list is
    updated in place, so a run of consecutive ditto entries resolves against
    the already completed predecessor.

    Parameters
    ----------
    scientificName_list : list of str
        Names in table order; modified in place.
    i : int
        Position of the entry to complete.

    Returns
    -------
    str
        The (possibly completed) entry at position ``i``.
    """
    current = scientificName_list[i]
    if ',,' not in current:
        # Nothing to expand; do not touch the predecessor at all, which may
        # be an empty string.
        return current
    previous_words = scientificName_list[i-1].split()
    if not previous_words:
        # Blank predecessor: there is no genus to copy, keep the entry as is
        # instead of failing the whole file with an IndexError.
        return current
    scientificName_list[i] = current.replace(',,', previous_words[0])
    return scientificName_list[i]

def get_close_scname_and_data_gbif_list(text, timeout=30):
    """Query the GBIF species-match endpoint for ``text``.

    Parameters
    ----------
    text : Any
        Name to look up; converted with ``str``.
    timeout : float
        Seconds to wait for the HTTP request before ``requests`` raises.

    Returns
    -------
    tuple
        ``(candidates, count)``. ``candidates`` is a list of
        ``(scientificName, confidence, kingdom)`` tuples: the single match
        when the reply carries a top-level ``scientificName``, otherwise the
        leading run of ``alternatives`` that share the confidence of the
        first alternative. ``count`` is ``len(candidates)``.
        ``([(None, None, None)], None)`` is returned for a non-200 status or
        a reply that cannot be interpreted.

    Raises
    ------
    requests.exceptions.RequestException
        Network failures and timeouts are not swallowed here.
    """
    no_match = ([(None, None, None)], None)
    # Passing the query through ``params`` lets requests escape the name, so
    # characters such as '&' or '#' cannot corrupt the query string.
    response = requests.get(
        "https://api.gbif.org/v1/species/match",
        params={
            "name": str(text),
            "status": "ACCEPTED",
            "strict": "false",
            "verbose": "true",
        },
        timeout=timeout,
    )
    if response.status_code != 200:
        return no_match

    try:
        reply = response.json()  # parsed once and reused below
        if 'scientificName' in reply:
            alt_list = [(reply['scientificName'], reply['confidence'], reply['kingdom'])]
        else:
            alternatives = reply['alternatives']
            top_confidence = alternatives[0]['confidence']
            alt_list = []
            for alternative in alternatives:
                # Only the leading alternatives tied with the first one are kept.
                if alternative['confidence'] != top_confidence:
                    break
                alt_list.append((alternative['scientificName'],
                                 alternative['confidence'],
                                 alternative['kingdom']))
        return alt_list, len(alt_list)
    except (ValueError, LookupError, TypeError):
        # Invalid JSON, a missing key, an empty alternatives list or an
        # unexpected reply shape all count as "no match".
        return no_match

def get_close_scname_and_data_from_dict_gbif_list(sc_name, close_match_sc_dict_gbif_list):
    """Fetch the stored lookup result for ``sc_name``.

    Returns the value stored under ``sc_name`` in
    ``close_match_sc_dict_gbif_list``; for an unknown name the placeholder
    ``([(None, None, None)], None)`` is returned instead.
    """
    if sc_name not in close_match_sc_dict_gbif_list:
        placeholder = [(None, None, None)]
        return placeholder, None
    return close_match_sc_dict_gbif_list[sc_name]

def candidate_selection(candidate_list_of_list):
    """Resolve ambiguous match lists by majority kingdom of nearby rows.

    For every entry that holds more than one candidate tuple
    ``(scientificName, confidence, kingdom)``, the most common kingdom among
    the surrounding rows (as returned by ``get_surrounding_index``) is
    determined and only the candidates of that kingdom are kept.

    Parameters
    ----------
    candidate_list_of_list : list of list of tuple
        One candidate list per table row, in table order.

    Returns
    -------
    list of list of tuple
        Same length as the input. Entries with zero or one candidate are
        passed through unchanged. An ambiguous entry is also left unchanged
        when no kingdom can be voted on or when none of its candidates has
        the winning kingdom, so no entry is ever reduced to an empty list.
    """
    selected = []
    for position, candidate_list in enumerate(candidate_list_of_list):
        candidate = candidate_list
        if candidate_list and len(candidate_list) > 1:
            neighbourhood = get_surrounding_index(candidate_list_of_list, position)
            # Unmatched rows carry kingdom None; they must not take part in
            # the vote, otherwise None can win and wipe out every candidate.
            kingdoms = [entry[2]
                        for neighbour in neighbourhood
                        for entry in (neighbour or [])
                        if entry[2] is not None]
            if kingdoms:
                mode_kingdom = mode(kingdoms)
                # Compare the kingdom field itself, not membership in the
                # whole tuple (a name could equal a kingdom string).
                same_kingdom = [entry for entry in candidate_list
                                if entry[2] == mode_kingdom]
                if same_kingdom:
                    candidate = same_kingdom
        selected.append(candidate)
    return selected

def get_surrounding_index(lst, index):
    """Return the window of up to five elements around ``index``.

    The window is centred on ``index`` (two elements on either side) and is
    shifted inwards at both ends of the list so that it still holds five
    elements. A list with fewer than five elements is returned in full.

    Parameters
    ----------
    lst : list
        Sequence to slice.
    index : int
        Position the window is built around.

    Returns
    -------
    list
        Slice of ``lst`` with at most five elements.
    """
    window = 5
    # Clamp the last admissible start at 0: a negative slice start would count
    # from the end and shrink the window for short lists.
    last_start = max(len(lst) - window, 0)
    start = min(max(index - 2, 0), last_start)
    return lst[start:start + window]

def create_directory_from_file_path(file_path):
    """Make sure the directory that will contain ``file_path`` exists.

    Parameters
    ----------
    file_path : str
        Path of a file that is about to be written.

    Returns
    -------
    str
        The directory part of ``file_path`` (``''`` for a bare filename).

    Raises
    ------
    FileExistsError
        If the directory path exists but is not a directory.
    """
    directory_path = os.path.dirname(file_path)
    # A bare filename has no directory part; os.makedirs('') would raise.
    if directory_path:
        # exist_ok avoids the gap between "check" and "create" in which
        # another process could create the directory first.
        os.makedirs(directory_path, exist_ok=True)
    return directory_path

In [None]:
# Extracting location description 
def odt_to_txt(input_file):
    """Return the text of an ODT document, one line per paragraph element.

    The document is opened with ``odf.opendocument.load``; the text of every
    ``text.P`` element is extracted and terminated with a newline.
    """
    document = load(input_file)
    paragraphs = document.getElementsByType(text.P)
    return ''.join(teletype.extractText(paragraph) + '\n' for paragraph in paragraphs)

def extract_numbers(text):
    """Extract the leading label of a paragraph.

    The rules are tried in this order, all anchored at the start of ``text``:

    1. a run of digits -> returned as ``int``;
    2. letters (ASCII plus German umlauts/ß), whitespace or ``+`` up to a
       colon -> returned as ``str`` without the colon;
    3. ASCII letters, digits, whitespace or ``+`` up to a colon -> same;
    4. a text consisting only of a Roman numeral plus one trailing character
       -> returned as ``str`` with ``':'`` and ``'.'`` removed.

    Parameters
    ----------
    text : str
        Paragraph text.

    Returns
    -------
    int, str or None
        The label, or ``None`` when no rule matches or ``text`` is not a
        string (the error is printed).
    """
    # Letter classes are spelled A-Za-z: the former aA-zZ range also covered
    # the punctuation characters [ \ ] ^ _ ` between 'Z' and 'a'.
    label_patterns = (
        r'[A-Za-zäöüÄÖÜß\s+]+:',
        r'[A-Za-z\d+\s+]+:',
        # NOTE(review): the final "(.)" accepts any single character, not
        # only a dot -- presumably "\." was intended; confirm before changing.
        r'^M{0,3}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})(.)$',
    )
    try:
        leading_digits = re.match(r'\d+', text)
        if leading_digits:
            return int(leading_digits.group())
        for pattern in label_patterns:
            label = re.match(pattern, text)
            if label:
                return label.group().replace(':', '').replace('.', '')
        return None
    except (TypeError, ValueError) as e:
        # re.match raises TypeError for non-string input such as None or NaN.
        print('Extract Number', e)
        return None

def split_by_multiple_blank_lines(text):
    """Split ``text`` wherever two or more newline characters follow each other."""
    blank_line_run = re.compile('\n\n+')
    return blank_line_run.split(text)

def odt_to_dict(input_file):
    """Map paragraph labels of an ODT file to the paragraph text.

    The document text is split on blank lines; each paragraph whose label can
    be extracted with ``extract_numbers`` is stored under that label. When a
    label occurs more than once the last paragraph wins.

    Parameters
    ----------
    input_file : str
        Path of the ODT document.

    Returns
    -------
    dict
        ``{label: paragraph}``. Paragraphs without a label (``None``) or with
        an empty label are skipped; the numeric label ``0`` is kept.
    """
    txt = odt_to_txt(input_file)
    num_para_dict = {}
    for para in split_by_multiple_blank_lines(txt):
        # One extraction per paragraph: the regex work is not repeated and a
        # failure is reported only once.
        label = extract_numbers(para)
        # Explicit test instead of truthiness, which would also drop label 0.
        if label is None or label == '':
            continue
        num_para_dict[label] = para
    return num_para_dict

In [None]:
# Clean every CSV table, match its names against GBIF and export the tables
# whose mean match confidence is high enough, together with the paragraphs of
# the ODT file of the same number.
MIN_MEAN_MATCH_SCORE = 60  # mean GBIF confidence a file needs to be exported

count_file = 0
close_match_sc_dict_gbif_list =  dict()
for csv_file_path in all_csv_files_path:
    # Reset per-file state so the error report below never reads names that
    # are undefined (first file) or left over from the previous file.
    df = None
    first_column = None
    try:
        file_id = int(Path(csv_file_path).name.split(".")[0])
        try:
            df = pd.read_csv(filepath_or_buffer=csv_file_path, encoding='utf-8')
        except Exception:
            # Fallback for files that are not UTF-8 / comma separated.
            df = pd.read_csv(filepath_or_buffer=csv_file_path, encoding='windows_1258', sep='\t')

        # --- clean the name column -------------------------------------
        df = df.map(remove_extra_space, na_action='ignore')
        first_column = df.columns[0]
        df = rename_columns(df, first_column)
        df['scientificName'] = df['scientificName'].apply(remove_number)
        df = df.map(remove_extra_space, na_action='ignore')
        df['scientificName'] = df['scientificName'].apply(replace_commas)
        df['scientificName'] = df['scientificName'].apply(normalize_to_ascii)
        df['scientificName'] = df['scientificName'].apply(remove_roman_numerals)
        df = df.map(remove_extra_space, na_action='ignore')
        scientificName_list = df['scientificName'].tolist()

        # Complete scientificName: expand ditto marks from the previous row.
        df['scientificName'] = [scientificName_list[0]] + [
            complete_species_name(scientificName_list, i)
            for i in range(1, len(scientificName_list))
        ]

        # --- GBIF lookup -----------------------------------------------
        # Names resolved for an earlier file are reused; names whose earlier
        # lookup failed (count is None) are tried again.
        for sc_name in df['scientificName'].unique().tolist():
            cached = close_match_sc_dict_gbif_list.get(sc_name)
            if cached is None or cached[1] is None:
                close_match_sc_dict_gbif_list[sc_name] = get_close_scname_and_data_gbif_list(sc_name)

        # Fuzzy Match Scientific Names
        lookup_results = df['scientificName'].apply(
            get_close_scname_and_data_from_dict_gbif_list,
            args=(close_match_sc_dict_gbif_list,),
        )
        df[['scientificName_matchingScore_kingdom_CloseGbiflist', 'scientificName_matchingScore_kingdom_CloseGbiflistLength']]\
                    = pd.DataFrame(lookup_results.tolist(), index=df.index)

        chosen = candidate_selection(df['scientificName_matchingScore_kingdom_CloseGbiflist'].tolist())
        df['scientificName_matchingScore_kingdom_CloseGbif_Candidate'] = [candidates[0] for candidates in chosen]
        df[['scientificNameGbif', 'matchingScoreGbif', 'kingdomGbif']]= pd.DataFrame(df['scientificName_matchingScore_kingdom_CloseGbif_Candidate'].tolist(),index=df.index)

        # drop unnamed columns
        df = df.drop(columns=df.columns[df.columns.str.contains('unnamed', case=False)])

        # --- save data -------------------------------------------------
        if df['matchingScoreGbif'].mean() >= MIN_MEAN_MATCH_SCORE:
            source_dir = os.path.dirname(csv_file_path)
            # Separate name for the destination so the loop variable (and the
            # error message below) keeps pointing at the input file.
            output_csv_path = csv_file_path.replace('./local_data/data/data_selection/', './data/selected_data/')
            output_dir = create_directory_from_file_path(output_csv_path)

            # The location descriptions live in the ODT file with the same number.
            num_para_dict = odt_to_dict(os.path.join(source_dir, str(file_id) +'.odt'))
            df_odt = pd.DataFrame(list(num_para_dict.items()), columns=['Index', 'Location Description'])
            df_odt.to_csv(os.path.join(output_dir, str(file_id) +'_odt.csv'), encoding='utf-8', index=False)
            df.to_csv(os.path.join(output_dir, str(file_id) +'.csv'), encoding='utf-8', index=False)
            count_file = count_file + 1

    except Exception as e:
        columns = None if df is None else df.columns
        print(f"Error processing csv location '{csv_file_path}': {e}")
        print(f"Error processing csv location '{first_column}': {columns}", end='\n\n\n\n')

print("Files with correct scientific names: ", count_file)

In [4]:
## get location Coordinates and Location

import google.generativeai as genai
import os
import ast
from IPython.display import display
from key import GEMINI_KEY
import json
import time

# Hand the API key imported from the local ``key`` module to the Gemini client.
genai.configure(api_key=GEMINI_KEY)

# Generation settings passed to the model below.
MODEL_CONFIG = dict(
    temperature=0.2,
    top_p=0.95,
    top_k=32,
    max_output_tokens=8192,
)

# Module-level model handle used by the prompt helpers defined further down.
model = genai.GenerativeModel(
    model_name="gemini-1.0-pro",
    generation_config=MODEL_CONFIG,
)

def _extract_model_dict(raw_text):
    """Return the first flat ``{...}`` object in a model reply as a dict, else None.

    Works for replies wrapped in a code fence (with or without a ``json`` tag)
    or surrounded by prose. JSON is tried first, then a Python literal, so
    both ``null`` and ``None`` values are understood.
    """
    found = re.search(r'\{.*?\}', raw_text, flags=re.DOTALL)
    if found is None:
        return None
    snippet = found.group()
    for parser in (json.loads, ast.literal_eval):
        try:
            parsed = parser(snippet)
        except (ValueError, SyntaxError, TypeError):
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def get_coordinates(text):
    """Ask the Gemini model for the coordinates of the place described in ``text``.

    Parameters
    ----------
    text : Any
        Location description that is inserted into the prompt.

    Returns
    -------
    tuple
        ``(latitude, longitude)`` as given in the reply, or ``(None, None)``
        when the reply has no usable text or lacks the two keys.

    Notes
    -----
    Errors raised by ``model.generate_content`` itself (for example quota
    errors) are not caught here.
    """
    prompt = (
        f"Find the latitude and longitude of the location described in the given text:{text} \n"
        "               \n if it is not found then output None"
        '\nHere is the output schema:\n{"latitude":, "longitude":}'
    )
    response = model.generate_content(prompt, request_options={"timeout": 600})
    try:
        # The ``text`` accessor raises ValueError when the reply carries no
        # text part (e.g. a blocked prompt).
        raw_text = response.text
    except ValueError as e:
        print('Error get coordinates: ', e)
        return (None, None)
    print('Response Coordinates', raw_text)

    coordinates = _extract_model_dict(raw_text)
    try:
        return coordinates['latitude'], coordinates['longitude']
    except (KeyError, TypeError) as e:
        # TypeError: no dict was found; KeyError: a coordinate key is missing.
        print('Error get coordinates: ', e, raw_text)
    return (None, None)


def is_location(text):
    """Ask the Gemini model whether ``text`` contains a location description.

    Parameters
    ----------
    text : Any
        Paragraph that is inserted into the prompt.

    Returns
    -------
    Any
        The value stored under ``isLocationDescription`` when the reply holds
        such an object; the stripped reply text when the model answered in
        plain words; ``None`` when the reply cannot be interpreted.

    Notes
    -----
    Errors raised by ``model.generate_content`` itself (for example quota
    errors) are not caught here.
    """
    prompt = (
        f"Is there any location description in the given text:{text}?\n"
        "    Give me answer in Yes and No only without explanation"
        '\nHere is the output schema: {"isLocationDescription":} '
    )
    response = model.generate_content(prompt, request_options={"timeout": 600})
    try:
        # The ``text`` accessor raises ValueError when the reply carries no
        # text part (e.g. a blocked prompt).
        raw_text = response.text
    except ValueError as e:
        print('Error isLocationDescription: ', e)
        return None
    print('Response Location', raw_text)

    if '{' in raw_text or 'json' in raw_text:
        parsed = _extract_model_dict(raw_text)
        if parsed is not None and 'isLocationDescription' in parsed:
            return parsed['isLocationDescription']
        print('Error isLocationDescription: ', 'no usable object in reply', raw_text)
        return None
    # Plain "Yes"/"No" answer: strip whitespace so it compares equal to the
    # literal later on.
    return raw_text.strip()

In [6]:
# Annotate every exported *_odt.csv with the model's location verdict and
# coordinates; the file is rewritten in place.
directory_path  = "./data/selected_data/**/"  # Replace with the path to your folder
all_csv_odt_files_path  = glob.glob(f"{directory_path}/*_odt.csv", recursive=True)

REQUEST_PAUSE_SECONDS = 60  # pause between files to stay below the API quota

for csv_odt_file_path in all_csv_odt_files_path:
    try:
        df = pd.read_csv(filepath_or_buffer=csv_odt_file_path, encoding='utf-8')
        df['isLocationDescription'] = df['Location Description'].apply(is_location)
        df['coordinates'] = df['Location Description'].apply(get_coordinates)
        df[['latitude', 'longitude']] = pd.DataFrame(df['coordinates'].tolist(), index=df.index)
        df.to_csv(csv_odt_file_path, encoding='utf-8', index=False)
    except Exception as e:
        print(f"Error processing location '{csv_odt_file_path}': {e}")
    # Pause after failures as well: skipping the pause after a quota error
    # sends the next file's requests straight into the same exhausted quota.
    time.sleep(REQUEST_PAUSE_SECONDS)

Response Location {"isLocationDescription": "Yes"}
Response Location {"isLocationDescription": "Yes"}
Response Location {"isLocationDescription": "Yes"}
Response Location No
Response Location {"isLocationDescription": "Yes"}
Error processing location './data/selected_data/CSV_05_Dec/335_odt.csv': 429 Resource has been exhausted (e.g. check quota).
Error processing location './data/selected_data/CSV_05_Dec/307_odt.csv': 429 Resource has been exhausted (e.g. check quota).
Response Location {"isLocationDescription": "Yes"}
Response Location {"isLocationDescription": "Yes"}
Response Location {"isLocationDescription": "Yes"}
Error processing location './data/selected_data/CSV_05_Dec/272_odt.csv': 429 Resource has been exhausted (e.g. check quota).
Error processing location './data/selected_data/CSV_05_Dec/319_odt.csv': 429 Resource has been exhausted (e.g. check quota).
Error processing location './data/selected_data/CSV_05_Dec/311_odt.csv': 429 Resource has been exhausted (e.g. check quota

In [None]:
# Filter files not having coordinate data
directory_path  = "./data/selected_data/**/"  # Replace with the path to your folder
all_csv_files_path  = glob.glob(f"{directory_path}/*.csv", recursive=True)
all_odt_files_path = glob.glob(f"{directory_path}/*_odt.csv", recursive=True)

# "*.csv" also matches the "*_odt.csv" companions; keep only the data tables.
all_csv_files_path = list(set(all_csv_files_path)-set(all_odt_files_path))
print(len(all_csv_files_path), len(all_odt_files_path))
print(all_csv_files_path[0:10],'\n\n' ,all_odt_files_path[0:10])

ODT_SUFFIX = '_odt.csv'
MIN_LOCATION_SHARE = 0.5  # minimum share of paragraphs answered "Yes"

# Delete the data CSV when fewer than half of the companion file's paragraphs
# were classified as location descriptions.
# NOTE(review): only the data CSV is removed; the "*_odt.csv" companion stays
# on disk although the earlier comment spoke of deleting both -- confirm intent.
for odt_file in all_odt_files_path:
    try:
        # Reading inside the try keeps one unreadable file from ending the loop.
        df = pd.read_csv(odt_file)
        yes_share = (df['isLocationDescription'] == "Yes").mean()
        # "not >=" so that an empty file (share is NaN) is treated as failing.
        if not (yes_share >= MIN_LOCATION_SHARE):
            # Swap only the trailing suffix; str.replace would also rewrite
            # any "_odt" occurring in a directory name.
            data_file = odt_file[:-len(ODT_SUFFIX)] + '.csv'
            os.remove(data_file)
    except Exception as e:
        print(f"Error deleting location '{odt_file}': {e}")