In [None]:
import json

import requests

# References

- https://github.com/catherinedevlin/ipython-sql


# Observations db setup

In [None]:
%load_ext sql

In [None]:
%sql sqlite:///../data/observations_2023-07-01_rotators.db

Re-build the indexes if they don't exist.

In [None]:
%%sql
-- Lookup indexes on the observations table. Every statement uses
-- IF NOT EXISTS, so re-running this cell leaves existing indexes untouched.
CREATE INDEX IF NOT EXISTS gs_norad_index ON observations (ground_station, norad_cat_id);
CREATE INDEX IF NOT EXISTS gs_index ON observations (ground_station);
CREATE INDEX IF NOT EXISTS norad_index ON observations (norad_cat_id);
CREATE INDEX IF NOT EXISTS observer_index ON observations (observer);
CREATE INDEX IF NOT EXISTS observer_gs_index ON observations (observer, ground_station);
CREATE INDEX IF NOT EXISTS gs_observer_index ON observations (ground_station, observer);


# Misc poking around

In [None]:
# Fetch one known observation to see how a row (and its demoddata) comes back.
results = %sql select * from observations where id=7765459;

In [None]:
# View the result set as a dict; the indexing in the next cell treats it as
# column name -> sequence of values.
result = results.dict()

In [None]:
# demoddata value of the first returned row
decodes = result['demoddata'][0]

In [None]:
# Check which Python type the demoddata value arrives as.
type(decodes)

In [None]:
# Reach the same observation through row-style access instead.
row = results[0]

In [None]:
demoddata = row['demoddata']

In [None]:
# The value is stored as text that apparently uses single quotes, which JSON
# does not accept. Swap every single quote for a double quote so json.loads
# can read it.
# NOTE(review): this also rewrites apostrophes inside values, so a value
# containing ' would break the parse.
dd = demoddata.replace("'", "\"")


In [None]:
# Parse the converted text and show how many entries it holds.
d = json.loads(dd)
len(d)

In [None]:
d

# Observations with demoddata

The `demoddata` field is a list of dicts.  If there are no demodulated data frames, then the field will store an empty list.

This field is stored in the SQLite db as a string, so an observation with no frames shows up as the string "[]".

In [None]:
# NOTE(review): id 776545 differs from the 7765459 used earlier — presumably
# chosen as an observation without frames, so the value displayed below would
# be the string "[]"; confirm.
results = %sql select * from observations where id=776545;
demoddata = results[0]['demoddata']
demoddata

In [None]:
%%sql
-- The empty list "[]" is two characters long, so any longer value is treated
-- as holding data.
SELECT count(*)
  FROM observations
 WHERE length(demoddata) > 2;

So, presently about 2.2M observations with some sort of data.

Since we will only be asking about this set from here on out, create a new table with only these observations.

In [None]:
%%sql
-- Materialise only the observations whose demoddata is longer than the
-- two-character empty list "[]".
-- IF NOT EXISTS keeps the cell re-runnable: without it a second run fails
-- because the table is already there.
CREATE TABLE IF NOT EXISTS obs_with_data AS
SELECT *
FROM observations
WHERE length(demoddata) > 2;

In [None]:
# get the size of a file without downloading it
import requests

URL = 'https://s3.eu-central-1.wasabisys.com/satnogs-network/data_obs/2023/6/25/16/7765459/data_7765459_2023-06-25T16-34-20'
# stream=True defers fetching the body; the `with` block releases the
# connection once the headers have been read.
# The timeout stops the cell from hanging forever on a stalled server.
with requests.get(URL, stream=True, timeout=30) as r:
    # fail loudly on 4xx/5xx instead of reporting the size of an error page
    r.raise_for_status()
    size = int(r.headers['content-length'])
size

In [None]:
from os.path import basename

# basename() is plain string handling, so it works on a URL too: for this
# value it returns the text after the last '/'.
frame_url = 'https://s3.eu-central-1.wasabisys.com/satnogs-network/data_obs/2023/6/25/16/7765459/data_7765459_2023-06-25T16-34-20'
basename(frame_url)

## Make a local copy of the data frames?

Table with

- id
- datetime
- name
- blob


In [None]:
%%sql
-- To rebuild from scratch, uncomment the DROP below and run it once.
--DROP TABLE obs_demoddata;

-- One row per stored frame file; `name` is UNIQUE so the same file cannot be
-- stored twice. IF NOT EXISTS keeps this cell re-runnable.
CREATE TABLE IF NOT EXISTS obs_demoddata (
    id INTEGER,
    datetime TEXT,
    name TEXT UNIQUE,
    data BLOB);

CREATE INDEX IF NOT EXISTS idx_obs_demoddata ON obs_demoddata(id);

- Get the data from the observations

In [None]:
# Pull (id, demoddata) for every row of obs_with_data, highest id first.
# Later cells iterate over `results` to extract the frames of each observation.
# NOTE(review): presumably this loads the whole table into memory — confirm it fits.
results = %sql select id,demoddata from obs_with_data ORDER BY id DESC;

In [None]:
from os.path import basename, splitext
from collections import Counter

