# **pyspark 패키지를 활용한 Spark 프로그래밍(1)**
## SparkSession 객체 생성

In [5]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]") \
                    .appName('sparkedu') \
                    .getOrCreate()
# master 호출 // []대괄호 안의 숫자는 코어 개수이다. loc[2]는 2개
# 이름 호출
# 스팍 개체가 있으면 꺼내오고 없으면 생성
# 체이닝 방식 / 변수에 이름을 담아서 사용할 수 있음 

spark

![spark1](images/spark1.png)

## Spark 1단계

## 리스트객체로 RDD 객체 생성하기

### RDD(Resilient Distributed Dataset) -> 분산환경에 적합하게 만들어진 데이터셋
#### read-only 데이터셋으로서 다양한 머신에 데이터셋의 멀티셋(중복을 허용)을 분산해두고 특정한 머신에 문제가 생기더라도 문제없이 읽을수로 있도록 지원한다

- MapReduce 작업
- 분산하여 병렬적 처리
- 빠른 연산
- 불변(Immutable)
- Transformation 과 Action 으로 함수 종류가 나눠지며, Action 함수가 실행됐을 때 실제 연산
- Lineage 를 통해 Fault Tolerant(내고장성) 보장

In [None]:
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
rdd=spark.sparkContext.parallelize(dataList) # parallelzie는 무조건 RDD 객체
print(rdd)
print(type(rdd))
print(rdd.collect())

In [None]:
import numpy as np
lst=np.random.randint(0,10,20)
rdd=spark.sparkContext.parallelize(lst)
print(type(rdd))
print(rdd.collect())
print(rdd.count())

## 텍스트 파일 내용 읽어서 RDD 객체 생성하기

In [None]:
rdd = spark.read.text("data/korean_stopwords.txt")
print(type(rdd))
print(rdd.collect())

In [None]:
rdd = spark.sparkContext.textFile("data/korean_stopwords.txt")
print(type(rdd))
print(rdd.collect())

## 생성한 RDD 객체 Spark DataFrame 으로 변환하기

### Spark DataFrame
- DataFrame은 명명 된 열로 구성된 데이터 세트 
- 개념적으로는 관계형 데이터베이스의 테이블 또는 R / Python의 데이터 프레임과 동일하지만 내부적으로 더욱  최적화가 있음
- RDB Table처럼 Schema를 가지고 있고 RDB의 Table 연산이 가능
- 구조화 된 데이터 파일, Hive의 테이블, 외부 데이터베이스 또는 기존 RDD와 같은 다양한 소스 에서 구성 할 수 있늠 
- DataFrame API는 Scala, Java, Python 및 R 에서 사용할 수 있음
- SparkSQL을 통해 사용 가능

In [None]:
dept = [("Finance",10), 
        ("Marketing",20), 
        ("Sales",30), 
        ("IT",40) 
      ]
rdd = spark.sparkContext.parallelize(dept) # rdd 에 저장
print(rdd.collect())

In [None]:
df = rdd.toDF() # 데이터프레임 객체로 변환
df.printSchema() # 만들어진 데이터프레임의 스키마정보 출력 (어떤 컬럼으로 구성, 타입, ~~ 등에 대한 정보 // 컬럼명이 없으면 _1, _2 와 같은 형식으로 이름이 주어짐) // long은 숫자
df.show() # 화면에 데이터프레임을 테이블 구조로 출력

In [None]:
deptColumns = ["dept_name","dept_id"] # 컬럼명 바꾸기
df2 = rdd.toDF(deptColumns)
df2.printSchema()
df2.show(truncate=False)

In [None]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns) # 주어진 데이터셋을 바로 데이터프레임으로 // to_df는 RDD를 바꾸는 것
print(type(df)) 
print(df)
df.printSchema()# 스키마 정보 출력
df.show() # 테이블 구조로 출력

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

# structType은 스트럭필드 객체들을 생성
schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema) # data2는 튜플객체로 구성된 것을 ~
df.printSchema()
df.show(truncate=False)

## CSV 파일 내용 읽어서 DataFrame 객체 생성하기

In [6]:
df = spark.read.csv("data/emp.csv") # 스팍의 read.csv 메서드 사용해서 csv 파일 읽어오기
df.printSchema()
df.show()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

