In [1]:
# required libraries
import pandas as pd
import numpy as np
import math

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col, udf, to_timestamp, lit
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when, rand
from pyspark.ml.feature import Normalizer, StandardScaler, MinMaxScaler, VectorAssembler

from tensorflow.keras.layers import Embedding, Dense, LSTM, Dense, Input, concatenate, Dropout, Activation
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.utils import plot_model
from tensorflow.keras.optimizers import SGD, Adam

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [2]:
# init spark
spark = SparkSession.builder.appName('ml_account_base_session').getOrCreate()
spark

In [3]:
#dataset path
dataset_folder_s3 = 'data/' # 's3://bot-dataset/data/'
result_path_s3 = '' # 's3://bot-dataset/result/'

In [4]:
# read dataset from csv

requiredColumns = requiredColumns = ['screen_name', 'created_at', 'updated', 'location', 'verified', 'statuses_count', 'friends_count','followers_count', 'favourites_count', 'default_profile_image', 'profile_use_background_image', 'protected', 'default_profile']

bot_accounts1 = spark.read.csv(dataset_folder_s3 + 'social_spambots_1.csv', header = True, inferSchema = True).select(requiredColumns)
bot_accounts2 = spark.read.csv(dataset_folder_s3 + 'social_spambots_2.csv', header = True, inferSchema = True).select(requiredColumns)
bot_accounts3 = spark.read.csv(dataset_folder_s3 + 'social_spambots_3.csv', header = True, inferSchema = True).select(requiredColumns)

# combine multiple bot_account dataset
bot_accounts = bot_accounts1.union(bot_accounts2.union(bot_accounts3))
clean_accounts = spark.read.csv(dataset_folder_s3 + 'geniune_accounts.csv', header = True, inferSchema = True).select(requiredColumns)

In [5]:
# check number of rows in each dataset
bot_accounts1.count(), bot_accounts2.count(), bot_accounts3.count(), bot_accounts.count(), clean_accounts.count()

(991, 3457, 464, 4912, 3474)

In [6]:
# check structure of the dataframe
bot_accounts.printSchema()
clean_accounts.printSchema()

