## 选源

In [None]:
import requests
from pathlib import Path
from os import path,makedirs
#from datetime import date

def discos_query(COSPARID = None,NORADID = None,ObjectClass = None,Decayed=None,DecayDate = None,Mass = None,RCSMin = None,RCSMax = None,RCSAvg = None,sort = None,outfile=True):
    '''
    objectClass -> None, Payload, Payload Debris, Payload Fragmentation Debris, Payload Mission Ralated Object, Rocket Body, Rocket Debris, Rocket Fragmentation Debris, Rocket Mission Related Object, Unknown
    '''
    
    # discos token
    home = str(Path.home())
    direc = home + '/src/discos-data/'
    tokenfile = direc + 'discos-token'

    if not path.exists(direc): makedirs(direc)
    if not path.exists(tokenfile):
        token = input('Please input the DISCOS token(which can be achieved from https://discosweb.esoc.esa.int/tokens): ')
        outfile = open(tokenfile,'w')
        outfile.write(token)
        outfile.close()
    else:
        infile = open(tokenfile,'r')
        token = infile.readline()
        infile.close()
    
    URL = 'https://discosweb.esoc.esa.int'
    params = {}
    
    if ObjectClass is not None:
        if type(ObjectClass) is str:
            params['filter'] = "eq(objectClass,'{:s}')".format(ObjectClass)
        elif type(ObjectClass) is list:
            params_filter = []
            for element in ObjectClass:
                params_filter.append("eq(objectClass,'{:s}')".format(element))
            params['filter'] = '|'.join(params_filter)
            params['filter'] = '(' + params['filter'] + ')'
        else:
            raise Exception('Type of ObjectClass should be either string or list.')
            
    # Decayed
    if Decayed is not None:
        if Decayed is False:
            if 'filter' in params.keys():
                params['filter'] += "&(eq(reentry.epoch,null))"
            else:
                params['filter'] = "eq(reentry.epoch,null)"
        elif Decayed is True:
            if 'filter' in params.keys():
                params['filter'] += "&(ne(reentry.epoch,null))"
            else:
                params['filter'] = "ne(reentry.epoch,null)"
        else:
            raise Exception("'Decayed' must be one of 'False', 'True', or 'None'.")   
            
    # decaydate
    if DecayDate is not None:
        #date_now = date.today().isoformat()
            
        if 'filter' in params.keys():
            params['filter'] += "&(gt(reentry.epoch,epoch:'{:s}')&lt(reentry.epoch,epoch:'{:s}'))".format(DecayDate[0],DecayDate[1])
        else:
            params['filter'] = "gt(reentry.epoch,epoch:'{:s}')&lt(reentry.epoch,epoch:'{:s}')".format(DecayDate[0],DecayDate[1])
 
              
    if COSPARID is not None:
        if 'filter' in params.keys():
            if type(COSPARID) is str:
                params['filter'] += "&(eq(cosparId,'{:s}'))".format(COSPARID)
            elif type(COSPARID) is list:    
                params['filter'] += '&(in(cosparId,{:s}))'.format(str(tuple(COSPARID))).replace(' ', '')
            else:
                raise Exception('Type of COSPARID should be in str or list of str.')
        else:
            if type(COSPARID) is str:
                params['filter'] = "eq(cosparId,'{:s}')".format(COSPARID)
            elif type(COSPARID) is list:    
                params['filter'] = 'in(cosparId,{:s})'.format(str(tuple(COSPARID))).replace(' ', '')
            else:
                raise Exception('Type of COSPARID should be in str or list of str.')
            
            
    if NORADID is not None:
        if 'filter' in params.keys():
            if type(NORADID) is int:
                params['filter'] += '&(eq(satno,{:d}))'.format(NORADID)   
            elif type(NORADID) is str:
                params['filter'] += '&(eq(satno,{:s}))'.format(NORADID)
            elif type(NORADID) is list:    
                params['filter'] += '&(in(satno,{:s}))'.format(str(tuple(NORADID))).replace(' ', '')
            else:
                raise Exception('Type of NORADID should be in int, str, list of int, or list of str.')  
        else:
            if type(NORADID) is int:
                params['filter'] = 'eq(satno,{:d})'.format(NORADID)   
            elif type(NORADID) is str:
                params['filter'] = 'eq(satno,{:s})'.format(NORADID)
            elif type(NORADID) is list:    
                params['filter'] = 'in(satno,{:s})'.format(str(tuple(NORADID))).replace(' ', '')
            else:
                raise Exception('Type of NORADID should be in int, str, list of int, or list of str.') 
            
                
    if RCSMin is not None:
        if 'filter' in params.keys():
            params['filter'] += '&(gt(xSectMin,{:.4f})&lt(xSectMin,{:.4f}))'.format(RCSMin[0],RCSMin[1])
        else:
            params['filter'] = 'gt(xSectMin,{:.4f})&lt(xSectMin,{:.4f})'.format(RCSMin[0],RCSMin[1])
            
        
    if RCSMax is not None:
        if 'filter' in params.keys():
            params['filter'] += '&(gt(xSectMax,{:.4f})&lt(xSectMax,{:.4f}))'.format(RCSMax[0],RCSMax[1])  
        else:
            params['filter'] = 'gt(xSectMax,{:.4f})&lt(xSectMax,{:.4f})'.format(RCSMax[0],RCSMax[1])
            
        
    if RCSAvg is not None:
        if 'filter' in params.keys():
            params['filter'] += '&(gt(xSectAvg,{:.4f})&lt(xSectAvg,{:.4f}))'.format(RCSAvg[0],RCSAvg[1]) 
        else:
            params['filter'] = 'gt(xSectAvg,{:.4f})&lt(xSectAvg,{:.4f})'.format(RCSAvg[0],RCSAvg[1])
            
    if sort is None:    
        params['sort'] = 'satno'  
    else:
        pass

    # initial params for page
    params['page[number]'] = 1
    extract = []
    
    while True:
        params['page[size]'] = 100 # mumber of entries for each page   
        response = requests.get(f'{URL}/api/objects',headers = {'Authorization': f'Bearer {token}'},params = params)
        doc = response.json()

        if response.ok:
            data = doc['data']
            for element in data:
                extract.append(element['attributes'])
            currentPage = doc['meta']['pagination']['currentPage']
            totalPages = doc['meta']['pagination']['totalPages']
            
            print('CurrentPage {:3d} in TotalPages {:3d}'.format(currentPage,totalPages))
            
            if currentPage < totalPages: 
                params['page[number]'] += 1
            else:
                break
        else:
            return doc['errors']
        
    old_column = ['height', 'xSectMax', 'name', 'satno', 'vimpelId', 'objectClass','mass', 'xSectMin', 'depth', 'xSectAvg', 'length', 'shape', 'cosparId']
    new_column = ['Height[m]', 'RCSMax[m2]', 'Satellite Name', 'NORADID', 'VimpelId', 'ObjectClass', 'Mass[kg]', 'RCSMin[m2]', 'Depth[m]', 'RCSAvg[m2]', 'Length[m]', 'Shape', 'COSPARID']
    new_column_reorder = ['COSPARID', 'NORADID', 'VimpelId', 'Satellite Name','ObjectClass','Mass[kg]','Shape','Height[m]','Length[m]','Depth[m]','RCSMin[m2]','RCSMax[m2]','RCSAvg[m2]']
    df = pd.DataFrame.from_dict(extract,dtype=object).rename(columns=dict(zip(old_column, new_column)), errors='raise')
    df = df.reindex(columns=new_column_reorder) 
    if outfile: df.to_csv('discos_catalog.csv')
        
    return df   

