# core

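> Helpers for creating per-company directory trees, downloading annual reports (PDF or ZIP), converting PDF pages to images, and saving text as Markdown.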

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export 
import httpx
import zipfile
from fastcore.all import *
from fastcore.utils import *
import pandas as pd

In [None]:
#| hide 
df = pd.read_csv("company1.csv", index_col=0)
df.head()

Unnamed: 0,TATAMOTORS,ASHOKLEY,OLECTRA,FORCEMOT,SMLISUZU
Financial Year 2024,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....
Financial Year 2023,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....,https://www.bseindia.com/stockinfo/AnnPdfOpen....
Financial Year 2022,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...
Financial Year 2021,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...
Financial Year 2020,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...,https://www.bseindia.com/bseplus/AnnualReport/...


In [None]:
#| export
def make_dirs(base:Path, df:pd.core.frame.DataFrame):
    """
    Create a `base/<column>/<index>` directory tree for every column and index label of `df`.

    Example:
    If df has columns ['A', 'B'] and index ['X', 'Y'], the following structure is created:

    base/
    ├── A/
    │   ├── X/
    │   └── Y/
    └── B/
        ├── X/
        └── Y/

    Parameters:
    base: root directory; it is created, together with any missing parents, if absent.
    df: dataframe whose column labels name the first level and whose index labels
        name the second level.

    Index labels containing the substring "Right " are skipped.
    Labels are converted with `str()`, so non-string labels (e.g. integer years) work too.
    Directories that already exist are left untouched.
    """
    base = Path(base)
    base.mkdir(parents=True, exist_ok=True)
    # The row labels are the same for every column, so filter them only once.
    rows = [str(i) for i in df.index if "Right " not in str(i)]
    for c in df.columns:
        col_dir = base/str(c)
        col_dir.mkdir(exist_ok=True)
        for r in rows:
            (col_dir/r).mkdir(exist_ok=True)

## Download a URL and rename the file

In [None]:
dir = Path("result")
make_dirs(dir, df)

In [None]:
#| export 
import httpx
async def download_url(url:str, fn:Path):
    """
    Download `url` and write the response body to the file `fn`.

    Parameters:
    url: address to fetch; redirects are followed.
    fn: destination path; opened in binary write mode, so an existing file is overwritten.

    Returns:
    True when the server answered with status 200 and the body was written.
    False for any other status code or when the request raises `httpx.RequestError`;
    a message is printed in both failure cases.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/pdf,*/*'
    }
    # Per-phase limits: reading the body gets a much longer budget than connecting.
    timeout = httpx.Timeout(connect=20.0, read=120.0, write=30.0, pool=60.0)

    # Pass the configured limits to the client; previously a flat 30 s value
    # was used and the `timeout` object above was dead code.
    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        try:
            res = await client.get(url, headers=headers)
        except httpx.RequestError as e:
            print(f"Request failed: {e}")
            return False

        if res.status_code != 200:
            print(f"Error downloading: Status code {res.status_code}")
            return False

        with open(fn, 'wb') as f:
            f.write(res.content)
        return True

In [None]:
url = "https://www.bseindia.com/xml-data/corpfiling/AttachHis/0913b647-b205-4a3c-ad9b-8c493f97f972.pdf"
fn = dir/"temp.pdf"
await download_url(url, fn)
assert fn.is_file()

True

In [None]:
url = "https://archives.nseindia.com/annual_reports/AR_ASHOKLEY_2009_2010_18082010121500.zip"
fn = dir/"temp.zip"
await download_url(url, fn)
assert fn.is_file() 

In [None]:
#| hide
fn = dir/"1.pdf"
url = "https://static-assets.tatamotors.com/Production/www-tatamotors-com-NEW/wp-content/uploads/2024/05/tata-motor-IAR-2023-24.pdf"
await download_url(url, fn)
assert fn.is_file()

True

In [None]:
#| export
def rn_zip_extract(zipf, fn):
    """
    Extract the first file stored in the zip archive `zipf` and save it as `fn`.

    Parameters:
    zipf: path of the zip archive to read.
    fn: destination path for the extracted file; an existing file is overwritten.

    Only the first non-directory member is written, and it is streamed straight
    to `fn`, so no other archive members are left next to the archive.

    Raises:
    IndexError: if the archive contains no file (same exception type as before,
        when the first entry of an empty name list was indexed).
    """
    import shutil  # local import so the function does not rely on a new module-level import

    with zipfile.ZipFile(zipf, 'r') as zip_ref:
        # Skip directory entries: an archive may list "folder/" before the file inside it.
        member = next((m for m in zip_ref.infolist() if not m.is_dir()), None)
        if member is None:
            raise IndexError(f"No file found in archive {zipf}")
        with zip_ref.open(member) as src, open(fn, 'wb') as dst:
            shutil.copyfileobj(src, dst)

In [None]:
p = dir/"i.pdf"
rn_zip_extract(fn, p)
assert p.is_file()


In [None]:
#| hide
!rm -rf result/*.pdf result/*.zip

In [None]:
#| export 
async def download_rename(url, fn, dir):
    """
    Download `url` and store the resulting document as `fn`; only zip and pdf urls are handled.

    Parameters:
    url: address to fetch. A url containing ".zip" is downloaded to a temporary
        archive inside `dir` and the archive's file is extracted to `fn`;
        a url containing ".pdf" is downloaded straight to `fn`.
        The check is a case-insensitive substring match.
    fn: destination path of the final file.
    dir: working directory for the temporary archive; created if missing.

    Returns:
    True when the file was downloaded (and, for archives, extracted),
    False when the download failed or the url is neither zip nor pdf.
    """
    fn, dir = Path(fn), Path(dir)
    dir.mkdir(parents=True, exist_ok=True)

    kind = url.lower()
    if ".zip" in kind:
        # Named after the target so two downloads into the same `dir` do not share one temp file.
        tmp_zip = dir/f"{fn.stem}.temp.zip"
        try:
            # Stop on a failed download instead of extracting a missing or stale archive.
            if not await download_url(url, tmp_zip):
                return False
            rn_zip_extract(tmp_zip, fn)
            return True
        finally:
            # Remove the archive even when extraction raises.
            tmp_zip.unlink(missing_ok=True)

    if ".pdf" in kind:
        return await download_url(url, fn)

    print(f"Unsupported url (expected .zip or .pdf): {url}")
    return False

In [None]:
fn, dir

(Path('result/1.pdf'), Path('result'))

In [None]:
await download_rename(url, fn, dir)
assert fn.is_file()

## PDF functions

In [None]:
#| export
import PyPDF2

def pdfPC(pth):
    """
    Return the number of pages of the PDF file located at `pth`.
    """
    with open(pth, 'rb') as pdf_file:
        # Count while the handle is still open; the reader works on the open file.
        pages = PyPDF2.PdfReader(pdf_file).pages
        page_count = len(pages)
    return page_count

In [None]:
PC = pdfPC(fn)
assert PC != 0

In [None]:
#| export 
from pdf2image import convert_from_path
from PIL.PpmImagePlugin import PpmImageFile
from typing import Tuple, Callable, List, Optional
from PIL import Image

In [None]:
#| export 
def pdf2imgOffset(
    fn: str, 
    limit: Tuple[int, int], 
    transform: Optional[Callable[[Image.Image], Image.Image]] = None,
    dir:Optional[Path] = None) -> Optional[List[Image.Image]]:
    """
    Render the page range `limit` of the PDF `fn` to images at 200 dpi.

    Parameters:
    fn: path of the PDF file.
    limit: (first_page, last_page) passed to `convert_from_path`.
    transform: optional callable applied to every rendered image.
    dir: optional output directory (str or Path). When given, it is created if
        missing and each image is saved as "<first_page + position>.png".

    Returns:
    The list of images when `dir` is not given; None when the images were
    saved to `dir` instead.

    NOTE(review): pdf2image page numbers appear to be 1-based, so a first_page
    of 0 would render from page 1 while files are still named from 0 — confirm.
    """
    fp, lp = limit
    ims = convert_from_path(str(fn), dpi=200, first_page=fp, last_page=lp)

    if transform is not None:
        ims = [transform(i) for i in ims]

    if not dir:
        return ims

    # Accept plain strings as well as Path objects, and make sure the target exists.
    out_dir = Path(dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    for i, im in enumerate(ims):
        im.save(out_dir/"{0}.png".format(fp+i))
    return None

In [None]:
#| export 
def devide_img(im:Image, n:float=0.5):
    """
    Scale the image `im` by the factor `n` using `im.thumbnail` with LANCZOS
    resampling and return the same image object.
    """
    width, height = im.size
    target_size = (int(width*n), int(height*n))
    im.thumbnail(target_size, Image.LANCZOS)
    return im

In [None]:
# Render the page range (0, 10) without a transform or output directory;
# the images come back as a list.
im1 = pdf2imgOffset(fn, (0, 10)) # reads the first 10 pages
assert type(im1) == list
assert len(im1) == 10

In [None]:
im2 = pdf2imgOffset(fn, (0, 10), devide_img) # read the first 10 pages, then scale each image by a factor of 0.5
# NOTE(review): this only checks that im2 is at most twice the size of im1 — confirm the intended bound.
assert all([im2[0].size[0] * .5 <= im1[0].size[0], im2[0].size[1] * 0.5 <= im1[0].size[1]])

In [None]:
im2 = pdf2imgOffset(fn, (0, 10), devide_img, dir) # read 10 pages, scale each image and save it to dir
# Saved files are named "<first_page + position>.png", i.e. 0.png .. 9.png for this range.
assert all([(dir/f"{i}.png").is_file() for i in range(10)])

In [None]:
#| hide
[(dir/f"{i}.png").unlink() for i in range(10)]

[None, None, None, None, None, None, None, None, None, None]

In [None]:
from math import ceil

def pdf2img(_process_chunk:Callable, pdf_path:Path, offset:int, n_workers:int=4):
    """
    Split the PDF at `pdf_path` into page ranges of `offset` pages and process them in parallel.

    Parameters:
    _process_chunk: callable receiving one (first_page, last_page) tuple.
    pdf_path: PDF whose page count (via `pdfPC`) determines the ranges.
    offset: number of pages per range; must be at least 1.
    n_workers: number of workers handed to `parallel`.

    Returns:
    A tuple `(results, ranges)`: the value returned by `parallel` and the list
    of (first_page, last_page) ranges that were processed.

    Ranges are 1-based, inclusive and non-overlapping, and the last one is
    clipped to the page count, e.g. 23 pages with offset 10 give
    [(1, 10), (11, 20), (21, 23)]. Earlier the first range was (0, offset),
    which made page-number-based file names start at 0 and skip one number.
    NOTE(review): this assumes 1-based page numbers in the chunk processor
    (as pdf2image appears to use) — confirm.

    Raises:
    ValueError: if `offset` is smaller than 1.
    """
    if offset < 1:
        raise ValueError(f"offset must be a positive page count, got {offset}")

    PC = pdfPC(pdf_path)
    li = [(first, min(first + offset - 1, PC)) for first in range(1, PC + 1, offset)]

    return parallel(_process_chunk, li, progress=True, n_workers=n_workers), li

In [None]:
def _process_chunk(offset):
    """
    Render one (first_page, last_page) range of the notebook-level PDF `fn`,
    scale every page with `devide_img` and save the images into the
    notebook-level directory `dir`.
    """
    page_range = offset
    return pdf2imgOffset(fn, page_range, devide_img, dir)

In [None]:
ims, li = pdf2img(_process_chunk, fn, 10)
assert all([all([(dir/f"{i}.png").is_file() for i in range(a[0], a[1])]) for a in li])

[(0, 10), (11, 20), (21, 30), (31, 40), (41, 50), (51, 60), (61, 70), (71, 80), (81, 90), (91, 100), (101, 110), (111, 120), (121, 130), (131, 140), (141, 150), (151, 160), (161, 170), (171, 180), (181, 190), (191, 200), (201, 210), (211, 220), (221, 230), (231, 240), (241, 250), (251, 260), (261, 270), (271, 280), (281, 290), (291, 300), (301, 310), (311, 320), (321, 330), (331, 340), (341, 350), (351, 360), (361, 370), (371, 380), (381, 390), (391, 400), (401, 410), (411, 420), (421, 430), (431, 440), (441, 450), (451, 460), (461, 470), (471, 480), (481, 490), (491, 500), (501, 510), (511, 520), (521, 530)]


In [None]:
#| hide
!rm result/*.png

## Save text to a Markdown file

In [None]:
#| export 
def save_md(fn: str, txt: str):
    """
    Writes the given text to a markdown file as binary (UTF-8 encoded).

    Parameters:
    fn (str): Filename or path to save the markdown file.
    txt (str): Content to write to the file.

    Returns:
    True when the file was written, False when writing failed
    (the error is printed instead of being raised).
    """
    try:
        # Encode before opening so a failed encode cannot truncate an existing file.
        data = txt.encode('utf-8')
        with open(fn, 'wb') as f:
            f.write(data)
    except Exception as e:
        # Deliberately broad: saving is best-effort and must not abort the caller.
        print(f"Error writing to file {fn}: {e}")
        return False
    return True
    

In [None]:
fn = dir/"temp.md"
save_md(fn, "# h1\n- 1\n- 2")
assert fn.is_file()

In [None]:
#| hide
!rm -rf  result/

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()

JSONDecodeError: Expecting value: line 1 column 1 (char 0)