In [0]:

# Labels of the two notebook text widgets (parameters editable from the notebook UI).
_batch_widget = "Path (for new batch data)"
_stream_widget = "IP (for new stream data)"

# Create the widgets with their default values.
dbutils.widgets.text(_batch_widget, '/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile')
dbutils.widgets.text(_stream_widget, '10.0.0.30')

# Read the current widget values once, when this cell runs.
batch_path = dbutils.widgets.get(_batch_widget)
stream_ip = dbutils.widgets.get(_stream_widget)


In [0]:
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator, Imputer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
import pandas as pd
import datetime
from sklearn.metrics import mean_squared_error, accuracy_score

# Host address passed to the Elasticsearch clients and Spark connector options later in the notebook.
ip = '10.0.0.4'


In [0]:
from pyspark.sql.types import *

# Explicit schema of one bus report as stored in the JSON source.
# Every field is declared nullable; the list order is the column order.
_report_fields = [
    ('_id', StructType([StructField('$oid', StringType(), True)])),
    ('actualDelay', DoubleType()),
    ('angle', DoubleType()),
    ('anomaly', BooleanType()),
    ('areaId', LongType()),
    ('areaId1', LongType()),
    ('areaId2', LongType()),
    ('areaId3', LongType()),
    ('atStop', BooleanType()),
    ('busStop', LongType()),
    ('calendar', StructType([StructField('$numberLong', StringType(), True)])),
    ('congestion', BooleanType()),
    ('currentHour', LongType()),
    ('dateType', LongType()),
    ('dateTypeEnum', StringType()),
    ('delay', LongType()),
    ('direction', LongType()),
    ('distanceCovered', DoubleType()),
    ('ellapsedTime', LongType()),
    ('filteredActualDelay', LongType()),
    ('gridID', StringType()),
    ('journeyPatternId', StringType()),
    ('justLeftStop', BooleanType()),
    ('justStopped', BooleanType()),
    ('latitude', DoubleType()),
    ('lineId', StringType()),
    ('loc', StructType([
        StructField('coordinates', ArrayType(DoubleType(), True), True),
        StructField('type', StringType(), True),
    ])),
    ('longitude', DoubleType()),
    ('poiId', LongType()),
    ('poiId2', LongType()),
    ('probability', DoubleType()),
    ('systemTimestamp', DoubleType()),
    ('timestamp', StructType([StructField('$numberLong', StringType(), True)])),
    ('vehicleId', LongType()),
    ('vehicleSpeed', DoubleType()),
]

schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in _report_fields
])

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from elasticsearch import Elasticsearch
import pandas as pd

# read data file
def read_data(data):
  """Load one of the known bus-report JSON datasets using the notebook `schema`.

  Args:
    data: dataset selector, one of 'full', 'small' or 'first'.

  Returns:
    The Spark DataFrame read from the JSON location of the selected dataset.

  Raises:
    ValueError: if `data` is not one of the known selectors.
  """
  if data == 'full':
    path = '/mnt/dacoursedatabricksstg/dacoursedatabricksdata/busFile'
  elif data == 'small':
    path = '/FileStore/tables/mini_sample.json'
  elif data == 'first':
    path = '/FileStore/tables/seq_sample.json'
  else:
    # A bare string cannot be raised in Python 3; use a real exception type.
    raise ValueError("unknown dataset %r (expected 'full', 'small' or 'first')" % (data,))

  df = spark.read.json(path, schema=schema)
  print('finished read_data')
  return df


