In [124]:
import os
import re
import json
import kml2geojson as k2g
import zipfile
import requests
import shutil
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import xml.etree.ElementTree as ET
from lxml import etree
from datetime import datetime
import subprocess
from urllib.parse import urljoin


In [125]:
# Define paths
base_directory = r"C:\Users\CrudeIntern\OneDrive - Hengli Petrochemical International Pte Ltd\Market Analysis\Current Projects\Hurricane"
def webscraping_kmz_JTWC(base_directory: str) -> None:
    driver = webdriver.Chrome()
    try:
        url = "https://www.metoc.navy.mil/jtwc/jtwc.html"
        driver.get(url)
        driver.implicitly_wait(5)
        kmz_link_elements = WebDriverWait(driver, 5).until(
            EC.presence_of_all_elements_located((By.LINK_TEXT, "Google Earth Overlay"))
        )
        if not kmz_link_elements:
            print("No Google Earth Overlay links found.")
            return
        file_names, kmz_file_paths = [], []
        for index, kmz_link_element in enumerate(kmz_link_elements):
            kmz_url = kmz_link_element.get_attribute('href')
            print(f"KMZ File URL {index + 1}: {kmz_url}")
            file_name = kmz_url.split('/')[-1].split('.')[0]
            kmz_file = requests.get(kmz_url) 
            kmz_file_path = os.path.join(base_directory, f'{file_name}.kmz')
            kmz_file_paths.append(kmz_file_path)
            file_names.append(file_name)
            with open(kmz_file_path, 'wb') as f:
                f.write(kmz_file.content)
            print(f"KMZ file {index + 1} saved to {kmz_file_path}")
    except Exception as e:
        print("Error occurred while scraping KMZ files:", e)
    finally:
        driver.quit()
    return kmz_file_paths, file_names

kmz_file_paths_JTWC, file_names_JTWC = webscraping_kmz_JTWC(base_directory)
print(kmz_file_paths_JTWC)
print(file_names_JTWC)


