# Spark

Apache Spark uses a master-worker architecture: the master process is called the "Driver" and the processes that carry out the work are called "Workers". The entry point of a Spark application is sc, an instance of the SparkContext class, which runs inside the driver.

### What is an RDD?
The core abstraction of Spark Core is the RDD (Resilient Distributed Dataset): a collection of data that is partitioned across the nodes of the cluster and processed in parallel.

In [1]:
# Locate the Spark installation so that pyspark can be imported
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext

# How to create a SparkContext

## Method 1: SparkSession

In [3]:
"""
master: Spark run mode / cluster manager (local, k8s, yarn)
[4]: number of worker threads in local mode (local[*] uses as many threads as there are logical cores)
appName: name of the application
config("spark.executor.memory", "4g"): memory for each executor, the processes that do the work (4g = 4 GB)
config("spark.driver.memory", "2g"): memory for the driver, the process that receives the user's code and returns the results
getOrCreate: returns the existing session if there is one, otherwise creates a new one
"""

pyspark = SparkSession.builder \
.master("local[4]") \
.appName("RDD-Example") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.getOrCreate()

In [4]:
# Get the SparkContext from the session
sc = pyspark.sparkContext

In [5]:
# Create an RDD from a list of tuples
rdd1 = sc.parallelize([("Ahmet", 25), ("Cemal", 29), ("Inci", 38), ("Burcu", 33)])

In [6]:
# Take the first 2 elements
rdd1.take(2)

[('Ahmet', 25), ('Cemal', 29)]

In [7]:
# Stop session
sc.stop()

## Method 2: SparkSession and SparkConf

In [8]:
# set() writes Spark properties; setExecutorEnv() only sets environment variables for executors
conf = SparkConf() \
.setMaster("local[4]") \
.setAppName("RDD-Ex-Method2") \
.set("spark.executor.memory", "4g") \
.set("spark.driver.memory", "2g")

In [9]:
pyspark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()

In [10]:
sc = pyspark.sparkContext

In [11]:
# Create an RDD from a list of tuples
rdd2 = sc.parallelize([("Ahmet", 25), ("Cemal", 29), ("Inci", 38), ("Burcu", 33)])

In [12]:
# Take the first 2 elements
rdd2.take(2)

[('Ahmet', 25), ('Cemal', 29)]

In [13]:
# Stop session
sc.stop()

## Method 3: SparkContext and SparkConf

In [34]:
# set() writes Spark properties; setExecutorEnv() only sets environment variables for executors
conf = SparkConf() \
.setMaster("local[4]") \
.setAppName("RDD-Ex-Method3") \
.set("spark.executor.memory", "4g") \
.set("spark.driver.memory", "2g")

In [35]:
sc = SparkContext(conf=conf)

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2456)
	at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2452)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2452)
	at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2541)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:84)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [16]:
# Create an RDD whose elements are tuples
rdd3 = sc.parallelize([("Ahmet", 25), ("Cemal", 29), ("Inci", 38), ("Burcu", 33)])

# Create an RDD whose elements are lists
rdd4 = sc.parallelize([["Ahmet", 25], ["Cemal", 29], ["Inci", 38], ["Burcu", 33], ["Faki", 42]])

In [17]:
# Take the first 2 elements
rdd3.take(2)

[('Ahmet', 25), ('Cemal', 29)]

In [18]:
rdd3.count()

4

In [19]:
rdd4.take(2)

[['Ahmet', 25], ['Cemal', 29]]

In [20]:
rdd4.count()

5

In [21]:
# Create an RDD from a list of integer lists (each inner list is one element)
numbers_rdd1 = sc.parallelize([[1, 2, 3], [4, 5, 6]])
numbers_rdd1.take(2)

[[1, 2, 3], [4, 5, 6]]

In [32]:
# Stop session
sc.stop()

In [23]:
import pandas as pd

In [37]:
pyspark = SparkSession.builder \
.master("local[4]") \
.appName("RDD-Example") \
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "2g") \
.getOrCreate()

In [38]:
# Dictionary that will become an RDD by way of a pandas DataFrame
friends_dict = {
    "Name": ["Kerem", "Fatma", "Akif"],
    "Age": [40, 42, 18]
}

In [39]:
df = pd.DataFrame(friends_dict)
df.head()

|   | Name | Age |
|---|-------|-----|
| 0 | Kerem | 40 |
| 1 | Fatma | 42 |
| 2 | Akif | 18 |


In [40]:
rdd_df_pandas = pyspark.createDataFrame(df)
rdd_df_pandas.show()

+-----+---+
| Name|Age|
+-----+---+
|Kerem| 40|
|Fatma| 42|
| Akif| 18|
+-----+---+



In [41]:
rdd_df = rdd_df_pandas.rdd
rdd_df.take(3)

[Row(Name='Kerem', Age=40),
 Row(Name='Fatma', Age=42),
 Row(Name='Akif', Age=18)]

In [45]:
file_path = "omer_seyfettin_forsa_hikaye.txt"
sc = pyspark.sparkContext
txt_file = sc.textFile(file_path)
txt_file.take(10)

['Ömer Seyfettin - Forsa',
 '',
 'Akdeniz’in, kahramanlık yuvası sonsuz ufuklarına bakan küçük tepe, minimini bir çiçek ormanı gibiydi. İnce uzun dallı badem ağaçlarının alaca gölgeleri sahile inen keçiyoluna düşüyor, ilkbaharın tatlı rüzgârıyla sarhoş olan martılar, çılgın bağrışlarıyla havayı çınlatıyordu. Badem bahçesinin yanı geniş bir bağdı. Beyaz taşlardan yapılmış kısa bir duvarın ötesindeki harabe vadiye kadar iniyordu. Bağın ortasındaki yıkık kulübenin kapısız girişinden bir ihtiyar çıktı. Saçı sakalı bembeyazdı. Kamburunu düzeltmek istiyormuş gibi gerindi. Elleri, ayakları titriyordu. Gök kadar boş, gök kadar sakin duran denize baktı, baktı.',
 '',
 '– Hayırdır inşallah! dedi.',
 '',
 'Duvarın dibindeki taş yığınlarına çöktü. Başını ellerinin arasına aldı. Sırtında yırtık bir çuval vardı. Çıplak ayakları topraktan yoğrulmuş gibiydi. Zayıf kolları kirli tunç rengindeydi. Yine başını kaldırdı. Gökle denizin birleştiği dumandan çizgiye dikkatle baktı, Ama görünürde bir şey yoktu