<a href="https://colab.research.google.com/github/sanaaria/Big-data/blob/main/BigData_HW3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<center>
<h1 dir=rtl>
تمرین برنامه‌نویسی سوم
</h1>
<h3 dir=rtl>
Clustering
</h3>
</center>

<p dir=rtl>
در این تمرین به دنبال خوشه‌بندی داده‌های لاگ‌های مربوط به حدود ۵ میلیون connection در شبکه اینترنتی هستیم. این خوشه‌بندی مرحله مقدماتی برای تشخیص رفتارهای غیرعادی و حملات احتمالی به شبکه است. به عبارت دیگر، پس از انجام این خوشه‌بندی به شکل مناسب، می‌توان برای اطلاعات جدید بررسی کرد که آیا فاصله داده‌های جدید نسبت به مرکز خوشه مربوطه از یک حد آستانه بیشتر است یا خیر. 
</p>

<p dir=rtl>
برای انجام این پروژه از دیتای مسابقه kddcup که در سال ۱۹۹۹ انجام شده است استفاده می‌کنیم. 
<br>
برای این منظور این دیتا را از اینجا دانلود کنید:
<a href="https://www.kaggle.com/datasets/galaxyh/kdd-cup-1999-data">https://www.kaggle.com/datasets/galaxyh/kdd-cup-1999-data</a>
<br>
فایلی که لازم است دانلود شود، فایل kddcup.data.gz
است. دقت کنید که احتمالا برای دانلود فایل نیاز به فیلترشکن داشته باشید.
<br>
پس از دانلود فایل، آنرا در قسمت Files در سرور مجازی colab در سمت چپ همین فایل نوت‌بوک آپلود کنید.
</p>

<p dir=rtl>
در ادامه، فایل مورد نظر را unzip می‌کنیم:
</p>

In [None]:
!gunzip kddcup.data.gz

<p dir="rtl">
چند سطر اول فایل csv ایجاد شده را می‌توانیم با دستور ذیل بررسی کنیم:
</p>

In [None]:
!head kddcup.data

<p dir=rtl>
در ادامه، پکیج pyspark را نصب کرده و تنظیمات اولیه آنرا انجام می‌دهیم:
</p>

In [None]:
!pip install pyspark

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

<p dir=rtl>
حال، فایل دیتا را با دستور ذیل به یک DataFrame تبدیل می‌کنیم.
<br>
همچنین با توجه به اینکه در سطر اول این فایل اطلاعات هدر مشخص نشده است، این اطلاعات را به صورت دستی به DataFrame اضافه می‌کنیم:
</p>

In [None]:
df = spark.read.csv('kddcup.data', inferSchema=True, header=False)

In [None]:
columns = ["duration", "protocol_type", "service", "flag",
      "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
      "hot", "num_failed_logins", "logged_in", "num_compromised",
      "root_shell", "su_attempted", "num_root", "num_file_creations",
      "num_shells", "num_access_files", "num_outbound_cmds",
      "is_host_login", "is_guest_login", "count", "srv_count",
      "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
      "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
      "dst_host_count", "dst_host_srv_count",
      "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
      "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
      "dst_host_serror_rate", "dst_host_srv_serror_rate",
      "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
      "label"]
data = df.toDF(*columns)


In [None]:
data.show(10)

<p dir=rtl>
دستور ذیل یک دسته‌بندی روی داده‌ها بر اساس برچسب آنها انجام می‌دهد. این دسته‌بندی شهود خوبی نسبت به وضعیت داده‌ها ارائه می‌کند
</p>

In [None]:
from pyspark.sql.functions import col
data.select('label').groupBy('label').count().orderBy(col('count').desc()).show(10)

<p dir=rtl>
برای انجام خوشه‌بندی لازم است تا داده‌های ثبت شده در DataFrame همگی به صورت عدد باشند. در حالیکه در بعضی ستون‌ها داده‌های داده شده به صورت string هستند. با اینکه می‌توان این داده‌ها را با کمک بعضی دستورات Spark به عدد تبدیل کرد، ولی برای سادگی در این پروژه ستون‌هایی که داده متنی دارند را حذف می‌کنیم:
</p>

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, KMeansModel 
from pyspark.ml import Pipeline

numeric_only = data.drop("protocol_type", "service", "flag").cache()


<p dir=rtl>
حال داده‌های مورد نظر را لازم است تا به یک بردار عددی تبدیل کنیم. برای این منظور همه اطلاعات هر سطر (به جز ستون آخر) را در قالب یک ستون جدید برداری در نظر می‌گیریم.
</p>

In [None]:
assembler = VectorAssembler().setInputCols(numeric_only.columns[:-1]).setOutputCol('featureVector')

<p dir=rtl>
برای انجام خوشه‌بندی از الگوریتم KMeans استفاده می‌کنیم:
</p>

In [None]:
kmeans = KMeans().setPredictionCol("cluster").setFeaturesCol("featureVector")

<p dir=rtl>
در ادامه با کمک ابزارهای موجود در پکیج یادگیری ماشین کار خوشه‌بندی را انجام می‌دهیم. 
این بخش کد را لازم است شما تکمیل کنید. برای این منظور پیشنهاد می‌شود که توضیحات اولیه درباره نحوه استفاده از توابع و مفاهیم مطرح در Spark برای استفاده از ابزارهای یادگیری ماشین را مطالعه کنید:
<br>
<a href="https://spark.apache.org/docs/latest/ml-guide.html">https://spark.apache.org/docs/latest/ml-guide.html</a>
<br>
به طور مشخص پیشنهاد می‌شود که با مفاهیم Pipeline و Transform و Estimator آشنا شوید.
<br>
همچنین مطالعه بخش Clustering در راهنمای فوق می‌تواند مفید باشد.
<br>
</p>

<p dir=rtl>
حال، کد لازم برای انجام خوشه‌بندی داده‌های فوق به دو خوشه را بنویسید. 
<br>
دقت کنید که برای این کار لازم است که از مفهوم Pipeline نیز استفاده کنید.
</p>

In [None]:
## Write your Code here.

<p dir=rtl>
یکی از سوالات مهمی که در خوشه‌بندی KMeans می‌بایست پاسخ دهیم این است که عدد مناسب برای K چه مقداری می‌بایست باشد. برای پاسخ به این سوال، مشابه روشی که در کلاس توضیح داده شد، می‌توانیم به ازای K های مختلف خوشه‌بندی را انجام دهیم و  جایی که کاهش قابل توجهی در فاصله بین نقاطه و مراکز خوشه اتفاق افتاد را به عنوان مقدار K مطلوب در نظر بگیریم. برای توضیحات بیشتر به اسلایدهای درس مراجعه کنید.
<br>
می‌خواهیم کد لازم برای یافتن مقدار مطلوب K را بنویسیم. برای اینکار می‌توانیم از کتابخانه ClusterEvaluator در اسپارک استفاده کنیم. با مطالعه این کتابخانه و نوشتن کد Pipeline مناسب، الگوریتم KMeans را به ازای K های مختلف اجرا کرده و مشخص کنید که مقدار مطلوب برای K چه مقداری باید باشد. 
<br>
با توجه به اینکه زمان اجرای خوشه‌بندی برای تعداد زیاد K می‌تواند بسیار طولانی باشد، پیشنهاد می‌شود که این آزمایش را برای مقادیر  40, 60, 80, 100, 120 انجام دهید.
</p>

In [None]:
# Write your code here.

<p dir=rtl>
مقدار مطلوب K چه عددی خواهد بود؟ چرا؟ پاسخ را در همینجا بنویسید.
</p>

<p dir=rtl>
پاسخ سوال...

</p>