# a class of an event
class Public_Event:
    """A public event at a stadium, used to tag bus reports made around it.

    Built from a row (mapping-like) with the keys 'year', 'month', 'day',
    'start_finish_hour', 'start_finish_minute', 'event' and 'stadium'; rows
    whose 'event' is 'concert' must also carry 'start_begin_hour' and
    'start_begin_minute'.

    Attributes:
        areaIds: list of area ids considered to be around the stadium.
        event: the event name taken from the row.
        start_begin, start_finish: time window leading up to the event.
        end_begin, end_finish: time window after the event; None for concerts.
    """

    # Area ids around each known stadium, keyed by the row's 'stadium' value.
    _STADIUM_AREA_IDS = {
        'aviva': [980, 3910, 3912, 3913, 3914, 3915, 3916, 3883, 3884, 3886, 3888, 3889, 3890, 3891, 3892, 4565, 4567, 4573, 4574, 4575, 4576, 1149, 1151],
        'croke': [4581, 4583, 4589, 4591, 4480, 4483, 4484, 4566, 4568, 4577, 4569, 4574, 4579, 4572, 4580, 4578, 4571, 4570, 4576],
    }

    def __init__(self, row):
        self.parse_date(row)
        # Copy so that each instance owns its list, as before.
        self.areaIds = list(self._STADIUM_AREA_IDS[row['stadium']])
        self.event = row['event']

    # get starting and finishing times of the event
    def parse_date(self, row):
        """Compute the time windows of the event from the row's date fields."""
        year = int(row['year'])
        month = int(row['month'])
        day = int(row['day'])
        start_finish_hour = int(row['start_finish_hour'])
        start_finish_minute = int(row['start_finish_minute'])

        self.start_finish = datetime.datetime(year, month, day, start_finish_hour, start_finish_minute)

        if row['event'] == 'concert':
            # Concerts give an explicit start of the window and have no
            # second (post-event) window.
            start_begin_hour = int(row['start_begin_hour'])
            start_begin_minute = int(row['start_begin_minute'])
            self.start_begin = datetime.datetime(year, month, day, start_begin_hour, start_begin_minute)
            self.end_begin = None
            self.end_finish = None
        else:
            # Other events: one hour before `start_finish`, plus a second
            # window from 1h50 to 2h15 after it.
            self.start_begin = self.start_finish - datetime.timedelta(hours=1, minutes=0)
            self.end_begin = self.start_finish + datetime.timedelta(hours=1, minutes=50)
            self.end_finish = self.start_finish + datetime.timedelta(hours=2, minutes=15)

    # return for a datetime and areaid3 if its close to an event
    def is_close(self, year, month, day, hour, minute, areaId3):
        """Return the event name if the report is close to this event, else 'no'.

        A report is close when `areaId3` is one of the stadium's area ids and
        its time falls inside one of the event's windows (bounds inclusive).
        No output is printed: this runs once per row and per event in a UDF.
        """
        date = datetime.datetime(year, month, day, hour, minute)
        if areaId3 not in self.areaIds:
            return 'no'

        in_window = self.start_begin <= date <= self.start_finish
        if self.event != 'concert':
            in_window = in_window or self.end_begin <= date <= self.end_finish
        return self.event if in_window else 'no'

# parse all the events       
def parse_events(path="/dbfs/FileStore/shared_uploads/moshe.abadi@campus.technion.ac.il/events-1.csv"):
    """Read the events CSV and build one `Public_Event` per row.

    Args:
        path: location of the events CSV; defaults to the shared upload used
            by this notebook.

    Returns:
        A list of `Public_Event` objects in file order (empty if the CSV has
        no data rows).
    """
    df_events = pd.read_csv(path)
    return [Public_Event(row) for _, row in df_events.iterrows()]
events_list = parse_events()


# tag every report from the bus if and which event is close
@F.udf()
def tag_reports(minute, hour, day, month, year, areaId):
    """Return the first answer other than 'no' given by the events in `events_list`, else 'no'."""
    answers = (
        event.is_close(year, month, day, hour, minute, areaId)
        for event in events_list
    )
    # The generator is lazy, so the scan stops at the first matching event.
    return next((answer for answer in answers if answer != 'no'), 'no')

class Tweet:
    """A tweet row together with the one-hour window on each side of its time.

    The row must provide 'year', 'month', 'day', 'hour' and 'minute'; its
    'text' entry is what `is_close` returns on a match.
    """

    def __init__(self, row):
        self.parse_date(row)
        self.tweet = row

    # get starting and finishing times of the tweet
    def parse_date(self, row):
        """Set `tweetime` from the row and `start`/`finish` one hour around it."""
        date_parts = [int(row[key]) for key in ('year', 'month', 'day', 'hour', 'minute')]
        self.tweetime = datetime.datetime(*date_parts)

        margin = datetime.timedelta(hours=1, minutes=0)
        self.start = self.tweetime - margin
        self.finish = self.tweetime + margin

    # return for a datetime if it is close to the tweet
    def is_close(self, year, month, day, hour, minute):
        """Return the tweet text if the given time is inside the window (inclusive), else 'no'."""
        moment = datetime.datetime(year, month, day, hour, minute)
        inside_window = self.start <= moment <= self.finish
        return self.tweet['text'] if inside_window else 'no'

