In [1]:
import datetime
import glob
import os
import sys

import dask.dataframe as dadf
import duckdb
import easier as ezr
import maidenhead
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pypeln as pyp
from dateutil.parser import parse
from sqlalchemy import create_engine, text

# import datashader.spatial.points as dsp

In [15]:
class SpotBase:
    """Common base for objects that read from the spots SQLite database.

    Constructing an instance opens a SQLAlchemy engine on ``FILE_NAME`` and
    stores it as ``self.engine``. The database file must already exist.
    """
    # Path of the SQLite database file.
    FILE_NAME = './data/wsprnet/spots.db'

    def __init__(self):
        """Create ``self.engine`` for the database file.

        Raises:
            RuntimeError: if ``FILE_NAME`` is not an existing file.
        """
        db_path = self.FILE_NAME
        if os.path.isfile(db_path):
            self.engine = create_engine(f'sqlite:///{db_path}')
            return
        raise RuntimeError(f'Database file not found: {db_path}')
        
class Frame:
    """Loads raw WSPRnet spot CSV archives and reduces them to hourly records."""

    @property
    def columns(self):
        """Column names, in order, of the frame returned by ``process_frame``."""
        return [
            'time',
            'call_sign',
            'reporter',
            'lat',
            'lon',
            'lat_reporter',
            'lon_reporter',
            'distance',
            'power',
            'snr',
            'time_local',
        ]

    @property
    def input_columns(self):
        """Column names, in order, assigned to the columns of the input CSV."""
        return [
            'spot_id',
            'timestamp',
            'reporter',
            'reporters_grid',
            'snr',
            'frequency',
            'call_sign',
            'grid',
            'power',
            'drift',
            'distance',
            'azimuth',
            'band',
            'version',
            'code',
        ]

    def load_frame(self, file_name):
        """Read one gzip-compressed spot CSV into a DataFrame.

        Args:
            file_name: path of the ``.csv.gz`` file to read.

        Returns:
            A DataFrame whose columns are named after ``input_columns``.
        """
        return pd.read_csv(
            file_name,
            names=self.input_columns,
            compression='gzip',
        )

    @staticmethod
    def _grid_to_lat_lon(grids):
        """Convert a Series of Maidenhead locators to ``(lat, lon)`` Series.

        Each distinct locator is converted only once and the result is mapped
        back onto the rows, so repeated locators cost a dictionary lookup.
        An empty Series yields two empty float Series.
        """
        lat_of = {}
        lon_of = {}
        for grid in grids.unique():
            lat_of[grid], lon_of[grid] = maidenhead.to_location(grid)
        lat = grids.map(lat_of).astype('float64')
        lon = grids.map(lon_of).astype('float64')
        return lat, lon

    def process_frame(self, df):
        """Reduce a raw spot frame to one averaged record per hour and path.

        Rows whose ``code`` is not 0 are dropped. The remaining rows get
        latitude/longitude columns derived from the two grid locators and a
        ``time`` column equal to ``timestamp // 3600``. ``power`` and ``snr``
        are then averaged over rows sharing the same time, call sign,
        reporter, coordinates and distance.

        Args:
            df: frame with the columns listed in ``input_columns``. It is not
                modified; a zero-row frame is accepted.

        Returns:
            A new DataFrame with the columns listed in ``columns``.
        """
        # Filter first so discarded rows are never converted, and copy so that
        # the new columns land on our own frame rather than on the caller's
        # (this also avoids assigning into a slice of ``df``).
        kept = df[df['code'] == 0].copy()

        kept['lat_reporter'], kept['lon_reporter'] = self._grid_to_lat_lon(kept['reporters_grid'])
        kept['lat'], kept['lon'] = self._grid_to_lat_lon(kept['grid'])
        kept['time'] = kept['timestamp'] // 3600

        group_keys = [
            'time', 'call_sign', 'reporter',
            'lat', 'lon', 'lat_reporter', 'lon_reporter', 'distance',
        ]
        out = kept.groupby(by=group_keys)[['power', 'snr']].mean().reset_index()

        # Shift the hour count by longitude at 12 hours per 180 degrees.
        out['time_local'] = out['time'] + out['lon'] * 12 / 180
        return out
    
        
        

    
