### **Büyük Veri Setleri Üzerinde Veri Madenciliği Dersi:   Proje1 - Spark Uygulamaları**

#### **Author:** Süheyl Çavuşoğlu


**Keşif Aktivitesi - 1:** Apache Spark Uygulaması Geliştirme


**İntihal:** Netten alınacak kısmi kod parçaları önceden kod içinde/raporda belirtilmek ve soru sorulduğunda cevaplanması durumunda sıkıntı çıkarmayacaktır. (i) İnternet kaynağını belirtmeyen/açıklayamayan/üzerinde geliştirme yapmayan veya (ii) birbirleriyle benzer/aynı çalışma teslim edenlerin aktiviteleri sıfır üzerinden değerlendirilecektir.


1) Derste bahsedilen geliştirme ortamlarından biri kullanılabilir.

- Anaconda: Jupyter Lab/Notebook üzerinde PySpark
- Google Colab üzerinde PySpark
- Databricks ile Spark Uygulamaları
- Apache Zeppelin (http://zeppelin.apache.org/)


2) Aşağıdaki Apache Spark kütüphaneleri için birer örnek gerçekleyiniz.

- Apache Spark SQL: Bir dataset import edip üzerinde 3 query run etme.
- Mllib: Bir dataset üzerinde sınıflandırma, kümeleme veya regresyon örneği çalıştırma
- GraphX: Bir çizge datası üzerinde sorgu veya algoritma çalıştırma 


**NOT:**
- Geliştirme ortamı olarak Databricks kullanılmıştır.
- GraphX yerine graphframes kütüphanesi ve versiyon sorunu yaşanaması için de Spark 3.1.2 sürümü ile graphframes 0.8 sürümü kullanıldı.

#### Apache Spark SQL ile Bir Dataset Üzerinde Sorgulamalar

In [None]:
from pyspark.sql import SparkSession     # İlk olarak pyspark içerisinden SparkSession'ı import ediyoruz. Bu sayede Spark'ın SQL, MLlib ve GraphX gibi bileşenlerine ulaşabileceğiz.

In [None]:
spark_session = SparkSession.builder.appName("Spark_ile_SQL_Uygulamaları").getOrCreate()        # Spark üzerinden çalıştıracağımız SQL uygulaması için bir isim belirledik.
                                                                                                # (böylece birden fazla uygulama olduğunda, uygulamaları daha kolay ayırt edebiliriz)


spark_df = spark_session.read.csv("/FileStore/tables/GlobalLandTemperaturesByCity.csv", header=True, inferSchema=True)    
# Databricks'e yüklediğimiz 'GlobalLandTemperaturesByCity.csv' isimli veri dosyanın path'ini  girerek okuduk ve bu veri setini spark_df'e atadık.
# header = True ile veri setindeki ilk satırın sütun adı olduğunu belirttik.
# inferSchema = True ile veri tiplerini Spark'a otomatik olarak okuttuk.
            
spark_df.createOrReplaceTempView("sıcaklık_verisetimiz")   # veri setimizin geçici bir kopyasını oluşturuyoruz. Bu sayede bu veri seti üzerinde çok daha hızlı sorgulamalar yapabileceğiz. 
                                                            # ayrıca bu SparkSession kapandıktan sonra bu geçici tablo silinecektir ve yer kaplamayacaktır.

Veri setimizi hazırladık. Şimdi veri seti üzerinden bazı sorgulamalar yapabiliriz.

In [None]:
spark_df.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|        

In [None]:
birinci_sorgu = spark.sql("SELECT COUNT(*) FROM sıcaklık_verisetimiz")    # Hazırladığımız geçici kopya üzerinden işlem yapıyoruz ve bu komutlar ile veri setindeki toplam satır sayısını sayıyoruz.
birinci_sorgu.show()    # show() methodu ile toplam satır sayısını ekrana bastırıyoruz.

+--------+
|count(1)|
+--------+
| 8599212|
+--------+



