In [1]:
from typing import Generator
import random
import math
#import numpy as np
import pandas as pd
import hvplot.pandas
from operator import xor
from functools import reduce
from itertools import islice
from gps_coders import GPS_L1_PRNS, GPS_L1_coder

In [2]:
# Peek at the first 10 chips produced by the code generator for PRN 1.
print(list(islice(GPS_L1_coder(1), 10)))

[1, 1, 0, 0, 1, 0, 0, 0, 0, 0]


In [3]:
# Length of one GPS L1 C/A code period, in chips.
CODELEN = 1023
# Duration of one full code period, in seconds (1 ms).
code_duration = 1e-3
# Number of code periods per second.
code_freq = 1 / code_duration

In [4]:
# Every GPS L1 C/A PRN code is periodic with period CODELEN (1023 chips):
# the first N chips must equal the N chips that start i whole periods later.
N = CODELEN
for prn in range(1, 32+1):
    for i in range(5):
        assert not any(
            first != later
            for first, later in islice(
                zip(GPS_L1_coder(prn),
                    islice(GPS_L1_coder(prn), i*CODELEN, i*CODELEN + N)),
                N)
        )

In [5]:
# One column per PRN id; the rows are the CODELEN chips of one full code period.
prn_codes = pd.DataFrame({
    prn_id: [*islice(GPS_L1_coder(prn_id), CODELEN)]
    for prn_id in GPS_L1_PRNS
})

In [6]:
# Show the first 10 chips (rows) of every PRN code (one column per PRN).
prn_codes.head(10)

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,...,23,24,25,26,27,28,29,30,31,32
0,1,1,1,1,1,1,1,1,1,1,...,1,1,1,1,1,1,1,1,1,1
1,1,1,1,1,0,1,0,1,1,1,...,0,1,1,1,1,1,0,1,1,1
2,0,1,1,1,0,0,0,0,1,0,...,0,1,1,1,1,1,0,0,1,1
3,0,0,1,1,1,0,1,0,0,1,...,0,1,1,1,1,1,1,0,0,1
4,1,0,0,1,0,1,0,1,0,0,...,1,0,1,1,1,1,0,1,0,0
5,0,1,0,0,1,0,1,0,1,0,...,1,0,0,1,1,1,1,0,1,0
6,0,0,1,0,1,1,1,1,0,0,...,0,0,0,0,1,1,0,1,0,1
7,0,0,0,1,0,1,0,1,1,1,...,0,1,0,0,0,1,1,0,1,0
8,0,0,0,0,1,0,0,0,1,0,...,1,1,1,0,0,0,1,1,0,1
9,0,0,0,0,1,1,1,0,0,0,...,1,0,1,1,0,0,1,1,1,0


In [7]:
# Number of leading chips of each code shown in the plots and sample tables.
num_chips = 20

In [8]:
# One step plot per PRN, covering only the first `num_chips` chips.
prn_codes.head(num_chips).hvplot.step(subplots=True, width=300, height=100)

In [9]:
chip_rate = 1023_000 # chips/s
chip_duration = 1 / chip_rate  # seconds per chip

def ms(t):
    '''Convert a duration `t` given in seconds to milliseconds.'''
    return t*1E3

def ns(t):
    '''Convert a duration `t` given in seconds to nanoseconds.'''
    return t*1E9
nanosec = ns

# `.1f` prints the value with one decimal place (977.5). The former `1.1` spec
# meant "one significant digit" and rendered the duration as `1e+03`.
print(f'chip duration: {chip_duration}s (~{nanosec(chip_duration):.1f}ns)')

chip duration: 9.775171065493646e-07s (~1e+03ns)


In [10]:
def sample_prn(prn: int, t: float) -> int:
    ''' Sample the PRN code sequence `prn` at time `t` (in seconds).

    The time is converted to a chip index by dividing by `chip_duration`
    and rounding down; the index wraps around at the end of the code, so
    the sequence repeats for times beyond one code period (and for
    negative times). The result is 0 or 1.
    '''
    chips = prn_codes[prn]
    chip_index = math.floor(t / chip_duration) % len(chips)
    return int(chips[chip_index])

In [11]:
# Sample one chip of PRN 10 at a given time (in seconds):
print(sample_prn(10, 5))