class Populator(SpotBase, Frame):
    """Creates and fills the ``spots`` table of the SQLite database.

    The constructor deliberately does not call ``SpotBase.__init__``: it must
    work when the database file does not exist yet, in which case the
    ``spots`` table is created.
    """

    def __init__(self, overwrite=False):
        """Open the database, creating the ``spots`` table for a new file.

        Args:
            overwrite: when true, an existing database file is deleted first.
        """
        if overwrite and os.path.isfile(self.FILE_NAME):
            os.unlink(self.FILE_NAME)

        # Record whether the file existed before anything connects to it.
        db_exists = os.path.isfile(self.FILE_NAME)
        self.engine = create_engine(f'sqlite:///{self.FILE_NAME}')

        if not db_exists:
            self.create_table()

    def create_table(self):
        """Create the ``spots`` table."""
        cmd = """
            CREATE TABLE spots(
                time INT8,
                call_sign CHARACTER(12),
                reporter CHARACTER(12),
                lat DOUBLE,
                lon DOUBLE,
                lat_reporter DOUBLE,
                lon_reporter DOUBLE,
                distance INT8,
                power INT2,
                snr INT2,
                time_local DOUBLE
            )
        """
        # engine.begin() commits on success; text() is required for plain SQL
        # strings on SQLAlchemy 2.x and is accepted by 1.x as well.
        with self.engine.begin() as con:
            con.execute(text(cmd))

    def import_frame(self, df):
        """Append the rows of ``df`` to the ``spots`` table.

        Args:
            df: frame whose columns match the ``spots`` table; the index is
                not written.
        """
        df.to_sql('spots', con=self.engine, if_exists='append', index=False)

    def create_index(self):
        """Create the index on ``spots.time`` if it does not already exist.

        Further ``CREATE INDEX`` statements can be added to ``commands``,
        separated by ``;``.
        """
        commands = """
            CREATE INDEX IF NOT EXISTS time_ind ON spots (time);
        """
        # Split on the statement terminator rather than on newlines, so blank
        # fragments are skipped and a statement may span several lines.
        statements = [part.strip() for part in commands.split(';')]

        with self.engine.begin() as con:
            for statement in statements:
                if statement:
                    con.execute(text(statement))
        


In [11]:
from tqdm.auto import tqdm
# Input archives, taken in sorted name order.
files = sorted(glob.glob('./data/wsprnet/2020/wsprspots-2020-*.csv.gz'))

# One progress bar per pipeline stage, each counting files.
pbar1, pbar2, pbar3 = [
    tqdm(total=len(files), desc=stage_label)
    for stage_label in ('loaded', 'processed', 'imported')
]

    
def get_frame(file_name):
    """Load one spot file and tick the 'loaded' progress bar.

    Returns:
        The pair ``(file_name, frame)``, where ``frame`` is the result of
        ``Frame.load_frame``.
    """
    loaded = Frame().load_frame(file_name)
    pbar1.update()
    return file_name, loaded

def process_frame(tup):
    """Run ``Frame.process_frame`` on the frame of a ``(file_name, frame)`` pair.

    Returns:
        The pair ``(file_name, processed_frame)``.
    """
    name, raw = tup
    return name, Frame().process_frame(raw)

def post_process(tup):
    """Tick the 'processed' progress bar and return ``tup`` unchanged."""
    # NOTE(review): presumably a separate stage because process_frame runs in
    # worker processes, where updating pbar2 would not reach this notebook's
    # bar — confirm.
    pbar2.update()
    return tup


def initialize_db():
    """Reset the database by constructing ``Populator(overwrite=True)``.

    With ``overwrite=True`` the constructor deletes an existing database file
    and then creates the ``spots`` table in a fresh one.
    """
    Populator(overwrite=True)    
    
def populate_frame(tup):
    """Append one processed frame to the database and tick the 'imported' bar.

    Args:
        tup: a ``(file_name, frame)`` pair.

    Returns:
        The file name from ``tup``.
    """
    name, processed = tup
    writer = Populator(overwrite=False)
    writer.import_frame(processed)
    pbar3.update()
    return name

# Four-stage pypeln pipeline over the input files: load -> process -> count -> import.
# NOTE(review): maxsize=1 presumably bounds each stage's input queue so that
# only a few frames are held in memory at once — confirm against pypeln docs.
data = (
    files
    # Threads: read each archive into a DataFrame.
    | pyp.thread.map(get_frame, workers=2, maxsize=1)
    # Processes: grid conversion and hourly aggregation.
    | pyp.process.map(process_frame, workers=4, maxsize=1)
    # Single thread: tick the 'processed' progress bar.
    | pyp.thread.map(post_process, workers=1, maxsize=1)
    # Single thread: initialize_db is passed as the on_start hook (it recreates
    # the database), then each frame is appended to the spots table.
    | pyp.thread.map(populate_frame, workers=1, on_start=initialize_db, maxsize=1)
    # Consume the pipeline; presumably yields the file names returned by populate_frame.
    | list
)


