# In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import sys
from time import time
import json
import datetime
import pyspark.sql.functions as F
from pyspark.sql.functions import udf



# Creation of the Spark context objects.
# NOTE(review): per the Spark configuration docs, spark.driver.memory cannot
# take effect once the driver JVM is already running (client/local mode); set
# it via spark-submit --driver-memory or spark-defaults.conf if 8g is needed.
SparkContext.setSystemProperty('spark.executor.memory', '8g')
SparkContext.setSystemProperty('spark.driver.memory', '8g')
conf = SparkConf().setAppName('task1').setMaster('local[*]')
sc = SparkContext(conf=conf)
sc.setLogLevel('ERROR')
# Only referenced by the commented-out DataFrame reader below; guarded so the
# script does not die with IndexError when started without arguments.
input_file = sys.argv[1] if len(sys.argv) > 1 else None
sqlContext = SQLContext(sc)

from operator import add
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Mac ids with at most this many records are dropped as too sparse to track.
MIN_RECORDS_PER_MAC = 50

## Input: JSON files, one object per line.
lines = sc.textFile("gs://kiana-bucket/kiana-data/.", 6)

### Parse the records and keep only those from the most frequently occurring mac ids.
# Cached because several actions below reuse it.
rdd = lines.map(json.loads).cache()
print("records:", rdd.count())

# (mac, number of records) pairs; cached for the same reason.
mac_counts = rdd.map(lambda rec: (rec['ClientMacAddr'], 1)).reduceByKey(add).cache()
print("most frequent mac ids:", mac_counts.sortBy(lambda kv: kv[1], ascending=False).take(2))
print("distinct mac ids:", mac_counts.count())

# (mac, count) pairs for the mac ids seen more than MIN_RECORDS_PER_MAC times.
final_rdd = mac_counts.filter(lambda kv: kv[1] > MIN_RECORDS_PER_MAC)
print("frequent mac ids:", final_rdd.count())

# Keep the full records of the frequent mac ids. The set is broadcast so it is
# shipped to each executor once rather than with every task.
frequent_macs = sc.broadcast(set(final_rdd.keys().collect()))
frequent_records = rdd.filter(lambda rec: rec['ClientMacAddr'] in frequent_macs.value)
#df_data= sqlContext.read.json(input_file,multiLine=True)

# Explicit schema for the input records so the column types are fixed and
# missing keys become nulls.
_RECORD_SCHEMA = StructType([
    StructField('Building', StringType(), True),
    StructField('ClientMacAddr', StringType(), True),
    StructField('Level', StringType(), True),
    StructField('lat', DoubleType(), True),
    StructField('lng', DoubleType(), True),
    StructField('localtime', StringType(), True),
])


def _to_row(rec):
    """Project one parsed JSON record onto _RECORD_SCHEMA (missing keys -> None)."""
    def as_float(value):
        return None if value is None else float(value)

    return (
        rec.get('Building'),
        rec.get('ClientMacAddr'),
        rec.get('Level'),
        as_float(rec.get('lat')),
        as_float(rec.get('lng')),
        rec.get('localtime'),
    )


# Converting the filtered records (not the (mac, count) pairs, which have no
# Building/localtime columns) to a DataFrame.
df_data = sqlContext.createDataFrame(frequent_records.map(_to_row), _RECORD_SCHEMA)

'''
root
 |-- Building: string (nullable = true)
 |-- ClientMacAddr: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- localtime: string (nullable = true)

None

'''

