In [4]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    full_paths = [os.path.join(dirname, entry) for entry in filenames]
    for full_path in full_paths:
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [8]:

! pip install findspark
! pip install pyspark
import findspark
from pyspark.sql import SparkSession, functions as F
import pandas as pd

In [9]:
# Build (or reuse) a local Spark session with two worker threads.
# The memory key was previously misspelled as "spark.executer.memory"; Spark stores
# unknown keys without complaint, so the intended 3g setting was never applied.
# A single shuffle partition is used for shuffles (spark.sql.shuffle.partitions=1).
spark = (
    SparkSession.builder
    .appName("House Price Regression")
    .master("local[2]")
    .config("spark.executor.memory", "3g")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
)

**Veri Okuma**: New York City Airbnb verilerini alıp, inceleyelim.

In [10]:
# First pass over the CSV: keep the header row and let Spark infer the column types.
df = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("../input/new-york-city-airbnb-open-data/AB_NYC_2019.csv")
)

In [14]:
# Mark the frame for caching and display it together with its column names.
(df.cache(), df.columns)

In [16]:
# Lift the pandas display limits for both columns and rows, then preview five rows.
for display_option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(display_option, None)
df.limit(5).toPandas()

In [17]:
df.printSchema()  # inspect and check the inferred data types

In [19]:
# Explicit column-name/type list written as a single string; it is handed to
# DataFrameReader.schema() when the CSV is read again in the next cell.
my_schema = "id string,name string,host_id int,host_name string,neighbourhood_group string,neighbourhood string,latitude float,longitude float, \
            room_type string,price int,minimum_nights int,number_of_reviews int,last_review date,reviews_per_month float,calculated_host_listings_count int, \
            availability_365 int"  # the data types the columns are expected to have

In [22]:
# Read the CSV again, this time with the explicit schema defined above
# instead of schema inference.
csv_reader = spark.read.format("csv").options(header=True).schema(my_schema)
df = csv_reader.load("../input/new-york-city-airbnb-open-data/AB_NYC_2019.csv")

df.limit(5).toPandas()

In [25]:
# Print the schema now in effect, then show the total number of rows.
df.printSchema() 
df.count()

**Eksik Veri Kontrolü**

In [27]:
# Row count of the freshly loaded frame; used as the denominator in the null reports.
df_count = df.count()
# Empty dictionary created here; nothing is stored in it in this cell.
avg_dict = dict()

In [28]:
# Collectors for the names of string / non-string columns that contain missing values.
cat_nulls, num_nulls = [], []

In [34]:
def null_count(df, col_name):
    """Count the rows of ``df`` whose ``col_name`` value is treated as missing.

    A value counts as missing when it is NULL, equal to the empty string,
    or equal to the literal string "NA".

    Args:
        df: Spark DataFrame to inspect.
        col_name: Name of the column to check.

    Returns:
        The number of matching rows, as returned by ``DataFrame.count()``.
    """
    column = F.col(col_name)
    is_missing = column.isNull() | (column == "") | (column == "NA")
    return df.filter(is_missing).count()

In [35]:
# Scan every column of df for missing values and record which columns have any.
# The collectors are reset here so that re-running this cell does not append
# the same column names a second time.
cat_nulls = []
num_nulls = []
for name, dtype in df.dtypes:
    nc = null_count(df, name)

    if nc > 0:
        # The message labels the last value with "%", so report an actual
        # percentage (0-100) rather than the raw fraction nc/df_count.
        missing_pct = round(100 * nc / df_count, 2)
        print("{} {} type has {} null values, % {}".format(name, dtype, nc, missing_pct))
        if dtype == 'string':
            cat_nulls.append(name)
        else:
            num_nulls.append(name)

In [36]:
cat_nulls, num_nulls #columns that contain missing values

**Kategorik Veri Analizi**

In [37]:
# For every string-typed column: print the number of distinct values, then show
# the value frequencies sorted from most to least common.
string_columns = [name for name, dtype in df.dtypes if dtype == 'string']
for name in string_columns:
    print(df.select(name).distinct().count())
    df.groupBy(name).count().orderBy(F.desc("count")).show()

