In [165]:
import numpy as np
import pandas as pd
import nltk
import string
from nltk.stem.porter import PorterStemmer
from nltk.corpus import stopwords
from nltk.stem.porter import *
from pyspark import SparkContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import * 
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyod.models.loci import LOCI
import pyflux as pf
from google.cloud import bigquery
from kafka import KafkaConsumer

### Initialize test dataset

In [93]:
tweets = pd.read_csv('toxic-data/tweets.csv', encoding='latin-1')
tweets.head()

Unnamed: 0,text,label
0,RT @RihannaHasAids: aight game over. dykes had...,42
1,Dude said ice jj fish created player on 2k ret...,70
2,not in the business of submitting to no bitch ...,83
3,When did it become cool to ne stupid? This tea...,53
4,"Peace fags, just remember I'm best Lux support...",78


In [94]:
tweets['timestamp'] = "2018-04-13 10:31:30"
tweets.timestamp = pd.to_datetime(tweets.timestamp)
offsets = [pd.DateOffset(hours=i) for i in range(1, tweets.shape[0])]
for i in range(len(offsets)):
    tweets.loc[i,'timestamp'] += offsets[i]
tweets = tweets.set_index("timestamp").reset_index()
tweets.head()

Unnamed: 0,timestamp,text,label
0,2018-04-13 11:31:30,RT @RihannaHasAids: aight game over. dykes had...,42
1,2018-04-13 12:31:30,Dude said ice jj fish created player on 2k ret...,70
2,2018-04-13 13:31:30,not in the business of submitting to no bitch ...,83
3,2018-04-13 14:31:30,When did it become cool to ne stupid? This tea...,53
4,2018-04-13 15:31:30,"Peace fags, just remember I'm best Lux support...",78


In [95]:
tweets = tweets.iloc[0:250,:]

In [96]:
tweets.to_csv("toxic-data/tweets-timestamped.csv", index=False)

### ML Service Endpoint

In [97]:
spark = SparkSession.builder.appName('mltox').getOrCreate()

In [148]:
MODEL_PATH = "spark-gradientboosting-toxic-tagger-cv"
FORECAST_WINDOW_PCT = 0.25
FORECAST_MCMC_SIMULATIONS = 10000 # tunable
STREAMED_FILENAME = "toxic-data/tweets-timestamped.csv"
BQ_DATASET_NAME = "MLTOX"
BQ_TIME_SERIES_TABLE = "TIME_SERIES"
BQ_FORECAST_TABLE = "FORECAST"
###  KAFKA parameters here

In [166]:
def cleanText(column):
    return F.trim(F.lower(F.regexp_replace(column, '([^\s\w_]|_)+', ''))).alias('text')

In [149]:
# Spark Streaming component is the Kafka consumer
# streams messages from Kafka into a spark dataframe or alternatively a .csv
def consumeKafka():
    pass

def writeToBigQuery(df, dataset_id, table_id):
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref) # API request
    rowsToInsert = list(df.itertuples(index=False, name=None))
    client.insert_rows(table, rowsToInsert) # API request

In [162]:
def tagAnomalies(df):
    values = df.prediction.values.reshape(-1,1)
    anomalyDetector = LOCI()
    anomalyDetector.fit(values)
    anomalyLabels = np.asarray(anomalyDetector.labels_)
    df['isAnomaly'] = anomalyLabels
    return df