In [None]:
#import numpy as np
from datetime import datetime,timedelta
from os import path,makedirs,remove
from pathlib import Path
from urllib.request import urlretrieve

def download_satcat():
    '''
    Download or update the space weather file from www.celestrak.com
    Usage: 
    swfile = download_sw([direc])
    Inputs: 
    direc -> [str, optionanl, default = $HOME+'/src/sw-data/'] Directory for storing sw file
    
    Outputs: 
    swfile -> [str] Path of sw file
    Examples:
    >>> swfile = download_sw()
    Downloading the latest space weather data ... Finished
    >>> print(swfile)
    /Users/lichunxiao/src/sw-data/SW-All.txt
    >>> swfile = download_sw('sw-data/')
    Downloading the latest space weather data ... Finished
    >>> swfile = download_sw('sw-data/')
    The existing space weather data is already up to date
    >>> print(swfile)
    sw-data/SW-All.txt
    '''
    
    home = str(Path.home())
    direc = home + '/src/satcat-data/'
    
    scfile = direc + 'satcat.txt'
    url = 'https://celestrak.com/pub/satcat.txt'

    if not path.exists(direc): makedirs(direc)

    if not path.exists(scfile):
        print('Downloading the latest satellite catalog from celestrak',end=' ... ')
        urlretrieve(url, scfile)
        print('Finished')

    else:
        modified_time = datetime.fromtimestamp(path.getmtime(scfile))
        if datetime.now() > modified_time + timedelta(days=7):
            remove(scfile)
            print('Updating the satellite catalog from celestrak',end=' ... ')
            urlretrieve(url, scfile)    
            print('Finished')
        else:
            print('The satellite catalog in {:s} is already the latest.'.format(direc))    
    return scfile

