In [20]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import os
from datetime import datetime
import time
import re 
from collections import namedtuple
from PyPDF2 import PdfReader
from deltalake.writer import write_deltalake
from tqdm.notebook import tqdm,trange

# Pipeline-wide logging: records go to ../pipeline.log, opened in "w" mode
# (the file is truncated when the handler is created).
import logging

logging.basicConfig(
    filename="../pipeline.log",
    filemode="w",
    format="%(asctime)s %(message)s",
)

# Root logger handle, with the threshold lowered to DEBUG.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


# Run date as day-month-year (e.g. "07-May-2024"); used in log lines and
# in the metadata rows.
today = f"{datetime.today():%d-%b-%Y}"

# Landing page per document family. SITAN and COARS are empty strings here.
target_urls = dict(
    CPD="https://www.unicef.org/executiveboard/country-programme-documents",
    SITAN="",
    COARS="",
)

# Module-level accumulators shared by the cells below.
urls = dict((tag, []) for tag in target_urls)  # tag -> collected PDF links
url_items = []  # records appended by get_tags_from_url_name
data = []  # metadata rows that end up in the CSV

# Browser-like User-Agent header passed to requests.get by request().
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/58.0.3029.110 Safari/537.3"
    )
}


def request(url, stream=True):
    """Fetch *url* with a single HTTP GET and return the response.

    Args:
        url: Address to fetch.
        stream: Forwarded to ``requests.get``. When true, the body is not
            downloaded until the caller reads it (for example through
            ``iter_content``); when false, the body is downloaded right away.

    Returns:
        The ``requests.Response`` object. The status code and reason are
        logged but not checked, so callers must handle error responses.
    """
    # Exactly one request that honours the ``stream`` flag; issuing the GET
    # twice for non-streaming calls would double the traffic and ignore the
    # caller's choice.
    response = requests.get(url, headers=headers, stream=stream)
    logging.info(f" url {url} {response.status_code} {response.reason} {stream}")
    return response

def sort_language(url):
    """Guess a document's language code from markers embedded in its URL.

    Matching is a case-sensitive substring test, and markers are tried in
    the order they are declared below, so the first marker found wins.

    Args:
        url: URL or file name to inspect. ``None`` or an empty string is
            accepted and yields ``None``.

    Returns:
        One of "AR", "EN", "ES", "FR", "RU", "CH", or ``None`` when no
        marker is present. Callers rely on ``None`` to detect "language
        unknown", so no default language is substituted here.
    """
    if not url:
        return None

    # marker found in the URL -> normalised language code
    language_mapping = {
        "AR": "AR",
        "EN": "EN",
        "English": "EN",
        "ENG": "EN",
        "ES": "ES",
        "SP": "ES",
        "Spanish": "ES",
        "FR": "FR",
        "French": "FR",
        "RU": "RU",
        "CH": "CH",
        "final": "EN",
    }

    # NOTE(review): short markers such as "AR" or "EN" also match inside
    # longer upper-case words; confirm this heuristic is acceptable.
    for marker, code in language_mapping.items():
        if marker in url:
            return code

    return None


def sort_url(url):
    """Classify a document URL by keywords found in it.

    Rules are evaluated in this order and the first match wins:

    1. "CEP" or "CRR" (case-sensitive)          -> "CPE"
    2. "extension" (case-insensitive)           -> "Extension"
    3. any results-matrix keyword (insensitive) -> "SRM"
    4. a UN framework acronym (case-sensitive)  -> that acronym
    5. two file-specific overrides              -> "UNDAP" / "CPD"
    6. any CPD keyword (case-insensitive)       -> "CPD"

    Args:
        url: URL or file name to classify.

    Returns:
        The document type as a string, or ``None`` when no rule matches.
    """
    lowercase_url = url.lower()

    # Evaluation documents.
    if any(keyword in url for keyword in ("CEP", "CRR")):
        return "CPE"

    # Programme extensions.
    if "extension" in lowercase_url:
        return "Extension"

    # Summary results matrices. The "srm" keyword also covers the
    # "Afghanistan_CPD-SRM" file, so it needs no dedicated rule.
    srm_keywords = ("srm", "summary_results", "results_matrx", "matrice_de_resultats", "_matrix",
                    "summary_results_matrix", "matrix", "resultsmatrix", "results_matrix", "results-matrix", "rrf")
    if any(keyword in lowercase_url for keyword in srm_keywords):
        return "SRM"

    # UN framework documents are tagged with the acronym that matched.
    un_keywords = ("UNSDCF", "UNSPF", "UNDAF", "UNCCSF", "UNPAF")
    for keyword in un_keywords:
        if keyword in url:
            return keyword

    # File-specific overrides for names the generic rules do not classify.
    if "Rwanda-UN-2018-2023-2018.04.10" in url:
        return "UNDAP"
    if "Eastern_Caribbean_multicountry_2012-2016_20_Oct_2012" in url:
        return "CPD"

    # Country programme documents.
    cpd_keywords = ("cpd", "cdp", "final_approved", "spd", "apd", "ods", "final")
    if any(keyword in lowercase_url for keyword in cpd_keywords):
        return "CPD"

    # Nothing matched: the document type is unknown.
    return None


