In [1]:
from functools import partial
from datetime import datetime, timedelta
import timeit
import boto3
import json
import copy
import pickle
import io
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import Row, Window
from pyspark.sql.types import *
from graphframes import *

"""
Utility functions 
"""
def date_list(endDate, delta=14):
    temp = [endDate]
    for i in range(1, delta + 1):
        temp.append(endDate - timedelta(days=i))
    return '{' + ','.join([str(d.date()) for d in temp]) + '}'


def readParquet(prefix, dates="{*}", hour="{*}", fields=None):
    path = prefix + "/dt=" + dates + "/hr=" + hour + "/"
    return spark.read.parquet(path)

def readCSV(prefix, dates="{*}", hour="{*}", fields=None):
    path = prefix + "/dt=" + dates + "/hr=" + hour + "/"
    return spark.read.csv(path)

def readSeq(prefix, date, hour="*", fields=None, toPandas=False):
    path = prefix + "/dt=" + date + "/hr=" + hour + "/"
    temp = sc.sequenceFile(path).values() \
        .map(bytearray.decode).map(json.loads)
    if isinstance(fields, list):
        temp = temp.flatMap(lambda x: Row([x[field] for field in fields]))
    temp = spark.createDataFrame(temp)
    if isinstance(fields, list):
        for idx, field in enumerate(fields):
            temp = temp.withColumnRenamed("_" + str(idx + 1), field)
    _schema = copy.deepcopy(temp.schema)
    if toPandas:
        return temp.toPandas(), _schema
    return temp, _schema


def roundDatetime(timestamp, interval=0):
    tm = datetime.fromtimestamp(timestamp)
    if interval > 0:
        tm = tm - timedelta(minutes=tm.minute % interval, seconds=tm.second)
    return tm


@F.pandas_udf("int", F.PandasUDFType.GROUPED_AGG)
def median_udf(v):
    return v.median()


@F.pandas_udf("int", F.PandasUDFType.GROUPED_AGG)
def iqr_udf(v):
    iqr = v.quantile(0.75) - v.quantile(0.25)
    return iqr

def gaussian_smooth(df, groups, window=5, std=2):
    return df.set_index('time_of_day').groupby(groups)[['upper','lower']] \
                      .rolling(window, win_type='gaussian', min_periods=1, std=std) \
                      .mean().reset_index()

def df_to_s3(df, path, filename):
    s3 = boto3.resource('s3')
    file_type = filename.split(".")[-1]
    with io.StringIO() as outputBuffer:
        if file_type == "pickle":
            pickle.dump(df, outputBuffer)
        elif file_type == "json":
            df.to_json(outputBuffer, orient='index')
        elif file_type == "csv":
            df.to_csv(outputBuffer)
            #json.dump(df, buffer)
        print(outputBuffer.closed)
        outputBuffer.seek(0)
        obj = s3.Object('mist-data-science-dev', f'{path}/{filename}')
        obj.put(Body=outputBuffer.getvalue())
    print(outputBuffer.closed)

"""
Spark UDFs
"""
curried_roundDatetime = partial(roundDatetime, interval=0)
udf_roundDate = F.udf(curried_roundDatetime, TimestampType())

udf_scaleToSeconds = F.udf(lambda tm : int(float(tm)/1E6), LongType())
udf_numElem = F.udf(lambda x : len(x), IntegerType())

### Pure python function for cycle detection
* Stops after first detected cycle

In [2]:
def get_site_graph(df_site, site_id=None):
    if site_id:
        df_site = df_site.filter(F.col('siteId')==site_id)

    df_site_adj_list = df_site.select('siteId','src','dst')\
                            .groupby('siteId','src')\
                            .agg(F.collect_set(F.col('dst')).alias('dst'))
    df_site_adj_list = df_site_adj_list.groupby("siteId")\
             .agg(F.map_from_arrays(F.collect_list("src"),F.collect_list("dst")).alias("graph"))
    
    return df_site_adj_list