In [38]:
def find_weaks(df, col_name, limit):
    '''
    Return the values of a column that occur fewer than ``limit`` times.

    Such low-frequency values are the "weak" classes of the column; for example,
    with limit=200 every value seen in fewer than 200 rows is considered weak.

    Args:
        df: Spark DataFrame to analyse.
        col_name: Column whose value frequencies are examined.
        limit: Frequency threshold; values with a count strictly below it are returned.

    Returns:
        A list with the weak values of the column.
    '''
    frequencies = df.groupBy(col_name).count()
    weak_rows = frequencies.where(F.col("count") < limit).select(col_name).collect()
    return [row[0] for row in weak_rows]

#verilen kolonun belirlenen limite göre durumunu döndüren fonksiyon.

**room_type ve neighbourhood_group için zayıf sınıf analizi**

In [39]:
# Values of room_type that appear in fewer than 200 rows.
room_type_to_be_grouped = find_weaks(df, col_name="room_type", limit=200)
room_type_to_be_grouped

In [40]:
# Values of neighbourhood_group that appear in fewer than 200 rows.
neighbourhood_group_to_be_grouped = find_weaks(df, col_name="neighbourhood_group", limit=200)
neighbourhood_group_to_be_grouped

In [41]:
def _collapse_weak(column_name, weak_values):
    """Build a column expression that maps weak or NULL values to "Other"."""
    column = F.col(column_name)
    needs_grouping = column.isin(weak_values) | column.isNull()
    return F.when(needs_grouping, "Other").otherwise(column)


# Replace weak classes and NULLs in both categorical columns with the "Other" bucket.
df2 = (
    df
    .withColumn("room_type", _collapse_weak("room_type", room_type_to_be_grouped))
    .withColumn("neighbourhood_group",
                _collapse_weak("neighbourhood_group", neighbourhood_group_to_be_grouped))
)

In [42]:
# Show the class frequencies of both regrouped columns, most frequent first.
for regrouped_col in ("room_type", "neighbourhood_group"):
    df2.groupBy(regrouped_col).count().orderBy(F.desc("count")).show()

In [43]:
# Column roles: the label, the columns left out of modelling, and two empty
# collectors that the next cell fills with the remaining column names.
label_col = ['price']
discarted_cols = ['id', 'name', 'host_id', 'host_name', 'neighbourhood']
categoric_cols, numeric_cols = [], []

In [44]:
# Sort every column that is neither the label nor discarded into the
# categoric (string-typed) or numeric (any other type) list.
excluded_cols = label_col + discarted_cols
for name, dtype in df.dtypes:
    if name in excluded_cols:
        continue
    target_list = categoric_cols if dtype == 'string' else numeric_cols
    target_list.append(name)

In [45]:
# Display the four column-role lists side by side.
categoric_cols, numeric_cols, discarted_cols, label_col

In [46]:
# Sanity check: the four role lists together must account for every column of df.
role_lists = (categoric_cols, numeric_cols, discarted_cols, label_col)
len(df.columns) == sum(len(role) for role in role_lists)

In [47]:
# Repeat the missing-value scan on df2, skipping the discarded columns.
cat_nulls = []
num_nulls = []
for name, dtype in df2.dtypes:
    if name in discarted_cols:
        continue
    nc = null_count(df2, name)

    if nc > 0:
        # The message labels the last value with "%", so report an actual
        # percentage (0-100) rather than the raw fraction nc/df_count.
        # df2 is built from df with withColumn only, so df_count is used as the denominator.
        missing_pct = round(100 * nc / df_count, 2)
        print("{} {} type has {} null values, % {}".format(name, dtype, nc, missing_pct))
        if dtype == 'string':
            cat_nulls.append(name)
        else:
            num_nulls.append(name)

In [48]:
# Display the columns of df2 that still contain missing values.
cat_nulls, num_nulls

In [49]:
# Preview rows whose 'latitude' is missing; per the author's observation,
# the missing values occur in the same observations.
missing_latitude = F.col("latitude").isNull()
df2.filter(missing_latitude).limit(10).toPandas()

In [50]:
# Drop rows that have fewer than 7 non-null values among the columns listed in
# num_nulls (filled by the preceding null scan on df2).
# NOTE(review): the threshold 7 is tied to how many columns num_nulls holds; if the
# list ever has fewer than 7 entries every row would be dropped - confirm.
# NOTE(review): dropna looks at NULL/NaN only, whereas null_count also counts ""
# and "NA" - verify this difference does not matter for these columns.
df3 = df2.dropna(thresh=7, subset=num_nulls)