# Sample input record (one JSON object per line):
# {"Building":"TECA-IMP","Level":"Level 1","ClientMacAddr":"3c:a8:2a:78:82:c0","lat":-22.827847379110437,"lng":-43.246247836804116,"localtime":"2019-09-22 20:25:31.207 UTC"}
'''
+-----------------+------+
|    ClientMacAddr| count|
+-----------------+------+
|20:b3:99:e4:e9:39|147515|
|20:b3:99:e4:e9:38|146635|
|fc:2d:5e:49:7d:4b|139633|
|b4:c7:99:84:01:81|112582|
|b4:c7:99:84:01:80|112484|
|3c:a8:2a:78:d9:10|105316|
|3c:a8:2a:78:88:10|104652|
|b4:b5:2f:44:4a:50|100639|
|3c:a8:2a:78:82:d0| 93355|
|3c:a8:2a:78:93:50| 92190|
|20:b3:99:e4:fa:59| 86273|
|20:b3:99:e4:3a:04| 85942|
|20:b3:99:e4:fa:58| 84230|
|3c:a8:2a:78:91:50| 81461|
|3c:a8:2a:78:82:c0| 64853|
|50:3e:aa:31:1b:26| 64729|
|b4:b5:2f:44:4a:40| 62124|
|3c:a8:2a:78:88:00| 59787|
|20:b3:99:e4:3c:28| 58719|
|28:80:23:32:26:c0| 56981|
+-----------------+------+
only showing top 20 rows

None
'''
### Convert the localtime column into timezone-aware UTC datetimes.
# Raw values look like "2019-09-22 20:25:31.207 UTC". The helpers below use
# the pandas API and compare 'localtime' against aware datetime objects, so
# the column must exist under that name and hold datetimes, not strings.
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime


def _parse_localtime(value):
    """Parse a raw localtime string into an aware UTC ``datetime``.

    Accepts "YYYY-MM-DD HH:MM:SS[.ffffff]" optionally followed by " UTC".
    Returns None for missing or unparseable values instead of raising.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    if text.endswith(' UTC'):
        text = text[:-len(' UTC')]
    for fmt in ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S'):
        try:
            parsed = datetime.datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.replace(tzinfo=datetime.timezone.utc)
    return None


# Calendar date of each record, taken from the yyyy-MM-dd prefix of localtime.
df2 = df_data.select('Building', 'ClientMacAddr', 'Level', 'lat', 'lng',
                     F.to_date(F.substring('localtime', 1, 10)).alias('date'))

# The analysis helpers below use the pandas API (value_counts, .loc,
# iterrows), so collect the records onto the driver as a pandas DataFrame.
# NOTE(review): toPandas() materializes every row on the driver; filter by
# time range in Spark first if this exhausts driver memory.
df = df_data.select('Building', 'ClientMacAddr', 'Level', 'lat', 'lng', 'localtime').toPandas()
df['localtime'] = df['localtime'].map(_parse_localtime)
# Drop rows whose timestamp could not be parsed so comparisons never see None.
df = df[df['localtime'].notna()].reset_index(drop=True)



"""

'''
#df= df_data.withColumn('localtime',F.unix_timestamp('localtime', "yyyy-MM-dd HH:mm:ss UTC").cast(TimestampType()).alias("timestamp"))
#df= df_data.withColumn('localtime',F.unix_timestamp('localtime', "yyyy-MM-dd HH:mm:ss UTC"))
#df = df_data.select(*,unix_timestamp(df_data.localtime, 'yyyy-MM-dd HH:mm:ss UTC').cast(TimestampType()).alias("localtime"))
### Convert dateformat of localtime column
#
#func =  udf (datetime.datetime(2019,8,1,3,0,0,0, datetime.timezone.utc ))

# Function to covert string to datetime
'''
dateCol = udf(lambda z: convert(z))
sqlContext.udf.register("dateCol", dateCol) 
def convert(date_time): 
    format = 'yyyy-MM-dd HH:mm:ss UTC' # The format 
    datetime_str = datetime.strptime(date_time, format) 
   
    return datetime_str

# 2019-08-01 03:20:05.132000+00:00
df = df_data.withColumn('localtime',dateCol('localtime') )

df = df_data.select('Building','ClientMacAddr','Level','lat','lng', from_unixtime(unix_timestamp('localtime', 'yyyy-MM-dd HH:mm:ssXXXXX')).alias('localtime'))


