In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import Row
import fasttext

In [2]:
# Single SparkSession for the whole notebook; getOrCreate() returns the
# already-running session if there is one.
spark = (
    SparkSession.builder
    .appName('mysession')
    .getOrCreate()
)

## Build Spark DataFrame

In [3]:
INPUT_PATH = 'data/input.parquet'
# Number of partitions handed to mapPartitions below; fn_partition loads the
# fastText model once per partition.
NUM_PARTITIONS = 8

df_input = spark.read.parquet(INPUT_PATH).repartition(NUM_PARTITIONS)

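# Approach 3: RDD `mapPartitions`

Load the fastText model once per partition (instead of once per row) and classify every record of that partition inside a single Python generator.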

In [4]:
def fn_partition(iterator, multi_prediction=False, model_path='models/ft_tuned.ftz',
                 threshold=0.10, max_labels=3):
    """Classify every record of one Spark partition with a fastText model.

    Meant for ``RDD.mapPartitions`` so the model is loaded once per partition
    rather than once per record.

    Parameters
    ----------
    iterator : iterable of records
        Records of the partition; each must support ``record['input']``
        returning the text to classify (``None`` is allowed).
    multi_prediction : bool
        False -> ``category`` is the single best label (str) if its probability
        is >= ``threshold``, else None.
        True  -> ``category`` is the list of labels among the top
        ``max_labels`` whose probability is >= ``threshold``, else None.
    model_path : str
        Path of the fastText model file, loaded where the partition runs.
    threshold : float
        Minimum probability for a label to be kept.
    max_labels : int
        Number of candidate labels requested in multi-prediction mode.

    Yields
    ------
    Row(category=..., input=...)
        ``input`` is the original, unmodified text of the record.
    """
    model = fasttext.load_model(model_path)
    k = max_labels if multi_prediction else 1

    def get_predictions(sentence):
        """Return the label(s) above ``threshold`` for ``sentence``, or None.

        Adapted from ``get_predictions`` in the classifier.py module
        (per the original note); keep the two in sync.
        """
        if sentence is None:
            return None
        # fastText's predict() rejects text containing '\n' (it predicts one
        # line at a time), so fold newlines into spaces before predicting.
        text = sentence.lower().replace('\n', ' ')
        labels, probs = model.predict(text, k=k)
        output = [label.replace('__label__', '')
                  for label, prob in zip(labels, probs) if prob >= threshold]
        if not output:
            return None
        return output if multi_prediction else output[0]

    for record in iterator:
        sentence = record['input']
        yield Row(category=get_predictions(sentence), input=sentence)

## Single prediction (best label if it scores at least 0.10, otherwise null)

In [5]:
# Pass an explicit schema: without one, toDF() infers column types from the
# leading rows and raises if `category` is None in all of them (i.e. every
# leading prediction fell below the threshold).
df_output = (
    df_input.rdd
    .mapPartitions(lambda partition: fn_partition(partition, False))
    .toDF('category string, input string')
)
df_output.sample(False, .10, 12345).show(10, False)

+--------+---------------------------------------------------------------------------------------------+
|category|input                                                                                        |
+--------+---------------------------------------------------------------------------------------------+
|null    |what deployment directories do you use for rails applications deploying to a debian box      |
|sql     |sql query order by                                                                           |
|c++     |c++ reading from a file blocks any further writing why                                       |
|null    |how much does it cost to develop an iphone application                                       |
|null    |what are some excellent examples of user sign up forms on the web                            |
|.net    |why doesn t backcolor work for tabcontrols in net                                            |
|c#      |c # compiler and caching of local variables  

## Multiple predictions (up to 3 labels scoring at least 0.10, otherwise null)

In [6]:
# Explicit schema: `category` is a list of labels or None. Inference from the
# leading rows fails if all of them are None.
df_output = (
    df_input.rdd
    .mapPartitions(lambda partition: fn_partition(partition, True))
    .toDF('category array<string>, input string')
)
df_output.sample(False, .10, 12345).show(10, False)

+-------------------+---------------------------------------------------------------------------------------------+
|category           |input                                                                                        |
+-------------------+---------------------------------------------------------------------------------------------+
|null               |what deployment directories do you use for rails applications deploying to a debian box      |
|[sql, sql-server]  |sql query order by                                                                           |
|[c++]              |c++ reading from a file blocks any further writing why                                       |
|null               |how much does it cost to develop an iphone application                                       |
|null               |what are some excellent examples of user sign up forms on the web                            |
|[.net, c#, asp.net]|why doesn t backcolor work for tabcontrols in net  

# Performance

In [7]:
%timeit -n 10 df_output.sample(False,.10).show(10)

+--------------------+--------------------+
|            category|               input|
+--------------------+--------------------+
|              [ruby]|similar thing to ...|
|        [javascript]|how do you execut...|
|                null|ddd and asynchron...|
|                null|how do i open off...|
|                [c#]|need help handlin...|
|               [c++]|c++ reading from ...|
|[asp.net, asp.net...|what are effectiv...|
|           [asp.net]|asp net ajax text...|
|          [c#, .net]|is endian convers...|
|              [java]|saving java objec...|
+--------------------+--------------------+
only showing top 10 rows

+---------+--------------------+
| category|               input|
+---------+--------------------+
|    [c++]|are incrementers ...|
|     [c#]|how to serialize ...|
|     [c#]|how can i convert...|
| [python]|how can i execute...|
|     null|windows domain ch...|
|     null|what can cause in...|
|     [c#]|deployment of cus...|
|     [c#]|  using lists in 

+--------------------+--------------------+
|            category|               input|
+--------------------+--------------------+
|                [c#]|how do you manage...|
|     [visual-studio]|how to disable vi...|
|              [java]|why don t my html...|
|                null|daemon threads ex...|
|[visual-studio, ....|any have a visual...|
|                [c#]|converting svg to...|
|                null|has and belongs t...|
|               [sql]|normalizing a tab...|
| [.net, c#, asp.net]|sending email in ...|
|          [.net, c#]|is it true that t...|
+--------------------+--------------------+
only showing top 10 rows

+--------------------+--------------------+
|            category|               input|
+--------------------+--------------------+
|                null|ddd and asynchron...|
|                null|daemon threads ex...|
|           [asp.net]|get performance c...|
|                null|is there an idiom...|
|               [c++]|c++ reading from ...|
|[sql,

+--------------------+--------------------+
|            category|               input|
+--------------------+--------------------+
|     [visual-studio]|why is visual stu...|
|[asp.net-mvc, asp...|asp net mvc beta ...|
|                null|decoding chunked ...|
|                null|what is the aspne...|
|                null|is there an idiom...|
|     [.net, php, c#]|authoritative sou...|
|                null|what are some exc...|
|                null|bat file to run a...|
|          [c#, java]|equivalent of jav...|
|              [ruby]|why is rspec so s...|
+--------------------+--------------------+
only showing top 10 rows

+--------------------+--------------------+
|            category|               input|
+--------------------+--------------------+
|                [c#]|how do you manage...|
|          [c#, .net]|is it possible to...|
|          [java, c#]|deterministic dis...|
|        [c, c#, c++]|wrapping visual c...|
|             [mysql]|how do you connec...|
|     

+-------------------+--------------------+
|           category|               input|
+-------------------+--------------------+
|[python, c++, java]|extracting text f...|
|               [c#]|find a private fi...|
|  [sql-server, sql]|data verification...|
|               [c#]|how can i convert...|
|[c#, .net, asp.net]|what exception sh...|
|          [asp.net]|edit html meta ta...|
|  [sql, sql-server]|parsing t sql to ...|
|               null|how should i impo...|
|          [windows]|are there problem...|
|               null|what s the maximu...|
+-------------------+--------------------+
only showing top 10 rows

+-----------------+--------------------+
|         category|               input|
+-----------------+--------------------+
|  [visual-studio]|why is visual stu...|
|       [java, c#]|deterministic dis...|
|             [c#]|how to serialize ...|
|[sql-server, sql]|sql server profil...|
|             null|how to effectivel...|
|  [.net, php, c#]|authoritative sou...|
|  

+--------------------+--------------------+
|            category|               input|
+--------------------+--------------------+
|                [c#]|how would you att...|
|                null|what deployment d...|
|        [sql-server]|sending e mail fr...|
|                [c#]|how to get the de...|
|                null|daemon threads ex...|
|           [asp.net]|get performance c...|
|                null|is there an idiom...|
|               [c++]|the necessity of ...|
|  [java, javascript]|which javascript ...|
|[ruby, ruby-on-ra...|what are the limi...|
+--------------------+--------------------+
only showing top 10 rows

+--------------------+--------------------+
|            category|               input|
+--------------------+--------------------+
|        [sql-server]|sending e mail fr...|
|                [c#]|how do you bind i...|
|                null|how do i open off...|
|[mysql, sql, data...|best update metho...|
|                null|running multiple ...|
|     

In [8]:
# Partition count of the result; expected to match the repartition(8) applied
# to df_input, since mapPartitions does not reshuffle.
num_partitions = df_output.rdd.getNumPartitions()
num_partitions

8