In [None]:
ikinci_sorgu = spark.sql("SELECT * FROM sıcaklık_verisetimiz WHERE Country='Turkey'")  # burada ise ülke bilgisi 'Turkey' olan tüm satırlardaki sıcaklıkları sorguluyoruz.
ikinci_sorgu.show(10)   # ilk 10 satırını ekrana bastırıyoruz.

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|            10.013|                        2.291|Çorlu| Turkey|  40.99N|   27.69E|
|1743-12-01|              null|                         null|Çorlu| Turkey|  40.99N|   27.69E|
|1744-01-01|              null|                         null|Çorlu| Turkey|  40.99N|   27.69E|
|1744-02-01|              null|                         null|Çorlu| Turkey|  40.99N|   27.69E|
|1744-03-01|              null|                         null|Çorlu| Turkey|  40.99N|   27.69E|
|1744-04-01|            13.685|                        2.162|Çorlu| Turkey|  40.99N|   27.69E|
|1744-05-01|15.020999999999999|                        1.824|Çorlu| Turkey|  40.99N|   27.69E|
|1744-06-01|            19.663|           1.700999

In [None]:
ucuncu_sorgu = spark.sql("SELECT Country, AverageTemperature FROM sıcaklık_verisetimiz WHERE Country = 'Turkey' and AverageTemperature < -10") # Türkiye'deki ortalama sıcaklığı -10'dan düşük olanları sorguluyoruz.
ucuncu_sorgu.show(10)

+-------+-------------------+
|Country| AverageTemperature|
+-------+-------------------+
| Turkey|            -10.198|
| Turkey|-10.574000000000002|
| Turkey|            -10.331|
| Turkey|-11.165999999999999|
| Turkey|-11.209000000000001|
| Turkey|            -11.618|
| Turkey|            -13.421|
| Turkey|            -11.229|
| Turkey|            -10.835|
| Turkey|            -10.133|
+-------+-------------------+
only showing top 10 rows



#### MLlib ile Bir Lineer Regresyon Modeli Çalıştırma

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression  # İlgileneceğimiz veri seti üzerinde Lineer Regresyon modeli kuracağımız için bu kütüphaneyi import ediyoruz.
from pyspark.ml.feature import VectorAssembler  # Bağımsız değişkenleri tek bir sütunda toplamak için bu kütüphaneyi kullanacağız.

In [None]:
spark_session = SparkSession.builder.appName("Spark_ile_Lineer_Regresyon").getOrCreate() # regresyon için de bir session oluşturuyoruz.
spark_df = spark_session.read.csv("/FileStore/tables/housing.csv", header=True, inferSchema=True)   # 'California Housing Prices' isimli veri setini (housing.csv olarak kayıtlı) okuyoruz, bu veri setinde işlemler yapacağız.

In [None]:
spark_df.show()  # veri setimizi görüntüledik.

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [None]:
spark_df.dtypes # kolonların veri tipleri

Out[11]: [('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double'),
 ('ocean_proximity', 'string')]

In [None]:
from pyspark.sql.functions import isnan, when, count, col

spark_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()   # burada veri setimizdeki null ve nan değerleri sorguluyoruz ve bu değerleri sayıyoruz.

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [None]:
from pyspark.ml.feature import Imputer  # burada Imputer sınıfını çağırıyoruz, Imputer'i eksik verileri doldurmak için kullanacağız.

imputer = Imputer(inputCols=["total_bedrooms"], outputCols=["total_bedrooms_imputed"]) # burada eksik verilerin saptanacağı 'total_bedrooms' kolonunu ve eksik veriler doldurulduktan sonra bu bilgilerin yazılacağı                                                                                        # kolon olan 'total_bedrooms_imputed' kolonunu belirtiyoruz.
# Not: eksik veriler, bulunduğu kolondaki verilerin ortalamalarının alınması ile doldurulmaktadır.
model = imputer.fit(spark_df)    # Imputer nesnesini spark_df üzerinde çalıştırdık, eksik verilere sahip olan total_bedrooms kolonunu doldurabilmek için bir model oluşturduk.
spark_df = model.transform(spark_df)  # oluşturulan model ile eksik değerleri doldurduk. Bu eksik değerlerin dolduğu bilgiler de 'total_bedrooms_imputed' kolonuna yazıldı.

In [None]:
spark_df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_bedrooms_imputed|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|                 129.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|                1106.0|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|                 190.0|
|  -122.25|   37.85|              

