source : https://www.interviewbit.com/pyspark-interview-questions/

## What is PySpark?

PySpark is the Python interface to Apache Spark. 
It lets you work with Spark through APIs written in Python. 
It also supports Spark’s features like Spark DataFrame, Spark SQL, Spark Streaming, Spark MLlib and Spark Core. 
It provides an interactive PySpark shell to analyze structured and semi-structured data in a distributed environment.
PySpark supports reading data from multiple sources and different formats. It also facilitates the use of RDDs (Resilient Distributed Datasets).

PySpark can be installed by using the pip command:

```
pip install pyspark

```


## What are the advantages and disadvantages of PySpark?
### Advantages of PySpark:
- Library Support: Compared to Scala, Python has a huge collection of libraries for data science.
- Easy to Learn: Python is an easy language to learn, which makes PySpark approachable for newcomers.

### Disadvantages of PySpark:
- Spark itself was developed in Scala and runs on the JVM, so Python code executed through PySpark (for example RDD lambdas and UDFs) has to exchange data with the JVM and can be considerably slower than the equivalent Scala program; the source cites roughly 10x. This can hurt the performance of heavy data processing applications.


## SparkSession vs SparkContext

SparkContext is the entry point for programming Spark with RDDs and for connecting to the Spark cluster. Spark 2.0 introduced SparkSession, which became the entry point for programming with DataFrame and Dataset.

### SparkContext 
SparkContext has been the entry point to Spark since version 1.x. It is used to create RDDs and broadcast variables on the cluster.

Since Spark 2.0, most of the functionality (methods) available in SparkContext is also available in SparkSession. The SparkContext object **sc** is available by default in spark-shell, and it can also be created with the SparkContext class.

Note that you can create only one SparkContext per JVM. To create another one, you first need to stop the existing one using the stop() method.

The Spark driver program creates and uses SparkContext to connect to the cluster manager, submit PySpark jobs, and find out which resource manager (YARN, Mesos, or Standalone) to communicate with. It is the heart of the PySpark application.

<img width="608" alt="image" src="https://user-images.githubusercontent.com/26552500/193417217-225c59d5-e1ab-43b3-b972-99e547669ed5.png">

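The SparkContext object declared in PySpark is connected, through Py4J, to the SparkContext that runs on the internal JVM (Java Virtual Machine).
That JVM-side SparkContext is also connected to the worker nodes, and the actual work on the worker nodes runs on the JVM as well.
In short, although PySpark code is written in Python, the actual execution takes place on the JVM.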

<img width="741" alt="image" src="https://user-images.githubusercontent.com/26552500/193417776-a3cf025d-847d-4b00-a7ba-9343beba2366.png">

### SparkContext - 1

In [4]:
import pyspark
from pyspark import SparkContext, SparkConf

sc = SparkContext()
print(sc)
print(type(sc))

<SparkContext master=local[*] appName=pyspark-shell>
<class 'pyspark.context.SparkContext'>


In [2]:
# The object was created successfully. What happens if we create one more SparkContext?
# An error is raised

# new_sc = SparkContext()
# ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at /var/folders/5p/l3b0fr1x5wn7450vgnp88r080000gn/T/ipykernel_1755/3605226842.py:4 


In [5]:
# Stop the SparkContext
sc.stop()

### SparkContext - 2

In [6]:
sc = SparkContext(master='local', appName='pyspark test')
print(sc)

<SparkContext master=local appName=pyspark test>


In [7]:
# Use .getConf().getAll() to inspect the SparkContext's configuration.
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.app.id', 'local-1665282114640'),
 ('spark.app.name', 'pyspark test'),
 ('spark.driver.host', '192.168.1.8'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.startTime', '1665282114586'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '49419')]

In [8]:
print(sc.appName)
print(sc.master)
sc.stop()

pyspark test
local


### SparkContext - 3

In [9]:
# A SparkContext can also be created by setting its configuration with SparkConf().
# .setMaster() and .setAppName() set the master URL and the application name.

conf = SparkConf().setAppName("Pyspark Test1").setMaster('local')
sc = SparkContext(conf=conf)
print(sc)
print(sc.appName)
print(sc.master)
sc.stop()

<SparkContext master=local appName=Pyspark Test1>
Pyspark Test1
local


### SparkContext - 4

In [11]:
# Create Spark Context
from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.setMaster("local").setAppName("Pyspark Test2")

#sc = SparkContext(conf=conf)
sc = SparkContext.getOrCreate(conf)

print(sc.appName)
print(sc.master)
sc.stop()

Pyspark Test2
local


### SparkSession
SparkSession was introduced in version 2.0 and is the entry point to Spark functionality for creating RDDs, DataFrames and Datasets. Its object **spark** is available by default in spark-shell, and it can also be created with the SparkSession class.

SparkSession is a unified API that takes over the role of SQLContext, StreamingContext, HiveContext and the other contexts:

- Spark Context,
- SQL Context,
- Streaming Context,
- Hive Context.


Since PySpark version 2.0, SparkSession is the preferred starting point in place of using SparkContext directly. It gives access to all of the PySpark functionality related to RDDs, DataFrames, Datasets etc. 

SparkSession internally creates a SparkContext and a SparkConf based on the details provided to it. A SparkSession is created using the builder pattern.

<img width="778" alt="image" src="https://user-images.githubusercontent.com/26552500/193422833-b9567d2d-19ce-4bca-8224-e23c7e65c51b.png">


You can stop the SparkContext by calling its stop() method. As explained above, you can have only one SparkContext per JVM. If you want to create another one, you first need to shut down the existing one with stop() and then create a new SparkContext.

```python
# SparkContext stop() method
spark.sparkContext.stop()
```

### SparkSession - 1

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Pyspark Test3").master('local[*]').getOrCreate()
spark

In [13]:
print(spark.sparkContext)
print(spark.sparkContext.appName)

# SparkContext stop() method
# spark.sparkContext.stop()

<SparkContext master=local[*] appName=Pyspark Test3>
Pyspark Test3


### Creating a SparkContext

In [14]:
sc = spark.sparkContext

In [15]:
# Create RDD
rdd = sc.range(1,5)
# rdd = spark.sparkContext.range(1, 5)
print(rdd.collect())

[1, 2, 3, 4]

In [16]:
print(sc.applicationId)
print(sc.uiWebUrl)
print(sc.version)

local-1665283057965
http://192.168.1.8:4040
3.2.1


In [8]:
print(spark.sparkContext.applicationId)
print(spark.sparkContext.uiWebUrl)
print(spark.sparkContext.version)

spark.stop()

local-1665574094899
http://172.16.227.180:4040
3.2.1


## RDD
RDD stands for Resilient Distributed Dataset.
Resilient means fault tolerant: if a task fails on one node, it is run again on another node so that the job as a whole does not fail.
An RDD is read-only and cannot be modified.

RDDs are the collections of elements that are spread over multiple nodes so that a cluster can process them in parallel. They are immutable: once an RDD is created it cannot be modified, which makes it safe to process in parallel. RDDs are also fault-tolerant, which means that when a failure happens they can be recovered automatically. Multiple operations can be applied to RDDs to perform a task. The operations are of two types:

<img width="781" alt="image" src="https://user-images.githubusercontent.com/26552500/193422414-a2d9b075-028b-425f-836b-b2e9164cc493.png">

### Transformation: 
These operations, when applied to an RDD, create a new RDD. Examples of transformations are filter, groupBy and map.

In [22]:
from pyspark import SparkContext
sc = SparkContext("local", "Pyspark Test4")
words_list = sc.parallelize (
  ["pyspark", 
  "interview", 
  "questions", 
  "at", 
  "interviewbit"]
)
filtered_words = words_list.filter(lambda x: 'interview' in x)
filtered = filtered_words.collect()
print(filtered)

['interview', 'interviewbit']


<img width="1677" alt="image" src="https://user-images.githubusercontent.com/26552500/194735235-5796d8f9-b1c9-4d08-871b-9fece2a92ef0.png">

### Action :
These operations instruct Spark to perform some computation on the RDD and return the result to the driver, so data is sent from the executors to the driver. count(), collect() and take() are some examples.
Let us consider an example that demonstrates actions, starting with the count() function.

- Using the os.path module: https://yeo0.github.io/pg/2018/11/21/%ED%8C%8C%EC%9D%B4%EC%8D%AC-os.path-%EB%AA%A8%EB%93%88/

In [31]:
import os
temp = '/Users/sunghwanki/Desktop/Project/Python_Advance/Pyspark'

# path = os.path.join(tempdir, 'sample.txt')
path = os.path.abspath('./sample.txt')

with open(path, "w") as f:
    f.write('''Hello my world
danny 
good morning
I am good
how are you?
good 
bad
my''')

rdd = sc.textFile(path)
print(rdd.count())
print(rdd.take(2))
print(rdd.collect())

8
['Hello my world', 'danny ']
['Hello my world', 'danny ', 'good morning', 'I am good', 'how are you?', 'good ', 'bad', 'my']


In [36]:
%man find > test.txt
rdd = sc.textFile('./test.txt')
rdd.top(5)

['macOS 12.6                       March 13, 2021                       macOS 12.6',
 'S\x08SY\x08YN\x08NO\x08OP\x08PS\x08SI\x08IS\x08S',
 'S\x08ST\x08TA\x08AN\x08ND\x08DA\x08AR\x08RD\x08DS\x08S',
 'S\x08SE\x08EE\x08E A\x08AL\x08LS\x08SO\x08O',
 'P\x08PR\x08RI\x08IM\x08MA\x08AR\x08RI\x08IE\x08ES\x08S']

In [38]:
mylist = [1,2,3,4,5]
rdd = sc.parallelize(mylist)
print(rdd.first())
print(rdd.take(2))
print(rdd.collect())

spark.stop()
sc.stop()

1
[1, 2]
[1, 2, 3, 4, 5]


## What are the advantages of PySpark RDD?

PySpark RDDs have the following advantages:

- In-Memory Processing: PySpark’s RDD helps in loading data from the disk to the memory. The RDDs can even be persisted in the memory for reusing the computations.


- Immutability: The RDDs are immutable which means that once created, they cannot be modified. While applying any transformation operations on the RDDs, a new RDD would be created.


- Fault Tolerance: The RDDs are fault-tolerant. When a partition is lost because a task or node fails, Spark recomputes it automatically from the lineage of transformations that produced it. This results in seamless execution of the PySpark applications.


- Lazy Evaluation: Transformations are not performed as soon as they are encountered. They are recorded in the DAG and evaluated only when the first RDD action is reached.


- Partitioning: Whenever an RDD is created from any data, its elements are split into partitions; by default the number of partitions follows the number of cores available.
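
The lazy behaviour described in the list above can be illustrated without a cluster. The sketch below is a plain-Python analogy built on generators, not Spark code; `traced_double` and `calls` are names made up for this note. As with RDD transformations, building the pipeline runs nothing, and work happens only when a consumer (the counterpart of an action) asks for results.

```python
calls = []  # records every element that is actually processed


def traced_double(x):
    """Double x and record that the function really ran."""
    calls.append(x)
    return 2 * x


data = [1, 2, 3, 4, 5]

# "Transformations": generator expressions only describe the work.
doubled = (traced_double(x) for x in data)
large = (y for y in doubled if y > 4)

# Snapshot taken before any "action": nothing has executed yet.
calls_before_action = list(calls)

# "Action": materializing the result forces the whole pipeline to run.
result = list(large)

print(calls_before_action)
print(result)
print(calls)
```

In Spark the recorded plan is a DAG of RDD dependencies rather than a chain of generators, so this only mirrors the timing of evaluation, not the scheduling.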

## What are PySpark serializers?
note : https://spark.apache.org/docs/0.9.0/api/pyspark/pyspark.serializers-module.html

Serialization plays an important part in performance tuning on Spark: any data that is sent over the network, written to disk or persisted in memory has to be serialized. PySpark provides serializers for this purpose. It supports two types of serializers:

### PickleSerializer: 
note : https://docs.python.org/2/library/pickle.html

This supports almost every Python object.

### MarshalSerializer: 
note : https://docs.python.org/2/library/marshal.html

This serializer is faster than the PickleSerializer but it supports only limited types.
Consider an example of serialization which makes use of MarshalSerializer:

In [47]:
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "Marshal Serialization", serializer = MarshalSerializer())    #Initialize spark context and serializer
print(sc.parallelize(list(range(1000))).map(lambda x: 3 * x).take(5))
sc.stop()

[0, 3, 6, 9, 12]
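
The two serializers take their names from Python's standard `pickle` and `marshal` modules (linked above), so the "limited types" trade-off can be seen with the standard library alone. The following is a minimal sketch, not PySpark code; the `record` dictionary is made-up sample data, and timing is left out because it depends on the data and the Python version.

```python
import marshal
import pickle
from datetime import date

plain = {"ids": [1, 2, 3], "name": "pyspark"}          # built-in types only
record = {"name": "danny", "joined": date(2022, 10, 9)}  # contains a non-core type

# Both modules round-trip values made of core built-in types.
plain_via_marshal = marshal.loads(marshal.dumps(plain))
plain_via_pickle = pickle.loads(pickle.dumps(plain))

# pickle also handles richer objects such as datetime.date.
record_via_pickle = pickle.loads(pickle.dumps(record))

# marshal only supports a fixed set of core types and rejects the rest.
try:
    marshal.dumps(record)
    marshal_ok = True
except ValueError:
    marshal_ok = False

print(plain_via_marshal == plain, plain_via_pickle == plain)
print(record_via_pickle == record)
print(marshal_ok)
```

This suggests when MarshalSerializer is a reasonable choice: data made only of numbers, strings and simple containers, as in the `range(1000)` example above.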


### pyspark.RDD.glom
Return an RDD created by coalescing all elements within each partition into a list.

In [58]:
sc = SparkContext('local', 'test', batchSize=2)

In [71]:
rdd = sc.parallelize(range(16), 4)
print(rdd.collect())
print(rdd.glom().collect())
print(rdd._jrdd.count())
sc.stop()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
8


In [72]:
sc = SparkContext('local', 'test', batchSize=1)
rdd = sc.parallelize(range(16), 4)
print(rdd.collect())
print(rdd.glom().collect())
print(rdd._jrdd.count())
sc.stop()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
16


In [73]:
sc = SparkContext('local', 'test', batchSize=2)
rdd_without_coalescing = sc.parallelize(range(16))
print(rdd_without_coalescing.collect())
print(rdd_without_coalescing.glom().collect())
print(rdd_without_coalescing._jrdd.count())
sc.stop()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]]
8
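
The `_jrdd.count()` values in the three cells above (8, 16 and 8) are consistent with `batchSize` controlling how many Python objects are serialized together into one JVM-side record, while `glom()` reflects the partitions. The plain-Python sketch below reproduces those numbers under that reading. `slice_range` and `count_batches` are hypothetical helpers written for this note, and the slicing formula is an assumption chosen because it matches the `glom()` output shown.

```python
import math


def slice_range(n, num_slices):
    """Split range(n) into num_slices contiguous, near-equal partitions."""
    bounds = [i * n // num_slices for i in range(num_slices + 1)]
    return [list(range(bounds[i], bounds[i + 1])) for i in range(num_slices)]


def count_batches(partitions, batch_size):
    """Count serialized records if each partition is cut into batches."""
    return sum(math.ceil(len(p) / batch_size) for p in partitions)


four_parts = slice_range(16, 4)   # like sc.parallelize(range(16), 4)
one_part = slice_range(16, 1)     # like sc.parallelize(range(16)) on 'local'

records_4_parts_batch_2 = count_batches(four_parts, 2)
records_4_parts_batch_1 = count_batches(four_parts, 1)
records_1_part_batch_2 = count_batches(one_part, 2)

print(four_parts)
print(records_4_parts_batch_2, records_4_parts_batch_1, records_1_part_batch_2)
```

Note that `_jrdd` is a private attribute, so what it counts is an implementation detail rather than documented behaviour.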


## What are the different cluster manager types supported by PySpark?

A cluster manager is the platform that runs Spark in cluster mode by allocating resources to the worker nodes according to their requirements.

<img width="781" alt="image" src="https://user-images.githubusercontent.com/26552500/193422610-95ff1441-dd4e-43de-a809-cb93567aeef2.png">

The above figure shows the position of the cluster manager in the Spark ecosystem. Consider a cluster with a master node and multiple worker nodes. With the help of the cluster manager, the master node provides the worker nodes with resources such as memory and processor allocation, depending on each node's requirements.

PySpark supports the following cluster manager types:

- Standalone – This is a simple cluster manager that is included with Spark.
- Apache Mesos – This manager can run Hadoop MapReduce and PySpark apps.
- Hadoop YARN – This is the resource manager used in Hadoop 2.
- Kubernetes – This is an open-source cluster manager that helps in automated deployment, scaling and automatic management of containerized apps.
- local – This is not a cluster manager but simply a mode for running Spark applications on a single machine such as a laptop or desktop.

## Is PySpark faster than pandas?

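PySpark supports parallel execution in a distributed environment, i.e. across different cores and different machines, which pandas does not. This is why PySpark is generally faster than pandas on large datasets. For data that fits comfortably in the memory of a single machine, pandas is often faster because it avoids the overhead of distributed scheduling.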

## What do you understand about PySpark DataFrames?

A PySpark DataFrame is a distributed collection of well-organized data, arranged in named columns, that is equivalent to a table in a relational database. PySpark DataFrames are better optimised than data frames in R or Python (pandas). They can be created from different sources like Hive tables, structured data files, existing RDDs, external databases etc., as shown in the image below:

<img width="776" alt="image" src="https://user-images.githubusercontent.com/26552500/193422784-382457d2-f715-4a82-b991-d68c7969d925.png">

The data in a PySpark DataFrame is distributed across the machines in the cluster, and operations on it run in parallel on all of those machines. DataFrames can handle large collections of structured or semi-structured data, up to the petabyte range.

## What are the types of PySpark’s shared variables and why are they useful?

Whenever PySpark runs a function passed to an operation such as filter(), map() or reduce(), the function executes on a remote node using copies of the variables shipped with the task. These copies are not reusable and cannot be shared across tasks, and changes to them are not returned to the driver. To solve the issue of reusability and sharing, PySpark provides shared variables. There are two types of shared variables:

### Broadcast variables: 
These are also known as read-only shared variables and are used when tasks need to look up data. The variables are cached and made available on all the cluster nodes so that tasks can use them. They are not sent with every task; instead they are distributed to the nodes using efficient algorithms that reduce the cost of communication. When we run an RDD job that uses broadcast variables, PySpark does the following:

- The job is broken into stages separated by distributed shuffles. The actions are executed in those stages.
- The stages are then broken into tasks.
- The broadcast variables are sent to the tasks that need to use them.


Broadcast variables are created in PySpark by making use of the broadcast(variable) method from the SparkContext class. The syntax for this goes as follows:

In [77]:
sc = SparkContext('local','test')
broadcastVar = sc.broadcast([10, 11, 22, 31])
broadcastVar.value    # access broadcast variable


[10, 11, 22, 31]

An important point of using broadcast variables is that the variables are not sent to the tasks when the broadcast function is called. They will be sent when the variables are first required by the executors.


### Accumulator variables: 
note : https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.Accumulator.html

These variables are called updatable shared variables. They are updated through associative and commutative operations and are used for counters or sums. PySpark supports numeric accumulators by default, and custom accumulator types can also be added. Accumulators can be of two kinds:

- Named Accumulators: These accumulators are visible under the “Accumulator” tab in the Spark web UI as shown in the image below:

<img width="781" alt="image" src="https://user-images.githubusercontent.com/26552500/193423081-471f3e8d-dc29-4f7f-928c-a42a70ee3a22.png">

Here, the Accumulable section shows the sum of the accumulator values modified by the tasks, and the Accumulator column of the Tasks table lists the updates made by each task.

- Unnamed Accumulators: These accumulators are not shown on the web UI page. Named accumulators are recommended where they are available; note that sc.accumulator() in PySpark takes no name argument, so the accumulators created in the cells below are unnamed.


An Accumulator is a shared variable that can be accumulated, i.e., it has a commutative and associative “add” operation. Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to read its value, using the value attribute. Updates from the workers are propagated automatically to the driver program.

While SparkContext supports accumulators for primitive data types like int and float, users can also define accumulators for custom types by providing a custom AccumulatorParam object. Refer to its doctest for an example.

In [9]:
spark = SparkSession.builder.appName("test").master('local[*]').getOrCreate()
sc = spark.sparkContext

In [14]:
a = sc.accumulator(5)
print(a)
print(a.value)     # value : Get the accumulator’s value; only usable in driver program

5
5


In [15]:
rdd = sc.parallelize([1,2,3])
def f(x):
    global a
    a += x
    
print(rdd.foreach(f))
print(a.value)

None
11


In [16]:
b = sc.accumulator(0)
print(b)
print(b.value)

0
0


In [17]:
def g(x):
    b.add(x)
    
print(rdd.foreach(g))
print(b.value)

None
6


In [19]:
ac = sc.accumulator(1)
print(ac)
print(ac.value)

1
1


In [20]:
ac.value = 3
print(ac.value)

ac += 5 
print(ac.value)

3
8


In [100]:
rdd = sc.parallelize([i for i in range(10)])

def f(x):
    global ac
    ac += x
    print(x, ac)
    
print(rdd.foreach(f))
print(ac.value)

None
53


3 3
4 7
7 7
8 8
9 17
6 6
0 0
2 2
1 1
5 5
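
The last cell prints small numbers inside `f` even though the driver ends up with 53. One way to read this, sketched below in plain Python rather than Spark, is that each task adds into its own copy of the accumulator that starts from zero, and the driver merges every task's total into its own value (8 before the job). The partition boundaries are inferred from the printed pairs above, and `run_task` is a hypothetical helper.

```python
driver_value = 8  # accumulator value on the driver before the job (ac.value above)

# Partitions as suggested by the printed (x, ac) pairs in the cell output.
partitions = [[0], [1], [2], [3, 4], [5], [6], [7], [8, 9]]


def run_task(partition):
    """Simulate one task: its local accumulator copy starts from zero."""
    local = 0
    trace = []
    for x in partition:
        local += x
        trace.append((x, local))  # what print(x, ac) would show inside the task
    return local, trace


traces = []
for part in partitions:
    task_total, trace = run_task(part)
    traces.extend(trace)
    driver_value += task_total  # the driver merges each task's contribution

print(traces)
print(driver_value)
```

Because addition is commutative and associative, the order in which the tasks finish does not change the merged result, which is why the shuffled print order above still leads to the same total.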


## What is PySpark UDF?

UDF stands for User Defined Function. In PySpark, a UDF is created by writing a Python function, wrapping it with PySpark SQL’s udf() method, and then using it on a DataFrame or in SQL. UDFs are generally created when PySpark’s library does not provide the functionality we need and we have to apply our own logic to the data. UDFs can be reused in any number of SQL expressions or DataFrames.

In [21]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+



In [22]:
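def convertCase(text):
    # Capitalize the first letter of each space-separated word.
    # Every word is followed by " ", so the result ends with a trailing space.
    result = ""
    for word in text.split(" "):
        result = result + word[0:1].upper() + word[1:] + " "
    return result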

In [23]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
convertUDF = udf(lambda z: convertCase(z),StringType())
print(convertUDF)

<function <lambda> at 0x11391cdd0>


In [24]:
df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+



In [25]:
def upperCase(text):
    return text.upper()

In [26]:
upperCaseUDF = udf(lambda z: upperCase(z))

df.withColumn("Curated Name",upperCaseUDF(col("Name"))).show()

+-----+------------+------------+
|Seqno|        Name|Curated Name|
+-----+------------+------------+
|    1|  john jones|  JOHN JONES|
|    2|tracey smith|TRACEY SMITH|
|    3| amy sanders| AMY SANDERS|
+-----+------------+------------+
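
Because a UDF wraps an ordinary Python function, its logic can be checked without Spark before it is passed to `udf()`. The sketch below is a hypothetical variant of `convertCase`, named `convert_case_safe` here, that avoids the trailing space visible in the earlier output and passes `None` through, on the assumption that null column values reach the Python function as `None`.

```python
def convert_case_safe(text):
    """Capitalize the first letter of each space-separated word."""
    if text is None:  # assumed: null column values arrive as None
        return None
    words = text.split(" ")
    # join() puts a space only between words, so no trailing space is added
    return " ".join(word[0:1].upper() + word[1:] for word in words)


print(convert_case_safe("john jones"))
print(convert_case_safe("tracey smith"))
print(convert_case_safe(None))
```

It could then be wrapped in the same way as above, for example `udf(convert_case_safe, StringType())`.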



## Dataset

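- Dataset: the basic data type of the structured API; available only in Scala and Java
- When to use a Dataset
  - When you want a type-safe data type even at the cost of some performance.
    A Dataset can be a good choice for defensive code and more accurate programs, because it prevents operations whose data types are invalid, such as applying a function to an object whose type does not match.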

## What is PySpark Architecture?
PySpark, like Apache Spark, follows a master-slave architecture pattern. Here, the master node is called the driver and the slave nodes are called the workers. When a Spark application is run, the Spark driver creates a SparkContext, which acts as the entry point to the Spark application. All the operations are executed on the worker nodes. The resources required for executing the operations on the worker nodes are managed by the cluster manager. The following diagram illustrates the architecture described:

## What is PySpark DAGScheduler?

DAG stands for Directed Acyclic Graph. DAGScheduler is the scheduling layer of Spark that implements stage-oriented scheduling using jobs and stages. The logical execution plan (the lineage of dependencies between the transformations applied to RDDs) is transformed into a physical execution plan consisting of stages. DAGScheduler computes a DAG of the stages needed for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule for running the jobs. The stages are then submitted to the TaskScheduler to be run. This is represented in the image flow below:

<img width="776" alt="image" src="https://user-images.githubusercontent.com/26552500/193822492-0028b54f-9f54-43c1-b505-0ec3f6160900.png">

DAGScheduler performs the following three things in Spark:

- Computes the DAG of stages to execute for the job.
- Determines the preferred locations for running each task.
- Handles failures caused by shuffle output files being lost.

## Why is PySpark SparkConf used?

PySpark SparkConf is used for setting the configurations and parameters required to run applications on a cluster or local system.

## How to create SparkSession?

To create SparkSession, we use the builder pattern. The SparkSession class from the pyspark.sql library has the getOrCreate() method which creates a new SparkSession if there is none or else it returns the existing SparkSession object. The following code is an example for creating SparkSession:

In [46]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]")\
                   .appName('InterviewBitSparkSession')\
                   .getOrCreate()

- master() – Sets the master URL, which decides where the application runs: on a cluster (pass the cluster's master URL) or in local mode. For local mode, pass local[x], **where x is the number of worker threads to use; it also sets the default parallelism, e.g. the default number of partitions that parallelize() creates.** The value of **x is ideally the number of CPU cores available.**
- appName() – Sets the application name.
- getOrCreate() – Returns a SparkSession object. It creates a new object if none exists; if one already exists, it simply returns that.
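
The get-or-create behaviour in the last bullet can be pictured with a small plain-Python sketch. `ToySession` is a hypothetical stand-in written for this note and is not part of PySpark; the real builder additionally applies some of the given options to the existing session, which this toy ignores.

```python
class ToySession:
    """Hypothetical stand-in for a session object; not part of PySpark."""

    _active = None  # the single shared instance, if one exists

    def __init__(self, app_name):
        self.app_name = app_name

    @classmethod
    def get_or_create(cls, app_name):
        # Reuse the active session if there is one, otherwise build it.
        if cls._active is None:
            cls._active = cls(app_name)
        return cls._active

    def stop(self):
        # Stopping clears the shared instance so a new one can be created.
        type(self)._active = None


first = ToySession.get_or_create("InterviewBitSparkSession")
second = ToySession.get_or_create("another name")
same_object = first is second          # the existing session is returned
name_after_second_call = second.app_name

first.stop()
third = ToySession.get_or_create("fresh session")

print(same_object, name_after_second_call)
print(third is first, third.app_name)
```

The same idea explains the earlier cells: a second session or context is only created after the existing one has been stopped.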

## What are the different approaches for creating RDD in PySpark?

The following image represents how we can visualize RDD creation in PySpark:

<img width="773" alt="image" src="https://user-images.githubusercontent.com/26552500/195335900-af175180-0d5a-412f-b3e8-08a0d2a3d78d.png">

In the image, the data starts out as a list and, after conversion to an RDD, is stored in different partitions.
We have the following approaches for creating PySpark RDD:

- sc.parallelize(): The parallelize() method of SparkContext creates an RDD by taking an existing collection from the driver and distributing it. This is the basic way to create an RDD and is used when the data is already in memory; it requires all of the data to be present on the driver before the RDD is created. The code below creates an RDD with parallelize() from the Python list shown in the image above:

In [54]:
num_list = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd=spark.sparkContext.parallelize(num_list)
print(rdd)
print(rdd.collect())
print(rdd.glom().collect())

ParallelCollectionRDD[3] at readRDDFromFile at PythonRDD.scala:274
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]]


- sc.textFile(): This method reads a text file and converts it into an RDD with one element per line. Example:

In [55]:
!ls

IVP_Pyspark_1.ipynb        sample1.txt
PySpark_Script_Template.py sample_1.txt
Pyspark_2_practice_1.ipynb sample_2.txt
Pyspark_practice_1.ipynb   test.txt
pyspark_1.ipynb            test2.txt
sample-text.txt            test5.txt
sample.txt


In [57]:
rdd_txt = spark.sparkContext.textFile("./test2.txt")
rdd_txt.collect()

['my name is danny',
 'your name is john',
 'we are friend',
 'good morning',
 'the king',
 'may the force be with you']

- sc.wholeTextFiles(): This function returns a PairRDD (an RDD containing key-value pairs) with **the file path as the key and the file content as the value.**

In [59]:
rdd_whole_text = spark.sparkContext.wholeTextFiles("./test2.txt")
rdd_whole_text.collect()

[('file:/Users/sunghwanki/Desktop/Project/Python_Advance/Pyspark/test2.txt',
  'my name is danny\nyour name is john\nwe are friend\ngood morning\nthe king\nmay the force be with you\n')]

We can also read csv, json, parquet and various other formats and create RDDs from them.
- Empty RDD with no partitions using sparkContext.emptyRDD(): An RDD with no data is called an empty RDD. We can create such an RDD, which has no partitions, with the emptyRDD() method as shown in the code below:

In [70]:
# emptyRDD is a method, so it must be called with parentheses.
# Without them the name refers to the bound method, which has no collect().
empty_rdd = spark.sparkContext.emptyRDD()
print(empty_rdd.collect())
print(empty_rdd.getNumPartitions())

[]
0


- Empty RDD with partitions using sparkContext.parallelize: When we do not require data but we require partition, then we create empty RDD by using the parallelize method as shown below:

In [67]:
#Create empty RDD with 20 partitions
empty_partitioned_rdd = spark.sparkContext.parallelize([],20) 
print(empty_partitioned_rdd)
print(empty_partitioned_rdd.collect())

ParallelCollectionRDD[11] at readRDDFromFile at PythonRDD.scala:274
[]


## Is it possible to create PySpark DataFrame from external data sources?

Yes, it is! Real-world applications make use of external storage such as the local file system, HDFS, HBase, MySQL tables, S3, Azure etc. The following example shows how to create a DataFrame by reading data from a csv file on the local system:

In [81]:
import csv

header = ['name', 'area', 'country_code2', 'country_code3']
data = [
    ['Albania', 28748, 'AL', 'ALB'],
    ['Algeria', 2381741, 'DZ', 'DZA'],
    ['American Samoa', 199, 'AS', 'ASM'],
    ['Andorra', 468, 'AD', 'AND'],
    ['Angola', 1246700, 'AO', 'AGO']
]

with open('countries.csv', 'w', encoding='UTF8',  newline='') as f:
    writer = csv.writer(f)
    
    # write the header
    writer.writerow(header)

    # write the data (one row at a time)
    # writer.writerow(data)
    
    # write multiple rows
    writer.writerows(data)


In [82]:
df = spark.read.csv("./countries.csv", header=True)
df.show()

+--------------+-------+-------------+-------------+
|          name|   area|country_code2|country_code3|
+--------------+-------+-------------+-------------+
|       Albania|  28748|           AL|          ALB|
|       Algeria|2381741|           DZ|          DZA|
|American Samoa|    199|           AS|          ASM|
|       Andorra|    468|           AD|          AND|
|        Angola|1246700|           AO|          AGO|
+--------------+-------+-------------+-------------+



In [88]:
# csv header
fieldnames = ['name', 'area', 'country_code2', 'country_code3']

# csv data
rows = [
    {'name': 'Albania',
    'area': 28748,
    'country_code2': 'AL',
    'country_code3': 'ALB'},
    {'name': 'Algeria',
    'area': 2381741,
    'country_code2': 'DZ',
    'country_code3': 'DZA'},
    {'name': 'American Samoa',
    'area': 199,
    'country_code2': 'AS',
    'country_code3': 'ASM'}
]

with open('d_countries.csv', 'w', encoding='UTF8', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)

In [91]:
df = spark.read.csv("./d_countries.csv", header=True)
df.show()

+--------------+-------+-------------+-------------+
|          name|   area|country_code2|country_code3|
+--------------+-------+-------------+-------------+
|       Albania|  28748|           AL|          ALB|
|       Algeria|2381741|           DZ|          DZA|
|American Samoa|    199|           AS|          ASM|
+--------------+-------+-------------+-------------+



## What do you understand by PySpark’s startswith() and endswith() methods?

These methods belong to the Column class and are used for searching DataFrame rows by checking whether a column value starts or ends with a given string. They are used for filtering data in applications. In PySpark the method names are written in lowercase, as in the code further down.

- startswith() – returns a Boolean Column. It is True when the value of the column starts with the specified string and False otherwise.

- endswith() – returns a Boolean Column. It is True when the value of the column ends with the specified string and False otherwise.

Both methods are case-sensitive.

Consider an example of the startswith() method here. We have created a DataFrame with 3 rows:

In [138]:
data = [('Harry', 20),
       ('Ron', 20),
       ('Hermoine', 20)]
columns = ["Name","Age"]
df = spark.createDataFrame(data=data, schema = columns)

The code below returns the rows whose value in the Name column starts with “H”:

In [124]:
df.filter(df.Name.startswith("H")).show()
df.filter(col("Name").startswith("H")).show()
df.select("Name").filter(df.Name.startswith("H")).show()

+--------+---+
|    Name|Age|
+--------+---+
|   Harry| 20|
|Hermoine| 20|
+--------+---+

+--------+---+
|    Name|Age|
+--------+---+
|   Harry| 20|
|Hermoine| 20|
+--------+---+

+--------+
|    Name|
+--------+
|   Harry|
|Hermoine|
+--------+



Notice how the record with the Name “Ron” is filtered out because it does not start with “H”.

## What is PySpark SQL?

PySpark SQL is the most popular PySpark module for processing structured, columnar data. Once a DataFrame is created, we can interact with the data using SQL syntax. Spark SQL lets us run native SQL queries on Spark using select, where, group by, join, union etc. To use PySpark SQL, the first step is to create a temporary view of the DataFrame with the createOrReplaceTempView() function. Once created, the view is accessible throughout the SparkSession through the sql() method. When the SparkSession is terminated, the temporary view is dropped.
For example, consider we have the following DataFrame assigned to a variable df:

In [139]:
df.show()

+--------+---+
|    Name|Age|
+--------+---+
|   Harry| 20|
|     Ron| 20|
|Hermoine| 20|
+--------+---+



The code below registers the DataFrame as a temporary view named STUDENTS and then queries it through the sql() method of the SparkSession. Any SQL query can be passed to this method as a string.

In [140]:
df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [141]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)



In [143]:
# The query groups by Name, so the variable is named accordingly
groupByName = spark.sql("SELECT Name, count(*) as Name_Count from STUDENTS group by Name")
groupByName.show()

+--------+----------+
|    Name|Name_Count|
+--------+----------+
|Hermoine|         1|
|   Harry|         1|
|     Ron|         1|
+--------+----------+



## PySpark Function Summary

source : https://assaeunji.github.io/python/2022-03-26-pyspark/

### General functions and data preparation

- printSchema() : prints the schema of the table
- collect() : retrieves the rows of the table
- show(truncate=False) : displays the table contents; with truncate=False, cell values are not cut off
- describe() : shows summary statistics

Most PySpark functions are structured much like SQL. The functions below are explained by comparison with SQL.

<img width="908" alt="image" src="https://user-images.githubusercontent.com/26552500/195344599-1d82dfea-2e6c-4f7a-b286-5a6ee14d7ba8.png">

In [126]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import *

data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]

schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
 ])

df = spark.createDataFrame(data = data, schema = schema)
df.show(truncate=False)
df.printSchema()

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)



### select and drop

In [127]:
df.select("name").show()

+--------------------+
|                name|
+--------------------+
|    {James, , Smith}|
|      {Anna, Rose, }|
| {Julia, , Williams}|
|{Maria, Anne, Jones}|
|  {Jen, Mary, Brown}|
|{Mike, Mary, Will...|
+--------------------+



drop is used to return every column except the specified ones.

In [128]:
df.drop("name").show()  # returns all columns except name

+------------------+-----+------+
|         languages|state|gender|
+------------------+-----+------+
|[Java, Scala, C++]|   OH|     M|
|[Spark, Java, C++]|   NY|     F|
|      [CSharp, VB]|   OH|     F|
|      [CSharp, VB]|   NY|     M|
|      [CSharp, VB]|   NY|     M|
|      [Python, VB]|   OH|     M|
+------------------+-----+------+



### count and countDistinct

In [129]:
df.select(count("state"), countDistinct("state")).show()

+------------+---------------------+
|count(state)|count(DISTINCT state)|
+------------+---------------------+
|           6|                    2|
+------------+---------------------+
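
As a rough sketch of what the two aggregates compute, the plain-Python model below counts the same `state` values without Spark. The trailing `None` is a hypothetical extra row that is not in the DataFrame above; it is added only to show that both aggregates skip nulls, as SQL `COUNT(col)` does.

```python
# state values from the DataFrame above, plus one made-up null row
states = ["OH", "NY", "OH", "NY", "NY", "OH", None]

non_null = [s for s in states if s is not None]
count_state = len(non_null)                 # models count("state")
count_distinct_state = len(set(non_null))   # models countDistinct("state")

print(count_state, count_distinct_state)  # 6 2
```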



### withColumn and withColumnRenamed

withColumn is used to change the contents of a column or to add a new column. withColumnRenamed renames an existing column.

In [135]:
# withColumnRenamed("old name", "new name")
df.withColumnRenamed("name", "Name").show()

+--------------------+------------------+-----+------+
|                Name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



### Changing a column type with withColumn

In [130]:
df.withColumn("state", df.state.cast("String")).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [131]:
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)



### Adding a column with withColumn

In [132]:
df.withColumn("Country", lit("U.S.A")).show()
# lit: creates a new column from a literal (constant) value

+--------------------+------------------+-----+------+-------+
|                name|         languages|state|gender|Country|
+--------------------+------------------+-----+------+-------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|  U.S.A|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|  U.S.A|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|  U.S.A|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|  U.S.A|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|  U.S.A|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|  U.S.A|
+--------------------+------------------+-----+------+-------+



### Writing a CASE WHEN expression with withColumn

Use the when and otherwise functions.

In [134]:
df.withColumn("gender"
                    , when(df.gender == "F", "Female")
                   .when(df.gender=="M", "Male")
                   .when(df.gender.isNull(), "")
                   .otherwise(df.gender)
                   ).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|  Male|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|Female|
| {Julia, , Williams}|      [CSharp, VB]|   OH|Female|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|  Male|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|  Male|
|{Mike, Mary, Will...|      [Python, VB]|   OH|  Male|
+--------------------+------------------+-----+------+



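Written in SQL, the same logic looks like this. Note that here the result goes into a new column, gender2, instead of overwriting gender: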

```
SELECT *
    , CASE WHEN gender = 'F' THEN 'Female'
        WHEN gender = 'M'    THEN 'Male'
        WHEN gender IS NULL  THEN ''
        ELSE gender 
    END as gender2
FROM df
```
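
The CASE expression above is standard SQL, so its branch logic can be tried without a Spark session. The sketch below runs it on an in-memory SQLite database through Python's `sqlite3` module, used here only as a stand-in for the Spark SQL engine. The table contents are illustrative: the `'X'` and `None` rows are hypothetical, added to exercise the `ELSE` and `IS NULL` branches.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE df (firstname TEXT, gender TEXT)")
conn.executemany(
    "INSERT INTO df VALUES (?, ?)",
    [("James", "M"), ("Anna", "F"), ("Sam", "X"), ("Pat", None)],
)

query = """
SELECT firstname,
       CASE WHEN gender = 'F'   THEN 'Female'
            WHEN gender = 'M'   THEN 'Male'
            WHEN gender IS NULL THEN ''
            ELSE gender
       END AS gender2
FROM df
ORDER BY rowid
"""
rows = conn.execute(query).fetchall()
conn.close()

print(rows)
# [('James', 'Male'), ('Anna', 'Female'), ('Sam', 'X'), ('Pat', '')]
```

With Spark, the DataFrame would first be registered with `df.createOrReplaceTempView("df")` and the query passed to `spark.sql()`.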

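### flatMap vs. map

flatMap is similar to map, but where map produces a nested list (one result per input element), flatMap lays all the results out in a single flat list.

Below, we create an RDD from a list of four strings and apply the same lambda function with both map and flatMap.

The RDD produced by map consists of pairs of each string and that string with an s appended, whereas in the RDD produced by flatMap all the strings are listed in a single flat list.

Calling count on each RDD shows that the results differ.

Because flatMap flattens all the data into one list in this way, it is useful for tasks such as word count.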

In [15]:
animals = ['cat','dog','elephant', 'tiger']
animalsRdd = sc.parallelize(animals)
animalsRdd.collect()

['cat', 'dog', 'elephant', 'tiger']

In [20]:
animalsRDDMap = animalsRdd.map(lambda x: (x, x+'s'))
animalsRDDFlatmap = animalsRdd.flatMap(lambda x: (x,x+'s'))

print(animalsRDDMap.collect())
print(animalsRDDFlatmap.collect())

[('cat', 'cats'), ('dog', 'dogs'), ('elephant', 'elephants'), ('tiger', 'tigers')]
['cat', 'cats', 'dog', 'dogs', 'elephant', 'elephants', 'tiger', 'tigers']


In [21]:
print(animalsRDDMap.count())
print(animalsRDDFlatmap.count())

4
8


In [17]:
def animals(data):
    return data, data+'s'

animalsRdd.map(animals).collect()

[('cat', 'cats'),
 ('dog', 'dogs'),
 ('elephant', 'elephants'),
 ('tiger', 'tigers')]

In [18]:
animalsRdd.flatMap(animals).collect()

['cat', 'cats', 'dog', 'dogs', 'elephant', 'elephants', 'tiger', 'tigers']

In [22]:
data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]

rdd = sc.parallelize(data)
for i in rdd.collect():
    print(i)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s


In [23]:
rdd.collect()

['Project Gutenberg’s',
 'Alice’s Adventures in Wonderland',
 'Project Gutenberg’s',
 'Adventures in Wonderland',
 'Project Gutenberg’s']

In [24]:
def sub2(n):
    return n.split(" ")

In [25]:
rdd.flatMap(sub2).collect()

['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s']

In [26]:
rdd.map(sub2).collect()

[['Project', 'Gutenberg’s'],
 ['Alice’s', 'Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s'],
 ['Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s']]
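
The difference can also be modelled without Spark. The sketch below is a plain-Python approximation, not the RDD API itself: a list comprehension stands in for `map`, and `itertools.chain.from_iterable` stands in for the flattening step of `flatMap`. The helper name `pair` is hypothetical.

```python
from itertools import chain

def pair(word):
    """Return the word together with its plural-looking form."""
    return word, word + "s"

animals = ["cat", "dog", "elephant", "tiger"]

# Models rdd.map(pair): one result (a tuple) per input element.
mapped = [pair(a) for a in animals]
# Models rdd.flatMap(pair): each result is unpacked into the output.
flat_mapped = list(chain.from_iterable(pair(a) for a in animals))

print(len(mapped), len(flat_mapped))  # 4 8
print(flat_mapped[:4])                # ['cat', 'cats', 'dog', 'dogs']

# The same flattening applied to split lines, as in a word count.
lines = ["Adventures in Wonderland", "may the force"]
words = list(chain.from_iterable(line.split(" ") for line in lines))
print(words)
# ['Adventures', 'in', 'Wonderland', 'may', 'the', 'force']
```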

### Using flatMap() transformation on DataFrame

Unfortunately, a PySpark DataFrame does not have a flatMap() transformation. However, the explode() SQL function can be used to flatten an array column into one row per element. Below is a complete example.

In [30]:
arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]

In [31]:
df = spark.createDataFrame(arrayData)
df

DataFrame[_1: string, _2: array<string>, _3: map<string,string>]

In [32]:
df.show()

+----------+-------------------+--------------------+
|        _1|                 _2|                  _3|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               null|                null|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+



In [36]:
df = spark.createDataFrame(arrayData, schema=['name', 'knownLanguages', 'properties'])
df.show()

+----------+-------------------+--------------------+
|      name|     knownLanguages|          properties|
+----------+-------------------+--------------------+
|     James|      [Java, Scala]|{eye -> brown, ha...|
|   Michael|[Spark, Java, null]|{eye -> null, hai...|
|    Robert|         [CSharp, ]|{eye -> , hair ->...|
|Washington|               null|                null|
| Jefferson|             [1, 2]|                  {}|
+----------+-------------------+--------------------+



In [37]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- knownLanguages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [43]:
from pyspark.sql.functions import explode

# df2 = df.select('name')
df2 = df.select(df.name, explode(df.knownLanguages))
df2.show()

+---------+------+
|     name|   col|
+---------+------+
|    James|  Java|
|    James| Scala|
|  Michael| Spark|
|  Michael|  Java|
|  Michael|  null|
|   Robert|CSharp|
|   Robert|      |
|Jefferson|     1|
|Jefferson|     2|
+---------+------+



In [44]:
df2.printSchema()

root
 |-- name: string (nullable = true)
 |-- col: string (nullable = true)
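
Note that Washington, whose array is null, is missing from the exploded output, while the null element inside Michael's array is kept. The sketch below is a plain-Python model of that row-expansion rule, under the assumption that explode emits one row per array element and no rows for a null or empty array; it does not call Spark.

```python
# (name, knownLanguages) pairs taken from arrayData above
rows = [
    ("James", ["Java", "Scala"]),
    ("Michael", ["Spark", "Java", None]),
    ("Robert", ["CSharp", ""]),
    ("Washington", None),
    ("Jefferson", ["1", "2"]),
]

exploded = [
    (name, lang)
    for name, langs in rows
    for lang in (langs or [])  # a null or empty array yields no output rows
]

print(len(exploded))                         # 9
print([n for n, _ in exploded].count("Michael"))  # 3
print(any(n == "Washington" for n, _ in exploded))  # False
```

If rows with null or empty arrays must be kept, `pyspark.sql.functions` also provides `explode_outer`, which emits a single row with a null element in that case.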



In [45]:
sc.stop()
#spark.sparkContext.stop()

### filter

In [83]:
from pyspark import SparkContext
sc = SparkContext("local", "Transformation Demo")
words_list = sc.parallelize (
  ["pyspark", 
  "interview", 
  "questions", 
  "at", 
  "interviewbit"]
)
filtered_words = words_list.filter(lambda x: 'interview' in x)
filtered = filtered_words.collect()
print(filtered)

['interview', 'interviewbit']

In [85]:
counts = filtered_words.count()
print("Count of elements in RDD -> ",  counts)

Count of elements in RDD ->  2


### Word Count

In [27]:
%%writefile ./test2.txt
my name is danny
your name is john
we are friend
good morning
the king
may the force be with you

Overwriting ./test2.txt


In [29]:
import os
myRdd = sc.textFile(os.path.abspath('./test2.txt'))
myRdd = myRdd.flatMap(lambda x: x.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda x,y: x+y)
myRdd.collect()

[('name', 2),
 ('is', 2),
 ('danny', 1),
 ('john', 1),
 ('we', 1),
 ('are', 1),
 ('good', 1),
 ('king', 1),
 ('may', 1),
 ('my', 1),
 ('your', 1),
 ('friend', 1),
 ('morning', 1),
 ('the', 2),
 ('force', 1),
 ('be', 1),
 ('with', 1),
 ('you', 1)]

In [30]:
output = myRdd.collect()
for (word,count) in output:
    print("%s: %i"% (word, count))

name: 2
is: 2
danny: 1
john: 1
we: 1
are: 1
good: 1
king: 1
may: 1
my: 1
your: 1
friend: 1
morning: 1
the: 2
force: 1
be: 1
with: 1
you: 1
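
The counts above can be cross-checked without Spark. The sketch below is a plain-Python equivalent of the flatMap, map and reduceByKey pipeline, using `collections.Counter` on the same text; it is meant only as a sanity check of the expected result, not as a replacement for the RDD version.

```python
from collections import Counter

# Same contents as test2.txt above
text = """my name is danny
your name is john
we are friend
good morning
the king
may the force be with you"""

# Split every line into words (the flatMap step) and tally them
# (the map-to-(word, 1) and reduceByKey steps combined).
counts = Counter(word for line in text.splitlines() for word in line.split(" "))

print(counts["name"], counts["is"], counts["the"])  # 2 2 2
print(len(counts))           # 18 distinct words
print(sum(counts.values()))  # 21 words in total
```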


### Sorting

In [31]:
simpleData = [
    ("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]
columns = ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(simpleData, schema = columns )
df.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [32]:
df.orderBy("department", "salary").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        James|     Sales|   NY| 90000| 34|10000|
+-------------+----------+-----+------+---+-----+



In [33]:
df.sort("department", "salary").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|        James|     Sales|   NY| 90000| 34|10000|
+-------------+----------+-----+------+---+-----+



In [34]:
from pyspark.sql.functions import col
df.sort(col("department"), col("salary").desc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+



In [35]:
df.orderBy(col("department"), col("salary").desc()).show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+



In [36]:
df.createOrReplaceTempView("EMP")
spark.sql("select * from EMP order by department, salary desc").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
+-------------+----------+-----+------+---+-----+
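
The cells above show that sort() and orderBy() give the same result, and that the SQL `order by department, salary desc` matches the `col("salary").desc()` form. As a rough cross-check of that ordering, the sketch below reproduces it in plain Python with `sorted()` and a composite key. Only the three columns relevant to the ordering are kept, and negating the salary to get descending order is a shortcut that works only for numeric keys.

```python
# (employee_name, department, salary) taken from simpleData above
employees = [
    ("James", "Sales", 90000),
    ("Michael", "Sales", 86000),
    ("Robert", "Sales", 81000),
    ("Maria", "Finance", 90000),
    ("Raman", "Finance", 99000),
    ("Scott", "Finance", 83000),
    ("Jen", "Finance", 79000),
    ("Jeff", "Marketing", 80000),
    ("Kumar", "Marketing", 91000),
]

# department ascending, then salary descending
ordered = sorted(employees, key=lambda e: (e[1], -e[2]))
names = [e[0] for e in ordered]

print(names)
# ['Raman', 'Maria', 'Scott', 'Jen', 'Kumar', 'Jeff', 'James', 'Michael', 'Robert']
```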



In [39]:
tempdir='/Users/sunghwanki/Desktop/Project/Python_Advance/Pyspark'
myRdd1 = sc.textFile(os.path.join(tempdir, 'test2.txt'))
print(myRdd1.first())
print(myRdd1.take(3))
print(myRdd1.collect())


my name is danny
['my name is danny', 'your name is john', 'we are friend']
['my name is danny', 'your name is john', 'we are friend', 'good morning', 'the king', 'may the force be with you']


In [40]:
dataRange = [i for i in range(20)]
print(dataRange)
print(type(dataRange))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
<class 'list'>


In [41]:
rangeRdd = spark.sparkContext.parallelize(dataRange)
print(rangeRdd)
print(type(rangeRdd))
print(rangeRdd.collect())

ParallelCollectionRDD[61] at readRDDFromFile at PythonRDD.scala:274
<class 'pyspark.rdd.RDD'>
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


In [42]:
def sub(n):
    return n-1

subRdd = rangeRdd.map(sub)
print(subRdd.collect())

[-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