'''
from dateutil import parser, tz
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf

utc_zone =  tz.gettz('UTC')
func =  udf (lambda x: datetime.strptime(x, '%y-%m-%d HH:mm:ssXXXX '), DateType())
df = df_data.withColumn("localtime",func(col("localtime")))

##print(df.groupBy("ClientMacAddr").count().filter("`count` >= 200").orderBy('count', ascending=False).show())

## group by macid and select only mac ids occuring more than 100 times
df1 = df_data.groupBy("ClientMacAddr").count().orderBy('count', ascending=False)
"""



# Generate time series data for desired mac addresses
from datetime import timedelta

def createInitialAddresses(df, startTime, top_n=20, window=timedelta(hours=1)):
    """Return the mac addresses seen most often in ``[startTime, startTime + window)``.

    Args:
        df: pandas DataFrame with a 'localtime' column comparable to
            ``startTime`` and a 'ClientMacAddr' column.
        startTime: start of the counting window.
        top_n: maximum number of addresses to return.
        window: length of the counting window.

    Returns:
        Up to ``top_n`` addresses, most frequent first (empty if the window
        holds no records).
    """
    endTime = startTime + window
    timeFrame = df[(df['localtime'] >= startTime) & (df['localtime'] < endTime)]
    # Count only the records inside the window, not across the whole df.
    return timeFrame['ClientMacAddr'].value_counts().index[:top_n].tolist()

def generateMacAddrLocationTimeSeries(df, addrList, starTime, endTime,
                                      step_minutes=20, window_minutes=10, span_minutes=160):
    """Sample the first record of each address in regularly spaced time windows.

    A window starts every ``step_minutes`` from ``starTime`` for
    ``span_minutes`` (8 windows with the defaults), and each lasts
    ``window_minutes``. Only records in ``[starTime, endTime)`` are considered,
    so windows that fall past ``endTime`` yield None.

    Args:
        df: pandas DataFrame with 'localtime' (comparable to ``starTime``) and
            'ClientMacAddr' columns.
        addrList: addresses to track.
        starTime: start of the analysis range (name kept for keyword callers).
        endTime: exclusive end of the analysis range.
        step_minutes: spacing between window starts.
        window_minutes: length of each window.
        span_minutes: window starts lie in ``range(0, span_minutes, step_minutes)``.

    Returns:
        dict mapping each address to a list with one entry per window: the
        first matching row of ``df`` (a pandas Series) or None when the
        address has no record in that window.
    """
    # Use the parameter, not a global startTime, and honor endTime.
    timeFrame = df[(df['localtime'] >= starTime) & (df['localtime'] < endTime)]
    macAddrLocation = {addr: [] for addr in addrList}

    for offset in range(0, span_minutes, step_minutes):
        frameStartTime = starTime + timedelta(minutes=offset)
        frameEndTime = frameStartTime + timedelta(minutes=window_minutes)
        frame = timeFrame[(timeFrame['localtime'] >= frameStartTime)
                          & (timeFrame['localtime'] < frameEndTime)]
        for addr, locations in macAddrLocation.items():
            records = frame.loc[frame['ClientMacAddr'] == addr]
            locations.append(None if records.empty else records.iloc[0])
    return macAddrLocation
        
import datetime

# Analysis range: 2019-08-01 03:00 to 06:00 UTC.
# NOTE(review): knnGroupSoloClassifier(df, addrList) is not defined in this notebook section.
startTime = datetime.datetime(2019, 8, 1, 3, tzinfo=datetime.timezone.utc)
endTime = startTime + timedelta(hours=3)

# Pick the addresses to follow, then sample where each one is per time slot.
addrList = createInitialAddresses(df, startTime)
macAddrLocation = generateMacAddrLocationTimeSeries(df, addrList, startTime, endTime)

# Compute distances from neighbor to neighbor
import math

# Mean Earth radius in metres (IUGG), used by the haversine formula.
_EARTH_RADIUS_M = 6371008.8


def _haversine_m(coord_1, coord_2):
    """Great-circle distance in metres between two (lat, lng) pairs given in degrees.

    Spherical approximation; for the metre-scale thresholds used here it is
    within a fraction of a percent of an ellipsoidal (Vincenty/geodesic) result.
    """
    lat1, lng1 = map(math.radians, coord_1)
    lat2, lng2 = map(math.radians, coord_2)
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lng2 - lng1) / 2) ** 2)
    return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(min(1.0, a)))


def computeNeighborDistances(df, addrList, starTime, endTime, k_neighbors=10,
                             step_minutes=20, window_minutes=2, span_minutes=160):
    """For each address and time slot, list its nearest other addresses.

    A slot starts every ``step_minutes`` from ``starTime`` for ``span_minutes``
    and lasts ``window_minutes``. Only records in ``[starTime, endTime)`` are
    considered. Within a slot, the address's position is its first record and
    each other address is represented by its first record as well.

    Args:
        df: pandas DataFrame with 'localtime', 'ClientMacAddr', 'lat', 'lng'.
        addrList: addresses to analyse (duplicates are ignored).
        starTime: start of the analysis range (name kept for keyword callers).
        endTime: exclusive end of the analysis range.
        k_neighbors: how many nearest neighbors to keep per slot.
        step_minutes, window_minutes, span_minutes: slot layout (see above).

    Returns:
        {addr: {slot_offset_minutes: [(distance_m, neighbor_addr), ...] or None}},
        neighbors sorted by distance; None when the address has no record in the slot.
    """
    # Use the parameter, not a global startTime, and honor endTime.
    timeFrame = df[(df['localtime'] >= starTime) & (df['localtime'] < endTime)]
    addrs = set(addrList)
    neighbor_distances = {addr: {} for addr in addrs}

    for offset in range(0, span_minutes, step_minutes):
        frameStartTime = starTime + timedelta(minutes=offset)
        frameEndTime = frameStartTime + timedelta(minutes=window_minutes)
        frame = timeFrame[(timeFrame['localtime'] >= frameStartTime)
                          & (timeFrame['localtime'] < frameEndTime)]
        rows = list(zip(frame['ClientMacAddr'], frame['lat'], frame['lng']))
        for addr in addrs:
            coord_1 = next(((lat, lng) for mac, lat, lng in rows if mac == addr), None)
            if coord_1 is None:
                neighbor_distances[addr][offset] = None
                continue
            visited = {addr}
            neighbors = []
            for mac, lat, lng in rows:
                # Only the first record of each neighbor counts; skip the rest
                # before paying for a distance computation.
                if mac in visited:
                    continue
                visited.add(mac)
                neighbors.append((_haversine_m(coord_1, (lat, lng)), mac))
            neighbors.sort()
            neighbor_distances[addr][offset] = neighbors[:k_neighbors]

    return neighbor_distances
        
    
# Same 03:00-06:00 UTC range; measure neighbor distances for two specific devices.
startTime = datetime.datetime(2019, 8, 1, 3, tzinfo=datetime.timezone.utc)
endTime = startTime + timedelta(hours=3)
addrList = [
    '3c:a8:2a:78:83:94',
    'a4:ee:57:bf:c5:86',
]

neighbor_distances = computeNeighborDistances(df, addrList, startTime, endTime)

###  Running the classifier
def classifier(neighbor_distances, max_dist_threshold=10, occurence_threshold=5):
    """Group each address with the neighbors that repeatedly stay close to it.

    For every address, count in how many time slots each neighbor appears at
    a distance strictly below ``max_dist_threshold``. Neighbors whose count
    strictly exceeds ``occurence_threshold`` form that address's group. The
    per-address count dict is printed as a side effect.

    Args:
        neighbor_distances: {addr: {slot: [(distance, neighbor), ...] or None}},
            as produced by computeNeighborDistances (distances in metres).
        max_dist_threshold: exclusive distance bound for a slot to count.
        occurence_threshold: a neighbor needs more close slots than this.

    Returns:
        {addr: [neighbor, ...]}, neighbors in the order they were first counted.
    """
    groups = {}
    for addr, slots in neighbor_distances.items():
        close_counts = {}
        for slot_neighbors in slots.values():
            if not slot_neighbors:
                continue
            for dist, neighbor in slot_neighbors:
                if not dist < max_dist_threshold:
                    continue
                close_counts[neighbor] = close_counts.get(neighbor, 0) + 1
        groups[addr] = [n for n, hits in close_counts.items() if hits > occurence_threshold]
        print(close_counts)
    return groups

# Group devices that stay within 10 (metres) of each other in more than 5 slots.
groups_by_distance = classifier(neighbor_distances, max_dist_threshold=10, occurence_threshold=5)