In [1]:
import os
from datetime import datetime, timedelta

import pandas as pd

from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, DateTime, Integer, Text, Boolean, Numeric, Date
from sqlalchemy.ext.declarative import declarative_base

from geoalchemy2 import Geometry

# Database connection. The URL (which embeds credentials) can be overridden
# through the DB_CONN_STR environment variable so it does not have to be edited
# into the notebook; when the variable is unset the local test database is used.
conn_str = os.environ.get('DB_CONN_STR', 'postgresql://test:test@localhost:5432/test')
engine = create_engine(conn_str)

# Factory for sessions bound to the engine above.
Session = sessionmaker(bind=engine)

# Declarative base shared by every ORM model defined in this notebook.
Base = declarative_base()

class Calls(Base):
    """ORM mapping for the ``events.calls`` table (one row per call event).

    The query helpers in this notebook read ``msisdn``, ``datetime`` and
    ``location_id`` from this table; ``location_id`` is matched against
    ``Cells.id`` when events are joined to a location lookup.
    """
    __tablename__ = 'calls'
    __table_args__ = {"schema": "events"}
    
    id = Column(Text,primary_key=True)
    outgoing = Column(Boolean)
    # Timezone-aware timestamp of the event; used for time filtering and ranking.
    datetime = Column(DateTime(timezone=True))
    duration = Column(Numeric)
    network = Column(Text)
    # Subscriber identifier; the per-subscriber partition key in the helpers.
    msisdn = Column(Text)
    msisdn_counterpart = Column(Text)
    # Joined to Cells.id by event_locid_join.
    location_id = Column(Text)
    imsi = Column(Text)
    imei = Column(Text)
    # Numeric with precision 8.
    tac = Column(Numeric(8))
    operator_code = Column(Numeric)
    country_code = Column(Numeric)
    
class Cells(Base):
    """ORM mapping for the ``infrastructure.cells`` table.

    The primary key is composite: ``(id, version)``. ``geom_point`` is the
    column the spatial helpers use for ``ST_Within`` / ``ST_x`` / ``ST_y``.
    """
    __tablename__ = 'cells'
    __table_args__ = {"schema": "infrastructure"}
    
    # Composite primary key: cell id plus version.
    id = Column(Text,primary_key=True)
    version = Column(Integer, primary_key=True)
    site_id = Column(Text)
    name = Column(Text)
    type = Column(Text)
    msc = Column(Text)
    bsc_rnc = Column(Text)
    antenna_type = Column(Text)
    status = Column(Text)
    lac = Column(Text)
    height = Column(Numeric)
    azimuth = Column(Numeric)
    transmitter = Column(Text)
    max_range = Column(Numeric)
    min_range = Column(Numeric)
    electrical_tilt = Column(Numeric)
    mechanical_downtilt = Column(Numeric)
    date_of_first_service = Column(Date)
    date_of_last_service = Column(Date)
    # PostGIS geometries: a POINT and a MULTIPOLYGON per cell.
    geom_point = Column(Geometry('POINT'))
    geom_polygon = Column(Geometry('MULTIPOLYGON'))

class Admin3(Base):
    """ORM mapping for the ``geography.admin3`` table.

    ``geom`` holds a MULTIPOLYGON per row; ``daily_location`` unions all of
    them for the grid level and labels cells by ``admin3name`` for the
    ``'admin3'`` level.
    """
    __tablename__ = 'admin3'
    __table_args__ = {"schema": "geography"}
    
    gid = Column(Integer, primary_key=True)
    # Name / code columns, all limited to 50 characters.
    admin0name = Column(String(50))
    admin0pcod = Column(String(50))
    admin1name = Column(String(50))
    admin1pcod = Column(String(50))
    admin2name = Column(String(50))
    admin2pcod = Column(String(50))
    admin3name = Column(String(50))
    admin3pcod = Column(String(50))
    admin3refn = Column(String(50))
    admin3altn = Column(String(50))
    admin3al_1 = Column(String(50))
    date = Column(Date)
    validon = Column(Date)
    validto = Column(Date)
    shape_star = Column(Numeric)
    shape_stle = Column(Numeric)
    shape_leng = Column(Numeric)
    shape_area = Column(Numeric)
    # Geometry matched against Cells.geom_point by cell_to_geom.
    geom = Column(Geometry('MULTIPOLYGON'))

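# NOTE(review): a stray, unterminated triple-quote fragment stood here; it is not valid Python and was removed.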


In [2]:
# Single session object that is passed to every query helper and cell below.
session = Session()

In [227]:
def cell_to_geom(geom_table, col_name, session):
    """Build a subquery pairing each cell id with an attribute of the geometry containing it.

    Cells are LEFT OUTER JOINed to ``geom_table`` on
    ``ST_Within(Cells.geom_point, geom_table.geom)``, so a cell whose point
    lies inside no geometry is still returned, with NULL in the label column.

    Args:
        geom_table: mapped class exposing a ``geom`` column and ``col_name``.
        col_name: name of the attribute on ``geom_table`` to return.
        session: SQLAlchemy session used to build the query.

    Returns:
        A subquery with columns ``id`` and ``col_name``.
    """
    label_column = getattr(geom_table, col_name)
    point_inside_geom = func.ST_Within(Cells.geom_point, geom_table.geom)

    cells_with_label = session.query(Cells.id, label_column).outerjoin(
        geom_table, point_inside_geom
    )
    return cells_with_label.subquery()


def cell_to_latlon(session):
    """Build a subquery mapping each cell id to the coordinates of its point.

    Args:
        session: SQLAlchemy session used to build the query.

    Returns:
        A subquery with columns ``id``, ``lon`` (``ST_x`` of ``geom_point``)
        and ``lat`` (``ST_y`` of ``geom_point``).
    """
    point = Cells.geom_point
    columns = (
        Cells.id,
        func.ST_x(point).label('lon'),
        func.ST_y(point).label('lat'),
    )
    return session.query(*columns).subquery()

def grid_geoms(polygon, size, session):
    """Build a subquery of grid squares covering ``polygon``, each with a stable id.

    The grid comes from the database function ``makegrid_2d(polygon, size)``
    (not defined in this notebook; it must exist in the database). Its result
    is exploded with ``ST_Dump`` into one geometry per row, and each square is
    given its centroid and the centroid's x/y coordinates.

    Args:
        polygon: geometry value passed to ``makegrid_2d``.
        size: grid size passed to ``makegrid_2d``; also embedded in the ids.
        session: SQLAlchemy session used to build the queries.

    Returns:
        A subquery with columns ``row_num``, ``centroid``, ``lon``, ``lat``
        and ``geom``. ``row_num`` is ``"<size>_<n>"`` where any ``.`` in
        ``size`` is replaced by ``_`` and ``n`` is the square's row number
        when ordered by ``lat`` then ``lon``.
    """
    # One row per grid square: take the geom component of each ST_Dump row.
    squares = session.query(
        func.ST_Dump(func.makegrid_2d(polygon, size)).geom.label('geom')
    ).subquery()

    centroid = func.ST_Centroid(squares.c.geom)
    located = session.query(
        centroid.label('centroid'),
        func.ST_x(centroid).label('lon'),
        func.ST_y(centroid).label('lat'),
        squares.c.geom.label('geom')
    ).subquery()

    # The prefix is handed to func.concat as a plain Python string: SQLAlchemy
    # turns non-SQL arguments of func.* into bound parameters itself, so the
    # (never imported) `literal` wrapper is not needed.
    prefix = f"{str(size).replace('.','_')}_"
    row_num = func.concat(
        prefix,
        func.row_number().over(order_by=[located.c.lat, located.c.lon])
    ).label('row_num')

    return session.query(
        row_num,
        located.c.centroid,
        located.c.lon,
        located.c.lat,
        located.c.geom
    ).subquery()

def cell_to_grid(polygon, size, session):
    """Build a subquery mapping each cell id to the id of the grid square containing it.

    The grid is produced by ``grid_geoms(polygon, size, session)``. Cells are
    LEFT OUTER JOINed to it on ``ST_Within(Cells.geom_point, <square geom>)``,
    so cells outside every square are kept with a NULL ``row_num``.

    Args:
        polygon: geometry forwarded to ``grid_geoms``.
        size: grid size forwarded to ``grid_geoms``.
        session: SQLAlchemy session used to build the query.

    Returns:
        A subquery with columns ``id`` and ``row_num``.
    """
    grid = grid_geoms(polygon, size, session)
    point_inside_square = func.ST_Within(Cells.geom_point, grid.c.geom)

    return (
        session.query(Cells.id, grid.c.row_num)
        .outerjoin(grid, point_inside_square)
        .subquery()
    )
                     
def event_locid_join(event_table, locid_table, session):
    """Attach location columns to events by matching ``location_id`` to the lookup's ``id``.

    Args:
        event_table: selectable exposing ``msisdn``, ``datetime`` and
            ``location_id`` under ``.c``.
        locid_table: selectable exposing ``id`` plus one or more location
            columns under ``.c``.
        session: SQLAlchemy session used to build the query.

    Returns:
        A subquery with ``msisdn``, ``datetime`` and every column of
        ``locid_table`` except ``id``. The join is a LEFT OUTER JOIN, so
        events without a matching location are kept.
    """
    location_columns = [
        getattr(locid_table.c, name)
        for name in locid_table.c.keys()
        if name != 'id'
    ]
    same_location = event_table.c.location_id == locid_table.c.id

    joined = session.query(
        event_table.c.msisdn,
        event_table.c.datetime,
        *location_columns
    ).outerjoin(locid_table, same_location)
    return joined.subquery()
                     
def last_location(event_locid_table, session):
    """Keep, for every ``msisdn``, the location columns of its latest event.

    Rows are numbered per ``msisdn`` by descending ``datetime`` and only the
    row numbered 1 is kept.

    Args:
        event_locid_table: selectable with ``msisdn``, ``datetime`` and
            location columns (as produced by ``event_locid_join``).
        session: SQLAlchemy session used to build the queries.

    Returns:
        A subquery with ``msisdn`` followed by every other column of the
        input except ``datetime`` (the helper ``rank`` column is dropped too).
    """
    source = event_locid_table.c
    recency_rank = func.row_number().over(
        partition_by=source.msisdn,
        order_by=source.datetime.desc()
    ).label('rank')

    ranked = session.query(event_locid_table, recency_rank).subquery()

    excluded = ['msisdn', 'datetime', 'rank']
    location_columns = [
        getattr(ranked.c, name)
        for name in ranked.c.keys()
        if name not in excluded
    ]

    latest = session.query(ranked.c.msisdn, *location_columns).filter(
        ranked.c.rank == 1
    )
    return latest.subquery()

def most_common_location(event_locid_table, session):
    """Keep, for every ``msisdn``, the location that occurs most often.

    Rows are grouped by ``msisdn`` plus all location columns (everything
    except ``msisdn`` and ``datetime``) and counted; the groups are then
    numbered per ``msisdn`` by descending count and group number 1 is kept.
    The ordering uses the count only, so the winner among equally frequent
    locations is whatever the database picks.

    Args:
        event_locid_table: selectable with ``msisdn`` and location columns;
            a ``datetime`` column, if present, is ignored.
        session: SQLAlchemy session used to build the queries.

    Returns:
        A subquery with ``msisdn`` and the location columns.
    """
    source = event_locid_table.c
    location_names = [
        name for name in source.keys() if name not in ['msisdn', 'datetime']
    ]
    grouping = [source.msisdn] + [getattr(source, name) for name in location_names]

    # Occurrences of each (msisdn, location) combination.
    counted = session.query(
        *grouping, func.count().label('count')
    ).group_by(*grouping).subquery()

    frequency_rank = func.row_number().over(
        partition_by=counted.c.msisdn,
        order_by=counted.c.count.desc()
    ).label('rank')
    ranked = session.query(counted, frequency_rank).subquery()

    wanted = ['msisdn'] + location_names
    winners = session.query(
        *[getattr(ranked.c, name) for name in ranked.c.keys() if name in wanted]
    ).filter(ranked.c.rank == 1)
    return winners.subquery()

def time_bound(start_time, end_time, event_table, session):
    """Restrict events to the half-open interval ``[start_time, end_time)``.

    Args:
        start_time: inclusive lower bound compared against ``datetime``.
        end_time: exclusive upper bound compared against ``datetime``.
        event_table: selectable exposing a ``datetime`` column under ``.c``.
        session: SQLAlchemy session used to build the query.

    Returns:
        A subquery over ``event_table`` containing only the rows in range.
    """
    timestamp = event_table.c.datetime
    in_window = (timestamp >= start_time, timestamp < end_time)
    return session.query(event_table).filter(*in_window).subquery()
                     
def daily_location(date, level, method, session, grid_size=5000):
    """Build a subquery giving one location per ``msisdn`` for a single day.

    Call events in ``[date, date + 1 day)`` are joined to a cell-to-location
    lookup chosen by ``level`` and then reduced to one row per subscriber by
    the strategy chosen by ``method``.

    Args:
        date: start of the day; must support ``date + timedelta(1)``.
        level: ``'grid'`` (grid square id), ``'admin3'`` (``admin3name`` of
            the containing Admin3 geometry) or ``'lat-lon'`` (cell point
            coordinates).
        method: ``'last'`` or ``'most_common'``.
        session: SQLAlchemy session used to build (and, for ``'grid'``,
            execute) queries.
        grid_size: size handed to ``cell_to_grid`` for the ``'grid'`` level;
            defaults to the previously hard-coded 5000.

    Returns:
        The subquery produced by ``last_location`` or ``most_common_location``.

    Raises:
        KeyError: if ``level`` or ``method`` is not one of the names above.
    """
    location_methods = {
        'last': last_location,
        'most_common': most_common_location
    }

    def _cell_to_grid_over_admin3(session):
        # The union of all Admin3 geometries is only needed to build the grid,
        # so this database round-trip is deferred to the 'grid' level instead
        # of being paid on every call.
        admin0 = session.query(func.ST_Union(Admin3.geom)).all()[0][0]
        return cell_to_grid(admin0, grid_size, session)

    level_methods = {
        'grid': _cell_to_grid_over_admin3,
        'admin3': lambda session: cell_to_geom(Admin3, 'admin3name', session),
        'lat-lon': cell_to_latlon
    }

    # Resolve both strategies first so an unknown name fails before any
    # query is built or executed.
    build_locid_table = level_methods[level]
    reduce_to_one_location = location_methods[method]

    event_table = session.query(Calls).subquery()
    locid_table = build_locid_table(session)

    start_time = date
    end_time = date + timedelta(1)

    bounded_event_table = time_bound(start_time, end_time, event_table, session)

    bounded_event_table_loc_join = event_locid_join(bounded_event_table, locid_table, session)

    return reduce_to_one_location(bounded_event_table_loc_join, session)
                     
def modal_location(dls, session):
    """Pick each subscriber's most frequent location across several daily-location subqueries.

    The subqueries are stacked with UNION ALL, the stacked columns are given
    back the column names of the first subquery, and the result is reduced
    with ``most_common_location``.

    Args:
        dls: non-empty sequence of subqueries with matching columns
            (e.g. results of ``daily_location``).
        session: SQLAlchemy session used to build the queries.

    Returns:
        The subquery returned by ``most_common_location``.
    """
    first = dls[0]
    stacked = session.query(first).union_all(
        *(session.query(dl) for dl in dls[1:])
    ).subquery()

    # Relabel the union's columns positionally with the first input's names.
    original_names = first.c.keys()
    relabelled = [
        getattr(stacked.c, key).label(new_name)
        for key, new_name in zip(stacked.c.keys(), original_names)
    ]
    renamed = session.query(*relabelled).subquery()

    return most_common_location(renamed, session)
                     
def subquery_to_df(sq, session):
    """Execute a subquery and load all of its rows into a DataFrame.

    Args:
        sq: selectable whose ``.c.keys()`` gives the column names.
        session: SQLAlchemy session used to run the query.

    Returns:
        A ``pandas.DataFrame`` with one column per key of ``sq.c``.
    """
    column_names = sq.c.keys()
    rows = session.query(sq).all()
    return pd.DataFrame.from_records(rows, columns=column_names)

In [228]:
# Most common lat/lon location per subscriber on three consecutive days,
# followed by the modal location across those days, loaded into a DataFrame.
# NOTE(review): `start_time` is not assigned in any visible cell - confirm
# where it is defined before running on a fresh kernel.
dls = [
    daily_location(day, 'lat-lon', 'most_common', session)
    for day in (start_time, start_time + timedelta(1), start_time + timedelta(2))
]
dl1, dl2, dl3 = dls

ml = modal_location(dls, session)
df = subquery_to_df(ml, session)

In [232]:
# Join every call event (no time filter) to the lon/lat of its cell.
event_table = session.query(Calls).subquery()
c2ll = cell_to_latlon(session)
q = event_locid_join(event_table, c2ll, session)

In [251]:
# Build the same daily-location subquery twice, with identical arguments,
# to compare the two resulting objects in the cells that follow.
x = daily_location(start_time,'lat-lon','most_common',session)
y = daily_location(start_time,'lat-lon','most_common',session)

In [252]:
# Comparing the two subquery objects directly: the recorded output is False.
x == y

False

In [253]:
# Comparing the string renderings of the two subqueries: the recorded output is True.
str(x) == str(y)

True

In [264]:
# List the child elements of x (columns plus the wrapped select) for inspection.
list(x.get_children())

[Column('msisdn', Text(), table=<%(4845108080 anon)s>),
 <sqlalchemy.sql.elements.ColumnClause at 0x1210030b8; lon>,
 <sqlalchemy.sql.elements.ColumnClause at 0x1210030f0; lat>,
 <sqlalchemy.sql.annotation.AnnotatedSelect at 0x120cb2358; AnnotatedSelect object>]

In [265]:
# List the child elements of y (columns plus the wrapped select) for inspection.
list(y.get_children())

[Column('msisdn', Text(), table=<%(4846081584 anon)s>),
 <sqlalchemy.sql.elements.ColumnClause at 0x121003278; lon>,
 <sqlalchemy.sql.elements.ColumnClause at 0x121003860; lat>,
 <sqlalchemy.sql.annotation.AnnotatedSelect at 0x120d95a20; AnnotatedSelect object>]