In [51]:
df3.count(), df2.count() #compare the row counts after vs. before dropping the rows with missing values

In [52]:
# Repeat the missing-value scan on df3, skipping the discarded columns.
cat_nulls = []
num_nulls = []
# df3 has had rows dropped, so use its own row count as the denominator
# instead of the count of the originally loaded frame.
df3_count = df3.count()
for name, dtype in df3.dtypes:
    if name in discarted_cols:
        continue
    nc = null_count(df3, name)

    if nc > 0:
        # The message labels the last value with "%", so report an actual
        # percentage (0-100) rather than a raw fraction.
        missing_pct = round(100 * nc / df3_count, 2)
        print("{} {} type has {} null values, % {}".format(name, dtype, nc, missing_pct))
        if dtype == 'string':
            cat_nulls.append(name)
        else:
            num_nulls.append(name)

In [53]:
# Listings with zero reviews get reviews_per_month = 0.0; all other rows keep their value.
has_no_reviews = F.col("number_of_reviews") == 0
filled_reviews_per_month = F.when(has_no_reviews, 0.0).otherwise(F.col("reviews_per_month"))
df4 = df3.withColumn("reviews_per_month", filled_reviews_per_month)

In [54]:
# Final missing-value report on df4, skipping the discarded columns.
# Use df4's own row count as the denominator: rows were dropped earlier, so the
# count of the originally loaded frame no longer matches.
df4_count = df4.count()
for name, dtype in df4.dtypes:
    if name in discarted_cols:
        continue
    nc = null_count(df4, name)

    if nc > 0:
        # The message labels the last value with "%", so report an actual
        # percentage (0-100) rather than a raw fraction.
        missing_pct = round(100 * nc / df4_count, 2)
        print("{} {} type has {} null values, % {}".format(name, dtype, nc, missing_pct))

In [55]:
# Preview a few listings that have no reviews.
df4.filter(F.col("number_of_reviews") == 0).limit(7).toPandas()

In [56]:
# Keep only listings with a strictly positive price.
df5 = df4.where(F.col("price") > 0)

In [57]:
# Row counts after each cleaning stage, from df2 through df5.
df2.count(), df3.count(), df4.count(), df5.count()

In [58]:
# Rebuild the column roles for df5; 'last_review' is now discarded as well.
label_col = ['price']
discarted_cols = ['id', 'name', 'host_id', 'host_name', 'neighbourhood', 'last_review']
categoric_cols, numeric_cols = [], []

excluded_cols = label_col + discarted_cols
for name, dtype in df5.dtypes:
    if name in excluded_cols:
        continue
    # String-typed columns are categorical; every other type is numeric.
    target_list = categoric_cols if dtype == 'string' else numeric_cols
    target_list.append(name)

In [62]:
# Preview the modelling columns: categorical, then numeric, then the label.
model_columns = categoric_cols + numeric_cols + label_col
df5.select(model_columns).limit(6).toPandas()

In [64]:
last_df = df5 #this is the dataframe the modelling steps work on

In [65]:
# Print the number of distinct values of each categorical column and collect
# the columns with more than two distinct values for one-hot encoding.
distinct_counts = []
for name in categoric_cols:
    n_distinct = last_df.select(name).distinct().count()
    print(name, n_distinct)
    distinct_counts.append((name, n_distinct))

one_hot_cols = [name for name, n_distinct in distinct_counts if n_distinct > 2]

one_hot_cols   # columns that will be one-hot encoded

**StringIndexer İşlemleri**

In [66]:
from pyspark.ml.feature import StringIndexer #etiketleri düzenleyelim

In [67]:
# One StringIndexer per categorical column, plus the bookkeeping lists of
# column names consumed by the encoder and assembler stages.
my_dict = {}
string_indexer_objs = []
string_indexer_output_names = []
ohe_input_names = []
ohe_output_names = []

