In [None]:
import pandas as pd
import dask.dataframe as dd
import ast

# Preprocessing

In [2]:
# Open the train/test splits as Dask DataFrames (evaluated later via .compute()/.head()).
train_df = dd.read_parquet('train.parq')
test_df = dd.read_parquet('test.parq')
# 'shodan_df_hashed.csv' is deliberately not opened here: the Shodan feature section
# re-reads it with pandas and rebinds `shodan_df` before anything uses it, so a
# dask read at this point would be dead code.

In [3]:
def add_attack_time_parts(frame):
    """Return ``frame`` with ``attack_month``, ``attack_day`` and ``attack_hour`` added.

    The three columns are derived from ``attack_time`` with the vectorised ``.dt``
    accessor (instead of a per-element Python lambda) and stored as ``int8``.

    NOTE(review): assumes ``attack_time`` has a datetime64 dtype, as is usual for a
    timestamp column read from parquet -- confirm against the input schema.
    """
    stamps = frame['attack_time']
    return frame.assign(
        attack_month=stamps.dt.month.astype('int8'),
        attack_day=stamps.dt.day.astype('int8'),
        attack_hour=stamps.dt.hour.astype('int8'),
    )


# Apply the exact same transformation to both splits.
train_df = add_attack_time_parts(train_df)
test_df = add_attack_time_parts(test_df)


In [4]:
# The raw timestamp has already been expanded into month/day/hour columns, and the
# watcher AS number is not kept as a feature; remove both from each split.
columns_to_discard = ['watcher_as_num', 'attack_time']
train_df = train_df.drop(columns=columns_to_discard)
test_df = test_df.drop(columns=columns_to_discard)

# Features with attack_type

In [5]:
# Split `attack_type` on the first ':' only (n=1) into two categorical columns.
# The left part is stored as `service`, the right part overwrites `attack_type`
# (presumably values look like "http:exploit" -- see the preview output below).
train_df[['service', 'attack_type']] = train_df['attack_type'].str.split(':', expand=True, n=1).astype('category')
# After the split `attack_type` only holds the attack kind, so give it the shorter name `type`.
train_df = train_df.rename(columns={'attack_type': 'type'})

# Same transformation for the test split so both frames keep matching feature columns.
test_df[['service', 'attack_type']] = test_df['attack_type'].str.split(':', expand=True, n=1).astype('category')
test_df = test_df.rename(columns={'attack_type': 'type'})

In [6]:
# Preview only: dask's head() reads just the first rows instead of materialising
# the whole training frame in memory the way .compute().head() does.
train_df.head()

Unnamed: 0,watcher_country,watcher_as_name,attacker_country,attacker_as_num,attacker_as_name,type,watcher_uuid_enum,attacker_ip_enum,label,attack_month,attack_day,attack_hour,service
0,DE,Host Europe GmbH,TR,47721.0,Murat Aktas,exploit,0,6466,0,7,31,7,http
1,DE,Host Europe GmbH,TR,47721.0,Murat Aktas,spam,0,6466,0,7,31,7,http
2,DE,bn:t Blatzheim Networks Telecom GmbH,DE,51167.0,Contabo GmbH,bruteforce,2,4637,0,7,31,7,http
3,DE,bn:t Blatzheim Networks Telecom GmbH,DE,51167.0,Contabo GmbH,spam,2,4637,0,7,31,7,http
4,DE,bn:t Blatzheim Networks Telecom GmbH,DE,51167.0,Contabo GmbH,exploit,2,4637,0,7,31,7,http


In [7]:
# Preview only: dask's head() reads just the first rows instead of materialising
# the whole test frame in memory the way .compute().head() does.
test_df.head()

Unnamed: 0,watcher_country,watcher_as_name,attacker_country,attacker_as_num,attacker_as_name,type,watcher_uuid_enum,attacker_ip_enum,attack_month,attack_day,attack_hour,service
0,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,spam,1,7696,7,31,7,http
1,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,exploit,1,7696,7,31,7,http
2,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,bruteforce,1,7696,7,31,7,http
3,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,bruteforce,1,7696,7,31,7,http
4,US,GOOGLE-CLOUD-PLATFORM,US,53667.0,PONYNET,bruteforce,3,7543,7,31,7,http