# The GPS L1 PRN code lasts 1 ms, so bit(t0) == bit(t1) whenever
# t1 = t0 + N/1000 for some integer N.
print(sample_prn(10, 6))
print(sample_prn(10, 5 + 0.001))  # t0 = 5, t1 = 5.001, i.e. N = 1

# Check this for N = 0 .. 99:
a = sample_prn(10, 5)
assert not any(a != sample_prn(10, 5 + n/1000) for n in range(100))

# Within a single millisecond, however, the bits differ:
print([sample_prn(10, 5 + 0.0001 * k) for k in range(10)])

1
1
1
[1, 0, 0, 1, 1, 1, 1, 0, 0, 0]


In [12]:
# Oversampling factor: number of samples taken per chip.
samples_per_chip = 10
# Time between two consecutive samples, in seconds.
sample_interval = chip_duration/samples_per_chip

In [13]:
def gen_sample_times(start: float, stop: float, step: float) -> Generator[float, None, None]:
    '''Yield sample times lying on the grid of integer multiples of `step`.

    Grid indices run from floor(start/step) (inclusive) up to
    floor(stop/step) (exclusive); each yielded time is `index * step`.
    The argument checks run lazily, on the first iteration step.
    '''
    assert start <= stop
    assert step > 0
    first_index = math.floor(start/step)
    end_index = math.floor(stop/step)
    for index in range(first_index, end_index):
        yield index*step

In [14]:
def gen_samples(prn, start, stop, freq):
    '''Lazily yield samples of PRN code `prn` between `start` and `stop`.

    Sample times come from `gen_sample_times` with a spacing of `1/freq`,
    and each one is looked up with `sample_prn`.
    '''
    period = 1/freq
    for t in gen_sample_times(start, stop, period):
        yield sample_prn(prn, t)

In [15]:
# Table of oversampled chips: one column per PRN, one row per sample time,
# covering the first `num_chips` chip durations at `samples_per_chip` samples per chip.
# NOTE(review): the columns are sampled with a step of 1/(samples_per_chip/chip_duration)
# while the index uses chip_duration/samples_per_chip; both computations must yield
# the same number of sample times for the frame to be constructible -- confirm
# they always agree under floating-point rounding.
ds = pd.DataFrame(
    data= {
        f'PRN{prn}': gen_samples(prn, 0, chip_duration*num_chips, samples_per_chip/chip_duration)
        for prn in GPS_L1_PRNS
    },
    index=list(gen_sample_times(0, chip_duration*num_chips, chip_duration/samples_per_chip))
)

In [16]:
# One scatter plot per PRN column of `ds`.
ds.hvplot.scatter(
    # Disabled column selection. NOTE(review): the columns of `ds` are named
    # 'PRN<n>', so these labels would need updating before re-enabling.
    #y=['1','2','3'],
    subplots=True, width=300, height=100)

In [17]:
def sample_prn_stream(prn, start, end, numsamples, delay=0):
    '''Return `numsamples` evenly spaced samples of PRN code `prn` as a list.

    The spacing is `(end - start) / numsamples`. The first sample is taken
    at `start - delay`, i.e. a positive `delay` shifts every sample time
    earlier by that amount without changing the spacing.
    '''
    spacing = (end - start)/numsamples
    first = start - delay
    samples = []
    for k in range(numsamples):
        samples.append(sample_prn(prn, first + k*spacing))
    return samples

In [18]:
# 20 samples spread over the first 10 chip durations of PRN 1 (2 samples per chip).
l0 = sample_prn_stream(1, 0, chip_duration * 10, 10 * 2)
print(l0)