In [None]:
new_spark_df = spark_df.drop(spark_df['total_bedrooms'])  # eksik değerlere sahip olan eski total_bedrooms kolonunu attık.

In [None]:
new_spark_df.show()

+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+----------------------+
|longitude|latitude|housing_median_age|total_rooms|population|households|median_income|median_house_value|ocean_proximity|total_bedrooms_imputed|
+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+----------------------+
|  -122.23|   37.88|              41.0|      880.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|                 129.0|
|  -122.22|   37.86|              21.0|     7099.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|                1106.0|
|  -122.24|   37.85|              52.0|     1467.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|                 190.0|
|  -122.25|   37.85|              52.0|     1274.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|  

In [None]:
last_spark_df = new_spark_df.withColumnRenamed("total_bedrooms_imputed", "total_bedrooms")  # total_bedrooms_imputed kolonunun ismini tekrar total_bedrooms yapmak için böyle bir yola başvurdum.
last_spark_df.show()

+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+--------------+
|longitude|latitude|housing_median_age|total_rooms|population|households|median_income|median_house_value|ocean_proximity|total_bedrooms|
+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+--------------+
|  -122.23|   37.88|              41.0|      880.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|         129.0|
|  -122.22|   37.86|              21.0|     7099.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|        1106.0|
|  -122.24|   37.85|              52.0|     1467.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|         190.0|
|  -122.25|   37.85|              52.0|     1274.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|         235.0|
|  -122.25|   37.85|              

In [None]:
last_spark_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in last_spark_df.columns]).show()

# tüm işlemler bittikten sonra eksik veri kalıp kalmadığını son bir kez kontrol ediyoruz.

+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+--------------+
|longitude|latitude|housing_median_age|total_rooms|population|households|median_income|median_house_value|ocean_proximity|total_bedrooms|
+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+--------------+
|        0|       0|                 0|          0|         0|         0|            0|                 0|              0|             0|
+---------+--------+------------------+-----------+----------+----------+-------------+------------------+---------------+--------------+



In [None]:
assembler = VectorAssembler(inputCols=["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"], outputCol="features")
last_spark_df = assembler.transform(last_spark_df).select(col("features"), col("median_house_value").alias("label"))

# modelimizi eğitmedne önce veri setini features ve label olarak ayırmalıyız. Yani bağımsız değişkenler ile bağımlı değişkeni ayırıyoruz. 
# bunun için VectorAssembler sınıfını kullandık.

In [None]:
last_spark_df.show(truncate = False)  # veri setimizi eğitmeden önceki son hali aşağıdaki şekilde

+-------------------------------------------------------+--------+
|features                                               |label   |
+-------------------------------------------------------+--------+
|[-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252]    |452600.0|
|[-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014]|358500.0|
|[-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574]   |352100.0|
|[-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431]   |341300.0|
|[-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462]   |342200.0|
|[-122.25,37.85,52.0,919.0,213.0,413.0,193.0,4.0368]    |269700.0|
|[-122.25,37.84,52.0,2535.0,489.0,1094.0,514.0,3.6591]  |299200.0|
|[-122.25,37.84,52.0,3104.0,687.0,1157.0,647.0,3.12]    |241400.0|
|[-122.26,37.84,42.0,2555.0,665.0,1206.0,595.0,2.0804]  |226700.0|
|[-122.25,37.84,52.0,3549.0,707.0,1551.0,714.0,3.6912]  |261100.0|
|[-122.26,37.85,52.0,2202.0,434.0,910.0,402.0,3.2031]   |281500.0|
|[-122.26,37.85,52.0,3503.0,752.0,1504.0,734.0,3.2705]  |24180

In [None]:
(trainingData, testData) = last_spark_df.randomSplit([0.7, 0.3], seed=42)  # veri setimizi %70 eğitim, %30 test olacak şekilde ayırdık.

In [None]:
lr = LinearRegression(maxIter=10, regParam=0.1, elasticNetParam=1.0)   # modelimizi eğitirken kullandığımız hiperparametreler parantez içerisinde verilmiştir. (modelin başarısını bu hiperparametreleri değiştirerek artırmaya çalıştım.)
model = lr.fit(trainingData)