HBox(children=(FloatProgress(value=0.0, description='loaded', max=1064.0, style=ProgressStyle(description_widt…

HBox(children=(FloatProgress(value=0.0, description='processed', max=1064.0, style=ProgressStyle(description_w…

HBox(children=(FloatProgress(value=0.0, description='imported', max=1064.0, style=ProgressStyle(description_wi…


















In [21]:
class Loader(SpotBase):
    """Reads the spots of a time window from the database into a DataFrame."""
    # Reference instant for the integer hour counts stored in ``spots.time``.
    EPOCH = datetime.datetime(1970, 1, 1)

    def _hours_since_epoch(self, moment):
        """Return the whole hours from ``EPOCH`` to ``moment``, rounded down."""
        return int(np.floor((moment - self.EPOCH).total_seconds() / 3600))

    def get(self, starting, ending_exclusive):
        """Load all spots with ``starting <= time < ending_exclusive``.

        Both bounds are floored to whole hours before querying.

        Args:
            starting: ``datetime.datetime`` lower bound (inclusive).
            ending_exclusive: ``datetime.datetime`` upper bound (exclusive).

        Returns:
            A DataFrame with the table's columns, in which the stored hour
            counts are renamed to ``timestamp`` / ``timestamp_local`` and the
            columns ``time``, ``time_local`` (datetimes) and ``hour``,
            ``hour_local`` (hour of day) are added.
        """
        min_hours = self._hours_since_epoch(starting)
        max_hours = self._hours_since_epoch(ending_exclusive)

        # Both interpolated values are ints produced above, so formatting them
        # into the statement cannot inject SQL.
        sql = f"""
            SELECT
                *
            FROM
                spots
            WHERE
                time >= {min_hours}
            AND
                time < {max_hours}
        """
        df = pd.read_sql_query(sql, self.engine)

        df = df.rename(columns={'time': 'timestamp', 'time_local': 'timestamp_local'})
        # Use the lower-case 'h' unit: the upper-case 'H' alias is deprecated
        # in recent pandas, while 'h' is accepted by old and new versions.
        df['time'] = self.EPOCH + pd.to_timedelta(df.timestamp, 'h')
        df['time_local'] = self.EPOCH + pd.to_timedelta(df.timestamp_local, 'h')
        df['hour'] = df.time.dt.hour
        df['hour_local'] = df.time_local.dt.hour

        return df
    
# Query window passed to Loader.get; the second bound is exclusive.
starting = datetime.datetime(2020, 8, 15)
ending = datetime.datetime(2020, 8, 16)

loader = Loader()

with ezr.Timer('query'):
    # Fetch the window, then keep the columns of interest in display order.
    df = loader.get(starting, ending)[[
        'time',
        'time_local',
        'hour',
        'hour_local',
        'call_sign',
        'reporter',
        'lat',
        'lon',
        'lat_reporter',
        'lon_reporter',
        'distance',
        'power',
        'snr',
    ]]

display(df.head())
len(df)
    

__time__,2.160313,query


Unnamed: 0,time,time_local,hour,hour_local,call_sign,reporter,lat,lon,lat_reporter,lon_reporter,distance,power,snr
0,2020-08-15,2020-08-14 23:51:39.999999600,0,23,2E0DSS,AJ8S/1,52.458333,-2.083333,44.208333,-71.0,4976,33.0,-19.0
1,2020-08-15,2020-08-14 23:51:39.999999600,0,23,2E0DSS,DB9OH,52.458333,-2.083333,52.333333,10.75,869,33.0,-18.0
2,2020-08-15,2020-08-14 23:51:39.999999600,0,23,2E0DSS,DC5AL-R,52.458333,-2.083333,51.416667,6.916667,627,33.0,-13.0
3,2020-08-15,2020-08-14 23:51:39.999999600,0,23,2E0DSS,DD5XX-3,52.458333,-2.083333,48.708333,9.25,901,33.0,-24.5
4,2020-08-15,2020-08-14 23:51:39.999999600,0,23,2E0DSS,DD9LH,52.458333,-2.083333,54.166667,9.083333,765,33.0,-8.0


671706

In [19]:
# Number of rows in `df`.
# NOTE(review): this cell's execution count (In [19]) is lower than that of the
# query cell (In [21]), so the recorded output may describe an earlier `df`.
len(df)

81203

In [114]:
# Open the database without overwriting it, then create the index on
# spots.time if it does not exist yet.
pop = Populator(overwrite=False)
pop.create_index()

In [31]:
# NOTE(review): `SQ` is not defined in any visible cell, so this cell fails on
# a fresh kernel. The recorded output also ends in a NameError for 'duck'.
# Import a subset of the 2020-01 spot files through `sq`, then time index creation.
sq = SQ(overwrite=True)
# NOTE(review): this rebinds `files`, the name used by the pipeline cell.
files = glob.glob('./data/wsprnet/2020/wsprspots-2020-01_a*.csv.gz')
for file in tqdm(files):
    sq.import_file(file)
print('creating index')
with ezr.Timer('indexing'):
    sq.create_index()


100%|██████████| 26/26 [02:15<00:00,  5.21s/it]

creating index





NameError: name 'duck' is not defined

In [32]:
# Time index creation on `sq`.
# NOTE(review): `sq` comes from the `SQ` class, which is not defined in the visible cells.
with ezr.Timer('indexing'):
    sq.create_index()

__time__,4.649083,indexing


In [23]:
# Print the names of the tables in the database behind `sq.engine`, skipping
# tables whose names start with 'sqlite_'.
# NOTE(review): `sq` comes from the `SQ` class, which is not defined in the visible cells.
with sq.engine.connect() as con:
    result = con.execute("""
        SELECT 
            name
        FROM 
            sqlite_master 
        WHERE 
            type ='table' AND 
            name NOT LIKE 'sqlite_%';
    """)
    for rec in result:
        print(rec)

('spots',)


In [24]:
# Display every row of sqlite_master as a dict.
# NOTE(review): `sq` comes from the `SQ` class, which is not defined in the visible cells.
with sq.engine.connect() as con:
    result = con.execute("""
        SELECT 
            *
        FROM 
            sqlite_master 
    """)
    for rec in result:
        display(dict(rec))


{'type': 'table',
 'name': 'spots',
 'tbl_name': 'spots',
 'rootpage': 2,
 'sql': 'CREATE TABLE spots(\n                spot_id INT8,\n                timestamp INT8,\n                reporter VARCHAR,\n                reporters_grid VARCHAR,\n                snr INT2,\n                frequency DOUBLE,\n                call_sign VARCHAR,\n                grid VARCHAR,\n                power INT2,\n                drift INT2,\n                distance INT8,\n                azimuth INT8,\n                band INT2,\n                version VARCHAR,\n                code INT2\n            )'}

In [25]:
# Append the rows of `df` to the `spots` table through `sq.engine`.
# NOTE(review): which `df` this refers to depends on cell execution order, and
# `sq` is not defined in the visible cells — confirm before re-running.
df.to_sql('spots', con=sq.engine, if_exists='append', index=False)

In [27]:
# Fetch the first five rows of `spots` and show them as a DataFrame.
# NOTE(review): `sq.columns()` is called as a method here, whereas the visible
# `Frame.columns` is a property; `SQ` itself is not defined in the visible cells.
with sq.engine.connect() as con:
    result = con.execute("""
        SELECT 
            *
        FROM 
            spots
        LIMIT 5;
    """)
    
    xxx = pd.DataFrame(result, columns=sq.columns())
    
xxx


Unnamed: 0,spot_id,timestamp,reporter,reporters_grid,snr,frequency,call_sign,grid,power,drift,distance,azimuth,band,version,code
0,1913207476,1577836800,DP0GVN,IB59uh,-23,7.040077,2E0ETU,IO81wv,27,0,13640,182,7,,0
1,1913208199,1577836800,IW2NKE,JN63np,-10,7.040077,2E0ETU,IO81wv,27,0,1458,123,7,,0
2,1913208025,1577836800,IZ3EAW/B,JN55xi,-20,7.040029,2E0ETU,IO81wv,27,0,1262,120,7,1.3 Kiwi,0
3,1913206488,1577836800,KF4TEK,FM18dr,-22,7.040079,2E0ETU,IO81wv,27,0,5810,287,7,2.1.2,0
4,1913208961,1577836800,OE9HLH,JN47um,-25,7.040077,2E0ETU,IO81wv,27,0,978,115,7,,0
