In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [4]:
from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [5]:
sc

In [6]:
spark

In [7]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [8]:
from difflib import unified_diff

def make_diff(old, new):
    return '\n'.join([ l for l in unified_diff(old.split('\n'), new.split('\n')) if l.startswith('+') or l.startswith('-') ])

In [9]:
globals()['models_loaded'] = False

def predict(df):
    if any([x in df.diff.lower() for x in ['bad', 'lol', 'joke']]):
        return 'vandal'
    else:
        return 'safe'

predict_udf = udf(predict, StringType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    # Tip: making a diff will probably help a lot as a feature in any model:
    diff = make_diff(df.first().text_old, df.first().text_new)
    df_withdiff = df.withColumn("diff", lit(diff))
    df_withdiff.select('diff').show()
    
    # Utilize our predict function
    df_withpreds = df_withdiff.withColumn("pred", predict_udf(
        struct([df_withdiff[x] for x in df_withdiff.columns])
    ))
    df_withpreds.show()
    
    # Normally, you wouldn't use a UDF (User Defined Function) Python function to predict (you can)
    # But an MLlib model you've built and saved with Spark
    # In this case, you need to prevent loading your model in every call to "process" as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] = '***' # Replace '***' with:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
    # And then predict using the loaded model: 
    # df_result = globals()['my_model'].transform(df)
    # df_result.show()

In [10]:
ssc = StreamingContext(sc, 10)

In [11]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [12]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+--------------------+------+--------------+--------------------+--------------------+--------------------+--------------------+
|             comment| label|     name_user|            text_new|            text_old|          title_page|            url_page|
+--------------------+------+--------------+--------------------+--------------------+--------------------+--------------------+
|Adding localshort...|  safe|  Red Director|{{short descripti...|{{Other people|Al...|Alan Jones (footb...|//en.wikipedia.or...|
|(→‎Royal Warrant:...|unsafe|213.205.200.10|{{refimprove|date...|{{refimprove|date...|    Turnbull & Asser|//en.wikipedia.or...|
|→‎Credits and per...|  safe|Thedivinemania|{{Infobox album
|...|{{Infobox album
|...|Afterburner (Danc...|//en.wikipedia.or...|
|→‎External links:...|  safe|      Yarnalgo|{{short descripti...|{{short descripti...|            Starlink|//en.wikipedia.or...|
|→‎Sudler Trophy:F...|  safe|   Shonebrooks|{{infobox organiz...|{{infobox organiz...|John Philip

+--------------------+------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|             comment| label|    name_user|            text_new|            text_old|          title_page|            url_page|                diff|pred|
+--------------------+------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|Adding localshort...|  safe| Red Director|{{short descripti...|{{other people}}
...|          Bert Evans|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
|   miscapitalization|  safe|      Wbm1058|{{Use mdy dates|d...|{{Use mdy dates|d...|     Jack Kent Cooke|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
|                    |unsafe|136.35.32.140|{{Use mdy dates|d...|{{Use mdy dates|d...|The Amityville Ho...|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
|Adding localshort...|  safe| Red Director|{{short descripti...|{{Use dmy da

+--------------------+-----+----------------+--------------------+--------------------+-------------------+--------------------+--------------------+----+
|             comment|label|       name_user|            text_new|            text_old|         title_page|            url_page|                diff|pred|
+--------------------+-----+----------------+--------------------+--------------------+-------------------+--------------------+--------------------+----+
|Adding localshort...| safe|    Red Director|{{short descripti...|{{Use dmy dates|d...|    Byron Stevenson|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
|Adding localshort...| safe|    Red Director|{{short descripti...|{{about|Samuel Me...|       Sam Meredith|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
|copy edit without...| safe|LauraBradleyMoss|[[File:William He...|[[File:William He...|Curse of Tippecanoe|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
+--------------------+-----+----------------+--------------------+----

+--------------------+
|                diff|
+--------------------+
|--- 

+++ 

-|ima...|
|--- 

+++ 

-|ima...|
|--- 

+++ 

-|ima...|
+--------------------+

+--------------------+-----+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|             comment|label|      name_user|            text_new|            text_old|          title_page|            url_page|                diff|pred|
+--------------------+-----+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|      Better picture| safe|       Snowdawg|{{Infobox weapon
...|{{Infobox weapon
...|           73mm LRAC|//en.wikipedia.or...|--- 

+++ 

-|ima...|safe|
|Adding localshort...| safe|   Red Director|{{short descripti...|{{Infobox footbal...|        Reece Deakin|//en.wikipedia.or...|--- 

+++ 

-|ima...|safe|
|→‎See also:Capita...| safe|Cryptopocalypse|{{infobox biodata..

+--------------------+
|                diff|
+--------------------+
|--- 

+++ 

+{{sh...|
|--- 

+++ 

+{{sh...|
|--- 

+++ 

+{{sh...|
|--- 

+++ 

+{{sh...|
+--------------------+

+--------------------+-----+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|             comment|label|   name_user|            text_new|            text_old|          title_page|            url_page|                diff|pred|
+--------------------+-----+------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|Addingshort descr...| safe|Red Director|{{short descripti...|{{use British Eng...|Lyn Thomas (footb...|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
|                    | safe|   Hhfjbaker|{{Infobox militar...|{{Infobox militar...|    Charles G. Gould|//en.wikipedia.or...|--- 

+++ 

+{{sh...|safe|
|      →‎May:Spelling| safe|     EdmundT|{{Use dmy date

+--------------------+-----+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|             comment|label|    name_user|            text_new|            text_old|          title_page|            url_page|                diff|pred|
+--------------------+-----+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+----+
|     Added citations| safe|Kingarthur581|{{unreferenced|da...|{{unreferenced|da...|            Garbalia|//en.wikipedia.or...|--- 

+++ 

-The ...|safe|
|Adding localshort...| safe| Red Director|{{short descripti...|{{Use dmy dates|d...|          Ron Powell|//en.wikipedia.or...|--- 

+++ 

-The ...|safe|
|Adding localshort...| safe| Red Director|{{short descripti...|{{Use dmy dates|d...|Frank Donovan (fo...|//en.wikipedia.or...|--- 

+++ 

-The ...|safe|
+--------------------+-----+-------------+--------------------+-------------------

+--------------------+-----+---------------+--------------------+--------------------+--------------------+--------------------+
|             comment|label|      name_user|            text_new|            text_old|          title_page|            url_page|
+--------------------+-----+---------------+--------------------+--------------------+--------------------+--------------------+
|→‎Special Intelli...| safe|       Cjrother|{{Infobox militar...|{{Infobox militar...|Special Support a...|//en.wikipedia.or...|
|→‎External links:...| safe|       Tinshome|{{About|the curre...|{{About|the curre...|Front Nacional de...|//en.wikipedia.or...|
|       →‎Exhibitions| safe| Soydeaguadilla|{{More citations ...|{{More citations ...|     Daniela Rossell|//en.wikipedia.or...|
|Added information...| safe|Doctor S Monkey|[[File:Matilda El...|[[File:Matilda El...|Matilda Ellen Bishop|//en.wikipedia.or...|
+--------------------+-----+---------------+--------------------+--------------------+-----------

+--------------------+-----+---------------+--------------------+--------------------+-------------------+--------------------+
|             comment|label|      name_user|            text_new|            text_old|         title_page|            url_page|
+--------------------+-----+---------------+--------------------+--------------------+-------------------+--------------------+
|    Added categories| safe|          Srbtx|Sigi Friedmann (b...|Sigi Friedmann (b...|     Sigi Friedmann|//en.wikipedia.or...|
|→‎Unconfirmed iso...| safe|ComplexRational|{{infobox hassium...|{{infobox hassium...|Isotopes of hassium|//en.wikipedia.or...|
|               →‎Art| safe| Soydeaguadilla|{{More citations ...|{{More citations ...|    Daniela Rossell|//en.wikipedia.or...|
|               →‎top| safe|    KingSkyLord|{{short descripti...|{{short descripti...|       Adam Thielen|//en.wikipedia.or...|
|    →‎External links| safe|        Moe1810|{{Infobox televis...|{{Infobox televis...|      Dead Husband

In [13]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
+--------------------+-----+-------------+--------------------+--------------------+--------------------+--------------------+
|             comment|label|    name_user|            text_new|            text_old|          title_page|            url_page|
+--------------------+-----+-------------+--------------------+--------------------+--------------------+--------------------+
|                    | safe|      Ythrych|#REDIRECT [[LGBT ...|#REDIRECT [[LGBT ...|LGBT history in B...|//en.wikipedia.or...|
|→‎Collegiate care...| safe|     1980fast|{{short descripti...|{{short descripti...|        O. J. Howard|//en.wikipedia.or...|
|→‎Accolades:CAPIT...| safe|    Fezzy1347|{{Short descripti...|{{Short descripti...|         Duma Ndlovu|//en.wikipedia.or...|
|→‎Effects on huma...| safe|RockMagnetist|{{about|general a...|{{about|general a...|               Water|//en.wikipedia.or...|
+--------------------+-----+-------------+-----------------

+--------------------+-----+-----------------+--------------------+--------------------+----------------+--------------------+
|             comment|label|        name_user|            text_new|            text_old|      title_page|            url_page|
+--------------------+-----+-----------------+--------------------+--------------------+----------------+--------------------+
|      improved prose| safe|         Vmavanti|{{Infobox musical...|{{Infobox musical...|    Ulf Wakenius|//en.wikipedia.or...|
|     Added citations| safe|    Kingarthur581|{{unreferenced|da...|{{unreferenced|da...|        Garbalia|//en.wikipedia.or...|
|Added sentence ab...| safe|Osu.archivist.tem|{{good article}}
...|{{good article}}
...|Women in brewing|//en.wikipedia.or...|
|→‎Early life and ...| safe|     Urbanguru182|{{Lead too short|...|{{Lead too short|...|     Mike Albert|//en.wikipedia.or...|
+--------------------+-----+-----------------+--------------------+--------------------+----------------+------

+--------------------+-----+---------+--------------------+--------------------+------------+--------------------+
|             comment|label|name_user|            text_new|            text_old|  title_page|            url_page|
+--------------------+-----+---------+--------------------+--------------------+------------+--------------------+
|→‎Collegiate care...| safe| 1980fast|{{short descripti...|{{short descripti...|O. J. Howard|//en.wikipedia.or...|
+--------------------+-----+---------+--------------------+--------------------+------------+--------------------+

+--------------------+
|                diff|
+--------------------+
|--- 

+++ 

-As a...|
+--------------------+

+--------------------+-----+---------+--------------------+--------------------+------------+--------------------+--------------------+----+
|             comment|label|name_user|            text_new|            text_old|  title_page|            url_page|                diff|pred|
+--------------------+----