# Features with shodan_df_hashed

In [8]:
# Port numbers (as strings) that count as "reference" ports for the ref_ports feature.
IMPORTANT_PORTS = {"80", "443", "4443", "8443", "8880", "8000", "9998", "1194"}


def extract_ports(value: dict):
    """Return the set of port strings found in the keys of ``value``.

    Each key is cut at its first "/" and only the leading part is kept, so keys
    that share the same prefix collapse into one entry.
    """
    return {key.partition("/")[0] for key in value}


def check_reference_ports(ports):
    """Return True when at least one element of ``ports`` is in ``IMPORTANT_PORTS``."""
    return not IMPORTANT_PORTS.isdisjoint(ports)

In [12]:
# Read the Shodan dump eagerly with pandas, then in one chained assign:
#   1. parse the stringified `shodan_info` literal back into a Python object,
#   2. derive the set of open ports from the parsed object.
shodan_df = pd.read_csv('shodan_df_hashed.csv').assign(
    shodan_info=lambda raw: raw["shodan_info"].map(ast.literal_eval),
    shodan_open_ports=lambda parsed: parsed["shodan_info"].map(extract_ports),
)

In [13]:
# Feature: number of open ports Shodan reports for each IP.
# int32 (not int8) on purpose: a host can expose more than 127 ports, and casting
# such a count to int8 silently wraps it around, corrupting both the count and the max.
open_port_counts = shodan_df["shodan_open_ports"].map(len).astype('int32')
# Normalise to [0, 1] by the largest observed count.
max_value = open_port_counts.max()
if max_value > 0:
    shodan_df['number_of_open_ports'] = open_port_counts / max_value
else:
    # No host has any open port (or the frame is empty): keep zeros, avoid dividing by 0.
    shodan_df['number_of_open_ports'] = open_port_counts.astype('float64')

In [14]:
# Feature: 1 when the host has at least one port from IMPORTANT_PORTS open, otherwise 0.
has_reference_port = shodan_df['shodan_open_ports'].map(check_reference_ports)
shodan_df['ref_ports'] = has_reference_port.astype('int8')

In [15]:
# The raw Shodan payload and the intermediate port set are no longer needed once the
# derived features exist.
shodan_helper_columns = ['shodan_open_ports', 'shodan_info']
shodan_df = shodan_df.drop(columns=shodan_helper_columns)

In [16]:
# Left join: every training row is kept; rows whose IP has no Shodan entry get nulls
# in the Shodan-derived columns.
train_df = train_df.merge(
    right=shodan_df,
    on='attacker_ip_enum',
    how='left',
)

In [17]:
# Left join: every test row is kept; rows whose IP has no Shodan entry get nulls
# in the Shodan-derived columns.
test_df = test_df.merge(
    right=shodan_df,
    on='attacker_ip_enum',
    how='left',
)

In [18]:
# The IP identifier was only the join key for training rows. The test split keeps it
# because the submission is grouped by `attacker_ip_enum`.
train_df = train_df.drop(columns=['attacker_ip_enum'])

In [19]:
# Quick look at the first five training rows after adding the Shodan features.
train_df.head(5)

Unnamed: 0,watcher_country,watcher_as_name,attacker_country,attacker_as_num,attacker_as_name,type,watcher_uuid_enum,label,attack_month,attack_day,attack_hour,service,number_of_open_ports,ref_ports
0,DE,Host Europe GmbH,TR,47721.0,Murat Aktas,exploit,0,0,7,31,7,http,0.133858,1
1,DE,Host Europe GmbH,TR,47721.0,Murat Aktas,spam,0,0,7,31,7,http,0.133858,1
2,DE,bn:t Blatzheim Networks Telecom GmbH,DE,51167.0,Contabo GmbH,bruteforce,2,0,7,31,7,http,0.165354,1
3,DE,bn:t Blatzheim Networks Telecom GmbH,DE,51167.0,Contabo GmbH,spam,2,0,7,31,7,http,0.165354,1
4,DE,bn:t Blatzheim Networks Telecom GmbH,DE,51167.0,Contabo GmbH,exploit,2,0,7,31,7,http,0.165354,1