In [None]:
predictions = model.transform(testData)  # bu satırda modelimizin tahmin yapmasını sağlıyoruz ve tahmin sonuçlarını (prediction), features ve label'lar ile ekrana yazdırıyoruz.
predictions.show(truncate = False)

+------------------------------------------------------+--------+------------------+
|features                                              |label   |prediction        |
+------------------------------------------------------+--------+------------------+
|[-124.3,41.84,17.0,2677.0,531.0,1244.0,456.0,3.0313]  |103600.0|98229.82417836785 |
|[-124.23,40.54,52.0,2694.0,453.0,1152.0,435.0,3.0806] |106700.0|190545.55740307132|
|[-124.23,41.75,11.0,3159.0,616.0,1343.0,479.0,2.4805] |73200.0 |72841.97822647775 |
|[-124.22,41.73,28.0,3003.0,699.0,1530.0,653.0,1.7038] |78300.0 |73555.56689241575 |
|[-124.19,40.73,21.0,5694.0,1056.0,2907.0,972.0,3.5363]|90100.0 |163840.22100019315|
|[-124.19,40.77,30.0,2975.0,634.0,1367.0,583.0,2.442]  |69000.0 |142969.35729245562|
|[-124.19,40.78,37.0,1371.0,319.0,640.0,260.0,1.8242]  |70000.0 |115085.39866885915|
|[-124.18,40.78,34.0,1592.0,364.0,950.0,317.0,2.1607]  |67000.0 |118390.01847769786|
|[-124.18,40.79,39.0,1836.0,352.0,883.0,337.0,1.745]   |70500.0 |

In [None]:
# yukarıdaki şekilde tahmin değerlerimizin ne derece tutarlı olduğunu anlamak zor olabilir dolayısıyla r2 değeri hesaplayarak modelimizin kalitesini değerlendirelim.

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")  # label kolonu ile prediction kolonunun karşılaştırılmasını ve sonuçları r2 olarak vermesini söyledik.
r2 = evaluator.evaluate(predictions)
print("r2 value: {}".format(r2))

r2 value: 0.6369639931627558


**Çeşitli hiperparametreler ile model sonuçları:**

- **lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(trainingData)**

- Train: 60 Test:40 --> r2 value: 0.623217
- Train: 70 Test:30 --> r2 value: 0.630162
- Train: 80 Test:20 --> r2 value: 0.624386

- **lr = LinearRegression(maxIter=10, regParam=0.5, elasticNetParam=0.8)
model = lr.fit(trainingData)**

- Train: 70 Test:30 --> r2 value: 0.623903

- **lr = LinearRegression(maxIter=10, regParam=0.1, elasticNetParam=0.8)
model = lr.fit(trainingData)**

- Train: 70 Test:30 --> r2 value: 0.634986

- **lr = LinearRegression(maxIter=10, regParam=0.1, elasticNetParam=0.5)
model = lr.fit(trainingData)**

- Train: 70 Test:30 --> r2 value: 0.629392

- **lr = LinearRegression(maxIter=10, regParam=0.1, elasticNetParam=1.0)
model = lr.fit(trainingData)**

- Train: 70 Test:30 --> r2 value: 0.636964 -------------------------------------> **_En yüksek R2 değeri_**

- **lr = LinearRegression(maxIter=10, regParam=0.05, elasticNetParam=1.0)
model = lr.fit(trainingData)**

- Train: 70 Test:30 --> r2 value: 0.629392

- **lr = LinearRegression(maxIter=20, regParam=0.1, elasticNetParam=1.0)
model = lr.fit(trainingData)**

- Train: 70 Test:30 --> r2 value: 0.633612

- **lr = LinearRegression(maxIter=50, regParam=0.1, elasticNetParam=1.0)
model = lr.fit(trainingData)**

- Train: 70 Test:30 --> r2 value: 0.633984

#### Not: Birkaç bağımsız değişkeni (longitude, latitude, population gibi) hesaptan çıkartarak modelin performansını test ettim ancak model performansında bir iyileşme göremedim. En iyi seçenek tüm bağımsız değişkenleri modele dahil etmek oldu.

#### Bir çizge datası üzerinde sorgu veya algoritma çalıştırma

