In [4]:
# Re-import edited local modules (e.g. lib, omoccurrences_dtypes) before each cell executes.
%load_ext autoreload
%autoreload 2

import pandas as pd
import sqlite3
import numpy as np
import re
import pymysql
import configparser as cp
import pykml.parser

from omoccurrences_dtypes import OMOCCURRENCES_DTYPES
from datetime import datetime
from pymysql.cursors import DictCursor
from sys import stderr
from os import environ, path, remove as os_remove
from IPython.display import display
from shutil import copy2 as shutil_copy
from zipfile import ZipFile, ZIP_DEFLATED
from pandarallel import pandarallel

from lib import get_mysql_conn, get_sqlite_conn

# Start pandarallel's worker pool; this must run before any `.parallel_apply(...)` call below.
pandarallel.initialize()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [2]:
# Directory holding the GBIF input and every SQLite artefact this notebook writes.
DATA_DIR = "/data/disk/jupyter-notebooks"

COUNTRIES_FILE = "countries.txt"
GBIF_CSV = path.join(DATA_DIR, "GBIF_bee_occurrences_2021_harvest.csv")
N_AMERICA_KML = "NAmerica.kml"
PROVINCES_FILE = "provinces.txt"

# Compute the run date once so both output files always carry the same date stamp,
# even when the notebook runs across midnight. "%Y-%m-%d" is spelled out because
# "%F" is a platform-specific strftime extension.
RUN_DATE = datetime.now().strftime("%Y-%m-%d")
SCAN_SQLITE = path.join(DATA_DIR, "{}_symbscan.sqlite".format(RUN_DATE))
COMBINED_SQLITE = path.join(DATA_DIR, "{}_scan_gbif.sqlite".format(RUN_DATE))

# MySQL option file passed to get_mysql_conn; expanduser also works if $HOME is unset.
SQL_CONFIG_FILE = path.expanduser("~/.my.cnf")


def get_kml_poly(kml_file_name):
    """Read the outer ring of the polygon under ``Document.Placemark`` in a KML file.

    Args:
        kml_file_name: Path to a KML document containing
            ``Document/Placemark/Polygon/outerBoundaryIs/LinearRing/coordinates``.

    Returns:
        A list of ``(longitude, latitude)`` float tuples in file order. KML writes
        each point as ``lon,lat[,alt]``; the optional altitude is discarded.
    """
    with open(kml_file_name, "rb") as f:
        kml_file = pykml.parser.fromstring(f.read())

    ring = kml_file.Document.Placemark.Polygon.outerBoundaryIs.LinearRing
    raw_coords = str(ring.coordinates)

    kml_coords = []
    # Point tuples are separated by arbitrary whitespace (often newlines and
    # indentation), so use split() with no argument; split(" ") yields empty tokens.
    for token in raw_coords.split():
        # Altitude is optional in KML: accept both "lon,lat" and "lon,lat,alt".
        parts = token.split(",")
        kml_coords.append((float(parts[0]), float(parts[1])))
    return kml_coords


def get_provinces(file):
    """Return the non-blank lines of ``file`` with surrounding whitespace removed.

    One name per line is expected. Lines are stripped *before* the emptiness
    test, so blank lines (including a trailing newline at end of file) are
    skipped instead of turning into '' entries in the SQL filter.

    Args:
        file: Path to a UTF-8 text file.

    Returns:
        List of stripped, non-empty lines in file order.
    """
    # Explicit encoding so accented names (e.g. "Québec") decode the same on every locale.
    with open(file, encoding="utf-8") as f:
        stripped = (line.strip() for line in f)
        return [name for name in stripped if name]

    
def get_countries(file):
    """Return the non-blank lines of ``file`` with surrounding whitespace removed.

    One name per line is expected. Lines are stripped *before* the emptiness
    test, so blank lines (including a trailing newline at end of file) are
    skipped instead of turning into '' entries in the SQL filter.

    Args:
        file: Path to a UTF-8 text file.

    Returns:
        List of stripped, non-empty lines in file order.
    """
    # Explicit encoding so accented names decode the same on every locale.
    with open(file, encoding="utf-8") as f:
        stripped = (line.strip() for line in f)
        return [name for name in stripped if name]

In [3]:
KML_POLY = get_kml_poly(N_AMERICA_KML)

# get_kml_poly returns (longitude, latitude) pairs (KML stores "lon,lat"),
# so latitude is the SECOND element of each point and longitude the first.
OMOCCURRENCES_LATS = [lat for _lng, lat in KML_POLY]
OMOCCURRENCES_LNGS = [lng for lng, _lat in KML_POLY]

# Bounding box of the North America polygon, used as the coordinate filter in SCAN_QUERY.
OMOCCURRENCES_LATITUDE_RANGE = [min(OMOCCURRENCES_LATS), max(OMOCCURRENCES_LATS)]
OMOCCURRENCES_LONGITUDE_RANGE = [min(OMOCCURRENCES_LNGS), max(OMOCCURRENCES_LNGS)]