In [None]:
import numpy as np
import pandas as pd
def celestrak_query(COSPARID = None, NORADID = None, Payload = None,Decayed = None,DecayDate = None,OrbitalPeriod = None,Inclination = None,ApoAlt = None,PerAlt = None,MeanAlt = None,Country = None,sort = None,outfile=True):
    '''
    Download the Raw SATCAT Data from https://celestrak.com/pub/satcat.txt
    Set the field width according to SATCAT Format Documentation[https://celestrak.com/satcat/satcat-format.php]
    '''
    set_colspecs = [(0,11),(13,18),(19,20),(20,21),(21,22),(23,47),(49,54),(56,66),(68,73),\
                    (75,85),(87,94),(96,101),(103,109),(111,117),(119,127),(129,132)]
    satcat_file = download_satcat()
    data = pd.read_fwf(satcat_file, colspecs = set_colspecs, header = None) 

    data.columns = ['COSPARID', 'NORADID', 'Multiple Name Flag', 'Payload Flag','Operational Status Code','Satellite Name',\
                    'Country','Launch Date','Launch Site','Decay Date','Orbital period[min]','Inclination[deg]',\
                    'Apogee Altitude[km]','Perigee Altitude[km]','Radar Cross Section[m2]','Orbital Status Code']

    Mean_Alltitude = (data['Apogee Altitude[km]'] + data['Perigee Altitude[km]'])/2 # mean alltitude[kilometers]
    full_of_true = np.ones_like(Mean_Alltitude,dtype=bool)
    
    # COSPARID 
    if COSPARID is not None:
        if type(COSPARID) in [str,list]:
            COSPARID_flag = np.in1d(data['COSPARID'],COSPARID,assume_unique=True)
        else:
            raise Exception('Type of COSPARID should be in str or list of str.')             
    else:
        COSPARID_flag = full_of_true
    
    # NORADID 
    if NORADID is not None:
        if type(NORADID) is int:
            NORADID_flag = np.in1d(data['NORADID'],NORADID,assume_unique=True)
        elif type(NORADID) is str:   
            NORADID_flag = np.in1d(data['NORADID'],int(NORADID),assume_unique=True)
        elif type(NORADID) is list:
            NORADID_flag = np.in1d(data['NORADID'],np.array(NORADID).astype(int),assume_unique=True)
        else:
            raise Exception('Type of NORADID should be in int, str, list of int, or list of str.')             
    else:
        NORADID_flag = full_of_true
    
    # payload
    Payload_flag = data['Payload Flag'] == '*'
    if Payload is True:
        pass
    elif Payload is False:
        Payload_flag = ~Payload_flag
    else:
        Payload_flag = full_of_true
        
    # decayed 
    Decayed_flag = data['Operational Status Code'] == 'D'
    if Decayed is True:
        pass
    elif Decayed is False:
        Decayed_flag = ~Decayed_flag
    else:
        Decayed_flag = full_of_true
       
    # apoAlt
    if ApoAlt is not None:
        ApoAlt_flag = (data['Apogee Altitude[km]'] > ApoAlt[0]) & (data['Apogee Altitude[km]'] < ApoAlt[1])
    else:
        ApoAlt_flag = full_of_true
        
    # perAlt
    if PerAlt is not None:
        PerAlt_flag = (data['Perigee Altitude[km]'] > PerAlt[0]) & (data['Perigee Altitude[km]'] < PerAlt[1])
    else:
        PerAlt_flag = full_of_true
        
    # MeanAlt
    if MeanAlt is not None:
        MeanAlt_flag = (Mean_Alltitude > MeanAlt[0]) & (Mean_Alltitude < MeanAlt[1])
    else:
        MeanAlt_flag = full_of_true    
       
    selected_flag = COSPARID_flag & NORADID_flag & Payload_flag & Decayed_flag & ApoAlt_flag & PerAlt_flag & MeanAlt_flag
    df = data[selected_flag].drop(['Multiple Name Flag','Operational Status Code','Orbital Status Code'],axis=1)
    column_reorder = ['COSPARID', 'NORADID', 'Satellite Name','Payload Flag','Decay Date',\
                      'Orbital period[min]', 'Inclination[deg]','Apogee Altitude[km]', 'Perigee Altitude[km]',\
                      'Launch Date','Launch Site','Radar Cross Section[m2]','Country']
    df = df.reindex(columns=column_reorder)
    if outfile: df.to_csv('celestrak_catalog.csv')
    return df