In [20]:
# Quick look at the first five test rows after adding the Shodan features.
test_df.head(5)

Unnamed: 0,watcher_country,watcher_as_name,attacker_country,attacker_as_num,attacker_as_name,type,watcher_uuid_enum,attacker_ip_enum,attack_month,attack_day,attack_hour,service,number_of_open_ports,ref_ports
0,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,spam,1,7696,7,31,7,http,0.125984,1
1,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,exploit,1,7696,7,31,7,http,0.125984,1
2,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,bruteforce,1,7696,7,31,7,http,0.125984,1
3,US,IMH-IAD,IN,14061.0,DIGITALOCEAN-ASN,bruteforce,1,7696,7,31,7,http,0.125984,1
4,US,GOOGLE-CLOUD-PLATFORM,US,53667.0,PONYNET,bruteforce,3,7543,7,31,7,http,0.007874,0


# Features with Countries

In [21]:
# Flag (int8) set to 1 when the attacker's country differs from the watcher's country.
# Both sides are compared as plain strings.
train_df['foreign_attacker_country'] = (
    train_df['watcher_country'].astype(str)
    .ne(train_df['attacker_country'].astype(str))
    .astype('int8')
)
test_df['foreign_attacker_country'] = (
    test_df['watcher_country'].astype(str)
    .ne(test_df['attacker_country'].astype(str))
    .astype('int8')
)

# Val and Train split

In [22]:
from sklearn.model_selection import train_test_split

# Materialise the lazy training frame as pandas before splitting.
train_df = train_df.compute()

# 80/20 split stratified on the target. Note that X_train and X_val still contain
# the `label` column; it is separated from the features later, at training time.
X_train, X_val = train_test_split(
    train_df,
    stratify=train_df['label'],
    test_size=0.2,
    random_state=42,
)

# Imputation of Nulls

In [23]:
from dask_ml.impute import SimpleImputer

# Learn the most frequent value of every column on the training split only, then
# reuse those values to fill nulls in the train, validation and test splits.
imputer = SimpleImputer(strategy='most_frequent')
imputer.fit(X_train)
train_df_imputed = imputer.transform(X_train)
val_df_imputed = imputer.transform(X_val)
test_df_imputed = imputer.transform(test_df)

In [None]:
# Persist the imputed train/validation splits so later sections can start from disk.
for imputed_frame, parquet_path in (
    (train_df_imputed, 'train_imputed.parq'),
    (val_df_imputed, 'val_imputed.parq'),
):
    imputed_frame.to_parquet(parquet_path)

In [27]:
# The imputed test split is still lazy: materialise it first, then write a parquet file.
test_imputed_materialised = test_df_imputed.compute()
test_imputed_materialised.to_parquet('test_imputed.parq')

# Encoding

In [2]:
# Reload the three imputed splits from disk as pandas DataFrames.
train_df, val_df, test_df = map(
    pd.read_parquet,
    ('train_imputed.parq', 'val_imputed.parq', 'test_imputed.parq'),
)

In [3]:
import category_encoders as ce

# Categorical columns that are replaced by integer codes.
ordinal_columns = ['watcher_country', 'attacker_country', 'watcher_as_name', 'service']
ordinal_encoder = ce.OrdinalEncoder(cols=ordinal_columns)

# The category -> code mapping is learned on the training split and reused unchanged
# for the validation and test splits.
x_train_encoded = ordinal_encoder.fit_transform(train_df)
x_val_encoded = ordinal_encoder.transform(val_df)
x_test_encoded = ordinal_encoder.transform(test_df)

In [4]:
# Target (mean) encoding of the attacker AS name, fitted against the training labels only.
# NOTE(review): the frame passed as X still contains the `label` column; only
# `attacker_as_name` is listed in `cols`, so only that column is encoded.
training_target = x_train_encoded['label']
mean_encoder = ce.TargetEncoder(cols=['attacker_as_name'])
encoded_data_train = mean_encoder.fit_transform(x_train_encoded, training_target)
# `ecoded_data_val` (sic) keeps its original spelling because the cell that writes
# the parquet files refers to it by this name.
ecoded_data_val = mean_encoder.transform(x_val_encoded)
encoded_data_test = mean_encoder.transform(x_test_encoded)