[1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [19]:
# Sample the first 10 chip durations of every PRN code at `samples_per_chip`
# samples per chip, then plot each code in its own panel (laid out with `.cols(4)`).
df = pd.DataFrame({
    f'PRN{prn}': sample_prn_stream(
        prn, start=0, end=chip_duration * 10, numsamples=10 * samples_per_chip)
    for prn in GPS_L1_PRNS
})
df.hvplot(subplots=True, width=300, height=100).cols(4)

# Correlation

1. Take 2 sequences of samples of prn codes of equal length: `cs0` and `cs1`
2. `zip` the sequences and combine each pair with `not . xor` (i.e. XNOR): `[not xor(c0, c1) for c0, c1 in zip(cs0, cs1)]`
3. `sum` the resulting sequence: `sum(~xor(c0, c1) & 1 for c0, c1 in zip(cs0, cs1))`
   Note: Python's `~x` on an `int` equals `-x - 1` (so `~0 == -1` and `~1 == -2`); masking with `& 1` keeps only the lowest bit, which maps these to `1` and `0`.
4. The higher the value of the `sum`, the better the correlation of the PRN codes.
5. The value of the `sum` can be normalized by dividing it by the number of elements in the sequences.

In [20]:
# Truth table of the bitwise XNOR construction.
# Printed columns: a, b, a^b, ~(a^b), ~(a^b) & 1
for a in [0,1]:
    for b in [0,1]:
        print(a, b, xor(a,b), ~xor(a,b), ~xor(a,b) & 1)

0 0 0 -1 1
0 1 1 -2 0
1 0 1 -2 0
1 1 0 -1 1


In [21]:
def correlate(l0, l1):
    '''Return the fraction of positions at which `l0` and `l1` agree.

    Both sequences must have the same length (checked with an assertion).
    Each pair of bits contributes `~xor(x0, x1) & 1`, which is 1 when the
    bits are equal and 0 otherwise; the total is divided by the length.
    '''
    total = len(l0)
    assert total == len(l1)
    matches = 0
    for bit0, bit1 in zip(l0, l1):
        matches += ~xor(bit0, bit1) & 1
    return matches/total

In [22]:
# Cross-correlate two different PRN codes over their first 100 chip durations,
# using 100 samples spaced (nominally) one chip duration apart.
a=1
b=32
la = sample_prn_stream(a, 0, chip_duration*100, 100)
lb = sample_prn_stream(b, 0, chip_duration*100, 100)
print(f'PRN{a}-PRN{b} correlation: {correlate(la, lb):5}')

PRN1-PRN32 correlation:  0.46


In [23]:
# Correlate PRN 21 with delayed copies of itself: delays of 0 .. N-1 eighths
# of a chip, followed by a delay of one whole code period.
N = 10
d = chip_duration / 8
offsets = [*(o * d for o in range(N)), code_duration]
num_samples = 10000

# Correlation window: the first half millisecond.
start = 0
end = start + 0.5E-3
l0 = sample_prn_stream(21, start, end, num_samples)
for offset in offsets:
    l1 = sample_prn_stream(21, start, end, num_samples, delay=offset)
    print(f'offset: {offset/chip_duration:5.4} chips, correlation: {correlate(l0, l1)}')

offset:   0.0 chips, correlation: 1.0
offset: 0.125 chips, correlation: 0.9432
offset:  0.25 chips, correlation: 0.8866
offset: 0.375 chips, correlation: 0.8303
offset:   0.5 chips, correlation: 0.773
offset: 0.625 chips, correlation: 0.7168
offset:  0.75 chips, correlation: 0.6589
offset: 0.875 chips, correlation: 0.6029
offset:   1.0 chips, correlation: 0.5459
offset: 1.125 chips, correlation: 0.5389
offset: 1.023e+03 chips, correlation: 0.9999


In [24]:
# Auto-correlation of every PRN code for delays from -4 chips up to (but
# excluding) +4 chips, in steps of 1/N chip.
N=16
sample_times = [chip_duration*i/N for i in range(-4*N, 4*N)]
auto_cor = pd.DataFrame(
    data={
        f'PRN{prn}': [correlate(
                  sample_prn_stream(prn, 0, code_duration, 100),
                  sample_prn_stream(prn, 0, code_duration, 100, delay=delay))
              for delay in sample_times]
        for prn in GPS_L1_PRNS
    },
    # Label each row with its delay in units of 1/N chip. The integer offsets
    # that generated `sample_times` are used directly: recovering them with
    # floor(N*t/chip_duration) is subject to floating-point rounding and can
    # land one step too low.
    index=list(range(-4*N, 4*N))
)

In [25]:
from tabulate import tabulate
def table(l, cols=6):
    '''Format the flat sequence `l` as a text table with `cols` entries per row.

    The sequence is cut into consecutive slices of length `cols` (the last
    one may be shorter) and the resulting rows are passed to `tabulate`.
    '''
    rows = []
    for first in range(0, len(l), cols):
        rows.append(l[first:first + cols])
    return tabulate(rows)

In [26]:
# One auto-correlation panel per column of `auto_cor`.
plot = auto_cor.hvplot(
    y=list(map(str, auto_cor.columns)),
    subplots=True,
    width=300,
    height=150,
)

In [27]:
# Display the auto-correlation panels built in the previous cell.
plot