In [None]:
def target_query(COSPARID=None,NORADID=None,Payload=None,ObjectClass=None,Mass=None,Decayed=None,DecayDate=None,OrbitalPeriod=None,Inclination=None,ApoAlt=None,PerAlt=None,MeanAlt=None,RCSMin=None,RCSMax=None,RCSAvg=None,Country=None,outfile=True):
    df_celestrak = celestrak_query(COSPARID,NORADID,Payload,Decayed,DecayDate,OrbitalPeriod,Inclination,ApoAlt,PerAlt,MeanAlt,Country,outfile=False).drop('Satellite Name',axis=1)
    print('Go through the DISCOS database:')
    df_discos = discos_query(COSPARID,NORADID,ObjectClass,Decayed,DecayDate,Mass,RCSMin,RCSMax,RCSAvg,outfile=False).dropna(subset=['NORADID'])
    
    nid,nid_celestrak,nid_discos = np.intersect1d(df_celestrak['NORADID'],df_discos['NORADID'],assume_unique=True,return_indices=True)
    df_celestrak = df_celestrak.iloc[nid_celestrak]
    df_discos = df_discos.iloc[nid_discos]
    
    df = pd.merge(df_celestrak, df_discos, on=['COSPARID','NORADID'],validate="one_to_one")
    df = df.drop(['Payload Flag','Radar Cross Section[m2]','VimpelId'],axis=1)
    
    column_reorder = ['COSPARID', 'NORADID', 'Satellite Name','ObjectClass','Mass[kg]','Shape',\
                      'Height[m]', 'Length[m]','Depth[m]','RCSMin[m2]', 'RCSMax[m2]', 'RCSAvg[m2]',\
                      'Orbital period[min]', 'Inclination[deg]','Apogee Altitude[km]', 'Perigee Altitude[km]',\
                      'Launch Date', 'Decay Date','Launch Site','Country']
    df = df.reindex(columns=column_reorder)
    if outfile: df.to_csv('target_catalog.csv')
    return df    

In [None]:
result = target_query(Payload = False,Decayed = False,RCSAvg = [5,100],MeanAlt = [300,800])

## TLE 轨道根数下载

In [None]:
import numpy as np
import os,glob
from pathlib import Path
from spacetrack import SpaceTrackClient

def tle_download(noradids):
    
    # for file
    if '.' in noradids:
        noradids = list(set(np.loadtxt(noradids,dtype=np.str)))
    
    # username and password for Space-Track
    home = str(Path.home())
    direc = home + '/src/spacetrack-data/'
    loginfile = direc + 'spacetrack-login'

    if not os.path.exists(direc): makedirs(direc)
    if not os.path.exists(loginfile):
        username = input('Please input the username for Space-Track(which can be created at https://www.space-track.org/auth/login): ')
        password = input('Please input the password for Space-Track: ')
        outfile = open(loginfile,'w')
        for element in [username,password]:
            outfile.write('{:s}\n'.format(element))
        outfile.close()
    else:
        infile = open(loginfile,'r')
        username = infile.readline().strip()
        password = infile.readline().strip()
        infile.close()

    st = SpaceTrackClient(username, password)
    lines_3le = st.tle_latest(norad_cat_id=noradids,ordinal=1,iter_lines=True,format='3le')
    
    # save TLE to files
    dir_TLE = 'TLE/'   
    fileList_TLE = glob.glob(dir_TLE+'*')
    if os.path.exists(dir_TLE):
        for file in fileList_TLE:
            os.remove(file)
    else:
        os.mkdir(dir_TLE) 
        
    valid_ids = []
    file_3le = open(dir_TLE+'satcat_3le.txt','w')
    for line in lines_3le:
        words = line.split()
        if words[0] == '2': valid_ids.append(words[1])
        file_3le.write(line+'\n')
    file_3le.close()   
    missing_ids = list(set(noradids)-set(valid_ids))
    if missing_ids: print(missing_ids) # empty list
    print('\n完成双行根数的下载，可到 TLE 目录下查看 3le 文件 \n ')       

