# Utils

> Utility functions used throughout the project.


In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
#| default_exp utils

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
# | export
import tarfile
import json
import pandas as pd
from fastcore.all import Path

from nbdev.config import get_config
from pathlib import Path
from io import TextIOWrapper

import warnings
warnings.filterwarnings("ignore")

In [None]:
#| export

import logging
# Configure logging as soon as this module is imported: INFO level and above,
# with each record rendered as "<time> [<LEVEL>] <message>".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

In [None]:
#| export
def get_repo_directory():
    "Return the parent directory of nbdev's configured `nbs_path`, used here as the project root."
    return Path(get_config().nbs_path).parent

In [None]:
#| hide
#| eval: false
repo_directory = get_repo_directory()
repo_directory

In [None]:
#| export
def get_data_directory():
    "Return the `data` directory under the repo directory, creating it if it does not exist yet."
    data_dir = get_repo_directory().joinpath('data')
    # `exist_ok=True` makes repeated calls safe; `parents=True` creates missing ancestors.
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir

In [None]:
#| hide
#| eval: false
data_directory = get_data_directory()
data_directory

In [None]:
# | export
def get_latest_file(directory: Path):
    """Return the file in `directory` whose name sorts last, or `None` if there is none.

    Only entries directly inside `directory` for which `is_file()` is true are
    considered; subdirectories and entries whose name starts with '.' are ignored.
    """
    # `iterdir` is plain pathlib, so this does not depend on fastcore's patched `Path.ls`.
    candidates = (
        entry
        for entry in Path(directory).iterdir()
        if not entry.name.startswith('.') and entry.is_file()
    )
    # Names are unique within one directory, so `max` selects exactly the entry
    # that sorting by name and taking the last element would.
    return max(candidates, key=lambda entry: entry.name, default=None)

In [None]:
#| hide
#| eval: false

directory = data_directory / 'bmtc' / 'raw' / 'vehicles'
for item in sorted(directory.ls()):
    print(item)

filepath = get_latest_file(directory)
print(f'\nLatest file in the directory: \n{filepath}')

In [None]:
# | export
def extract_file(filepath: Path) -> Path:
    """Extract the single file stored in a `.gz` tar archive and return its path.

    A path whose suffix is not `.gz` is returned unchanged. Otherwise `filepath`
    is opened as a gzip-compressed tar archive and its one file member is
    extracted into the archive's parent directory. Directory entries and macOS
    `._*` members are ignored when looking for that file.

    Raises `AssertionError` if the archive does not hold exactly one file; in
    that case nothing is extracted.
    """
    if filepath.suffix != ".gz":
        return filepath

    extract_dir = filepath.parent

    with tarfile.open(filepath, "r:gz") as tar:
        # Only real files count; directory entries and "._*" members are skipped.
        members = [
            member
            for member in tar.getmembers()
            if member.isfile() and not Path(member.name).name.startswith("._")
        ]
        # Raise explicitly instead of `assert False`: assert statements are
        # stripped under `python -O`, which would let a bad archive through.
        # The exception type is kept so existing callers see the same error.
        if len(members) != 1:
            raise AssertionError(f"Expected 1 file, got {len(members)}")
        # NOTE(review): archives are assumed to be trusted; if they can come from
        # an untrusted source, consider tarfile's `filter="data"` extraction filter.
        tar.extract(members[0], path=extract_dir)

    # Remove any "._*" files sitting directly in the extraction directory.
    for du_file in extract_dir.glob("._*"):
        du_file.unlink()

    return extract_dir / members[0].name

In [None]:
#| hide
#| eval: false

extract_file(filepath)

In [None]:
# | export
def extract_files(filepath: Path) -> list:
    """Return the files behind `filepath`, extracting it first when it is a `.gz` archive.

    * If the suffix is not `.gz`, `filepath` is treated as a directory and the
      files directly inside it are returned (subdirectories are left out).
    * Otherwise `filepath` is opened as a gzip-compressed tar archive and every
      file member, except macOS `._*` entries, is extracted into the archive's
      parent directory. The returned paths are the extracted locations.
    """
    # Not compressed: list only the files of the directory. `iterdir` is plain
    # pathlib, so this does not depend on fastcore's patched `Path.ls`.
    if filepath.suffix != ".gz":
        return [entry for entry in filepath.iterdir() if entry.is_file()]

    extract_dir = filepath.parent
    extracted_files = []

    with tarfile.open(filepath, "r:gz") as tar:
        for member in tar.getmembers():
            if not member.isfile():  # skip directories and other non-file members
                continue
            if Path(member.name).name.startswith("._"):  # skip macOS junk
                continue
            # NOTE(review): archives are assumed to be trusted; if they can come
            # from an untrusted source, consider tarfile's `filter="data"`.
            tar.extract(member, path=extract_dir)
            extracted_files.append(extract_dir / member.name)

    # Remove any "._*" files sitting directly in the extraction directory.
    for du_file in extract_dir.glob("._*"):
        du_file.unlink()

    return extracted_files