# Tally of file-name extensions over every frame ('' means "no extension").
extensions = Counter()

for obs, demoddata in results:
    # swap the quote style so the stored text parses as JSON
    frames = json.loads(demoddata.replace("'", "\""))
    # splitext(...)[1] is the extension part of the frame's file name
    extensions.update(splitext(basename(f['payload_demod']))[1] for f in frames)

extensions.most_common()

In [None]:
# Total number of frame files counted (Counter.total() needs Python 3.10+).
extensions.total()

In [None]:
# NOTE(review): presumably (files without an extension) / (all files), with
# both numbers copied by hand from the counts above — verify, and note that
# they go stale if the data changes.
56383592 / 57172304

For the moment, ignore all frame files that have an extension; they make up less than 2% of the total.

In [None]:
import sqlite3

# Plain sqlite3 connection used by add_frame for the inserts.
conn = sqlite3.connect('../data/observations_2023-07-01_rotators.db')

def add_frame(id, dt, name, data):
    """Insert one frame into obs_demoddata.

    Parameters
    ----------
    id : value for the `id` column
    dt : value for the `datetime` column
    name : value for the `name` column; the column is UNIQUE, so a name that
        is already stored makes this call a no-op (INSERT OR IGNORE)
    data : value for the `data` (BLOB) column
    """
    things = {'id': id, 'dt': dt, 'name': name, 'data': data}

    # `with conn` commits on success and rolls back if execute() raises
    with conn:
        # Name the target columns explicitly so the insert does not silently
        # depend on the order in which the table's columns were declared.
        conn.execute(
            'INSERT OR IGNORE INTO obs_demoddata (id, datetime, name, data) '
            'VALUES (:id, :dt, :name, :data);',
            things,
        )
        

In [None]:
import requests
import requests_cache

# no expiration
requests_cache.install_cache()


# how often each sequence suffix occurs / how often each payload size occurs
sequences = Counter()
lengths = Counter()

# get the data content, one request at a time
for obs, demoddata in results:
    # swap the quote style so the stored text parses as JSON
    dd = demoddata.replace("'", "\"")
    frames = json.loads(dd)

    print(obs, end='')

    for f in frames:
        path = f['payload_demod']
        filename = basename(path)
        name, ext = splitext(filename)

        # ignore files with extensions
        if len(ext) > 0:
            continue

        # expected stem: <prefix>_<obsid>_<YYYY-MM-DDTHH-MM-SS>[_<sequence>]
        try:
            prefix, obsid, datestr, *sequence = name.split('_')
            year, month, dayhour, minute, second = datestr.split('-')
        except ValueError:
            # the name does not follow the expected layout: report it and move on
            print(name)
            continue

        sequence = '' if not sequence else sequence[0]
        sequences[sequence] += 1

        # e.g. 2023-06-25T16-34-20 -> 2023-06-25T16:34:20
        dt = f'{year}-{month}-{dayhour}:{minute}:{second}'

        # TODO: what to do with the sequence suffix?

        # fetch the data content
        with requests.get(path, timeout=60) as r:
            if not r.ok:
                # Do not store an error page: the UNIQUE name together with
                # INSERT OR IGNORE would keep it in the table for good.
                print(f'[{filename}: HTTP {r.status_code}]', end='')
                continue
            data = r.content
            # fall back to the body size when the header is absent
            n = int(r.headers.get('content-length', len(data)))
            lengths[n] += 1

        print('.', end='')

        add_frame(obs, dt, filename, data)
    print()

sequences.most_common()



# put into the db table

In [None]:
import requests
import requests_cache
import concurrent.futures
import threading

# no expiration
requests_cache.install_cache()


# Per-thread storage, so each worker thread keeps its own Session.
thread_local = threading.local()

def get_session():
    """Return the calling thread's requests.Session, creating it on first use."""
    try:
        return thread_local.session
    except AttributeError:
        # first call on this thread: create the session and remember it
        thread_local.session = requests.Session()
        return thread_local.session



def get_frame_content(obs, dt, filename, path, timeout=60):
    """Download one frame and return it with the metadata needed to store it.

    Parameters
    ----------
    obs, dt, filename : passed through unchanged into the returned tuple
    path : URL of the frame to download
    timeout : seconds passed to the HTTP request as its timeout, so a stalled
        server cannot block a worker forever

    Returns
    -------
    tuple of (obs, dt, filename, data), where data is the response body

    Raises
    ------
    requests.HTTPError
        if the server answers with a 4xx/5xx status
    """
    session = get_session()
    with session.get(path, timeout=timeout) as r:
        # never hand an error page back as if it were frame data
        r.raise_for_status()
        data = r.content
    return (obs, dt, filename, data)