KMZ File URL 1: https://www.metoc.navy.mil/jtwc/products/wp2124.kmz
KMZ file 1 saved to C:\Users\CrudeIntern\OneDrive - Hengli Petrochemical International Pte Ltd\Market Analysis\Current Projects\Hurricane\wp2124.kmz
KMZ File URL 2: https://www.metoc.navy.mil/jtwc/products/ep9924.kmz
KMZ file 2 saved to C:\Users\CrudeIntern\OneDrive - Hengli Petrochemical International Pte Ltd\Market Analysis\Current Projects\Hurricane\ep9924.kmz
['C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\wp2124.kmz', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\ep9924.kmz']
['wp2124', 'ep9924']


In [126]:

def webscraping_kmz_NHC(base_directory: str) -> None:
    url = 'https://www.nhc.noaa.gov/gis/'
    save_directory = base_directory
    os.makedirs(save_directory, exist_ok=True)
    file_names, kmz_file_paths = [], []
    response = requests.get(url)
    if response.status_code == 200:
        soup = BeautifulSoup(response.content, 'html.parser')
        forecast_rows = soup.find_all('tr')
        # Only look at the third to fifth rows (index 2, 3, 4 in zero-based index)
        for i in range(2, 5):
            row = forecast_rows[i]
            tds = row.find_all('td')[1:4]  # Look at the second to fourth <td> (index 1 to 3)
            for td in tds:
                hurricane_names = []
                if i <4:
                    element_names = td.get_text(strip=True).replace(' ', '_').split('KMZ:')
                else:
                    element_names = td.get_text(strip=True).replace(' ', '_').split('kmz')
                for element_name in element_names:
                    if ':' in element_name:
                        hurricane_name = element_name.split(':')[0].replace(' ', '_').split(']')[-1]
                        hurricane_names.append(hurricane_name)
                links = td.find_all('a')
                for link in links:
                    link_text = link.get_text(strip=True)
                    if i == 2:  # Third row (zero-indexed), find 'Cone' and 'Track'
                        if link_text == 'Cone':
                            download_NHC_file(link, f"{hurricane_names[0]}_Cone.kmz",file_names, kmz_file_paths)
                        elif link_text == 'Track':
                            download_NHC_file(link, f"{hurricane_names[0]}_Track.kmz", file_names, kmz_file_paths)
                            hurricane_names.pop(0)
                        
                    elif i == 3:  # Fourth row (zero-indexed), find 'Initial Radii'
                        if link_text == 'Initial Radii':
                            download_NHC_file(link, f"{hurricane_names[0]}_Initial_Radii.kmz", file_names, kmz_file_paths)
                            hurricane_names.pop(0)
                    elif i == 4:  # Fifth row (zero-indexed), find 'kmz'
                        if link_text == 'kmz':
                            download_NHC_file(link, f"{hurricane_names[0]}_kmz.kmz", file_names, kmz_file_paths)
                            hurricane_names.pop(0)
    else:
        print(f"Failed to retrieve page content. Status code: {response.status_code}")
    return  kmz_file_paths, file_names

def download_NHC_file(link, filename, file_names, kmz_file_paths):
    base_url = 'https://www.nhc.noaa.gov/gis/'
    file_url = urljoin(base_url, link['href'])
    file_response = requests.get(file_url)
    if file_response.status_code == 200:
        file_path = os.path.join(base_directory, filename)
        with open(file_path, 'wb') as file:
            file.write(file_response.content)
        print(f"Downloaded and saved: {filename}")
        filename = filename.split('.')[0]
        file_names.append(filename)
        kmz_file_paths.append(file_path)
    else:
        print(f"Failed to download {filename}. Status code: {file_response.status_code}")
kmz_file_paths_NHC, file_names_NHC = webscraping_kmz_NHC(base_directory)

kmz_file_paths = kmz_file_paths_JTWC + kmz_file_paths_NHC
file_names = file_names_JTWC + file_names_NHC

Downloaded and saved: Hurricane_Milton_Cone.kmz
Downloaded and saved: Hurricane_Milton_Track.kmz
Downloaded and saved: Hurricane_Leslie_Cone.kmz
Downloaded and saved: Hurricane_Leslie_Track.kmz
Downloaded and saved: Hurricane_Milton_Initial_Radii.kmz
Downloaded and saved: Hurricane_Leslie_Initial_Radii.kmz
Downloaded and saved: Hurricane_Milton_kmz.kmz
Downloaded and saved: Hurricane_Leslie_kmz.kmz


In [127]:

def extract_kml(kmz_file_paths: list, file_names: list, base_directory: str) -> None:
    kml_file_paths = []
    for kmz_file_path, file_name in zip(kmz_file_paths, file_names):
        output_directory = os.path.join(base_directory, file_name)
        print('base_directory:', base_directory)
        print('file_name:', file_name)
        with zipfile.ZipFile(kmz_file_path, 'r') as zip_ref:
            print('output_directory:', output_directory)
            zip_ref.extractall(output_directory)
        print(f"KMZ file {kmz_file_path} has been extracted to {output_directory}")
        ld = os.listdir(output_directory)
        kml_file = [file for file in ld if file.endswith('.kml')]
        kml_file_path = os.path.join(output_directory, kml_file[0])
        try:
            new_kml_path = os.path.join(base_directory, f'{file_name}.kml')
            print(new_kml_path)
            os.rename(kml_file_path, new_kml_path)
            print(f"KML file has been moved and renamed to {new_kml_path}")
            kml_file_paths.append(new_kml_path)
        except Exception as e:
            print(f"Error moving KML file: {e}")
        try:
            shutil.rmtree(output_directory)
            print(f"Folder {output_directory} has been deleted.")
        except Exception as e:
            print(f"Error deleting folder {output_directory}: {e}")
        try:
            os.remove(kmz_file_path)
            print(f"KMZ file {kmz_file_path} has been deleted.")
        except Exception as e:
            print(f"Error deleting KMZ file {kmz_file_path}: {e}")
    return kml_file_paths
print (kmz_file_paths, file_names)
kml_file_paths = extract_kml(kmz_file_paths, file_names, base_directory)
print(kml_file_paths)

['C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\wp2124.kmz', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\ep9924.kmz', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\Hurricane_Milton_Cone.kmz', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\Hurricane_Milton_Track.kmz', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\Hurricane_Leslie_Cone.kmz', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\Hurricane_Leslie_Track.kmz', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Curren

In [128]:
def parse_kml_file(kml_file):
    """
    Parse a KML file and return its root element.
    Args:
        kml_file (str): The path to the KML file.
    Returns:
        root (Element): The root element of the parsed KML file.
    """
    try:
        tree = etree.parse(kml_file)
        return tree.getroot()
    except etree.XMLSyntaxError as e:
        print(f"Error parsing file {kml_file}: {e}")
        return None

def adjust_ids(element, id_suffix):
    """
    Adjust IDs of elements to ensure uniqueness in the merged KML.
    Args:
        element (Element): The root element whose child IDs need adjustment.
        id_suffix (int): A unique suffix to append to IDs.
    """
    for elem in element.iter():
        if 'id' in elem.attrib:
            elem.attrib['id'] = f"{elem.attrib['id']}_{id_suffix}"

def replace_second_line(kml_file):
    """
    Replace the second line of the KML file if the file ends with 'CONE.kml'.
    Args:
        kml_file (str): The path to the KML file.
    """
    if kml_file.endswith('CONE.kml'):
        with open(kml_file, 'r', encoding='utf-8') as file:
            lines = file.readlines()

        if len(lines) > 1:
            # Replace the second line
            lines[1] = "<kml xmlns='http://www.opengis.net/kml/2.2'>\n"

            # Write the modified lines back to the file
            with open(kml_file, 'w', encoding='utf-8') as file:
                file.writelines(lines)
        else:
            print(f"File {kml_file} does not have enough lines to modify.")

def merge_kml_files(selected_files, output_file):
    """
    Merge the selected KML files into a single KML file.
    Args:
        selected_files (list): List of KML files selected for merging.
        output_file (str): The path for the output merged KML file.
    """
    # Use KML 2.2 namespace for the output file
    KML_NAMESPACE = "http://www.opengis.net/kml/2.2"
    NSMAP = {None: KML_NAMESPACE}

    # Create a root for the new KML file
    merged_root = etree.Element('kml', nsmap=NSMAP)
    merged_document = etree.SubElement(merged_root, 'Document')

    # Keep track of unique IDs to avoid conflicts
    id_counter = 0

    for kml_file in selected_files:
        # Replace the second line if it's a CONE.kml file
        replace_second_line(kml_file)
        
        root = parse_kml_file(kml_file)
        if root is not None:
            # Determine the namespace used in the KML file
            kml_namespace = root.nsmap.get(None)
            if kml_namespace is None:
                kml_namespace = root.nsmap.get('')

            ns = {'kml': kml_namespace}

            # Find the <Document> element inside each KML file
            document = root.find('kml:Document', namespaces=ns)
            if document is not None:
                # Adjust IDs to avoid conflicts
                adjust_ids(document, id_counter)
                id_counter += 1

                # Append child elements of the <Document> to the merged document
                for elem in document:
                    merged_document.append(elem)
            else:
                # If <Document> is not found, check for <Folder> elements
                folders = root.findall('kml:Folder', namespaces=ns)
                for folder in folders:
                    adjust_ids(folder, id_counter)
                    id_counter += 1
                    merged_document.append(folder)
        else:
            print(f"Failed to parse file {kml_file}.")

    if len(merged_document) > 0:
        # Write the merged KML content into the output file
        with open(output_file, 'wb') as f:
            f.write(etree.tostring(merged_root, pretty_print=True, xml_declaration=True, encoding='UTF-8'))
        print(f"Merged KML file saved as: {output_file}")
    else:
        print("No valid KML content was merged.")


output_combine_file = os.path.join(base_directory, 'hurricane_combined.kml')
merge_kml_files(kml_file_paths, output_file=output_combine_file)

File name: ['wp2124', 'ep9924', 'Hurricane_Milton_Cone', 'Hurricane_Milton_Track', 'Hurricane_Leslie_Cone', 'Hurricane_Leslie_Track', 'Hurricane_Milton_Initial_Radii', 'Hurricane_Leslie_Initial_Radii', 'Hurricane_Milton_kmz', 'Hurricane_Leslie_kmz', 'al142024_012Aadv_CONE', 'al142024_012Aadv_TRACK']
KML file paths: ['C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\wp2124.kml', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\ep9924.kml', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\Hurricane_Milton_Cone.kml', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\Hurricane_Milton_Track.kml', 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\CrudeIntern\\OneDrive - Hengli Petrochemical International Pte Ltd\\Market Analysis\\Current Projects\\Hurricane\\al142024_012Aadv_CONE.kml'

Transform the Geojson


In [None]:
def kml_to_geojson(output_geojson_path: str, output_combine_file: str) -> None:
    file = k2g.main.convert(kml_path_or_buffer=output_combine_file, feature_collection_name='hurricane_combined.geojson')
    with open(output_geojson_path, 'w') as f:
        f.write(str(file[0]))
    with open(output_geojson_path, 'r') as f:
        raw_geojson_content = f.read()
    while True:
        try:
            raw_geojson_content = raw_geojson_content.replace("'", '"')
            geojson_obj = json.loads(raw_geojson_content)
            formatted_geojson_content = json.dumps(geojson_obj, indent=4)
            with open(output_geojson_path, 'w') as f:
                f.write(formatted_geojson_content)
            print("GeoJSON cleaned and saved successfully.")
            break
        except json.JSONDecodeError as e:
            print(f"Error processing the GeoJSON file: {e}")
            escape_index = e.colno - 2
            raw_geojson_content = raw_geojson_content[:escape_index] + "\\" + raw_geojson_content[escape_index:]
            for i, val in enumerate(raw_geojson_content[escape_index+2:]):
                if val == '"':
                    raw_geojson_content = raw_geojson_content[:escape_index+i+2] + "\\" + raw_geojson_content[escape_index+i+2:] 
                    break

output_geojson_path = os.path.join(base_directory, 'hurricane_combined.geojson')
output_combine_file = os.path.join(base_directory, 'hurricane_combined.kml')
kml_to_geojson(output_geojson_path, output_combine_file)

Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 424 (char 423)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 440 (char 439)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 457 (char 456)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 5225 (char 5224)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 5241 (char 5240)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 5258 (char 5257)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 10046 (char 10045)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 10062 (char 10061)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 10079 (char 10078)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 15213 (char 15212)
Error processing the GeoJSON file: Expecting ',' delimiter: line 1 column 15229 (char 15

Upload to github

In [None]:
import subprocess

# Path to Git Bash executable
bash_path = r"C:\Users\CrudeIntern\AppData\Local\Programs\Git\bin\bash.exe"  # Update this path based on where Git Bash is installed

# Path to your shell script (.sh file)
script_path = "/c/Users/CrudeIntern/OneDrive - Hengli Petrochemical International Pte Ltd/Market Analysis/Current Projects/Hurricane/auto_upload.sh"

# Prepare the command to run the script using Git Bash
cmd = [bash_path, '-c', f"'{script_path}'"]

# Execute the script
shellscript = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, text=True)

# Read and print the output line by line
for line in shellscript.stdout:
    print(line.strip())

# Wait for the process to complete and get the return code
returncode = shellscript.wait()
print(f"Process ended with the return code of {returncode}.")


[main 0937cd3] Automated commit 2024-10-08 16:57:08
1 file changed, 33 insertions(+), 9 deletions(-)
To https://github.com/sirsir23333/Hurricane.git
8a34c71..0937cd3  main -> main
Process ended with the return code of 0.