# Columns pulled from SCAN, alphabetised; the dtype map's keys define the schema.
TARGET_COLUMNS = sorted(OMOCCURRENCES_DTYPES)

# Bee families to keep. They are lower-case on purpose: SCAN_QUERY compares them
# against LOWER(family).
TARGET_FAMILIES = [
    "andrenidae",
    "apidae",
    "colletidae",
    "halictidae",
    "megachilidae",
    "melittidae",
]

# Geographic name filters, loaded from the text files named in the config cell.
TARGET_COUNTRIES = get_countries(COUNTRIES_FILE)
TARGET_PROVINCES = get_provinces(PROVINCES_FILE)

def _sql_string_list(values, separator=","):
    """Render ``values`` as a list of single-quoted SQL string literals.

    Fills the ``IN (...)`` lists of SCAN_QUERY. Each value is lower-cased,
    because every list is compared against a LOWER(...) expression. Each value
    is also escaped: backslashes are doubled for MySQL's default sql_mode and
    single quotes are doubled, so a name such as "Cote d'Ivoire" cannot end the
    literal early. Empty input renders as ``NULL``, because MySQL rejects an
    empty ``IN ()``, while ``x IN (NULL)`` is valid and matches nothing.

    Args:
        values: Iterable of strings.
        separator: Text placed between consecutive literals.

    Returns:
        The joined literal list, or ``"NULL"`` when ``values`` is empty.
    """
    literals = [
        "'{}'".format(value.lower().replace("\\", "\\\\").replace("'", "''"))
        for value in values
    ]
    return separator.join(literals) if literals else "NULL"


# Select every target omoccurrences column plus the owning collection's codes.
# Rows must have a non-empty sciName and a target family, and must either fall
# inside the polygon's lat/lng bounding box or name a target country or state/province.
SCAN_QUERY = """
    SELECT
        {},
        c.collectionCode as collectionCode,
        c.institutionCode as institutionCode

    FROM omoccurrences o

    INNER JOIN omcollections c
    ON o.collid = c.collid

    WHERE (
        o.sciName IS NOT NULL
        AND o.sciName != ''
        AND LOWER(o.family) IN ({})
        AND (
            (
                o.decimalLatitude BETWEEN {} AND {}
                AND o.decimalLongitude BETWEEN {} AND {}
            )
            OR lower(o.country) in (
                {}
            )
            OR lower(o.stateProvince) in (
                {}
            )
        )
    )
""".format(
    ',\n\t'.join(["o.{0} as {0}".format(c) for c in TARGET_COLUMNS]),
    _sql_string_list(TARGET_FAMILIES),
    *OMOCCURRENCES_LATITUDE_RANGE,
    *OMOCCURRENCES_LONGITUDE_RANGE,
    _sql_string_list(TARGET_COUNTRIES, ',\n\t\t'),
    _sql_string_list(TARGET_PROVINCES, ',\n\t\t'),
)


def get_scan_query(limit, offset, order_by="o.occid"):
    """Return SCAN_QUERY restricted to one page of ``limit`` rows after ``offset``.

    Args:
        limit: Maximum number of rows in the page.
        offset: Number of rows to skip.
        order_by: SQL expression giving a stable, unique row order. Without an
            ORDER BY, MySQL does not guarantee the same row order between
            separate LIMIT/OFFSET queries, so pages could overlap or skip rows.
            The default is the omoccurrences primary key (``occid`` in the
            Symbiota schema; confirm if the schema differs). Pass ``None`` to
            omit the clause.

    Returns:
        The paged SQL statement as a string.
    """
    # int() guarantees only numbers are spliced into the SQL text.
    page = "LIMIT {:d} OFFSET {:d}".format(int(limit), int(offset))
    if order_by:
        return "{} ORDER BY {} {}".format(SCAN_QUERY, order_by, page)
    return "{} {}".format(SCAN_QUERY, page)

In [None]:
# Rows fetched from SCAN per LIMIT/OFFSET page, and the running page offset.
LIMIT = 500000
offset = 0

# The export below appends to SCAN_SQLITE, so each run starts from a fresh file.
scan_sqlite_exists = path.exists(SCAN_SQLITE)
if scan_sqlite_exists:
    os_remove(SCAN_SQLITE)

def populate_scientificName_scan(row):
    """Copy a SCAN row's ``sciname`` value into ``scientificName``.

    The row is modified in place and also returned, so the function can be
    used with ``DataFrame.apply(..., axis="columns")``.
    """
    sci_name = row["sciname"]
    row["scientificName"] = sci_name
    return row

# Dtype map for the SCAN export: the shared omoccurrences schema plus the three
# string columns this notebook adds (the two omcollections codes and the 'source' tag).
target_dtypes = {
    **OMOCCURRENCES_DTYPES,
    "collectionCode": np.dtype("unicode"),
    "institutionCode": np.dtype("unicode"),
    "source": np.dtype("unicode"),
}