# parse all the tweets
def parse_tweets(path="/dbfs/FileStore/shared_uploads/moshe.abadi@campus.technion.ac.il/combined_csv.csv"):
    """Read the tweets CSV and build one `Tweet` per row.

    Args:
        path: location of the tweets CSV; defaults to the shared upload used
            by this notebook.

    Returns:
        A list of `Tweet` objects in file order (empty if the CSV has no data
        rows).
    """
    df_tweets = pd.read_csv(path)
    return [Tweet(row) for _, row in df_tweets.iterrows()]
tweets_list = parse_tweets()

# tag every report from the bus if and which tweet is close
@F.udf()
def tag_tweets(minute, hour, day, month, year):
    """Return the first answer other than 'no' given by the tweets in `tweets_list`, else 'no'."""
    answers = (
        tweet.is_close(year, month, day, hour, minute)
        for tweet in tweets_list
    )
    # The generator is lazy, so the scan stops at the first matching tweet.
    return next((answer for answer in answers if answer != 'no'), 'no')


@F.udf()
def prefix(s):
  """Return the first four characters of `s`, or None when `s` is null."""
  # Null column values reach a Python UDF as None; slicing None would raise.
  if s is None:
    return None
  return s[:4]
@F.udf()
def direction(s):
  """Return the fifth character of `s`, or None when `s` is null or shorter than five characters."""
  # Guard both failure modes of `s[4]`: TypeError on None, IndexError on short input.
  if s is None or len(s) < 5:
    return None
  return s[4]
@F.udf()
def loc(lat, lon):
  """Return the string "<lat>,<lon>", or None when either value is null."""
  if lat is None or lon is None:
    return None
  # str.join only accepts strings, so numeric coordinates are converted first.
  return ','.join([str(lat), str(lon)])
@F.udf('int')
def delay_interval(delay):
    """Map a delay value to one of nine ordered buckets (0..8).

    Bucket i (for i < 8) holds delays up to and including the i-th upper
    bound in (-600, -300, -180, -60, 60, 180, 300, 600); bucket 8 holds
    anything above 600. A null delay yields null instead of raising.
    """
    if delay is None:
        return None

    upper_bounds = (-600, -300, -180, -60, 60, 180, 300, 600)
    for bucket, upper in enumerate(upper_bounds):
        if delay <= upper:
            return bucket
    if delay > upper_bounds[-1]:
        return len(upper_bounds)
    # Only reachable for values that compare false both ways (e.g. NaN).
    return None