+-----+------+---------+----+----------+----+----+------+
|  _c0|   _c1|      _c2| _c3|       _c4| _c5| _c6|   _c7|
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839|  KING|PRES

In [8]:
emp = spark.read.csv("data/emp.csv", header=True) # 첫번째 행을 컬럼명으로 지정
emp.printSchema() # 제목을 모두 string 형으로 인식
emp.show()

root
 |-- empno: string (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: string (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: string (nullable = true)
 |-- comm: string (nullable = true)
 |-- deptno: string (nullable = true)

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000|null|    10|
| 784

In [9]:
emp = spark.read.csv("data/emp.csv", header=True, inferSchema=True) # 읽어오는 csv파일의 각 컬럼의 구성데이터 타입을 보고 컬럼명 데이터 타입을 결정
emp.printSchema() 
emp.show()

root
 |-- empno: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: integer (nullable = true)
 |-- hiredate: string (nullable = true)
 |-- sal: integer (nullable = true)
 |-- comm: integer (nullable = true)
 |-- deptno: integer (nullable = true)

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000|null|    10|


In [None]:
df = spark.read.csv("data/mpgdata.csv")
df.printSchema()
df.show()

In [None]:
df = spark.read.load("data/iris.csv",
                     format="csv", sep=",", inferSchema=True, header=True) # 구분자가 , 일때
df.printSchema() # 소수점 이하 실수데이터 --> double
df.show()

## JSON 파일 내용 읽어서 DataFrame 객체 생성하기

In [None]:
df = spark.read.json("data/seoul_geo.json") # 나름 구분해서 저장 // 처음부터 dataframe에 적합한 형식이어야 json이 쓸만함
df.show()

In [None]:
df = spark.read.load("data/seoul_geo.json", format="json")
df.show()

## 파케이 파일 내용 읽어서 DataFrame 객체 생성하기

In [None]:
df = spark.read.load("data/userdata1.parquet") ## 파케이 형식 파일 읽어오기 // 압축파일임 // 직렬화된 파일
df = df.select("first_name", "last_name", "email") # 여러 개의 열 중에서 특정 열만 가져오기 --> select 메서드 // 리턴값은 데이터프레임
df.show()

## 직접 만든 DataFrame 객체 생성하여 정보 출력하기

In [None]:
data = [("James","","Smith","36636","M",60000),
        ("Michael","Rose","","40288","M",70000),
        ("Robert","","Williams","42114","",400000),
        ("Maria","Anne","Jones","39192","F",500000),
        ("Jen","Mary","Brown","","F",0)]

columns = ["first_name","middle_name","last_name","dob","gender","salary"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema() # 이때의 데이터프레임은 스팍의 데이터프레임 
pysparkDF.show(truncate=False) # 판다스 데이터프레임이 필요할 수도..?

## Spark의 DataFrame 객체를 Pandas의 DataFrame 객체로 변환하기

In [None]:
pandasDF = pysparkDF.toPandas() ## 판다스 데이터프레임 객체로 변환 가능 ---> padas aPI 사용 가능
print(type(pandasDF))
print(pandasDF)

## select()

In [None]:
emp1 = emp.select("empno", "ename", "hiredate", "sal") # 생성된 데이터프레임객체.select
print(type(emp1))
emp1.show()

## prallelize --> RDD 
## createDateFrame --> 스팍 데이터프레임

In [None]:
emp.select(emp.empno,emp.ename,emp.hiredate, emp.sal).show() # 위의 코드와 결과는 같음 // 문자열 나열

In [16]:
# Using col function
# col이라는 함수를 이용해 추출 --> col이라는 함수와 함께 추출하고 싶은 컬럼명을 줌

from pyspark.sql.functions import col
emp.select(col("empno"),col("ename"),col("hiredate"),col("sal")).show()

+-----+------+----------+----+
|empno| ename|  hiredate| sal|
+-----+------+----------+----+
| 7369| SMITH|1980-12-17| 800|
| 7499| ALLEN|1981-02-20|1600|
| 7521|  WARD|1981-02-03|1250|
| 7566| JONES|1981-03-02|2975|
| 7654|MARTIN|1981-10-22|1250|
| 7698| BLAKE|1981-05-01|2850|
| 7782| CLARK|1981-09-06|2450|
| 7788| SCOTT|1982-12-08|3000|
| 7839|  KING|1981-11-17|5000|
| 7844|TURNER|1984-10-08|1500|
| 7876| ADAMS|1983-01-12|1100|
| 7900| JAMES|1981-12-03| 950|
| 7902|  FORD|1981-12-13|3000|
| 7934|MILLER|1982-01-25|1300|
+-----+------+----------+----+



## collect()

In [None]:
dataCollect = emp.collect()
print(type(dataCollect)) # 리스트 객체, RDD든 DF든 똑같음
print("----------------------------")
print(dataCollect) # 리스트 객체 출력, 여러 개의 로우 객체로 형성된 리스트 객체
print("----------------------------")
display(dataCollect) # 좀 더 보기 좋게 출력

In [None]:
emp.printSchema() # 스키마 정보만 출력

## withColumn()

In [None]:
newemp = emp.withColumn("deptno",col("deptno").cast("Integer")) # deptno를 Integer로 바꾸라는 의미
# 컬럼의 타입을 바꿀 수도 있고, 컬럼의 값을 연산을 통해 바꿀 수도 있음
newemp.printSchema()

In [None]:
newemp = newemp.withColumn("sal",col("sal")*100) # sal 컬럼의 값들에 100을 곱해서 출력
newemp.show()

## withColumnRenamed()

In [None]:
newemp = newemp.withColumnRenamed("sal","salary") # 컬럼의 이름을 바꾸고 싶을 때
newemp.show()

In [None]:
newemp = newemp.withColumnRenamed("mgr","manager") \
    .withColumnRenamed("ename","empname")
# 백슬래쉬로 연결, 2개를 연쇄처리
newemp.show()

## filter() - where() 와 동일 // SQL where 절과 비슷

In [None]:
emp.filter(emp.ename == "KING").show(truncate=False)

In [None]:
emp.filter('ename == "KING"').show(truncate=False)

In [None]:
emp.filter((emp.deptno == 30) & (emp.sal >= 1500)).show(truncate=False) # 괄호로 연산 우선순위 설정

In [None]:
emp.where((emp.deptno == 30) & (emp.sal >= 1500)).show(truncate=False)

## drop (), dropDuplicates (), distinct()

In [None]:
empnew = emp.select("job", "deptno")
empnew.show()

In [None]:
empnew.distinct().show() # 중복된 행을 없애고 각 하나의 행만 남겨둔다.

In [None]:
empnew.dropDuplicates().show()

In [None]:
empnew.drop("deptno").show() # 열을 없애고 싶을 때 사용

## orderBy(), sort()

In [10]:
emp.sort("sal").show(truncate=False) # 오름차순으로 정렬

+-----+------+---------+----+----------+----+----+------+
|empno|ename |job      |mgr |hiredate  |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7369 |SMITH |CLERK    |7902|1980-12-17|800 |null|20    |
|7900 |JAMES |CLERK    |7698|1981-12-03|950 |null|30    |
|7876 |ADAMS |CLERK    |7788|1983-01-12|1100|null|20    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03|1250|500 |30    |
|7934 |MILLER|CLERK    |7782|1982-01-25|1300|null|10    |
|7844 |TURNER|SALESMAN |7698|1984-10-08|1500|null|30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20|1600|300 |30    |
|7782 |CLARK |MANAGER  |7839|1981-09-06|2450|null|10    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01|2850|null|30    |
|7566 |JONES |MANAGER  |7839|1981-03-02|2975|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08|3000|null|20    |
|7902 |FORD  |ANALYST  |7566|1981-12-13|3000|null|20    |
|7839 |KING  |PRESIDENT|null|1981-11-17|5000|null|10    |
+-----+------+

In [11]:
emp.sort(emp.sal.desc()).show(truncate=False) # 내림차순으로 정렬

+-----+------+---------+----+----------+----+----+------+
|empno|ename |job      |mgr |hiredate  |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7839 |KING  |PRESIDENT|null|1981-11-17|5000|null|10    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08|3000|null|20    |
|7902 |FORD  |ANALYST  |7566|1981-12-13|3000|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02|2975|null|20    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01|2850|null|30    |
|7782 |CLARK |MANAGER  |7839|1981-09-06|2450|null|10    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20|1600|300 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08|1500|null|30    |
|7934 |MILLER|CLERK    |7782|1982-01-25|1300|null|10    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03|1250|500 |30    |
|7876 |ADAMS |CLERK    |7788|1983-01-12|1100|null|20    |
|7900 |JAMES |CLERK    |7698|1981-12-03|950 |null|30    |
|7369 |SMITH |CLERK    |7902|1980-12-17|800 |null|20    |
+-----+------+

In [12]:
emp.sort("deptno", "sal").show(truncate=False) # 정렬시 부서명과 월급으로

+-----+------+---------+----+----------+----+----+------+
|empno|ename |job      |mgr |hiredate  |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7934 |MILLER|CLERK    |7782|1982-01-25|1300|null|10    |
|7782 |CLARK |MANAGER  |7839|1981-09-06|2450|null|10    |
|7839 |KING  |PRESIDENT|null|1981-11-17|5000|null|10    |
|7369 |SMITH |CLERK    |7902|1980-12-17|800 |null|20    |
|7876 |ADAMS |CLERK    |7788|1983-01-12|1100|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02|2975|null|20    |
|7902 |FORD  |ANALYST  |7566|1981-12-13|3000|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08|3000|null|20    |
|7900 |JAMES |CLERK    |7698|1981-12-03|950 |null|30    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03|1250|500 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08|1500|null|30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20|1600|300 |30    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01|2850|null|30    |
+-----+------+

In [13]:
emp.sort(emp.deptno.desc(), emp.sal.desc()).show(truncate=False) 
# 두 컬럼 모두 내림차순으로

+-----+------+---------+----+----------+----+----+------+
|empno|ename |job      |mgr |hiredate  |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7698 |BLAKE |MANAGER  |7839|1981-05-01|2850|null|30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20|1600|300 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08|1500|null|30    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03|1250|500 |30    |
|7900 |JAMES |CLERK    |7698|1981-12-03|950 |null|30    |
|7902 |FORD  |ANALYST  |7566|1981-12-13|3000|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08|3000|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02|2975|null|20    |
|7876 |ADAMS |CLERK    |7788|1983-01-12|1100|null|20    |
|7369 |SMITH |CLERK    |7902|1980-12-17|800 |null|20    |
|7839 |KING  |PRESIDENT|null|1981-11-17|5000|null|10    |
|7782 |CLARK |MANAGER  |7839|1981-09-06|2450|null|10    |
|7934 |MILLER|CLERK    |7782|1982-01-25|1300|null|10    |
+-----+------+

In [14]:
emp.orderBy(emp.deptno.desc(), emp.sal.desc()).show(truncate=False)
# orderBy와 sort는 동일

+-----+------+---------+----+----------+----+----+------+
|empno|ename |job      |mgr |hiredate  |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7698 |BLAKE |MANAGER  |7839|1981-05-01|2850|null|30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20|1600|300 |30    |
|7844 |TURNER|SALESMAN |7698|1984-10-08|1500|null|30    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22|1250|1400|30    |
|7521 |WARD  |SALESMAN |7698|1981-02-03|1250|500 |30    |
|7900 |JAMES |CLERK    |7698|1981-12-03|950 |null|30    |
|7902 |FORD  |ANALYST  |7566|1981-12-13|3000|null|20    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08|3000|null|20    |
|7566 |JONES |MANAGER  |7839|1981-03-02|2975|null|20    |
|7876 |ADAMS |CLERK    |7788|1983-01-12|1100|null|20    |
|7369 |SMITH |CLERK    |7902|1980-12-17|800 |null|20    |
|7839 |KING  |PRESIDENT|null|1981-11-17|5000|null|10    |
|7782 |CLARK |MANAGER  |7839|1981-09-06|2450|null|10    |
|7934 |MILLER|CLERK    |7782|1982-01-25|1300|null|10    |
+-----+------+

In [17]:
emp.sort(col("hiredate").asc(),col("sal").asc()).show(truncate=False) # col로 컬럼 설정 / 
emp.orderBy(col("hiredate").asc(),col("sal").asc()).show(truncate=False) # 바로 윗줄 코드와 동일

+-----+------+---------+----+----------+----+----+------+
|empno|ename |job      |mgr |hiredate  |sal |comm|deptno|
+-----+------+---------+----+----------+----+----+------+
|7369 |SMITH |CLERK    |7902|1980-12-17|800 |null|20    |
|7521 |WARD  |SALESMAN |7698|1981-02-03|1250|500 |30    |
|7499 |ALLEN |SALESMAN |7698|1981-02-20|1600|300 |30    |
|7566 |JONES |MANAGER  |7839|1981-03-02|2975|null|20    |
|7698 |BLAKE |MANAGER  |7839|1981-05-01|2850|null|30    |
|7782 |CLARK |MANAGER  |7839|1981-09-06|2450|null|10    |
|7654 |MARTIN|SALESMAN |7698|1981-10-22|1250|1400|30    |
|7839 |KING  |PRESIDENT|null|1981-11-17|5000|null|10    |
|7900 |JAMES |CLERK    |7698|1981-12-03|950 |null|30    |
|7902 |FORD  |ANALYST  |7566|1981-12-13|3000|null|20    |
|7934 |MILLER|CLERK    |7782|1982-01-25|1300|null|10    |
|7788 |SCOTT |ANALYST  |7566|1982-12-08|3000|null|20    |
|7876 |ADAMS |CLERK    |7788|1983-01-12|1100|null|20    |
|7844 |TURNER|SALESMAN |7698|1984-10-08|1500|null|30    |
+-----+------+

## groupBy()

In [18]:
emp.groupBy("deptno").sum("sal").show(truncate=False) # 그룹별 월급 합

+------+--------+
|deptno|sum(sal)|
+------+--------+
|20    |10875   |
|10    |8750    |
|30    |9400    |
+------+--------+



In [19]:
emp.groupBy("deptno").min("sal").show(truncate=False) # 그룹별 월급 최솟값

+------+--------+
|deptno|min(sal)|
+------+--------+
|20    |800     |
|10    |1300    |
|30    |950     |
+------+--------+



In [20]:
emp.groupBy("deptno").max("sal").show(truncate=False) # 그룹별 월급 최댓값

+------+--------+
|deptno|max(sal)|
+------+--------+
|20    |3000    |
|10    |5000    |
|30    |2850    |
+------+--------+



In [21]:
emp.groupBy("deptno").avg("sal").show(truncate=False) # 부서별 월급 평균값

+------+------------------+
|deptno|avg(sal)          |
+------+------------------+
|20    |2175.0            |
|10    |2916.6666666666665|
|30    |1566.6666666666667|
+------+------------------+



In [22]:
emp.groupBy("deptno", "job").sum("sal").show(truncate=False) 
# 메인, 서브 그룹으로 나눌 수 있음

+------+---------+--------+
|deptno|job      |sum(sal)|
+------+---------+--------+
|20    |ANALYST  |6000    |
|20    |MANAGER  |2975    |
|30    |MANAGER  |2850    |
|30    |SALESMAN |5600    |
|30    |CLERK    |950     |
|20    |CLERK    |1900    |
|10    |PRESIDENT|5000    |
|10    |CLERK    |1300    |
|10    |MANAGER  |2450    |
+------+---------+--------+



In [23]:
emp.groupBy("deptno").sum("sal", "comm").show(truncate=False) # sal과 comm에 대한 sum
# null이 포함된 컬럼이 있다면 null로 계산

+------+--------+---------+
|deptno|sum(sal)|sum(comm)|
+------+--------+---------+
|20    |10875   |null     |
|10    |8750    |null     |
|30    |9400    |2200     |
+------+--------+---------+



In [24]:
from pyspark.sql.functions import sum,avg,max,min,mean,count
emp.groupBy("deptno").agg(sum("sal"), avg("sal"), max("sal"), min("sal"), mean("sal")).show(truncate=False)
# 여러 값을 구하고 싶다면 agg에 여러 함수를 동시에 넣어서 계산

+------+--------+------------------+--------+--------+------------------+
|deptno|sum(sal)|avg(sal)          |max(sal)|min(sal)|avg(sal)          |
+------+--------+------------------+--------+--------+------------------+
|20    |10875   |2175.0            |3000    |800     |2175.0            |
|10    |8750    |2916.6666666666665|5000    |1300    |2916.6666666666665|
|30    |9400    |1566.6666666666667|2850    |950     |1566.6666666666667|
+------+--------+------------------+--------+--------+------------------+



In [25]:
emp.groupBy("deptno") \
    .agg(sum("sal").alias("sum_salary"), \
         avg("sal").alias("avg_salary"), \
         max("sal").alias("max_salary"), \
         min("sal").alias("min_salary"), \
         mean("sal").alias("mean_salary"), \
     ) \
    .show(truncate=False)

# 부서별로 먼저 그룹핑 한 뒤, 각각 집계함수 사용 / alias를 통해 컬럼명을 설정

+------+----------+------------------+----------+----------+------------------+
|deptno|sum_salary|avg_salary        |max_salary|min_salary|mean_salary       |
+------+----------+------------------+----------+----------+------------------+
|20    |10875     |2175.0            |3000      |800       |2175.0            |
|10    |8750      |2916.6666666666665|5000      |1300      |2916.6666666666665|
|30    |9400      |1566.6666666666667|2850      |950       |1566.6666666666667|
+------+----------+------------------+----------+----------+------------------+



In [26]:
emp.groupBy("deptno") \
    .agg(sum("sal").alias("sum_salary"), \
         avg("sal").alias("avg_salary"), \
         max("sal").alias("max_salary"), \
         min("sal").alias("min_salary"), \
         mean("sal").alias("mean_salary"), \
     ) \
    .where(col("sum_salary") > 9000)\
    .show(truncate=False)
# 부서별로 연산을 진행하고, 월급의 합이 9000 이상힌 것만 출력

+------+----------+------------------+----------+----------+------------------+
|deptno|sum_salary|avg_salary        |max_salary|min_salary|mean_salary       |
+------+----------+------------------+----------+----------+------------------+
|20    |10875     |2175.0            |3000      |800       |2175.0            |
|30    |9400      |1566.6666666666667|2850      |950       |1566.6666666666667|
+------+----------+------------------+----------+----------+------------------+



In [27]:
deptdata = [(10, '영업부', '서울'), (20, '개발부', '대전'), (30, '기획부', '서울'), (40, '마케팅부', '서울')]
deptcolname = ['deptno', 'dname', 'loc']
dept = spark.createDataFrame(data=deptdata, schema=deptcolname)
dept.show(truncate=False)

Py4JJavaError: An error occurred while calling o220.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 105.0 failed 1 times, most recent failure: Lost task 0.0 in stage 105.0 (TID 1824, DESKTOP-PFH4PNP, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


## join()

In [28]:
emp.join(dept,emp.deptno ==  dept.deptno,"inner") \
     .show(truncate=False)

# emp 와 dept를 조인 / 이후 조인 조건식을 준다.--> 해당 코드에서는 일치하는 행끼리 
# 오른쪽이 dept

만

Py4JJavaError: An error occurred while calling o252.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 107.0 failed 1 times, most recent failure: Lost task 0.0 in stage 107.0 (TID 1826, DESKTOP-PFH4PNP, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
emp.join(dept,emp.deptno ==  dept.deptno,"right") \
     .show(truncate=False)

# dept가 기준 (right) --> dept 내 모든 컬럼이 출력
# 하지만 일치하는 데이터가 없는 곳은 null처리

## union()

In [29]:
# union --> 메서드 

emp1 = emp.filter("job == 'MANAGER'").select("ename", "sal") # 원하는 행과 원하는 열 추출
emp2 = emp.filter("deptno == 30").select("ename", "sal") # 원하는 행과 원하는 열 추출
emp1.show()
emp2.show()
emp1.union(emp2).show() # emp1에 emp2를 합쳐줌 // 중복값도 넣는다.

+-----+----+
|ename| sal|
+-----+----+
|JONES|2975|
|BLAKE|2850|
|CLARK|2450|
+-----+----+

+------+----+
| ename| sal|
+------+----+
| ALLEN|1600|
|  WARD|1250|
|MARTIN|1250|
| BLAKE|2850|
|TURNER|1500|
| JAMES| 950|
+------+----+

+------+----+
| ename| sal|
+------+----+
| JONES|2975|
| BLAKE|2850|
| CLARK|2450|
| ALLEN|1600|
|  WARD|1250|
|MARTIN|1250|
| BLAKE|2850|
|TURNER|1500|
| JAMES| 950|
+------+----+



In [30]:
emp1 = emp.filter("job == 'MANAGER'").select("ename", "sal")
emp2 = emp.filter("deptno == 30").select("ename", "sal")
emp1.show()
emp2.show()
emp1.union(emp2).dropDuplicates().show() # 중복되는 값을 버리고 합친다.

+-----+----+
|ename| sal|
+-----+----+
|JONES|2975|
|BLAKE|2850|
|CLARK|2450|
+-----+----+

+------+----+
| ename| sal|
+------+----+
| ALLEN|1600|
|  WARD|1250|
|MARTIN|1250|
| BLAKE|2850|
|TURNER|1500|
| JAMES| 950|
+------+----+

+------+----+
| ename| sal|
+------+----+
| BLAKE|2850|
|MARTIN|1250|
|TURNER|1500|
| CLARK|2450|
| JAMES| 950|
| ALLEN|1600|
| JONES|2975|
|  WARD|1250|
+------+----+



## **map() 과 flatMap()**

### lines = [['w1',  'w2', 'w3'], ['w4', 'w5', 'w6']]
### lines를 map/flatmap을 이용하여 split하게 되면 아래와 같다.
### map: one2one mapping
###	Array(Array('w1', 'w2', 'w3'), Array('w4', 'w5', 'w6'))

### flatmap: one example → one result(flatten)
### Array('w1', 'w2', 'w3', 'w4', 'w5', 'w6')

![spark1](images/spark2.png)

In [31]:
# map은 묶인 상태 유지 -> ex. 각각 반마다 성적을 계산 // 원래 데이터 구조 유지
# flatmap은 모든 것을 풀어서 다시 하나로 묶는다. -> ex. 학교 내 모든 반 학생의 성적을 계산

data = ["둘리 또치 도우너 희동이 고길동 마이콜",
        "피카츄 꼬부기 잠만보",
        "듀크 턱시",
        "프로도 간달프 스미골",
        "코코"]
rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)

# collect로 원소 하나하나 모으기

둘리 또치 도우너 희동이 고길동 마이콜
피카츄 꼬부기 잠만보
듀크 턱시
프로도 간달프 스미골
코코


In [32]:
rdd2=rdd.map(lambda x: x.split(" ")) # 블랭크 문자열로 나누기 --> 리스트 별 나누기
rdd2.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 (TID 2037, DESKTOP-PFH4PNP, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [33]:
rdd2=rdd.flatMap(lambda x: x.split(" ")) # 블랭크로 나누기 --> 모든 리스트가 풀어지고 원소별로 구분
rdd2.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 126.0 failed 1 times, most recent failure: Lost task 1.0 in stage 126.0 (TID 2040, DESKTOP-PFH4PNP, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [34]:
spark.sparkContext.parallelize([3,4,5]).map(lambda x: range(1,x)).collect() 
# 3이 전달되면 1부터 3까지, ~ 각각 range 객체가 전달
# 

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times, most recent failure: Lost task 0.0 in stage 127.0 (TID 2041, DESKTOP-PFH4PNP, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\parkgun\anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 477, in main
Exception: Python in worker has different version 3.7 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
spark.sparkContext.parallelize([3,4,5]).flatMap(lambda x: range(1,x)).collect() 
# 각 원소를 하나로 풀어서 모두 출력

In [None]:
spark.sparkContext.parallelize([3,4,5]).map(lambda x: [x,  x*x]).collect() 
# 각각 데이터에 대해 수행한 결과를 보여줌

In [None]:
spark.sparkContext.parallelize([3,4,5]).flatMap(lambda x: [x,  x*x]).collect() 
# 모든 데이터를 풀어서 전달

In [None]:
lines = spark.sparkContext.textFile("data/greeting.txt")
sorted(lines.flatMap(lambda line: line.split()).map(lambda w: (w,1)).reduceByKey(lambda v1, v2: v1+v2).collect())


In [None]:
rdd1 = spark.sparkContext.textFile("data/greeting.txt")
print(type(rdd1))
print(rdd1)
print(rdd1.collect())
print("------------------------------------------------------------------------------")
rdd2 = rdd1.flatMap(lambda line: line.split())
print(type(rdd2))
print(rdd2)
print(rdd2.collect())
print("------------------------------------------------------------------------------")
rdd3 = rdd2.map(lambda w: (w,1))
print(type(rdd3))
print(rdd3)      
print(rdd3.collect())
print("------------------------------------------------------------------------------")
rdd4 = rdd3.reduceByKey(lambda v1, v2: v1+v2)
print(type(rdd4))
print(rdd4)
print(rdd4.collect())
print("------------------------------------------------------------------------------")
result = rdd4.collect()
print(type(result))
print(result)