In [2]:
from dask.distributed import Client  # برای مدیریت خوشه Dask
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
# راه‌اندازی کلاینت
client = Client()

# بارگذاری داده‌ها
file_path = './hospitals (1).csv'
df = dd.read_csv(file_path, assume_missing=True)

In [4]:
# نمایش وضعیت کلاینت
print(client)


<Client: 'tcp://127.0.0.1:6558' processes=4 threads=4, memory=7.89 GiB>


In [5]:
# مشاهده نوع داده‌ها
print("Data Types of Columns:")
print(df.compute().dtypes)  # محاسبه و سپس نمایش نوع داده‌ها


Data Types of Columns:
date                             object
hospital.bed.total              float64
hospital.bed.total.max          float64
hospital.bed.occupied           float64
hospital.bed.free               float64
                                 ...   
hospital.pbor.care.total.max    float64
hospital.pbid.care.occupied     float64
hospital.pbid.care.free         float64
hospital.pbid.care.total        float64
hospital.pbid.care.total.max    float64
Length: 265, dtype: object


In [6]:
# مشاهده چند ردیف اول برای بررسی داده‌ها
df_head = df.head(10)  # فقط 10 ردیف اول
print("First 10 rows:")
print(df_head)


First 10 rows:
         date  hospital.bed.total  hospital.bed.total.max  \
0  2020-03-10               270.0                     0.0   
1  2020-03-11               270.0                     0.0   
2  2020-03-12               292.0                     0.0   
3  2020-03-13               332.0                     0.0   
4  2020-03-14               332.0                     0.0   
5  2020-03-15               332.0                     0.0   
6  2020-03-16               332.0                     0.0   
7  2020-03-17               332.0                     0.0   
8  2020-03-18               332.0                     0.0   
9  2020-03-19               332.0                     0.0   

   hospital.bed.occupied  hospital.bed.free  hospital.bed.free.max  \
0                   18.0              252.0                    0.0   
1                   18.0              252.0                    0.0   
2                   17.0              275.0                    0.0   
3                   22.0         

In [7]:
# 2. انتخاب ویژگی‌ها و برچسب‌ها
# فرض می‌کنیم ستون هدف `hospital.icu.occupied` باشد و ویژگی‌ها شامل ستون‌های عددی است
X = df.drop('hospital.icu.occupied', axis=1)
y = df['hospital.icu.occupied']  # فرض می‌کنیم این ستون هدف است

In [22]:
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from dask.distributed import Client
import dask.dataframe as dd
import xgboost as xgb

# اتصال به Dask Client
client = Client(local_directory='/path/to/local/directory')

# شناسایی ستون‌هایی با نوع داده نامناسب
invalid_columns = df.select_dtypes(include=["object", "string"]).columns
print(f"Columns with invalid types: {invalid_columns}")

# تبدیل داده‌های زمانی (ستون 'date') به ویژگی‌های عددی
if "date" in df.columns:
    df["year"] = dd.to_datetime(df["date"]).dt.year
    df["month"] = dd.to_datetime(df["date"]).dt.month
    df["day"] = dd.to_datetime(df["date"]).dt.day
    df = df.drop(columns=["date"])

# تبدیل داده‌های متنی به دسته‌ای (Categorical)
for col in invalid_columns:
    df[col] = df[col].astype("category").cat.codes

# شناسایی ستون‌هایی که همچنان مقادیر گم‌شده دارند
missing_columns = df.columns[df.isnull().any()]

for col in missing_columns:
    print(f"Handling missing values in column: {col}")

    # جدا کردن داده‌های کامل و ناقص
    complete_data = df[df[col].notnull()]
    missing_data = df[df[col].isnull()]

    # ویژگی‌ها و برچسب‌ها برای داده‌های کامل
    X_train = complete_data.drop(columns=[col])
    y_train = complete_data[col]

    # ویژگی‌های داده‌های ناقص
    X_missing = missing_data.drop(columns=[col])

    # تبدیل به DaskDMatrix
    dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
    dmissing = xgb.dask.DaskDMatrix(client, X_missing)

    # تنظیمات مدل XGBoost
    params = {
        "objective": "reg:squarederror",  # هدف مدل: رگرسیون
        "max_depth": 6,
        "eta": 0.3,
        "enable_categorical": True  # فعال کردن دسته‌بندی
    }

    # آموزش مدل XGBoost
    bst = xgb.dask.train(client, params, dtrain, num_boost_round=100)

    # پیش‌بینی مقادیر گم‌شده
    y_pred = xgb.dask.predict(client, bst, dmissing).compute()

    # جایگزینی مقادیر گم‌شده با پیش‌بینی‌های مدل
    df.loc[df[col].isnull(), col] = y_pred

print("Missing values handled.")

# بررسی نوع X و تبدیل به Dask DataFrame در صورت نیاز
if isinstance(X, dd.DataFrame):
    print("X is already a Dask DataFrame.")
else:
    X = dd.from_pandas(X, npartitions=4)

# بررسی نوع y و تبدیل به Dask Series در صورت نیاز
if isinstance(y, dd.Series):
    print("y is already a Dask Series.")
else:
    y = dd.from_pandas(y, npartitions=4)

# تبدیل X و y به Dask Array
X = X.to_dask_array(lengths=True)
y = y.to_dask_array(lengths=True)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 8707 instead
  next(self.gen)


Columns with invalid types: Index([], dtype='object')
Missing values handled.
X is already a Dask DataFrame.
y is already a Dask Series.
Model accuracy: dask.array<mean_agg-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>