def _isCycle(graph, v, done, stack): 

    done[v] = True
    stack[v] = True

    for neighbour in graph[v] : 
        if neighbour in graph :
            if done[neighbour] == False : 
                if _isCycle(graph, neighbour, done, stack) == True: 
                    print(f'Part of cycle: {neighbour}')
                    return True
            elif stack[neighbour] == True : 
                print(f'Last neighbour found on stack: {neighbour}')
                return True

    stack[v] = False
    return False


def isCycle(graph): 
    src_vertices = graph.keys()
    done = {k:False for k in src_vertices} 
    stack = {k:False for k in src_vertices} 

    for node in src_vertices: 
        if done[node] == False: 
            if _isCycle(graph, node, done, stack) == True: 
                return True
    return False

udf_isCycle = F.udf(isCycle, BooleanType())

def iscycle_unittest():
    graph1 = {'a':['b','c','d'], 'b':['c','e'], 'd':['e','f','g'], 'e':['a','c']}
    graph2 = {'a':['b','c','d'], 'b':['c'], 'd':['e','f','g'], 'e':['a','c']}
    graph3 = {'a':['b','c','e'], 'b':['c'], 'd':['e','f','g'], 'e':['a','c']}
    graph4 = {'a':['b','c','f'], 'b':['c'], 'd':['e','f','g'], 'e':['a','c']}


    df_test = pd.DataFrame([['test1',graph1],
                            ['test2',graph2],
                            ['test3',graph3],
                            ['test4',graph4]],
                           columns=['siteId','graph'])
    
    df_test = spark.createDataFrame(df_test)
    df_test_cycle = df_test.withColumn('cycle_detected', udf_isCycle(F.col('graph')))
    
    print('##### Check python ####')
    print('Graph1')
    print(f'Is cycle={isCycle(graph1)}')
    print('Graph2')
    print(f'Is cycle={isCycle(graph2)}')
    print('Graph3')
    print(f'Is cycle={isCycle(graph3)}')
    print('Graph4')
    print(f'Is cycle={isCycle(graph4)}')
    print('\n')

    print('##### Check pyspark ####')

    return df_test_cycle.show()

In [3]:
"""
Variables
"""
ENV = 'production'
component = 'device'
prefix_edges = f"s3://mist-aggregated-stats-{ENV}/aggregated-stats/graph/snapshots/{component}-edges/"
prefix_nodes = f"s3://mist-aggregated-stats-{ENV}/aggregated-stats/graph/snapshots/{component}-nodes/"

END_DATE = datetime.today()
#END_DATE = datetime.strptime('2021-03-03', '%Y-%m-%d')
LAG = 0

hour = '11'

""""""

dates = date_list(END_DATE, delta=LAG)
df_edges = readParquet(prefix_edges, dates, hour=hour)
df_edges = df_edges.withColumn('isExpired', F.when(F.isnull(F.col('expiredAt')),'False').otherwise('True'))
df_active_edges = df_edges.filter(F.col('isExpired')==False)
df_active_edges = df_active_edges.withColumnRenamed("from","src").withColumnRenamed("to","dst")

df_nodes = readParquet(prefix_nodes, dates)
df_nodes = df_nodes.withColumn('isExpired', F.when(F.isnull(F.col('expiredAt')),'False').otherwise('True'))
df_active_nodes = df_nodes.filter(F.col('isExpired')==False)

df_active_edges.select('siteId','src','dst').show(5, truncate=False)
#df_active_nodes.select('siteId','id').show(truncate=False)

+------------------------------------+--------------------------------+--------------------------------+
|siteId                              |src                             |dst                             |
+------------------------------------+--------------------------------+--------------------------------+
|0e62b411-43d6-4228-9d45-f9c17f611013|71baced3326fe53538c6dd39e95cff0f|bd275b21c9bca779998bad68c1ce6f62|
|22316bd0-0a39-4366-b02d-2389d0f6c6be|35a7ba2c5eb362598ba3e5f92e2ec009|f4b16a655ec2849203034256fa12be22|
|fa8c8345-fe0c-4aa5-9e16-34ed779ec315|c68b58028e8b5861589ca833529f4083|88b5c561652666376d47dbfba042fa6f|
|39ce773f-ca15-4018-a8fc-f6d9a82d82f6|7e437d5c9783c56796b6e86e0737cba0|287791e27e1aa35588e087a5d23c9d51|
|a6c4ed56-a300-4eb1-8dc3-c6ba0481d9ff|88558945e7e46a0403df537e1240e439|76798ecfadf760ed2f24f9e036c5cedc|
|de09a449-34b5-4f2d-8c56-e9bb5f62f9e1|adbe77d9ecb200b846133350b797b3de|1acfcca028293d7351f3df5785235dbd|
|675bca94-3b45-4b43-bf09-437129d9763d|42e44488f58098030