In [None]:
#noradids = list(result['NORADID'][175:180])
tle_download('satno.txt')

In [10]:
from skyfield.api import Topos, Loader, load
from astropy.time import TimeDelta,Time
from astropy import units as u
from astropy.constants import R_earth,R_sun

def t_list(t_start,t_end,t_step=60):
    '''
    时间步长默认为 60s
    '''
    dt = np.around((t_end - t_start).to(u.second).value)
    t_astropy = t_start + TimeDelta(np.append(np.arange(0,dt,t_step),dt), format='sec')
    t = ts.from_astropy(t_astropy)
    return t
    
    
def next_pass(ts,satellite,site,t_start,t_end,cutoff = 10):
    '''
    时间步长默认为 60s
    '''
    passes = []
    
    P = 2*np.pi/(satellite.model.no - np.pi/(12*60)) # 卫星相对于旋转地球的轨道周期（minute）
    t_step = int(np.round(60*np.sqrt(P/120))) # 确定时间步长（s）
    t = t_list(t_start,t_end,t_step)

    sat_site = (satellite - site).at(t)
    alt_sat, az_sat, distance_sat = sat_site.altaz()
    sat_above_horizon = alt_sat.degrees > cutoff
    boundaries, = np.diff(sat_above_horizon).nonzero()
    
    if len(boundaries) == 0: return passes
        
    if sat_above_horizon[boundaries[0]]:
        boundaries = np.append(0,boundaries)
    if len(boundaries)%2 != 0:
        boundaries = np.append(boundaries,len(sat_above_horizon)-1)    
    boundaries = np.array(t[boundaries].utc_iso()).reshape(len(boundaries) // 2,2)
    seconds = TimeDelta(np.arange(t_step+1), format='sec')

    for rises,sets in boundaries:
        
        t_rise = ts.from_astropy(Time(rises)+seconds)
        sat_kum = (satellite - kum).at(t_rise)
        alt, az, distance = sat_kum.altaz()
        sat_above_horizon = alt.degrees > cutoff
        pass_rise = t_rise[sat_above_horizon][0]
        
        t_set = ts.from_astropy(Time(sets)+seconds)
        sat_kum = (satellite - kum).at(t_set)
        alt, az, distance = sat_kum.altaz()
        sat_above_horizon = alt.degrees > cutoff
        
        if sat_above_horizon[-1]:
            pass_set = t_set[sat_above_horizon][0]
        else:
            pass_set = t_set[sat_above_horizon][-1]
            
        passes.append([pass_rise.utc_iso(),pass_set.utc_iso()])     
          
    return passes

def eclipsed(sat,sun,earth,t):
    '''
    判断卫星是否在地影里
    '''
    sat_geo = sat.at(t)
    sat_geo_xyz = sat_geo.position.km
    sat_geo_d = sat_geo.distance().km
    sun_geo = (earth).at(t).observe(sun).apparent()
    sun_geo_xyz = sun_geo.position.km
    sun_geo_d = sun_geo.distance().km
    
    Radius_sun = R_sun.to(u.km).value
    Radius_earth = R_earth.to(u.km).value
    
    SinPenumbra = (Radius_sun - Radius_earth)/sun_geo_d
    CosPenumbra = np.sqrt(1 - SinPenumbra**2)
    
    CosTheta = np.sum(sat_geo_xyz*sun_geo_xyz,axis=0)/(sun_geo_d*sat_geo_d)*CosPenumbra + (sat_geo_d/Radius_earth)*SinPenumbra
    shadow = CosTheta < -np.sqrt(sat_geo_d**2-Radius_earth**2)/sat_geo_d*CosPenumbra + (sat_geo_d/Radius_earth)*SinPenumbra
    return shadow

In [None]:
from skyfield.api import Topos,Loader
from urllib.request import urlretrieve
from pathlib import Path
import os,glob
import numpy as np
import pandas as pd

def fg():

    home = str(Path.home())
    direc_eph = home + '/src/skyfield-data/ephemeris/'
    direc_time = home + '/src/skyfield-data/time_data/'
    de430 = direc_eph + 'de430.bsp'

    load_eph = Loader(direc_eph)
    load_time = Loader(direc_time)

    url = 'http://www.shareresearch.me/wp-content/uploads/2020/05/de430.bsp' # download DE430

    if not os.path.exists(de430):
        print('Downloading the JPL ephemeris de430.bsp',end=' ... ')
        urlretrieve(url, de430)
        print('Finished')

    ts = load_time.timescale()
    planets = load_eph('de430.bsp')

    print(load_time.log)
    print(load_eph.log)
    
    sun,earth = planets['sun'],planets['earth']
    dir_TLE = 'TLE/'
    sats = load.tle_file(dir_TLE+'satcat_3le.txt')

    timezone = 8  # 时区
    kum = Topos('25.0298 N', '102.7977 E',elevation_m = 1987.05) # 观测位置
    #kum = Topos('41.76 N', '87.42 E',elevation_m = 1100)
    t_start = Time('2020-06-01 00:00:00') - timezone*u.hour # 起始时间（当地）
    t_end = Time('2020-06-30 00:00:00') - timezone*u.hour # 结束时间（当地）

    dir_prediction = 'prediction/'                   
    fileList_prediction = glob.glob(dir_prediction+'*')
    if os.path.exists(dir_prediction):
        for file in fileList_prediction:
            os.remove(file)
    else:
        os.mkdir(dir_prediction)
    

    filename0 = 'VisiblePasses_bysat.csv'  
    filename1 = 'VisiblePasses_bydate.csv'

    outfile0 = open(dir_prediction+filename0,'w')
    header = ['Start Time[UTC+' + str(timezone) +']','End Time[UTC+' + str(timezone) +']','NORADID']
    outfile0.write('{},{},{}\n'.format(header[0],header[1],header[2]))

    for sat in sats:
        visible_flag = False
        noradid = sat.model.satnum
        passes = next_pass(ts,sat,kum,t_start,t_end)
        if not passes: 
            continue
        else:
            outfile = open(dir_prediction+ str(noradid) + '.txt','w')
            outfile.write('# {:18s} {:8s} {:8s} {:8s} {:8s} {:10s} {:14s} {:7s} \n'.format('Date and Time(UTC)','Alt[deg]','Az[deg]','Ra[h]','Dec[deg]','Range[km]','Solar Alt[deg]','Visible'))
        for pass_start,pass_end in passes:
        
            t = t_list(Time(pass_start),Time(pass_end),1)
            sat_kum = (sat - kum).at(t)
            sat_alt, sat_az, sat_distance = sat_kum.altaz()
            sat_ra, sat_dec, sat_distance = sat_kum.radec() 
            sun_kum = (earth+kum).at(t).observe(sun).apparent()
            sun_alt, sun_az, sun_distance = sun_kum.altaz()
            sun_beneath = sun_alt.degrees < -4 # 太阳高度角小于 -4
            shadow = eclipsed(sat,sun,earth,t)
            visible = sun_beneath & ~shadow
        
            if visible.any():
            
                visible_flag = True
            
                t_visible = np.array(t.utc_iso())[visible]
                t_visible_0 = (Time(t_visible[0])+timezone*u.hour).iso
                t_visible_1 = (Time(t_visible[-1])+timezone*u.hour).iso
                outfile0.write('{:s},{:s},{:d}\n'.format(t_visible_0,t_visible_1,noradid))

            if Time(pass_end) < t_start + 1: # 预报一天 
                for i in range(len(t)):
                    outfile.write('{:20s} {:>8.4f} {:>8.4f} {:>8.5f} {:>8.4f} {:>10.4f} {:>10.4f} {:>7d} \n'.format(t.utc_iso()[i],sat_alt.degrees[i],sat_az.degrees[i],sat_ra.hours[i],sat_dec.degrees[i],sat_distance.km[i],sun_alt.degrees[i],visible[i]))
                outfile.write('\n')
        
            '''
            if visible.any():
                t_visible = np.array(t.utc_iso())[visible]
                for i in range(len(t_visible)):
                    outfile.write('{:20s} {:>8.4f} {:>8.4f} {:>8.5f} {:>8.4f} {:>10.4f} \n'.format(t_visible[i],sat_alt.degrees[visible][i],sat_az.degrees[visible][i],sat_ra.hours[visible][i],sat_dec.degrees[visible][i],sat_distance.km[visible][i]))
                outfile.write('\n')
            ''' 
        if not visible_flag:
            outfile0.write('{:s},{:s},{:d}\n\n'.format('','',noradid))
        else:
            outfile0.write('\n')
        outfile.close()
    outfile0.close() 
    
    dates,aaa = [],[]
    VisiblePasses = pd.read_csv(dir_prediction+filename0,dtype=object)
    for date in VisiblePasses[header[0]]:
        if str(date) != 'nan': dates.append(date[:10])
    dates = np.sort(list(set(dates))) 
    for date in dates:
        date_flag = VisiblePasses[header[0]].str.contains(date,na=False)
        aaa.append(VisiblePasses[date_flag].sort_values(by=[header[0]]).append(pd.Series(dtype=object), ignore_index=True)) 
    pd.concat(aaa).to_csv(dir_prediction+'VisiblePasses_bydate.csv',index=False,mode='w')

In [8]:
from skyfield.api import Topos,Loader
from urllib.request import urlretrieve
from pathlib import Path
import os,glob
import numpy as np

home = str(Path.home())
direc_eph = home + '/src/skyfield-data/ephemeris/'
direc_time = home + '/src/skyfield-data/time_data/'
de430 = direc_eph + 'de430.bsp'

load_eph = Loader(direc_eph)
load_time = Loader(direc_time)

url = 'http://www.shareresearch.me/wp-content/uploads/2020/05/de430.bsp' # download DE430

if not os.path.exists(de430):
    print('Downloading the JPL ephemeris de430.bsp',end=' ... ')
    urlretrieve(url, de430)
    print('Finished')

ts = load_time.timescale()
planets = load_eph('de430.bsp')

print(load_time.log)
print(load_eph.log)

Already exists: /Users/lichunxiao/src/skyfield-data/time_data/deltat.data
  Parsing with: parse_deltat_data()
  Does not expire til: 2021-02-01
Already exists: /Users/lichunxiao/src/skyfield-data/time_data/deltat.preds
  Parsing with: parse_deltat_preds()
  Does not expire til: 2021-01-01
Already exists: /Users/lichunxiao/src/skyfield-data/time_data/Leap_Second.dat
  Parsing with: parse_leap_seconds()
  Does not expire til: 2021-01-27
Already exists: /Users/lichunxiao/src/skyfield-data/ephemeris/de430.bsp
  Opening with: SpiceKernel


In [24]:
sun,earth = planets['sun'],planets['earth']
t = ts.from_astropy(Time('2020-06-01 12:20:00'))

In [25]:
kum = Topos('25.0298 N', '102.7977 E',elevation_m = 1987.05) # 观测位置

In [26]:
sun_kum = (earth+kum).at(t).observe(sun).apparent()
sun_alt, sun_az, sun_distance = sun_kum.altaz()

In [27]:
sun_alt

<Angle -05deg 58' 39.0">

In [19]:
sun,earth = planets['sun'],planets['earth']
dir_TLE = 'TLE/'
sats = load.tle_file(dir_TLE+'satcat_3le.txt')

timezone = 8  # 时区
kum = Topos('25.0298 N', '102.7977 E',elevation_m = 1987.05) # 观测位置
#kum = Topos('41.76 N', '87.42 E',elevation_m = 1100)
t_start = Time('2020-06-01 00:00:00') - timezone*u.hour # 起始时间（当地）
t_end = Time('2020-06-30 00:00:00') - timezone*u.hour # 结束时间（当地）

dir_prediction = 'prediction/'                   
fileList_prediction = glob.glob(dir_prediction+'*')
if os.path.exists(dir_prediction):
    for file in fileList_prediction:
        os.remove(file)
else:
    os.mkdir(dir_prediction)
    

filename0 = 'VisiblePasses_bysat.csv'  
filename1 = 'VisiblePasses_bydate.csv'

outfile0 = open(dir_prediction+filename0,'w')
header = ['Start Time[UTC+' + str(timezone) +']','End Time[UTC+' + str(timezone) +']','NORADID']
outfile0.write('{},{},{}\n'.format(header[0],header[1],header[2]))

for sat in sats:
    visible_flag = False
    noradid = sat.model.satnum
    #discos_flagged = discos_data[discos_data['SATNO'] == noradid]
    #data_flagged = data[data['NORADID'] == noradid]
    passes = next_pass(ts,sat,kum,t_start,t_end)
    if not passes: 
        continue
    else:
        outfile = open(dir_prediction+ str(noradid) + '.txt','w')
        outfile.write('# {:18s} {:8s} {:8s} {:8s} {:8s} {:10s} {:14s} {:7s} \n'.format('Date and Time(UTC)','Alt[deg]','Az[deg]','Ra[h]','Dec[deg]','Range[km]','Solar Alt[deg]','Visible'))
    for pass_start,pass_end in passes:
        
        t = t_list(Time(pass_start),Time(pass_end),1)
        sat_kum = (sat - kum).at(t)
        sat_alt, sat_az, sat_distance = sat_kum.altaz()
        sat_ra, sat_dec, sat_distance = sat_kum.radec() 
        sun_kum = (earth+kum).at(t).observe(sun).apparent()
        sun_alt, sun_az, sun_distance = sun_kum.altaz()
        sun_beneath = sun_alt.degrees < -4 # 太阳高度角小于 -4
        shadow = eclipsed(sat,sun,earth,t)
        visible = sun_beneath & ~shadow
        
        if visible.any():
            
            visible_flag = True
            
            t_visible = np.array(t.utc_iso())[visible]
            t_visible_0 = (Time(t_visible[0])+timezone*u.hour).iso
            t_visible_1 = (Time(t_visible[-1])+timezone*u.hour).iso
            outfile0.write('{:s},{:s},{:d}\n'.format(t_visible_0,t_visible_1,noradid))

        if Time(pass_end) < t_start + 1: # 预报一天 
            for i in range(len(t)):
                outfile.write('{:20s} {:>8.4f} {:>8.4f} {:>8.5f} {:>8.4f} {:>10.4f} {:>10.4f} {:>7d} \n'.format(t.utc_iso()[i],sat_alt.degrees[i],sat_az.degrees[i],sat_ra.hours[i],sat_dec.degrees[i],sat_distance.km[i],sun_alt.degrees[i],visible[i]))
            outfile.write('\n')
        
        '''
        if visible.any():
            t_visible = np.array(t.utc_iso())[visible]
            for i in range(len(t_visible)):
                outfile.write('{:20s} {:>8.4f} {:>8.4f} {:>8.5f} {:>8.4f} {:>10.4f} \n'.format(t_visible[i],sat_alt.degrees[visible][i],sat_az.degrees[visible][i],sat_ra.hours[visible][i],sat_dec.degrees[visible][i],sat_distance.km[visible][i]))
            outfile.write('\n')
        ''' 
    if not visible_flag:
        outfile0.write('{:s},{:s},{:d}\n\n'.format('','',noradid))
    else:
        outfile0.write('\n')
    outfile.close()
outfile0.close()    

In [61]:
import pandas as pd
dates,aaa = [],[]
VisiblePasses = pd.read_csv(dir_prediction+filename0,dtype=object)
#VisiblePasses.columns = header
for date in VisiblePasses[header[0]]:
    if str(date) != 'nan': dates.append(date[:10])
dates = np.sort(list(set(dates))) 
for date in dates:
    date_flag = VisiblePasses[header[0]].str.contains(date,na=False)
    aaa.append(VisiblePasses[date_flag].sort_values(by=[header[0]]).append(pd.Series(dtype=object), ignore_index=True)) 
pd.concat(aaa).to_csv(dir_prediction+'VisiblePasses_bydate.csv',index=False,mode='w')

In [1]:
from skyfield import almanac

In [2]:
almanac.dark_twilight_day?

In [3]:
from skyfield.api import Topos, load
from skyfield.functions import length_of

ts = load.timescale()
t = ts.utc(2019, 1, 1)

bierstadt = Topos('39.5828 N', '105.6686 W', elevation_m=4287.012)
m1 = length_of(bierstadt.at(t).position.m)
print(int(m1))

[#################################] 100% deltat.data
[#################################] 100% deltat.preds
[#################################] 100% Leap_Second.dat


6373784


In [7]:
bierstadt.at(t)

<Geocentric ICRS position and velocity at date t center=399 target=<object object at 0x7fb496bd5a50>>

In [29]:
type(2) is float

False

In [30]:
t1 = Time('2020-06-01 12:24:37')
t2 = Time('2020-06-01 12:26:53')

In [38]:
round((t2 - t1).sec)

136

In [40]:
t = ts.utc(2019, 12, 9, 15, 36)

In [54]:
t.utc_strftime('%Y-%m-%d %H:%M:%S')

'2019-12-09 15:36:00'

In [None]:
from sl