In [None]:
%run ../talktools.py
# start from a clean slate; -f keeps rm quiet if the files don't exist yet
!rm -f galaxy1000.csv
!rm -f test.csv
!rm -rf joblib_cache

# A (Simplistic) Introduction to Data Caching

<div style="text-align: center"><font size=-1>AY128/256 UC Berkeley (2024)</font> </div> 

<quote>
    <i><font color="grey">"There are two hard things in Computer Science, cache invalidation, naming things, and off-by-one errors."</font></i>
    <div style="text-align: right"><font color="grey">--Socrates</font></div>

</quote>


<b>Motivation</b>: In data-intensive workloads, like the ones you'll be building in this class, it's a good idea to think about your data caching strategy up front. During exploratory analysis, especially when interacting with external data sources (e.g., flat files hosted remotely, external databases, etc.), sensible caching can save you a lot of time by sparing you from (re)downloading data.

Even for purely local work, you might want to cache the results of a computation to avoid rerunning long-running/expensive calculations.

In the context of reproducibility/replicability, having a cache/store of the data inputs will make it easier for others to redo the steps you did and get the same results.

## Persistent or Ephemeral?

Every time you run an expensive computation or a time-intensive query, you implicitly store the results in an ephemeral Python object, one that lives only as long as the current Python session.

Let's get a CSV file with some data about galaxies from SDSS:

In [None]:
import time
import os
import io

import pandas as pd
import requests  # requests is a widely used library for fetching URLs

external_file_location = "https://raw.githubusercontent.com/AstroHackWeek/AstroHackWeek2014/master/day4/galaxy1000.csv"
local_filename = os.path.basename(external_file_location)

In [None]:
# ATTEMPT 1: download the CSV and load it into a DataFrame
r = requests.get(external_file_location)
df = pd.read_csv(io.StringIO(r.text))
df

Great: we can use the pandas DataFrame `df` in the downstream analysis. The **problem, of course, is that if we rerun our notebook, we need to redownload the data**. This ephemeral caching is usually just fine when the data is small and the computations are quick, but it slows us down with larger files and beefier computations.

Why don't we modify the above to check whether the file already exists and, if so, load it from disk? That is, let's create a persistent store of our data:

In [None]:
def get_galaxy_dataframe():
    """
    get the galaxy CSV file from SDSS if we don't have it already
    and parse it into a pandas DataFrame. Save the file
    for future use.
    
    Returns: dataframe
    """
    if not os.path.exists(local_filename):
        start = time.time()
        r = requests.get(external_file_location)
        r.raise_for_status()  # don't cache an HTTP error page as if it were data

        # note: below is for smallish files. To download and save larger files
        # see https://stackoverflow.com/a/14114741
        with open(local_filename, 'w') as handle:
            handle.write(r.text)

        print(f"Wrote the file {local_filename} to disk")
        # parse from the Python object `r` instead of from disk to avoid disk IO
        df = pd.read_csv(io.StringIO(r.text))
        print(f"  Total time: {time.time() - start:0.3} sec")
    else:
        start = time.time()
        print(f"Reading the file {local_filename} from disk")
        df = pd.read_csv(local_filename)
        print(f"  Total time: {time.time() - start:0.3} sec")
    return df

In [None]:
#!rm galaxy1000.csv
get_galaxy_dataframe()

Hey, this capability (download and save, otherwise load) might be something we want to use a lot. Let's make the function a bit more generic:

In [None]:
def get_and_parse_cached_remote_csvfile(url, local_filename=None, verbose=True):
    """
    get a CSV file at `url` if we don't have it already
    and parse it into a pandas DataFrame. Save the file
    for future use. Guess the output filename if not given.
    
    Returns: dataframe
    """
    
    # here we might error check to see if the URL is valid, points to a CSV file
    # etc.
    if local_filename is None:
        local_filename = os.path.basename(url)
        
    if not os.path.exists(local_filename):
        start = time.time()
        r = requests.get(url)
        r.raise_for_status()  # don't cache an HTTP error page as if it were data

        # note: below is for smallish files. To download and save larger files
        # see https://stackoverflow.com/a/14114741
        with open(local_filename, 'w') as handle:
            handle.write(r.text)
        if verbose:
            print(f"Wrote the file {local_filename} to disk", flush=True)
        # parse from the Python object `r` instead of from disk to avoid disk IO
        df = pd.read_csv(io.StringIO(r.text))
        if verbose:
            print(f"  Total time: {time.time() - start:0.3} sec")
    else:
        start = time.time()
        if verbose:
            print(f"Reading the file {local_filename} from disk")
        df = pd.read_csv(local_filename)
        if verbose:
            print(f"  Total time: {time.time() - start:0.3} sec")
    return df

In [None]:
df = get_and_parse_cached_remote_csvfile(external_file_location)

In [None]:
df = get_and_parse_cached_remote_csvfile(external_file_location, local_filename="test.csv", verbose=False)
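
The comments in the helper above note that it is meant for smallish files, since it holds the whole response in memory before writing it out. For larger files, one option is to stream the download to disk in chunks. The sketch below is a hypothetical helper (`download_to_file` is not part of the original notebook) and it uses the standard library's `urllib.request` rather than `requests`, purely to stay dependency-free; with `requests`, the analogous approach is `requests.get(url, stream=True)` combined with `iter_content`. It also writes to a temporary file first, on the assumption that you don't want an interrupted download to masquerade as a valid cache entry.

```python
import os
import shutil
import tempfile
import urllib.request


def download_to_file(url, local_filename, chunk_size=1 << 20):
    """Stream `url` to `local_filename` in chunks, skipping the download if cached.

    Hypothetical helper for illustration only.
    """
    if os.path.exists(local_filename):
        return local_filename  # cache hit: nothing to download

    # Write to a temporary file in the same directory and rename it at the end,
    # so a half-finished download never looks like a complete cached file.
    target_dir = os.path.dirname(os.path.abspath(local_filename))
    fd, tmp_name = tempfile.mkstemp(dir=target_dir, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as handle, urllib.request.urlopen(url) as response:
            # copy at most `chunk_size` bytes at a time
            shutil.copyfileobj(response, handle, chunk_size)
        os.replace(tmp_name, local_filename)
    except BaseException:
        os.remove(tmp_name)  # clean up the partial file, then re-raise
        raise
    return local_filename
```

Once the file is on disk, `pd.read_csv(local_filename)` can parse it as before.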

Let's take a look at a similar workflow for external queries. 

In [None]:
import astropy.units as u
from astropy.coordinates import SkyCoord
from astroquery.gaia import Gaia

In [None]:
query = """ -- Get some nearby sources
        SELECT top 1000 
            ra,dec,source_id,parallax,pm
        FROM gaiaedr3.gaia_source 
        WHERE parallax_over_error > 15.0
            AND parallax > 200.0
        ORDER BY parallax DESC"""

In [None]:
def get_gaia_query(q):
    start = time.time()
    job = Gaia.launch_job(q)
    print(f"Total time: {time.time()-start:0.2f} sec")
    return job.get_results()

result_table = get_gaia_query(query)
print(result_table)

Great: we can use `result_table` in the downstream analysis. The problem, of course, is that if we rerun our notebook we have to download the data again (and the computation on the remote side needs to happen again as well, which isn't eco-friendly!).

We might be better off memoizing the result for a given input and saving the data locally. We could do this "by hand" but [`joblib`](https://joblib.readthedocs.io/en/latest/) was built for just this (and parallel computing):

<pre>
    The Memory class defines a context for lazy evaluation of function, by putting the results in a store, by default using a disk, and not re-running the function twice for the same arguments. It works by explicitly saving the output to a file and it is designed to work with non-hashable and potentially large input and output data types such as numpy arrays.
</pre>

In [None]:
from joblib import Memory
cachedir = './joblib_cache'
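# note: newer joblib releases deprecate (and later drop) the `bytes_limit`
# constructor argument; to cap the cache size there, call
# memory.reduce_size(bytes_limit=...) instead
memory = Memory(cachedir, verbose=0)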

In [None]:
@memory.cache
def get_gaia_query(q):
    start = time.time()
    job = Gaia.launch_job(q)
    print(f"Total time: {time.time()-start:0.2f} sec")
    return job.get_results()

Here we use `memory.cache` as a function decorator. Every time `get_gaia_query` is called now, joblib hashes the argument `q`. If it has seen that value before, joblib just returns the output saved from last time. Otherwise it runs the function and saves the result in the `joblib_cache` directory.
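
To make the mechanism concrete, here is a minimal sketch of what doing this "by hand" could look like using only the standard library. It is not joblib's implementation: `disk_cache` and `slow_square` are hypothetical names, and the sketch assumes that both the arguments and the return value can be pickled.

```python
import functools
import hashlib
import os
import pickle
import tempfile


def disk_cache(cache_dir):
    """Decorator factory: store a function's results on disk, keyed by its arguments."""
    os.makedirs(cache_dir, exist_ok=True)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key from the function name and its pickled arguments.
            payload = pickle.dumps((func.__qualname__, args, sorted(kwargs.items())))
            key = hashlib.sha256(payload).hexdigest()
            path = os.path.join(cache_dir, f"{key}.pkl")

            if os.path.exists(path):
                # Seen these arguments before: load the stored result.
                with open(path, "rb") as handle:
                    return pickle.load(handle)

            # First time: run the function and store its result for next time.
            result = func(*args, **kwargs)
            with open(path, "wb") as handle:
                pickle.dump(result, handle)
            return result

        return wrapper

    return decorator


calls = []  # records each time the function body actually runs


@disk_cache(tempfile.mkdtemp())
def slow_square(x):
    calls.append(x)
    return x * x


first = slow_square(4)   # computed, then written to disk
second = slow_square(4)  # read back from disk; the function body is not re-run
```

This toy version has no way of noticing that the function itself has changed, and it never evicts old entries. Handling those cases is part of what a library like joblib offers.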

In [None]:
result_table = get_gaia_query(query)

In [None]:
%time result_table = get_gaia_query(query)

<font size=+1 color=red>**Question**</font>: In what cases might you NOT want to cache query results?

In [None]:
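def get_weather_alerts(area="CA"):
    r = requests.get(f"https://api.weather.gov/alerts/active?area={area}")
    r.raise_for_status()
    features = r.json()["features"]  # parse the JSON payload once
    if not features:
        # nothing active right now, so there is no first alert to index
        return f"No active alerts for {area}"
    props = features[0]["properties"]
    return f'{props["areaDesc"]}: {props["headline"]}'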

In [None]:
print(get_weather_alerts())
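
Active weather alerts change from one call to the next, so a cache that never expires would keep handing back stale answers. One possible middle ground, sketched below under the assumption that slightly out-of-date results are acceptable, is to keep each result only for a fixed time-to-live. The names `ttl_cache` and `clock` are hypothetical, and this version lives in memory only, so it is ephemeral in the sense discussed earlier.

```python
import functools
import time


def ttl_cache(ttl_seconds, clock=time.monotonic):
    """Decorator factory: remember results in memory, but only for `ttl_seconds`.

    `clock` is injectable so the expiry logic can be exercised without waiting.
    """
    def decorator(func):
        store = {}  # maps positional args -> (time stored, result)

        @functools.wraps(func)
        def wrapper(*args):
            now = clock()
            if args in store:
                stamp, result = store[args]
                if now - stamp < ttl_seconds:
                    return result  # entry is still fresh
            # Missing or expired: recompute and refresh the timestamp.
            result = func(*args)
            store[args] = (now, result)
            return result

        return wrapper

    return decorator
```

Decorating a function such as `get_weather_alerts` with `@ttl_cache(300)` would then reuse a result for up to five minutes before querying again. Whether that delay is tolerable depends on what the result is used for.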