root
 |-- screen_name: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- location: string (nullable = true)
 |-- verified: string (nullable = true)
 |-- statuses_count: integer (nullable = true)
 |-- friends_count: integer (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- favourites_count: integer (nullable = true)
 |-- default_profile_image: string (nullable = true)
 |-- profile_use_background_image: integer (nullable = true)
 |-- protected: string (nullable = true)
 |-- default_profile: integer (nullable = true)

root
 |-- screen_name: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- location: string (nullable = true)
 |-- verified: integer (nullable = true)
 |-- statuses_count: integer (nullable = true)
 |-- friends_count: integer (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- favourites_count: integer (nullable

In [7]:
#check bot account data
bot_accounts.limit(3).toPandas()

Unnamed: 0,screen_name,created_at,updated,location,verified,statuses_count,friends_count,followers_count,favourites_count,default_profile_image,profile_use_background_image,protected,default_profile
0,davideb66,Tue Mar 17 08:51:12 +0000 2009,2016-03-15 14:12:22,,,1299,40,22,1,1.0,1,,1.0
1,ElisaDospina,Sun Apr 19 14:38:04 +0000 2009,2016-03-15 14:17:13,Italy,,18665,3442,12561,16358,,1,,
2,Vladimir65,Wed May 13 15:34:41 +0000 2009,2016-03-15 14:16:44,"iPhone: 45.471680,9.192429",,22987,755,600,14,,1,,


In [8]:
# check clean_account data
clean_accounts.limit(3).toPandas()

Unnamed: 0,screen_name,created_at,updated,location,verified,statuses_count,friends_count,followers_count,favourites_count,default_profile_image,profile_use_background_image,protected,default_profile
0,0918Bask,Tue Jun 11 11:20:35 +0000 2013,2016-03-15 15:53:47,Tokyo .Japan .,,2177,332,208,265,,,,
1,1120Roll,Tue May 13 10:37:57 +0000 2014,2016-03-15 15:53:48,神奈川県横浜市,,2660,485,330,3972,,1.0,,1.0
2,14KBBrown,Wed May 04 23:30:37 +0000 2011,2016-03-15 15:53:48,,,1254,177,166,1185,,1.0,,


In [9]:
# test feature value domain space for feature engineering
def printFeatureDomain(df):
    columns = df.columns
    for col in columns:
        col_domain = df.select(col).distinct().collect()
        value_domain = [item[0] for item in col_domain]
        print("{}({}): {}\n".format(col,len(value_domain), value_domain[:5]))
        
printFeatureDomain(bot_accounts)
printFeatureDomain(clean_accounts)

screen_name(4912): ['MarcoMurante', 'CarolaParnasse', 'RobertoBusca', 'CarlaBipolare', 'EmanuelaDuccio']

created_at(4891): ['Mon Jan 16 07:46:29 +0000 2012', 'Mon Jan 16 08:41:48 +0000 2012', 'Tue Jan 17 10:51:05 +0000 2012', 'Tue Jan 17 11:29:19 +0000 2012', 'Wed Jan 18 04:59:52 +0000 2012']

updated(1687): [datetime.datetime(2016, 3, 15, 14, 12, 44), datetime.datetime(2016, 3, 15, 14, 14, 42), datetime.datetime(2016, 3, 15, 14, 17, 15), datetime.datetime(2016, 3, 15, 14, 20, 39), datetime.datetime(2016, 3, 15, 14, 16, 46)]

location(207): ['Palermo', 'Cave (RM)', 'Forte dei Marmi', 'Firenze', 'Phoenix']

verified(1): [None]

statuses_count(948): [471, 148, 26755, 540, 31]

friends_count(953): [6466, 5300, 4935, 5518, 31]

followers_count(929): [3997, 243, 4190, 4161, 1522]

favourites_count(72): [874, 76, 103, 12, 601]

default_profile_image(2): [None, '1']

profile_use_background_image(2): [None, 1]

protected(1): [None]

default_profile(2): [None, 1]

screen_name(3474): ['AddisonH

In [10]:
# def clean_df(df):
#     type(df)
#     df['created_at'] = pd.to_datetime(df['created_at']).dt.tz_localize(None)
#     df['updated'] = pd.to_datetime(df['updated']).dt.tz_localize(None)
#     df['age'] = (df['updated'] - df['created_at']).astype('timedelta64[D]').astype(int)
#     df['has_location'] = df['location'].apply(lambda x: 0 if x==x else 1)
#     df['has_avatar'] = df['default_profile_image'].apply(lambda x: 1 if x==x else 0)
#     df['has_background'] = df['profile_use_background_image'].apply(lambda x: 1 if x==x else 0)
#     df['is_verified']=df['verified'].apply(lambda x: 1 if x==x else 0)
#     df['is_protected']=df['protected'].apply(lambda x: 1 if x==x else 0)
#     df['profile_modified'] = df['default_profile'].apply(lambda x: 0 if x==x else 1)
#     df = df.rename(index=str, columns={"screen_name": "username", "statuses_count": "total_tweets", "friends_count": "total_following", "followers_count": "total_followers", "favourites_count": "total_likes"})
#     return df[['username', 'age', 'has_location', 'is_verified', 'total_tweets', 'total_following', 'total_followers', 'total_likes', 'has_avatar', 'has_background', 'is_protected', 'profile_modified']]


In [11]:
# clean dataset
def cleanData(df):
    df = df.withColumn('age', lit(0)) # need to calculate from 'updated' -'created_at'
    df = df.withColumn('has_location', when((df['location'] != None), 1).otherwise(0))
    df = df.withColumn('has_avatar', when((df['default_profile_image'] != None), 1).otherwise(0))
    df = df.withColumn('has_background', when((df['profile_use_background_image'] != None), 1).otherwise(0))
    df = df.withColumn('is_verified', when((df['verified'] != None), 1).otherwise(0))
    df = df.withColumn('is_protected', when((df['protected'] != None), 1).otherwise(0))
    df = df.withColumn('profile_modified', when((df['default_profile'] != None), 1).otherwise(0))
    df = df.withColumnRenamed("screen_name", "username")
    df = df.withColumnRenamed("statuses_count", "total_tweets")
    df = df.withColumnRenamed("friends_count", "total_following")
    df = df.withColumnRenamed("followers_count", "total_followers")
    df = df.withColumnRenamed("favourites_count", "total_likes")
    
    return df.select('username', 'age', 'has_location', 'is_verified', 'total_tweets', 'total_following', 'total_followers', 'total_likes', 'has_avatar', 'has_background', 'is_protected', 'profile_modified')
    

In [12]:
bot_accounts = cleanData(bot_accounts)
clean_accounts = cleanData(clean_accounts)

In [13]:
bot_accounts.printSchema()

root
 |-- username: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- has_location: integer (nullable = false)
 |-- is_verified: integer (nullable = false)
 |-- total_tweets: integer (nullable = true)
 |-- total_following: integer (nullable = true)
 |-- total_followers: integer (nullable = true)
 |-- total_likes: integer (nullable = true)
 |-- has_avatar: integer (nullable = false)
 |-- has_background: integer (nullable = false)
 |-- is_protected: integer (nullable = false)
 |-- profile_modified: integer (nullable = false)



In [14]:
clean_accounts.printSchema()

root
 |-- username: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- has_location: integer (nullable = false)
 |-- is_verified: integer (nullable = false)
 |-- total_tweets: integer (nullable = true)
 |-- total_following: integer (nullable = true)
 |-- total_followers: integer (nullable = true)
 |-- total_likes: integer (nullable = true)
 |-- has_avatar: integer (nullable = false)
 |-- has_background: integer (nullable = false)
 |-- is_protected: integer (nullable = false)
 |-- profile_modified: integer (nullable = false)



In [15]:
clean_accounts.limit(5).toPandas()

Unnamed: 0,username,age,has_location,is_verified,total_tweets,total_following,total_followers,total_likes,has_avatar,has_background,is_protected,profile_modified
0,0918Bask,0,0,0,2177,332,208,265,0,0,0,0
1,1120Roll,0,0,0,2660,485,330,3972,0,0,0,0
2,14KBBrown,0,0,0,1254,177,166,1185,0,0,0,0
3,wadespeters,0,0,0,202968,981,2248,60304,0,0,0,0
4,191a5bd05da04dc,0,0,0,82,79,21,5,0,0,0,0


In [16]:
bot_accounts.limit(5).toPandas()

Unnamed: 0,username,age,has_location,is_verified,total_tweets,total_following,total_followers,total_likes,has_avatar,has_background,is_protected,profile_modified
0,davideb66,0,0,0,1299,40,22,1,0,0,0,0
1,ElisaDospina,0,0,0,18665,3442,12561,16358,0,0,0,0
2,Vladimir65,0,0,0,22987,755,600,14,0,0,0,0
3,RafielaMorales,0,0,0,7975,350,398,11,0,0,0,0
4,FabrizioC_c,0,0,0,20218,405,413,162,0,0,0,0


In [17]:
## add BotOrNot column
bot_accounts = bot_accounts.withColumn('BotOrNot', lit(1))
clean_accounts = clean_accounts.withColumn('BotOrNot', lit(0))

In [18]:
#combine clean and bot accounts data togather
combined_df = bot_accounts.union(clean_accounts)

# shuffle dataset
new_df = combined_df.orderBy(rand())

#remove 'userrname' columns from dataset
new_df = new_df.drop('username')

In [19]:
new_df.printSchema()

root
 |-- age: integer (nullable = false)
 |-- has_location: integer (nullable = false)
 |-- is_verified: integer (nullable = false)
 |-- total_tweets: integer (nullable = true)
 |-- total_following: integer (nullable = true)
 |-- total_followers: integer (nullable = true)
 |-- total_likes: integer (nullable = true)
 |-- has_avatar: integer (nullable = false)
 |-- has_background: integer (nullable = false)
 |-- is_protected: integer (nullable = false)
 |-- profile_modified: integer (nullable = false)
 |-- BotOrNot: integer (nullable = false)



In [20]:
new_df.count()

8386

In [21]:
new_df.columns

['age',
 'has_location',
 'is_verified',
 'total_tweets',
 'total_following',
 'total_followers',
 'total_likes',
 'has_avatar',
 'has_background',
 'is_protected',
 'profile_modified',
 'BotOrNot']

In [22]:
## convert into feature vector for ml model
feature_columns = ['age', 'has_location', 'is_verified', 'total_tweets', 'total_following', 
                   'total_followers', 'total_likes', 'has_avatar', 'has_background', 
                   'is_protected', 'profile_modified']

feature_assembler = VectorAssembler(inputCols = feature_columns, outputCol = 'independent_features')
df_updated = feature_assembler.transform(new_df)
df_updated.limit(5).toPandas()

Unnamed: 0,age,has_location,is_verified,total_tweets,total_following,total_followers,total_likes,has_avatar,has_background,is_protected,profile_modified,BotOrNot,independent_features
0,0,0,0,34,39,4,0,0,0,0,0,1,"(0.0, 0.0, 0.0, 34.0, 39.0, 4.0, 0.0, 0.0, 0.0..."
1,0,0,0,2559,416,210,22887,0,0,0,0,0,"(0.0, 0.0, 0.0, 2559.0, 416.0, 210.0, 22887.0,..."
2,0,0,0,3741,986,601,4037,0,0,0,0,0,"(0.0, 0.0, 0.0, 3741.0, 986.0, 601.0, 4037.0, ..."
3,0,0,0,4089,1474,273,0,0,0,0,0,1,"(0.0, 0.0, 0.0, 4089.0, 1474.0, 273.0, 0.0, 0...."
4,0,0,0,76,43,7,0,0,0,0,0,1,"(0.0, 0.0, 0.0, 76.0, 43.0, 7.0, 0.0, 0.0, 0.0..."


In [23]:
type(df_updated), df_updated.columns

(pyspark.sql.dataframe.DataFrame,
 ['age',
  'has_location',
  'is_verified',
  'total_tweets',
  'total_following',
  'total_followers',
  'total_likes',
  'has_avatar',
  'has_background',
  'is_protected',
  'profile_modified',
  'BotOrNot',
  'independent_features'])

In [24]:
# keep only required features/columns
df_updated = df_updated.select('independent_features', 'BotOrNot')

In [25]:
df_updated.select("independent_features", 'BotOrNot').limit(5).toPandas()

Unnamed: 0,independent_features,BotOrNot
0,"(0.0, 0.0, 0.0, 34.0, 39.0, 4.0, 0.0, 0.0, 0.0...",1
1,"(0.0, 0.0, 0.0, 2559.0, 416.0, 210.0, 22887.0,...",0
2,"(0.0, 0.0, 0.0, 3741.0, 986.0, 601.0, 4037.0, ...",0
3,"(0.0, 0.0, 0.0, 4089.0, 1474.0, 273.0, 0.0, 0....",1
4,"(0.0, 0.0, 0.0, 76.0, 43.0, 7.0, 0.0, 0.0, 0.0...",1


In [26]:
## Make data standard
# https://spark.apache.org/docs/1.4.1/ml-features.html#standardscaler

scaler = StandardScaler(inputCol="independent_features", outputCol="scaled_features",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(df_updated)

# Normalize each feature to have unit standard deviation.
scaled_df = scalerModel.transform(df_updated)

In [27]:
scaled_df.select("independent_features", "scaled_features", 'BotOrNot').limit(5).toPandas()

Unnamed: 0,independent_features,scaled_features,BotOrNot
0,"(0.0, 0.0, 0.0, 34.0, 39.0, 4.0, 0.0, 0.0, 0.0...","(0.0, 0.0, 0.0, 0.001563141247442364, 0.019712...",1
1,"(0.0, 0.0, 0.0, 2559.0, 416.0, 210.0, 22887.0,...","(0.0, 0.0, 0.0, 0.1176493662413238, 0.21026992...",0
2,"(0.0, 0.0, 0.0, 3741.0, 986.0, 601.0, 4037.0, ...","(0.0, 0.0, 0.0, 0.17199151196123186, 0.4983801...",0
3,"(0.0, 0.0, 0.0, 4089.0, 1474.0, 273.0, 0.0, 0....","(0.0, 0.0, 0.0, 0.18799072237623018, 0.7450429...",1
4,"(0.0, 0.0, 0.0, 76.0, 43.0, 7.0, 0.0, 0.0, 0.0...","(0.0, 0.0, 0.0, 0.0034940804354594017, 0.02173...",1


In [28]:
scaled_df.columns

['independent_features', 'BotOrNot', 'scaled_features']

In [29]:
# keep only necessary feature/column for ml model
scaled_df = scaled_df.select('scaled_features', 'BotOrNot')

In [30]:
# split data for training ana testing
train_df, test_df = scaled_df.randomSplit([0.80, 0.20])

In [31]:
train_df.count(), test_df.count()

(6722, 1664)

In [32]:
train_df.columns

['scaled_features', 'BotOrNot']

In [33]:
# features --> 'BotOrNot'
X_train = train_df.drop('BotOrNot')
y_train = train_df.select('BotOrNot')
X_test = test_df.drop('BotOrNot')
y_test = test_df.select('BotOrNot')

In [34]:
X_train.columns, y_train.columns

(['scaled_features'], ['BotOrNot'])

In [35]:
X_train.printSchema(), y_train.printSchema()

root
 |-- scaled_features: vector (nullable = true)

root
 |-- BotOrNot: integer (nullable = false)



(None, None)

In [36]:
type(X_train), type(y_train)

(pyspark.sql.dataframe.DataFrame, pyspark.sql.dataframe.DataFrame)

In [37]:
X_train.limit(5).toPandas()

Unnamed: 0,scaled_features
0,"(0.0, 0.0, 0.0, 0.00045974742571834235, 0.0, 0..."
1,"(0.0, 0.0, 0.0, 0.0005516969108620108, 0.0, 0...."
2,"(0.0, 0.0, 0.0, 0.0007355958811493477, 0.0, 0...."
3,"(0.0, 0.0, 0.0, 0.0009194948514366847, 0.0, 0...."
4,"(0.0, 0.0, 0.0, 0.0009654695940085189, 0.0, 0...."


In [38]:
## create model

# inp = Input(shape=[11])

# another = Dense(500, activation='relu')(inp)
# another = Dense(200, activation='relu')(another)
# another = Dense(1, activation='sigmoid')(another)

# mod = Model(inp, another)
# mod.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


model = Sequential()
model.add(Dense(500, input_dim=11))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(200))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(1))
model.add(Activation('sigmoid'))
model.compile(loss='binary_crossentropy', optimizer=Adam(), metrics=['accuracy'])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 500)               6000      
                                                                 
 activation (Activation)     (None, 500)               0         
                                                                 
 dropout (Dropout)           (None, 500)               0         
                                                                 
 dense_1 (Dense)             (None, 200)               100200    
                                                                 
 activation_1 (Activation)   (None, 200)               0         
                                                                 
 dropout_1 (Dropout)         (None, 200)               0         
                                                                 
 dense_2 (Dense)             (None, 1)                 2

In [39]:
X_train.printSchema(), y_train.printSchema()

root
 |-- scaled_features: vector (nullable = true)

root
 |-- BotOrNot: integer (nullable = false)



(None, None)

In [40]:
# convert DataFrame column into nparray
# nparray required for model training, validation

def to_nparray_list(df, column_name):
    rows = df.select(column_name).collect()
    lists = [x[column_name] for x in rows]
    nparr = np.array(lists)
    
    return nparr

In [41]:
# DataFrame(column) --> nparray
X_train = to_nparray_list(X_train, 'scaled_features')
y_train = to_nparray_list(y_train, 'BotOrNot')
X_test = to_nparray_list(X_test, 'scaled_features')
y_test = to_nparray_list(y_test, 'BotOrNot')

In [42]:
X_train[:5]

array([[0.        , 0.        , 0.        , 0.00045975, 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , 0.0005517 , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , 0.0007356 , 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , 0.00091949, 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        ],
       [0.        , 0.        , 0.        , 0.00096547, 0.        ,
        0.        , 0.        , 0.        , 0.        , 0.        ,
        0.        ]])

In [43]:
# ml model train and validation

model.fit(X_train, y_train,
          batch_size=64,
          epochs=20,
          validation_data=(X_test, y_test))
score, acc = model.evaluate(X_test, y_test, verbose=0)
print('Test accuracy:', acc)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Test accuracy: 0.9585336446762085


In [44]:
model.save(result_path_s3 + 'my_model.h5')

In [45]:
#model cross validation
from sklearn.model_selection import KFold

In [46]:
X = np.concatenate([X_train, X_test])
Y = np.concatenate([y_train, y_test])

len(X), len(Y)

(8386, 8386)

In [47]:
# def splitDataset(n_split, X, Y):
#     for train_index,test_index in KFold(n_split).split(X):

#         x_train, x_test=X[train_index],X[test_index]
#         #y_train, y_test=Y[train_index],Y[t est_index]
#         #print( "train: {},{} test: {},{}".format(len(x_train), len(y_train), len(x_test), len(y_test)))
#         print( "train: {},{} test: {},{}".format(len(x_train), len(y_train), len(x_test)))
# splitDataset(5, X, Y)

In [57]:
def distributedTrainingGradients(df, feature_column, target_column, n_splits):
    print(df.count())
    each_len = df.count() // n_splits
    
    ##split dataset into 'n_splits' part
    copy_df = df
    for i in range(n_splits):
        temp_df = copy_df.limit(each_len)
        copy_df = copy_df.subtract(temp_df)
        
        print(temp_df.count())

# distributedTrainingGradients(scaled_df, "", "", 5)

In [58]:
scaled_df.printSchema()

root
 |-- scaled_features: vector (nullable = true)
 |-- BotOrNot: integer (nullable = false)



In [59]:
scaled_df.count()

8386

In [60]:
distributedTrainingGradients(scaled_df, '', '', 5)

8386
1677
1677
1677
1677
699