In [None]:
from pyspark.sql import SparkSession
from graphframes import GraphFrame # çizge datası üzerinde sorgulamalar için graphframes kütüphanesini kullandım.
from pyspark.sql.functions import col # sorguları yaparken kullandım.

In [None]:
spark_session = SparkSession.builder.appName("GraphFrame Örneği").getOrCreate() # çizge datalarında işlem yapacağım bir session yarattım.

In [None]:
spark_df = spark_session.read.csv("/FileStore/tables/trip.csv", header=True, inferSchema=True) # graphframe kütüphanesi için oldukça uygun bir veri seti olduğu için trip.csv isimli veri setini import ettim. 

In [None]:
spark_df.show(10) # veri setinin ilk 10 satırını ekrana bastırdım.

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|8/29/2013 14:13|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|      70|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [None]:
# veri setini hazırladık, şimdi birkaç sorgu örneği yapabiliriz.
# graphframe kütüphanesi için ilk olarak veri setimizden köşeleri ve kenarları belirleyelim.

vertices = spark_df.selectExpr("start_station_id as id").union(spark_df.selectExpr("end_station_id as id")).distinct()
# grafiğin düğümlerini oluşturmak için veri setindeki başlangıç istasyonu ve bitiş istasyonu sütunlarından yalnızca benzersiz istasyon kimliklerini seçtik.

edges = spark_df.selectExpr("start_station_id as src", "end_station_id as dst", "duration as relationship")
# veri setindeki başlangıç istasyonu, bitiş istasyonu ve seyahat süresi sütunnlarını seçtik. Seçtiğimiz sütunların isimlerini de graphframe'e uygun olacak şekilde değiştirdik.

graph = GraphFrame(vertices, edges)  # bir graph frame oluşturdk.

# hangi rotanın kaç kez kullanıldığını sorgulayalım. Bunun için başlangıç ve bitiş istasyonlarını gruplayıp count metodu ile sayabiliriz.
most_common_trips = graph.edges.groupBy("src", "dst").count().orderBy("count", ascending=False)
most_common_trips.show()
# rotaların kullanılma sıklığını gösteren tabloyu ekrana bastırdık.

+---+---+-----+
|src|dst|count|
+---+---+-----+
| 69| 65| 6216|
| 50| 60| 6164|
| 65| 70| 5041|
| 61| 50| 4839|
| 50| 61| 4357|
| 60| 74| 4269|
| 51| 70| 3967|
| 74| 61| 3903|
| 64| 77| 3627|
| 70| 50| 3622|
| 55| 70| 3526|
| 70| 55| 3510|
| 65| 69| 3495|
| 74| 70| 3477|
| 69| 39| 3438|
| 77| 64| 3427|
| 60| 50| 3231|
| 67| 70| 3190|
| 74| 60| 3116|
| 50| 70| 3033|
+---+---+-----+
only showing top 20 rows



In [None]:
most_common_trips.where(col('count') > 3000).show() # kullanılma sıklığı 3000'den büyük olan rotaları ekrana bastırdık

+---+---+-----+
|src|dst|count|
+---+---+-----+
| 69| 65| 6216|
| 50| 60| 6164|
| 65| 70| 5041|
| 61| 50| 4839|
| 50| 61| 4357|
| 60| 74| 4269|
| 51| 70| 3967|
| 74| 61| 3903|
| 64| 77| 3627|
| 70| 50| 3622|
| 55| 70| 3526|
| 70| 55| 3510|
| 65| 69| 3495|
| 74| 70| 3477|
| 69| 39| 3438|
| 77| 64| 3427|
| 60| 50| 3231|
| 67| 70| 3190|
| 74| 60| 3116|
| 50| 70| 3033|
+---+---+-----+
only showing top 20 rows



In [None]:
# istasyonlar arasında geçen süreleri de sorgulayabiliriz. bunun için istasyonları vertices olarak kullanalım.
vertices_2 = spark_df.selectExpr("start_station_id as id", "start_station_name as name").distinct()

# edges için de başlangıç ve bitiş istasyonlarından bir dataframe oluşturalım.
edges_2 = spark_df.selectExpr("start_station_id as src", "end_station_id as dst", "duration")

# bir graph frame oluşturalım.
graph_2 = GraphFrame(vertices_2, edges_2)
# id'si 66 olan istasyondan id'si 67 olan istasyona giden tüm yolları bulalım.
paths = graph_2.bfs("id = 66", "id = 67")