In [None]:
#| hide
#| eval: false

directory = data_directory / 'bmtc' / 'raw' / 'trip_details'
for item in sorted(directory.ls()):
    print(item)

filepath = get_latest_file(directory)
print(f'\nLatest file in the directory: \n{filepath}')

In [None]:
#| hide
#| eval: false

extracted_files = extract_files(filepath)
print(len(extracted_files))

for file in extracted_files[:5]:
    print(file)

In [None]:
# | export
def get_latest_directory(directory: Path):
    """Return the subdirectory of `directory` whose name sorts last, or `None` if there is none.

    Only entries directly inside `directory` for which `is_dir()` is true are
    considered; files and entries whose name starts with '.' are ignored.
    """
    # `iterdir` is plain pathlib, so this does not depend on fastcore's patched `Path.ls`.
    candidates = (
        entry
        for entry in Path(directory).iterdir()
        if not entry.name.startswith('.') and entry.is_dir()
    )
    # Names are unique within one directory, so `max` selects exactly the entry
    # that sorting by name and taking the last element would.
    return max(candidates, key=lambda entry: entry.name, default=None)

In [None]:
#| hide
#| eval: false

directory = data_directory / 'bmtc' / 'raw' / 'trip_details'
for item in sorted(directory.ls()):
    print(item)

latest = get_latest_directory(directory)
print(f'\nLatest directory in the directory: \n{latest}')

In [None]:
#| export
def read_file(filepath: Path, format: str = 'json'):
    """Read a JSON or CSV file and return its parsed contents.

    With `format='json'` the result is whatever `json.load` produces; with
    `format='csv'` it is the `DataFrame` returned by `pd.read_csv`.

    Raises `ValueError` for any other `format`, before the file is opened.
    """
    if format not in ('json', 'csv'):
        raise ValueError(f"Unsupported format: {format}")

    # Decode explicitly as UTF-8 instead of the platform's locale default, so the
    # same file reads identically everywhere (and matches `append_to_file`'s writes).
    with open(filepath, encoding='utf-8') as f:
        if format == 'json':
            return json.load(f)
        return pd.read_csv(f)

In [None]:
#| hide
#| eval: false

directory = data_directory / 'bmtc' / 'raw' / 'vehicles'
read_file(extract_file(get_latest_file(directory)))

In [None]:
#| export
def append_to_file(filepath: Path, record: dict) -> None:
    """Append `record` as one JSON line to `filepath` unless that exact line is already present.

    Missing parent directories are created. The record is serialised with
    `json.dumps` and compared against each existing line after stripping
    whitespace, so the check is textual: the same keys in a different order
    count as a different record.
    """
    # Same logger the module-level `logger` refers to, rather than the root logger.
    log = logging.getLogger(__name__)

    filepath.parent.mkdir(parents=True, exist_ok=True)
    record_str = json.dumps(record)

    # Stream the file instead of loading every line into a set, and remember
    # whether the last line is newline-terminated so that appending never glues
    # the new record onto an unterminated final line.
    needs_newline = False
    if filepath.exists():
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip() == record_str:
                    log.info("Record already exists.")
                    return
                needs_newline = not line.endswith("\n")

    with open(filepath, 'a', encoding='utf-8') as f:
        if needs_newline:
            f.write("\n")
        f.write(record_str + "\n")
    log.info("Record added.")


In [None]:
#| export
def extract_file_name(path: Path) -> str:
    "Return the final component of `path` with every suffix removed, e.g. `data.tar.gz` -> `data`."
    stripped = path
    # Peel one suffix per iteration until none is left.
    while stripped.suffix:
        stripped = stripped.with_suffix('')
    return stripped.name


In [None]:
#| hide
#| eval: false

extract_file_name(filepath)

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()