# parse and remove uneeded columns
def pre_process(df):
  """Derive the analysis columns from raw bus reports and drop unneeded ones.

  Adds `id`, `patternLine`, `directionLine`, `delay_interval`, `coordinates`,
  `datetime` and its parts (`dayOfWeek`, `minute`, `hour`, `day`, `month`,
  `year`), the event/tweet tags and their boolean flags, then drops the raw
  columns that are not used afterwards.

  Side effect: sets the Spark session time zone to Europe/Dublin, which is
  the zone used to render `datetime`.

  Args:
    df: DataFrame with the columns of the notebook `schema`.

  Returns:
    The transformed DataFrame.
  """
  df = df.withColumn('id', F.col('_id.$oid'))
  df = df.withColumn('patternLine', prefix(F.col("journeyPatternId")))
  df = df.withColumn('directionLine', direction(F.col("journeyPatternId")))
  df = df.withColumn('delay_interval', delay_interval(F.col("delay")))
  df = df.withColumn("actualDelay", df["actualDelay"].cast("double"))
  df = df.withColumn('coordinates', df['loc']['coordinates'])
  df = df.withColumn('timestamp', F.col('timestamp.$numberLong'))
  spark.conf.set("spark.sql.session.timeZone", "Europe/Dublin")
  # Only the first 10 characters of the timestamp string are passed to from_unixtime.
  df = df.withColumn('datetime', F.from_unixtime(F.substring(F.col("timestamp"),0,10))) #.cast(DateType())
  df = df.withColumn('dayOfWeek', F.dayofweek(df['datetime']))
  df = df.withColumn('minute', F.minute(df['datetime']))
  df = df.withColumn('hour', F.hour(df['datetime']))
  df = df.withColumn('day', F.dayofmonth(df['datetime']))
  df = df.withColumn('month', F.month(df['datetime']))
  df = df.withColumn('year', F.year(df['datetime']))
  df = df.withColumn("event_around", tag_reports(F.col("minute"), F.col("hour"), F.col("day"), F.col("month"), F.col("year"), F.col("areaId3")))
  df = df.withColumn("relevant_tweet", tag_tweets(F.col("minute"), F.col("hour"), F.col("day"), F.col("month"), F.col("year")))
  # True when a tag was found; previously both branches produced False.
  df = df.withColumn("event_around_bool", F.when(F.col("event_around") != "no", True).otherwise(False))
  df = df.withColumn("relevant_tweet_bool", F.when(F.col("relevant_tweet") != "no", True).otherwise(False))

  df = df.drop(*['_id', 'angle', 'anomaly', 'direction', 'dateType', 'dateTypeEnum', 'poiId', 'poiId2', 'probability', 'filteredActualDelay', 'calendar', 'loc', 'systemTimestamp', 'currentHour', 'timestamp'])
  print('finished pre_process')
  return df

# remove ouliers from selected columns
def remove_outliers(df):
  """Null out values that fall outside the accepted range of selected columns.

  Rows are kept: for each column below, a value outside its inclusive
  [min, max] range is replaced by null (`F.when` without `otherwise`).

  Args:
    df: DataFrame containing `vehicleSpeed`, `actualDelay`, `delay` and
      `distanceCovered`.

  Returns:
    The DataFrame with out-of-range values replaced by null.
  """
  outliers_ranges = {'vehicleSpeed': [0, 120], 'actualDelay': [-2200, 2000], 'delay': [-10000, 10000], 'distanceCovered': [0, 5]}

  for outlier_col, (mini, maxi) in outliers_ranges.items():
    in_range = (df[outlier_col] >= mini) & (df[outlier_col] <= maxi)
    df = df.withColumn(outlier_col, F.when(in_range, df[outlier_col]))
  print('finished remove_outliers')
  return df


# Elasticsearch host used by upload() and elastic_import() below
# (same value as the `ip` assignment in the earlier import cell).
ip = '10.0.0.4'
# ip = 'da2020w-0000.eastus.cloudapp.azure.com'

# upload the data to elastic
def upload(df, data):
  """Recreate the Elasticsearch index `df_<data>` and write `df` into it.

  An existing index of that name is deleted first, then created again with
  the explicit mapping below, and finally filled through the Elasticsearch
  Spark connector.

  Args:
    df: Spark DataFrame to index.
    data: dataset name; the index is named 'df_' + data.
  """
  index = 'df_'+data
  es = Elasticsearch([{'host': ip}], request_timeout=30000)
  # Keyword argument, matching the delete/create calls in this function.
  if es.indices.exists(index=index):
      es.indices.delete(index=index)

  index_definition = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "actualDelay" : { "type": "double" },
            "areaId" : { "type": "long" },
            "areaId1" : { "type": "long" },
            "areaId2" : { "type": "long" },
            "areaId3" : { "type": "long" },
            "atStop" : { "type": "boolean" },
            "busStop" : { "type": "long" },
            "congestion" : { "type": "boolean" },
            "coordinates" : { "type": "geo_point" },
            "datetime" : { "type": "date", "format": "yyyy-MM-dd HH:mm:ss"},  # e.g. 2017-07-24 09:13:47
            "dayOfWeek" : { "type": "long" },
            "delay" : { "type": "long" },
            "distanceCovered" : { "type": "double" },
            "ellapsedTime" : { "type": "double" },
            "hour" : { "type": "long" },
            "gridID" : { "type": "keyword" },
            "id" : { "type": "keyword" },
            "journeyPatternId" : { "type": "keyword" },
            "justLeftStop" : { "type": "boolean" },
            "justStopped" : { "type": "boolean" },
            "latitude" : { "type": "double" },
            "lineId" : { "type": "keyword" },
            "longitude" : { "type": "double" },
            "minute" : { "type": "long" },
            "vehicleId" : { "type": "long" },
            # The source column is a double; a `long` mapping would drop the fraction.
            "vehicleSpeed" : { "type": "double" },
            "patternLine" : { "type": "keyword" },
        }
    }
  }

  # ignore=400 tolerates an HTTP 400 answer from the create call.
  es.indices.create(index=index, ignore=400, request_timeout=30000, body=index_definition)
  df.write.format("org.elasticsearch.spark.sql")\
          .option("es.nodes",ip)\
          .option("es.resource", index)\
          .option("es.nodes.wan.only","true")\
          .save()

  print('finished upload')
  