# bulduğumuz bu yolların ne kadar sürede (duration kolonu saniye cinsinden hesaplanmaktadır) kat edildiğine bakalım ve sonuçları ekrana yazdıralım.
paths_results = paths.select("from.id", "to.id", "e0.duration")
paths_results.show(10)

+---+---+--------+
| id| id|duration|
+---+---+--------+
| 66| 67|      83|
| 66| 67|     228|
| 66| 67|     399|
| 66| 67|     185|
| 66| 67|    1116|
| 66| 67|     127|
| 66| 67|     119|
| 66| 67|     122|
| 66| 67|     103|
| 66| 67|     183|
+---+---+--------+
only showing top 10 rows



In [None]:
paths_results.where(col('duration') > 1000).show() # kat edilme süresi 1000 saniyeden uzun olanları ekrana bastıralım.

+---+---+--------+
| id| id|duration|
+---+---+--------+
| 66| 67|    1116|
| 66| 67|    1526|
| 66| 67|    1225|
| 66| 67|    1002|
| 66| 67|    4954|
| 66| 67|   13032|
| 66| 67|    1366|
| 66| 67|    2151|
| 66| 67|    4746|
| 66| 67|    1140|
| 66| 67|   40521|
| 66| 67|    2640|
| 66| 67|   15517|
| 66| 67|    1829|
| 66| 67|    2313|
| 66| 67|    2414|
| 66| 67|    1058|
| 66| 67|    1317|
| 66| 67|    1704|
| 66| 67|    1697|
+---+---+--------+
only showing top 20 rows



In [None]:
# son olarak da istasyonların önemini sıralayalım.
# bunun için ilk olarak grafiğin köşelerini(vertices) oluşturalım. başlangıç istasyonu id'sini ve adını seçip ve bunlardan unique olanları tutalım. kolon isimlerini uygun şekilde değştirelim.
vertices_3 = spark_df.select("start_station_id", "start_station_name").distinct().withColumnRenamed("start_station_id", "id").withColumnRenamed("start_station_name", "name")

# şimdi de grafiğin kenarlarını(edges) seçelim. başlangıç ve bitiş istasyonlarını seçelim ve bu kolonların isimlerini graphframes'e uygun olarak değiştirelim.
edges_3 = spark_df.select("start_station_id", "end_station_id").withColumnRenamed("start_station_id", "src").withColumnRenamed("end_station_id", "dst")

# graph frame'i oluşturalım.
graph_3 = GraphFrame(vertices_3, edges_3)

# genelde web sayfalarının önemini sıralamak için kullanılan bir algoritma olan PageRank algoritmasını uygulayalım.
# resetprobabilty ve maxiter parametrelerini en sık kullanıldığı şekilde aldık.
page_ranks = graph_3.pageRank(resetProbability=0.15, maxIter=10)

# pagerank algoritmasını uyguladıktan sonra artık sonuçları görüntüleyebiliriz. 
result = page_ranks.vertices.orderBy(col("pagerank").desc())
# istasyonların id'lerini ve isimlerini önem sıralamaları azalan olacak şekilde (ilk 10 tanesini) ekrana bastırdık.
result.show(10, truncate = False)

+---+----------------------------------------+------------------+
|id |name                                    |pagerank          |
+---+----------------------------------------+------------------+
|2  |San Jose Diridon Caltrain Station       |3.5440610635004153|
|70 |San Francisco Caltrain (Townsend at 4th)|3.256357252033588 |
|28 |Mountain View Caltrain Station          |2.3963182860249934|
|22 |Redwood City Caltrain Station           |2.2220233275269825|
|69 |San Francisco Caltrain 2 (330 Townsend) |1.881714591128523 |
|50 |Harry Bridges Plaza (Ferry Building)    |1.8146546040806695|
|60 |Embarcadero at Sansome                  |1.6784180823085835|
|61 |2nd at Townsend                         |1.5408450673683143|
|77 |Market at Sansome                       |1.4703197568612778|
|65 |Townsend at 7th                         |1.4597489702478812|
+---+----------------------------------------+------------------+
only showing top 10 rows