In [4]:
#site_id = '03e668d2-7c72-4451-ae99-2e32b2b97b71'
site_id = None
df_site_graph = get_site_graph(df_active_edges, site_id=site_id)
df_site_graph_cycle = df_site_graph.withColumn('cycle_detected', udf_isCycle(F.col('graph')))

df_site_graph_cycle_true = df_site_graph_cycle.filter(F.col('cycle_detected')==True)
df_site_graph_cycle_true.select('siteId').show(5, truncate=False)
cycle_detect = df_site_graph_cycle_true.count()
print(f'Sites with cycles in {component} collection = {cycle_detect}')


+------------------------------------+
|siteId                              |
+------------------------------------+
|0049c3ef-3406-4dae-88f8-37783fb4b702|
|1ef1d02d-c049-4f13-8d44-71b3654f75fd|
|4599cb58-e197-4bc1-b8da-c90481b95305|
|742bf504-6760-4007-8035-a47a934836a1|
|8ac6670a-0e85-4939-84f9-a37d779a45a3|
|ad8b6eef-de92-4a84-a262-8230e40fd893|
|ec62f79d-d08a-44bf-884b-0b58ca101051|
|f25d8fd8-a695-4421-8b84-036d2164c572|
|0027af11-eb72-425f-a1b2-e8f3b4ce8d10|
|0d2c68a0-96e9-4125-a53c-325f8471724e|
|192fab31-1014-4f3d-aa7d-9454a37babe4|
|341880de-2d4d-430f-9b5e-928577cd6c84|
|6f643507-d233-4fb1-98bd-e4445446270e|
|856606a9-176d-47d1-b3b6-d9750ab56b2e|
|898b809b-2b46-443d-babb-c374ce638b94|
|9291ba26-6e1e-11e5-9cdd-02e208b2d34f|
|94650a18-f0a9-4852-a709-d12a2ce23eea|
|aac40d89-9c00-4c7e-8d7c-ff11d4535f9c|
|aba6715c-92f9-4828-9080-22dfbc2c2053|
|b3610203-d3ca-4511-803c-dca6f17c470c|
+------------------------------------+
only showing top 20 rows

Sites with cycles in device collection

In [None]:
#temp = df_site_graph_cycle_true.select('siteId').toPandas()
#df_to_s3(temp, f"spark_jobs/ruchit-dev/site_graphdb_cycle_true_{dates}_hr_{hour}",
#         f"sites.json")


### Print simple cycles

In [5]:
class Cycles:
    def __init__(self, graph):
        self._graph = graph
        self.nodes = self.getNodes()
        self.nodeMap = {node : i for i, node in enumerate(self.nodes)}
        self.visited = ['NOT_VISITED' for _ in range(len(self.nodes))]  # Initialize all nodes unvisited
        self.stack = []  # Stack to keep track of visited nodes
        self.cycles = []

    def getNodes(self):
        _nodes=set([])
        for k, v in self._graph.items():
            items = [k] + v
            for item in items:
                _nodes.add(item)
        return list(_nodes)

    def printCycles(self):
        return self.cycles
    
    def addCycle(self, v):
        self.stack2 = []
        self.stack2.append(self.stack[-1])
        self.stack.pop()
        cycle = []
        #print(f'stack2={self.stack2}')
        while self.stack2[-1] != v:
            self.stack2.append(self.stack[-1])
            self.stack.pop()
        while len(self.stack2) > 0: 
            cycle.append(self.nodes[self.stack2[-1]])
            self.stack.append(self.stack2[-1])
            self.stack2.pop()
            
        if len(cycle)>1 : self.cycles.append(cycle)
        
    def dfs(self):
        curr = self.stack[-1]
        if self.nodes[curr] in self._graph:
            for neighbour in self._graph[self.nodes[curr]]:
                to = self.nodeMap[neighbour]
                if self.visited[to] == 'ON_STACK':
                    self.addCycle(to)
                elif self.visited[to] == 'NOT_VISITED':
                    self.stack.append(to)
                    self.visited[to] = 'ON_STACK'
                    self.dfs()
                
        self.visited[curr] = 'DONE'
        self.stack.pop()

    def findCycles(self):
        for i, node in enumerate(self.nodes):
            if self.visited[i] == 'NOT_VISITED':
                self.stack = []
                self.stack.append(i)
                self.visited[i] = 'ON_STACK'
                self.dfs()
        
        return self.printCycles()
    