scan_conn = get_mysql_conn(SQL_CONFIG_FILE)
sqlite_conn = get_sqlite_conn(SCAN_SQLITE)

# Explicit column list (rather than a dict view) for selecting/reordering each chunk.
target_columns = list(target_dtypes)

try:
    # NOTE: with pymysql >= 1.0, leaving `with scan_conn` closes the connection.
    with scan_conn:
        with scan_conn.cursor() as cursor:
            # Page through SCAN with LIMIT/OFFSET until a page comes back empty.
            row_count = cursor.execute(get_scan_query(LIMIT, offset))

            while row_count > 0:
                chunk_df = pd.DataFrame(cursor.fetchall())
                chunk_df["source"] = "scan"
                chunk_df = chunk_df[target_columns].astype(target_dtypes)

                # Vectorised sciname -> scientificName copy; same values as the row-wise
                # populate_scientificName_scan, without rebuilding every row as a Series
                # (slow, and it may not preserve the dtypes set just above).
                chunk_df["scientificName"] = chunk_df["sciname"]

                # One SQLite transaction per chunk: committed on success, rolled back on error.
                with sqlite_conn:
                    chunk_df.to_sql(
                        "omoccurrences",
                        con=sqlite_conn,
                        index=False,
                        if_exists="append"
                    )

                offset += row_count
                row_count = cursor.execute(get_scan_query(LIMIT, offset))
finally:
    # Always release both connections, whether the export finished or failed;
    # any exception keeps propagating after this block.
    if scan_conn.open:
        scan_conn.close()
    sqlite_conn.close()

In [None]:
# Read just the header (plus one data row) of the tab-separated GBIF export to learn its columns.
gbif_df = pd.read_csv(GBIF_CSV, sep="\t", nrows=1)
gbif_cols = gbif_df.columns.sort_values().tolist()

In [None]:
# Start the combined database from a copy of the SCAN-only export.
shutil_copy(SCAN_SQLITE, COMBINED_SQLITE)

sqlite_conn = get_sqlite_conn(COMBINED_SQLITE)

try:
    # Only the cursor description is used: it lists the omoccurrences columns
    # created by the SCAN export.
    scan_cols_query = sqlite_conn.execute("select * from omoccurrences limit 1")
    scan_cols = sorted(desc[0] for desc in scan_cols_query.description)
finally:
    # Errors propagate, so a missing table stops the notebook here rather than
    # leaving `scan_cols` undefined for the next cell.
    sqlite_conn.close()

In [None]:
# Columns present in both GBIF and SCAN, sorted; only these are loaded from GBIF.
common_cols = sorted(set(gbif_cols).intersection(scan_cols))

In [None]:
# Rows read from the GBIF CSV per chunk.
CHUNK_SIZE = 500000


def populate_scientificName_gbif(row):
    """Overwrite ``scientificName`` with the name stored in the row's rank column.

    The rank column is the one named after the lower-cased ``taxonRank`` value
    (e.g. ``"SPECIES"`` -> ``species``). If the row has such a column, its value
    replaces ``scientificName``; otherwise the row is returned unchanged. Rows
    with a missing or non-string ``taxonRank`` (pandas reads empty cells as NaN)
    are also returned unchanged instead of raising.

    Args:
        row: One GBIF record as a pandas Series; modified in place.

    Returns:
        The same ``row`` object.
    """
    rank = row.get("taxonRank")
    if not isinstance(rank, str):
        return row
    rank_col = rank.lower()
    if rank_col in row:
        row["scientificName"] = row[rank_col]
    return row

sqlite_conn = get_sqlite_conn(COMBINED_SQLITE)

try:
    # Stream the tab-separated GBIF export in CHUNK_SIZE-row pieces so the whole
    # file never has to be held in memory at once.
    gbif_chunks = pd.read_csv(GBIF_CSV, chunksize=CHUNK_SIZE, sep="\t", low_memory=False)
    for chunk in gbif_chunks:
        chunk["source"] = "gbif"
        chunk = chunk.parallel_apply(populate_scientificName_gbif, axis="columns")
        # Keep the provenance tag plus the columns shared with the SCAN table.
        chunk = chunk[["source", *common_cols]]

        # One SQLite transaction per chunk: committed on success, rolled back on error.
        with sqlite_conn:
            chunk.to_sql(
                "omoccurrences",
                con=sqlite_conn,
                index=False,
                if_exists="append"
            )
finally:
    # Always close the connection; any exception keeps propagating with its traceback.
    sqlite_conn.close()

In [None]:
ZIP_FILE = "{}.zip".format(COMBINED_SQLITE)

# Remove any archive left over from a previous run.
if path.exists(ZIP_FILE):
    os_remove(ZIP_FILE)

with ZipFile(ZIP_FILE, "w", compression=ZIP_DEFLATED) as zip_out:
    # Store the database under its bare file name. By default ZipFile.write uses the
    # full path minus the leading "/", which nests the file in data/disk/... folders.
    zip_out.write(COMBINED_SQLITE, arcname=path.basename(COMBINED_SQLITE))