# download data from elastic
def elastic_import(index):
  """Load the Elasticsearch index `index` as a Spark DataFrame.

  The date-range query below is defined but not passed to the reader (the
  `es.query` option is disabled), so the whole index is loaded.
  """
  unused_range_query = """
  {
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "datetime": {
              "gte": "2017-05-26T07:40:30.795Z",
              "lte": "2018-10-06T03:08:25.541Z",
              "format": "strict_date_optional_time"
            }
          }
        }
      ]
    }
  }
  }"""

  # Connector options, applied in this order.
  reader_options = [
      ("es.nodes.wan.only", "true"),
      ("es.mapping.date.rich", "false"),
      ("es.port", "9200"),
      ("es.nodes", ip),
      ("pushdown", "true"),
      # ("es.query", unused_range_query),
  ]

  reader = spark.read.format("org.elasticsearch.spark.sql")
  for option_name, option_value in reader_options:
    reader = reader.option(option_name, option_value)
  return reader.load(index)


In [0]:
# Dataset selector passed to read_data(): 'full', 'small' or 'first'.
# data = 'small'
# data = 'first'
data = 'full'
print('data =', data)

# Read -> derive columns -> null out-of-range values, then index the result.
df = remove_outliers(pre_process(read_data(data=data)))
# display(df)
upload(df, data)

# Alternative: reload an index that was uploaded earlier.
# df = elastic_import('df_'+data)
# df = df.withColumn("actualDelay", df["actualDelay"].cast(DoubleType()))
# df = df.withColumn("vehicleSpeed", df["vehicleSpeed"].cast(DoubleType()))


In [0]:
# Feature groups; the trailing comments list candidates that are currently disabled.
imputer_cols = ['actualDelay']  # 'distanceCovered', 'vehicleSpeed'
str_to_ix_col = ['patternLine']  # 'event_around'
ohe_cols = ['hour', 'busStop', 'dayOfWeek']  # 'areaId'
cont_cols = ['atStop']  # 'justLeftStop', 'justStopped', 'ellapsedTime', 'congestion'

# Label column; prediction columns are named after it.
pred_col = 'delay_interval'

# Fill missing values of the imputer columns into new '<col>_mean' columns.
imputer = Imputer(
    missingValue=float('nan'),
    inputCols=imputer_cols,
    outputCols=[name + '_mean' for name in imputer_cols],
)

# Index the string column into '<col>_ix'.
str_to_ix1 = StringIndexer(
    inputCol=str_to_ix_col[0],
    outputCol=str_to_ix_col[0] + '_ix',
)

# One-hot encode the categorical columns plus the indexed string column.
_ohe_input_cols = ohe_cols + [str_to_ix1.getOutputCol()]
ohe = OneHotEncoderEstimator(
    inputCols=_ohe_input_cols,
    outputCols=[name + '_ohe' for name in _ohe_input_cols],
)

# Assemble every feature into the single 'features' vector column.
vec_assembler = VectorAssembler(
    inputCols=cont_cols + ohe.getOutputCols() + imputer.getOutputCols(),
    outputCol='features',
)

