# RDD ve Dataframe oluşturma teknikleri

<h3> Ön işlemler: SparkSession ve SparkContext Oluşturma </h3>

In [26]:
import findspark
findspark.init()

In [27]:
from pyspark.sql import SparkSession

<b>SparkSession</b> ve <b>SparkContext</b>; Spark için bir başlangıç noktasıdır.
<p>SparkSession içerisindeki <b><i>master</i></b>; çalışılacak ortamı ifade eder. Bir küme üzerinde çalışılıyorsa; 
bu küme burada belirtilir. Örneğin YARN gibi. Örneğimizde bağımsız modda çalıştığımız için <b><i>local[x]</i></b> ifadesini kullandık. x bir tamsayı değeri olmalı ve 0'dan büyük olmalıdır; bu, RDD, DataFrame ve Dataset kullanılırken kaç bölüm oluşturması gerektiğini gösterir. İdeal olarak, x değeri sahip olduğunuz CPU çekirdeği sayısı olmalıdır.</p>
<p><b><i>appName()</i></b> - Uygulama adınızı ayarlamak için kullanılır.</p>
<p><b><i>getOrCreate()</i></b> - Bu, zaten varsa bir SparkSession nesnesi döndürür, yoksa yeni bir tane oluşturur.</p>

In [29]:
spark = SparkSession.builder \
.master("local[8]") \
.appName("Python-Spark-Veri-Seti-İslemleri") \
.config("spark.executor.memory","4g") \
.config("spark.driver.memory","2g") \
.getOrCreate()

sc = spark.sparkContext

<h3>1 - parallelize( ) metodu </h3>

<h4>Örnek 1</h4>

In [30]:
from pyspark.sql import Row
daylist_as_rdd = sc.parallelize(["Pazartesi","Salı","Çarşamba","Perşembe","Cuma","Cumartesi","Pazar"]).map(lambda x: Row(x))

In [31]:
df_1 = daylist_as_rdd.toDF(['Günler']) # toDF ile rdd'yi DataFrame formatına çeviriyoruz.

In [32]:
df_1.show()

+---------+
|   Günler|
+---------+
|Pazartesi|
|     Salı|
| Çarşamba|
| Perşembe|
|     Cuma|
|Cumartesi|
|    Pazar|
+---------+



<h4>Örnek 2</h4>

In [19]:
df_2 = sc.parallelize([(1, 2, 3, 'a b c'),
(4, 5, 6, 'd e f'),
(7, 8, 9, 'g h i')]).toDF(['Sütun-1', 'Sütun-2', 'Sütun-3','Sütun-4'])

In [20]:
df_2.show()

+-------+-------+-------+-------+
|Sütun-1|Sütun-2|Sütun-3|Sütun-4|
+-------+-------+-------+-------+
|      1|      2|      3|  a b c|
|      4|      5|      6|  d e f|
|      7|      8|      9|  g h i|
+-------+-------+-------+-------+



<font color="red">Not</font>: ToDF fonksiyonu kullanılmadan elde edilen veri RDD verisidir. Bu veriyi listelemek için bir action fonksiyonu olarak collect metodu kullanılmaktadır. Örnek 1'e tekrar bakalım. 

In [22]:
from pyspark.sql import Row
daylist_as_rdd = sc.parallelize(["Pazartesi","Salı","Çarşamba","Perşembe","Cuma","Cumartesi","Pazar"]).map(lambda x: Row(x))

In [23]:
daylist_as_rdd.collect()

[<Row('Pazartesi')>,
 <Row('Salı')>,
 <Row('Çarşamba')>,
 <Row('Perşembe')>,
 <Row('Cuma')>,
 <Row('Cumartesi')>,
 <Row('Pazar')>]

<h3>2- createDataFrame( ) metodu</h3>

In [43]:
spark = SparkSession.builder \
.master("local[8]") \
.appName("Python-Spark-Veri-Seti-İslemleri") \
.config("spark.executor.memory","4g") \
.config("spark.driver.memory","2g") \
.getOrCreate()

df_3 = spark.createDataFrame([
('1', 'Ali', '70000', '1'),
('2', 'Fatma', '80000', '2'),
('3', 'Ayşe', '60000', '2'),
('4', 'Mehmet', '90000', '1')],
['Id', 'İsim', 'Maaş','Bölüm-Id']
)

In [44]:
df_3.show()

+---+------+-----+--------+
| Id|  İsim| Maaş|Bölüm-Id|
+---+------+-----+--------+
|  1|   Ali|70000|       1|
|  2| Fatma|80000|       2|
|  3|  Ayşe|60000|       2|
|  4|Mehmet|90000|       1|
+---+------+-----+--------+



<h3>3- read() ve load () metotları</h3>

<h4>Dosyadan Okuma</h4>

In [66]:
spark = SparkSession.builder \
.master("local[8]") \
.appName("Python-Spark-Veri-Seti-İslemleri") \
.config("spark.executor.memory","4g") \
.config("spark.driver.memory","2g") \
.getOrCreate()

df = spark.read.format('com.databricks.spark.csv').\
options(header='true', \
inferschema='true').\
load("./mock_kaggle.csv", header=True) # Veri seti Kaggle'dan indirilmiştir.

df.show(5)
df.printSchema()

+----------+-----+-------+-----+
|      data|venda|estoque|preco|
+----------+-----+-------+-----+
|2014-01-01|    0|   4972| 1.29|
|2014-01-02|   70|   4902| 1.29|
|2014-01-03|   59|   4843| 1.29|
|2014-01-04|   93|   4750| 1.29|
|2014-01-05|   96|   4654| 1.29|
+----------+-----+-------+-----+
only showing top 5 rows

root
 |-- data: string (nullable = true)
 |-- venda: integer (nullable = true)
 |-- estoque: integer (nullable = true)
 |-- preco: double (nullable = true)



<h4>Veritabanından Okuma</h4>

In [None]:
import os
jardrv = "./postgresql-42.2.14.jar"
from pyspark.sql import SparkSession
spark = SparkSession.builder.config('spark.driver.extraClassPath', jardrv).getOrCreate()
url = 'jdbc:postgresql://url:port/database_name'
properties = {'user': 'username', 'password': 'password'}
df = spark.read.jdbc(url=url, table='public."table_name"', properties=properties)
df.printSchema()
df.show()