for name in categoric_cols:
    indexed_name = name + "_indexed"
    # handleInvalid="skip" is set on each indexer, as in the setter-based form.
    indexer = StringIndexer(inputCol=name, outputCol=indexed_name, handleInvalid="skip")

    my_dict[name + "_index_obj"] = indexer
    string_indexer_objs.append(indexer)
    string_indexer_output_names.append(indexed_name)

    # Only columns selected for one-hot encoding feed the encoder.
    if name in one_hot_cols:
        ohe_input_names.append(indexed_name)
        ohe_output_names.append(name + "_ohe")

In [68]:
# Display the indexer objects and the column-name lists built above.
string_indexer_objs, string_indexer_output_names, ohe_input_names, ohe_output_names

**OneHotEncoder**

In [69]:
from pyspark.ml.feature import OneHotEncoder
# One-hot encode the indexed columns listed in ohe_input_names into ohe_output_names.
encoder = OneHotEncoder(inputCols=ohe_input_names, outputCols=ohe_output_names)

**VectorAssembler**

In [70]:
from pyspark.ml.feature import VectorAssembler
# Indexed columns that are NOT one-hot encoded and therefore go to the assembler as-is.
# Built in the order of string_indexer_output_names instead of via list(set(...)):
# set iteration order for strings can change between Python processes, which would
# make the order of the assembled feature vector differ from run to run.
# dict.fromkeys keeps the de-duplication the set-based version provided.
_ohe_inputs = set(ohe_input_names)
not_to_hot_coded = list(dict.fromkeys(
    name for name in string_indexer_output_names if name not in _ohe_inputs
))
not_to_hot_coded


In [71]:
# Every categorical column must end up either one-hot encoded or passed through indexed.
columns_match = len(categoric_cols) == (len(not_to_hot_coded) + len(ohe_output_names))
print("Columns verified" if columns_match else "Columns are not verified")

In [72]:
# Combine the numeric, indexed-only and one-hot columns into a single vector column.
assembler_inputs = numeric_cols + not_to_hot_coded + ohe_output_names
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol='unscaled_features',
    handleInvalid="skip",
)

**Feature Scaling**

In [73]:
from pyspark.ml.feature import StandardScaler
# Scaler from "unscaled_features" to "scaled_features".
# Note: the Pipeline built later in this notebook does not list this scaler among its stages.
scaler = StandardScaler(inputCol="unscaled_features", outputCol="scaled_features")

**PCA**

In [74]:
from pyspark.ml.feature import PCA
# PCA with k=5 reading "unscaled_features" and writing "features".
# Note: the Pipeline built later in this notebook does not list this PCA among its stages.
pca = PCA(k=5, inputCol="unscaled_features", outputCol="features")

**Estimator**

In [75]:
from pyspark.ml.regression import RandomForestRegressor
# Random forest regressor with 400 trees, trained on the assembled (unscaled)
# feature vector to predict the label column.
# An explicit seed is set so repeated runs build the same forest; without it the
# fitted model can differ between sessions. 142 matches the seed of the train/test split.
estimator = RandomForestRegressor(
    featuresCol="unscaled_features",
    labelCol=label_col[0],
    numTrees=400,
    seed=142,
)

**Pipeline**

In [76]:
from pyspark.ml import Pipeline
# Stage order: all string indexers, then the one-hot encoder, the vector assembler
# and finally the estimator.
pipeline_stages = [*string_indexer_objs, encoder, assembler, estimator]
pipeline_obj = Pipeline(stages=pipeline_stages)


**Test-Train Ayrımı:**

In [77]:
# 80/20 random split with a fixed seed, then fit the whole pipeline on the training part.
split_frames = last_df.randomSplit([.8, .2], seed=142)
train_df, test_df = split_frames
pipeline_model = pipeline_obj.fit(train_df)


**Prediction:**

In [78]:
# Run the fitted pipeline on the test part and show the label next to the prediction.
transformed_df = pipeline_model.transform(test_df)
comparison_cols = [label_col[0], 'prediction']
transformed_df.select(*comparison_cols).show(truncate=False)


**Model Değerlendirmesi:**

In [79]:
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluate the test-set predictions with the r2 metric and display the score
# together with the metric name.
evaluator = RegressionEvaluator().setLabelCol(label_col[0]).setMetricName('r2')
(evaluator.evaluate(transformed_df), evaluator.getMetricName())