def stuff_demoddata_table(id_low, id_high):
    """Download and store the frames of observations with id_low <= id <= id_high.

    Iterates over the notebook-level `results` rows of (id, demoddata), updates
    the notebook-level `sequences` counter, fetches frames concurrently with
    get_frame_content and stores each one with add_frame.

    Frame files with an extension are ignored. Names that do not match the
    expected layout are printed and skipped. A failed download is reported and
    skipped so the remaining frames are still stored.
    """
    # one pool for the whole run instead of a fresh set of threads per observation
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
        for obs, demoddata in results:
            # only deal with ids in the range; checked first so that rows
            # outside it are never parsed
            if (obs < id_low) or (obs > id_high):
                continue

            # swap the quote style so the stored text parses as JSON
            frames = json.loads(demoddata.replace("'", "\""))

            print(obs, end=' ')
            # future -> file name, so a failure can be reported by name
            futures = {}

            for f in frames:
                path = f['payload_demod']
                filename = basename(path)
                name, ext = splitext(filename)

                # ignore files with extensions
                if len(ext) > 0:
                    continue

                # expected stem: <prefix>_<obsid>_<YYYY-MM-DDTHH-MM-SS>[_<sequence>]
                try:
                    prefix, obsid, datestr, *sequence = name.split('_')
                    year, month, dayhour, minute, second = datestr.split('-')
                except ValueError:
                    print(name)
                    continue

                sequence = '' if not sequence else sequence[0]
                sequences[sequence] += 1

                dt = f'{year}-{month}-{dayhour}:{minute}:{second}'

                # TODO: what to do with the sequence suffix?

                future = pool.submit(get_frame_content, obs, dt, filename, path)
                futures[future] = filename

            for future in concurrent.futures.as_completed(futures):
                try:
                    # separate names so the outer loop's `obs` is not rebound
                    frame_obs, frame_dt, frame_name, data = future.result()
                except requests.RequestException as exc:
                    print(f'[{futures[future]}: {exc}]', end='')
                    continue
                add_frame(frame_obs, frame_dt, frame_name, data)
                print('.', end='')
            print()


In [None]:
# Fetch and store frames for observation ids 7_600_000 through 7_737_381
# (both ends are inside the range the function processes).
stuff_demoddata_table(7_600_000, 7_737_381)

In [None]:
%%sql
-- Spot-check: the first ten stored frames by ascending observation id.
SELECT id, name
  FROM obs_demoddata
 ORDER BY id ASC
 LIMIT 10;

In [None]:
# Write every frame URL to a text file, one URL per line.
with open('demoddata_urls.txt', 'w') as urlfile:
    for obs, demoddata in results:
        # swap the quote style so the stored text parses as JSON
        frames = json.loads(demoddata.replace("'", "\""))
        urlfile.writelines(f['payload_demod'] + '\n' for f in frames)

In [None]:
from itertools import islice
import time

def batched(iterable, n):
    """Yield successive tuples of up to n items taken from iterable.

    Only the final tuple may hold fewer than n items, e.g.
    batched('ABCDEFG', 3) yields ('A','B','C'), ('D','E','F'), ('G',).
    Raises ValueError (on first iteration) when n is smaller than one.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    # keep slicing until an empty tuple signals that the source is exhausted
    yield from iter(lambda: tuple(islice(source, n)), ())


# Frames of observations with an id above this were handled by an earlier
# run and are skipped here.
ALREADY_FETCHED_ABOVE = 7656518

with open('demoddata_urls.txt', 'r') as urls:
    with concurrent.futures.ThreadPoolExecutor(max_workers=40) as pool:
        for batch in batched(urls, 1000):
            t1 = time.perf_counter()

            # Start a fresh collection for every batch. Keeping one list for
            # the whole run made each batch wait on, and re-insert, every
            # earlier result again, and kept all payloads in memory.
            futures = {}  # future -> file name, for error reporting
            last_index = len(batch) - 1

            for index, pathline in enumerate(batch):
                path = pathline.rstrip()
                filename = basename(path)
                name, ext = splitext(filename)

                # ignore files with extensions
                if len(ext) > 0:
                    continue

                # expected stem: <prefix>_<obsid>_<YYYY-MM-DDTHH-MM-SS>[_<sequence>]
                try:
                    prefix, obsid, datestr, *sequence = name.split('_')
                    obs = int(obsid)
                    year, month, dayhour, minute, second = datestr.split('-')
                except ValueError:
                    # the name does not follow the expected layout: report and skip
                    print(name)
                    continue

                # skip things already touched
                if obs > ALREADY_FETCHED_ABOVE:
                    continue

                dt = f'{year}-{month}-{dayhour}:{minute}:{second}'

                # TODO: what to do with the sequence suffix?

                future = pool.submit(get_frame_content, obs, dt, filename, path)
                futures[future] = filename

                # progress: observation id of the first and last line of the batch
                if index == 0:
                    print(obs, end=' -> ')
                elif index == last_index:
                    print(obs, end=' ')

            for future in concurrent.futures.as_completed(futures):
                try:
                    frame_obs, frame_dt, frame_name, data = future.result()
                except requests.RequestException as exc:
                    # report the failed download and keep the rest of the batch
                    print(f'[{futures[future]}: {exc}]', end=' ')
                    continue
                add_frame(frame_obs, frame_dt, frame_name, data)

            # only report a timing for batches that actually downloaded something
            if futures:
                t2 = time.perf_counter()
                print(f"{t2-t1:0.2f} seconds / batch")


In [None]:
# Close the sqlite3 connection that add_frame writes through.
conn.close()