def get_tags_from_url_name(url: list) -> list:
    """Extract year, document code and country from CPD file names.

    Each link that ``sort_url`` classifies as "CPD" is reduced to its file
    name and matched against ``<4-digit year><non-digits><digits>-<letters>``
    anchored at the start of the name (e.g. a name starting
    "2019-PL5-Ghana" gives year "2019", doc_code "PL5", country "Ghana").

    Args:
        url: Links to inspect. A single string is treated as a one-item
            list.

    Returns:
        The module-level ``url_items`` list, to which one
        ``metadata(url, year, doc_code, country_name)`` namedtuple has been
        appended per matching link. The ``url`` field holds the file name,
        not the full link. Because the list is shared, results accumulate
        across calls.
    """
    # Built once per call rather than once per link.
    regx = re.compile(r'(^\d{4})(\D+\d+-)([A-Za-z]+)')
    metadata = namedtuple("metadata", "url year doc_code country_name")

    # Iterate the argument itself (not the global ``urls`` dict of tags).
    links = [url] if isinstance(url, str) else url

    for link in links:
        if sort_url(link) != "CPD":
            continue

        filename = link.split("/")[-1]
        search = regx.search(filename)
        if search:
            year = search.group(1)
            doc_code = search.group(2).replace("-", "")
            country_name = search.group(3)
            url_items.append(metadata(filename, year, doc_code, country_name))

    return url_items


def download_pdf(path, url):
    """Stream the document at *url* into the file at *path*.

    Args:
        path: Destination file path; its directory must already exist.
        url: Address of the document to download.

    Returns:
        True when the body was written completely. False when the server
        answered with an error status, or when a network or file-system
        error occurred (the error is logged and any partially written file
        is removed, so a later run can retry).
    """
    file_opened = False
    try:
        # The context manager releases the connection even on early exit.
        with request(url, stream=True) as response:
            logging.info(f" url {url} {response.status_code} {response.reason}")

            # Do not store an HTTP error page under a .pdf name.
            if not response.ok:
                return False

            with open(f"{path}", 'wb') as f:
                file_opened = True
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        return True
    except (requests.RequestException, OSError):
        logging.exception(f" Download failed for {url}")
        # A truncated file would look like a finished download to callers
        # that only test for existence, so remove it.
        if file_opened and os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                logging.exception(f" Could not remove partial file {path}")
        return False
    

def check_pdf(url, path, output_text=False):
    """Report whether the PDF at *path* contains extractable text.

    Args:
        url: Source URL of the document; only used to label extracted text.
        path: Local path of the PDF file.
        output_text: When true, also collect the text of every page that
            has any.

    Returns:
        A ``(has_text, pdf_text)`` tuple.

        * ``has_text`` is True when at least one page yields text. It is
          False when the file is missing or cannot be read.
        * ``pdf_text`` is ``None`` when the file is missing. Otherwise it is
          a list, empty unless ``output_text`` is true, with one
          ``{"url": url, "page": index, "text": text}`` dict per page that
          has text (``index`` is zero-based). If reading fails part-way,
          the pages collected so far are still returned.
    """
    pdf_text = []
    has_text = False

    if os.path.exists(path):
        logging.info(f" Path {path} exists")

        try:
            reader = PdfReader(path)

            for i, page in enumerate(reader.pages):
                # Extract once per page; extraction is the expensive step.
                page_text = page.extract_text()
                if not page_text:
                    continue

                has_text = True
                if output_text:
                    # One record per page (list.append takes one argument).
                    pdf_text.append({"url": url, "page": i, "text": page_text})

        except Exception:
            # Unreadable or corrupt PDFs count as "no text", but leave a
            # trace in the log instead of failing silently.
            logging.exception(f" Could not read {path}")
            has_text = False
    else:
        pdf_text = None
        logging.info(f" Path {path} does not exists")

    logging.info(f" Has_text {has_text} {path}")

    return has_text, pdf_text

# Printed once the helper definitions in this cell have been executed.
print("function loaded")

function loaded