def find_cycles(graph):
    cycles = Cycles(graph)
    return cycles.findCycles()

udf_findCycles = F.udf(find_cycles, ArrayType(ArrayType(StringType())))

In [6]:
df_site_graph_cycles = df_site_graph_cycle_true.withColumn('cycles', udf_findCycles(F.col('graph')))
#df_site_graph_cycles.select('siteId', 'cycles').show()

In [10]:
temp_pd = df_site_graph_cycles.select('siteId', 'cycles').toPandas()
df_to_s3(temp_pd, f"spark_jobs/ruchit-dev/site_graphdb_cycles_{dates}_hr_{hour}",
         f"sites_cycles.csv")

False
True


In [8]:
#site = '742bf504-6760-4007-8035-a47a934836a1'
for site in temp_pd.siteId[::10]:
    print(f'Site={site}')
    for cycles in temp_pd[temp_pd.siteId==site].cycles:
        for cycle in cycles:
            print(f'-->{cycle}')

Site=0049c3ef-3406-4dae-88f8-37783fb4b702
-->['79f275fc2c4b5fa8db87ea8d2928dd98', '112866ce3150072ce0ee0fb8be35f2a4']
Site=192fab31-1014-4f3d-aa7d-9454a37babe4
-->['c7e7042eb95d0b8ad30a1ce874fe83a7', 'd3afedabbca6573bf09cb753693e8c3d']
-->['c7e7042eb95d0b8ad30a1ce874fe83a7', '6d2342cf1e4b90b32975a12489887737']
Site=cc235a99-d83f-43e1-a2e0-23c475e649ba
-->['86b64fc3f4ffabbb43f9c694bfe903e3', '7ad9023d6d988c9fec7ef16a2f280cfc']
Site=aebea3ce-8635-40fa-aaab-d23bf1aae603
-->['2b33e8e4e44eaaf3dc5e83dd23277d0d', '703ba9b2db196bc2a5bcb92a53ce5f1b']
Site=4d9330f1-8816-449a-b3cc-d4e4e0a3313b
Site=b72c169b-fdae-4157-859c-e1f6d63c746c
-->['1965b37077735462fcdfefed2c283fbd', 'f96232e15e28c5b833cfb9b1b99d10c3']
-->['1965b37077735462fcdfefed2c283fbd', '0826d5dfea5e34f87e940654bbed1902']
Site=baf57076-5c94-4025-85bb-934cf9ffbd4e
-->['eb9c21ce46b70879f24217a6538ddf85', '416c4bcd80dfca87a948279ee6c59b05']
Site=c8482263-acb6-496b-8d30-c21c6210e607
-->['987761a549d1c78537242bd7906d05f9', 'a0983710432dfd1

### Strongly connected components
* Find strong connected components (SCC) in a graph G
* Each node in G belongs to one SCC (alone or with other nodes)
* Each node in a SCC is reachable by every other node within a SCC
* Intuitively, we think of a SCC as containing one or more simple cycles

In [None]:
class SCC:
    def __init__(self, graph):
        self._graph = graph
        self.nodes = self.getNodes()
        self.nodeMap = {i : node for i, node in enumerate(self.nodes)}
        self.inverse_nodeMap = {node : i for i, node in enumerate(self.nodes)}
        self.ids = [-1]*len(self.nodes)  # Initialize all nodes unvisited
        self.low = [-1]*len(self.nodes)  # Initialize all nodes low-link is unvisited
        self.onStack = [False]*len(self.nodes)  # Initialize all node onStack to False
        self.stack = []  # Stack to keep track of visited nodes
        self.nodeId = 0
        self.scc_map = {}

        
    def printStack(self):
        print([self.nodeMap[i] for i in self.stack])

    def getNodes(self):
        _nodes=set([])
        for k, v in self._graph.items():
            items = [k] + v
            for item in items:
                _nodes.add(item)
        return _nodes
    
    def dfs(self, curr):
        self.stack.append(curr)
        self.onStack[curr] = True
        #print(f"{self.nodeMap[curr]}={curr}")
        self.ids[curr] = self.nodeId
        self.low[curr] = self.nodeId
        self.nodeId = self.nodeId + 1


        # print(low)
        if self.nodeMap[curr] in self._graph:
            for neighbour in self._graph[self.nodeMap[curr]]:
                to = self.inverse_nodeMap[neighbour]
                if self.ids[to] == -1: # Unvisited
                    self.dfs(to)  # Run dfs on neighbour if neighbour is unvisited
                if to in self.stack: 
                    self.low[curr] = min(self.low[curr], self.low[to])  # If neighbour on stack then 
                                                         # Min the low-link value of current node with neighbour

        # After visiting all adjacent nodes
        # check if current node is root of SCC
        # If yes, then pop of all nodes upto and including current node
        if self.ids[curr] == self.low[curr]:
            while True:
                #self.printStack()
                temp = self.stack.pop(-1)
                self.onStack[temp] = False
                self.low[temp] = self.ids[curr]
                if temp == curr:
                    break
        #self.printStack()

    def get_scc(self):
        for node in self.nodeMap.keys():
            if self.ids[node] == -1:
                self.dfs(node)
                
        for k, v in enumerate(self.low):
            self.scc_map.setdefault(v, []).append(self.nodeMap[k])

        # Only save non-unique components
        for k in list(self.scc_map.keys()):
            if len(self.scc_map[k])<2:
                # continue
                _ = self.scc_map.pop(k)    

        return self.scc_map
    
def find_scc(graph):
    scc = SCC(graph)
    return scc.get_scc()

udf_findSCC = F.udf(find_scc, MapType(IntegerType(), ArrayType(StringType())))

def count_dict(map_obj):
    count = {}
    for key in map_obj: 
        if isinstance(map_obj[key], list): 
            count[key] = len(map_obj[key])
    return count

udf_countDict = F.udf(count_dict, MapType(IntegerType(), IntegerType()))

In [None]:
df_site_graph_scc = df_site_graph_cycle_true.withColumn('scc_map', udf_findSCC(F.col('graph')))

df_site_graph_cycle_to_save = df_site_graph_scc.select('siteId',F.explode('graph').alias('src','dst'))\
                .select('siteId', 'src', F.explode('dst').alias('dst'))


df_to_s3(df_site_graph_cycle_to_save.toPandas(), f"spark_jobs/ruchit-dev/site_graphdb_cycle_true_{dates}_hr_{hour}",
         "sites_with_edges.csv")

#df_site_graph_scc = df_site_graph_scc.withColumn('scc_count', udf_countDict(F.col('scc_map')))
#temp.select('scc_map').show(truncate=True)


In [None]:
temp_to_save = df_site_graph_scc.select('siteId',F.explode('scc_map').alias('componentId','value'))\
    .select('siteId','componentId',F.explode('value').alias('nodeId'))

temp_pd = temp_to_save.toPandas()
df_to_s3(temp_pd, f"spark_jobs/ruchit-dev/site_graphdb_scc_{dates}_hr_{hour}",
         "sites_with_scc.csv")

In [None]:
temp_pd.head()

In [None]:
site = "ad8b6eef-de92-4a84-a262-8230e40fd893"
df_active_edges.filter(F.col('siteId')==site).select('siteId','src','dst').show(50)