In [1]:
# Enable IPython's autoreload so edits to imported project modules (e.g. pywasp)
# are picked up without restarting the kernel; mode 2 reloads modules before
# every cell execution.
%load_ext autoreload
%autoreload 2

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import functions as F, types as T

# Pin the Spark driver to the loopback address (presumably to avoid hostname
# resolution problems when running locally — confirm), then reuse an existing
# session or start a new one with that configuration.
conf = SparkConf()
conf.set('spark.driver.host', '127.0.0.1')
spark = (SparkSession.builder
         .config(conf=conf)
         .getOrCreate())

In [3]:
from pywasp.modeling import builder
from pywasp.functions import server_finder,\
                             is_in_list,\
                             get_ith

In [4]:
# Load the raw CSV using its first row as the header. No schema inference is
# requested, so columns are read as strings; nulls in those string columns are
# replaced by the literal placeholder 'None'.
df = (spark.read
      .option('header', True)
      .csv('dataset.csv')
      .fillna('None'))

In [5]:
# Remove columns that are not used for modelling, then derive the integer
# target 'label' by casting the raw 'Type' column ('Type' itself is kept).
df = (df
      .drop('CONTENT_LENGTH', 'URL', 'WHOIS_UPDATED_DATE', 'WHOIS_REGDATE')
      .withColumn('label', F.col('Type').cast(T.IntegerType())))

In [6]:
# Values to keep for the WHOIS categorical columns. All entries are lowercase
# while the loader fills nulls with 'None', so presumably is_in_list compares
# case-insensitively and buckets anything not listed — confirm in
# pywasp.functions.
WHOIS_COUNTRY_keep = [
    'none', 'es', 'us', 'cz', 'ru', 'gb', 'cn',
]
WHOIS_STATEPRO_keep = [
    'none',
    'barcelona', 'utah', 'california', 'praha', 'wc1n', 'krasnoyarsk',
    'ca', 'pa', 'washington', 'wa', 'panama', 'arizona',
    'on', 'montevideo', 'beijingshi',
]

# Column name -> list of values to keep for that column.
dict_listeds = dict(
    WHOIS_COUNTRY=WHOIS_COUNTRY_keep,
    WHOIS_STATEPRO=WHOIS_STATEPRO_keep,
)

In [7]:
# Map SERVER through server_finder (presumably normalises raw server strings —
# confirm), then pass each WHOIS column through is_in_list with its keep-list.
df = df.withColumn('SERVER', server_finder(F.col('SERVER')))
for whois_col in ('WHOIS_COUNTRY', 'WHOIS_STATEPRO'):
    allowed_values = dict_listeds[whois_col]
    df = df.withColumn(whois_col, is_in_list(F.col(whois_col), allowed_values))

In [8]:
categorical = ['CHARSET', 'SERVER', 'WHOIS_COUNTRY', 'WHOIS_STATEPRO']

# Every remaining column is treated as a numerical feature, except:
#   - 'label', the target;
#   - 'Type', the raw column 'label' is cast from; it is never dropped, so
#     leaving it here would leak the target straight into the features;
#   - 'id', the row identifier (excluded defensively; it may not exist yet).
non_feature_columns = set(categorical) | {'label', 'Type', 'id'}
numerical = [c for c in df.columns if c not in non_feature_columns]

In [9]:
# Numerical columns arrive as strings: map the 'None' placeholder to 0 and cast
# everything else to float (under Spark's default non-ANSI casting, unparseable
# strings become null). Built as one projection that keeps the column order;
# non-numerical columns pass through unchanged.
df = df.select([
    F.when(F.col(name) == 'None', 0)
     .otherwise(F.col(name).cast(T.FloatType()))
     .alias(name)
    if name in numerical else F.col(name)
    for name in df.columns
])

In [10]:
# Row identifier, carried on both splits.
df = df.withColumn('id', F.monotonically_increasing_id())

# Split with randomSplit instead of sample() + left_anti self-join on 'id'.
# On an uncached frame, sample() and monotonically_increasing_id() (documented
# as non-deterministic) are re-evaluated for each side of the join, so the old
# split relied on both evaluations agreeing. randomSplit assigns each row to
# one split from a single seeded draw, and it needs no shuffle join.
# NOTE(review): only 20% of rows go to training and 80% to test; the ratio is
# preserved from the original, but confirm it is intended.
TRAIN_FRACTION = 0.2
SPLIT_SEED = 8
df_train, df_test = df.randomSplit([TRAIN_FRACTION, 1 - TRAIN_FRACTION],
                                   seed=SPLIT_SEED)

In [11]:
# Assemble the (not yet fitted) model from the numerical and categorical
# feature-column lists.
model = builder(numerical, categorical)

In [12]:
# Fit the model on the training split; `predictor` is the fitted model used to
# transform (score) data below.
predictor = model.fit(df_train)

In [13]:
# Score the test split. get_ith(probability, 1) presumably extracts element 1 of
# the probability vector, i.e. the positive-class probability for a binary
# model; confirm against pywasp.functions.
# F.round defaults to 0 decimal places, which would collapse every probability
# to 0.0/1.0 and merely duplicate 'prediction'; keep PROB_DECIMALS digits.
PROB_DECIMALS = 4
predicition = predictor.transform(df_test).select('probability', 'prediction')
predicition = predicition.withColumn(
    'probability',
    F.round(get_ith(F.col('probability'), F.lit(1)), PROB_DECIMALS),
)

In [14]:
def write_lines(path, values):
    """Write each value to ``path`` as ``str(value)``, one value per line."""
    with open(path, 'w') as f:
        for v in values:
            f.write(str(v) + '\n')


# Ground-truth labels of the test split.
y_test = [r['label'] for r in df_test.select('label').collect()]

# Collect probability and prediction in one Spark job so the two lists are
# guaranteed to be row-aligned with each other.
pred_rows = predicition.select('probability', 'prediction').collect()
y_prob = [r['probability'] for r in pred_rows]
y_pred = [r['prediction'] for r in pred_rows]

# NOTE(review): y_test comes from a separate collect() of df_test, and its row
# order is assumed to match the prediction rows; Spark does not guarantee this.
# Collecting 'label' together with the predictions would make it explicit.
assert len(y_test) == len(y_prob) == len(y_pred), 'label/prediction counts differ'

write_lines('y_test.txt', y_test)
write_lines('y_prob.txt', y_prob)
# y_pred.txt holds the hard predictions, not the probabilities.
write_lines('y_pred.txt', y_pred)

In [15]:
%%timeit
predicition = predictor.transform(df_test.limit(1)).select('probability','prediction')

454 ms ± 76 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