In [5]:
# Checkpoint the ordinal- and target-encoded splits to disk.
for encoded_frame, parquet_path in (
    (encoded_data_train, 'train_encoded.parq'),
    (ecoded_data_val, 'val_encoded.parq'),
    (encoded_data_test, 'test_encoded.parq'),
):
    encoded_frame.to_parquet(parquet_path)

In [2]:
# Reload the encoded checkpoints as pandas DataFrames.
x_train_encoded, x_val_encoded, x_test_encoded = map(
    pd.read_parquet,
    ('train_encoded.parq', 'val_encoded.parq', 'test_encoded.parq'),
)

In [4]:
# One-hot encode the attack `type`; the categories are learned on the training split
# and the same encoder is applied to the validation and test splits.
one_hot_columns = ['type']
oh_encoder = ce.OneHotEncoder(cols=one_hot_columns)
encoded_data_train = oh_encoder.fit_transform(x_train_encoded)
# `ecoded_data_val` (sic) keeps its original spelling because the cell that writes
# the parquet files refers to it by this name.
ecoded_data_val = oh_encoder.transform(x_val_encoded)
encoded_data_test = oh_encoder.transform(x_test_encoded)

In [5]:
# Overwrite the encoded checkpoints, now including the one-hot `type` columns.
frames_by_path = {
    'train_encoded.parq': encoded_data_train,
    'val_encoded.parq': ecoded_data_val,
    'test_encoded.parq': encoded_data_test,
}
for parquet_path, encoded_frame in frames_by_path.items():
    encoded_frame.to_parquet(parquet_path)

# Normalization

In [2]:
# Re-open the encoded checkpoints as Dask DataFrames for the scaling step.
x_train_encoded, x_val_encoded, x_test_encoded = (
    dd.read_parquet(parquet_path)
    for parquet_path in ('train_encoded.parq', 'val_encoded.parq', 'test_encoded.parq')
)

In [3]:
# Columns that get min-max scaled: identifiers, ordinal codes and the date parts.
cols_to_normalize = [
    'watcher_uuid_enum',
    'attacker_as_num',
    'watcher_as_name',
    'watcher_country',
    'attacker_country',
    'service',
    'attack_month',
    'attack_day',
    'attack_hour',
]
# Take the same column subset out of every split.
train_features_to_normalize, val_features_to_normalize, test_features_to_normalize = (
    split[cols_to_normalize]
    for split in (x_train_encoded, x_val_encoded, x_test_encoded)
)

In [4]:
from dask_ml.preprocessing import MinMaxScaler
# Min-max scaling: the scaler's parameters are fitted on the training subset only
# and then reused for the validation and test subsets.
scaler = MinMaxScaler()
train_features_normalized = scaler.fit_transform(train_features_to_normalize)
val_features_normalized = scaler.transform(val_features_to_normalize)
test_features_normalized = scaler.transform(test_features_to_normalize)
# Write the scaled values back over the original columns of each split; every column
# not listed in `cols_to_normalize` is left as it was.
x_train_encoded[cols_to_normalize] = train_features_normalized
x_val_encoded[cols_to_normalize] = val_features_normalized
x_test_encoded[cols_to_normalize] = test_features_normalized

In [5]:
# Materialise each lazily scaled split and store it as a parquet file
# (same order as before: test, validation, train).
for lazy_split, parquet_path in (
    (x_test_encoded, 'test_normalized.parq'),
    (x_val_encoded, 'val_normalized.parq'),
    (x_train_encoded, 'train_normalized.parq'),
):
    lazy_split.compute().to_parquet(parquet_path)

# Hyperparameter Search

In [None]:
# NOTE(review): sklearn's RandomizedSearchCV is imported but not used in this cell;
# the import is kept in case other cells rely on it.
from sklearn.model_selection import RandomizedSearchCV
from sklearn.linear_model import SGDClassifier
from dask_ml.model_selection import RandomizedSearchCV as DaskRandomizedSearchCV