In [12]:
# Fetch each configured landing page, archive its prettified HTML under
# ../sources/html/<tag>/ and collect the PDF links found in its table cells
# into urls[tag].
logging.info(f"Downloading HTMLs from URLS, started at {today} {time.strftime('%H:%M:%S', time.gmtime())} GMT")

# Site-relative links are resolved against this host.
base_url = "https://www.unicef.org"
# Absolute links starting with one of these prefixes are not collected.
excluded_prefixes = ("https://undocs.org/E/ICEF", "http://undocs.org/E/ICEF", "https://unsdg.un.org/")

for tag in tqdm(target_urls):

    # Only the CPD page is scraped; skip tags with no URL before any
    # request is made.
    if tag != "CPD" or not target_urls[tag]:
        continue

    filepath = f'../sources/html/{tag}'
    os.makedirs(filepath, exist_ok=True)

    response = request(target_urls[tag]).text
    soup = BeautifulSoup(response, 'lxml')

    # Explicit UTF-8 so non-ASCII page content does not depend on the
    # platform's default encoding.
    file_path = f'{filepath}/{tag}.html'
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(soup.prettify())

    logging.info(f"File saved: {file_path}, os.path.exists(file_path): {os.path.exists(file_path)}")

    for td in soup.find_all('td'):
        for a in td.find_all('a', href=True):
            href = a['href']
            if not href.endswith(".pdf"):
                continue

            if not href.startswith(("https://", "http://")):
                # Relative link: prefix the site host.
                urls[tag].append("{}{}".format(base_url, href))
            elif not href.startswith(excluded_prefixes):
                urls[tag].append(href)

# Tabular view of the collected links: one column per tag.
df = pd.DataFrame.from_dict(urls, orient='index').transpose()
df.head(5)

logging.info(f"Downloaded HTMLs from URLS, completed at {today} {time.strftime('%H:%M:%S', time.gmtime())} GMT")


  0%|          | 0/3 [00:00<?, ?it/s]

In [26]:
# Download every collected CPD PDF that is not on disk yet, check it for
# extractable text and append one metadata row per processed URL. The
# metadata CSV is rewritten after every row.
logging.info(f"Downloading Files from URLS, start at {today} {time.strftime('%H:%M:%S', time.gmtime())} GMT")

metadata_filepath = "../META-INF/"
# NOTE(review): the "downloaded" column receives has_text, not a download
# status; confirm the intended meaning before renaming.
metadata_columns = ("doc_type","url","file location","filename","filetype","scrapped-date","language","downloaded","Content")

for tag in urls:

    if tag != "CPD":
        continue

    for url in tqdm(urls["CPD"]):
        doc_type = sort_url(url)
        language = sort_language(url)
        filename = url.split("/")[-1]
        filetype = "pdf" if filename.endswith("pdf") else "none"

        # Files without a recognised language sit directly in the
        # document-type folder.
        if language is None:
            filepath = f'../sources/{filetype}/docs/{doc_type}/'
        else:
            filepath = f"../sources/{filetype}/docs/{doc_type}/{language}/"

        os.makedirs(filepath, exist_ok=True)
        path = filepath + filename

        already_on_disk = os.path.exists(path)

        if not already_on_disk:
            logging.info(f"Downloading New Files from {url}")
            # download_pdf performs the HTTP request itself; no separate
            # request is needed here.
            download_pdf(path, url)

        # Inspect the file after the download attempt so that freshly
        # fetched PDFs are checked as well.
        has_text, text = check_pdf(url, path, output_text=True)
        print(has_text, text)

        if already_on_disk:
            print(f"File: {filename} File Already Exist!: {os.path.exists(path)}")
            logging.info(f"File: {filename} File Already Exist!: {os.path.exists(path)}")
            csv_name = "executiveboard-cpd-metadata-test.csv"
        else:
            logging.info(f"File: {filename} File Created: {os.path.exists(path)} Has Text: {has_text}")
            csv_name = "executiveboard-cpd-metadata.csv"

        data.append([doc_type, url, path, filename, filetype, today, language, has_text, text])

        df = pd.DataFrame(data, columns=metadata_columns)
        os.makedirs(metadata_filepath, exist_ok=True)
        df.to_csv(f"{metadata_filepath}{csv_name}", index=True)

        if already_on_disk:
            # NOTE(review): processing stops at the first file that already
            # exists and writes to the "-test" CSV. This looks like leftover
            # test scaffolding; confirm before removing it.
            break

#df = pd.read_csv('../META-INF/executiveboard-cpd-metadata-test.csv')
#df.value_counts(subset=['downloaded'])

logging.info(f"Downloaded Files from URLS, completed at {today} {time.strftime('%H:%M:%S', time.gmtime())} GMT")



0it [00:00, ?it/s]