def predict():
    test = spark.read.csv(STREAMED_FILENAME, header=True, mode="DROPMALFORMED")
    test = test.select(F.col("timestamp"), cleanText(F.col("text")))
    times = test.select("timestamp")
    test = test.drop("timestamp")
    toxicTagger = PipelineModel.load(MODEL_PATH)
    predictions = toxicTagger.transform(test).select(F.col("prediction"))
    testIndex = predictions.withColumn("id", F.monotonically_increasing_id())
    timesIndex = times.withColumn("id", F.monotonically_increasing_id())
    tagged = timesIndex.join(testIndex, "id", "inner").drop("id")
    tagged = tagged.withColumn("datetime", F.from_unixtime(F.unix_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss")))
    tagged = tagged.select(F.col("datetime"), F.col("prediction"))
    tagged = tagged.withColumn("timestamp", F.date_trunc("hour", F.col("datetime").cast("timestamp")))
    tagged = tagged.select(F.col("timestamp"), F.col("prediction"))
    result = tagged.groupBy("timestamp").mean("prediction").sort(F.col("timestamp").asc())
    result = result.na.drop(subset=["timestamp", "avg(prediction)"])
    result = result.toPandas()
    result['timestamp'] = pd.to_datetime(result.timestamp)
    result.columns = ['timestamp', 'prediction']
    return result

def forecast(df):

    #def runForecastModel(df):
        #forecaster = ToxicityForecaster(df.prediction.values)
        #return forecaster.forecast()

    #_, forecasted = runForecastModel(df)
    #start = [df.timestamp.iloc[-1]] * FORECAST_WINDOW
    #forecastDF = pd.DataFrame({'timestamp': start})
    #forecastDF['timestamp'] = pd.to_datetime(forecastDF.timestamp)
    #offsets = [pd.DateOffset(hours=i) for i in range(1, FORECAST_WINDOW)]
    #for i in range(len(offsets)):
        #forecastDF.loc[i,'timestamp'] += offsets[i]
    #forecastDF['prediction'] = forecasted
    #return forecastDF

    forecast_window = int(df.shape[0] * FORECAST_WINDOW_PCT)
    forecasted = pd.DataFrame(columns=['timestamp','prediction'])
    model = None
    if forecast_window > 0:
        ts = df.set_index('timestamp')
        model = pf.GARCH(p=1, q=1, data=ts)
        model.fit('M-H', nsims=FORECAST_MCMC_SIMULATIONS)
        forecasted = model.predict(forecast_window).reset_index()
    return forecasted, model

In [163]:
def run():
    # consumeKafka()
    history = predict()
    history = tagAnomalies(history)
    forecasted, model = forecast(history)
    print(history.head())
    print(forecasted.head())
    #writeToBigQuery(history, BQ_DATASET_NAME, BQ_TIME_SERIES_TABLE)
    #writeToBigQuery(forecast, BQ_DATASET_NAME, BQ_FORECAST_TABLE)

### Experiment

In [103]:
spark = SparkSession.builder.appName('mltox').getOrCreate()

In [151]:
test = spark.read.csv(STREAMED_FILENAME, header=True, mode="DROPMALFORMED")
test = test.select(F.col("timestamp"), cleanText(F.col("text")))
times = test.select("timestamp")
test = test.drop("timestamp")

In [152]:
toxicTagger = PipelineModel.load(MODEL_PATH)

In [153]:
predictions = toxicTagger.transform(test).select(F.col("prediction"))
predictions.show()

+------------------+
|        prediction|
+------------------+
|50.062105637329594|
| 50.71760941386791|
| 53.27285813640033|
|50.062105637329594|
|50.062105637329594|
|50.062105637329594|
| 51.07780263384459|
|50.062105637329594|
| 53.09267422891177|
|50.062105637329594|
|50.062105637329594|
|50.062105637329594|
| 50.04597062758577|
|  53.6237102795118|
|50.062105637329594|
|50.062105637329594|
|50.062105637329594|
| 59.75237995683988|
|50.062105637329594|
|53.478748590885175|
+------------------+
only showing top 20 rows



In [154]:
testIndex = predictions.withColumn("id", F.monotonically_increasing_id())
timesIndex = times.withColumn("id", F.monotonically_increasing_id())

In [155]:
tagged = timesIndex.join(testIndex, "id", "inner").drop("id")
tagged.show()

+-------------------+------------------+
|          timestamp|        prediction|
+-------------------+------------------+
|2018-04-13 11:31:30|50.062105637329594|
|2018-04-13 12:31:30| 50.71760941386791|
|2018-04-13 13:31:30| 53.27285813640033|
|2018-04-13 14:31:30|50.062105637329594|
|2018-04-13 15:31:30|50.062105637329594|
|2018-04-13 16:31:30|50.062105637329594|
|2018-04-13 17:31:30| 51.07780263384459|
|2018-04-13 18:31:30|50.062105637329594|
|2018-04-13 19:31:30| 53.09267422891177|
|2018-04-13 21:31:30|50.062105637329594|
|2018-04-13 22:31:30|50.062105637329594|
|2018-04-13 23:31:30|50.062105637329594|
|2018-04-14 00:31:30| 50.04597062758577|
|2018-04-14 01:31:30|  53.6237102795118|
|2018-04-14 02:31:30|50.062105637329594|
|2018-04-14 03:31:30|50.062105637329594|
|2018-04-14 04:31:30|50.062105637329594|
|2018-04-14 05:31:30| 59.75237995683988|
|2018-04-14 06:31:30|50.062105637329594|
|2018-04-14 07:31:30|53.478748590885175|
+-------------------+------------------+
only showing top

In [156]:
tagged = tagged.withColumn("datetime", F.from_unixtime(F.unix_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss")))
tagged = tagged.select(F.col("datetime"), F.col("prediction"))
tagged.show()

+-------------------+------------------+
|           datetime|        prediction|
+-------------------+------------------+
|2018-04-13 11:31:30|50.062105637329594|
|2018-04-13 12:31:30| 50.71760941386791|
|2018-04-13 13:31:30| 53.27285813640033|
|2018-04-13 14:31:30|50.062105637329594|
|2018-04-13 15:31:30|50.062105637329594|
|2018-04-13 16:31:30|50.062105637329594|
|2018-04-13 17:31:30| 51.07780263384459|
|2018-04-13 18:31:30|50.062105637329594|
|2018-04-13 19:31:30| 53.09267422891177|
|2018-04-13 21:31:30|50.062105637329594|
|2018-04-13 22:31:30|50.062105637329594|
|2018-04-13 23:31:30|50.062105637329594|
|2018-04-14 00:31:30| 50.04597062758577|
|2018-04-14 01:31:30|  53.6237102795118|
|2018-04-14 02:31:30|50.062105637329594|
|2018-04-14 03:31:30|50.062105637329594|
|2018-04-14 04:31:30|50.062105637329594|
|2018-04-14 05:31:30| 59.75237995683988|
|2018-04-14 06:31:30|50.062105637329594|
|2018-04-14 07:31:30|53.478748590885175|
+-------------------+------------------+
only showing top

In [157]:
tagged = tagged.withColumn("timestamp", F.date_trunc("hour", F.col("datetime").cast("timestamp")))
tagged = tagged.select(F.col("timestamp"), F.col("prediction"))
tagged.show()

+-------------------+------------------+
|          timestamp|        prediction|
+-------------------+------------------+
|2018-04-13 11:00:00|50.062105637329594|
|2018-04-13 12:00:00| 50.71760941386791|
|2018-04-13 13:00:00| 53.27285813640033|
|2018-04-13 14:00:00|50.062105637329594|
|2018-04-13 15:00:00|50.062105637329594|
|2018-04-13 16:00:00|50.062105637329594|
|2018-04-13 17:00:00| 51.07780263384459|
|2018-04-13 18:00:00|50.062105637329594|
|2018-04-13 19:00:00| 53.09267422891177|
|2018-04-13 21:00:00|50.062105637329594|
|2018-04-13 22:00:00|50.062105637329594|
|2018-04-13 23:00:00|50.062105637329594|
|2018-04-14 00:00:00| 50.04597062758577|
|2018-04-14 01:00:00|  53.6237102795118|
|2018-04-14 02:00:00|50.062105637329594|
|2018-04-14 03:00:00|50.062105637329594|
|2018-04-14 04:00:00|50.062105637329594|
|2018-04-14 05:00:00| 59.75237995683988|
|2018-04-14 06:00:00|50.062105637329594|
|2018-04-14 07:00:00|53.478748590885175|
+-------------------+------------------+
only showing top

In [161]:
result = tagged.groupBy("timestamp").mean("prediction").sort(F.col("timestamp").asc())
result = result.na.drop(subset=["timestamp", "avg(prediction)"])
result.show()

+-------------------+------------------+
|          timestamp|   avg(prediction)|
+-------------------+------------------+
|2018-04-13 11:00:00|50.062105637329594|
|2018-04-13 12:00:00| 50.71760941386791|
|2018-04-13 13:00:00| 53.27285813640033|
|2018-04-13 14:00:00|50.062105637329594|
|2018-04-13 15:00:00|50.062105637329594|
|2018-04-13 16:00:00|50.062105637329594|
|2018-04-13 17:00:00| 51.07780263384459|
|2018-04-13 18:00:00|50.062105637329594|
|2018-04-13 19:00:00| 53.09267422891177|
|2018-04-13 21:00:00|50.062105637329594|
|2018-04-13 22:00:00|50.062105637329594|
|2018-04-13 23:00:00|50.062105637329594|
|2018-04-14 00:00:00| 50.04597062758577|
|2018-04-14 01:00:00|  53.6237102795118|
|2018-04-14 02:00:00|50.062105637329594|
|2018-04-14 03:00:00|50.062105637329594|
|2018-04-14 04:00:00|50.062105637329594|
|2018-04-14 05:00:00| 59.75237995683988|
|2018-04-14 06:00:00|50.062105637329594|
|2018-04-14 07:00:00|53.478748590885175|
+-------------------+------------------+
only showing top

In [111]:
result = tagged.groupBy(F.col("timestamp")).agg(F.mean(F.col("prediction"))).sort(F.col("timestamp").asc())
result = result.na.drop(subset=["timestamp", "avg(prediction)"])
result.show()

+-------------------+------------------+
|          timestamp|   avg(prediction)|
+-------------------+------------------+
|2018-04-13 11:00:00|50.062105637329594|
|2018-04-13 12:00:00| 50.71760941386791|
|2018-04-13 13:00:00| 53.27285813640033|
|2018-04-13 14:00:00|50.062105637329594|
|2018-04-13 15:00:00|50.062105637329594|
|2018-04-13 16:00:00|50.062105637329594|
|2018-04-13 17:00:00| 51.07780263384459|
|2018-04-13 18:00:00|50.062105637329594|
|2018-04-13 19:00:00| 53.09267422891177|
|2018-04-13 21:00:00|50.062105637329594|
|2018-04-13 22:00:00|50.062105637329594|
|2018-04-13 23:00:00|50.062105637329594|
|2018-04-14 00:00:00| 50.04597062758577|
|2018-04-14 01:00:00|  53.6237102795118|
|2018-04-14 02:00:00|50.062105637329594|
|2018-04-14 03:00:00|50.062105637329594|
|2018-04-14 04:00:00|50.062105637329594|
|2018-04-14 05:00:00| 59.75237995683988|
|2018-04-14 06:00:00|50.062105637329594|
|2018-04-14 07:00:00|53.478748590885175|
+-------------------+------------------+
only showing top

In [112]:
result = result.toPandas()
result['timestamp'] = pd.to_datetime(result.timestamp)
result.columns = ['timestamp', 'prediction']
result.head()

Unnamed: 0,timestamp,prediction
0,2018-04-13 11:00:00,50.062106
1,2018-04-13 12:00:00,50.717609
2,2018-04-13 13:00:00,53.272858
3,2018-04-13 14:00:00,50.062106
4,2018-04-13 15:00:00,50.062106


In [113]:
result = tagAnomalies(result)
result.head()

Unnamed: 0,timestamp,prediction,isAnomaly
0,2018-04-13 11:00:00,50.062106,0
1,2018-04-13 12:00:00,50.717609,0
2,2018-04-13 13:00:00,53.272858,1
3,2018-04-13 14:00:00,50.062106,0
4,2018-04-13 15:00:00,50.062106,0


In [132]:
history = tagAnomalies(predict())
history.head()

Unnamed: 0,timestamp,prediction,isAnomaly
0,2018-04-13 11:00:00,50.062106,0
1,2018-04-13 12:00:00,50.717609,0
2,2018-04-13 13:00:00,53.272858,1
3,2018-04-13 14:00:00,50.062106,0
4,2018-04-13 15:00:00,50.062106,0


In [137]:
forecast, model = forecast(history)
forecast.head()

Acceptance rate of Metropolis-Hastings is 0.001
Acceptance rate of Metropolis-Hastings is 0.1485
Acceptance rate of Metropolis-Hastings is 0.215
Acceptance rate of Metropolis-Hastings is 0.281

Tuning complete! Now sampling.
Acceptance rate of Metropolis-Hastings is 0.273


Unnamed: 0,timestamp,prediction
0,2018-04-21 07:00:00,4.813088
1,2018-04-21 08:00:00,4.813088
2,2018-04-21 09:00:00,4.813088
3,2018-04-21 10:00:00,4.813088
4,2018-04-21 11:00:00,4.813088


In [164]:
run()

Acceptance rate of Metropolis-Hastings is 0.00205
Acceptance rate of Metropolis-Hastings is 0.15125
Acceptance rate of Metropolis-Hastings is 0.21225
Acceptance rate of Metropolis-Hastings is 0.28525

Tuning complete! Now sampling.
Acceptance rate of Metropolis-Hastings is 0.278575
            timestamp  prediction  isAnomaly
0 2018-04-13 11:00:00   50.062106          0
1 2018-04-13 12:00:00   50.717609          0
2 2018-04-13 13:00:00   53.272858          1
3 2018-04-13 14:00:00   50.062106          0
4 2018-04-13 15:00:00   50.062106          0
            timestamp  prediction
0 2018-04-21 07:00:00    4.947318
1 2018-04-21 08:00:00    4.947318
2 2018-04-21 09:00:00    4.947318
3 2018-04-21 10:00:00    4.947318
4 2018-04-21 11:00:00    4.947318