# Search on a 10% sample of the normalised training data to keep the run affordable.
# The sample is seeded so that re-running the cell evaluates the same rows.
train_ddf = dd.read_parquet('train_normalized.parq').sample(frac=0.1, random_state=42)
y_train = train_ddf['label']
train_df = train_ddf.drop('label', axis=1)

# Seed the estimator as well: SGD shuffles the samples, so without a seed the
# cross-validation scores change from run to run.
model = SGDClassifier(random_state=42)

# Candidate values; the randomized search draws combinations from these lists.
param_grid = {
    'loss': ['hinge', 'perceptron', 'huber', 'epsilon_insensitive'],
    'alpha': [0.0001, 0.001, 0.01, 0.1],
    'eta0': [0.0001, 0.001, 0.01, 0.1],
    'penalty': ['l2', 'l1', 'elasticnet'],
    'learning_rate': ['optimal', 'invscaling'],
    'max_iter': [1500, 1850, 500, 2600],
}
search = DaskRandomizedSearchCV(model, param_grid, random_state=42, scoring='f1')
search.fit(train_df, y_train)
print(f'Best score: {search.best_score_}')
print(f'Best params: {search.best_params_}')
print(f'Best estimator: {search.best_estimator_}')

# Training

In [3]:
import pyarrow.parquet as pq
from sklearn.metrics import f1_score
from sklearn.linear_model import SGDClassifier


# Here `train_df` is a pyarrow ParquetFile handle (not a DataFrame), so the training
# data can be streamed in batches instead of being loaded at once.
train_df = pq.ParquetFile('train_normalized.parq')

# random_state makes the run reproducible: partial_fit shuffles the samples of each
# batch, so an unseeded model gives a different F1 on every execution.
# NOTE(review): with partial_fit each call is a single pass over the given batch, so
# `max_iter` does not control the number of epochs in this loop.
model = SGDClassifier(
    penalty='l2',
    max_iter=1500,
    eta0=0.1,
    alpha=0.0001,
    loss='hinge',
    learning_rate='optimal',
    random_state=42,
)
for batch in train_df.iter_batches(batch_size=100000):
    df_batch = batch.to_pandas()
    y_batch = df_batch['label']
    X_batch = df_batch.drop('label', axis=1)
    # Incrementally train the model on the current batch; `classes` is passed so
    # that a batch containing a single class is still accepted.
    model.partial_fit(X_batch, y_batch, classes=[0, 1])

print('Entrenamiento completo para todos los batches')

# Evaluate on the held-out validation split.
valid_df = pd.read_parquet('val_normalized.parq')
y_valid = valid_df['label']
X_valid = valid_df.drop('label', axis=1)

y_pred = model.predict(X_valid)
f1 = f1_score(y_valid, y_pred)
print(f'Puntaje F1 del modelo: {f1}')

Entrenamiento completo para todos los batches
Puntaje F1 del modelo: 0.6530211522020903


# Predict for test

In [4]:
test_df = pd.read_parquet('test_normalized.parq')
# Keep the IP identifiers aside: they are not model features but are needed to
# group the predictions in the submission.
ips = test_df['attacker_ip_enum']

# The training frame (minus the target) defines which feature columns the model
# saw and in which order.
train_df = pd.read_parquet('train_normalized.parq').drop(columns='label')
feature_names = train_df.columns

# Reorder the test columns so they match the column order of the training set.
X_test_reordered = test_df[feature_names]

In [5]:
def _majority_label(labels):
    """Return the most frequent label of a group (first mode when there is a tie)."""
    return labels.mode().iloc[0]


# Row-level predictions, paired with the IP each row belongs to.
y_pred = model.predict(X_test_reordered)
kaggle_df = pd.DataFrame({'attacker_ip_enum': ips, 'label': y_pred})

# One row per IP: the submitted label is the majority vote over that IP's rows.
votes_per_ip = kaggle_df.groupby('attacker_ip_enum')['label']
kaggle_df = votes_per_ip.apply(_majority_label).reset_index(name='label')
kaggle_df.to_csv('submission_sgd_model.csv', index=False)