# Column settings shared by both classifiers.
_classifier_cols = dict(
    featuresCol='features',
    labelCol=pred_col,
    predictionCol=pred_col + '_pred',
    rawPredictionCol=pred_col + '_rawpred',
)

rf = RandomForestClassifier(
    maxDepth=5,
    maxBins=32,
    minInstancesPerNode=1,
    **_classifier_cols
)

lr = LogisticRegression(**_classifier_cols)

# Other metric names: f1|weightedPrecision|weightedRecall|accuracy
evaluator = MulticlassClassificationEvaluator(
    predictionCol=pred_col + '_pred',
    labelCol=pred_col,
    metricName='f1',
)
# f1|weightedPrecision|weightedRecall|accuracy

In [0]:
# Fixed seed so that the 90/10 split is the same on every run.
df_train, df_test = df.randomSplit([0.9, 0.1], seed=0)

# Stages run in order: impute -> index -> one-hot -> assemble -> classify.
pipeline = Pipeline(stages=[
    imputer,
    str_to_ix1,
#     str_to_ix2,
    ohe,
    vec_assembler,
    lr,
])

print('train len =', df_train.count())
pipeline_model = pipeline.fit(df_train)

# Define `preds` so that the following cell, which displays it, works on a
# fresh run. transform() is lazy, so this does not trigger a job by itself.
preds = pipeline_model.transform(df_test)

# Optional (each line runs a job over the test set):
# print('test len =', df_test.count())
# score = evaluator.evaluate(preds)
# print(score)


In [0]:
# Display `preds`; it must have been assigned before this cell runs.
preds

In [0]:
%sh pip install espandas

In [0]:
import pickle
import pyspark.sql.functions as F
 
# Address (host:port) of the Kafka bootstrap server.
# NOTE(review): the `stream_ip` widget value read in the first cell is not used here - confirm intent.
kafka_server = '10.0.0.30:9091'

# Streaming source:
#  - reads from the Kafka server above
#  - subscribes to every topic matching the pattern
#    (a fixed list is also possible: .option("subscribe", "vehicleId_28051,vehicleId_28052"))
#  - starts from the earliest offsets
#  - caps each trigger at 1000 offsets
kafka_raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribePattern", "vehicleId_.*")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .load()
)

# Keep only the message payload, as a string.
kafka_value_df = kafka_raw_df.selectExpr("CAST(value AS STRING)")

# Parse the payload with the bus-report schema and flatten it into columns.
_parsed_report = F.from_json(F.col("value"), schema=schema).alias('json')
kafka_df = kafka_value_df.select(_parsed_report).select("json.*")

df_es = pd.DataFrame()
# Per-batch anomaly frames are appended to this list by foreach_batch_function.
l = []


In [0]:
from elasticsearch import Elasticsearch
from espandas import Espandas
import pandas as pd
import datetime

# Elasticsearch clients shared by is_anomaly() and foreach_batch_function():
# `es` runs searches and index management, `esp` writes pandas DataFrames.
es = Elasticsearch([{'host': ip,}])
esp = Espandas(hosts=[{'host': ip,}])



# Search body used by is_anomaly(). The %%patternLine%%, %%directionLine%%,
# %%lon%% and %%lat%% placeholders are replaced per report. It filters on the
# same line and direction, within 200m of the given point and inside a fixed
# date range, and asks only for aggregations ("size": 0): extended statistics
# of the longitude and latitude fields.
query_template = '''
{
  "query": {
    "bool": {
      "filter": [
        {
          "match_phrase": {
            "patternLine": "%%patternLine%%"
          }
        },
        {
          "match_phrase": {
            "directionLine": "%%directionLine%%"
          }
        },
        {
          "geo_distance": {
            "distance": "200m",
            "coordinates": [ %%lon%%, %%lat%% ]
          }
        },
        {
          "range": {
            "datetime": {
              "gte": "2016-01-01T13:00:00.000Z",
              "lte": "2019-01-01T14:00:00.000Z",
              "format": "strict_date_optional_time"
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "longitude_stats" : { 
      "extended_stats" : { 
        "field" : "longitude" 
      } 
    },
    "latitude_stats" : { 
      "extended_stats" : { 
        "field" : "latitude" 
      } 
    }
  }  
}'''

# @F.udf()


def is_anomaly(directionLine, patternLine, lon, lat):
  """Decide whether a report's position is anomalous compared with the `df_full` index.

  Fills `query_template` for the report and runs it against `df_full`.
  Returns True when fewer than 50 indexed reports match; otherwise returns
  whether BOTH the longitude and the latitude lie more than one standard
  deviation away from the average of the matching reports.
  """
  substitutions = [
      ('%%directionLine%%', str(directionLine)),
      ('%%patternLine%%', patternLine),
      ('%%lon%%', str(lon)),
      ('%%lat%%', str(lat)),
  ]
  q = query_template
  for placeholder, value in substitutions:
    q = q.replace(placeholder, value)

  res = es.search(index="df_full", body=q, request_timeout=30000)
  lon_stats = res['aggregations']['longitude_stats']

  # Too few matching reports: flagged as anomalous.
  if lon_stats['count'] < 50:
    return True

  lon_is_far = abs(lon - lon_stats['avg']) > lon_stats['std_deviation']

  lat_stats = res['aggregations']['latitude_stats']
  lat_is_far = abs(lat - lat_stats['avg']) > lat_stats['std_deviation']
  return lon_is_far and lat_is_far

def foreach_batch_function(df_stream, batch_id):
    """Process one streaming micro-batch and publish the anomalous trips.

    Steps:
      1. Apply pre_process, remove_outliers and the fitted pipeline model.
      2. Collect the batch to pandas and flag each report with is_anomaly
         (one Elasticsearch query per report).
      3. Mark a trip (unique_cols group) as anomalous when it has at least
         two anomalous reports in this batch.
      4. Append those reports to the global list `l`, so the result covers
         every batch processed so far.
      5. Aggregate per trip and rewrite the `menu_df` index with the result.

    Args:
        df_stream: the micro-batch DataFrame.
        batch_id: id of the micro-batch (unused).
    """
    df_stream = pre_process(df_stream)
    df_stream = remove_outliers(df_stream)
    df_stream = pipeline_model.transform(df_stream)

    # todo: before date
    unique_cols = ['year', 'month', 'day', 'patternLine', 'directionLine', 'vehicleId']
    report_cols = unique_cols+['relevant_tweet', 'event_around', 'delay_interval', 'delay_interval_pred']

    df_pd = df_stream.toPandas()
    df_pd['anomaly'] = df_pd.apply(lambda r: is_anomaly(r['directionLine'], r['patternLine'], r['longitude'], r['latitude']), axis=1)
    df_pd['anomaly_trip'] = df_pd.groupby(unique_cols)['anomaly'].transform(sum)>=2

    # todo: add tweet and event
    l.append(df_pd[df_pd['anomaly_trip']][report_cols])
    df_anomaly = pd.concat(l)

    def get_stats(x):
        """Summarise one trip: prediction error, accuracy and the distinct tags seen."""
        return pd.Series({
            # Square root of the MSE, so the value matches the 'rmse' field name.
            'prediction_rmse': mean_squared_error(x['delay_interval'], x['delay_interval_pred']) ** 0.5,
            'prediction_acc': accuracy_score(x['delay_interval'], x['delay_interval_pred']),
            'relevant_tweets': list(set(x['relevant_tweet'][x['relevant_tweet']!='no'])),
            'events_around': list(set(x['event_around'][x['event_around']!='no'])),
        })

    df_anomaly = df_anomaly[report_cols].groupby(unique_cols).apply(get_stats).reset_index()
    df_anomaly['anomaly_id'] = range(len(df_anomaly))

    # Replace the previous content of the index with the current summary.
    if es.indices.exists(index='menu_df'):
        es.indices.delete(index='menu_df')
    esp.es_write(df_anomaly, 'menu_df', 'doc', uid_name='anomaly_id')

# l = []
# l1 = []
# l2 = []
df_es = pd.DataFrame()

# Start the streaming query: each micro-batch of kafka_df is passed to foreach_batch_function.
kafka_df.writeStream.foreachBatch(foreach_batch_function).start()
